
# Data Engineering Capstone Project: COVID-19 World Vaccination Progress
#### Daily and Total Vaccination for COVID-19 in the World






### Sources:
* Vaccinations : https://www.kaggle.com/gpreda/covid-world-vaccination-progress

* Demographics : https://www.oecd-ilibrary.org/social-issues-migration-health/health-spending/indicator/english_8643de7e-en

* Covid-19 cases: https://data.world/covid-19-data-resource-hub/covid-19-case-counts/workspace/file?filename=COVID-19+Activity.csv

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
#for getting data from kaggle
import requests
import io
import os
from pyspark.sql.functions import to_timestamp


In [2]:
#output partition data directory
output_data = 'data/outputs/vaccine_data'

## 1 Scope the Project and Data Description

#### Scope 
What is the goal of this mini-project?
* The goal of this project is to gather data on COVID-19 cases, vaccination progress, and country demographics. We want to build a data lake that extracts the data from online sources and loads it into a store that analytics tools or any other data application can use.

What data do we use?

We use data from:
* The OECD library for country demographics
* data.world for COVID-19 cases in all countries
* The Our World in Data GitHub repository for vaccination progress

What does the end solution look like?

* It is an etl.py script that extracts the COVID-19 data from the online sources and writes partitioned data to the data/outputs directory.

What tools did we use?

* Because of the large amount of data and the need for computationally expensive queries, we used Spark and AWS.



## 1.1 Data

In [3]:
covid_cases = "https://download.data.world/s/s65p7s2aqym4qams7ub2n4b72e5e4h"
covid_cases2 = "https://data.world/covid-19-data-resource-hub/covid-19-case-counts/workspace/file?filename=COVID-19+Activity.csv"

In [4]:
df_cases = pd.read_csv(covid_cases)



In [5]:
df_cases.head()

Unnamed: 0,people_positive_cases_count,county_name,province_state_name,report_date,continent_name,data_source_name,people_death_new_count,county_fips_number,country_alpha_3_code,country_short_name,country_alpha_2_code,people_positive_new_cases_count,people_death_count
0,18046,Kenosha,Wisconsin,2021-04-26,America,New York Times,0,55059.0,USA,United States,US,3,321
1,18096,Kenosha,Wisconsin,2021-04-27,America,New York Times,0,55059.0,USA,United States,US,50,321
2,18123,Kenosha,Wisconsin,2021-04-28,America,New York Times,1,55059.0,USA,United States,US,27,322
3,18148,Kenosha,Wisconsin,2021-04-29,America,New York Times,0,55059.0,USA,United States,US,25,322
4,18163,Kenosha,Wisconsin,2021-04-30,America,New York Times,0,55059.0,USA,United States,US,15,322


In [6]:
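# .copy() makes an independent frame, so the fillna assignments below do not act on a slice of the original
df_cases = df_cases[['report_date','continent_name','country_alpha_3_code','people_positive_new_cases_count','people_death_new_count']].copy()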

In [7]:
df_cases['country_alpha_3_code'] = df_cases['country_alpha_3_code'].fillna('unknown')
df_cases['continent_name'] = df_cases['continent_name'].fillna('unknown')

In [10]:
#pickle.dump(some_obj, f)

In [11]:
len(df_cases)

1683875

In [8]:
# Read in the data here
url_vac_by_manu = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/vaccinations/vaccinations-by-manufacturer.csv"

url_vac = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/vaccinations/vaccinations.csv"
#url_covid_full_json = "https://covid.ourworldindata.org/data/owid-covid-data.json"


In [13]:
#df_vac_by_manufacturer = pd.read_csv(url_vac_by_manu,sep=',')
#df_vac = pd.read_csv(url_vac,sep=',')

In [14]:
#df_vac_by_manufacturer.head()

In [None]:
#df_vac.head()

In [None]:
# df_vac is only defined when the pandas read above is uncommented
#len(df_vac)

In [None]:
#df_GDPR = pd.read_csv(url_cases_and_deaths)
#df_GDPR.head()

### 1.2 Spark

We create a Spark session.

In [9]:

from pyspark.sql import SparkSession
from pyspark import SparkFiles

spark = SparkSession.builder.appName("Vaccination_database").getOrCreate()

#df_spark =spark.read.load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')



#### Read and save dataset covid cases

In [10]:
#spark.sparkContext.addFile(covid_cases)

df_spark_cases = spark.createDataFrame(df_cases)


In [11]:
df_spark_cases.show(5)

+-----------+--------------+--------------------+-------------------------------+----------------------+
|report_date|continent_name|country_alpha_3_code|people_positive_new_cases_count|people_death_new_count|
+-----------+--------------+--------------------+-------------------------------+----------------------+
| 2021-04-26|       America|                 USA|                              3|                     0|
| 2021-04-27|       America|                 USA|                             50|                     0|
| 2021-04-28|       America|                 USA|                             27|                     1|
| 2021-04-29|       America|                 USA|                             25|                     0|
| 2021-04-30|       America|                 USA|                             15|                     0|
+-----------+--------------+--------------------+-------------------------------+----------------------+
only showing top 5 rows



#### Read and save dataset vac_by_manu 

In [12]:

spark.sparkContext.addFile(url_vac_by_manu)
df_spark_vac_by_manu = spark.read.csv("file://"+SparkFiles.get("vaccinations-by-manufacturer.csv"), header=True, inferSchema= True)
df_spark_vac_by_manu.show(5)

+--------+-------------------+---------------+------------------+
|location|               date|        vaccine|total_vaccinations|
+--------+-------------------+---------------+------------------+
|   Chile|2020-12-24 00:00:00|Pfizer/BioNTech|               420|
|   Chile|2020-12-25 00:00:00|Pfizer/BioNTech|              5198|
|   Chile|2020-12-26 00:00:00|Pfizer/BioNTech|              8338|
|   Chile|2020-12-27 00:00:00|Pfizer/BioNTech|              8649|
|   Chile|2020-12-28 00:00:00|Pfizer/BioNTech|              8649|
+--------+-------------------+---------------+------------------+
only showing top 5 rows



In [None]:
#write to parquet


#df_spark.write.parquet("data/sas_data")




#df_spark=spark.read.parquet("sas_data")

#### Read and save dataset vac

In [13]:
spark.sparkContext.addFile(url_vac)

df_spark_vac = spark.read.csv("file://"+SparkFiles.get("vaccinations.csv"), header=True, inferSchema= True)
df_spark_vac.show(5)

+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+
|   location|iso_code|               date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million|
+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+
|Afghanistan|     AFG|2021-02-22 00:00:00|                 0|                0|                   null|                  null|              null|                           0.0|  

#### Read and save GDPR

In [14]:
df_spark_GDPR =spark.read.csv(
    'Health spendings per country GDPR percentage.csv', 
    header=True, 
    inferSchema= True
)

In [15]:
df_spark_GDPR.show(5)

+--------+---------+-------+-------+---------+----+------+----------+
|LOCATION|INDICATOR|SUBJECT|MEASURE|FREQUENCY|TIME| Value|Flag Codes|
+--------+---------+-------+-------+---------+----+------+----------+
|     AUS|      POP|    TOT| AGRWTH|        A|1951|2.9711|      null|
|     AUS|      POP|    TOT| AGRWTH|        A|1952|2.5505|      null|
|     AUS|      POP|    TOT| AGRWTH|        A|1953|2.0702|      null|
|     AUS|      POP|    TOT| AGRWTH|        A|1954| 1.942|      null|
|     AUS|      POP|    TOT| AGRWTH|        A|1955|2.3724|      null|
+--------+---------+-------+-------+---------+----+------+----------+
only showing top 5 rows



In [None]:
#requires aggregation

### 2 Explore and Assess the Data
#### Exploring the Data 
We identify data quality issues, like missing values, duplicate data, etc.



In [None]:
#Covid Cases

In [23]:
df_spark_cases.describe().show()

+-------+-----------+--------------+--------------------+-------------------------------+----------------------+
|summary|report_date|continent_name|country_alpha_3_code|people_positive_new_cases_count|people_death_new_count|
+-------+-----------+--------------+--------------------+-------------------------------+----------------------+
|  count|    1683875|       1683875|             1683875|                        1683875|               1683875|
|   mean|       null|          null|                null|              94.04136886645387|    1.9552425209709747|
| stddev|       null|          null|                null|              1979.551147663271|     33.45775875787036|
|    min| 2020-01-21|        Africa|                 ABW|                         -74347|                 -1918|
|    max| 2021-05-09|       unknown|             unknown|                         823225|                  4249|
+-------+-----------+--------------+--------------------+-------------------------------+----------------------+

In [24]:
for col in df_spark_cases.columns:
    print(col, "\t", "with null values: ", df_spark_cases.filter(df_spark_cases[col].isNull()).count())

report_date 	 with null values:  0
continent_name 	 with null values:  0
country_alpha_3_code 	 with null values:  0
people_positive_new_cases_count 	 with null values:  0
people_death_new_count 	 with null values:  0


In [25]:
for col in df_spark_cases.columns:
    print(col, "\t", "with unknown values: ", df_spark_cases.filter(df_spark_cases[col] == 'unknown').count())

report_date 	 with unknown values:  0
continent_name 	 with unknown values:  1900
country_alpha_3_code 	 with unknown values:  1900
people_positive_new_cases_count 	 with unknown values:  0
people_death_new_count 	 with unknown values:  0


In [26]:
col = 'continent_name'
df_filtered=df_spark_cases.filter(df_spark_cases[col] == 'unknown')
df_filtered.head(5)

[Row(report_date='2020-11-15', continent_name='unknown', country_alpha_3_code='unknown', people_positive_new_cases_count=0, people_death_new_count=0),
 Row(report_date='2021-02-10', continent_name='unknown', country_alpha_3_code='unknown', people_positive_new_cases_count=0, people_death_new_count=0),
 Row(report_date='2020-11-05', continent_name='unknown', country_alpha_3_code='unknown', people_positive_new_cases_count=0, people_death_new_count=0),
 Row(report_date='2020-07-25', continent_name='unknown', country_alpha_3_code='unknown', people_positive_new_cases_count=0, people_death_new_count=0),
 Row(report_date='2021-03-05', continent_name='unknown', country_alpha_3_code='unknown', people_positive_new_cases_count=0, people_death_new_count=0)]
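
The two loops above run one Spark job per column. For a small sample collected to the driver, the same null and placeholder counts can be gathered in a single pass. The following is a minimal sketch in plain Python, assuming the rows are dictionaries (for example obtained with `Row.asDict()`); the function name `profile_rows` and the sample rows are illustrative and not part of the project.

```python
def profile_rows(rows, placeholder="unknown"):
    """Count null and placeholder values per column in one pass."""
    stats = {}
    for row in rows:
        for col, val in row.items():
            counts = stats.setdefault(col, {"null": 0, "placeholder": 0})
            if val is None:
                counts["null"] += 1
            elif val == placeholder:
                counts["placeholder"] += 1
    return stats

# Illustrative rows only (not real data).
sample_rows = [
    {"continent_name": "America", "country_alpha_3_code": "USA"},
    {"continent_name": "unknown", "country_alpha_3_code": "unknown"},
    {"continent_name": None, "country_alpha_3_code": "GRC"},
]
profile = profile_rows(sample_rows)
print(profile)
```
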

#### Cleaning Steps
Document steps necessary to clean the data


In [None]:
#Covid Cases

In [16]:
# We observed that rows with the 'unknown' value offer no information (their other features are also 'unknown'), so we drop those rows
col = 'continent_name'
df_spark_cases = df_spark_cases.filter(df_spark_cases[col] != 'unknown')
df_spark_cases.show(5)

+-----------+--------------+--------------------+-------------------------------+----------------------+
|report_date|continent_name|country_alpha_3_code|people_positive_new_cases_count|people_death_new_count|
+-----------+--------------+--------------------+-------------------------------+----------------------+
| 2021-04-26|       America|                 USA|                              3|                     0|
| 2021-04-27|       America|                 USA|                             50|                     0|
| 2021-04-28|       America|                 USA|                             27|                     1|
| 2021-04-29|       America|                 USA|                             25|                     0|
| 2021-04-30|       America|                 USA|                             15|                     0|
+-----------+--------------+--------------------+-------------------------------+----------------------+
only showing top 5 rows



### Schema print for all tables

In [28]:
df_spark_vac_by_manu.show(5)

+--------+-------------------+---------------+------------------+
|location|               date|        vaccine|total_vaccinations|
+--------+-------------------+---------------+------------------+
|   Chile|2020-12-24 00:00:00|Pfizer/BioNTech|               420|
|   Chile|2020-12-25 00:00:00|Pfizer/BioNTech|              5198|
|   Chile|2020-12-26 00:00:00|Pfizer/BioNTech|              8338|
|   Chile|2020-12-27 00:00:00|Pfizer/BioNTech|              8649|
|   Chile|2020-12-28 00:00:00|Pfizer/BioNTech|              8649|
+--------+-------------------+---------------+------------------+
only showing top 5 rows



In [29]:
df_spark_vac_by_manu.printSchema()

root
 |-- location: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- vaccine: string (nullable = true)
 |-- total_vaccinations: integer (nullable = true)

In [30]:
df_spark_vac.show(5)

+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+
|   location|iso_code|               date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million|
+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+
|Afghanistan|     AFG|2021-02-22 00:00:00|                 0|                0|                   null|                  null|              null|                           0.0|  

In [31]:
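df_spark_vac.printSchema()

root
 |-- location: string (nullable = true)
 |-- iso_code: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- total_vaccinations: integer (nullable = true)
 |-- people_vaccinated: integer (nullable = true)
 |-- people_fully_vaccinated: integer (nullable = true)
 |-- daily_vaccinations_raw: integer (nullable = true)
 |-- daily_vaccinations: integer (nullable = true)
 |-- total_vaccinations_per_hundred: double (nullable = true)
 |-- people_vaccinated_per_hundred: double (nullable = true)
 |-- people_fully_vaccinated_per_hundred: double (nullable = true)
 |-- daily_vaccinations_per_million: integer (nullable = true)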

In [32]:
df_spark_GDPR.show(5)

+--------+---------+-------+-------+---------+----+------+----------+
|LOCATION|INDICATOR|SUBJECT|MEASURE|FREQUENCY|TIME| Value|Flag Codes|
+--------+---------+-------+-------+---------+----+------+----------+
|     AUS|      POP|    TOT| AGRWTH|        A|1951|2.9711|      null|
|     AUS|      POP|    TOT| AGRWTH|        A|1952|2.5505|      null|
|     AUS|      POP|    TOT| AGRWTH|        A|1953|2.0702|      null|
|     AUS|      POP|    TOT| AGRWTH|        A|1954| 1.942|      null|
|     AUS|      POP|    TOT| AGRWTH|        A|1955|2.3724|      null|
+--------+---------+-------+-------+---------+----+------+----------+
only showing top 5 rows



In [33]:
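df_spark_GDPR.printSchema()

root
 |-- LOCATION: string (nullable = true)
 |-- INDICATOR: string (nullable = true)
 |-- SUBJECT: string (nullable = true)
 |-- MEASURE: string (nullable = true)
 |-- FREQUENCY: string (nullable = true)
 |-- TIME: integer (nullable = true)
 |-- Value: double (nullable = true)
 |-- Flag Codes: string (nullable = true)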

In [34]:
df_spark_cases.show(5)

+-----------+--------------+--------------------+-------------------------------+----------------------+
|report_date|continent_name|country_alpha_3_code|people_positive_new_cases_count|people_death_new_count|
+-----------+--------------+--------------------+-------------------------------+----------------------+
| 2020-09-16|       America|                 USA|                              2|                     0|
| 2020-09-17|       America|                 USA|                              3|                     0|
| 2020-09-18|       America|                 USA|                              0|                     0|
| 2020-09-19|       America|                 USA|                              0|                     0|
| 2020-09-20|       America|                 USA|                              0|                     0|
+-----------+--------------+--------------------+-------------------------------+----------------------+
only showing top 5 rows



In [35]:
df_spark_cases.printSchema()

root
 |-- report_date: string (nullable = true)
 |-- continent_name: string (nullable = true)
 |-- country_alpha_3_code: string (nullable = true)
 |-- people_positive_new_cases_count: long (nullable = true)
 |-- people_death_new_count: long (nullable = true)

### Actions after schema

In [36]:
# Convert report_date in df_spark_cases from string to timestamp
# (we will probably not use this information because we aggregate by country, but it is nice to have)
df_spark_cases.select(to_timestamp(df_spark_cases.report_date, 'yyyy-MM-dd').alias('report_date')).show()

+-------------------+
|        report_date|
+-------------------+
|2020-09-16 00:00:00|
|2020-09-17 00:00:00|
|2020-09-18 00:00:00|
|2020-09-19 00:00:00|
|2020-09-20 00:00:00|
|2020-09-21 00:00:00|
|2020-09-22 00:00:00|
|2020-09-23 00:00:00|
|2020-09-24 00:00:00|
|2020-09-25 00:00:00|
|2020-09-26 00:00:00|
|2020-09-27 00:00:00|
|2020-09-28 00:00:00|
|2020-09-29 00:00:00|
|2020-09-30 00:00:00|
|2020-10-01 00:00:00|
|2020-10-02 00:00:00|
|2020-10-03 00:00:00|
|2020-10-04 00:00:00|
|2020-10-05 00:00:00|
+-------------------+
only showing top 20 rows



In [17]:
df_spark_cases = df_spark_cases.withColumn('report_date', to_timestamp(df_spark_cases.report_date, 'yyyy-MM-dd'))

In [18]:
df_spark_cases.printSchema()

root
 |-- report_date: timestamp (nullable = true)
 |-- continent_name: string (nullable = true)
 |-- country_alpha_3_code: string (nullable = true)
 |-- people_positive_new_cases_count: long (nullable = true)
 |-- people_death_new_count: long (nullable = true)

### 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model


<img src="Schema_representation_of_data_lake.png">


This model was chosen to separate the key factors: COVID-19 vaccinations, case progress, and economic factors. Each of these factors has its own table entity.
The schema above is the equivalent representation if this project were a data warehouse.
Since we have a data lake, we partition those tables so that any user or application can use them.

#### 3.2 Mapping Out Data Pipelines
Steps necessary to pipeline the data into the chosen data model:

* Extract data from the online and local sources
* Clean data
* Transform the data and load it into tables based on the star schema model
* Partition data
* Run Quality Checks
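
The steps above can be sketched as a small, Spark-free skeleton. This is a minimal illustration and not the project's `etl.py`: the function names (`clean`, `transform`, `quality_check`, `run_pipeline`) and the sample rows are hypothetical, and plain Python lists stand in for Spark DataFrames. The cleaning rule and the per-country aggregation mirror the ones used in this notebook; the extract and partition steps are left out.

```python
from collections import defaultdict

def clean(rows):
    """Drop rows whose continent is the 'unknown' placeholder."""
    return [r for r in rows if r["continent_name"] != "unknown"]

def transform(rows):
    """Sum new cases and new deaths per country code."""
    totals = defaultdict(lambda: {"total_cases": 0, "total_deaths": 0})
    for r in rows:
        t = totals[r["country_alpha_3_code"]]
        t["total_cases"] += r["people_positive_new_cases_count"]
        t["total_deaths"] += r["people_death_new_count"]
    return dict(totals)

def quality_check(table):
    """Fail loudly if the table is empty or still holds placeholder keys."""
    if not table:
        raise ValueError("covid_cases table is empty")
    if "unknown" in table:
        raise ValueError("placeholder country code survived cleaning")
    return True

def run_pipeline(raw_rows):
    """Run clean -> transform -> quality check on already extracted rows."""
    table = transform(clean(raw_rows))
    quality_check(table)
    return table

# Illustrative rows only (not real data).
sample = [
    {"continent_name": "America", "country_alpha_3_code": "USA",
     "people_positive_new_cases_count": 3, "people_death_new_count": 0},
    {"continent_name": "America", "country_alpha_3_code": "USA",
     "people_positive_new_cases_count": 50, "people_death_new_count": 1},
    {"continent_name": "unknown", "country_alpha_3_code": "unknown",
     "people_positive_new_cases_count": 0, "people_death_new_count": 0},
]
result = run_pipeline(sample)
print(result)
```
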

### 4: Run Pipelines to Model the Data
#### 4.1 Creating the data model

In [None]:
# 1) Group the covid cases data by country and create the covid cases table

In [19]:
covid_cases = df_spark_cases.groupBy("country_alpha_3_code")\
.sum("people_positive_new_cases_count","people_death_new_count")


In [20]:
covid_cases = covid_cases.withColumnRenamed("country_alpha_3_code","country_code")
covid_cases = covid_cases.withColumnRenamed("sum(people_positive_new_cases_count)","total_cases")
covid_cases = covid_cases.withColumnRenamed("sum(people_death_new_count)","total_deaths")

In [41]:
covid_cases.show(2)

+------------+-----------+------------+
|country_code|total_cases|total_deaths|
+------------+-----------+------------+
|         HTI|      13164|         263|
|         PSE|     301751|        3358|
+------------+-----------+------------+
only showing top 2 rows



In [21]:
# Partition covid cases table by country only: total_cases is a measure, and partitioning on it
# would create a separate directory for every distinct value
covid_cases.write.partitionBy('country_code').parquet(os.path.join(output_data, 'covid_cases.parquet'), 'overwrite')
print("covid_cases partitioned!")

covid_cases partitioned!


In [43]:
# 2)Create vaccinations table

In [22]:
Vaccinations = df_spark_vac.select(
    df_spark_vac.iso_code.alias('country_code'),   
    df_spark_vac.date.alias('date_entry'),
    df_spark_vac.people_vaccinated,
    df_spark_vac.total_vaccinations,
    df_spark_vac.people_fully_vaccinated,
    df_spark_vac.daily_vaccinations,
    df_spark_vac.daily_vaccinations_per_million)
    


In [45]:
Vaccinations.show(3)

+------------+-------------------+-----------------+------------------+-----------------------+------------------+------------------------------+
|country_code|         date_entry|people_vaccinated|total_vaccinations|people_fully_vaccinated|daily_vaccinations|daily_vaccinations_per_million|
+------------+-------------------+-----------------+------------------+-----------------------+------------------+------------------------------+
|         AFG|2021-02-22 00:00:00|                0|                 0|                   null|              null|                          null|
|         AFG|2021-02-23 00:00:00|             null|              null|                   null|              1367|                            35|
|         AFG|2021-02-24 00:00:00|             null|              null|                   null|              1367|                            35|
+------------+-------------------+-----------------+------------------+-----------------------+------------------+------------------------------+

In [23]:
# Partition Vaccinations table
Vaccinations_par = Vaccinations.write.partitionBy('country_code').parquet(os.path.join(output_data, 'Vaccinations.parquet'), 'overwrite')
print("Vaccinations partitioned!")

Vaccinations partitioned!
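
`partitionBy('country_code')` makes Spark write Hive-style partition directories, one `country_code=<value>` folder per distinct value under the Parquet path. The sketch below only builds the expected directory path for one country so that a reader can locate the files on disk. The helper name `partition_dir` is hypothetical, and `'GRC'` is used purely as an example code.

```python
import os

output_data = 'data/outputs/vaccine_data'  # same value as in the notebook

def partition_dir(base, table, **partitions):
    """Build the Hive-style directory for one set of partition values.

    Each keyword becomes a 'column=value' path segment, in the order given.
    """
    segments = [f"{col}={val}" for col, val in partitions.items()]
    return os.path.join(base, table, *segments)

greece_dir = partition_dir(output_data, 'Vaccinations.parquet', country_code='GRC')
print(greece_dir)
```

Reading such a directory directly returns only that country's rows, but the `country_code` column is then not part of the data unless the reader's `basePath` option points at the table root.
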


In [None]:
# 3)Create country info table

In [24]:
#df_spark_cases.sql("SELECT DISTINCT continent_name, country_alpha_3_code").show(5)
countries = df_spark_cases.select('continent_name', "country_alpha_3_code").distinct()


In [25]:
countries = countries.join(df_spark_vac,df_spark_vac.iso_code == countries.country_alpha_3_code)\
    .select(countries.country_alpha_3_code.alias('country_code'),
        countries.continent_name,
        df_spark_vac.location.alias('country_name'))

In [49]:
countries.show(3)

+------------+--------------+------------+
|country_code|continent_name|country_name|
+------------+--------------+------------+
|         SAU|          Asia|Saudi Arabia|
|         SAU|          Asia|Saudi Arabia|
|         SAU|          Asia|Saudi Arabia|
+------------+--------------+------------+
only showing top 3 rows
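
The repeated `SAU` rows above suggest that the join returns one row per matching vaccination record rather than one row per country, because `df_spark_vac` holds many dates for each `iso_code`. A minimal plain-Python sketch of the intended result, one row per country, is shown below. The function name `build_countries` and the sample rows are illustrative assumptions; in Spark the equivalent idea would be to reduce the vaccination side to distinct `iso_code`/`location` pairs before joining.

```python
def build_countries(case_rows, vac_rows):
    """Return one (country_code, continent_name, country_name) row per country."""
    # Collapse the vaccination rows to a single name per ISO code.
    names = {r["iso_code"]: r["location"] for r in vac_rows}
    # Distinct (code, continent) pairs from the cases data.
    pairs = {(r["country_alpha_3_code"], r["continent_name"]) for r in case_rows}
    # Inner join: keep only codes present on both sides.
    return sorted(
        (code, continent, names[code])
        for code, continent in pairs
        if code in names
    )

# Illustrative rows only (not real data).
cases = [
    {"country_alpha_3_code": "SAU", "continent_name": "Asia"},
    {"country_alpha_3_code": "SAU", "continent_name": "Asia"},
    {"country_alpha_3_code": "GRC", "continent_name": "Europe"},
]
vaccinations = [
    {"iso_code": "SAU", "location": "Saudi Arabia"},
    {"iso_code": "SAU", "location": "Saudi Arabia"},
    {"iso_code": "GRC", "location": "Greece"},
]
countries_rows = build_countries(cases, vaccinations)
print(countries_rows)
```
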



In [26]:
# Partition country table
countries_par = countries.write.partitionBy('country_code','continent_name').parquet(os.path.join(output_data, 'countries.parquet'), 'overwrite')
print("countries partitioned!")



countries partitioned!


In [None]:
# 4)Create GDPR info table

In [None]:
#<bound method DataFrame.printSchema of DataFrame[LOCATION: string, INDICATOR: string, SUBJECT: string, MEASURE: string, FREQUENCY: string, TIME: int, Value: double, Flag Codes: string]>

In [27]:
GDPR_info = df_spark_GDPR.select(
    df_spark_GDPR.LOCATION.alias('country_code'),   
    df_spark_GDPR.INDICATOR,
    df_spark_GDPR.SUBJECT,
    df_spark_GDPR.FREQUENCY,
    df_spark_GDPR.TIME,
    df_spark_GDPR.Value,
    )
    

In [52]:
GDPR_info.show(5)

+------------+---------+-------+---------+----+------+
|country_code|INDICATOR|SUBJECT|FREQUENCY|TIME| Value|
+------------+---------+-------+---------+----+------+
|         AUS|      POP|    TOT|        A|1951|2.9711|
|         AUS|      POP|    TOT|        A|1952|2.5505|
|         AUS|      POP|    TOT|        A|1953|2.0702|
|         AUS|      POP|    TOT|        A|1954| 1.942|
|         AUS|      POP|    TOT|        A|1955|2.3724|
+------------+---------+-------+---------+----+------+
only showing top 5 rows



In [28]:
# Partition GDPR_info table

GDPR_info_par = GDPR_info.write.partitionBy('country_code','TIME').parquet(os.path.join(output_data, 'GDPR.parquet'), 'overwrite')
print("GDPR_info partitioned!")



GDPR_info partitioned!


#### 4.2 Data Quality Checks

An important quality check is to pick a country and query its COVID-19 vaccination and full-vaccination progress.

Here we check Greece.

In [None]:
#load partitioned data 

In [29]:
covid_cases = spark.read.parquet(os.path.join(output_data, 'covid_cases.parquet'))

In [54]:
Vaccinations.join(countries,countries.country_code == Vaccinations.country_code)\
    .select(countries.country_name,Vaccinations.people_vaccinated,Vaccinations.people_fully_vaccinated).groupBy('country_name')\
.sum('people_vaccinated','people_fully_vaccinated').where(countries.country_name == 'Greece').show()


+------------+----------------------+----------------------------+
|country_name|sum(people_vaccinated)|sum(people_fully_vaccinated)|
+------------+----------------------+----------------------------+
|      Greece|           13103729886|                  5987932442|
+------------+----------------------+----------------------------+
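
The totals above exceed the population of any country, so they should not be read as head counts. Two things likely contribute: `people_vaccinated` and `people_fully_vaccinated` are running totals in the source data, so summing them over dates adds each day's cumulative figure again, and the `countries` table contains repeated rows, which multiplies the matches in the join. A hedged sketch of an alternative check takes the latest non-null cumulative value instead. The helper name `latest_cumulative` and the numbers are made up for illustration.

```python
def latest_cumulative(rows, column):
    """Return the most recent non-null value of a cumulative column."""
    dated = [(r["date_entry"], r[column]) for r in rows if r[column] is not None]
    if not dated:
        return None
    # ISO 'YYYY-MM-DD' strings sort chronologically.
    return max(dated)[1]

# Illustrative numbers only (not real data).
rows = [
    {"date_entry": "2021-05-01", "people_vaccinated": 100},
    {"date_entry": "2021-05-02", "people_vaccinated": None},
    {"date_entry": "2021-05-03", "people_vaccinated": 180},
]
naive_sum = sum(r["people_vaccinated"] or 0 for r in rows)
latest = latest_cumulative(rows, "people_vaccinated")
print(naive_sum, latest)
```
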



We can also check Greece's number of COVID-19 cases and deaths.

In [56]:
covid_cases.join(countries,countries.country_code == covid_cases.country_code)\
    .select(countries.country_name,covid_cases.total_cases,covid_cases.total_deaths).where(countries.country_name == 'Greece').show(1)

+------------+-----------+------------+
|country_name|total_cases|total_deaths|
+------------+-----------+------------+
|      Greece|     362004|       11029|
+------------+-----------+------------+
only showing top 1 row
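
Spot checks on one country can be complemented by generic checks that apply to every table. The sketch below relies on only three Spark DataFrame methods, `count()`, `select()` and `distinct()`; the function names `check_not_empty` and `check_unique_key` are hypothetical. A tiny stand-in class (`FakeFrame`, also hypothetical) is included so that the example runs without a Spark session.

```python
def check_not_empty(df, name):
    """Raise if the table has no rows; otherwise return the row count."""
    n = df.count()
    if n == 0:
        raise ValueError(f"{name}: table is empty")
    return n

def check_unique_key(df, name, key):
    """Raise if the key column holds repeated values."""
    total = df.count()
    distinct = df.select(key).distinct().count()
    if total != distinct:
        raise ValueError(f"{name}: {total - distinct} duplicate value(s) in {key}")
    return True

class FakeFrame:
    """Minimal stand-in that mimics the DataFrame methods used above."""
    def __init__(self, rows):
        self.rows = rows
    def count(self):
        return len(self.rows)
    def select(self, key):
        # Keep only the key column, stored as hashable one-element tuples.
        return FakeFrame([(r[key],) for r in self.rows])
    def distinct(self):
        return FakeFrame(list(set(self.rows)))

# Illustrative tables only (not real data).
good = FakeFrame([{"country_code": "GRC"}, {"country_code": "HTI"}])
bad = FakeFrame([{"country_code": "SAU"}, {"country_code": "SAU"}])

print(check_not_empty(good, "covid_cases"))
print(check_unique_key(good, "covid_cases", "country_code"))
try:
    check_unique_key(bad, "countries", "country_code")
except ValueError as err:
    print(err)
```
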



#### 4.3 Data dictionary 

**/data** -> contains the partitioned data outputs (and possibly inputs in the future)

**/Covid Vaccines Capstone Project.ipynb** -> contains the main code in notebook form

**/etl.py** -> contains the main code in .py form. It extracts data from the different sources and creates the data lake

**/Health spendings per country GDPR percentage.csv** -> contains data regarding the GDPR of each country


#### 5 Project Write Up


This sums up our work. Here are some things to keep in mind:
* Due to the high volume of data (the COVID-19 cases data has more than 1 million rows), we chose Spark for its distributed processing. The data lake can provide data for queries and BI tools.

* The data is updated automatically because it comes directly from the sources. The GDPR data can be updated once a year.

* If the data grew by 100x, it would be wise to use AWS EMR and run the pipeline on more powerful hardware.
* The pipeline should run every day at about 06:00, since the online sources update their data before then.
* Airflow can be used to schedule this run.
* There is no restriction on the number of users who can use this data.
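
As a small illustration of the daily 06:00 schedule mentioned above, the function below computes the next run time from a given moment. It is a sketch only: the name `next_run` is hypothetical, time zones are ignored, and in practice a scheduler such as cron or Airflow would own this logic.

```python
from datetime import datetime, time, timedelta

def next_run(now, run_at=time(6, 0)):
    """Return the next datetime at which the daily pipeline should start."""
    candidate = datetime.combine(now.date(), run_at)
    # If today's slot has already passed, move to tomorrow.
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate

before_slot = next_run(datetime(2021, 5, 9, 5, 30))
after_slot = next_run(datetime(2021, 5, 9, 7, 0))
print(before_slot, after_slot)
```
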