In [None]:
#@title PySpark Setup(run me!)
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"

import findspark
findspark.init()

findspark.find()

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

spark


In [None]:
#@title Dynamic Input(select your choices and run the cell)
# Colab form cell: each `#@param` annotation exposes the assignment as an input
# widget. Later cells read these module-level names directly.

# Reference date used by the age calculation cell.
Date_to_calculate_age = '2020-01-01' #@param {type:"date"}
# Model version used to filter the rule, mapping and score tables in later cells.
Choose_a_model_version = 'V24' #@param ["V22","V24", "RX", "ESRD-P1","ESRD-P2"]
# Model year (kept as a string; later cells convert with int()/str() as needed).
Choose_a_model_year = '2020' #@param ["2020","2021"]
# NOTE(review): not referenced by any other cell visible in this notebook —
# confirm whether the sex/age edits are still meant to be implemented.
Sex_Age_edits_required = 'Yes' #@param ["Yes","No"]

In [None]:
#@title Age Calculation
import pandas 
import os
import pyspark.sql.functions as func
from pyspark.sql.functions import datediff, to_date, lit
from pyspark.sql.functions import struct
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf


#Loading Persons file
df1 = pandas.read_csv("/content/Person-file2.csv")
df1 = spark.createDataFrame(df1)

# Age in completed years as of the selected reference date.
# Dividing a day count by 366 under-counts: a person would still be reported as
# 64 for several weeks after their 65th birthday. months_between() is calendar
# aware, so 780 months is exactly 65 years.
as_of_date = to_date(lit(Date_to_calculate_age))
df2 = df1.withColumn("Age", func.months_between(as_of_date, to_date("DOB")) / 12)
# floor() keeps only completed years; the cast keeps the column an integer,
# matching the IntegerType the age-adjustment UDF returns.
df3 = df2.withColumn("Age", func.floor(df2["Age"]).cast('integer'))

#Condition for AGE and OREC
def func(Age, OREC):
  """Apply the age adjustments used before the demographic variables are derived.

  NOTE: this name shadows the `func` alias of `pyspark.sql.functions` imported
  at the top of this cell; the alias is only used before this definition.

  Args:
    Age: age in whole years, or None when it could not be computed.
    OREC: the person's OREC value.

  Returns:
    65 when Age is 64 and OREC is 0, 0 when Age is negative, None when Age is
    None, otherwise Age unchanged.
  """
  # A null age (e.g. missing DOB) must stay null; comparing None with `<`
  # would raise TypeError inside the UDF and fail the whole Spark job.
  if Age is None:
    return None
  if Age == 64 and OREC == 0:
    return 65
  if Age < 0:
    return 0
  return Age


# Expose the age-adjustment function to Spark as an integer-returning UDF.
func_udf = udf(func, IntegerType())

# Compute the adjusted age next to the raw one, in a temporary column.
adjusted_age = func_udf(df3['Age'], df3['OREC'])
df4 = df3.withColumn('new_column', adjusted_age)

# Remove the raw age column, then give the adjusted column its name.
drop_list = ['Age']
kept_columns = list(filter(lambda name: name not in drop_list, df4.columns))
sdf5 = df4.select(kept_columns)

df_final = sdf5.withColumnRenamed("new_column", "Age")

#df_final.show()

In [None]:
#@title Demography Variable

import pandas as pd
from pyspark.sql.functions import datediff, to_date, lit
from pyspark.sql.functions import col, expr, when
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Rule table for the demography variables, read with a header row.
demo = spark.read.format("csv").option("header", "true").load("/content/Demography Variable Calculations.csv")

# Keep only the rules for the selected model version and year.
version_matches = demo['Version'] == Choose_a_model_version
year_matches = demo['Year'] == Choose_a_model_year
demo = demo.filter(version_matches & year_matches)

# Start from the person frame and tag every row with the selected year/version.
data = (
    df_final
    .withColumn("Year", lit(str(Choose_a_model_year)))
    .withColumn("Version", lit(str(Choose_a_model_version)))
)

