In [1]:
from tqdm import tqdm
import json
import numpy as np
import pandas as pd
import random

class Node:
    def __init__(self, num_vals, id, alpha=1.) -> None:
        """
            num_vals: Int
            id: Int
            alpha: Float (dirichlet's alpha)
        """
        self.id = id
        self.num_vals = num_vals
        self.local_marginal_probs = pd.DataFrame({f"X{self.id}": [i for i in range(num_vals)]})
        self.parents = []
        self.cur_val = -1
        self.alpha = alpha
        self.local_conditional_probs = None
        
    def set_parents(self, parents):
        """
            parents: List[Node]
        """
        # print(f"Node {self.id} have parents: {[pa.id for pa in parents]}")
        self.parents = parents
        return
        
    def get_level(self):
        if len(self.parents):
            return max([pa.get_level() for pa in self.parents]) + 1
        else:
            return 0
    
    def set_condprob(self):
        if len(self.parents):
            self.local_conditional_probs = self.local_marginal_probs.copy()
            pa_comb = 1
            for pa in self.parents:
                pa_comb *= pa.num_vals
                self.local_conditional_probs = self.local_conditional_probs.merge(pa.local_marginal_probs, how="cross")
            
            # Generating conditional probability, constraints: sum (P(X=x|Z=z)) = 1 for all x
            self.local_conditional_probs['prob'] = [1] * len(self.local_conditional_probs)
            self.local_conditional_probs = self.local_conditional_probs.sort_values(by=[f'X{pa.id}' for pa in self.parents])
            for i in range(pa_comb):
                self.local_conditional_probs['prob'].iloc[self.num_vals*i:self.num_vals*(i+1)] = np.random.dirichlet([self.alpha] * self.num_vals).flatten()
        return
    
    def set_margprob(self, dirichlet_alpha=1):
        if not len(self.parents):
            probs = np.random.dirichlet([dirichlet_alpha] * self.num_vals).flatten()
            try:
                old_probs = self.local_marginal_probs['prob']
                while np.sqrt((old_probs - probs).pow(2).sum()) < 0.25:
                    probs = np.random.dirichlet([dirichlet_alpha] * self.num_vals).flatten()
            except:
                pass
                
            self.local_marginal_probs['prob'] = probs.flatten()
        return
    
    def sample(self):
        if len(self.parents):
            conditions = np.array([True for _ in range(len(self.local_conditional_probs))]) # type:ignore
            for pa in self.parents:
                select = (self.local_conditional_probs[f'X{pa.id}'] == pa.cur_val).to_numpy()   # type:ignore
                conditions = np.logical_and(conditions,select)
            cond_df = self.local_conditional_probs[conditions]  # type:ignore
            self.cur_val = np.random.choice(cond_df[f'X{self.id}'], size=1, p=cond_df[f'prob']).item()
        else:
            self.cur_val = np.random.choice(self.local_marginal_probs[f'X{self.id}'], size=1, p=self.local_marginal_probs[f'prob']).item()
        return
        

class DAG:
    def __init__(self, adj_mtx, max_numvals, alpha) -> None:
        self.adj_mtx = adj_mtx
        self.nodes = [Node(np.random.randint(2, max_numvals), i + 1, alpha) for i in range(adj_mtx.shape[0])]
        self.__endogeneous_nodes = []
        self.__render_adjmtx()
        self.__order_nodes()
        self.__init_condprobs()
        self.__init_margprob()
        
    def __render_adjmtx(self):
        for i in range(self.adj_mtx.shape[0]):
            parents = [self.nodes[j] for j in range(self.adj_mtx.shape[0]) if self.adj_mtx[j][i] == 1]
            self.nodes[i].set_parents(parents)  # type:ignore
            if len(parents) == 0:
                self.__endogeneous_nodes.append(self.nodes[i])
        return
    
    def __order_nodes(self):
        self.nodes.sort(key=lambda item: item.get_level())
        return
    
    def __init_condprobs(self):
        for node in self.nodes:
            node.set_condprob()    # type:ignore
        return
    
    def __init_margprob(self):
        for node in self.nodes:
            node.set_margprob(1.0)    # type:ignore
        return
    
    def reinit_endoprob(self, dirichlet_alpha):
        chosen_id = np.random.choice([i for i in range(len(self.__endogeneous_nodes))], size=1).item()
        self.__endogeneous_nodes[chosen_id].set_margprob(dirichlet_alpha)
        return
    
    def disseminate(self, n):
        df = pd.DataFrame(columns=[f'X{node.id}' for node in self.nodes])
        for i in tqdm(range(n), leave=False):
            res = []
            for node in self.nodes:
                # print(f"X{node.id}-->", end="")
                node.sample()
                res.append(node.cur_val)
            # print("|")
            df.loc[len(df),:] = res  # type:ignore
        return df

