In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window as w
import pandas as pd
pd.set_option('display.max_columns', None)

import warnings
warnings.filterwarnings("ignore")

# Build (or reuse) the Spark session that every later cell reads data through.
spark = (
    SparkSession.builder
    # Driver resources: large heap, and no cap on collected result size (0 means unlimited).
    .config("spark.driver.memory", "100g")
    .config("spark.driver.maxResultSize", 0)
    # S3 access: fetch the hadoop-aws connector and register the S3A filesystem.
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Hadoop options need the `spark.hadoop.` prefix to be copied into the Hadoop
    # configuration; the credentials provider previously lacked it, unlike fs.s3a.impl.
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    )
    .getOrCreate()
)
# Render a DataFrame as a table when it is the last expression of a cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
# NOTE(review): this is set after the session exists; confirm the runtime setting
# actually reaches the Hadoop configuration on the Spark version in use.
spark.conf.set("spark.hadoop.io.compression.codecs", "org.apache.hadoop.io.compress.BZip2Codec")

s3_bucket_spark = "s3a://820323602090-team-dbad373c-7e36-407b-8690-05a44b804f43"
s3_bucket_pandas = "s3a://820323602090-team-dbad373c-7e36-407b-8690-05a44b804f43"

!aws s3 ls $team_bucket/data/data_komodo/



:: loading settings :: url = jar:file:/home/jovyan/.conda/envs/python3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7675368b-427b-4a6b-aba4-ffb92cbb3e6e;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 161ms :: artifacts dl 4ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	--------------------------------

                           PRE prm_diagnosis/
                           PRE prm_diagnosis_mk/
                           PRE prm_enrollment/
                           PRE prm_payers/
                           PRE prm_persons/
                           PRE prm_prescription/
                           PRE prm_prescription_1/
                           PRE prm_procedure/
                           PRE prm_providers/
                           PRE prm_visit/


In [2]:
# Common prefix under which every Komodo parquet table is stored.
data_path = f"{s3_bucket_spark}/data/data_komodo/"

## Diagnosis

### prm_diagnosis

KEYS:
- person_id (??)  
- npi_id <> HCP_1_NPI  
- encounter_key <> ENCOUNTER_KEY  
- payer_id <> PAYER_KH_ID  
- vx_visit_id <> VISIT_ID  
- date_day <> CLAIM_DATE

FEATURES:
- claim_charge_amount (statistics by NPI)
- primary_diagnosis_flag (share by NPI)
- dx_diagnosis_code:
    - Extract E-codes and V-codes categories
    - take codes only after october 2015
    - extract codes before "."
    - get most common ones
- dx_diag_code_type: represents ICD-9 or ICD-10 code system

In [4]:
# Load the diagnosis claims table.
prm_diagn = spark.read.parquet(f"{data_path}prm_diagnosis/")

23/04/25 03:53:08 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [10]:
# Preview the feature columns by name rather than by position (`columns[9:-2]`),
# so the cell keeps showing the same fields if the table's column order changes.
# The names are the ones the positional slice returned in the recorded output.
diagnosis_feature_cols = [
    'claim_charge_amount',
    'primary_diagnosis_flag',
    'dx_diagnosis_code',
    'dx_diag_code_type',
]
prm_diagn.select(*diagnosis_feature_cols).limit(5)

claim_charge_amount,primary_diagnosis_flag,dx_diagnosis_code,dx_diag_code_type
,True,M542,10
,False,M5412,10
270.0,True,R3121,10
259.0,True,N200,10
259.0,False,N281,10


In [11]:
# Number of different diagnosis codes present in the table.
prm_diagn.select(f.col('dx_diagnosis_code')).dropDuplicates().count()

                                                                                

61773

In [18]:
# ICD-10 PSO codes (in use after 10/2015): every distinct code beginning with L40.
is_icd10_pso = f.col('dx_diagnosis_code').startswith('L40')
prm_diagn.where(is_icd10_pso).select('dx_diagnosis_code').distinct()

                                                                                

dx_diagnosis_code
L4059
L4050
L400
L403
L4052
L4051
L408
L409
L401
L4053


In [19]:
# ICD-9 PSO codes (in use before 10/2015): every distinct code beginning with 696.
is_icd9_pso = f.col('dx_diagnosis_code').startswith('696')
prm_diagn.where(is_icd9_pso).select('dx_diagnosis_code').distinct()

                                                                                