# Each rule row supplies a target column name (position 2) and a Spark SQL
# expression (position 3); rules are applied in the order they are collected.
for rule in demo.collect():
  data = data.withColumn(rule[2], expr(rule[3]))

#data.show()

#data=data.toPandas()

In [None]:
# Rename "Version" to "V" — presumably so it does not clash with the "Version"
# column of the CC-mapping frame joined later; "V" is dropped after that join.
data = data.withColumnRenamed(existing="Version", new="V")

In [None]:
#@title CC_Mapping

import pandas as pd
import numpy as np
import time
from pyspark.sql.functions import length,col,trim
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Person-level diagnosis codes: read with pandas, convert to Spark, and strip
# surrounding whitespace from the code so it can be matched in the join.
df_diagnosis = spark.createDataFrame(pd.read_csv('/content/Person_DiagonsisCodes2.csv'))
df_diagnosis = df_diagnosis.withColumn('DIAGNOSIS CODE', trim(col("DIAGNOSIS CODE")))

# Diagnosis-code -> CC mapping table, loaded with an explicit schema.
label_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("YEAR", StringType()),
        ("VERSION", StringType()),
        ("DIAGNOSIS CODE", StringType()),
        ("CC", IntegerType()),
    )
])
df_CC_mapping = spark.createDataFrame(pd.read_csv('/content/CC Mapping.csv'), schema=label_schema)


start=time.time()

# Restrict the mapping table to the selected model year and version.
cond1 = col('YEAR') == str(Choose_a_model_year)
cond2 = col('VERSION') == str(Choose_a_model_version)
df_temp = df_CC_mapping.where(cond1 & cond2)

# Rename the mapping's diagnosis column so it does not collide with the
# diagnosis file's column of the same name in the join below.
df_temp = df_temp.toDF('YEAR','VERSION','D_DIAG CODE','CC')

# Matching diagnosis codes from the diagnosis input file against the CC mapping.
# The right join keeps every input diagnosis row; unmatched rows get a null CC.
h = df_temp.join(df_diagnosis,(df_diagnosis["DIAGNOSIS CODE"] == df_temp["D_DIAG CODE"]),how='right')

# All distinct CC values of the selected mapping, sorted; used later to create
# one indicator column per value. Null CCs are skipped because None cannot be
# ordered against integers.
unique_HCC = sorted(
    row.CC for row in df_temp.select('CC').distinct().collect() if row.CC is not None
)

# Collect the three needed columns in ONE Spark action. Collecting them with
# three separate actions re-executes the join each time and only keeps the
# lists aligned if every run happens to return rows in the same order.
matched_rows = h.select("PERSON ID", "DIAGNOSIS CODE", "CC").collect()

CC_list = [row["CC"] for row in matched_rows]

p_id = [row["PERSON ID"] for row in matched_rows]

diag_code = [row["DIAGNOSIS CODE"] for row in matched_rows]


# One row per (person, diagnosis) pair returned by the join.
df_main = pd.DataFrame({'PERSON ID': p_id, 'DIAGNOSIS CODE': diag_code})

# CC values obtained from the mapping; None (no match) becomes NaN here.
# Plain `float64` replaces the `np.float` alias, which no longer exists in
# current NumPy releases.
hcc_codes = pd.Series(CC_list, dtype='float64', index=df_main.index)

# Unmatched diagnoses are stored as 0, as the trailing fillna(0) used to do.
df_main['HCC'] = hcc_codes.fillna(0).astype('int64')

df_main['MODEL YEAR'] = str(Choose_a_model_year)
df_main['Version'] = str(Choose_a_model_version)

# Indicator columns are named RXHCC<n> for the RX model and HCC<n> otherwise.
prefix = 'RXHCC' if 'RX' in Choose_a_model_version else 'HCC'
columns_df = [prefix + str(value) for value in unique_HCC]  # Example: HCC1,HCC2,HCC3 etc

