In [4]:
#Used to find subsets of lists
import itertools

In [121]:
##Helper functions

#Updates endogenous variables (V) given a certain state X,W
#i.e. sets to True/False given X or W
def V_given_set(V,X_or_W,indices):
    """Return a copy of V with selected positions overwritten.

    Args:
        V: list of current endogenous values; it is not modified.
        X_or_W: values to write, one per entry of ``indices``.
        indices: positions in V to overwrite; ``indices[k]`` receives
            ``X_or_W[k]``, whatever order the positions are listed in.

    Returns:
        A new list equal to V except at the given positions.

    Raises:
        IndexError: if fewer values than indices are supplied.
    """
    if len(X_or_W) < len(indices):
        raise IndexError("fewer values than indices were supplied")
    updated_V = V.copy()
    size = len(updated_V)
    # Pair every index with its own value. Walking V in ascending order and
    # consuming the values sequentially hands values to the wrong variables
    # whenever `indices` is not sorted (e.g. X's indices followed by others).
    for idx, value in zip(indices, X_or_W):
        # Positions that do not exist in V are skipped.
        if 0 <= idx < size:
            updated_V[idx] = value
    return updated_V
    
#Enumerates position-subsets of a given lst
#Used https://bit.ly/2YVGYxt
def subsets_finder(indices):
    """List subsets of the positions ``0 .. len(indices)-1``.

    The entries of ``indices`` themselves are never read; only its length
    matters, and the result refers to positions within it.

    Returns:
        For zero or one element: a one-item list holding the full position
        list (``[[]]`` or ``[[0]]``). Otherwise every subset of sizes
        ``0 .. len(indices)-1``, smallest first, each as a list. The full
        set is not produced in that case.
    """
    # NOTE(review): the sizes produced differ between the short-input branch
    # and the general branch; callers rely on the current output.
    count = len(indices)
    positions = list(range(count))
    if count <= 1:
        return [positions]
    found = []
    for size in range(count):
        found.extend(list(combo) for combo in itertools.combinations(positions, size))
    return found

#returns list of two-way splits of the given indices
#Used: https://bit.ly/2KfTeFk
def partitions(indices):
    """Split ``indices`` into (chosen, rest) pairs.

    Subsets of sizes ``0 .. len(indices)-1`` are generated smallest first
    and only the first ``len(subsets)//2 + 1`` of them are used as the
    ``chosen`` side, so the smaller subsets are the ones that appear first
    in a pair.

    Args:
        indices: list of items to split.

    Returns:
        A list of ``(chosen, rest)`` tuples of lists, where ``rest`` holds
        the items of ``indices`` not in ``chosen``, in their original order.
        An empty input yields the single split ``([], [])``.
    """
    # With nothing to split there are no subsets to index into; return the
    # one trivial split instead of failing on subsets[0].
    if not indices:
        return [([], [])]
    subsets = [combo for size in range(len(indices))
               for combo in itertools.combinations(indices, size)]
    keep = len(subsets) // 2 + 1
    return [(list(chosen), [e for e in indices if e not in chosen])
            for chosen in subsets[:keep]]
    
#Negates the entries of lst found at the given indices
def update_var(lst,indices):
    """Return a copy of ``lst`` with ``not`` applied at each listed position.

    Positions in ``indices`` that do not exist in ``lst`` have no effect,
    and ``lst`` itself is left untouched.
    """
    flipped = lst.copy()
    for position, value in enumerate(lst):
        if position in indices:
            flipped[position] = not value
    return flipped

#Picks the elements of X at the positions listed in t
def extract_X(X,t):
    """Return ``[X[i] for each i in t]``, in the order given by ``t``."""
    return [X[position] for position in t]