In [2]:
import os
from pathlib import Path
from utils import is_acyclic

def gen_data(dag, n=10000, savepath="./data", filename="output.csv"):
    df = dag.disseminate(n)
    
    if filename is not None:
        res_path = os.path.join(savepath)
        if not Path(res_path).exists():
            os.makedirs(res_path)
        
        df.to_csv(os.path.join(res_path, filename), index=False)
    
    return df

In [3]:
# from utils import compute_mll
from typing import List

def compute_mll(summary_with_ch: pd.DataFrame, potential_parent: list, num_env):
    if len(potential_parent):
        marginalized_ch = summary_with_ch.groupby(potential_parent)['count'].sum().reset_index()
        output = summary_with_ch.merge(marginalized_ch, on=potential_parent, how='left')            
        output.insert(0, f'probs_{num_env}', output['count_x']/output['count_y'])
        output.insert(0, f'joint_{num_env}', output['count_x']/output['count_x'].sum())
        mll = np.array(output['count_x']).dot(np.log(output[f'probs_{num_env}'])).item()
        output = output.drop(['count_x', 'count_y'], axis=1)
        # return mll, output.rename({'count_y': f'count_y{num_env}', 'count_x': f'count_x{num_env}'}, axis=1)
        return mll, output
    else:
        output = summary_with_ch.copy()
        output.insert(0, f'probs_{num_env}', output['count']/output['count'].sum())
        mll = np.array(output['count']).dot(np.log(output[f'probs_{num_env}'])).item()
        output = output.drop(['count'], axis=1)
        return mll, output


def compute_variance_viasilos(silos: List[pd.DataFrame], variable: str, parents: list, verbose=False):
    conditional_probs_record = silos[0][parents + [variable]].groupby(parents + [variable]).count().reset_index()
    mll_list = []
    env = 0
    for data in silos:
        vertical_sampled_data = data[parents + [variable]]
        vertical_sampled_data.insert(0, 'count', [1] * len(vertical_sampled_data))
        
        summary_with_ch = vertical_sampled_data.groupby(parents + [variable])['count'].sum().reset_index()
        mll, output = compute_mll(summary_with_ch, parents, env)
        conditional_probs_record = conditional_probs_record.merge(output, on=parents + [variable], how='left')
        mll_list.append(mll)
        env += 1
            
    mean_mll = np.mean(mll_list)
    var_avg = conditional_probs_record.iloc[:, len(parents) + 1:].var(axis=1, skipna=True).mean()
    if verbose:
        print(conditional_probs_record)
    return var_avg, mean_mll, conditional_probs_record


def compute_weighted_variance_viasilos(silos: List[pd.DataFrame], variable: str, parents: list, verbose=False):
    variance, _, df = compute_variance_viasilos(silos, variable, parents, verbose=verbose)
    if len(parents):
        joint_mat = np.array([df[f'joint_{i}'] for i in range(len(silos))]).T
        probs_mat = np.array([df[f'probs_{i}'] for i in range(len(silos))]).T
        probs_mean = np.array([np.mean(probs_mat[i][~np.isnan(probs_mat[i])], keepdims=True) for i in range(probs_mat.shape[0])])
        prod = joint_mat * (probs_mat - probs_mean)**2
        return np.mean(prod[~np.isnan(prod)])
    else:
        return variance


