In [None]:
import pandas as pd
import numpy as np
from lifelines import CoxPHFitter
import random
import matplotlib.pyplot as plt
import statistics
from sklearn import metrics
from sklearn.metrics import roc_auc_score, roc_curve, precision_recall_curve, confusion_matrix
from sklearn.linear_model import LogisticRegression

import warnings
# Silence library warnings so the notebook output stays readable.
warnings.filterwarnings('ignore')
# Use fully-qualified option names: in recent pandas the bare 'max_columns' /
# 'max_rows' patterns match more than one option (display.* and
# styler.render.*), which makes set_option raise OptionError.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)


# calibration slope
def logit(p):
  """Return the log-odds ``log(p / (1 - p))`` of ``p``.

  ``p`` may be a scalar or any object supporting element-wise arithmetic
  (e.g. a NumPy array); the result has the matching shape.
  """
  odds = p / (1 - p)
  return np.log(odds)

def calibration_slope(ground_truth, probabilities):
  """Return the calibration slope of predicted probabilities.

  An unpenalised logistic regression (with intercept) of ``ground_truth`` on
  the logit of ``probabilities`` is fitted; its single coefficient is the slope.

  Parameters
  ----------
  ground_truth : array-like of shape (n,)
      Observed class labels.
  probabilities : array-like of shape (n,)
      Predicted probabilities.

  Returns
  -------
  float
      Coefficient of the logit-transformed probabilities.
  """
  # Local import: only needed to choose the right "no penalty" spelling.
  import sklearn

  probabilities = np.array(probabilities, dtype=float)
  # Keep probabilities strictly inside (0, 1) so the logit stays finite;
  # values already inside the interval are left untouched.
  tiny = np.finfo(float).eps
  probabilities = np.clip(probabilities, tiny, 1 - tiny)
  logit_probabilities = logit(probabilities).reshape(-1,1)

  # scikit-learn >= 1.2 spells "no regularisation" as penalty=None and later
  # releases reject the legacy string 'none'; older releases only accept the string.
  try:
    sk_version = tuple(int(part) for part in sklearn.__version__.split('.')[:2])
  except ValueError:
    sk_version = (1, 2)  # unparsable version string: assume a modern release
  no_penalty = None if sk_version >= (1, 2) else 'none'

  lr = LogisticRegression(penalty=no_penalty, fit_intercept=True).fit(logit_probabilities, ground_truth)
  return lr.coef_.item()

# roc curve
def PlotROC(y_test,  y_pred_proba, AUCvalue):
    """Plot the ROC curve of ``y_pred_proba`` against ``y_test`` and show it.

    ``AUCvalue`` must already be a string; it is appended to the text
    ``'AUC ='`` and written onto the figure at data coordinates (0.3, 0).
    """
    false_pos_rate, true_pos_rate, _ = metrics.roc_curve(y_test, y_pred_proba)
    auc_caption = 'AUC =' + AUCvalue
    plt.text(0.3, 0, auc_caption, fontsize=15)
    plt.plot(false_pos_rate, true_pos_rate)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.show()

# frequency plot
def PlotFreq(y_pred_proba):
    """Show a 20-bin histogram of the predicted probabilities."""
    hist_style = dict(grid=True, bins=20, rwidth=0.9, color='#607c8e')
    pd.Series(y_pred_proba).plot.hist(**hist_style)
    plt.xlabel('Prob')
    plt.ylabel('Frequency')
    # Horizontal grid lines only, slightly transparent.
    plt.grid(axis='y', alpha=0.75)
    plt.show()
    