#Created a class of original H.P defn of Causality
class Causal_Model:
    """Causal model over list-valued variables with brute-force AC1-AC3 checks.

    U, V, R and F are plain lists. V holds the current value of every
    endogenous variable. A candidate cause is passed to the checks as a list
    of values ``X`` plus the positions ``X_indices`` those values occupy in
    V. Outcome functions are callables of the form ``f(U, V)``.
    """
    #U,V,F are assumed to be lists for simplicity sake
    #Assumed that final var = outcome (e.g. last elem of V = Forest Fire)
    #U,V are boolean values corresp. to some list of var
    #F is a list of pointers to functions (e.g. def foo())
    def __init__(self, U, V, R, F):
        """Store the model components and the derived signature/model tuples."""
        self.exogenous_var = U
        self.endogenous_var = V
        self.var_range = R
        self.function = F
        self.signature = (U,V,R)
        self.model = (self.signature,F)

    # NOTE(review): the original author assumed these checks can also be used
    # when the outcome did not occur (outcome_val = False) -- unconfirmed.

    #AC1 of defn, checks if outcome = function for given X
    #outcome_val refers to desired outcome
    def ac1_check(self,X,X_indices,outcome_function,outcome_val):
        """Return whether the outcome equals ``outcome_val`` once X is written into V."""
        V = V_given_set(self.endogenous_var,X,X_indices)
        return outcome_function(self.exogenous_var,V) == outcome_val

    #AC2(a) checks the but-for clause, i.e changing X would lead to opposite outcome
    #2005 paper says W->w' but modified defn. paper says W->w, func is using the latter
    def ac2_a_check(self,Z,Z_indices,X,X_indices,W,W_indices,outcome_function,outcome_val):
        """Return whether negating X changes the outcome away from ``outcome_val``.

        V is built by writing Z, then W, then the element-wise negation of X
        at their respective indices; the outcome function is evaluated once
        on that assignment.
        """
        V = V_given_set(self.endogenous_var,Z,Z_indices)
        V = V_given_set(V,W,W_indices)
        x_prime = [not i for i in X]
        V_given_x_prime = V_given_set(V,x_prime,X_indices)
        outcome = outcome_function(self.exogenous_var,V_given_x_prime)
        return outcome_val != outcome

    #Checks AC2(b) of the defn
    #Checks that outcome holds for all other W, fixing Z
    def ac2_b_check(self,Z,Z_indices,X,X_indices,W,W_indices,outcome_function, outcome_val):
        """Return whether the outcome stays ``outcome_val`` while entries of W are negated.

        Z is written into V once; then, for each position-subset produced by
        ``subsets_finder(W_indices)``, those entries of W are negated and
        written in. The first assignment whose outcome differs from
        ``outcome_val`` makes the check fail. X and X_indices are accepted
        for signature parity with ``ac2_a_check`` and are not read here.
        """
        V_fixed_Z = V_given_set(self.endogenous_var,Z,Z_indices)
        for flipped_positions in subsets_finder(W_indices):
            updated_W = update_var(W,flipped_positions)
            curr_V = V_given_set(V_fixed_Z,updated_W,W_indices)
            outcome = outcome_function(self.exogenous_var,curr_V)
            if outcome_val != outcome:
                return False
        return True

    def ac2_check_given_Z_W(self,Z,Z_indices,X,X_indices,W,W_indices,outcome_function,outcome_val):
        """Return whether both AC2(a) and AC2(b) hold for this particular (Z, W)."""
        # AC2(b) sweeps many assignments; skip it when AC2(a) already fails.
        return (self.ac2_a_check(Z,Z_indices,X,X_indices,W,W_indices,outcome_function,outcome_val)
                and self.ac2_b_check(Z,Z_indices,X,X_indices,W,W_indices,outcome_function,outcome_val))

    def Z_and_W_search(self,X,X_indices,outcome_function,outcome_val):
        """Search splits of the non-X variables for a (Z, W) that satisfies AC2.

        For every split returned by ``partitions`` the current values in V
        are tried first. If that fails, X's values are used for the X part of
        Z, and the remaining Z and W values are negated according to the
        subsets from ``subsets_finder``. Returns True at the first
        combination that passes ``ac2_check_given_Z_W``, otherwise False.
        """
        Z = X.copy()
        Z_indices = X_indices.copy()
        Useable_V_indices = [i for i in range(len(self.endogenous_var)) if i not in X_indices]
        for curr_Z_indices_no_X, curr_W_indices in partitions(Useable_V_indices):
            # X's indices always come first in Z, followed by this split's Z part.
            curr_Z_indices = Z_indices + curr_Z_indices_no_X
            curr_Z = [self.endogenous_var[i] for i in curr_Z_indices]
            curr_W = [self.endogenous_var[i] for i in curr_W_indices]
            if self.ac2_check_given_Z_W(curr_Z,curr_Z_indices,X,X_indices,
                                        curr_W,curr_W_indices,outcome_function,outcome_val):
                return True
            curr_Z_no_X = [self.endogenous_var[i] for i in curr_Z_indices_no_X]
            subsets_of_curr_W = subsets_finder(curr_W_indices)
            for sub_z in subsets_finder(curr_Z_indices_no_X):
                updated_Z = Z + update_var(curr_Z_no_X,sub_z)
                for sub_w in subsets_of_curr_W:
                    updated_W = update_var(curr_W,sub_w)
                    if self.ac2_check_given_Z_W(updated_Z,curr_Z_indices,X,X_indices,updated_W,
                                                curr_W_indices,outcome_function,outcome_val):
                        return True
        return False

    def ac2_check(self,X,X_indices,outcome_function,outcome_val):
        """AC2: delegate to ``Z_and_W_search``."""
        return self.Z_and_W_search(X,X_indices,outcome_function,outcome_val)

    #Checks that X is minimal by iterating over all subsets
    def ac3_check(self,X,X_indices,outcome_function,outcome_val):
        """Return False when a smaller part of X already passes the AC2(a) test.

        A candidate with at most one variable is accepted immediately. For
        larger candidates each subset from ``subsets_finder`` is tested with
        ``ac2_a_check``, using every other variable at its current value as W.
        """
        if len(X) <= 1:
            return True

        for positions in subsets_finder(X_indices):
            sub_X = extract_X(X,positions)
            # subsets_finder yields positions *within X*; translate them to
            # the indices those variables occupy in V before touching V.
            sub_indices = [X_indices[p] for p in positions]
            W_indices = [j for j in range(len(self.endogenous_var)) if j not in sub_indices]
            W = [self.endogenous_var[k] for k in W_indices]
            # NOTE(review): only AC2(a) with W at its current values is tested
            # here, not the full Z/W search -- confirm this is intended.
            if self.ac2_a_check(sub_X,sub_indices,sub_X,sub_indices,W,W_indices,
                                outcome_function,outcome_val):
                return False

        return True

    #Error checking
    def wrong_check(self,ac_1,ac_2,ac_3):
        """Print one diagnostic line for every condition that is falsy."""
        if not ac_1:
            print("(False b/c of AC1)")
        if not ac_2:
            print("(False b/c of AC2)")
        if not ac_3:
            print("(False b/c of AC3)")

    #Returns true if X satisfies HP defn, False o.w.
    def causality_check(self,X,X_indices,outcome_val,outcome_func):
        """Run AC1, AC2 and AC3, print which ones failed, and return their conjunction.

        Note the argument order: ``outcome_val`` comes before ``outcome_func``
        here, the reverse of the individual ``ac*_check`` methods.
        """
        ac_1 = self.ac1_check(X,X_indices,outcome_func,outcome_val)
        ac_2 = self.ac2_check(X,X_indices,outcome_func,outcome_val)
        ac_3 = self.ac3_check(X,X_indices,outcome_func,outcome_val)
        self.wrong_check(ac_1,ac_2,ac_3)
        return ac_1 and ac_2 and ac_3