def get_condprob(dag, node_id):
    if len(dag.nodes[node_id].parents):
        return dag.nodes[node_id].local_conditional_probs
    else:
        return dag.nodes[node_id].local_marginal_probs
    

def compute_condprob(df: pd.DataFrame, variable: str, conditioned_vars: list):
    vertical_sampled_data = df[conditioned_vars + [variable]]
    vertical_sampled_data.insert(0, 'count', [1] * len(vertical_sampled_data))
    summary_with_ch = vertical_sampled_data.groupby([variable] + conditioned_vars)['count'].sum().reset_index()
    _, output = compute_mll(summary_with_ch, conditioned_vars, 0)
    return output

### Generic graph

In [8]:
import bnlearn as bn

model = bn.import_DAG('../data/munin.bif', verbose=0)

INFO:numexpr.utils:Note: NumExpr detected 64 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:Note: NumExpr detected 64 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:Note: NumExpr detected 64 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:Note: NumExpr detected 64 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:Note: NumExpr detected 64 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:Note: NumExpr detected 64 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:Note: NumExpr detected 64 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:Note: NumExpr detected 64 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:Note: NumExpr detected 64 cores but "NUMEXPR_MAX_THRE

In [9]:
dataname = "munin"
adj_mtx = model['adjmat'].to_numpy() * 1
# data = json.load(open(f"../CausalBKAI/data/TestData/bnlearn_discrete_10000/truth_dag_adj/{dataname}.json", "r"))
# adj_mtx = np.array(data['Adj'])


# adj_mtx = np.array(
#     [[0,1,0],
#      [0,0,1],
#      [0,0,0]]
# )

mi = 3
di = 5

dag = DAG(adj_mtx, max_numvals=mi, alpha=di)
# get_condprob(dag, 2)

In [10]:
# track_endo = get_condprob(dag, 0)[['X1']].groupby(['X1']).sum().reset_index()
n = 10
silos = []
for i in range(n):
    dag.reinit_endoprob(dirichlet_alpha=di)
    # track_endo = track_endo.merge(get_condprob(dag, 0).rename({"prob": f"prob{i}"}, axis=1), how='left', on=['X1'])
    df = gen_data(dag, 5000, savepath=f"../data/distributed/{dataname}/m{mi}_d{di}_n{n}", filename=f"silo-{i}.csv") # f"silo-{i}.csv"
    silos.append(df)

# track_endo

                                                   

In [11]:
np.savetxt(f"../data/distributed/{dataname}/adj.txt", adj_mtx)

In [24]:
get_condprob(dag, 0)

Unnamed: 0,X1,prob
0,0,0.29255
1,1,0.70745


In [11]:
get_condprob(dag, 1)

Unnamed: 0,X2,X1,prob
0,0,0,0.225861
2,1,0,0.774139
1,0,1,0.362888
3,1,1,0.637112


In [12]:
get_condprob(dag, 2)

Unnamed: 0,X3,X2,prob
0,0,0,0.634269
2,1,0,0.365731
1,0,1,0.43945
3,1,1,0.56055


In [27]:
variance = compute_weighted_variance_viasilos(silos, 'X2', ['X1'], verbose=True)
variance

   X1  X2  joint_0   probs_0  joint_1   probs_1  joint_2   probs_2  joint_3  \
0   0   0   0.1263  0.226060   0.0736  0.219048   0.1219  0.224866   0.1690   
1   0   1   0.4324  0.773940   0.2624  0.780952   0.4202  0.775134   0.5912   
2   1   0   0.1653  0.374575   0.2357  0.354970   0.1644  0.359030   0.0875   
3   1   1   0.2760  0.625425   0.4283  0.645030   0.2935  0.640970   0.1523   

    probs_3  joint_4   probs_4  
