# Final Project: Admission Prediction from NHAMCS
## Data preprocessing
### DS5559: Big Data Analysis
### Thomas Hartka, Alicia Doan, Michael Langmayr
Created: 6/27/2020 
  
In this notebook, we preprocess the predictors and create the outcome variable.

## Configure

In [1]:
# set data directory
data_dir = "../data"

In [2]:
# import python libraries
import os
import pandas as pd
import numpy as np
from functools import reduce

In [3]:
# set up pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, udf, when
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

## Read in data

In [4]:
NHAMCS = spark.read.parquet(data_dir + "/NHAMCS.2007-2017")

## Select only cases with year >=2014

In [5]:
NHAMCS = NHAMCS.filter(col('YEAR')>=2014)

## Create outcome variable

In [6]:
# create outcome variable
NHAMCS = NHAMCS.withColumn("ADM_OUTCOME", when((col("ADMITHOS")=="Yes") | \
                                                (col("TRANPSYC")=="Yes") | \
                                                (col("TRANOTH")=="Yes") | \
                                                (col("OBSHOS")=="Yes"), 1).otherwise(0))
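
As a quick illustration of the rule above, the following plain-Python sketch applies the same logic to a single visit represented as a dictionary. The helper name `is_admitted` and the sample records are hypothetical; only the four column names come from the notebook. A missing value counts as not admitted, which matches `.otherwise(0)` in the Spark expression.

```python
# Disposition columns that count as an admission (taken from the cell above).
ADMIT_COLS = ["ADMITHOS", "TRANPSYC", "TRANOTH", "OBSHOS"]

def is_admitted(record):
    """Return 1 if any disposition column equals "Yes", else 0."""
    return int(any(record.get(c) == "Yes" for c in ADMIT_COLS))

# Hypothetical records, for illustration only.
observed = {"ADMITHOS": "No", "TRANPSYC": "No", "TRANOTH": "No", "OBSHOS": "Yes"}
discharged = {"ADMITHOS": "No", "TRANPSYC": "No", "TRANOTH": "No", "OBSHOS": "No"}

print(is_admitted(observed), is_admitted(discharged))
```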

## Fix variables

**Age**  
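Recode the text labels for the youngest and top-coded age groups as numbers: "Under one year" becomes 0, "93 years and over" becomes 93, and "100 years and over" becomes 100. The result is stored as an integer in AGEYEAR.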

In [7]:
# fix group classifiers
NHAMCS = NHAMCS.withColumn('AGE', regexp_replace('AGE', 'Under one year','0')) \
    .withColumn('AGE', regexp_replace('AGE', '93 years and over','93')) \
    .withColumn('AGE', regexp_replace('AGE', '100 years and over','100'))

# convert to integers
NHAMCS = NHAMCS.withColumn('AGEYEAR', NHAMCS['AGE'].cast(IntegerType()))
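
The recoding can be sanity-checked outside Spark with a small plain-Python sketch. The function `parse_age` is a hypothetical helper, and it assumes that every other AGE value is a plain digit string; anything that cannot be parsed becomes `None`, mirroring how a failed integer cast yields null in Spark.

```python
# Labels for the youngest and top-coded age groups (from the cell above).
AGE_LABELS = {
    "Under one year": 0,
    "93 years and over": 93,
    "100 years and over": 100,
}

def parse_age(label):
    """Map an AGE label to an integer, or None if it cannot be parsed."""
    if label in AGE_LABELS:
        return AGE_LABELS[label]
    try:
        return int(label)
    except (TypeError, ValueError):
        return None

print([parse_age(a) for a in ["Under one year", "45", "93 years and over"]])
```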

**Sex**  
Create a new variable SEXMALE: 1 = male, 0 = female (any value other than "Male" is also coded 0).

In [8]:
NHAMCS = NHAMCS.withColumn('SEXMALE', when(col('SEX')=="Male",1).otherwise(0))

**Arrival time**  
Create a new variable ARRTIMEMIN: the arrival time in minutes past midnight.

In [9]:
NHAMCS = NHAMCS.withColumn('ARRTIME', NHAMCS['ARRTIME'].cast(IntegerType()))

In [10]:
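def convert_time(time):
    # missing arrival times stay missing instead of raising a TypeError
    if time is None:
        return None
    # ARRTIME is read as a clock time (HHMM): hours * 60 + minutes
    return (time // 100) * 60 + (time % 100)

udf_cTime = udf(convert_time, IntegerType())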

In [11]:
NHAMCS = NHAMCS.withColumn('ARRTIMEMIN', udf_cTime("ARRTIME"))
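
A worked example of the arithmetic may help. The sketch below assumes ARRTIME is a 24-hour clock time written as HHMM, which is what the conversion function implies; `hhmm_to_minutes` is a hypothetical stand-alone version of that function with an explicit check for missing values.

```python
def hhmm_to_minutes(hhmm):
    """Convert an integer clock time (HHMM) to minutes past midnight."""
    if hhmm is None:
        return None
    hours = hhmm // 100    # e.g. 1430 -> 14
    minutes = hhmm % 100   # e.g. 1430 -> 30
    return hours * 60 + minutes

print(hhmm_to_minutes(1430))  # 14 * 60 + 30 = 870
```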

**Strings to integer**  
These variables should be integers.

In [12]:
NHAMCS = NHAMCS.withColumn('YEAR', NHAMCS['YEAR'].cast(IntegerType())) \
    .withColumn('PULSE', NHAMCS['PULSE'].cast(IntegerType()))  \
    .withColumn('RESPR', NHAMCS['RESPR'].cast(IntegerType())) \
    .withColumn('BPSYS', NHAMCS['BPSYS'].cast(IntegerType())) \
    .withColumn('BPDIAS', NHAMCS['BPDIAS'].cast(IntegerType())) \
    .withColumn('POPCT', NHAMCS['POPCT'].cast(IntegerType())) \
    .withColumn('PAINSCALE', NHAMCS['PAINSCALE'].cast(IntegerType())) \
    .withColumn('TOTCHRON', NHAMCS['TOTCHRON'].cast(IntegerType())) 

**Strings to floats**  
These variables should be floats.

In [13]:
NHAMCS = NHAMCS.withColumn('TEMPF', NHAMCS['TEMPF'].cast('float'))

**Comorbidities to 0/1**  
Convert comorbidities to 0 or 1: 0 = not present, 1 = present. Any other value becomes null.

In [14]:
def convert_comb(dis):
    if dis == "Yes":
        return 1
    elif dis == "No":
        return 0
    else:
        return None
    
udf_cDis = udf(convert_comb, IntegerType())

In [15]:
NHAMCS = NHAMCS.withColumn('ALZHD', udf_cDis("ALZHD")) \
    .withColumn('ASTHMA', udf_cDis("ASTHMA")) \
    .withColumn('CAD', udf_cDis("CAD")) \
    .withColumn('CANCER', udf_cDis("CANCER")) \
    .withColumn('CEBVD', udf_cDis("CEBVD")) \
    .withColumn('CHF', udf_cDis("CHF")) \
    .withColumn('CKD', udf_cDis("CKD")) \
    .withColumn('COPD', udf_cDis("COPD")) \
    .withColumn('DEPRN', udf_cDis("DEPRN")) \
    .withColumn('DIABTYP0', udf_cDis("DIABTYP0")) \
    .withColumn('DIABTYP1', udf_cDis("DIABTYP1")) \
    .withColumn('DIABTYP2', udf_cDis("DIABTYP2")) \
    .withColumn('EDHIV', udf_cDis("EDHIV")) \
    .withColumn('ESRD', udf_cDis("ESRD")) \
    .withColumn('ETOHAB', udf_cDis("ETOHAB")) \
    .withColumn('HPE', udf_cDis("HPE")) \
    .withColumn('HTN', udf_cDis("HTN")) \
    .withColumn('HYPLIPID', udf_cDis("HYPLIPID")) \
    .withColumn('OBESITY', udf_cDis("OBESITY")) \
    .withColumn('OSA', udf_cDis("OSA")) \
    .withColumn('OSTPRSIS', udf_cDis("OSTPRSIS")) \
    .withColumn('SUBSTAB', udf_cDis("SUBSTAB")) \
    .withColumn('NOCHRON', udf_cDis("NOCHRON")) \
    .withColumn('INJURY', udf_cDis("INJURY")) \
    .withColumn('INJURY72', udf_cDis("INJURY72"))   
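
The three-way recoding can be illustrated in plain Python with a dictionary lookup. This is only a sketch of the same mapping, not the code the notebook runs; `recode_flag` and the sample value `"Blank"` are hypothetical. The point is that labels other than "Yes" and "No" end up missing rather than being silently counted as 0.

```python
# Mapping used for the comorbidity flags; anything else is treated as missing.
YES_NO = {"Yes": 1, "No": 0}

def recode_flag(value):
    """Return 1 for "Yes", 0 for "No", and None for any other value."""
    return YES_NO.get(value)

print([recode_flag(v) for v in ["Yes", "No", "Blank", None]])
```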

## Select columns of interest

In [16]:
NHAMCS = NHAMCS.select(['AGEYEAR','AGER','SEXMALE','RESIDNCE','ARRTIMEMIN','YEAR','PULSE','TEMPF', \
                            'RESPR','BPSYS','BPDIAS','POPCT','PAINSCALE','ALZHD','ASTHMA','CAD','CANCER', \
                            'CEBVD','CHF','CKD','COPD','DEPRN','DIABTYP0','DIABTYP1','DIABTYP2','EDHIV', \
                            'ESRD','ETOHAB','HPE','HTN','HYPLIPID','OBESITY','OSA','OSTPRSIS','SUBSTAB', \
                            'NOCHRON','TOTCHRON','RFV1','RFV2','RFV3','RFV4','RFV5','INJURY','INJURY72', \
                            'ADM_OUTCOME'])

## Write out data

In [None]:
# write out data
NHAMCS.write.parquet(data_dir + "/NHAMCS_processed.2007-2017")