In [None]:
class CoxMetrics:
    """Evaluate Cox-model predictions at one visit, overall and per subgroup.

    Constructing an instance does all of the work: it appends predicted-risk
    and predicted-label columns to ``data``, picks a classification threshold,
    splits the rows into nSES / gender subgroups, and prints and plots the
    performance metrics.

    Parameters
    ----------
    data : pandas.DataFrame
        Test set. Must contain ``nSES`` and ``gender`` (each compared against
        0 and 1) and the true-outcome column ``"Y<visit>"``.
        NOTE: the frame is modified in place (``sf_V1..sf_V3`` and
        ``pred_V1..pred_V3`` are added or overwritten), so every instance
        built from the same frame shares, and overwrites, those columns.
    pred_sf : pandas.DataFrame
        Predicted survival probabilities with columns labelled 1, 2 and 3,
        index-aligned with ``data``.
    visit : int
        Visit whose ``Y<visit>`` / ``sf_V<visit>`` / ``pred_V<visit>`` columns
        are evaluated.

    Attributes
    ----------
    threshold : float
        ROC threshold maximising ``tpr - fpr`` for the selected visit. The same
        threshold is used to label all three visits.
    te_low, te_high, te_f, te_m : pandas.DataFrame
        Rows with nSES == 0, nSES == 1, gender == 0, gender == 1.
    f_low, f_high, m_low, m_high : pandas.DataFrame
        Gender-by-nSES intersections.
    """

    def __init__(self, data, pred_sf, visit):
        self.data = data
        self.pred_sf = pred_sf
        self.y_true = "Y" + str(visit)
        self.y_prob = 'sf_V' + str(visit)
        self.y_pred = 'pred_V' + str(visit)

        visits = (1, 2, 3)

        # Predicted risk = 1 - predicted survival. Despite the "sf_" prefix,
        # these columns hold the complement of the survival probability.
        for v in visits:
            self.data['sf_V' + str(v)] = 1 - self.pred_sf.loc[:, v]

        # Threshold that maximises (TPR - FPR) on the selected visit.
        fpr, tpr, thresholds = roc_curve(self.data[self.y_true], self.data[self.y_prob])
        optimal_idx = np.argmax(tpr - fpr)
        self.threshold = thresholds[optimal_idx]
        print("optimized threshold =",self.threshold)

        # Label every visit's risk with that single threshold.
        for v in visits:
            risk_col = 'sf_V' + str(v)
            label_col = 'pred_V' + str(v)
            self.data[label_col] = 0
            self.data.loc[self.data[risk_col] >= self.threshold, label_col] = 1

        print("Prediction completed")

        # Subgroup masks. Building each comparison once avoids the
        # operator-precedence trap of writing `a == 0 & b == 1` inline
        # (`&` binds tighter than `==`).
        is_low = self.data["nSES"] == 0
        is_high = self.data["nSES"] == 1
        is_female = self.data["gender"] == 0
        is_male = self.data["gender"] == 1

        self.te_low = self.data[is_low]
        self.te_high = self.data[is_high]
        self.te_f = self.data[is_female]
        self.te_m = self.data[is_male]

        # subgroups by both gender and nses
        self.f_low = self.data[is_female & is_low]
        self.f_high = self.data[is_female & is_high]
        self.m_low = self.data[is_male & is_low]
        self.m_high = self.data[is_male & is_high]

        print("Subgrouping")

        # model performance metrics
        print("\n>>>Model performance")
        self.metrics()

    def _auc(self, frame):
        """ROC AUC of the selected visit's risk column within ``frame``."""
        # `metrics` here is the module-level sklearn import, not this class's method.
        return metrics.roc_auc_score(frame[self.y_true], frame[self.y_prob])

    def _slope(self, frame):
        """Calibration slope of the selected visit's risk column within ``frame``."""
        return calibration_slope(frame[self.y_true], frame[self.y_prob])

    def _tpr_fpr(self, frame, subgroup):
        """Print TPR and FPR of the thresholded predictions within ``frame``."""
        CM = confusion_matrix(frame[self.y_true], frame[self.y_pred])
        tpr = CM[1,1]/(CM[1,1]+CM[1,0])
        fpr = CM[0,1]/(CM[0,1]+CM[0,0])
        print(subgroup,": \n",
              "".join(['TPR=', str(tpr), ',','FPR=', str(fpr)]))

    def metrics(self):
        """Print AUC, calibration slope and TPR/FPR (overall and per subgroup),
        then show the ROC curve and the histogram of predicted risks."""

        # AUC
        print("\n>>>>AUC")
        auc = self._auc(self.data)
        print("AUC=", auc)

        auc_low = self._auc(self.te_low)
        auc_high = self._auc(self.te_high)
        auc_f = self._auc(self.te_f)
        auc_m = self._auc(self.te_m)
        print("AUC in low nSES group = ",auc_low,
             "\n AUC in high nSES group = ",auc_high,
              "\n disparity in AUC = ",abs(auc_low - auc_high),
             "\n AUC in female group = ",auc_f,
             "\n AUC in male group = ",auc_m)

        auc_f_low = self._auc(self.f_low)
        auc_f_high = self._auc(self.f_high)
        auc_m_low = self._auc(self.m_low)
        auc_m_high = self._auc(self.m_high)
        print("AUC in f_low group = ",auc_f_low,
             "\n AUC in f_high group = ",auc_f_high,
             "\n AUC in m_low group = ",auc_m_low,
             "\n AUC in m_high group = ",auc_m_high)

        # calibration slope
        print("\n>>>>Calibration Slope")
        print("calibration slope =",self._slope(self.data))

        print("CS in low_nses group = ",self._slope(self.te_low),
             "\n CS in high_nses group = ",self._slope(self.te_high),
            "\n CS in f_low group = ",self._slope(self.f_low),
             "\n CS in f_high group = ",self._slope(self.f_high),
             "\n CS in m_low group = ",self._slope(self.m_low),
             "\n CS in m_high group = ",self._slope(self.m_high))

        # TPR&FPR
        print("\n>>>>TPR&FPR")
        self._tpr_fpr(self.data, "whole")
        self._tpr_fpr(self.te_low, "low")
        self._tpr_fpr(self.te_high, "high")
        self._tpr_fpr(self.f_low, "f_low")
        self._tpr_fpr(self.f_high, "f_high")
        self._tpr_fpr(self.m_low, "m_low")
        self._tpr_fpr(self.m_high, "m_high")

        # roc curve
        PlotROC(self.data[self.y_true], self.data[self.y_prob], str(auc))

        # frequency plot
        PlotFreq(self.data[self.y_prob])
        

