In [1]:
import networkx as nx
from causal_model import model, graph

## The first test example of the id method. 

### Initialization.

In [2]:
# Construct the causal graph for the causal model
causation = {
    'X': ['W'],
    'Z': ['X', 'W'],
    'W': [],
    'Y': ['Z'],
}
arc = [('X', 'Y')]
cg = graph.CausalGraph(causation, latent_confounding_arcs=arc)

In [3]:
cm = model.CausalModel(causal_graph=cg)

### id method.

In [4]:
# Test the identification method
p = cm.id({'Y'}, {'X'})

line 4
line 2, var {'W'}
line 1, var {'W'}
line 2, var {'X', 'W', 'Z'}
line 6
line 7
line 2, var set()
line 1, var set()


In [5]:
p.variables
p.conditional

set()

In [6]:
p.marginal

{'W', 'Z'}

In [7]:
for p_ in p.product:
    print(f'var {p_.variables}')
    print(f'cond {p_.conditional}')
    print(f'mar {p_.marginal}')
    for p__ in p_.product:
        print(f'sub var {p__.variables}')
        print(f'sub cond {p__.conditional}')
        print(f'sub mar {p__.marginal}')
        if p__.product:
            for z in p__.product:
                print(f'subsub var {z.variables}')
                print(f'subsub maar {z.arginal}')
                print(f'subsub prod {z.product}')

var set()
cond set()
mar set()
sub var {'Z'}
sub cond {'W', 'X'}
sub mar set()
var set()
cond set()
mar {'X'}
sub var {'X'}
sub cond {'W'}
sub mar set()
sub var {'Y'}
sub cond {'X', 'W', 'Z'}
sub mar set()
var {'W'}
cond set()
mar set()


### Final result.

The above expressions are combined to give the final result:

\begin{equation}
P(y|do(x)) = \sum_{w, z} P(w)P(z|x, w)\sum_{x} P(x|w)P(y|x,w,z)
\end{equation}

Exactly the desired one.

## The second test example of the id method.

In [8]:
causation1 = {
    'X': ['Z1'],
    'Z1': [],
    'Z2': ['X'],
    'Y': ['Z2'],
}
arcs1 = [('X', 'Z1'), ('Z1', 'Z2'), ('Z1', 'Y'), ('X', 'Y')]
cg1 = graph.CausalGraph(causation1, latent_confounding_arcs=arcs1)
cm1 = model.CausalModel(cg1)

In [9]:
cm1.id({'Y'}, {'X'})

line3
line 4
line 2, var {'X', 'Z1', 'Z2'}
line 5


IdentificationError: The causal quantity is not identifiable in thecurrent graph.

### The causal effect in the above example can not be identified. The id method gives us exactly what we want.

## Now we run the third example.

In [10]:
causation2 = {
    'W1': [],
    'W2': [],
    'X': ['W1'],
    'Y1': ['X'],
    'Y2': ['W2']
}
arcs2 = [('W1', 'Y1'), ('W1', 'W2'), ('W1', 'Y2'), ('W1', 'Y1')]
cg2 = graph.CausalGraph(causation2, latent_confounding_arcs=arcs2)
cm2 = model.CausalModel(cg2)

In [11]:
p1 = cm2.id({'Y1', 'Y2'}, {'X'})

line3
line 4
line 2, var {'Y1', 'X', 'W1'}
line 7
line 2, var set()
line 1, var set()
line 2, var {'W2'}
line 1, var {'W2'}
line 2, var {'W2', 'Y2'}
line 6


In [12]:
p1.variables
p1.marginal

{'W2'}

In [25]:
cm2.causal_graph.remove_incoming_edges({'X'}, new=True).ancestors({'Y1', 'Y2'})

{'W2', 'X', 'Y1', 'Y2'}

In [26]:
list(cm2.causal_graph.remove_nodes({'X', 'W1'}, new=True).c_components)

[{'Y1'}, {'W2'}, {'Y2'}]

In [17]:
l = list(p1.product)

In [14]:
for p_ in p1.product:
    print(f'var {p_.variables}')
    print(f'cond {p_.conditional}')
    print(f'mar {p_.marginal}')
    for p__ in p_.product:
        print(f'sub var {p__.variables}')
        print(f'sub cond {p__.conditional}')
        print(f'sub mar {p__.marginal}')
        if p__.product:
            for z in p__.product:
                print(f'subsub var {z.variables}')
                print(f'subsub maar {z.arginal}')
                print(f'subsub prod {z.product}')