dx_diagnosis_code
6961
6960
6968
696
6965
6964
6962
6963


In [32]:
# Row counts per code system (dx_diag_code_type separates ICD-9 from ICD-10).
rows_by_code_type = prm_diagn.groupBy(f.col('dx_diag_code_type')).count()
rows_by_code_type

                                                                                

dx_diag_code_type,count
,117337
9.0,138827
10.0,235781034


In [20]:
# Code-system breakdown of the rows whose diagnosis code begins with L40.
l40_rows = prm_diagn.where(f.col('dx_diagnosis_code').startswith('L40'))
l40_rows.groupBy('dx_diag_code_type').count()

                                                                                

dx_diag_code_type,count
,1924
10.0,11162632


In [22]:
# Code-system breakdown of the rows whose diagnosis code begins with 696.
rows_696 = prm_diagn.where(f.col('dx_diagnosis_code').startswith('696'))
rows_696.groupBy('dx_diag_code_type').count()

                                                                                

dx_diag_code_type,count
9.0,1498
,50


### prm_diagnosis_mk (same but with procedures)

KEYS:
- person_id (??)  
- primary_hcp_npi <> HCP_1_NPI  
- encounter_key <> ENCOUNTER_KEY  
- payer_id <> PAYER_KH_ID  
- vx_visit_id <> VISIT_ID  
- date_day <> CLAIM_DATE

FEATURES:
- dx_diagnosis_code: far fewer distinct codes than in the previous table (13,289 vs. 61,773)
- primary_hcp_flag: possibly related to the HCP key and how to join on it (~80% True)
- claim_charge_amount: almost all have filled values
- primary_procedure_flag: some sort of primary procedure during a visit
- dx_procedure_code: procedure code, hierarchical, check slack for more info
- dx_diag_code_type: used to switch between ICD-9 and ICD-10 systems

In [27]:
# Load the diagnosis table variant that also carries procedure codes.
prm_diagn_mk = spark.read.parquet(f"{data_path}prm_diagnosis_mk/")

In [29]:
# Preview the HCP flag together with the feature columns, selected by name
# rather than through the positional slice `columns[7:-2]`, so the cell does not
# silently change when the table's column order does. The names are the ones
# shown in the recorded output.
diagnosis_mk_feature_cols = [
    'primary_hcp_flag',
    'claim_charge_amount',
    'primary_diagnosis_flag',
    'dx_diagnosis_code',
    'primary_procedure_flag',
    'dx_procedure_code',
    'dx_diag_code_type',
]
prm_diagn_mk.select(*diagnosis_mk_feature_cols).limit(5)

primary_hcp_flag,claim_charge_amount,primary_diagnosis_flag,dx_diagnosis_code,primary_procedure_flag,dx_procedure_code,dx_diag_code_type
True,396157.26,True,M4316,True,0SG10A0,10
True,396157.26,True,M4316,False,0SG1071,10
True,396157.26,True,M4316,False,0SB20ZZ,10
True,396157.26,True,M4316,False,0SG3071,10
False,396157.26,True,M4316,True,0SG10A0,10


In [30]:
# Number of different diagnosis codes in the procedure-bearing table.
prm_diagn_mk.select(f.col('dx_diagnosis_code')).dropDuplicates().count()

                                                                                

13289

In [31]:
# Row counts per code system in prm_diagnosis_mk.
mk_rows_by_code_type = prm_diagn_mk.groupBy(f.col('dx_diag_code_type')).count()
mk_rows_by_code_type

                                                                                

dx_diag_code_type,count
,8306
9.0,950
10.0,2929678


In [33]:
# How the rows split between primary_hcp_flag True and False.
rows_by_hcp_flag = prm_diagn_mk.groupBy(f.col('primary_hcp_flag')).count()
rows_by_hcp_flag

                                                                                

primary_hcp_flag,count
True,2311519
False,627415


In [35]:
# Number of different procedure codes in the table.
prm_diagn_mk.select(f.col('dx_procedure_code')).dropDuplicates().count()

                                                                                

10661

In [36]:
# Number of rows whose claim charge amount is exactly zero.
has_zero_charge = prm_diagn_mk.claim_charge_amount == 0
prm_diagn_mk.where(has_zero_charge).count()

                                                                                

