### Data Engineering Capstone Project

#### Project Summary
This project uses Spark to build a data warehouse, stored as parquet files, that models immigration data for US airports. It uses a star schema with a fact table and dimension tables.

In [1]:
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *

from Scripts.datafromsource import Datafromsource
from Scripts.clean import Clean
from Scripts.transformer import Transformer
from Scripts.schema import Schema
from Scripts.validator import Validator

In [2]:
# Build the Spark session with the spark-sas7bdat package (a SAS file reader) and Hive support
spark = (SparkSession.builder
         .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
         .enableHiveSupport()
         .getOrCreate())

## Data Configurations

In [3]:
paths = {
    "demographics" : "./data/us-cities-demographics.csv",
    "airports" :  "./data/airport-codes_csv.csv",
    "sas_data" : "./data/sas_data",
    "us_states" : "./data/us_states.csv",
    "cities" : "./data/cities.csv",
    "countries" : "./data/countries.csv",
    "visa" : "./data/visa.csv",
    "inmigrant_airports" : "./data/airports.csv",
    "mode" : "./data/mode.csv",
    "airlines" : "./data/airlines.dat"
}
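A missing input file usually only shows up when Spark first tries to read it. The sketch below checks every configured path up front. `missing_paths` is a hypothetical helper, not part of the project's `Scripts` package. It works for single files such as the CSVs and for directories such as `sas_data`.

```python
import os
from typing import Dict, List


def missing_paths(paths: Dict[str, str]) -> List[str]:
    """Return the keys whose configured path does not exist on disk.

    os.path.exists accepts both files and directories, so this covers
    the CSV/DAT files as well as folder-based inputs like sas_data.
    """
    return sorted(key for key, path in paths.items() if not os.path.exists(path))
```

Usage, for example: `missing = missing_paths(paths)`, then raise `FileNotFoundError(missing)` if the list is not empty, before any Spark job starts.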

### Step 1: Scope the Project and Gather Data

### Scope
This project pulls data from all sources and creates fact and dimension tables that describe immigration into the US.

### Data Description:
*U.S. City Demographic Data (demog)*: comes from OpenSoft and includes data by city, state, age, population, veteran status and race.

*I94 Immigration Data (sas_data)*: comes from the US National Tourism and Trade Office and includes details on incoming immigrants and their ports of entry.

*Airport Code Table (airport)*: comes from datahub.io and includes airport codes and corresponding cities.

*Countries (countries)*: comes from I94_SAS_Labels_Descriptions.SAS

*Visas (visa)*: comes from I94_SAS_Labels_Descriptions.SAS

*Immigrant Entry Mode (mode)*: comes from I94_SAS_Labels_Descriptions.SAS

*Airlines (airlines)*: comes from OpenFlights, https://raw.githubusercontent.com/jpatokal/openflights/master/data/airlines.dat


In [4]:
# Read in the raw data
source = Datafromsource(spark, paths)

demog = source.get_cities_demographics_raw()
airport = source.get_airports_raw()
sas_data = source.get_inmigration_raw()
countries = source.get_countries_raw()
visa = source.get_visa_raw()
mode = source.get_mode_raw()
airlines = source.get_airlines()
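The `Datafromsource` readers are project-specific and are not shown here. The airlines source illustrates one detail they have to handle. OpenFlights' `airlines.dat` has no header row and uses `\N` to mark missing values; the marker is visible in the `Alias` column of the `airlines.show()` output.

The sketch below parses that file in plain Python. It assumes the standard OpenFlights layout of eight comma-separated, optionally quoted fields. `parse_airlines` and `AIRLINE_COLUMNS` are hypothetical names. Mapping both `\N` and empty strings to `None` is a choice made for this sketch.

```python
import csv
import io
from typing import Dict, List, Optional

# Column names as they appear in the airlines DataFrame in this notebook
# (the raw airlines.dat file itself has no header row).
AIRLINE_COLUMNS = ["Airline_ID", "Name", "Alias", "IATA", "ICAO",
                   "Callsign", "Country", "Active"]


def parse_airlines(text: str) -> List[Dict[str, Optional[str]]]:
    """Parse airlines.dat content, turning the \\N marker and empty fields into None."""
    rows = []
    for record in csv.reader(io.StringIO(text)):
        values = [None if value in ("\\N", "") else value for value in record]
        rows.append(dict(zip(AIRLINE_COLUMNS, values)))
    return rows
```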

#### Checking data in original raw format

In [5]:
demog.show()

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino| 25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White| 58723|
|          Hoover|       Alabama|      38.5|      

In [6]:
airport.show()

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [7]:
sas_data.show()

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [8]:
countries.show()

+----+--------------------+
|code|        country_name|
+----+--------------------+
| 582|MEXICO Air Sea, a...|
| 236|         AFGHANISTAN|
| 101|             ALBANIA|
| 316|             ALGERIA|
| 102|             ANDORRA|
| 324|              ANGOLA|
| 529|            ANGUILLA|
| 518|     ANTIGUA-BARBUDA|
| 687|           ARGENTINA|
| 151|             ARMENIA|
| 532|               ARUBA|
| 438|           AUSTRALIA|
| 103|             AUSTRIA|
| 152|          AZERBAIJAN|
| 512|             BAHAMAS|
| 298|             BAHRAIN|
| 274|          BANGLADESH|
| 513|            BARBADOS|
| 104|             BELGIUM|
| 581|              BELIZE|
+----+--------------------+
only showing top 20 rows



In [9]:

visa.show()

+---------+--------+
|visa_code|    visa|
+---------+--------+
|        1|Business|
|        2|Pleasure|
|        3| Student|
+---------+--------+



In [10]:
mode.show()

+--------+----------------+
|cod_mode|       mode_name|
+--------+----------------+
|     1.0|             Air|
|     2.0|             Sea|
|     3.0|            Land|
|     9.0|Not reportedmode|
+--------+----------------+



In [11]:
airlines.show()

+----------+--------------------+-----+----+----+--------------+--------------+------+
|Airline_ID|                Name|Alias|IATA|ICAO|      Callsign|       Country|Active|
+----------+--------------------+-----+----+----+--------------+--------------+------+
|        -1|             Unknown|   \N|   -| N/A|            \N|            \N|     Y|
|         1|      Private flight|   \N|   -| N/A|          null|          null|     Y|
|         2|         135 Airways|   \N|null| GNL|       GENERAL| United States|     N|
|         3|       1Time Airline|   \N|  1T| RNX|       NEXTIME|  South Africa|     Y|
|         4|2 Sqn No 1 Elemen...|   \N|null| WYT|          null|United Kingdom|     N|
|         5|     213 Flight Unit|   \N|null| TFU|          null|        Russia|     N|
|         6|223 Flight Unit S...|   \N|null| CHD|CHKALOVSK-AVIA|        Russia|     N|
|         7|   224th Flight Unit|   \N|null| TTF|    CARGO UNIT|        Russia|     N|
|         8|         247 Jet Ltd|   \N|null

### Step 2: Explore and Assess the Data

#### Cleaning Steps
The main steps are:

* Clean the demographics dataset: fill null values with 0, group by city and state, and pivot `Race` into separate columns.
* Clean the airports dataset: keep only US airports and discard every other facility type (e.g. heliports). Extract the state code from `iso_region` and cast `elevation_ft` to float.
* Clean the immigration dataset: rename columns to readable names, convert the SAS numeric dates (`arrdate`, `depdate`) to calendar dates, and select only the relevant columns.
* Clean the countries, visa and mode lookup tables: rename the code columns to `cod_country`, `cod_visa` and `cod_mode`, and cast the mode codes to integers.
* Clean the airlines dataset: keep only airlines that have an IATA code.
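The airport rule can be made concrete with a plain-Python sketch over dict rows, based on the `airport_clean` output. In that output the US heliport `00A` is gone, `iso_region` changes from `US-KS` to `KS`, and `elevation_ft` becomes a float.

Treating "is an airport" as "the `type` ends in `_airport`" (small, medium and large airports) is an assumption. `Clean.get_airports` may use a different rule, and `clean_airports` is a hypothetical name.

```python
from typing import Dict, List, Optional


def clean_airports(rows: List[Dict[str, Optional[str]]]) -> List[Dict[str, object]]:
    cleaned = []
    for row in rows:
        # Assumed rule: keep US facilities whose type ends in "_airport",
        # dropping heliports, seaplane bases, closed sites, etc.
        if row.get("iso_country") != "US" or not (row.get("type") or "").endswith("_airport"):
            continue
        out: Dict[str, object] = dict(row)
        # "US-KS" -> "KS": keep the part after the country prefix.
        out["iso_region"] = (row.get("iso_region") or "").split("-", 1)[-1]
        # Cast elevation to float; keep None when the value is missing.
        elevation = row.get("elevation_ft")
        out["elevation_ft"] = float(elevation) if elevation not in (None, "") else None
        cleaned.append(out)
    return cleaned
```

In PySpark, the region split can be written as `split(col("iso_region"), "-").getItem(1)`.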

In [12]:
demographic_clean = Clean.get_cities_demographics(demog)
demographic_clean.show()

+---------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|           City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|American Indian and Alaska Native|Asian|Black or African-American|Hispanic or Latino| White|
+---------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|         Skokie|      Illinois|      43.4|          31382|            33437|           64819|              1066|       27424|                  2.78|        IL|                                0|20272|                     4937|              6
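The raw demographics file has one row per city and race, with the head count in `Count`. The cleaned output has one row per city, with a column per race and 0 where a race had no row. That is why Skokie shows 0 for American Indian and Alaska Native.

The plain-Python sketch below shows that reshaping. `pivot_race` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def pivot_race(rows: List[dict]) -> Dict[Tuple[str, str], Dict[str, int]]:
    """Turn (City, State, Race, Count) rows into one race-count mapping per city."""
    races = sorted({r["Race"] for r in rows})
    pivot: Dict[Tuple[str, str], Dict[str, int]] = defaultdict(dict)
    for r in rows:
        key = (r["City"], r["State"])
        # Sum counts in case a city/race pair appears more than once; nulls count as 0.
        pivot[key][r["Race"]] = pivot[key].get(r["Race"], 0) + (r["Count"] or 0)
    # Races that never appear for a city become 0, like fillna(0) after a pivot.
    return {key: {race: counts.get(race, 0) for race in races} for key, counts in pivot.items()}
```

In PySpark, a similar shape comes from three steps:

1. Group by the city-level columns.
2. Apply `GroupedData.pivot("Race")` with `.sum("Count")`.
3. Call `.fillna(0)` on the result.

Whether `Clean.get_cities_demographics` does exactly this is not shown in the notebook.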

In [13]:

airport_clean = Clean.get_airports(airport)
airport_clean.show()

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
| 00AA|small_airport|Aero B Ranch Airport|      3435.0|       NA|         US|        KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|       450.0|       NA|         US|        AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|       820.0|       NA|         US|        AL|     Harvest|    00AL|     null|      00AL|-86.7703018188476...|
| 00AS|small_airport|      Fulton Airport|      1100.0|       NA|         US|     

In [14]:
inmigrant_clean = Clean.get_inmigration(sas_data)
inmigrant_clean.show()

+-------+--------+---------+--------+-------+--------+------+-------+--------------+-----+--------+--------+--------+------------------+---------------+----+-----+---------+---+-------+------------+--------------+
| cic_id|cod_port|cod_state|visapost|matflag| dtaddto|gender|airline|        admnum|fltno|visatype|cod_visa|cod_mode|cod_country_origin|cod_country_cit|year|month|bird_year|age|counter|arrival_date|departure_date|
+-------+--------+---------+--------+-------+--------+------+-------+--------------+-----+--------+--------+--------+------------------+---------------+----+-----+---------+---+-------+------------+--------------+
|5748517|     LOS|       CA|     SYD|      M|10292016|     F|     QF|9.495387003E10|00011|      B1|       1|       1|               438|            245|2016|    4|     1976| 40|      1|  2016-04-30|    2016-05-08|
|5748518|     LOS|       NV|     SYD|      M|10292016|     F|     VA|9.495562283E10|00007|      B1|       1|       1|               438|        
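The raw `arrdate` and `depdate` values (e.g. `20574.0`) are SAS numeric dates, which count days from 1960-01-01. The plain-Python sketch below shows the conversion behind `arrival_date` and `departure_date`. The helper name `sas_to_date` is hypothetical, and `Clean.get_inmigration` may implement the conversion differently.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this date


def sas_to_date(days: Optional[float]) -> Optional[date]:
    """Convert a SAS numeric date such as arrdate/depdate to a calendar date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))
```

The sketch reproduces the dates for `cic_id` 5748517 above:

* `sas_to_date(20574.0)` returns `date(2016, 4, 30)`, matching its `arrival_date`.
* `sas_to_date(20582.0)` returns `date(2016, 5, 8)`, matching its `departure_date`.

In Spark, the same conversion can be written as `expr("date_add('1960-01-01', cast(arrdate AS int))")`, with `expr` available through the wildcard import in the first cell.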

In [15]:
countries_clean = Clean.get_countries(countries)
countries_clean.show()

+-----------+--------------------+
|cod_country|        country_name|
+-----------+--------------------+
|        582|MEXICO Air Sea, a...|
|        236|         AFGHANISTAN|
|        101|             ALBANIA|
|        316|             ALGERIA|
|        102|             ANDORRA|
|        324|              ANGOLA|
|        529|            ANGUILLA|
|        518|     ANTIGUA-BARBUDA|
|        687|           ARGENTINA|
|        151|             ARMENIA|
|        532|               ARUBA|
|        438|           AUSTRALIA|
|        103|             AUSTRIA|
|        152|          AZERBAIJAN|
|        512|             BAHAMAS|
|        298|             BAHRAIN|
|        274|          BANGLADESH|
|        513|            BARBADOS|
|        104|             BELGIUM|
|        581|              BELIZE|
+-----------+--------------------+
only showing top 20 rows



In [16]:
visa_clean = Clean.get_visa(visa)
visa_clean.show()

+--------+--------+
|cod_visa|    visa|
+--------+--------+
|       1|Business|
|       2|Pleasure|
|       3| Student|
+--------+--------+



In [17]:
mode_clean = Clean.get_mode(mode)
mode_clean.show()


+--------+----------------+
|cod_mode|       mode_name|
+--------+----------------+
|       1|             Air|
|       2|             Sea|
|       3|            Land|
|       9|Not reportedmode|
+--------+----------------+



In [18]:

airlines_clean = Clean.get_airlines(airlines)
airlines_clean.show()

+----------+--------------------+----+----+------------+-----------------+------+
|Airline_ID|                Name|IATA|ICAO|    Callsign|          Country|Active|
+----------+--------------------+----+----+------------+-----------------+------+
|         3|       1Time Airline|  1T| RNX|     NEXTIME|     South Africa|     Y|
|        10|         40-Mile Air|  Q5| MLA|    MILE-AIR|    United States|     Y|
|        13|    Ansett Australia|  AN| AAA|      ANSETT|        Australia|     Y|
|        14|Abacus International|  1B|null|        null|        Singapore|     Y|
|        15|     Abelag Aviation|  W9| AAB|         ABG|          Belgium|     N|
|        21|          Aigle Azur|  ZI| AAF|  AIGLE AZUR|           France|     Y|
|        22|      Aloha Airlines|  AQ| AAH|       ALOHA|    United States|     Y|
|        24|   American Airlines|  AA| AAL|    AMERICAN|    United States|     Y|
|        28|     Asiana Airlines|  OZ| AAR|      ASIANA|Republic of Korea|     Y|
|        29|    
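The airline filter can be read off the two outputs above. The rows whose `IATA` is null or the placeholder `-` are gone, and the `Alias` column has been dropped.

A plain-Python sketch of that filter follows. Treating `-` as "no IATA code" is an assumption inferred from the raw data, and `airlines_with_iata` is a hypothetical name.

```python
from typing import Dict, List, Optional


def airlines_with_iata(rows: List[Dict[str, Optional[str]]]) -> List[Dict[str, Optional[str]]]:
    kept = []
    for row in rows:
        iata = (row.get("IATA") or "").strip()
        # Null, empty and the "-" placeholder all count as "no IATA code" (assumption).
        if iata and iata != "-":
            # Drop the Alias column, which the cleaned output no longer has.
            kept.append({k: v for k, v in row.items() if k != "Alias"})
    return kept
```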

### Step 3: Define the Data Model
The data is modeled as a star schema: one fact table of immigration records linked to six dimension tables.
#### Star Schema
#### Dimension Tables:

* dim_demographics
    * State, state_code, Total_Population, Male_Population, Female_Population, American_Indian_and_Alaska_Native, Asian, Black_or_African-American, Hispanic_or_Latino, White, Male_Population_Ratio, Female_Population_Ratio, American_Indian_and_Alaska_Native_Ratio, Asian_Ratio, Black_or_African-American_Ratio, Hispanic_or_Latino_Ratio, White_Ratio.
* dim_airports
    * ident, type, name, elevation_ft, continent, iso_country, iso_region, municipality, gps_code, iata_code, local_code, coordinates.
* dim_airlines
    * Airline_ID, Name, IATA, ICAO, Callsign, Country, Active.
* dim_countries:
    * cod_country, country_name
* dim_get_visa:
    * cod_visa, visa.
* dim_get_mode:
    * cod_mode, mode_name.

#### Fact Table:

* immigration_fact_table
    * cic_id, cod_port, cod_state, visapost, matflag, dtaddto, gender, airline, admnum, fltno, visatype, cod_visa, cod_mode, cod_country_origin, cod_country_cit, year, month, bird_year, age, counter, arrival_date, departure_date, arrival_year, arrival_month, arrival_day.

#### 3.2 Mapping Out Data Pipelines
Steps to pipeline the data into the chosen data model:

* Transform data:
    * Transform the demographics dataset: group by state and calculate the population totals and the gender and race ratios for every state.
    * Transform the immigration dataset: split the arrival date into separate columns (year, month, day) used to partition the dataset.
* Generate the model (star schema):
    * Create all dimension tables in parquet.
    * Create the fact table in parquet, partitioned by the year, month and day of the arrival date.
    * For integrity and consistency, insert into the fact table only rows whose keys exist in the dimension tables.
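The last step keeps only fact rows whose foreign keys all exist in their dimensions, which amounts to a semi-join per dimension. A plain-Python sketch follows.

Two things here are assumptions. The mapping of fact columns to dimension keys is supplied by the caller, and `keep_consistent` is a hypothetical name. The exact columns that `Schema.modelize` joins on are not shown in the notebook.

```python
from typing import Dict, Iterable, List, Set


def keep_consistent(facts: Iterable[dict], valid_keys: Dict[str, Set]) -> List[dict]:
    """Keep only fact rows whose foreign keys all exist in the matching dimension.

    valid_keys maps a fact column (e.g. "cod_visa") to the set of keys
    present in the corresponding dimension table.
    """
    return [row for row in facts
            if all(row.get(col) in keys for col, keys in valid_keys.items())]
```

In Spark, the equivalent for one dimension would be `facts.join(dim_visa, on="cod_visa", how="left_semi")`. That assumes both sides name the key column `cod_visa`, as the tables above do. A semi-join filters the fact rows without adding any dimension columns.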

In [20]:
demog_transformer = Transformer.transform_demographics(demographic_clean)
demog_transformer.show()

+----------+--------------------+----------------+---------------+-----------------+---------------------------------+-------+-------------------------+------------------+--------+---------------------+-----------------------+---------------------------------------+-----------+-------------------------------+------------------------+-----------+
|State_code|               State|Total_Population|Male_Population|Female_Population|American_Indian_and_Alaska_Native|  Asian|Black_or_African-American|Hispanic_or_Latino|   White|Male_Population_Ratio|Female_Population_Ratio|American_Indian_and_Alaska_Native_Ratio|Asian_Ratio|Black_or_African-American_Ratio|Hispanic_or_Latino_Ratio|White_Ratio|
+----------+--------------------+----------------+---------------+-----------------+---------------------------------+-------+-------------------------+------------------+--------+---------------------+-----------------------+---------------------------------------+-----------+--------------------------
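The demographics transformation rolls city rows up to one row per state and adds a `<column>_Ratio` for each count column. The plain-Python sketch below does that roll-up.

It assumes that each ratio is the state-level count divided by the state's `Total_Population`, and that the input already uses the `State_code` column name. `state_ratios` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, List


def state_ratios(cities: List[dict], count_cols: List[str]) -> Dict[str, Dict[str, float]]:
    """Sum city rows per State_code and add <column>_Ratio = column / Total_Population."""
    totals: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
    for city in cities:
        state = totals[city["State_code"]]
        state["Total_Population"] += city["Total_Population"]
        for col in count_cols:
            state[col] += city[col]
    result = {}
    for code, t in totals.items():
        population = t["Total_Population"]
        row: Dict[str, float] = dict(t)
        for col in count_cols:
            # Guard against division by zero for empty states.
            row[f"{col}_Ratio"] = t[col] / population if population else 0.0
        result[code] = row
    return result
```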

In [21]:
inmigrant_transformer = Transformer.transform_inmigrants(inmigrant_clean)
inmigrant_transformer.show()

+-------+--------+---------+--------+-------+--------+------+-------+--------------+-----+--------+--------+--------+------------------+---------------+----+-----+---------+---+-------+------------+--------------+------------+-------------+-----------+
| cic_id|cod_port|cod_state|visapost|matflag| dtaddto|gender|airline|        admnum|fltno|visatype|cod_visa|cod_mode|cod_country_origin|cod_country_cit|year|month|bird_year|age|counter|arrival_date|departure_date|arrival_year|arrival_month|arrival_day|
+-------+--------+---------+--------+-------+--------+------+-------+--------------+-----+--------+--------+--------+------------------+---------------+----+-----+---------+---+-------+------------+--------------+------------+-------------+-----------+
|5748517|     LOS|       CA|     SYD|      M|10292016|     F|     QF|9.495387003E10|00011|      B1|       1|       1|               438|            245|2016|    4|     1976| 40|      1|  2016-04-30|    2016-05-08|        2016|           04| 
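The new `arrival_year`, `arrival_month` and `arrival_day` columns exist so that the fact table can be written partitioned by arrival date. Spark's `DataFrameWriter.partitionBy` stores each partition in a `column=value` directory.

The sketch below computes where one arrival date lands. It assumes that month and day are zero-padded strings, as the `04` in the output suggests. `partition_path` is a hypothetical helper.

```python
from datetime import date


def partition_path(base: str, arrival: date) -> str:
    """Directory a fact row lands in when written with
    partitionBy("arrival_year", "arrival_month", "arrival_day")."""
    return (f"{base}/arrival_year={arrival.year}"
            f"/arrival_month={arrival.month:02d}"
            f"/arrival_day={arrival.day:02d}")
```

For example, `partition_path("./model/facts_inmigration.parquet", date(2016, 4, 30))` returns `"./model/facts_inmigration.parquet/arrival_year=2016/arrival_month=04/arrival_day=30"`.

A write producing that layout would look like `inmigrant_transformer.write.partitionBy("arrival_year", "arrival_month", "arrival_day").parquet(paths_write["facts"])`. `Schema.modelize` may add other options. With this layout, a daily load or a query filtered on one day only touches one directory.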

### Step 4: Run Pipelines to Model the Data 
Build the data pipelines to create the data model.

#### Configuration to write the Model

In [22]:
paths_write = {
    "demographics" : "./model/demographics.parquet",
    "airports" :  "./model/airports.parquet",
    "airlines" : "./model/airlines.parquet",
    "countries" : "./model/countries.parquet",
    "visa" : "./model/visa.parquet",
    "mode" : "./model/mode.parquet",
    "facts" : "./model/facts_inmigration.parquet"
}

In [23]:
model = Schema(spark, paths_write)

In [26]:
model.modelize(inmigrant_transformer, demog_transformer, airport_clean, airlines_clean, countries_clean, visa_clean, mode_clean)


#### 4.2 Data Quality Checks
Checks:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
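The checks listed above can be expressed as small predicates. The plain-Python sketch below covers three of them: non-empty tables, a unique key, and a source-versus-loaded count. The function names are hypothetical and are not the `Validator` API.

```python
from typing import Iterable, List


def has_rows(rows: List[dict]) -> bool:
    """Completeness: the table is not empty."""
    return len(rows) > 0


def is_unique(rows: Iterable[dict], key: str) -> bool:
    """Integrity: the key column is never null and never repeated."""
    seen = set()
    for row in rows:
        value = row.get(key)
        if value is None or value in seen:
            return False
        seen.add(value)
    return True


def counts_match(source_count: int, loaded_count: int, rejected_count: int = 0) -> bool:
    """Source/count check: every source row is either loaded or explicitly rejected."""
    return source_count == loaded_count + rejected_count
```

On Spark DataFrames, two of these have direct equivalents:

* The row check can be written as `len(df.head(1)) > 0`, which avoids a full `count()`.
* The unique-key check can compare `df.count()` with `df.select(key).distinct().count()`.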
 
Running Quality Checks

In [27]:
validator = Validator(spark, paths_write)

In [28]:
facts = validator.get_facts()

In [29]:
dim_demographics, dim_airports, dim_airlines, dim_countries, dim_get_visa, dim_get_mode = validator.get_dimensions()

#### Validate all dimensions have data

In [32]:
validator.exists_rows(dim_demographics)

True

In [33]:
validator.exists_rows(dim_airlines)

True

In [34]:
validator.exists_rows(dim_countries)

True

In [35]:
validator.exists_rows(dim_get_visa)

True

In [36]:
validator.exists_rows(dim_get_mode)

True

In [37]:
validator.exists_rows(facts)

True

#### Check consistency of the model

In [38]:
validator.check_integrity(facts, dim_demographics, dim_airports, dim_airlines, dim_countries, dim_get_visa, dim_get_mode)

True

#### 4.3 Data dictionary 

A brief description of each table in the data model is given in a separate file, *TableDescription*.


### Step 5: Complete Project Write-Up



* For this project I used Apache Spark for all data processing and to create the model. Spark scales to large data volumes, and its `pyspark.sql` module provides many transformation tools that are easy to use. It handles multiple file formats (SAS, CSV, etc.) containing large amounts of data. Spark SQL was used to load the input files into DataFrames and to combine them with standard SQL join operations to create the tables. The data is persisted as parquet files in the *model* folder. Parquet is a compressed columnar format that Spark reads and writes in parallel, so the model can grow to terabytes of data.

* The data should be updated daily. Apache Airflow could schedule a daily ingestion keyed on the arrival date. Because the fact table is partitioned by arrival date, each daily run only needs to write new partitions.


#### Scenarios
Description of how to approach the problem differently under the following scenarios:

* *The data was increased by 100x*
    - Spark can still process 100x the data on a larger cluster. For analytical workloads, we could also load the data into Amazon Redshift, which is optimised for aggregations and read-heavy workloads.

* *The data needs to be updated on a daily basis*
    - Use Airflow to schedule the pipeline as a DAG, with task retries and email notifications on failure.
    - Run daily quality checks; if they fail, email the operators and freeze the dashboards.

* *The data needs to be accessed by 100+ people*
    - We can use Hive, Spark SQL temporary views, or Redshift, since Redshift has auto-scaling capabilities and good read performance.
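For the daily-update scenario, retries and failure emails are normally configured through the DAG's `default_args`. Airflow passes these to every task. The keys below (`owner`, `start_date`, `retries`, `retry_delay`, `email`, `email_on_failure`, `email_on_retry`) are standard Airflow operator arguments. The values, the owner name and the email address are placeholders for this sketch.

```python
from datetime import datetime, timedelta

# Hypothetical settings: owner, dates and the email address are placeholders.
default_args = {
    "owner": "data-engineering",
    "start_date": datetime(2016, 4, 1),
    "retries": 3,                          # re-run a failed task up to 3 times
    "retry_delay": timedelta(minutes=5),   # wait between retries
    "email": ["operators@example.com"],
    "email_on_failure": True,              # notify operators once retries are exhausted
    "email_on_retry": False,               # no email for each intermediate retry
}
```

The dict would then be passed to a DAG. On Airflow 1.x and 2.x this looks like `DAG("immigration_daily", default_args=default_args, schedule_interval="@daily")`; newer 2.x releases prefer `schedule=`. The daily quality checks could run as the final task, so that a failed check triggers the same failure email.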