## Structural Equation Test to the Bell DAG:

given Bell scenario correlations
$P(A,B,X,Y) =  \sum_\lambda P(A| X, \lambda) P(B|Y, \lambda) P(X)P(Y)P(\lambda) \qquad \forall \lambda \in $ [0,card_l] our goal is to find the smallest integer $\lambda$

## File structure
1) Generating compatible distributions with deterministic strategies
2) Checking compatibility of given distribution via deterministic strategies
3) Checking compatibility of given distribution via structural equation
4) Finding minimal cardinality of hidden classical variable $\lambda$

In [1]:
import gurobipy as gp
from gurobipy import GRB, norm
import itertools
import numpy as np
import sympy as sp

## (1) Generating compatible distributions with deterministic strategies:

In [2]:
card_A, card_B, card_X, card_Y = 2, 2, 2, 2

In [3]:
def sample_bell_dist_det():

    ## Bin(INT), Bin(INT) -> Bin(INT)
    def identity(x):
        return x
    def swap(x):
        return 1-x
    def to0(_):
        return 0
    def to1(_):
        return 1

    basic_strategies = [identity, swap, to0, to1]
    all_strategies = list(itertools.product(basic_strategies, repeat=2))
    def generate_top(u):
        f_u, g_u = all_strategies[u]

        # value 1 at point(a,b) = (f_u(x),g_u(y)), zero everywhere else
        def middle_entry(x,y):
            # value 1 at (f_u(x),g_u(y)) zero everywhere else
            middle = np.zeros((2,2), dtype=int)
            middle[f_u(x), g_u(y)] = 1
            return middle

        top_level = np.array([
                    [middle_entry(0,0), middle_entry(0,1)], 
                    [middle_entry(1,0), middle_entry(1,1)]
                    ]) 

        return top_level

    # multiplying the summed weights by a random number between 0 and 1
    arr = sum([np.random.rand()*generate_top(i) for i in range(16)])
    for x,y in np.ndindex(2,2):
        sum_values = np.sum(arr[x, y])
        arr[x, y] /= sum_values # normalization
    # print(np.sum(arr))

    # have in appropriate dictionary form
    P_AB_giv_XY_dict = {}
    for a,b,x,y in np.ndindex(2,2,2,2):
        P_AB_giv_XY_dict[(a,b,x,y)] = arr[x][y][a][b]
    # print(P_AB_giv_XY_dict)
    # arr
    return P_AB_giv_XY_dict

In [4]:
# creating a random distribution for P_l with given cardinality ######
def hidden_dist(cardL):
    rand_numbs = np.random.random(cardL)
    normd_numbs = rand_numbs / np.sum(rand_numbs)
    scaled_numbs = np.round(normd_numbs * 10**3) / 10**3
    correction = 1 - np.sum(scaled_numbs)
    largest_index = np.argmax(scaled_numbs)
    scaled_numbs[largest_index] += correction
    return scaled_numbs

def sample_bell_dist_cond_prob():
    cardL = 7
    distribution_sampled = []
    while len(distribution_sampled)<1:
        P_l = hidden_dist(cardL)
        P_A_giv_Xl = np.empty((card_A, card_X, cardL))
        P_B_giv_Yl = np.empty((card_B, card_Y, cardL))
        for x, l in np.ndindex((card_X, cardL)):
            P_A_giv_Xl[:,x,l] = hidden_dist(card_A)
        for y, l in np.ndindex((card_Y, cardL)):
            P_B_giv_Yl[:,y,l] = hidden_dist(card_B)
        P_A_do_X = (P_A_giv_Xl * P_l.reshape((1,1,cardL))).sum(axis=2)
        P_B_do_Y = (P_B_giv_Yl * P_l.reshape((1,1,cardL))).sum(axis=2)
        if not np.all(np.abs(P_A_do_X[:,0]-P_A_do_X[:,1]) > 0.2):
            continue
        if not np.all(np.abs(P_B_do_Y[:,0]-P_B_do_Y[:,1]) > 0.2):
            continue
        P_ABL_giv_XY = P_l.reshape((1,1,1,1,cardL)) * P_A_giv_Xl.reshape((card_A,1,card_X,1,cardL))  * P_B_giv_Yl.reshape((1,card_B,1,card_Y,cardL)) 
        P_AB_giv_XY = P_ABL_giv_XY.sum(axis=4)
        distribution_sampled.append(P_AB_giv_XY)
        # for x, y in np.ndindex((card_X, card_Y)):
        #     print(P_AB_giv_XY[:,:,x,y])

    # in dictionary format
    print(P_AB_giv_XY[a,b,x,y][0])
    # P_AB_giv_XY_dict = {(a,b,x,y): P_AB_giv_XY[a,b,x,y][0] for a,b,x,y in itertools.product(range(2), repeat=4)}
    # return P_AB_giv_XY_dict