In [None]:
# Apr 4
# Load the train / test splits without the `subjid` and `y_tot` columns.
cox_tr = pd.read_csv('data/jhs_tr_stratified.csv').drop(columns=['subjid', 'y_tot'], errors='ignore')
cox_te = pd.read_csv('data/jhs_te_stratified.csv').drop(columns=['subjid', 'y_tot'], errors='ignore')

# recode event and time
# Default for every subject: no event, time = 3.
cox_tr = cox_tr.assign(event=0, time=3)
cox_te = cox_te.assign(event=0, time=3)

## incidence in V1: event = 1 at time 1
cox_tr.loc[cox_tr['y1'].eq(1), ['event', 'time']] = 1
cox_te.loc[cox_te['y1'].eq(1), ['event', 'time']] = 1

## incidence in V2: first positive at visit 2 -> event = 1 at time 2
v2_index = cox_tr['y1'].eq(0) & cox_tr['y2'].eq(1)
cox_tr.loc[v2_index, 'event'] = 1
cox_tr.loc[v2_index, 'time'] = 2
v2_index_te = cox_te['y1'].eq(0) & cox_te['y2'].eq(1)
cox_te.loc[v2_index_te, 'event'] = 1
cox_te.loc[v2_index_te, 'time'] = 2

## incidence in V3: first positive at visit 3 -> event = 1, time stays at the default 3
v3_index = cox_tr['y1'].eq(0) & cox_tr['y2'].eq(0) & cox_tr['y3'].eq(1)
cox_tr.loc[v3_index, 'event'] = 1
v3_index_te = cox_te['y1'].eq(0) & cox_te['y2'].eq(0) & cox_te['y3'].eq(1)
cox_te.loc[v3_index_te, 'event'] = 1


In [None]:
# cumulative true outcome: Y1, Y2, Y3
# Yk equals the previous cumulative flag, forced to 1 wherever yk == 1.
cox_te['Y1'] = cox_te['y1']
cox_te['Y2'] = cox_te['Y1'].mask(cox_te['y2'] == 1, 1)
cox_te['Y3'] = cox_te['Y2'].mask(cox_te['y3'] == 1, 1)

cox_te

In [None]:
##### M1 without nses #####

In [None]:
# fit model

# M1 leaves out both nSES variables; y1..y3 are excluded from the covariates as well.
df_m1 = cox_tr.drop(columns=['nSES', 'nbSESpc2score', 'y1', 'y2', 'y3'], errors='ignore')

