In [0]:
%run ./project_config

In [0]:
%run ./parameters

In [0]:
from pyspark.sql import functions as f, DataFrame
from pyspark.sql.window import Window
from functions import load_table, save_table, read_csv_file
from functools import reduce

# 1 Load tables

In [0]:
# Inputs for this notebook: the cohort table plus the HES APC diagnosis and
# GDPPR record tables.
cohort_prior_mi = load_table('cohort_prior_mi')
hes_apc_diagnosis = load_table('hes_apc_diagnosis')
gdppr = load_table('gdppr', method='gdppr')

# Preview up to 50 rows of each input, in the order they were loaded.
for _preview in (cohort_prior_mi, hes_apc_diagnosis, gdppr):
    display(_preview.limit(50))

# 2 Prepare Codelists

Codelists are lists of codes from coding systems such as SNOMED or ICD-10 that describe a disease or event. They are generally agreed upon outside the SDE and then copied into separate files (see the codelists folder). The cells below read in these prepared codelists, as they are needed for this curation notebook.

In [0]:
# Phenotype label -> path of the CSV file holding its SNOMED codelist.
dict_codelists_snomed = {
    "index_mi": "./codelists/incident_myocardial_infarction_snomed.csv",
}

# Read every codelist and tag its rows with the phenotype they belong to.
list_codelists_snomed = [
    read_csv_file(csv_path).withColumn('phenotype', f.lit(label))
    for label, csv_path in dict_codelists_snomed.items()
]

# Stack the per-phenotype codelists into a single lookup table, matching
# columns by name.
codelist_snomed = reduce(
    lambda stacked, nxt: stacked.unionByName(nxt),
    list_codelists_snomed,
)

display(codelist_snomed)


In [0]:
# Phenotype label -> path of the CSV file holding its ICD-10 codelist.
dict_codelists_icd10 = {
    "index_mi": "./codelists/incident_myocardial_infarction_icd10.csv",
}

# Read every codelist and tag its rows with the phenotype they belong to.
list_codelists_icd10 = [
    read_csv_file(csv_path).withColumn('phenotype', f.lit(label))
    for label, csv_path in dict_codelists_icd10.items()
]

# Stack the per-phenotype codelists into a single lookup table, matching
# columns by name.
codelist_icd10 = reduce(
    lambda stacked, nxt: stacked.unionByName(nxt),
    list_codelists_icd10,
)

display(codelist_icd10)


# 3 Prepare dataset

In [0]:
# GDPPR can be a very long table, so keep only the columns needed here.
# Records from several datasets are harmonised and appended later, so each
# row is also labelled with the source it came from (data_source) and a
# numeric source_priority, which is used further down as a tie-breaker when
# ordering a person's events.
gdppr_prepared = (
    gdppr
    .select('person_id', 'date', 'code')
    .withColumn('data_source', f.lit('gdppr'))
    .withColumn('source_priority', f.lit(1))
)

In [0]:
# Same preparation for the hospital data from HES APC: the columns need the
# same names as the GDPPR ones before the two sources are appended (epistart
# becomes `date`), and each row is labelled with its source in advance.
# Only rows with diag_position = 1 are kept
# (presumably the primary diagnosis of the episode - confirm against the HES schema).
hes_apc_prepared = (
    hes_apc_diagnosis
    .filter(f.col('diag_position') == 1)
    .select('person_id', 'code', f.col('epistart').alias('date'))
    .withColumn('data_source', f.lit('hes_apc'))
    .withColumn('source_priority', f.lit(2))
)

# 4 Prepare cohort dates

In [0]:
# The study window varies between projects and is stated in the project
# parameters for ease of use, reference and update. Here each person's cohort
# entry start/end dates are carried forward as min_date/max_date so that
# records can be limited to the eligible window early on, which helps the
# code run quicker.
cohort_prepared = (
    cohort_prior_mi
    .select('person_id', 'cohort_entry_start_date', 'cohort_entry_end_date')
    .withColumnRenamed('cohort_entry_start_date', 'min_date')
    .withColumnRenamed('cohort_entry_end_date', 'max_date')
)