677

In [37]:
# Number of rows whose diagnosis code begins with L40.
mk_l40_rows = prm_diagn_mk.where(f.col('dx_diagnosis_code').startswith('L40'))
mk_l40_rows.count()

                                                                                

39736

In [38]:
# Number of rows whose diagnosis code begins with 696.
mk_696_rows = prm_diagn_mk.where(f.col('dx_diagnosis_code').startswith('696'))
mk_696_rows.count()

                                                                                

5

In [81]:
# Ten most frequent procedure codes across the whole table.
procedure_counts = prm_diagn_mk.groupBy('dx_procedure_code').count()
procedure_counts.orderBy(f.col('count').desc()).limit(10)

                                                                                

dx_procedure_code,count
02HV33Z,110990
30233N1,70240
B2111ZZ,60696
4A023N7,59004
0BH17EZ,44578
5A09357,38941
0DJ08ZZ,34251
B2151ZZ,33346
5A1945Z,30981
5A1D70Z,30364


In [41]:
# Ten most frequent procedure codes on rows with an L40* or 696* diagnosis code.
diagnosis_code = f.col('dx_diagnosis_code')
(
    prm_diagn_mk
    .where(diagnosis_code.startswith('L40') | diagnosis_code.startswith('696'))
    .groupBy('dx_procedure_code')
    .count()
    .sort(f.desc('count'))
    .limit(10)
)

                                                                                

dx_procedure_code,count
02HV33Z,1424
30233N1,816
4A023N7,815
B2111ZZ,799
0SRD0J9,482
0SRC0J9,472
5A09357,456
B2151ZZ,444
0BH17EZ,425
0DJ08ZZ,398


In [42]:
# How the rows split between primary_procedure_flag True and False.
rows_by_procedure_flag = prm_diagn_mk.groupBy(f.col('primary_procedure_flag')).count()
rows_by_procedure_flag

                                                                                

primary_procedure_flag,count
True,1066183
False,1872751


## Payers

### prm_enrollment (less important)

KEYS:
- person_id (??)

FEATURES:
- enr_plan_type

In [44]:
# Load the enrollment table.
prm_enrol = spark.read.parquet(f"{data_path}prm_enrollment/")

In [45]:
# Peek at the first few enrollment rows (all columns).
enrol_preview = prm_enrol.limit(5)
enrol_preview

person_id,enr_eligibility_start_dt,enr_eligibility_end_dt,enr_closed_start_dt,enr_closed_end_dt,enr_plan_type,cohort_id,source
a4a185a8229c07c92...,2018-01-01,2020-12-31,2018-01-01,2020-12-31,C,1043685,komodo
a4a185a8229c07c92...,2020-09-05,2020-12-31,2020-09-05,2020-12-31,C,1043685,komodo
a4a185a8229c07c92...,2015-03-01,2017-03-31,2015-03-01,2017-03-31,C,1043685,komodo
a4a185a8229c07c92...,2020-07-01,2020-09-04,2020-07-01,2020-09-04,C,1043685,komodo
a4a185a8229c07c92...,2020-10-01,2020-12-31,2020-10-01,2020-12-31,C,1043685,komodo


### prm_payers (less important)

KEYS: 
- payer_id <> PAYER_KH_ID

FEATURES:
- payer_name: maybe extract something

In [46]:
# Load the payers lookup table.
prm_payers = spark.read.parquet(f"{data_path}prm_payers/")

In [47]:
# Peek at the first few payer rows (all columns).
payers_preview = prm_payers.limit(5)
payers_preview

payer_id,payer_name,source
1,Medical Mutual of...,komodo
2,ATRIO Health Plans,komodo
4,Health New Englan...,komodo
6,America's 1st Cho...,komodo
7,"GlobalHealth, Inc.",komodo


## Persons

### prm_persons (already used in features)

KEYS:
- person_id (??)

FEATURES:
- px_birth_dt: calculate age
- px_gender
- px_location_state and px_zip3_cd: most common states

In [49]:
# Load the persons table.
prm_persons = spark.read.parquet(f"{data_path}prm_persons/")

