# COVID-19 pandemic Insight
### Data Engineering Capstone Project

#### Project Summary
The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
```python
# Import modules, then configure and create the SparkSession
import os
import configparser
import pandas as pd

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, desc, asc, to_date, lit, year, month, dayofmonth
from pyspark.sql.types import StructType, StructField, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date

# Local folder that acts as the Parquet "warehouse"
output_path = './Project_Warehouse/'

# hadoop-aws adds S3 filesystem support; it is not required for the local output path
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
```

### Step 1: Scope the Project and Gather Data

#### Scope 
This Capstone Project prepares data for analysis and looks for significant connections between several aspects of the COVID-19 pandemic. These aspects are the COVID-19 situation in each country, access to vaccines, and the pandemic's effects, namely unemployment in many countries and the trend in world happiness scores.

#### Describe and Gather Data 
The project uses the following datasets:
* COVID-19 World Vaccination Progress Data by Our World in Data
* COVID-19 World Vaccine Adverse Reactions (VAERS) by FDA and CDC
* Unemployment rate by OECD
* World Happiness Report

In [2]:
```python
# Read every prepared dataset into a Spark DataFrame (CSV columns are loaded as strings)
country_covidStatus_path = './World_COVID19_vaccinations_dataset/country_covid_report.csv'
country_vac_by_man_path = './World_COVID19_vaccinations_dataset/country_vaccinations_by_manufacturer.csv'
vac_adverse_patinfo_path = './World_COVID19_vaccine_adverse_reaction/2021VAERSDATA.csv'
vac_adverse_syminfo_path = './World_COVID19_vaccine_adverse_reaction/2021VAERSSYMPTOMS.csv'
vac_adverse_vacinfo_path = './World_COVID19_vaccine_adverse_reaction/2021VAERSVAX.csv'
unemploy_path = './Unemployment_rate_OECD.json'
happiness_2018_report = './World_Happiness_Report/2018_report.csv'
happiness_2019_report = './World_Happiness_Report/2019_report.csv'
happiness_2020_report = './World_Happiness_Report/2020_report.csv'

df_country_covidStatus = spark.read.csv(country_covidStatus_path,header=True)
df_country_vacInfoManufacturer = spark.read.csv(country_vac_by_man_path,header=True)
df_vacAdverse_patInfo = spark.read.csv(vac_adverse_patinfo_path,header=True)
df_vacAdverse_symInfo = spark.read.csv(vac_adverse_syminfo_path,header=True)
df_vacAdverse_vacInfo = spark.read.csv(vac_adverse_vacinfo_path,header=True)
df_unemployRate = spark.read.json(unemploy_path, multiLine=True)
df_happiness_2018 = spark.read.csv(happiness_2018_report,header=True)
df_happiness_2019 = spark.read.csv(happiness_2019_report,header=True)
df_happiness_2020 = spark.read.csv(happiness_2020_report,header=True)
```

#### COVID-19 World Vaccination Progress Data
The dataset was obtained from Our World in Data: [Coronavirus (COVID-19) Vaccinations](https://ourworldindata.org/covid-vaccinations)

The complete COVID-19 dataset includes all historical data on the pandemic up to the date of publication.

In [3]:
```python
df_country_covidStatus.limit(5).toPandas()
```

|   | iso_code | continent | location | date | total_cases | new_cases | new_cases_smoothed | total_deaths | new_deaths | new_deaths_smoothed | ... | extreme_poverty | cardiovasc_death_rate | diabetes_prevalence | female_smokers | male_smokers | handwashing_facilities | hospital_beds_per_thousand | life_expectancy | human_development_index | excess_mortality |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | AFG | Asia | Afghanistan | 2020-02-24 | 1.0 | 1.0 |  |  |  |  | ... |  | 597.029 | 9.59 |  |  | 37.746 | 0.5 | 64.83 | 0.511 |  |
| 1 | AFG | Asia | Afghanistan | 2020-02-25 | 1.0 | 0.0 |  |  |  |  | ... |  | 597.029 | 9.59 |  |  | 37.746 | 0.5 | 64.83 | 0.511 |  |
| 2 | AFG | Asia | Afghanistan | 2020-02-26 | 1.0 | 0.0 |  |  |  |  | ... |  | 597.029 | 9.59 |  |  | 37.746 | 0.5 | 64.83 | 0.511 |  |
| 3 | AFG | Asia | Afghanistan | 2020-02-27 | 1.0 | 0.0 |  |  |  |  | ... |  | 597.029 | 9.59 |  |  | 37.746 | 0.5 | 64.83 | 0.511 |  |
| 4 | AFG | Asia | Afghanistan | 2020-02-28 | 1.0 | 0.0 |  |  |  |  | ... |  | 597.029 | 9.59 |  |  | 37.746 | 0.5 | 64.83 | 0.511 |  |


Country-level vaccination data, including manufacturer information, is assembled into a single file. For each country and vaccine it records the cumulative number of doses administered (`total_vaccinations`), ordered by date.

In [4]:
```python
df_country_vacInfoManufacturer.limit(5).toPandas()
```

|   | location | date | vaccine | total_vaccinations |
|---|---|---|---|---|
| 0 | Austria | 2021-01-08 | Johnson&Johnson | 0 |
| 1 | Austria | 2021-01-08 | Moderna | 0 |
| 2 | Austria | 2021-01-08 | Oxford/AstraZeneca | 0 |
| 3 | Austria | 2021-01-08 | Pfizer/BioNTech | 31096 |
| 4 | Austria | 2021-01-15 | Johnson&Johnson | 0 |
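
`total_vaccinations` is cumulative per country and vaccine. The doses administered between two reports are therefore the difference between consecutive totals. The plain-Python sketch below illustrates that calculation without Spark, using Austria values that appear in this notebook. `increments_from_cumulative` is a hypothetical helper. It assumes totals are non-null and treats the first report of each series as its first increment. In Spark, the same idea would typically be expressed with `pyspark.sql.functions.lag` over a `Window` partitioned by `location` and `vaccine` and ordered by `date`.

```python
from collections import defaultdict


def increments_from_cumulative(rows):
    """Convert cumulative vaccination totals into per-report increments.

    rows: iterable of (location, date, vaccine, total_vaccinations) tuples,
          with ISO 'YYYY-MM-DD' date strings so string order equals date order.
    Returns (location, date, vaccine, new_doses) tuples ordered by
    location, vaccine, then date.
    """
    previous = defaultdict(int)  # last total seen per (location, vaccine); starts at 0
    result = []
    for location, date, vaccine, total in sorted(rows, key=lambda r: (r[0], r[2], r[1])):
        key = (location, vaccine)
        result.append((location, date, vaccine, total - previous[key]))
        previous[key] = total
    return result


# Austria rows as they appear in the manufacturer table
sample = [
    ("Austria", "2021-01-08", "Moderna", 0),
    ("Austria", "2021-01-15", "Moderna", 90),
    ("Austria", "2021-01-22", "Moderna", 305),
    ("Austria", "2021-01-08", "Pfizer/BioNTech", 31096),
    ("Austria", "2021-01-15", "Pfizer/BioNTech", 115843),
]
new_doses = increments_from_cumulative(sample)
for row in new_doses:
    print(row)
```

The first Pfizer/BioNTech increment equals its first total. The sketch cannot tell how many doses were given before the series starts.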


#### COVID-19 World Vaccine Adverse Reactions (VAERS) by FDA and CDC
This dataset was downloaded from the [VAERS](https://vaers.hhs.gov/data/datasets.html?) datasets page. For more details on the dataset, refer to the [User Guide](https://vaers.hhs.gov/docs/VAERSDataUseGuide_November2020.pdf).

The Vaccine Adverse Event Reporting System (VAERS) was created by the Food and Drug Administration (FDA) and Centers for Disease Control and Prevention (CDC) to receive reports about adverse events that may be associated with vaccines. No prescription drug or biological product, such as a vaccine, is completely free from side effects. Vaccines protect many people from dangerous illnesses, but vaccines, like drugs, can cause side effects, a small percentage of which may be serious. VAERS is used to continually monitor reports to determine whether any vaccine or vaccine lot has a higher than expected rate of events.

In [5]:
```python
df_vacAdverse_patInfo.limit(5).toPandas()
```

|   | VAERS_ID | RECVDATE | STATE | AGE_YRS | CAGE_YR | CAGE_MO | SEX | RPT_DATE | SYMPTOM_TEXT | DIED | ... | CUR_ILL | HISTORY | PRIOR_VAX | SPLTTYPE | FORM_VERS | TODAYS_DATE | BIRTH_DEFECT | OFC_VISIT | ER_ED_VISIT | ALLERGIES |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 916600 | 01/01/2021 | TX | 33.0 | 33 |  | F |  | Right side of epiglottis swelled up and hinder... |  | ... |  |  |  |  | 2 | 01/01/2021 |  | Y |  | Pcn and bee venom |
| 1 | 916601 | 01/01/2021 | CA | 73.0 | 73 |  | F |  | "Approximately 30 min post vaccination adminis... |  | ... | Patient residing at nursing facility. See pati... | Patient residing at nursing facility. See pati... |  |  | 2 | 01/01/2021 |  | Y |  | """Dairy""" |
| 2 | 916602 | 01/01/2021 | WA | 23.0 | 23 |  | F |  | About 15 minutes after receiving the vaccine, ... |  | ... |  |  |  |  | 2 | 01/01/2021 |  |  | Y | Shellfish |
| 3 | 916603 | 01/01/2021 | WA | 58.0 | 58 |  | F |  | extreme fatigue, dizziness,. could not lift my... |  | ... | kidney infection | diverticulitis, mitral valve prolapse, osteoar... | got measles from measel shot, mums from mumps ... |  | 2 | 01/01/2021 |  |  |  | Diclofenac, novacaine, lidocaine, pickles, tom... |
| 4 | 916604 | 01/01/2021 | TX | 47.0 | 47 |  | F |  | Injection site swelling, redness, warm to the ... |  | ... | Na |  |  |  | 2 | 01/01/2021 |  |  |  | Na |


In [6]:
```python
df_vacAdverse_symInfo.limit(5).toPandas()
```

|   | VAERS_ID | SYMPTOM1 | SYMPTOMVERSION1 | SYMPTOM2 | SYMPTOMVERSION2 | SYMPTOM3 | SYMPTOMVERSION3 | SYMPTOM4 | SYMPTOMVERSION4 | SYMPTOM5 | SYMPTOMVERSION5 |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 916600 | Dysphagia | 23.1 | Epiglottitis | 23.1 |  |  |  |  |  |  |
| 1 | 916601 | Anxiety | 23.1 | Dyspnoea | 23.1 |  |  |  |  |  |  |
| 2 | 916602 | Chest discomfort | 23.1 | Dysphagia | 23.1 | Pain in extremity | 23.1 | Visual impairment | 23.1 |  |  |
| 3 | 916603 | Dizziness | 23.1 | Fatigue | 23.1 | Mobility decreased | 23.1 |  |  |  |  |
| 4 | 916604 | Injection site erythema | 23.1 | Injection site pruritus | 23.1 | Injection site swelling | 23.1 | Injection site warmth | 23.1 |  |  |


In [7]:
```python
df_vacAdverse_vacInfo.limit(5).toPandas()
```

|   | VAERS_ID | VAX_TYPE | VAX_MANU | VAX_LOT | VAX_DOSE_SERIES | VAX_ROUTE | VAX_SITE | VAX_NAME |
|---|---|---|---|---|---|---|---|---|
| 0 | 916600 | COVID19 | MODERNA | 037K20A | 1 | IM | LA | COVID19 (COVID19 (MODERNA)) |
| 1 | 916601 | COVID19 | MODERNA | 025L20A | 1 | IM | RA | COVID19 (COVID19 (MODERNA)) |
| 2 | 916602 | COVID19 | PFIZER\BIONTECH | EL1284 | 1 | IM | LA | COVID19 (COVID19 (PFIZER-BIONTECH)) |
| 3 | 916603 | COVID19 | MODERNA | unknown | UNK |  |  | COVID19 (COVID19 (MODERNA)) |
| 4 | 916604 | COVID19 | MODERNA |  | 1 | IM | LA | COVID19 (COVID19 (MODERNA)) |


#### Unemployment Rate by OECD
The dataset was obtained from OECD: [Unemployment rate](https://data.oecd.org/unemp/unemployment-rate.htm)

The unemployed are people of working age who are without work, are available for work, and have taken specific steps to find work. The uniform application of this definition results in estimates of unemployment rates that are more internationally comparable than estimates based on national definitions of unemployment. This indicator is measured in numbers of unemployed people as a percentage of the labour force and it is seasonally adjusted. The labour force is defined as the total number of unemployed people plus those in employment. Data are based on labour force surveys (LFS).  For European Union countries where monthly LFS information is not available, the monthly unemployed figures are estimated by Eurostat.
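
The definition above reduces to a simple ratio: rate = unemployed / (unemployed + employed) × 100. The minimal sketch below makes that arithmetic concrete. The function name and the example counts are hypothetical. It does not reproduce the OECD's seasonal adjustment or Eurostat's monthly estimation.

```python
def unemployment_rate(unemployed: float, employed: float) -> float:
    """Unemployment rate as a percentage of the labour force.

    The labour force is the unemployed plus the employed (OECD definition).
    No seasonal adjustment is applied.
    """
    labour_force = unemployed + employed
    if labour_force <= 0:
        raise ValueError("labour force must be positive")
    return 100.0 * unemployed / labour_force


# Hypothetical counts: 700,000 unemployed and 13,300,000 employed
print(unemployment_rate(700_000, 13_300_000))
```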

In [8]:
```python
df_unemployRate.limit(5).toPandas()
```

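|   | FREQUENCY | Flag Codes | INDICATOR | LOCATION | MEASURE | SUBJECT | TIME | Value |
|---|---|---|---|---|---|---|---|---|
| 0 | M |  | HUR | AUS | PC_LF | TOT | 2005-01 | 5.074622 |
| 1 | M |  | HUR | AUS | PC_LF | TOT | 2005-02 | 5.085431 |
| 2 | M |  | HUR | AUS | PC_LF | TOT | 2005-03 | 5.163756 |
| 3 | M |  | HUR | AUS | PC_LF | TOT | 2005-04 | 5.123358 |
| 4 | M |  | HUR | AUS | PC_LF | TOT | 2005-05 | 5.100113 |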


#### World Happiness Report
The dataset was obtained from Kaggle: [World Happiness Report](https://www.kaggle.com/yamaerenay/world-happiness-report-preprocessed?select=2020_report.csv)

This dataset contains the happiness score of each country. It also includes the key factors that contribute directly to a country's overall happiness: economic production, social support, life expectancy, freedom, generosity, and absence of corruption. It covers the years 2018 to 2020.

In [9]:
```python
df_happiness_2018.limit(5).toPandas()
```

|   | country | happiness_score | gdp_per_capita | social_support | health | freedom | generosity | government_trust | continent |
|---|---|---|---|---|---|---|---|---|---|
| 0 | Finland | 7.6320000000000014 | 1.305 | 1.592 | 0.8740000000000001 | 0.6809999999999999 | 0.202 | 0.393 | Europe |
| 1 | Norway | 7.5939999999999985 | 1.456 | 1.582 | 0.861 | 0.6859999999999999 | 0.286 | 0.34 | Europe |
| 2 | Denmark | 7.555 | 1.351 | 1.59 | 0.868 | 0.6829999999999999 | 0.284 | 0.408 | Europe |
| 3 | Iceland | 7.495 | 1.3430000000000002 | 1.644 | 0.914 | 0.677 | 0.353 | 0.138 | Europe |
| 4 | Switzerland | 7.487 | 1.42 | 1.5490000000000002 | 0.927 | 0.66 | 0.256 | 0.357 | Europe |


_________________

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
* Some dataframes have missing values in key columns that are used later
* Every CSV-based dataframe loads all of its columns as strings
* Date formats differ between dataframes
* Vaccine names are formatted differently in each dataframe

#### Cleaning Steps
* Select the columns of interest from each dataframe into its own table
* Drop rows with missing values in key columns
* Cast each column to the correct data type
* Format the date column
* Format the vaccine name column
* Add date column to each World Happiness Report file and aggregate into a table
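
The raw files use different date formats. OWID uses `yyyy-MM-dd`, VAERS uses `MM/dd/yyyy`, OECD uses `yyyy-MM`, and the happiness reports carry only a year. The plain-Python sketch below illustrates the "format the date column" step with `datetime.strptime` patterns that correspond to those formats. `normalize_date` and the source labels are hypothetical names. As in the Spark code, year-month and year-only values resolve to the first day of the period.

```python
from datetime import date, datetime
from typing import Optional

# strptime patterns per source, corresponding to the formats in the raw files:
# OWID 'yyyy-MM-dd', VAERS 'MM/dd/yyyy', OECD 'yyyy-MM', happiness report year 'yyyy'
SOURCE_DATE_FORMATS = {
    "owid": "%Y-%m-%d",
    "vaers": "%m/%d/%Y",
    "oecd": "%Y-%m",
    "happiness": "%Y",
}


def normalize_date(raw: Optional[str], source: str) -> Optional[date]:
    """Parse a raw date string from the given source into a datetime.date.

    Year-month and year-only values resolve to the first day of the period.
    Blank or unparseable values return None (the equivalent of a null).
    """
    if raw is None or not raw.strip():
        return None
    try:
        return datetime.strptime(raw.strip(), SOURCE_DATE_FORMATS[source]).date()
    except ValueError:
        return None


print(normalize_date("01/01/2021", "vaers"))  # a VAERS RECVDATE value
print(normalize_date("2005-01", "oecd"))      # an OECD TIME value
```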


In [10]:
```python
# Columns to keep for each table
country_covidStatus_col = ['date','iso_code','location','total_cases','new_cases','total_deaths','new_deaths','total_tests','new_tests','population','population_density','median_age']
country_vacInfoManufacturer_col = ['location','date','vaccine','total_vaccinations']
vac_adverse_vacinfo_col = ['VAERS_ID','VAX_NAME','VAX_MANU','VAX_DOSE_SERIES','VAX_ROUTE','VAX_SITE']
vac_adverse_patinfo_col = ['VAERS_ID','AGE_YRS','SEX','DISABLE','ALLERGIES','VAX_DATE','RECOVD','DIED','DATEDIED','HISTORY']
vac_adverse_syminfo_col = ['VAERS_ID','SYMPTOM1','SYMPTOM2','SYMPTOM3','SYMPTOM4','SYMPTOM5']

# Extract the selected columns from each dataframe into its own table
country_covidStatus_table = df_country_covidStatus.selectExpr(country_covidStatus_col)
country_vacInfoManufacturer_table = df_country_vacInfoManufacturer.selectExpr(country_vacInfoManufacturer_col)
vacAdverse_vacInfo_table = df_vacAdverse_vacInfo.selectExpr(vac_adverse_vacinfo_col)
vacAdverse_patInfo_table = df_vacAdverse_patInfo.selectExpr(vac_adverse_patinfo_col)
vacAdverse_symInfo_table = df_vacAdverse_symInfo.selectExpr(vac_adverse_syminfo_col)

# Drop rows with missing key values; dropna returns a new DataFrame, so the result must be reassigned
country_covidStatus_table = country_covidStatus_table.dropna(how="any", subset=["date", "iso_code", "location"])
country_vacInfoManufacturer_table = country_vacInfoManufacturer_table.dropna(how="any", subset=["date", "location"])
vacAdverse_vacInfo_table = vacAdverse_vacInfo_table.dropna(how="any", subset=["VAERS_ID", "VAX_MANU"])
vacAdverse_patInfo_table = vacAdverse_patInfo_table.dropna(how="any", subset=["VAERS_ID"])
vacAdverse_symInfo_table = vacAdverse_symInfo_table.dropna(how="any", subset=["VAERS_ID"])

# Cast columns to the correct data types (CSV columns are read as strings)
country_covidStatus_table = country_covidStatus_table.withColumn('date', col('date').cast(Date()))
# Note: IntegerType tops out at 2,147,483,647, so larger values (e.g. aggregate population rows)
# become null when cast in Spark's default non-ANSI mode; LongType would be safer for population
for c in ['total_cases', 'new_cases', 'total_deaths', 'new_deaths', 'total_tests', 'new_tests', 'population']:
    country_covidStatus_table = country_covidStatus_table.withColumn(c, col(c).cast(Int()))
for c in ['population_density', 'median_age']:
    country_covidStatus_table = country_covidStatus_table.withColumn(c, col(c).cast(Dbl()))

country_vacInfoManufacturer_table = country_vacInfoManufacturer_table.withColumn('date', col('date').cast(Date()))
country_vacInfoManufacturer_table = country_vacInfoManufacturer_table.withColumn('total_vaccinations', col('total_vaccinations').cast(Int()))

vacAdverse_patInfo_table = vacAdverse_patInfo_table.withColumn('AGE_YRS', col('AGE_YRS').cast(Int()))

unemployRate_table = df_unemployRate.withColumn('Value', col('Value').cast(Dbl()))

# Re-read the happiness reports with an explicit schema instead of all-string columns.
# The schema is applied by column position, so every file must share this column order;
# the numeric 'continent' values in the 2019/2020 rows of happinessRate_table suggest those files differ.
happinessSchema = StructType([
    StructField('country',Str()),
    StructField('happiness_score',Dbl()),
    StructField('gdp_per_capita',Dbl()),
    StructField('social_support',Dbl()),
    StructField('health',Dbl()),
    StructField('freedom',Dbl()),
    StructField('generosity',Dbl()),
    StructField('government_trust',Dbl()),
    StructField('continent',Str())
])
happiness_2018_table = spark.read.csv(happiness_2018_report,header=True,sep=',',schema=happinessSchema)
happiness_2019_table = spark.read.csv(happiness_2019_report,header=True,sep=',',schema=happinessSchema)
happiness_2020_table = spark.read.csv(happiness_2020_report,header=True,sep=',',schema=happinessSchema)

# Parse the date columns (VAERS uses MM/dd/yyyy, OECD uses yyyy-MM)
vacAdverse_patInfo_table = vacAdverse_patInfo_table.withColumn('VAX_DATE', to_date(col('VAX_DATE'), 'MM/dd/yyyy'))
vacAdverse_patInfo_table = vacAdverse_patInfo_table.withColumn('DATEDIED', to_date(col('DATEDIED'), 'MM/dd/yyyy'))

unemployRate_table = unemployRate_table.withColumn('TIME', to_date(col('TIME'), 'yyyy-MM'))
# Drop the unused flag column and rename LOCATION so it can be matched against iso_code
unemployRate_table = unemployRate_table.drop(col('Flag Codes'))
unemployRate_table = unemployRate_table.withColumnRenamed("LOCATION", "country_code")
```

In [11]:
```python
# Add a date column (1 January of the report year) to each World Happiness Report table, then combine them
happiness_2018_table = happiness_2018_table.withColumn('date',lit('2018')).withColumn('date',to_date(col('date'),'yyyy'))
happiness_2019_table = happiness_2019_table.withColumn('date',lit('2019')).withColumn('date',to_date(col('date'),'yyyy'))
happiness_2020_table = happiness_2020_table.withColumn('date',lit('2020')).withColumn('date',to_date(col('date'),'yyyy'))
happinessRate_table = happiness_2018_table.union(happiness_2019_table).distinct()
happinessRate_table = happinessRate_table.union(happiness_2020_table).distinct()
```

In [12]:
```python
# Check distinct vaccine names in country_vacInfoManufacturer_table
country_vacInfoManufacturer_table.select('vaccine').dropDuplicates().toPandas()
```

|   | vaccine |
|---|---|
| 0 | Oxford/AstraZeneca |
| 1 | Sinovac |
| 2 | Johnson&Johnson |
| 3 | CanSino |
| 4 | Sputnik V |
| 5 | Moderna |
| 6 | Pfizer/BioNTech |
| 7 | Sinopharm/Beijing |


In [13]:
```python
# Check distinct vaccine names in vacAdverse_vacInfo_table
vacAdverse_vacInfo_table.select('VAX_NAME').dropDuplicates().limit(10).toPandas()
```

|   | VAX_NAME |
|---|---|
| 0 | HIB (PEDVAXHIB) |
| 1 | MENINGOCOCCAL CONJUGATE (MENACTRA) |
| 2 | COVID19 (COVID19 (PFIZER-BIONTECH)) |
| 3 | INFLUENZA (SEASONAL) (FLUZONE) |
| 4 | DTAP + HEPB + IPV (PEDIARIX) |
| 5 | HPV (GARDASIL 9) |
| 6 | VACCINE NOT SPECIFIED (NO BRAND NAME) |
| 7 | ANTHRAX (BIOTHRAX) |
| 8 | ROTAVIRUS (ROTATEQ) |
| 9 | VARICELLA (VARIVAX) |


In [14]:
```python
vacAdverse_vacInfo_table.select('VAX_NAME').dropDuplicates().filter(col("VAX_NAME").contains("COVID")).toPandas()
```

|   | VAX_NAME |
|---|---|
| 0 | COVID19 (COVID19 (PFIZER-BIONTECH)) |
| 1 | COVID19 (COVID19 (UNKNOWN)) |
| 2 | COVID19 (COVID19 (MODERNA)) |
| 3 | COVID19 (COVID19 (JANSSEN)) |


In [15]:
```python
# Keep only COVID-19 vaccine reports, then rename the vaccines to match the names
# used in country_vacInfoManufacturer_table
vacAdverse_vacInfo_table = vacAdverse_vacInfo_table.where(col('VAX_NAME').contains('COVID19'))
vacAdverse_vacInfo_table = vacAdverse_vacInfo_table.na.replace("COVID19 (COVID19 (UNKNOWN))","Unknown")
vacAdverse_vacInfo_table = vacAdverse_vacInfo_table.na.replace("COVID19 (COVID19 (PFIZER-BIONTECH))","Pfizer/BioNTech")
vacAdverse_vacInfo_table = vacAdverse_vacInfo_table.na.replace("COVID19 (COVID19 (MODERNA))","Moderna")
vacAdverse_vacInfo_table = vacAdverse_vacInfo_table.na.replace("COVID19 (COVID19 (JANSSEN))","Johnson&Johnson")
```
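
Vaccine names are the link between the VAERS tables and the manufacturer table. It is therefore worth checking that every renamed value exists on the other side. The plain-Python sketch below does two things: it expresses the `na.replace` renaming as a dictionary, and it lists normalized names with no counterpart in the OWID vaccine list shown above. `normalize_vax_name`, `unmatched_names`, and the regular expression are hypothetical. The regex is stricter than the `contains('COVID19')` filter used in Spark.

```python
import re
from typing import Iterable, Optional, Set

# Vaccine names used in country_vacInfoManufacturer_table (from the output above)
OWID_VACCINES = {
    "Oxford/AstraZeneca", "Sinovac", "Johnson&Johnson", "CanSino",
    "Sputnik V", "Moderna", "Pfizer/BioNTech", "Sinopharm/Beijing",
}

# VAERS brand -> OWID name, the same mapping as the na.replace calls above
VAERS_TO_OWID = {
    "PFIZER-BIONTECH": "Pfizer/BioNTech",
    "MODERNA": "Moderna",
    "JANSSEN": "Johnson&Johnson",
    "UNKNOWN": "Unknown",
}

# Matches VAX_NAME values such as 'COVID19 (COVID19 (MODERNA))'
COVID_VAX_NAME = re.compile(r"^COVID19 \(COVID19 \((.+)\)\)$")


def normalize_vax_name(vax_name: str) -> Optional[str]:
    """Return the OWID-style name, or None for non-COVID-19 vaccines.
    Brands missing from VAERS_TO_OWID are returned unchanged."""
    match = COVID_VAX_NAME.match(vax_name)
    if match is None:
        return None
    brand = match.group(1)
    return VAERS_TO_OWID.get(brand, brand)


def unmatched_names(vax_names: Iterable[str]) -> Set[str]:
    """Normalized COVID-19 names that have no counterpart in OWID_VACCINES."""
    normalized = {normalize_vax_name(name) for name in vax_names}
    normalized.discard(None)
    return normalized - OWID_VACCINES


vaers_names = [
    "COVID19 (COVID19 (PFIZER-BIONTECH))", "COVID19 (COVID19 (UNKNOWN))",
    "COVID19 (COVID19 (MODERNA))", "COVID19 (COVID19 (JANSSEN))",
    "HPV (GARDASIL 9)",
]
print(unmatched_names(vaers_names))
```

Applied to the four COVID-19 names found above, only "Unknown" lacks a counterpart. Reports with an unknown vaccine therefore cannot be joined to a manufacturer row.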

#### Recheck the schema and data

In [16]:
```python
country_covidStatus_table.printSchema()
country_vacInfoManufacturer_table.printSchema()
vacAdverse_vacInfo_table.printSchema()
vacAdverse_patInfo_table.printSchema()
vacAdverse_symInfo_table.printSchema()
unemployRate_table.printSchema()
happinessRate_table.printSchema()
```

```
root
 |-- date: date (nullable = true)
 |-- iso_code: string (nullable = true)
 |-- location: string (nullable = true)
 |-- total_cases: integer (nullable = true)
 |-- new_cases: integer (nullable = true)
 |-- total_deaths: integer (nullable = true)
 |-- new_deaths: integer (nullable = true)
 |-- total_tests: integer (nullable = true)
 |-- new_tests: integer (nullable = true)
 |-- population: integer (nullable = true)
 |-- population_density: double (nullable = true)
 |-- median_age: double (nullable = true)

root
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- vaccine: string (nullable = true)
 |-- total_vaccinations: integer (nullable = true)

root
 |-- VAERS_ID: string (nullable = true)
 |-- VAX_NAME: string (nullable = true)
 |-- VAX_MANU: string (nullable = true)
 |-- VAX_DOSE_SERIES: string (nullable = true)
 |-- VAX_ROUTE: string (nullable = true)
 |-- VAX_SITE: string (nullable = true)

root
 |-- VAERS_ID: string (nullable = true)
 |-- AGE_YRS: in
```

In [17]:
```python
country_covidStatus_table.limit(10).toPandas()
```

|   | date | iso_code | location | total_cases | new_cases | total_deaths | new_deaths | total_tests | new_tests | population | population_density | median_age |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2020-02-24 | AFG | Afghanistan | 1 | 1 |  |  |  |  | 38928341 | 54.422 | 18.6 |
| 1 | 2020-02-25 | AFG | Afghanistan | 1 | 0 |  |  |  |  | 38928341 | 54.422 | 18.6 |
| 2 | 2020-02-26 | AFG | Afghanistan | 1 | 0 |  |  |  |  | 38928341 | 54.422 | 18.6 |
| 3 | 2020-02-27 | AFG | Afghanistan | 1 | 0 |  |  |  |  | 38928341 | 54.422 | 18.6 |
| 4 | 2020-02-28 | AFG | Afghanistan | 1 | 0 |  |  |  |  | 38928341 | 54.422 | 18.6 |
| 5 | 2020-02-29 | AFG | Afghanistan | 1 | 0 |  |  |  |  | 38928341 | 54.422 | 18.6 |
| 6 | 2020-03-01 | AFG | Afghanistan | 1 | 0 |  |  |  |  | 38928341 | 54.422 | 18.6 |
| 7 | 2020-03-02 | AFG | Afghanistan | 1 | 0 |  |  |  |  | 38928341 | 54.422 | 18.6 |
| 8 | 2020-03-03 | AFG | Afghanistan | 2 | 1 |  |  |  |  | 38928341 | 54.422 | 18.6 |
| 9 | 2020-03-04 | AFG | Afghanistan | 4 | 2 |  |  |  |  | 38928341 | 54.422 | 18.6 |


In [18]:
```python
country_vacInfoManufacturer_table.limit(10).toPandas()
```

|   | location | date | vaccine | total_vaccinations |
|---|---|---|---|---|
| 0 | Austria | 2021-01-08 | Johnson&Johnson | 0 |
| 1 | Austria | 2021-01-08 | Moderna | 0 |
| 2 | Austria | 2021-01-08 | Oxford/AstraZeneca | 0 |
| 3 | Austria | 2021-01-08 | Pfizer/BioNTech | 31096 |
| 4 | Austria | 2021-01-15 | Johnson&Johnson | 0 |
| 5 | Austria | 2021-01-15 | Moderna | 90 |
| 6 | Austria | 2021-01-15 | Oxford/AstraZeneca | 0 |
| 7 | Austria | 2021-01-15 | Pfizer/BioNTech | 115843 |
| 8 | Austria | 2021-01-22 | Johnson&Johnson | 0 |
| 9 | Austria | 2021-01-22 | Moderna | 305 |


In [19]:
```python
vacAdverse_vacInfo_table.limit(10).toPandas()
```

|   | VAERS_ID | VAX_NAME | VAX_MANU | VAX_DOSE_SERIES | VAX_ROUTE | VAX_SITE |
|---|---|---|---|---|---|---|
| 0 | 916600 | Moderna | MODERNA | 1 | IM | LA |
| 1 | 916601 | Moderna | MODERNA | 1 | IM | RA |
| 2 | 916602 | Pfizer/BioNTech | PFIZER\BIONTECH | 1 | IM | LA |
| 3 | 916603 | Moderna | MODERNA | UNK |  |  |
| 4 | 916604 | Moderna | MODERNA | 1 | IM | LA |
| 5 | 916606 | Moderna | MODERNA | 1 | IM | LA |
| 6 | 916607 | Moderna | MODERNA | UNK | IM | LA |
| 7 | 916608 | Moderna | MODERNA | 1 | IM | LA |
| 8 | 916609 | Moderna | MODERNA | 1 | IM | LA |
| 9 | 916610 | Moderna | MODERNA | 1 | SYR | LA |


In [20]:
```python
vacAdverse_patInfo_table.limit(10).toPandas()
```

|   | VAERS_ID | AGE_YRS | SEX | DISABLE | ALLERGIES | VAX_DATE | RECOVD | DIED | DATEDIED | HISTORY |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 916600 | 33 | F |  | Pcn and bee venom | 2020-12-28 | Y |  |  |  |
| 1 | 916601 | 73 | F |  | """Dairy""" | 2020-12-31 | Y |  |  | Patient residing at nursing facility. See pati... |
| 2 | 916602 | 23 | F |  | Shellfish | 2020-12-31 | U |  |  |  |
| 3 | 916603 | 58 | F |  | Diclofenac, novacaine, lidocaine, pickles, tom... | 2020-12-23 | Y |  |  | diverticulitis, mitral valve prolapse, osteoar... |
| 4 | 916604 | 47 | F |  | Na | 2020-12-22 | N |  |  |  |
| 5 | 916606 | 44 | F |  | iodine (shellfish) has epipen | 2020-12-29 | Y |  |  |  |
| 6 | 916607 | 50 | M |  | Penicillin | 2020-12-28 | Y |  |  | High blood pressure, high cholesterol, sleep a... |
| 7 | 916608 | 33 | M |  |  | 2020-12-29 |  |  |  |  |
| 8 | 916609 | 71 | F |  | Sulfa antibiotics, azithromycin, adhesive in ... | 2020-12-23 | N |  |  | Hashimoto's thyroiditis, Hypertension, depression |
| 9 | 916610 | 18 | F |  | jackfruit | 2020-12-29 | N |  |  |  |


In [21]:
```python
vacAdverse_symInfo_table.limit(10).toPandas()
```

|   | VAERS_ID | SYMPTOM1 | SYMPTOM2 | SYMPTOM3 | SYMPTOM4 | SYMPTOM5 |
|---|---|---|---|---|---|---|
| 0 | 916600 | Dysphagia | Epiglottitis |  |  |  |
| 1 | 916601 | Anxiety | Dyspnoea |  |  |  |
| 2 | 916602 | Chest discomfort | Dysphagia | Pain in extremity | Visual impairment |  |
| 3 | 916603 | Dizziness | Fatigue | Mobility decreased |  |  |
| 4 | 916604 | Injection site erythema | Injection site pruritus | Injection site swelling | Injection site warmth |  |
| 5 | 916606 | Pharyngeal swelling |  |  |  |  |
| 6 | 916607 | Abdominal pain | Chills | Sleep disorder |  |  |
| 7 | 916608 | Diarrhoea | Nasal congestion |  |  |  |
| 8 | 916609 | Vaccination site erythema | Vaccination site pruritus | Vaccination site swelling |  |  |
| 9 | 916610 | Rash | Urticaria |  |  |  |


In [22]:
```python
unemployRate_table.limit(10).toPandas()
```

|   | FREQUENCY | INDICATOR | country_code | MEASURE | SUBJECT | TIME | Value |
|---|---|---|---|---|---|---|---|
| 0 | M | HUR | AUS | PC_LF | TOT | 2005-01-01 | 5.074622 |
| 1 | M | HUR | AUS | PC_LF | TOT | 2005-02-01 | 5.085431 |
| 2 | M | HUR | AUS | PC_LF | TOT | 2005-03-01 | 5.163756 |
| 3 | M | HUR | AUS | PC_LF | TOT | 2005-04-01 | 5.123358 |
| 4 | M | HUR | AUS | PC_LF | TOT | 2005-05-01 | 5.100113 |
| 5 | M | HUR | AUS | PC_LF | TOT | 2005-06-01 | 4.950796 |
| 6 | M | HUR | AUS | PC_LF | TOT | 2005-07-01 | 4.972756 |
| 7 | M | HUR | AUS | PC_LF | TOT | 2005-08-01 | 4.902418 |
| 8 | M | HUR | AUS | PC_LF | TOT | 2005-09-01 | 5.003174 |
| 9 | M | HUR | AUS | PC_LF | TOT | 2005-10-01 | 5.01636 |


In [23]:
```python
happinessRate_table.limit(10).toPandas()
```

|   | country | happiness_score | gdp_per_capita | social_support | health | freedom | generosity | government_trust | continent | date |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Peru | 5.663 | 0.934 | 1.249 | 0.674 | 0.53 | 0.092 | 0.034 | South America | 2018-01-01 |
| 1 | Myanmar | 4.308 | 0.682 | 1.174 | 0.429 | 0.58 | 0.598 | 0.178 | Asia | 2018-01-01 |
| 2 | Uzbekistan | 5.987 | 0.73591 | 1.1681 | 0.50163 | 0.60848 | 0.28333 | 0.34326 | 2.34638 | 2019-01-01 |
| 3 | Italy | 5.977 | 1.35495 | 1.04167 | 0.85102 | 0.18827 | 0.02556 | 0.16684 | 2.34918 | 2019-01-01 |
| 4 | Estonia | 5.739 | 1.2 | 1.532 | 0.737 | 0.553 | 0.086 | 0.174 | Europe | 2018-01-01 |
| 5 | Chile | 6.705 | 1.2167 | 0.90587 | 0.81883 | 0.37789 | 0.11451 | 0.31595 | 2.95505 | 2019-01-01 |
| 6 | Kosovo | 6.3252 | 0.840481 | 1.183963 | 0.672709 | 0.55728 | 0.325287 | 0.008559 | 2.736902714 | 2020-01-01 |
| 7 | Haiti | 3.7208 | 0.284734 | 0.646671 | 0.374367 | 0.169298 | 0.46391 | 0.161936 | 1.619916916 | 2020-01-01 |
| 8 | Guinea | 3.964 | 0.344 | 0.792 | 0.211 | 0.394 | 0.185 | 0.094 | South America | 2018-01-01 |
| 9 | Argentina | 6.65 | 1.15137 | 1.06612 | 0.69711 | 0.42284 | 0.07296 | 0.10989 | 3.12985 | 2019-01-01 |


_________________

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

### Schema:

| Table name | Columns | Description | Type |
|:---:|:---:|:---:|:---:|
| country_covidStatus | date, iso_code, location, total_cases, new_cases, total_deaths, new_deaths, total_tests, new_tests, population, population_density, median_age | stores the COVID-19 situation and information of each country | fact table |
| country_vacInfoManufacturer | location, date, vaccine, total_vaccinations | stores COVID-19 vaccinations of each country by vaccine | fact table |
| vacAdverse_vacInfo | VAERS_ID, VAX_NAME, VAX_MANU, VAX_DOSE_SERIES, VAX_ROUTE, VAX_SITE | stores vaccine information of patients who had an adverse reaction | dimension table |
| vacAdverse_patInfo | VAERS_ID, AGE_YRS, SEX, DISABLE, ALLERGIES, VAX_DATE, RECOVD, DIED, DATEDIED, HISTORY | stores information about patients who had an adverse reaction | dimension table |
| vacAdverse_symInfo | VAERS_ID, SYMPTOM1, SYMPTOM2, SYMPTOM3, SYMPTOM4, SYMPTOM5 | stores patients' adverse reactions to the vaccine | dimension table |
| unemployRate | country_code, INDICATOR, SUBJECT, MEASURE, FREQUENCY, TIME, Value | stores the unemployment rate of each country | dimension table |
| happinessRate | country, happiness_score, gdp_per_capita, social_support, health, freedom, generosity, government_trust, continent, date | stores the happiness scores of each country | dimension table |
| unemploy_CovidStatus | date, iso_code, location, unemployed_rate, total_cases, total_deaths, total_tests, population | stores the unemployment rate and COVID-19 situation of each country | fact table |
| happiness_CovidStatus | date, iso_code, location, happiness_score, total_cases, total_deaths, total_tests, population | stores the happiness score and COVID-19 situation of each country | fact table |

#### Tables Decision:

As stated above, the objective of the project is to find significant connections between diverse aspects of the COVID-19 pandemic. The country_covidStatus and country_vacInfoManufacturer tables therefore hold the key information. They can then be integrated with the tables covering the other aspects to check whether the data is consistent. To do this efficiently, every table needs identifiers it can be joined on, such as the date, iso_code, country name, or vaccine name.
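
Before joining on a key such as the country name, it helps to see which values appear on only one side. Those rows get nulls in a left join and disappear in an inner join. The plain-Python sketch below shows such a pre-join check. `join_key_report` is a hypothetical helper, and the two key lists are toy values rather than actual mismatches found in the data. In Spark, the unmatched rows of one table can be listed with a `left_anti` join, e.g. `happinessRate_table.join(country_covidStatus_table, col('country') == col('location'), 'left_anti')`.

```python
from typing import Iterable, List, Tuple


def join_key_report(left_keys: Iterable[str],
                    right_keys: Iterable[str]) -> Tuple[List[str], List[str], List[str]]:
    """Compare the join keys of two tables before joining them.

    Returns (matched, only_left, only_right), each sorted.
    Keys in only_left get nulls in a left join and are dropped by an inner join.
    """
    left, right = set(left_keys), set(right_keys)
    return sorted(left & right), sorted(left - right), sorted(right - left)


# Toy key lists standing in for happiness `country` and COVID-19 `location` values
matched, only_happiness, only_covid = join_key_report(
    ["Finland", "Haiti", "Kosovo"],
    ["Afghanistan", "Finland", "Haiti"],
)
print(matched, only_happiness, only_covid)
```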

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Read all the raw datasets with Spark.
2. Extract the relevant columns from each dataframe and transform them into the schema described above.
3. Join the tables to relate the different aspects to each other.
4. Write the tables to the destination (here, a local folder) in Parquet format.

_________________

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### After preparing each dataset in the formatted schema, the next step is to join the tables and populate the remaining ones

In [24]:
```python
# Join unemployRate_table with country_covidStatus_table: each monthly unemployment value is
# matched with the COVID-19 figures reported on the first day of the same month and country
unemploy_CovidStatus_col = ['date','iso_code','location','Value','total_cases','total_deaths','total_tests','population']
unemploy_CovidStatus = unemployRate_table.join(country_covidStatus_table,( (year(col('TIME')) == year(col('date'))) & (month(col('TIME')) == month(col('date'))) & (col('country_code') == col('iso_code')) & (dayofmonth(col('date'))==1) ))
unemploy_CovidStatus = unemploy_CovidStatus.selectExpr(unemploy_CovidStatus_col).withColumnRenamed('Value','unemployed_rate')
unemploy_CovidStatus.limit(10).toPandas()
```

|   | date | iso_code | location | unemployed_rate | total_cases | total_deaths | total_tests | population |
|---|---|---|---|---|---|---|---|---|
| 0 | 2020-02-01 | AUS | Australia | 5.134253 | 12 |  |  | 25499881 |
| 1 | 2020-03-01 | AUS | Australia | 5.274256 | 27 | 1.0 |  | 25499881 |
| 2 | 2020-04-01 | AUS | Australia | 6.379705 | 4862 | 20.0 |  | 25499881 |
| 3 | 2020-05-01 | AUS | Australia | 7.020848 | 6778 | 93.0 | 588868.0 | 25499881 |
| 4 | 2020-06-01 | AUS | Australia | 7.357104 | 7221 | 102.0 | 1472220.0 | 25499881 |
| 5 | 2020-07-01 | AUS | Australia | 7.447178 | 8001 | 104.0 | 2505923.0 | 25499881 |
| 6 | 2020-08-01 | AUS | Australia | 6.756708 | 17895 | 208.0 | 4307224.0 | 25499881 |
| 7 | 2020-09-01 | AUS | Australia | 6.855995 | 25923 | 663.0 | 6255797.0 | 25499881 |
| 8 | 2020-10-01 | AUS | Australia | 6.941253 | 27109 | 890.0 | 7679651.0 | 25499881 |
| 9 | 2020-11-01 | AUS | Australia | 6.838203 | 27601 | 907.0 | 8825186.0 | 25499881 |
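
The join condition above combines data of different granularity: monthly unemployment values and daily COVID-19 rows. The plain-Python sketch below mirrors that condition so the row-selection logic can be inspected without Spark. The condition is same year, same month, same country code, and day of month equal to 1, with inner-join semantics. `join_monthly_to_daily` is a hypothetical helper. The sample values are the Australian March and April 2020 figures from the output above, plus one non-first-day row added to show that it is filtered out.

```python
from datetime import date


def join_monthly_to_daily(monthly, daily):
    """Inner-join monthly values to the daily row for the first day of each month.

    monthly: dict mapping (country_code, year, month) -> unemployment rate
    daily:   iterable of dicts with 'iso_code', 'date' (datetime.date), 'total_cases'
    """
    joined = []
    for row in daily:
        d = row["date"]
        if d.day != 1:
            continue  # only the first-of-month snapshot is kept
        rate = monthly.get((row["iso_code"], d.year, d.month))
        if rate is not None:  # inner join: rows without a monthly value are dropped
            joined.append({"date": d, "iso_code": row["iso_code"],
                           "unemployed_rate": rate, "total_cases": row["total_cases"]})
    return joined


monthly = {("AUS", 2020, 3): 5.274256, ("AUS", 2020, 4): 6.379705}
daily = [
    {"iso_code": "AUS", "date": date(2020, 3, 1), "total_cases": 27},
    {"iso_code": "AUS", "date": date(2020, 3, 2), "total_cases": None},  # filtered: not day 1
    {"iso_code": "AUS", "date": date(2020, 4, 1), "total_cases": 4862},
]
result = join_monthly_to_daily(monthly, daily)
for row in result:
    print(row)
```

Because only the first day of each month is kept, the cumulative case counts in each row are as of that day. For example, April's unemployment rate is paired with the cases reported by 1 April.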


In [25]:
```python
# Join happinessRate_table with country_covidStatus_table on country name and year
# (a left join, so every happiness row is kept even when no COVID-19 data matches)
happiness_CovidStatus_col = ['date','iso_code','location','happiness_score','total_cases','total_deaths','total_tests','population']
happinessRate_table = happinessRate_table.withColumnRenamed('date','Datetime')
happiness_CovidStatus = happinessRate_table.join(country_covidStatus_table,( (col('country')==col('location')) & (year(col('Datetime'))==year(col('date'))) ),'left')
happiness_CovidStatus = happiness_CovidStatus.selectExpr(happiness_CovidStatus_col)
#print(happiness_CovidStatus.count())
happiness_CovidStatus.dropDuplicates().limit(10).toPandas()
```

|   | date | Datetime | iso_code | location | happiness_score | total_cases | total_deaths | total_tests | population |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2020-07-29 | 2020-01-01 | OWID_KOS | Kosovo | 6.3252 | 7846 | 196.0 |  | 1932774 |
| 1 | 2020-11-09 | 2020-01-01 | HTI | Haiti | 3.7208 | 9127 | 232.0 |  | 11402533 |
| 2 | 2020-09-13 | 2020-01-01 | HTI | Haiti | 3.7208 | 8457 | 216.0 |  | 11402533 |
| 3 | 2020-05-11 | 2020-01-01 | HTI | Haiti | 3.7208 | 151 | 12.0 |  | 11402533 |
| 4 | 2020-03-21 | 2020-01-01 | HTI | Haiti | 3.7208 | 2 |  |  | 11402533 |
| 5 | 2020-03-15 | 2020-01-01 | ARM | Armenia | 4.6768 | 26 |  | 577.0 | 2963234 |
| 6 | 2020-10-11 | 2020-01-01 | IRL | Ireland | 7.0937 | 42528 | 1826.0 | 1327687.0 | 4937796 |
| 7 | 2020-09-09 | 2020-01-01 | IRL | Ireland | 7.0937 | 30164 | 1781.0 | 924118.0 | 4937796 |
| 8 | 2020-03-11 | 2020-01-01 | IRL | Ireland | 7.0937 | 43 | 1.0 |  | 4937796 |
| 9 | 2020-11-22 | 2020-01-01 | KHM | Cambodia | 4.8484 | 306 |  |  | 16718971 |


#### After every table is populated, write each one to the Project_Warehouse folder in Parquet format

In [26]:
```python
# Write each table to the warehouse as Parquet, with a partition column chosen per table
country_covidStatus_table.write.mode('overwrite').partitionBy('iso_code').parquet(path=output_path+'country_covidStatus_table')
country_vacInfoManufacturer_table.write.mode('overwrite').partitionBy('location').parquet(path=output_path+'country_vacInfoManufacturer_table')
vacAdverse_vacInfo_table.write.mode('overwrite').partitionBy('VAX_MANU').parquet(path=output_path+'vacAdverse_vacInfo_table')
vacAdverse_patInfo_table.write.mode('overwrite').partitionBy('VAX_DATE').parquet(path=output_path+'vacAdverse_patInfo_table')
vacAdverse_symInfo_table.write.mode('overwrite').parquet(path=output_path+'vacAdverse_symInfo_table')
unemployRate_table.write.mode('overwrite').partitionBy('country_code').parquet(path=output_path+'unemployRate_table')
happinessRate_table.write.mode('overwrite').partitionBy('country').parquet(path=output_path+'happinessRate_table')
unemploy_CovidStatus.write.mode('overwrite').partitionBy('iso_code').parquet(path=output_path+'unemploy_CovidStatus')
happiness_CovidStatus.write.mode('overwrite').partitionBy('iso_code').parquet(path=output_path+'happiness_CovidStatus')
```
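
With `partitionBy`, Spark writes one sub-directory per distinct value of the partition column, named `column=value`. The column's value is then encoded in the directory name rather than stored in the data files. The plain-Python sketch below reproduces that layout with CSV files instead of Parquet to show what ends up on disk. `write_partitioned` is hypothetical, and unlike Spark it does not escape special characters in partition values. The layout also shows the cost of high-cardinality partition columns: partitioning `vacAdverse_patInfo_table` by `VAX_DATE` produces one directory per vaccination date.

```python
import csv
import os
import tempfile
from collections import defaultdict


def write_partitioned(rows, base_dir, partition_col):
    """Write rows (dicts) to base_dir/<partition_col>=<value>/part-00000.csv.

    The partition column is left out of the files because its value is
    encoded in the directory name. Returns the sorted directory names.
    """
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_col]].append(row)
    for value, group in groups.items():
        part_dir = os.path.join(base_dir, f"{partition_col}={value}")
        os.makedirs(part_dir, exist_ok=True)
        fields = [name for name in group[0] if name != partition_col]
        with open(os.path.join(part_dir, "part-00000.csv"), "w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=fields, extrasaction="ignore")
            writer.writeheader()
            writer.writerows(group)
    return sorted(os.listdir(base_dir))


sample = [
    {"iso_code": "AFG", "date": "2020-02-24", "total_cases": 1},
    {"iso_code": "AFG", "date": "2020-02-25", "total_cases": 1},
    {"iso_code": "AUS", "date": "2020-03-01", "total_cases": 27},
]
with tempfile.TemporaryDirectory() as tmp:
    layout = write_partitioned(sample, tmp, "iso_code")
    with open(os.path.join(tmp, "iso_code=AFG", "part-00000.csv")) as fh:
        afg_header = fh.readline().strip()
print(layout, afg_header)
```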

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
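
The checks listed above can be prototyped on plain rows before they are written against Spark. The sketch below covers three of them: a unique-key check, a not-null check, and a source/count check. It is plain Python and the helper names are hypothetical. The uniqueness check assumes one row per `VAERS_ID` in the patient table; the vaccine and symptom files may contain several rows per report, so the check does not apply to them. In PySpark, the duplicate check would typically be `df.groupBy('VAERS_ID').count().filter(col('count') > 1)`.

```python
from typing import Dict, Iterable, List, Set


def duplicate_keys(rows: Iterable[Dict], key: str) -> Set:
    """Unique-key check: key values that occur more than once."""
    seen, dupes = set(), set()
    for row in rows:
        value = row.get(key)
        if value in seen:
            dupes.add(value)
        seen.add(value)
    return dupes


def rows_with_nulls(rows: Iterable[Dict], columns: List[str]) -> int:
    """Not-null check: number of rows missing any of the given columns."""
    return sum(1 for row in rows if any(row.get(c) in (None, "") for c in columns))


def count_ok(source_count: int, target_count: int, min_rows: int = 1) -> bool:
    """Source/count check for tables that are only filtered or cast (not joined):
    the output must be non-empty and cannot have more rows than its source."""
    return min_rows <= target_count <= source_count


patients = [
    {"VAERS_ID": "916600", "SEX": "F"},
    {"VAERS_ID": "916601", "SEX": "F"},
    {"VAERS_ID": "916600", "SEX": "F"},  # duplicated on purpose
    {"VAERS_ID": None, "SEX": "M"},      # missing key on purpose
]
print(duplicate_keys(patients, "VAERS_ID"))
print(rows_with_nulls(patients, ["VAERS_ID"]))
print(count_ok(source_count=4, target_count=3))
```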
 
Run Quality Checks

In [27]:
```python
# Check that every table can be read back from the Parquet warehouse
country_covidStatus_table = spark.read.parquet(output_path+"country_covidStatus_table")
country_vacInfoManufacturer_table = spark.read.parquet(output_path+"country_vacInfoManufacturer_table")
vacAdverse_vacInfo_table = spark.read.parquet(output_path+"vacAdverse_vacInfo_table")
vacAdverse_patInfo_table = spark.read.parquet(output_path+"vacAdverse_patInfo_table")
vacAdverse_symInfo_table = spark.read.parquet(output_path+"vacAdverse_symInfo_table")
unemployRate_table = spark.read.parquet(output_path+"unemployRate_table")
happinessRate_table = spark.read.parquet(output_path+"happinessRate_table")
unemploy_CovidStatus = spark.read.parquet(output_path+"unemploy_CovidStatus")
happiness_CovidStatus = spark.read.parquet(output_path+"happiness_CovidStatus")
```

In [28]:
# Count test: every table should contain at least 100 rows
tables = {
    'country_covidStatus_table': country_covidStatus_table,
    'country_vacInfoManufacturer_table': country_vacInfoManufacturer_table,
    'vacAdverse_vacInfo_table': vacAdverse_vacInfo_table,
    'vacAdverse_patInfo_table': vacAdverse_patInfo_table,
    'vacAdverse_symInfo_table': vacAdverse_symInfo_table,
    'unemployRate_table': unemployRate_table,
    'happinessRate_table': happinessRate_table,
    'unemploy_CovidStatus': unemploy_CovidStatus,
    'happiness_CovidStatus': happiness_CovidStatus,
}

# Collect every failing table instead of stopping at the first one
failed = [name for name, df in tables.items() if df.count() < 100]
for name in failed:
    print(name + ' count test not passed')

# Only report success when no table failed
if not failed:
    print('COUNT TESTS ALL PASSED!')

COUNT TESTS ALL PASSED!
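A threshold of 100 rows only catches tables that are almost empty. A stricter source/count check compares the number of rows each table had before it was written with the number read back from Parquet, and raises an error instead of printing, so that a scheduler such as Airflow marks the run as failed. The sketch below assumes the counts have already been collected into dictionaries; in this notebook the "before" counts would have to be captured before the read-back cell, because that cell reassigns the same variable names. The function names and the example counts are hypothetical.

```python
def reconcile_counts(expected, actual, min_rows=100):
    """Compare per-table row counts before writing (expected) and after reading back (actual)."""
    problems = {}
    for table, n_expected in expected.items():
        n_actual = actual.get(table)
        if n_actual is None:
            problems[table] = "missing from warehouse"
        elif n_actual != n_expected:
            problems[table] = f"expected {n_expected} rows, found {n_actual}"
        elif n_actual < min_rows:
            problems[table] = f"only {n_actual} rows (minimum {min_rows})"
    return problems


def assert_counts(expected, actual, min_rows=100):
    """Raise so that the pipeline run fails instead of only printing a warning."""
    problems = reconcile_counts(expected, actual, min_rows)
    if problems:
        raise ValueError(f"Count checks failed: {problems}")


# Hypothetical counts, for illustration only
expected = {"unemployRate_table": 5000, "happinessRate_table": 150}
assert_counts(expected, {"unemployRate_table": 5000, "happinessRate_table": 150})
print(reconcile_counts(expected, {"unemployRate_table": 4990}))
```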


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Example queries for users

* To see how the unemployment rate relates to the COVID-19 situation in Australia, run the following query.


In [33]:
unemploy_CovidStatus.createOrReplaceTempView('unemployAndCovidStatus')
spark.sql("""SELECT * FROM unemployAndCovidStatus WHERE iso_code = 'AUS' ORDER BY date ASC""").show()

+----------+---------+---------------+-----------+------------+-----------+----------+--------+
|      date| location|unemployed_rate|total_cases|total_deaths|total_tests|population|iso_code|
+----------+---------+---------------+-----------+------------+-----------+----------+--------+
|2020-02-01|Australia|       5.134253|         12|        null|       null|  25499881|     AUS|
|2020-03-01|Australia|       5.274256|         27|           1|       null|  25499881|     AUS|
|2020-04-01|Australia|       6.379705|       4862|          20|       null|  25499881|     AUS|
|2020-05-01|Australia|       7.020848|       6778|          93|     588868|  25499881|     AUS|
|2020-06-01|Australia|       7.357104|       7221|         102|    1472220|  25499881|     AUS|
|2020-07-01|Australia|       7.447178|       8001|         104|    2505923|  25499881|     AUS|
|2020-08-01|Australia|       6.756708|      17895|         208|    4307224|  25499881|     AUS|
|2020-09-01|Australia|       6.855995|  
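Because `total_cases` is cumulative, relating it to the monthly unemployment rate is easier with month-over-month differences. A window function such as `LAG` computes them, and Spark SQL supports `LAG(...) OVER (PARTITION BY ... ORDER BY ...)`, so the same query text should also run against the `unemployAndCovidStatus` view with `spark.sql`. The sketch below runs it on Python's built-in `sqlite3` (window functions need SQLite 3.25 or later) with the Australia rows shown above, so it can be tried without Spark. The output columns `new_cases` and `rate_change` are illustrative names.

```python
import sqlite3

# Rows copied from the Australia output above: date, location, unemployed_rate, total_cases, iso_code
ROWS = [
    ("2020-02-01", "Australia", 5.134253, 12, "AUS"),
    ("2020-03-01", "Australia", 5.274256, 27, "AUS"),
    ("2020-04-01", "Australia", 6.379705, 4862, "AUS"),
    ("2020-05-01", "Australia", 7.020848, 6778, "AUS"),
    ("2020-06-01", "Australia", 7.357104, 7221, "AUS"),
    ("2020-07-01", "Australia", 7.447178, 8001, "AUS"),
    ("2020-08-01", "Australia", 6.756708, 17895, "AUS"),
]

QUERY = """
SELECT date,
       unemployed_rate,
       total_cases - LAG(total_cases) OVER (PARTITION BY iso_code ORDER BY date) AS new_cases,
       ROUND(unemployed_rate - LAG(unemployed_rate) OVER (PARTITION BY iso_code ORDER BY date), 6) AS rate_change
FROM unemployAndCovidStatus
WHERE iso_code = 'AUS'
ORDER BY date ASC
"""


def month_over_month(rows):
    """Load the sample rows into an in-memory table and run the LAG query."""
    con = sqlite3.connect(":memory:")
    con.execute(
        "CREATE TABLE unemployAndCovidStatus "
        "(date TEXT, location TEXT, unemployed_rate REAL, total_cases INTEGER, iso_code TEXT)"
    )
    con.executemany("INSERT INTO unemployAndCovidStatus VALUES (?, ?, ?, ?, ?)", rows)
    result = con.execute(QUERY).fetchall()
    con.close()
    return result


for row in month_over_month(ROWS):
    print(row)
```

The first month has no predecessor, so its `new_cases` and `rate_change` are null.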

* To see how the happiness score relates to the COVID-19 situation in Haiti, run the following query.

In [35]:
happiness_CovidStatus.createOrReplaceTempView('happinessAndCovidStatus')
spark.sql("""SELECT * FROM happinessAndCovidStatus WHERE iso_code = 'HTI' ORDER BY date ASC""").show()

+----------+----------+--------+---------------+-----------+------------+-----------+----------+--------+
|      date|  Datetime|location|happiness_score|total_cases|total_deaths|total_tests|population|iso_code|
+----------+----------+--------+---------------+-----------+------------+-----------+----------+--------+
|2020-03-20|2020-01-01|   Haiti|    3.720799923|          2|        null|       null|  11402533|     HTI|
|2020-03-21|2020-01-01|   Haiti|    3.720799923|          2|        null|       null|  11402533|     HTI|
|2020-03-22|2020-01-01|   Haiti|    3.720799923|          2|        null|       null|  11402533|     HTI|
|2020-03-23|2020-01-01|   Haiti|    3.720799923|          2|        null|       null|  11402533|     HTI|
|2020-03-24|2020-01-01|   Haiti|    3.720799923|          7|        null|       null|  11402533|     HTI|
|2020-03-25|2020-01-01|   Haiti|    3.720799923|          7|        null|       null|  11402533|     HTI|
|2020-03-26|2020-01-01|   Haiti|    3.72079992
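The happiness score appears to be repeated on every daily row of a year (the `Datetime` column holds `2020-01-01`), so a yearly summary is often more useful than the daily listing. The query below groups by year and, assuming `total_cases` never decreases, takes `MAX` as the latest case count of each year. It is shown running on `sqlite3` with the Haiti rows from the output above; SQLite has no `year()` function, so the sketch uses `substr(date, 1, 4)`, whereas in Spark SQL `year(date)` is the more natural grouping expression. The alias `report_year` is illustrative.

```python
import sqlite3

# Rows copied from the Haiti output above: date, location, happiness_score, total_cases, iso_code
ROWS = [
    ("2020-03-20", "Haiti", 3.720799923, 2, "HTI"),
    ("2020-03-21", "Haiti", 3.720799923, 2, "HTI"),
    ("2020-03-22", "Haiti", 3.720799923, 2, "HTI"),
    ("2020-03-23", "Haiti", 3.720799923, 2, "HTI"),
    ("2020-03-24", "Haiti", 3.720799923, 7, "HTI"),
    ("2020-03-25", "Haiti", 3.720799923, 7, "HTI"),
]

QUERY = """
SELECT substr(date, 1, 4) AS report_year,
       MAX(happiness_score) AS happiness_score,
       MAX(total_cases) AS total_cases
FROM happinessAndCovidStatus
WHERE iso_code = 'HTI'
GROUP BY substr(date, 1, 4)
ORDER BY report_year ASC
"""


def yearly_summary(rows):
    """Load the sample rows into an in-memory table and collapse them to one row per year."""
    con = sqlite3.connect(":memory:")
    con.execute(
        "CREATE TABLE happinessAndCovidStatus "
        "(date TEXT, location TEXT, happiness_score REAL, total_cases INTEGER, iso_code TEXT)"
    )
    con.executemany("INSERT INTO happinessAndCovidStatus VALUES (?, ?, ?, ?, ?)", rows)
    result = con.execute(QUERY).fetchall()
    con.close()
    return result


print(yearly_summary(ROWS))
```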

_________________


#### Step 5: Complete Project Write Up
#### Choice of tools and technologies for the project

In this project I used Spark to explore the data because it supports schema-on-read, and I also used Spark to clean the data. At a later stage, I recommend running the Spark processing on a managed cluster that can handle larger datasets, such as Amazon EMR (Elastic MapReduce). To automate updates, I recommend integrating the ETL pipeline into an Airflow DAG.

I used a Jupyter Notebook to show how I structured the project, because it makes it easy to explain each step in Markdown next to the code. I chose Python because it is a widely used language and the one I am most comfortable with.
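Schema-on-read means the raw files stay as plain text and types are applied only when the data is read; in Spark this is done by passing a `StructType` to `spark.read.csv(path, schema=...)`, which is why the notebook imports `StructType` and `StructField`. The pure-Python sketch below shows the same idea with the `csv` module: a list of (column, parser) pairs is applied to each raw record, and empty fields become `None`. The `SCHEMA` list and the sample records (copied from the Australia query output) are illustrative only.

```python
import csv
import io
from datetime import date

# Illustrative schema: column name and the function that parses its raw text
SCHEMA = [
    ("date", date.fromisoformat),
    ("location", str),
    ("unemployed_rate", float),
    ("total_cases", int),
    ("total_deaths", int),
    ("total_tests", int),
    ("population", int),
    ("iso_code", str),
]

RAW = """2020-02-01,Australia,5.134253,12,,,25499881,AUS
2020-03-01,Australia,5.274256,27,1,,25499881,AUS
2020-05-01,Australia,7.020848,6778,93,588868,25499881,AUS
"""


def read_with_schema(text, schema=SCHEMA):
    """Apply the schema while reading; empty fields are treated as missing (None)."""
    rows = []
    for record in csv.reader(io.StringIO(text)):
        rows.append({name: parse(value) if value != "" else None for (name, parse), value in zip(schema, record)})
    return rows


for row in read_with_schema(RAW):
    print(row)
```

The raw text never changes; a different schema could be applied to the same file on the next read.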
    
    
#### How often the data should be updated and why

The COVID-19 pandemic is still ongoing, and up-to-date vaccination data and related information are needed to assess and respond to it. I therefore recommend updating the data monthly, which also matches the monthly granularity of the unemployment data.
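A monthly update can run incrementally: each run loads only the rows of the previous calendar month instead of reprocessing everything. The sketch below computes that window for a given run date; the function name is hypothetical, and in the Spark pipeline the result could be applied with `df.filter((col('date') >= start) & (col('date') < end))`.

```python
from datetime import date


def previous_month_window(run_date):
    """Return (start, end) covering the previous calendar month; end is exclusive."""
    end = run_date.replace(day=1)
    if end.month == 1:
        start = end.replace(year=end.year - 1, month=12)
    else:
        start = end.replace(month=end.month - 1)
    return start, end


print(previous_month_window(date(2021, 3, 15)))
print(previous_month_window(date(2021, 1, 3)))
```

Using an exclusive end date avoids having to know how many days the month has.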
        
        
#### How I would handle the following scenarios
 * The data was increased by 100x.
     * Use Spark to process the data in a distributed way, e.g. on an Amazon EMR cluster.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     * Use Airflow and create a DAG that runs the pipeline described above every day, scheduled to finish before 7am.
 * The database needed to be accessed by 100+ people.
     * Use Amazon Redshift to store the data so that many people can query it efficiently.
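For the 7am dashboard, the main design question is when the DAG should start so that it reliably finishes before the deadline. The standard-library sketch below, with a hypothetical 05:00 start and hypothetical runtimes, computes the next daily start time and checks whether a run of a given duration would finish in time. In Airflow the start time could be expressed as a cron schedule such as `0 5 * * *`; the helper names here are illustrative.

```python
from datetime import datetime, timedelta


def next_daily_run(now, hour=5):
    """Next occurrence of hour:00 strictly after `now` (hypothetical 05:00 start)."""
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


def finishes_before(start, runtime, deadline_hour=7):
    """True if a run starting at `start` and lasting `runtime` ends by deadline_hour:00 that day."""
    deadline = start.replace(hour=deadline_hour, minute=0, second=0, microsecond=0)
    return start + runtime <= deadline


start = next_daily_run(datetime(2021, 5, 1, 6, 0))
print(start, finishes_before(start, timedelta(minutes=90)))
```

If the measured runtime approaches the two-hour margin, the start time should move earlier rather than relying on the run finishing just in time.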

_________________