In [None]:
%%bash
pip install astroML
pip install sklearn

In [None]:
import numpy as np
from jointpdf.jointpdf import JointProbabilityMatrix
from jointpdf.jointpdf import FullNestedArrayOfProbabilities

In [None]:
from functools import reduce

def compute_joint(multi_index, marginal_distributions):
    return reduce(lambda x, y: x*y, [marginal_distributions[combination] for
            combination in zip(range(len(multi_index)), multi_index)])

def compute_joint_from_marginals(marginal_distributions):
    dimension_joint_distribution = [len(dist) for dist in marginal_distributions]
    full_distribution_temp = np.zeros(dimension_joint_distribution)
    full_distribution = np.zeros(dimension_joint_distribution)

    it = np.nditer(full_distribution, flags=['multi_index'], op_flags=[['readwrite']])
    while not it.finished:
        full_distribution[it.multi_index]= compute_joint(it.multi_index, marginal_distributions)
        it.iternext()

    return full_distribution

marginal_distributions2 = np.array([[0.3, 0.7], [0.3, 0.7]])
print(compute_joint_from_marginals(marginal_distributions2))

def xor(it):
    """apply xor to binary numbers x1 and x2"""
    return 0 if it[0]==it[1] else 1

def _and(it):
    """calculate the and of it
    
    params:
        it: binary iterable of length 2
        
    returns: a binary number 
    """
    return 1 if it[0]==1 and it[1]==1 else 0
    
class subset_and():
    def __init__(self, subsets):
        self.subsets = subsets

    def __call__(self, it):
        """take and over every subset provided in subsets 

        params:
            it: iterable with the same length as all tuples in subsets combined
            subsets: iterable of tuples every tuple should have at least length 2
        """
        entries = np.zeros(len(self.subsets))
        arr = np.array(it)
        for count, subset in enumerate(self.subsets):
            entries[count] = np.all(arr.take(subset)==1)

        return [1 if entry else 0 for entry in entries]

def parity(it):
    """calculate the parity (sum(it)%2) of the iterable
    
    params:
        it: a binary iterable
        
    returns: a binary number 
    """
    
    return sum(it)%2
    
def double_xor(it):
    """apply xor to every pair in it and return the outcome
    
    params:
        it: 1-d iterable with binary entries and len(it)%2==0
    """

    return [xor(it[2*i:2*i+2]) for i in range(len(it)/2)]
        



Do some testing of the above defined functions

In [None]:
subset_and_obj = subset_and([(0,3), (1,2), (4,5)])  
print(subset_and_obj([0,1,1,0,1,1]))
print(_and([0,0]))
print(parity([0,0,1,1,1]))
print(double_xor((1,1,0,1)))

In [None]:
tryout_dict = {
    (0, 1, 0):0.2,
    (1, 1, 0):0.1,
    (1, 1, 1):0.3,
    (0, 0, 1):0.4,
}

def f(x, y):
    print("called")
    print(x[0])
    print(y[0])
    print(x[0]>y[0])
    if x[0]>y[0]:
        return -1
    else:
        return 1

print(sorted(tryout_dict.items(), f))

print(sorted(tryout_dict.items(), lambda x, y: -1 if not x[0]>y[0] else 1))