In [51]:
# Preview the person attributes by name instead of the positional slice
# `columns[1:-2]`, so the selection does not depend on column order. The names
# are the ones shown in the recorded output.
person_feature_cols = [
    'px_birth_dt',
    'px_gender',
    'px_location_state',
    'px_zip3_cd',
]
prm_persons.select(*person_feature_cols).limit(5)

px_birth_dt,px_gender,px_location_state,px_zip3_cd
1967-12-31,M,WA,984
1952-12-31,F,IL,611
1953-12-31,M,NY,125
1956-12-31,M,MA,14
1965-12-31,M,MI,481


## Medicines

### prm_prescription (less important)

KEYS:
- person_id (??)  
- npi_id <> HCP_1_NPI
- date_day <> CLAIM_DATE

FEATURES:
- rx_refill_num: Maximum number of refills authorized (a lot of null and 0)
- rx_quantity (prescribed or dispensed?): 36% NANs, hard to use because of the different types of medicine
- rx_drug_days_supply: Estimated number of days the dispensing will last, same number of NANs, can be used for long-lasting period features
- rx_ndc_code: medicine code, most common ones as features, group somehow
- rx_pharmacy_submitted_cost: a lot of NANs and zeros, can make binary feature on this

In [52]:
# Load the prescriptions table.
prm_prescription = spark.read.parquet(f"{data_path}prm_prescription/")

In [54]:
# Preview the prescription feature columns by name instead of the positional
# slice `columns[3:-2]`, so the selection does not depend on column order. The
# names are the ones shown in the recorded output.
prescription_feature_cols = [
    'rx_refill_num',
    'rx_quantity',
    'rx_drug_days_supply',
    'rx_ndc_code',
    'rx_pharmacy_submitted_cost',
]
prm_prescription.select(*prescription_feature_cols).limit(5)

rx_refill_num,rx_quantity,rx_drug_days_supply,rx_ndc_code,rx_pharmacy_submitted_cost
,,,,
,,,,
,,,,
,,,,
,,,,


In [61]:
# Ten most frequent values of rx_ndc_code (the null group is included).
ndc_counts = prm_prescription.groupBy('rx_ndc_code').count()
ndc_counts.sort(f.desc('count')).limit(10)

                                                                                

rx_ndc_code,count
,83464591
74433902.0,1703727
59572063106.0,1125739
59310057922.0,664658
173068220.0,660555
58406044504.0,650129
78063941.0,528112
54327099.0,512466
16729018317.0,488298
378001401.0,472144


In [62]:
# Ten most frequent values of rx_pharmacy_submitted_cost (the null group is included).
cost_counts = prm_prescription.groupBy('rx_pharmacy_submitted_cost').count()
cost_counts.orderBy(f.col('count').desc()).limit(10)

                                                                                

rx_pharmacy_submitted_cost,count
,156716687
0.0,5834975
4.0,827016
10.0,807433
20.0,481300
15.0,424621
12.0,420073
40.0,371570
18.0,368438
30.0,364278


### prm_prescription_1 (same but with diagnoses)

KEYS:
- person_id (??)  
- npi_id <> HCP_1_NPI
- date_day <> CLAIM_DATE

FEATURES:
- rx_refill_num: Maximum number of refills authorized (a lot of null and 0)
- rx_quantity (prescribed or dispensed?): 36% NANs, hard to use because of the different types of medicine
- rx_drug_days_supply: Estimated number of days the dispensing will last, same number of NANs, can be used for long-lasting period features
- rx_ndc_code: medicine code, most common ones as features, group somehow
- rx_pharmacy_submitted_cost: a lot of NANs and zeros, can make binary feature on this
- diagnosis_code_type: used to switch between ICD-9 and ICD-10 systems
- diagnosis: actual diagnosis (like L40... for PSO)

In [63]:
# Load the prescriptions table variant that also carries diagnosis columns.
prm_prescription_1 = spark.read.parquet(f"{data_path}prm_prescription_1/")

In [71]:
# Sample roughly 10% of the rows and preview five of them.
# The sample was previously unseeded, so each run could draw different rows;
# a fixed seed makes the sampling step deterministic for a given input layout.
# Columns are listed by name (the ones `columns[3:-2]` returned in the recorded
# output) so the preview does not depend on column order.
SAMPLE_SEED = 42
prescription_1_feature_cols = [
    'rx_refill_num',
    'rx_quantity',
    'rx_drug_days_supply',
    'rx_ndc_code',
    'diagnosis_code_type',
    'diagnosis',
    'rx_pharmacy_submitted_cost',
]
(
    prm_prescription_1
    .select(*prescription_1_feature_cols)
    .sample(fraction=.1, seed=SAMPLE_SEED)
    .limit(5)
)

