# COVID-19 cases and vaccination in São Paulo state, Brazil
### Data Engineering Capstone Project

#### Project Summary
The main purpose of this project is to provide data about COVID-19 cases, vaccination, and hospital occupancy in the state of São Paulo, Brazil.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

## Step 1 - Scope the Project and Gather Data

In this project, we create an ETL process that ingests data into a data warehouse so that an analytics team can extract insights from it.

First, we gather data from three different sources. The COVID-19 case data is available through the [Brasil.io API](https://brasil.io/home/); for this demonstration, I retrieved the data from this API and downloaded it in JSON format. You can find more details about how to use the Brasil.io API at this [link](https://github.com/turicas/covid19-br).

The [hospital occupancy data](https://opendatasus.saude.gov.br/tl/dataset/registro-de-ocupacao-hospitalar) is available in CSV format from the Ministry of Health (Brazil).

Likewise, the [vaccination data](https://opendatasus.saude.gov.br/tl/dataset/covid-19-vacinacao) for São Paulo state is also available from the Ministry of Health (Brazil), in CSV format.

## Step 2: Explore and Assess the Data

In [1]:
# imports
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, from_unixtime
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType, DateType, StructType, StringType, IntegerType, DoubleType, StructField, BooleanType
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id,row_number

In [2]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5") \
    .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

### Hospital occupancy
First, we analyze the hospital occupancy data.

In [3]:
hospital_path = './covid19_data/leitos_BR.csv'

In [4]:
df_hospital = spark.read.option('delimiter', ';').option('header', 'true').csv(hospital_path)

In [5]:
df_hospital.printSchema()

root
 |-- _id: string (nullable = true)
 |-- dataNotificacao: string (nullable = true)
 |-- cnes: string (nullable = true)
 |-- ocupacaoSuspeitoCli: string (nullable = true)
 |-- ocupacaoSuspeitoUti: string (nullable = true)
 |-- ocupacaoConfirmadoCli: string (nullable = true)
 |-- ocupacaoConfirmadoUti: string (nullable = true)
 |-- saidaSuspeitaObitos: string (nullable = true)
 |-- saidaSuspeitaAltas: string (nullable = true)
 |-- saidaConfirmadaObitos: string (nullable = true)
 |-- saidaConfirmadaAltas: string (nullable = true)
 |-- origem: string (nullable = true)
 |-- _p_usuario: string (nullable = true)
 |-- estadoNotificacao: string (nullable = true)
 |-- municipioNotificacao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- excluido: string (nullable = true)
 |-- validado: string (nullable = true)
 |-- _created_at: string (nullable = true)
 |-- _updated_at: string (nullable = true)



Let's check which states are in this dataset.

In [6]:
df_hospital.select(['estado']).groupby('estado').count().sort('estado').show(50, False)

+-------------------+------+
|estado             |count |
+-------------------+------+
|null               |2     |
|Acre               |420   |
|Alagoas            |3762  |
|Amapá              |301   |
|Amazonas           |8543  |
|Bahia              |14149 |
|Ceará              |14023 |
|Distrito Federal   |2719  |
|Espírito Santo     |5067  |
|GOIAS              |1431  |
|Goiás              |39860 |
|Maranhão           |8493  |
|Mato Grosso        |15979 |
|Mato Grosso do Sul |17835 |
|Minas Gerais       |74280 |
|Paraná             |23594 |
|Paraíba            |3152  |
|Pará               |7031  |
|Pernambuco         |26504 |
|Piauí              |7070  |
|Rio Grande do Norte|7383  |
|Rio Grande do Sul  |53968 |
|Rio de Janeiro     |27301 |
|Rondônia           |3684  |
|Roraima            |419   |
|Santa Catarina     |17809 |
|Sergipe            |3065  |
|São Paulo          |104851|
|Tocantins          |4832  |
+-------------------+------+



The Ministry of Health (Brazil) provides data for all Brazilian states. In this project we study only São Paulo, so we need to filter this dataset.

In [7]:
df_hospital = df_hospital.filter(df_hospital.estado == 'São Paulo')

Next, let's check whether any record is missing its city name.

In [8]:
df_hospital.select(['municipio']).where((col('municipio').isNull()) | (col('municipio') == '')).show()

+---------+
|municipio|
+---------+
+---------+



There are no records without a city name, so no cleanup is needed here.

Next, let's check the notification dates.

In [9]:
df_hospital.select([
    'dataNotificacao',
]).sort('dataNotificacao').show()

+--------------------+
|     dataNotificacao|
+--------------------+
|1983-08-15T03:00:...|
|1989-08-08T03:00:...|
|2010-10-03T03:00:...|
|2010-11-24T02:00:...|
|2011-08-14T03:00:...|
|2019-06-29T03:00:...|
|2020-02-01T03:00:...|
|2020-02-07T03:00:...|
|2020-02-08T03:00:...|
|2020-02-19T03:00:...|
|2020-03-02T03:00:...|
|2020-03-04T03:00:...|
|2020-03-12T03:00:...|
|2020-03-17T03:00:...|
|2020-03-18T03:00:...|
|2020-03-19T03:00:...|
|2020-03-20T03:00:...|
|2020-03-20T03:00:...|
|2020-03-21T03:00:...|
|2020-03-22T03:00:...|
+--------------------+
only showing top 20 rows



We can see dates as early as 1983, which makes no sense, since we are analyzing hospital occupancy caused by COVID-19. `dataNotificacao` is a string, and for our purpose we don't need the time of day, so let's cast this field to a date type and keep only dates from 2020 onward.
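A minimal pure-Python sketch of this parse-and-filter step, assuming the raw values follow an ISO-8601 layout such as `2020-03-20T03:00:00.000Z` (the outputs above are truncated, so the exact suffix is an assumption). The helper names and the `PANDEMIC_START` constant are hypothetical. Using `>=` keeps records dated exactly 2020-01-01.

```python
from datetime import date, datetime
from typing import Iterable, List, Optional

# Hypothetical cutoff: keep notifications from 2020-01-01 onward.
PANDEMIC_START = date(2020, 1, 1)


def parse_notification_date(raw: Optional[str]) -> Optional[date]:
    """Keep only the yyyy-mm-dd prefix of the timestamp and return it as a date.

    Missing or unparseable values return None instead of raising."""
    if not raw:
        return None
    try:
        return datetime.strptime(raw[:10], "%Y-%m-%d").date()
    except ValueError:
        return None


def keep_pandemic_rows(raw_dates: Iterable[Optional[str]]) -> List[date]:
    """Return parsed dates on or after the cutoff, dropping unusable values."""
    parsed = (parse_notification_date(r) for r in raw_dates)
    return [d for d in parsed if d is not None and d >= PANDEMIC_START]


sample = ["1983-08-15T03:00:00.000Z", "2020-02-01T03:00:00.000Z", None, "garbage"]
print(keep_pandemic_rows(sample))
# prints [datetime.date(2020, 2, 1)]
```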

In [10]:
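# keep only the yyyy-mm-dd part of the timestamp and return a date (None for missing values)
get_date = udf(lambda x: datetime.strptime(x[0:10], '%Y-%m-%d').date() if x else None, DateType())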

In [11]:
df_hospital = df_hospital.withColumn("dataNotificacao_date", get_date(col('dataNotificacao')))
df_hospital = df_hospital.filter(df_hospital.dataNotificacao_date >= '2020-01-01')

In [12]:
df_hospital.select([
    'dataNotificacao_date',
]).sort('dataNotificacao_date').show()

+--------------------+
|dataNotificacao_date|
+--------------------+
|          2020-02-01|
|          2020-02-07|
|          2020-02-08|
|          2020-02-19|
|          2020-03-02|
|          2020-03-04|
|          2020-03-12|
|          2020-03-17|
|          2020-03-18|
|          2020-03-19|
|          2020-03-20|
|          2020-03-20|
|          2020-03-21|
|          2020-03-22|
|          2020-03-23|
|          2020-03-23|
|          2020-03-24|
|          2020-03-24|
|          2020-03-25|
|          2020-03-25|
+--------------------+
only showing top 20 rows



These dates now look sensible. Let's move on to the next dataset.

### Vaccination

In [13]:
vaccine_path = './covid19_data/vacinacao_SP.csv'

In [14]:
df_vaccine = spark.read.option('delimiter', ';').option('header', 'true').csv(vaccine_path)

In [15]:
df_vaccine.printSchema()

root
 |-- document_id: string (nullable = true)
 |-- paciente_id: string (nullable = true)
 |-- paciente_idade: string (nullable = true)
 |-- paciente_dataNascimento: string (nullable = true)
 |-- paciente_enumSexoBiologico: string (nullable = true)
 |-- paciente_racaCor_codigo: string (nullable = true)
 |-- paciente_racaCor_valor: string (nullable = true)
 |-- paciente_endereco_coIbgeMunicipio: string (nullable = true)
 |-- paciente_endereco_coPais: string (nullable = true)
 |-- paciente_endereco_nmMunicipio: string (nullable = true)
 |-- paciente_endereco_nmPais: string (nullable = true)
 |-- paciente_endereco_uf: string (nullable = true)
 |-- paciente_endereco_cep: string (nullable = true)
 |-- paciente_nacionalidade_enumNacionalidade: string (nullable = true)
 |-- estabelecimento_valor: string (nullable = true)
 |-- estabelecimento_razaoSocial: string (nullable = true)
 |-- estalecimento_noFantasia: string (nullable = true)
 |-- estabelecimento_municipio_codigo: string (nullable = 

First, let's check the ages of the vaccinated people.

In [16]:
df_vaccine.selectExpr(
    "cast(paciente_idade as int) paciente_idade", 
).dropDuplicates([
    'paciente_idade'
]).sort('paciente_idade', ascending=False).show()

+--------------+
|paciente_idade|
+--------------+
|           221|
|           127|
|           124|
|           123|
|           121|
|           119|
|           117|
|           115|
|           114|
|           113|
|           112|
|           111|
|           110|
|           109|
|           108|
|           107|
|           106|
|           105|
|           104|
|           103|
+--------------+
only showing top 20 rows



As you can see, one person is listed as 221 years old, which is clearly too old to receive the vaccine. This is probably an error in the source data. We set the age to null for values above 110 years, because we don't want to lose the vaccination record itself, even though we don't know the person's real age. For ages under 18 we also set the age to null, because vaccines were not being applied to children or adolescents. We also need to convert the ages to the integer type.
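The age rule described above (keep ages from 18 to 110 inclusive, null otherwise) can be sketched as a plain Python function. Mapping non-numeric strings to `None` is an extra assumption of this sketch, not something the notebook does; `clean_age`, `MIN_AGE`, and `MAX_AGE` are hypothetical names. A function like this could be wrapped with PySpark's `udf(clean_age, IntegerType())`.

```python
from typing import Optional

MIN_AGE = 18   # younger people were not being vaccinated in this data
MAX_AGE = 110  # older ages are treated as data-entry errors


def clean_age(raw: Optional[str]) -> Optional[int]:
    """Convert a raw paciente_idade string to int.

    Returns None when the value is missing, non-numeric (assumption),
    or outside the accepted [MIN_AGE, MAX_AGE] range."""
    if raw is None:
        return None
    try:
        age = int(raw)
    except ValueError:
        return None
    return age if MIN_AGE <= age <= MAX_AGE else None


print([clean_age(v) for v in ["221", "110", "45", "17", None, "abc"]])
# prints [None, 110, 45, None, None, None]
```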

In [17]:
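# keep ages between 18 and 110 (inclusive); anything else, or a missing value, becomes null
cast_age = udf(lambda x: int(x) if x is not None and 18 <= int(x) <= 110 else None, IntegerType())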

In [18]:
df_vaccine = df_vaccine.withColumn("paciente_idade_int", cast_age(col('paciente_idade')))

Let's check whether there are records without an application date.

In [19]:
df_vaccine.select(['vacina_dataAplicacao']).where(col('vacina_dataAplicacao').isNull()).show()

+--------------------+
|vacina_dataAplicacao|
+--------------------+
|                null|
+--------------------+



Our datasets are organized around dates: we need to be able to relate vaccination, hospital occupancy, and cases, and to follow the pandemic situation day by day. For this reason, I decided to exclude records without a vaccination date.

In [20]:
df_vaccine = df_vaccine.filter(df_vaccine.vacina_dataAplicacao.isNotNull())

These are the most relevant fields in this dataset, so let's move on to the last one.

### Cases

In [21]:
cases_path = './covid19_data/api_covid.json'

In [22]:
df_cases = spark.read.json(cases_path)

In [23]:
df_cases.printSchema()

root
 |-- city: string (nullable = true)
 |-- city_ibge_code: string (nullable = true)
 |-- confirmed: long (nullable = true)
 |-- confirmed_per_100k_inhabitants: double (nullable = true)
 |-- date: string (nullable = true)
 |-- death_rate: double (nullable = true)
 |-- deaths: long (nullable = true)
 |-- estimated_population: long (nullable = true)
 |-- estimated_population_2019: long (nullable = true)
 |-- is_last: boolean (nullable = true)
 |-- order_for_place: long (nullable = true)
 |-- place_type: string (nullable = true)
 |-- state: string (nullable = true)



When we downloaded the data from the API, we filtered it to São Paulo state, so it's not necessary to check the `state` field.

First, let's check the dates in our dataset.

In [24]:
df_cases.select(['date']).sort('date', ascending=True).show()

+----------+
|      date|
+----------+
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
|2021-03-20|
+----------+
only showing top 20 rows



The dates look valid. Let's check whether there is any city without a name.

In [25]:
df_cases.select(['city']).groupby('city').count().sort('city', ascending=True).show()

+-------------------+-----+
|               city|count|
+-------------------+-----+
|               null|    1|
|         Adamantina|    1|
|             Adolfo|    1|
|              Aguaí|    1|
|             Agudos|    1|
|           Alambari|    1|
|  Alfredo Marcondes|    1|
|             Altair|    1|
|        Altinópolis|    1|
|        Alto Alegre|    1|
|           Alumínio|    1|
|        Alvinlândia|    1|
|          Americana|    1|
|             Amparo|    1|
|Américo Brasiliense|    1|
|  Américo de Campos|    1|
|          Analândia|    1|
|          Andradina|    1|
|           Angatuba|    1|
|            Anhembi|    1|
+-------------------+-----+
only showing top 20 rows



We have one record without a city name. This happens because that record holds the data for the whole state, so we need to exclude it.
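The schema also has a `place_type` field. Assuming Brasil.io marks municipalities with `place_type == "city"` and the state-wide aggregate with `"state"` (an assumption about the API, not checked in this notebook), the aggregate row can be excluded explicitly rather than only through the null city name. A minimal sketch; `city_rows_only` and the sample rows are hypothetical, with made-up values.

```python
from typing import Dict, List


def city_rows_only(rows: List[Dict]) -> List[Dict]:
    """Keep city-level records and drop the state-wide aggregate.

    Assumes place_type is "city" for municipalities and "state" for the
    aggregate; also requires a non-null city name, as the notebook does."""
    return [
        r for r in rows
        if r.get("place_type") == "city" and r.get("city") is not None
    ]


# Illustrative records shaped like the cases schema (made-up numbers).
rows = [
    {"city": None, "place_type": "state", "confirmed": 2_000_000},
    {"city": "Adamantina", "place_type": "city", "confirmed": 3_000},
]
print([r["city"] for r in city_rows_only(rows)])
# prints ['Adamantina']
```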

In [26]:
df_cases = df_cases.filter(df_cases.city.isNotNull())

Now, let's check whether any numeric field holds an invalid (negative) value.

In [27]:
df_cases.select([
    'confirmed',
    'confirmed_per_100k_inhabitants',
    'death_rate',
    'deaths',
    'estimated_population',
    'estimated_population_2019'
]).where(
    (col('confirmed') < 0) |
    (col('confirmed_per_100k_inhabitants') < 0) |
    (col('death_rate') < 0) |
    (col('deaths') < 0) | 
    (col('estimated_population') < 0) |
    (col('estimated_population_2019') < 0)
).show()

+---------+------------------------------+----------+------+--------------------+-------------------------+
|confirmed|confirmed_per_100k_inhabitants|death_rate|deaths|estimated_population|estimated_population_2019|
+---------+------------------------------+----------+------+--------------------+-------------------------+
+---------+------------------------------+----------+------+--------------------+-------------------------+



All numeric fields look valid.
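The same rule can be expressed per record, which makes explicit how missing values are treated: in Spark, `null < 0` evaluates to null, so a null field never satisfies the `where` condition on its own. A minimal sketch; `negative_fields` and the sample record are hypothetical.

```python
from typing import Dict, List, Optional

# Numeric fields taken from the cases schema above.
NUMERIC_FIELDS = [
    "confirmed",
    "confirmed_per_100k_inhabitants",
    "death_rate",
    "deaths",
    "estimated_population",
    "estimated_population_2019",
]


def negative_fields(record: Dict[str, Optional[float]]) -> List[str]:
    """Return the names of numeric fields holding a negative value.

    Missing (None) values are skipped, mirroring Spark's null comparison."""
    return [f for f in NUMERIC_FIELDS if record.get(f) is not None and record[f] < 0]


# Illustrative record with made-up values.
print(negative_fields({"confirmed": 10, "deaths": -1, "death_rate": None}))
# prints ['deaths']
```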

## Step 3 - Define the Data Model

### 3.1 Conceptual Data Model

![Database Schema](./schema.png)

In this project, we create 8 tables:

- **city**: This table contains only the city information;
- **vaccines**: This table stores the data about the vaccines;
- **vaccination_group**: This table contains the data about the vaccination groups that have priority;
- **establishment**: This table contains the information about the establishments that apply the vaccine;
- **vaccination**: This table stores the data about people who have been vaccinated;
- **cases**: This table stores the accumulated cases and deaths for each day;
- **hospital**: This table stores the hospital occupancy data;
- **epidemic_day_fact**: This is the fact table, responsible for bringing together all the information for each day.
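The tables are not independent: as the Step 4 functions show, the hospital table reads the cities parquet output, and the fact table reads the hospital, cases, and vaccination outputs. The following is a sketch of deriving a valid run order with the standard library's `graphlib.TopologicalSorter` (Python 3.9+). The `DEPENDENCIES` mapping is our own summary of those reads, not part of the project code.

```python
from graphlib import TopologicalSorter

# Output table -> tables whose parquet output it reads (summarized from Step 4).
DEPENDENCIES = {
    "cities": set(),
    "vaccination_groups": set(),
    "vaccines": set(),
    "establishment": set(),
    "vaccination": set(),
    "cases": set(),
    "hospital": {"cities"},
    "epidemic_day_fact": {"hospital", "cases", "vaccination"},
}

# static_order() yields every node after all of its dependencies.
order = list(TopologicalSorter(DEPENDENCIES).static_order())
print(order)
```

Any order it prints runs `cities` before `hospital`, and runs `hospital`, `cases`, and `vaccination` before `epidemic_day_fact`, which is the constraint the notebook cells satisfy by running top to bottom.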

### 3.2 Mapping Out Data Pipelines

#### City
For this table, we use the cases data as a reference, because that dataset contains all cities of São Paulo state. We extract the city name and city code and use them to fill this table. The city name is normalized by transliterating accented characters to ASCII (with `unidecode`) and converting the string to lowercase, so that it can later be matched against the city names in the hospital dataset.
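The notebook uses the third-party `unidecode` package for this. As a swapped-in alternative for Portuguese city names, the standard library's `unicodedata` can strip accents by decomposing characters (NFKD) and dropping the combining marks. This is a sketch, not the project's method: it can differ from `unidecode` on characters that have no decomposition (for example non-Latin scripts). `normalize_city_name` is a hypothetical name.

```python
import unicodedata


def normalize_city_name(name: str) -> str:
    """Strip accents via NFKD decomposition, then lowercase the result."""
    decomposed = unicodedata.normalize("NFKD", name)
    without_marks = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return without_marks.lower()


print(normalize_city_name("São Paulo"))            # prints sao paulo
print(normalize_city_name("Américo Brasiliense"))  # prints americo brasiliense
```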

#### Vaccination Group
The vaccination groups are present in the vaccination dataset. After removing rows without a vaccination date, we only need to extract the group ids (cast to int) and names, without duplicates, and use them to fill the vaccination groups table.

#### Vaccines
To fill the vaccines table, we use the vaccination dataset, selecting each distinct vaccine by its code (cast to int) along with its name, lot, and manufacturer. We keep only rows with a valid vaccination date.

#### Establishment
To fill this table, we use the vaccination dataset, selecting each distinct establishment by its code (cast to int) along with its names and city code. We keep only rows with a valid vaccination date.
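The vaccination group, vaccines, and establishment tables all follow the same pattern: drop rows without an application date, cast the code to int (missing codes become 0, as in the notebook's `cast_int`), and keep one row per id. A minimal pure-Python sketch of that pattern; `build_dimension` and the sample rows are hypothetical. Note that Spark's `dropDuplicates` keeps an arbitrary row per key, whereas this sketch deterministically keeps the first one seen.

```python
from typing import Dict, Iterable, List, Optional


def to_int(raw: Optional[str]) -> int:
    """Mirror the notebook's cast_int helper: missing codes become 0."""
    return int(raw) if raw is not None else 0


def build_dimension(rows: Iterable[Dict], code_field: str, name_field: str) -> List[Dict]:
    """Return one {"id", "nome"} row per distinct code among dated rows."""
    seen: Dict[int, Optional[str]] = {}
    for row in rows:
        if row.get("vacina_dataAplicacao") is None:
            continue  # rows without an application date are excluded
        seen.setdefault(to_int(row.get(code_field)), row.get(name_field))
    return [{"id": k, "nome": v} for k, v in seen.items()]


# Illustrative rows with made-up codes and names.
rows = [
    {"vacina_dataAplicacao": "2021-01-20", "vacina_grupoAtendimento_codigo": "201", "vacina_grupoAtendimento_nome": "Grupo A"},
    {"vacina_dataAplicacao": "2021-01-21", "vacina_grupoAtendimento_codigo": "201", "vacina_grupoAtendimento_nome": "Grupo A"},
    {"vacina_dataAplicacao": None, "vacina_grupoAtendimento_codigo": "999", "vacina_grupoAtendimento_nome": "Sem data"},
    {"vacina_dataAplicacao": "2021-01-22", "vacina_grupoAtendimento_codigo": "926", "vacina_grupoAtendimento_nome": "Grupo B"},
]
print(build_dimension(rows, "vacina_grupoAtendimento_codigo", "vacina_grupoAtendimento_nome"))
# prints [{'id': 201, 'nome': 'Grupo A'}, {'id': 926, 'nome': 'Grupo B'}]
```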

#### Vaccination
This table is filled with the vaccination data. Before inserting data here, we cast the application datetime string to a date and clean the age information, setting it to null for ages greater than 110 or less than 18. We also filter out records without a vaccination date and generate a unique id.

#### Hospital
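This table comes from the hospital occupancy data. To populate it, we filter the data for São Paulo state, parse the notification datetime string into a date to discard records dated before 2020, and join each record to the city table through the normalized city name.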
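The join by normalized city name can be sketched in plain Python as a dictionary lookup. Because the notebook uses an inner join, hospital rows whose normalized name does not match any city are silently dropped. The accent-stripping helper uses the standard library `unicodedata` as a stand-in for `unidecode`; `attach_city_id` and the sample rows are hypothetical (3550308 is used as the sample IBGE code for the city of São Paulo).

```python
import unicodedata
from typing import Dict, List


def normalize(name: str) -> str:
    """Stand-in for unidecode(x).lower(): strip accents, then lowercase."""
    decomposed = unicodedata.normalize("NFKD", name)
    return "".join(c for c in decomposed if not unicodedata.combining(c)).lower()


def attach_city_id(hospital_rows: List[Dict], city_rows: List[Dict]) -> List[Dict]:
    """Inner-join hospital rows to the city table on the normalized name.

    city_rows are assumed to already hold normalized names in "nome"."""
    lookup = {row["nome"]: row["id"] for row in city_rows}
    joined = []
    for row in hospital_rows:
        city_id = lookup.get(normalize(row["municipio"]))
        if city_id is not None:  # inner join: unmatched rows are dropped
            joined.append({**row, "city_id": city_id})
    return joined


city_rows = [{"id": "3550308", "nome": "sao paulo"}]
hospital_rows = [
    {"_id": "h1", "municipio": "São Paulo"},
    {"_id": "h2", "municipio": "Cidade Fictícia"},  # no match in city_rows
]
print(attach_city_id(hospital_rows, city_rows))
# prints [{'_id': 'h1', 'municipio': 'São Paulo', 'city_id': '3550308'}]
```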

#### Cases
For this table, we only need to cast the date string to a date type, exclude the row without a city name, and generate a unique id.

#### Epidemic Day
This table is responsible for aggregating the data from the other tables for each day. We define a key based on the city and the date (`city_date`) and include the foreign keys to the hospital, cases, and vaccination tables.
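The `city_date` key is built with a nested `IF` that takes the first source with a non-null date (cases, then hospital, then vaccination) and concatenates that source's city code with the date. A pure-Python sketch of the same precedence; `city_date_key` is a hypothetical name. It mirrors Spark's `CONCAT`, which returns null when any argument is null, and the `""` fallback used when no date is present.

```python
from datetime import date
from typing import Optional


def city_date_key(
    cases_city: Optional[str], cases_date: Optional[date],
    hosp_city: Optional[str], hosp_date: Optional[date],
    vacc_city: Optional[str], vacc_date: Optional[date],
) -> Optional[str]:
    """Return "<city>-<yyyy-mm-dd>" from the first source that has a date."""
    for city, day in ((cases_city, cases_date), (hosp_city, hosp_date), (vacc_city, vacc_date)):
        if day is not None:
            # Spark's CONCAT yields null if any input is null.
            return None if city is None else f"{city}-{day.isoformat()}"
    return ""


print(city_date_key("3550308", date(2021, 3, 20), None, None, None, None))
# prints 3550308-2021-03-20
```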

## Step 4: Run Pipelines to Model the Data

### 4.1 Create the data model

#### City Table

In [28]:
!pip3 install unidecode



In [29]:
from unidecode import unidecode

In [30]:
def process_city_data(spark, cases_path, output_path):
    """
    This function is responsible for processing the cases data into the city table and saving it to the output.
    INPUTS: 
    * spark: spark session
    * cases_path: cases dataset path
    * output_path: path where the processed city data will be saved
    """
    
    # get the dataset
    cases_df = spark.read.json(cases_path)
    
    # filter lines without city name
    cases_df = cases_df.filter(cases_df.city.isNotNull())
    
    # normalized strings
    normalize_string = udf(lambda x: unidecode(x).lower(), StringType())
    
    city_table = cases_df.withColumn(
        'city_normalized_name', normalize_string(col('city'))
    ).withColumnRenamed(
        'city_normalized_name', 'nome'
    ).withColumnRenamed(
        'city_ibge_code', 'id'
    ).select([
        'id', 
        'nome'
    ]).dropDuplicates(['id'])

    city_table.write.mode("overwrite").parquet(os.path.join(output_path, 'cities'))

In [31]:
process_city_data(spark, './covid19_data/api_covid.json', './covid_19_results')

#### Vaccination Group Table 

In [32]:
def process_vaccination_group_data(spark, vaccination_path, output_path):
    """
    This function is responsible for processing the vaccination data into the vaccination group table 
    and saving it to the output.
    INPUTS: 
    * spark: spark session
    * vaccination_path: vaccination dataset path
    * output_path: path where the processed vaccination group data will be saved
    """
    
    # get the dataset
    vaccination_df = spark.read.option('delimiter', ';').option('header', 'true').csv(vaccination_path)
    
    # removing data without vaccination date
    vaccination_df = vaccination_df.filter(vaccination_df.vacina_dataAplicacao.isNotNull())
    
    # cast ids to int type
    cast_int = udf(lambda x: int(x) if x is not None else 0, IntegerType())
  
    vaccination_table = vaccination_df.withColumn(
        'grupoAtendimento_codigo_int', 
        cast_int(col('vacina_grupoAtendimento_codigo'))
    ).withColumnRenamed(
        'grupoAtendimento_codigo_int', 'id'
    ).withColumnRenamed(
        'vacina_grupoAtendimento_nome', 'nome'
    ).select([
        'id', 
        'nome'
    ]).dropDuplicates(['id'])
    
    vaccination_table.write.mode("overwrite").parquet(os.path.join(output_path, 'vaccination_groups'))

In [33]:
process_vaccination_group_data(spark, './covid19_data/vacinacao_SP.csv', './covid_19_results')

#### Vaccines Table 

In [34]:
def process_vaccines_data(spark, vaccination_path, output_path):
    """
    This function is responsible for processing the vaccination data into the vaccines table 
    and saving it to the output.
    INPUTS: 
    * spark: spark session
    * vaccination_path: vaccination dataset path
    * output_path: path where the processed vaccines data will be saved
    """
    
    # get the dataset
    vaccination_df = spark.read.option('delimiter', ';').option('header', 'true').csv(vaccination_path)
    
    # removing data without vaccination date
    vaccination_df = vaccination_df.filter(vaccination_df.vacina_dataAplicacao.isNotNull())
    
    # cast ids to int type
    cast_int = udf(lambda x: int(x) if x is not None else 0, IntegerType())
  
    vaccines_table = vaccination_df.withColumn(
        'vacina_categoria_codigo_int', 
        cast_int(col('vacina_categoria_codigo'))
    ).withColumnRenamed(
        'vacina_categoria_codigo_int', 'id'
    ).withColumnRenamed(
        'vacina_grupoAtendimento_nome', 'nome'
    ).withColumnRenamed(
        'vacina_lote', 'lote'
    ).withColumnRenamed(
        'vacina_fabricante_nome', 'fabricante_nome'
    ).withColumnRenamed(
        'vacina_fabricante_referencia', 'fabricante_referencia'
    ).select([
        'id',
        'nome',
        'lote',
        'fabricante_nome',
        'fabricante_referencia'
    ]).dropDuplicates(['id'])

    vaccines_table.write.mode("overwrite").parquet(os.path.join(output_path, 'vaccines'))

In [35]:
process_vaccines_data(spark, './covid19_data/vacinacao_SP.csv', './covid_19_results')

#### Establishment Table

In [36]:
def process_establishment_data(spark, vaccination_path, output_path):
    """
    This function is responsible for processing the vaccination data into the establishment table 
    and saving it to the output.
    INPUTS: 
    * spark: spark session
    * vaccination_path: vaccination dataset path
    * output_path: path where the processed establishment data will be saved
    """
    
    # get the dataset
    vaccination_df = spark.read.option('delimiter', ';').option('header', 'true').csv(vaccination_path)
    
    # removing data without vaccination date
    vaccination_df = vaccination_df.filter(vaccination_df.vacina_dataAplicacao.isNotNull())
    
    # cast ids to int type
    cast_int = udf(lambda x: int(x) if x is not None else 0, IntegerType())
  
    establishment_table = vaccination_df.withColumn(
        'estabelecimento_valor_int', 
        cast_int(col('estabelecimento_valor'))
    ).withColumn(
        'estabelecimento_municipio_codigo_int',
        cast_int(col('estabelecimento_municipio_codigo'))
    ).withColumnRenamed(
        'estabelecimento_valor_int', 'id'
    ).withColumnRenamed(
        'estabelecimento_razaoSocial', 'razaoSocial'
    ).withColumnRenamed(
        'estalecimento_noFantasia', 'nomeFantasia'
    ).withColumnRenamed(
        'estabelecimento_municipio_codigo_int', 'id_city'
    ).select([
        'id',
        'razaoSocial',
        'nomeFantasia',
        'id_city',
    ]).dropDuplicates(['id'])
    
    establishment_table.write.mode("overwrite").parquet(os.path.join(output_path, 'establishment'))

In [37]:
process_establishment_data(spark, './covid19_data/vacinacao_SP.csv', './covid_19_results')

#### Vaccination Table 

In [38]:
def process_vaccination_data(spark, vaccination_path, output_path):
    """
    This function is responsible for processing the vaccination data into the vaccination table 
    and saving it to the output.
    INPUTS: 
    * spark: spark session
    * vaccination_path: vaccination dataset path
    * output_path: path where the processed vaccination data will be saved
    """
    
    # get the dataset
    vaccination_df = spark.read.option('delimiter', ';').option('header', 'true').csv(vaccination_path)
    
    # removing data without vaccination date
    vaccination_df = vaccination_df.filter(vaccination_df.vacina_dataAplicacao.isNotNull())
    
    # cast ids to int type
    cast_int = udf(lambda x: int(x) if x is not None else 0, IntegerType())
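    get_date = udf(lambda x: datetime.strptime(x[0:10], '%Y-%m-%d').date() if x else None, DateType())
    # keep ages between 18 and 110 (inclusive); anything else, or a missing value, becomes null
    cast_age = udf(lambda x: int(x) if x is not None and 18 <= int(x) <= 110 else None, IntegerType())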
  
    vaccines_table = vaccination_df.withColumn(
        'paciente_idade_int', 
        cast_age(col('paciente_idade'))
    ).drop(
        'paciente_idade'
    ).withColumn(
        'vacina_dataAplicacao_date',
        get_date(col('vacina_dataAplicacao'))
    ).withColumnRenamed(
        'paciente_idade_int', 'paciente_idade'
    ).withColumnRenamed(
        'vacina_dataAplicacao_date', 'vacina_data_aplicacao'
    ).withColumn(
        'vaccine_id',
        cast_int(col('vacina_categoria_codigo'))
    ).withColumn(
        'id_vaccination_group',
        cast_int(col('vacina_grupoAtendimento_codigo'))
    ).withColumn(
        'city_id_estabelecimento',
        cast_int(col('estabelecimento_municipio_codigo'))
    ).withColumn(
        'city_id_paciente',
        cast_int(col('paciente_endereco_coIbgeMunicipio'))
    ).withColumn(
        'id_establishment',
        cast_int(col('estabelecimento_valor'))
    ).withColumn(
        'id', row_number().over(Window.orderBy(monotonically_increasing_id()))
    ).select([
        'id',
        'paciente_id',
        'paciente_idade',
        'paciente_enumSexoBiologico',
        'paciente_racaCor_valor',
        'paciente_endereco_nmPais',
        'paciente_endereco_uf',
        'paciente_nacionalidade_enumNacionalidade',
        'vacina_data_aplicacao',
        'vacina_descricao_dose',
        'vaccine_id',
        'id_vaccination_group',
        'city_id_estabelecimento',
        'city_id_paciente',
        'id_establishment'
    ])
    
    vaccines_table.write.mode("overwrite").partitionBy('vacina_data_aplicacao').parquet(os.path.join(output_path, 'vaccination'))

In [39]:
process_vaccination_data(spark, './covid19_data/vacinacao_SP.csv', './covid_19_results')

#### Hospital Table 

In [40]:
def process_hospital_data(spark, hospital_path, cities_parquet_path, output_path):
    """
    This function is responsible for processing the hospital occupancy data into the hospital table 
    and saving it to the output.
    INPUTS: 
    * spark: spark session
    * hospital_path: hospital occupancy dataset path
    * cities_parquet_path: path of the cities parquet output
    * output_path: path where the processed hospital data will be saved
    """
       
    # get city table
    city_table = spark.read.parquet(cities_parquet_path)

    # get hospital dataset
    hospital_df = spark.read.option('delimiter', ';').option('header', 'true').csv(hospital_path)
    
    # get data from São Paulo state
    hospital_df = hospital_df.filter(hospital_df.estado == 'São Paulo')
    

    get_date = udf(lambda x: datetime.strptime(x[0:10], '%Y-%m-%d').date() if x else None, DateType())
    normalize_string = udf(lambda x: unidecode(x).lower(), StringType())
    
    hospital_df = hospital_df.withColumn(
        'dataNotificacao_date', 
        get_date(col('dataNotificacao'))
    )
    hospital_df = hospital_df.filter(hospital_df.dataNotificacao_date >= '2020-01-01')
    
    # normalize cities names to join city table
    hospital_df = hospital_df.withColumn('city_name_join', normalize_string('municipio'))
    
    hospital_table = hospital_df.join(
        city_table, (city_table.nome == hospital_df.city_name_join)
    ).select(
        col('_id').alias('id'),
        col('dataNotificacao'),
        col('ocupacaoSuspeitoCli'), 
        col('ocupacaoSuspeitoUti'), 
        col('ocupacaoConfirmadoCli'), 
        col('ocupacaoConfirmadoUti'), 
        col('saidaSuspeitaObitos'), 
        col('saidaSuspeitaAltas'), 
        col('saidaConfirmadaObitos'), 
        col('saidaConfirmadaAltas'),
        city_table.id.alias('city_id')
    ).dropDuplicates(['id'])
    
    hospital_table.write.mode("overwrite").parquet(os.path.join(output_path, 'hospital'))

In [41]:
process_hospital_data(spark, './covid19_data/leitos_BR.csv', './covid_19_results/cities', './covid_19_results')

#### Cases Table 

In [42]:
def process_cases_group_data(spark, cases_path, output_path):
    """
    This function is responsible for processing the cases data into the cases table 
    and saving it to the output.
    INPUTS: 
    * spark: spark session
    * cases_path: cases dataset path
    * output_path: path where the processed cases data will be saved
    """
    
    # get cases dataset
    cases_df = spark.read.json(cases_path)
    
    # filter lines without city name
    cases_df = cases_df.filter(cases_df.city.isNotNull())
    
    # cast string to date
    cast_date = udf(lambda x: datetime.strptime(x, '%Y-%m-%d').date() if x else None, DateType())
    
    cases_table = cases_df.withColumnRenamed(
        'city_ibge_code', 'city_id'
    ).withColumn(
        'id', row_number().over(Window.orderBy(monotonically_increasing_id()))
    ).withColumn(
        'date_parsed', cast_date(col('date'))
    ).drop(
        'date'
    ).withColumnRenamed(
        'date_parsed',
        'date'
    ).select([
        'id',
        'confirmed',
        'confirmed_per_100k_inhabitants',
        'death_rate',
        'deaths',
        'estimated_population',
        'estimated_population_2019',
        'is_last',
        'order_for_place',
        'place_type',
        'date',
        'city_id'
    ])
    
    cases_table.write.mode("overwrite").partitionBy('date').parquet(os.path.join(output_path, 'cases'))

In [43]:
process_cases_group_data(spark, './covid19_data/api_covid.json', './covid_19_results')

#### Epidemic Day Fact Table

In [44]:
def process_epidemic_day_fact_data(spark, hospital_parquet_path, cases_parquet_path, vaccination_parquet_path, output_path):
    """
    This function is responsable for proccess epidemic_day_fact_data files 
    and save them into the output.
    INPUTS: 
    * spark: spark session
    * hospital_parquet_path: hospital parquet path
    * cases_parquet_path: cases parquet path
    * vaccination_parquet_path: vaccination parquet path
    * output_path: path where process cities data will be save
    """

    cases_table = spark.read.parquet(
        cases_parquet_path
    ).withColumnRenamed(
        'id','cases_id'
    ).withColumnRenamed(
        'city_id','city_ibge_code'
    )
    
    hospital_table = spark.read.parquet(
        hospital_parquet_path
    ).withColumnRenamed(
        'id','hospital_notificacao_id'
    ).withColumnRenamed(
        'city_id','hospital_city_id'
    )
    
    vaccination_table = spark.read.parquet(
        vaccination_parquet_path
    ).withColumnRenamed(
        'id','vaccination_id'
    ).withColumnRenamed(
        'city_id_estabelecimento','city_id_establishment'
    )    
        
    epidemic_day_fact_table = hospital_table.join(
        cases_table, 
        hospital_table.dataNotificacao == cases_table.date,
        'full'
    ).join(
        vaccination_table,
        hospital_table.dataNotificacao == vaccination_table.vacina_data_aplicacao,
        'full'
    ).selectExpr(
        'IF( \
            date IS NOT NULL, \
            CONCAT(city_ibge_code, "-", date), \
            IF( \
                dataNotificacao IS NOT NULL, \
                CONCAT(hospital_city_id, "-", dataNotificacao), \
                IF( \
                    vacina_data_aplicacao IS NOT NULL, \
                    CONCAT(city_id_establishment, "-", vacina_data_aplicacao), \
                    "" \
                ) \
            ) \
        ) AS city_date',
        'IF( \
            date IS NOT NULL, \
            date, \
            IF( \
                dataNotificacao IS NOT NULL, \
                dataNotificacao, \
                IF( \
                    vacina_data_aplicacao IS NOT NULL, \
                    vacina_data_aplicacao, \
                    "" \
                ) \
            ) \
        ) AS date',
        'hospital_notificacao_id',
        'cases_id', 
        'vaccination_id',
    )
    
    epidemic_day_fact_table.write.mode("overwrite").parquet(os.path.join(output_path, 'epidemic_day_fact'))

In [45]:
process_epidemic_day_fact_data(spark, './covid_19_results/hospital', './covid_19_results/cases', './covid_19_results/vaccination', './covid_19_results')

### 4.2 Data Quality Checks

#### City Table
For this table, we verify the number of cities.

In [46]:
df_cities_parquet = spark.read.parquet("./covid_19_results/cities")

In [47]:
assert df_cities_parquet.count() == 646, "Should be 646"

#### Vaccination Group Table
For this table, we verify the number of groups.

In [48]:
df_vaccination_group_parquet = spark.read.parquet("./covid_19_results/vaccination_groups")

In [49]:
assert df_vaccination_group_parquet.count() == 36, "Should be 36"

#### Vaccines Table
For this table, we verify the number of vaccines.

In [50]:
df_vaccines_parquet = spark.read.parquet("./covid_19_results/vaccines")

In [51]:
assert df_vaccines_parquet.count() == 9, "Should be 9"

#### Establishment Table
For this table, we'll verify the number of establishments.

In [52]:
df_establishment_parquet = spark.read.parquet("./covid_19_results/establishment")

In [53]:
assert df_establishment_parquet.count() == 4355, "Should be 4355"

#### Vaccination Table
For this table, we'll check 4 things:
- Number of distinct establishments
- Number of distinct vaccination groups
- Number of distinct vaccines
- Number of rows

In [54]:
df_vaccination_parquet = spark.read.parquet("./covid_19_results/vaccination")

In [55]:
assert df_vaccination_parquet.select(['id_establishment']).dropDuplicates(['id_establishment']).count() == 4355, "Should be 4355"
assert df_vaccination_parquet.select(['id_vaccination_group']).dropDuplicates(['id_vaccination_group']).count() == 36, "Should be 36"
assert df_vaccination_parquet.select(['vaccine_id']).dropDuplicates(['vaccine_id']).count() == 9, "Should be 9"
assert df_vaccination_parquet.count() == 958897, "Should be 958897"
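`dropDuplicates` keeps one row for a `null` key. Suppose one establishment were missing and some rows had a null `id_establishment`: the distinct-count check above would still pass. The plain-Python sketch below shows a stricter variant that counts distinct non-null values and, separately, the nulls. The rows are hypothetical. In PySpark, the same idea could be expressed with `countDistinct` (which ignores nulls) plus `where(col(...).isNull()).count()`.

```python
from typing import Iterable, Mapping, Tuple


def distinct_and_nulls(rows: Iterable[Mapping], column: str) -> Tuple[int, int]:
    """Return (number of distinct non-null values, number of null values) in a column."""
    seen = set()
    nulls = 0
    for row in rows:
        value = row.get(column)
        if value is None:
            nulls += 1
        else:
            seen.add(value)
    return len(seen), nulls


# Hypothetical vaccination rows: two establishments plus one row with no establishment.
rows = [
    {"id_establishment": 10},
    {"id_establishment": 10},
    {"id_establishment": 20},
    {"id_establishment": None},
]
print(distinct_and_nulls(rows, "id_establishment"))  # (2, 1)
```

A `dropDuplicates`-style count of the same rows would report 3, because the `None` key is counted as one more distinct value.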

#### Hospital Table
For this table, we'll check the number of rows.

In [56]:
df_hospital_parquet = spark.read.parquet("./covid_19_results/hospital")

In [57]:
assert df_hospital_parquet.count() == 104845, "Should be 104845"

#### Cases Table
For this table, we'll check the number of rows and the number of distinct cities.

In [58]:
df_cases_parquet = spark.read.parquet("./covid_19_results/cases")

In [59]:
assert df_cases_parquet.dropDuplicates(['city_id']).count() == 646, "Should be 646"
assert df_cases_parquet.count() == 646, "Should be 646"
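Together, the two assertions above state that `city_id` is unique in the cases table, with one row per city. When a check like this fails, it helps to know which keys are duplicated. The plain-Python sketch below lists them with `collections.Counter`. The sample keys are hypothetical. In PySpark, the equivalent would be `groupBy('city_id').count()` filtered on the `count` column being greater than 1.

```python
from collections import Counter
from typing import Dict, Hashable, Iterable


def duplicated_keys(keys: Iterable[Hashable]) -> Dict[Hashable, int]:
    """Return each key that appears more than once, with its number of occurrences."""
    counts = Counter(keys)
    return {key: n for key, n in counts.items() if n > 1}


# Hypothetical city_id values: 3550308 appears twice, so the table is not one row per city.
city_ids = [3550308, 3509502, 3550308, 3548708]
print(duplicated_keys(city_ids))  # {3550308: 2}
```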

#### Epidemic Day Fact Table
For this table, we'll check 3 things:
- Number of distinct vaccination records (`vaccination_id`)
- Number of distinct cases (`cases_id`)
- Number of distinct hospital notifications (`hospital_notificacao_id`)

In [60]:
df_epidemic_day_fact_parquet = spark.read.parquet("./covid_19_results/epidemic_day_fact")

In [61]:
assert df_epidemic_day_fact_parquet.dropDuplicates(['cases_id']).where(col('cases_id').isNotNull()).count() == 646, "Should be 646"
assert df_epidemic_day_fact_parquet.dropDuplicates(['hospital_notificacao_id']).where(col('hospital_notificacao_id').isNotNull()).count() == 104845, "Should be 104845"
assert df_epidemic_day_fact_parquet.dropDuplicates(['vaccination_id']).where(col('vaccination_id').isNotNull()).count() == 958897, "Should be 958897"
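The fact-table checks compare counts of distinct foreign keys with the row counts of the source tables. When one of them fails, an id-level comparison shows which ids are missing. The plain-Python sketch below illustrates the idea with hypothetical ids. In PySpark, the same check could be written as a `left_anti` join from the source table to the fact table, which returns the source rows that have no match.

```python
from typing import Iterable, Set


def missing_from_fact(source_ids: Iterable[int], fact_ids: Iterable) -> Set[int]:
    """Return source ids that never appear in the fact table's foreign-key column.

    None values in the fact column (rows coming from other sources in the
    full outer join) are ignored.
    """
    present = {i for i in fact_ids if i is not None}
    return set(source_ids) - present


# Hypothetical ids: hospital notification 3 was lost when building the fact table.
hospital_ids = [1, 2, 3]
fact_hospital_ids = [1, None, 2, None, 2]
print(missing_from_fact(hospital_ids, fact_hospital_ids))  # {3}
```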

### 4.3 Data dictionary

#### City Table


| Field | Type | Description |
| ----- |------|-------------|
| id | int (PK) | IBGE city code |
| nome | varchar(255) | City name |

#### Establishment Table


| Field | Type | Description |
| ----- |------|-------------|
| id | int (PK) | Establishment ID |
| razaoSocial | varchar(255) | Legal name of the company |
| nomeFantasia | varchar(255) | Trade name of the establishment |
| id_city | int (FK) | IBGE code of city |

#### Vaccination Group Table


| Field | Type | Description |
| ----- |------|-------------|
| id | int (PK) | Vaccination Group ID |
| nome | varchar(255) | Name of group |

#### Vaccines Table


| Field | Type | Description |
| ----- |------|-------------|
| id | int (PK) | Vaccine ID |
| lote | varchar(255) | Production batch (lot) identifier |
| fabricante_nome | varchar(255) | Manufacturer name |
| fabricante_referencia | varchar(255) | Manufacturer reference |

#### Vaccination Table


| Field | Type | Description |
| ----- |------|-------------|
| id | int (PK) | Vaccination ID |
| paciente_id | varchar(255) | Patient ID |
| paciente_idade | int | Patient age |
| paciente_enumSexoBiologico | varchar(255) | Patient biological sex |
| paciente_racaCor_valor | varchar(255) | Patient race/color |
| paciente_endereco_nmPais | varchar(255) | Patient country |
| paciente_endereco_uf | varchar(255) | Patient state |
| paciente_nacionalidade_enumNacionalidade | varchar(255) | Patient nationality |
| vacina_data_aplicacao | date | Date of vaccine application |
| descricao_dose | varchar(255) | Vaccine dose description |
| vaccine_id | int (FK) | Vaccine ID |
| id_vaccination_group | int (FK) | Vaccination group ID |
| city_id_establishment | int (FK) | IBGE code of the establishment's city |
| city_id_paciente | int (FK) | IBGE code of the patient's city |
| id_establishment | int (FK) | Establishment ID |

#### Hospital Table


| Field | Type | Description |
| ----- |------|-------------|
| id | int (PK) | Notification ID |
| dataNotificacao | date | Notification date |
| ocupacaoSuspeitoCli | int | Number of clinical-ward (CLI) beds occupied by suspected cases |
| ocupacaoSuspeitoUti | int | Number of ICU beds occupied by suspected cases |
| ocupacaoConfirmadoCli | int | Number of clinical-ward (CLI) beds occupied by confirmed cases |
| ocupacaoConfirmadoUti | int | Number of ICU beds occupied by confirmed cases |
| saidaSuspeitaObitos | int | Number of deaths among suspected cases |
| saidaSuspeitaAltas | int | Number of discharges (recoveries) among suspected cases |
| saidaConfirmadaObitos | int | Number of deaths among confirmed cases |
| saidaConfirmadaAltas | int | Number of discharges (recoveries) among confirmed cases |
| city_id | int (FK) | IBGE city code |

#### Cases Table


| Field | Type | Description |
| ----- |------|-------------|
| id | int (PK) | Notification ID |
| confirmed | int | Cumulative confirmed cases up to the date |
| confirmed_per_100k_inhabitants | float | Cumulative confirmed cases per 100k inhabitants up to the date |
| death_rate | float | Death rate up to the date |
| deaths | int | Cumulative number of deaths up to the date |
| estimated_population | int | Estimated population |
| estimated_population_2019 | int | Estimated population in 2019 |
| is_last | boolean | Whether this is the most recent record for the location |
| order_for_place | int | Order of this record among the records for the location |
| place_type | varchar(255) | Whether the record refers to a state or a city |
| date | date | Date of notification |
| city_id | int (FK) | IBGE code of city |

#### Epidemic Day Fact Table


| Field | Type | Description |
| ----- |------|-------------|
| city_date | varchar(255) (PK) | Concatenation of the city IBGE code and the date, separated by `-` |
| date | date | Date of the record |
| hospital_notificacao_id | int (FK) | Hospital notification ID |
| cases_id | int (FK) | Case ID |
| vaccination_id | int (FK) | Vaccination ID |
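`city_date` joins a numeric IBGE code and a date with `-`. An ISO date (`YYYY-MM-DD`) also contains `-`, so the key must be split on the first `-` only to recover its parts. The minimal sketch below assumes the date part is rendered in ISO format. The helper name and the sample key are hypothetical. The sketch also handles the empty key that the fact-table query produces when no date is available.

```python
from datetime import date
from typing import Optional, Tuple


def split_city_date(key: str) -> Optional[Tuple[int, date]]:
    """Split a city_date key such as '3550308-2021-06-01' into (IBGE code, date).

    Returns None for the empty key used when no date is known.
    Assumes the date part is an ISO date (YYYY-MM-DD).
    """
    if not key:
        return None
    code, day = key.split("-", 1)  # split only on the first '-'
    return int(code), date.fromisoformat(day)


print(split_city_date("3550308-2021-06-01"))  # (3550308, datetime.date(2021, 6, 1))
```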

## Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

To process the data in this project, we used PySpark. Spark can be faster than Hadoop MapReduce for large-scale data processing because it keeps intermediate results in memory. It also processes data in parallel across the cluster, so it is easy to scale the cluster out when necessary.
All data are saved as Parquet files. We chose Parquet because the files can be stored in S3 and copied into a Redshift cluster with the same structure. This makes it straightforward to load the Parquet files from a staging bucket into tables in the Redshift cluster.
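In Redshift, Parquet files in S3 are loaded with a `COPY ... FORMAT AS PARQUET` statement that points at an S3 prefix. The Python sketch below only builds that SQL string. The table name, bucket, prefix and IAM role are placeholders, and executing the statement (for example through a database driver) is not shown.

```python
def redshift_copy_parquet(table: str, s3_prefix: str, iam_role_arn: str) -> str:
    """Return a Redshift COPY statement that loads Parquet files from an S3 prefix.

    The values are interpolated as-is, so they must be trusted, not user input.
    """
    return (
        f"COPY {table}\n"
        f"FROM '{s3_prefix}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS PARQUET;"
    )


# Placeholder names: replace the bucket and role with real ones.
print(redshift_copy_parquet(
    "public.epidemic_day_fact",
    "s3://example-staging-bucket/covid_19_results/epidemic_day_fact/",
    "arn:aws:iam::123456789012:role/example-redshift-role",
))
```

For columnar formats such as Parquet, Redshift maps file columns to table columns by position, so the target table should declare its columns in the same order as the Parquet files.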

* Propose how often the data should be updated and why.


The data should be updated once a day. We are working with COVID-19 data, and public and private organizations send their notifications to the government over the course of the day. The day's data are therefore only complete at the end of the day.

* Write a description of how you would approach the problem differently under the following scenarios:


 * The data was increased by 100x.

If the data increased by 100x, we could add nodes to the cluster to process the data faster. This is an easy solution because Spark distributes the processing across the nodes, so no code changes would be necessary.

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.

If the data must populate a dashboard by 7 a.m., we can schedule the ETL process to run at night. A given day's data are only available at night, once the day's notifications are consolidated, so running the ETL process overnight is not a problem. We could also create a new table containing only the fields the dashboard uses, to make its queries faster.

 * The database needed to be accessed by 100+ people.

The tables are designed for the analytical team to get insights from the data. If the number of people querying the database increases, we can create new OLAP cubes to serve analytical queries faster. If another team needs access to the raw data, we can add a staging step that saves the raw data into a bucket. Depending on their needs, that team can then access the raw data in the bucket, the Parquet files, or the tables in Redshift.