## 4. Defining a Causal Model using automatic Causal Discovery

The fourth step is to construct a causal model that involves identifying the relationships between variables to understand how they influence Exam_Score. 
This will be achieved by creating a Directed Acyclic Graph (DAG) that visually represents these relationships using automatiec Causal Discovery.

In [7]:
# Imports
import pandas as pd
from graphviz import Digraph
import dowhy
from dowhy import gcm
from dowhy import CausalModel
import networkx as nx
import matplotlib.pyplot as plt
import numpy as np
import graphviz
from causallearn.utils.GraphUtils import GraphUtils
from causallearn.search.ConstraintBased.PC import pc
from causallearn.search.ScoreBased.GES import ges
from causallearn.search.FCMBased import lingam
import matplotlib.image as mpimg
import io
import re

%matplotlib inline

### Load the preprocessed Dataset

In [8]:
df_encoded = pd.read_csv('../data/encoded_student_performance_factors.csv')
# df_cleaned = pd.read_csv('../data/cleaned_student_performance_factors.csv')
labels = df_encoded.columns.tolist()
data = df_encoded.to_numpy()

### Identify Potential Causal Relationships using automatic Causal Discovery

In [30]:
# Hilfsfunktion für Plotting
def plot_dag(graph_dot, title):
    graph = graphviz.Source(graph_dot)
    graph.render(filename=title, format='png', cleanup=True)

def fix_pydot_labels(dot_str):
    """
    Bereinigt DOT-Strings aus GraphUtils, sodass Labels direkt als Knotennamen verwendet werden.
    """
    lines = dot_str.splitlines()
    label_dict = {}

    # Schritt 1: Label Dictionary erstellen
    for line in lines:
        match = re.match(r'\s*(\d+)\s*\[label="?([^"\]]+)"?\];', line)
        if match:
            node_id, label = match.groups()
            label_dict[node_id] = label.strip('"')

    # Schritt 2: Nur Kantenzeilen auswählen und Knoten ersetzen
    corrected_lines = ["digraph {"]

    # Labels anfügen
    for node_id, label in label_dict.items():
        corrected_lines.append(f'{label}')

    # Kanten extrahieren
    for line in lines:
        if '->' in line:
            for node_id, label in label_dict.items():
                line = re.sub(rf'\b{node_id}\b', f'{label}', line)
            corrected_lines.append(line)

    corrected_lines.append("}")

    return '\n'.join(corrected_lines)

In [32]:
# 1. PC Algorithmus DAG visualisieren
pc_graph = pc(data)
pc_dot = GraphUtils.to_pydot(pc_graph.G, labels=labels).to_string()
pc_dot_fixed = fix_pydot_labels(pc_dot)
plot_dag(pc_dot_fixed, "../causal-model/automatic-causal-models/pc_dag")
print(pc_dot_fixed)

Depth=14, working on node 19: 100%|██████████| 20/20 [00:00<00:00, 1407.27it/s]


