In [1]:
from pyspark.sql import SparkSession
import sys
import re
import os

In [2]:
# Start (or reuse) the Spark session that every DataFrame in this notebook hangs off.
spark = (
    SparkSession.builder
    .appName("myapp")
    .getOrCreate()
)

In [3]:
#print(os.environ['SPARK_HOME'])

## 1. READING DATA INTO SPARK FRAMES
- We are reading two datasets 
    - BRFSS 2019 survey
    - BRFSS 2017 survey
- Not using the 2018 survey since it has significant differences in the key features that we are extracting for the ML classification
- Related Work: https://www.kaggle.com/alexteboul/diabetes-health-indicators-dataset-notebook
    - The related work uses various lifestyle-habit indicators and prior chronic-disease indicators from the 2015 BRFSS survey to identify diabetes risk
    - We will use similar indicators to see if we can predict the risk of heart attack and heart disease. We will be using two years of data (2019 and 2017)

### Dataset Links 
- Original Links on CDC Website
    - https://www.cdc.gov/brfss/annual_data/annual_2017.html
    - https://www.cdc.gov/brfss/annual_data/annual_2019.html
- We have also uploaded the files to S3 for easy access; the locations are
    - s3a://brfss-big-data-project/BRFSS_2017.csv
    - s3a://brfss-big-data-project/BRFSS_2019.csv
    

In [4]:
# READ LOCAL DATA FILES
# Comment this cell out when reading from S3 instead.
# Alternative local layout:
# df2019 = spark.read.csv("5330 project data/BRFSS_2019.csv", header='true',inferSchema='true')
# df2017 = spark.read.csv("5330 project data/BRFSS_2017.csv", header='true',inferSchema='true')

# Both survey years are read with the same options: the first row holds the
# column names and the column types are inferred from the data.
csv_read_options = {"header": 'true', "inferSchema": 'true'}
df2019 = spark.read.csv("../../../BRFSS/CSV_version/BRFSS_2019.csv", **csv_read_options)
df2017 = spark.read.csv("../../../BRFSS/CSV_version/BRFSS_2017.csv", **csv_read_options)

                                                                                

In [5]:
# READ FROM S3 BUCKET
# Comment if reading locally
# spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
# spark.sparkContext._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
# spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

# df2019 = spark.read.csv("s3a://brfss-big-data-project/BRFSS_2019.csv", header = 'true',inferSchema='true')
# df2017 = spark.read.csv("s3a://brfss-big-data-project/BRFSS_2017.csv", header = 'true',inferSchema='true')

In [6]:
# Show the inferred column names and types of the raw 2019 survey.
df2019.printSchema()

root
 |-- _STATE: double (nullable = true)
 |-- FMONTH: double (nullable = true)
 |-- IDATE: integer (nullable = true)
 |-- IMONTH: integer (nullable = true)
 |-- IDAY: integer (nullable = true)
 |-- IYEAR: integer (nullable = true)
 |-- DISPCODE: double (nullable = true)
 |-- SEQNO: integer (nullable = true)
 |-- _PSU: double (nullable = true)
 |-- CTELENM1: double (nullable = true)
 |-- PVTRESD1: double (nullable = true)
 |-- COLGHOUS: double (nullable = true)
 |-- STATERE1: double (nullable = true)
 |-- CELPHONE: double (nullable = true)
 |-- LADULT1: double (nullable = true)
 |-- COLGSEX: double (nullable = true)
 |-- NUMADULT: double (nullable = true)
 |-- LANDSEX: double (nullable = true)
 |-- NUMMEN: double (nullable = true)
 |-- NUMWOMEN: double (nullable = true)
 |-- RESPSLCT: double (nullable = true)
 |-- SAFETIME: double (nullable = true)
 |-- CTELNUM1: double (nullable = true)
 |-- CELLFON5: double (nullable = true)
 |-- CADULT1: double (nullable = true)
 |-- CELLSEX: doubl

In [7]:
# Peek at two raw 2017 columns to see how the coded values look.
df2017.select('_STATE', '_VEGLT1A').show(n=5)

+------+--------+
|_STATE|_VEGLT1A|
+------+--------+
|   1.0|     1.0|
|   1.0|     1.0|
|   1.0|     2.0|
|   1.0|     9.0|
|   1.0|     2.0|
+------+--------+
only showing top 5 rows



In [8]:
# Report (row count, column count) for the raw 2017 survey.
print("Dimensions of the Data Frame:")
shape_2017 = (df2017.count(), len(df2017.columns))
print(shape_2017)

Dimensions of the Data Frame:
(450016, 358)