## Testing
### Using examples from 2005 HP paper

### Example 2.1: Forest Fire (F) caused by either Lightning (L) or Match Lit (ML) 

In [122]:
# Example 2.1 model. Endogenous indices: 0 = L, 1 = ML, 2 = F.
# U: placeholder exogenous values (all True; forest_fire never reads them).
# V: every endogenous variable starts at False.
# R: each variable ranges over (False, True).
U = [True] * 3
V = [False] * 3
R = [(False, True) for _ in range(3)]


def forest_fire(U, V):
    """Outcome for F: the value of ``V[0] or V[1]`` (L or ML)."""
    return V[0] or V[1]


F_f = forest_fire
F = [None, None, forest_fire]
FF_model = Causal_Model(U, V, R, F)

In [123]:
# Candidate cause: variable 0 (L) = True; desired outcome value is True.
X, X_index = [True], [0]
print("Testing Casuality of Lightning (Correct Val = True):")
print(FF_model.causality_check(X, X_index, outcome_val=True, outcome_func=F_f))

Testing Casuality of Lightning (Correct Val = True):
True


In [124]:
# Candidate cause: variable 1 (ML) = True; desired outcome value is True.
X_ml, X_ml_index = [True], [1]
print("Testing for ML (Correct Val = True):")
print(FF_model.causality_check(X_ml, X_ml_index, outcome_val=True, outcome_func=F_f))

