# Data Cleaning/Processing

#### PySpark Imports


In [1]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *

#### Define Spark Context

In [2]:
# Reuse the active SparkContext when one already exists so that re-executing
# this cell in a live kernel does not fail: only one SparkContext may be
# active at a time. The app name only takes effect when a new context is
# actually created.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("project"))

# NOTE(review): SQLContext is deprecated as of Spark 3.0 in favour of
# SparkSession; it is kept here because the loading cells call `sqlContext.read`.
sqlContext = SQLContext(sc)

#### Load data

In [3]:
# Read the county-level case/death file, treating the first row as the header.
# No schema is supplied, so every column comes back as a string.
counties_reader = sqlContext.read
df = counties_reader.csv("us-counties.csv", header=True)
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- geoid: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- cases: string (nullable = true)
 |-- cases_avg: string (nullable = true)
 |-- cases_avg_per_100k: string (nullable = true)
 |-- deaths: string (nullable = true)
 |-- deaths_avg: string (nullable = true)
 |-- deaths_avg_per_100k: string (nullable = true)



In [5]:
# Read the state-level vaccination file, treating the first row as the header.
# No schema is supplied, so every column comes back as a string.
vaccinations_reader = sqlContext.read
df1 = vaccinations_reader.csv("us_state_vaccinations.csv", header=True)
df1.printSchema()

root
 |-- date: string (nullable = true)
 |-- location: string (nullable = true)
 |-- total_vaccinations: string (nullable = true)
 |-- total_distributed: string (nullable = true)
 |-- people_vaccinated: string (nullable = true)
 |-- people_fully_vaccinated_per_hundred: string (nullable = true)
 |-- total_vaccinations_per_hundred: string (nullable = true)
 |-- people_fully_vaccinated: string (nullable = true)
 |-- people_vaccinated_per_hundred: string (nullable = true)
 |-- distributed_per_hundred: string (nullable = true)
 |-- daily_vaccinations_raw: string (nullable = true)
 |-- daily_vaccinations: string (nullable = true)
 |-- daily_vaccinations_per_million: string (nullable = true)
 |-- share_doses_used: string (nullable = true)



#### Filter empty rows (us-counties.csv)

In [6]:
# Drop every row that is missing a value in any of the columns used downstream.
# The filters are applied one column at a time, in this order.
required_columns = ["date", "county", "state", "cases", "deaths"]
for column_name in required_columns:
    df = df.filter(f"{column_name} is not NULL")

In [7]:
# Drop every row that holds an empty string in any of the columns used
# downstream. The filters are applied one column at a time, in this order.
required_columns = ["date", "county", "state", "cases", "deaths"]
for column_name in required_columns:
    df = df.filter(df[column_name] != '')

In [8]:
# Convert the two count columns from string (as loaded from the CSV) to
# integer so they can be aggregated numerically. The chain is wrapped in
# parentheses instead of using backslash continuations, so the statement has
# an unambiguous end.
df = (
    df.withColumn("cases", df["cases"].cast("integer"))
      .withColumn("deaths", df["deaths"].cast("integer"))
)

In [10]:
# Narrow the county frame to the columns kept in the cleaned output, then
# report its size, schema and a sample of rows.
kept_columns = ['date', 'county', 'state', 'cases', 'deaths']
df = df.select(*kept_columns)
row_count = df.count()
print(row_count)
df.printSchema()
df.show()

1251735
root
 |-- date: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- cases: integer (nullable = true)
 |-- deaths: integer (nullable = true)

+----------+-----------+----------+-----+------+
|      date|     county|     state|cases|deaths|
+----------+-----------+----------+-----+------+
|2020-01-21|  Snohomish|Washington|    1|     0|
|2020-01-22|  Snohomish|Washington|    0|     0|
|2020-01-23|  Snohomish|Washington|    0|     0|
|2020-01-24|  Snohomish|Washington|    0|     0|
|2020-01-24|       Cook|  Illinois|    1|     0|
|2020-01-25|  Snohomish|Washington|    0|     0|
|2020-01-25|       Cook|  Illinois|    0|     0|
|2020-01-25|     Orange|California|    1|     0|
|2020-01-26|  Snohomish|Washington|    0|     0|
|2020-01-26|       Cook|  Illinois|    0|     0|
|2020-01-26|     Orange|California|    0|     0|
|2020-01-26|Los Angeles|California|    1|     0|
|2020-01-26|   Maricopa|   Arizona|    1|     0|
|2020-01-27|  

