In [1]:
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.ensemble import GradientBoostingRegressor, GradientBoostingClassifier
from sklearn.neighbors import NearestNeighbors
from econml.dml import LinearDML
import warnings
from tqdm import tqdm

# Suppress warnings for cleaner output (e.g., convergence warnings in high-dim logistic reg)
warnings.filterwarnings('ignore')

## Non linear simulation

In [2]:
def generate_bin_data(n=500, p=100, seed=123):
    np.random.seed(seed)
    
    # 1. Generate Covariates (100 variables)
    X = np.random.normal(0, 1, size=(n, p))
    
    # 2. Define the &quot;True&quot; Confounding Mechanism (The Nuisance Function)
    # Only the first 5 variables (indices 0-4) actually matter.
    # Interaction: X0 * X1
    # Non-linear: X2 squared
    # Linear: X3, X4
    nuisance_term = 0.5 * X[:,0] * X[:,1] + 0.4 * (X[:,2]**2) + 0.3 * X[:,3] + 0.2 * X[:,4]
    # 3. Treatment Assignment (Propensity)
    # P(T=1 | X) depends on the nuisance term
    logit_p = -0.5 + nuisance_term
    prob_t = 1 / (1 + np.exp(-logit_p))
    T = np.random.binomial(1, prob_t)
    # 4. Outcome Generate
    # True Causal Effect is ~14.2% Risk Difference (calculated via simulation below)
    
    # Base Log-Odds (without treatment)
    logit_y_base = -1.5 + nuisance_term
    
    # To get the Ground Truth Risk Difference, we simulate the counterfactuals:
    # What if everyone was treated?
    p1_true = 1 / (1 + np.exp(-(logit_y_base + 0.8 * 1))) 
    # What if everyone was untreated?
    p0_true = 1 / (1 + np.exp(-(logit_y_base + 0.8 * 0)))
    
    true_ate = np.mean(p1_true - p0_true)
    
    # Generate Observed Binary Outcome
    # We add the treatment effect (0.8 log odds) to the base
    logit_y_observed = logit_y_base + 0.8 * T
    prob_y_observed = 1 / (1 + np.exp(-logit_y_observed))
    Y = np.random.binomial(1, prob_y_observed)
    Y = Y.astype(int)
    
    return X, T, Y, true_ate


In [3]:
class CausalEstimators:

