In [1]:
import shap
import matplotlib.pyplot as plt
import pandas as pd 
import numpy as np 
import warnings
import xgboost as xgb
from sklearn.metrics import accuracy_score
from sklearn import metrics
from sklearn.cluster import KMeans
from sklearn.metrics import accuracy_score, confusion_matrix, roc_auc_score, roc_curve
warnings.filterwarnings("ignore")
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
import re
import random
import itertools
import lime
from lime import lime_tabular
from sklearn.linear_model import Ridge
from sklearn.model_selection import KFold


In [2]:


###############################################################################
#  2.  Weight-learning  —  Doubly-robust objective
###############################################################################
def dr_pseudo_w(y, A, pi, m1, m0):
    """
    DR pseudo-outcome for weight-learning.
      c   = (1 - A)/2  +  π*A         ➜  c = π  (treated) , 1-π (control)
      φ_DR = ((A01 - π) * (y - m_A)) / c  +  (m1 - m0)
    Returns both φ_DR and c (because c is also the weight).
    """
    A01 = (A + 1.0) / 2.0
    m_A = np.where(A == 1, m1, m0)
    c   = (1.0 - A) / 2.0 + pi * A          # same denominator you used
    φ   = (A01 - pi) * (y - m_A) / (c + EPS) + (m1 - m0)
    return φ, c + EPS


def obj_dr_w(predt, dtrain):
    """
    Custom XGBoost objective for DR weight-learning:
      Loss  L = (φ_DR − pred)^2 / c
    so   ∂L/∂pred = -2/c (φ_DR − pred)
         ∂²L/∂pred² =  2/c
    """
    y = dtrain.get_label()
    φ, c = dr_pseudo_w(y, X_train_trt, pi_train, m1_train, m0_train)

    grad = -2.0 / c * (φ - predt)
    hess =  2.0 / c

    return grad, hess


def eval_dr_w(predt, dtrain):
    """Weighted squared error for monitoring."""
    y = dtrain.get_label()
    φ, c = dr_pseudo_w(y, X_train_trt, pi_train, m1_train, m0_train)
    loss = np.mean(((φ - predt) ** 2) / c)
    return 'DR_W_loss', float(loss)



In [3]:
train_num=random.sample(range(0,1000), 800)
test_num=list(set(range(0,1000))-set(train_num))
data_simulation = pd.read_csv('/ui/abv/liuzx18/project_shapley/simulation_data/simulation_0_6.csv').iloc[train_num,]
data_simulation_test = pd.read_csv('/ui/abv/liuzx18/project_shapley/simulation_data/simulation_0_6.csv').iloc[test_num,]
#del data_simulation['Unnamed: 0']
#del data_simulation_test['Unnamed: 0']
x_df=data_simulation[['x1','x2','x3','x4','x5','x6','x7','x8','x9','x10']]
X_=np.array(x_df)[:]
x_df_test=data_simulation_test[['x1','x2','x3','x4','x5','x6','x7','x8','x9','x10']]
X_test=np.array(x_df_test)

y_train=np.array(data_simulation[['y']])
y_train=y_train.reshape(800,)


y_test=np.array(data_simulation_test[['y']])
y_test=y_test.reshape(200,)
trt_=data_simulation[['treatment']]


g_real=data_simulation[['sigpos']]
g_real_test=data_simulation_test[['sigpos']]




logreg = LogisticRegression()
logreg.fit(X_,trt_)
pi_x = logreg.predict_proba(X_)
pi_train=pi_x[:,1]



X_train_trt=np.where(trt_==1,1,-1)
X_train_trt=X_train_trt.reshape(800,)

dtrain = xgb.DMatrix(X_, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)


x_train_feature_pd=x_df

In [4]:
EPS = 1e-12    
A   = X_train_trt            # ±1 array
X   = X_               # feature matrix   (n, p)
Y   = y_train                # outcome vector   (n, )
K   = 5

m1_train = np.zeros_like(Y, dtype=float)
m0_train = np.zeros_like(Y, dtype=float)

kf = KFold(n_splits=K, shuffle=True, random_state=123)

for train_idx, test_idx in kf.split(X):
    # --------- treated arm (+1) ----------
    in_arm   = (A[train_idx] ==  1)
    booster1 = xgb.XGBRegressor(max_depth=3, n_estimators=200,
                                learning_rate=0.05, subsample=0.8,
                                colsample_bytree=0.8)
    booster1.fit(X[train_idx][in_arm], Y[train_idx][in_arm])
    m1_train[test_idx] = booster1.predict(X[test_idx])

    # --------- control arm (−1) ----------
    in_arm   = (A[train_idx] == -1)
    booster0 = xgb.XGBRegressor(max_depth=3, n_estimators=200,
                                learning_rate=0.05, subsample=0.8,
                                colsample_bytree=0.8)
    booster0.fit(X[train_idx][in_arm], Y[train_idx][in_arm])
    m0_train[test_idx] = booster0.predict(X[test_idx])

In [5]:
def dr_pseudo_a(y, A, pi, m1, m0):
    """
    DR pseudo-outcome for A-learning (A = ±1 coding).
      φ_DR =  ((A01 - π) * (y - m_A)) / (π * (1 - π))  +  (m1 - m0)
    """
    A01 = (A + 1.0) / 2.0                  # convert ±1 → 0/1
    m_A = np.where(A == 1, m1, m0)
    denom = pi * (1.0 - pi) + EPS
    return (A01 - pi) * (y - m_A) / denom + (m1 - m0)


def obj_dr_a(predt, dtrain):
    """
    Custom XGBoost *objective* using the DR pseudo-outcome for A-learning.
      Loss  L = (φ_DR − pred)^2     (un-weighted)
    """
    y = dtrain.get_label()
    φ = dr_pseudo_a(y, X_train_trt, pi_train, m1_train, m0_train)

    grad = -2.0 * (φ - predt)               # ∂L/∂pred
    hess =  2.0 * np.ones_like(predt)       # ∂²L/∂pred²

    return grad, hess


def eval_dr_a(predt, dtrain):
    """Squared error on φ_DR (for logging only)."""
    y = dtrain.get_label()
    φ = dr_pseudo_a(y, X_train_trt, pi_train, m1_train, m0_train)
    loss = np.mean((φ - predt) ** 2)
    return 'DR_A_loss', float(loss)

best_epoch=1000

# --- DR A-learning ----------------------------------------------------------
best_params={'learning_rate':0.005,
     'verbosity':1,
     'booster':'gbtree',
     'max_depth':1,
     'lambda':5,
     'tree_method':'hist' 
    }

model = xgb.train(
        best_params,
        dtrain,
        num_boost_round=best_epoch,  # Use the best num_boost_round
        obj=obj_dr_a,
        feval=eval_dr_a
    )
# Predictions and Evaluation on Train Set
pred_train = model.predict(dtrain)
pred_train_prob = 1.0 / (1.0 + np.exp(-pred_train))  # Sigmoid transformation if needed
auc_train = roc_auc_score(g_real.astype(int), pred_train_prob)
print(f"Final Train AUC: {auc_train}")

# Predictions and Evaluation on Test Set
pred_test = model.predict(dtest)
pred_test_prob = 1.0 / (1.0 + np.exp(-pred_test))  # Sigmoid transformation if needed
auc_test = roc_auc_score(g_real_test.astype(int), pred_test_prob)
print(f"Final Test AUC: {auc_test}")

Final Train AUC: 0.9982020012507816
Final Test AUC: 0.9989898989898991
