In [None]:
from pathlib import Path

import pandas as pd

# Build the location from path components instead of a backslash-separated
# literal, so the same cell also works on non-Windows machines. The path is
# relative to the current working directory.
data = pd.read_csv(
    Path("src", "deep_learning_for_semiparametric_did_estimation", "data", "injury.csv"),
)

In [None]:
# Keep only the rows whose "ky" flag equals 1.
df_filtered = data.loc[data["ky"].eq(1)]

In [None]:
# Replace the abbreviated source column names with descriptive ones.
df_filtered = df_filtered.rename(
    columns=dict(
        durat="duration",
        ldurat="log_duration",
        afchnge="after_1980",
    ),
)

# Plot 1

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

# Relabel on a NEW frame. `.assign` returns a copy, so the numeric 0/1 codes in
# `df_filtered` stay intact. Previously `df_plot` was only an alias, and the
# mapping below overwrote the codes that later cells filter on
# (`highearn == 1`) and re-map.
df_plot = df_filtered.assign(
    highearn=df_filtered["highearn"].map({0: "Low earner", 1: "High earner"}),
    after_1980=df_filtered["after_1980"].map({0: "Before 1980", 1: "After 1980"}),
)

# Plotting: one histogram panel per earnings group.
g = sns.FacetGrid(df_plot, col="highearn", height=4, aspect=1)
g.map_dataframe(
    sns.histplot,
    x="log_duration",
    binwidth=0.5,
    color="grey",
    edgecolor="white",
)


# Customize the plot
# The x variable is `log_duration`, so the axis label says so.
g.set_axis_labels("Log duration", "Frequency")
g.set_titles(col_template="{col_name}")
g.fig.subplots_adjust(top=0.85)
plt.show()

In [None]:
# Save relative to the working directory instead of a machine-specific
# absolute "C:/..." path. The data file is also loaded relative to the
# working directory, so the notebook stays portable between machines.
# NOTE(review): assumes the notebook runs from the repository root and that
# paper/graphs already exists -- confirm.
plot_path = "paper/graphs/dist.png"
g.savefig(plot_path)

# Some Diagnostics

### regression exclusion test

For the different kinds of injuries, we can see that the regression exclusion test is not significant. This means that the regression is not biased by excluding the injury-type variable.
If the test were significant within the treatment group, it would mean that the duration of the sick leave varies significantly between injury types, which would violate the assumption of a homogeneous treatment effect.

In [None]:
# Take an independent snapshot for the diagnostics. Other cells assign columns
# on `df_filtered` in place, and a plain alias would silently pick up those
# changes depending on the order in which cells are executed.
df_diag = df_filtered.copy()

In [None]:
# Restrict the diagnostics sample to the rows with highearn == 1.
is_high_earner = df_diag["highearn"] == 1
df_highearn = df_diag[is_high_earner]

In [None]:
# Preview the first rows only instead of rendering the whole frame.
df_highearn.head()

In [None]:
# Summed duration per injury type (columns) for each value of after_1980 (rows);
# combinations without observations are filled with 0.
pivot_table = pd.pivot_table(
    df_highearn,
    index="after_1980",
    columns="injtype",
    values="duration",
    aggfunc="sum",
    fill_value=0,
)

# Append a row holding the after (1) minus before (0) change per injury type.
after_minus_before = pivot_table.loc[1] - pivot_table.loc[0]
pivot_table.loc["difference"] = after_minus_before

In [None]:
# Show the summed durations by period and injury type, including the appended
# "difference" row.
pivot_table

In [None]:
import numpy as np
import pandas as pd
import statsmodels.api as sm

# Exclusion test: do the injury-type dummies add explanatory power for
# `duration` beyond `after_1980`, within the highearn == 1 sample?

# Expand injtype into dummy columns; the first level is dropped as baseline.
highearn_1 = pd.get_dummies(
    df_diag[df_diag["highearn"] == 1],
    columns=["injtype"],
    drop_first=True,
)

# Cast every "injtype_*" column to int so the design matrix is numeric.
injtype_cols = [name for name in highearn_1.columns if name.startswith("injtype_")]
highearn_1 = highearn_1.astype(dict.fromkeys(injtype_cols, int))

# Dependent variable and the regressors of the full model (with intercept).
y = highearn_1["duration"]
X = sm.add_constant(highearn_1[["after_1980", *injtype_cols]])