# A helper class to run various causal inference methods:
# 1. Naive (Unadjusted Difference in Means)
# 2. Regression Adjustment (Linear/Logistic)
# 3. IPTW (Inverse Probability of Treatment Weighting)
# 4. PSM (Propensity Score Matching 1:1)
# 5. DML (Double Machine Learning)

    def __init__(self, X, T, Y, outcome_type= 'continuous'):
        self.X = X
        self.T = T
        self.Y = Y
        self.outcome_type = outcome_type
        # Create a DataFrame for easier handling in naive methods
        self.df = pd.DataFrame(X)
        self.df['T'] = T
        self.df['Y'] = Y
    def run_naive(self):
    
        treated = self.df[self.df['T'] == 1]['Y'].mean()
        control = self.df[self.df['T'] == 0]['Y'].mean()
        return treated - control

    def run_regression_adjustment(self):
        features = np.column_stack((self.T, self.X))
        # Logistic Regression for binary outcome
        # We use L2 penalty (default) to help convergence in high dimensions (N=500, P=100)
        model = LogisticRegression(solver='liblinear', max_iter=2000) 
        model.fit(features, self.Y)
        
        # --- Marginal Effects (Risk Difference) ---
        # 1. Create a dataset where EVERYONE is treated (T=1)
        X_treated = np.column_stack((np.ones(len(self.X)), self.X))
        # 2. Create a dataset where EVERYONE is control (T=0)
        X_control = np.column_stack((np.zeros(len(self.X)), self.X))
        
        # 3. Predict probabilities for both counterfactuals
        p1 = model.predict_proba(X_treated)[:, 1]
        p0 = model.predict_proba(X_control)[:, 1]
        
        # 4. Average difference is the Average Treatment Effect (Risk Difference)
        return np.mean(p1 - p0)


    def run_iptw(self):
        # Estimate Propensity Scores
        ps_model = LogisticRegression(solver='liblinear', max_iter=2000)
        ps_model.fit(self.X, self.T)
        ps = ps_model.predict_proba(self.X)[:, 1]
        
        # Clip to prevent division by zero or extreme weights (common in small N)
        ps = np.clip(ps, 0.05, 0.95)
        # Calculate weights: 1/PS for treated, 1/(1-PS) for control
        weights = np.where(self.T == 1, 1/ps, 1/(1-ps))
        # Weighted Difference in Means (Risk Difference)
        weighted_mean_1 = np.average(self.Y[self.T==1], weights=weights[self.T==1])
        weighted_mean_0 = np.average(self.Y[self.T==0], weights=weights[self.T==0])
        return weighted_mean_1 - weighted_mean_0


    def run_psm(self):
        # Estimate Propensity Scores
        ps_model = LogisticRegression(solver='liblinear', max_iter=2000)
        ps_model.fit(self.X, self.T)
        ps = ps_model.predict_proba(self.X)[:, 1]
        
        treated_idx = np.where(self.T == 1)[0]
        control_idx = np.where(self.T == 0)[0]
        
        # Safety check for separation
        if len(control_idx) == 0 or len(treated_idx) == 0:
            return np.nan
        # Match each treated unit to nearest control unit based on PS
        nbrs = NearestNeighbors(n_neighbors=1).fit(ps[control_idx].reshape(-1, 1))
        distances, indices = nbrs.kneighbors(ps[treated_idx].reshape(-1, 1))
        matched_control_idx = control_idx[indices.flatten()]
        
        # Difference in Means of matched pairs (Risk Difference for binary)
        return np.mean(self.Y[treated_idx]) - np.mean(self.Y[matched_control_idx])
    
    def run_dml(self):
        # Use shallow trees (depth 2) to prevent overfitting on small N=500
        y_model = GradientBoostingClassifier(n_estimators=50, max_depth=2, random_state=42)
        t_model = GradientBoostingClassifier(n_estimators=50, max_depth=2, random_state=42)
    
        # Set cv=3 for small sample size (5-fold might be too thin)
        est = LinearDML(model_y=y_model,
                        model_t=t_model,
                        discrete_outcome = True,
                        discrete_treatment=True,
                        cv=3,
                        random_state=42)
        est.fit(self.Y, self.T, X=self.X)
        # Return Average Treatment Effect
        return est.effect(self.X).mean()


In [4]:
# --- Test run to see if the code is working ---
X, T, Y, true_eff = generate_bin_data(n=500, p=100,seed = 1)
sim = CausalEstimators(X, T, Y, outcome_type='binary')

print(f"Scenario A: Continuous Outcome (e.g., Total Pay)")
print(f"True Causal Effect: {true_eff:.2f}")
print("-" * 65)
print(f"{'Method':<30} | {'Estimate':<15} | {'Bias':<10}")
print("-" * 65)

results_continuous = {
"Naive (Unadjusted)": sim.run_naive(),
"Regression (Logistic):": sim.run_regression_adjustment(),
"IPTW (Logistic PS)": sim.run_iptw(),
"PSM (1:1 Nearest Neighbor)": sim.run_psm(),
"DML (Gradient Boosting)": sim.run_dml()
}

for method, val in results_continuous.items():
    bias = val - true_eff
    print(f"{method:<30} | {val:<15.2f} | {bias:<10.2f}")
    
print("\n" + "="*65 + "\n")