In [None]:
class ProbabilityDict():
    def __init__(self, probability_dict):
        self.probability_dict = probability_dict
        
    def print_dict(self, sort=False):
        if sort:
            prob_items = sorted(self.probability_dict.items(), 
                   lambda x, y: -1 if not x[0]>y[0] else 1)
        else:
            prob_items = self.probability_dict.items()
            
        for key, value in prob_items:
            print("{}: {}".format(key, value))
            
    def calculate_marginal_distribution(self, chosen_variable_indices):
        """ calculates the marginal distribution (the given axis) given the joint 

        params:
            distribution: Should be a dict with keys representing the different
            states (as tuples!) and values representing their probability
            chosen_variable_indices:  a set of the variable(s) for which the marginal
            should be calculated
        """

        marginal_distribution = {}
        for state, value in self.probability_dict.items():
            marginal_state = tuple(
                [entry for count, entry in enumerate(state) 
                 if count in chosen_variable_indices]
            )
            marginal_distribution[marginal_state] = value + marginal_distribution.get(marginal_state, 0)

        return marginal_distribution

    def calculate_entropy(self, variable_indices):
        """ calculate the entropy

        params:
            variable_indices: A set of all the indices for which the entropy must be calculated

        returns: the entropy value
        """
        distribution = self.calculate_marginal_distribution(variable_indices) 
        distribution_values_arr = np.array(distribution.values())
        distribution_values = distribution_values_arr[distribution_values_arr != 0]
        return - np.sum(np.log2(distribution_values) * distribution_values)

    def calulate_mutual_information(self, variable_indices1, variable_indices2):
        return (self.calculate_entropy(variable_indices1) +
                self.calculate_entropy(variable_indices2) -
                self.calculate_entropy(variable_indices1+variable_indices2))



In [None]:
class JointProbabilityMatrixExtended(JointProbabilityMatrix):
    """this class extends the JointProbabilityMatrix by allowing different
    variables to have different statespaces"""
    def __init__(self, numvariables, max_num_states, joint_pdf):
        self.numvariables = numvariables
        self.max_num_states = max_num_states
        self.joint_probabilities = joint_pdf 
        #super(JointProbabilityMatrix, self).__init__(num_variables, num_states, joint_pdf)
    
    #needed to access methods in parent class
    def adjust_to_old_format(self):
        """ensure that every variable in joint has as many states and add states as needed"""
        self.old_joint_probabilities = self.joint_probabilities
        max_number_of_states = max(self.joint_probabilities.joint_probabilities.shape)
        temp_joint_probabilities = np.zeros([max_number_of_states]*self.numvariables)
        it = np.nditer(self.joint_probabilities.joint_probabilities, flags=['multi_index'])
        while not it.finished:
            temp_joint_probabilities[it.multi_index] = it.value
            it.iternext()
            
        self.joint_probabilities.joint_probabilities = temp_joint_probabilities
        self.numvalues = max_number_of_states
    
    #rewrite this to work on a nd-array instead!
    def append_determinstic_function(self, func, num_variables, num_states):
        """append an extra variable to the distribution given by the 
        deterministic function func
        
        params:
            func: a deterministic discrete function over the already defined variables
            num_variables: the number of variables
            num_states: iterable, the number of outcomes of the function
        """
        old_shape = list(self.joint_probabilities.joint_probabilities.shape)
        new_shape = old_shape + num_states
        dummy_joint_probability = np.zeros(new_shape)
        temp2_joint_probability = np.zeros(new_shape)
            
        it = np.nditer(dummy_joint_probability, flags=['multi_index'])
        while not it.finished:
            arguments = tuple(list(it.multi_index)[:-num_variables])
            if np.all(np.array(func(arguments)) == np.array(it.multi_index[-num_variables:])):
                temp2_joint_probability[it.multi_index] = self.joint_probabilities.joint_probabilities[arguments]
                
            it.iternext()
            
        self.joint_probabilities.joint_probabilities = temp2_joint_probability
        self.numvariables = self.numvariables + num_variables 
        
    def to_dict(self):
        probability_dict = {}
        it = np.nditer(self.joint_probabilities.joint_probabilities, flags=['multi_index'])
        while not it.finished:
            if it.value != 0:
                probability_dict[tuple(it.multi_index)]= it.value
            
            it.iternext()
            
        return probability_dict
    
    

First define some marginal distribution that are assumed to be independent 

In [None]:
marginal_distributions = np.array([[0.5, 0.5], [0.5, 0.5], [0.5, 0.5], [0.5,  0.5]])