#### Convert to state-level statistics (us-counties.csv)

In [11]:
# Roll county rows up to one row per (state, date) by summing the counts.
# `F.sum` is referenced explicitly: a bare `sum` here would only resolve to
# Spark's column aggregate because the wildcard import shadows Python's
# builtin `sum`, which is easy to break and hard to read.
df2 = df.groupBy('state', 'date').agg(
    F.sum('cases').alias('cases'),
    F.sum('deaths').alias('deaths'),
)
print(df2.count())
df2.show()

22620
+--------------+----------+-----+------+
|         state|      date|cases|deaths|
+--------------+----------+-----+------+
|        Oregon|2020-02-28|    1|     0|
| Massachusetts|2020-03-10|   51|     0|
|    Washington|2020-03-25|  116|     7|
|         Texas|2020-03-26|  410|     8|
|        Oregon|2020-03-29|   69|     0|
|     Louisiana|2020-04-03| 1147|    60|
|          Guam|2020-04-03|    2|     1|
|    New Mexico|2020-04-05|   80|     1|
|   Puerto Rico|2020-04-13|    6|     1|
|         Texas|2020-04-13|  711|    32|
|      Oklahoma|2020-04-18|  105|     3|
|     Wisconsin|2020-05-02|  346|     7|
|          Iowa|2020-05-04|  534|     4|
|     Tennessee|2020-05-05|  272|     9|
|Virgin Islands|2020-05-12|    0|     1|
|  Rhode Island|2020-05-26|  145|    26|
|       Georgia|2020-06-03|  648|    22|
|          Ohio|2020-06-11|  429|    33|
|       Georgia|2020-06-12|  869|    47|
|     Tennessee|2020-06-13|  398|     4|
+--------------+----------+-----+------+
only showi

#### Filter empty rows (us_state_vaccinations.csv)

In [12]:
# Drop every vaccination row that is missing a value in any of the listed
# columns. The filters are applied one column at a time, in this order.
vaccination_columns = [
    "date",
    "location",
    "total_vaccinations",
    "total_distributed",
    "people_vaccinated",
    "people_fully_vaccinated_per_hundred",
    "total_vaccinations_per_hundred",
    "people_fully_vaccinated",
    "people_vaccinated_per_hundred",
    "distributed_per_hundred",
    "daily_vaccinations_raw",
    "daily_vaccinations",
    "daily_vaccinations_per_million",
    "share_doses_used",
]
for column_name in vaccination_columns:
    df1 = df1.filter(f"{column_name} is not NULL")

In [13]:
# Drop every vaccination row that holds an empty string in any of the listed
# columns. The filters are applied one column at a time, in this order.
vaccination_columns = [
    "date",
    "location",
    "total_vaccinations",
    "total_distributed",
    "people_vaccinated",
    "people_fully_vaccinated_per_hundred",
    "total_vaccinations_per_hundred",
    "people_fully_vaccinated",
    "people_vaccinated_per_hundred",
    "distributed_per_hundred",
    "daily_vaccinations_raw",
    "daily_vaccinations",
    "daily_vaccinations_per_million",
    "share_doses_used",
]
for column_name in vaccination_columns:
    df1 = df1.filter(df1[column_name] != '')

#### Export to CSV files

In [14]:
# Write each cleaned frame to a single local CSV file (no index column).
# Each Spark frame is first converted to a pandas DataFrame via toPandas().
exports = [
    (df, "us-counties-clean.csv"),
    (df2, "us-states-clean.csv"),
    (df1, "us_state_vaccinations_clean.csv"),
]
for frame, out_path in exports:
    frame.toPandas().to_csv(out_path, index=False)