In [87]:
import os
import numpy as np
import ast
import re

import pyspark.sql.functions as f
# f.lit
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType, StructField, StructType, FloatType, DoubleType

In [88]:
# Location of the U.S. Chronic Disease Indicators (CDI) 2023 release CSV.
DATA_DIR = "./data/chronic-disease-data"
path = os.path.join(DATA_DIR, "U.S._Chronic_Disease_Indicators__CDI___2023_Release.csv")

# Create (or reuse) the Spark session, requesting 16g of executor memory.
spark = (
    SparkSession.builder
    .appName("test")
    .config("spark.executor.memory", "16g")
    .getOrCreate()
)

# Read the CSV, treating the first line as a header and letting Spark
# infer the column types.
test_spark_df_00_10 = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load(path)
)

test_spark_df_00_10.show()

+---------+-------+------------+--------------------+----------+------+--------------------+--------+-------------+-------------+---------+------------+-----------------------+-----------------+------------------+-------------------+-----------------------+-------------------+-----------------------+---------------+-----------------------+---------------+--------------------+----------+----------+-------+----------+---------------+-------------------------+-----------------+-------------------------+-----------------+-------------------------+-----------------+
|YearStart|YearEnd|LocationAbbr|        LocationDesc|DataSource| Topic|            Question|Response|DataValueUnit|DataValueType|DataValue|DataValueAlt|DataValueFootnoteSymbol|DatavalueFootnote|LowConfidenceLimit|HighConfidenceLimit|StratificationCategory1|    Stratification1|StratificationCategory2|Stratification2|StratificationCategory3|Stratification3|         GeoLocation|ResponseID|LocationID|TopicID|QuestionID|DataValueType

# Drop unnecessary columns

In [89]:
# Columns removed from the frame, grouped by what they describe.
cols_to_drop = (
    # response and footnote fields
    ["Response", "ResponseID", "DataValueFootnoteSymbol", "DatavalueFootnote"]
    # second and third stratification levels
    + [
        "StratificationCategory2",
        "Stratification2",
        "StratificationCategory3",
        "Stratification3",
    ]
    # stratification ID columns for all three levels
    + [
        "StratificationCategoryID1",
        "StratificationID1",
        "StratificationCategoryID2",
        "StratificationID2",
        "StratificationCategoryID3",
        "StratificationID3",
    ]
)

test_spark_df_00_10 = test_spark_df_00_10.drop(*cols_to_drop)
test_spark_df_00_10.show()

+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+-------------------+--------------------+----------+-------+----------+---------------+
|YearStart|YearEnd|LocationAbbr|        LocationDesc|DataSource| Topic|            Question|DataValueUnit|DataValueType|DataValue|DataValueAlt|LowConfidenceLimit|HighConfidenceLimit|StratificationCategory1|    Stratification1|         GeoLocation|LocationID|TopicID|QuestionID|DataValueTypeID|
+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+-------------------+--------------------+----------+-------+----------+---------------+
|     2014|   2014|          AR|            Arkansas| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Numbe

In [90]:
# Row count after the column drop.
test_spark_df_00_10.count()

1185676

In [91]:
# Inspect the last five rows; tail() returns them as a list of Row objects.
test_spark_df_00_10.tail(5)