0  0.222310   0.0673  0.228834  
1  0.777690   0.2268  0.771166  
2  0.364887   0.2574  0.364641  
3  0.635113   0.4485  0.635359  


6.678034804858005e-06

In [28]:
variance = compute_weighted_variance_viasilos(silos, 'X1', ['X2'], verbose=True)
variance

   X2  X1  joint_0   probs_0  joint_1   probs_1  joint_2   probs_2  joint_3  \
0   0   0   0.1263  0.433128   0.0736  0.237957   0.1219  0.425777   0.1690   
1   0   1   0.1653  0.566872   0.2357  0.762043   0.1644  0.574223   0.0875   
2   1   0   0.4324  0.610390   0.2624  0.379904   0.4202  0.588763   0.5912   
3   1   1   0.2760  0.389610   0.4283  0.620096   0.2935  0.411237   0.1523   

    probs_3  joint_4   probs_4  
0  0.658869   0.0673  0.207268  
1  0.341131   0.2574  0.792732  
2  0.795158   0.2268  0.335851  
3  0.204842   0.4485  0.664149  


0.006844981799618394

In [78]:
variance= compute_weighted_variance_viasilos(silos, 'X3', ['X1', 'X2'], verbose=False)
variance

8.748477280948522e-05

### Synthesized graph

In [1]:
import networkx as nx
import numpy as np
import random

In [2]:
num_node = 20
p = 0.2

In [3]:
graph = nx.erdos_renyi_graph(n=num_node, p=p, seed=0, directed=True)

In [4]:
try:
    while True:
        cycle_list = nx.find_cycle(graph, orientation='original')
        if len(cycle_list):
            random.shuffle(cycle_list)
            a, b, type = cycle_list[0]
            graph.remove_edge(a, b)
        else:
            break
except:
    pass

In [5]:
adj_mtx = np.zeros([num_node, num_node])
for edge in graph.edges:
    a, b = edge
    adj_mtx[a][b] = 1
    
outdegrees = np.sum(adj_mtx, axis=0, keepdims=True)
indegrees = np.sum(adj_mtx, axis=1, keepdims=True)

print(np.max(outdegrees), np.max(indegrees))

if outdegrees.max() < indegrees.max():
    adj_mtx = adj_mtx.T

7.0 6.0


In [8]:
mi = 3
di = 1
dag = DAG(adj_mtx, max_numvals=mi, alpha=di)

In [9]:
n = 10
# silos = []
for i in range(n):
    dag.reinit_endoprob(dirichlet_alpha=1)
    df = gen_data(dag, 5000, savepath=f"./data/distributed/erdos_renyi/d{num_node}_p{p}/m{mi}_d{di}_n{n}", filename=f"silo-{i}.csv") # f"silo-{i}.csv"
    # silos.append(df)

                                                    

FileNotFoundError: [Errno 2] No such file or directory: './data/distributed/erdos_renyi/d{num_node}_p{p}/adj.txt'

In [None]:
with open(f"./data/distributed/erdos_renyi/d{num_node}_p{p}/adj.txt", "w") as f:
    np.savetxt(f, adj_mtx)

### Conditional-probability preserved data partitioning

In [4]:
dataname = "asia"
data = json.load(open(f"../CausalBKAI/data/TestData/bnlearn_discrete_10000/truth_dag_adj/{dataname}.json", "r"))
adj_mtx = np.array(data['Adj'])

# adj_mtx = np.array(
#     [[0,1,1],
#      [0,0,1],
#      [0,0,0]]
# )

mi = 3
di = 3

dag = DAG(adj_mtx, max_numvals=mi, alpha=di)

In [5]:
n = 10
df = gen_data(dag, 50000, savepath=None, filename=None) # type:ignore

                                                      

In [6]:
def marginal_prob(data, variables: list):
    res = data[[*variables, 'count']].groupby(variables).sum().reset_index()
    res['prob'] = res['count']/res['count'].sum()
    return res