# Full model: period indicator plus injury-type dummies.
full_model = sm.OLS(y, X).fit()

# Restricted model: period indicator only.
restricted_model = sm.OLS(y, sm.add_constant(highearn_1[["after_1980"]])).fit()

# F-test comparing the nested models; rows containing NaN (the first row, per
# the original note) are dropped for readability.
anova_results = sm.stats.anova_lm(restricted_model, full_model).dropna()

# Display the results
anova_results

# Simple Analysis

In [None]:
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf

# Work on an independent copy. `pd.DataFrame(df_filtered)` does not copy the
# underlying data by default, so the string labels assigned below could leak
# into `df_filtered`, whose 0/1 codes later cells still need (e.g. to build
# the treatment indicator as a product of the two columns).
df = df_filtered.copy()

# Convert to categorical for the linear model
df["after_1980"] = df["after_1980"].map({0: "Before_1980", 1: "After_1980"})
df["highearn"] = df["highearn"].map({0: "Low_earner", 1: "High_earner"})

# Fit the linear model; the interaction term is the difference-in-differences
# contrast between earnings groups and periods.
model = smf.ols(
    "log_duration ~ highearn + after_1980 + highearn * after_1980",
    data=df,
).fit()

# Display the results in a tidy format
results = model.summary2().tables[1]
print(results)

# Alternatively, display the results in a more "tidy" DataFrame format
tidy_results = results.reset_index()
tidy_results.columns = [
    "term",
    "estimate",
    "std_error",
    "t_value",
    "p_value",
    "conf_lower",
    "conf_upper",
]
print(tidy_results)

Coef.	Std.Err.	t	P>|t|	[0.025	0.975]
Intercept	1.580352	0.037248	42.427637	0.000000e+00	1.507332	1.653373
highearn[T.Low_earner]	-0.447080	0.049420	-9.046587	1.995458e-19	-0.543961	-0.350198
after_1980[T.Before_1980]	-0.198259	0.051902	-3.819849	1.349724e-04	-0.300007	-0.096510
highearn[T.Low_earner]:after_1980[T.Before_1980]	0.190601	0.068509	2.782138	5.418222e-03	0.056297	0.324905

# Simple Analysis + Controls

In [None]:
# Show the column labels currently available in `df`.
df.columns

In [None]:
# Build the modelling frame as a NEW object with the two factor columns cast
# to categorical. `astype` returns a copy, so `df_filtered` is not modified;
# `pd.DataFrame(df_filtered)` would not have copied the underlying data.
df = df_filtered.astype({"indust": "category", "injtype": "category"})

# Fit the more complex linear model
model_2 = smf.ols(
    "log_duration ~ highearn + after_1980 + highearn * after_1980 + male + married + lage + hosp + indust + injtype + prewage + prewage * highearn + construc+ manuf +totmed +head +neck + upextr+trunk+lowback+lowextr+occdis",
    data=df,
).fit()

# Display the results in a tidy format
results_complex = model_2.summary2().tables[1]
print(results_complex)

# Alternatively, display the results in a more "tidy" DataFrame format
tidy_results = results_complex.reset_index()
tidy_results.columns = [
    "term",
    "estimate",
    "std_error",
    "t_value",
    "p_value",
    "conf_lower",
    "conf_upper",
]
print(tidy_results)

In [None]:
# Show the full statsmodels summary for the regression with controls.
model_2.summary2()

# Deep Learning Doubly Robust

## Main model for Application

In [None]:
# Mark industry and injury type as categorical (assigned on `df_filtered` itself).
for cat_col in ("indust", "injtype"):
    df_filtered[cat_col] = df_filtered[cat_col].astype("category")


# Analysis sample: only rows without any missing value. `df` and `df_cleaned`
# refer to the same frame afterwards.
df_cleaned = pd.DataFrame(df_filtered).dropna()
df = df_cleaned

In [None]:
import os
import warnings

import numpy as np
import pandas as pd
import statsmodels.api as sm
import tensorflow as tf
from sklearn.base import BaseEstimator
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.layers import Dense, Input, ReLU
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2

# Environment switches intended to make runs repeatable.
# NOTE(review): Python reads PYTHONHASHSEED at interpreter start-up, so setting
# it here presumably only affects child processes -- confirm.
os.environ.update(
    {
        "PYTHONHASHSEED": "42",
        # Configure TensorFlow for deterministic operations
        "TF_DETERMINISTIC_OPS": "1",
        # Use CPU for deterministic behavior
        "CUDA_VISIBLE_DEVICES": "",
    },
)