Scenario A: Continuous Outcome (e.g., Total Pay)
True Causal Effect: 0.16
-----------------------------------------------------------------
Method                         | Estimate        | Bias      
-----------------------------------------------------------------
Naive (Unadjusted)             | 0.20            | 0.04      
Regression (Logistic):         | 0.19            | 0.03      
IPTW (Logistic PS)             | 0.25            | 0.10      
PSM (1:1 Nearest Neighbor)     | 0.22            | 0.06      
DML (Gradient Boosting)        | 0.19            | 0.03      




## Baseline group with 10 covariates

In [5]:
def run_baseline_simulation(n_epochs=100):
    # 1. Initialize a list to store results
    results_list = []

    print(f"Starting simulation with {n_epochs} epochs...")

    for epoch in tqdm(range(n_epochs)):
        # --- 1. CONTINUOUS OUTCOME ---
        X, T, Y, true_eff = generate_bin_data(n=500, p=10, seed = epoch)
        sim = CausalEstimators(X, T, Y, outcome_type='continuous')
                
        # 3. Apply your 5 different methods
        # Replace these with your actual function calls (e.g., DML, PSM, etc.)
        
        # Method 1: Naive (Unadjusted)
        res_m1 = sim.run_naive()
        
        # Method 2: Regression (Linear)
        res_m2 = sim.run_regression_adjustment()
        
        # Method 3: IPTW (Logistic PS)
        res_m3 = sim.run_iptw()
        
        # Method 4: PSM (1:1 Nearest Neighbor)
        res_m4 = sim.run_psm()
        
        # Method 5: DML (Gradient Boosting)
        res_m5 = sim.run_dml()

        # 4. Record results for each method in a dictionary
        # We store them as separate rows to make it "tidy" (Long Format)
        methods = {
            "Naive (Unadjusted)": res_m1,
            "Regression (Logistic):": res_m2,
            "IPTW (Logistic PS)": res_m3,
            "PSM (1:1 Nearest Neighbor)": res_m4,
            "DML (Gradient Boosting)": res_m5
        }

        for method_name, value in methods.items():
            results_list.append({
                "epoch": epoch,
                "method": method_name,
                "estimate": value,
                "bias": abs(value - true_eff), # You can add metrics here
            })

    # 5. Convert the list of dictionaries to a pandas DataFrame
    df_results = pd.DataFrame(results_list)
    
    return df_results


In [6]:
# Execute
df = run_baseline_simulation(n_epochs=1000)

Starting simulation with 1000 epochs...


100%|██████████| 1000/1000 [04:49<00:00,  3.46it/s]


In [7]:
# Quick summary of mean estimates across methods
summary = df.groupby('method')['estimate'].agg(['mean', 'std']).reset_index()
print("\nSimulation Estimates Summary:")
print(summary)


Simulation Estimates Summary:
                       method      mean       std
0     DML (Gradient Boosting)  0.204469  0.045362
1          IPTW (Logistic PS)  0.252240  0.043199
2          Naive (Unadjusted)  0.271544  0.041403
3  PSM (1:1 Nearest Neighbor)  0.253618  0.056115
4      Regression (Logistic):  0.237319  0.041023


In [8]:
# Quick summary of mean estimates across methods
summary = df.groupby('method')['bias'].agg(['mean', 'std']).reset_index()
print("\nSimulation Bias Summary:")
print(summary)


Simulation Bias Summary:
                       method      mean       std
0     DML (Gradient Boosting)  0.054327  0.036002
1          IPTW (Logistic PS)  0.095070  0.042135
2          Naive (Unadjusted)  0.113943  0.041329
3  PSM (1:1 Nearest Neighbor)  0.098309  0.051947
4      Regression (Logistic):  0.080724  0.038914


## Linear data generation with rare disease settings as the control group

