# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project uses public COVID-19 datasets from GitHub to build a data warehouse that supports reporting and analysis of the pandemic. One use case is a world map that shows the spread of the pandemic and the progress of vaccination in every country. From that map we can identify the centers of the pandemic and make decisions to minimize its damage.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
!pip install pydeequ

Collecting pydeequ
  Downloading https://files.pythonhosted.org/packages/64/25/8fb417ac850ddf052297a52e33a90b5000f6b6925fb3a9bcfbd3e1c5ca0a/pydeequ-1.0.1-py3-none-any.whl
Collecting numpy>=1.14.1 (from pydeequ)
  Downloading https://files.pythonhosted.org/packages/45/b2/6c7545bb7a38754d63048c7696804a0d947328125d81bf12beaa692c3ae3/numpy-1.19.5-cp36-cp36m-manylinux1_x86_64.whl (13.4MB)
    100% |████████████████████████████████| 13.4MB 2.8MB/s
tensorflow 1.3.0 requires tensorflow-tensorboard<0.2.0,>=0.1.0, which is not installed.
Installing collected packages: numpy, pydeequ
  Found existing installation: numpy 1.12.1
    Uninstalling numpy-1.12.1:
      Successfully uninstalled numpy-1.12.1
Successfully installed numpy-1.19.5 pydeequ-1.0.1


In [2]:
%env SPARK_VERSION=2.4.3

env: SPARK_VERSION=2.4.3


In [3]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
from os import walk
import pydeequ

In [4]:
spark = SparkSession.builder.\
config("spark.jars.packages", pydeequ.deequ_maven_coord).\
config("spark.jars.excludes", pydeequ.f2j_maven_coord).\
enableHiveSupport().getOrCreate()

# config("spark.jars.repositories", "https://repos.spark-packages.org/").\
# config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\

### Step 1: Scope the Project and Gather Data

#### Scope 
This project gathers public COVID-19 data from GitHub. I combine information from two repositories to create a single source of truth: a data warehouse that can later be used for reporting and analytics.

I use Spark in standalone mode to process the data; the same code can be migrated to Spark on a cloud provider to speed up processing. The following steps will be carried out:

- Exploratory data analysis of the COVID-19 data from Our World in Data to identify the data schema and strategies for data cleaning
- Exploratory data analysis of the COVID-19 data from Johns Hopkins University to identify the data schema and strategies for data cleaning
- Perform data cleaning on all datasets
- Identify the grain of the fact tables: one row per date and location
- Create datetime and location dimension tables derived from the two datasets
- Create fact tables from the two datasets that reference the dimension tables


#### Describe and Gather Data 
[Data on COVID-19 (coronavirus) by Our World in Data](https://github.com/owid/covid-19-data/tree/master/public/data) publishes COVID-19 data that is updated daily for every country, including information about vaccinations and patients in ICU treatment. The data is a CSV file with 62 columns. I do not use all of them; instead I select the columns that I find valuable. I call this dataset covid_country because its data is at country level. The selected columns are described below:

|columns|description|
|:--|:--|
|`iso_code`|ISO 3166-1 alpha-3 – three-letter country codes|
|`location`|Geographical location|
|`date`|Date of observation|
|`total_cases`|Total confirmed cases of COVID-19|
|`new_cases`|New confirmed cases of COVID-19|
|`total_deaths`|Total deaths attributed to COVID-19|
|`new_deaths`|New deaths attributed to COVID-19|
|`icu_patients`|Number of COVID-19 patients in intensive care units (ICUs) on a given day|
|`hosp_patients`|Number of COVID-19 patients in hospital on a given day|
|`total_vaccinations`|Total number of COVID-19 vaccination doses administered|
|`people_vaccinated`|Total number of people who received at least one vaccine dose|
|`people_fully_vaccinated`|Total number of people who received all doses prescribed by the vaccination protocol|
|`new_vaccinations`|New COVID-19 vaccination doses administered (only calculated for consecutive days)|

The [COVID-19 Data Repository by the Center for Systems Science and Engineering (CSSE) at Johns Hopkins University](https://github.com/CSSEGISandData/COVID-19) publishes COVID-19 data at province level, including the number of recovered patients. I call this dataset covid_province because its data is detailed at province level. The columns used are described below:

|columns|description|
|:--|:--|
|`Province_State`|Province, state or dependency name|
|`Country_Region`|Country, region or sovereignty name. The names of locations included on the Website correspond with the official designations used by the U.S. Department of State|
|`Last_Update`|Time of the last update; the format differs between files, so it has to be normalized|
|`Confirmed`|Counts include confirmed and probable (where reported)|
|`Deaths`|Counts include confirmed and probable (where reported)|
|`Recovered`|Recovered cases are estimates based on local media reports, and state and local reporting when available, and therefore may be substantially lower than the true number|

Last but not least is the location data, which is used to create the location table referenced by the two datasets above. It combines the [World Cities Database](https://simplemaps.com/data/world-cities), the [JHU CSSE COVID-19 Dataset](https://github.com/CSSEGISandData/COVID-19/blob/master/csse_covid_19_data/UID_ISO_FIPS_LookUp_Table.csv) lookup table, and the locations that appear in the COVID-19 datasets but in neither of those two location datasets. Sample location schema from the World Cities Database:

|columns|description|
|:--|:--|
|`city`|City name|
|`city_ascii`|City name in ascii|
|`lat`|Latitude of location|
|`lng`|Longitude of location|
|`country`|Country, region or sovereignty name|
|`iso2`|iso2 code of location|
|`iso3`|iso3 code of location|
|`admin_name`|The name of the highest-level administrative region of the city or town (e.g. a US state or Canadian province).|
|`population`|Population of location|
|`id`|ID of location|




In [5]:
# Sample data of Data on COVID-19 (coronavirus) by Our World in Data
covid_country_df = spark.read.option("header", "true").csv("./lake/raw_zone/covid_country/owid-covid-data.csv")
covid_country_df.createOrReplaceTempView("raw_covid_country")
covid_country_df.toPandas().head()

Unnamed: 0,iso_code,continent,location,date,total_cases,new_cases,new_cases_smoothed,total_deaths,new_deaths,new_deaths_smoothed,...,extreme_poverty,cardiovasc_death_rate,diabetes_prevalence,female_smokers,male_smokers,handwashing_facilities,hospital_beds_per_thousand,life_expectancy,human_development_index,excess_mortality
0,AFG,Asia,Afghanistan,2020-02-24,1.0,1.0,,,,,...,,597.029,9.59,,,37.746,0.5,64.83,0.511,
1,AFG,Asia,Afghanistan,2020-02-25,1.0,0.0,,,,,...,,597.029,9.59,,,37.746,0.5,64.83,0.511,
2,AFG,Asia,Afghanistan,2020-02-26,1.0,0.0,,,,,...,,597.029,9.59,,,37.746,0.5,64.83,0.511,
3,AFG,Asia,Afghanistan,2020-02-27,1.0,0.0,,,,,...,,597.029,9.59,,,37.746,0.5,64.83,0.511,
4,AFG,Asia,Afghanistan,2020-02-28,1.0,0.0,,,,,...,,597.029,9.59,,,37.746,0.5,64.83,0.511,


In [6]:
# Sample data of CSSE at Johns Hopkins University
covid_province_df = spark.read.option("header", "true").csv('./lake/raw_zone/covid_province/')
covid_province_df.createOrReplaceTempView("raw_covid_province")
spark.sql("select * from raw_covid_province").show(5, truncate=False)

+----+------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+------------+------------------+-------------------+
|FIPS|Admin2|Province_State|Country_Region|Last_Update        |Lat     |Long_    |Confirmed|Deaths|Recovered|Active|Combined_Key|Incident_Rate     |Case_Fatality_Ratio|
+----+------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+------------+------------------+-------------------+
|null|null  |null          |Afghanistan   |2021-02-27 05:22:28|33.93911|67.709953|55696    |2442  |49285    |3969  |Afghanistan |143.0731404659654 |4.384515943694341  |
|null|null  |null          |Albania       |2021-02-27 05:22:28|41.1533 |20.1683  |105229   |1756  |68007    |35466 |Albania     |3656.5779414830768|1.6687415066188978 |
|null|null  |null          |Algeria       |2021-02-27 05:22:28|28.0339 |1.6596   |112805   |2977  |77842    |31986 |Algeria     |257.2458766830244 |2.63906

In [7]:
# Sample location of province level from world city datasets
province_df = spark.read.option("header", "true").csv('./lake/raw_zone/country_lookup/worldcities.csv')
province_df.createOrReplaceTempView("province_df")
spark.sql("select * from province_df").show(5, truncate=False)

+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
|city   |city_ascii|lat    |lng     |country    |iso2|iso3|admin_name |capital|population|id        |
+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
|Tokyo  |Tokyo     |35.6897|139.6922|Japan      |JP  |JPN |Tōkyō      |primary|37977000  |1392685764|
|Jakarta|Jakarta   |-6.2146|106.8451|Indonesia  |ID  |IDN |Jakarta    |primary|34540000  |1360771077|
|Delhi  |Delhi     |28.6600|77.2300 |India      |IN  |IND |Delhi      |admin  |29617000  |1356872604|
|Mumbai |Mumbai    |18.9667|72.8333 |India      |IN  |IND |Mahārāshtra|admin  |23355000  |1356226629|
|Manila |Manila    |14.6000|120.9833|Philippines|PH  |PHL |Manila     |primary|23088000  |1608618140|
+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
only showing top 5 rows



In [8]:
# Sample location of province + country level from covid19 datasets
country_df = spark.read.option("header", "true").csv('./lake/raw_zone/country_lookup/UID_ISO_FIPS_LookUp_Table.csv')
country_df.createOrReplaceTempView("country_df")
spark.sql("select * from country_df").show(5, truncate=False)

+---+----+----+-----+----+------+--------------+--------------+--------+---------+------------+----------+
|UID|iso2|iso3|code3|FIPS|Admin2|Province_State|Country_Region|Lat     |Long_    |Combined_Key|Population|
+---+----+----+-----+----+------+--------------+--------------+--------+---------+------------+----------+
|4  |AF  |AFG |4    |null|null  |null          |Afghanistan   |33.93911|67.709953|Afghanistan |38928341  |
|8  |AL  |ALB |8    |null|null  |null          |Albania       |41.1533 |20.1683  |Albania     |2877800   |
|12 |DZ  |DZA |12   |null|null  |null          |Algeria       |28.0339 |1.6596   |Algeria     |43851043  |
|20 |AD  |AND |20   |null|null  |null          |Andorra       |42.5063 |1.5218   |Andorra     |77265     |
|24 |AO  |AGO |24   |null|null  |null          |Angola        |-11.2027|17.8739  |Angola      |32866268  |
+---+----+----+-----+----+------+--------------+--------------+--------+---------+------------+----------+
only showing top 5 rows



### Step 2: Explore and Assess the Data
#### Explore the Data 
Province and country data is duplicated between the location datasets and the daily COVID-19 data. I create a dim_location table to store location information and reference the location id from dim_location in covid_province (which I call fact_covid_province from now on). A row whose Province_State is null represents the country as a whole: it holds the country's average latitude and longitude and its total population. I replace the null with an empty province string so that it does not cause unexpected behavior in later joins.
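Why the null province matters for joins can be seen in a minimal sketch. It uses Python's built-in `sqlite3` module instead of Spark so that it runs anywhere, and the two tables and their rows are hypothetical. In SQL, and in Spark SQL as well, `null = null` does not evaluate to true, so country-level rows would silently drop out of an equality join.

```python
import sqlite3


def count_matches(normalize_nulls):
    """Join two tiny, hypothetical tables on (province_state, country_region)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        create table covid (province_state text, country_region text);
        create table location (province_state text, country_region text);
        insert into covid values (null, 'Albania'), ('Hubei', 'China');
        insert into location values (null, 'Albania'), ('Hubei', 'China');
    """)
    if normalize_nulls:
        # Same idea as coalesce(Province_State, '') in the cleaning step
        conn.executescript("""
            update covid set province_state = coalesce(province_state, '');
            update location set province_state = coalesce(province_state, '');
        """)
    (matches,) = conn.execute("""
        select count(*)
        from covid c
        join location l
          on c.province_state = l.province_state
         and c.country_region = l.country_region
    """).fetchone()
    conn.close()
    return matches


print(count_matches(normalize_nulls=False))  # the country-level row is lost
print(count_matches(normalize_nulls=True))   # both rows match
```

With the nulls left in place only the province-level row joins; after replacing them with an empty string the country-level row joins too.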

The covid_country dataframe has some columns with null values (e.g. `total_deaths`, `new_deaths`). I replace null with 0 to indicate that there were no deaths on that day, which makes later aggregation easier.

Investigating the covid_province data shows a schema mismatch between the CSV files, for example:

In `./lake/raw_zone/covid_province/01-25-2020.csv` we have this schema: ![old schema](./img/01-25-2020.png)

In `./lake/raw_zone/covid_province/01-01-2021.csv` we have this schema: ![new schema](./img/01-01-2021.png)

I handle this schema mismatch by merging the files into one schema for later use. I also handle the mismatch in datetime format between the `Last_Update` and `Last Update` columns.
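The schema merge can be illustrated without Spark. The following is a minimal sketch using only the standard library; `RENAMES`, `UNIFIED_COLUMNS`, and `read_unified` are hypothetical names, and the values in the old-schema sample row are made up (the new-schema row reuses the Afghanistan sample shown earlier).

```python
import csv
import io

# Old header name -> unified header name
RENAMES = {
    "Province/State": "Province_State",
    "Country/Region": "Country_Region",
    "Last Update": "Last_Update",
}
UNIFIED_COLUMNS = ["Province_State", "Country_Region", "Last_Update",
                   "Confirmed", "Deaths", "Recovered"]


def read_unified(csv_text):
    """Parse one daily report and keep only the unified columns."""
    rows = []
    for raw in csv.DictReader(io.StringIO(csv_text)):
        renamed = {RENAMES.get(name, name): value for name, value in raw.items()}
        # Columns missing from a file become empty strings
        rows.append({col: renamed.get(col, "") for col in UNIFIED_COLUMNS})
    return rows


# Old schema; the numbers in this row are made up for illustration
OLD_SAMPLE = (
    "Province/State,Country/Region,Last Update,Confirmed,Deaths,Recovered\n"
    "Hubei,Mainland China,1/25/20 17:00,10,1,2\n"
)
# New schema, shortened to the columns needed here
NEW_SAMPLE = (
    "FIPS,Admin2,Province_State,Country_Region,Last_Update,Confirmed,Deaths,Recovered\n"
    ",,,Afghanistan,2021-02-27 05:22:28,55696,2442,49285\n"
)

merged = read_unified(OLD_SAMPLE) + read_unified(NEW_SAMPLE)
for row in merged:
    print(row)
```

Both files end up with the same six column names, which is the precondition for the `union` used in the cleaning step.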

In most data files, `Last Update` has the format `M/d/yy HH:mm`, but in some files, such as `./lake/raw_zone/covid_province/03-15-2020.csv`, it has the format `yyyy-MM-ddTHH:mm:ss`.

The format of `Last_Update` also differs between files: some use `M/d/yy HH:mm` and some use `yyyy-MM-dd HH:mm:ss`.
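To make the three formats concrete, here is a minimal pure-Python sketch that tries each one in turn and keeps only the date part. It is not the Spark expression used in the pipeline; `KNOWN_FORMATS` and `normalize_last_update` are hypothetical names, and the sketch assumes that no formats other than the three named above occur.

```python
from datetime import datetime

# The three formats named in the text, written as strptime patterns
KNOWN_FORMATS = (
    "%m/%d/%y %H:%M",     # M/d/yy HH:mm
    "%Y-%m-%dT%H:%M:%S",  # yyyy-MM-ddTHH:mm:ss
    "%Y-%m-%d %H:%M:%S",  # yyyy-MM-dd HH:mm:ss
)


def normalize_last_update(raw):
    """Return the date part as yyyy-MM-dd, or None if no known format matches."""
    if raw is None:
        return None
    for fmt in KNOWN_FORMATS:
        try:
            return datetime.strptime(raw.strip(), fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue  # try the next format
    return None


for value in ("1/25/20 17:00", "2020-03-15T18:20:18", "2021-02-27 05:22:28"):
    print(value, "->", normalize_last_update(value))
```

Returning `None` for unknown formats, rather than raising, keeps unparseable rows visible so they can be counted in a later quality check.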


#### Cleaning Steps
##### 1. Merge the mismatched schemas of the province-level CSV files

In [9]:
covid_province_schema = StructType([
    StructField("Last_Update", StringType(), True),
    StructField("Province_State", StringType(), True),
    StructField("Country_Region", StringType(), True),
    StructField("Confirmed", IntegerType(), True),
    StructField("Deaths", IntegerType(), True),
    StructField("Recovered", IntegerType(), True),
    StructField("Raw_Last_Update", StringType(), True)
])

emptyRDD = spark.sparkContext.emptyRDD()
covid_province_df = spark.createDataFrame(emptyRDD,covid_province_schema)


filenames = next(walk("./lake/raw_zone/covid_province"), (None, None, []))[2]
for file in filenames:
    rawDf = spark.read.option("header", "true").csv(f"./lake/raw_zone/covid_province/{file}")
    if "Province/State" in rawDf.schema.names:
        
        covid_province_df = covid_province_df.union(rawDf.selectExpr("case when year(`Last Update`) is null then date_format(from_unixtime(unix_timestamp(`Last Update`, 'M/d/yy HH:mm')), 'yyyy-MM-dd') else date_format(`Last Update`, 'yyyy-MM-dd') end as Last_Update",
                                                                     "coalesce(`Province/State`,'') as Province_State", 
                                                                     "case when `Country/Region` = 'Mainland China' then 'China' else `Country/Region` end as Country_Region", 
                                                                     "cast(Confirmed as int) as Confirmed", "cast(Deaths as int) as Deaths", 
                                                                     "cast(Recovered as int) as Recovered", "`Last Update` as Raw_Last_Update"))
    else:
        covid_province_df = covid_province_df.union(rawDf.selectExpr("case when year(Last_Update) is null then date_format(from_unixtime(unix_timestamp(`Last_Update`, 'M/d/yy HH:mm')), 'yyyy-MM-dd') else date_format(Last_Update, 'yyyy-MM-dd') end as Last_Update",
                                                                     "coalesce(Province_State, '') as Province_State", 
                                                                     "case when `Country_Region` = 'Mainland China' then 'China' else `Country_Region` end Country_Region", 
                                                                     "cast(Confirmed as int) as Confirmed", "cast(Deaths as int) as Deaths", "cast(Recovered as int) as Recovered",
                                                                     "Last_Update as Raw_Last_Update"))


covid_province_df.na.fill(0, subset=["confirmed", "deaths", "recovered"]).write.mode("overwrite").parquet("./lake/work_zone/covid_province")

##### 2. Replace null values in covid_country_df with 0 and cast the integer columns to integer type

In [10]:
covid_country_df = spark.sql("""
select iso_code iso3, 
       case when `location` = 'United States' then 'US' else `location` end country_region, 
       date, 
       cast(total_cases as int) total_cases, 
       cast(new_cases as int) new_cases, 
       cast(total_deaths as int) total_deaths, 
       cast(new_deaths as int) new_deaths, 
       cast(icu_patients as int) icu_patients,
       cast(hosp_patients as int) hosp_patients, 
       cast(total_vaccinations as int) total_vaccinations, 
       cast(people_vaccinated as int) people_vaccinated, 
       cast(people_fully_vaccinated as int) people_fully_vaccinated, 
       cast(new_vaccinations as int) new_vaccinations
from raw_covid_country
where continent is not null
""").na.fill(value=0)
covid_country_df.createOrReplaceTempView("covid_country")
covid_country_df.write.mode("overwrite").parquet("./lake/work_zone/covid_country")

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The schema will look like this:

![Covid warehouse schema](./img/covid_datawarehouse_schema.png)

The dim_datetime dimension table is built from all dates that appear in covid_country and covid_province. I prepopulate additional columns that might be needed in future reports or analysis, for example the day or month name as a string, or the quarter and year for roll-ups.
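The derived attributes can be sketched for a single date in plain Python. This is an illustration only, not the Spark SQL used in the pipeline; `datetime_dimension_row` is a hypothetical helper, and the name tuples are hard-coded so the result does not depend on the system locale. The sketch produces `date_sk` as an integer, matching the cast used in the fact tables.

```python
from datetime import date

DAY_NAMES = ("Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday")
MONTH_NAMES = ("January", "February", "March", "April", "May", "June", "July",
               "August", "September", "October", "November", "December")


def datetime_dimension_row(d):
    """Build one dim_datetime-style row for a datetime.date."""
    day_full = DAY_NAMES[d.weekday()]      # weekday(): Monday == 0
    month_full = MONTH_NAMES[d.month - 1]
    return {
        "date_sk": int(d.strftime("%Y%m%d")),  # e.g. 20210227
        "date": d.isoformat(),
        "day": d.day,
        "day_short": day_full[:3],
        "day_full": day_full,
        "month": d.month,
        "month_short": month_full[:3],
        "month_full": month_full,
        "quarter": (d.month - 1) // 3 + 1,
        "year": d.year,
    }


print(datetime_dimension_row(date(2021, 2, 27)))
```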

The dim_location table is populated from the raw_locations dataset. It includes the ISO code of the country, the coordinates, and the population. It lets us design reports per country and per day and visualize that information on a world map, which gives an intuitive overall picture of the pandemic around the world. We can also calculate the percentage of a country's population that has been vaccinated, which shows how that country is dealing with the pandemic.

The fact_covid_country table is derived from the covid_country dataset. It holds daily data per country: total cases, new cases, total deaths, and so on. It also includes the number of patients in intensive care units (ICUs) and the number of people vaccinated. From this data, a report can visualize the spread of COVID-19 and the damage it causes in a country.

The fact_covid_province table comes from the covid_province dataset. It drills down to province level, which helps identify the centers of the pandemic within a country. It also includes the number of recovered patients at province level.
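As an illustration of how this star schema might be queried, the sketch below computes the vaccinated share of the population per country for one day. It uses Python's built-in `sqlite3` rather than Spark, reduces the tables to the columns needed, and fills them with made-up rows; `vaccinated_percent` is a hypothetical helper. The `population > 0` filter is there because the pipeline stores -1 when the population is unknown.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table dim_location (
        id integer primary key, iso3 text, province_state text,
        country_region text, population integer);
    create table dim_datetime (
        date_sk integer primary key, date text, quarter integer, year integer);
    create table fact_covid_country (
        location_sk integer, date_sk integer,
        total_cases integer, people_vaccinated integer);

    -- Made-up rows, for illustration only
    insert into dim_location values
        (1, 'AAA', '', 'Country A', 1000),
        (2, 'BBB', '', 'Country B', 2000),
        (3, 'CCC', '', 'Country C', -1);
    insert into dim_datetime values (20210301, '2021-03-01', 1, 2021);
    insert into fact_covid_country values
        (1, 20210301, 50, 250),
        (2, 20210301, 80, 100),
        (3, 20210301, 5, 10);
""")


def vaccinated_percent(conn, date_sk):
    """Percent of each country's population with at least one dose on a given day."""
    return conn.execute("""
        select dl.country_region,
               round(100.0 * f.people_vaccinated / dl.population, 1) as pct
        from fact_covid_country f
        join dim_location dl on dl.id = f.location_sk
        join dim_datetime dd on dd.date_sk = f.date_sk
        where dd.date_sk = ?
          and dl.province_state = ''   -- country-level rows only
          and dl.population > 0        -- skip the -1 placeholder
        order by pct desc
    """, (date_sk,)).fetchall()


print(vaccinated_percent(conn, 20210301))
```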



#### 3.2 Mapping Out Data Pipelines
The pipeline steps to get the final schema are as follows:
1. Gather the source data from GitHub
2. Load the data into Spark dataframes
3. Merge the mismatched schemas in the covid_province dataset
4. Cast the columns in covid_country to their correct data types and replace null values with an appropriate number
5. Create the dim_datetime table derived from the time columns in the covid_province and covid_country datasets
6. Create dim_location from the raw_locations dataset, filtering out unnecessary columns
7. Select only the valuable columns to create the fact_covid_country and fact_covid_province tables, referencing the datetime and location keys of the two dimension tables

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [11]:
# Read the cleaned data from work_zone to start creating the model
covid_province_df = spark.read.parquet("./lake/work_zone/covid_province")
covid_province_df.createOrReplaceTempView("covid_province")
covid_province_df.cache()

covid_country_df = spark.read.parquet("./lake/work_zone/covid_country")
covid_country_df.createOrReplaceTempView("covid_country")
covid_country_df.cache()

DataFrame[iso3: string, country_region: string, date: string, total_cases: int, new_cases: int, total_deaths: int, new_deaths: int, icu_patients: int, hosp_patients: int, total_vaccinations: int, people_vaccinated: int, people_fully_vaccinated: int, new_vaccinations: int]

In [12]:
from pyspark.sql.types import LongType, StructField, StructType

# This function is used to create surrogate key for dataframe
def dfZipWithIndex (df, offset=1, colName="rowId"):
    '''
        Ref: https://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex
        Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe 
        and preserves a schema

        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''

    new_schema = StructType(
                    [StructField(colName,LongType(),True)]        # new added field in front
                    + df.schema.fields                            # previous schema
                )

    zipped_rdd = df.rdd.zipWithIndex()

    new_rdd = zipped_rdd.map(lambda row: ([row[1] +offset] + list(row[0])))

    return spark.createDataFrame(new_rdd, new_schema)
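The effect of this helper can be shown on plain Python data. The following is a minimal sketch of the same idea, not the Spark implementation; `zip_with_index` is a hypothetical stand-in that works on a list of dicts instead of a dataframe.

```python
def zip_with_index(rows, offset=1, col_name="rowId"):
    """Prepend a sequential index column to each row, starting at `offset`."""
    return [{col_name: index + offset, **row} for index, row in enumerate(rows)]


locations = [
    {"iso3": "AFG", "country_region": "Afghanistan"},
    {"iso3": "ALB", "country_region": "Albania"},
]
keyed = zip_with_index(locations, col_name="id")
print(keyed)
```

The generated key depends on row order, so rerunning the pipeline on changed input may assign different ids to the same location.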



In [13]:
# Create dim_datetime
datetime_df = spark.sql("""
    select distinct date_format(date, "yyyyMMdd") as date_sk,
           date,
           dayofmonth(date) as day,
           date_format(date, 'E') as day_short,
           date_format(date, 'EEEE') as day_full,
           month(date) as month,
           date_format(date, 'MMM') as month_short,
           date_format(date, 'MMMM') as month_full,
           quarter(date) as quarter,
           year(date) as year
    from covid_country
    union
    select distinct date_format(last_update, "yyyyMMdd") as date_sk,
           last_update as date,
           dayofmonth(last_update) as day,
           date_format(last_update, 'E') as day_short,
           date_format(last_update, 'EEEE') as day_full,
           month(last_update) as month,
           date_format(last_update, 'MMM') as month_short,
           date_format(last_update, 'MMMM') as month_full,
           quarter(last_update) as quarter,
           year(last_update) as year
    from covid_province
""")
datetime_df.createOrReplaceTempView("dim_datetime")
datetime_df.write.mode("overwrite").parquet("./lake/gold_zone/dimension/dim_datetime")

In [14]:
# Get location data from covid19 datasets
spark.sql("""
    select distinct case when t2.iso3 is null then '' else t2.iso3 end iso3,
           t1.province_state, 
           t1.country_region
    from
        (select distinct '' province_state, country_region from covid_country
            union
        select distinct province_state, country_region from covid_province
        ) t1 
    left join (select distinct iso3, country_region from covid_country) t2 
        on t1.country_region = t2.country_region
""").createOrReplaceTempView("tv_location_from_covid19")

In [15]:
# Create dim_location dimension table
# Get location data from worldcities dataset
spark.sql("""
    select iso2, 
           iso3, 
           coalesce(admin_name, country) as province_state, 
           case when country = 'United States' then 'US' else country end as country_region, 
           cast(avg(lat) as decimal(29,4)) as lat, 
           cast(avg(lng) as decimal(29,4)) as lon, 
           cast(sum(population) as int) population
    from province_df
    group by iso2, iso3, province_state, country_region
""").createOrReplaceTempView("tv_province")

# Get location from UID_ISO_FIPS_LookUp_Table dataset
spark.sql("""
   select iso2, 
          iso3, 
          coalesce(province_state,'') as province_state, 
          country_region, 
          cast(lat as decimal(29,4)) as lat, 
          cast(`long_` as decimal(29,4)) as lon, 
          cast(population as int) population  
    from country_df
""").createOrReplaceTempView("tv_country")

# Merge two location datasets into one
spark.sql("""
    select case when t1.iso2 is null then t2.iso2 else t1.iso2 end as iso2, 
           case when t1.iso3 is null then t2.iso3 else t1.iso3 end as iso3, 
           case when t1.province_state is null then t2.province_state else t1.province_state end as province_state,
           case when t1.country_region is null then t2.country_region else t1.country_region end as country_region,
           case when t1.lat is null then t2.lat else t1.lat end lat, 
           case when t1.lon is null then t2.lon else t1.lon end lon, 
           case when t1.population is null then t2.population else t1.population end as population
    from tv_province t1
        full join
    tv_country t2 
        on t1.iso3 = t2.iso3 and t1.province_state = t2.province_state
""").createOrReplaceTempView("tv_location_from_datasets")

# Get locations that appear only in the covid19 datasets; lat/lon 181 and population -1 act as placeholders for unknown values
spark.sql("""
    select '' iso2, t1.iso3, 
            t1.province_state, 
            t1.country_region, 
            cast(181 as decimal(29,4)) as lat,
            cast(181 as decimal(29,4)) as lon, 
            -1 as population
    from tv_location_from_covid19 t1 
        anti join 
    tv_location_from_datasets t2 
        on t1.province_state = t2.province_state and t1.country_region = t2.country_region
      --on t1.province_state = t2.province_state and t1.iso3 = t2.iso3 and t1.country_region = t2.country_region
""").createOrReplaceTempView("tv_locations_updates")

# Update location dataframe using 2 location datasets and covid19 datasets
location_df = spark.sql("""
    select * from tv_location_from_datasets
    union
    select * from tv_locations_updates
""").na.fill(value=-1, subset=['population'])

# Create dim_location table with surrogate key
location_df = dfZipWithIndex(location_df, colName="id")
location_df.createOrReplaceTempView("dim_location")
location_df.write.mode("overwrite").format("parquet").save("./lake/gold_zone/dimension/dim_location")

In [16]:
# Create fact_covid_country table
fact_covid_country_df = spark.sql("""
    select distinct id as location_sk, 
           cast(date_format(date, 'yyyyMMdd') as int) as date_sk, 
           total_cases, 
           new_cases, 
           total_deaths, 
           new_deaths, 
           icu_patients, 
           hosp_patients, 
           total_vaccinations, 
           people_vaccinated, 
           people_fully_vaccinated, 
           new_vaccinations
    from covid_country cc 
        left join 
    dim_location dl
        on cc.country_region = dl.country_region
""")
fact_covid_country_df.createOrReplaceTempView("fact_covid_country")
fact_covid_country_df.write.mode("overwrite").format("parquet").save("./lake/gold_zone/fact/fact_covid_country")

In [17]:
# Create fact_covid_province table
fact_covid_province_df = spark.sql("""
    select distinct cast(date_format(last_update, 'yyyyMMdd') as int) as date_sk, 
           id as location_sk,
           confirmed as total_cases, 
           Deaths as total_deaths, 
           Recovered as recovered
    from covid_province cp
        left join 
    dim_location dl
        on cp.province_state = dl.province_state and cp.country_region = dl.country_region
""")
fact_covid_province_df.createOrReplaceTempView("fact_covid_province")
fact_covid_province_df.write.mode("overwrite").format("parquet").save("./lake/gold_zone/fact/fact_covid_province")

#### 4.2 Data Quality Checks
To verify data quality after cleaning and transforming the data in the warehouse, I run data quality checks with pydeequ, an open-source tool developed and used at Amazon to verify the quality of many large datasets.

I run the following checks:
- Check the uniqueness of the keys in the dimension tables
- Check the number of records in dim_location after deriving it from raw_locations
- Check the data types of important table columns
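What such checks compute can be sketched in plain Python. This is a conceptual illustration only and does not use the pydeequ API; `is_complete`, `is_unique`, and `has_integral_type` are hypothetical helpers that operate on a list of dicts, and the third sample row is deliberately broken.

```python
def is_complete(rows, column):
    """True if no row has a missing (None) value in `column`."""
    return all(row.get(column) is not None for row in rows)


def is_unique(rows, column):
    """True if every value in `column` occurs exactly once."""
    values = [row.get(column) for row in rows]
    return len(values) == len(set(values))


def has_integral_type(rows, column):
    """True if every non-missing value in `column` is an int (bool excluded)."""
    return all(
        isinstance(row[column], int) and not isinstance(row[column], bool)
        for row in rows
        if row.get(column) is not None
    )


rows = [
    {"id": 1, "population": 38928341},
    {"id": 2, "population": 2877800},
    {"id": 2, "population": None},  # duplicate key and missing value, added on purpose
]

print("id complete:", is_complete(rows, "id"))
print("id unique:", is_unique(rows, "id"))
print("population complete:", is_complete(rows, "population"))
print("population integral:", has_integral_type(rows, "population"))
```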

Run Quality Checks

In [18]:
# Perform quality checks 
from pydeequ.checks import *
from pydeequ.verification import *

# Check quality of dim_datetime
dim_datetime_df = spark.read.parquet("./lake/gold_zone/dimension/dim_datetime")
checkDatetimeSuit = VerificationSuite(spark) \
    .onData(dim_datetime_df) \
    .addCheck(
        Check(spark, CheckLevel.Warning, "Check dim_datetime").isComplete("date_sk")  \
        .isUnique("date_sk")  \
        .isContainedIn("day_short", ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]) \
        .isContainedIn("day_full", ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]) \
        .hasMin("month", lambda x: x == 1) \
        .hasMax("month", lambda x: x == 12) \
        .isComplete("month_short")  \
        .isContainedIn("month_short", ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]) \
        .isComplete("month_full")  \
        .isContainedIn("month_full", ["January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December"]) \
        .hasMin("quarter", lambda x: x == 1) \
        .hasMax("quarter", lambda x: x == 4) \
    )\
    .run()

# Check quality of dim_location
dim_location_df = spark.read.parquet("./lake/gold_zone/dimension/dim_location")
checkLocationSuit = VerificationSuite(spark) \
    .onData(dim_location_df) \
    .addCheck(Check(spark, CheckLevel.Warning, "Check dim_location")\
        .isComplete("id")  \
        .isUnique("id")  \
        .hasDataType("lat", ConstrainableDataTypes.Fractional)\
        .hasDataType("lon", ConstrainableDataTypes.Fractional)\
        .hasDataType("population", ConstrainableDataTypes.Integral)\
    )\
    .run()

# Check quality of fact_covid_province
fact_covid_province_df = spark.read.parquet("./lake/gold_zone/fact/fact_covid_province")
checkCovidProvinceSuit = VerificationSuite(spark) \
    .onData(fact_covid_province_df) \
    .addCheck(Check(spark, CheckLevel.Warning, "Check fact_covid_province").isComplete("date_sk")  \
        .isComplete("location_sk")  \
        .hasDataType("total_cases", ConstrainableDataTypes.Integral)\
        .hasDataType("total_deaths", ConstrainableDataTypes.Integral)\
        .hasDataType("recovered", ConstrainableDataTypes.Integral)\
    )\
    .run()

# Check quality of fact_covid_country
fact_covid_country_df = spark.read.parquet("./lake/gold_zone/fact/fact_covid_country")
checkCovidCountrySuit = VerificationSuite(spark) \
    .onData(fact_covid_country_df) \
    .addCheck(Check(spark, CheckLevel.Warning, "Check fact_covid_country").isComplete("date_sk")  \
        .isComplete("location_sk")  \
        .hasDataType("total_cases", ConstrainableDataTypes.Integral)\
        .hasDataType("new_cases", ConstrainableDataTypes.Integral)\
        .hasDataType("total_deaths", ConstrainableDataTypes.Integral)\
        .hasDataType("new_deaths", ConstrainableDataTypes.Integral)\
        .hasDataType("icu_patients", ConstrainableDataTypes.Integral)\
        .hasDataType("hosp_patients", ConstrainableDataTypes.Integral)\
        .hasDataType("total_vaccinations", ConstrainableDataTypes.Integral)\
        .hasDataType("people_vaccinated", ConstrainableDataTypes.Integral)\
        .hasDataType("people_fully_vaccinated", ConstrainableDataTypes.Integral)\
        .hasDataType("new_vaccinations", ConstrainableDataTypes.Integral)\
    )\
    .run()

checkCovidCountryResultDF = VerificationResult.checkResultsAsDataFrame(spark, checkCovidCountrySuit)
checkCovidProvinceResultDF = VerificationResult.checkResultsAsDataFrame(spark, checkCovidProvinceSuit)
checkDatetimeResultDF = VerificationResult.checkResultsAsDataFrame(spark, checkDatetimeSuit)
checkLocationResultDF = VerificationResult.checkResultsAsDataFrame(spark, checkLocationSuit)

Python Callback server started!


In [19]:
checkCovidCountryResultDF.show(truncate=False)

+------------------------+-----------+------------+--------------------------------------------------------------------------------------------------+-----------------+------------------+
|check                   |check_level|check_status|constraint                                                                                        |constraint_status|constraint_message|
+------------------------+-----------+------------+--------------------------------------------------------------------------------------------------+-----------------+------------------+
+------------------------+-----------+------------+--------------------------------------------------------------------------------------------------+-----------------+------------------+



In [20]:
checkCovidProvinceResultDF.show(truncate=False)

+-------------------------+-----------+------------+---------------------------------------------------------------------------------------+-----------------+------------------+
|check                    |check_level|check_status|constraint                                                                             |constraint_status|constraint_message|
+-------------------------+-----------+------------+---------------------------------------------------------------------------------------+-----------------+------------------+
+-------------------------+-----------+------------+---------------------------------------------------------------------------------------+-----------------+------------------+



In [21]:
checkDatetimeResultDF.toPandas()

Unnamed: 0,check,check_level,check_status,constraint,constraint_status,constraint_message
0,Check dim_datetime,Warning,Success,"CompletenessConstraint(Completeness(date_sk,No...",Success,
1,Check dim_datetime,Warning,Success,"UniquenessConstraint(Uniqueness(List(date_sk),...",Success,
2,Check dim_datetime,Warning,Success,ComplianceConstraint(Compliance(day_short cont...,Success,
3,Check dim_datetime,Warning,Success,ComplianceConstraint(Compliance(day_full conta...,Success,
4,Check dim_datetime,Warning,Success,"MinimumConstraint(Minimum(month,None))",Success,
5,Check dim_datetime,Warning,Success,"MaximumConstraint(Maximum(month,None))",Success,
6,Check dim_datetime,Warning,Success,CompletenessConstraint(Completeness(month_shor...,Success,
7,Check dim_datetime,Warning,Success,ComplianceConstraint(Compliance(month_short co...,Success,
8,Check dim_datetime,Warning,Success,CompletenessConstraint(Completeness(month_full...,Success,
9,Check dim_datetime,Warning,Success,ComplianceConstraint(Compliance(month_full con...,Success,


In [22]:
checkLocationResultDF.show(truncate=False)

+------------------+-----------+------------+-------------------------------------------------------------------------------------+-----------------+------------------+
|check             |check_level|check_status|constraint                                                                           |constraint_status|constraint_message|
+------------------+-----------+------------+-------------------------------------------------------------------------------------+-----------------+------------------+
+------------------+-----------+------------+-------------------------------------------------------------------------------------+-----------------+------------------+



#### 4.3 Data dictionary 
Create a data dictionary for your data model.

##### `fact_covid_country`: Daily totals of cases, deaths, and vaccinations for each country
|columns|description|
|:--|:--|
|`date_sk`|Date key referencing the `dim_datetime` dimension|
|`location_sk`|Location key referencing the `dim_location` dimension|
|`total_cases`|Total confirmed cases of COVID-19|
|`new_cases`|New confirmed cases of COVID-19|
|`total_deaths`|Total deaths attributed to COVID-19|
|`new_deaths`|New deaths attributed to COVID-19|
|`icu_patients`|Number of COVID-19 patients in intensive care units (ICUs) on a given day|
|`hosp_patients`|Number of COVID-19 patients in hospital on a given day|
|`total_vaccinations`|Total number of COVID-19 vaccination doses administered|
|`people_vaccinated`|Total number of people who received at least one vaccine dose|
|`people_fully_vaccinated`|Total number of people who received all doses prescribed by the vaccination protocol|
|`new_vaccinations`|New COVID-19 vaccination doses administered (only calculated for consecutive days)|
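
The relationship between the cumulative and daily columns can be illustrated with a short sketch. It assumes (this is not confirmed by the source datasets) that a daily value such as `new_vaccinations` is the difference between the cumulative totals of two consecutive days, and that it is left empty when the previous day's total is missing. The function name `daily_new` and the sample numbers are hypothetical.

```python
from datetime import date, timedelta


def daily_new(totals):
    """Derive daily increments from cumulative totals keyed by date.

    Assumption: a daily value is only defined when both the day itself
    and the day immediately before it have a cumulative total.
    """
    one_day = timedelta(days=1)
    new = {}
    for day, total in totals.items():
        previous = totals.get(day - one_day)
        if total is None or previous is None:
            new[day] = None  # no consecutive pair, so no daily value
        else:
            new[day] = total - previous
    return new


# Made-up sample: 2021-01-03 is missing from the cumulative series.
sample_totals = {
    date(2021, 1, 1): 100,
    date(2021, 1, 2): 150,
    date(2021, 1, 4): 300,
}
sample_new = daily_new(sample_totals)
```

In this sample only 2021-01-02 receives a value, because it is the only day whose preceding day is also present in the series.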



##### `fact_covid_province`: Province-level COVID-19 data by day, including recovered cases
|columns|description|
|:--|:--|
|`date_sk`|Date key referencing the `dim_datetime` dimension|
|`location_sk`|Location key referencing the `dim_location` dimension|
|`total_cases`|Total cases; counts include confirmed and probable cases (where reported)|
|`total_deaths`|Total deaths; counts include confirmed and probable deaths (where reported)|
|`recovered`|Recovered cases are estimates based on local media reports, and state and local reporting when available, and therefore may be substantially lower than the true number|


##### `dim_location`: Location attributes such as ISO codes, latitude, and longitude
|columns|description|
|:--|:--|
|`id`|ID of location|
|`iso2`|Two-letter ISO code of the location|
|`iso3`|Three-letter ISO code of the location|
|`Province_State`|Province, state or dependency name|
|`Country_Region`|Country, region or sovereignty name|
|`Lat`|Latitude of location|
|`Lon`|Longitude of location|
|`population`|Population of location|


##### `dim_datetime`: Datetime dimension table with precalculated date attributes
|columns|description|
|:--|:--|
|`date_sk`|Date key as an integer in yyyyMMdd format|
|`date`|Date value as a string in yyyy-MM-dd format|
|`day`|Day of the month|
|`day_short`|Short string representing the day|
|`day_full`|Full string representing the day|
|`month`|Month as an integer|
|`month_short`|Short string representing the month|
|`month_full`|Full string representing the month|
|`quarter`|Quarter of the year|
|`year`|year value|
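
As an illustration of how these columns relate to one another, the sketch below builds one `dim_datetime` row from a Python `date`. It is not the notebook's Spark code. The helper name `build_datetime_row` is hypothetical, and the sketch assumes that `day_short` and `day_full` hold weekday names and that all names are the English ones `strftime` produces under Python's default locale.

```python
from datetime import date


def build_datetime_row(d):
    """Return a dict with the dim_datetime columns for one calendar date."""
    return {
        "date_sk": int(d.strftime("%Y%m%d")),  # integer key, yyyyMMdd
        "date": d.isoformat(),                 # string, yyyy-MM-dd
        "day": d.day,                          # day of the month
        "day_short": d.strftime("%a"),         # assumed: abbreviated weekday name
        "day_full": d.strftime("%A"),          # assumed: full weekday name
        "month": d.month,
        "month_short": d.strftime("%b"),       # abbreviated month name
        "month_full": d.strftime("%B"),        # full month name
        "quarter": (d.month - 1) // 3 + 1,     # months 1-3 -> 1, ..., 10-12 -> 4
        "year": d.year,
    }


row = build_datetime_row(date(2021, 3, 15))
```

Because `date_sk` is an integer in yyyyMMdd order, sorting by it also sorts rows chronologically.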


#### Step 5: Complete Project Write Up
##### 1. Technologies used in this project

I mainly use PySpark to clean the data: it can handle big data, works with many different file types, and is easy to use. For now it runs standalone. To use its full power, we can run this project on a Spark cluster distributed on YARN, or on the Databricks platform.

To check data quality, I use [pydeequ](https://github.com/awslabs/python-deequ), an open-source Python wrapper for Deequ (an open-source tool developed and used at Amazon). It is a very powerful tool for checking data quality on big data.

##### 2. How often the data should be updated

For the COVID-19 datasets, I think daily updates are best, because a day is a reasonable time frame for measuring all the numbers and events.

##### 3. How you would approach the problem differently under the following scenarios:
* The data was increased by 100x: Spark can handle that, provided it runs in distributed mode with a reasonable number of nodes.
* The data populates a dashboard that must be updated on a daily basis by 7am every day: We can use Apache Airflow to schedule the pipeline.
* The database needed to be accessed by 100+ people: We can use Amazon Redshift, which can serve a large number of users.
