# General example for converting UK Biobank data in the research environment (RAP) into the DELPHI format


Notes:
 - This is set up to run as a notebook in a Spark server job.
 - It needs access to a dataset record (which you may need to specify explicitly for your project)
 - and also a cohort, referred to here as "full_cohort"; the name may differ in your project.
 - The mapping from token labels to UKB field ids should be modified to suit your needs.
 - You should change the token ids to be integers and create a token_id to field_id (or disease ICD10 name) mapping.
 - This outputs a single file with all individuals included - you will want to split it into "train.bin" and "val.bin".

In [None]:
import dxdata
import pandas as pd
import numpy as np
from tqdm import tqdm
from pyspark.sql.functions import lit, udf, col
from pyspark.sql.types import DoubleType
from functools import reduce
import os

def get_first_occ_fields(main_entity):
    """Return the first-occurrence date fields of ``main_entity``.

    A field is kept when the part of its name before the first "_" is longer
    than three characters, the text after its first character parses as an
    integer in the range 130000..132604 (inclusive), and its title starts
    with "Date".

    Args:
        main_entity: object exposing ``fields``, an iterable of field objects
            that each have a ``name`` and a ``title`` attribute.

    Returns:
        list: the matching field objects, in the order they appear in
        ``main_entity.fields``.
    """
    fo_fields = []
    for field in main_entity.fields:
        prefix = field.name.split("_")[0]
        # Short names (e.g. "eid", "p31") cannot hold a six-digit field id.
        if len(prefix) <= 3:
            continue
        try:
            field_num = int(prefix[1:])
        except ValueError:
            # The name is not of the "<letter><digits>" form, so it is not a
            # numbered field; skip it instead of aborting the whole scan.
            continue
        if 130000 <= field_num <= 132604 and field.title.startswith("Date"):
            fo_fields.append(field)
    return fo_fields

def compute_age_from_eid_and_event(eid, event_date):
    """Return the age in years at ``event_date`` for participant ``eid``.

    The date of birth is read from the module-level ``dob_lookup`` mapping.
    The age is the number of whole days between the date of birth and the
    event, divided by 365.25.

    Args:
        eid: key used to look up the date of birth in ``dob_lookup``.
        event_date: anything ``pd.to_datetime`` accepts, or None.

    Returns:
        float or None: the age in years; None when ``eid`` has no entry in
        ``dob_lookup``, when ``event_date`` is None, or when the date
        arithmetic raises.
    """
    birth = dob_lookup.get(eid)
    if birth is not None and event_date is not None:
        try:
            elapsed = pd.to_datetime(event_date) - birth
            return elapsed.days / 365.25
        except Exception:
            # Best effort: an unparseable date produces a missing age.
            return None
    return None

# Initialize the dxdata engine
engine = dxdata.connect(dialect="hive+pyspark")

# The project id is the first half of the "project:record" dataset id below.
project = os.getenv('DX_PROJECT_CONTEXT_ID')
if project is None:
    raise RuntimeError(
        "DX_PROJECT_CONTEXT_ID is not set; cannot determine which project holds the dataset"
    )

# List the Dataset objects visible to `dx` and keep the fifth comma-separated
# column of each line. The with-block makes sure the pipe is closed.
with os.popen("dx find data --type Dataset --delimiter ',' | awk -F ',' '{print $5}'") as dx_listing:
    record = dx_listing.read().rstrip()
# The first line is presumed to be the relevant dataset record
record = record.split('\n')[0]
if not record:
    raise RuntimeError(
        "`dx find data --type Dataset` returned no dataset record; specify the record explicitly"
    )

DATASET_ID = project + ":" + record
dataset = dxdata.load_dataset(id=DATASET_ID)

# Retrieve the primary entity from the dataset
main_entity = dataset.primary_entity

# Use a cohort - change to whichever name/path you have for this object
cohort = dxdata.load_cohort(folder="/", name="full_cohort")
cohort_eids_df = engine.execute(cohort.sql)

In [None]:
# Hard-coded lookups for the participant id, sex, date of birth and basic
# demographic fields.
eid_f = main_entity.find_field(name="eid")

# Fields located by their title.
sex_f, year_f, month_f = (
    main_entity.find_field(title=field_title)
    for field_title in ("Sex", "Year of birth", "Month of birth")
)

# The date of death is looked up on the separate 'death' entity.
death_f = dataset['death'].find_field(title="Date of death")

# Instance-0 fields addressed directly by name; bound, in order, to the
# assessment, BMI, smoking and alcohol variables.
assessment_f, bmi_f, smoking_f, alcohol_f = (
    main_entity[field_name]
    for field_name in ("p53_i0", "p21001_i0", "p1239_i0", "p1558_i0")
)


In [5]:
# Collect the cancer code entries: for each of the 22 instances, the
# p40006 field goes into the 'type' list and the p40005 field into 'date'.
cancer_codes = {'type': [], 'date': []}
for i in range(22):
    type_field = main_entity.find_field(name=f"p40006_i{i}")
    date_field = main_entity.find_field(name=f"p40005_i{i}")
    cancer_codes['type'].append(type_field)
    cancer_codes['date'].append(date_field)


