In [1]:
import os
import sys
import numpy as np
import pandas as pd
import boto3
from time import time, gmtime, sleep, strftime, strptime
from datetime import datetime, timedelta
import gc
from IPython.core.display import display

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.types import StringType, IntegerType, FloatType
from pyspark.sql.types import BooleanType, DateType, TimestampType
from pyspark.sql.types import LongType, StructType, StructField


from awsglue.context import GlueContext
from awsglue.job import Job

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
7,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
sc = SparkSession.builder.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



#### Check Glue Databases

In [3]:
spark.sql("SHOW DATABASES").collect()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(namespace='`covid-test-db`'), Row(namespace='`covid-test-db-new`'), Row(namespace='default')]

In [4]:
spark.sql("USE `covid-test-db`")
df = spark.sql("SHOW TABLES")
df = df.withColumn("isTemporary", F.col("isTemporary").cast("int"))
df.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

          namespace                tableName  isTemporary
0   `covid-test-db`      json_states_current            0
1   `covid-test-db`        json_states_daily            0
2   `covid-test-db`         json_states_info            0
3   `covid-test-db`  json_states_screenshots            0
4   `covid-test-db`          json_us_current            0
5   `covid-test-db`            json_us_daily            0
6   `covid-test-db`           states_current            0
7   `covid-test-db`             states_daily            0
8   `covid-test-db`              states_info            0
9   `covid-test-db`       states_screenshots            0
10  `covid-test-db`               us_current            0
11  `covid-test-db`                 us_daily            0

In [5]:
del df
gc.collect()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

62

#### Read CSV Files from s3 bucket and save as Parquet files

In [6]:
def get_df_from_s3(
    csv_file_name:str,
    schema:T.StructType,
    s3_path:str ="s3://covid-dataset-rakesh/raw-data/dataset"
):
    """
    Reads a csv file saved in s3 to a PySpark dataframe and returns the same.

    Parameters
     - csv_file_name: File name (object name) of the csv fike saved in s3. Example: data.csv
     - schema: PySpark schema to be used to read the csv file.
     - s3_path: s3 URI of the parent folder containing the file.
    """    
    df = (spark.read
          .format("csv")
          .option("header", "true")
          .schema(schema)
          .load(f"{s3_path}/{csv_file_name}"))
    return df

def display_df(df:pyspark.sql.dataframe.DataFrame, n_rows=5):
    "Displays 5 rows (default) and all columns of a pyspark dataframe df"
    with pd.option_context("display.max_rows", n_rows, "display.max_columns", None):
        display(df.limit(n_rows).toPandas())


def get_dynamicframe_from_s3(
    csv_file_name:str,
    s3_path:str ="s3://covid-dataset-rakesh/raw-data/dataset"
):
    """
    Reads a csv file saved in s3 to a dynamicframe and returns the same.

    Parameters
     - csv_file_name: File name (object name) of the csv fike saved in s3. Example: data.csv
     - s3_path: s3 URI of the parent folder containing the file.
    """
    dynamic_frame = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": [f"{s3_path}/{csv_file_name}"]},
        format="csv",
        format_options={
            "withHeader": True,
            "optimizePerformance": False,
        },
    )
    return dynamic_frame

def show_dynamic_frame(
    csv_file_name:str,
    n_rows: int = 1,
    s3_path:str ="s3://covid-dataset-rakesh/raw-data/dataset"
):
    """
    Reads a csv file saved in s3 to a dynamicframe and displays the first record (default, n_rows=1).

    Parameters
     - csv_file_name: File name (object name) of the csv fike saved in s3. Example: data.csv
     - n_rows: The number of row from the csv file to be read into the dynamic frame (default: 1) 
     - s3_path: s3 URI of the parent folder containing the file.
    """
    dynamic_frame = get_dynamicframe_from_s3(csv_file_name, s3_path)
    dynamic_frame.show(n_rows)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
s3 = boto3.client('s3')
bucket_name = "covid-dataset-rakesh"
folder_name = "cleaned-data"
# s3.put_object(Bucket=bucket_name, Key=(folder_name+'/'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1. states_current.csv

