# Split Conformal Prediction API

In [32]:
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPRegressor
from sklearn.ensemble import GradientBoostingRegressor
import numpy as np

class SplitConformalPrediction:
    def __init__(self, data, W, Y, X, test_size1=0.4, test_size2=0.5, lower_quantile=0.05, upper_quantile=0.95, random_state=42):
        self.data = data
        self.W = W
        self.Y = Y
        self.X = X
        self.test_size1 = test_size1
        self.test_size2 = test_size2
        self.lower_quantile = lower_quantile
        self.upper_quantile = upper_quantile
        self.random_state = random_state
        
        self.split_data()
        self.train_nn_Y0_Y1()
        self.train_nn_ITE()
        self.train_quantile_models()
    
    def split_data(self):
        # Split the original data into prediction_model_train and temp_data
        self.prediction_model_train, temp_data = train_test_split(
            self.data, 
            test_size=self.test_size1, 
            random_state=self.random_state
        )
        
        # Further split temp_data into quantile_model_train and quantile_model_calibration
        self.quantile_model_train, self.quantile_model_calibration = train_test_split(
            temp_data, 
            test_size=self.test_size2, 
            random_state=self.random_state
        )
        
    def train_nn_Y0_Y1(self):
        # Prepare data for training Y0 and Y1 models
        train_data_Y0 = self.prediction_model_train[self.prediction_model_train[self.W] == 0]
        X_train_Y0 = train_data_Y0[self.X]
        y_train_Y0 = train_data_Y0[self.Y]

        train_data_Y1 = self.prediction_model_train[self.prediction_model_train[self.W] == 1]
        X_train_Y1 = train_data_Y1[self.X]
        y_train_Y1 = train_data_Y1[self.Y]

        # Train model for Y0
        self.model_Y0 = MLPRegressor(random_state=self.random_state)
        self.model_Y0.fit(X_train_Y0, y_train_Y0)

        # Train model for Y1
        self.model_Y1 = MLPRegressor(random_state=self.random_state)
        self.model_Y1.fit(X_train_Y1, y_train_Y1)

    def train_nn_ITE(self):
        # Prepare pseudo ITE data
        pseudo_ITE_data = self.prediction_model_train.copy()
        pseudo_ITE_data['Y1'] = np.where(pseudo_ITE_data[self.W] == 1, pseudo_ITE_data[self.Y], self.model_Y0.predict(pseudo_ITE_data[self.X]))
        pseudo_ITE_data['Y0'] = np.where(pseudo_ITE_data[self.W] == 0, pseudo_ITE_data[self.Y], self.model_Y1.predict(pseudo_ITE_data[self.X]))
        pseudo_ITE_data['ITE'] = pseudo_ITE_data['Y1'] - pseudo_ITE_data['Y0']

        # Train model for ITE
        X_ITE = pseudo_ITE_data[self.X]
        y_ITE = pseudo_ITE_data['ITE']
        self.model_ITE = MLPRegressor(random_state=self.random_state)
        self.model_ITE.fit(X_ITE, y_ITE)

    def train_quantile_models(self):
        # Train lower bound quantile model
        self.lower_quantile_model = GradientBoostingRegressor(loss='quantile', alpha=self.lower_quantile, random_state=self.random_state)
        self.lower_quantile_model.fit(self.quantile_model_train[self.X], self.quantile_model_train[self.Y])

        # Train upper bound quantile model
        self.upper_quantile_model = GradientBoostingRegressor(loss='quantile', alpha=self.upper_quantile, random_state=self.random_state)
        self.upper_quantile_model.fit(self.quantile_model_train[self.X], self.quantile_model_train[self.Y])

    def calculate_calibrated_interval(self):
        # Calculate predicted ITE for the calibration set
        X_calibration = self.quantile_model_calibration[self.X]
        actual_ite_calibration = self.quantile_model_calibration[self.Y]
        pred_ite_calibration = self.model_ITE.predict(X_calibration)

        # Calculate prediction errors and sort them
        prediction_errors = np.abs(actual_ite_calibration - pred_ite_calibration)
        sorted_errors = np.sort(prediction_errors)

        # Calculate calibrated bounds
        lower_error = sorted_errors[int(len(sorted_errors) * self.lower_quantile)]
        upper_error = sorted_errors[int(len(sorted_errors) * self.upper_quantile)]
        calibrated_lower_bound = pred_ite_calibration - lower_error
        calibrated_upper_bound = pred_ite_calibration + upper_error

        # Calculate coverage rate
        calibrated_coverage_rate = np.mean((actual_ite_calibration >= calibrated_lower_bound) & (actual_ite_calibration <= calibrated_upper_bound))

        return calibrated_lower_bound, calibrated_upper_bound, calibrated_coverage_rate