In [5]:
# creating a random distribution for P_l with given cardinality ######
def hidden_dist(cardL):
    rand_numbs = np.random.random(cardL)
    normd_numbs = rand_numbs / np.sum(rand_numbs)
    scaled_numbs = np.round(normd_numbs * 10**3) / 10**3
    correction = 1 - np.sum(scaled_numbs)
    largest_index = np.argmax(scaled_numbs)
    scaled_numbs[largest_index] += correction
    return scaled_numbs

def sample_bell_dist_cond_prob():
    cardL = 7
    distribution_sampled = []
    while len(distribution_sampled)<1:
        P_l = hidden_dist(cardL)
        P_A_giv_Xl = np.empty((card_A, card_X, cardL))
        P_B_giv_Yl = np.empty((card_B, card_Y, cardL))
        for x, l in np.ndindex((card_X, cardL)):
            P_A_giv_Xl[:,x,l] = hidden_dist(card_A)
        for y, l in np.ndindex((card_Y, cardL)):
            P_B_giv_Yl[:,y,l] = hidden_dist(card_B)
        P_A_do_X = (P_A_giv_Xl * P_l.reshape((1,1,cardL))).sum(axis=2)
        P_B_do_Y = (P_B_giv_Yl * P_l.reshape((1,1,cardL))).sum(axis=2)
        if not np.all(np.abs(P_A_do_X[:,0]-P_A_do_X[:,1]) > 0.2):
            continue
        if not np.all(np.abs(P_B_do_Y[:,0]-P_B_do_Y[:,1]) > 0.2):
            continue
        P_ABL_giv_XY = P_l.reshape((1,1,1,1,cardL)) * P_A_giv_Xl.reshape((card_A,1,card_X,1,cardL))  * P_B_giv_Yl.reshape((1,card_B,1,card_Y,cardL)) 
        P_AB_giv_XY = P_ABL_giv_XY.sum(axis=4)
        distribution_sampled.append(P_AB_giv_XY)
        # for x, y in np.ndindex((card_X, card_Y)):
        #     print(P_AB_giv_XY[:,:,x,y])

    # in dictionary format
    P_AB_giv_XY_dict = {(a,b,x,y): P_AB_giv_XY[a,b,x,y] for a,b,x,y in itertools.product(range(2), repeat=4)}
    return P_AB_giv_XY_dict

In [6]:
# P_AB_giv_XY_dict = sample_bell_dist_det()
P_AB_giv_XY_dict = sample_bell_dist_cond_prob()
P_AB_giv_XY_dict


# # make sure probability as expected for Bell 
# print(P_AB_giv_XY.sum() == 4.0, sum(P_AB_giv_XY_dict.values()) == 4)
# print(abs(P_AB_giv_XY.sum() - 4.0)< 1e-06, abs(sum(P_AB_giv_XY_dict.values())-4.0) < 1e-06)

