# US Immigration Analytics ETL Pipeline
### Data Engineering Capstone Project

#### Project Summary
The goal of the project is to optimize the analysis of immigration data. To achieve this, a data lake is created using Spark and the Parquet file format. The data is modelled as a star schema with a fact table and dimension tables.

The project follows these steps:
- Step 1: Scope the Project and Gather Data
- Step 2: Explore and Assess the Data
- Step 3: Define the Data Model
- Step 4: Run ETL to Model the Data
- Step 5: Complete Project Write Up

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

from data_cleaner import DataCleaner
from data_transformer import DataTransformer
from data_modeler import DataModeler
from data_quality import DataValidator

### Step 1: Scope the Project and Gather Data

#### Scope 
The data lake is built on the star schema concept and stored in Parquet files. The data model consists of a fact table containing information about immigration, plus dimension tables.
This model enables analysis of how factors such as demographics may relate to immigration.

Example questions this model can answer are:
* Which country did most people migrate from in a particular month?
* How many people migrated with a particular visa type?
* Which origin countries and genders account for the most immigrants?

#### Describe and Gather Data 
Project consists of the following datasets:
- I94 immigration data - this data comes from the US National Travel and Tourism Office website. It is provided in SAS7BDAT format, which is a binary database storage format.
    Source: https://www.trade.gov/national-travel-and-tourism-office
- U.S. City Demographic Data - demographics of all US cities and census-designated places with a population greater than or equal to 65,000.
    Source: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
- Airport Code Table - simple table of airport codes and corresponding cities.
    Source: https://datahub.io/core/airport-codes#data
- Airlines Code Table - dataset containing a list of airlines with their IATA and ICAO codes

In [2]:
data_source_paths = {
    "immigration" : "./sources/sas_data/*.parquet",
    "airlines" :  "./sources/airlines.csv",
    "airport_codes" :  "./sources/airport-codes.csv",
    "countries" : "./sources/countries.csv",
    "sas_labels_descriptions": "./sources/I94_SAS_Labels_Descriptions.SAS",
    "modes" : "./sources/modes.csv",
    "ports" : "./sources/ports.csv",
    "states" : "./sources/states.csv",
    "demographics" : "./sources/us-cities-demographics.csv",
    "visa" : "./sources/visa.csv"
}

In [3]:
spark = (
    SparkSession.builder
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.10")
    .enableHiveSupport()
    .getOrCreate()
)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, such as missing values and duplicate data.

#### Cleaning Steps
Document steps necessary to clean the data

##### I94 Labels Descriptions

In [4]:
with open(data_source_paths["sas_labels_descriptions"]) as f:
    f_content = f.read()
    f_content = f_content.replace("\t", "")
    i94country = DataCleaner.i94_code_mapper(f_content, "i94cntyl", data_source_paths["countries"], ["country_code", "country"], ";")
    i94port = DataCleaner.i94_code_mapper(f_content, "i94prtl", data_source_paths["ports"], ["port_code", "port"], ";")
    i94mode = DataCleaner.i94_code_mapper(f_content, "i94mode", data_source_paths["modes"], ["mode_code", "mode"], ";")
    i94addr = DataCleaner.i94_code_mapper(f_content, "i94addrl", data_source_paths["states"], ["state_code", "state"], ";")
    i94visa = DataCleaner.i94_code_mapper(f_content, "I94VISA", data_source_paths["visa"], ["visa_code", "visa_type"], ";")
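
The internals of `DataCleaner.i94_code_mapper` are not shown in this notebook. As a rough illustration of the idea, the sketch below pulls `code = 'label'` pairs out of a `value <name> ... ;` block with a regular expression. The function name `parse_sas_value_block` and the abbreviated `SAMPLE` text are assumptions made for this example; the real label file may be laid out differently, and the actual mapper may work in another way.

```python
import re

# Abbreviated, hypothetical excerpt in the assumed layout of the label file.
SAMPLE = """
value i94cntyl
   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'
   236 =  'AFGHANISTAN'
;
value $i94prtl
   'ALC' = 'ALCAN, AK             '
   'ANC' = 'ANCHORAGE, AK         '
;
"""


def parse_sas_value_block(sas_text, block_name):
    """Return (code, label) pairs from one `value <block_name> ... ;` block."""
    # Capture everything between "value <name>" and the next semicolon.
    block = re.search(
        r"value\s+\$?" + re.escape(block_name) + r"\b(.*?);",
        sas_text,
        flags=re.IGNORECASE | re.DOTALL,
    )
    if block is None:
        return []

    pairs = []
    for line in block.group(1).splitlines():
        # Codes may or may not be quoted; labels are always quoted here.
        match = re.match(r"\s*'?([^'=]+?)'?\s*=\s*'([^']*)'", line)
        if match:
            pairs.append((match.group(1).strip(), match.group(2).strip()))
    return pairs


countries = parse_sas_value_block(SAMPLE, "i94cntyl")
ports = parse_sas_value_block(SAMPLE, "i94prtl")
print(countries)
print(ports)
```

Each list of pairs could then be written to a delimited file with the column names passed to the mapper (for example `country_code` and `country`).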

In [5]:
countries_df = spark.read.format("csv").option("header", "true").option("delimiter", "\t").load(data_source_paths["countries"])

In [6]:
countries_df = DataCleaner.clean_country_codes(countries_df)

In [7]:
countries_df.limit(5).toPandas()

Unnamed: 0,country_code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [8]:
states_df = spark.read.format("csv").option("header", "true").option("delimiter", "\t").load(data_source_paths["states"])

In [9]:
states_df.limit(5).toPandas()

Unnamed: 0,state_code,state
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [10]:
ports_df = spark.read.format("csv").option("header", "true").option("delimiter", "\t").load(data_source_paths["ports"])

In [11]:
ports_df = DataCleaner.clean_port_codes(ports_df)

In [12]:
ports_df.limit(5).toPandas()

Unnamed: 0,port_code,port_city,port_state
0,ALC,Alcan,AK
1,ANC,Anchorage,AK
2,BAR,Baker Aaf - Baker Island,AK
3,DAC,Daltons Cache,AK
4,PIZ,Dew Station Pt Lay Dew,AK


In [13]:
modes_df = spark.read.format("csv").option("header", "true").option("delimiter", "\t").load(data_source_paths["modes"])

In [14]:
modes_df.limit(5).toPandas()

Unnamed: 0,mode_code,mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [15]:
visa_df = spark.read.format("csv").option("header", "true").option("delimiter", "\t").load(data_source_paths["visa"])

In [16]:
visa_df.limit(5).toPandas()

Unnamed: 0,visa_code,visa_type
0,1,Business
1,2,Pleasure
2,3,Student


##### Airport codes data

In [17]:
airports_df = spark.read.format("csv").option("header", "true").load(data_source_paths["airport_codes"])

In [18]:
airports_df = DataCleaner.clean_airports(airports_df)

In [19]:
airports_df.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
1,00AK,small_airport,Lowell Field,450.0,,US,AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
2,00AL,small_airport,Epps Airpark,820.0,,US,AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
3,00AS,small_airport,Fulton Airport,1100.0,,US,OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
4,00AZ,small_airport,Cordes Airport,3810.0,,US,AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"


##### Airline codes data

In [20]:
airlines_df = spark.read.format("csv").option("header", "true").load(data_source_paths["airlines"], sep=';')

In [21]:
airlines_df = DataCleaner.clean_airlines(airlines_df)

In [22]:
airlines_df.limit(5).toPandas()

Unnamed: 0,iata_code,icao_code,airline,country
0,0A,GNT,Amber Air (Lithuania),Lithuania
1,0D,DWT,Darwin Airline (Switzerland),Switzerland
2,0J,JCS,Jetclub (Switzerland) Jetclub,Switzerland
3,0K,KRT,Kokshetau (Kazakhstan) Kokta,Kazakhstan
4,0W,WCR,West Caribbean Costa Rica (Costa Rica) West Ca...,Costa Rica


##### U.S. cities demographic data

In [23]:
demographics_df = spark.read.format("csv").option("header", "true").load(data_source_paths["demographics"], sep=';')

In [24]:
demographics_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [25]:
demographics_df.select(col("Race")).distinct().toPandas()

Unnamed: 0,Race
0,Black or African-American
1,Hispanic or Latino
2,White
3,Asian
4,American Indian and Alaska Native


In [26]:
demographics_df = DataCleaner.clean_demographics(demographics_df)

In [27]:
demographics_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,Skokie,Illinois,43.4,31382,33437,64819,1066,27424,2.78,IL,0,20272,4937,6590,40642
1,Charlotte,North Carolina,34.3,396646,430475,827121,36046,128897,2.52,NC,8746,55399,301568,113731,446795
2,Manchester,New Hampshire,37.3,54845,55378,110223,5473,14506,2.4,NH,558,4304,6896,11962,100108
3,Chico,California,29.9,46168,44168,90336,4519,8425,2.5,CA,2766,6101,3164,15578,80467
4,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,1084,8841,21330,25924,37756
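
Comparing the two outputs suggests that `DataCleaner.clean_demographics` turns the per-race rows into one row per city, with one column per race and `0` where a race has no row. The implementation is not shown here, so the following is only a plain-Python sketch of that reshaping. The helper `pivot_race_counts` is hypothetical, and the input rows are reconstructed from the outputs above for illustration. In PySpark the same result would typically come from a `groupBy(...).pivot("Race")` aggregation.

```python
from collections import defaultdict

RACES = [
    "American Indian and Alaska Native",
    "Asian",
    "Black or African-American",
    "Hispanic or Latino",
    "White",
]

# Long-format rows: (city, state, race, count). Illustrative subset only.
rows = [
    ("Silver Spring", "Maryland", "Hispanic or Latino", 25924),
    ("Silver Spring", "Maryland", "White", 37756),
    ("Skokie", "Illinois", "Asian", 20272),
    ("Skokie", "Illinois", "White", 40642),
]


def pivot_race_counts(long_rows):
    """Collapse per-race rows into one dict of race counts per (city, state)."""
    # Every city starts with all race columns set to 0, so missing races stay 0.
    pivoted = defaultdict(lambda: dict.fromkeys(RACES, 0))
    for city, state, race, count in long_rows:
        pivoted[(city, state)][race] += count
    return dict(pivoted)


wide = pivot_race_counts(rows)
for key, counts in wide.items():
    print(key, counts)
```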


##### Immigration data

In [28]:
immigration_df = spark.read.parquet(data_source_paths["immigration"])

In [29]:
immigration_df = DataCleaner.clean_immigration(immigration_df)

In [30]:
immigration_df.limit(10).toPandas()

Unnamed: 0,arrival_year,arrival_month,port_code,mode_code,state_code,i94bir,visa_code,dtadfile,visapost,occup,...,airline,admnum,fltno,visatype,cic_id,origin_city_code,origin_country_code,arrival_date,departure_date,arrival_day
0,2016,4,LOS,1,CA,40,1,2016-04-30,SYD,,...,QF,94953870000.0,11,B1,5748517,245,438,2016-04-30,2016-05-08,30
1,2016,4,LOS,1,NV,32,1,2016-04-30,SYD,,...,VA,94955620000.0,7,B1,5748518,245,438,2016-04-30,2016-05-17,30
2,2016,4,LOS,1,WA,29,1,2016-04-30,SYD,,...,DL,94956410000.0,40,B1,5748519,245,438,2016-04-30,2016-05-08,30
3,2016,4,LOS,1,WA,29,1,2016-04-30,SYD,,...,DL,94956450000.0,40,B1,5748520,245,438,2016-04-30,2016-05-14,30
4,2016,4,LOS,1,WA,28,1,2016-04-30,SYD,,...,DL,94956390000.0,40,B1,5748521,245,438,2016-04-30,2016-05-14,30
5,2016,4,HHW,1,HI,57,2,2016-04-30,ACK,,...,NZ,94981800000.0,10,B2,5748522,245,464,2016-04-30,2016-05-05,30
6,2016,4,HHW,1,HI,66,2,2016-04-30,ACK,,...,NZ,94979690000.0,10,B2,5748523,245,464,2016-04-30,2016-05-12,30
7,2016,4,HHW,1,HI,41,2,2016-04-30,ACK,,...,NZ,94979750000.0,10,B2,5748524,245,464,2016-04-30,2016-05-12,30
8,2016,4,HOU,1,FL,27,2,2016-04-30,ACK,,...,NZ,94973250000.0,28,B2,5748525,245,464,2016-04-30,2016-05-07,30
9,2016,4,LOS,1,CA,26,2,2016-04-30,ACK,,...,NZ,95013550000.0,2,B2,5748526,245,464,2016-04-30,2016-05-07,30


### Step 3: Define the Data Model

#### 3.1 Conceptual Data Model

A star schema was chosen because it is straightforward and easy for the business to understand, and because the data does not need the multiple levels of granularity that would justify a snowflake schema. It is also easy to maintain and reduces the complexity of queries performed on the dataset.

--- 

**Dimension Tables**

**dim_demographics**
   - *city*, 
   - *state*, 
   - *male_population*, 
   - *female_population*, 
   - *total_population*, 
   - *foreign_born*, 
   - ***state_code***, 
   - *white*,
   - *asian*, 
   - *hispanic_or_latino*, 
   - *black_or_african_american*, 
   - *american_indian_or_alaskan*
   
**dim_countries**
   - ***country_code***, 
   - *country*
   
**dim_airports**
   - *ident*, 
   - *type*, 
   - *name*, 
   - *elevation_ft*, 
   - *continent*, 
   - *iso_country*, 
   - *iso_region*, 
   - *municipality*, 
   - *gps_code*, 
   - *iata_code*, 
   - ***local_code***, 
   - *coordinates*
   
**dim_modes**
   - ***mode_code***, 
   - *mode*
   
**dim_visa**
   - ***visa_code***, 
   - *visa_type*
   
**dim_airlines**
   - ***iata_code***, 
   - *icao_code*, 
   - *airline*, 
   - *country*
     
---      
    
**Fact Table**

**fact_immigration**
   - *cic_id*, 
   - ***port_code***, 
   - ***state_code***, 
   - ***visa_code***, 
   - ***mode_code***, 
   - ***origin_country_code***, 
   - *origin_city_code*, 
   - *arrival_date*, 
   - *arrival_year*, 
   - ***arrival_month***, 
   - *arrival_day*, 
   - *departure_date*, 
   - *matflag*, 
   - *visapost*, 
   - *dtaddto*, 
   - *biryear*, 
   - *gender*, 
   - ***airline***, 
   - *admnum*, 
   - *occup*, 
   - *fltno*, 
   - *visatype*



#### 3.2 Mapping Out Data Pipelines
The pipeline consists of the following steps:
- Extract the I94 data labels and store them in separate CSV files
- Clean the data:
   - *filter meaningful data*
   - *handle missing values*
   - *split strings into separate columns*
   - *extract dates*
   - *assign proper data types*
- Transform datasets
- Generate Model (Star Schema):
   - *Create all dimension tables and save them in Parquet format.*
   - *Create the fact table in Parquet and partition the data by the year, month, and day of the arrival date.*
   - *Populate the fact table in a way that preserves integrity and consistency.*
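
When Spark writes a partitioned Parquet dataset, each partition column becomes a `column=value` directory level under the output path. The sketch below shows which directory a given arrival date would land in. It assumes the partition columns are the fact table's `arrival_year`, `arrival_month` and `arrival_day`, and the helper `partition_dir` is hypothetical, not part of `DataModeler`.

```python
from datetime import date
from pathlib import PurePosixPath


def partition_dir(root, arrival):
    """Build the Hive-style partition directory for one arrival date."""
    return (
        PurePosixPath(root)
        / f"arrival_year={arrival.year}"
        / f"arrival_month={arrival.month}"
        / f"arrival_day={arrival.day}"
    )


# Root path taken from the model paths used in this notebook.
example = partition_dir("./model/fact_immigration.parquet", date(2016, 4, 30))
print(example)
```

A query that filters on these columns only needs to read the matching directories, which is the main benefit of partitioning by arrival date.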

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [31]:
demographics_df = DataTransformer.transform_demographics(demographics_df)

In [32]:
model_paths = {
    "demographics" : "./model/dim_demographics.parquet",
    "countries" : "./model/dim_countries.parquet",
    "visa" : "./model/dim_visa.parquet",
    "modes" : "./model/dim_mode.parquet",
    "airports" :  "./model/dim_airports.parquet",
    "airlines" : "./model/dim_airlines.parquet",
    "facts" : "./model/fact_immigration.parquet"
}

In [33]:
model = DataModeler(spark, model_paths)

In [34]:
model.model_dwh(immigration_df, demographics_df, countries_df, visa_df, modes_df, airports_df, airlines_df)

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 - Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 - Unit tests for the scripts to ensure they are doing the right thing
 - Source/Count checks to ensure completeness
 
Run Quality Checks

In [38]:
validator = DataValidator(spark, model_paths)

In [39]:
facts = validator.get_facts()

In [40]:
dim_demographics, dim_countries, dim_visa, dim_mode, dim_airports, dim_airlines = validator.get_dimensions()

##### Validate all dimensions have data

In [41]:
validator.check_rows_exist(dim_demographics)

True

In [42]:
validator.check_rows_exist(dim_countries)

True

In [43]:
validator.check_rows_exist(dim_visa)

True

In [44]:
validator.check_rows_exist(dim_mode)

True

In [45]:
validator.check_rows_exist(dim_airports)

True

In [46]:
validator.check_rows_exist(dim_airlines)

True

##### Validate unique records

In [47]:
validator.check_unique_records(dim_demographics)

True

In [48]:
validator.check_unique_records(dim_countries)

True

In [49]:
validator.check_unique_records(dim_visa)

True

In [50]:
validator.check_unique_records(dim_mode)

True

In [51]:
validator.check_unique_records(dim_airports)

True

In [52]:
validator.check_unique_records(dim_airlines)

True

##### Validate integrity and consistency of the model

In [53]:
validator.check_integrity(facts, dim_demographics, dim_countries, dim_visa, dim_mode, dim_airports, dim_airlines)

True
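
The `DataValidator` source is not included in this notebook. To make the three kinds of checks concrete, here is a minimal sketch using the standard library's `sqlite3` module on a tiny in-memory dataset. The function names `has_rows`, `has_unique_rows` and `has_no_orphans` are hypothetical, and the sample rows are a small illustrative subset, not the project's data.

```python
import sqlite3

# Table and column names are interpolated directly, so they must be trusted.


def has_rows(conn, table):
    """True if the table contains at least one row."""
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] > 0


def has_unique_rows(conn, table):
    """True if no row in the table is an exact duplicate of another."""
    total = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    distinct = conn.execute(
        f"SELECT COUNT(*) FROM (SELECT DISTINCT * FROM {table})"
    ).fetchone()[0]
    return total == distinct


def has_no_orphans(conn, fact, fact_key, dim, dim_key):
    """True if every fact row's key (NULLs included) matches a dimension row."""
    orphans = conn.execute(
        f"SELECT COUNT(*) FROM {fact} f "
        f"LEFT JOIN {dim} d ON f.{fact_key} = d.{dim_key} "
        f"WHERE d.{dim_key} IS NULL"
    ).fetchone()[0]
    return orphans == 0


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dim_visa (visa_code INTEGER, visa_type TEXT);
    INSERT INTO dim_visa VALUES (1, 'Business'), (2, 'Pleasure'), (3, 'Student');
    CREATE TABLE facts (cic_id INTEGER, visa_code INTEGER);
    INSERT INTO facts VALUES (5748517, 1), (5748522, 2);
    """
)

print(has_rows(conn, "dim_visa"))
print(has_unique_rows(conn, "dim_visa"))
print(has_no_orphans(conn, "facts", "visa_code", "dim_visa", "visa_code"))
```

The same pattern carries over to Spark: a row count, a comparison of `count()` with `distinct().count()`, and an anti-join from the fact table to each dimension.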

#### 4.3 Data dictionary 
The data dictionary for the model is included in the data_dictionary.md file.

#### 4.4 Performing queries on the dataset

In [54]:
facts.createOrReplaceTempView('facts')
dim_countries.createOrReplaceTempView('dim_countries')
dim_visa.createOrReplaceTempView('dim_visa')

##### Getting countries that most people migrated from in April

In [66]:
query = "SELECT C.country, COUNT(F.cic_id) AS num_immigrants " \
        "FROM facts F " \
        "INNER JOIN dim_countries C ON F.origin_country_code = C.country_code " \
        "WHERE F.arrival_month = 4 " \
        "GROUP BY C.country " \
        "ORDER BY num_immigrants DESC"

most_immigrants_countries = spark.sql(query)
most_immigrants_countries.limit(10).toPandas()

Unnamed: 0,country,num_immigrants
0,JAPAN,115020
1,UNITED KINGDOM,109168
2,"MEXICO Air Sea, and Not Reported (I-94, no lan...",88271
3,BRAZIL,85676
4,GERMANY,74079
5,FRANCE,61656
6,"CHINA, PRC",49683
7,ARGENTINA,49423
8,INDIA,37931
9,AUSTRALIA,35945


##### Immigration by visa type

In [61]:
query = "SELECT V.visa_type, COUNT(F.cic_id) AS num_visas " \
        "FROM facts F " \
        "INNER JOIN dim_visa V ON F.visa_code = V.visa_code " \
        "GROUP BY V.visa_type " \
        "ORDER BY num_visas DESC"

immigration_by_visa_type = spark.sql(query)
immigration_by_visa_type.limit(10).toPandas()

Unnamed: 0,visa_type,num_visas
0,Pleasure,995768
1,Business,222611
2,Student,15944


##### Student immigration by origin country and gender

In [64]:
query = "SELECT C.country, F.gender, COUNT(F.cic_id) AS num_immigrants " \
        "FROM facts F " \
        "INNER JOIN dim_visa V ON F.visa_code = V.visa_code " \
        "INNER JOIN dim_countries C ON F.origin_country_code = C.country_code " \
        "WHERE V.visa_type = 'Student' " \
        "GROUP BY C.country, F.gender " \
        "ORDER BY num_immigrants DESC"

student_immigration_by_country_and_gender = spark.sql(query)
student_immigration_by_country_and_gender.limit(10).toPandas()

Unnamed: 0,country,gender,num_immigrants
0,"CHINA, PRC",M,1846
1,"CHINA, PRC",F,1818
2,INDIA,M,714
3,JAPAN,F,565
4,BRAZIL,M,538
5,"MEXICO Air Sea, and Not Reported (I-94, no lan...",M,531
6,"MEXICO Air Sea, and Not Reported (I-94, no lan...",F,475
7,INDIA,F,472
8,SOUTH KOREA,F,446
9,BRAZIL,F,423


### Step 5: Complete Project Write Up

#### 5.1 Choice of tools and technologies

Apache Spark was used for all data processing and to create the model, for the following reasons:
- Spark scales well and can handle large volumes of data.
- The spark.sql library provides a variety of data wrangling tools.
- PySpark is flexible in its syntax: either a Python or a SQL approach can be used.
- Data stored in Parquet format is well optimized for use with Spark.

#### 5.2 How often the data should be updated and why

Airflow can be used to ingest data on a schedule that depends on the data source:
- Since the raw files are provided monthly, the schedule can be monthly.
- If a source is updated more frequently, new data can be ingested daily, based on arrival date.

#### 5.3 Scenarios

##### 5.3.1 The data was increased by 100x

- Use Amazon Redshift since it is optimized for aggregation and read-heavy workloads and is easily scalable
- Keep using Spark since it's very efficient in dealing with large volumes of data

##### 5.3.2 The data populates a dashboard that must be updated on a daily basis by 7am every day.

- Airflow can be used for data orchestration and scheduling
- Run daily quality checks and send email alerts on failure. Use the SLA functionality of Airflow

##### 5.3.3 The database needed to be accessed by 100+ people

- Use Redshift since it has very good scalability and performance
- Use Hive or Spark SQL temporary views