In [None]:
import sys
import os
import itertools

# Project checkout root and data root; fill these in before running.
# NOTE(review): data_dir is not referenced by the cells visible in this notebook.
local_dir, data_dir = '', ''
# Put the CPRD_Cut22 package at the front of the import path so the
# `utils.*` / `CPRD.*` imports that follow resolve against it.
sys.path.insert(0, os.path.join(local_dir, 'CPRD_Cut22'))

from utils.yaml_act import yaml_load
from utils.arg_parse import arg_paser
from CPRD.config.spark import spark_init, read_parquet, read_txt, read_csv
import pyspark.sql.functions as F
from CPRD.functions import tables, merge,risk_prediction, modalities, MedicalDictionary, risk_prediction, predictor_extractor
from CPRD.base.table import Patient,Practice,Clinical, Diagnosis, Therapy, Hes, Consultation, Proc_HES
from CPRD.functions import merge
from utils.utils import save_obj, load_obj
from CPRD.functions.cohort_select import Cohort

from CPRD.config.utils import cvt_str2time
from CPRD.config.utils import check_time
from CPRD.config.utils import RangeExtract

import pandas as pd
from pyspark.sql import Window
import matplotlib.pyplot as plt

class dotdict(dict):
    """A ``dict`` whose keys can also be read, set and deleted as attributes.

    Reading an attribute that is not a key returns ``None`` (same as
    ``dict.get``) instead of raising ``AttributeError``. Deleting a missing
    attribute raises ``KeyError``.
    """

    def __getattr__(self, name):
        # Only invoked when normal attribute lookup fails, so real dict
        # methods (get, items, ...) are unaffected.
        return self.get(name)

    def __setattr__(self, name, value):
        self[name] = value

    def __delattr__(self, name):
        del self[name]


# Load the project YAML config and start the Spark session from its 'pyspark' section.
args = dotdict({'params': os.path.join(local_dir, 'CPRD_Cut22/config/config.yaml')})
params = yaml_load(args.params)
spark_params = params['pyspark']
spark = spark_init(spark_params)
file = params['file_path']
data_params = params['params']
# NOTE(review): pheno_dict is not referenced by the cells visible here — confirm it is still needed.
pheno_dict = load_obj(file['PhenoMaps'])

# Output directory for the formatted EHR parquet.
save_path = os.path.join(local_dir, 'EHR_AGE/UKB_EHR_TIME')
# CSV listing the cohort patients (joined on 'patid' in the baseline cell).
patid_path = 'data/ukbiobank_phenoage.csv'
# NOTE(review): ukb_data_path is read as a single CSV file by the baseline cell,
# but the event-loading cell appends '/diagnoses.parquet', '/first_record.parquet',
# ... to it as if it were a directory. These probably need to be two separate
# settings — confirm before filling this in.
ukb_data_path = ''

# Preprocess diagnosis

In [4]:

# Load the UK Biobank baseline table, keep only the fields used below, and
# restrict it to the patients listed in `patid_path`.
# Field 34 is used as year of birth and 52 as month of birth (see dob below);
# 53-0.0..53-3.0 and 40000-0.0 are renamed to doa0..doa3 and dod further down.
# NOTE(review): ukb_data_path is read here as one CSV file, while the
# event-loading cell treats it as a directory — confirm which path is intended.
patient = spark.sqlContext.read.option("header", "true")\
    .option("maxColumns", 10000000)\
    .csv(patid_path)
main = spark.sqlContext.read.option("header", "true")\
    .option("maxColumns", 10000000)\
    .csv(ukb_data_path)\
    .select(['eid', '`34-0.0`', '`52-0.0`', '`53-0.0`', '`53-1.0`', '`53-2.0`', '`53-3.0`', '`40000-0.0`'])\
    .withColumnRenamed('eid', 'patid')

main = main.join(patient, on='patid', how='inner')

# The code assumes field 52 holds English month names; map them to two-digit month numbers.
month_mapping = {
    'January': '01', 'February': '02', 'March': '03', 'April': '04',
    'May': '05', 'June': '06', 'July': '07', 'August': '08',
    'September': '09', 'October': '10', 'November': '11', 'December': '12'
}
# create_map expects a flat key1, value1, key2, value2, ... sequence.
month_map_expr = F.create_map([F.lit(x) for x in itertools.chain.from_iterable(month_mapping.items())])

# Date of birth = 1st of the birth month in the birth year (yyyy-MM-01).
main = main.withColumn('month_numeric', month_map_expr[F.col('`52-0.0`')])
main = main.withColumn("dob", F.to_date(F.concat(F.col('`34-0.0`'), F.lit('-'), F.col('month_numeric'), F.lit('-01')), "yyyy-MM-dd"))

# Rename the attendance (doa0..doa3) and death (dod) fields and parse all five as dates.
date_field_names = {
    '53-0.0': 'doa0',
    '53-1.0': 'doa1',
    '53-2.0': 'doa2',
    '53-3.0': 'doa3',
    '40000-0.0': 'dod',
}
for raw_name, new_name in date_field_names.items():
    main = main.withColumnRenamed(raw_name, new_name)\
               .withColumn(new_name, F.to_date(F.col(new_name), "yyyy-MM-dd"))

