In [1]:
# Import packages needed to access Spark SQL Database in JupyterLab
import pyspark
import dxpy
import dxdata
from pyspark.sql import functions as f
from pyspark.sql import Window
import itertools
import plotly.express as px
import pandas as pd
from pyspark.sql.types import *
# NOTE(review): presumably a compatibility shim — pandas 2.x removed
# DataFrame.iteritems, which older PySpark releases still call inside
# spark.createDataFrame(pandas_df). Confirm against the installed versions.
pd.DataFrame.iteritems = pd.DataFrame.items

In [2]:
# Spark initialization.
# SparkContext.getOrCreate reuses the context already attached to this kernel
# (if there is one) instead of raising "Cannot run multiple SparkContexts at
# once", so this cell can be re-run without restarting the kernel.
# The configuration below only takes effect when a new context is created.
# 'spark.kryoserializer.buffer.max' is given without a unit suffix
# (NOTE(review): Spark reads this as MiB per its configuration docs — confirm).
config = pyspark.SparkConf().setAll([('spark.kryoserializer.buffer.max', '2000')])
sc = pyspark.SparkContext.getOrCreate(conf=config)

spark = pyspark.sql.SparkSession(sc)

In [3]:
# Load both cohort extracts as all-string pandas frames; keep_default_na=False
# keeps empty cells as '' instead of converting them to NaN.
_csv_read_opts = {'dtype': str, 'keep_default_na': False}
cohort_a = pd.read_csv("/mnt/project/Users/yonghu4/Lpa_EB/cohort/cohort_a.csv", **_csv_read_opts)
cohort_b = pd.read_csv("/mnt/project/Users/yonghu4/Lpa_EB/cohort/cohort_b.csv", **_csv_read_opts)

In [4]:
# Keep handles to the pandas frames. These are references, not copies;
# cohort_a / cohort_b are rebound to Spark DataFrames in the following cell.
cohortori_a, cohortori_b = cohort_a, cohort_b

In [5]:
# Convert each pandas cohort frame into a Spark DataFrame (same names reused).
cohort_a, cohort_b = [
    spark.createDataFrame(pandas_cohort) for pandas_cohort in (cohort_a, cohort_b)
]

In [6]:
# Criteria flag columns crit01 .. crit12.
col_int_list = [f'crit{n:02d}' for n in range(1, 13)]


def _blank_to_null_int(name):
    """Return column `name` with '' replaced by null, cast to integer."""
    blank_as_null = f.when(f.col(name) == '', f.lit(None)).otherwise(f.col(name))
    return blank_as_null.cast(IntegerType())


# The CSVs were read as strings, so convert every flag to a nullable integer
# in both cohorts.
for crit_name in col_int_list:
    cohort_a = cohort_a.withColumn(crit_name, _blank_to_null_int(crit_name))
    cohort_b = cohort_b.withColumn(crit_name, _blank_to_null_int(crit_name))

In [7]:
# Preview the first rows and the schema of each converted cohort.
for cohort_sdf in (cohort_a, cohort_b):
    cohort_sdf.show(3)
    cohort_sdf.printSchema()

+-------+----+------------+------------+------------+------------+----------+-------------+------+-----+-------------+-------------+-----------+-----------+--------------+--------------+--------+----------------+------------------+-----+------------+---------+------------------+-----------------+-----------------+--------------+----------------+---------------+-----------------+-----------------+-------------------+------------------+------------------+------------+-------------+-------------+-------------+------------------+-------------------+-------------------+-------------------+------------------------+-----------------------+------------------------+-----------------------+------------------------+-----------------------+------------------------+-----------------------+----------+----------+-----------+-----------------+-----------------+------------------------+-----------------------+------------------------+-----------------------+------------------------+--------------------

In [8]:
# crit01: UK Biobank participants
# crit02: First ASCVD diagnosis post UK Biobank enrolment, fulfilling the two criteria:
    ## 1. ascvd01_indexdate > date_of_ac_0 &
    ## 2. ascvd01_indexdate <= 2021-10-31
# crit03: Valid Lp(a) at enrolment (first visit)
# crit04: Aged >= 40 years old at index ASCVD diagnosis (ascvd01_indexdate)
# crit05: Hospital care data span from index date (ascvd01_indexdate):
    ## last_epistart > ascvd01_indexdate
# crit06: Patients without Scottish Morbidity Records
# crit07: Patients with inpatient records
# crit08: Patients without primary care data
# crit09: Study cohort or incident ASCVD:
    ## ascvd01_indexdate is not null