[Row(YearStart=2020, YearEnd=2020, LocationAbbr='WY', LocationDesc='Wyoming', DataSource='BRFSS', Topic='Diabetes', Question='Dilated eye examination among adults aged >= 18 years with diagnosed diabetes', DataValueUnit='%', DataValueType='Age-adjusted Prevalence', DataValue=None, DataValueAlt=None, LowConfidenceLimit=None, HighConfidenceLimit=None, StratificationCategory1='Race/Ethnicity', Stratification1='White, non-Hispanic', GeoLocation='POINT (-108.10983035299967 43.23554134300048)', LocationID=56, TopicID='DIA', QuestionID='DIA7_0', DataValueTypeID='AGEADJPREV'),
 Row(YearStart=2020, YearEnd=2020, LocationAbbr='WY', LocationDesc='Wyoming', DataSource='BRFSS', Topic='Older Adults', Question='Proportion of older adults aged >= 65 years who are up to date on a core set of clinical preventive services', DataValueUnit='%', DataValueType='Crude Prevalence', DataValue='41.5', DataValueAlt=41.5, LowConfidenceLimit=38.5, HighConfidenceLimit=44.6, StratificationCategory1='Race/Ethnicity', 

In [92]:
# Inspect the first five rows in tabular form.
test_spark_df_00_10.show(5)

+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+---------------+--------------------+----------+-------+----------+---------------+
|YearStart|YearEnd|LocationAbbr|        LocationDesc|DataSource| Topic|            Question|DataValueUnit|DataValueType|DataValue|DataValueAlt|LowConfidenceLimit|HighConfidenceLimit|StratificationCategory1|Stratification1|         GeoLocation|LocationID|TopicID|QuestionID|DataValueTypeID|
+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+---------------+--------------------+----------+-------+----------+---------------+
|     2014|   2014|          AR|            Arkansas| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|      916|

# Extract latitude and longitude from geolocation

In [93]:
# Replace the "POINT (<x> <y>)" text in GeoLocation with an array holding every
# signed number found in it (as strings).
#
# Pattern notes:
#   -?            at most one leading minus sign (the previous "-*" accepted "--")
#   \d+           integer part
#   (?:\.\d+)?    optional fractional part with a *literal* dot; the previous
#                 unescaped "." matched any character, so whole-number
#                 coordinates such as "-108 43" were captured as one token
test_spark_df_00_10 = test_spark_df_00_10.withColumn(
    "GeoLocation",
    f.regexp_extract_all(f.col("GeoLocation"), f.lit(r"(-?\d+(?:\.\d+)?)"), 1),
)
test_spark_df_00_10.show()

+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+-------------------+--------------------+----------+-------+----------+---------------+
|YearStart|YearEnd|LocationAbbr|        LocationDesc|DataSource| Topic|            Question|DataValueUnit|DataValueType|DataValue|DataValueAlt|LowConfidenceLimit|HighConfidenceLimit|StratificationCategory1|    Stratification1|         GeoLocation|LocationID|TopicID|QuestionID|DataValueTypeID|
+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+-------------------+--------------------+----------+-------+----------+---------------+
|     2014|   2014|          AR|            Arkansas| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Numbe

# Cast latitude and longitude str columns to doubles

In [94]:
# GeoLocation holds the two numbers extracted from "POINT (<x> <y>)", where
# x is the longitude and y is the latitude. For example Wyoming appears as
# "POINT (-108.1098... 43.2355...)", and -108 is outside the valid latitude
# range, so element 0 is the longitude and element 1 is the latitude.
# Latitude is still added first so the resulting column order is unchanged.
test_spark_df_00_10 = test_spark_df_00_10.withColumn(
    "Latitude", f.col("GeoLocation")[1].cast(DoubleType())
)
test_spark_df_00_10 = test_spark_df_00_10.withColumn(
    "Longitude", f.col("GeoLocation")[0].cast(DoubleType())
)

In [95]:
test_spark_df_00_10.show(5)

+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+---------------+--------------------+----------+-------+----------+---------------+-------------------+------------------+
|YearStart|YearEnd|LocationAbbr|        LocationDesc|DataSource| Topic|            Question|DataValueUnit|DataValueType|DataValue|DataValueAlt|LowConfidenceLimit|HighConfidenceLimit|StratificationCategory1|Stratification1|         GeoLocation|LocationID|TopicID|QuestionID|DataValueTypeID|           Latitude|         Longitude|
+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+---------------+--------------------+----------+-------+----------+---------------+-------------------+------------------+
|     2014|  

# Delete GeoLocation, DataSource, and DataValue columns as these are redundant

In [96]:
# GeoLocation has been split into Latitude/Longitude, and DataValueAlt carries
# the numeric form of DataValue, so these columns are no longer needed.
test_spark_df_00_10 = test_spark_df_00_10.drop(
    "GeoLocation",
    "DataSource",
    "DataValue",
)
test_spark_df_00_10.show(5)

+---------+-------+------------+--------------------+------+--------------------+-------------+-------------+------------+------------------+-------------------+-----------------------+---------------+----------+-------+----------+---------------+-------------------+------------------+
|YearStart|YearEnd|LocationAbbr|        LocationDesc| Topic|            Question|DataValueUnit|DataValueType|DataValueAlt|LowConfidenceLimit|HighConfidenceLimit|StratificationCategory1|Stratification1|LocationID|TopicID|QuestionID|DataValueTypeID|           Latitude|         Longitude|
+---------+-------+------------+--------------------+------+--------------------+-------------+-------------+------------+------------------+-------------------+-----------------------+---------------+----------+-------+----------+---------------+-------------------+------------------+
|     2014|   2014|          AR|            Arkansas|Asthma|Hospitalizations ...|         NULL|       Number|       916.0|              NUL

# Rename the DataValueAlt column (which is already a double) to DataValue

In [97]:
# The numeric DataValueAlt column takes over the DataValue name.
test_spark_df_00_10 = test_spark_df_00_10.withColumnsRenamed(
    {"DataValueAlt": "DataValue"}
)
test_spark_df_00_10.show(5)

+---------+-------+------------+--------------------+------+--------------------+-------------+-------------+---------+------------------+-------------------+-----------------------+---------------+----------+-------+----------+---------------+-------------------+------------------+
|YearStart|YearEnd|LocationAbbr|        LocationDesc| Topic|            Question|DataValueUnit|DataValueType|DataValue|LowConfidenceLimit|HighConfidenceLimit|StratificationCategory1|Stratification1|LocationID|TopicID|QuestionID|DataValueTypeID|           Latitude|         Longitude|
+---------+-------+------------+--------------------+------+--------------------+-------------+-------------+---------+------------------+-------------------+-----------------------+---------------+----------+-------+----------+---------------+-------------------+------------------+
|     2014|   2014|          AR|            Arkansas|Asthma|Hospitalizations ...|         NULL|       Number|    916.0|              NULL|          

In [98]:
# Re-check the row count before rows with nulls are filtered out.
test_spark_df_00_10.count()

1185676

# Remove rows with a null value in any of DataValue, DataValueUnit, or DataValueType

In [None]:
# Keep a row only when DataValueUnit, DataValue and DataValueType are all
# present; equivalently, drop it if any one of the three is null.
cond = (
    f.col("DataValueUnit").isNotNull()
    & f.col("DataValue").isNotNull()
    & f.col("DataValueType").isNotNull()
)
test_spark_df_00_10 = test_spark_df_00_10.where(cond)
test_spark_df_00_10.show()

+---------+-------+------------+--------------+-------------------+--------------------+-----------------+-----------------+---------+------------------+-------------------+-----------------------+--------------------+----------+-------+----------+---------------+-------------------+------------------+
|YearStart|YearEnd|LocationAbbr|  LocationDesc|              Topic|            Question|    DataValueUnit|    DataValueType|DataValue|LowConfidenceLimit|HighConfidenceLimit|StratificationCategory1|     Stratification1|LocationID|TopicID|QuestionID|DataValueTypeID|           Latitude|         Longitude|
+---------+-------+------------+--------------+-------------------+--------------------+-----------------+-----------------+---------+------------------+-------------------+-----------------------+--------------------+----------+-------+----------+---------------+-------------------+------------------+
|     2020|   2020|          IL|      Illinois|             Asthma|Asthma mortality ...|

# Replace `per 100,000` and `per 100,000 residents` with `cases per 100,000` instead to reduce redundancy

In [102]:
# List the distinct unit labels currently present in DataValueUnit.
test_spark_df_00_10.select("DataValueUnit").distinct().collect()

[Row(DataValueUnit='Number'),
 Row(DataValueUnit='cases per 1,000,000'),
 Row(DataValueUnit='per 100,000'),
 Row(DataValueUnit='cases per 10,000'),
 Row(DataValueUnit='cases per 100,000'),
 Row(DataValueUnit='%'),
 Row(DataValueUnit='gallons'),
 Row(DataValueUnit='$'),
 Row(DataValueUnit='cases per 1,000'),
 Row(DataValueUnit='pack sales per capita'),
 Row(DataValueUnit='Years'),
 Row(DataValueUnit='per 100,000 residents')]

In [104]:
# Fold both "per 100,000" spellings into the existing "cases per 100,000"
# label; every other unit is passed through unchanged.
unit = f.col("DataValueUnit")
test_spark_df_00_10 = test_spark_df_00_10.withColumn(
    "DataValueUnit",
    f.when(
        unit.isin("per 100,000", "per 100,000 residents"),
        "cases per 100,000",
    ).otherwise(unit),
)
test_spark_df_00_10.show()

+---------+-------+------------+--------------+-------------------+--------------------+-----------------+-----------------+---------+------------------+-------------------+-----------------------+--------------------+----------+-------+----------+---------------+-------------------+------------------+
|YearStart|YearEnd|LocationAbbr|  LocationDesc|              Topic|            Question|    DataValueUnit|    DataValueType|DataValue|LowConfidenceLimit|HighConfidenceLimit|StratificationCategory1|     Stratification1|LocationID|TopicID|QuestionID|DataValueTypeID|           Latitude|         Longitude|
+---------+-------+------------+--------------+-------------------+--------------------+-----------------+-----------------+---------+------------------+-------------------+-----------------------+--------------------+----------+-------+----------+---------------+-------------------+------------------+
|     2020|   2020|          IL|      Illinois|             Asthma|Asthma mortality ...|

In [105]:
# Verify the normalization: the two "per 100,000" variants should no longer
# appear among the distinct DataValueUnit labels.
test_spark_df_00_10.select("DataValueUnit").distinct().collect()

[Row(DataValueUnit='Number'),
 Row(DataValueUnit='cases per 1,000,000'),
 Row(DataValueUnit='cases per 10,000'),
 Row(DataValueUnit='cases per 100,000'),
 Row(DataValueUnit='%'),
 Row(DataValueUnit='gallons'),
 Row(DataValueUnit='$'),
 Row(DataValueUnit='cases per 1,000'),
 Row(DataValueUnit='pack sales per capita'),
 Row(DataValueUnit='Years')]

# Extract out the age brackets in each question if there are any

In [None]:
# NOTE(review): work in progress. The regex passed to regexp_substr is an
# empty placeholder, and the returned DataFrame is not assigned to anything,
# so this cell leaves test_spark_df_00_10 unchanged.
# TODO: supply the age-bracket pattern and write the result to a new column
# rather than overwriting Question.
test_spark_df_00_10.withColumn("Question", f.regexp_substr(f.col("Question"), f.lit("")))

In [None]:
# Inspect the resulting column names and their Spark data types.
test_spark_df_00_10.dtypes

[('YearStart', 'int'),
 ('YearEnd', 'int'),
 ('LocationAbbr', 'string'),
 ('LocationDesc', 'string'),
 ('Topic', 'string'),
 ('Question', 'string'),
 ('DataValueUnit', 'string'),
 ('DataValueType', 'string'),
 ('DataValue', 'double'),
 ('LowConfidenceLimit', 'double'),
 ('HighConfidenceLimit', 'double'),
 ('StratificationCategory1', 'string'),
 ('Stratification1', 'string'),
 ('LocationID', 'int'),
 ('TopicID', 'string'),
 ('QuestionID', 'string'),
 ('DataValueTypeID', 'string'),
 ('Latitude', 'double'),
 ('Longitude', 'double')]

# Errors
* Out of memory error: 
- https://stackoverflow.com/questions/73111729/pyspark-java-heap-out-of-memory-when-saving-5m-rows-dataframe
- https://medium.com/@rakeshchanda/spark-out-of-memory-issue-memory-tuning-and-management-in-pyspark-802b757b562f
- https://stackoverflow.com/questions/21138751/spark-java-lang-outofmemoryerror-java-heap-space
* EOF error: 