In [0]:
import os
from urllib.parse import quote_plus

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Create (or reuse) the Spark session with the MongoDB Spark connector on the classpath.
MONGO_CONNECTOR_PACKAGE = "org.mongodb.spark:mongo-spark-connector_2.12:2.4.0"
spark = (
    SparkSession.builder
    .appName("mongo")
    .config("spark.jars.packages", MONGO_CONNECTOR_PACKAGE)
    .getOrCreate()
)

In [0]:
### S3 credentials are read from the environment instead of being hardcoded in the notebook.
### NOTE: the key pair that was previously committed in this cell must be treated as leaked and rotated in AWS IAM.
### A missing variable raises KeyError here, which is clearer than an opaque S3 auth failure later.
aws_access_key = os.environ['AWS_ACCESS_KEY_ID']
aws_secret_key = os.environ['AWS_SECRET_ACCESS_KEY']

hadoop_conf = spark._jsc.hadoopConfiguration()
# hadoop-aws must already be on the classpath (e.g. via spark.jars.packages in the SparkSession builder);
# setting spark.jars.packages on the Hadoop configuration after the session has started has no effect.
# Keys set on the Hadoop configuration take no "spark.hadoop." prefix; that prefix only applies to SparkConf.
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", aws_access_key)
hadoop_conf.set("fs.s3a.secret.key", aws_secret_key)
# NOTE(review): the fs.s3a.* settings only apply to s3a:// URIs; the reads below use s3:// (EMRFS on EMR) — confirm which is intended.

In [0]:
### Define Schema of our main table
### Schema of the main hospital-discharge table, declared as ordered (column name, Spark type) pairs.
### Every field is nullable; the CSV columns are matched positionally, so the order below matters.
schema_fields = [
    ('Health_Service_Area', StringType()),
    ('Hospital_County', StringType()),
    ('Operating_Certificate_Number', IntegerType()),
    ('Facility_ID', IntegerType()),
    ('Facility_Name', StringType()),
    ('Age_Group', StringType()),
    ('Zip_Code', StringType()),
    ('Gender', StringType()),
    ('Race', StringType()),
    ('Ethnicity', StringType()),
    ('Length_of_Stay', StringType()),
    ('Type_of_Admission', StringType()),
    ('Patient_Disposition', StringType()),
    ('Discharge_Year', IntegerType()),
    ('CCS_Diagnosis_Code', IntegerType()),
    ('CCS_Diagnosis_Description', StringType()),
    ('CCS_Procedure_Code', IntegerType()),
    ('CCS_Procedure_Description', StringType()),
    ('APR_DRG_Code', IntegerType()),
    ('APR_DRG_Description', StringType()),
    ('APR_MDC_Code', IntegerType()),
    ('APR_MDC_Description', StringType()),
    ('APR_Severity_of_Illness_Code', IntegerType()),
    ('APR_Severity_of_Illness_Description', StringType()),
    ('APR_Risk_of_Mortality', StringType()),
    ('APR_Medical_Surgical_Description', StringType()),
    ('Payment_Typology_1', StringType()),
    ('Payment_Typology_2', StringType()),
    ('Payment_Typology_3', StringType()),
    ('Birth_Weight', StringType()),
    ('Abortion_Edit_Indicator', StringType()),
    ('Emergency_Department_Indicator', StringType()),
    ('Total_Charges', StringType()),
    ('Total_Costs', StringType()),
]
schema = StructType([StructField(field_name, field_type, True) for field_name, field_type in schema_fields])


In [0]:
%%time
### Read the main data: 3 CSVs in total, containing hospital inpatient discharge records from 2011 to 2013.
df = spark.read.schema(schema).option("header", "true").csv("s3://icu996-msds697/hospital/*")

### Total_Costs and Total_Charges are strings prefixed with '$' (and may contain ',' separators).
### Strip those characters with a regex and cast to double. A 32-bit float keeps only ~7 significant
### digits, so it would silently round away cents on amounts above roughly $84k.
df = df.withColumn('Total_Costs', regexp_replace('Total_Costs', '[$,]', '').cast('double'))
df = df.withColumn('Total_Charges', regexp_replace('Total_Charges', '[$,]', '').cast('double'))

### Remove rows whose length of stay is recorded as the open-ended bucket '120+', then cast it to int.
### (The != comparison is null for null values, so rows with a missing Length_of_Stay are dropped here too.)
df = df.where(col('Length_of_Stay') != '120+')
df = df.withColumn("Length_of_Stay", col("Length_of_Stay").cast('int'))

### The data already carries a code column for each description column (e.g. CCS_Diagnosis_Code for
### CCS_Diagnosis_Description), so no StringIndexer / one-hot encoding is needed for the descriptions.
### Keep the code features and drop the descriptions.
df = df.drop("CCS_Diagnosis_Description", "CCS_Procedure_Description", "APR_DRG_Description",
             "APR_MDC_Description", "APR_Severity_of_Illness_Description", "APR_Medical_Surgical_Description")