# crit10: Recent ASCVD:
    ## ascvd02_indexdate is not null
# crit11: Premature ASCVD:
    ## ascvd03_indexdate is not null
# crit12: Recurrent ASCVD:
    ## ascvd04_indexdate is not null


In [9]:
# Criteria flags (crit01 .. crit12) for the rows that meet crit01.
# NOTE(review): crit01 is reassigned by the attrition cell that follows.
_crit_flag_cols = [f'crit{n:02d}' for n in range(1, 13)]
crit01 = cohort_a.select(*_crit_flag_cols).filter(f.col('crit01') == 1)

In [10]:
# Sequential attrition of cohort_a: each step filters the previous step's rows.
# NOTE(review): the variables crit04 / crit05 hold the rows passing the
# crit05 / crit06 flags respectively; the crit04 flag is not applied here.
crit01 = cohort_a.where(f.col('crit01') == 1)
crit02 = crit01.where(f.col('crit02') == 1)
crit03 = crit02.where(f.col('crit03') == 1)
crit04 = crit03.where(f.col('crit05') == 1)
crit05 = crit04.where(f.col('crit06') == 1)

# Row count after every step, in step order.
crit01count, crit02count, crit03count, crit04count, crit05count = [
    step_rows.count() for step_rows in (crit01, crit02, crit03, crit04, crit05)
]

print(f'Total number of UK Biobank participants: {crit01count}')
print(f'Patients with first diagnosis of ASCVD in the identification period (post UK Biobank enrollment)* with non-missing gender: {crit02count}')
print(f'Patients from Step 2 with valid Lp(a) measurement in nmol/L: {crit03count}')
print(f'Patients from Step 3 and hospital care data span from index date: {crit04count}')
print(f'Patients from Step 4 excluding patients with Scottish Morbidity Records (SMR): {crit05count}')

Total number of UK Biobank participants: 502230
Patients with first diagnosis of ASCVD in the identification period (post UK Biobank enrollment)* with non-missing gender: 52082
Patients from Step 2 with valid Lp(a) measurement in nmol/L: 47024
Patients from Step 3 and hospital care data span from index date: 41128
Patients from Step 4 excluding patients with Scottish Morbidity Records (SMR): 38460


In [11]:
# Peek at the two ascvd01 group-label columns of the final attrition step.
for group_col in ('ascvd01_group_icd10', 'ascvd01_group_oper4'):
    crit05.select(group_col).show(5)

+--------------------+
| ascvd01_group_icd10|
+--------------------+
|     ascvd_tia_codes|
|ascvd_stable_angi...|
|      ascvd_mi_codes|
|ascvd_cad_codes;a...|
|ascvd_mi_codes;as...|
+--------------------+
only showing top 5 rows

+--------------------+
| ascvd01_group_oper4|
+--------------------+
|                    |
|                    |
|                    |
|                    |
|ascvd_revasculari...|
+--------------------+
only showing top 5 rows



In [12]:
# Turn empty-string group labels into nulls ...
for group_col in ('ascvd01_group_icd10', 'ascvd01_group_oper4'):
    crit05 = crit05.withColumn(
        group_col,
        f.when(f.col(group_col) == '', f.lit(None)).otherwise(f.col(group_col)),
    )

# ... then preview both columns again.
for group_col in ('ascvd01_group_icd10', 'ascvd01_group_oper4'):
    crit05.select(group_col).show(5)

+--------------------+
| ascvd01_group_icd10|
+--------------------+
|     ascvd_tia_codes|
|ascvd_stable_angi...|
|      ascvd_mi_codes|
|ascvd_cad_codes;a...|
|ascvd_mi_codes;as...|
+--------------------+
only showing top 5 rows

+--------------------+
| ascvd01_group_oper4|
+--------------------+
|                null|
|                null|
|                null|
|                null|
|ascvd_revasculari...|
+--------------------+
only showing top 5 rows



In [13]:
def _sp_count(tag, column_name='ascvd01_group_icd10'):
    """Count crit05 rows whose `column_name` value contains the substring `tag`."""
    return crit05.filter(f.col(column_name).contains(tag)).count()