Note: We can see that the complete BRFSS dataset has about 0.45 million records (450,016) and 358 columns in the 2017 survey. The 2019 survey is similar in dimension.

We are only interested in the columns related to the prediction of heart attacks and heart disease in individuals. The current column names are based on a code book that the CDC maintains. We will select the relevant columns, using the code book to map key indicators.

- Link to 2019 code book: https://www.cdc.gov/brfss/annual_data/annual_2019.html
- Link to 2017 code book: https://www.cdc.gov/brfss/annual_data/annual_2017.html

## 2. SELECTING THE COLUMNS OF INTEREST

In [9]:
# Selecting columns containing indicators for Heart Disease

In [10]:
# 2017 column names for the indicators we keep, grouped by theme.
cols_to_select2017 = (
    ["_MICHD"]                                              # target variable: heart attack (myocardial infarction) indicator
    + ["_STATE", "_BMI5"]                                   # state the person belongs to, body mass index
    + ["_RFHYPE5", "TOLDHI2", "_CHOLCH1"]                   # blood pressure and cholesterol
    + ["_FRTLT1A", "_VEGLT1A", "SMOKE100", "_RFDRHV5"]      # fruit / vegetable consumption, smoking, alcohol
    + ["DIABETE3", "CVDSTRK3"]                              # chronic diseases: diabetes, stroke
    + ["HLTHPLN1", "MEDCOST"]                               # insurance and medical access
    + ["_TOTINDA", "GENHLTH", "PHYSHLTH", "MENTHLTH", "DIFFWALK"]  # fitness and activity
    + ["SEX", "_AGEG5YR", "EDUCA", "INCOME2"]               # demographics: gender, age bracket, education, income
)

In [11]:
# 2019 column names for the same indicators; a few columns are named differently
# from 2017 (_CHOLCH2, _RFDRHV7, DIABETE4, SEXVAR).
cols_to_select2019 = (
    ["_MICHD"]                                              # target variable: heart attack (myocardial infarction) indicator
    + ["_STATE", "_BMI5"]                                   # state the person belongs to, body mass index
    + ["_RFHYPE5", "TOLDHI2", "_CHOLCH2"]                   # blood pressure and cholesterol
    + ["_FRTLT1A", "_VEGLT1A", "SMOKE100", "_RFDRHV7"]      # fruit / vegetable consumption, smoking, alcohol
    + ["DIABETE4", "CVDSTRK3"]                              # chronic diseases: diabetes, stroke
    + ["HLTHPLN1", "MEDCOST"]                               # insurance and medical access
    + ["_TOTINDA", "GENHLTH", "PHYSHLTH", "MENTHLTH", "DIFFWALK"]  # fitness and activity
    + ["SEXVAR", "_AGEG5YR", "EDUCA", "INCOME2"]            # demographics: gender, age bracket, education, income
)

In [12]:
# Project the raw 2019 survey down to the selected indicator columns and preview it.
heartDisease2019 = df2019.select(*cols_to_select2019)
heartDisease2019.show(n=5)

+------+------+------+--------+-------+--------+--------+--------+--------+--------+--------+--------+--------+-------+--------+-------+--------+--------+--------+------+--------+-----+-------+
|_MICHD|_STATE| _BMI5|_RFHYPE5|TOLDHI2|_CHOLCH2|_FRTLT1A|_VEGLT1A|SMOKE100|_RFDRHV7|DIABETE4|CVDSTRK3|HLTHPLN1|MEDCOST|_TOTINDA|GENHLTH|PHYSHLTH|MENTHLTH|DIFFWALK|SEXVAR|_AGEG5YR|EDUCA|INCOME2|
+------+------+------+--------+-------+--------+--------+--------+--------+--------+--------+--------+--------+-------+--------+-------+--------+--------+--------+------+--------+-----+-------+
|   2.0|   1.0|2817.0|     2.0|    1.0|     1.0|     1.0|     1.0|     1.0|     1.0|     3.0|     2.0|     1.0|    2.0|     2.0|    3.0|    15.0|    88.0|     1.0|   2.0|    13.0|  3.0|    3.0|
|   2.0|   1.0|1854.0|     1.0|    2.0|     1.0|     1.0|     1.0|     2.0|     1.0|     3.0|     2.0|     1.0|    2.0|     1.0|    4.0|    10.0|    88.0|     2.0|   2.0|    11.0|  5.0|    5.0|
|   2.0|   1.0|3162.0|     2.0

In [13]:
# Project the raw 2017 survey down to the selected indicator columns and preview it.
heartDisease2017 = df2017.select(*cols_to_select2017)
heartDisease2017.show(n=5)