# One 0/1 column per CC value: 1 on the rows whose mapped CC equals that value.
# Example: a row with HCC == 22 gets a 1 in HCC22 and 0 in every other flag.
# Rows without a match compare False everywhere, so no placeholder column is
# created for them and nothing has to be dropped afterwards. (Dropping the
# last column unconditionally removed the highest real HCC flag whenever every
# diagnosis had a match.)
indicator_frame = pd.DataFrame(
    {
        name: (hcc_codes == value).astype('int64')
        for name, value in zip(columns_df, unique_HCC)
    },
    index=df_main.index,
)
df_main = pd.concat([df_main, indicator_frame], axis=1)

df_main= df_main.fillna(0)
spark_df_main= spark.createDataFrame(df_main)
#spark_df_main.show(5)
print(time.time()-start)

In [None]:
#@title CC-Override
# The `#` is required: a bare `@title ...` line is parsed as a decorator and
# makes the whole cell a SyntaxError.
# Align the key column's spelling with the CC-mapping frame for the join below.
data = data.withColumnRenamed("Person ID", "PERSON ID")

In [None]:
# Attach the per-diagnosis HCC indicator rows to each person's demographic
# variables; persons without any diagnosis row are dropped by the inner join.
df = data.join(spark_df_main, on='PERSON ID', how='inner')

In [None]:
# The person-side year/version tags duplicate the 'MODEL YEAR' / 'Version'
# columns brought in by the join, so remove them one by one.
columns_to_drop = ['Year', 'V']
for unwanted in columns_to_drop:
  df = df.drop(unwanted)

In [None]:
# Short column name without a space for the diagnosis code.
df = df.withColumnRenamed(existing="DIAGNOSIS CODE", new="DIAG")

In [None]:
# Rename the gender column to SEX.
df = df.withColumnRenamed(existing="Gender", new="SEX")

In [None]:
# Rename the adjusted age column to AGEF.
df = df.withColumnRenamed(existing="Age", new="AGEF")

In [None]:
# Work on the joined, renamed frame.
data = df

# Override rule table; the year column is renamed so it can be reached with
# attribute access below.
overide_df = spark.read.format("csv").option("header", "true").load("/content/CC Override Rules.csv")
overide_df = overide_df.withColumnRenamed("MODEL YEAR","MODEL_YEAR")

# Keep only the rules of the selected model version and year.
version_selected = overide_df.VERSION.isin(Choose_a_model_version)
year_selected = overide_df.MODEL_YEAR.isin(Choose_a_model_year)
overide_df = overide_df.filter(version_selected & year_selected)

# Every rule row carries a Spark SQL expression at position 3 that recomputes
# the 'HCC' column; rules are applied one after another.
# NOTE(review): the HCC<n> indicator columns were built before this step, so
# an overridden 'HCC' value is not reflected in them — confirm this is intended.
for override_rule in overide_df.collect():
  data = data.withColumn('HCC', expr(override_rule[3]))
#data.toPandas().to_excel('Merged_CCOveride.xlsx')

In [None]:
# Keep a separate handle on the frame produced by the CC override step: the
# hierarchy cell reads it as `df_CCmerged` and rebinds `data` to a pandas frame.
df_CCmerged= data

In [None]:
#@title HCC HIERARCHY
# The `#` is required: a bare `@title ...` line is a SyntaxError.
data = pd.read_excel('/content/HCC Hierarchy.xlsx')   #Loading Files
# Not used by any visible cell; kept in case other code still refers to it.
data1 = spark.createDataFrame(data)

# Hierarchy rules of the selected model year and version.
data_2020_test = data.loc[(data['MODEL YR'] == int(Choose_a_model_year)) & (data['MODEL VRSN'] == Choose_a_model_version)]
merged_cc_2020_test = df_CCmerged

# Start from the input frame so the result is defined even when no rule
# matches, and apply each rule on top of the previous result. Rebuilding from
# `merged_cc_2020_test` on every iteration kept only the LAST rule's effect.
merged_cc_2020_spark1 = merged_cc_2020_test
hierarchy_rules = zip(
    data_2020_test['HIGHER HCC'].tolist(),
    data_2020_test['LOWER HCC'].tolist(),
    data_2020_test['MODEL VRSN'].tolist(),
)
for higher_hcc, lower_hcc, model_version in hierarchy_rules:
  # When both the higher and the lower HCC are flagged on a row of the matching
  # version, the lower one is reset to 0; otherwise it is left unchanged.
  suppress_lower = (
      (F.col(higher_hcc) == 1)
      & (F.col(lower_hcc) == 1)
      & (F.col('VERSION') == model_version)
  )
  merged_cc_2020_spark1 = merged_cc_2020_spark1.withColumn(
      lower_hcc,
      F.when(suppress_lower, 0).otherwise(F.col(lower_hcc)),
  )


