# Main model: classic doubly robust DiD

In [None]:
import numpy as np
from doubleml import DoubleMLData, DoubleMLDID
from doubleml.datasets import make_did_SZ2020

In [None]:
from sklearn.linear_model import LinearRegression, LogisticRegression

# Nuisance learners handed to DoubleMLDID in the simulation cell below:
# a logistic regression as `ml_m` and a linear regression as `ml_g`.
# Original author's note on both: "as in the paper, estimators not needed".
ml_m = LogisticRegression()
ml_g = LinearRegression()

In [None]:
# Monte Carlo settings: observations per simulated data set, number of
# replications, and the reference effect that estimates and confidence
# intervals are compared against.
n_obs = 1000
n_rep = 200
ATTE = 0.0

# One slot per replication; NaN marks a slot that has not been filled yet.
ATTE_estimates = np.full(n_rep, np.nan)
coverage = np.full(n_rep, np.nan)
ci_length = np.full(n_rep, np.nan)
asymptotic_variance = np.full(n_rep, np.nan)

# Print progress about every 10% of the replications. The step is clamped to
# at least 1 so that n_rep < 10 does not trigger a modulo-by-zero error.
progress_step = max(1, n_rep // 10)

np.random.seed(42)
for i_rep in range(n_rep):
    if i_rep % progress_step == 0:
        print(f"Iteration: {i_rep}/{n_rep}")

    # Draw a fresh data set (dgp_type=1, cross_sectional_data=False).
    dml_data = make_did_SZ2020(n_obs=n_obs, dgp_type=1, cross_sectional_data=False)

    # Fit the DiD model with the learners defined in the previous cell.
    dml_did = DoubleMLDID(dml_data, ml_g=ml_g, ml_m=ml_m, n_folds=5)
    dml_did.fit()

    ATTE_estimates[i_rep] = dml_did.coef.squeeze()

    # 95% confidence interval: record whether it contains ATTE and how wide it is.
    confint = dml_did.confint(level=0.95)
    ci_lower = confint["2.5 %"].iloc[0]
    ci_upper = confint["97.5 %"].iloc[0]
    coverage[i_rep] = (ci_lower <= ATTE) & (ci_upper >= ATTE)
    ci_length[i_rep] = ci_upper - ci_lower

    # Squared standard error of the "d" coefficient, taken from the summary table.
    summary_df = dml_did.summary
    std_err = summary_df.loc["d", "std err"]
    asymptotic_variance[i_rep] = std_err**2

# Calculate metrics
avg_bias = np.mean(ATTE_estimates - ATTE)
med_bias = np.median(ATTE_estimates - ATTE)
rmse = np.sqrt(np.mean((ATTE_estimates - ATTE) ** 2))
avg_asymptotic_variance = np.mean(asymptotic_variance)
coverage_probability = np.mean(coverage)
avg_ci_length = np.mean(ci_length)

# Print results
print(f"Av. Bias: {avg_bias}")
print(f"Med. Bias: {med_bias}")
print(f"RMSE: {rmse}")
print(f"Asy. V: {avg_asymptotic_variance}")
print(f"Cover: {coverage_probability}")
print(f"CIL: {avg_ci_length}")

# TWFE Simulation

In [None]:
import numpy as np
import pandas as pd
import statsmodels.formula.api as smf
from doubleml.datasets import make_did_SZ2020

# Set seed for reproducibility
np.random.seed(42)

# Parameters
n_obs = 1000
n_rep = 200
ATTE = 0.0  # Adjust this to reflect the true treatment effect

# Storage for estimates (NaN marks a replication that has not been filled yet)
ATTE_estimates = np.full(n_rep, np.nan)
coverage = np.full(n_rep, np.nan)
ci_length = np.full(n_rep, np.nan)
asymptotic_variance = np.full(n_rep, np.nan)

biases = []

# Print progress about every 10% of the replications. The step is clamped to
# at least 1 so that n_rep < 10 does not trigger a modulo-by-zero error.
progress_step = max(1, n_rep // 10)

for i_rep in range(n_rep):
    if i_rep % progress_step == 0:
        print(f"Iteration: {i_rep}/{n_rep}")

    # Generate data
    x, y, d = make_did_SZ2020(
        n_obs=n_obs,
        dgp_type=4,
        cross_sectional_data=False,
        return_type="array",
    )

    # Convert to DataFrame; covariate columns are named X1, X2, ... in order.
    covariate_names = [f"X{i + 1}" for i in range(x.shape[1])]
    df = pd.DataFrame(x, columns=covariate_names)
    df["y"] = y
    df["d"] = d

    # Fit the TWFE model in first-difference form.
    # NOTE(review): with cross_sectional_data=False the generator is documented
    # to return the outcome *change* between the two periods - confirm against
    # the doubleml docs. Under that assumption, the two-period TWFE regression
    # with unit and time effects and covariate-by-time interactions reduces to
    # an OLS of the change on `d` and the covariates, and the coefficient on
    # `d` is the TWFE treatment effect. The previous version interacted `d`
    # with a randomly drawn time dummy that carries no information about the
    # data, so its `d:time` coefficient did not estimate the effect.
    formula = "y ~ d + " + " + ".join(covariate_names)
    model = smf.ols(formula=formula, data=df).fit()

    # Extract the treatment effect estimate (coefficient of the treatment variable)
    tau_fe = model.params["d"]
    ATTE_estimates[i_rep] = tau_fe

    # Calculate and store the bias
    bias = tau_fe - ATTE
    biases.append(bias)

    # Confidence intervals (statsmodels default level); columns 0/1 are the
    # lower/upper bounds for the selected coefficient.
    ci = model.conf_int().loc["d"]
    ci_length[i_rep] = ci[1] - ci[0]
    coverage[i_rep] = ci[0] <= ATTE <= ci[1]
    asymptotic_variance[i_rep] = model.bse["d"] ** 2

# Calculate metrics
avg_bias = np.mean(biases)
med_bias = np.median(biases)
rmse = np.sqrt(np.mean(np.square(biases)))
mean_coverage = np.mean(coverage)
mean_ci_length = np.mean(ci_length)
mean_asymptotic_variance = np.mean(asymptotic_variance)
# Print results
print(f"Av. Bias: {avg_bias}")
print(f"Med. Bias: {med_bias}")
print(f"RMSE: {rmse}")
print(f"Coverage:{mean_coverage}")
print(f"mean_ci_length:{mean_ci_length}")
print(f"mean_asymptotic_variance:{mean_asymptotic_variance}")

# Deep learning with a few runs

In [None]:
import warnings

import numpy as np
from doubleml import DoubleMLData, DoubleMLDID
from doubleml.datasets import make_did_SZ2020
from lightgbm import LGBMRegressor
from scikeras.wrappers import KerasClassifier  # pip install scikeras
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential

# Settings for the neural-network experiment: observations per simulated data
# set and number of replications.
n_obs = 1000
n_reps = 10  # change that accordingly

# Seed NumPy's global random generator so repeated runs draw the same data.
np.random.seed(42)

# Suppress every warning for the rest of the session.
warnings.filterwarnings("ignore")

# Factory for the Keras network used as propensity classifier


def create_model():
    """Build and compile a small feed-forward binary classifier.

    The network has two ReLU hidden layers (64 and 32 units) and a single
    sigmoid output unit. It is compiled with the Adam optimizer, binary
    cross-entropy loss and accuracy as metric.

    The input width is read from the module-level array ``x`` at call time,
    so ``x`` must already be defined when this function is invoked.

    Returns:
        The compiled ``Sequential`` model.
    """
    network = Sequential(
        [
            Dense(64, input_dim=x.shape[1], activation="relu"),
            Dense(32, activation="relu"),
            Dense(1, activation="sigmoid"),  # Assuming binary classification
        ]
    )
    network.compile(
        optimizer="adam",
        loss="binary_crossentropy",
        metrics=["accuracy"],
    )
    return network


# Reference effect the estimates and confidence intervals are compared against.
# Defined here so the cell does not depend on a value left over from an earlier cell.
ATTE = 0.0

# Result arrays sized for THIS experiment (n_reps entries). The previous version
# wrote into the arrays of the preceding simulation, whose remaining entries
# then leaked into the metrics computed below. NaN marks an unfilled slot.
ATTE_estimates = np.full(n_reps, np.nan)
coverage = np.full(n_reps, np.nan)
ci_length = np.full(n_reps, np.nan)
asymptotic_variance = np.full(n_reps, np.nan)

# Number of boosting rounds for the LightGBM outcome regression.
n_estimators = 30

for i_rep in range(n_reps):
    # Draw a fresh data set. `x` is also read by create_model() to size the
    # network's input layer, so it must keep this name.
    x, y, d = make_did_SZ2020(
        n_obs=n_obs,
        dgp_type=4,
        cross_sectional_data=False,
        return_type="array",
    )
    dml_data = DoubleMLData.from_arrays(x=x, y=y, d=d)

    # Wrap the Keras model with KerasClassifier
    # NOTE(review): recent scikeras versions may prefer `model=` over
    # `build_fn=` - confirm against the installed version.
    keras_classifier = KerasClassifier(
        build_fn=create_model,
        epochs=10,
        batch_size=32,
        verbose=0,
    )

    # Use StandardScaler to normalize data and then use the Keras classifier in a pipeline
    ml_m = Pipeline([("scaler", StandardScaler()), ("nn", keras_classifier)])

    # Use LGBMRegressor for regression
    ml_g = LGBMRegressor(n_estimators=n_estimators)

    # Despite its name, `dml_plr` holds a DoubleMLDID model.
    dml_plr = DoubleMLDID(dml_data, ml_g, ml_m)
    dml_plr.fit()

    ATTE_estimates[i_rep] = dml_plr.coef.squeeze()

    # 95% confidence interval: record whether it contains ATTE and how wide it is.
    confint = dml_plr.confint(level=0.95)
    ci_lower = confint["2.5 %"].iloc[0]
    ci_upper = confint["97.5 %"].iloc[0]
    coverage[i_rep] = (ci_lower <= ATTE) & (ci_upper >= ATTE)
    ci_length[i_rep] = ci_upper - ci_lower

    # Extract standard error from the summary
    summary_df = dml_plr.summary
    std_err = summary_df.loc["d", "std err"]
    asymptotic_variance[i_rep] = std_err**2

# Calculate metrics
avg_bias = np.mean(ATTE_estimates - ATTE)
med_bias = np.median(ATTE_estimates - ATTE)
rmse = np.sqrt(np.mean((ATTE_estimates - ATTE) ** 2))
avg_asymptotic_variance = np.mean(asymptotic_variance)
coverage_probability = np.mean(coverage)
avg_ci_length = np.mean(ci_length)

# Print results
print(f"Av. Bias: {avg_bias}")
print(f"Med. Bias: {med_bias}")
print(f"RMSE: {rmse}")
print(f"Asy. V: {avg_asymptotic_variance}")
print(f"Cover: {coverage_probability}")
print(f"CIL: {avg_ci_length}")

## Visualization Ideas

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

# Kernel density plot of the estimates from the most recent simulation cell.
# The single column is named "Estimate".
df_pa = pd.DataFrame({"Estimate": ATTE_estimates})
g = sns.kdeplot(df_pa, fill=True)
plt.show()