# US Immigration data pipeline 
### Data Engineering Capstone Project

#### Project Summary
This is a Udacity Data Engineering Capstone project to showcase all the learning & skills that been acquired during the course of the nano-degree program.<br> This is an open-ended project and for this udacity has provided four datasets that includes US immigration 2016 data, airport codes, temperature and US demographic data.<br> If required, we can add more datasets to enrich information to suit our usecase which we like to analyze or present.<br> 

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [22]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,isnan,when,count
from pyspark.sql.types import StructField, StructType, IntegerType, DoubleType, StringType
import os, re
import configparser
import boto3
import glob 
import pandas as pd
from datetime import timedelta, datetime
import boto3
from botocore.exceptions import ClientError

### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of this project is to build a data Pipeline that utilitize the new technologies and concepts, the project first dump data to s3 bucket, then from that s3 bucket airflow transform and load in AWS Redshift, In this project I am working with the ELTL Extract, Load, Transfer, Load were there are two destinations first destination will be usful for backingup and the end users will be data Scientists and machine learning engineers that data exploration is a part of thier work and they need a large size of data with variation. the second destination is for the data warehouse that will be used for reporting.
<br>
<br>
<img src="ArchitectureETL.png"> 

#### Describe and Gather Data 
The data used in this project is all given by Udacity I used the Immigration files and extract 2 tables from it, and the demographic dataset, and airport dataset and extract a table from each.

In [2]:
spark = SparkSession.builder.\
    config("spark.jars.repositories", "https://repos.spark-packages.org/").\
    config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0").\
    enableHiveSupport().getOrCreate()
print(spark.version)

2.4.3


### Reading the Immigration data from many files to a single data frame 

In [3]:
# # Read in the data here
i94_file_paths = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")

df_immigration=spark.read.format('com.github.saurfang.sas.spark').\
load(i94_file_paths[0],inferLong=True, forceLowercaseNames=True)
            
i94_file_paths.pop(0)
x = 0
for file in i94_file_paths:
    tempDF = spark.read.format('com.github.saurfang.sas.spark').\
        load(file,inferLong=True, forceLowercaseNames=True)
    if(len(tempDF.columns)==34):
        tempDF = tempDF.drop('delete_days','delete_mexl','delete_dup','delete_visa','delete_recdup','dtadfile')
    df_immigration.union(tempDF)
    x = x+1
        
print("number of files appedned is = "+ str(x))
df_immigration.printSchema()

number of files appedned is = 11
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)

### Reading the airport data 

In [4]:
df_airport = spark.read.csv('inputs/airport-codes_csv.csv')
df_airport.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)



### Reading the demographics data 

In [8]:
df_demographics = spark.read.csv('inputs/us-cities-demographics.csv',sep=';')
df_demographics.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify:
1. data quality issues.
2. like missing values. 
3. duplicate data.

#### Cleaning Steps
1. Change columns name if needed.
2. Chnage data types if needed.
3. Drop missing data if needed.
4. Drop duplicates if it exists.

### starting this step with the Immigration 

In [12]:
print('The Immigration data frame comtains '+str(df_immigration.count())+' record')
df_immigration1 = df_immigration.dropDuplicates()
print('The Immigration data frame comtains '+str(df_immigration1.count())+' record')

The Immigration data frame comtains 3096313 record
The Immigration data frame comtains 3096313 record


#### As we can see here there aren't any duplicates in the Immigration data 

In [15]:
df_immigration1 = df_immigration.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df_immigration.columns])
df_immigration1.show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+---

#### We can see that there are missing values some are fine but attributes like entdepa and occup insnum more than 50% of the a records are missing so I might drop them.<br> Aother columns are fine due to there are ZERO duplicates.

### Same steps for the airport data frame