+------+------+------+--------+-------+--------+--------+--------+--------+--------+--------+--------+--------+-------+--------+-------+--------+--------+--------+---+--------+-----+-------+
|_MICHD|_STATE| _BMI5|_RFHYPE5|TOLDHI2|_CHOLCH1|_FRTLT1A|_VEGLT1A|SMOKE100|_RFDRHV5|DIABETE3|CVDSTRK3|HLTHPLN1|MEDCOST|_TOTINDA|GENHLTH|PHYSHLTH|MENTHLTH|DIFFWALK|SEX|_AGEG5YR|EDUCA|INCOME2|
+------+------+------+--------+-------+--------+--------+--------+--------+--------+--------+--------+--------+-------+--------+-------+--------+--------+--------+---+--------+-----+-------+
|   2.0|   1.0|2696.0|     2.0|    1.0|     1.0|     1.0|     1.0|     2.0|     1.0|     1.0|     2.0|     1.0|    2.0|     1.0|    2.0|    88.0|    88.0|     1.0|2.0|    11.0|  6.0|    6.0|
|   2.0|   1.0|2943.0|     2.0|    2.0|     1.0|     1.0|     1.0|     2.0|     1.0|     3.0|     2.0|     1.0|    2.0|     1.0|    2.0|    88.0|    88.0|     2.0|1.0|    10.0|  6.0|    8.0|
|   2.0|   1.0|2504.0|     1.0|    1.0|     1

## 3. REMOVE ROWS THAT HAVE NULL VALUES

In [14]:
# Report (row count, column count) of the 2019 selection before null rows are dropped.
print("Dimensions of the Data Frame:")
shape_2019_selected = (heartDisease2019.count(), len(heartDisease2019.columns))
print(shape_2019_selected)

Dimensions of the Data Frame:
(418268, 23)


In [15]:
# Discard every row that has a null in any of the selected columns.
data2017 = heartDisease2017.dropna()
data2019 = heartDisease2019.dropna()

In [16]:
# Report (row count, column count) of the 2019 data after null rows were dropped.
print("Dimensions of the Data Frame:")
shape_2019_non_null = (data2019.count(), len(data2019.columns))
print(shape_2019_non_null)

Dimensions of the Data Frame:


[Stage 13:>                                                         (0 + 8) / 8]

(351875, 23)


                                                                                

## 4. MAPPING THE FEATURES APPROPRIATELY
- We will use our understanding of the code book to correctly map values 
- The survey uses 7 or 9 (77 or 99 for two-digit fields) to indicate that the person did not know or refused to answer.
- We need to clean these values and map them so that the values are meaningful

In [17]:
# Show the column names and types of the 2019 selection before recoding.
data2019.printSchema()

root
 |-- _MICHD: double (nullable = true)
 |-- _STATE: double (nullable = true)
 |-- _BMI5: double (nullable = true)
 |-- _RFHYPE5: double (nullable = true)
 |-- TOLDHI2: double (nullable = true)
 |-- _CHOLCH2: double (nullable = true)
 |-- _FRTLT1A: double (nullable = true)
 |-- _VEGLT1A: double (nullable = true)
 |-- SMOKE100: double (nullable = true)
 |-- _RFDRHV7: double (nullable = true)
 |-- DIABETE4: double (nullable = true)
 |-- CVDSTRK3: double (nullable = true)
 |-- HLTHPLN1: double (nullable = true)
 |-- MEDCOST: double (nullable = true)
 |-- _TOTINDA: double (nullable = true)
 |-- GENHLTH: double (nullable = true)
 |-- PHYSHLTH: double (nullable = true)
 |-- MENTHLTH: double (nullable = true)
 |-- DIFFWALK: double (nullable = true)
 |-- SEXVAR: double (nullable = true)
 |-- _AGEG5YR: double (nullable = true)
 |-- EDUCA: double (nullable = true)
 |-- INCOME2: double (nullable = true)



In [18]:
# Print the Python type of data2019.
print(type(data2019))

<class 'pyspark.sql.dataframe.DataFrame'>


In [19]:
# imports for pyspark SQL
from pyspark.sql.functions import when

#### 4.1 _MICHD
- 1: Yes has Heart Issues -->  1 
- 2: No has no heart issues --> 0
- Remove all 7 (dont knows)
- Remove all 9 (refused)

In [20]:
# Keep only definite answers (1 = yes, 2 = no), then recode "no" from 2 to 0 so the
# target is 0/1. The same rule applies to both survey years.
data2019, data2017 = [
    survey.filter(survey["_MICHD"].isin(1, 2)).withColumn(
        "_MICHD",
        when(survey["_MICHD"] == 2, 0).otherwise(survey["_MICHD"]),
    )
    for survey in (data2019, data2017)
]