rx_refill_num,rx_quantity,rx_drug_days_supply,rx_ndc_code,diagnosis_code_type,diagnosis,rx_pharmacy_submitted_cost
,,,,,,
,,,,,,
,,,,,,
,,,,,,
,,,,,,


In [65]:
# Total number of rows in prm_prescription.
n_prescription_rows = prm_prescription.count()
n_prescription_rows

                                                                                

243192424

In [66]:
# Total number of rows in prm_prescription_1.
n_prescription_1_rows = prm_prescription_1.count()
n_prescription_1_rows

                                                                                

262919306

In [73]:
# Columns that exist in prm_prescription_1 but not in prm_prescription.
set(prm_prescription_1.columns).difference(prm_prescription.columns)

{'diagnosis', 'diagnosis_code_type'}

In [75]:
# Ten most frequent values of the diagnosis column (the null group is included).
diagnosis_counts = prm_prescription_1.groupBy('diagnosis').count()
diagnosis_counts.sort(f.desc('count')).limit(10)

                                                                                

diagnosis,count
,189333827
L400,3569573
I10,2255386
L409,1326604
Z23,1214918
E119,1187498
Z0000,1049334
G4733,1041662
M545,950456
L570,620630


## Procedures

### prm_procedure

Differs from the one in diagnoses: all procedures, not directly connected with diagnoses.

KEYS:
- person_id (??)  
- npi_id <> HCP_1_NPI  
- encounter_key <> ENCOUNTER_KEY  
- vx_visit_id <> VISIT_ID  
- date_day <> CLAIM_DATE

FEATURES:
- pr_procedure_code: actual procedure code, generally including the patient visit itself; lots of NANs

In [77]:
# Load the procedures table.
prm_procedure = spark.read.parquet(f"{data_path}prm_procedure/")

In [79]:
# Peek at a handful of procedure codes.
prm_procedure.select(f.col('pr_procedure_code')).limit(5)

pr_procedure_code
32666
""
84132
C9290
J1650


In [80]:
# Ten most frequent values of pr_procedure_code (the null group is included).
pr_code_counts = prm_procedure.groupBy('pr_procedure_code').count()
pr_code_counts.orderBy(f.col('count').desc()).limit(10)

                                                                                

pr_procedure_code,count
99213.0,13096693
,10399906
99214.0,9985317
36415.0,6591494
97110.0,5977433
85025.0,4722394
80053.0,4255693
97140.0,4222912
80061.0,2812859
99212.0,2023420


## Providers

### prm_providers

KEYS:
- npi_id <> HCP_1_NPI  
- provider_hco_id <> HCO_ID_NPI 

FEATURES:
- provider_speciality_2: 78% NANs, can be used as a feature
- provider_state: take most popular states
- provider_type: speciality category (maybe univariant for train and test data)  
_____
- provider_speciality_1: speciality (maybe univariant for train and test data)
- provider_middle_name, provider_first_name, provider_last_name seem useless
- provider_deactivation_dt and provider_reactivation_dt: NANs only, useless

In [82]:
# Load the providers table.
prm_providers = spark.read.parquet(f"{data_path}prm_providers/")

In [85]:
# Preview the provider attributes by name instead of the positional slices
# `columns[1:5]` and `columns[6:-2]`, so the selection does not depend on
# column order. The names are the ones shown in the recorded output.
provider_feature_cols = [
    'provider_speciality_1',
    'provider_speciality_2',
    'provider_state',
    'provider_type',
    'provider_first_name',
    'provider_middle_name',
    'provider_last_name',
    'provider_deactivation_dt',
    'provider_reactivation_dt',
]
prm_providers.select(*provider_feature_cols).limit(5)

provider_speciality_1,provider_speciality_2,provider_state,provider_type,provider_first_name,provider_middle_name,provider_last_name,provider_deactivation_dt,provider_reactivation_dt
Assistive Therapy,,WI,Assistive Therapy,Melanie,,Maske,,
Nurse Practitioner,,OH,Advanced Practice...,Brenda,,Hamer,,
Internal Medicine,,CA,Physician,Maggie,,Wang,,
"Surgery, General",,PA,Physician,Jasmine,,Kashkoush,,
Family Practice,,NY,Physician,Ranier,,Horton,,