{(0, 0, 0, 0): 0.40872569099999995,
 (0, 0, 0, 1): 0.30211312099999993,
 (0, 0, 1, 0): 0.220952887,
 (0, 0, 1, 1): 0.185551343,
 (0, 1, 0, 0): 0.20596030899999998,
 (0, 1, 0, 1): 0.3125728789999999,
 (0, 1, 1, 0): 0.16463311300000003,
 (0, 1, 1, 1): 0.20003465699999998,
 (1, 0, 0, 0): 0.24332230900000004,
 (1, 0, 0, 1): 0.144698879,
 (1, 0, 1, 0): 0.431095113,
 (1, 0, 1, 1): 0.26126065699999995,
 (1, 1, 0, 0): 0.14199169100000003,
 (1, 1, 0, 1): 0.240615121,
 (1, 1, 1, 0): 0.18331888699999996,
 (1, 1, 1, 1): 0.3531533429999999}

tools to manipulate from P_ABXY -> P_AB|XY or from joint prob to conditional

In [7]:
# change to array form
P_AB_giv_XY_arr = np.zeros((2,2,2,2))
for a,b,x,y in np.ndindex(2,2,2,2):
    # P_AB_giv_XY_arr[x][y][a][b] = P_AB_giv_XY_dict[(a,b,x,y)]
    P_AB_giv_XY_arr[a,b,x,y] = P_AB_giv_XY_dict[(a,b,x,y)]

# normalization check
# sum(P_AB_giv_XY_dict.values()) == 4.0

# # get P_AB|XY from P_ABXY
# def P_AB_giv_XY(A,B, X, Y):
#     P_ABXY = sum([P_ABXY_dict[(a,b,x,y)] for a,b,x,y in P_ABXY_dict if a==A and b==B and x==X and y==Y])
#     P_AB = sum([P_ABXY_dict[(a,b,x,y)] for a,b,x,y in P_ABXY_dict if a==A and b==B])
#     return P_ABXY/P_AB

## (2) Checking compatibility via deterministic strategies:

In [8]:
"""
num_variables = len(all_strategies)  # num of W variables
A = np.zeros((num_variables, 16))

for i in range(num_variables):
    matrix = generate_top(i)
    flat_matrix = matrix.reshape(-1)
    # Position of 1s in the flattened matrix represents the coefficient for W[i] in each column of A
    A[i, :] = flat_matrix


# A*W[i] == internal_gurobi_expr
Ws = sp.Matrix(16, 1, lambda i, _: sp.symbols(f'W_{i}'))

# internal_gurobi_expr_sp = A * Ws
A * Ws
"""

"\nnum_variables = len(all_strategies)  # num of W variables\nA = np.zeros((num_variables, 16))\n\nfor i in range(num_variables):\n    matrix = generate_top(i)\n    flat_matrix = matrix.reshape(-1)\n    # Position of 1s in the flattened matrix represents the coefficient for W[i] in each column of A\n    A[i, :] = flat_matrix\n\n\n# A*W[i] == internal_gurobi_expr\nWs = sp.Matrix(16, 1, lambda i, _: sp.symbols(f'W_{i}'))\n\n# internal_gurobi_expr_sp = A * Ws\nA * Ws\n"

In [9]:
# Define the four functions
## Bin(INT) -> Bin(INT)
def identity(x):
    return x
def swap(x):
    return 1-x
def to0(_):
    return 0
def to1(_):
    return 1

basic_strategies = [identity, swap, to0, to1]
all_strategies = list(itertools.product(basic_strategies, repeat=2))

def generate_top(u):
    f_u, g_u = all_strategies[u]
    # value 1 at point(a,b) = (f_u(x),g_u(y)), zero everywhere else
    def middle_entry(x,y):
        # value 1 at (f_u(x),g_u(y)) zero everywhere else
        middle = np.zeros((2,2), dtype=int)
        middle[f_u(x), g_u(y)] = 1
        return middle
    top_level = np.array([
                [middle_entry(0,0), middle_entry(0,1)], 
                [middle_entry(1,0), middle_entry(1,1)]
                ]) 
    return top_level



# declaring Gurobi variable W_i w/ i in {0,1,...,15}, and declaring v
m1 = gp.Model("m1")
m1.setParam('OutputFlag', 0)
W = m1.addVars(16, lb=0, ub=1, vtype=GRB.CONTINUOUS, name="W")
m1.update()
global is_compatible