In [7]:
subdata = df.copy()
subdata['count'] = [1] * len(subdata)

In [8]:
marg_dis = marginal_prob(subdata, ['X1'])
marg_dis

Unnamed: 0,X1,count,prob
0,0,6847,0.13694
1,1,43153,0.86306


In [9]:
get_condprob(dag, 0)

Unnamed: 0,X1,prob
0,0,0.138696
1,1,0.861304


In [10]:
marg_dict = {}
for var in df.columns:
    marg_dict[var] = marginal_prob(subdata, [var])['prob'].to_numpy().flatten()

if len(marg_dict):
    rand_var = np.random.choice(list(marg_dict.keys()))
    

In [11]:
marg_dict

{'X1': array([0.13694, 0.86306]),
 'X3': array([0.58324, 0.41676]),
 'X2': array([0.34624, 0.65376]),
 'X4': array([0.74182, 0.25818]),
 'X5': array([0.553, 0.447]),
 'X6': array([0.7292, 0.2708]),
 'X7': array([0.7168, 0.2832]),
 'X8': array([0.37084, 0.62916])}

In [43]:
def sample_by_variable(data: pd.DataFrame, variable: str, sample_dis: dict):
    """
    This function create a new data frame from the input data frame
    By sampling single variable following the input sample distribution
    and return the marginal distribution of all variables in the new data frame
    
    Arguments:
        variable:   str
        sample_dis: dict {'value': prob}
    
    Return:
        new_data: pd.DataFrame
        marg_dict: dictionary {'variable': marginal-distribution}
    """
    coc = data[variable].to_numpy()             # Column of Concern (variable column)
    vals = [val for val in sample_dis.keys()]
    counts = np.array([np.sum(coc == val) for val in sample_dis.keys()])
    probs = np.array([p for p in sample_dis.values()])
    num_selects = np.floor(min(counts/probs) * probs)
    
    all_index = []
    for val, num_select in zip(vals, num_selects):
        all_index += list(np.random.choice(list(np.where(coc==val)[0]), size=int(num_select), replace=False))
    
    return data.iloc[all_index].reset_index()

def marginal_probs(df: pd.DataFrame):
    vars = df.columns
    df['count'] = [1] * len(df)
    res = {}
    for var in vars:
        res_var = df[[var, 'count']].groupby(var).sum().reset_index()
        res_var['prob'] = res_var['count']/res_var['count'].sum()
        res[var] = res_var['prob'].to_numpy()
    return res
    

from causallearn.utils.cit import CIT

def find_basis(data: pd.DataFrame, ordering: list, confidence=0.01):
    """
    This function finds propagation basis of a graph
    Definition: A propagation basis of a graph is a set of variables
    that if we change their marginal distribution, every other variables 
    will have their marginal distribution changed accordingly
    
    Arguments:
        data:       pd.Dataframe 
        ordering:   list[variables] that is already in some specific orders
    
    Procedure:
        1. Select the first variable (X) in the ordering list
        2. Use the Chi-square test to identify variables that are dependent on X
        3. Remove these variables from the ordering list and record X
        4. Repeat from 1 until the ordering list is empty
    
    Return:
        Basis: list[variables]
    """
    original_order = data.columns.to_list()
    basis = []
    while len(ordering):
        Xamine_var = ordering.pop(0)            
        chisq_obj = CIT(data, "chisq")
        remove_list = []
        for remain_var in ordering:
            pval = chisq_obj(original_order.index(Xamine_var), original_order.index(remain_var), []) # type:ignore
            if pval <= confidence: # type:ignore
                print(f"{Xamine_var} dependent with {remain_var} with p_val={pval}")
                remove_list.append(remain_var)
            else:
                print(f"{Xamine_var} independent with {remain_var} with p_val={pval}")
        
        ordering = list(set(ordering) - set(remove_list))
        basis.append(Xamine_var)
        
    return basis

