# Impact of the Covid19 Pandemic on Mobility in Germany

### Udacity Data Engineering Nanodegree Program - The Capstone Project

#### Project Summary
The goal of this project is to bring together current data on the Covid19 pandemic, governmental measures against it, vaccination and mobility in Germany.

The data is stored in star schemata (two dimension tables and two fact tables) and is ready for analysis.

##### The project follows these steps:
1. Scope the Project and Gather Data
1. Explore and Assess the Data
1. Define the Data Model
1. Run ETL to Model the Data
1. Complete Project Write Up

##### Imports

In [1]:
import datetime as dt
from datetime import datetime
import numpy as np
import pandas as pd

import pyspark
from pyspark.sql import functions as F

### Step 1: Scope the Project and Gather Data

#### Scope 

The background task is to examine mobility changes from March 2020 to today compared to 2019. 

The project is about combining data from the Covid19 pandemic (cases etc.), vaccination progress, government measures (lockdowns imposed) with data describing mobility. 

I decided to limit the scope to Germany for now, as I found a promising mobility data set for it. To make the numbers interpretable, I have included a further table with recent population estimates.

##### Single State vs Whole Country Approach

Data on mobility and pandemic (cases, etc.) are available for the individual federal states. Unfortunately, data on vaccination progress and government interventions are currently only available for the country Germany as a whole.

Therefore, I decided to create two fact tables:
1. One that summarizes everything (mobility, pandemic, vaccination, government response) on a Germany-wide level.
1. And one that covers only mobility vs. cases, but on the finer federal state level.

#### Describe and Gather Data 

- Data regarding Covid-cases, vaccinations, and demographic information in the 16 states of Germany is taken from https://www.kaggle.com/headsortails/covid19-tracking-germany
    - The Covid Cases are taken from the [Robert Koch Institut](https://www.rki.de/EN/Home/homepage_node.html)
    - The vaccinations are derived from https://impfdashboard.de/
    - Demographic information on Germany is taken from [Statistisches Bundesamt](https://www.destatis.de/EN/Home/_node.html) through their Open Data platform [GENESIS](https://www-genesis.destatis.de/genesis/online/data?operation=sprachwechsel&language=en).
- Data regarding the government's countermeasures is taken from [Oxford University's Covid-19 Government Response Tracker](https://github.com/OxCGRT/covid-policy-tracker/tree/master/data)
- Data regarding mobility is taken from [Statistisches Bundesamt](https://www.destatis.de/DE/Service/EXDAT/Datensaetze/mobilitaetsindikatoren-mobilfunkdaten.html#Mobilit%C3%A4t%20im%20Tagesverlauf) as well

#### Read and Display the first 5 Rows of the Raw Data Sets

##### Cases

In [2]:
df_covid = pd.read_csv("data/20210414_covid_de.csv", low_memory=False)

In [3]:
df_covid.head()

Unnamed: 0,state,county,age_group,gender,date,cases,deaths,recovered
0,Baden-Wuerttemberg,LK Alb-Donau-Kreis,00-04,F,2020-03-27,1,0,1
1,Baden-Wuerttemberg,LK Alb-Donau-Kreis,00-04,F,2020-03-28,1,0,1
2,Baden-Wuerttemberg,LK Alb-Donau-Kreis,00-04,F,2020-04-03,1,0,1
3,Baden-Wuerttemberg,LK Alb-Donau-Kreis,00-04,F,2020-10-18,1,0,1
4,Baden-Wuerttemberg,LK Alb-Donau-Kreis,00-04,F,2020-10-22,1,0,1


##### Vaccination

In [4]:
df_vaccination = pd.read_csv("data/20210414_covid_de_vaccines.csv", low_memory=False)

In [5]:
df_vaccination.head()

Unnamed: 0,date,doses,doses_first,doses_second,pfizer_cumul,moderna_cumul,astrazeneca_cumul,persons_first_cumul,persons_full_cumul
0,2020-12-27,24296,24159,137,24296,0,0,24159,137
1,2020-12-28,18383,18383,0,42679,0,0,42542,137
2,2020-12-29,48894,48305,589,91573,0,0,90847,726
3,2020-12-30,61546,61542,4,153119,0,0,152389,730
4,2020-12-31,49672,49555,117,202791,0,0,201944,847


##### Oxford University's Government Response Tracker

In [6]:
df_gov = pd.read_csv("data/20210414_OxCGRT.csv", low_memory=False)

In [7]:
df_gov.head()

Unnamed: 0,CountryName,CountryCode,RegionName,RegionCode,Jurisdiction,Date,C1_School closing,C1_Flag,C2_Workplace closing,C2_Flag,...,StringencyIndex,StringencyIndexForDisplay,StringencyLegacyIndex,StringencyLegacyIndexForDisplay,GovernmentResponseIndex,GovernmentResponseIndexForDisplay,ContainmentHealthIndex,ContainmentHealthIndexForDisplay,EconomicSupportIndex,EconomicSupportIndexForDisplay
0,Aruba,ABW,,,NAT_TOTAL,20200101,0.0,,0.0,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,Aruba,ABW,,,NAT_TOTAL,20200102,0.0,,0.0,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,Aruba,ABW,,,NAT_TOTAL,20200103,0.0,,0.0,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,Aruba,ABW,,,NAT_TOTAL,20200104,0.0,,0.0,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,Aruba,ABW,,,NAT_TOTAL,20200105,0.0,,0.0,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


##### Mobility

Change in mobility compared to the respective month in 2019:
- from 2020-01-01 until 2021-04-14
- for every federal state in Germany, identified by its ID (for example, BW stands for Baden-Wuerttemberg)
- and for Germany as a whole (DE) as well.

In [8]:
df_mobility = pd.read_csv("data/20210414_taegliches_mobilitaetsgeschehen.csv", decimal=",", sep=";")

In [9]:
df_mobility.head()

Unnamed: 0,Tag,BW,BY,BE,BB,HB,HH,HE,MV,NI,NW,RP,SL,SN,ST,SH,TH,DE
0,2020/01/01,15.64,11.76,5.57,-4.31,1.3,2.54,3.79,-2.16,-2.73,11.82,5.56,3.97,12.08,15.12,0.72,11.49,8.15
1,2020/01/02,-15.92,-13.44,-14.55,-23.7,-19.72,-19.15,-23.1,-9.19,-20.79,-15.57,-17.69,-14.47,-16.14,-13.25,-14.05,-18.06,-16.61
2,2020/01/03,-14.25,-11.32,-14.02,-22.1,-17.66,-16.45,-19.49,-14.79,-20.17,-14.41,-16.56,-13.42,-15.6,-12.23,-15.55,-17.16,-15.34
3,2020/01/04,0.01,-0.44,-5.87,-14.32,-7.67,-7.76,-13.5,-6.86,-10.74,-2.72,-5.3,0.13,-6.22,-1.3,-5.77,-6.57,-4.74
4,2020/01/05,8.75,7.34,-1.06,-8.52,-3.39,2.37,-12.51,-1.81,-7.91,1.83,0.26,4.72,-1.99,7.22,-1.61,-1.76,0.8


##### Demographics

In [10]:
df_demographics = pd.read_csv("data/20181231_demographics_de.csv", low_memory=False)

In [11]:
df_demographics.head()

Unnamed: 0,state,gender,age_group,population
0,Baden-Wuerttemberg,female,00-04,261674
1,Baden-Wuerttemberg,female,05-14,490822
2,Baden-Wuerttemberg,female,15-34,1293488
3,Baden-Wuerttemberg,female,35-59,1919649
4,Baden-Wuerttemberg,female,60-79,1182736


### Step 2: Explore and Assess the Data

The quality of the Covid, Vaccination, Demographics, and Government Response Tracker data sets is excellent. There are only
- columns I will remove, as I am not interested in them:
    - age_group and gender in Covid and Demographics
    - pfizer_cumul, moderna_cumul, and astrazeneca_cumul in Vaccination
    - all columns except Date, CountryName, and StringencyIndexForDisplay in Government Response Tracker
- columns I will add:
    - country in Covid, Vaccination, and Demographics
    - state_id in Covid and Demographics
    
The Mobility data set has an impractical structure: each column refers to a federal state, while there is a row for every single date. I decided to split it into two separate DataFrames:
1. Mobility (Country) with columns: date, country, change in mobility. It can be "joined" later via date and country.
1. Mobility (Single States) with columns: date, country, state_id, change in mobility. It can be "joined" later via date and country and state_id.
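
The wide-to-long reshaping described above can also be sketched with `pandas.melt`. This is a minimal illustration on a made-up two-row sample, not the code used in this notebook; the sample values and the helper name `mobility_wide_to_long` are assumptions.

```python
import pandas as pd

# Tiny made-up sample in the same wide layout as the raw mobility file:
# one row per day ("Tag"), one column per federal state.
sample = pd.DataFrame(
    {
        "Tag": ["2020/01/01", "2020/01/02"],
        "BW": [15.64, -15.92],
        "BY": [11.76, -13.44],
    }
)


def mobility_wide_to_long(df_wide, state_ids):
    """Reshape the wide mobility layout into one row per (date, state_id)."""
    df_long = df_wide.melt(
        id_vars="Tag",
        value_vars=state_ids,
        var_name="state_id",
        value_name="change_in_mobility",
    ).rename(columns={"Tag": "date"})
    df_long["date"] = pd.to_datetime(df_long["date"])
    df_long["country"] = "Germany"
    return df_long


long_sample = mobility_wide_to_long(sample, ["BW", "BY"])
print(long_sample)
```

Each state column becomes a block of rows keyed by `date` and `state_id`, which is the shape needed to join on date, country, and state_id later.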

#### Covid19 - Cases, Deaths, & Recovered by Date in the 16 German States

In [12]:
df_covid = df_covid.groupby(by=["state", "date"]).agg(
    {
        "cases": np.sum,
        "deaths": np.sum,
        "recovered": np.sum
    }
).reset_index()

df_covid['state_id'] = df_covid['state'].replace(
    {
        'Baden-Wuerttemberg': "BW",
        'Bayern': "BY",
        'Berlin': "BE",
        'Brandenburg': "BB",
        'Bremen': "HB",
        'Hamburg': "HH",
        'Hessen': "HE",
        'Mecklenburg-Vorpommern': "MV",
        'Niedersachsen': "NI",
        'Nordrhein-Westfalen': "NW",
        'Rheinland-Pfalz': "RP",
        'Saarland': "SL",
        'Sachsen': "SN",
        'Sachsen-Anhalt': "ST",
        'Schleswig-Holstein': "SH",
        'Thueringen': "TH",
    }
)

df_covid["country"] = "Germany"

df_covid.date = pd.to_datetime(df_covid.date)

df_covid.head()

Unnamed: 0,state,date,cases,deaths,recovered,state_id,country
0,Baden-Wuerttemberg,2020-02-24,1,0,1,BW,Germany
1,Baden-Wuerttemberg,2020-02-25,2,0,2,BW,Germany
2,Baden-Wuerttemberg,2020-02-26,4,0,4,BW,Germany
3,Baden-Wuerttemberg,2020-02-27,3,0,3,BW,Germany
4,Baden-Wuerttemberg,2020-02-28,7,0,7,BW,Germany


#### Change in Mobility Compared to the Respective Month in 2019 across the Whole of Germany

In [13]:
df_mobility_country = df_mobility[["Tag", "DE"]].rename(
    columns = {"Tag": "date", "DE": "change_in_mobility"}
)

df_mobility_country.date = pd.to_datetime(df_mobility_country.date)

df_mobility_country["country"] = "Germany"

df_mobility_country.head()

Unnamed: 0,date,change_in_mobility,country
0,2020-01-01,8.15,Germany
1,2020-01-02,-16.61,Germany
2,2020-01-03,-15.34,Germany
3,2020-01-04,-4.74,Germany
4,2020-01-05,0.8,Germany


#### Change in Mobility Compared to the Respective Month in 2019 in the Single States of Germany

In [14]:
state_ids = ['BW', 'BY', 'BE', 'BB', 'HB', 'HH', 'HE', 'MV', 'NI', 'NW', 'RP', 'SL', 'SN', 'ST', 'SH', 'TH']

df_mobility_states = df_mobility.drop(columns="DE")

# Collect one long-format frame per state, then concatenate them once.
# DataFrame.append was removed in pandas 2.0; pd.concat is the supported equivalent.
frames = []
for state_id in state_ids:
    other = df_mobility_states[["Tag", state_id]].rename(
        columns = {"Tag": "date", state_id: "change_in_mobility"}
    )
    other.date = pd.to_datetime(other.date)
    other["state_id"] = state_id
    frames.append(other)

df_mobility_states = pd.concat(frames, ignore_index=True)

df_mobility_states["country"] = "Germany"

df_mobility_states.head()

Unnamed: 0,date,change_in_mobility,state_id,country
0,2020-01-01,15.64,BW,Germany
1,2020-01-02,-15.92,BW,Germany
2,2020-01-03,-14.25,BW,Germany
3,2020-01-04,0.01,BW,Germany
4,2020-01-05,8.75,BW,Germany


#### Vaccination

In [15]:
df_vaccination = df_vaccination.drop(columns=["pfizer_cumul", "moderna_cumul", "astrazeneca_cumul"])

df_vaccination.date = pd.to_datetime(df_vaccination.date)

df_vaccination["country"] = "Germany"

df_vaccination.head()

Unnamed: 0,date,doses,doses_first,doses_second,persons_first_cumul,persons_full_cumul,country
0,2020-12-27,24296,24159,137,24159,137,Germany
1,2020-12-28,18383,18383,0,42542,137,Germany
2,2020-12-29,48894,48305,589,90847,726,Germany
3,2020-12-30,61546,61542,4,152389,730,Germany
4,2020-12-31,49672,49555,117,201944,847,Germany


In [16]:
df_demographics = df_demographics.groupby("state").agg({"population": np.sum}).reset_index()
df_demographics['state_id'] = df_demographics['state'].replace(
    {
        'Baden-Wuerttemberg': "BW",
        'Bayern': "BY",
        'Berlin': "BE",
        'Brandenburg': "BB",
        'Bremen': "HB",
        'Hamburg': "HH",
        'Hessen': "HE",
        'Mecklenburg-Vorpommern': "MV",
        'Niedersachsen': "NI",
        'Nordrhein-Westfalen': "NW",
        'Rheinland-Pfalz': "RP",
        'Saarland': "SL",
        'Sachsen': "SN",
        'Sachsen-Anhalt': "ST",
        'Schleswig-Holstein': "SH",
        'Thueringen': "TH",
    }
)

df_demographics["country"] = "Germany"

df_demographics

Unnamed: 0,state,population,state_id,country
0,Baden-Wuerttemberg,11069533,BW,Germany
1,Bayern,13076721,BY,Germany
2,Berlin,3644826,BE,Germany
3,Brandenburg,2511917,BB,Germany
4,Bremen,682986,HB,Germany
5,Hamburg,1841179,HH,Germany
6,Hessen,6265809,HE,Germany
7,Mecklenburg-Vorpommern,1609675,MV,Germany
8,Niedersachsen,7982448,NI,Germany
9,Nordrhein-Westfalen,17932651,NW,Germany


#### Government Response

In [17]:
df_gov = df_gov.loc[df_gov.CountryName == "Germany"][["Date", "CountryName", "StringencyIndexForDisplay"]].rename(
    columns = {
        "Date": "date",
        "CountryName": "country",
        "StringencyIndexForDisplay": "stringency_index"
    }
)

df_gov.date = df_gov.date.astype(str)

df_gov.date = pd.to_datetime(df_gov.date, format="%Y%m%d")

df_gov.head()

Unnamed: 0,date,country,stringency_index
39480,2020-01-01,Germany,0.0
39481,2020-01-02,Germany,0.0
39482,2020-01-03,Germany,0.0
39483,2020-01-04,Germany,0.0
39484,2020-01-05,Germany,0.0


#### Save Clean Data as CSVs

In [18]:
df_gov.to_csv("clean_data/goverment_stringency_index.csv", index=False)
df_mobility_country.to_csv("clean_data/mobility_whole_country.csv", index=False)
df_mobility_states.to_csv("clean_data/mobility_single_tates.csv", index=False)
df_covid.to_csv("clean_data/covid.csv", index=False)
df_vaccination.to_csv("clean_data/vaccination.csv", index=False)
df_demographics.to_csv("clean_data/demographics.csv", index=False)

### Step 3: Define the Data Model

#### 3.1 Conceptual Data Model

I've chosen this model to make (time series) analysis of mobility, cases, vaccination, and government response by date as easy as possible.

So all this information is stored in the right fact table.

As mentioned in Step 1 above, it is also possible to look at the federal state level. In that case, vaccination and government response data are not available, so I decided to create another fact table, the left one.

There are just two dimension tables that come into question: dates and states.

#### 3.2 Mapping Out Data Pipelines

The steps necessary to pipeline the data into the chosen data model are visualised in the following images:

##### Fact Tables

<img src="images/fact_tables.png">

##### Dimension Tables

<img src="images/dim_tables.png">

Only the fact table "Covid vs Vacc vs Mobility vs Gov in Germany" needs helper DataFrames: one sums "cases", "deaths", and "recovered" after grouping the Covid DataFrame by ("date", "country"), and one sums "population" after grouping the Demographics DataFrame by "country".


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [19]:
def sparkShape(df):
    """
    Get the shape of a pyspark DataFrame
    
    Taken from: https://sparkbyexamples.com/pyspark/pyspark-dataframe-shape/
    """
    return (df.count(), len(df.columns))
pyspark.sql.dataframe.DataFrame.shape = sparkShape

In [20]:
spark = pyspark.sql.SparkSession \
    .builder \
    .appName("Capstone") \
    .getOrCreate()

In [21]:
# import clean_data with spark
df_gov = spark.read.option("header",True).csv(
    "clean_data/goverment_stringency_index.csv",
    inferSchema = True,
)
df_mobility_country = spark.read.option("header",True).csv(
    "clean_data/mobility_whole_country.csv",
    inferSchema = True,
)
df_mobility_states = spark.read.option("header",True).csv(
    "clean_data/mobility_single_tates.csv",
    inferSchema = True,
)
df_covid = spark.read.option("header",True).csv(
    "clean_data/covid.csv",
    inferSchema = True,
)
df_vaccination = spark.read.option("header",True).csv(
    "clean_data/vaccination.csv",
    inferSchema = True,
)
df_demographics = spark.read.option("header",True).csv(
    "clean_data/demographics.csv",
    inferSchema = True,
)

In [22]:
df_demographics

DataFrame[state: string, population: int, state_id: string, country: string]

In [23]:
df_gov.limit(5).toPandas()

Unnamed: 0,date,country,stringency_index
0,2020-01-01,Germany,0.0
1,2020-01-02,Germany,0.0
2,2020-01-03,Germany,0.0
3,2020-01-04,Germany,0.0
4,2020-01-05,Germany,0.0


#### Create Dimension Tables

In [24]:
df_dim_dates = df_gov.select("date").union(
    df_mobility_country.select("date")
).union(
    df_mobility_states.select("date")
).union(
    df_covid.select("date")
).union(
    df_vaccination.select("date")
).dropDuplicates().sort("date")

df_dim_dates = df_dim_dates.select(
    F.to_date(df_dim_dates.date).alias("date"),  # not sure why exactly to_date is needed, otherwise it is cast to string
    F.year(df_dim_dates.date).alias("year"),
    F.month(df_dim_dates.date).alias("month"),
    F.dayofmonth(df_dim_dates.date).alias("day"),
    F.dayofweek(df_dim_dates.date).alias("weekday"),
    F.weekofyear(df_dim_dates.date).alias("week_of_year"),
)

In [25]:
df_dim_dates.limit(5).toPandas()

Unnamed: 0,date,year,month,day,weekday,week_of_year
0,2020-01-01,2020,1,1,4,1
1,2020-01-02,2020,1,2,5,1
2,2020-01-03,2020,1,3,6,1
3,2020-01-04,2020,1,4,7,1
4,2020-01-05,2020,1,5,1,1
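
The `weekday` and `week_of_year` columns follow Spark's conventions: `dayofweek` counts from 1 (Sunday) to 7 (Saturday), and `weekofyear` uses ISO 8601 weeks. The sketch below reproduces both values with the standard library so the rows above can be checked by hand; the helper name `spark_style_date_parts` is illustrative only.

```python
from datetime import date


def spark_style_date_parts(d):
    """Return the date parts stored in the date dimension for a single day."""
    return {
        "year": d.year,
        "month": d.month,
        "day": d.day,
        # isoweekday(): Monday=1 ... Sunday=7; shift so that Sunday=1 ... Saturday=7
        "weekday": d.isoweekday() % 7 + 1,
        # ISO 8601 week number
        "week_of_year": d.isocalendar()[1],
    }


parts = spark_style_date_parts(date(2020, 1, 1))
print(parts)
```

2020-01-01 was a Wednesday, so `weekday` is 4, which matches the first row of the table.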


In [26]:
df_dim_states = df_demographics.select(
    "state_id",
    "state",
    "country",
    "population"
).dropDuplicates()

In [27]:
df_dim_states.limit(5).toPandas()

Unnamed: 0,state_id,state,country,population
0,SH,Schleswig-Holstein,Germany,2896712
1,NW,Nordrhein-Westfalen,Germany,17932651
2,BY,Bayern,Germany,13076721
3,TH,Thueringen,Germany,2143145
4,SN,Sachsen,Germany,4077937


#### Fact Tables

cols:
- date
- country
- change in mobility (ger)
- cases (groupby "date", "country" -> sum)
- deaths (groupby "date", "country" -> sum)
- recovered (groupby "date", "country" -> sum)
- persons first vacc
- persons full vacc
- stringency index
- population (groupby country -> sum)


In [28]:
# Helper DataFrames
df_covid_country = df_covid.groupBy("date", "country").sum("cases","deaths","recovered").sort("date")
df_population_country = df_dim_states.groupBy("country").sum("population")

# Fact Table
df_fact_covid_vacc_mobility_gov = df_covid_country.join(
    df_vaccination,
    on=["date", "country"],
    how="outer",
).join(
    df_gov,
    on=["date", "country"],
    how="outer",
).join(
    df_mobility_country,
    on=["date", "country"],
    how="outer",
).join(
    df_population_country,
    on=["country"], # pop by date is not given 
    how="outer",
).withColumn("date",F.to_date("date")).dropDuplicates(["date", "country"]).sort(["date", "country"])

In [29]:
df_fact_covid_vacc_mobility_gov.limit(5).toPandas()

# TODO: Rename Columns cases -> covid-cases, doses -> vacc_doses, etc

Unnamed: 0,country,date,sum(cases),sum(deaths),sum(recovered),doses,doses_first,doses_second,persons_first_cumul,persons_full_cumul,stringency_index,change_in_mobility,sum(population)
0,Germany,2020-01-01,,,,,,,,,0.0,8.15,83019213
1,Germany,2020-01-02,,,,,,,,,0.0,-16.61,83019213
2,Germany,2020-01-03,,,,,,,,,0.0,-15.34,83019213
3,Germany,2020-01-04,,,,,,,,,0.0,-4.74,83019213
4,Germany,2020-01-05,,,,,,,,,0.0,0.8,83019213
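
The renaming mentioned in the TODO above could start roughly like this. The helper `strip_agg_prefix` is hypothetical and only handles names of the form `sum(column)`; the final target names (for example `covid_cases`) are left open, as in the TODO.

```python
import re


def strip_agg_prefix(name):
    """Turn an aggregate column name such as 'sum(cases)' into 'cases'."""
    match = re.fullmatch(r"\w+\((\w+)\)", name)
    return match.group(1) if match else name


columns = ["country", "date", "sum(cases)", "sum(deaths)", "sum(population)"]
renamed = [strip_agg_prefix(c) for c in columns]
print(renamed)

# With a Spark DataFrame the new names could be applied in one step:
# df = df.toDF(*[strip_agg_prefix(c) for c in df.columns])
```

Note that the expected column names in the dtype checks would have to be updated accordingly if the fact table were renamed this way.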


In [30]:
df_fact_covid_mobility_per_state = df_covid.join(
    df_mobility_states,
    on=["date", "country", "state_id"],
    how="outer"
).join(
    df_demographics,
    on=["country", "state_id"],
    how="outer"
).withColumn("date",F.to_date("date")).drop("state").dropDuplicates(["date", "country", "state_id"]).sort(["date", "country", "state_id"])

In [31]:
df_fact_covid_mobility_per_state.limit(5).toPandas()

Unnamed: 0,country,state_id,date,cases,deaths,recovered,change_in_mobility,population
0,Germany,BB,2020-01-01,,,,-4.31,2511917
1,Germany,BE,2020-01-01,,,,5.57,3644826
2,Germany,BW,2020-01-01,,,,15.64,11069533
3,Germany,BY,2020-01-01,,,,11.76,13076721
4,Germany,HB,2020-01-01,,,,1.3,682986


#### 4.2 Data Quality Checks 
 
Four data quality checks are performed to ensure the pipeline ran as expected:
- A test that checks if the dataframe has the expected number of columns.
- A test that checks if the dataframe is not empty.
- A test that checks if the dataframe has the expected dtypes.
- A test that checks if the given keys are a unique set in the dataframe.


TODOs/Further Suggestions:
* Unit tests for the scripts to ensure they are doing the right thing
* Source/Count checks to ensure completeness
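
A source/count check as suggested above might be sketched as follows. It is written against plain row counts so it does not depend on a particular backend; `check_source_count` and its parameters are assumptions, not part of this notebook.

```python
def check_source_count(source_count, target_count, name="table"):
    """Raise if the target holds fewer rows than the source it was built from."""
    if target_count < source_count:
        raise ValueError(
            f"{name}: expected at least {source_count} rows, found {target_count}"
        )
    print(f"Source/Count Test Passed for {name}!")
    return True


# Example with made-up counts; with Spark these would come from df.count().
result = check_source_count(source_count=470, target_count=470, name="dim_dates")
```

The "at least" comparison is a design choice for tables built with outer joins, where the target may legitimately contain more rows than any single source.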

##### Functions for Quality Checks

In [32]:
def check_num_columns(df, expected):
    if df.shape()[1] == expected:
        print("Number of Columns Test Passed!")
    else:
        raise ValueError("Number of Columns does not Match with Expected Value!")
        
def check_emptiness(df):
    if df.shape()[0] > 0:
        print("Emptiness Test Passed!")
    else:
        raise ValueError("DataFrame is empty!")

def check_dtypes(df, expected):
    if df.dtypes == expected:
        print("Dytpe Test Passed!")
    else:
        raise ValueError("Dtypes do not Match with Expected Values!")
        
def check_unique_keys(df, keys):
    if df.select(F.countDistinct(*keys)).first()[0] == df.shape()[0]:
        print("Unique Keys Test Passed!")
    else:
        raise ValueError("The Given Keys are not Unique!")
        
def check_num_rows(df, expected_min, expected_max):
    if df.shape()[0] >= expected_min and df.shape()[0] <= expected_max:
        print("Number of Rows Test Passed!")
    else:
        raise ValueError("Number of Row is not in Expected Range of Values!")
    

##### Run Quality Checks

In [33]:
check_num_columns(df=df_dim_dates, expected=6)
check_emptiness(df=df_dim_dates)
check_dtypes(
    df=df_dim_dates,
    expected=[
        ('date', 'date'),
        ('year', 'int'),
        ('month', 'int'),
        ('day', 'int'),
        ('weekday', 'int'),
        ('week_of_year', 'int')
    ]
)
check_unique_keys(df=df_dim_dates, keys=["date"])

Number of Columns Test Passed!
Emptiness Test Passed!
Dytpe Test Passed!
Unique Keys Test Passed!


In [34]:
check_num_columns(df=df_dim_states, expected=4)
check_emptiness(df=df_dim_states)
check_dtypes(
    df=df_dim_states,
    expected=[
        ('state_id', 'string'),
        ('state', 'string'),
        ('country', 'string'),
        ('population', 'int')
    ]
)
check_unique_keys(df=df_dim_states, keys=["state_id"])

Number of Columns Test Passed!
Emptiness Test Passed!
Dytpe Test Passed!
Unique Keys Test Passed!


In [35]:
check_num_columns(df=df_fact_covid_mobility_per_state, expected=8)
check_emptiness(df=df_fact_covid_mobility_per_state)
check_dtypes(
    df=df_fact_covid_mobility_per_state,
    expected=[
        ('country', 'string'),
        ('state_id', 'string'),
        ('date', 'date'),
        ('cases', 'int'),
        ('deaths', 'int'),
        ('recovered', 'int'),
        ('change_in_mobility', 'double'),
        ('population', 'int')
    ]
)
check_unique_keys(df=df_fact_covid_mobility_per_state, keys=["country","state_id", "date"])

Number of Columns Test Passed!
Emptiness Test Passed!
Dytpe Test Passed!
Unique Keys Test Passed!


In [36]:
check_num_columns(df=df_fact_covid_vacc_mobility_gov, expected=13)
check_emptiness(df=df_fact_covid_vacc_mobility_gov)
check_dtypes(
    df=df_fact_covid_vacc_mobility_gov,
    expected=[
        ('country', 'string'),
        ('date', 'date'),
        ('sum(cases)', 'bigint'),
        ('sum(deaths)', 'bigint'),
        ('sum(recovered)', 'bigint'),
        ('doses', 'int'),
        ('doses_first', 'int'),
        ('doses_second', 'int'),
        ('persons_first_cumul', 'int'),
        ('persons_full_cumul', 'int'),
        ('stringency_index', 'double'),
        ('change_in_mobility', 'double'),
        ('sum(population)', 'bigint')
    ]
)
check_unique_keys(df=df_fact_covid_vacc_mobility_gov, keys=["country", "date"])

Number of Columns Test Passed!
Emptiness Test Passed!
Dytpe Test Passed!
Unique Keys Test Passed!


#### 4.3 Data Dictionary 

The data dictionary is in a separate file.

### Step 5: Complete Project Write Up

 
#### Tools and Technologies for the Project

The raw data is available as .csv files. Python's pandas is a good choice here for data crunching and cleaning. The amount of data is small and can be processed very efficiently in DataFrames.
I decided to store the cleansed data as CSVs again. Finally, I set up the ETL pipeline and quality checks with PySpark. There is no practical need for Spark at this data size; I used it for practice and because it corresponds more closely to the content of the nanodegree program.

#### Schedule for Data Updates

The data should be updated daily, as the raw data is updated daily as well.

#### Some Different Scenarios

##### If the data was increased by 100x

Let us pretend we are using an AWS EMR Spark cluster of instance type *.xlarge and our data fills its total storage of 64 GiB. If the amount of data then increases by 100x, to roughly 6,400 GiB, we can temporarily save our data to S3 and switch to a Spark cluster of instance type *.10xlarge. At 640 GiB per instance, about ten such instances would be needed to handle the data.

##### The data populates a dashboard that must be updated on a daily basis by 7am every day.

In that case, the ETL process should be automated, e.g. via Apache Airflow, and the output data must be stored in an accessible database or in S3. Let's assume the pipeline takes approx. 2h to run. Then the pipeline should be scheduled to run every night at 4am, so we have a buffer of 1h. The run time should also be monitored to avoid running past the 7am mark.
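
The start time in this scenario follows from simple arithmetic: deadline minus run time minus buffer. Below is a minimal sketch using the numbers assumed above (7am deadline, about 2h run time, 1h buffer); the function name and the example date are illustrative.

```python
from datetime import datetime, timedelta


def latest_start(deadline, run_time, buffer):
    """Latest time the pipeline may start and still finish `buffer` before `deadline`."""
    return deadline - run_time - buffer


deadline = datetime(2021, 4, 15, 7, 0)  # arbitrary example day, 7am
start = latest_start(deadline, timedelta(hours=2), timedelta(hours=1))

# Daily cron expression for that start time (minute hour * * *)
cron = f"{start.minute} {start.hour} * * *"
print(start.time(), cron)
```

A cron expression of this form is what a scheduler such as Apache Airflow would typically take as the schedule for a daily run.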

##### The database needed to be accessed by 100+ people.

If the database needs to be accessed by 100+ people, the data must be stored in an accessible database or data warehouse with enough capacity to serve the requests of those authorized users. If AWS is the choice, Amazon RDS lets you deploy scalable PostgreSQL databases, and Amazon Redshift clusters can be scaled with elastic resize. So whenever the database or data warehouse runs the risk of no longer keeping up with requests, its capacity can be increased.