internal_gurobi_expr = sum([W[i]*generate_top(i) for i in range(16)])
m1.addConstr(W.sum() == 1, "sum(W_i) == 1")
for x,y, a,b in itertools.product(range(2), repeat=4):
    m1.addConstr(internal_gurobi_expr[x,y,a,b] == P_AB_giv_XY_dict[a,b,x,y], f"P({a},{b}|{x},{y}) == {internal_gurobi_expr[x,y,a,b]} == {P_AB_giv_XY_dict[(a,b,x,y)]}")
m1.update()

# m1.setParam('FeasibilityTol', 1e-4)
# m1.setParam('OptimalityTol', 0.01)
m1.optimize()

if m1.status == 2: # GRB.OPTIMAL
    is_compatible = True
else: 
    is_compatible = False

# assuming the distribution has weights 0 < w_i < 1
print("Compatibility:", m1.status, is_compatible)
# if not is_compatible:
#     raise SystemExit("DAG not compatible with given dist")

Set parameter Username
Academic license - for non-commercial use only - expires 2025-01-09
Compatibility: 2 True


## (3) Checking compatibility via structural equation

In [16]:
def asses_compat_with_card(P_AB_giv_XY_numeric: np.ndarray, card_l: int, verbose=0) -> bool:
    m = gp.Model("m")
    m.reset()
    m.setParam('OutputFlag', verbose)
    m.params.NonConvex = 2 # Using quadratic equality constraints.


    # variables
    P_l = m.addMVar(card_l, vtype=GRB.CONTINUOUS, lb=0, ub=1, name="P_l")
    P_A_giv_X_l = m.addMVar(shape = (card_A, card_X, card_l), vtype=GRB.CONTINUOUS, name="P(A|X,l)", lb=0, ub=1)
    P_B_giv_Y_l = m.addMVar(shape = (card_B, card_Y, card_l), vtype=GRB.CONTINUOUS, name="P(B|Y,l)", lb=0, ub=1)
    P_BL_giv_Y = m.addMVar(shape = (card_B, card_Y, card_l), vtype=GRB.CONTINUOUS, name="P(B,l|Y)", lb=0, ub=1)
    m.update()

    for b,y,l in np.ndindex(card_B, card_Y, card_l):
        m.addConstr(P_BL_giv_Y[b,y,l] == P_B_giv_Y_l[b,y,l] * P_l[l], f"prod[{b},{y},{l}] = P(B|Y,l) * P(l)")

    # structural eqn
    for a,b,x,y in np.ndindex(card_A, card_B, card_X, card_Y):
        m.addConstr(P_AB_giv_XY_numeric[a,b,x,y] == sum([P_A_giv_X_l[a,x,l] * P_BL_giv_Y[b,y,l]*P_l[l] for l in range(card_l)]), f"eqn[{a},{b},{x},{y}]")
    m.update()


    m.addConstr(sum([P_l[l] for l in range(card_l)]) == 1, "sum_P_l = 1")

    for x, l in np.ndindex(card_X, card_l):
        m.addConstr(sum([P_A_giv_X_l[a,x,l] for a in range(card_A)]) == 1, f'sum P(a|{x,l}) = 1')
    for y, l in np.ndindex(card_Y, card_l):
        m.addConstr(sum([P_B_giv_Y_l[b,y,l] for b in range(card_B)]) == 1, f'sum P(b|{y,l}) = 1')

    # relaxing the equality constraint to a range
    # m.setParam('FeasibilityTol', 1e-3)
    # m.setParam('OptimalityTol', 1e-3)
    # making the algorithm less sensitive to numerical issues
    # m.setParam('ObjScale', -0.5)
    # m.setParam('NumericFocus', 3)
    # m.setParam('Presolve', 0)

    m.optimize()
    
    if m.status == 2: # GRB.OPTIMAL
        return True
    else: 
        m.computeIIS()
        # m.write("model.ilp")
        return False


In [11]:
import numpy as np
import gurobipy as gp
from gurobipy import GRB, norm

