# Project Capstone
### Data Engineering Capstone Project

#### Project Summary
This Project aims to provide useful information to analysts as well as for the immigration staff in order they can make decisions regarding immigration process because they will be able to access what is the current and historical situation about population who have entered the country, what is the relation with demographics indicators and also what has been the behavior regarding COVID19 in the country where visitors resides.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd, re

### Step 1: Scope the Project and Gather Data

#### Scope 
This Project aims to provide useful information to analysts as well as for the immigration staff in order they can make decisions regarding immigration process because they will be able to access what is the current and historical situation about population who have entered the country, what is the relation with demographics indicators and also what has been the behavior regarding COVID19 in the country where visitors resides. For that we have built a Datapipeline by using pandas, spark with python. The information is taken from different sources, organize them into a model and written into parquet files.  

#### Describe and Gather Data 

* Immigration data comes from the <a href="https://www.trade.gov/national-travel-and-tourism-office">US National Tourism and Trade Office</a>
* US cities demographics information is provided with the project. Likewise data of countries, us states and i94ports to ensure the data quality.
* COVID19 by country is from <a href="https://www.kaggle.com/datasets/jcsantiago/covid19-by-country-with-government-response">Kaggle</a>

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, levenshtein
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [3]:
#Parses the file I94_SAS_Labels_Descriptions.SAS dinamically by iterating each line of each group of data within the file, discard the data that shouldn't be used, and then move the data into a dictionary and then into a pandas Dataframe.
def build_sas_table(target,array_words,keytype):
    array_valids = {}
    with open('I94_SAS_Labels_Descriptions.SAS') as f:
        start = False
        for line in f:
            if not start:
                match_start = re.compile(r'%s$' %(target)).search(line)
            if match_start:
                start = True
                match_end = re.compile(r';$').search(line)
                if match_end:
                    break
                if keytype == 1:
                    match = re.compile(r"'([^=]*)'.*'(.*)'").search(line)
                else:
                    match = re.compile(r"([^=]*).*'(.*)'").search(line)
                if match:         
                    i94port_invalid = False
                    for i in array_words:             
                        if re.search(i,match[2]):                       
                            i94port_invalid = True
                            break
                    if not i94port_invalid:
                        array_valids[match[1]]=match[2].strip()
    return array_valids

In [4]:
# Extract the data from I94_SAS_Labels_Descriptions.SAS to clean it and generate the Dataframe for dim_i94ports
dfi94port = pd.DataFrame.from_dict(build_sas_table("i94prtl",['No PORT','Collapsed'],1), orient='index', columns = {'i94port_name'})

In [39]:
# Extract the data from I94_SAS_Labels_Descriptions.SAS to clean it and generate the Dataframe for dim_countries
country = pd.DataFrame.from_dict(build_sas_table("i94cntyl",['INVALID','Collapsed','No Country'],2), orient='index', columns = {'country_name'})

In [6]:
# Extract the data from I94_SAS_Labels_Descriptions.SAS to clean it and generate the Dataframe for dim_us_states
us_states = pd.DataFrame.from_dict(build_sas_table("i94addrl",[],1), orient='index', columns = {'us_states_codes'})

In [7]:
#Read the file covid19_by_country.csv
covid19_by_country = pd.read_csv('covid19_by_country_small.csv', sep=',')
covid19_by_country_table = spark.createDataFrame(covid19_by_country) 

In [8]:
#Read the file us-cities-demographics.csv
demographic = pd.read_csv('us-cities-demographics.csv', sep=';')
demographic_table = spark.createDataFrame(demographic) 

In [9]:
#Read the files from trade.gov
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

### Step 3: Data Model
#### 3.1 Conceptual Data Model

<img src="../assets/ER_diagram_updated.png" width="1250"/>

#### 3.2 Mapping Out Data Pipelines