m1 = CoxPHFitter()
m1.fit(df=df_m1, duration_col='time', event_col='event')

In [None]:
# model summary

# Print the fitted M1 model's summary.
m1.print_summary()

# Draw the fitter's built-in plot (presumably the coefficient estimates with
# their intervals — confirm against the lifelines docs) and show it.
m1.plot()
plt.show()

In [None]:
# prediction

In [None]:
# V1
# Compute the predicted survival curves in this cell: `m1_sf` used to be
# defined only in a later cell, so this plot raised NameError on a fresh
# Restart & Run All.
m1_sf = np.transpose(m1.predict_survival_function(cox_te))

# NOTE(review): the plotted values come from predict_survival_function, so the
# "hazards" wording in the title / x-label looks inaccurate — confirm the intended labels.
plt.hist(m1_sf.loc[:, 1], color = '#1f77b4', edgecolor = 'black',
         bins = 50)
plt.title('M1 - predicted hazards for V1')
plt.xlabel('hazards')
plt.ylabel('Frequency')
plt.show()

In [None]:
# model performance

In [None]:
# Transpose the predicted survival function so its columns can be selected by visit (1, 2, 3).
m1_sf = m1.predict_survival_function(cox_te).T
# Building CoxMetrics prints / plots the visit-3 metrics and adds sf_V* / pred_V* columns to cox_te.
m1_v3 = CoxMetrics(data=cox_te, pred_sf=m1_sf, visit=3)
# Keep the subgroup keys, the true outcome and M1's visit-3 risk and label.
m1_merge = m1_v3.data.loc[:, ['gender', 'nSES', 'Y3', 'sf_V3', 'pred_V3']]


In [None]:
##### M2 with binary nSES #####

In [None]:
# fit model

# M2 keeps the binary nSES indicator and leaves out the continuous score and y1..y3.
df_m2 = cox_tr.drop(columns=['nbSESpc2score', 'y1', 'y2', 'y3'], errors='ignore')

m2 = CoxPHFitter()
m2.fit(df=df_m2, duration_col='time', event_col='event')

In [None]:
# model summary
# Print the fitted M2 model's summary, then draw and show the fitter's built-in plot.
m2.print_summary()
m2.plot()
plt.show()

In [None]:
# Transpose the predicted survival function so its columns can be selected by visit (1, 2, 3).
m2_sf = m2.predict_survival_function(cox_te).T
# Building CoxMetrics prints / plots the visit-3 metrics and overwrites the sf_V* / pred_V* columns of cox_te.
m2_v3 = CoxMetrics(data=cox_te, pred_sf=m2_sf, visit=3)

In [None]:
# M2's visit-3 risk and label, prefixed so they do not clash with M1's columns
# (sf_V3 -> m2_sf_V3, pred_V3 -> m2_pred_V3).
m2_merge = m2_v3.data[['sf_V3', 'pred_V3']].add_prefix('m2_')


In [None]:
##### M3 with continuous nSES score #####

In [None]:
# fit model

# M3 keeps the continuous nSES score and leaves out the binary indicator and y1..y3.
df_m3 = cox_tr.drop(columns=['nSES', 'y1', 'y2', 'y3'], errors='ignore')

m3 = CoxPHFitter()
m3.fit(df=df_m3, duration_col='time', event_col='event')

In [None]:
# model summary
# Print the fitted M3 model's summary, then draw and show the fitter's built-in plot.
m3.print_summary()
m3.plot()
plt.show()

In [None]:
# Transpose the predicted survival function so its columns can be selected by visit (1, 2, 3).
m3_sf = m3.predict_survival_function(cox_te).T
# Building CoxMetrics prints / plots the visit-3 metrics and overwrites the sf_V* / pred_V* columns of cox_te.
m3_v3 = CoxMetrics(data=cox_te, pred_sf=m3_sf, visit=3)

In [None]:
# M3's visit-3 risk and label, prefixed so they do not clash with the other models' columns
# (sf_V3 -> m3_sf_V3, pred_V3 -> m3_pred_V3).
m3_merge = m3_v3.data[['sf_V3', 'pred_V3']].add_prefix('m3_')

