In [1]:
import pandas as pd
import numpy as np


In [2]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()

# Read CSV File
df = spark.read.option("header",True).csv("QUALITY_DATA.csv")
df.printSchema()

root
 |-- measurement_year: string (nullable = true)
 |-- measure_name: string (nullable = true)
 |-- measure_desc: string (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- base_event_date: string (nullable = true)
 |-- compliant_cnt: string (nullable = true)
 |-- eligible_cnt: string (nullable = true)
 |-- id: string (nullable = true)



In [3]:
from pyspark.sql.types import IntegerType, StringType, DoubleType, LongType, BooleanType
from pyspark.sql.functions import col
df = df.withColumn('eligible_cnt', col('eligible_cnt').cast(IntegerType()))
df = df.withColumn('compliant_cnt', col('compliant_cnt').cast(IntegerType()))

In [4]:
df.dropDuplicates( subset=['measurement_year','measure_name', 'measure_type','base_event_date','compliant_cnt',	'eligible_cnt',	'id'])

DataFrame[measurement_year: string, measure_name: string, measure_desc: string, measure_type: string, base_event_date: string, compliant_cnt: int, eligible_cnt: int, id: string]

In [4]:
df[(df['id']==40240) & (df['measure_name']=='TEX')].sort_values(by='measurement_year')#['measure_name'].value_counts()


Unnamed: 0,measurement_year,measure_name,measure_desc,measure_type,base_event_date,compliant_cnt,eligible_cnt,id
31216437,2020,TEX,Dr Talks Exercise,Patient Experience,28JAN2020,0.0,1,40240
31216438,2021,TEX,Dr Talks Exercise,Patient Experience,13APR2021,1.0,1,40240
31216439,2021,TEX,Dr Talks Exercise,Patient Experience,16SEP2021,1.0,1,40240
31216440,2022,TEX,Dr Talks Exercise,Patient Experience,17MAR2022,1.0,1,40240


In [16]:
df[df['id']==40240]['measure_name'].value_counts()

TEX           6
ESA           4
SWT           4
EGR           4
RXC           4
ETA           4
MDR           4
ADH (DIAB)    3
ADH (ACE)     3
TFP           2
TBC           2
SBT           2
ASV           1
SFT           1
Name: measure_name, dtype: int64

In [8]:
from pyspark.sql import functions as F

df_agg=df.groupby(['id']).pivot('measure_name').agg({'eligible_cnt':'sum'})
        #agg(F.first('measurement_year').alias('measurement_year'),F.first('eligible_cnt').alias('eligible_cnt')))

In [12]:
dct=dict()
for i in df_agg.columns:
  if i not in ['id','measurement_year''measure_name']:
    dct[i]= i.replace(' ', '_')+'_elig_cnt'
    
dct

{'ABA': 'ABA_elig_cnt',
 'ADH (ACE)': 'ADH_(ACE)_elig_cnt',
 'ADH (DIAB)': 'ADH_(DIAB)_elig_cnt',
 'ADH (STATIN)': 'ADH_(STATIN)_elig_cnt',
 'ART': 'ART_elig_cnt',
 'ASV': 'ASV_elig_cnt',
 'BCS': 'BCS_elig_cnt',
 'CBP': 'CBP_elig_cnt',
 'CDC (EYE)': 'CDC_(EYE)_elig_cnt',
 'CDC (HbA1c)': 'CDC_(HbA1c)_elig_cnt',
 'CDC (NPH)': 'CDC_(NPH)_elig_cnt',
 'COA (FSA)': 'COA_(FSA)_elig_cnt',
 'COA (MDR)': 'COA_(MDR)_elig_cnt',
 'COA (PNS)': 'COA_(PNS)_elig_cnt',
 'COL': 'COL_elig_cnt',
 'COL (45-50)': 'COL_(45-50)_elig_cnt',
 'EED': 'EED_elig_cnt',
 'EGR': 'EGR_elig_cnt',
 'ESA': 'ESA_elig_cnt',
 'ETA': 'ETA_elig_cnt',
 'FMC': 'FMC_elig_cnt',
 'HBD': 'HBD_elig_cnt',
 'KED': 'KED_elig_cnt',
 'MDR': 'MDR_elig_cnt',
 'MRP': 'MRP_elig_cnt',
 'OMW': 'OMW_elig_cnt',
 'PCR': 'PCR_elig_cnt',
 'RXC': 'RXC_elig_cnt',
 'SBT': 'SBT_elig_cnt',
 'SFT': 'SFT_elig_cnt',
 'SPC STATIN': 'SPC_STATIN_elig_cnt',
 'SUPD': 'SUPD_elig_cnt',
 'SWT': 'SWT_elig_cnt',
 'TBC': 'TBC_elig_cnt',
 'TEX': 'TEX_elig_cnt',
 'TFP': 

In [13]:

#df.filter(df.age > 3)
df_agg.toPandas().rename(columns=dct).to_csv('QUALITY_DATA_measure_name_elig_cnt_pyspark.csv')

In [14]:
from pyspark.sql import functions as F

df_agg=df.groupby(['id']).pivot('measure_name').agg({'compliant_cnt':'sum'})

In [17]:
dct_comp=dict()
for i in df_agg.columns:
  if i not in ['id','measurement_year''measure_name']:
    dct_comp[i]= i.replace(' ', '_')+'_comp_cnt'
    
dct_comp

{'ABA': 'ABA_comp_cnt',
 'ADH (ACE)': 'ADH_(ACE)_comp_cnt',
 'ADH (DIAB)': 'ADH_(DIAB)_comp_cnt',
 'ADH (STATIN)': 'ADH_(STATIN)_comp_cnt',
 'ART': 'ART_comp_cnt',
 'ASV': 'ASV_comp_cnt',
 'BCS': 'BCS_comp_cnt',
 'CBP': 'CBP_comp_cnt',
 'CDC (EYE)': 'CDC_(EYE)_comp_cnt',
 'CDC (HbA1c)': 'CDC_(HbA1c)_comp_cnt',
 'CDC (NPH)': 'CDC_(NPH)_comp_cnt',
 'COA (FSA)': 'COA_(FSA)_comp_cnt',
 'COA (MDR)': 'COA_(MDR)_comp_cnt',
 'COA (PNS)': 'COA_(PNS)_comp_cnt',
 'COL': 'COL_comp_cnt',
 'COL (45-50)': 'COL_(45-50)_comp_cnt',
 'EED': 'EED_comp_cnt',
 'EGR': 'EGR_comp_cnt',
 'ESA': 'ESA_comp_cnt',
 'ETA': 'ETA_comp_cnt',
 'FMC': 'FMC_comp_cnt',
 'HBD': 'HBD_comp_cnt',
 'KED': 'KED_comp_cnt',
 'MDR': 'MDR_comp_cnt',
 'MRP': 'MRP_comp_cnt',
 'OMW': 'OMW_comp_cnt',
 'PCR': 'PCR_comp_cnt',
 'RXC': 'RXC_comp_cnt',
 'SBT': 'SBT_comp_cnt',
 'SFT': 'SFT_comp_cnt',
 'SPC STATIN': 'SPC_STATIN_comp_cnt',
 'SUPD': 'SUPD_comp_cnt',
 'SWT': 'SWT_comp_cnt',
 'TBC': 'TBC_comp_cnt',
 'TEX': 'TEX_comp_cnt',
 'TFP': 

In [18]:
df_agg.toPandas().rename(columns=dct_comp).to_csv('QUALITY_DATA_measure_name_comp_cnt_pyspark.csv')

In [19]:
from pyspark.sql import functions as F

df_agg=df.groupby(['id']).pivot('measure_type').agg({'compliant_cnt':'sum'})

In [20]:
dct_comp=dict()
for i in df_agg.columns:
  if i not in ['id','measurement_year''measure_name']:
    dct_comp[i]= i.replace(' ', '_')+'_comp_cnt'
    
dct_comp

{'HEDIS': 'HEDIS_comp_cnt',
 'Patient Experience': 'Patient_Experience_comp_cnt',
 'Patient Safety': 'Patient_Safety_comp_cnt'}

In [21]:
df_agg.toPandas().rename(columns=dct_comp).to_csv('QUALITY_DATA_measure_type_comp_cnt_pyspark.csv')

In [5]:
from pyspark.sql import functions as F

df_agg=df.groupby(['id']).pivot('measure_type').agg({'eligible_cnt':'sum'})

In [6]:
dct=dict()
for i in df_agg.columns:
  if i not in ['id','measurement_year''measure_name']:
    dct[i]= i.replace(' ', '_')+'_elig_cnt'
    
dct

{'HEDIS': 'HEDIS_elig_cnt',
 'Patient Experience': 'Patient_Experience_elig_cnt',
 'Patient Safety': 'Patient_Safety_elig_cnt'}

In [7]:
df_agg.toPandas().rename(columns=dct).to_csv('QUALITY_DATA_measure_type_elig_cnt_pyspark.csv')

In [None]:
from pyspark.sql import functions as F

df_agg2=df.groupby(['id']).pivot('measure_type').agg({'eligible_cnt':'sum','compliant_cnt':'sum'})

In [None]:
dct2=dict()
for i in df_agg2.columns:
  if i not in ['id','measurement_year']:
    dc2t[i]= i.replace(' ', '_')
    
dct2

In [None]:

#df.filter(df.age > 3)
df_agg.toPandas().rename(columns=dct).to_csv('QUALITY_DATA_measure_name_pyspark.csv')

In [31]:

#df.filter(df.age > 3)
df_agg.filter(df_agg['measurement_year']==2021).toPandas().rename(columns=dct_2021).to_csv('QUALITY_DATA__measure_name_eligible_cnt_2021_pyspark.csv')

In [None]:
df_agg.filter(df_agg['measurement_year']==2022).toPandas().rename(columns=dct_2022).to_csv('QUALITY_DATA__measure_name_eligible_cnt_2022_pyspark.csv')

In [None]:
df_agg.filter(df_agg['measurement_year']==2020).toPandas().rename(columns=dct_2022).to_csv('QUALITY_DATA__measure_name_eligible_cnt_2022_pyspark.csv')

In [7]:
df.pivot_table(index = ['id', columns ='measure_name', values = 'base_event_date' , aggfunc='nunique', fill_value = 0)#.to_csv('QUALITY_DATA_measure_name.csv')

TypeError: values should be a numeric type.

In [15]:
df.pivot_table(index = 'id', columns =['measure_name','measurement_year'], values = 'compliant_cnt' , aggfunc='sum', fill_value = 0).to_csv('QUALITY_DATA__measure_nm_comp_cnt.csv')

In [16]:
df.pivot_table(index = 'id', columns =['measure_type','measurement_year'], values = 'compliant_cnt' , aggfunc='sum', fill_value = 0).to_csv('QUALITY_DATA__measure_type_comp_cnt.csv')

In [17]:
df.pivot_table(index = 'id', columns =['measure_type','measurement_year'], values = 'eligible_cnt' , aggfunc='sum', fill_value = 0).to_csv('QUALITY_DATA__measure_type_eligible_cnt.csv')

In [6]:
#df['base_event_dt_lag'] = df.sort_values(by=['id','base_event_date']).groupby('id')['base_event_date'].shift(periods=1)
# Convert the base_event_date to a proper datetime format
df['base_event_date'] = pd.to_datetime(df['base_event_date'], format='%d%b%Y', errors='coerce')

df['base_event_dt_lag'] = df.sort_values(by=['id', 'base_event_date']).groupby('id').apply(lambda x: x['base_event_date'].shift(1))


MemoryError: 

In [31]:
df[(df['id']==40240) & (df['measure_name']=='TEX')].sort_values(by='measurement_year')#['measure_name'].value_counts()

Unnamed: 0,measurement_year,measure_name,measure_desc,measure_type,base_event_date,compliant_cnt,eligible_cnt,id,base_event_dt_lag,lag_diff_days
31216437,2020,TEX,Dr Talks Exercise,Patient Experience,2020-01-28,0.0,1,40240,2020-01-28,0.0
31216438,2021,TEX,Dr Talks Exercise,Patient Experience,2021-04-13,1.0,1,40240,2021-04-13,0.0
31216439,2021,TEX,Dr Talks Exercise,Patient Experience,2021-09-16,1.0,1,40240,2021-09-16,0.0
31216440,2022,TEX,Dr Talks Exercise,Patient Experience,2022-03-17,1.0,1,40240,2022-03-17,0.0


## Member conditions

In [2]:
mc=pd.read_csv('humana_mays_target_member_conditions.csv', low_memory = False)

In [3]:
mc['condition']=mc.apply(lambda x: x['cond_desc'].split(',', 1)[0].replace(' ','_'), axis=1)

In [8]:
mc[mc['cond_key'].between(326,329)][['cond_key','cond_desc']].drop_duplicates()

Unnamed: 0,cond_key,cond_desc
2,329,"Chronic Kidney Disease, Moderate Stage 3, Exce..."
17,328,"Chronic Kidney Disease, Moderate Stage 3B"
21,327,"Chronic Kidney Disease, Severe Stage 4"
114,326,"Chronic Kidney Disease, Stage 5"


In [9]:
mc[mc['cond_key'].between(136,138)][['cond_key','cond_desc']].drop_duplicates()

Unnamed: 0,cond_key,cond_desc
29,136,"Chronic Kidney Disease, Stage 5"
185,137,"Drug Use Disorder, Moderate/Severe, or Drug Us..."
232,137,"Chronic Kidney Disease, Severe Stage 4"
378,138,"Chronic Kidney Disease, Moderate Stage 3"
778,138,"Drug Use Disorder, Mild, Uncomplicated, Except..."


In [4]:
mc[mc['cond_desc'].str.contains('Kidney')]

Unnamed: 0,cond_key,chronicity,cond_desc,hcc_model_type,cms_model_vers_cd,membership_year,id,condition
2,329,Chronic,"Chronic Kidney Disease, Moderate Stage 3, Exce...",MEDICAL,V28,2023,993642,Chronic_Kidney_Disease
15,329,Chronic,"Chronic Kidney Disease, Moderate Stage 3, Exce...",MEDICAL,V28,2023,1898678,Chronic_Kidney_Disease
17,328,Chronic,"Chronic Kidney Disease, Moderate Stage 3B",MEDICAL,V28,2023,1898678,Chronic_Kidney_Disease
21,327,Chronic,"Chronic Kidney Disease, Severe Stage 4",MEDICAL,V28,2023,1898678,Chronic_Kidney_Disease
23,329,Chronic,"Chronic Kidney Disease, Moderate Stage 3, Exce...",MEDICAL,V28,2023,1752583,Chronic_Kidney_Disease
...,...,...,...,...,...,...,...,...
4009276,329,Chronic,"Chronic Kidney Disease, Moderate Stage 3, Exce...",MEDICAL,V28,2023,883811,Chronic_Kidney_Disease
4009287,328,Chronic,"Chronic Kidney Disease, Moderate Stage 3B",MEDICAL,V28,2023,587490,Chronic_Kidney_Disease
4009307,329,Chronic,"Chronic Kidney Disease, Moderate Stage 3, Exce...",MEDICAL,V28,2023,1817131,Chronic_Kidney_Disease
4009329,328,Chronic,"Chronic Kidney Disease, Moderate Stage 3B",MEDICAL,V28,2023,654526,Chronic_Kidney_Disease


In [25]:
aggregated_categories = {
    "Cardiovascular_Diseases": ['Heart_Failure', 'Acute_on_Chronic_Heart_Failure', 'Congestive_Heart_Failure', 
                                'Specified_Heart_Arrhythmias', 'Angina_Pectoris', 'Cardiomyopathy/Myocarditis', 
                                'End-Stage_Heart_Failure'],
    
    "Diabetes_Related": ['Diabetes_with_Chronic_Complications', 'Diabetes_without_Complication', 
                         'Diabetes_with_Glycemic', 'Proliferative_Diabetic_Retinopathy_and_Vitreous_Hemorrhage', 
                         'Diabetes_with_Severe_Acute_Complications', 'Severe_Diabetic_Eye_Disease'],
    
    "Kidney_and_Liver_Diseases": ['Chronic_Kidney_Disease', 'Dialysis_Status', 'Chronic_Liver_Failure/End-Stage_Liver_Disorders', 
                                  'Cirrhosis_of_Liver', 'Chronic_Hepatitis'],
    
    "Respiratory_Diseases": ['Chronic_Obstructive_Pulmonary_Disease', 'Severe_Persistent_Asthma', 
                              'Fibrosis_of_Lung_and_Other_Chronic_Lung_Disorders', 'Idiopathic_Pulmonary_Fibrosis_and_Lung_Involvement_in_Systemic_Sclerosis'],
    
    "Cancers": ['Lung_and_Other_Severe_Cancers', 'Lymphoma_and_Other_Cancers', 'Breast', 'Prostate', 
                'Colorectal', 'Cancer_Metastatic_to_Bone', 'Cancer_Metastatic_to_Lung', 'Metastatic_Cancer_and_Acute_Leukemia'],
    
    "Mental_Health_Disorders": ['Major_Depressive', 'Bipolar_Disorders_without_Psychosis', 'Schizophrenia', 
                                'Reactive_and_Unspecified_Psychosis', 'Personality_Disorders', 
                                'Substance_Use_Disorder', 'Drug_Use_Disorder', 'Alcohol_Use_Disorder'],
    
    "Neurological_Disorders": ['Parkinson_and_Other_Degenerative_Disease_of_Basal_Ganglia', 'Seizure_Disorders_and_Convulsions', 
                               'Multiple_Sclerosis', 'Cerebral_Palsy', 'Quadriplegic_Cerebral_Palsy'],
    
    "Immunological_Disorders": ['Disorders_of_Immunity', 'Systemic_Lupus_Erythematosus_and_Other_Specified_Systemic_Connective_Tissue_Disorders', 
                                'Chronic_Inflammatory_Demyelinating_Polyneuritis_and_Multifocal_Motor_Neuropathy', 
                                'Rheumatoid_Arthritis_and_Other_Specified_Inflammatory_Rheumatic_Disorders', 
                                'Rheumatoid_Arthritis_and_Inflammatory_Connective_Tissue_Disease', 'Specified_Immunodeficiencies_and_White_Blood_Cell_Disorders'],
    
    "Miscellaneous": ['Morbid_Obesity', 'Amputation_Status', 'Chronic_Ulcer_of_Skin', 'Artificial_Openings_for_Feeding_or_Elimination', 
                      'Pressure_Ulcer_of_Skin_with_Partial_Thickness_Skin_Loss']
}


In [61]:
def agg_condition(x,aggregated_categories=aggregated_categories):
    for k , v in aggregated_categories.items():
        if x in v:
            return k
    return 'other'
    

In [67]:
mc['condition_agg']=mc['condition'].apply(agg_condition)

In [79]:
mc.pivot_table(index='id',columns='condition_agg',  values='membership_year',aggfunc='count').fillna(0)

condition_agg,Cancers,Cardiovascular_Diseases,Diabetes_Related,Immunological_Disorders,Kidney_and_Liver_Diseases,Mental_Health_Disorders,Miscellaneous,Neurological_Disorders,Respiratory_Diseases,other
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
1,1.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,3.0
2,0.0,1.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0
3,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0
4,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
7,0.0,0.0,1.0,0.0,4.0,0.0,0.0,0.0,0.0,2.0
...,...,...,...,...,...,...,...,...,...,...
1999992,0.0,0.0,1.0,0.0,0.0,3.0,0.0,0.0,0.0,0.0
1999994,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0
1999995,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0
1999997,1.0,1.0,0.0,2.0,1.0,0.0,0.0,0.0,0.0,0.0


In [80]:
mc.pivot_table(index='id',columns='condition_agg',  values='membership_year',aggfunc='count').fillna(0).to_csv(r'C:\Users\shanm\OneDrive\Desktop\Kavya\Humana_Mays_24\Data\Training_final\Training_final\member_conditions_agg.csv')