Testing for ML (Correct Val = True):
True


In [125]:
# Candidate cause: variables 0 and 1 (L and ML) both True.
X_both, X_both_index = [True, True], [0, 1]
print("Testing for Both (Correct Val = False b/c X is not minimal?):")
print(FF_model.causality_check(X_both, X_both_index, outcome_val=True, outcome_func=F_f))

Testing for Both (Correct Val = False b/c X is not minimal?):
(False b/c of AC3)
False


## Example 3.2 - Case 1 (Disjunctive): 
### Two Arsonists drop lit matches, either match suffices to burn the forest down

In [126]:
# Example 3.2, disjunctive case.
# U = [some condition, intention of arsonist 1, intention of arsonist 2, intention of both]
# V = [match lit by arsonist 1, match lit by arsonist 2, forest fire]
U = [True] * 4
V = [True] * 3
R = [(False, True) for _ in range(3)]


def disjunctive_forest_fire(U, V):
    """Outcome: the value of ``V[0] or V[1]`` (either match)."""
    return V[0] or V[1]


F_df = [None, None, disjunctive_forest_fire]
Arson_model = Causal_Model(U, V, R, F_df)

In [127]:
# Candidate cause: variable 0 (arsonist 1's match) = True.
X, X_index = [True], [0]
print("Testing Casuality of Arsonist 1(Correct Val = True):")
print(Arson_model.causality_check(X, X_index, outcome_val=True,
                                  outcome_func=disjunctive_forest_fire))

Testing Casuality of Arsonist 1(Correct Val = True):
True


In [128]:
# Candidate cause: variable 1 (arsonist 2's match) = True.
X, X_index = [True], [1]
print("Testing Casuality of Arsonist 2(Correct Val = True):")
print(Arson_model.causality_check(X, X_index, outcome_val=True,
                                  outcome_func=disjunctive_forest_fire))

Testing Casuality of Arsonist 2(Correct Val = True):
True


In [166]:
# Candidate cause: variables 0 and 1 (both matches) = True.
X, X_index = [True, True], [0, 1]
print("Testing Casuality of BOTH(Correct Val = False (b/c not minimal)):")
print(Arson_model.causality_check(X, X_index, outcome_val=True,
                                  outcome_func=disjunctive_forest_fire))

Testing Casuality of BOTH(Correct Val = False (b/c not minimal)):
True


## Example 3.2 - Case 2 (Conjunctive): 
### Two Arsonists drop lit matches, need BOTH matches to burn forest

In [130]:
# Example 3.2, conjunctive case.
# U = [some condition, intention of arsonist 1, intention of arsonist 2, intention of both]
# V = [match lit by arsonist 1, match lit by arsonist 2, forest fire]
U = [True] * 4
V = [True] * 3
R = [(False, True) for _ in range(3)]


def conjunctive_forest_fire(U, V):
    """Outcome: the value of ``V[0] and V[1]`` (both matches)."""
    return V[0] and V[1]


F_df = [None, None, conjunctive_forest_fire]
Arson2_model = Causal_Model(U, V, R, F_df)

