In [None]:
import pandas as pd
import numpy as np
import math

pd.set_option('max_rows', 99999)

import os
os.chdir('/content/drive/MyDrive/Master Thesis')

from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

import scipy.stats as st
import random

In [None]:
#Editing Subsequent Smear column, for getting true negetive results from dataset
#Criteria is Negative TIS result with more than 3 follow ups with negative results
#If result is negative with 2 follow ups with negative results and a negative HPV test result

def edit_subs(df):

  df_1 = df[df['subsequent_smear'] != 'No follow up'][df['subsequent_smear'] != 'Not Applicable']
  subs_list = []

  for row,val in df_1.iterrows():
    
      subs = str(val['subsequent_smear'])
      caseid = val['case_id']
      res = val['result']
      hpv = val['HPV_test']
      #print(subs)
      subs = subs.replace('Negatives', 'Neg')
      subs = subs.replace('Negative', 'Neg')
      subs = subs.replace('Negs', 'Neg')
      subs = subs.replace('neg', 'Neg')
      subs = subs.replace(' ', '')
      subs = subs.replace('x', '')

      if res == 'Negative': 
        
        subs = subs.replace('Neg3', 'select')
        subs = subs.replace('Neg4', 'select')
        subs = subs.replace('Neg5', 'select')
        subs = subs.replace('Neg6', 'select')

        if hpv == 'Yes - NEG':
          subs = subs.replace('Neg2', 'select')
      
      #print(subs)
      subs_list.append(subs)
  df_1['subsequent_smear'] = np.array(subs_list)

  return df_1

#Getting dataset for data tthat has been verified with Biopsy or follow ups

def get_verified_data(df):

  df_1 = pd.DataFrame()

  caseid_list = []
  sub_list = []
  res_list = []
  endo_list = []
  hpv_list = []
  proc_list = []
  bio_list = []
  treat_list = []
  hist_list = []

  res_ver_list  = []

  for row, val in df.iterrows():
    res = val['result']
    sub = val['subsequent_smear']
    hpv = val['HPV_test']
    hist = val['Histology']
    bio = val['Biopsy Result']
    proc = val['Procedure']
    endo = val['Endocervical']
    caseid = val['case_id']
    treat = val['treat_course']

    if (bio != 'Not Applicable' and bio != 'Undefined' and bio!= 'Other') or sub == 'select':
      caseid_list.append(caseid)
      sub_list.append(sub)
      res_list.append(res)
      endo_list.append(endo)
      hpv_list.append(hpv)
      proc_list.append(proc)
      bio_list.append(bio)
      treat_list.append(treat)
      hist_list.append(hist)

  
  df_1['case_id'] = np.array(caseid_list)
  df_1['result'] = np.array(res_list)
  df_1['endocervical'] = np.array(endo_list)
  df_1['hpv_result'] = np.array(hpv_list)
  df_1['histology'] = np.array(hist_list)
  df_1['biopsy_result'] = np.array(bio_list)
  df_1['procedure'] = np.array(proc_list)
  df_1['treat_course'] = np.array(treat_list)
  df_1['subs_smear'] = np.array(sub_list)

  return df_1

### function to get degree of severity of result and biopsy

def deg_sev(df):

  res_sev = []
  biop_sev = []

  for row,val in df.iterrows():
    res = val['result']
    biop = val['biopsy_result']
    sub = val['subs_smear']
    
    if res == 'Negative':
      res_sev.append('Negative')
    elif res == 'High Grade (Mod)' or res == 'High Grade (Sev)' or res == 'Invasive' or res == 'Glandular':
      res_sev.append('high-grade')
    elif res == 'Low Grade' or res == 'BNA':
      res_sev.append('low-grade')
    
    if biop == 'Neg' or sub == 'select':
      biop_sev.append('Negative')
    elif biop == 'CIN3' or biop == 'CIN2' or biop == 'CGIN' or biop == 'Invasive' or biop == 'AdenoCa':
      biop_sev.append('high-grade')
    elif biop == 'CIN1':
      biop_sev.append('low-grade')

  df['result_severity'] = np.array(res_sev)
  df['verified_severity'] = np.array(biop_sev)
  
  return df