In [46]:
import numpy as np
import pandas as pd
from utils import conditional_mutual_information, conditional_entropy, compute_mll, compute_weighted_variance_viasilos
from pathlib import Path
import os
import json
import random

dataname = "asia"
mi = 3      # The number of values a variable can take is ranged in [2, mi-1]
di = 1      # The dirichlet alpha that controls the data distribution
n = 10      # The number of data silos

silos = []

# folderpath = f"./data/distributed/erdos_renyi/d20_p0.2/m3_d1_n10"
# groundtruth = np.loadtxt(f"./data/distributed/erdos_renyi/d20_p0.2/adj.txt")

folderpath = f"./data/distributed/{dataname}/m{mi}_d{di}_n{n}"
groundtruth = np.loadtxt(f"./data/distributed/{dataname}/adj.txt")

if not Path(folderpath).exists():
    print("Folder", folderpath, "not exist!")
else:
    for file in sorted(os.listdir(folderpath)):
        filename = os.path.join(folderpath, file)
        silo_data = pd.read_csv(filename)
        silos.append(silo_data)
        print("Loaded file:", filename, end="\t")
        all_vars = silos[0].columns
        print(len(silo_data), " Instances\t", len(all_vars), "Variables")
        
merged_df = pd.concat(silos, axis=0)

Loaded file: ./data/distributed/asia/m3_d1_n10/silo-0.csv	5000  Instances	 8 Variables
Loaded file: ./data/distributed/asia/m3_d1_n10/silo-1.csv	5000  Instances	 8 Variables
Loaded file: ./data/distributed/asia/m3_d1_n10/silo-2.csv	5000  Instances	 8 Variables
Loaded file: ./data/distributed/asia/m3_d1_n10/silo-3.csv	5000  Instances	 8 Variables
Loaded file: ./data/distributed/asia/m3_d1_n10/silo-4.csv	5000  Instances	 8 Variables
Loaded file: ./data/distributed/asia/m3_d1_n10/silo-5.csv	5000  Instances	 8 Variables
Loaded file: ./data/distributed/asia/m3_d1_n10/silo-6.csv	5000  Instances	 8 Variables
Loaded file: ./data/distributed/asia/m3_d1_n10/silo-7.csv	5000  Instances	 8 Variables
Loaded file: ./data/distributed/asia/m3_d1_n10/silo-8.csv	5000  Instances	 8 Variables
Loaded file: ./data/distributed/asia/m3_d1_n10/silo-9.csv	5000  Instances	 8 Variables


In [48]:
find_basis(merged_df, ['X6', 'X1', 'X2', 'X3', 'X4', 'X5', 'X7', 'X8'], confidence=0.01)

X6 dependent with X1 with p_val=0.0
X6 dependent with X2 with p_val=0.0
X6 dependent with X3 with p_val=4.3339925795881283e-19
X6 dependent with X4 with p_val=4.310987466659948e-286
X6 dependent with X5 with p_val=0.0036992473188418324
X6 dependent with X7 with p_val=0.0
X6 dependent with X8 with p_val=0.0


['X6']

In [22]:
output = sample_by_variable(df, 'X1', {0:0.7, 1:0.3})
print(len(output))

9781


In [23]:
np.unique(output['X1'], return_counts=True)

(array([0, 1], dtype=object), array([6847, 2934]))

In [24]:
get_condprob(dag, 0)

Unnamed: 0,X1,prob
0,0,0.138696
1,1,0.861304


In [25]:
get_condprob(dag, 2)

Unnamed: 0,X2,X1,prob
0,0,0,0.50819
2,1,0,0.49181
1,0,1,0.32332
3,1,1,0.67668


In [26]:
compute_condprob(output, 'X2', ['X1'])

Unnamed: 0,joint_0,probs_0,X2,X1
0,0.344955,0.492771,0,0
1,0.096411,0.321404,0,1
2,0.355076,0.507229,1,0
3,0.203558,0.678596,1,1