In [None]:
# Hand the hierarchy cell's output to the interaction-variable cell, which
# reads it under the name `result`.
result= merged_cc_2020_spark1

In [None]:
#@title Interaction Variable Calculation
# The `#` is required: a bare `@title ...` line is a SyntaxError.
# loading libraries
import pandas as pd
import os
import pyspark.sql.functions as func

In [None]:
from pyspark.sql.functions import datediff, to_date, lit
from pyspark.sql.functions import col, expr, when
import pyspark.sql.functions as F
from pyspark.sql.types import *

#result= pd.read_csv('/content/Merged_file_for_HCC_updated.csv')

# Rule table for the interaction variables, loaded through pandas into Spark.
interaction_demo = spark.createDataFrame(pd.read_excel("/content/Interaction Variable Calculations.xlsx"))

# Keep only the rules of the selected model version and year.
version_filter = interaction_demo['MODEL VRSN'] == Choose_a_model_version
year_filter = interaction_demo['MODEL YR'] == Choose_a_model_year
interaction_demo = interaction_demo.filter(version_filter & year_filter)

# Rows of the selected model year, tagged with the selected version.
data_interaction = (
    result
    .where(F.col('MODEL YEAR') == Choose_a_model_year)
    .withColumn("Version", lit(str(Choose_a_model_version)))
)

# A rule is applied only when its target column (position 2) already exists in
# `result`; the column is then recomputed from the rule's expression (position 3).
# NOTE(review): this means no new column is ever created here — confirm that
# skipping variables absent from `result` is the intended behaviour.
existing_columns = result.columns
for rule in interaction_demo.collect():
  if rule[2] in existing_columns:
      data_interaction = data_interaction.withColumn(rule[2], expr(rule[3]))

# data_interaction.show(5)
# data_interaction.toPandas().to_excel('Interaction_Variable_Calculation.xlsx') 


In [None]:
#@title Score Calculation
# The `#` is required: a bare `@title ...` line is a SyntaxError.
import pandas as pd
import numpy as np

In [None]:
# Raw person attributes and the combined HCC code are not scored themselves,
# so they are removed before the coefficients are applied.
columns_to_drop = [
    'DOB', 'SEX', 'OREC',
    'LTIMCAID', 'NEMCAID', 'ESRD', 'MCAID',
    'AGEF', 'HCC',
]
for unwanted in columns_to_drop:
  data_interaction = data_interaction.drop(unwanted)

In [None]:
# List the remaining column names (inspection aid before the explicit select).
dfColumnNames = data_interaction.columns
print(dfColumnNames)