# Index-event subtype counts. A row can carry several ';'-separated labels, so
# the categories are not mutually exclusive.
sp_mi_count = _sp_count('ascvd_mi')
sp_is_count = _sp_count('ascvd_is')
sp_tia_count = _sp_count('ascvd_tia')
sp_pad_count = _sp_count('ascvd_pad')
sp_unstable_count = _sp_count('ascvd_unstable')
sp_stable_count = _sp_count('ascvd_stable')
sp_cad_count = _sp_count('ascvd_cad')
sp_cere_count = _sp_count('ascvd_cerebrovascular')
# Revascularization is the only subtype read from the oper4 group column.
sp_revas_count = _sp_count('ascvd_revascularization', column_name='ascvd01_group_oper4')
sp_non_count = _sp_count('ascvd_nonspecific')


print(f'MI: {sp_mi_count}')
print(f'IS: {sp_is_count}')
print(f'TIA: {sp_tia_count}')
print(f'PAD: {sp_pad_count}')
print(f'Unstable: {sp_unstable_count}')
print(f'Stable: {sp_stable_count}')
print(f'CAD: {sp_cad_count}')
print(f'Revascularization: {sp_revas_count}')
print(f'Cerebrovascular: {sp_cere_count}')
print(f'Nonspecific: {sp_non_count}')

MI: 7754
IS: 4637
TIA: 1665
PAD: 2542
Unstable: 1490
Stable: 7443
CAD: 17042
Revascularization: 4693
Cerebrovascular: 4861
Nonspecific: 0


In [14]:
# Cross-check: rows of cohort_a that meet crit01 through crit06 simultaneously.
meets_first_six = cohort_a
for crit_name in ('crit01', 'crit02', 'crit03', 'crit04', 'crit05', 'crit06'):
    meets_first_six = meets_first_six.filter(f.col(crit_name) == 1)
meets_first_six.count()

38460

In [15]:
# Cohort A: attrition starting from rows with crit07 == 1 and crit08 == 1.
# NOTE(review): crita04 / crita05 hold the crit05 / crit06 filters; the crit04
# flag is not applied in this chain.
crita01 = cohort_a.where(f.col('crit07') == 1).where(f.col('crit08') == 1)
crita02 = crita01.where(f.col('crit02') == 1)
crita03 = crita02.where(f.col('crit03') == 1)
crita04 = crita03.where(f.col('crit05') == 1)
crita05 = crita04.where(f.col('crit06') == 1)

# Row count after every step, in step order.
crita01count, crita02count, crita03count, crita04count, crita05count = [
    step_rows.count() for step_rows in (crita01, crita02, crita03, crita04, crita05)
]

print(f'Total number of UK Biobank participants (cohort A): {crita01count}')
print(f'Patients with first diagnosis of ASCVD in the identification period (post UK Biobank enrollment)* with non-missing gender: {crita02count}')
print(f'Patients from Step 2 with valid Lp(a) measurement in nmol/L: {crita03count}')
print(f'Patients from Step 3 and hospital care data span from index date: {crita04count}')
print(f'Patients from Step 4 excluding patients with Scottish Morbidity Records (SMR): {crita05count}')

Total number of UK Biobank participants (cohort A): 242221
Patients with first diagnosis of ASCVD in the identification period (post UK Biobank enrollment)* with non-missing gender: 28543
Patients from Step 2 with valid Lp(a) measurement in nmol/L: 25591
Patients from Step 3 and hospital care data span from index date: 22364
Patients from Step 4 excluding patients with Scottish Morbidity Records (SMR): 21662


In [16]:
def _cohort_a_count(tag, column_name='ascvd01_group_icd10'):
    """Count crita05 rows whose `column_name` value contains the substring `tag`."""
    return crita05.filter(f.col(column_name).contains(tag)).count()


# Cohort A index-event subtype counts (categories may overlap).
cohort_a_mi_count = _cohort_a_count('ascvd_mi')
cohort_a_is_count = _cohort_a_count('ascvd_is')
cohort_a_tia_count = _cohort_a_count('ascvd_tia')
cohort_a_pad_count = _cohort_a_count('ascvd_pad')
cohort_a_unstable_count = _cohort_a_count('ascvd_unstable')
cohort_a_stable_count = _cohort_a_count('ascvd_stable')
cohort_a_cad_count = _cohort_a_count('ascvd_cad')
cohort_a_cere_count = _cohort_a_count('ascvd_cerebrovascular')
# Revascularization is the only subtype read from the oper4 group column.
cohort_a_revas_count = _cohort_a_count('ascvd_revascularization', column_name='ascvd01_group_oper4')
cohort_a_non_count = _cohort_a_count('ascvd_nonspecific')


