# Spark Connect and DF

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the notebook's Spark session, running locally with the
# "local[*]" master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark_setup")
    .getOrCreate()
)

## 2018 Spark DF for proof of concept

In [None]:
# Load the 2018 file only; the first line is the header and column types are
# inferred from the data.
df_2018 = spark.read.csv("parsed_data/year_2018.csv", header=True, inferSchema=True)

In [None]:
# Print the first row as a quick look at the loaded 2018 data.
df_2018.show(1)

In [None]:
# Total number of rows in the 2018 DataFrame.
df_2018.count()  # 2.85 million rows

## 2014-2018 Aggregated Spark DF

In [None]:
# Load every CSV under parsed_data/ into one DataFrame (header row present,
# column types inferred).
df_concat = spark.read.csv("parsed_data", header=True, inferSchema=True)

In [None]:
# Mark the combined DataFrame for caching so the repeated inspections below
# can reuse it.
df_concat.cache()

In [None]:
# Print the first row of the combined data.
df_concat.show(1)

In [None]:
# Print the column names and inferred types of the combined data.
df_concat.printSchema()

In [None]:
# Total number of rows across all loaded files.
df_concat.count()  # 13.8 million rows

## Remove unnecessary columns for efficiency

In [None]:
# Columns removed before sampling, grouped by kind: the line number, the age
# and cause-of-death recodes, the mcd_*_E count and its 20 numbered fields,
# and the race recodes.
columns_to_drop = [
    'line_num',
    'age_recode_52', 'age_recode_27', 'age_recode_12', 'infant_age_recode_22',
    'ucd_recode_358', 'ucd_infant_recode_130',
    'mcd_count_E',
    *(f'mcd_{position}_E' for position in range(1, 21)),
    'race_recode_3', 'race_recode_5', 'race_recode_40',
]

# Remove the listed columns; drop() returns a new DataFrame, so rebind the name.
df_concat = df_concat.drop(*columns_to_drop)

In [None]:
# Number of columns left after the drop.
len(df_concat.columns)

In [None]:
# List the remaining column names.
df_concat.columns

In [None]:
# Print the column names and types of the trimmed DataFrame.
df_concat.printSchema()

## Create Stratified Sample by Death Year: 20% sample for each year

In [1]:
from pyspark.sql.functions import col

# Sample 20% of the rows within each death year (2014-2018); the fixed seed
# keeps the draw repeatable.
stratified = df_concat.sampleBy(
    "death_year",
    fractions={year: 0.2 for year in range(2014, 2019)},
    seed=212,
)

ModuleNotFoundError: ignored

In [2]:
# Mark the stratified sample for caching ahead of the counts and the write below.
stratified.cache()

NameError: ignored

In [None]:
# Rows per death year in the sample.
stratified.groupBy("death_year").count().show()

In [None]:
# Total number of rows in the stratified sample.
stratified.count()  # 2.75 million rows

## Write the sample to a folder of partitioned CSVs that can be read back into a Spark DF

In [None]:
# Write the sample as comma-separated CSV part files with a header row,
# replacing any existing stratified_csvs/ output.
stratified.write.csv('stratified_csvs', mode='overwrite', sep=',', header=True)

## Read in stratified sample to Spark DF

In [None]:
# Load the saved stratified sample back from its folder of CSV parts
# (header row present, column types inferred).
df_strat = spark.read.csv("stratified_csvs", header=True, inferSchema=True)

In [None]:
# Mark the reloaded sample for caching before the inspections below.
df_strat.cache()

In [None]:
# List the column names of the reloaded sample.
df_strat.columns

In [None]:
# Print the column names and the types inferred on reload.
df_strat.printSchema()

In [None]:
# Print the first row of the reloaded sample.
df_strat.show(1)

In [None]:
# Total number of rows in the reloaded sample.
df_strat.count()  # 2.75 million rows

In [None]:
# Rows per death year in the reloaded sample.
df_strat.groupBy("death_year").count().show()

## Cleaning

In [None]:
from pyspark.sql.functions import when, lit, col