#### 4.2 MAPPING THE STATES
- **TODO**: the state codes are left unmapped in this notebook; the mapping will be done later.

#### 4.3 _BMI5
- these are BMI * 100. So for example a BMI of 4018 is really 40.18
- 777 and 999 indicate did not answer or refused (filter these out)

In [21]:
# _BMI5 is stored as BMI * 100: drop the 777 / 999 codes, then rescale to real BMI.
# NOTE(review): confirm in the code book that 777 / 999 are sentinel codes for this
# column -- read as scaled values they would be BMIs of 7.77 and 9.99.
data2019, data2017 = [
    survey.filter(~survey["_BMI5"].isin(777, 999)).withColumn(
        '_BMI5',
        survey["_BMI5"] / 100,
    )
    for survey in (data2019, data2017)
]

#### 4.4 _RFHYPE5
- Change 1 to 0 so it represents No high blood pressure and 2 to 1 so it represents high blood pressure

In [22]:
# Keep definite answers only, then shift the codes down:
# 1 (no high blood pressure) -> 0 and 2 (high blood pressure) -> 1.
data2019, data2017 = [
    survey.filter(survey["_RFHYPE5"].isin(1, 2)).withColumn(
        "_RFHYPE5",
        when(survey["_RFHYPE5"] == 1, 0)
        .when(survey["_RFHYPE5"] == 2, 1)
        .otherwise(survey["_RFHYPE5"]),
    )
    for survey in (data2019, data2017)
]

