# Covid-19 Data Collection

## Data Engineering Capstone Project


### Project Summary
During the 2020 pandemic, the ability to track and use data to keep people safe was crucial. This project takes some of the data collected by others and houses it in a cluster on AWS. The benefit of this collection is that others can use the data in its normalized format to perform analytical queries. For ease of use during the ETL process, the data was downloaded from the corresponding sites and loaded directly into my AWS S3 bucket. The data was pulled from the two sources listed below. When combined in tabular form, the data exceeds one million rows, which meets the project requirement. I am limiting my scope to the United States; rows for other countries are filtered out later in the ETL process.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
## Using Version Python 3.7.6
# This imports all the necessary packages that will be used during this main ETL script
import pandas as pd
import json  
import os
import urllib.request
import boto 
import sys
import boto3
import configparser
import numpy as np
import psycopg2
from sql_queries import counting_audit_queries
from IPython.display import Image



# Gets configuration file used here for S3 Bucket specifically
config = configparser.ConfigParser()
config.read('dl.cfg')

# Gets current directory location, used several times throughout this project
currentDir = os.getcwd()

### Step 1: Scope the Project and Gather Data

#### Scope and Description
##### Google Community Mobility Reports
This data aims to provide insights into how movement has shifted and changed throughout the pandemic. The data was collected through Google's store of location data throughout the world and was downloaded as a CSV file. In tabular form this data is approximately 470,000 rows (at the time of ingestion). If you would like to know more about this data set, please visit Google's page [here.](https://www.google.com/covid19/mobility/)


##### AWS Covid 19 Data Lake
This is an open source data lake collected by various parties and hosted on AWS. It contains population, confirmed case, and death data. I used Tableau's collection of three JSON files. In tabular form this data is approximately 940,000 rows (at the time of ingestion). It appears to have been gathered by Johns Hopkins University using an automated collect-and-store methodology. You can find out more about Johns Hopkins University's collection methods [here.](https://github.com/CSSEGISandData/COVID-19) Additionally, you can find the data lake from which the JSON files were downloaded at [this AWS location.](https://dj2taa9i652rf.cloudfront.net/)


##### Tools and plan
I plan to take the data from my local computer and load the files into S3. Before the upload I will do some light transformation on my local machine, just for formatting purposes. The rest of the transformation process will take place in AWS Redshift (a cloud data warehouse solution). The data will be stored in a normalized form for hypothetical data analysis purposes.

##### Getting Started
To get started I downloaded the files above and placed them in my Jupyter Notebook directory. I have them placed in this folder for convenience. 

* To find the Johns Hopkins data after clicking the link, navigate to the following directory: covid19-lake/tableau-jhu/json. Note: I would discourage downloading this data again, as its columns change often; use what is already downloaded instead. You will see later where a column adjustment had to be made.
* To download the Google data from the link mentioned above, simply click the "Download global CSV" button


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identifying data quality issues, like missing values, duplicate data, etc....

#### Google Mobility Exploration

In [2]:
# Exploring the Google Global Mobility Report
# The CSV is huge, so in order to view the complete data I read it in chunks and combined them into one dataframe
# pd.concat is used because DataFrame.append was removed in pandas 2.0
chunks = pd.read_csv('Global_Mobility_Report.csv', iterator=True, chunksize=1000)
fullGoogleData = pd.concat(chunks)
fullGoogleData.head()

Unnamed: 0,country_region_code,country_region,sub_region_1,sub_region_2,date,retail_and_recreation_percent_change_from_baseline,grocery_and_pharmacy_percent_change_from_baseline,parks_percent_change_from_baseline,transit_stations_percent_change_from_baseline,workplaces_percent_change_from_baseline,residential_percent_change_from_baseline
0,AE,United Arab Emirates,,,2020-02-15,0.0,4.0,5.0,0.0,2.0,1.0
1,AE,United Arab Emirates,,,2020-02-16,1.0,4.0,4.0,1.0,2.0,1.0
2,AE,United Arab Emirates,,,2020-02-17,-1.0,1.0,5.0,1.0,2.0,1.0
3,AE,United Arab Emirates,,,2020-02-18,-2.0,1.0,5.0,0.0,2.0,1.0
4,AE,United Arab Emirates,,,2020-02-19,-2.0,0.0,4.0,-1.0,2.0,1.0


In [3]:
# First checking row count and then deduplicating to see if this will be a necessary part of clean up later
rowNum = len(fullGoogleData.index)
print('Number of rows present in Google Global Mobility Report:', rowNum)
# drop_duplicates returns a new dataframe, so the deduplicated count is taken from its result
rowNum2 = len(fullGoogleData.drop_duplicates().index)
print('Total number of rows deduplicated:', rowNum-rowNum2)

Number of rows present in Google Global Mobility Report: 477322
Total number of rows deduplicated: 0


##### Google Mobility Data Assessment

Generally speaking, the Google data has few data quality issues. The primary concern is that there are many null values, which can produce problems in data analysis. I will be removing a few rows with a null value upon insert into Redshift. Additionally, I will handle the other nulls so that a good composite key can be enforced, which also helps analytic queries later. The "County" string at the end of the county column will also need to be handled to ensure a good composite key and a match between the two tables. Otherwise, this table is a pretty straightforward load. I will be filtering by the United States as part of my scope, which happens in the transformation step of the ETL SQL logic.
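
As a rough illustration of the null handling and "County" suffix clean up described above, here is a minimal pandas sketch. The project itself does this in SQL on Redshift, so the function name `build_composite_key`, the `composite_key` column, the sample rows, and the plain state + county concatenation are all assumptions made for illustration.

```python
import pandas as pd

# Hypothetical sample rows shaped like the Google mobility columns
sample = pd.DataFrame({
    "sub_region_1": ["Alabama", "Alabama", None],
    "sub_region_2": ["Autauga County", None, None],
})

def build_composite_key(frame):
    """Return rows that have a state, with a state + county key added."""
    # Drop rows that have no state at all
    cleaned = frame.dropna(subset=["sub_region_1"]).copy()
    # Replace missing counties with an empty string so the key is never null
    county = cleaned["sub_region_2"].fillna("")
    # Remove a trailing ' County' so names can match the case data's Admin2 values
    county = county.str.replace(r" County$", "", regex=True)
    # Assumed key format: state immediately followed by county
    cleaned["composite_key"] = cleaned["sub_region_1"] + county
    return cleaned

keyed = build_composite_key(sample)
print(keyed["composite_key"].tolist())
```

A state-level row (no county) simply ends up with the state name as its key, which keeps the key non-null.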

#### Johns Hopkins Exploration

In [4]:
# Reading in the Json pulled from AWS data lake (this will be used later as well)
Json1 = 'part-00000-00f0e506-3bdc-4621-bbc9-e9510a1e84de-c000.json'
Json2 = 'part-00001-00f0e506-3bdc-4621-bbc9-e9510a1e84de-c000.json'
Json3 = 'part-00002-00f0e506-3bdc-4621-bbc9-e9510a1e84de-c000.json'


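jsonList = []
# Each line of the file is its own JSON record; the with block closes the file afterwards
with open(Json1, 'r') as jsonFile:
    for line in jsonFile:
        jsonList.append(json.loads(line))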

# Converts list to DataFrame for easy exploration
df = pd.DataFrame(jsonList)  

# Reading top few lines of Tableau's data frame
df.head()

Unnamed: 0,Case_Type,Cases,Difference,Date,Country_Region,Province_State,Admin2,Combined_Key,Lat,Long,Prep_Flow_Runtime,FIPS
0,Confirmed,6,0,5/22/2020,Western Sahara,,,Western Sahara,24.2155,-12.8858,6/4/2020 12:18:58 AM,
1,Confirmed,0,0,2/3/2020,Switzerland,,,Switzerland,46.8182,8.2275,6/4/2020 12:18:58 AM,
2,Deaths,0,0,3/1/2020,Cyprus,,,Cyprus,35.1264,33.4299,6/4/2020 12:18:58 AM,
3,Confirmed,23,0,4/21/2020,Antigua and Barbuda,,,Antigua and Barbuda,17.0608,-61.7964,6/4/2020 12:18:58 AM,
4,Deaths,56,0,5/11/2020,Thailand,,,Thailand,15.870032,100.992541,6/4/2020 12:18:58 AM,


In [5]:
# First checking row count and then deduplicating to see if this will be a necessary part of clean up later
# Keep in mind that this is only one of the JSON files as sample data
rowNum = len(df.index)
print('Number of rows present in Tableau John Hopkins Data:', rowNum)
# drop_duplicates returns a new dataframe, so the deduplicated count is taken from its result
rowNum2 = len(df.drop_duplicates().index)
print('Total number of rows deduplicated:', rowNum-rowNum2)

Number of rows present in Tableau John Hopkins Data: 320281
Total number of rows deduplicated: 0


##### Tableau and Johns Hopkins Data Assessment

After careful analysis, I also feel confident that the Tableau and Johns Hopkins data doesn't have many data quality issues. Again, the primary concern is null values in a few columns. When the state or county isn't assigned, the county FIPS, lat, and long values are null as well. This isn't terrible, since most of the data will be used in aggregations down the line in normalized form, but it will need to be handled to ensure a good composite key. I will be filtering by the United States as part of my scope, which happens in the transformation step of the ETL logic.
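
One quick way to see where the nulls sit is to count them per column. The sketch below uses a few hypothetical rows with the same column names as the case data; it is not the actual data set.

```python
import pandas as pd

# Hypothetical rows: an unassigned state or county comes with a missing FIPS
sample = pd.DataFrame({
    "Province_State": ["Alabama", None, "Texas"],
    "Admin2": ["Autauga", None, None],
    "FIPS": [1001.0, None, None],
})

# isna() marks missing cells; sum() counts them per column
null_counts = sample.isna().sum()
print(null_counts)
```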

#### Cleaning Steps
Documenting steps necessary to clean the data:

Since most of the data seems to be in good condition, here are the few steps I will be taking in the ETL process to clean it up.

1. First I will filter all files to the United States, which reduces my scope, mostly because I reside in the United States.

2. I will then remove a handful of rows (closer to 100) from the Google Mobility data that are located in the United States but lack state information.

3. I will also remove the Combined_Key column from the Johns Hopkins data, as there will be a better normalized composite key to use later, built from just state and county.
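
The three cleaning steps can be sketched in pandas as follows. In the project the filtering happens in the Redshift SQL, so this is only an illustration; the function names and sample rows are hypothetical, and the assumption that the case data labels the United States as `US` in `Country_Region` should be checked against the files.

```python
import pandas as pd

def clean_mobility(frame):
    """Steps 1 and 2: keep US rows, then drop rows without a state."""
    us_rows = frame[frame["country_region_code"] == "US"]
    return us_rows.dropna(subset=["sub_region_1"])

def clean_cases(frame):
    """Steps 1 and 3: keep US rows, then drop the Combined_Key column."""
    # Assumption: the United States appears as 'US' in Country_Region
    us_rows = frame[frame["Country_Region"] == "US"]
    return us_rows.drop(columns=["Combined_Key"])

# Hypothetical sample rows for both sources
mobility = pd.DataFrame({
    "country_region_code": ["US", "US", "AE"],
    "sub_region_1": ["Alabama", None, None],
})
cases = pd.DataFrame({
    "Country_Region": ["US", "Thailand"],
    "Province_State": ["Alabama", None],
    "Combined_Key": ["Autauga, Alabama, US", "Thailand"],
})

clean_mob = clean_mobility(mobility)
clean_case = clean_cases(cases)
```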

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Mapping out the conceptual data model and explaining why I am choosing that model

For ease of use, and because little transformation effort is needed, I have decided to use a data warehouse on an AWS Redshift cluster to store the data.

I am choosing a three-table schema in order to get the data into third normal form (3NF), in case end users want to use this framework to perform analytic queries on the data. The end tables will be structured in the following manner:

1. One table will be the fact table, called demographics. This table will contain the attributes Country, State, a compositeKey of the two just mentioned, and County information, as well as FIPS, Latitude, and Longitude. The County and State information will be collected from both the JSON and CSV files in order to ensure all keys are captured from both tables.
2. The second table will be called covidCaseData and will be built from the Tableau/Johns Hopkins collection of data. This table will include the Case_Type, Cases, Dates, and a compositeKey of State and County.
3. The last table will be made up of the Google Mobility data and called googleMobility. The County and State will be combined in order to create a compositeKey. In addition, all of the change-from-baseline columns will be kept so that an end user can write appropriate analytic queries.


You can view the architecture in the README.
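
To illustrate why the demographics keys are collected from both sources, here is a small pandas sketch that unions the state and county pairs and derives a key. The frames, the `union_keys` helper, and the plain concatenation used for `compositeKey` are hypothetical; the real tables are built by the SQL in the ETL scripts.

```python
import pandas as pd

# Hypothetical state/county pairs taken from each source
cases_keys = pd.DataFrame({"state": ["Alabama", "Texas"],
                           "county": ["Autauga", "Travis"]})
mobility_keys = pd.DataFrame({"state": ["Alabama", "Ohio"],
                              "county": ["Autauga", "Franklin"]})

def union_keys(*frames):
    """Stack the key frames, drop repeats, and add a compositeKey column."""
    combined = pd.concat(frames, ignore_index=True).drop_duplicates()
    combined = combined.assign(compositeKey=combined["state"] + combined["county"])
    return combined.reset_index(drop=True)

demographics = union_keys(cases_keys, mobility_keys)
print(demographics)
```

A county that appears in only one source (Texas or Ohio here) still gets a row, so both of the other tables can join against it.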

#### 3.2 Mapping Out Data Pipelines
Listing the steps necessary to pipeline the data into the chosen data model

First I will create a cluster in Redshift and fill out the configuration files as appropriate. The dl.cfg file is used to move the downloaded data to S3, while the second configuration file, dwh.cfg, is used for the ETL process itself. Both hold largely the same information and will need to be changed to run the code against your own environment. They ended up as two files because I iterated through this process.

1. After getting the correct environment and configuration in place, I will create all the necessary tables in Redshift with their corresponding schemas. This includes the three in-scope deliverables mentioned above: demographics, covidCaseData, and googleMobility. It also includes two staging tables in Redshift to make the transformation easier to perform.
2. From there I will perform all the transforms mentioned previously, along with the appropriate insert statements.
3. I will then execute data quality checks to ensure that all the data is loaded into the tables as expected.
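
For the configuration step, a minimal sketch of reading cluster settings by name and building a connection string might look like the following. The key names (`HOST`, `DB_NAME`, `DB_USER`, `DB_PASSWORD`, `DB_PORT`) and the values are assumptions for illustration; the real dwh.cfg may use different names.

```python
import configparser

# Hypothetical contents of a dwh.cfg style file; none of these values are real
EXAMPLE_CFG = """
[CLUSTER]
HOST = example-host
DB_NAME = dev
DB_USER = awsuser
DB_PASSWORD = not-a-real-password
DB_PORT = 5439
"""

def build_dsn(config):
    """Build a libpq style connection string from the CLUSTER section."""
    cluster = config["CLUSTER"]
    # Reading each key by name avoids depending on the order of lines in the file
    return "host={} dbname={} user={} password={} port={}".format(
        cluster["HOST"], cluster["DB_NAME"], cluster["DB_USER"],
        cluster["DB_PASSWORD"], cluster["DB_PORT"],
    )

config = configparser.ConfigParser()
config.read_string(EXAMPLE_CFG)
dsn = build_dsn(config)
```

The resulting string could then be passed to `psycopg2.connect`.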

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [6]:
# Code can be executed here. I have created some other modules at the bottom in order to keep the code clean. 
# You can view them in this folder.
# You can execute each module to perform a function mentioned above.


################################################  Transform  ##############################################################
# Because Redshift has a 4 MB size limit when loading JSON,
# I have decided to convert the JSON into CSV in order to make a seamless load process
print('Beginning initial transformation process....')

# Defines bucket location
# NOTE YOU NEED TO CHANGE THIS TO YOUR OWN S3 BUCKET TO GET THE CODE TO WORK APPROPRIATELY
S3 = r'capstone-nano-degree-bucket'

# ALSO NOTE YOU WILL NEED TO CHANGE THE CONFIGURATION FILES (dl.cfg and dwh.cfg) TO YOUR AWS CREDENTIALS
session = boto3.Session(
    aws_access_key_id= config['AWS']['AWS_ACCESS_KEY_ID'],
    aws_secret_access_key= config['AWS']['AWS_SECRET_ACCESS_KEY'],
)
s3 = session.resource('s3')


# Converts each JSON file to a DataFrame for easy csv transformation
# Deletes combined key (commas are no good in csv)
# Places the result in the temp folder
def jsonToCsv(jsonPath, csvName, columns=None):
    with open(jsonPath, 'r') as jsonFile:
        frame = pd.DataFrame([json.loads(line) for line in jsonFile])
    del frame['Combined_Key']
    # Reorders the columns only when an explicit order is given
    if columns is not None:
        frame = frame[columns]
    frame.to_csv(os.path.join(currentDir, 'temp', csvName), index=False)


# Makes sure the temp folder exists before writing to it
os.makedirs(os.path.join(currentDir, 'temp'), exist_ok=True)

# The first file in particular needs the columns rearranged to match the other two files
jsonToCsv(Json1, 'JohnHopkins1.csv',
          columns=['Case_Type','Cases','Difference','Date','Country_Region','Province_State','Admin2','FIPS','Lat','Long','Prep_Flow_Runtime'])
jsonToCsv(Json2, 'JohnHopkins2.csv')
jsonToCsv(Json3, 'JohnHopkins3.csv')
print('Initial tranformation complete!')

############################################## Inserts transform into S3 ###################################################
print('Loading local files into AWS S3 bucket....')

# Loads Global Mobility CSV
# os.path.join builds the local paths so they work on any operating system
print("\n Moving Global Mobility CSV to S3 from local location....")
s3.meta.client.upload_file(Filename = os.path.join(currentDir, 'Global_Mobility_Report.csv'), Bucket = S3 , Key = r'Global_Mobility_Report.csv')
print("\n Global Mobility CSV to S3 Complete!")


# Loads John Hopkins 1
print("\n Moving John Hopkins 1 to S3 from local location....")
s3.meta.client.upload_file(Filename = os.path.join(currentDir, 'temp', 'JohnHopkins1.csv'), Bucket = S3 , Key = r'JohnHopkins1.CSV')
print("\n John Hopkins 1 to S3 Complete!")


# Loads John Hopkins 2
print("\n Moving John Hopkins 2 to S3 from local location....")
s3.meta.client.upload_file(Filename = os.path.join(currentDir, 'temp', 'JohnHopkins2.csv'), Bucket = S3 , Key = r'JohnHopkins2.CSV')
print("\n John Hopkins 2 to S3 Complete!")


# Loads John Hopkins 3
print("\n Moving John Hopkins 3 to S3 from local location....")
s3.meta.client.upload_file(Filename = os.path.join(currentDir, 'temp', 'JohnHopkins3.csv'), Bucket = S3 , Key = r'JohnHopkins3.CSV')
print("\n John Hopkins 3 to S3 Complete!")


# Executes initial drop and create tables on the Redshift cluster
print("Dropping and Creating Tables in Redshift.")
exec(open("create_tables.py").read())
print("Created Tables in Redshift without errors!")


# Executes the ETL process
print("Executing ETL, copying data into staging tables and inserting into final tables.")
exec(open("etl.py").read())
print("Executing ETL complete without errors!")


Beginning initial transformation process....
Initial tranformation complete!
Loading local files into AWS S3 bucket....

 Moving Global Mobility CSV to S3 from local location....

 Global Mobility CSV to S3 Complete!

 Moving John Hopkins 1 to S3 from local location....

 John Hopkins 1 to S3 Complete!

 Moving John Hopkins 2 to S3 from local location....

 John Hopkins 2 to S3 Complete!

 Moving John Hopkins 3 to S3 from local location....

 John Hopkins 3 to S3 Complete!
Dropping and Creating Tables in Redshift.
Created Tables in Redshift without errors!
Executing ETL, copying data into staging tables and inserting into final tables.
Executing ETL complete without errors!


#### 4.2 Data Quality Checks
The data quality check I'll perform to ensure the pipeline ran as expected is a simple row count on each table.

In [7]:
# Perform quality checks here by running a count query against each table to confirm the approximate row counts are accounted for
config = configparser.ConfigParser()
config.read('dwh.cfg')
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()
for counting_query in counting_audit_queries:
    # Prints the tail of the query text (the table name, per the output below)
    print(counting_query[26:])
    cur.execute(counting_query)
    analysis = cur.fetchone()

    for row in analysis:
        print("--", row)

# Closes the connection once all counts have been printed
conn.close()


staging_covidCase

-- 943628
staging_googleMobility

-- 477322
demographics

-- 3259
covidCaseData

-- 873412
googleMobility

-- 282051
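
The counts above are only printed. A stricter variant could fail loudly when a table comes back empty. The sketch below shows one way to do that, using an in-memory SQLite database purely as a stand-in for Redshift so that it runs anywhere; the `audit_row_counts` helper and the sample table contents are hypothetical.

```python
import sqlite3

def audit_row_counts(cur, tables):
    """Return {table: row count} and raise if any table is empty."""
    counts = {}
    for table in tables:
        # Table names come from a trusted, hard-coded list, never from user input
        cur.execute("SELECT COUNT(*) FROM {}".format(table))
        count = cur.fetchone()[0]
        if count == 0:
            raise ValueError("Data quality check failed: {} is empty".format(table))
        counts[table] = count
    return counts

# SQLite stand-in for the warehouse connection
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE demographics (compositeKey TEXT)")
cur.executemany("INSERT INTO demographics VALUES (?)",
                [("AlabamaAutauga",), ("TexasTravis",)])

counts = audit_row_counts(cur, ["demographics"])
conn.close()
```

Because the helper only relies on `execute` and `fetchone`, the same function should work with a psycopg2 cursor connected to the cluster.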


#### 4.3 Data dictionary 
You can view the data dictionary in the README.

### Step 5: Complete Project Write Up
   
The overall goal of this project was to gather and combine some interesting subsets of Covid-19 data in order to build a data warehouse solution for later analysis. As a recap, the technologies I used for the project were PostgreSQL, AWS Redshift, AWS S3 buckets, and Python. Because the transformations were light, I decided to use these technologies instead of Spark or Airflow, which support more robust transformation processes.
	
The data itself should probably be updated on a weekly basis, which gives the sources time to accumulate enough new data to be a pertinent addition to the data warehouse. This is easily accomplished by walking through the provided Jupyter notebook, which has step-by-step instructions. As requested by Udacity, here is how I would tackle things differently under the following scenarios:
    
###### The data was increased by 100x
If this were the case, I would probably bring in Spark's distributed processing framework: even though the transformations seem light, a hundred million rows would pose quite a different scenario for the data that I have collected.


###### The data populates a dashboard that must be updated on a daily basis by 7am every day.
I would probably try to find a source API for the data so that it could be pulled down more easily without having to download a file. In addition, I would use a scheduler like Windows Task Scheduler to execute my notebook once it is converted to Python scripts. The best choice, however, would be to use Airflow to schedule the jobs as DAGs.

###### The database needed to be accessed by 100+ people
Redshift is pretty expensive if there are going to be many users. I would probably explore AWS Athena, which is priced per query and would generally be the better value.


I hope you have enjoyed my project and the write-up. I certainly enjoyed completing my Udacity Data Engineer Nanodegree!