print(f'MI: {cohort_a_mi_count}')
print(f'IS: {cohort_a_is_count}')
print(f'TIA: {cohort_a_tia_count}')
print(f'PAD: {cohort_a_pad_count}')
print(f'Unstable: {cohort_a_unstable_count}')
print(f'Stable: {cohort_a_stable_count}')
print(f'CAD: {cohort_a_cad_count}')
print(f'Revascularization: {cohort_a_revas_count}')
print(f'Cerebrovascular: {cohort_a_cere_count}')
print(f'Nonspecific: {cohort_a_non_count}')

MI: 4238
IS: 2616
TIA: 951
PAD: 1419
Unstable: 846
Stable: 4114
CAD: 9679
Revascularization: 2692
Cerebrovascular: 2846
Nonspecific: 0


In [17]:
# Cohort B: attrition starting from rows with crit07 == 1 and crit08 == 1.
# NOTE(review): critb04 / critb05 hold the crit05 / crit06 filters; the crit04
# flag is not applied in this chain.
critb01 = cohort_b.where(f.col('crit07') == 1).where(f.col('crit08') == 1)
critb02 = critb01.where(f.col('crit02') == 1)
critb03 = critb02.where(f.col('crit03') == 1)
critb04 = critb03.where(f.col('crit05') == 1)
critb05 = critb04.where(f.col('crit06') == 1)

# Row count after every step, in step order.
critb01count, critb02count, critb03count, critb04count, critb05count = [
    step_rows.count() for step_rows in (critb01, critb02, critb03, critb04, critb05)
]

print(f'Total number of UK Biobank participants (cohort B): {critb01count}')
print(f'Patients with first diagnosis of ASCVD in the identification period (post UK Biobank enrollment)* with non-missing gender: {critb02count}')
print(f'Patients from Step 2 with valid Lp(a) measurement in nmol/L: {critb03count}')
print(f'Patients from Step 3 and hospital care data span from index date: {critb04count}')
print(f'Patients from Step 4 excluding patients with Scottish Morbidity Records (SMR): {critb05count}')

Total number of UK Biobank participants (cohort B): 206857
Patients with first diagnosis of ASCVD in the identification period (post UK Biobank enrollment)* with non-missing gender: 26206
Patients from Step 2 with valid Lp(a) measurement in nmol/L: 23835
Patients from Step 3 and hospital care data span from index date: 20946
Patients from Step 4 excluding patients with Scottish Morbidity Records (SMR): 18766


In [18]:
def _cohort_b_count(tag, first_column='ascvd01_group_icd10'):
    """Count critb05 rows where `tag` occurs in `first_column`, readv2 or ctv3 labels."""
    tag_present = (
        f.col(first_column).contains(tag)
        | f.col('ascvd01_group_readv2').contains(tag)
        | f.col('ascvd01_group_ctv3').contains(tag)
    )
    return critb05.filter(tag_present).count()


# Cohort B index-event subtype counts (categories may overlap).
cohort_b_mi_count = _cohort_b_count('ascvd_mi')
cohort_b_is_count = _cohort_b_count('ascvd_is')
cohort_b_tia_count = _cohort_b_count('ascvd_tia')
cohort_b_pad_count = _cohort_b_count('ascvd_pad')
cohort_b_unstable_count = _cohort_b_count('ascvd_unstable')
cohort_b_stable_count = _cohort_b_count('ascvd_stable')
cohort_b_cad_count = _cohort_b_count('ascvd_cad')
cohort_b_cere_count = _cohort_b_count('ascvd_cerebrovascular')
# Revascularization checks the oper4 group column instead of icd10.
cohort_b_revas_count = _cohort_b_count('ascvd_revascularization', first_column='ascvd01_group_oper4')
cohort_b_non_count = _cohort_b_count('ascvd_nonspecific')


print(f'MI: {cohort_b_mi_count}')
print(f'IS: {cohort_b_is_count}')
print(f'TIA: {cohort_b_tia_count}')
print(f'PAD: {cohort_b_pad_count}')
print(f'Unstable: {cohort_b_unstable_count}')
print(f'Stable: {cohort_b_stable_count}')
print(f'CAD: {cohort_b_cad_count}')
print(f'Revascularization: {cohort_b_revas_count}')
print(f'Cerebrovascular: {cohort_b_cere_count}')
print(f'Nonspecific: {cohort_b_non_count}')

