In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField,StringType
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType, DateType
from pyspark.sql.functions import udf, col, array_contains, when, concat_ws, to_date, to_timestamp, lit
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [2]:
spark = SparkSession \
        .builder \
        .appName("Spark Covid Earlier Transformations") \
        .getOrCreate()

In [26]:
# Load dataset from Johns Hopkins University Github page
# with command pd.read_csv("datafile.csv")
#covid_daily_report = pd.read_csv("data/01-23-2020.csv")
covid_daily_report = spark.read \
                    .option("header",True) \
                    .option("inferSchema",True) \
                    .csv("data/*.csv")


#                    

In [27]:
#covid_daily_report.createOrReplaceTempView("cases_table")

In [28]:
covid_daily_report.printSchema()

root
 |-- FIPS: string (nullable = true)
 |-- Admin2: string (nullable = true)
 |-- Province_State: string (nullable = true)
 |-- Country_Region: string (nullable = true)
 |-- Last_Update: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long_: string (nullable = true)
 |-- Confirmed: string (nullable = true)
 |-- Deaths: string (nullable = true)
 |-- Recovered: string (nullable = true)
 |-- Active: string (nullable = true)
 |-- Combined_Key: string (nullable = true)
 |-- Incident_Rate: string (nullable = true)
 |-- Case_Fatality_Ratio: string (nullable = true)



### JHU changed the dataset schema during the time
*Initial schema: Province/State,Country/Region,Last Update,Confirmed,Deaths,Recovered*

*Second schema: FIPS,Admin2,Province_State,Country_Region,Last_Update,Lat,Long_,Confirmed,Deaths,Recovered,Active,Combined_Key*

*Third schema: Province/State,Country/Region,Last Update,Confirmed,Deaths,Recovered,Latitude,Longitude*

*Forth schema: FIPS,Admin2,Province_State,Country_Region,Last_Update,Lat,Long_,Confirmed,Deaths,Recovered,Active,Combined_Key,Incidence_Rate,Case-Fatality_Ratio*

In [29]:
# spark.sql("SELECT COUNT(DISTINCT 'Country/Region') AS Country, COUNT(DISTINCT 'Province/State') AS State \
#             FROM cases_table").show()

In [30]:
print((str(covid_daily_report.columns)))
type(covid_daily_report)
covid_daily_report.head(5)


['FIPS', 'Admin2', 'Province_State', 'Country_Region', 'Last_Update', 'Lat', 'Long_', 'Confirmed', 'Deaths', 'Recovered', 'Active', 'Combined_Key', 'Incident_Rate', 'Case_Fatality_Ratio']