In [9]:
def generate_linear_data(n=500, p=100, setting='continuous',seed=123):
    np.random.seed(seed)
    
    # 1. Generate Covariates (100 variables)
    X = np.random.normal(0, 1, size=(n, p))
    
    # 2. Define the &quot;True&quot; Confounding Mechanism (The Nuisance Function)
    # Only the first 5 variables (indices 0-4) actually matter.
    # Interaction: X0 * X1
    # Non-linear: X2 squared
    # Linear: X3, X4
    nuisance_term = 0.5 * X[:,0] + 0.2 * X[:,1] + 0.4 * X[:,2] - 0.3 * X[:,3] - 0.2 * X[:,4] - 0.35 * X[:,5]
    # 3. Treatment Assignment (Propensity)
    # P(T=1 | X) depends on the nuisance term
    logit_p = -0.5 + nuisance_term
    prob_t = 1 / (1 + np.exp(-logit_p))
    T = np.random.binomial(1, prob_t)
    # 4. Outcome Generate
    
    # Base Log-Odds (without treatment)
    logit_y_base = -1.5 + nuisance_term
    
    # To get the Ground Truth Risk Difference, we simulate the counterfactuals:
    # What if everyone was treated?
    p1_true = 1 / (1 + np.exp(-(logit_y_base + 0.8 * 1))) 
    # What if everyone was untreated?
    p0_true = 1 / (1 + np.exp(-(logit_y_base + 0.8 * 0)))
    
    true_ate = np.mean(p1_true - p0_true)
    
    # Generate Observed Binary Outcome
    # We add the treatment effect (0.8 log odds) to the base
    logit_y_observed = logit_y_base + 0.8 * T
    prob_y_observed = 1 / (1 + np.exp(-logit_y_observed))
    Y = np.random.binomial(1, prob_y_observed)
    Y = Y.astype(int)
    
    return X, T, Y, true_ate

In [10]:
def run_linear_simulation(n_epochs=100):
    # 1. Initialize a list to store results
    results_list = []

    print(f"Starting simulation with {n_epochs} epochs...")

    for epoch in tqdm(range(n_epochs)):
        # --- 1. CONTINUOUS OUTCOME ---
        X, T, Y, true_eff = generate_linear_data(n=500, p=100, setting='continuous',seed = epoch)
        sim = CausalEstimators(X, T, Y, outcome_type='continuous')
                
        # 3. Apply your 5 different methods
        # Replace these with your actual function calls (e.g., DML, PSM, etc.)
        
        # Method 1: Naive (Unadjusted)
        res_m1 = sim.run_naive()
        
        # Method 2: Regression (Linear)
        res_m2 = sim.run_regression_adjustment()
        
        # Method 3: IPTW (Logistic PS)
        res_m3 = sim.run_iptw()
        
        # Method 4: PSM (1:1 Nearest Neighbor)
        res_m4 = sim.run_psm()
        
        # Method 5: DML (Gradient Boosting)
        res_m5 = sim.run_dml()

        # 4. Record results for each method in a dictionary
        # We store them as separate rows to make it "tidy" (Long Format)
        methods = {
            "Naive (Unadjusted)": res_m1,
            "Regression (Logistic):": res_m2,
            "IPTW (Logistic PS)": res_m3,
            "PSM (1:1 Nearest Neighbor)": res_m4,
            "DML (Gradient Boosting)": res_m5
        }

        for method_name, value in methods.items():
            results_list.append({
                "epoch": epoch,
                "method": method_name,
                "estimate": value,
                "bias": abs(value - true_eff), # You can add metrics here
            })

    # 5. Convert the list of dictionaries to a pandas DataFrame
    df_results = pd.DataFrame(results_list)
    
    return df_results


In [11]:
# Execute control group
df = run_linear_simulation(n_epochs=1000)

Starting simulation with 1000 epochs...


100%|██████████| 1000/1000 [23:40<00:00,  1.42s/it]


In [12]:
# Quick summary of mean estimates across methods
summary = df.groupby('method')['estimate'].agg(['mean', 'std']).reset_index()
print("\nSimulation Estimates Summary:")
print(summary)


Simulation Estimates Summary:
                       method      mean       std