In [None]:
# combine predictions from 3 models
# Side-by-side join on the row index; the un-prefixed sf_V3 / pred_V3 columns belong to M1.
prediction_frames = (m1_merge, m2_merge, m3_merge)
test_df = pd.concat(prediction_frames, axis='columns')
test_df.to_csv('data/cox_predictions.csv', index=False)

In [None]:
# Mean visit-3 predicted risk for M1, M2 and M3 (one line each, in that order).
for risk_col in ('sf_V3', 'm2_sf_V3', 'm3_sf_V3'):
    print(test_df[risk_col].mean())

In [None]:
##### prediction comparisons ##### 

In [None]:
# focus on subjects that have dif labels in 3 models
# A subject is "consistent" when M1 == M2 and M2 == M3 for the visit-3 label.
consistent_subj_index = test_df['pred_V3'].eq(test_df['m2_pred_V3']) & test_df['m2_pred_V3'].eq(test_df['m3_pred_V3'])
inconsistent_subj = test_df[~consistent_subj_index]

# Split the inconsistent subjects by gender (0 -> f_*, 1 -> m_*) and nSES (0 -> *_low, 1 -> *_high).
f_low = inconsistent_subj[inconsistent_subj['nSES'].eq(0) & inconsistent_subj['gender'].eq(0)]
f_high = inconsistent_subj[inconsistent_subj['nSES'].eq(1) & inconsistent_subj['gender'].eq(0)]

m_low = inconsistent_subj[inconsistent_subj['nSES'].eq(0) & inconsistent_subj['gender'].eq(1)]
m_high = inconsistent_subj[inconsistent_subj['nSES'].eq(1) & inconsistent_subj['gender'].eq(1)]


In [None]:
# Number of inconsistent subjects where M3's label minus M1's label is -1,
# i.e. M1 (un-prefixed pred_V3) predicts 1 while M3 predicts 0.
sum((inconsistent_subj['m3_pred_V3'] - inconsistent_subj['pred_V3']) == -1)

In [None]:
###### cali plots 

In [None]:
# calibration plot prep
from sklearn.calibration import calibration_curve
import matplotlib.pyplot as plt
import matplotlib.lines as mlines
import matplotlib.transforms as mtransforms     
from matplotlib import lines

# function to add line using slope and intercept
def abline(slope, intercept):
    """Draw the black line ``y = intercept + slope * x`` on the current axes,
    spanning the axes' x-limits as they are at call time."""
    x_left, x_right = plt.gca().get_xlim()
    xs = np.array([x_left, x_right])
    plt.plot(xs, intercept + slope * xs, '-', color='black')
    

In [None]:
### plot 1: all test data, binned into 10 quantile bins per model
binning = dict(n_bins=10, strategy='quantile')
nses0_y, nses0_x = calibration_curve(test_df['Y3'], test_df['sf_V3'], **binning)
nses1_y, nses1_x = calibration_curve(test_df['Y3'], test_df['m2_sf_V3'], **binning)
nsescon_y, nsescon_x = calibration_curve(test_df['Y3'], test_df['m3_sf_V3'], **binning)

# calibration curves (drawing order fixes the default colours and the legend order)
fig, ax = plt.subplots()
for mean_pred, frac_pos, curve_label in (
    (nses1_x, nses1_y, 'with binary nSES'),
    (nses0_x, nses0_y, 'without nSES'),
    (nsescon_x, nsescon_y, 'with continuous nSES'),
):
    ax.plot(mean_pred, frac_pos, marker='o', linewidth=1, label=curve_label)

# reference line, legends, and axis labels
abline(1, 0)
ax.set_xlabel('Predicted probability')
ax.set_ylabel('True probability in each bin')
fig.suptitle('Calibration plot for Cox Model')
ax.legend()
plt.show()




In [None]:
# mse from the idealized cali line
# Average over the bins calibration_curve actually returned rather than a
# hard-coded 10: empty bins are not returned, so the arrays can be shorter
# than n_bins and dividing by 10 would then understate the error.
mse_no_nses = np.mean((nses0_y - nses0_x) ** 2)
print(mse_no_nses)
mse_bin_nses = np.mean((nses1_y - nses1_x) ** 2)
print(mse_bin_nses)
mse_con_nses = np.mean((nsescon_y - nsescon_x) ** 2)
print(mse_con_nses)

In [None]:
##### customized bin cali plots 