MI: 2739
IS: 1817
TIA: 1339
PAD: 1188
Unstable: 804
Stable: 5883
CAD: 5173
Revascularization: 1900
Cerebrovascular: 2104
Nonspecific: 248


In [19]:
# Cohort C: rows of cohort_b with crit07 == 0, then the same attrition steps.
# Step 2 applies crit08 == 1 together with crit02 == 1.
# NOTE(review): critc04 / critc05 hold the crit05 / crit06 filters; the crit04
# flag is not applied in this chain.
critc01 = cohort_b.where(f.col('crit07') == 0)
critc02 = critc01.where((f.col('crit08') == 1) & (f.col('crit02') == 1))
critc03 = critc02.where(f.col('crit03') == 1)
critc04 = critc03.where(f.col('crit05') == 1)
critc05 = critc04.where(f.col('crit06') == 1)

# Row count after every step, in step order.
critc01count, critc02count, critc03count, critc04count, critc05count = [
    step_rows.count() for step_rows in (critc01, critc02, critc03, critc04, critc05)
]

print(f'Total number of UK Biobank participants (cohort C): {critc01count}')
print(f'Patients with first diagnosis of ASCVD in the identification period (post UK Biobank enrollment)* with non-missing gender: {critc02count}')
print(f'Patients from Step 2 with valid Lp(a) measurement in nmol/L: {critc03count}')
print(f'Patients from Step 3 and hospital care data span from index date: {critc04count}')
print(f'Patients from Step 4 excluding patients with Scottish Morbidity Records (SMR): {critc05count}')

Total number of UK Biobank participants (cohort C): 53152
Patients with first diagnosis of ASCVD in the identification period (post UK Biobank enrollment)* with non-missing gender: 438
Patients from Step 2 with valid Lp(a) measurement in nmol/L: 404
Patients from Step 3 and hospital care data span from index date: 0
Patients from Step 4 excluding patients with Scottish Morbidity Records (SMR): 0


In [20]:
def _cohort_c_count(tag, first_column='ascvd01_group_icd10'):
    """Count critc03 rows where `tag` occurs in `first_column`, readv2 or ctv3 labels."""
    tag_present = (
        f.col(first_column).contains(tag)
        | f.col('ascvd01_group_readv2').contains(tag)
        | f.col('ascvd01_group_ctv3').contains(tag)
    )
    return critc03.filter(tag_present).count()


# Cohort C index-event subtype counts (categories may overlap).
# These are taken from the step-3 frame (critc03), not from critc05.
cohort_c_mi_count = _cohort_c_count('ascvd_mi')
cohort_c_is_count = _cohort_c_count('ascvd_is')
cohort_c_tia_count = _cohort_c_count('ascvd_tia')
cohort_c_pad_count = _cohort_c_count('ascvd_pad')
cohort_c_unstable_count = _cohort_c_count('ascvd_unstable')
cohort_c_stable_count = _cohort_c_count('ascvd_stable')
cohort_c_cad_count = _cohort_c_count('ascvd_cad')
cohort_c_cere_count = _cohort_c_count('ascvd_cerebrovascular')
# Revascularization checks the oper4 group column instead of icd10.
cohort_c_revas_count = _cohort_c_count('ascvd_revascularization', first_column='ascvd01_group_oper4')
cohort_c_non_count = _cohort_c_count('ascvd_nonspecific')


print(f'MI: {cohort_c_mi_count}')
print(f'IS: {cohort_c_is_count}')
print(f'TIA: {cohort_c_tia_count}')
print(f'PAD: {cohort_c_pad_count}')
print(f'Unstable: {cohort_c_unstable_count}')
print(f'Stable: {cohort_c_stable_count}')
print(f'CAD: {cohort_c_cad_count}')
print(f'Revascularization: {cohort_c_revas_count}')
print(f'Cerebrovascular: {cohort_c_cere_count}')
print(f'Nonspecific: {cohort_c_non_count}')

MI: 4
IS: 14
TIA: 49
PAD: 27
Unstable: 4
Stable: 246
CAD: 11
Revascularization: 19
Cerebrovascular: 25
Nonspecific: 10


In [None]:
# First-occurrence date columns: dm_e10..e15, hbp_i10..i13 and i15, crf_n18.
cmdatelist = (
    [f'dm_e{code}_date_first' for code in range(10, 16)]
    + [f'hbp_i{code}_date_first' for code in (10, 11, 12, 13, 15)]
    + ['crf_n18_date_first']
)