In [131]:
# Candidate cause: variable 0 (arsonist 1's match) = True.
X, X_index = [True], [0]
print("Testing Casuality of ONLY Arsonist 1(Correct Val = True):")
print(Arson2_model.causality_check(X, X_index, outcome_val=True,
                                   outcome_func=conjunctive_forest_fire))

Testing Casuality of ONLY Arsonist 1(Correct Val = True):
True


In [132]:
# Candidate cause: variable 1 (arsonist 2's match) = True.
X, X_index = [True], [1]
print("Testing Casuality of ONLY Arsonist 2(Correct Val = True):")
print(Arson2_model.causality_check(X, X_index, outcome_val=True,
                                   outcome_func=conjunctive_forest_fire))

Testing Casuality of ONLY Arsonist 2(Correct Val = True):
True


In [133]:
# Candidate cause: variables 0 and 1 (both matches) = True.
X, X_index = [True, True], [0, 1]
print("Testing Casuality of BOTH Arsonists (Correct Val = False):")
print(Arson2_model.causality_check(X, X_index, outcome_val=True,
                                   outcome_func=conjunctive_forest_fire))

Testing Casuality of BOTH Arsonists (Correct Val = False):
(False b/c of AC3)
False


## Example 4.1: 
### Rain in April,May (+Electric Showers in May/June) and then lightning in June -> Forest Fire
### Question: Did April's showers cause the fire in June as opposed to May?

In [134]:
# Example 4.1 model.
# U is a placeholder, e.g. U[0] = sufficient FF conditions in May, U[1] = the same for June.
# V[0] = April showers; V[1] = no electric storm (E.S.) in May or June;
# V[2] = E.S. only in May; V[3] = E.S. only in June; V[4] = E.S. in both;
# V[5] = FF in May; V[6] = no FF in May; V[7] = FF in June
U = [True,True]
V = [True,False,False,False,True,False,False,True]
# One (False, True) range per endogenous variable; the hand-written list
# had only 7 entries for the 8 variables in V.
R = [(False, True)] * len(V)
def April_showers_bring_June_Fires(U,V):
    """Outcome for FF in June: ``V[0] and (V[3] or V[4])``."""
    return V[0] and (V[3] or V[4])
F_storms = [None,None,None,None,None,None,None,April_showers_bring_June_Fires]
Storm_FF = Causal_Model(U,V,R,F_storms)

In [135]:
# Candidate cause: variable 0 (April showers) = True.
X, X_index = [True], [0]
print("Testing Causality of April Showers on June Fire: (Correct Val = True)")
print(Storm_FF.causality_check(X, X_index, outcome_val=True,
                               outcome_func=April_showers_bring_June_Fires))

Testing Causality of April Showers on June Fire: (Correct Val = True)
True


In [136]:
# Candidate cause: variables 0 and 4 (April showers, E.S. in both months) = True.
X, X_index = [True, True], [0, 4]
print("Testing Causality of April Showers and E.S (in May & June) on June Fire: (Correct Val = False (AC3))")
print(Storm_FF.causality_check(X, X_index, outcome_val=True,
                               outcome_func=April_showers_bring_June_Fires))

Testing Causality of April Showers and E.S (in May & June) on June Fire: (Correct Val = False (AC3))
(False b/c of AC3)
False



## Example 4.2
### Suzy and Billy throw rocks at a bottle. Suzy's rock hits the bottle first, causing it to shatter. However, had she not thrown, Billy's rock would have shattered the bottle.
### Is Suzy a cause of bottle shattering?

In [138]:
# Example 4.2 model.
# U holds placeholder exogenous values (e.g. Suzy's and Billy's state of mind).
# V[0] = Suzy throws; V[1] = Suzy's rock hits bottle; V[2] = Billy throws;
# V[3] = Billy's rock hits bottle; V[4] = bottle shatters
U = [True, True]
V = [True,True,True,False,True]
# One range per endogenous variable; the hand-written list had 3 entries
# for the 5 variables in V.
R = [(False, True)] * len(V)
#bottle shatters in this scenario (where Suzy is first)
def bottle_shatters(U,V):
    """Outcome for the bottle: ``V[0] and V[1]`` (Suzy throws and hits)."""
    return (V[0] and V[1])