var set()
cond set()
mar set()
sub var {'Y2'}
sub cond {'W2'}
sub mar set()
var set()
cond set()
mar {'W1'}
sub var {'W1'}
sub cond set()
sub mar set()
sub var {'Y1'}
sub cond {'W1', 'X'}
sub mar set()
var {'W2'}
cond set()
mar set()


The above result gives us the following expression

\begin{align*}
    P(y_1, y_2 |do(x)) &= \sum_{w_2} P(y_2|w_2)\sum_{w_1}P(w_1)P(y_1|w_1, x)P(w_2) \\
                        & = \sum_{w_2}P(y_2, w_2)\sum_{w_1}P(w_1)P(y_1|w_1, x)
\end{align*}

Also correct, except that there are some minor issues.

# We now test other methods.

## Start from the backdoor.

In [1]:
import networkx as nx
from causal_model import model, graph
causation3 = {
    'X1': [],
    'X2': [],
    'X3': ['X1'],
    'X4': ['X1', 'X2'],
    'X5': ['X2'],
    'X6': ['X'],
    'X': ['X3', 'X4'],
    'Y': ['X6', 'X4', 'X5']
}
cg3 = graph.CausalGraph(causation3)
cm3 = model.CausalModel(cg3)

In [2]:
cm3.identify({'X'}, {'Y'}, identify_method=('backdoor', 'simple'))

The corresponding statistical estimand should be Not implemented yet.)


(['X3', 'X4'], <causal_model.prob.Prob at 0x7f9c282918e0>)

In [3]:
cm3.identify({'X'}, {'Y'}, identify_method=('backdoor', 'all'))

The corresponding statistical estimand should be Not implemented yet.)


([{'X2', 'X4'},
  {'X3', 'X4'},
  {'X1', 'X4'},
  {'X4', 'X5'},
  {'X2', 'X3', 'X4'},
  {'X1', 'X2', 'X4'},
  {'X2', 'X4', 'X5'},
  {'X1', 'X3', 'X4'},
  {'X3', 'X4', 'X5'},
  {'X1', 'X4', 'X5'},
  {'X1', 'X2', 'X3', 'X4'},
  {'X2', 'X3', 'X4', 'X5'},
  {'X1', 'X2', 'X4', 'X5'},
  {'X1', 'X3', 'X4', 'X5'},
  {'X1', 'X2', 'X3', 'X4', 'X5'}],
 <causal_model.prob.Prob at 0x7f9c25be0af0>)

In [4]:
cm3.identify({'X'}, {'Y'}, identify_method=('backdoor', 'minimal'))

The corresponding statistical estimand should be Not implemented yet.)


({'X2', 'X4'}, <causal_model.prob.Prob at 0x7f9c28291400>)

### Verified. Now test other methods related to backdoor adjustment.

In [5]:
cm3.is_valid_backdoor_set({'X1', 'X4'}, {'X'}, {'Y'})

True

In [6]:
cm3.is_valid_backdoor_set({'X4'}, 'X', 'Y')

False

In [7]:
cm3.get_backdoor_path('X', 'Y')

[['X', 'X3', 'X1', 'X4', 'Y'],
 ['X', 'X3', 'X1', 'X4', 'X2', 'X5', 'Y'],
 ['X', 'X4', 'Y'],
 ['X', 'X4', 'X2', 'X5', 'Y']]

In [8]:
cm3.has_collider(['X', 'X3', 'X1', 'X4', 'X2', 'X5', 'Y'])

True

In [9]:
cm3.is_connected_backdoor_path(['X', 'X4', 'X2', 'X5', 'Y'])

True

## Now test methods related to frontdoor adjustment.

In [10]:
causation4 = {
    'X': [],
    'Z': ['X'],
    'Y': ['Z']
}
arcs4 = [('X', 'Y')]
cg4 = graph.CausalGraph(causation4, latent_confounding_arcs=arcs4)
cm4 = model.CausalModel(cg4)

In [11]:
cm4.get_backdoor_path('X', 'Z')

[['X', 'U0', 'Y', 'Z']]

In [12]:
cm4.has_collider(['Z', 'X', 'U0', 'Y'])

False

In [13]:
cm4.is_connected_backdoor_path(['Z', 'X', 'U0', 'Y'])

True

In [14]:
cm4.is_valid_backdoor_set({'X'}, {'Z'}, {'Y'})

True

In [15]:
cm4.is_frontdoor_set({'Z'}, 'X', 'Y')

True

In [16]:
cm4.get_frontdoor_set('X', 'Y')

({'Z'}, <causal_model.prob.Prob at 0x7f9c282be9d0>)