In [35]:
# Initialize the class with the data and parameters
W = "treatment"
Y = "wage_2010"
X = [col for col in data.columns if col not in [W, Y]]
scp = SplitConformalPrediction(data, W=W, Y=Y, X=X, random_state=121)

# Calculate the calibrated confidence interval
calibrated_lower_bound, calibrated_upper_bound, calibrated_coverage_rate = scp.calculate_calibrated_interval()



In [None]:
calibrated_coverage_rate

# Step-by-step Illustration

In [11]:
# Load the dataset
file_path = r"gender_equality_data_no_na.csv"
df = pd.read_csv(file_path)

# Show the first few rows of the dataset and its summary statistics
df.describe()

Unnamed: 0,hours,female,IQ,KWW,educ,black,south,urban,sibs,brthord,meduc,feduc,wage_2005,emp_2005,exper_2005,tenure_2005,age_2005,married_2005,treatment,wage_2010
count,736.0,736.0,736.0,736.0,736.0,736.0,736.0,736.0,736.0,736.0,736.0,736.0,736.0,736.0,736.0,736.0,736.0,736.0,736.0,736.0
mean,39.691576,0.490489,102.471467,36.22962,13.705163,0.080163,0.320652,0.720109,2.820652,2.183424,10.808424,10.305707,779.342052,0.900815,11.402174,7.199728,32.972826,0.899457,0.341033,1068.35824
std,14.828039,0.500249,14.616429,7.656947,2.233639,0.27173,0.467045,0.449251,2.265484,1.51556,2.827462,3.280994,436.803111,0.299113,4.208666,5.017743,3.047349,0.300928,0.474379,602.048732
min,0.0,0.0,54.0,13.0,9.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,28.0,0.0,0.0,0.0
25%,40.0,0.0,94.0,31.75,12.0,0.0,0.0,0.0,1.0,1.0,9.0,8.0,525.0,1.0,8.0,3.0,30.0,1.0,0.0,744.485697
50%,40.0,0.0,104.0,37.0,13.0,0.0,0.0,1.0,2.0,2.0,12.0,11.0,773.625,1.0,11.0,7.0,33.0,1.0,0.0,1077.658545
75%,45.0,1.0,113.0,42.0,16.0,0.0,1.0,1.0,4.0,3.0,12.0,12.0,1005.4375,1.0,15.0,11.0,36.0,1.0,1.0,1437.73383
max,80.0,1.0,145.0,56.0,18.0,1.0,1.0,1.0,14.0,10.0,18.0,18.0,2668.0,1.0,22.0,22.0,38.0,1.0,1.0,3791.589594


In [None]:
# Import necessary libraries
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPRegressor

# Load the data
file_path = 'gender_equality_data_no_na.csv'
data = pd.read_csv(file_path)

# Split the data into three parts
prediction_model_train, temp_data = train_test_split(data, test_size=0.4, random_state=42)
quantile_model_train, quantile_model_calibration = train_test_split(temp_data, test_size=0.5, random_state=42)

# Define features and target variables
W = "treatment"
Y = "wage_2010"
X = [col for col in data.columns if col not in [W, Y]]

In [16]:
# Follow the receipe of X-learner

# Train a simple Neural Network model predicting Y0 (wage_2010 for treatment=0)
train_data_Y0 = prediction_model_train[prediction_model_train[W] == 0]
X_train_Y0 = train_data_Y0[X]
y_train_Y0 = train_data_Y0[Y]
model_Y0 = MLPRegressor(random_state=42)
model_Y0.fit(X_train_Y0, y_train_Y0)

# Train another simple Neural Network model predicting Y1 (wage_2010 for treatment=1)
train_data_Y1 = prediction_model_train[prediction_model_train[W] == 1]
X_train_Y1 = train_data_Y1[X]
y_train_Y1 = train_data_Y1[Y]
model_Y1 = MLPRegressor(random_state=42)
model_Y1.fit(X_train_Y1, y_train_Y1)

model_Y0, model_Y1



(MLPRegressor(random_state=42), MLPRegressor(random_state=42))

In [17]:
# Prepare the data representing pseudo ITE (Individual Treatment Effect)
pseudo_ITE_data = prediction_model_train.copy()
pseudo_ITE_data['Y1'] = None
pseudo_ITE_data['Y0'] = None

# For rows where W=1, Y1=Y, and Y0 is predicted by model_Y0
mask_W1 = pseudo_ITE_data[W] == 1
pseudo_ITE_data.loc[mask_W1, 'Y1'] = pseudo_ITE_data.loc[mask_W1, Y]
pseudo_ITE_data.loc[mask_W1, 'Y0'] = model_Y0.predict(pseudo_ITE_data.loc[mask_W1, X])