# Map '' to null and parse each column as a yyyy-MM-dd date.
# NOTE(review): df_cohort_ascvd_i is not defined in any cell visible here.
for date_col in cmdatelist:
    blank_as_null = f.when(f.col(date_col) == '', f.lit(None)).otherwise(f.col(date_col))
    df_cohort_ascvd_i = df_cohort_ascvd_i.withColumn(date_col, f.to_date(blank_as_null, 'yyyy-MM-dd'))

In [None]:
# Earliest first-occurrence date per condition: the least of the per-code
# columns for each row.
hbp_first_cols = [f'hbp_i{code}_date_first' for code in (10, 11, 12, 13, 15)]
dm_first_cols = [f'dm_e{code}_date_first' for code in range(10, 16)]

df_cohort_ascvd_i = (
    df_cohort_ascvd_i
    .withColumn('hbp_date_first', f.least(*[f.col(name) for name in hbp_first_cols]))
    .withColumn('dm_date_first', f.least(*[f.col(name) for name in dm_first_cols]))
)

In [None]:
# Read the ethnicity lookup as all-string pandas data and hand it straight to
# Spark, then display its full contents and schema.
ethnic_mapping = spark.createDataFrame(
    pd.read_csv("/mnt/project/Users/yonghu4/Lpa_EB/pgm/ethnic_mapping.csv", dtype=str, keep_default_na=False)
)
ethnic_mapping.show(truncate=False)
ethnic_mapping.printSchema()

In [None]:
# Left-join the lookup on 'ethnic' and label rows with a null ethnic_final as
# 'Unknown' (presumably ethnic_final is supplied by ethnic_mapping — confirm).
df_cohort_ascvd_i = (
    df_cohort_ascvd_i
    .join(ethnic_mapping, on='ethnic', how='left')
    .withColumn('ethnic_final', f.coalesce(f.col('ethnic_final'), f.lit('Unknown')))
)

In [None]:
# Print the current schema of df_cohort_ascvd_i.
df_cohort_ascvd_i.printSchema()

In [None]:
# Spot-check hbp_date_first against its per-code source columns for rows where
# it is populated.
hbp_check_cols = [
    'eid',
    'hbp_i10_date_first', 'hbp_i11_date_first', 'hbp_i12_date_first',
    'hbp_i13_date_first', 'hbp_i15_date_first',
    'hbp_date_first',
]
df_cohort_ascvd_i.where(f.col('hbp_date_first').isNotNull()).select(*hbp_check_cols).show()

In [None]:
# Map '' to null in indexdate and parse it as a yyyy-MM-dd date.
indexdate_or_null = f.when(f.col('indexdate') == '', f.lit(None)).otherwise(f.col('indexdate'))
df_cohort_ascvd_i = df_cohort_ascvd_i.withColumn('indexdate', f.to_date(indexdate_or_null, 'yyyy-MM-dd'))

In [None]:
# Cast lpa_baseline to a double and confirm the resulting schema.
lpa_as_double = f.col('lpa_baseline').cast('double')
df_cohort_ascvd_i = df_cohort_ascvd_i.withColumn('lpa_baseline', lpa_as_double)
df_cohort_ascvd_i.printSchema()

In [None]:
# Preview the participant id next to the lpa_baseline value.
df_cohort_ascvd_i.select('eid','lpa_baseline').show()

In [None]:
# date_of_ac_0 .. date_of_ac_3
acdatelist = [f'date_of_ac_{visit}' for visit in range(4)]

# Map '' to null and parse each of these columns as a yyyy-MM-dd date.
for ac_col in acdatelist:
    blank_as_null = f.when(f.col(ac_col) == '', f.lit(None)).otherwise(f.col(ac_col))
    df_cohort_ascvd_i = df_cohort_ascvd_i.withColumn(ac_col, f.to_date(blank_as_null, 'yyyy-MM-dd'))

In [None]:
# For each of the four date_of_ac_* columns, store the number of days from
# that date to indexdate in days_ac2index_<n>; null unless the date is
# strictly before indexdate.
for visit in range(4):
    ac_col = f'date_of_ac_{visit}'
    days_col = f'days_ac2index_{visit}'
    days_before_index = f.when(
        f.col(ac_col) < f.col('indexdate'),
        f.datediff(f.col('indexdate'), f.col(ac_col)),
    ).otherwise(f.lit(None))
    df_cohort_ascvd_i = df_cohort_ascvd_i.withColumn(days_col, days_before_index)