* Given the file I94_SAS_Labels_Descriptions.SAS is in the project's root directory, likewise the different sources (covid19_by_country.csv, us-cities-demographics.csv)
* Parse the file I94_SAS_Labels_Descriptions.SAS dinamically by iterating each line of each group of data within the file, clean the data, and then move it into a dictionary and then into a pandas Dataframe.
* Generate the TempView for dim_i94ports table. 
* Generate the TempView for dim_countries table.
* Generate the TempView for dim_us_states table.
* Read the file covid19_by_country.csv and then generate the TempView for covid_by_country table, grouping the data by country, year, month and leaving the most recient number of active cases.
* Compare by applying Levenshtein and spark algorythm for set the threecountrycode in the countries_table. Add that new column to dim_countries table.
* Read the file us-cities-demographics.csv and then generate the TempView for dim_demographic table, grouping the data by us_state and summing the foreignborn, total population, population by gender and average population ages.
* Read the folder where the immigration data is, clean the data by joining on dim i94ports table and dim us_states table.
* Generate the TemView for immigrations table.
* Write the parquet files for the tables: dim i94ports, dim countries, dim us_states, dim demographic, covid_by_country and immigration.
* Run quality checks to ensure the data were loaded properly and schemas are correct. 

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [40]:
#Generate TempView for dim_i94ports table
dfi94port.reset_index(drop=False,inplace=True)
i94ports_table = spark.createDataFrame(dfi94port) 
i94ports_table.createOrReplaceTempView("i94ports_table")
i94ports_table.show(3)
i94ports_table.describe().show()

+-------+-----+--------------------+
|level_0|index|        i94port_name|
+-------+-----+--------------------+
|      0|  ALC|           ALCAN, AK|
|      1|  ANC|       ANCHORAGE, AK|
|      2|  BAR|BAKER AAF - BAKER...|
+-------+-----+--------------------+
only showing top 3 rows

+-------+------------------+-----+------------+
|summary|           level_0|index|i94port_name|
+-------+------------------+-----+------------+
|  count|               588|  588|         588|
|   mean|             293.5|888.0|        null|
| stddev|169.88525539316237|  NaN|        null|
|    min|                 0|  48Y|ABERDEEN, WA|
|    max|               587|  ZZZ|    YUMA, AZ|
+-------+------------------+-----+------------+



In [41]:
#Generate TempView for dim_countries table
country.reset_index(drop=False,inplace=True)
countries_table = spark.createDataFrame(country)
countries_table = countries_table.selectExpr("index","country_name","LOWER(country_name) AS country_name_lower")
#countries_table.createOrReplaceTempView("countries_table")
countries_table.show(3)
countries_table.describe().show()

+-------+------------+------------------+
|  index|country_name|country_name_lower|
+-------+------------+------------------+
|   582 |      MEXICO|            mexico|
|   236 | AFGHANISTAN|       afghanistan|
|   101 |     ALBANIA|           albania|
+-------+------------+------------------+
only showing top 3 rows

+-------+------------------+------------+------------------+
|summary|             index|country_name|country_name_lower|
+-------+------------------+------------+------------------+
|  count|               236|         236|               236|
|   mean| 362.5550847457627|        null|              null|
| stddev|187.97711816271342|        null|              null|
|    min|              101 | AFGHANISTAN|       afghanistan|
|    max|              760 |    ZIMBABWE|          zimbabwe|
+-------+------------------+------------+------------------+



In [42]:
#Generate TempView for dim_us_states table
us_states.reset_index(drop=False,inplace=True)
us_states_table = spark.createDataFrame(us_states) 
us_states_table.createOrReplaceTempView("us_states_table")
us_states_table.show(3)
us_states_table.describe().show()

+-------+-----+---------------+
|level_0|index|us_states_codes|
+-------+-----+---------------+
|      0|   AL|        ALABAMA|
|      1|   AK|         ALASKA|
|      2|   AZ|        ARIZONA|
+-------+-----+---------------+
only showing top 3 rows