In [19]:
# Rename the columns
df_airport = df_airport.withColumnRenamed("_c0","ID")\
                            .withColumnRenamed("_c1","type")\
                            .withColumnRenamed("_c2","name")\
                            .withColumnRenamed("_c3","elevation_ft")\
                            .withColumnRenamed("_c4","continent")\
                            .withColumnRenamed("_c5","iso_country")\
                            .withColumnRenamed("_c6","iso_region")\
                            .withColumnRenamed("_c7","municipality")\
                            .withColumnRenamed("_c8","gps_code")\
                            .withColumnRenamed("_c9","iata_code")\
                            .withColumnRenamed("_c10","local_code")\
                            .withColumnRenamed("_c11","coordinates")

# removing the first row because it's the header
    
df_airport = df_airport.where(df_airport.ID != 'ident')
print('The airport data frame comtains '+str(df_airport.count())+' record')
df_airport1 = df_airport.dropDuplicates()
print('The airport data frame comtains '+str(df_airport1.count())+' record')

df_airport1 = df_airport.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df_airport.columns])
df_airport1.show()

The airport data frame comtains 55075 record
The airport data frame comtains 55075 record
+---+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
| ID|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|
+---+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|  0|   0|   0|        7006|        0|          0|         0|        5676|   14045|    45886|     26389|          0|
+---+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+



### There are no duplicates in the airport data frame.<br> But there are many missing value in elevation_ft, municipality, gps_code,iata_code local_code.

### Do the same steps for the demographics data frame

In [23]:
# Rename the columns
df_demographics = df_demographics.withColumnRenamed("_c0","city")\
                            .withColumnRenamed("_c1","state")\
                            .withColumnRenamed("_c2","Median_age")\
                            .withColumnRenamed("_c3","male_population")\
                            .withColumnRenamed("_c4","female_population")\
                            .withColumnRenamed("_c5","total_population")\
                            .withColumnRenamed("_c6","Number_of_ventreans")\
                            .withColumnRenamed("_c7","foreign_born")\
                            .withColumnRenamed("_c8","Average_Household_Size")\
                            .withColumnRenamed("_c9","state_code")\
                            .withColumnRenamed("_c10","race")\
                            .withColumnRenamed("_c11","Count")
    
# changing the data types to the appropriate data types.
    
df_demographics = df_demographics.withColumn("city",col("city").cast(StringType()))\
                            .withColumn("state",col("state").cast(StringType()))\
                            .withColumn("Median_age",col("Median_age").cast(DoubleType()))\
                            .withColumn("male_population",col("male_population").cast(IntegerType()))\
                            .withColumn("female_population",col("female_population").cast(IntegerType()))\
                            .withColumn("total_population",col("total_population").cast(IntegerType()))\
                            .withColumn("Number_of_ventreans",col("Number_of_ventreans").cast(IntegerType()))\
                            .withColumn("foreign_born",col("foreign_born").cast(IntegerType()))\
                            .withColumn("Average_Household_Size",col("Average_Household_Size").cast(DoubleType()))\
                            .withColumn("state_code",col("state_code").cast(StringType()))\
                            .withColumn("race",col("race").cast(StringType()))\
                            .withColumn("Count",col("Count").cast(IntegerType()))
# removing the first row because it's the header
df_demographics = df_demographics.where(df_demographics.city != 'City')

print('The demographics data frame comtains '+str(df_demographics.count())+' record')
df_demographics1 = df_demographics.dropDuplicates()
print('The demographics data frame comtains '+str(df_demographics1.count())+' record')

df_demographics1 = df_demographics.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df_demographics.columns])
df_demographics1.show()

The demographics data frame comtains 2891 record
The demographics data frame comtains 2891 record
+----+-----+----------+---------------+-----------------+----------------+-------------------+------------+----------------------+----------+----+-----+
|city|state|Median_age|male_population|female_population|total_population|Number_of_ventreans|foreign_born|Average_Household_Size|state_code|race|Count|
+----+-----+----------+---------------+-----------------+----------------+-------------------+------------+----------------------+----------+----+-----+
|   0|    0|         0|              3|                3|               0|                 13|          13|                    16|         0|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+-------------------+------------+----------------------+----------+----+-----+



### As we can see there are not duplicates to drop and for the missing values no actions are needed. 

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
This project is built with a star schema A single Fact table with 3 Dimensions tables
* Fact table:
    * immigrant_fact
    
