In [1]:
import os
import sys
import dask
import shutil
import pickle
import pandas as pd
from survivalCoxRun import ExperimentRun
from sim_utils import get_parameters, get_cluster, run_parellel

In [2]:
# Root of the project checkout; every other path in this notebook is derived from it.
homedir = "/home/bandheyh/common/survival-LCS-telo"

# Make project modules importable. Guarded so that re-running this cell does
# not keep appending duplicate entries to sys.path.
# NOTE(review): the first cell already imports survivalCoxRun and sim_utils
# before this path is added; on a fresh kernel those imports presumably rely on
# the notebook's working directory (or an installed package) -- confirm.
if homedir not in sys.path:
    sys.path.append(homedir)

In [3]:
# ---------------------------------------------------------------------------
# Execution switches
# ---------------------------------------------------------------------------
# HPC:   True  -> ExperimentRun jobs are collected and executed through dask;
#        False -> each job is run inline while the job grid is being built.
# DEBUG: True  -> the next cell replaces the settings below with a reduced grid
#                 written under homedir + "/test".
HPC = True
DEBUG = False

# ---------------------------------------------------------------------------
# Experiment grid (one job per model x feature count x MAF x censoring x fold)
# ---------------------------------------------------------------------------
outputdir = homedir + "/pipeline"
model_list = [
    'me',
    'epi',
    'het',
    'add',
]
nfeat_list = [
    'f100',
    'f1000',
    'f10000',
]
maf_list = [
    'maf0.2',
    'maf0.4',
]
# Censoring levels; each value is handed to ExperimentRun as its last argument.
censor_list = [0.1, 0.4, 0.8]
# Number of cross-validation indices iterated per grid point.
cv_count = 5

# ---------------------------------------------------------------------------
# Dataset / model settings
# NOTE(review): none of the names below are referenced by the cells visible in
# this notebook; presumably they are leftovers shared with a sibling pipeline
# notebook -- confirm before removing.
# ---------------------------------------------------------------------------
time_label = "eventTime"
status_label = "eventStatus"
instance_label = "inst"
T = 100
knots = 8

iterations = 50000
random_state = 42

pmethod = "random"
isContinuous = True
nu = 1
rulepop = 1000


In [4]:
if DEBUG:
    # Smoke-test configuration: a single model / feature count / MAF, fewer
    # iterations and folds, with output redirected to a separate directory.
    outputdir = homedir + "/test"
    model_list, nfeat_list, maf_list = ['me'], ['f100'], ['maf0.2']
    # Same censoring levels as the full run (kept explicit so they can be
    # trimmed here independently).
    censor_list = [0.1, 0.4, 0.8]
    iterations, cv_count = 1000, 3

In [5]:
# Empty frame reserved for Brier scores.
# NOTE(review): not referenced again in the visible cells.
cox_brier_df = pd.DataFrame()

# Folder-structure creation is currently disabled:
# make_folder_structure(outputdir, model_list)

# Accumulators filled while the experiment grid is built:
#   job_obj_list  -- ExperimentRun objects queued for dask (HPC mode)
#   brier_df_list -- values returned by ExperimentRun.run() (inline mode)
job_obj_list = []
brier_df_list = []

In [6]:
# Start every run with a fresh, empty dask log directory: remove any existing
# one (including its contents), then create it again.
dask_log_dir = homedir + '/dask_logs/'

if os.path.exists(dask_log_dir):
    shutil.rmtree(dask_log_dir)

if not os.path.exists(dask_log_dir):
    os.mkdir(dask_log_dir)

In [7]:
# Obtain the dask cluster used for the parallel run. get_cluster is imported
# from sim_utils and its implementation is not visible in this notebook; the
# recorded output below shows it reporting a running Dask scheduler.
# NOTE(review): `cluster` is not referenced by any later visible cell, so the
# dask.compute call further down presumably relies on a client/scheduler that
# get_cluster registers as a side effect -- confirm in sim_utils.
cluster = get_cluster(output_path=homedir)

Running dask-cluster
{'type': 'Scheduler', 'id': 'Scheduler-cd3a4ac4-3c1c-40b1-97c2-c079ba5c9dab', 'address': 'tcp://172.21.0.91:36275', 'services': {'dashboard': 46541}, 'started': 1732012908.8187213, 'workers': {}}


Perhaps you already have a cluster running?
Hosting the HTTP server on port 46541 instead


In [8]:
# Build one ExperimentRun per (model, feature count, MAF, censoring level,
# CV index). With the non-DEBUG configuration that is 4 * 3 * 2 * 3 * 5 = 360
# jobs. In HPC mode the jobs are only queued; otherwise each one is run inline.
for i in range(len(model_list)):
    for j in range(len(nfeat_list)):
        for k in range(len(maf_list)):
            # get_parameters is index-based: it receives the three lists plus
            # the current position in each. Its 10-tuple is unpacked straight
            # into descriptive names, so the CV loop below can no longer
            # clobber a short alias such as the former `m` (model path).
            (gametes_data_path, model_type, data_path, model_path,
             output_path, experiment_name,
             gametes_model_path_0, model0_type,
             gametes_model_path_1, model1_type) = get_parameters(
                homedir, outputdir,
                model_list, nfeat_list, maf_list,
                i, j, k)

            for censor in censor_list:
                for cv_index in range(cv_count):
                    # Argument order matches the original call: the fifth
                    # argument is the loop index over cv_count (presumably the
                    # CV fold -- confirm in survivalCoxRun) and the sixth is
                    # the censoring level.
                    slcs = ExperimentRun(data_path, model_path, output_path,
                                         model_type, cv_index, censor)
                    if not HPC:
                        # Inline execution: keep whatever run() returns
                        # (presumably an integrated Brier score -- confirm).
                        ibs = slcs.run()
                        brier_df_list.append(ibs)
                    else:
                        # Deferred execution: the job is dispatched via dask
                        # in a later cell.
                        job_obj_list.append(slcs)

In [9]:
# Sanity check on the queued jobs. job_obj_list is only populated in HPC mode,
# so guard the indexing: with HPC = False the list is empty and
# job_obj_list[0] would raise IndexError.
if job_obj_list:
    print(job_obj_list[0])
print(len(job_obj_list))

<survivalCoxRun.ExperimentRun object at 0x15551ae1c0d0>
360


In [10]:
# Produce `results` for the save step in BOTH execution modes.
if HPC:
    # Wrap every queued job in a lazy task, then evaluate them all together.
    # dask.compute returns a tuple with one entry per delayed task.
    delayed_results = [dask.delayed(run_parellel)(job) for job in job_obj_list]
    results = dask.compute(*delayed_results)
else:
    # Inline mode already ran every job while building the grid; expose those
    # outputs under the same name (and container type) so the pickle cell
    # below does not fail with NameError and the results are actually saved.
    results = tuple(brier_df_list)

In [11]:
# Persist the collected results as a pickle inside the output directory,
# using the highest pickle protocol available to this interpreter.
results_path = outputdir + '/results_coxmodels_parallel.pkl'
with open(results_path, 'wb') as pkl_out:
    pickle.dump(results, pkl_out, protocol=pickle.HIGHEST_PROTOCOL)