[Row(FIPS=None, Admin2=None, Province_State=None, Country_Region='Afghanistan', Last_Update='2021-02-27 05:22:28', Lat='33.93911', Long_='67.709953', Confirmed='55696', Deaths='2442', Recovered='49285', Active='3969', Combined_Key='Afghanistan', Incident_Rate='143.0731404659654', Case_Fatality_Ratio='4.384515943694341'),
 Row(FIPS=None, Admin2=None, Province_State=None, Country_Region='Albania', Last_Update='2021-02-27 05:22:28', Lat='41.1533', Long_='20.1683', Confirmed='105229', Deaths='1756', Recovered='68007', Active='35466', Combined_Key='Albania', Incident_Rate='3656.5779414830768', Case_Fatality_Ratio='1.6687415066188978'),
 Row(FIPS=None, Admin2=None, Province_State=None, Country_Region='Algeria', Last_Update='2021-02-27 05:22:28', Lat='28.0339', Long_='1.6596', Confirmed='112805', Deaths='2977', Recovered='77842', Active='31986', Combined_Key='Algeria', Incident_Rate='257.2458766830244', Case_Fatality_Ratio='2.639067417224414'),
 Row(FIPS=None, Admin2=None, Province_State=None

In [31]:
df2=covid_daily_report
df2= df2 \
        .withColumn("Country_Region", 
        when(df2["Country_Region"].contains("China"),"China")
        .otherwise(df2["Country_Region"]))

In [32]:
df2= df2 \
        .withColumn("Country_Region", 
        when(df2["Country_Region"].contains("Republic of Korea"),"Korea, South")
        .otherwise(df2["Country_Region"]))

In [33]:
df2= df2 \
        .withColumn("Country_Region", 
        when(df2["Country_Region"].contains("Cote d'Ivoir"),"Cote d Ivoir")
        .otherwise(df2["Country_Region"]))

In [34]:
df2= df2 \
        .withColumn("Combined_Key", 
        when(df2["Country_Region"].isNull(),
        concat_ws(", ", df2["Province_State"], df2["Country_Region"]))
        .otherwise(df2["Combined_Key"]))

In [35]:
df2= df2 \
        .withColumn("CaseFatalityRatio", (df2.Deaths/df2.Confirmed)*100)

In [36]:
if not (StructField("Incidence_Rate",StringType(),True) in df2.schema):
    df2= df2 \
        .withColumn("Incidence_Rate", 
        lit(0.0))

In [37]:
# get_datetime = udf(date_convert, TimestampType())
# df2 = df2.withColumn("Date", get_datetime('Last_Update'))

In [38]:
df2 = df2.withColumn("Last_up", to_timestamp("Last_Update", "yyyy-MM-dd HH:mm:ss"))

In [39]:
df2= df2 \
        .withColumn("Date",
        to_date(col("Last_up"),"yyyy-MM-dd")
                    )

In [40]:
df2.head(10)

[Row(FIPS=None, Admin2=None, Province_State=None, Country_Region='Afghanistan', Last_Update='2021-02-27 05:22:28', Lat='33.93911', Long_='67.709953', Confirmed='55696', Deaths='2442', Recovered='49285', Active='3969', Combined_Key='Afghanistan', Incident_Rate='143.0731404659654', Case_Fatality_Ratio='4.384515943694341', CaseFatalityRatio=4.384515943694341, Incidence_Rate=0.0, Last_up=datetime.datetime(2021, 2, 27, 5, 22, 28), Date=datetime.date(2021, 2, 27)),
 Row(FIPS=None, Admin2=None, Province_State=None, Country_Region='Albania', Last_Update='2021-02-27 05:22:28', Lat='41.1533', Long_='20.1683', Confirmed='105229', Deaths='1756', Recovered='68007', Active='35466', Combined_Key='Albania', Incident_Rate='3656.5779414830768', Case_Fatality_Ratio='1.6687415066188978', CaseFatalityRatio=1.6687415066188978, Incidence_Rate=0.0, Last_up=datetime.datetime(2021, 2, 27, 5, 22, 28), Date=datetime.date(2021, 2, 27)),
 Row(FIPS=None, Admin2=None, Province_State=None, Country_Region='Algeria', La

In [41]:
#dataset_fields = ['Date', 'Province_State as State', 'Country_Region as Country', 'Lat as Latitude', 'Long_ as Longitude', 'Confirmed', 'Deaths', 'Recovered', 'Active', 'Incidence_Rate', 'CaseFatalityRatio', 'Combined_Key']

# extract columns to create songs table
#dataset_table = dataset_table = df2.select(dataset_fields)

In [42]:
df2 = df2.select(
                 col('Province_State').alias('State'), \
                 col('Country_Region').alias('Country'), \
                 col('Date'), \
                 col('Lat').alias('Latitude'), \
                 col('Long_').alias('Longitude'), \
                 col('Confirmed'), \
                 col('Deaths'), \
                 col('Recovered'), \
                 col('Active'), \
                 col('Incidence_Rate'), \
                 col('CaseFatalityRatio'), \
                 col('Combined_Key')).repartition("Country", "Date")

In [43]:
df2.head(5)

[Row(State=None, Country='Kenya', Date=datetime.date(2021, 2, 27), Latitude='-0.0236', Longitude='37.9062', Confirmed='105467', Deaths='1853', Recovered='86521', Active='17093', Incidence_Rate=0.0, CaseFatalityRatio=1.7569476708354272, Combined_Key='Kenya'),
 Row(State=None, Country='Seychelles', Date=datetime.date(2021, 2, 26), Latitude='-4.6796', Longitude='55.492', Confirmed='2562', Deaths='11', Recovered='2048', Active='503', Incidence_Rate=0.0, CaseFatalityRatio=0.429352068696331, Combined_Key='Seychelles'),
 Row(State='Antofagasta', Country='Chile', Date=datetime.date(2021, 3, 6), Latitude='-23.6509', Longitude='-70.3975', Confirmed='38610', Deaths='786', Recovered='36448', Active='1376', Incidence_Rate=0.0, CaseFatalityRatio=2.035742035742036, Combined_Key='Antofagasta, Chile'),
 Row(State='Araucania', Country='Chile', Date=datetime.date(2021, 3, 6), Latitude='-38.9489', Longitude='-72.3311', Confirmed='42105', Deaths='543', Recovered='39310', Active='2252', Incidence_Rate=0.0, 

In [21]:
# window = Window.partitionBy("Country","State").orderBy("Date")

# data = F.col("Confirmed") - F.coalesce(F.lag(F.col("Confirmed")).over(window), F.lit(0))
# df2.withColumn("newConfirmed", data)

In [44]:
df2.printSchema()

root
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Confirmed: string (nullable = true)
 |-- Deaths: string (nullable = true)
 |-- Recovered: string (nullable = true)
 |-- Active: string (nullable = true)
 |-- Incidence_Rate: double (nullable = false)
 |-- CaseFatalityRatio: double (nullable = true)
 |-- Combined_Key: string (nullable = true)



In [23]:
output_schema = StructType() \
      .add("State",StringType(),True) \
      .add("Country",StringType(),True) \
      .add("Date",DateType(),True) \
      .add("Latitude",DoubleType(),True) \
      .add("Longitude",DoubleType(),True) \
      .add("Confirmed",IntegerType(),True) \
      .add("Deaths",IntegerType(),True) \
      .add("Recovered",IntegerType(),True) \
      .add("Active",IntegerType(),True) \
      .add("IncidentRate",DoubleType(),True) \
      .add("CaseFatalityRatio",DoubleType(),True) \
      .add("CombinedKey",StringType(),True)

In [45]:
df2.write.partitionBy("Country", "Date").parquet('out/curated/')

KeyboardInterrupt: 

In [None]:
df2.write \
    .format("com.databricks.spark.csv") \
    .option("header",True) \
    .option("escape", "") \
    .option("quote", "") \
    .option("emptyValue", "") \
    .option("delimiter", ";") \
    .save("out/spark_output/")