# 5 Perform matching

In [0]:
# Keep only the primary care (GDPPR) records whose code appears in the SNOMED
# codelist, attach each person's eligible window from the cohort, and drop
# anything dated outside that window (both bounds inclusive).
gdppr_matched = (
    gdppr_prepared
    # The codelist is broadcast to every executor for the join.
    .join(f.broadcast(codelist_snomed), on='code', how='inner')
    .join(cohort_prepared, on='person_id', how='inner')
    .filter(
        (f.col('date') >= f.col('min_date'))
        & (f.col('date') <= f.col('max_date'))
    )
)

In [0]:
# Same matching for the HES APC records, using the ICD-10 codelist instead of
# the SNOMED one: match on code, attach each person's eligible window, and
# drop anything dated outside that window (both bounds inclusive).
hes_apc_matched = (
    hes_apc_prepared
    # The codelist is broadcast to every executor for the join.
    .join(f.broadcast(codelist_icd10), on='code', how='inner')
    .join(cohort_prepared, on='person_id', how='inner')
    .filter(
        (f.col('date') >= f.col('min_date'))
        & (f.col('date') <= f.col('max_date'))
    )
)

# 6 Combine

In [0]:
# Append the matched records from both sources so the chronology of MI events
# across primary care (GDPPR) and secondary care (HES APC) can be seen in one
# table. Columns are matched by name, not position. The result is at event
# level (one row per event), not patient level, so a patient may have several
# rows.
index_mi_events = reduce(
    DataFrame.unionByName,
    [gdppr_matched, hes_apc_matched],
)

save_table(index_mi_events, 'index_mi_events')

# 7 Aggregate

In [0]:
# We're interested in the earliest MI event per patient. A plain
# groupBy + min(date) would only return the date; a window function lets us
# rank each patient's events chronologically and keep the whole earliest row
# (date, code and data source together).

index_mi_events = load_table('index_mi_events')

# Rank within each patient AND phenotype: the result is pivoted by phenotype
# below, so ranking per patient only would silently drop all but one phenotype
# as soon as a second codelist is added. With a single phenotype this is
# identical to partitioning by person_id alone.
# Ordering: earliest date first, then the preferred source (lower
# source_priority), then code as a final tie-breaker so the chosen row is
# reproducible when one source has several matching codes on the same day.
_win = (
    Window
    .partitionBy('person_id', 'phenotype')
    .orderBy('date', 'source_priority', 'code')
)

# One row per patient per phenotype: their earliest matching event.
index_mi_ranked = (
    index_mi_events
    .withColumn('rank', f.row_number().over(_win))
    .filter('rank = 1')
    .withColumn('flag', f.lit(1))
)

# Reshape to one row per patient, with one group of columns per phenotype
# (e.g. index_mi_flag, index_mi_date, index_mi_code, index_mi_source).
# Each (person_id, phenotype) cell holds exactly one row after the rank
# filter, so f.first() simply picks that row's value.
index_mi_earliest = (
    index_mi_ranked
    .groupBy('person_id')
    .pivot('phenotype')
    .agg(
        f.first('flag').alias('flag'),
        f.first('date').alias('date'),
        f.first('code').alias('code'),
        f.first('data_source').alias('source')
    )
)

save_table(index_mi_earliest, 'index_mi_earliest')

# 8 Append to cohort

In [0]:
# Reload both inputs from their saved tables.
cohort_prior_mi = load_table('cohort_prior_mi')
index_mi_earliest = load_table('index_mi_earliest')

# Left join so that every cohort member is kept; people without a matching
# MI event get nulls in the index MI columns.
cohort_index_mi = cohort_prior_mi.join(index_mi_earliest, 'person_id', 'left')

save_table(cohort_index_mi, 'cohort_index_mi')

# 9 Display

In [0]:
# Reload the saved cohort and preview up to 500 rows of it.
cohort_index_mi = load_table('cohort_index_mi')
_cohort_index_mi_preview = cohort_index_mi.limit(500)
display(_cohort_index_mi_preview)