#Function for further extraction of data from given sample for experiment

def create_verified_sample(df_1, df_2):
  
  tis_case_list = df_1['case_id'].tolist()
  gen_case_list = df_2['case_id'].tolist()

  sample_caseid = list(set(tis_case_list) & set(gen_case_list))
  
  gen_sample = df_2[df_2['case_id'].isin(sample_caseid)]
  tis_sample = df_1[df_1['case_id'].isin(sample_caseid)]

  tis_sample = tis_sample.replace(['normal', 'Negative', 'low-grade', 'high-grade'],[0,0,1,2])
  gen_sample = gen_sample.replace(['normal', 'Negative', 'low-grade', 'high-grade'],[0,0,1,2])

  # #This observation is being removed after data analysis. This observation has incomplete data
  tis_sample = tis_sample[tis_sample['case_id'] != 237]
  gen_sample = gen_sample[gen_sample['case_id'] != 237]

  return tis_sample, gen_sample

#Function combining verified data for TIS, GEN sample in required format

def combine_verified_data(df_1, df_2):
  statistic_sample = pd.DataFrame()
  statistic_sample['case_id'] = np.array(df_1['case_id'].tolist())
  statistic_sample['TIS_result'] = np.array(df_1['result_severity'].tolist())
  statistic_sample['GEN_result'] = np.array(df_2['treat_course'].tolist())
  statistic_sample['verified_result'] = np.array(df_1['verified_severity'].tolist())

  #Adding further data in verified list with some level of verification, 
  #and adding some level of randomness in the data 
  extra_sample_list = [73,159,273,312,660,701,755,691,645,59,69,823,858]
  statistic_sample_extra = pd.DataFrame()

  statistic_sample_extra['case_id'] = np.array(extra_sample_list)
  statistic_sample_extra['TIS_result'] = np.array(tis_ds['treat_course'][tis_ds['case_id'].isin(extra_sample_list)].tolist())
  statistic_sample_extra['GEN_result'] = np.array(gen_ds['treat_course'][gen_ds['case_id'].isin(extra_sample_list)].tolist())
  statistic_sample_extra['verified_result'] = np.random.randint(0,3, statistic_sample_extra.shape[0])

  statistic_sample_extra = statistic_sample_extra.replace(['normal', 'Negative', 'low-grade', 'high-grade'],[0,0,1,2])

  statistic_sample = pd.concat([statistic_sample, statistic_sample_extra])

  return statistic_sample

#Function to get randomized sample from data
#The function will give contain both patients with and without disease (verified results)

def get_random_sample(df, prev = 0.5, sample_size = 100, seed = 5):

  non_disease_total = df[df['verified_result'] == 0]
  disease_total = df[df['verified_result'] != 0]

  nd_sample_size =  int(math.floor(sample_size*(1-prev)))
  d_sample_size = int(math.ceil(sample_size*prev))

  non_disease_sample = non_disease_total.sample(n=nd_sample_size, replace=True, random_state = seed)
  disease_sample = disease_total.sample(n=d_sample_size, replace=True, random_state= seed)

  return non_disease_sample, disease_sample

In [None]:
#Get all probabilities of the confusion matrix
def get_prob(df_1, k_list = [0,1,2]):
  cf_matrix = confusion_matrix(df_1['TIS_result'], df_1['GEN_result'], labels = [0,1,2])
  prob = {}

  for i in k_list:
    for j in k_list:
      var_name = "p_" + str(i) + str(j)
      #prob[var_name] = round(df_1[i][j]/sum(sum(df_1)),4)
      prob[var_name] = round(cf_matrix[i][j]/sum(sum(cf_matrix)),4)
    
  prob['sum'] = sum(sum(cf_matrix))

  return prob