In [None]:
class CustomizedBin:
    """Two-bin calibration summary split at a single probability threshold.

    Predictions ``< threshold`` form bin 0 and predictions ``>= threshold``
    form bin 1. For each bin the mean predicted probability and the observed
    share of outcomes equal to 1 are stored, and a size-weighted mean squared
    calibration error is printed and stored.

    Parameters
    ----------
    threshold : float
        Cut point between the two bins.
    true_data : pandas.Series
        Observed outcomes (entries equal to 1 count as events), index-aligned
        with ``pred_prob``.
    pred_prob : pandas.Series
        Predicted probabilities.
    model : str
        Label used in the printed summary line.

    Attributes
    ----------
    n_0, n_1 : int
        Number of predictions in bin 0 / bin 1.
    prob_pred : list of float
        Mean predicted probability per bin (NaN for an empty bin).
    prob_true : list of float
        Observed event rate per bin (NaN for an empty bin).
    weighted_mse : float
        ``sum_i (n_i / N) * (prob_true_i - prob_pred_i) ** 2`` over non-empty
        bins; NaN when there are no predictions at all.
    """

    def __init__(self, threshold, true_data, pred_prob, model):
        self.threshold = threshold
        self.true_data = true_data
        self.pred_prob = pred_prob
        self.model = model

        bin_masks = [self.pred_prob < self.threshold,
                     self.pred_prob >= self.threshold]
        bin_sizes = [int(np.sum(mask)) for mask in bin_masks]
        self.n_0, self.n_1 = bin_sizes

        self.prob_pred = [self._mean_prediction(mask, size)
                          for mask, size in zip(bin_masks, bin_sizes)]
        self.prob_true = [self._event_rate(mask, size)
                          for mask, size in zip(bin_masks, bin_sizes)]

        self.weighted_mse = self._weighted_mse(bin_sizes)

        print('Weighted MSE of model',self.model,'=',self.weighted_mse)

    def _mean_prediction(self, mask, size):
        """Mean predicted probability inside one bin (NaN if the bin is empty)."""
        if not size:
            return float('nan')
        return float(np.mean(self.pred_prob[mask]))

    def _event_rate(self, mask, size):
        """Share of outcomes equal to 1 inside one bin (NaN if the bin is empty)."""
        if not size:
            return float('nan')
        return float(np.sum(self.true_data[mask].values == 1) / size)

    def _weighted_mse(self, bin_sizes):
        """Size-weighted mean of the squared (observed - predicted) gap per bin."""
        total = sum(bin_sizes)
        if total == 0:
            return float('nan')
        gaps = np.array(self.prob_true) - np.array(self.prob_pred)
        weights = np.array(bin_sizes) / total
        # The gap is squared so over- and under-prediction cannot cancel out.
        # The weights already sum to 1, so there is no extra division by the
        # bin count. Empty bins (weight 0, NaN gap) are skipped by nansum.
        return float(np.nansum(weights * gaps ** 2))


In [None]:
# whole test set

# threshold: 
# without nses = 0.115
# with binary nses = 0.109
# with nses score = 0.087
# NOTE(review): these look like the optimized thresholds printed by CoxMetrics for each model — confirm before re-using.


# bin the test set on threshold
no_nses = CustomizedBin(threshold=0.115, true_data=test_df['Y3'], pred_prob=test_df['sf_V3'], model='without nSES')
bin_nses = CustomizedBin(threshold=0.109, true_data=test_df['Y3'], pred_prob=test_df['m2_sf_V3'], model='with binary nSES')
con_nses = CustomizedBin(threshold=0.087, true_data=test_df['Y3'], pred_prob=test_df['m3_sf_V3'], model='with continuous nSES score')


# calibration curves: each model's two-point curve followed by a dashed
# vertical marker at that model's threshold
fig, ax = plt.subplots()
for binned, cutoff, marker_color, curve_label in (
    (no_nses, 0.115, '#1f77b4', 'without nSES'),
    (bin_nses, 0.109, 'orange', 'with binary nSES'),
    (con_nses, 0.087, 'green', 'with continuous nSES'),
):
    plt.plot(binned.prob_pred, binned.prob_true, marker='o', linewidth=1, label=curve_label)
    ax.plot([cutoff, cutoff], [0.05, 0.15], color=marker_color, linestyle='dashed')