+-------+------------------+-----+---------------+
|summary|           level_0|index|us_states_codes|
+-------+------------------+-----+---------------+
|  count|                54|   54|             54|
|   mean|              26.5| null|           null|
| stddev|15.732132722552274| null|           null|
|    min|                 0|   AK|        ALABAMA|
|    max|                53|   WY|        WYOMING|
+-------+------------------+-----+---------------+



In [44]:
#Generate TempView for dim_covid_by_country table grouped by country
def generate_covid19_table_grouped(table):
    covid19_table_grouped = table.selectExpr("Country", "confirmed_inc", "year(Date) AS year", "month(Date) AS month", "countryalpha3code") \
                        .groupBy("countryalpha3code","year","month") \
                        .agg({"confirmed_inc":"last","Country":"last"}) \
                        .withColumnRenamed('last(year)','year') \
                        .withColumnRenamed('last(confirmed_inc)','confirmed_inc') \
                        .withColumnRenamed('last(Country)','country')
    covid19_table_grouped.createOrReplaceTempView("covid19_table_grouped")
    return covid19_table_grouped
covid19_table_grouped = generate_covid19_table_grouped(covid19_by_country_table)
covid19_table_grouped.show(3)
covid19_table_grouped.summary().show()

+-----------------+----+-----+---------+-------------+
|countryalpha3code|year|month|  country|confirmed_inc|
+-----------------+----+-----+---------+-------------+
|              AUS|2020|    6|Australia|         86.0|
|              AUT|2021|    7|  Austria|        538.0|
|              BHR|2021|    1|  Bahrain|        431.0|
+-----------------+----+-----+---------+-------------+
only showing top 3 rows

+-------+-----------------+------------------+-----------------+-----------+------------------+
|summary|countryalpha3code|              year|            month|    country|     confirmed_inc|
+-------+-----------------+------------------+-----------------+-----------+------------------+
|  count|              770|               770|              770|        770|               770|
|   mean|             null|2020.4233766233767|5.855844155844156|       null|1563.2805194805194|
| stddev|             null|0.4944151301378696|3.207395286996354|       null| 6619.320648358399|
|    min|     

In [14]:
#Generate temporal dataframe for dim_covid_by_country country names
def generate_countries_from_covid19_df(table):
    countries_from_covid19_df = table.selectExpr("countryalpha3code", "country", "LOWER(country) AS lower_country") \
                        .groupBy("countryalpha3code") \
                        .agg({"country":"last", "lower_country":"last"}) \
                        .withColumnRenamed('last(country)','country') \
                        .withColumnRenamed('last(lower_country)','country_lower')
    return countries_from_covid19_df
countries_from_covid19_df = generate_countries_from_covid19_df(covid19_table_grouped)
countries_from_covid19_df.show(3)

+-----------------+--------+-------------+
|countryalpha3code| country|country_lower|
+-----------------+--------+-------------+
|              BRB|Barbados|     barbados|
|              BRA|  Brazil|       brazil|
|              ARM| Armenia|      armenia|
+-----------------+--------+-------------+
only showing top 3 rows



37

In [15]:
#Merge country table and covid19_by_country, then apply levenshtein algorythm to link i94res and countryalpha3code fields
merged_data=countries_from_covid19_df.crossJoin(countries_table)
merged_data = merged_data.withColumn("levenshtein", levenshtein(col("country_name_lower"), col("country_lower")))
merged_data = merged_data.filter("levenshtein < 2")
countries_table = merged_data.selectExpr("countryalpha3code","index as i94res","country")
countries_table.show(3)

+-----------------+-------+--------+
|countryalpha3code| i94res| country|
+-----------------+-------+--------+
|              BRB|   513 |Barbados|
|              BRA|   689 |  Brazil|
|              ARM|   151 | Armenia|
+-----------------+-------+--------+
only showing top 3 rows