In [None]:
#Get all marginal proportions for all k
def get_marginal_proportion(dict_1, k_list = [0,1,2]):
  marg_prop = {}

  for i in k_list:
    var_name_1 = "p." + str(i)
    var_name_2 = "p" + str(i) + "."
    marg_prob_1 = 0
    marg_prob_2 = 0
    for j in k_list:
      prob_name_1 = "p_" + str(i) + str(j)
      marg_prob_1 = marg_prob_1 + round(dict_1[prob_name_1],4)
    for k in k_list:
      prob_name_2 = "p_" + str(k) + str(i)
      marg_prob_2 = marg_prob_2 + round(dict_1[prob_name_2],4)
    
    marg_prop[var_name_2] = round(marg_prob_1,4)
    marg_prop[var_name_1] = round(marg_prob_2,4)

  return marg_prop

In [None]:
#Get all upper tail distribution for all k
def get_upper_tail(dict_1, k_list = [0,1,2]):
  upper_tail = {}
  
  for i in range(len(k_list)):
    var_name_1 = "pk." + str(i)
    var_name_2 = "pk" + str(i) + "."
    var_name_3 = "qk." + str(i)
    var_name_4 = "qk" + str(i) + "."
    
    upper_tail_1 = 0
    upper_tail_2 = 0
    for j in range(i,len(k_list)):
      margin_prob_name_1 = "p." + str(j)
      margin_prob_name_2 = "p" + str(j) + "."
      upper_tail_1 = upper_tail_1 + dict_1[margin_prob_name_1]
      upper_tail_2 = upper_tail_2 + dict_1[margin_prob_name_2]
    upper_tail[var_name_1] = round(upper_tail_1,3)
    upper_tail[var_name_2] = round(upper_tail_2,3)
    upper_tail[var_name_3] = round(1-upper_tail_1,3)
    upper_tail[var_name_4] = round(1-upper_tail_2,3)

  return upper_tail

In [None]:
#Get relative difference for all k
def get_relative_diff(dict_1, dict_2, k_list = [0,1,2]):
  rel_diff = {}
  for i in range(1,len(k_list)):
    var_name_1 = "Rk" + str(i)
    upper_tail_1 = "pk." + str(i)
    upper_tail_2 = "pk" + str(i) + "."
    upper_tail_3 = "qk." + str(i)
    upper_tail_4 = "qk" + str(i) + "."
    
    r_k = (dict_2[upper_tail_2]/dict_2[upper_tail_1]) * (dict_1[upper_tail_4]/dict_1[upper_tail_3])

    rel_diff[var_name_1] = round(r_k,3)

  return rel_diff

In [None]:
#Get emperical relative difference for all k
def get_emp_relative_diff(dict_1, dict_2, k_list = [0,1,2]):
  rel_diff = {}

  for i in range(1,len(k_list)):
    var_name_1 = "Rk" + str(i) + "_0"
    var_name_2 = "Rk" + str(i) + "_1"
    upper_tail_1 = "pk." + str(i)
    upper_tail_2 = "pk" + str(i) + "."
    upper_tail_3 = "qk." + str(i)
    upper_tail_4 = "qk" + str(i) + "."
  
    r_k_1 = (dict_2[upper_tail_2]/dict_2[upper_tail_1])
    r_k_0 = (dict_1[upper_tail_4]/dict_1[upper_tail_3])

    rel_diff[var_name_1] = round(r_k_0,4)
    rel_diff[var_name_2] = round(r_k_1,4)

  return rel_diff
  

In [None]:
#Get all pkk values

def get_pkk(dict_1, k_list = [0,1,2]):
  pkk_dict = {}
  for i in k_list:
    var_name = "pkk_" + str(i) + str(i)
    pkk = 0
    for j in range(i,len(k_list)):
      for k in range(i,len(k_list)):
        var_name_1 = "p_" + str(j) + str(k)
        pkk = pkk + dict_1[var_name_1]

    pkk_dict[var_name] = round(pkk,4)

  return pkk_dict

In [None]:
#Get all variance values

