In [1]:
import pandas as pd
from collections.abc import Sequence,Iterable, Iterator
import numpy as np

def gen_data():
    '''
        Build the joint distribution table over three discrete variables.

        I : intelligence, values i_0, i_1 (low, high)
        D : course difficulty, values d_0, d_1 (easy, hard)
        G : grade, values g_1, g_2, g_3 (different grades)

        Returns a DataFrame with one row per (I, D, G) assignment (12 rows)
        and columns 'I', 'D', 'G', 'prob'.
    '''
    intelligence = [f'i_{k}' for k in range(2)]
    difficulty = [f'd_{k}' for k in range(2)]
    grade = [f'g_{k}' for k in range(1, 4)]

    # Enumerate assignments with I varying slowest and G fastest; the
    # probability list below is written in exactly this order.
    assignments = []
    for i_val in intelligence:
        for d_val in difficulty:
            for g_val in grade:
                assignments.append([i_val, d_val, g_val])

    table = pd.DataFrame(assignments, columns=['I', 'D', 'G'])
    table['prob'] = [
        0.126, 0.168, 0.126,    # i_0, d_0
        0.009, 0.045, 0.126,    # i_0, d_1
        0.252, 0.0224, 0.0056,  # i_1, d_0
        0.06, 0.036, 0.024,     # i_1, d_1
    ]
    return table

In [2]:
# Build the joint table over I, D and G (one row per assignment, mass in 'prob').
df = gen_data()

In [3]:
# Display the joint table.
df

Unnamed: 0,I,D,G,prob
0,i_0,d_0,g_1,0.126
1,i_0,d_0,g_2,0.168
2,i_0,d_0,g_3,0.126
3,i_0,d_1,g_1,0.009
4,i_0,d_1,g_2,0.045
5,i_0,d_1,g_3,0.126
6,i_1,d_0,g_1,0.252
7,i_1,d_0,g_2,0.0224
8,i_1,d_0,g_3,0.0056
9,i_1,d_1,g_1,0.06


**Assume the joint distribution has already been obtained from ETL operations**

In [11]:
class DiscreteProb():
    '''
    Helper for querying a discrete joint distribution stored as a DataFrame.

    The table is expected to hold one row per assignment of the variables,
    with the variable columns named in ``variables`` and the probability
    mass of each assignment in the ``prob_col`` column.
    '''

    def __init__(self, joint, variables = ['I','D','G'], prob_col = 'prob'):
        '''
        joint     : DataFrame holding the joint distribution
        variables : names of the columns that are random variables
        prob_col  : name of the column holding the probability mass
        '''
        self.joint = joint
        if isinstance(variables, str):
            variables = [variables]
        # Store a private copy: the default list object is shared by every
        # call, so keeping a reference would let one instance's edits leak
        # into all others.
        self.variables = list(variables)
        self.prob_col = prob_col


    def _check_variable(self, var):
        '''
        Raise ValueError unless every name in ``var`` (a single string or an
        iterable of strings) is one of ``self.variables``.
        '''
        if isinstance(var, str):
            var = [var]

        for v in var:
            if v not in self.variables:
                raise ValueError('variable not found in data')


    def cond_dist(self, cond_dict):
        '''
        Distribution over the rows consistent with the evidence, renormalised
        to sum to 1.

        cond_dict maps a variable name to either a single allowed value or a
        list-like of allowed values. An empty dict keeps every row.
        '''
        self._check_variable(cond_dict.keys())

        return self._normalize(self._condition_on(cond_dict))

    def marginal(self, on):
        '''
        Marginal distribution over ``on`` (a variable name or a list of
        names): all other variables are summed out and the result is
        renormalised.
        '''
        if isinstance(on, str):
            on = [on]
        else:
            # groupby treats a tuple as a single key, so force a list
            on = list(on)

        self._check_variable(on)

        p = self.joint.groupby(on)[self.prob_col].sum().reset_index()
        return self._normalize(p)

    def cpd(self, conditions, variable):
        '''
        Conditional probability table P(variable | conditions).

        Returns a DataFrame indexed by the conditioning variables with one
        column per value of ``variable``; each row is normalised by its sum.

        Raises ValueError if ``variable`` is not a string, if any name is
        unknown, or if ``variable`` is also listed in ``conditions``.
        '''
        if isinstance(conditions, str):
            conditions = [conditions]
        else:
            conditions = list(conditions)

        if not isinstance(variable, str):
            raise ValueError('variable should be a string')

        self._check_variable(conditions)
        self._check_variable(variable)

        if variable in conditions:
            raise ValueError('variable should not be one of the conditions')

        # Variables that appear in neither `conditions` nor `variable` must be
        # summed out. pivot_table's default aggregation is the mean, which
        # only agrees with the sum (after row normalisation) when every cell
        # pools the same number of rows. Missing combinations carry zero mass
        # rather than NaN, which would otherwise poison the whole row.
        tmp = self.joint.pivot_table(index = conditions,
                                     columns = [variable],
                                     values = self.prob_col,
                                     aggfunc = 'sum',
                                     fill_value = 0)

        return self._row_normalize(tmp)


    def _condition_on(self, cond_dict):
        '''
        Return the rows of the joint table that satisfy every condition in
        ``cond_dict`` (variable -> allowed value, or list-like of allowed
        values). Probabilities are not renormalised here.
        '''
        # Start from "keep everything" so that an empty cond_dict yields the
        # full table instead of a scalar mask.
        keep = np.ones(len(self.joint), dtype = bool)
        for var, allow in cond_dict.items():
            column = self.joint[var]
            if pd.api.types.is_list_like(allow):
                keep &= column.isin(allow).to_numpy()
            else:
                # strings and other scalars are matched by equality
                keep &= (column == allow).to_numpy()
        return self.joint[keep]

    def _normalize(self, df):
        '''
        Return a copy of ``df`` whose probability column is divided by its
        total. A zero total (e.g. impossible evidence) produces NaN values.
        '''
        df = df.copy()
        df[self.prob_col] = df[self.prob_col] / df[self.prob_col].sum()
        return df

    def _row_normalize(self, df):
        '''Divide every row of ``df`` by that row's sum.'''
        return df.div(df.sum(axis = 1), axis = 0)

    def log(self, df = None):
        '''
        Return a copy of ``df`` (the joint table when ``df`` is None) with the
        probability column replaced by its natural logarithm. The input frame
        is not modified.
        '''
        # `if df` is ambiguous for a DataFrame and raises ValueError, so the
        # "not supplied" case has to be detected with an identity check.
        df = self.joint.copy() if df is None else df.copy()
        df[self.prob_col] = np.log(df[self.prob_col])
        return df
        