# Silence every warning for the rest of the session.
warnings.filterwarnings("ignore")

# Setting seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)


class LossHistory(Callback):
    """Keras callback that records the training and validation loss per epoch.

    ``losses`` and ``val_losses`` are reset at the start of every training run
    and receive one entry per epoch. An entry is ``None`` when the
    corresponding key is missing from the epoch logs (e.g. no validation data).
    """

    def on_train_begin(self, logs=None):
        """Start each training run with empty histories."""
        self.losses = []
        self.val_losses = []

    def on_epoch_end(self, epoch, logs=None):
        """Append this epoch's ``loss`` and ``val_loss`` (``None`` if absent)."""
        logs = logs or {}
        self.losses.append(logs.get("loss"))
        self.val_losses.append(logs.get("val_loss"))

    def get_min_loss(self):
        """Return ``(min training loss, min validation loss)``.

        ``None`` entries are skipped. A component is ``None`` when no value was
        recorded for it, instead of ``min`` raising on an empty list or on
        ``None`` comparisons.
        """
        recorded_train = [value for value in self.losses if value is not None]
        recorded_val = [value for value in self.val_losses if value is not None]
        return min(recorded_train, default=None), min(recorded_val, default=None)


def create_deep_ffnn(input_dim, depth, units, learning_rate, l2_reg):
    """Build and compile a feed-forward binary classifier.

    The network stacks Dense+ReLU blocks of width ``units`` (each Dense layer
    with an L2 kernel penalty of ``l2_reg``) and ends in a single sigmoid unit.
    There is always one hidden block, plus one more for every unit of
    ``depth`` above 2.

    Args:
        input_dim: Number of input features.
        depth: Controls the number of hidden blocks (see above).
        units: Width of every hidden Dense layer.
        learning_rate: Learning rate passed to the Adam optimizer.
        l2_reg: L2 regularization factor for the hidden Dense kernels.

    Returns:
        The model, compiled with Adam and binary cross-entropy loss.
    """
    inputs = Input(shape=(input_dim,))

    hidden_blocks = 1 + max(depth - 2, 0)
    x = inputs
    for _ in range(hidden_blocks):
        x = Dense(units, kernel_regularizer=l2(l2_reg))(x)
        x = ReLU()(x)

    outputs = Dense(1, activation="sigmoid")(x)
    network = Model(inputs, outputs)

    # Compile the model with Adam optimizer and specified learning rate
    network.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss="binary_crossentropy",
    )
    return network


class KerasClassifier(BaseEstimator):
    """Scikit-learn style wrapper around the network from ``create_deep_ffnn``.

    Constructor arguments are stored unchanged under the same names, so that
    ``BaseEstimator.get_params`` can read them back.

    Args:
        input_dim: Number of input features of the network.
        depth: Depth setting forwarded to ``create_deep_ffnn``.
        units: Width of the hidden layers.
        learning_rate: Learning rate of the optimizer.
        l2_reg: L2 regularization factor of the hidden layers.
        epochs: Number of training epochs.
        batch_size: Mini-batch size used for training.
        random_state: Seed applied to NumPy and TensorFlow at the start of ``fit``.
    """

    def __init__(
        self,
        input_dim,
        depth,
        units,
        learning_rate,
        l2_reg,
        epochs=50,
        batch_size=32,
        random_state=42,
    ):
        self.input_dim = input_dim
        self.depth = depth
        self.units = units
        self.learning_rate = learning_rate
        self.l2_reg = l2_reg
        self.epochs = epochs
        self.batch_size = batch_size
        self.random_state = random_state
        # State filled in by fit().
        self.model = None
        self.classes_ = None
        self.train_loss = None
        self.val_loss = None

    def fit(self, X, y, sample_weight=None):
        """Train a freshly built network on ``X``/``y``.

        20% of the data passed in is held out by Keras as validation data
        (``validation_split=0.2``). Per-epoch losses are stored in
        ``train_loss`` and ``val_loss``.

        Args:
            X: Feature matrix.
            y: Binary targets.
            sample_weight: Optional per-sample weights, forwarded to Keras.
                Previously this argument was accepted but silently ignored.

        Returns:
            ``self``, following the scikit-learn estimator convention.
        """
        # Re-seed on every fit so repeated fits start from the same state.
        np.random.seed(self.random_state)
        tf.random.set_seed(self.random_state)

        self.model = create_deep_ffnn(
            self.input_dim,
            self.depth,
            self.units,
            self.learning_rate,
            self.l2_reg,
        )
        history = LossHistory()
        self.model.fit(
            X,
            y,
            sample_weight=sample_weight,
            epochs=self.epochs,
            batch_size=self.batch_size,
            verbose=0,
            validation_split=0.2,
            callbacks=[history],
        )
        self.train_loss = history.losses
        self.val_loss = history.val_losses
        self.classes_ = np.unique(y)
        return self

    def predict(self, X):
        """Return 0/1 labels by thresholding the predicted probability at 0.5.

        The result keeps the shape of the network output (one column per
        sample row), not a flat vector.
        """
        pred_prob = self.model.predict(X)
        return (pred_prob > 0.5).astype(int)

    def predict_proba(self, X):
        """Return class probabilities with columns ``[P(class 0), P(class 1)]``."""
        pred_prob = self.model.predict(X)
        return np.hstack([1 - pred_prob, pred_prob])


