In [1]:
from graph import load_graph
import pandas as pd
import networkx as nx
import numpy as np
import dowhy
import pandas as pd
from dowhy import CausalModel
from IPython.display import Image, display



  from .autonotebook import tqdm as notebook_tqdm


### Functions

In [2]:
def gml_to_string(file):
    """Read a GML file and return its contents as a single-line string.

    Trailing whitespace (including the line break) is stripped from every
    line, and the remaining pieces are concatenated with no separator.
    Leading whitespace on each line is kept.

    Parameters
    ----------
    file : str or path-like
        Path of the GML file to read.

    Returns
    -------
    str
        The file contents flattened onto one line.
    """
    with open(file, 'r') as handle:
        return ''.join(line.rstrip() for line in handle)

def get_dowhy_model(file, data_path="data/close_college.dta",
                    treatment='educ', outcome='lwage'):
    """Build a DoWhy ``CausalModel`` from a GML graph file and a Stata dataset.

    Parameters
    ----------
    file : str or path-like
        Path of the GML file describing the causal graph. It is flattened to
        a single-line string with ``gml_to_string`` before being handed to
        ``CausalModel``.
    data_path : str or path-like, optional
        Path of the Stata ``.dta`` file loaded with ``pandas.read_stata``.
        Defaults to ``"data/close_college.dta"``.
    treatment : str, optional
        Name of the treatment column. Defaults to ``'educ'``.
    outcome : str, optional
        Name of the outcome column. Defaults to ``'lwage'``.

    Returns
    -------
    CausalModel
        The model constructed from the data, treatment, outcome and graph.
    """
    gml_graph = gml_to_string(file)
    df = pd.read_stata(data_path)

    return CausalModel(
        data=df,
        treatment=treatment,
        outcome=outcome,
        graph=gml_graph,
    )

In [3]:
def find_mediators(paths):
    """Collect every interior node of the given paths.

    Parameters
    ----------
    paths : iterable of sequences
        Each path is a sequence of nodes whose first and last entries are
        its endpoints.

    Returns
    -------
    set
        All nodes that appear strictly between the endpoints of some path.
        Paths with two or fewer nodes contribute nothing.
    """
    return {variable for path in paths for variable in path[1:-1]}


_FRONTDOOR_DIRECTIONS = ('treatment_to_m', 'm_to_outcome')


def _validate_direction(direction):
    """Raise ``ValueError`` unless ``direction`` is one of the two supported modes."""
    if direction not in _FRONTDOOR_DIRECTIONS:
        raise ValueError(
            "direction must be one of %r, got %r" % (_FRONTDOOR_DIRECTIONS, direction)
        )


def check_backdoor_mediators(mediators, graph, direction,
                             treatment='educ', outcome='lwage'):
    """Gather the back-door paths relevant to one half of the front-door check.

    Parameters
    ----------
    mediators : iterable
        Candidate mediator nodes.
    graph : object
        Graph exposing ``get_backdoor_paths(nodes1, nodes2)``.
    direction : {'treatment_to_m', 'm_to_outcome'}
        ``'treatment_to_m'`` collects back-door paths from ``treatment`` to
        each mediator; ``'m_to_outcome'`` collects back-door paths from each
        mediator to ``outcome``.
    treatment : str, optional
        Treatment node. Defaults to ``'educ'``.
    outcome : str, optional
        Outcome node. Defaults to ``'lwage'``.

    Returns
    -------
    list
        Concatenation of the path lists returned by ``graph`` for every mediator.

    Raises
    ------
    ValueError
        If ``direction`` is not one of the two supported values.
    """
    _validate_direction(direction)

    backdoor_paths = []
    for m in mediators:
        if direction == 'treatment_to_m':
            backdoor_paths.extend(graph.get_backdoor_paths([treatment], [m]))
        else:
            # The mediator -> outcome half must target the outcome node,
            # not the treatment.
            backdoor_paths.extend(graph.get_backdoor_paths([m], [outcome]))

    return backdoor_paths


def check_if_blocked(backdoor_paths, graph, direction, treatment='educ'):
    """Ask the graph whether each back-door path is blocked.

    Parameters
    ----------
    backdoor_paths : iterable
        Paths to test.
    graph : object
        Graph exposing ``is_blocked(path, conditioned_nodes=...)``.
    direction : {'treatment_to_m', 'm_to_outcome'}
        ``'treatment_to_m'`` tests with an empty conditioning set;
        ``'m_to_outcome'`` tests while conditioning on ``treatment``.
    treatment : str, optional
        Node conditioned on in the ``'m_to_outcome'`` direction.
        Defaults to ``'educ'``.

    Returns
    -------
    list
        One ``graph.is_blocked`` result per path, in input order.

    Raises
    ------
    ValueError
        If ``direction`` is not one of the two supported values.
    """
    _validate_direction(direction)

    conditioned = [] if direction == 'treatment_to_m' else [treatment]
    # A fresh list is passed on every call so the graph never sees a shared object.
    return [
        graph.is_blocked(path, conditioned_nodes=list(conditioned))
        for path in backdoor_paths
    ]