# Drop the raw birth-year/month inputs and the helper column; the other raw
# fields no longer exist under their original names after the renames above.
main = main.drop('34-0.0', '52-0.0', 'month_numeric').cache()


Use GP prescriptions, HES procedures, and diagnoses (including first-recorded occurrences) as event sources.

In [6]:
# Build one long event table with columns (patid, eventdate, code) from every
# source: diagnoses (+ first_record), GP scripts (BNF codes) and HES operations.
# NOTE(review): `first_record` is combined with `union`, which matches columns by
# POSITION, not by name. This assumes its columns are already ordered as
# (patid, eventdate, code) — confirm against the parquet schema.
diagnoses = read_parquet(spark.sqlContext, os.path.join(ukb_data_path, 'diagnoses.parquet'))\
    .select(['patid', 'eventdate', 'ICD'])\
    .withColumnRenamed('ICD', 'code').dropna()
first_record = read_parquet(spark.sqlContext, os.path.join(ukb_data_path, 'first_record.parquet'))\
    .withColumnRenamed('eid', 'patid')\
    .withColumnRenamed('ICD', 'code')
diagnoses = diagnoses.union(first_record)

medications = read_txt(spark.sc, spark.sqlContext, path=os.path.join(ukb_data_path, 'processed_gp_scripts.txt'))\
    .withColumn('eventdate', F.to_date('issue_date', "dd/MM/yyyy"))\
    .withColumnRenamed('mapped_bnf_code', 'code')\
    .withColumnRenamed('eid', 'patid')\
    .select(['patid', 'eventdate', 'code']).dropna()

procedure = read_txt(spark.sc, spark.sqlContext, path=os.path.join(ukb_data_path, 'merged_hesin_oper.txt'))\
    .withColumn('eventdate', F.to_date('admidate', "dd/MM/yyyy"))\
    .withColumnRenamed('oper4', 'code')\
    .withColumnRenamed('eid', 'patid')\
    .select(['patid', 'eventdate', 'code']).dropna()

# The final dropna also covers first_record rows, which were not null-filtered above.
data = medications.union(procedure).union(diagnoses).dropna()

# Inclusive event-date window; events outside it are treated as invalid dates.
start_date = "1950-01-01"
end_date = "2025-12-31"

data = data.filter((F.col("eventdate") >= F.lit(start_date)) & (F.col("eventdate") <= F.lit(end_date))).cache()

In [7]:
# Append one synthetic "end" row per patient, dated at the Date of Attendance.
# Its code is the number of whole years (rounded down) between the patient's
# last recorded event strictly before attendance and the attendance date.
# Patients with no event before attendance get no end row (inner joins).
# NOTE: this cell reassigns `data` in terms of itself, so it is NOT idempotent.
# Re-run the event-loading cell before re-running this one, otherwise the gap
# rows are appended a second time.
data_with_attendance = data.join(main.select("patid", "Date of Attendance"), on="patid", how="inner")
# Only events strictly before the attendance date count towards the gap.
filtered_data = data_with_attendance.filter(F.col("eventdate") < F.col("Date of Attendance"))

# Latest prior event per (patient, attendance date). A max aggregate yields the
# same date as ranking events with row_number() and keeping rank 1, without a
# sort. Keeping Date of Attendance in the key avoids a second join back to `main`.
event_gap = filtered_data.groupBy("patid", "Date of Attendance")\
    .agg(F.max("eventdate").alias("last_eventdate"))

# Gap in whole years between the last prior event and attendance, rounded down.
event_gap = event_gap.withColumn("gap_years", F.floor(F.months_between("Date of Attendance", "last_eventdate") / 12))

# Shape the gap rows like `data`: eventdate parsed as a date so the union keeps
# a DateType column, and the gap encoded as a string code.
end_rows = event_gap.select(
    "patid",
    F.to_date(F.col("Date of Attendance"), "yyyy-MM-dd").alias("eventdate"),
    F.col("gap_years").cast("string").alias("code"),
)

data = data.unionByName(end_rows)


In [1]:
# Rename PhenoAge -> label and Chronological Age -> baseline_age. These renames
# are no-ops if the columns were already renamed, so re-running is safe.
main = main.withColumnRenamed('PhenoAge', 'label').withColumnRenamed('Chronological Age', 'baseline_age')
cohort = main.select('patid', 'dob', 'Date of Attendance', 'label')

# Format the event table into per-patient sequences.
# NOTE(review): the meaning of unique_in_months=6 and the output schema are
# defined in CPRD.functions.predictor_extractor.BEHRTextraction — confirm there.
behrt_formater = predictor_extractor.BEHRTextraction()
EHR = behrt_formater.format_behrt(
    data, cohort,
    col_entry='Date of Attendance', col_yob='dob', age_col_name='age',
    col_code='code', unique_in_months=6,
).dropna().cache()
EHR = EHR.join(main.select('patid', 'Age_range', 'baseline_age'), on='patid', how='inner')
EHR.write.mode("overwrite").parquet(save_path)

# Quick sanity checks on the written table; the row count is the cell's output.
EHR.show()
EHR.select('code').show()
EHR.count()