# Define parameters for the neural network
depth = 3
units = 32
learning_rate = 0.01
l2_reg = 0.01

# Covariates handed to the nuisance learners.
# NOTE(review): "highearn" and "after_1980" jointly determine D (their
# product), so the propensity model can separate treated from untreated rows
# perfectly -- confirm that including them is intended.
covariate_columns = [
    "highearn",
    "after_1980",
    "male",
    "married",
    "lage",
    "hosp",
    "indust",
    "injtype",
    "prewage",
    "construc",
    "manuf",
    "totmed",
    "head",
    "neck",
    "upextr",
    "trunk",
    "lowback",
    "lowextr",
    "occdis",
]

# The former bare `df.dropna()` discarded its result and therefore did
# nothing; `df` is already free of missing values from the preceding cleaning
# cell, so the statement is dropped.
df["D"] = df["highearn"] * df["after_1980"]
y = df["log_duration"].to_numpy()
d = df["D"].to_numpy()
# Cast to float explicitly: with categorical columns in the selection, a plain
# `.values` yields an object-dtype array rather than a numeric one.
x = df[covariate_columns].astype(float).to_numpy()

# Derive the network input width from the data instead of hard-coding 19.
input_dim = x.shape[1]

# Split the data for the neural network
# NOTE(review): the validation part (int_cov_val, D_val) is not used below.
int_cov_train, int_cov_val, D_train, D_val = train_test_split(
    x,
    d,
    test_size=0.2,
    random_state=42,
)

# Create and train the neural network classifier
nn_classifier = KerasClassifier(
    input_dim=input_dim,
    depth=depth,
    units=units,
    learning_rate=learning_rate,
    l2_reg=l2_reg,
    random_state=42,
)
nn_classifier.fit(int_cov_train, D_train)

# Define and fit the DoubleML model
from doubleml import DoubleMLData, DoubleMLDID

# NOTE(review): DoubleMLDID is documented for two-period panel data; if the
# sample is a repeated cross-section, DoubleMLDIDCS with a separate time
# indicator may be the matching estimator -- TODO confirm.
ml_g = LinearRegression()
dml_data = DoubleMLData.from_arrays(x=x, y=y, d=d)
dml_did = DoubleMLDID(
    dml_data,
    ml_g=ml_g,
    ml_m=nn_classifier,
    score="observational",
    in_sample_normalization=False,
    n_folds=5,
)
dml_did.fit()

# These losses are read from `nn_classifier` itself.
# NOTE(review): presumably DoubleML fits clones of the learner, in which case
# the values stem from the stand-alone fit on the training split above -- confirm.
min_train_loss = min(nn_classifier.train_loss)
min_val_loss = min(nn_classifier.val_loss)
print("Minimum Training Loss: ", min_train_loss)
print("Minimum Validation Loss: ", min_val_loss)

# Output the results
print(dml_did)

In [None]:
# Show the summary table of the fitted DoubleML model.
dml_did.summary

coef	std err	t	P>|t|	2.5 %	97.5 %
d	0.184286	0.04616	3.992338	0.000065	0.093814	0.274758