0     DML (Gradient Boosting)  0.190478  0.043287
1          IPTW (Logistic PS)  0.154049  0.052746
2          Naive (Unadjusted)  0.255671  0.040736
3  PSM (1:1 Nearest Neighbor)  0.162023  0.091481
4      Regression (Logistic):  0.128974  0.043140


In [13]:
# Quick summary of mean estimates across methods
summary = df.groupby('method')['bias'].agg(['mean', 'std']).reset_index()
print("\nSimulation Bias Summary:")
print(summary)


Simulation Bias Summary:
                       method      mean       std
0     DML (Gradient Boosting)  0.055210  0.035465
1          IPTW (Logistic PS)  0.043147  0.032855
2          Naive (Unadjusted)  0.114673  0.040430
3  PSM (1:1 Nearest Neighbor)  0.076260  0.054505
4      Regression (Logistic):  0.035243  0.027416


## Main group with 100 covariates

In [14]:
def run_simulation(n_epochs=100):
    # 1. Initialize a list to store results
    results_list = []

    print(f"Starting simulation with {n_epochs} epochs...")

    for epoch in tqdm(range(n_epochs)):
        # --- 1. CONTINUOUS OUTCOME ---
        X, T, Y, true_eff = generate_bin_data(n=500, p=100, seed = epoch)
        sim = CausalEstimators(X, T, Y, outcome_type='continuous')
                
        # 3. Apply your 5 different methods
        # Replace these with your actual function calls (e.g., DML, PSM, etc.)
        
        # Method 1: Naive (Unadjusted)
        res_m1 = sim.run_naive()
        
        # Method 2: Regression (Linear)
        res_m2 = sim.run_regression_adjustment()
        
        # Method 3: IPTW (Logistic PS)
        res_m3 = sim.run_iptw()
        
        # Method 4: PSM (1:1 Nearest Neighbor)
        res_m4 = sim.run_psm()
        
        # Method 5: DML (Gradient Boosting)
        res_m5 = sim.run_dml()

        # 4. Record results for each method in a dictionary
        # We store them as separate rows to make it "tidy" (Long Format)
        methods = {
            "Naive (Unadjusted)": res_m1,
            "Regression (Logistic):": res_m2,
            "IPTW (Logistic PS)": res_m3,
            "PSM (1:1 Nearest Neighbor)": res_m4,
            "DML (Gradient Boosting)": res_m5
        }

        for method_name, value in methods.items():
            results_list.append({
                "epoch": epoch,
                "method": method_name,
                "estimate": value,
                "bias": abs(value - true_eff), # You can add metrics here
            })

    # 5. Convert the list of dictionaries to a pandas DataFrame
    df_results = pd.DataFrame(results_list)
    
    return df_results


In [15]:
# Execute main group
df = run_simulation(n_epochs=1000)

Starting simulation with 1000 epochs...


100%|██████████| 1000/1000 [23:40<00:00,  1.42s/it]


In [16]:
# Quick summary of mean estimates across methods
summary = df.groupby('method')['estimate'].agg(['mean', 'std']).reset_index()
print("\nSimulation Estimates Summary:")
print(summary)


Simulation Estimates Summary:
                       method      mean       std
0     DML (Gradient Boosting)  0.229954  0.044883
1          IPTW (Logistic PS)  0.251185  0.051680
2          Naive (Unadjusted)  0.269620  0.041328
3  PSM (1:1 Nearest Neighbor)  0.250257  0.070751
4      Regression (Logistic):  0.231013  0.044545


In [17]:
# Quick summary of mean estimates across methods
summary = df.groupby('method')['bias'].agg(['mean', 'std']).reset_index()
print("\nSimulation Bias Summary:")
print(summary)


Simulation Bias Summary:
                       method      mean       std
0     DML (Gradient Boosting)  0.074249  0.041657
1          IPTW (Logistic PS)  0.094572  0.049828
2          Naive (Unadjusted)  0.111963  0.041464
3  PSM (1:1 Nearest Neighbor)  0.100435  0.059243
4      Regression (Logistic):  0.074990  0.041831
