# Benchmarks

## Initialize

In [None]:
import os
import math
import pathlib
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.feather as feather
from tqdm.auto import tqdm
from IPython.display import clear_output

import warnings
from lifelines.utils import CensoringType
from lifelines.utils import concordance_index

In [None]:
import ray
# Disconnect from any Ray session still attached to this kernel before the
# notebook connects again with ray.init() further down.
ray.shutdown()

In [None]:
# IPython shell escape: `node` holds the output lines of the `hostname` command.
node = !hostname
# Choose the storage root from the host name (substring match on "sc").
if "sc" in node[0]:
    base_path = "/sc-projects/sc-proj-ukb-cvd"
else: 
    base_path = "/data/analysis/ag-reils/ag-reils-shared/cardioRS"
print(base_path)

# Project-level directories for figures and data.
project_label = "22_medical_records"
project_path = f"{base_path}/results/projects/{project_label}"
figure_path = f"{project_path}/figures"
output_path = f"{project_path}/data"

# Create the directories if they do not exist yet (no error when they do).
pathlib.Path(figure_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(output_path).mkdir(parents=True, exist_ok=True)

# All inputs and outputs of this notebook live below the experiment directory.
experiment = 230425
experiment_path = f"{output_path}/{experiment}"
pathlib.Path(experiment_path).mkdir(parents=True, exist_ok=True)

In [None]:
import ray
# Connect to an already running Ray cluster, started beforehand e.g. with:
#   ray start --head --port=6379 --num-cpus 64
# Alternative (local instance instead of an existing cluster):
#   ray.init(num_cpus=24, include_dashboard=False)
ray.init(address='auto')

In [None]:
# Endpoint metadata of this experiment; `endpoints` is the sorted list of the
# values in its `endpoint` column.
endpoints_md = pd.read_csv(f"{experiment_path}/endpoints.csv")
endpoints = sorted(endpoints_md["endpoint"].tolist())

In [None]:
# Load the full covariate table and index it by the `eid` column.
data_covariates = pd.read_feather(f"{experiment_path}/data_covariates_full.feather")
data_covariates = data_covariates.set_index("eid")

In [None]:
# Covariate sets used by the benchmarks. Each list names the columns of
# `data_covariates` that belong to one set; the list names presumably refer to
# the risk scores of the same name (SCORE2, ASCVD, QRISK3) - confirm with the
# analysis plan.
AgeSex = ["age", "sex"]

SCORE2 = [
    "age", 
    "sex",
    "smoking_status", # current smoker
    "systolic_blood_pressure",
    "cholesterol",
    "hdl_cholesterol",

] 

ASCVD = [
    "age", 
    "sex",
    "ethnic_background",
    "smoking_status", # current smoker
    "diabetes", # diabetes
    "antihypertensives", 
    "systolic_blood_pressure",
    "cholesterol",
    "hdl_cholesterol",
] 

QRISK3 = [
    "age", 
    "sex",
    "ethnic_background",
    "smoking_status", # current smoker
    "bmi",
    "diabetes1", # type 1 diabetes
    "diabetes2", # type 2 diabetes
    "fh_heart_disease",
    "renal_failure", 
    "atrial_fibrillation", 
    "migraine",
    "rheumatoid_arthritis", 
    "systemic_lupus_erythematosus", 
    "schizophrenia", 
    "bipolar_disorder", 
    "major_depressive_disorder", 
    "male_erectile_dysfunction", 
    "antihypertensives", 
    "corticosteroids",
    "psycholeptics",
    "systolic_blood_pressure",
    "cholesterol",
    "hdl_cholesterol",

]

# Comorbidity indicator columns.
Comorbidities = [
    'aids',
    'leukemia',
    'lymphoma',
    'solid_tumor',
    'diabetes',
    'dementia',
    'hemiplegia',
    'myocardial_infarction',
    'heart_failure',
    'stroke_tia',
    'pad',
    'copd',
    'peptic_ulcer',
    'chronic_liver_disease',
    'chronic_kidney_disease', 
    'connective_tissue_disorder'
]

# Check (report only, no exception) that every covariate needed by any of the
# covariate sets is a column of data_covariates. On success the table is
# restricted to exactly those columns; otherwise the missing names are printed
# and the table is left untouched.
covariates_scores = sorted({*AgeSex, *SCORE2, *ASCVD, *QRISK3, *Comorbidities})
if set(covariates_scores).issubset(data_covariates.columns.to_list()):
    print("Success, all required covariates are prepared!")
    data_covariates = data_covariates[covariates_scores]
else:
    print("Not all required covariates are prepared!", list(set(covariates_scores).difference(data_covariates.columns.to_list())))

In [None]:
# Group the covariate columns by dtype:
#   numeric (int32/int64/float32/float64) -> variables_cont
#   pandas category                        -> variables_cat
#   bool                                   -> variables_bool
variables_cont = data_covariates.select_dtypes(include=["int32", "int64", "float32", "float64"]).columns.to_list()
variables_cat = data_covariates.select_dtypes(include=["category"]).columns.to_list()
variables_bool = data_covariates.select_dtypes(include=["bool"]).columns.to_list()
print("Cont: ", variables_cont)
print("Cat: ", variables_cat)
print("Bool: ", variables_bool)

# Continuous covariates plus the endpoint columns; the bare expression below
# shows the list as the cell output.
variables_to_norm = variables_cont + endpoints
variables_to_norm

In [None]:
# Input directory, one sub-directory per model. Note that in_path is a
# pathlib.Path while out_path below is a plain string; both are only ever
# interpolated into f-strings by the functions of this notebook.
in_path = pathlib.Path(f"{experiment_path}/loghs")
in_path.mkdir(parents=True, exist_ok=True)

# Output directory for the imputed / scaled / encoded data.
out_path = f"{experiment_path}/coxph/input"
pathlib.Path(out_path).mkdir(parents=True, exist_ok=True)

In [None]:
# Every sub-directory of in_path (notebook checkpoint folders excluded) is one
# model. Path.iterdir() yields entries in arbitrary order, so the names are
# sorted to make models[0], models[1], ... refer to the same model on every run.
models = sorted(f.name for f in in_path.iterdir() if f.is_dir() and "ipynb_checkpoints" not in str(f))
# Partition ids 0..21.
partitions = list(range(22))

In [None]:
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import miceforest
import pickle
import zstandard

def read_merge_data(fp_in, split, data_covariates, variables_to_prepare):
    """Load one split of a partition and attach the covariates.

    Reads ``{fp_in}/{split}.feather``, indexes it by ``eid``, casts its
    ``split`` column to ``str`` and left-joins ``data_covariates`` on the index.

    Args:
        fp_in: Directory containing the split feather files.
        split: Name of the split, used as the file stem.
        data_covariates: Covariate table sharing the ``eid`` index.
        variables_to_prepare: Columns to keep in the result.

    Returns:
        The merged frame restricted to ``variables_to_prepare``.
    """
    frame = pd.read_feather(f"{fp_in}/{split}.feather")
    frame = frame.set_index("eid")
    frame["split"] = frame["split"].astype(str)
    joined = frame.merge(data_covariates, how="left", left_index=True, right_index=True)
    return joined[variables_to_prepare]

def load_pickle(fp):
    """Return the object stored in a zstandard-compressed pickle file.

    NOTE: unpickling can execute arbitrary code; only use this on files
    written by this pipeline (see ``save_pickle``).

    Args:
        fp: Path of the compressed pickle file.

    Returns:
        The unpickled object.
    """
    with open(fp, "rb") as raw:
        with zstandard.ZstdDecompressor().stream_reader(raw) as stream:
            return pickle.loads(stream.read())
    
def save_pickle(data, data_path):
    """Pickle ``data`` and write it zstandard-compressed to ``data_path``.

    The parent directory of ``data_path`` is created if necessary.

    Args:
        data: Any picklable object.
        data_path: Destination file path.
    """
    parent_dir = pathlib.Path(data_path).parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    with open(data_path, "wb") as raw:
        with zstandard.ZstdCompressor().stream_writer(raw) as sink:
            payload = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
            sink.write(payload)
            
def get_variable_schema(data):
    """Map every column with missing values to the other such columns.

    The list of columns containing at least one missing value is printed.

    Args:
        data: DataFrame to inspect.

    Returns:
        Dict whose keys are the columns with missing values; each value is the
        list of all *other* columns with missing values, in column order.
    """
    has_gaps = data.isna().any()
    missing = data.columns[has_gaps].to_list()

    print(missing)

    return {
        target: [col for col in data.columns if col != target and col in missing]
        for target in missing
    }

def tune_imputer(data):
    """Tune the miceforest imputation parameters on ``data``.

    Builds an ``ImputationKernel`` with a single dataset and a fixed random
    state and runs ``tune_parameters`` with 5 optimization steps.

    Args:
        data: DataFrame (with missing values) to tune on.

    Returns:
        The tuned parameters returned by ``tune_parameters``; the accompanying
        losses are discarded.
    """
    # Called for its printout of the columns with missing values; the returned
    # schema itself is not used.
    get_variable_schema(data)

    kernel = miceforest.ImputationKernel(data, datasets=1, random_state=42)

    # TODO: add bootstrapping
    best_hps, _losses = kernel.tune_parameters(dataset=0, n_jobs=96, optimization_steps=5, verbose=True)
    return best_hps

def get_imputer_hps(data_covariates, variables_to_prepare, model, partition, samples):
    """Tune imputer parameters for one model and store them on disk.

    A random sample of ``samples`` rows of the train split of ``partition`` is
    used for tuning. The result is written to
    ``{out_path}/{model}/imputer_best_hps.p``.

    Args:
        data_covariates: Covariate table indexed by ``eid``.
        variables_to_prepare: Columns to include in the tuning data.
        model: Model (sub-directory) name below ``in_path``.
        partition: Partition whose train split is sampled.
        samples: Number of rows to sample.

    Returns:
        The tuned parameters.
    """
    source_dir = f"{in_path}/{model}/{partition}"
    target_dir = f"{out_path}/{model}/"
    pathlib.Path(target_dir).mkdir(parents=True, exist_ok=True)

    train_frame = read_merge_data(source_dir, "train", data_covariates, variables_to_prepare)
    subsample = train_frame.sample(samples)

    print("tune hps")
    best_hps = tune_imputer(subsample)
    save_pickle(best_hps, f"{target_dir}/imputer_best_hps.p")
    return best_hps

def fit_imputer(data, best_hps, fp_out):
    """Fit a miceforest imputation kernel and save its diagnostic plot.

    Args:
        data: DataFrame (with missing values) to fit on.
        best_hps: Tuned parameters, passed as ``variable_parameters``.
        fp_out: Directory that receives ``imputer_distributions.pdf``.

    Returns:
        The fitted ``ImputationKernel``.
    """
    schema = get_variable_schema(data)
    print(schema)

    imputation_kernel = miceforest.ImputationKernel(data, datasets=1, random_state=42)

    # Three MICE iterations with the tuned per-variable parameters
    imputation_kernel.mice(3, n_jobs=96, variable_parameters=best_hps, verbose=True)

    # Plot the imputed distributions and store the figure next to the imputer
    imputation_kernel.plot_imputed_distributions(wspace=0.3, hspace=0.3)
    plt.savefig(f"{fp_out}/imputer_distributions.pdf", bbox_inches='tight')

    return imputation_kernel
    
@ray.remote
def scale_encode_save_feather(partition, split, temp_df, scaler, variables_cont, variables_cat, fp_out):
    """Ray task: scale, one-hot encode and write one imputed split.

    Args:
        partition: Partition id (used for log output only).
        split: Split name; also the stem of the output file.
        temp_df: Imputed data of the split.
        scaler: Fitted scaler whose ``transform`` is applied to ``variables_cont``.
        variables_cont: Continuous columns to scale.
        variables_cat: Categorical columns to one-hot encode.
        fp_out: Output directory; the result goes to ``{fp_out}/{split}.feather``.
    """
    def announce(step):
        # Same progress line format for every stage
        print(partition, split, f"{step} {split}")

    announce("scale")
    scaled_values = scaler.transform(temp_df[variables_cont].values)
    temp_df[variables_cont] = scaled_values

    announce("onehotencode")
    encoded = pd.get_dummies(temp_df, columns=variables_cat, prefix=variables_cat)

    # Persist the imputed and standardized split, keeping the index as a column
    announce("save")
    encoded.reset_index(drop=False).to_feather(f"{fp_out}/{split}.feather")
    

def impute_norm_variables(data_covariates, model, partition, variables_to_prepare, variables_cont, variables_cat, samples):
    """Impute, standardize and one-hot encode the three splits of one partition.

    The splits ``train``, ``valid`` and ``test`` are read (in that order) from
    ``{in_path}/{model}/{partition}`` and merged with ``data_covariates``. On
    the train split an imputer is fitted on a random sample of ``samples`` rows
    with the tuned parameters stored in ``{out_path}/{model}/imputer_best_hps.p``,
    and a ``StandardScaler`` is fitted on the imputed continuous variables; both
    are pickled to ``{out_path}/{model}/{partition}``. Every split is then
    imputed and handed to the Ray task ``scale_encode_save_feather``, which
    writes the final feather file.

    Args:
        data_covariates: Covariate table indexed by ``eid``.
        model: Model (sub-directory) name below ``in_path``.
        partition: Partition id.
        variables_to_prepare: Columns to keep after merging.
        variables_cont: Continuous columns to standardize.
        variables_cat: Categorical columns to one-hot encode.
        samples: Number of train rows used to fit the imputer.

    Returns:
        ``True`` once all splits have been submitted to Ray, ``False`` if the
        input directory does not exist and the partition was skipped. The Ray
        tasks are not awaited, so the output files may still be in the making
        when this function returns.
    """
    fp_in = f"{in_path}/{model}/{partition}"
    fp_out = f"{out_path}/{model}/{partition}"

    # Without an input directory there is nothing to read. Skip the partition
    # instead of failing later in read_feather and aborting the whole run.
    if not pathlib.Path(fp_in).is_dir():
        print(partition, f"skip: input directory {fp_in} not found")
        return False

    pathlib.Path(fp_out).mkdir(parents=True, exist_ok=True)

    for split in tqdm(["train", "valid", "test"]):

        print(partition, split, "read and merge data")
        temp = read_merge_data(fp_in, split, data_covariates, variables_to_prepare)

        if split == "train":
            # fit and save imputer (train comes first, so `imputer` exists for
            # the valid and test iterations)
            print(partition, split, "fit imputer")
            print(partition, split, "fit imputer: load hps")
            best_hps = load_pickle(f"{out_path}/{model}/imputer_best_hps.p")
            print(partition, split, "fit imputer: fit imputer")
            imputer = fit_imputer(temp.sample(samples), best_hps, fp_out)
            print(partition, split, "fit imputer: save imputer")
            save_pickle(imputer, f"{fp_out}/imputer.p")

            # check imputer and log results
            print(partition, split, "check imputer: plot distributions")
            print(imputer.plot_imputed_distributions(wspace=0.3, hspace=0.3))

        # apply imputer
        print(partition, split, f"impute {split}")
        try:
            temp = imputer.impute_new_data(new_data=temp, verbose=True).complete_data(0)
        except AssertionError as e:
            # Best effort: report the problem and dump the offending frame to
            # the working directory. Processing continues with the un-imputed
            # data of this split.
            print(f"ERROR {partition} {split}")
            print(e)
            temp.reset_index().to_feather(f"error_{partition}_{split}.feather")

        if split == "train":
            # fit and save standardscaler on the (imputed) train data
            print(partition, split, "fit scaler")
            scaler = StandardScaler(with_mean=True, with_std=True, copy=True).fit(temp[variables_cont].values)
            save_pickle(scaler, f"{fp_out}/scaler.p")

        # Fire-and-forget: scaling, encoding and writing run as a Ray task
        scale_encode_save_feather.remote(partition, split, temp, scaler, variables_cont, variables_cat, fp_out)

    return True

In [None]:
# Tune and store the imputer parameters for the first model, using 10000
# sampled train rows of the first partition.
variables_to_prepare = covariates_scores + endpoints
get_imputer_hps(data_covariates, variables_to_prepare, models[0], partitions[0], samples=10000)

In [None]:
# Same as the previous cell, for the second model.
variables_to_prepare = covariates_scores + endpoints
get_imputer_hps(data_covariates, variables_to_prepare, models[1], partitions[0], samples=10000)

In [None]:
#impute_norm_variables(data_covariates, models[0], partitions[0], variables_to_norm, 10000)

In [None]:
def norm_logh_and_extra(data_covariates, variables_to_prepare, variables_cont, variables_cat, samples):
    """Run imputation, scaling and encoding for every model and partition.

    For each entry of the module-level ``models`` list the tuned imputer
    parameters are loaded from ``{out_path}/{model}/imputer_best_hps.p`` or, if
    that file is missing, estimated on the first partition. Afterwards
    ``impute_norm_variables`` is called for every entry of ``partitions``.

    Args:
        data_covariates: Covariate table indexed by ``eid``.
        variables_to_prepare: Columns to impute / keep.
        variables_cont: Continuous columns to standardize.
        variables_cat: Categorical columns to one-hot encode.
        samples: Number of rows used for tuning and fitting the imputer.
    """
    print(f"Tune and fit imputation with {samples} samples")

    progress = []
    for model in models:
        hps_path = f"{out_path}/{model}/imputer_best_hps.p"
        if not pathlib.Path(hps_path).is_file():
            print("No HPs found, estimating new HPs...")
            # get_imputer_hps takes variables_to_prepare as its second
            # argument; leaving it out raised a TypeError here.
            get_imputer_hps(data_covariates, variables_to_prepare, model, partitions[0], samples)
        else:
            print(f"Use {hps_path}")

        for partition in tqdm(partitions):
            # impute_norm_variables returns a plain status flag, not a Ray
            # object reference, so there is nothing to ray.get() afterwards.
            progress.append(impute_norm_variables(data_covariates, model, partition, variables_to_prepare, variables_cont, variables_cat, samples))

In [None]:
# Run the full preparation for all models and partitions, with 10000 rows for
# tuning and fitting the imputer.
variables_to_prepare = covariates_scores + endpoints
norm_logh_and_extra(data_covariates, variables_to_prepare, variables_cont, variables_cat, 10000)