In [88]:
# Total number of rows in prm_providers.
n_provider_rows = prm_providers.count()
n_provider_rows

2362586

In [86]:
# Ten most frequent values of provider_speciality_1.
speciality_1_counts = prm_providers.groupBy('provider_speciality_1').count()
speciality_1_counts.sort(f.desc('count')).limit(10)

provider_speciality_1,count
Nurse Practitioner,236110
Behavioral Care,190545
Dentist,175407
Physician Assistant,155494
Assistive Therapy,150835
Family Practice,112248
Internal Medicine,103820
Pharmacology,100115
Advanced Practice...,84749
Emergency Medicine,69136


In [87]:
# Ten most frequent values of provider_speciality_2 (the null group is included).
speciality_2_counts = prm_providers.groupBy('provider_speciality_2').count()
speciality_2_counts.orderBy(f.col('count').desc()).limit(10)

provider_speciality_2,count
,1836626
Advanced Practice...,174736
Internal Medicine,114405
Nurse Practitioner,69876
Family Practice,32637
Pediatrics,22373
"Surgery, General",8349
Cardiology,7022
"Psychiatry, Child...",5041
General Practice,4570


In [91]:
# Ten states with the most provider rows.
state_counts = prm_providers.groupBy('provider_state').count()
state_counts.sort(f.desc('count')).limit(10)

                                                                                

provider_state,count
CA,241697
NY,183479
TX,163005
FL,151583
PA,107923
OH,96632
IL,93464
MI,78860
MA,73032
NC,72037


In [92]:
# Ten most frequent values of provider_type.
type_counts = prm_providers.groupBy('provider_type').count()
type_counts.orderBy(f.col('count').desc()).limit(10)

provider_type,count
Physician,1036782
Advanced Practice...,385279
Behavioral Care,191015
Dentist,175748
Physician Assistant,155088
Assistive Therapy,150764
Pharmacology,100135
Chiropractor,54580
Optometrist,50113
Podiatrist,19866


In [93]:
# Number of different provider_type values.
prm_providers.select(f.col('provider_type')).dropDuplicates().count()

18

In [98]:
# Value frequencies of provider_reactivation_dt, to see how much of the column is populated.
reactivation_counts = prm_providers.groupBy('provider_reactivation_dt').count()
reactivation_counts.sort(f.desc('count')).limit(10)

provider_reactivation_dt,count
,2362586


## Visits

### prm_visit (already used in features)

KEYS:
- person_id (??)  
- vx_provider_id <> HCP_1_NPI
- vx_visit_id <> VISIT_ID

FEATURES:
- vx_visit_type: visit category type (8 + NANs)

In [99]:
# Load the visits table.
prm_visit = spark.read.parquet(f"{data_path}prm_visit/")

In [101]:
# Preview the visit attributes by name instead of the positional slice
# `columns[3:-2]`, so the selection does not depend on column order. The names
# are the ones shown in the recorded output.
visit_feature_cols = [
    'vx_visit_type',
    'vx_visit_start_date',
    'vx_visit_end_date',
]
prm_visit.select(*visit_feature_cols).limit(5)

vx_visit_type,vx_visit_start_date,vx_visit_end_date
Inpatient Visit,2015-10-07,2015-10-08
Inpatient Visit,2015-10-07,2015-10-08
Inpatient Visit,2015-10-07,2015-10-08
Inpatient Visit,2015-10-07,2015-10-08
Inpatient Visit,2015-10-07,2015-10-08


In [104]:
# Row counts for every visit type, most frequent first (the null group is included).
visit_type_counts = prm_visit.groupBy('vx_visit_type').count()
visit_type_counts.orderBy(f.col('count').desc())

                                                                                

vx_visit_type,count
Outpatient Visit,99999273
Inpatient Visit,45448617
Laboratory Visit,6199750
Home Visit,4795991
,3481864
Emergency Room Visit,2434933
Non-hospital inst...,1646760
Ambulance Visit,396723
Pharmacy Visit,28518


In [None]:
# Shut down the Spark session once the exploration is finished.
spark.stop()