def check_frontdoor(graph, node1, node2):
    """Check the front-door criterion for ``node1`` -> ``node2`` and print the verdict.

    The candidate mediator set is every interior node of every directed path
    from ``node1`` to ``node2``. The criterion is reported as satisfied only if

    * at least one mediator exists,
    * there is no direct edge ``node1 -> node2`` (no mediator set could
      intercept it),
    * every back-door path from ``node1`` to a mediator is blocked with an
      empty conditioning set, and
    * every back-door path from a mediator to ``node2`` is blocked when
      conditioning on ``node1``.

    NOTE(review): only the full set of interior nodes is tested; a proper
    subset of it might satisfy the criterion even when the full set does not.

    Parameters
    ----------
    graph : object
        Graph exposing ``get_all_directed_paths``, ``get_backdoor_paths`` and
        ``is_blocked``.
    node1 : str
        Treatment node.
    node2 : str
        Outcome node.

    Returns
    -------
    bool
        ``True`` if the criterion is satisfied, ``False`` otherwise.
    """
    all_directed_paths = graph.get_all_directed_paths([node1], [node2])

    # A two-node path is a direct edge; it has no interior node to intercept it.
    has_direct_edge = any(len(path) <= 2 for path in all_directed_paths)
    mediated_paths = [path for path in all_directed_paths if len(path) > 2]
    mediators = find_mediators(mediated_paths)

    backdoor_paths_x_m = check_backdoor_mediators(
        mediators, graph, "treatment_to_m", treatment=node1, outcome=node2
    )
    backdoor_paths_m_y = check_backdoor_mediators(
        mediators, graph, "m_to_outcome", treatment=node1, outcome=node2
    )

    check_x_m = check_if_blocked(backdoor_paths_x_m, graph, "treatment_to_m", treatment=node1)
    check_m_y = check_if_blocked(backdoor_paths_m_y, graph, "m_to_outcome", treatment=node1)

    # bool(mediators) guards against the vacuous all([]) == True case.
    satisfied = (
        bool(mediators)
        and not has_direct_edge
        and all(check_x_m)
        and all(check_m_y)
    )

    if satisfied:
        print("Frontdoor criterion satisfied.")
    else:
        print("Frontdoor criterion NOT satisfied.")

    return satisfied

### Frontdoor criterion - Graph version = 0

In [4]:
# Build the model for graph version 0 and run the hand-rolled front-door check.
dowhy_model = get_dowhy_model("graph_files/graph_version0.gml")
# `_graph` has a leading underscore, i.e. it is not part of the model's public interface.
dowhy_graph = dowhy_model._graph
# NOTE(review): the model is built for educ -> lwage, but this check is run on
# the pair ('smsa', 'exper') — confirm this is the intended pair.
check_frontdoor(dowhy_graph, 'smsa', 'exper')

# Ask DoWhy for its own identified estimands and print them for comparison.
identified_estimand = dowhy_model.identify_effect()
print('\n', identified_estimand)

Frontdoor criterion NOT satisfied.

 Estimand type: EstimandType.NONPARAMETRIC_ATE

### Estimand : 1
Estimand name: backdoor
No such variable(s) found!

### Estimand : 2
Estimand name: iv
No such variable(s) found!

### Estimand : 3
Estimand name: frontdoor
No such variable(s) found!



### Frontdoor criterion - Graph version = 1

In [5]:
# Build the model for graph version 1 and run the hand-rolled front-door check.
dowhy_model = get_dowhy_model("graph_files/graph_version1.gml")
# `_graph` has a leading underscore, i.e. it is not part of the model's public interface.
dowhy_graph = dowhy_model._graph
# NOTE(review): the model is built for educ -> lwage, but this check is run on
# the pair ('smsa', 'exper') — confirm this is the intended pair.
check_frontdoor(dowhy_graph, 'smsa', 'exper')

# Ask DoWhy for its own identified estimands and print them for comparison.
identified_estimand = dowhy_model.identify_effect()
print('\n', identified_estimand)

Frontdoor criterion NOT satisfied.

 Estimand type: EstimandType.NONPARAMETRIC_ATE

### Estimand : 1
Estimand name: backdoor
Estimand expression:
   d                        
───────(E[lwage|black,smsa])
d[educ]                     
Estimand assumption 1, Unconfoundedness: If U→{educ} and U→lwage then P(lwage|educ,black,smsa,U) = P(lwage|educ,black,smsa)

### Estimand : 2
Estimand name: iv
No such variable(s) found!

### Estimand : 3
Estimand name: frontdoor
No such variable(s) found!