# reference line, legends, and axis labels
abline(1, 0)
ax.set_xlabel('Predicted probability')
ax.set_ylabel('True probability in each bin')
fig.suptitle('Calibration plot for Cox Model')
plt.legend()
plt.show()



In [None]:
# subgroups
# nSES: 0 -> "low", 1 -> "high"; gender: 0 -> "f", 1 -> "m".

low_nses = test_df.query("nSES == 0")
high_nses = test_df.query("nSES == 1")

f_low = test_df.query("nSES == 0 and gender == 0")
f_high = test_df.query("nSES == 1 and gender == 0")

m_low = test_df.query("nSES == 0 and gender == 1")
m_high = test_df.query("nSES == 1 and gender == 1")

In [None]:
# high_nses

# bin the test set on threshold
no_nses = CustomizedBin(threshold=0.115, true_data=high_nses['Y3'], pred_prob=high_nses['sf_V3'], model='without nSES')
bin_nses = CustomizedBin(threshold=0.109, true_data=high_nses['Y3'], pred_prob=high_nses['m2_sf_V3'], model='with binary nSES')
con_nses = CustomizedBin(threshold=0.087, true_data=high_nses['Y3'], pred_prob=high_nses['m3_sf_V3'], model='with continuous nSES score')

# calibration curves: each model's two-point curve followed by a dashed
# vertical marker at that model's threshold
fig, ax = plt.subplots()
for binned, cutoff, marker_color, curve_label in (
    (no_nses, 0.115, '#1f77b4', 'without nSES'),
    (bin_nses, 0.109, 'orange', 'with binary nSES'),
    (con_nses, 0.087, 'green', 'with continuous nSES'),
):
    plt.plot(binned.prob_pred, binned.prob_true, marker='o', linewidth=1, label=curve_label)
    ax.plot([cutoff, cutoff], [0.05, 0.20], color=marker_color, linestyle='dashed')

# reference line, legends, and axis labels
abline(1, 0)
ax.set_xlabel('Predicted probability')
ax.set_ylabel('True probability in each bin')
fig.suptitle('Calibration plot for Cox Model high nSES group')
plt.legend()
plt.show()


In [None]:
##### cali plot with self-defined bins (3 points)

In [None]:
class CustomizedBin3:
    """Three-bin calibration summary split at two probability thresholds.

    With ``threshold = [t0, t1]`` the bins are ``pred < t0`` (low),
    ``t0 <= pred < t1`` (medium) and ``pred >= t1`` (high), so every
    prediction belongs to exactly one bin. For each bin the mean predicted
    probability and the observed share of outcomes equal to 1 are stored, and
    a size-weighted mean squared calibration error is printed and stored.

    Parameters
    ----------
    threshold : sequence of two floats
        Lower and upper cut points ``[t0, t1]``.
    true_data : pandas.Series
        Observed outcomes (entries equal to 1 count as events), index-aligned
        with ``pred_prob``.
    pred_prob : pandas.Series
        Predicted probabilities.
    model : str
        Label used in the printed summary line.

    Attributes
    ----------
    n_low, n_med, n_high : int
        Number of predictions per bin.
    prob_pred : list of float
        Mean predicted probability per bin (NaN for an empty bin).
    prob_true : list of float
        Observed event rate per bin (NaN for an empty bin).
    weighted_mse : float
        ``sum_i (n_i / N) * (prob_true_i - prob_pred_i) ** 2`` over non-empty
        bins; NaN when there are no predictions at all.
    """

    def __init__(self, threshold, true_data, pred_prob, model):   ## threshold is a list with two values
        self.threshold = threshold
        self.true_data = true_data
        self.pred_prob = pred_prob
        self.model = model

        lower, upper = self.threshold[0], self.threshold[1]
        bin_masks = [
            self.pred_prob < lower,
            # Strict upper bound: a prediction equal to `upper` belongs to the
            # high bin only (it used to be counted in both medium and high).
            (self.pred_prob >= lower) & (self.pred_prob < upper),
            self.pred_prob >= upper,
        ]
        bin_sizes = [int(np.sum(mask)) for mask in bin_masks]
        self.n_low, self.n_med, self.n_high = bin_sizes

        self.prob_pred = [self._mean_prediction(mask, size)
                          for mask, size in zip(bin_masks, bin_sizes)]
        self.prob_true = [self._event_rate(mask, size)
                          for mask, size in zip(bin_masks, bin_sizes)]

        self.weighted_mse = self._weighted_mse(bin_sizes)

        print('Weighted MSE of model',self.model,'=',self.weighted_mse)

    def _mean_prediction(self, mask, size):
        """Mean predicted probability inside one bin (NaN if the bin is empty)."""
        if not size:
            return float('nan')
        return float(np.mean(self.pred_prob[mask]))

    def _event_rate(self, mask, size):
        """Share of outcomes equal to 1 inside one bin (NaN if the bin is empty)."""
        if not size:
            return float('nan')
        return float(np.sum(self.true_data[mask].values == 1) / size)

    def _weighted_mse(self, bin_sizes):
        """Size-weighted mean of the squared (observed - predicted) gap per bin."""
        total = sum(bin_sizes)
        if total == 0:
            return float('nan')
        gaps = np.array(self.prob_true) - np.array(self.prob_pred)
        weights = np.array(bin_sizes) / total
        # The gap is squared so over- and under-prediction cannot cancel out.
        # The weights already sum to 1, so there is no extra division by the
        # bin count. Empty bins (weight 0, NaN gap) are skipped by nansum.
        return float(np.nansum(weights * gaps ** 2))