df.printSchema()

In [0]:
%%time
df.select(col('Length_of_Stay')).show(n=5)  # quick sanity check that the int cast produced values

In [0]:
# Mark the cleaned main table for caching (lazy: materialized on the first action).
# The filter and joins in the following cells are built on top of this DataFrame.
df = df.cache()

In [0]:
# 'OOS' marks out-of-state patients. To be able to merge with the zip-level demographic data,
# keep only rows with a real (non-null, non-'OOS') Zip_Code.
df = df.where(col('Zip_Code').isNotNull() & (col('Zip_Code') != 'OOS'))

In [0]:
### Extra data: median household income within New York State at zip-code level (around 2010).
### Source: https://www.psc.isr.umich.edu/dis/census/Features/tract2zip/
schema_income = StructType([
    StructField('Zip', StringType(), True),
    StructField('income_median', FloatType(), True),
])

# Load the income table and rename its columns to match the main table's naming.
df_income = (
    spark.read.schema(schema_income)
    .option("header", "true")
    .csv("s3://icu996-msds697/zip_income.csv")
    .withColumnRenamed('Zip', 'Zip_Code')
    .withColumnRenamed('income_median', 'Income_Median')
)

# Left join on Zip_Code: every patient row is kept; Income_Median is null where no income row matches.
df = df.join(df_income, 'Zip_Code', 'left_outer')


In [0]:
### Demographic features to support the prediction: median age and sex/racial ratio, hand-collected
### from the Cornell 2010 census report and matched on (zip code, race).
### Source: https://pad.human.cornell.edu/census2010/reports/2010%20race%20age%20sex%20New%20York.pdf
schema_age = StructType([
    StructField('zip', StringType(), True),
    StructField('state', StringType(), True),
    StructField('race', StringType(), True),
    StructField('Median_Age', FloatType(), True),
    StructField('Sex_Racial_Ratio', FloatType(), True),
])

# 'race' is renamed to 'race_', presumably so it cannot be confused with the main table's 'Race'
# (Spark resolves column names case-insensitively by default).
df_age_sex = (
    spark.read.schema(schema_age)
    .option("header", "true")
    .csv("s3://icu996-msds697/age_sex_merge.csv")
    .withColumnRenamed('race', 'race_')
)

# Left join on both zip code and race, then drop the helper/lookup-only columns.
zip_and_race_match = (df.Zip_Code == df_age_sex.zip) & (df.Race == df_age_sex.race_)
df_merged = (
    df.join(df_age_sex, zip_and_race_match, 'left_outer')
    .drop('state', 'race_', 'zip')
)
df_merged.select('Zip_Code', 'Income_Median', 'Race', 'Median_Age', 'Sex_Racial_Ratio').show(10)

In [0]:
# Missing-value handling

# Null count for every column of df_merged, computed in one select.
null_count_exprs = [count(when(col(name).isNull(), name)).alias(name) for name in df_merged.columns]
df_merged.select(null_count_exprs).show()

# Payment_Typology_2 and Payment_Typology_3 are mostly null, but their non-null values may still carry
# predictive signal, so their nulls become an explicit 'UNK' category instead of dropping rows.
# Any row that still has a null in some other column is then dropped as a negligible outlier.
df_merged = (
    df_merged
    .fillna('UNK', subset=['Payment_Typology_2', 'Payment_Typology_3'])
    .dropna(how='any')
)

In [0]:
# Size of the final dataset (about 7M rows in the original run) used for predicting
# length of stay, medical costs, and medical charges.
df_merged.count()

#### MongoDB Atlas

In [0]:
### MongoDB Atlas target. Credentials are read from the environment instead of being hardcoded.
### NOTE: the password previously committed in this cell must be treated as leaked and rotated in Atlas.
database = 'icu996'
collection = 'hospital_merged_final'
user_name = os.environ.get('MONGO_USER', 'icu996')
password = os.environ['MONGO_PASSWORD']  # KeyError if unset: fail fast rather than attempt an unauthenticated write
address = 'team15.8iitt.mongodb.net'
# Percent-encode the credentials: characters such as '@', ':' or '/' would otherwise break URI parsing.
connection_string = (f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}"
                     f"@{address}/{database}.{collection}")

In [0]:
# Display the URI with the password masked so the secret is never written into the saved notebook output.
_scheme, _, _rest = connection_string.partition('://')
_credentials, _, _host_and_path = _rest.rpartition('@')  # host part contains no '@', so split on the last one
f"{_scheme}://{_credentials.split(':', 1)[0]}:****@{_host_and_path}"

In [0]:
# Store df_merged in the MongoDB Atlas collection (took about 3.45 minutes in the original run).
# NOTE(review): "append" adds a second copy of every document if this cell is re-run; consider "overwrite".
df_merged.write.mode("append").format("mongo").option("uri", connection_string).save()

In [0]:
# Read the stored collection back from MongoDB into df.
df = spark.read.format("mongo").options(uri=connection_string).load()