In [16]:
#Generate TempView for dim_demographic table grouped by country
def generate_demographic_table_grouped(table):
    demographic_table_grouped = table.select("State Code","Foreign-born","Total Population","Median Age", "Male Population", "Female Population") \
                        .groupBy(col("State Code").alias("statecode")) \
                        .agg({"Foreign-born":"sum","Total Population":"sum","Median Age":"mean","Male Population":"sum","Female Population":"sum"}) \
                        .withColumnRenamed('sum(Foreign-born)','foreignborn') \
                        .withColumnRenamed('sum(Total Population)','totalpopulation') \
                        .withColumnRenamed('sum(Male Population)','malepopulation') \
                        .withColumnRenamed('sum(Female Population)','femalepopulation') \
                        .withColumnRenamed('avg(Median Age)','medianage') 
    demographic_table_grouped.createOrReplaceTempView("demographic_table_grouped")
    return demographic_table_grouped
demographic_table_grouped = generate_demographic_table_grouped(demographic_table)
demographic_table_grouped.show(3)

+---------+---------------+----------------+-----------------+-----------+--------------+
|statecode|totalpopulation|femalepopulation|        medianage|foreignborn|malepopulation|
+---------+---------------+----------------+-----------------+-----------+--------------+
|       AZ|       22497710|     1.1360435E7|35.03750000000001|  3411565.0|   1.1137275E7|
|       SC|        2586976|       1321685.0|33.82500000000001|   134019.0|     1265291.0|
|       LA|        6502975|       3367985.0|34.62500000000001|   417095.0|     3134990.0|
+---------+---------------+----------------+-----------------+-----------+--------------+
only showing top 3 rows



In [18]:
#Generate the TempView for dim_immigration table joining on dim_countries, dim_i94ports, dim_us_states and dim_demographic 
def generate_immigration_table_temp(table):    
    immigration_table_temp = spark.sql("""
        SELECT i94yr, i94mon, i94res, i94port, i94addr, arrdate, depdate, gender, biryear, fltno, i94visa
        FROM immigration_table_temp
        JOIN i94ports_table ON (immigration_table_temp.i94port = i94ports_table.index)
        JOIN us_states_table ON (immigration_table_temp.i94addr = us_states_table.index)
        """)
    immigration_table_temp.createOrReplaceTempView(table)
    return immigration_table_temp
df_immigration.createOrReplaceTempView("immigration_table_temp")
immigration_table_temp = generate_immigration_table_temp("immigration_table_temp")
immigration_table_temp.show(3)

+------+------+------+-------+-------+-------+-------+------+-------+-----+-------+
| i94yr|i94mon|i94res|i94port|i94addr|arrdate|depdate|gender|biryear|fltno|i94visa|
+------+------+------+-------+-------+-------+-------+------+-------+-----+-------+
|2016.0|   4.0| 509.0|    FMY|     AZ|20545.0|20595.0|     M| 1962.0|01773|    2.0|
|2016.0|   4.0| 135.0|    FMY|     AZ|20546.0|20555.0|     M| 2001.0|00137|    2.0|
|2016.0|   4.0| 135.0|    FMY|     AZ|20546.0|20579.0|     M| 1948.0|00127|    2.0|
+------+------+------+-------+-------+-------+-------+------+-------+-----+-------+
only showing top 3 rows



In [73]:
#Write dim_i94ports into parquet file
i94ports_table.write.mode('overwrite').parquet("output_data/dim_i94ports/dim_i94ports.parquet")

In [74]:
#Write dim_countries into parquet file
countries_table.write.mode('overwrite').parquet("output_data/dim_countries/dim_countries.parquet")

In [75]:
#Write dim_us_states into parquet file
us_states_table.write.mode('overwrite').parquet("output_data/dim_us_states/dim_us_states.parquet")

In [78]:
#Write dim_covid_by_country into parquet file
covid19_table_grouped.write.mode('overwrite').partitionBy("country").parquet("output_data/dim_covid_by_country/dim_covid_by_country.parquet")