# One slot per endogenous variable, with the outcome function in the slot of
# the final variable (V[4], bottle shatters) as in the other example models.
F_SB = [None] * (len(V) - 1) + [bottle_shatters]
SB_model = Causal_Model(U,V,R,F_SB)

In [139]:
# Candidate cause: variable 0 (Suzy throws) = True.
X, X_index = [True], [0]
print("Testing Casuality of Suzy Throw/Hit(Correct Val = True):")
print(SB_model.causality_check(X, X_index, outcome_val=True, outcome_func=bottle_shatters))

Testing Casuality of Suzy Throw/Hit(Correct Val = True):
True


In [140]:
# Candidate cause: variable 2 (Billy throws) = True.
X, X_index = [True], [2]
print("Testing Casuality of Billy Throw(Correct Val = False):")
print(SB_model.causality_check(X, X_index, outcome_val=True, outcome_func=bottle_shatters))

Testing Casuality of Billy Throw(Correct Val = False):
(False b/c of AC2)
False


## Example 4.3
### Billy is hospitalized on Monday, and the doctor forgets to give medication on Monday. Suppose the Monday and Tuesday doctors are both reliable, with the following twist: one dose is harmless, two doses are lethal.


In [141]:
# Example 4.3 model.
# U is a placeholder, e.g. U[0] = medicine available Monday, U[1] = the same for Tuesday.
# V[0] = given medicine on Monday (MT)
# V[1] = given medicine on Tuesday (TT)
# V[2] = Billy alive on Tuesday morning
# V[3] = Billy alive and well on Tuesday morning
# V[4] = Billy sick on Tuesday morning (and alive) and recovers Tuesday afternoon
# V[5] = Billy sick Tuesday morning and afternoon (so missed treatment?)
# V[6] = Billy recovered Tuesday morning and is dead Tuesday afternoon
U = [True,True]
V = [True,False,True,False,True,False,False]
# One range per endogenous variable; the hand-written list had 6 entries
# for the 7 variables in V.
R = [(False, True)] * len(V)
def billy_alive(U,V):
    """Outcome: true when any of V[2]..V[5] is truthy."""
    return V[2] or V[3] or V[4] or V[5]
def billy_dead(U,V):
    """Outcome: ``V[0] and V[1]`` (both doses given)."""
    return V[0] and V[1]
# One slot per endogenous variable (the hand-written list had 6 for 7).
# NOTE(review): V[2]..V[5] are assumed to be the billy_alive states and V[6]
# the billy_dead state, following the variable comments above -- confirm.
F_billy = [None, None] + [billy_alive] * 4 + [billy_dead]
Billy_model = Causal_Model(U,V,R,F_billy)

In [142]:
# Candidate cause: variable 0 (MT) = True; outcome under test is billy_alive.
X, X_index = [True], [0]
print("Testing if MT = 1 is a cause of Billy alive (Expected: False)")
print(Billy_model.causality_check(X, X_index, outcome_val=True, outcome_func=billy_alive))

Testing if MT = 1 is a cause of Billy alive (Expected: False)
(False b/c of AC2)
False


In [143]:
# Candidate cause: variable 1 (TT) = True; outcome under test is billy_dead.
X, X_index = [True], [1]
print("Testing if TT=1 is a cause of Billy Dead (Expected: True)")
print(Billy_model.causality_check(X, X_index, outcome_val=True, outcome_func=billy_dead))

Testing if TT=1 is a cause of Billy Dead (Expected: True)
True


In [144]:
def tuesday_treatment(U, V):
    """Outcome for TT: the negation of V[0] (MT)."""
    return not V[0]


# Candidate cause: variable 0 (MT) = True; desired outcome value is False.
X, X_index = [True], [0]
print("Testing if MT=1 is a cause of TT = 0(Expected: True)")
print(Billy_model.causality_check(X, X_index, outcome_val=False,
                                  outcome_func=tuesday_treatment))

Testing if MT=1 is a cause of TT = 0(Expected: True)
True