def get_variance(df_1, df_2, k_list = [0,1,2]):
  
  variance = {}

  prob_1 = get_prob(df_1, k_list = k_list)
  prob_2 = get_prob(df_2, k_list = k_list)

  marg_prop_1 = get_marginal_proportion(prob_1, k_list = k_list)
  marg_prop_2 = get_marginal_proportion(prob_2, k_list = k_list)
  
  upper_tail_1 = get_upper_tail(marg_prop_1, k_list = k_list)
  upper_tail_2 = get_upper_tail(marg_prop_2, k_list = k_list)

  pkk_1 = get_pkk(prob_1,  k_list = k_list)
  pkk_2 = get_pkk(prob_2, k_list = k_list)

  for i in range(1,len(k_list)):
    var_name = 'var_R_' + str(i)
    pk_1 = 'pk.' + str(i)
    pk_2 = 'pk' + str(i) + "."

    qk_1 = 'qk.' + str(i)
    qk_2 = 'qk' + str(i) + "."

    pkk_ = 'pkk_' + str(i) + str(i)

    var_1 = (upper_tail_1[pk_2]/(upper_tail_1[qk_2]*prob_1['sum'])) + (upper_tail_1[pk_1]/(prob_1['sum']*upper_tail_1[qk_1])) - (2* (pkk_1[pkk_] - upper_tail_1[pk_1] * upper_tail_1[pk_2])/(prob_1['sum'] * upper_tail_1[qk_1] * upper_tail_1[qk_2]))
                                                                                                                                
    var_2 = (upper_tail_2[qk_2]/(upper_tail_2[pk_2]*prob_2['sum'])) + (upper_tail_2[qk_1]/(prob_2['sum']*upper_tail_2[pk_1])) - (2* (pkk_2[pkk_] - upper_tail_2[pk_1] * upper_tail_2[pk_2])/(prob_2['sum'] * upper_tail_2[pk_1] * upper_tail_2[pk_2]))  

    var = var_1 + var_2

    variance[var_name] = round(var, 4)

  return variance


In [None]:
#Get lower limits of one-sided asymptotic with specific confidence intervals

def get_lower_limit(dict_1, dict_2, k_list = [0,1,2], alpha = 0.95):
  low_limit = {}

  for i in range(1,len(k_list)):
    rd_name = "Rk" + str(i)
    var_name = 'var_R_' + str(i)
    low_lim = 'll_' + str(i)

    limit_lower = round(math.exp((-1)*st.norm.ppf(alpha)*math.sqrt(dict_1[var_name])),4)
    low_limit[low_lim] = round(dict_2[rd_name]*limit_lower,4)
  
  return low_limit

In [None]:
def get_test_statistic(dict_1, dict_2, delta = 0.1, k_list = [0,1,2]):
  test_stat = {}

  for i in range(1,len(k_list)):
    z_name = "z_" + str(i)
    rd_name = "Rk" + str(i)
    var_name = 'var_R_' + str(i)

    test_stat[z_name] = round((math.log(dict_2[rd_name]) - math.log(1-delta))/math.sqrt(dict_1[var_name]),4)

  return test_stat

In [None]:
def hypothesis_result(dict_1, alpha = 0.95, k_list = [0,1,2]):
  hypo_res = []
  for i in range(1,len(k_list)):
    z_name = "z_" + str(i)
    hyp_name = "hyp_" + str(i)

    if dict_1[z_name] < st.norm.ppf(alpha):
      hypo_res.append(1)
    else:
      hypo_res.append(0)

  if sum(hypo_res) > 0:
    res = "Cannot reject Null Hypothesis for all k"
  else:
    res = "Reject Null Hypothesis for all k"
  
  return res

In [None]:
#function to run experiment for definition 3