def create_uniform_marginal_distributions(num_states, num_variables):
    """create a list of uniform probability distributions 
    
    params:
        num_states: how many states the variables have
        num_variables: how many variables to create
    """
    
    return np.array([[1.0/num_states]*num_states]*num_variables)
    
#print(compute_joint_from_marginals(create_uniform_marginal_distributions(3, 3)))

Tryout whether the implementation works

In [None]:
pdf = JointProbabilityMatrix(4, 2, compute_joint_from_marginals(marginal_distributions))
pdf_extended = JointProbabilityMatrixExtended(
    4, 2, FullNestedArrayOfProbabilities(compute_joint_from_marginals(marginal_distributions))
)
pdf_extended.adjust_to_old_format()

marginal_extended = pdf_extended.marginalize_distribution([0, 1])
marginal = pdf.marginalize_distribution([0, 1])

pdf_extended.append_determinstic_function(double_xor, num_variables=2, num_states=[2, 2])
pdf_extended.adjust_to_old_format()
marginal_extended = pdf_extended.marginalize_distribution([0, 1])

print(np.all(
        marginal_extended.joint_probabilities.joint_probabilities==marginal.joint_probabilities.joint_probabilities
))

probability_array = np.array(
    [
        [
            [0.1, 0.2, 0.25]
        ],
        [
            [0.2, 0.1, 0.15]
        ]
    ]
)

pdf_extended = JointProbabilityMatrixExtended(
    3, 3, FullNestedArrayOfProbabilities(probability_array)
)
first_dict = ProbabilityDict(pdf_extended.to_dict())
pdf_extended.adjust_to_old_format()
second_dict = ProbabilityDict(pdf_extended.to_dict())

print("first dict")
first_dict.print_dict(True)
print("second dict")
second_dict.print_dict(True)

In [None]:
import itertools

def calculate_mi_profile(joint_distribution, function_labels, testing=False):
    """create a mutual information profile
    
    params:
        joint_distribution: A JointProbabilityMatrix object from the jointpdf package.
        The joint probabilities properties should be of both all the variables and the
        function.
        function_labels: The labels in the joint distribution which the function represents
        
    returns: a 1-d nd-array where every value represents the average normalized mutual information
        between subsets of variables (of the size of the index) and the ouput variable. 
    """

    number_of_arguments = joint_distribution.numvariables-len(function_labels)
    mi_values = np.zeros(number_of_arguments+1)
    for number_of_variables in np.arange(1, number_of_arguments+1, 1):
        combinations = itertools.combinations(range(number_of_arguments), 
                                              number_of_variables)
        
        mi_values[number_of_variables] = np.mean(
            [joint_distribution.mutual_information(list(combination), function_labels) 
             for combination in combinations]
        )
        
        if testing:
            for comb in itertools.combinations(range(number_of_arguments), number_of_variables):
                print("input variables {} function labels {} mi value {}".format(
                    list(comb), 
                    function_labels,
                    joint_distribution.mutual_information(list(comb), function_labels)
                ))
        
    return mi_values
   
def create_mutual_information_profile(pdf, function, dim_output_function, 
                                      num_states_output_function, testing=False):
    """create mutual information profile values
    
    params:
        pdf: An object of the JointProbabilityMatrixExtendedClass
        function: a function object that takes an iterable of the same length
        as the number of variables in the pdf_extended object
        dim_output_function: the dimension (number of variables) the function outputs
        num_states_output_function: a list, where every entry represents the amount of 
        states that variable has. The length should be equal to dim_output_function
        
    returns: An array representing the MI profile values for number of variables 
    starting at zero and going to the total amount of variables in the distribution
    """
     
    
    pdf.append_determinstic_function(double_xor, dim_output_function, num_states_output_function)
    pdf.adjust_to_old_format()
    
    function_indices = list(range(pdf.numvariables-dim_output_function, pdf.numvariables))
    
    if testing:
        print("the function indices are {}".format(function_indices))
    
    return calculate_mi_profile(pdf_extended, function_indices)
    