class BellCompatible:
    def __init__(self, card_A, card_B, card_X, card_Y, card_l, verbose=0): # settings and hidden common cause cardinality
        self.card_A = card_A
        self.card_B = card_B
        self.card_X = card_X
        self.card_Y = card_Y
        self.card_l = card_l
        self.verbose = verbose
        self.model = None

    def initialize_model(self):
        self.model = gp.Model("BellCompat")
        self.model.reset()
        self.model.setParam('OutputFlag', self.verbose)
        self.model.params.NonConvex = 2 

        # variables
        self.P_l = self.model.addMVar(self.card_l, vtype=GRB.CONTINUOUS, lb=0, ub=1, name="P_l")
        self.P_A_giv_X_l = self.model.addMVar(shape=(self.card_A, self.card_X, self.card_l), vtype=GRB.CONTINUOUS, name="P(A|X,l)", lb=0, ub=1)
        self.P_B_giv_Y_l = self.model.addMVar(shape=(self.card_B, self.card_Y, self.card_l), vtype=GRB.CONTINUOUS, name="P(B|Y,l)", lb=0, ub=1)
        self.P_BL_giv_Y = self.model.addMVar(shape=(self.card_B, self.card_Y, self.card_l), vtype=GRB.CONTINUOUS, name="P(B,l|Y)", lb=0, ub=1)
        self.model.update()

        # Adding constraints
        for b, y, l in np.ndindex(self.card_B, self.card_Y, self.card_l):
            self.model.addConstr(self.P_BL_giv_Y[b, y, l] == self.P_B_giv_Y_l[b, y, l] * self.P_l[l], f"prod[{b},{y},{l}] = P(B|Y,l) * P(l)")

        for x, l in np.ndindex(self.card_X, self.card_l):
            self.model.addConstr(sum([self.P_A_giv_X_l[a, x, l] for a in range(self.card_A)]) == 1, f'sum P(a|{x,l}) = 1')
        for y, l in np.ndindex(self.card_Y, self.card_l):
            self.model.addConstr(sum([self.P_B_giv_Y_l[b, y, l] for b in range(self.card_B)]) == 1, f'sum P(b|{y,l}) = 1')

        self.model.addConstr(sum([self.P_l[l] for l in range(self.card_l)]) == 1, "sum_P_l = 1")

    def is_compatible(self, P_AB_giv_XY_numeric):
        self.initialize_model()

        # structural equation
        for a, b, x, y in np.ndindex(self.card_A, self.card_B, self.card_X, self.card_Y):
            self.model.addConstr(P_AB_giv_XY_numeric[a, b, x, y] == sum([self.P_A_giv_X_l[a, x, l] * self.P_BL_giv_Y[b, y, l] for l in range(self.card_l)]), f"eqn[{a},{b},{x},{y}]")

        self.model.update()
        self.model.optimize()

        if self.model.status == 2: # GRB.OPTIMAL
            return True
        else:
            self.model.computeIIS()
            return False

## (4) Finding minimal cardinality

In [12]:
# m = BellCompatible(2,2,2,2, card_l = 3, verbose=1)
# model_feasibility = m.is_compatible(P_AB_giv_XY_dict)
# print("Is this compatible w/ Bell DAG?", model_feasibility)

In [13]:
max_card_try = 7 # maximum cardinality to try
for card_l in range(max_card_try+1):
    m = BellCompatible(2,2,2,2, card_l, verbose=1)
    model_feasibility = m.is_compatible(P_AB_giv_XY_arr)
    if model_feasibility == False:
        continue
    elif model_feasibility == True:
        print("Bell DAG compatible with card_l:", card_l)
        break

# For the dstribution:
# P_AB_giv_XY_dict

Discarded solution information
Set parameter NonConvex to value 2


TypeError: unsupported operand type(s) for -: 'bool' and 'NoneType'

In [None]:
# P_AB_giv_XY_dict[0,0,1,1]  == P_AB_giv_XY_arr[0,0,1,1]

m = BellCompatible(2,2,2,2, card_l, verbose=1)


In [14]:
m.is_compatible(P_AB_giv_XY_arr)

Discarded solution information
Set parameter NonConvex to value 2


TypeError: unsupported operand type(s) for -: 'bool' and 'NoneType'