In [None]:
# Retrieve the first occurrences and demographic data - this takes a little while
fo_fields = get_first_occ_fields(main_entity)
fields_to_get = [
    eid_f, sex_f, year_f, month_f, assessment_f, bmi_f, smoking_f, alcohol_f,
    *cancer_codes['type'],
    *cancer_codes['date'],
    *fo_fields,
    death_f,
]

df = main_entity.retrieve_fields(fields=fields_to_get, filter_sql=cohort.sql, engine=engine)

# Only the small demographic slice is pulled into pandas.
demographic_columns = ["eid", "p31", "p34", "p52", "p53_i0", "p21001_i0", "p1239_i0", "p1558_i0"]
df1 = df.select(*demographic_columns).toPandas()

# Build the date of birth from p34 (year) and p52 (month), using day 1.
dobf1 = df1[['p34', 'p52']].rename(columns={'p34': "YEAR", 'p52': "MONTH"})
df1['dob'] = pd.to_datetime(dobf1.assign(DAY=1))

# Status codes: each comparison that is False (including against a missing
# value) falls through to the next, ending at the last code.
bmi = df1['p21001_i0']
df1['bmi_status'] = np.where(bmi > 28, 5, np.where(bmi > 22, 4, 3))

smoking = df1['p1239_i0']
df1['smoking_status'] = np.where(smoking == 1, 8, np.where(smoking == 2, 7, 6))

alcohol = df1['p1558_i0']
df1['alcohol_status'] = np.where(alcohol == 1, 11, np.where(alcohol < 4, 10, 9))

In [None]:
# Plain dict mapping each eid to its date of birth, for fast lookups inside the UDF
dob_lookup = dict(zip(df1['eid'], df1['dob']))

# Spark UDF wrapper around the age computation; it returns a double column
age_event_udf = udf(compute_age_from_eid_and_event, returnType=DoubleType())

In [None]:
# Remove all NULL entries for each cancer instance separately and combine
# them into an overall spark table with columns (eid, date, token, age).


def _cancer_instance_events(date_name, type_name):
    """Return the non-null events of one cancer instance.

    Selects eid plus the given date/type columns from ``df``, keeps rows whose
    date is not null, renames the columns to "date" and "token", and appends
    the "age" column computed by ``age_event_udf``.
    """
    return (
        df.select("eid", date_name, type_name)
        .where(df[date_name].isNotNull())
        .withColumnRenamed(date_name, "date")
        .withColumnRenamed(type_name, "token")
        .withColumn("age", age_event_udf(col("eid"), col("date")))
    )


# Every instance, including instance 0, goes through the same helper so the
# columns renamed always belong to the instance that was selected.
d_all = None
for date_f, type_f in tqdm(list(zip(cancer_codes['date'], cancer_codes['type']))):
    cf1 = _cancer_instance_events(date_f.name, type_f.name)
    # union() matches columns by position, so every frame keeps the same order.
    d_all = cf1 if d_all is None else d_all.union(cf1)


In [None]:
# Append the events of every first-occurrence field - this takes a long time
for fo_field in tqdm(fo_fields):
    column = fo_field.name
    # Same column order as d_all (eid, date, token, age): the field name
    # itself is used as the token label.
    events = (
        df.select(['eid', column])
        .where(df[column].isNotNull())
        .withColumn("token", lit(column))
        .withColumnRenamed(column, "date")
        .withColumn("age", age_event_udf(col("eid"), col("date")))
    )
    d_all = d_all.union(events)


In [None]:
# Format, sort and write out to a file as rows of three uint32 values:
# (eid, age in days, token id).
df_all = d_all.select("eid", "age", "token").toPandas()

# Ages that could not be computed (missing) or that are negative cannot be
# stored as unsigned integers, so those rows are dropped.
age_years = pd.to_numeric(df_all['age'], errors='coerce')
df_all = df_all[age_years.notna() & (age_years >= 0)].copy()

# The age was computed as days / 365.25; convert back to days and round so
# floating point error cannot truncate a day away.
df_all['age'] = (pd.to_numeric(df_all['age']) * 365.25).round()

# The binary format needs integer token ids; string labels (field names or
# ICD10 codes) must be mapped to integers before this cell is run.
try:
    token_ids = pd.to_numeric(df_all['token'], errors='raise')
except (ValueError, TypeError) as err:
    raise ValueError(
        "the 'token' column must contain integer token ids; map the token labels "
        "to integers before writing the binary file"
    ) from err

# Fill a real uint32 array column by column so the file holds 4-byte
# unsigned integers rather than floats or Python objects.
data = np.empty((len(df_all), 3), dtype=np.uint32)
data[:, 0] = pd.to_numeric(df_all['eid']).to_numpy()
data[:, 1] = df_all['age'].to_numpy()
data[:, 2] = token_ids.to_numpy()

# Sort by eid first, then by age within each eid.
data = data[np.lexsort((data[:, 1], data[:, 0]))]

data.tofile('all_records.bin')