In [None]:
# Preview the first two date_of_ac_* columns next to their day gaps to indexdate.
df_cohort_ascvd_i.select('eid', 'date_of_ac_0', 'days_ac2index_0', 'date_of_ac_1', 'days_ac2index_1').show(10)

In [None]:
# Replace '' with null in smoking_0 .. smoking_3.
for visit in range(4):
    smoking_col = f'smoking_{visit}'
    df_cohort_ascvd_i = df_cohort_ascvd_i.withColumn(
        smoking_col,
        f.when(f.col(smoking_col) == '', f.lit(None)).otherwise(f.col(smoking_col)),
    )

In [None]:
# days_smoke2index_<n> carries days_ac2index_<n> only where smoking_<n> is
# recorded (non-null); otherwise it is null.
for visit in range(4):
    has_smoking_record = f.col(f'smoking_{visit}').isNotNull()
    df_cohort_ascvd_i = df_cohort_ascvd_i.withColumn(
        f'days_smoke2index_{visit}',
        f.when(has_smoking_record, f.col(f'days_ac2index_{visit}')).otherwise(f.lit(None)),
    )

In [None]:
# Final smoking status: find which days_smoke2index_* column is closest to index.
# Steps performed by the chained expression below:
#   1. 'vals'      - array holding the four days_smoke2index_* values.
#   2. 'smokecols' - array of the matching column names as string literals.
#   3. arrays_zip pairs each value with its column name.
#   4. filter(...) drops the pairs whose value is null.
#   5. array_sort orders the remaining pairs and element [0] is taken; its
#      column name is stored in 'smoking_final_col'.
#      NOTE(review): relies on array_sort ordering the structs ascending by
#      'vals' first, so [0] is the smallest day gap — confirm in Spark docs.
#   6. The temporary helper columns are dropped.

smokecols = ['days_smoke2index_0','days_smoke2index_1','days_smoke2index_2','days_smoke2index_3']
# Comma-separated, single-quoted names for use inside the SQL Array(...) literal.
smokecol_string = ', '.join("'{0}'".format(c) for c in smokecols)

df_cohort_ascvd_i = df_cohort_ascvd_i.withColumn('vals', f.array(smokecols))\
.withColumn('smokecols', f.expr('Array(' + smokecol_string + ')'))\
.withColumn('smokezipped', f.arrays_zip('vals', 'smokecols'))\
.withColumn('smokewithout_nulls', f.expr('filter(smokezipped, x -> not x.vals is null)'))\
.withColumn('smokesorted', f.expr('array_sort(smokewithout_nulls)'))\
.withColumn('smoking_final_col', f.col('smokesorted')[0].smokecols)\
.drop('vals', 'smokecols', 'smokezipped', 'smokewithout_nulls', 'smokesorted')
                        
# Preview the selected column name next to the four candidate day gaps.
df_cohort_ascvd_i.select('eid','smoking_final_col','days_smoke2index_0','days_smoke2index_1','days_smoke2index_2','days_smoke2index_3').show(10)

In [None]:
# Resolve smoking_final: take smoking_<n> for the column named in
# smoking_final_col, or 'missing' when none of the four names matches.
smoking_final_expr = None
for visit in range(4):
    is_picked = f.col('smoking_final_col') == f.lit(f'days_smoke2index_{visit}')
    status_at_visit = f.col(f'smoking_{visit}')
    if smoking_final_expr is None:
        smoking_final_expr = f.when(is_picked, status_at_visit)
    else:
        smoking_final_expr = smoking_final_expr.when(is_picked, status_at_visit)

df_cohort_ascvd_i = df_cohort_ascvd_i.withColumn('smoking_final', smoking_final_expr.otherwise(f.lit('missing')))

# Spot-check: rows with indexdate after date_of_ac_0 and at least one smoking
# value recorded.
index_after_first_ac = f.col('indexdate').isNotNull() & (f.col('indexdate') > f.col('date_of_ac_0'))
has_any_smoking = (
    f.col('smoking_0').isNotNull()
    | f.col('smoking_1').isNotNull()
    | f.col('smoking_2').isNotNull()
    | f.col('smoking_3').isNotNull()
)
smoking_check_cols = [
    'eid', 'smoking_final', 'indexdate',
    'date_of_ac_0', 'date_of_ac_1', 'date_of_ac_2',
    'days_smoke2index_0', 'days_smoke2index_1', 'days_smoke2index_2', 'days_smoke2index_3',
    'smoking_0', 'smoking_1', 'smoking_2', 'smoking_3',
]
df_cohort_ascvd_i.filter(index_after_first_ac).select(*smoking_check_cols).filter(has_any_smoking).show(20)