def replace(column, value):
    """Return ``column`` with entries equal to ``value`` left out.

    Args:
        column: Column expression to clean.
        value: Sentinel code that should not survive.

    Returns:
        A ``when`` expression that yields the original entry wherever it
        differs from ``value``. No ``otherwise`` branch is supplied, so
        rows that fail the comparison get no value (null).
    """
    differs_from_sentinel = column != value
    return when(differs_from_sentinel, column)

In [None]:
# Null out the "unknown"-style sentinel codes. The steps run in order, so the
# second education pass sees the result of the first.
df_strat = (
    df_strat
    .withColumn("education", replace(col("education"), 99))            # "Not stated" -> null
    .withColumn("education", replace(col("education"), 9))             # "Unknown" -> null
    .withColumn("age_flag", replace(col("age_flag"), 9))               # "Age not stated" -> null
    .withColumn("place_of_death", replace(col("place_of_death"), 9))   # "Unknown" -> null
    .withColumn("marital_status", replace(col("marital_status"), "U")) # "Unknown" -> null
)


In [None]:
# Null out the "Unknown" codes in the death-circumstance columns.
df_strat = (
    df_strat
    .withColumn("death_day_of_week", replace(col("death_day_of_week"), 9))      # day of week "Unknown" -> null
    .withColumn("work_injury", replace(col("work_injury"), "U"))                # injury at work "Unknown" -> null
    .withColumn("disposition_method", replace(col("disposition_method"), "U"))  # disposition "Unknown" -> null
    .withColumn("autopsy", replace(col("autopsy"), "U"))                        # autopsy "Unknown" -> null
)


In [None]:
# Null out the "Unknown" Hispanic-origin codes (996-999), one pass per code,
# then code 9 in the separate hispanic_race_recode column.
df_strat = (
    df_strat
    .withColumn("hispanic", replace(col("hispanic"), 996))
    .withColumn("hispanic", replace(col("hispanic"), 997))
    .withColumn("hispanic", replace(col("hispanic"), 998))
    .withColumn("hispanic", replace(col("hispanic"), 999))
    .withColumn("hispanic_race_recode", replace(col("hispanic_race_recode"), 9))  # recode value "Unknown" -> null
)

In [None]:
# Print three rows to spot-check the cleaned columns.
df_strat.show(3)

In [None]:
# Save the cleaned sample as comma-separated CSV part files with a header row,
# replacing any existing stratified_csvs_cleaned/ output.
df_strat.write.csv('stratified_csvs_cleaned', mode='overwrite', sep=',', header=True)

## Test/Train Split

In [None]:
# Reload the cleaned stratified sample from stratified_csvs_cleaned/
# (header row present, column types inferred).
df_strat = spark.read.csv("stratified_csvs_cleaned", header=True, inferSchema=True)

In [None]:
from pyspark.sql.functions import col

# Stratified 80/20 split by death year. The fixed seed keeps the training
# sample repeatable over the same input files.
train_data = df_strat.sampleBy(
    "death_year",
    fractions={2014: 0.8, 2015: 0.8, 2016: 0.8, 2017: 0.8, 2018: 0.8},
    seed=212,
)
# The test set is everything not sampled into train_data.
# exceptAll (SQL EXCEPT ALL, Spark 2.4+) removes one matching row per sampled
# row, so records that are identical in every column keep the right
# multiplicity and train + test adds up to the full sample.
# subtract() is EXCEPT DISTINCT: it de-duplicates the result and drops every
# copy of a row once any copy is in train_data, silently shrinking the test set.
test_data = df_strat.exceptAll(train_data)

In [None]:
# Number of rows in the training split.
train_data.count()

2200579

In [None]:
# Number of rows in the test split.
test_data.count()

550650

In [None]:
# Save both splits as comma-separated CSV part files with a header row,
# replacing any existing train_data/ and test_data/ output.
train_data.write.csv('train_data', mode='overwrite', sep=',', header=True)
test_data.write.csv('test_data', mode='overwrite', sep=',', header=True)