In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import col,when
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

In [2]:
# Import SparkConf class into program
from pyspark import SparkConf
# local[*]: run Spark in local mode with as many working processors as logical cores on your machine
# If we want Spark to run locally with 'k' worker threads, we can specify as "local[k]".
master = "local[*]"
# The `appName` field is a name to be shown on the Spark cluster UI page
app_name = "diabetes"
# Setup configuration parameters for Spark
spark_conf = SparkConf().setMaster(master).setAppName(app_name)
# Import SparkContext and SparkSession classes
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL
# Method 1: Using SparkSession
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')

## Load the file

In [3]:
raw_df = spark.read.option("header", True).option("inferSchema", True) .csv("diabetes_nhanes_17_18.csv")

check the file's schema

In [4]:
# file's schema
raw_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- SEQN: double (nullable = true)
 |-- DIQ010: integer (nullable = true)
 |-- DID040: string (nullable = true)
 |-- DIQ160: string (nullable = true)
 |-- DIQ170: string (nullable = true)
 |-- DIQ172: string (nullable = true)
 |-- DIQ175A: string (nullable = true)
 |-- DIQ175B: string (nullable = true)
 |-- DIQ175C: string (nullable = true)
 |-- DIQ175D: string (nullable = true)
 |-- DIQ175E: string (nullable = true)
 |-- DIQ175F: string (nullable = true)
 |-- DIQ175G: string (nullable = true)
 |-- DIQ175H: string (nullable = true)
 |-- DIQ175I: string (nullable = true)
 |-- DIQ175J: string (nullable = true)
 |-- DIQ175K: string (nullable = true)
 |-- DIQ175L: string (nullable = true)
 |-- DIQ175M: string (nullable = true)
 |-- DIQ175N: string (nullable = true)
 |-- DIQ175O: string (nullable = true)
 |-- DIQ175P: string (nullable = true)
 |-- DIQ175Q: string (nullable = true)
 |-- DIQ175R: string (nullable = true)
 |-- DIQ175S: string (nullable

In [5]:
raw_df = raw_df.drop('_c0')

In [6]:
raw_df.show(1,vertical=True)

-RECORD 0----------
 SEQN    | 93703.0 
 DIQ010  | 2       
 DID040  | NA      
 DIQ160  | NA      
 DIQ170  | NA      
 DIQ172  | NA      
 DIQ175A | NA      
 DIQ175B | NA      
 DIQ175C | NA      
 DIQ175D | NA      
 DIQ175E | NA      
 DIQ175F | NA      
 DIQ175G | NA      
 DIQ175H | NA      
 DIQ175I | NA      
 DIQ175J | NA      
 DIQ175K | NA      
 DIQ175L | NA      
 DIQ175M | NA      
 DIQ175N | NA      
 DIQ175O | NA      
 DIQ175P | NA      
 DIQ175Q | NA      
 DIQ175R | NA      
 DIQ175S | NA      
 DIQ175T | NA      
 DIQ175U | NA      
 DIQ175V | NA      
 DIQ175W | NA      
 DIQ175X | NA      
 DIQ180  | NA      
 DIQ050  | 2       
 DID060  | NA      
 DIQ060U | NA      
 DIQ070  | NA      
 DIQ230  | NA      
 DIQ240  | NA      
 DID250  | NA      
 DID260  | NA      
 DIQ260U | NA      
 DIQ275  | NA      
 DIQ280  | NA      
 DIQ291  | NA      
 DIQ300S | NA      
 DIQ300D | NA      
 DID310S | NA      
 DID310D | NA      
 DID320  | NA      
 DID330  | NA      


## check the df structure

In [7]:
# output: (rows, columns)
raw_df.count(), len(raw_df.columns)

(8897, 54)

 ## Deleting unneeded columns

In [9]:
need_to_remove = ['DIQ175L','DIQ175M','DIQ175N','DIQ175O','DIQ175P','DIQ175Q','DIQ175R','DIQ175S','DIQ175T','DIQ175U','DIQ175V','DIQ175W','DIQ175X','DIQ180','DIQ050','DID060','DIQ060U','DIQ070','DIQ230','DIQ240','DID250','DID260','DIQ260U','DIQ275','DIQ280','DIQ291','DIQ300S','DIQ300D','DID310S','DID310D','DID320','DID330','DID341','DID350','DIQ350U','DIQ360','DIQ080']
for column in need_to_remove:
    raw_df = raw_df.drop(column)

## change all NA into null

In [10]:
for c in raw_df.columns:
    raw_df = raw_df.withColumn(c, when(col(c)=='NA' ,None).otherwise(col(c)))

In [11]:
raw_df.show(1,vertical=True)

-RECORD 0----------
 SEQN    | 93703.0 
 DIQ010  | 2       
 DID040  | null    
 DIQ160  | null    
 DIQ170  | null    
 DIQ172  | null    
 DIQ175A | null    
 DIQ175B | null    
 DIQ175C | null    
 DIQ175D | null    
 DIQ175E | null    
 DIQ175F | null    
 DIQ175G | null    
 DIQ175H | null    
 DIQ175I | null    
 DIQ175J | null    
 DIQ175K | null    
only showing top 1 row



In [12]:
# set a list contain features exclude 'SEQN'
columns = raw_df.columns
columns.pop(0)

'SEQN'

## Check data components for each feature

In [13]:
# remove rows contain too many null values 
def new_df_desc(df,input_thresh,columns):
    # drop rows contain more than ... null
    raw_df_new=df.dropna(thresh=input_thresh) # input_thresh: require how many non-NA values
    print(raw_df_new.count())
    for column in columns:
        # group counts for each feature
        raw_df_new.select(column)\
        .groupBy(raw_df_new[column]).agg({column:'count'}).orderBy(count(raw_df_new[column]).desc()).show()
        # null percentage for each feature
        raw_df_new.select(count(when(col(column).isNull(),column))/count('*')*100).show()
    return raw_df_new

In [16]:
raw_df_new = new_df_desc(raw_df,1,columns)

8897
+------+-------------+
|DIQ010|count(DIQ010)|
+------+-------------+
|     2|         7816|
|     1|          893|
|     3|          184|
|     9|            4|
+------+-------------+

+----------------------------------------------------------------------+
|((count(CASE WHEN (DIQ010 IS NULL) THEN DIQ010 END) / count(1)) * 100)|
+----------------------------------------------------------------------+
|                                                                   0.0|
+----------------------------------------------------------------------+

+------+-------------+
|DID040|count(DID040)|
+------+-------------+
|    50|           79|
|    40|           48|
|    55|           45|
|    45|           45|
|    60|           42|
|    58|           31|
|    59|           23|
|    62|           23|
|    30|           22|
|    48|           21|
|    52|           20|
|    65|           20|
|    54|           20|
|    47|           19|
|    53|           19|
|    35|           18|
|    46

+-------+--------------+
|DIQ175K|count(DIQ175K)|
+-------+--------------+
|     20|            29|
|   null|             0|
+-------+--------------+

+------------------------------------------------------------------------+
|((count(CASE WHEN (DIQ175K IS NULL) THEN DIQ175K END) / count(1)) * 100)|
+------------------------------------------------------------------------+
|                                                       99.67404743171856|
+------------------------------------------------------------------------+



## Set the suited target variable

### 'DIQ010' OR 'DIQ170'

* split data by 'DIQ010': Doctor told you have diabetes

In [29]:
df_1 = raw_df[raw_df['DIQ010']==1] # has diabetes
df_2 = raw_df[raw_df['DIQ010']==2] # do not have
df_3 = raw_df[raw_df['DIQ010']==3] # borderline

In [30]:
for column in columns:
    # group counts for each feature
    df_1.select(column)\
        .groupBy(df_1[column]).agg({column:'count'}).orderBy(count(df_1[column]).desc()).show()
#     # null percentage for each feature
#     df_1.select(count(when(col(column).isNull(),column))/count('*')*100).show()

+------+-------------+
|DIQ010|count(DIQ010)|
+------+-------------+
|     1|          893|
+------+-------------+

+------+-------------+
|DID040|count(DID040)|
+------+-------------+
|    50|           79|
|    40|           48|
|    55|           45|
|    45|           45|
|    60|           42|
|    58|           31|
|    59|           23|
|    62|           23|
|    30|           22|
|    48|           21|
|    54|           20|
|    52|           20|
|    65|           20|
|    53|           19|
|    47|           19|
|    46|           18|
|    35|           18|
|    42|           17|
|    38|           17|
|    57|           17|
+------+-------------+
only showing top 20 rows

+------+-------------+
|DIQ160|count(DIQ160)|
+------+-------------+
|  null|            0|
+------+-------------+

+------+-------------+
|DIQ170|count(DIQ170)|
+------+-------------+
|  null|            0|
+------+-------------+

+------+-------------+
|DIQ172|count(DIQ172)|
+------+-------------+
|  nu

people has diabetes would have no data for other features --> 'DIQ010' could not be target variable

* split data by 'DIQ170': Ever told have health risk for diabetes

In [31]:
df_1_170 = raw_df[raw_df['DIQ170']==1] # yes
df_2_170 = raw_df[raw_df['DIQ170']==2] # no
df_3_170 = raw_df[raw_df['DIQ170']==9] # don't know

In [32]:
for column in columns:
    df_2_170.select(column)\
        .groupBy(df_1_170[column]).agg({column:'count'}).orderBy(count(df_2_170[column]).desc()).show()
    df_2_170.select(count(when(col(column).isNull(),column))/count('*')*100).show()

+------+-------------+
|DIQ010|count(DIQ010)|
+------+-------------+
|     2|         4807|
|     3|          103|
|     9|            1|
+------+-------------+

+----------------------------------------------------------------------+
|((count(CASE WHEN (DIQ010 IS NULL) THEN DIQ010 END) / count(1)) * 100)|
+----------------------------------------------------------------------+
|                                                                   0.0|
+----------------------------------------------------------------------+

+------+-------------+
|DID040|count(DID040)|
+------+-------------+
|  null|            0|
+------+-------------+

+----------------------------------------------------------------------+
|((count(CASE WHEN (DID040 IS NULL) THEN DID040 END) / count(1)) * 100)|
+----------------------------------------------------------------------+
|                                                                 100.0|
+---------------------------------------------------------------

* above all, 'DIQ170' is suited

## Using UDF to change data in 'DIQ170', 'DIQ160', 'DIQ175A':

change data in 'DIQ170':

    1 --> has risk of getting diabetes

    0 --> do not have risk of getting diabetes

In [33]:
def check_170(input_string):
    if input_string == '1': # has risk of diabetes
        return 1
    else:
        return 0

In [34]:
check_170_UDF = udf(lambda x:check_170(x))

change data in 'DIQ160':

    1 --> has prediabetes

    0 --> do not have prediabetes
    
    2 --> not sure

In [35]:
def check_160(input_string):
    if input_string == '1': # yes
        return str(1)
    elif input_string == '2': # no
        return str(0)
    else: # don't konw
        return str(2)

In [36]:
check_160_UDF = udf(lambda x:check_160(x))

change data in 'DIQ175A':

    1 --> has family history

    0 --> do not have

In [37]:
def check_175A(input_string):
    if input_string == '10': # yes
        return str(1)
    else: # no
        return str(0)

In [38]:
check_175A_UDF = udf(lambda x:check_175A(x))

change data for null:

    1 --> not null

    0 --> null

In [39]:
def check_null(input_string):
    if input_string == None: # has risk of diabetes
        return str(0)
    else:
        return str(1)

In [40]:
check_null_UDF = udf(lambda x:check_null(x))

# Data Processing Function

In [42]:
def df_cleaning(df_name):
    # load raw file
    df = spark.read.option('header', True).option('inferSchema', True) .csv(df_name)
    # drop useless feature
    need_to_remove = ['_c0','DIQ175C','DIQ175E','DIQ175L','DIQ175M','DIQ175N','DIQ175O','DIQ175P','DIQ175Q','DIQ175R','DIQ175S','DIQ175T','DIQ175U','DIQ175V','DIQ175W','DIQ175X','DIQ180','DIQ050','DID060','DIQ060U','DIQ070','DIQ230','DIQ240','DID250','DID260','DIQ260U','DIQ275','DIQ280','DIQ291','DIQ300S','DIQ300D','DID310S','DID310D','DID320','DID330','DID341','DID350','DIQ350U','DIQ360','DIQ080']
    for column in need_to_remove:
        df = df.drop(column)
    # NA --> null
    for c in df.columns:
        df = df.withColumn(c, when(col(c)=='NA' ,None).otherwise(col(c)))
    # target feature's value change --> [0,1]
    df = df.withColumn('DIQ170',check_170_UDF(col('DIQ170')))
    df = df.withColumn('DIQ160',check_160_UDF(col('DIQ160')))
    df = df.withColumn('DIQ175A',check_175A_UDF(col('DIQ175A')))
    # rename columns
    df = df.withColumnRenamed('DIQ010', 'have_diabetes')
    df = df.withColumnRenamed('DID040', 'age_get_diabetes')
    df = df.withColumnRenamed('DIQ160', 'have_prediabetes')
    df = df.withColumnRenamed('DIQ170', 'have_risk')
    df = df.withColumnRenamed('DIQ172', 'feel_risk')
    df = df.withColumnRenamed('DIQ175A', 'family_history')
    df = df.withColumnRenamed('DIQ175B', 'overweight')
    df = df.withColumnRenamed('DIQ175D', 'poor_diet')
    df = df.withColumnRenamed('DIQ175F', 'baby_weighed_over9')
    df = df.withColumnRenamed('DIQ175G', 'lack_physical_activity')
    df = df.withColumnRenamed('DIQ175H', 'high_blood_pressure')
    df = df.withColumnRenamed('DIQ175I', 'high_blood_sugar')
    df = df.withColumnRenamed('DIQ175J', 'high_cholesterol')
    df = df.withColumnRenamed('DIQ175K', 'hypoglycemic')
    for column in ['overweight','poor_diet','baby_weighed_over9','lack_physical_activity','high_blood_pressure','high_blood_sugar','high_cholesterol','hypoglycemic']:
        df = df.withColumn(column,check_null_UDF(col(column)))
    
    # drop rows contain more than ... null
#     df=df.dropna(thresh=6) # input_thresh: require how many non-NA values
#     for c in df.columns:
#         df = df.withColumn(c, when(col(c)==None ,str(0)).otherwise(col(c)))
    
    return df

In [43]:
df_17_18 = df_cleaning('diabetes_nhanes_17_18.csv')
df_15_16 = df_cleaning('diabetes_nhanes_15_16.csv')
df_13_14 = df_cleaning('diabetes_nhanes_13_14.csv')
df_11_12 = df_cleaning('diabetes_nhanes_11_12.csv')

In [44]:
union_all_df = df_17_18.union(df_15_16).cache()
union_all_df = union_all_df.union(df_13_14).cache()
union_all_df = union_all_df.union(df_11_12).cache()

In [45]:
union_all_df.count()

38012

In [46]:
union_all_df_pd = union_all_df.toPandas()

In [48]:
columns = union_all_df.columns
columns.pop(0)

'SEQN'

In [49]:
print(columns)

['have_diabetes', 'age_get_diabetes', 'have_prediabetes', 'have_risk', 'feel_risk', 'family_history', 'overweight', 'poor_diet', 'baby_weighed_over9', 'lack_physical_activity', 'high_blood_pressure', 'high_blood_sugar', 'high_cholesterol', 'hypoglycemic']


## final df structure:

In [53]:
for column in columns:
    union_all_df.select(column)\
        .groupBy(union_all_df[column]).agg({column:'count'}).orderBy(count(union_all_df[column]).desc()).show()
    # null percentage for each feature
    union_all_df.select(count(when(col(column).isNull(),column))/count('*')*100).show()

+-------------+--------------------+
|have_diabetes|count(have_diabetes)|
+-------------+--------------------+
|            2|               34066|
|            1|                3223|
|            3|                 701|
|            9|                  18|
|            7|                   2|
|         null|                   0|
+-------------+--------------------+

+------------------------------------------------------------------------------------+
|((count(CASE WHEN (have_diabetes IS NULL) THEN have_diabetes END) / count(1)) * 100)|
+------------------------------------------------------------------------------------+
|                                                                0.005261496369567504|
+------------------------------------------------------------------------------------+

+----------------+-----------------------+
|age_get_diabetes|count(age_get_diabetes)|
+----------------+-----------------------+
|              50|                    239|
|              40|   

+----------------+-----------------------+
|high_blood_sugar|count(high_blood_sugar)|
+----------------+-----------------------+
|               0|                  37719|
|               1|                    293|
+----------------+-----------------------+

+------------------------------------------------------------------------------------------+
|((count(CASE WHEN (high_blood_sugar IS NULL) THEN high_blood_sugar END) / count(1)) * 100)|
+------------------------------------------------------------------------------------------+
|                                                                                       0.0|
+------------------------------------------------------------------------------------------+

+----------------+-----------------------+
|high_cholesterol|count(high_cholesterol)|
+----------------+-----------------------+
|               0|                  37485|
|               1|                    527|
+----------------+-----------------------+

+---------------

## save in the csv

In [230]:
union_all_df_pd.to_csv('union_all_diabetes_final.csv',index=False)