**Some simple tests**

In [5]:
# Wrap the joint table; the default variables ('I', 'D', 'G') and 'prob' column match gen_data()'s output.
discrete = DiscreteProb(df)

In [6]:
# Keep only the rows with G == 'g_1' and renormalise their mass to sum to 1.
discrete.cond_dist({'G':'g_1'})

Unnamed: 0,I,D,G,prob
0,i_0,d_0,g_1,0.281879
3,i_0,d_1,g_1,0.020134
6,i_1,d_0,g_1,0.563758
9,i_1,d_1,g_1,0.134228


In [7]:
# Marginal over I: D and G are summed out.
discrete.marginal(['I'])

Unnamed: 0,I,prob
0,i_0,0.6
1,i_1,0.4


In [8]:
# Marginal over D: I and G are summed out.
discrete.marginal(['D'])

Unnamed: 0,D,prob
0,d_0,0.7
1,d_1,0.3


In [9]:
# Joint marginal over (I, D): only G is summed out.
discrete.marginal(['I','D'])

Unnamed: 0,I,D,prob
0,i_0,d_0,0.42
1,i_0,d_1,0.18
2,i_1,d_0,0.28
3,i_1,d_1,0.12


In [10]:
# Table of G given (I, D): one row per (I, D) pair, each row normalised to sum to 1.
discrete.cpd(['I','D'],'G')

Unnamed: 0_level_0,G,g_1,g_2,g_3
I,D,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
i_0,d_0,0.3,0.4,0.3
i_0,d_1,0.05,0.25,0.7
i_1,d_0,0.9,0.08,0.02
i_1,d_1,0.5,0.3,0.2


**Factor product**

In [15]:
# Two example factors to multiply, phi(A, B) and phi(B, C): one row per
# assignment, with the factor value stored in 'prob'.
a = [f'a_{k}' for k in range(1, 4)]
b = [f'b_{k}' for k in range(1, 3)]
c = [f'c_{k}' for k in range(1, 3)]

# Rows are enumerated with the first variable varying slowest; the value
# lists below follow that order.
dfab = pd.DataFrame([[a_val, b_val] for a_val in a for b_val in b],
                    columns = ['A', 'B'])
dfab['prob'] = [0.5, 0.8, 0.1, 0, 0.3, 0.9]

dfbc = pd.DataFrame([[b_val, c_val] for b_val in b for c_val in c],
                    columns = ['B', 'C'])
dfbc['prob'] = [0.5, 0.7, 0.1, 0.2]

In [18]:
# Alias, not a copy: df1 and dfab refer to the same DataFrame object.
df1 = dfab

In [19]:
# Alias, not a copy: df2 and dfbc refer to the same DataFrame object.
df2 = dfbc

In [None]:
prob_col = 'prob'
# rename() without inplace returns a new frame and the result is not assigned,
# so this cell only displays the renamed copy; df1 itself is left unchanged.
df1.rename(columns = {'prob':'prob_left'})

In [None]:
# Factor product of phi(A, B) and phi(B, C): join the two tables on the shared
# variable B and multiply the matching entries.
# Both inputs carry a 'prob' column, so the merge must be given explicit
# suffixes; with the defaults the columns come out as 'prob_x' / 'prob_y' and
# the 'prob_ab' / 'prob_bc' lookups below raise KeyError.
df_merged = (
    dfab.merge(dfbc, on = 'B', how = 'inner', suffixes = ('_ab', '_bc'))
        .assign(joint = lambda t: t['prob_ab'] * t['prob_bc'])
        [['A','B','C','prob_ab','prob_bc','joint']]
)
df_merged