digraph {
Hours_Studied
Attendance
Parental_Involvement
Access_to_Resources
Extracurricular_Activities
Sleep_Hours
Previous_Scores
Motivation_Level
Internet_Access
Tutoring_Sessions
Family_Income
Teacher_Quality
School_Type
Peer_Influence
Physical_Activity
Learning_Disabilities
Parental_Education_Level
Distance_from_Home
Gender
Exam_Score
Hours_Studied -> Exam_Score [dir=both, arrowtail=none, arrowhead=normal];
Attendance -> Parental_Education_Level [dir=both, arrowtail=none, arrowhead=normal];
Attendance -> Exam_Score [dir=both, arrowtail=none, arrowhead=normal];
Parental_Involvement -> Access_to_Resources [dir=both, arrowtail=none, arrowhead=none];
Parental_Involvement -> Exam_Score [dir=both, arrowtail=none, arrowhead=normal];
Access_to_Resources -> Exam_Score [dir=both, arrowtail=none, arrowhead=normal];
Extracurricular_Activities -> Exam_Score [dir=both, arrowtail=none, arrowhead=normal];
Previous_Scores -> Exam_Score [dir=both, arrowtail=none, arrowhead=normal];
Motivation_Level 

In [33]:
# 2. GES Algorithmus DAG visualisieren
ges_graph = ges(data)['G']
ges_dot = GraphUtils.to_pydot(ges_graph, labels=labels).to_string()
ges_dot_fixed = fix_pydot_labels(ges_dot)
plot_dag(ges_dot_fixed, "../causal-model/automatic-causal-models/ges_dag")
print(ges_dot_fixed)

digraph {
Hours_Studied
Attendance
Parental_Involvement
Access_to_Resources
Extracurricular_Activities
Sleep_Hours
Previous_Scores
Motivation_Level
Internet_Access
Tutoring_Sessions
Family_Income
Teacher_Quality
School_Type
Peer_Influence
Physical_Activity
Learning_Disabilities
Parental_Education_Level
Distance_from_Home
Gender
Exam_Score
Hours_Studied -> Distance_from_Home [dir=both, arrowtail=none, arrowhead=normal];
Hours_Studied -> Exam_Score [dir=both, arrowtail=none, arrowhead=normal];
Attendance -> Distance_from_Home [dir=both, arrowtail=none, arrowhead=normal];
Attendance -> Exam_Score [dir=both, arrowtail=none, arrowhead=normal];
Parental_Involvement -> Exam_Score [dir=both, arrowtail=none, arrowhead=normal];
Access_to_Resources -> Exam_Score [dir=both, arrowtail=none, arrowhead=normal];
Exam_Score -> Extracurricular_Activities [dir=both, arrowtail=none, arrowhead=normal];
Previous_Scores -> Exam_Score [dir=both, arrowtail=none, arrowhead=normal];
Motivation_Level -> Exam_Scor

In [34]:
# 3. LiNGAM Algorithmus DAG visualisieren
lingam_model = lingam.ICALiNGAM()
lingam_model.fit(data)

def make_graph(adjacency_matrix, labels):
    idx = np.abs(adjacency_matrix) > 0.01
    dirs = np.where(idx)
    d = graphviz.Digraph(engine='dot')
    for name in labels:
        d.node(name)
    for to, from_, coef in zip(dirs[0], dirs[1], adjacency_matrix[idx]):
        d.edge(labels[from_], labels[to], label=f"{coef:.2f}")
    return d

lingam_dot = make_graph(lingam_model.adjacency_matrix_, labels).source
plot_dag(lingam_dot, "../causal-model/automatic-causal-models/lingam_dag")
print(lingam_dot)

digraph {
	Hours_Studied
	Attendance
	Parental_Involvement
	Access_to_Resources
	Extracurricular_Activities
	Sleep_Hours
	Previous_Scores
	Motivation_Level
	Internet_Access
	Tutoring_Sessions
	Family_Income
	Teacher_Quality
	School_Type
	Peer_Influence
	Physical_Activity
	Learning_Disabilities
	Parental_Education_Level
	Distance_from_Home
	Gender
	Exam_Score
	Hours_Studied -> Exam_Score [label=0.29]
	Attendance -> Exam_Score [label=0.20]
	Parental_Involvement -> Exam_Score [label=1.00]
	Access_to_Resources -> Exam_Score [label=1.03]
	Extracurricular_Activities -> Exam_Score [label=0.55]
	Previous_Scores -> Exam_Score [label=0.05]
	Motivation_Level -> Exam_Score [label=0.53]
	Internet_Access -> Exam_Score [label=0.91]
	Tutoring_Sessions -> Exam_Score [label=0.50]
	Family_Income -> Exam_Score [label=0.53]
	Teacher_Quality -> Exam_Score [label=0.53]
	Peer_Influence -> Exam_Score [label=0.51]
	Physical_Activity -> Exam_Score [label=0.18]
	Learning_Disabilities -> Exam_Score [label=0.84]
	P



### Kausale Analyse

In [42]:
# Angepasste Funktionen zur kausalen Analyse
def create_causal_model(graph_dot, df, treatment, outcome): 
    model = CausalModel(
        data=df,
        treatment=treatment,
        outcome=outcome,
        graph=graph_dot
    )

    return model

def do_causal_identification(model):
    identified_estimand = model.identify_effect()
    return identified_estimand

def do_causal_estimation_propensity_score_stratification(model, identified_estimand):
    estimate = model.estimate_effect(identified_estimand,
                                 method_name="backdoor.propensity_score_stratification")
    return estimate

def do_causal_estimation_linear_regression(model, identified_estimand):
    estimate = model.estimate_effect(identified_estimand,
                                 method_name="backdoor.linear_regression")
    return estimate

# Refuting the estimate 
# -> Refutation methods provide tests that every correct estimator should pass. So if an estimator fails the refutation test (p-value is <0.05), then it means that there is some problem with the estimator.
# -> We cannot verify that the estimate is correct but we can reject it if it violates certain expected behaviour 
# -> refutation tests are based on either Invariant transformations (changes in the data that should not change the estimate. Any estimator whose result varies significantly between the original data and the
# modified data fails the test -> Random Common Cause or Data Subset) or Nullifying transformations (after the data changes, the causal true estimate is zero. Any estimator whose result varies significantly 
# from zero on the new data fails the test -> Placeabo Treatment)
def do_causal_refute_estimate_random(model, identified_estimand, estimate):
    refutation_random = model.refute_estimate(identified_estimand, estimate, method_name="random_common_cause", show_progress_bar=True)
    return refutation_random

def do_causal_refute_estimate_placebo(model, identified_estimand, estimate):
    res_placebo = model.refute_estimate(identified_estimand, estimate, method_name="placebo_treatment_refuter", show_progress_bar=True, placebo_type="permute")
    return res_placebo

def do_causal_refute_estimate_subset(model, identified_estimand, estimate):
    res_subset = model.refute_estimate(identified_estimand, estimate, method_name="data_subset_refuter", show_progress_bar=True, subset_fraction=0.9)
    return res_subset

### PC

In [36]:
# PC DAG Causal Model
pc_model = create_causal_model(pc_dot_fixed, df_encoded, treatment="Hours_Studied", outcome="Exam_Score")

In [37]:
# pc_identified_estimand
pc_identified_estimand = do_causal_identification(pc_model)
print(pc_identified_estimand)

Estimand type: EstimandType.NONPARAMETRIC_ATE

### Estimand : 1
Estimand name: backdoor
Estimand expression:
       d                       
────────────────(E[Exam_Score])
d[Hours_Studied]               
Estimand assumption 1, Unconfoundedness: If U→{Hours_Studied} and U→Exam_Score then P(Exam_Score|Hours_Studied,,U) = P(Exam_Score|Hours_Studied,)

### Estimand : 2
Estimand name: iv
No such variable(s) found!

### Estimand : 3
Estimand name: frontdoor
No such variable(s) found!



In [None]:
# pc_do_causal_estimation_propensity_score_stratification
pc_estimate_propensity_score = do_causal_estimation_propensity_score_stratification(pc_model, pc_identified_estimand)
print(pc_estimate_propensity_score)

In [None]:
# pc_do_causal_estimation_linear_regression
pc_estimate_linear_regression = do_causal_estimation_linear_regression(pc_model, pc_identified_estimand)
print(pc_estimate_linear_regression)

In [None]:
# pc_do_causal_refute_estimate_random
pc_refutation_random = do_causal_refute_estimate_random(pc_model, pc_identified_estimand, pc_estimate_linear_regression)
print('pc_refutation_random: ', pc_refutation_random)

# pc_do_causal_refute_estimate_placebo
pc_res_placebo = do_causal_refute_estimate_placebo(pc_model, pc_identified_estimand, pc_estimate_linear_regression)
print('pc_res_placebo: ', pc_res_placebo)

# pc_do_causal_refute_estimate_subset
pc_res_subset = do_causal_refute_estimate_subset(pc_model, pc_identified_estimand, pc_estimate_linear_regression)
print('pc_res_subset: ', pc_res_subset)

### GES

In [44]:
# GES DAG Causal Model
ges_model = create_causal_model(ges_dot_fixed, df_encoded, treatment="Hours_Studied", outcome="Exam_Score")

In [45]:
# ges_identified_estimand
ges_identified_estimand = do_causal_identification(ges_model)
print(ges_identified_estimand)

Estimand type: EstimandType.NONPARAMETRIC_ATE

### Estimand : 1
Estimand name: backdoor
Estimand expression:
       d                       
────────────────(E[Exam_Score])
d[Hours_Studied]               
Estimand assumption 1, Unconfoundedness: If U→{Hours_Studied} and U→Exam_Score then P(Exam_Score|Hours_Studied,,U) = P(Exam_Score|Hours_Studied,)

### Estimand : 2
Estimand name: iv
No such variable(s) found!

### Estimand : 3
Estimand name: frontdoor
No such variable(s) found!



In [None]:
# ges_do_causal_estimation_propensity_score_stratification
ges_estimate_propensity_score = do_causal_estimation_propensity_score_stratification(ges_model, ges_identified_estimand)
print(ges_estimate_propensity_score)

In [None]:
# ges_do_causal_estimation_linear_regression
ges_estimate_linear_regression = do_causal_estimation_linear_regression(ges_model, ges_identified_estimand)
print(ges_estimate_linear_regression)

In [None]:
# ges_do_causal_refute_estimate_random
ges_refutation_random = do_causal_refute_estimate_random(ges_model, ges_identified_estimand, ges_estimate_linear_regression)
print('ges_refutation_random: ', ges_refutation_random)

# ges_do_causal_refute_estimate_placebo
ges_res_placebo = do_causal_refute_estimate_placebo(ges_model, ges_identified_estimand, ges_estimate_linear_regression)
print('ges_res_placebo: ', ges_res_placebo)

# ges_do_causal_refute_estimate_subset
ges_res_subset = do_causal_refute_estimate_subset(ges_model, ges_identified_estimand, ges_estimate_linear_regression)
print('pc_res_subset: ', ges_res_subset)

### Lingam

In [46]:
# Lingam DAG Causal Model
lingam_model = create_causal_model(lingam_dot, df_encoded, treatment="Hours_Studied", outcome="Exam_Score")

In [47]:
# lingam_identified_estimand
lingam_identified_estimand = do_causal_identification(lingam_model)
print(lingam_identified_estimand)

Estimand type: EstimandType.NONPARAMETRIC_ATE

### Estimand : 1
Estimand name: backdoor
Estimand expression:
       d                       
────────────────(E[Exam_Score])
d[Hours_Studied]               
Estimand assumption 1, Unconfoundedness: If U→{Hours_Studied} and U→Exam_Score then P(Exam_Score|Hours_Studied,,U) = P(Exam_Score|Hours_Studied,)

### Estimand : 2
Estimand name: iv
No such variable(s) found!

### Estimand : 3
Estimand name: frontdoor
No such variable(s) found!



In [None]:
# lingam_do_causal_estimation_propensity_score_stratification
lingam_estimate_propensity_score = do_causal_estimation_propensity_score_stratification(lingam_model, lingam_identified_estimand)
print(lingam_estimate_propensity_score)

In [None]:
# lingam_do_causal_estimation_linear_regression
lingam_estimate_linear_regression = do_causal_estimation_linear_regression(lingam_model, lingam_identified_estimand)
print(lingam_estimate_linear_regression)

In [None]:
# lingam_do_causal_refute_estimate_random
lingam_refutation_random = do_causal_refute_estimate_random(lingam_model, lingam_identified_estimand, lingam_estimate_linear_regression)
print('lingam_refutation_random: ', lingam_refutation_random)

# lingam_do_causal_refute_estimate_placebo
lingam_res_placebo = do_causal_refute_estimate_placebo(lingam_model, lingam_identified_estimand, lingam_estimate_linear_regression)
print('lingam_res_placebo: ', lingam_res_placebo)

# lingam_do_causal_refute_estimate_subset
lingam_res_subset = do_causal_refute_estimate_subset(lingam_model, lingam_identified_estimand, lingam_estimate_linear_regression)
print('lingam_res_subset: ', lingam_res_subset)