In [None]:
# Keep the rows flagged crit05 == 1 and report how many there are.
meets_crit05 = f.col('crit05') == 1
df_cohort_ascvd_i_crit05 = df_cohort_ascvd_i.where(meets_crit05)
df_cohort_ascvd_i_crit05.count()

In [None]:
# Descriptive statistics for age_index: count, mean, spread and quartiles.
age_summary_stats = ('count', 'mean', 'stddev', 'min', '25%', '50%', '75%', 'max')
df_cohort_ascvd_i_crit05.select('age_index').summary(*age_summary_stats).show()

In [None]:
# Number of rows per ethnic_final category.
df_cohort_ascvd_i_crit05.groupBy('ethnic_final').agg(f.count('ethnic_final')).show(truncate=False)

In [None]:
# Number of rows per smoking_final category.
df_cohort_ascvd_i_crit05.groupBy('smoking_final').agg(f.count('smoking_final')).show(truncate=False)

In [None]:
# Rows whose hbp_date_first is recorded and falls strictly before indexdate.
hbp_before_index = f.col('hbp_date_first').isNotNull() & (f.col('hbp_date_first') < f.col('indexdate'))
df_cohort_ascvd_i_crit05.where(hbp_before_index).count()

In [None]:
# Rows whose dm_date_first is recorded and falls strictly before indexdate.
dm_before_index = f.col('dm_date_first').isNotNull() & (f.col('dm_date_first') < f.col('indexdate'))
df_cohort_ascvd_i_crit05.where(dm_before_index).count()

In [None]:
# Rows whose crf_n18_date_first is recorded and falls strictly before indexdate.
crf_before_index = f.col('crf_n18_date_first').isNotNull() & (f.col('crf_n18_date_first') < f.col('indexdate'))
df_cohort_ascvd_i_crit05.where(crf_before_index).count()

In [None]:
# Two Lp(a) strata: lpa_baseline below 65 and lpa_baseline at or above 150.
# Rows with values from 65 up to (but excluding) 150 fall in neither stratum.
lpa_value = f.col('lpa_baseline')
df_cohort_ascvd_i_crit05_lpa065 = df_cohort_ascvd_i_crit05.where(lpa_value < 65)
df_cohort_ascvd_i_crit05_lpa150 = df_cohort_ascvd_i_crit05.where(lpa_value >= 150)

In [None]:
# Number of rows in the lpa_baseline < 65 stratum.
df_cohort_ascvd_i_crit05_lpa065.count()

In [None]:
# Number of rows in the lpa_baseline >= 150 stratum.
df_cohort_ascvd_i_crit05_lpa150.count()

In [None]:
# Descriptive tables for the two Lp(a) strata. Every table is shown for the
# < 65 stratum first and the >= 150 stratum second.
lpa_strata = (df_cohort_ascvd_i_crit05_lpa065, df_cohort_ascvd_i_crit05_lpa150)

# Sex distribution.
for stratum in lpa_strata:
    stratum.groupBy('sex').agg(f.count('sex')).show()

# Age at index: count, mean, spread and quartiles.
for stratum in lpa_strata:
    stratum.select('age_index').summary('count', 'mean', 'stddev', 'min', '25%', '50%', '75%', 'max').show()

# Ethnicity, then smoking status.
for category_col in ('ethnic_final', 'smoking_final'):
    for stratum in lpa_strata:
        stratum.groupBy(category_col).agg(f.count(category_col)).show(truncate=False)

# Conditions first recorded strictly before indexdate: dm, hbp, crf.
comorbidity_specs = (
    ('dm_date_first', 'dm_Count'),
    ('hbp_date_first', 'hbp_Count'),
    ('crf_n18_date_first', 'crf_Count'),
)
for first_date_col, count_alias in comorbidity_specs:
    recorded_before_index = f.col(first_date_col).isNotNull() & (f.col(first_date_col) < f.col('indexdate'))
    for stratum in lpa_strata:
        stratum.filter(recorded_before_index).select(f.count(f.col('eid')).alias(count_alias)).show()