In [None]:
# whole test set

threshold = [0.1, 0.2]

# bin the test set on threshold
no_nses = CustomizedBin3(threshold=threshold, true_data=test_df['Y3'], pred_prob=test_df['sf_V3'], model='without nSES')
bin_nses = CustomizedBin3(threshold=threshold, true_data=test_df['Y3'], pred_prob=test_df['m2_sf_V3'], model='with binary nSES')
con_nses = CustomizedBin3(threshold=threshold, true_data=test_df['Y3'], pred_prob=test_df['m3_sf_V3'], model='with continuous nSES score')


# calibration curves
fig, ax = plt.subplots()
for binned, curve_label in (
    (no_nses, 'without nSES'),
    (bin_nses, 'with binary nSES'),
    (con_nses, 'with continuous nSES'),
):
    plt.plot(binned.prob_pred, binned.prob_true, marker='o', linewidth=1, label=curve_label)
# dashed vertical markers at the two bin boundaries
for cut_x, y_span in ((0.1, [0.05, 0.15]), (0.2, [0.15, 0.25])):
    ax.plot([cut_x, cut_x], y_span, color='black', linestyle='dashed')

# reference line, legends, and axis labels
abline(1, 0)
ax.set_xlabel('Predicted probability')
ax.set_ylabel('True probability in each bin')
fig.suptitle('Calibration plot for Cox Model')
plt.legend()
plt.show()


In [None]:
# subgroups (high nSES rows only)

threshold = [0.1, 0.2]

# bin the test set on threshold
no_nses = CustomizedBin3(threshold=threshold, true_data=high_nses['Y3'], pred_prob=high_nses['sf_V3'], model='without nSES')
bin_nses = CustomizedBin3(threshold=threshold, true_data=high_nses['Y3'], pred_prob=high_nses['m2_sf_V3'], model='with binary nSES')
con_nses = CustomizedBin3(threshold=threshold, true_data=high_nses['Y3'], pred_prob=high_nses['m3_sf_V3'], model='with continuous nSES score')


# calibration curves
fig, ax = plt.subplots()
for binned, curve_label in (
    (no_nses, 'without nSES'),
    (bin_nses, 'with binary nSES'),
    (con_nses, 'with continuous nSES'),
):
    plt.plot(binned.prob_pred, binned.prob_true, marker='o', linewidth=1, label=curve_label)
# dashed vertical markers at the two bin boundaries
for cut_x, y_span in ((0.1, [0.05, 0.15]), (0.2, [0.15, 0.25])):
    ax.plot([cut_x, cut_x], y_span, color='black', linestyle='dashed')

# reference line, legends, and axis labels
abline(1, 0)
ax.set_xlabel('Predicted probability')
ax.set_ylabel('True probability in each bin')
fig.suptitle('Calibration plot for Cox Model - high_nses')
plt.legend()
plt.show()