In [8]:
csv_file_name = "states_current.csv"
parquet_file_name = csv_file_name.split(".")[0]+".parquet"
folder_name = f"cleaned-data/{csv_file_name.split('.')[0]}"
print(f"Folder name: {folder_name}\nParquet file name: {parquet_file_name}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Folder name: cleaned-data/states_current
Parquet file name: states_current.parquet

In [18]:
# Create folder in s3
s3.put_object(Bucket=bucket_name, Key=(folder_name+"/"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'ResponseMetadata': {'RequestId': 'CWSP0DQ45CJKNBQS', 'HostId': 'YdmCqBuLCmppieJiuCb039pGLdNr7o3B2325zPC0bubTd030oMSzhi9eMLaeY2mxhd/1ROfKlt0=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'YdmCqBuLCmppieJiuCb039pGLdNr7o3B2325zPC0bubTd030oMSzhi9eMLaeY2mxhd/1ROfKlt0=', 'x-amz-request-id': 'CWSP0DQ45CJKNBQS', 'date': 'Thu, 15 Jun 2023 05:39:28 GMT', 'x-amz-server-side-encryption': 'AES256', 'etag': '"d41d8cd98f00b204e9800998ecf8427e"', 'server': 'AmazonS3', 'content-length': '0'}, 'RetryAttempts': 0}, 'ETag': '"d41d8cd98f00b204e9800998ecf8427e"', 'ServerSideEncryption': 'AES256'}

In [10]:
show_dynamic_frame(csv_file_name)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{"date": "20210307", "state": "AK", "positive": "56886", "probableCases": "", "negative": "", "pending": "", "totalTestResultsSource": "totalTestsViral", "totalTestResults": "1731628", "hospitalizedCurrently": "33", "hospitalizedCumulative": "1293", "inIcuCurrently": "", "inIcuCumulative": "", "onVentilatorCurrently": "2", "onVentilatorCumulative": "", "recovered": "", "lastUpdateEt": "3/5/2021 03:59", "dateModified": "2021-03-05T03:59:00Z", "checkTimeEt": "03/04 22:59", "death": "305", "hospitalized": "1293", "hospitalizedDischarged": "", "dateChecked": "2021-03-05T03:59:00Z", "totalTestsViral": "1731628", "positiveTestsViral": "68693", "negativeTestsViral": "1660758", "positiveCasesViral": "", "deathConfirmed": "", "deathProbable": "", "totalTestEncountersViral": "", "totalTestsPeopleViral": "", "totalTestsAntibody": "", "positiveTestsAntibody": "", "negativeTestsAntibody": "", "totalTestsPeopleAntibody": "", "positiveTestsPeopleAntibody": "", "negativeTestsPeopleAntibody": "", "tota

In [11]:
schema = StructType([
    StructField("date", StringType(), True),
    StructField("state", StringType(), True),
    StructField("positive", IntegerType(), True),
    StructField("probableCases", IntegerType(), True),
    StructField("negative", IntegerType(), True),
    StructField("pending", IntegerType(), True),
    StructField("totalTestResultsSource", StringType(), True),
    StructField("totalTestResults", IntegerType(), True),
    StructField("hospitalizedCurrently", IntegerType(), True),
    StructField("hospitalizedCumulative", IntegerType(), True),
    StructField("inIcuCurrently", IntegerType(), True),
    StructField("inIcuCumulative", IntegerType(), True),
    StructField("onVentilatorCurrently", IntegerType(), True),
    StructField("onVentilatorCumulative", IntegerType(), True),
    StructField("recovered", IntegerType(), True),
    StructField("lastUpdateEt", StringType(), True),
    StructField("dateModified", StringType(), True),
    StructField("checkTimeEt", StringType(), True),
    StructField("death", IntegerType(), True),
    StructField("hospitalized", IntegerType(), True),
    StructField("hospitalizedDischarged", IntegerType(), True),
    StructField("dateChecked", StringType(), True),
    StructField("totalTestsViral", LongType(), True),
    StructField("positiveTestsViral", LongType(), True),
    StructField("negativeTestsViral", LongType(), True),
    StructField("positiveCasesViral", IntegerType(), True),
    StructField("deathConfirmed", IntegerType(), True),
    StructField("deathProbable", IntegerType(), True),
    StructField("totalTestEncountersViral", IntegerType(), True),
    StructField("totalTestsPeopleViral", IntegerType(), True),
    StructField("totalTestsAntibody", IntegerType(), True),
    StructField("positiveTestsAntibody", IntegerType(), True),
    StructField("negativeTestsAntibody", IntegerType(), True),
    StructField("totalTestsPeopleAntibody", IntegerType(), True),
    StructField("positiveTestsPeopleAntibody", IntegerType(), True),
    StructField("negativeTestsPeopleAntibody", IntegerType(), True),
    StructField("totalTestsPeopleAntigen", IntegerType(), True),
    StructField("positiveTestsPeopleAntigen", IntegerType(), True),
    StructField("totalTestsAntigen", IntegerType(), True),
    StructField("positiveTestsAntigen", IntegerType(), True),
    StructField("fips", IntegerType(), True),
    StructField("positiveIncrease", IntegerType(), True),
    StructField("negativeIncrease", IntegerType(), True),
    StructField("total", IntegerType(), True),
    StructField("totalTestResultsIncrease", IntegerType(), True),
    StructField("posNeg", IntegerType(), True),
    StructField("dataQualityGrade", StringType(), True),
    StructField("deathIncrease", IntegerType(), True),
    StructField("hospitalizedIncrease", IntegerType(), True),
    StructField("hash", StringType(), True),
    StructField("commercialScore", IntegerType(), True),
    StructField("negativeRegularScore", IntegerType(), True),
    StructField("negativeScore", IntegerType(), True),
    StructField("positiveScore", IntegerType(), True),
    StructField("score", IntegerType(), True),
    StructField("grade", IntegerType(), True),
])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
df = get_df_from_s3(csv_file_name, schema)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Validate the data read
assert df.count() == 56, f"No. of rows in {csv_file_name} is not matching!"
assert len(df.columns) == 56, f"No. of columns in {csv_file_name} is not matching!"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
df = (df
      .withColumn("date", F.to_date(F.col("date"), "yyyyMMdd"))
      .withColumn("lastUpdateEt", F.to_timestamp(F.col("lastUpdateEt"), "M/d/yyyy HH:mm"))
      .withColumn("dateModified", F.to_timestamp(F.col("dateModified"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
      .withColumn("dateChecked", F.to_timestamp(F.col("dateChecked"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
     )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- positive: integer (nullable = true)
 |-- probableCases: integer (nullable = true)
 |-- negative: integer (nullable = true)
 |-- pending: integer (nullable = true)
 |-- totalTestResultsSource: string (nullable = true)
 |-- totalTestResults: integer (nullable = true)
 |-- hospitalizedCurrently: integer (nullable = true)
 |-- hospitalizedCumulative: integer (nullable = true)
 |-- inIcuCurrently: integer (nullable = true)
 |-- inIcuCumulative: integer (nullable = true)
 |-- onVentilatorCurrently: integer (nullable = true)
 |-- onVentilatorCumulative: integer (nullable = true)
 |-- recovered: integer (nullable = true)
 |-- lastUpdateEt: timestamp (nullable = true)
 |-- dateModified: timestamp (nullable = true)
 |-- checkTimeEt: string (nullable = true)
 |-- death: integer (nullable = true)
 |-- hospitalized: integer (nullable = true)
 |-- hospitalizedDischarged: integer (nullable = true)
 |-- dateChecked: times

In [19]:
df.write.parquet(f"s3://{bucket_name}/{folder_name}/{parquet_file_name}", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2. states_daily.csv

In [20]:
csv_file_name = "states_daily.csv"
parquet_file_name = csv_file_name.split(".")[0]+".parquet"
folder_name = f"cleaned-data/{csv_file_name.split('.')[0]}"
print(f"Folder name: {folder_name}\nParquet file name: {parquet_file_name}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Folder name: cleaned-data/states_daily
Parquet file name: states_daily.parquet

In [21]:
# Create folder in s3
s3.put_object(Bucket=bucket_name, Key=(folder_name+"/"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'ResponseMetadata': {'RequestId': 'TJE445F4CFVHHPCC', 'HostId': '+rHJxkS23rryKKmmaglBKAM9IJk26bnln9j24Be2+5P8XqhKhNUrhI++9TYWPHk2P0zX5VC0bec=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': '+rHJxkS23rryKKmmaglBKAM9IJk26bnln9j24Be2+5P8XqhKhNUrhI++9TYWPHk2P0zX5VC0bec=', 'x-amz-request-id': 'TJE445F4CFVHHPCC', 'date': 'Thu, 15 Jun 2023 05:40:54 GMT', 'x-amz-server-side-encryption': 'AES256', 'etag': '"d41d8cd98f00b204e9800998ecf8427e"', 'server': 'AmazonS3', 'content-length': '0'}, 'RetryAttempts': 0}, 'ETag': '"d41d8cd98f00b204e9800998ecf8427e"', 'ServerSideEncryption': 'AES256'}

In [22]:
show_dynamic_frame(csv_file_name)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{"date": "20210307", "state": "AK", "positive": "56886", "probableCases": "", "negative": "", "pending": "", "totalTestResultsSource": "totalTestsViral", "totalTestResults": "1731628", "hospitalizedCurrently": "33", "hospitalizedCumulative": "1293", "inIcuCurrently": "", "inIcuCumulative": "", "onVentilatorCurrently": "2", "onVentilatorCumulative": "", "recovered": "", "lastUpdateEt": "3/5/2021 03:59", "dateModified": "2021-03-05T03:59:00Z", "checkTimeEt": "03/04 22:59", "death": "305", "hospitalized": "1293", "hospitalizedDischarged": "", "dateChecked": "2021-03-05T03:59:00Z", "totalTestsViral": "1731628", "positiveTestsViral": "68693", "negativeTestsViral": "1660758", "positiveCasesViral": "", "deathConfirmed": "", "deathProbable": "", "totalTestEncountersViral": "", "totalTestsPeopleViral": "", "totalTestsAntibody": "", "positiveTestsAntibody": "", "negativeTestsAntibody": "", "totalTestsPeopleAntibody": "", "positiveTestsPeopleAntibody": "", "negativeTestsPeopleAntibody": "", "tota

In [23]:
# states_daily has the same schema as states_current
df = get_df_from_s3(csv_file_name, schema)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# Validate the data read
assert df.count() == 20780, f"No. of rows in {csv_file_name} is not matching!"
assert len(df.columns) == 56, f"No. of columns in {csv_file_name} is not matching!"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
df = (df
      .withColumn("date", F.to_date(F.col("date"), "yyyyMMdd"))
      .withColumn("lastUpdateEt", F.to_timestamp(F.col("lastUpdateEt"), "M/d/yyyy HH:mm"))
      .withColumn("dateModified", F.to_timestamp(F.col("dateModified"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
      .withColumn("dateChecked", F.to_timestamp(F.col("dateChecked"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
     )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- positive: integer (nullable = true)
 |-- probableCases: integer (nullable = true)
 |-- negative: integer (nullable = true)
 |-- pending: integer (nullable = true)
 |-- totalTestResultsSource: string (nullable = true)
 |-- totalTestResults: integer (nullable = true)
 |-- hospitalizedCurrently: integer (nullable = true)
 |-- hospitalizedCumulative: integer (nullable = true)
 |-- inIcuCurrently: integer (nullable = true)
 |-- inIcuCumulative: integer (nullable = true)
 |-- onVentilatorCurrently: integer (nullable = true)
 |-- onVentilatorCumulative: integer (nullable = true)
 |-- recovered: integer (nullable = true)
 |-- lastUpdateEt: timestamp (nullable = true)
 |-- dateModified: timestamp (nullable = true)
 |-- checkTimeEt: string (nullable = true)
 |-- death: integer (nullable = true)
 |-- hospitalized: integer (nullable = true)
 |-- hospitalizedDischarged: integer (nullable = true)
 |-- dateChecked: times

In [27]:
df.write.parquet(f"s3://{bucket_name}/{folder_name}/{parquet_file_name}", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

3. states_info.csv

In [28]:
csv_file_name = "states_info.csv"
parquet_file_name = csv_file_name.split(".")[0]+".parquet"
folder_name = f"cleaned-data/{csv_file_name.split('.')[0]}"
print(f"Folder name: {folder_name}\nParquet file name: {parquet_file_name}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Folder name: cleaned-data/states_info
Parquet file name: states_info.parquet

In [29]:
# Create folder in s3
s3.put_object(Bucket=bucket_name, Key=(folder_name+"/"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'ResponseMetadata': {'RequestId': 'WMQ7B68HFQRPVFH9', 'HostId': 'zGz8u3VE9SShOQoM9YY4d5m1oIzS7YKgTUm1ldyHJH2mORELU9LxxLlHlYZd0NkV5GyvrI+J8xj7rJjiNrCZ+Z+9E2oPmJHF', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'zGz8u3VE9SShOQoM9YY4d5m1oIzS7YKgTUm1ldyHJH2mORELU9LxxLlHlYZd0NkV5GyvrI+J8xj7rJjiNrCZ+Z+9E2oPmJHF', 'x-amz-request-id': 'WMQ7B68HFQRPVFH9', 'date': 'Thu, 15 Jun 2023 05:44:11 GMT', 'x-amz-server-side-encryption': 'AES256', 'etag': '"d41d8cd98f00b204e9800998ecf8427e"', 'server': 'AmazonS3', 'content-length': '0'}, 'RetryAttempts': 0}, 'ETag': '"d41d8cd98f00b204e9800998ecf8427e"', 'ServerSideEncryption': 'AES256'}

In [30]:
show_dynamic_frame(csv_file_name)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{"state": "AK", "notes": "Alaska combines PCR and antigen tests in the total tests figure reported on the state's dashboard.

As of February 6, 2021, [Alaska no longer updates](https://twitter.com/alaska_dhss/status/1355349178335944708?s=21) their COVID-19 dashboards on weekends. As a result, we will be unable to update their data on Saturdays and Sundays. 

On February 12, 2021, Alaska [announced](https://twitter.com/Alaska_DHSS/status/1360413364065767424?s=20) via the official Alaska Department of Health and Social Services twitter that there would be no update to their data on February 15, 2021 due to the Presidents Day holiday.

On January 20, 2021, Alaska [reported](https://www.adn.com/alaska-news/2021/01/20/tracking-covid-19-in-alaska-record-23-deaths-and-167-infections-reported-wednesday/) a comparatively large increase of 23 deaths due to a review of death certificates.

On January 4, 2020, Alaska noted that \"counted deaths in Alaska include COVID-19 cases confirmed through a 

In [31]:
schema = StructType([
    StructField("state", StringType(), True),
    StructField("notes", StringType(), True),
    StructField("covid19Site", StringType(), True),
    StructField("covid19SiteSecondary", StringType(), True),
    StructField("covid19SiteTertiary", StringType(), True),
    StructField("covid19SiteQuaternary", StringType(), True),
    StructField("covid19SiteQuinary", StringType(), True),
    StructField("twitter", StringType(), True),
    StructField("covid19SiteOld", StringType(), True),
    StructField("covidTrackingProjectPreferredTotalTestUnits", StringType(), True),
    StructField("covidTrackingProjectPreferredTotalTestField", StringType(), True),
    StructField("totalTestResultsField", StringType(), True),
    StructField("pui", StringType(), True),
    StructField("pum", StringType(), True),
    StructField("name", StringType(), True),
    StructField("fips", IntegerType(), True)
])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The original states_info.csv file had line breaks and double quotes in notes column. This was causing problem when we read the data into spark dataframe as spark was not able to identify the end of each record correctly. To mitigate this problem, we enclosed the values in notes column in double quotes and escaped " within the text with "". The updated file is uploaded with the name "states_info_corrected.csv". We read the corrected file here. 

In [32]:
df = (spark.read
      .format("csv")
      .option("header", "true")
      .option("multiLine", "true")
      .option("escape", '"')
      .schema(schema)
      .load("s3://covid-dataset-rakesh/ordered-data/dataset/csv/states_info/states_info_corrected.csv"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
# Validate the data read
assert df.count() == 56, f"No. of rows in {csv_file_name} is not matching!"
assert len(df.columns) == 16, f"No. of columns in {csv_file_name} is not matching!"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
# Check 1 records
df.limit(1).collect()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(state='AK', notes='"Alaska combines PCR and antigen tests in the total tests figure reported on the state\'s dashboard.\n\nAs of February 6, 2021, [Alaska no longer updates](https://twitter.com/alaska_dhss/status/1355349178335944708?s=21) their COVID-19 dashboards on weekends. As a result, we will be unable to update their data on Saturdays and Sundays. \n\nOn February 12, 2021, Alaska [announced](https://twitter.com/Alaska_DHSS/status/1360413364065767424?s=20) via the official Alaska Department of Health and Social Services twitter that there would be no update to their data on February 15, 2021 due to the Presidents Day holiday.\n\nOn January 20, 2021, Alaska [reported](https://www.adn.com/alaska-news/2021/01/20/tracking-covid-19-in-alaska-record-23-deaths-and-167-infections-reported-wednesday/) a comparatively large increase of 23 deaths due to a review of death certificates.\n\nOn January 4, 2020, Alaska noted that ""counted deaths in Alaska include COVID-19 cases confirmed th

In [35]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- state: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- covid19Site: string (nullable = true)
 |-- covid19SiteSecondary: string (nullable = true)
 |-- covid19SiteTertiary: string (nullable = true)
 |-- covid19SiteQuaternary: string (nullable = true)
 |-- covid19SiteQuinary: string (nullable = true)
 |-- twitter: string (nullable = true)
 |-- covid19SiteOld: string (nullable = true)
 |-- covidTrackingProjectPreferredTotalTestUnits: string (nullable = true)
 |-- covidTrackingProjectPreferredTotalTestField: string (nullable = true)
 |-- totalTestResultsField: string (nullable = true)
 |-- pui: string (nullable = true)
 |-- pum: string (nullable = true)
 |-- name: string (nullable = true)
 |-- fips: integer (nullable = true)

In [36]:
df.write.parquet(f"s3://{bucket_name}/{folder_name}/{parquet_file_name}", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

4. states_screenshots.csv

In [38]:
csv_file_name = "states_screenshots.csv"
parquet_file_name = csv_file_name.split(".")[0]+".parquet"
folder_name = f"cleaned-data/{csv_file_name.split('.')[0]}"
print(f"Folder name: {folder_name}\nParquet file name: {parquet_file_name}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Folder name: cleaned-data/states_screenshots
Parquet file name: states_screenshots.parquet

In [39]:
# Create folder in s3
s3.put_object(Bucket=bucket_name, Key=(folder_name+"/"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'ResponseMetadata': {'RequestId': '44JHWD3NZK85T0VK', 'HostId': 'hn5FnQke6jCFBH7RmDLiejpyLuc9c0UEnOPrQ0/u4m8hlctOvXm0L5TOLRzCW8IeZFL9YfW+HNE=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'hn5FnQke6jCFBH7RmDLiejpyLuc9c0UEnOPrQ0/u4m8hlctOvXm0L5TOLRzCW8IeZFL9YfW+HNE=', 'x-amz-request-id': '44JHWD3NZK85T0VK', 'date': 'Thu, 15 Jun 2023 05:49:38 GMT', 'x-amz-server-side-encryption': 'AES256', 'etag': '"d41d8cd98f00b204e9800998ecf8427e"', 'server': 'AmazonS3', 'content-length': '0'}, 'RetryAttempts': 0}, 'ETag': '"d41d8cd98f00b204e9800998ecf8427e"', 'ServerSideEncryption': 'AES256'}

In [40]:
show_dynamic_frame(csv_file_name)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{"state": "AK", "url": "https://covidtracking.com/screenshots/AK/AK-20200315-021315.png", "secondary": "false", "tertiary": "false", "dateChecked": "2020-03-15T06:13:15.000Z", "date": "20200315", "size": "563460"}

In [41]:
schema = StructType([
    StructField("state", StringType(), True),
    StructField("url", StringType(), True),
    StructField("secondary", StringType(), True),
    StructField("tertiary", StringType(), True),
    StructField("dateChecked", StringType(), True),
    StructField("date", StringType(), True),
    StructField("size", IntegerType(), True)
])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
df = get_df_from_s3(csv_file_name, schema)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
# Validate the data read
assert df.count() == 67106, f"No. of rows in {csv_file_name} is not matching!"
assert len(df.columns) == 7, f"No. of columns in {csv_file_name} is not matching!"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- state: string (nullable = true)
 |-- url: string (nullable = true)
 |-- secondary: string (nullable = true)
 |-- tertiary: string (nullable = true)
 |-- dateChecked: string (nullable = true)
 |-- date: string (nullable = true)
 |-- size: integer (nullable = true)

In [45]:
df = (df
      .withColumn("date", F.to_date(F.col("date"), "yyyyMMdd"))
      .withColumn("dateChecked", F.to_timestamp(F.col("dateChecked"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
     )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [46]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- state: string (nullable = true)
 |-- url: string (nullable = true)
 |-- secondary: string (nullable = true)
 |-- tertiary: string (nullable = true)
 |-- dateChecked: timestamp (nullable = true)
 |-- date: date (nullable = true)
 |-- size: integer (nullable = true)

In [47]:
df.limit(3).collect()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(state='AK', url='https://covidtracking.com/screenshots/AK/AK-20200315-021315.png', secondary='false', tertiary='false', dateChecked=None, date=datetime.date(2020, 3, 15), size=563460), Row(state='AK', url='https://covidtracking.com/screenshots/AK/AK-20200315-163225.png', secondary='false', tertiary='false', dateChecked=None, date=datetime.date(2020, 3, 15), size=432003), Row(state='AK', url='https://covidtracking.com/screenshots/AK/AK-20200316-105336.png', secondary='false', tertiary='false', dateChecked=None, date=datetime.date(2020, 3, 16), size=563460)]

In [48]:
df.write.parquet(f"s3://{bucket_name}/{folder_name}/{parquet_file_name}", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

5. us_current.csv

In [49]:
csv_file_name = "us_current.csv"
parquet_file_name = csv_file_name.split(".")[0]+".parquet"
folder_name = f"cleaned-data/{csv_file_name.split('.')[0]}"
print(f"Folder name: {folder_name}\nParquet file name: {parquet_file_name}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Folder name: cleaned-data/us_current
Parquet file name: us_current.parquet

In [50]:
# Create folder in s3
s3.put_object(Bucket=bucket_name, Key=(folder_name+"/"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'ResponseMetadata': {'RequestId': 'DM2FEXHZQ193K20E', 'HostId': 'vcDb+v4lDbaoDrbma+ySfCYKG4L2ITxjH8R9+V5OXHRSt6mCHHPK2pXM4JkxYbTtZPeIj2G0hgA=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'vcDb+v4lDbaoDrbma+ySfCYKG4L2ITxjH8R9+V5OXHRSt6mCHHPK2pXM4JkxYbTtZPeIj2G0hgA=', 'x-amz-request-id': 'DM2FEXHZQ193K20E', 'date': 'Thu, 15 Jun 2023 05:51:57 GMT', 'x-amz-server-side-encryption': 'AES256', 'etag': '"d41d8cd98f00b204e9800998ecf8427e"', 'server': 'AmazonS3', 'content-length': '0'}, 'RetryAttempts': 0}, 'ETag': '"d41d8cd98f00b204e9800998ecf8427e"', 'ServerSideEncryption': 'AES256'}

In [51]:
show_dynamic_frame(csv_file_name)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{"date": "20210307", "states": "56", "positive": "28755524", "negative": "74579770", "pending": "11808", "hospitalizedCurrently": "40212", "hospitalizedCumulative": "878613", "inIcuCurrently": "8137", "inIcuCumulative": "45475", "onVentilatorCurrently": "2801", "onVentilatorCumulative": "4281", "dateChecked": "2021-03-07T24:00:00Z", "death": "515142", "hospitalized": "878613", "totalTestResults": "363789451", "lastModified": "2021-03-07T24:00:00Z", "recovered": "", "total": "0", "posNeg": "0", "deathIncrease": "839", "hospitalizedIncrease": "726", "negativeIncrease": "130414", "positiveIncrease": "41265", "totalTestResultsIncrease": "1156241", "hash": "8b26839690cd05c0cef69cb9ed85641a76b5e78e"}

In [52]:
schema = StructType([
    StructField("date", StringType(), True),
    StructField("states", StringType(), True),
    StructField("positive", LongType(), True),
    StructField("negative", LongType(), True),
    StructField("pending", LongType(), True),
    StructField("hospitalizedCurrently", LongType(), True),
    StructField("hospitalizedCumulative", LongType(), True),
    StructField("inIcuCurrently", IntegerType(), True),
    StructField("inIcuCumulative", LongType(), True),
    StructField("onVentilatorCurrently", IntegerType(), True),
    StructField("onVentilatorCumulative", LongType(), True),
    StructField("dateChecked", StringType(), True),
    StructField("death", IntegerType(), True),
    StructField("hospitalized", LongType(), True),
    StructField("totalTestResults", LongType(), True),
    StructField("lastModified", StringType(), True),
    StructField("recovered", LongType(), True),
    StructField("total", LongType(), True),
    StructField("posNeg", IntegerType(), True),
    StructField("deathIncrease", IntegerType(), True),
    StructField("hospitalizedIncrease", IntegerType(), True),
    StructField("negativeIncrease", LongType(), True),
    StructField("positiveIncrease", IntegerType(), True),
    StructField("totalTestResultsIncrease", LongType(), True),
    StructField("hash", StringType(), True)
])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [53]:
df = get_df_from_s3(csv_file_name, schema)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [54]:
# Validate the data read
assert df.count() == 1, f"No. of rows in {csv_file_name} is not matching!"
assert len(df.columns) == 25, f"No. of columns in {csv_file_name} is not matching!"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [55]:
df = (df
      .withColumn("date", F.to_date(F.col("date"), "yyyyMMdd"))
      .withColumn("dateChecked", F.to_timestamp(F.col("dateChecked"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
      .withColumn("lastModified", F.to_timestamp(F.col("lastModified"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
     )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [56]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- date: date (nullable = true)
 |-- states: string (nullable = true)
 |-- positive: long (nullable = true)
 |-- negative: long (nullable = true)
 |-- pending: long (nullable = true)
 |-- hospitalizedCurrently: long (nullable = true)
 |-- hospitalizedCumulative: long (nullable = true)
 |-- inIcuCurrently: integer (nullable = true)
 |-- inIcuCumulative: long (nullable = true)
 |-- onVentilatorCurrently: integer (nullable = true)
 |-- onVentilatorCumulative: long (nullable = true)
 |-- dateChecked: timestamp (nullable = true)
 |-- death: integer (nullable = true)
 |-- hospitalized: long (nullable = true)
 |-- totalTestResults: long (nullable = true)
 |-- lastModified: timestamp (nullable = true)
 |-- recovered: long (nullable = true)
 |-- total: long (nullable = true)
 |-- posNeg: integer (nullable = true)
 |-- deathIncrease: integer (nullable = true)
 |-- hospitalizedIncrease: integer (nullable = true)
 |-- negativeIncrease: long (nullable = true)
 |-- positiveIncrease: integer (

In [57]:
df.limit(3).collect()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(date=datetime.date(2021, 3, 7), states='56', positive=28755524, negative=74579770, pending=11808, hospitalizedCurrently=40212, hospitalizedCumulative=878613, inIcuCurrently=8137, inIcuCumulative=45475, onVentilatorCurrently=2801, onVentilatorCumulative=4281, dateChecked=None, death=515142, hospitalized=878613, totalTestResults=363789451, lastModified=None, recovered=None, total=0, posNeg=0, deathIncrease=839, hospitalizedIncrease=726, negativeIncrease=130414, positiveIncrease=41265, totalTestResultsIncrease=1156241, hash='8b26839690cd05c0cef69cb9ed85641a76b5e78e')]

In [58]:
df.write.parquet(f"s3://{bucket_name}/{folder_name}/{parquet_file_name}", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

6. us_daily.csv

In [59]:
csv_file_name = "us_daily.csv"
parquet_file_name = csv_file_name.split(".")[0]+".parquet"
folder_name = f"cleaned-data/{csv_file_name.split('.')[0]}"
print(f"Folder name: {folder_name}\nParquet file name: {parquet_file_name}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Folder name: cleaned-data/us_daily
Parquet file name: us_daily.parquet

In [60]:
# Create folder in s3
s3.put_object(Bucket=bucket_name, Key=(folder_name+"/"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'ResponseMetadata': {'RequestId': 'AX8WDQW40TY66YQS', 'HostId': 'ozYNsNSjehJJZBs/Yin9MQvx4u6/cDLQ+L5fqp8CCMhFU/RYA064EmS2E9iF13dmdmNRXkVoGJ8=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'ozYNsNSjehJJZBs/Yin9MQvx4u6/cDLQ+L5fqp8CCMhFU/RYA064EmS2E9iF13dmdmNRXkVoGJ8=', 'x-amz-request-id': 'AX8WDQW40TY66YQS', 'date': 'Thu, 15 Jun 2023 05:53:02 GMT', 'x-amz-server-side-encryption': 'AES256', 'etag': '"d41d8cd98f00b204e9800998ecf8427e"', 'server': 'AmazonS3', 'content-length': '0'}, 'RetryAttempts': 0}, 'ETag': '"d41d8cd98f00b204e9800998ecf8427e"', 'ServerSideEncryption': 'AES256'}

In [61]:
show_dynamic_frame(csv_file_name)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{"date": "20210307", "states": "56", "positive": "28755524", "negative": "74579770", "pending": "11808", "hospitalizedCurrently": "40212", "hospitalizedCumulative": "878613", "inIcuCurrently": "8137", "inIcuCumulative": "45475", "onVentilatorCurrently": "2801", "onVentilatorCumulative": "4281", "dateChecked": "2021-03-07T24:00:00Z", "death": "515142", "hospitalized": "878613", "totalTestResults": "363789451", "lastModified": "2021-03-07T24:00:00Z", "recovered": "", "total": "0", "posNeg": "0", "deathIncrease": "839", "hospitalizedIncrease": "726", "negativeIncrease": "130414", "positiveIncrease": "41265", "totalTestResultsIncrease": "1156241", "hash": "8b26839690cd05c0cef69cb9ed85641a76b5e78e"}

In [62]:
# us_daily has the same schema as that of us_current
df = get_df_from_s3(csv_file_name, schema)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [63]:
# Validate the data read
assert df.count() == 420, f"No. of rows in {csv_file_name} is not matching!"
assert len(df.columns) == 25, f"No. of columns in {csv_file_name} is not matching!"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [64]:
df = (df
      .withColumn("date", F.to_date(F.col("date"), "yyyyMMdd"))
      .withColumn("dateChecked", F.to_timestamp(F.col("dateChecked"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
      .withColumn("lastModified", F.to_timestamp(F.col("lastModified"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
     )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [65]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- date: date (nullable = true)
 |-- states: string (nullable = true)
 |-- positive: long (nullable = true)
 |-- negative: long (nullable = true)
 |-- pending: long (nullable = true)
 |-- hospitalizedCurrently: long (nullable = true)
 |-- hospitalizedCumulative: long (nullable = true)
 |-- inIcuCurrently: integer (nullable = true)
 |-- inIcuCumulative: long (nullable = true)
 |-- onVentilatorCurrently: integer (nullable = true)
 |-- onVentilatorCumulative: long (nullable = true)
 |-- dateChecked: timestamp (nullable = true)
 |-- death: integer (nullable = true)
 |-- hospitalized: long (nullable = true)
 |-- totalTestResults: long (nullable = true)
 |-- lastModified: timestamp (nullable = true)
 |-- recovered: long (nullable = true)
 |-- total: long (nullable = true)
 |-- posNeg: integer (nullable = true)
 |-- deathIncrease: integer (nullable = true)
 |-- hospitalizedIncrease: integer (nullable = true)
 |-- negativeIncrease: long (nullable = true)
 |-- positiveIncrease: integer (

In [66]:
df.limit(3).collect()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(date=datetime.date(2021, 3, 7), states='56', positive=28755524, negative=74579770, pending=11808, hospitalizedCurrently=40212, hospitalizedCumulative=878613, inIcuCurrently=8137, inIcuCumulative=45475, onVentilatorCurrently=2801, onVentilatorCumulative=4281, dateChecked=None, death=515142, hospitalized=878613, totalTestResults=363789451, lastModified=None, recovered=None, total=0, posNeg=0, deathIncrease=839, hospitalizedIncrease=726, negativeIncrease=130414, positiveIncrease=41265, totalTestResultsIncrease=1156241, hash='8b26839690cd05c0cef69cb9ed85641a76b5e78e'), Row(date=datetime.date(2021, 3, 6), states='56', positive=28714259, negative=74449356, pending=11783, hospitalizedCurrently=41401, hospitalizedCumulative=877887, inIcuCurrently=8409, inIcuCumulative=45453, onVentilatorCurrently=2811, onVentilatorCumulative=4280, dateChecked=None, death=514303, hospitalized=877887, totalTestResults=362633210, lastModified=None, recovered=None, total=0, posNeg=0, deathIncrease=1674, hospi

In [67]:
df.write.parquet(f"s3://{bucket_name}/{folder_name}/{parquet_file_name}", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…