## PySpark pipeline to clean up the COVID-19 dataset in AWS Glue 

In [1]:
import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import coalesce, col, lit, to_date, when

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
7,application_1583234775893_0008,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Obtain the SparkContext for this session via getOrCreate().
sc = SparkContext.getOrCreate()
# Wrap it in a GlueContext; the catalog read further down goes through it.
glueContext = GlueContext(sc)
# Rebind `spark` to the session exposed by the GlueContext.
spark = glueContext.spark_session

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Loading the table created by the Glue Crawler after scanning S3 

In [3]:
# Read the catalog table as a Glue DynamicFrame and, in the same expression,
# narrow it down to the nine listed fields.
ds0 = glueContext.create_dynamic_frame.from_catalog(
    database="covid19",
    table_name="pochetti_covid_19_input",
    transformation_ctx="ds0",
).select_fields([
    'Province-State',
    'Country-Region',
    'Lat',
    'Long',
    'Date',
    'Confirmed',
    'Deaths',
    'Recovered',
    'id',
])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Print the DynamicFrame schema; in the output below Confirmed, Deaths and
# Recovered show up as `choice` columns (int or string).
ds0.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- Province-State: string
|-- Country-Region: string
|-- Lat: double
|-- Long: double
|-- Date: string
|-- Confirmed: choice
|    |-- int
|    |-- string
|-- Deaths: choice
|    |-- int
|    |-- string
|-- Recovered: choice
|    |-- int
|    |-- string
|-- id: int

In [5]:
# Convert the Glue DynamicFrame into a Spark DataFrame.
df = ds0.toDF()
# Display the resulting type as a sanity check.
type(df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<class 'pyspark.sql.dataframe.DataFrame'>

In [6]:
# (number of rows, number of columns)
df.count(), len(df.columns)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(34410, 9)

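### Example of 3 rows: `Date` comes in several formats, the counts are split between int and string values, and `Province-State` contains placeholders such as `missing`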

In [7]:
# Preview the first 3 rows of the raw data.
df.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------------+-------+--------+---------------+---------+------+---------+---+
|Province-State|Country-Region|    Lat|    Long|           Date|Confirmed|Deaths|Recovered| id|
+--------------+--------------+-------+--------+---------------+---------+------+---------+---+
|         Anhui|Mainland China|31.8257|117.2264|     2020-01-22|     [1,]|  [0,]|     [0,]|  0|
|        Shanxi|Mainland China|37.5777|112.2922|    2020-Jan-22|     [1,]| [, o]|     [0,]| 24|
|       missing|     Singapore| 1.2833|103.8333|2020/January/22|     [0,]|  [0,]| [, zero]| 40|
+--------------+--------------+-------+--------+---------------+---------+------+---------+---+
only showing top 3 rows

In [8]:
def flatten_df(nested_df):
    """Expand each top-level struct column into one column per field.

    Columns whose dtype string does not start with ``struct`` are kept as
    they are and come first in the result, in their original order. Every
    struct column ``parent`` is then replaced by its fields, each exposed as
    a column named ``parent_field``.

    Only one level is expanded: a field that is itself a struct is copied
    over as a single (still nested) column.

    Args:
        nested_df: DataFrame that may contain struct columns.

    Returns:
        The DataFrame produced by selecting the plain columns followed by
        the expanded struct fields.
    """
    plain_names = []
    struct_names = []
    for name, dtype in nested_df.dtypes:
        if dtype.startswith('struct'):
            struct_names.append(name)
        else:
            plain_names.append(name)

    expanded = []
    for parent in struct_names:
        # Selecting 'parent.*' lists the field names of that struct.
        for field in nested_df.select(parent + '.*').columns:
            expanded.append(
                col(parent + '.' + field).alias(parent + '_' + field)
            )

    return nested_df.select(plain_names + expanded)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Expand struct columns into `<column>_<field>` columns; the cleaning step
# below reads the resulting Recovered_int / Confirmed_int / Deaths_int.
df = flatten_df(df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Cleaning up the dataset

In [10]:
# Placeholder strings that stand for "no province/state" in the raw data.
missing_province_markers = ['NULL', '', 'missing', '--']

# Date layouts to try, in priority order: the first one that parses wins.
date_formats = [
    "yyyy-MM-dd",
    "yyyy/MM/dd",
    "yyyy-MMM-dd",
    "yyyy/MMMM/dd",
    "yyyy, MMMM, dd",
]

df = df.select(
    # Turn the placeholder strings into real nulls.
    when(df['Province-State'].isin(missing_province_markers), None)
    .otherwise(df['Province-State']).alias('Province-State'),

    'Country-Region',
    'Lat',
    'Long',

    # Keep only the integer branch of each count. When that branch is null
    # (e.g. the value arrived as text such as 'o' or 'zero' in the sample
    # rows), the count falls back to 0.
    coalesce(df['Recovered_int'], lit(0)).alias('Recovered'),
    coalesce(df['Confirmed_int'], lit(0)).alias('Confirmed'),
    coalesce(df['Deaths_int'], lit(0)).alias('Deaths'),

    # to_date() yields null when the text does not match the layout, so
    # coalesce() picks the first layout that parses. A value matching none of
    # them stays null instead of becoming the text "Unknown Format": putting a
    # string into the same expression as the dates made Spark type the whole
    # column as string, so `Date` was never a real date column.
    coalesce(*[to_date(col("Date"), fmt) for fmt in date_formats]).alias("Date"),

    'id').dropDuplicates()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### This looks much better!

In [11]:
# Preview the cleaned DataFrame.
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------+---------+---------+---------+------+----------+----+
|      Province-State|Country-Region|     Lat|     Long|Recovered|Confirmed|Deaths|      Date|  id|
+--------------------+--------------+--------+---------+---------+---------+------+----------+----+
|          Orange, CA|            US| 33.7879|-117.8531|        0|        1|     0|2020-02-06|1922|
|            Xinjiang|Mainland China| 41.1129|  85.2401|        0|        2|     0|2020-01-23| 153|
|              Taiwan|        Taiwan|    23.7|    121.0|        1|       16|     0|2020-02-07|2034|
|              Taiwan|        Taiwan|    23.7|    121.0|        0|        1|     0|2020-01-22|  34|
|Snohomish County, WA|            US|  48.033|-121.8339|        0|        0|     0|2020-02-15|3118|
|                null|       Belgium| 50.8333|      4.0|        1|        1|     0|2020-02-21|3819|
|                null|        Brazil| -14.235| -51.9253|        0|        0|     0|2020-01-25| 470|
