In [1]:

import os
import sys
from datetime import datetime
from pyspark.sql import SparkSession, DataFrame, functions as F, types as T
from pyspark.sql.window import Window
import pyspark.pandas as ps

os.environ['YARN_CONF_DIR'] = '/opt/hadoop/etc/hadoop/'
spark = SparkSession.builder.master("yarn").appName("acg_testing").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/22 09:16:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/22 09:16:45 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [2]:
# time check
start_time = datetime.now()

cohort_start_date='2021-11-01'
cohort_end_date='2022-10-31'

person_raw = spark.read.parquet('/data/pedsnet_dcc_v52/dcc_pedsnet/person')
person_input_df = person_raw\
    .filter(person_raw.gender_concept_id.isin(8507, 8532))\
    .withColumn('sex',F.when(F.col('gender_concept_id')==8507,'M').otherwise('F'))\
    .select('person_id','sex',F.col('birth_date').alias('date_of_birth'))

visits_df_raw = spark.read.parquet('/data/pedsnet_dcc_v52/dcc_pedsnet/visit_occurrence')
visits_df_intermediate = visits_df_raw\
    .filter(F.col('visit_concept_id').isin([9201,2000000048,2000001532, 9202,2000000469,44814711, 9203, 2000000088]))\
    .filter(F.col('visit_start_date').between(cohort_start_date,cohort_end_date) &
            F.col('visit_end_date').between(cohort_start_date,cohort_end_date))\
    .select('visit_occurrence_id','person_id','site','visit_concept_id','visit_start_date','visit_end_date')\
    .withColumn('op_ed_visit',F.when(F.col('visit_concept_id').isin([9202, 44814711, 9203, 2000000048]),1).otherwise(0))



persons_df = visits_df_intermediate\
    .join(person_input_df,on='person_id',how='inner')\
    .filter(F.col('op_ed_visit')>0)\
    .select('person_id','sex','date_of_birth')\
    .distinct()\
    .withColumn('age',F.floor(F.datediff(F.lit(cohort_end_date),F.col('date_of_birth'))/365.25).cast(T.IntegerType()))\
    .persist()

persons_df\
    .select(F.col('person_id').alias('patient_id'),'sex','date_of_birth','age')\
    .write.csv('/data/pedsnet_dcc_v52/acg_input/patient_services',header=True,mode='overwrite')

visits_df = persons_df\
    .select('person_id')\
    .join(visits_df_intermediate, 'person_id', 'inner')\
    .persist()

#The below code chunk shows how many conditions are problem list conditions v not by site
'''
spark.read.parquet('/data/pedsnet_dcc_v52/dcc_pedsnet/condition_occurrence')\
    .withColumn('problem_list',F.when(F.col('condition_type_concept_id').isin([2000000089,2000000090]),1).otherwise(0))\
    .withColumn('not_pl',F.when(F.col('condition_type_concept_id').isin([32879, 2000000089,2000000090]),0).otherwise(1))\
    .groupBy('site').agg(F.sum('problem_list').alias('pl'),F.sum('not_pl').alias('pl_not')).withColumn('total',F.col('pl')+F.col('pl_not')).show()
'''

                                                                                

"\nspark.read.parquet('/data/pedsnet_dcc_v52/dcc_pedsnet/condition_occurrence')    .withColumn('problem_list',F.when(F.col('condition_type_concept_id').isin([2000000089,2000000090]),1).otherwise(0))    .withColumn('not_pl',F.when(F.col('condition_type_concept_id').isin([32879, 2000000089,2000000090]),0).otherwise(1))    .groupBy('site').agg(F.sum('problem_list').alias('pl'),F.sum('not_pl').alias('pl_not')).withColumn('total',F.col('pl')+F.col('pl_not')).show()\n"

In [3]:


conditions_df_raw = spark.read.parquet('/data/pedsnet_dcc_v52/dcc_pedsnet/condition_occurrence')
concept_df_raw = spark.read.parquet('/data/pedsnet_dcc_v52/vocabulary/concept') 
concept_relationship_df_raw = spark.read.parquet('/data/pedsnet_dcc_v52/vocabulary/concept_relationship') 

#map the vocabulary_ids to the ACG equivalent
#NOTE: ICD9CM may not be appropriate to map to 9, which is meant to be ICD9. check with Chris.
vocab_dict = {'ICD10':'10', 'ICD10CM':'10CM', 'ICD9CM':'9','SNOMED':'S'}
map_col = F.create_map([F.lit(x) for i in vocab_dict.items() for x in i])