# For rows where W=0, Y0=Y, and Y1 is predicted by model_Y1
mask_W0 = pseudo_ITE_data[W] == 0
pseudo_ITE_data.loc[mask_W0, 'Y0'] = pseudo_ITE_data.loc[mask_W0, Y]
pseudo_ITE_data.loc[mask_W0, 'Y1'] = model_Y1.predict(pseudo_ITE_data.loc[mask_W0, X])

# Calculate the pseudo ITE: ITE = Y1 - Y0
pseudo_ITE_data['ITE'] = pseudo_ITE_data['Y1'] - pseudo_ITE_data['Y0']

# Prepare features and target for ITE model
X_ITE = pseudo_ITE_data[X]
y_ITE = pseudo_ITE_data['ITE']

# Fit the ITE to another simple Neural Network
model_ITE = MLPRegressor(random_state=42)
model_ITE.fit(X_ITE, y_ITE)

model_ITE



MLPRegressor(random_state=42)

In [19]:
# 1. Using model_ITE, predict the ITE for Quantile Model Training Data
X_quantile = quantile_model_train[X]
pred_ite = model_ITE.predict(X_quantile)

# 2. Prepare the actual ITE for Quantile Model Training Data
quantile_ITE_data = quantile_model_train.copy()
quantile_ITE_data['Y1'] = None
quantile_ITE_data['Y0'] = None

# For rows where W=1, Y1=Y, and Y0 is predicted by model_Y0
mask_W1_quantile = quantile_ITE_data[W] == 1
quantile_ITE_data.loc[mask_W1_quantile, 'Y1'] = quantile_ITE_data.loc[mask_W1_quantile, Y]
quantile_ITE_data.loc[mask_W1_quantile, 'Y0'] = model_Y0.predict(quantile_ITE_data.loc[mask_W1_quantile, X])

# For rows where W=0, Y0=Y, and Y1 is predicted by model_Y1
mask_W0_quantile = quantile_ITE_data[W] == 0
quantile_ITE_data.loc[mask_W0_quantile, 'Y0'] = quantile_ITE_data.loc[mask_W0_quantile, Y]
quantile_ITE_data.loc[mask_W0_quantile, 'Y1'] = model_Y1.predict(quantile_ITE_data.loc[mask_W0_quantile, X])

# Calculate the actual ITE: ITE = Y1 - Y0
quantile_ITE_data['actual_ite'] = quantile_ITE_data['Y1'] - quantile_ITE_data['Y0']

actual_ite = quantile_ITE_data['actual_ite'].values

pred_ite[:10], actual_ite[:10]  # Displaying first 10 values of actual_ite for a quick look

(array([227.98618267, 207.29785914, 252.8251098 , 191.38238592,
        183.06390687, 238.41118759, 173.62007684, 235.92821863,
        274.24334126, 192.66731149]),
 array([750.995554548319, 267.0314392492619, 331.6038042017951,
        275.92708550816906, 195.4425383669277, 320.14558252573613,
        175.23488594360128, 222.96862697266886, 207.448234410542,
        127.61528483408733], dtype=object))

In [20]:
from sklearn.ensemble import GradientBoostingRegressor

# 1. Train two quantile models, one for the upper bound and the other for the lower bound
# Quantile levels for 90% coverage: lower bound at 5% and upper bound at 95%
lower_quantile = 0.05
upper_quantile = 0.95

# Train the lower bound quantile model
lower_quantile_model = GradientBoostingRegressor(loss='quantile', alpha=lower_quantile, random_state=42)
lower_quantile_model.fit(X_quantile, actual_ite)

# Train the upper bound quantile model
upper_quantile_model = GradientBoostingRegressor(loss='quantile', alpha=upper_quantile, random_state=42)
upper_quantile_model.fit(X_quantile, actual_ite)

# 2. Predict the upper and lower bounds for the Quantile Model Training Data
lower_bound = lower_quantile_model.predict(X_quantile)
upper_bound = upper_quantile_model.predict(X_quantile)

# Calculate the coverage rate
coverage_rate = ((pred_ite >= lower_bound) & (pred_ite <= upper_bound)).mean()

lower_bound[:10], upper_bound[:10], coverage_rate  # Displaying first 10 values of lower and upper bounds and the coverage rate

(array([-75.89817172, 106.70172788, 115.8019146 , 118.43842557,
         92.91491302, 127.5386123 , 112.6503457 ,  92.61794121,
        100.42338973, 126.77992619]),
 array([750.98325003, 483.31701257, 477.54225273, 395.2192382 ,
        339.44517005, 409.23182337, 641.15387069, 339.01792247,
        371.49106345, 430.46629698]),
 0.9115646258503401)