In [1]:
import numpy as np
import pandas as pd
from scipy.stats import beta as beta_distribution
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

In [2]:
# Generate feature vectors
def generate_feature_vectors(N, d):
    return np.random.uniform(0, 1, (N, d))

# Propensity score function
def propensity_score(x):
    return 1 / 4 * (1 + beta_distribution.pdf(x[0], 2, 4))

# Response functions
mu0 = lambda x: 2 * x[0] - 1
mu1 = mu0

In [3]:
from Simulation.X_learner import X_learner_lgbm, X_learner_linear, X_learner_rf, X_learner_svm, X_learner_nn
from Simulation.T_learner import T_learner_lgbm, T_learner_linear, T_learner_rf, T_learner_svm, T_learner_nn
from Simulation.S_learner import S_learner_lgbm, S_learner_linear, S_learner_rf, S_learner_svm, S_learner_nn


# Generate potential outcomes
def potential_outcomes(X, mu0, mu1):
    Y0 = []
    Y1 = []
    for sample in X:
        err_0 = np.random.normal(loc=0, scale=1)
        err_1 = np.random.normal(loc=0, scale=1)
        Yi_0 = mu0(sample) + err_0
        Yi_1 = mu1(sample) + err_1
        Y0.append(Yi_0)
        Y1.append(Yi_1)
    return Y0, Y1


# Simulate data for a single experiment
def generate_data(N, d, mu0, mu1):
    X = generate_feature_vectors(N, d)
    e = np.array([propensity_score(x) for x in X])
    Y0, Y1 = potential_outcomes(X, mu0, mu1)
    T = np.random.binomial(1, e, size=N)
    Y = T * Y1 + (1 - T) * Y0
    return X, T, Y0, Y1, Y

def make_dataframe(X, T, Y0, Y1, Y):
    df = pd.DataFrame(X)
    df.columns = [f"X{i}" for i in range(1, X.shape[1] + 1)]
    df['T'] = T
    df['Y0'] = Y0
    df['Y1'] = Y1
    df['Y'] = Y
    df['cate'] = df['Y1'] - df['Y0']
    return df

# Calculate the MSE
def calculate_mse(test_df):
    return mean_squared_error(test_df['cate'], test_df['pred_cate'])

# Perform multiple experiments with different sample sizes
def perform_experiments(N_list, d, mu0, mu1, model):
    s_mse_list, t_mse_list, x_mse_list = [], [], []

    for N in N_list:
        X, T, Y0, Y1, Y = generate_data(N, d, mu0, mu1)
        df_sim = make_dataframe(X, T, Y0, Y1, Y)
        X_sim = [f"X{i}" for i in range(1, d + 1)]
        train_sim, test_sim = train_test_split(df_sim, test_size=0.3, random_state=13)
        
        if model == "LGBM":
            # Get CATE estimates (for S-, T- and X-learner)
            s_cate_train, s_cate_test = S_learner_lgbm(train_sim, test_sim, X_sim, 'T', 'Y')
            t_cate_train, t_cate_test = T_learner_lgbm(train_sim, test_sim, X_sim, 'T', 'Y')
            x_cate_train, x_cate_test = X_learner_lgbm(train_sim, test_sim, X_sim, 'T', 'Y')

        elif model == "LinearRegression":
            # Get CATE estimates (for S-, T- and X-learner)
            s_cate_train, s_cate_test = S_learner_linear(train_sim, test_sim, X_sim, 'T', 'Y')
            t_cate_train, t_cate_test = T_learner_linear(train_sim, test_sim, X_sim, 'T', 'Y')
            x_cate_train, x_cate_test = X_learner_linear(train_sim, test_sim, X_sim, 'T', 'Y')

        elif model == "RF":
            # Get CATE estimates (for S-, T- and X-learner)
            s_cate_train, s_cate_test = S_learner_rf(train_sim, test_sim, X_sim, 'T', 'Y')
            t_cate_train, t_cate_test = T_learner_rf(train_sim, test_sim, X_sim, 'T', 'Y')
            x_cate_train, x_cate_test = X_learner_rf(train_sim, test_sim, X_sim, 'T', 'Y')

        elif model == "SVM":
            # Get CATE estimates (for S-, T- and X-learner)
            s_cate_train, s_cate_test = S_learner_svm(train_sim, test_sim, X_sim, 'T', 'Y')
            t_cate_train, t_cate_test = T_learner_svm(train_sim, test_sim, X_sim, 'T', 'Y')
            x_cate_train, x_cate_test = X_learner_svm(train_sim, test_sim, X_sim, 'T', 'Y')

        elif model == "NN":
            # Get CATE estimates (for S-, T- and X-learner)
            s_cate_train, s_cate_test = S_learner_nn(train_sim, test_sim, X_sim, 'T', 'Y')
            t_cate_train, t_cate_test = T_learner_nn(train_sim, test_sim, X_sim, 'T', 'Y')
            x_cate_train, x_cate_test = X_learner_nn(train_sim, test_sim, X_sim, 'T', 'Y')

        else:
            s_cate_train, s_cate_test = None, None
            t_cate_train, t_cate_test = None, None
            x_cate_train, x_cate_test = None, None
        
        # Calculate MSE
        s_mse_list.append(calculate_mse(s_cate_test))
        t_mse_list.append(calculate_mse(t_cate_test))
        x_mse_list.append(calculate_mse(x_cate_test))

    return s_mse_list, t_mse_list, x_mse_list

def iterate_experiments(N_list, num_experiments, d, mu0, mu1, model):
    s_mse_total, t_mse_total, x_mse_total = [], [], []

    for _ in range(num_experiments):
        s_mse, t_mse, x_mse = perform_experiments(N_list, d, mu0, mu1, model)
        s_mse_total.append(s_mse)
        t_mse_total.append(t_mse)
        x_mse_total.append(x_mse)

    return s_mse_total, t_mse_total, x_mse_total

In [4]:
# Simulation setup for the confounded case
N_list = [300, 1000, 3000, 6000, 10000]
num_experiments = 5
d = 20

In [5]:
# NN as model
model = "NN"
s_mse_total, t_mse_total, x_mse_total= iterate_experiments(N_list, num_experiments, d, mu0, mu1, model)
s_mse = np.mean(s_mse_total, axis=0)
t_mse = np.mean(t_mse_total, axis=0)
x_mse = np.mean(x_mse_total, axis=0)

In [6]:
from matplotlib import pyplot as plt

# Plotting the average MSE for different num of samples
plt.plot(N_list, s_mse, marker='o', label='S-learner')
plt.plot(N_list, t_mse, marker='o', label='T-learner')
plt.plot(N_list, x_mse, marker='o', label='X-learner')
plt.xlabel('Number of samples')
plt.ylabel('MSE')
plt.title('Simulation 6: NN Regressor')
plt.legend()
plt.show()

In [7]:
print("NN:")
print("S-learner: ")
print(s_mse)
print("T-learner: ")
print(t_mse)
print("X-learner: ")
print(x_mse)