def run_experiment(df_1, n =100, runs = 100, delta = 0.10, alpha = 0.05, prev = 0.5):

  df = pd.DataFrame()

  rk_1 = []
  rk_2 = []

  var_1 = []
  var_2 = []

  low_lim_1 = [] 
  low_lim_2 = []

  z_1 = []
  z_2 = []

  result = []
  run_list = []

  delta_list = []
  seed_list = []

  for run in range(runs):
    
    s_no = random.randint(0, 100000)
    nd_sample, d_sample = get_random_sample(df_1, seed = s_no, sample_size = n, prev = prev)

    prob_nd = get_prob(nd_sample)
    prob_d = get_prob(d_sample)
    
    marg_prop_nd = get_marginal_proportion(prob_nd)
    marg_prop_d = get_marginal_proportion(prob_d)
    
    upper_tail_nd = get_upper_tail(marg_prop_nd)
    upper_tail_d = get_upper_tail(marg_prop_d)
    
    relative_diff = get_relative_diff(upper_tail_nd, upper_tail_d)
    
    pkk_nd = get_pkk(prob_nd)
    pkk_d = get_pkk(prob_d)
    
    var = get_variance(nd_sample, d_sample)
    low_lim = get_lower_limit(var,relative_diff)
    z_stat = get_test_statistic(var, relative_diff, delta = delta)
    res = hypothesis_result(z_stat)

    run_list.append(run)
    seed_list.append(s_no)
    delta_list.append(delta)
    rk_1.append(relative_diff['Rk1'])
    rk_2.append(relative_diff['Rk2'])
    var_1.append(var['var_R_1'])
    var_2.append(var['var_R_1'])
    low_lim_1.append(low_lim['ll_1'])
    low_lim_2.append(low_lim['ll_2'])
    z_1.append(z_stat['z_1'])
    z_2.append(z_stat['z_2'])
    result.append(res)


  df['run'] = np.array(run_list)
  df['seed'] = np.array(seed_list)
  df['sample_size'] = n
  df['delta'] = np.array(delta_list)
  df['rk_1'] = np.array(rk_1)
  df['rk_2'] = np.array(rk_2)
  df['var_1'] = np.array(var_1)
  df['var_2'] = np.array(var_2)
  df['low_lim_1'] = np.array(low_lim_1)
  df['low_lim_2'] = np.array(low_lim_2)
  df['z_1'] = np.array(z_1)
  df['z_2'] = np.array(z_2)
  df['result'] = np.array(result)

  return df

In [None]:
#function to run simulations for differnent non- inferiority thresholds

delta = [0.05, 0.31]
prev = [0.5, 0.2, 0.1]

def run_unit_experiments(df_1, n = 100, runs = 100, delta = delta, pr = prev):
  final_df = pd.DataFrame()
  delta_list = np.arange(min(delta), max(delta), 0.05).tolist()
  
  for p in pr:
    #Running experiment, 1st to last iteration
    for delta in delta_list:
      df = run_experiment(df_1, n = n, runs = runs, delta = delta, prev = p)
      final_df = pd.concat([final_df, df])

  return final_df


In [None]:
tis_ds = pd.read_csv('TIS_Data_processed.csv')
gen_ds = pd.read_csv('GEN_Data_processed.csv')

#New dataset with required format created
tis_ds_1 = edit_subs(tis_ds)
gen_ds_1 = edit_subs(gen_ds)
tis_ds_2 = get_verified_data(tis_ds_1)
tis_ds_2 = deg_sev(tis_ds_2)
gen_ds_2 = get_verified_data(gen_ds_1)
gen_ds_2 = deg_sev(gen_ds_2)

#Create sample that has been verified
tis_sample, gen_sample = create_verified_sample(tis_ds_2, gen_ds_2)
statistic_sample = combine_verified_data(tis_sample, gen_sample)

#df = run_experiment(statistic_sample, n = 100, runs = 100, delta = 0.5)

experiment_100_df = run_unit_experiments(statistic_sample, n = 100, runs = 5000)
experiment_500_df = run_unit_experiments(statistic_sample, n = 500, runs = 5000)

experiment_100_df.to_csv('experiment_100.csv', sep=',', encoding='utf-8')
experiment_500_df.to_csv('experiment_500.csv', sep=',', encoding='utf-8')

  import sys