#### 4.5 TOLDHI2
- Change 2 to 0 so it represents no high cholesterol diagnosis and 1 stays as a high cholesterol diagnosis
- Filter out values of 7 (don't know) and 9 (refused)

In [23]:
# Keep definite answers only; 2 (no high-cholesterol diagnosis) becomes 0, 1 stays 1.
data2019, data2017 = [
    survey.filter(survey["TOLDHI2"].isin(1, 2)).withColumn(
        "TOLDHI2",
        when(survey["TOLDHI2"] == 2, 0).otherwise(survey["TOLDHI2"]),
    )
    for survey in (data2019, data2017)
]

#### 4.6  _CHOLCH2  or _CHOLCH1
- Keep 1 as 1: cholesterol checked within the past 5 years
- 2 to 0: cholesterol not checked within the past 5 years
- 3 to 0: never had cholesterol checked
- Remove 9 (Not sure, refused)

In [24]:
# Drop code 9, then collapse "not checked in the past 5 years" (2) and "never checked"
# (3) into 0; 1 (checked) is left as is. The column is _CHOLCH2 in 2019 and
# _CHOLCH1 in 2017.
data2019 = (
    data2019
    .filter(data2019["_CHOLCH2"] != 9)
    .withColumn(
        "_CHOLCH2",
        when(data2019["_CHOLCH2"].isin(2, 3), 0).otherwise(data2019["_CHOLCH2"]),
    )
)

data2017 = (
    data2017
    .filter(data2017["_CHOLCH1"] != 9)
    .withColumn(
        "_CHOLCH1",
        when(data2017["_CHOLCH1"].isin(2, 3), 0).otherwise(data2017["_CHOLCH1"]),
    )
)

#### 4.7 _FRTLT1A
- Filter out value of 9 (don't know, missing, or refused)
- Recode 2 to 0 indicating not consuming fruit 1 or more times per day

In [25]:
# Drop code 9, then recode 2 (fruit less than once per day) to 0; 1 stays 1.
data2019, data2017 = [
    survey.filter(survey["_FRTLT1A"] != 9).withColumn(
        "_FRTLT1A",
        when(survey["_FRTLT1A"] == 2, 0).otherwise(survey["_FRTLT1A"]),
    )
    for survey in (data2019, data2017)
]

#### 4.8 _VEGLT1A
- Filter out value of 9 (don't know, missing, or refused)
- Recode 2 to 0 indicating not consuming vegetables 1 or more times per day

In [26]:
# Drop code 9, then recode 2 (vegetables less than once per day) to 0; 1 stays 1.
data2019, data2017 = [
    survey.filter(survey["_VEGLT1A"] != 9).withColumn(
        "_VEGLT1A",
        when(survey["_VEGLT1A"] == 2, 0).otherwise(survey["_VEGLT1A"]),
    )
    for survey in (data2019, data2017)
]

#### 4.9 SMOKE100
- Filter out values of 7 or 9
- Recode 2 to 0 indicating not having smoked at least 100 cigarettes in lifetime

In [27]:
# Keep definite answers only; 2 (has not smoked 100 cigarettes) becomes 0, 1 stays 1.
data2019, data2017 = [
    survey.filter(survey["SMOKE100"].isin(1, 2)).withColumn(
        "SMOKE100",
        when(survey["SMOKE100"] == 2, 0).otherwise(survey["SMOKE100"]),
    )
    for survey in (data2019, data2017)
]

#### 4.10 _RFDRHV7 or _RFDRHV5
- Different names in 2017 and 2019 but same labels
- Filter out value of 9
- Recode 1 to 0 (no heavy drinking) and 2 to 1 (heavy drinking)

In [28]:
# Keep definite answers only, then shift the codes down:
# 1 (no heavy drinking) -> 0 and 2 (heavy drinking) -> 1.
# The column is _RFDRHV7 in 2019 and _RFDRHV5 in 2017.
data2019 = (
    data2019
    .filter(data2019["_RFDRHV7"].isin(1, 2))
    .withColumn(
        "_RFDRHV7",
        when(data2019["_RFDRHV7"] == 1, 0)
        .when(data2019["_RFDRHV7"] == 2, 1)
        .otherwise(data2019["_RFDRHV7"]),
    )
)

data2017 = (
    data2017
    .filter(data2017["_RFDRHV5"].isin(1, 2))
    .withColumn(
        "_RFDRHV5",
        when(data2017["_RFDRHV5"] == 1, 0)
        .when(data2017["_RFDRHV5"] == 2, 1)
        .otherwise(data2017["_RFDRHV5"]),
    )
)

#### 4.11 DIABETE4 or DIABETE3
- Different names in 2017 and 2019
- Making this a Boolean Binary.
- 1 = Yes has diabetes --> 1
- 2 = Yes only during Pregnancy --> 1
- 3 =  No -->0
- 4 = No but has prediabetes --> 1
- Remove all 7 (dont knows)
- Remove all 9 (refused)

In [29]:
# Drop 7 (don't know) and 9 (refused), then make the column binary:
# 2 (only during pregnancy) and 4 (prediabetes) -> 1, 3 (no) -> 0, 1 (yes) stays 1.
# The column is DIABETE4 in 2019 and DIABETE3 in 2017.
data2019 = (
    data2019
    .filter(~data2019["DIABETE4"].isin(7, 9))
    .withColumn(
        "DIABETE4",
        when(data2019["DIABETE4"].isin(2, 4), 1)
        .when(data2019["DIABETE4"] == 3, 0)
        .otherwise(data2019["DIABETE4"]),
    )
)

data2017 = (
    data2017
    .filter(~data2017["DIABETE3"].isin(7, 9))
    .withColumn(
        "DIABETE3",
        when(data2017["DIABETE3"].isin(2, 4), 1)
        .when(data2017["DIABETE3"] == 3, 0)
        .otherwise(data2017["DIABETE3"]),
    )
)

#### 4.12 CVDSTRK3
- 1 is had a stroke --> 1 
- Change 2 to 0 because it is No
- Remove all 7 (dont knows)
- Remove all 9 (refused)

In [30]:
# Keep definite answers only; 2 (no stroke) becomes 0, 1 (had a stroke) stays 1.
data2019, data2017 = [
    survey.filter(survey["CVDSTRK3"].isin(1, 2)).withColumn(
        "CVDSTRK3",
        when(survey["CVDSTRK3"] == 2, 0).otherwise(survey["CVDSTRK3"]),
    )
    for survey in (data2019, data2017)
]

#### 4.13 HLTHPLN1
- 1 is yes, Person has health coverage 
- change 2 to 0 because it is No health care access
- remove 7 and 9 for don't know or refused 

In [31]:
# Keep definite answers only; 2 (no health coverage) becomes 0, 1 (covered) stays 1.
data2019, data2017 = [
    survey.filter(survey["HLTHPLN1"].isin(1, 2)).withColumn(
        "HLTHPLN1",
        when(survey["HLTHPLN1"] == 2, 0).otherwise(survey["HLTHPLN1"]),
    )
    for survey in (data2019, data2017)
]

#### 4.14 MEDCOST
- Did not go see doctor in last 12 months due to cost ? Yes = 1
- Change 2 to 0 for no, 1 is already yes
- Remove 7 for don't know and 9 for refused

In [32]:
# Keep definite answers only; 2 (cost was not a barrier) becomes 0, 1 stays 1.
data2019, data2017 = [
    survey.filter(survey["MEDCOST"].isin(1, 2)).withColumn(
        "MEDCOST",
        when(survey["MEDCOST"] == 2, 0).otherwise(survey["MEDCOST"]),
    )
    for survey in (data2019, data2017)
]

#### 4.15 _TOTINDA
- Adults who reported doing physical activity or exercise during the past 30 days other than their regular job
- 1 for physical activity
- change 2 to 0 for no physical activity
- Remove all 9 (don't know/refused)

In [33]:
# Keep definite answers only; 2 (no physical activity) becomes 0, 1 stays 1.
data2019, data2017 = [
    survey.filter(survey["_TOTINDA"].isin(1, 2)).withColumn(
        "_TOTINDA",
        when(survey["_TOTINDA"] == 2, 0).otherwise(survey["_TOTINDA"]),
    )
    for survey in (data2019, data2017)
]

#### 4.16 GENHLTH
- Would you say that in general your health is:
- This is an ordinal variable (1 is Excellent -> 5 is Poor); we will reverse it so that 1 is Poor and 5 is Excellent
- Remove 7 and 9 for don't know and refused

In [34]:
# Drop 7 (don't know) and 9 (refused) from the general-health rating in both years.
data2019, data2017 = [
    survey.filter(~survey["GENHLTH"].isin(7, 9))
    for survey in (data2019, data2017)
]

In [35]:
# Preview GENHLTH before the scale is reversed.
data2019.select("GENHLTH").show(10)

+-------+
|GENHLTH|
+-------+
|    3.0|
|    4.0|
|    3.0|
|    2.0|
|    2.0|
|    1.0|
|    5.0|
|    2.0|
|    5.0|
|    1.0|
+-------+
only showing top 10 rows



In [36]:
# Reverse the scale by swapping 1 <-> 5 and 2 <-> 4; any other value (3) is unchanged.
# NOTE: this cell is not idempotent -- running it a second time swaps the values back.
data2019, data2017 = [
    survey.withColumn(
        "GENHLTH",
        when(survey["GENHLTH"] == 1, 5)
        .when(survey["GENHLTH"] == 5, 1)
        .when(survey["GENHLTH"] == 2, 4)
        .when(survey["GENHLTH"] == 4, 2)
        .otherwise(survey["GENHLTH"]),
    )
    for survey in (data2019, data2017)
]

In [37]:
# Preview GENHLTH again to confirm the scale has been reversed.
data2019.select("GENHLTH").show(10)

+-------+
|GENHLTH|
+-------+
|    3.0|
|    2.0|
|    3.0|
|    4.0|
|    4.0|
|    5.0|
|    1.0|
|    4.0|
|    1.0|
|    5.0|
+-------+
only showing top 10 rows



#### 4.17 PHYSHLTH
- for how many days during the past 30 days was your physical health not good?
- already in days so keep that, scale will be 0-30
- change 88 to 0 because it means none (no bad physical health days)
- remove 77 and 99 for don't know not sure and refused

In [38]:
# Drop 77 (don't know) and 99 (refused); 88 means "none", so it becomes 0 days.
data2019, data2017 = [
    survey.filter(~survey["PHYSHLTH"].isin(77, 99)).withColumn(
        "PHYSHLTH",
        when(survey["PHYSHLTH"] == 88, 0).otherwise(survey["PHYSHLTH"]),
    )
    for survey in (data2019, data2017)
]

#### 4.18 MENTHLTH
- for how many days during the past 30 days was your mental health not good?
- already in days so keep that, scale will be 0-30
- change 88 to 0 because it means none (no bad mental health days)
- remove 77 and 99 for don't know not sure and refused

In [39]:
# Drop 77 (don't know) and 99 (refused); 88 means "none", so it becomes 0 days.
data2019, data2017 = [
    survey.filter(~survey["MENTHLTH"].isin(77, 99)).withColumn(
        "MENTHLTH",
        when(survey["MENTHLTH"] == 88, 0).otherwise(survey["MENTHLTH"]),
    )
    for survey in (data2019, data2017)
]

#### 4.19 DIFFWALK
- Do you have serious difficulty walking or climbing stairs? yes = 1
- change 2 to 0 for no. 1 is already yes
- remove 7 and 9 for don't know not sure and refused

In [40]:
# Keep definite answers only; 2 (no difficulty walking) becomes 0, 1 stays 1.
data2019, data2017 = [
    survey.filter(survey["DIFFWALK"].isin(1, 2)).withColumn(
        "DIFFWALK",
        when(survey["DIFFWALK"] == 2, 0).otherwise(survey["DIFFWALK"]),
    )
    for survey in (data2019, data2017)
]

#### 4.20 SEXVAR or SEX
- Different names in 2017 and 2019 but same labels
- is respondent male 
- men may be  at higher risk for heart disease
- change 2 to 0 (female as 0). Male is 1
- 9 means refused (removed)

In [41]:
# Keep only codes 1 and 2, then recode 2 (female) to 0; 1 (male) stays 1.
# The column is SEXVAR in 2019 and SEX in 2017.
data2019 = (
    data2019
    .filter(data2019["SEXVAR"].isin(1, 2))
    .withColumn("SEXVAR", when(data2019["SEXVAR"] == 2, 0).otherwise(data2019["SEXVAR"]))
)

data2017 = (
    data2017
    .filter(data2017["SEX"].isin(1, 2))
    .withColumn("SEX", when(data2017["SEX"] == 2, 0).otherwise(data2017["SEX"]))
)

#### 4.21 _AGEG5YR
- Reported age in five-year age categories calculated variable
- Already ordinal: 1 is 18-24, up to 13 which is 80 and older, in 5-year increments.
- remove 14 because it is don't know or missing

In [42]:
# Drop age bracket 14 (don't know / missing) in both survey years.
data2019, data2017 = [
    survey.filter(survey["_AGEG5YR"] != 14)
    for survey in (data2019, data2017)
]

#### 4.22 EDUCA
- Level of education completed - This is already an ordinal variable 
- 1 being never attended school or kindergarten only up to 6 being college 4 years or more
- Scale here is 1-6
- Remove 9 for refused:

In [43]:
# Drop education code 9 (refused) in both survey years.
data2019, data2017 = [
    survey.filter(survey["EDUCA"] != 9)
    for survey in (data2019, data2017)
]

#### 4.23 INCOME2
- Annual household income - in levels
- Variable is already ordinal with 1 being less than $10,000 all the way up to 8 being $75,000 or more
- Remove 77 and 99 for don't know and refused

In [44]:
# Drop income codes 77 (don't know) and 99 (refused) in both survey years.
data2019, data2017 = [
    survey.filter(~survey["INCOME2"].isin(77, 99))
    for survey in (data2019, data2017)
]

## 5. RENAMING COLUMNS

In [45]:
import pyspark.sql.functions as F

#Method for renaming columns
def rename_columns(df, columns):
    """Return ``df`` with its columns renamed according to ``columns``.

    Args:
        df: DataFrame whose columns are to be renamed.
        columns: dict mapping old column names to new ones. Columns of ``df``
            that are not keys of the dict keep their current name.

    Returns:
        The result of ``df.select`` over every column of ``df``, each aliased
        to its new name, in the original column order.

    Raises:
        ValueError: if ``columns`` is not a dict.
    """
    if not isinstance(columns, dict):
        raise ValueError("'columns' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")

    aliased = []
    for current_name in df.columns:
        target_name = columns.get(current_name, current_name)
        aliased.append(F.col(current_name).alias(target_name))
    return df.select(*aliased)

In [46]:
# Code-book column name (2019) -> readable column name used in the cleaned data set.
column_mapping_2019 = {
    "_MICHD": "HeartDisease",
    "_STATE": "State",
    "_BMI5": "BMI",
    "_RFHYPE5": "HighBP",
    "TOLDHI2": "HighChol",
    "_CHOLCH2": "CholCheck",
    "_FRTLT1A": "FruitConsume",
    "_VEGLT1A": "VegetableConsume",
    "SMOKE100": "Smoker",
    "_RFDRHV7": "HeavyDrinker",
    "DIABETE4": "Diabetes",
    "CVDSTRK3": "Stroke",
    "HLTHPLN1": "Healthcare",
    "MEDCOST": "NoDoctorDueToCost",
    "_TOTINDA": "PhysicalActivity",
    "GENHLTH": "GeneralHealth",
    "PHYSHLTH": "PhysicalHealth",
    "MENTHLTH": "MentalHealth",
    "DIFFWALK": "DifficultyWalking",
    "SEXVAR": "Gender",
    "_AGEG5YR": "Age",
    "EDUCA": "Education",
    "INCOME2": "Income",
}

In [47]:
# Code-book column name (2017) -> readable column name used in the cleaned data set.
# The target names match the 2019 mapping.
column_mapping_2017 = {
    "_MICHD": "HeartDisease",
    "_STATE": "State",
    "_BMI5": "BMI",
    "_RFHYPE5": "HighBP",
    "TOLDHI2": "HighChol",
    "_CHOLCH1": "CholCheck",
    "_FRTLT1A": "FruitConsume",
    "_VEGLT1A": "VegetableConsume",
    "SMOKE100": "Smoker",
    "_RFDRHV5": "HeavyDrinker",
    "DIABETE3": "Diabetes",
    "CVDSTRK3": "Stroke",
    "HLTHPLN1": "Healthcare",
    "MEDCOST": "NoDoctorDueToCost",
    "_TOTINDA": "PhysicalActivity",
    "GENHLTH": "GeneralHealth",
    "PHYSHLTH": "PhysicalHealth",
    "MENTHLTH": "MentalHealth",
    "DIFFWALK": "DifficultyWalking",
    "SEX": "Gender",
    "_AGEG5YR": "Age",
    "EDUCA": "Education",
    "INCOME2": "Income",
}

In [56]:
# Rename the 2019 code-book columns to their readable names.
data2019 = rename_columns(data2019, column_mapping_2019)

In [57]:
# Rename the 2017 code-book columns to their readable names.
data2017 = rename_columns(data2017, column_mapping_2017)

## 6. CONCATENATE (UNION) THE DATA

In [60]:
# Stack the two renamed survey years into a single DataFrame; unionByName lines the
# columns up by name rather than by position.
data_full = data2017.unionByName(data2019)

In [61]:
# Show the schema of the combined data set.
data_full.printSchema()

root
 |-- HeartDisease: double (nullable = true)
 |-- State: double (nullable = true)
 |-- BMI: double (nullable = true)
 |-- HighBP: double (nullable = true)
 |-- HighChol: double (nullable = true)
 |-- CholCheck: double (nullable = true)
 |-- FruitConsume: double (nullable = true)
 |-- VegetableConsume: double (nullable = true)
 |-- Smoker: double (nullable = true)
 |-- HeavyDrinker: double (nullable = true)
 |-- Diabetes: double (nullable = true)
 |-- Stroke: double (nullable = true)
 |-- Healthcare: double (nullable = true)
 |-- NoDoctorDueToCost: double (nullable = true)
 |-- PhysicalActivity: double (nullable = true)
 |-- GeneralHealth: double (nullable = true)
 |-- PhysicalHealth: double (nullable = true)
 |-- MentalHealth: double (nullable = true)
 |-- DifficultyWalking: double (nullable = true)
 |-- Gender: double (nullable = true)
 |-- Age: double (nullable = true)
 |-- Education: double (nullable = true)
 |-- Income: double (nullable = true)



In [65]:
# Total rows across the two cleaned survey years, for comparison with the union's count.
sum(survey.count() for survey in (data2017, data2019))

                                                                                

519171

In [64]:
# Row count of the combined data set.
data_full.count()

                                                                                

519171

## 7. WRITE THE CLEANED DATA AND CHECK

In [75]:
# Write the cleaned, combined data set once. The write is skipped when the output
# directory is already present, so a fresh "Restart & Run All" creates the files
# that the check cell below reads, while later runs do not fail on an existing path.
# NOTE(review): os.path.isdir assumes Spark resolves this relative path on the local
# filesystem, as the local CSV reads above suggest -- confirm for other deployments.
if not os.path.isdir("../../../BRFSS/HeartRiskData"):
    data_full.write.option("header",True).csv("../../../BRFSS/HeartRiskData")

In [76]:
# Read the written CSV output back in (header row, inferred types) to verify it.
check_data = spark.read.csv("../../../BRFSS/HeartRiskData/", header='true',inferSchema='true')

In [77]:
# Show the schema of the data read back from disk.
check_data.printSchema()

root
 |-- HeartDisease: double (nullable = true)
 |-- State: double (nullable = true)
 |-- BMI: double (nullable = true)
 |-- HighBP: double (nullable = true)
 |-- HighChol: double (nullable = true)
 |-- CholCheck: double (nullable = true)
 |-- FruitConsume: double (nullable = true)
 |-- VegetableConsume: double (nullable = true)
 |-- Smoker: double (nullable = true)
 |-- HeavyDrinker: double (nullable = true)
 |-- Diabetes: double (nullable = true)
 |-- Stroke: double (nullable = true)
 |-- Healthcare: double (nullable = true)
 |-- NoDoctorDueToCost: double (nullable = true)
 |-- PhysicalActivity: double (nullable = true)
 |-- GeneralHealth: double (nullable = true)
 |-- PhysicalHealth: double (nullable = true)
 |-- MentalHealth: double (nullable = true)
 |-- DifficultyWalking: double (nullable = true)
 |-- Gender: double (nullable = true)
 |-- Age: double (nullable = true)
 |-- Education: double (nullable = true)
 |-- Income: double (nullable = true)



In [78]:
# Row count of the data read back from disk.
check_data.count()

519171