* Dimensions tables:
    * airport_dim
    * person_dim 
    * demographic 
<br>
<br>
<br>

<img src="Screen Shot 1444-03-20 at 3.31.24 PM (2).png">
<br>
<br>
<br>

#### 3.2 Mapping Out Data Pipelines
After cleaning the data (changing types, removing duplicates, dealing with the missing values, changing the columns names)<br>
I will drop the columns that will not be used, I will do that by selecting the columns into 4 tables.<br>
The below cade is the code used to convert data to the data model.

In [None]:
# The Fact table Immigration table
immigration_table = df_immigration.select('cicid','admnum','i94cit','i94port','arrdate', 'depdate').dropDuplicates(subset=['admnum'])
# The person Dimension table will be extracted from df_immigration also
person_table = df_immigration.select('admnum','i94res','i94port','i94mode',\
                                        'i94cit','i94mon','arrdate','i94addr',\
                                        'depdate','i94bir','i94visa','occup',\
                                        'biryear','gender','airline',\
                                        'fltno').dropDuplicates(subset=['admnum'])

# The airport Dimension will be extracted from df_airport

airport_table = df_airport.selectExpr('ID','type','name','iso_country','municipality','iata_code','local_code')\
    .dropDuplicates(subset=['ID'])

# The demographic Dimension will be extracted from df_demographics
demographics_table = df_demographics.dropDuplicates()

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.<br>
The whole pipeline can be divided into two stages. The first, where we used spark to load, extracted, transform and store the provided datasets into the AWS S3 staging area. The second stage we take advantage of Apache Airflow to build a DAG to extract data from S3 and load them into tables of the same name in Amazon Redshift. As a final step we check the data counting checking to ensure completeness.

#### the code for the data pipeline is in the "airflow/dags/udac_example_dag.py" file.
<br>
<br>
<img src="Screen Shot 1444-03-20 at 11.06.27 AM.png">

#### 4.2 Data Quality Checks
First, we load the Immigrante_table fact table through the step Load_Immigrante_fact_table , which is in parallel run with the steps to load the dimension tables PERSON, AIRPORT ,DEMOGRAPHIC.<br>
respectively 
load_person_dimension_table , load_airport_dimension_table , load_demographic_dimension_table steps. <br>
All the tables have a PK constraint that uniquely identify the records and in the fact table there are FK that guarantee that values in the fact are present in the dimension tables.<br>
After completing the loading process, we perform a data quality check through the step Data_Quality_Checks to make sure everything was OK. In this check we verify if every table was actually loaded with count check in all the tables of the model.
<br>
<br>
<img src="Screen Shot 1444-03-20 at 11.06.35 AM.png">

### Step 5: Complete Project Write Up
* #### Tools & Technologies
    * Python
    * Spark
    * AWS S3
    * AWS Redshift
    * Airflow
<br><br>

*  Propose how often the data should be updated and why.
    * ETL script should be run monthly basis (assuming that new I94 data is available once per month).
<br><br>

* ### Write a description of how you would approach the problem differently under the following scenarios:

 *  The data was increased by 100x?.
    * Use Spark to process the data efficiently in a distributed way e.g. with EMR. In case we recognize that we need a write-heavy operation.<br>
    * There is no issues with storge when increasing data because of AWS S3, even with pricing it's not expensive.
    <br><br>

 *  The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * Dags can be scheduled to run daily by setting the start_date config as a datetime value containing both the date and time when it should start running, then setting schedule_interval to @daily which will ensure the dag runs everyday at the time provided in start_date.
 <br><br>

 *  The database needed to be accessed by 100+ people.
    * Redshift can support upto 500 connections, so 100+ people can easily connect to Redshift.

### Summary
* Project-Capstone provides tools to automatically process, clean, analyze US I94 Immigration data in a flexible way and help answering questions like <br> <br>
   * Which airline people used most to immigrate US ?.<br>
   * Which US airports people used most to immigrate US?.<br>
   * What immigrante gender is more ?.