In [79]:
#Write dim_demographic into parquet file
demographic_table_grouped.write.mode('overwrite').parquet("output_data/dim_demographic/dim_demographic.parquet")

In [80]:
#Write dim_immigration into parquet file
immigration_table_temp.write.mode('overwrite').partitionBy("i94addr").parquet("output_data/dim_immigration/dim_immigration.parquet")

#### 4.2 Data Quality Checks

In [19]:
#Checks number of records is greater than 0
def verify_not_empty(df):
    num_rows = df.count()
    assert(num_rows > 0)
    
# Perform quality checks here
verify_not_empty(i94ports_table) 
verify_not_empty(countries_table) 
verify_not_empty(us_states_table) 
verify_not_empty(covid19_table_grouped) 
verify_not_empty(demographic_table_grouped) 
verify_not_empty(immigration_table_temp) 

In [38]:
#Checks number of columns of each Dataframe
def verify_num_cols(df,number):
    num_cols = len(df.columns)
    assert(num_cols == number)

# Perform quality checks here
verify_num_cols(i94ports_table,2) 
verify_num_cols(countries_table,3) 
verify_num_cols(us_states_table,2) 
verify_num_cols(covid19_table_grouped,5) 
verify_num_cols(demographic_table_grouped,6) 
verify_num_cols(immigration_table_temp,11) 

#### 4.3 Data dictionary 

dim_demographic
* statecode: US States code
* medianage: Population average age in that US State
* totalpopulation: Total population in that US State
* malepopulation: Total population of male gender in that US State
* malepopulation: Total population of female gender in that US State
* foreignborn: Foreign born population in that specific US State

dim_immigration
* i94yr: 4 digits year
* i94mon: Numeric month
* i94port: Destination city
* i94res: Country code where the immigrant resides
* i94addr: US State
* arrdate: Date of arrival to US
* depdate: Date to depart from US
* gender: Gender
* biryear: Year of birth
* fitno: Flight number
* i94visa: Category of Visa (1 = Business, 2 = Pleasure, 3 = Student)

dim_i94ports
* index: Destination city code
* i94port: Destination city name

dim_countries
* i94res: Country code where the immigrant resides
* country: Country name
* countryaplpha3code: International Alpha 3 code for countries

dim_us_states
* index: US State code
* country_name: US State name

dim_covid19_by_country
* country: Country name
* date: Date of statistics
* confirmed_inc: Confirmed cases

#### Step 5: Project Write Up
* Data should be updated monthly because trade.gov posts the information in that frecuency. COVID19 data is posted daily and should be updated in the same frecuency. Demographic information could by updated yearly.
* Mainly the selected tool has been Spark implemented with Python because it is robust, powerful and easy to integrate, also oriented to handling large volumes of data. In addition, it allows you to use a set of libraries such as Dataframes, SQL, algorithms such as Levenshtein's, etc. You can also scale up towards an EMR in AWS. The idea is to write the Parquet files in a AWS S3 bucket and then be extracted through the Data Pipeline to a AWS Redshift Database, which can be managed by end users through an API or Backend.
* A Star Schema has been used for modeling the data with a slight mix with a Snowflake Schema, also known as Starflake Schema. This is due to the characteristics of the information that comes in the COVID19 table. You can either modify the information during the execution of the Pipeline or to set a relation between the covid19 table and the country table to solve that. The Star Schema allows you to make queries easiers with simple joins, furthermore aggegrations functions are lighter when you carry them out.  
* Airflow would be a great and the suitable solution to implement the Pipeline calling the same funcions written here in a Operator which are able to be called from an Airflow schedule.
* Description of how the project would approach the problem differently under the following scenarios:
 * If the data was increased by 100x is recommendable to move the Pipeline to a AWS EMR.
 * If the data populates a dashboard that must be updated on a daily basis by 7am every day is recommendable to set up Airflow Schedule and implement the same code in Operators to do that in the correct fashion.
 * If the database needed to be accessed by 100+ people is recommendable to populate a AWS Redshift with the model.