In [None]:
# Keep the identifier columns, the demographic indicator columns and the HCC
# indicator columns, in this fixed order.
# NOTE(review): several names carry trailing spaces (e.g. 'MCAID_NORIGDIS  ')
# and must match the frame's column names exactly. The HCC list is hard-coded —
# presumably for one model version; confirm before selecting another version.
data_interaction = data_interaction.select('DIAG', 'MODEL YEAR', 'Version', 'PERSON ID','DISABL','F0_34', 'F35_44', 'F45_54', 'F55_59', 'F60_64', 'F65_69', 'F70_74', 'F75_79', 'F80_84', 'F85_89', 'F90_94', 'F95_GT', 'LTIMCAID_O', 'M0_34', 'M35_44', 'M45_54', 'M55_59', 'M60_64', 'M65_69', 'M70_74', 'M75_79', 'M80_84', 'M85_89', 'M90_94', 'M95_GT', 'MCAID_NORIGDIS  ', 'MCAID_ORIGDIS   ', 'NE_ORIGDS', 'NEF0_34', 'NEF35_44', 'NEF45_54', 'NEF55_59', 'NEF60_64', 'NEF65', 'NEF66', 'NEF67', 'NEF68', 'NEF69', 'NEF70_74', 'NEF75_79', 'NEF80_84', 'NEF85_89', 'NEF90_94', 'NEF95_GT', 'NEM0_34', 'NEM35_44', 'NEM45_54', 'NEM55_59', 'NEM60_64', 'NEM65', 'NEM66', 'NEM67', 'NEM68', 'NEM69', 'NEM70_74', 'NEM75_79', 'NEM80_84', 'NEM85_89', 'NEM90_94', 'NEM95_GT', 'NMCAID_NORIGDIS ', 'NMCAID_ORIGDIS  ', 'ORIGDS', 'HCC1', 'HCC2', 'HCC6', 'HCC8', 'HCC9', 'HCC10', 'HCC11', 'HCC12', 'HCC17', 'HCC18', 'HCC19', 'HCC21', 'HCC22', 'HCC23', 'HCC27', 'HCC28', 'HCC29', 'HCC33', 'HCC34', 'HCC35', 'HCC39', 'HCC40', 'HCC46', 'HCC47', 'HCC48', 'HCC51', 'HCC52', 'HCC54', 'HCC55', 'HCC56', 'HCC57', 'HCC58', 'HCC59', 'HCC60', 'HCC70', 'HCC71', 'HCC72', 'HCC73', 'HCC74', 'HCC75', 'HCC76', 'HCC77', 'HCC78', 'HCC79', 'HCC80', 'HCC82', 'HCC83', 'HCC84', 'HCC85', 'HCC86', 'HCC87', 'HCC88', 'HCC96', 'HCC99', 'HCC100', 'HCC103', 'HCC104', 'HCC106', 'HCC107', 'HCC108', 'HCC110', 'HCC111', 'HCC112', 'HCC114', 'HCC115', 'HCC122', 'HCC124', 'HCC134', 'HCC135', 'HCC136', 'HCC137', 'HCC138', 'HCC157', 'HCC158', 'HCC159', 'HCC161', 'HCC162', 'HCC166', 'HCC167', 'HCC169', 'HCC170', 'HCC173', 'HCC176', 'HCC186', 'HCC188', 'HCC189')

In [None]:
# List the column names again to confirm the outcome of the explicit select.
dfColumnNames = data_interaction.columns
print(dfColumnNames)

In [None]:
# Mount Google Drive at /content/drive; the score sheet is read from a path
# under this mount point in the next cell.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Fetch score sheet
scoreSheetPath = '/content/drive/MyDrive/Index/Score Variables.xlsx'   #Path to access the score variables from drive
scoreSheetDF = pd.read_excel(scoreSheetPath)

# Remove the blank spreadsheet columns that pandas labels 'Unnamed: 5' ...
# 'Unnamed: 12'. errors='ignore' keeps the cell working when the workbook has
# fewer blank columns; deleting each name with `del` raised KeyError as soon
# as one of them was missing.
blank_columns = ['Unnamed: %d' % position for position in range(5, 13)]
scoreSheetDF = scoreSheetDF.drop(columns=blank_columns, errors='ignore')

In [None]:
# Keep only the coefficients of the selected model year and version.
year_mask = scoreSheetDF['Model Year'] == int(Choose_a_model_year)
version_mask = scoreSheetDF['Model Version'] == Choose_a_model_version
scoreSheetDF = scoreSheetDF.loc[year_mask & version_mask]

In [None]:
# Replace each 0/1 indicator that has an entry in the score sheet by its
# coefficient: rows where the indicator equals 1 receive the coefficient,
# every other row receives 0. Variables absent from the frame are skipped.
score_entries = zip(
    scoreSheetDF['Variable'].tolist(),
    scoreSheetDF['Coefficient Value'].tolist(),
)
for variable, coefficient in score_entries:
  if variable in data_interaction.columns:
    weighted = F.when(F.col(variable) == 1, coefficient).otherwise(0)
    data_interaction = data_interaction.withColumn(variable, weighted)


In [None]:
# Display the first rows of the scored frame.
data_interaction.show()