concept_df = concept_df_raw\
    .filter(F.col('vocabulary_id').isin(['SNOMED','ICD9CM','ICD10CM','ICD10']))\
    .withColumn('dx_version', map_col[F.col('vocabulary_id')])\
    .select(concept_df_raw.concept_id.alias('condition_source_concept_id'),
            concept_df_raw.concept_code.alias('dx_cd'),
            'dx_version')

window = Window.partitionBy('visit_occurrence_id')

# Get all conditions with the our cohorts visits, remove those that are registry or problem list conditions,
# then limit conditions to those with the highest "certainty" for each visit 
# Most to least certain: Final Diagnosis, Clinical Diagnosis, Admitting Diagnosis, Null/No matching concept
# Avg conditions per visit anywhere from 2.8 (national) to 6.5 (Lurie)
medical_services_df_raw = visits_df.select("visit_occurrence_id")\
    .join(conditions_df_raw,on='visit_occurrence_id',how='inner')\
    .filter(~F.col('condition_type_concept_id').isin([32879, 2000000089,2000000090]))\
    .withColumn('condition_status_order', 
                F.when(F.col('condition_status_concept_id').eqNullSafe(4230359),1)\
                .when(F.col('condition_status_concept_id').eqNullSafe(4309119),2)\
                .when(F.col('condition_status_concept_id').eqNullSafe(4203942),3)\
                .otherwise(4))\
    .withColumn('condition_status_filter',F.rank().over(window.orderBy("condition_status_order")))\
    .filter(F.col('condition_status_filter').eqNullSafe(1))\
    .select('visit_occurrence_id','site','condition_status_concept_id','condition_source_concept_id')\
    .distinct()\
    .withColumn('dx_n',F.row_number().over(window.orderBy('site')))\
    .withColumn('dx_n_ceiling',F.when(~(F.col('dx_n')%10==0),F.col('dx_n')%10).otherwise(10))\
    .withColumn('visit_dummy',F.floor((F.col('dx_n')-1)/10))\
    .repartition('visit_occurrence_id')\
    .join(F.broadcast(concept_df),on='condition_source_concept_id',how='inner')\
    .persist()

In [4]:

pivoted_df= None
dx_n_list = sorted([int(row.dx_n_ceiling) for row in medical_services_df_raw.select('dx_n_ceiling').distinct().collect()])
for i in dx_n_list:
    dx_codes = medical_services_df_raw.filter(F.col('dx_n_ceiling')==i)\
                .select(medical_services_df_raw.visit_occurrence_id,
                        medical_services_df_raw.visit_dummy,
                        medical_services_df_raw.dx_cd.alias('dx_cd_'+str(i)),
                        medical_services_df_raw.dx_version.alias('dx_version_'+str(i)))
    if isinstance(pivoted_df, DataFrame):
        pivoted_df = pivoted_df.join(dx_codes,on=['visit_occurrence_id','visit_dummy'],how='left')
        
    else:
        pivoted_df = dx_codes


join_visits = visits_df\
    .withColumn('service_place',
                F.when(F.col('visit_concept_id').isin([9201,2000000048,2000001532]),'IP')\
                .when(F.col('visit_concept_id').isin([9202,2000000469,44814711]),'OP')\
                .when(F.col('visit_concept_id').eqNullSafe(9203),'ED').otherwise('OBS'))\
    .select(F.col("person_id").alias('patient_id'), "site", "visit_occurrence_id", 
            "service_place", F.col("visit_start_date").alias('service_begin_date'), F.col("visit_end_date").alias('service_end_date'))

medical_services_df = pivoted_df.join(join_visits,on='visit_occurrence_id',how='inner')

medical_services_df.drop('visit_occurrence_id','visit_dummy').write.csv('/data/pedsnet_dcc_v52/acg_input/medical_services',header=True,mode='overwrite')
medical_services_df_raw.unpersist()

                                                                                

DataFrame[condition_source_concept_id: int, visit_occurrence_id: bigint, site: string, condition_status_concept_id: int, dx_n: int, dx_n_ceiling: int, visit_dummy: bigint, dx_cd: string, dx_version: string]

In [5]:

concept_relationship_df_raw = spark.read.parquet('/data/pedsnet_dcc_v52/vocabulary/concept_relationship') 
rx_norm_concepts = concept_df_raw.filter(concept_df_raw.vocabulary_id.isin(['RxNorm','RxNorm Extension'])).select(F.col('concept_id').alias('concept_id_rxnorm'))
ndc_concepts= concept_df_raw.filter(concept_df_raw.vocabulary_id.eqNullSafe('NDC')).select('concept_id','concept_code')

w = Window().partitionBy("concept_id_rxnorm").orderBy("concept_id")
rxnorm_ndc_map = concept_relationship_df_raw\
    .filter(F.col('relationship_id').eqNullSafe('Maps to'))\
    .join(rx_norm_concepts,on=F.col('concept_id_1')==F.col('concept_id_rxnorm'),how='inner')\
    .join(ndc_concepts,
        on=F.col('concept_id_2')==F.col('concept_id'),how='inner')\
    .select(F.col('concept_id_rxnorm'),F.first('concept_id').over(w).alias('concept_id_ndc'),'concept_code')\
    .distinct()

drug_exposure_raw = spark.read.parquet('/data/pedsnet_dcc_v52/dcc_pedsnet/drug_exposure')
drug_exposure_input = visits_df\
    .select("visit_occurrence_id")\
    .join(drug_exposure_raw,on='visit_occurrence_id',how='inner')\
    .select('person_id','site','visit_occurrence_id','drug_exposure_start_date',F.col('drug_concept_id').alias('concept_id'))\
    .persist()

ndc_rx = drug_exposure_input\
    .join(ndc_concepts.select('concept_id','concept_code'),on='concept_id',how='inner')\
    .select('person_id','site','drug_exposure_start_date','concept_id','concept_code')
rxnorm_rx_mapped = drug_exposure_input\
    .join(rxnorm_ndc_map.select('concept_id_rxnorm','concept_id_ndc','concept_code'),
        on=F.col('concept_id')==F.col('concept_id_rxnorm'),
        how='inner').select('person_id','site','drug_exposure_start_date',F.col('concept_id_ndc').alias('concept_id'),'concept_code')

pharmacy = ndc_rx.union(rxnorm_rx_mapped)\
    .select(F.col('person_id').alias('patient_id'),
            F.col('concept_code').alias('rx_cd'),
            F.col('drug_exposure_start_date').alias('rx_fill_date'),
            'site')\
    .withColumn('rx_code_type',F.lit('N'))

pharmacy.write.csv('/data/pedsnet_dcc_v52/acg_input/pharmacy_services',header=True,mode='overwrite')

drug_exposure_input.unpersist()
visits_df.unpersist()
persons_df.unpersist()


                                                                                

DataFrame[person_id: bigint, sex: string, date_of_birth: date, age: int]

In [8]:
#about 27 minutes 12 seconds, minimal resource allocation (~1% of cluster)
print(datetime.now()-start_time)

0:41:30.920329


In [7]:
spark.stop()


cdm_tbl('visit_occurrence') %>% 
    filter(visit_concept_id %in% c(9201,9202,9203,44814711,2000000048,2000001532) & 
             visit_start_date>=as.Date('2022-01-01') &
             visit_start_date<as.Date('2023-01-01')) %>% 
    select(visit_occurrence_id) %>% 
    inner_join(cdm_tbl('condition_occurrence') %>% 
                 select(condition_occurrence_id,condition_concept_id,condition_source_concept_id,visit_occurrence_id,site,
                        condition_status_concept_id,condition_status_concept_name,
                        condition_type_concept_id,condition_status_concept_id,
                        condition_status_concept_name),by='visit_occurrence_id') %>%
    inner_join(vocabulary_tbl('concept') %>%
                 select(concept_id,concept_code,vocabulary_id),
               by=c('condition_source_concept_id'='concept_id')) %>%
    filter(!grepl('9',vocabulary_id,ignore.case = TRUE)) %>%
    mutate(condition_status_order=
             case_when(is.na(condition_status_concept_id) | condition_status_concept_id==0 ~ 4,
                       condition_status_concept_id==4203942 ~ 3,
                       condition_status_concept_id==4309119 ~ 2,
                       condition_status_concept_id==4230359 ~ 1)) %>%
    group_by(visit_occurrence_id) %>% 
    filter(condition_status_order==min(condition_status_order)) %>% ungroup() %>%