Do some testing. I found out that xor for i.i.d variables has mutual information with the individual variables.
This was a huge surprise so I tested it with multiple implementation and also using playground.ipnb. 

In [None]:
pdf_extended = JointProbabilityMatrixExtended(
    4, 2, FullNestedArrayOfProbabilities(compute_joint_from_marginals(marginal_distributions))
)

pdf_extended.append_determinstic_function(double_xor, num_variables=2, num_states=[2, 2])
pdf_extended.adjust_to_old_format()
print(calculate_mi_profile(pdf_extended, [4, 5]))

print("now using the create function")
print("")


pdf_extended2 = JointProbabilityMatrixExtended(
    4, 2, FullNestedArrayOfProbabilities(compute_joint_from_marginals(marginal_distributions))
)

print(create_mutual_information_profile(pdf_extended2, double_xor, 2, [2,2]))

In [None]:
pdf_extended3 = JointProbabilityMatrixExtended(
    4, 2, FullNestedArrayOfProbabilities(compute_joint_from_marginals(marginal_distributions[:4]))
)
print(create_mutual_information_profile(pdf_extended3, double_xor, 2, [2,2]))


num_states1 = 2
num_variables1 = 4
probabilities_arr = FullNestedArrayOfProbabilities(
    compute_joint_from_marginals(create_uniform_marginal_distributions(2, 4))
)
pdf_extended4 = JointProbabilityMatrixExtended(num_variables1, num_states1, probabilities_arr)
subset_and_func = subset_and([(0,2), (1,3)])  
print(create_mutual_information_profile(pdf_extended4, subset_and_func, 2, [2,2]))


num_states1 = 2
num_variables1 = 4
probabilities_arr = FullNestedArrayOfProbabilities(
    compute_joint_from_marginals(create_uniform_marginal_distributions(2, 4))
)
pdf_extended5 = JointProbabilityMatrixExtended(num_variables1, num_states1, probabilities_arr)
pdf_extended5.append_determinstic_function(sum, 1, [5])

probabability_dict5 = pdf_extended5.to_dict()
ProbabilityDict(probabability_dict5).print_dict(True)

pdf_extended5.adjust_to_old_format()
probabability_dict5 = ProbabilityDict(pdf_extended5.to_dict())
print("max entropy is: {}".format(probabability_dict5.calculate_entropy([4])))
print(sum([probabability_dict5.calulate_mutual_information([i], [4]) for i in range(4)]))
    


print(pdf_extended5.joint_probabilities.joint_probabilities.shape)

print(calculate_mi_profile(pdf_extended5, [4]))



In [None]:

#so to calculate the mutual information profile of a function
#take the following steps

#create a new pdf holding the joint pdf of the variables
pdf_extended = JointProbabilityMatrixExtended(
    2, 2, FullNestedArrayOfProbabilities(compute_joint_from_marginals(marginal_distributions[:2]))
)

#create a new pdf holding the joint pdf of the variables
pdf_extended = JointProbabilityMatrixExtended(
    4, 2, FullNestedArrayOfProbabilities(compute_joint_from_marginals(marginal_distributions))
)
    
#state the amount of variables the function outputs
#and the number of states of each variable

double_xor_output_dim = 2
double_xor_num_states = [2, 2]

print("for xor the mi profile is {}".format(
    create_mutual_information_profile(pdf_extended, double_xor, double_xor_output_dim, double_xor_num_states)
))

pdf_extended = JointProbabilityMatrixExtended(
    4, 2, FullNestedArrayOfProbabilities(compute_joint_from_marginals(marginal_distributions))
)
pdf_extended.append_determinstic_function(double_xor, 
                                              num_variables=2, 
                                              num_states=[2, 2])

pdf_extended.adjust_to_old_format()
print(pdf_extended.mutual_information([0,1,2,3], [4, 5]))
