# US Immigration
### Data Engineering Capstone Project
#### Project Description

* In this project, we design a database schema that makes the US immigration data easier to process and analyze. As data engineers, we built an ETL pipeline that extracts data from different sources, cleans it, and loads it into the respective tables, so that the data on factors affecting the behavior of immigrants in the US is easier to query and analyze.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# imports
import re

import pandas as pd
import pyspark.sql.functions as f
# explicit imports avoid shadowing Python builtins such as max, min and sum
from pyspark.sql.functions import col, lower, udf
```

### Step 1: Scope of the Project and Gather Data

#### Scope 
* I94 immigration data, city data (both census and temperature), and airport data are used to design a database schema optimized for querying and analyzing immigration events. An ETL pipeline built on these data sources creates the star-schema design. The resulting database can be used to relate immigration behavior to location data.


#### Describe and Gather Data 
##### The following datasets are used to create the database schema. 
* I94 Immigration Data: This data comes from the US National Tourism and Trade Office and consists of one SAS7BDAT file for each month of the year. Each file has about 3M rows and 28 columns.
* World Temperature Data: This dataset comes from Kaggle and contains monthly average temperatures for cities worldwide.
* U.S. City Demographic Data: This data comes from OpenSoft and describes the demographics of US cities.
* Airport Code Table: This table contains airport codes and information about airports around the world.


```python
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, format='sas7bdat', encoding="ISO-8859-1")
```

```python
df.head()
```

| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |


```python
from pyspark.sql import SparkSession

# Spark session with the spark-sas7bdat package, used to read the SAS files
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
```

```python
# gathering required data
# run once to write the I94 data to parquet:
# df_spark.write.parquet("sas_data")
df_I94immigration = spark.read.parquet("sas_data")
df_airport = spark.read.csv('airport-codes_csv.csv', header=True, inferSchema=True)
df_usdemo = spark.read.csv('us-cities-demographics.csv', header=True, sep=';', inferSchema=True)
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = spark.read.csv(fname, header=True, inferSchema=True)
```
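The `sep=';'` argument is needed because the demographics file separates its fields with semicolons rather than commas. A minimal stdlib sketch of the difference, using a made-up two-line sample (the column subset and values are hypothetical, not taken from the file):

```python
import csv
import io

# hypothetical two-line sample in the semicolon-separated layout (values are made up)
sample = "City;State;Median Age;Total Population;Race\nSpringfield;Illinois;38.0;100000;White\n"

# with the default comma delimiter the whole header stays a single field
naive_header = next(csv.reader(io.StringIO(sample)))
# with delimiter=';' the columns split as intended
rows = list(csv.DictReader(io.StringIO(sample), delimiter=';'))

print(len(naive_header), rows[0]["Median Age"])  # 1 38.0
```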

### Step 2: Explore and Assess the Data

* The city is common to all the datasets, so it is used to link and filter them. Port codes and their cities were extracted from the I94 SAS label descriptions, and each dataset was filtered to the cities found there. Each dataset was then cleaned of missing, null, and duplicate values, and only the columns required for the schema design were kept.

```python
# dealing with I94 port codes from the I94 SAS labels
ports = dict()
with open("airportscode.txt") as file:
    for line in file:
        if '=' not in line:
            continue  # skip blank or malformed lines
        key, val = line.split('=', 1)
        key = re.sub("[ '\t]", "", key)
        val = re.sub("['\t\n]", "", val).lower()
        val = re.sub(r',[^,]*$', '', val).strip()  # drop the trailing ", STATE" part
        ports[key] = val

# user-defined function mapping a port code to its city (None for unknown codes)
@udf
def get_city(code):
    return ports.get(code)
```
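The parsing loop assumes each line of `airportscode.txt` is a port entry copied from the I94 SAS labels file, such as `'ATL'	=	'ATLANTA, GA'`; the exact quoting and spacing of that file is an assumption here. A self-contained sketch of the same parsing and lookup logic, without Spark, on hypothetical sample lines:

```python
import re


def parse_port_line(line):
    """Return (code, city) for a line like "'ATL'\t=\t'ATLANTA, GA'", or None."""
    if '=' not in line:
        return None
    key, val = line.split('=', 1)
    key = re.sub(r"[ '\t]", "", key)
    val = re.sub(r"['\t\n]", "", val).lower()
    val = re.sub(r',[^,]*$', '', val).strip()  # drop the trailing ", STATE"
    return key, val


# hypothetical sample lines in the assumed label format
lines = ["\t'ATL'\t=\t'ATLANTA, GA'\n", "\t'NYC'\t=\t'NEW YORK, NY'\n", "\n"]
ports = dict(p for p in map(parse_port_line, lines) if p)


def get_city(code):
    # plain-Python equivalent of the get_city UDF: None for unknown codes
    return ports.get(code)


print(get_city("ATL"), get_city("XXX"))  # atlanta None
```

A dictionary lookup returns the same result as scanning every key for a match, but in constant time per row.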

```python
# performing cleaning and transformation on the immigration data
# (re-read the parquet data so this cell can be re-run on its own)
df_I94immigration = spark.read.parquet("sas_data")
# keep only records whose port code appears in the SAS labels
df_I94immigration = df_I94immigration.filter(df_I94immigration.i94port.isin(list(ports.keys())))
# arrdate and depdate are SAS dates: days since 1960-01-01
df_I94immigration = df_I94immigration.withColumn('arrdate', f.expr("date_add(to_date('1960-01-01'), cast(arrdate as int))"))
df_I94immigration = df_I94immigration.withColumn('depdate', f.expr("date_add(to_date('1960-01-01'), cast(depdate as int))"))
# fill missing travel mode with 5
df_I94immigration = df_I94immigration.fillna({'i94mode': 5})
df_I94immigration = df_I94immigration.withColumn('i94City', get_city(df_I94immigration.i94port)).drop('i94port')
df_I94immigration = df_I94immigration.select(
    'cicid', 'i94yr', 'i94mon', 'arrdate', 'i94mode', 'depdate',
    'i94bir', 'i94visa', 'biryear', 'visatype', 'i94City')
# drop rows with any null value (this also drops records without a departure date)
df_I94immigration = df_I94immigration.na.drop()
```
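The `date_add` expressions in the cell above treat `arrdate` and `depdate` as SAS date values, which count days since 1 January 1960. The same conversion in plain Python, checked against `arrdate` values from the `df.head()` output (20545 falls on 1 April 2016, consistent with the April file):

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day


def sas_to_date(days):
    """Convert a SAS numeric date (possibly a float such as 20545.0) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20573.0))  # 2016-04-29
```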

```python
# cleaning and transforming airport data
df_airport = df_airport.select('type', 'name', 'elevation_ft', 'municipality')
# lowercase before de-duplicating so that names differing only in case count as one municipality;
# this keeps one (arbitrary) airport per municipality
df_airport = df_airport.withColumn('municipality', lower(col('municipality'))).dropDuplicates(['municipality'])
df_airport = df_airport.filter(df_airport.municipality.isin(list(ports.values())))
df_airport = df_airport.na.drop()
```
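`dropDuplicates(['municipality'])` compares values exactly, so whether it runs before or after `lower(...)` changes the result. A stdlib sketch with hypothetical airport rows, keeping the first row per key (Spark's `dropDuplicates` does not guarantee which duplicate it keeps):

```python
# hypothetical rows: (type, name, municipality)
rows = [
    ("large_airport", "Airport A", "Boston"),
    ("small_airport", "Airport B", "boston"),
    ("medium_airport", "Airport C", "Denver"),
]


def dedupe_by(rows, key):
    """Keep the first row for each key value."""
    seen, out = set(), []
    for row in rows:
        k = key(row)
        if k not in seen:
            seen.add(k)
            out.append(row)
    return out


# de-duplicate first, then lowercase: "Boston" and "boston" survive as two rows
before = [(t, n, m.lower()) for t, n, m in dedupe_by(rows, key=lambda r: r[2])]
# lowercase first, then de-duplicate: one row per municipality
after = dedupe_by([(t, n, m.lower()) for t, n, m in rows], key=lambda r: r[2])

print(len(before), len(after))  # 3 2
```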

```python
# cleaning and transforming US demographics data
df_usdemo = df_usdemo.select('City', 'State', 'Median Age', 'Total Population', 'Race')
# lowercase before de-duplicating; this keeps one (arbitrary) row per city name
df_usdemo = df_usdemo.withColumn('City', lower(col('City'))).dropDuplicates(['City'])
df_usdemo = df_usdemo.filter(df_usdemo.City.isin(list(ports.values())))
```

```python
# transforming and cleaning temperature data
df_temp = df_temp.select('AverageTemperature', 'City', 'Latitude', 'Longitude')
df_temp = df_temp.withColumn('City', lower(col('City'))).dropDuplicates(['City'])
df_temp = df_temp.filter(df_temp.City.isin(list(ports.values())))
```
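The temperature file holds many monthly readings per city, so `dropDuplicates(['City'])` keeps one arbitrary reading, possibly a null one, since this cell does not drop nulls. If `AverageTemperature` is meant to be a city-level average, one alternative (not what this notebook does) is to average the non-null readings per city; restricting to the file's `Country` column equal to `'United States'` would also avoid matching same-named cities abroad. A stdlib sketch on hypothetical readings:

```python
from collections import defaultdict

# hypothetical readings: (City, Country, AverageTemperature in °C, or None if missing)
readings = [
    ("Paris", "France", 11.0),
    ("Paris", "United States", 14.0),
    ("Paris", "United States", 18.0),
    ("Paris", "United States", None),
    ("Denver", "United States", 10.5),
]


def city_means(readings, country="United States"):
    """Average the non-null readings per lowercased city for one country."""
    sums, counts = defaultdict(float), defaultdict(int)
    for city, ctry, temp in readings:
        if ctry != country or temp is None:
            continue
        key = city.lower()
        sums[key] += temp
        counts[key] += 1
    return {city: sums[city] / counts[city] for city in sums}


print(city_means(readings))  # {'paris': 16.0, 'denver': 10.5}
```

In PySpark, the equivalent would be `df_temp.groupBy('City').agg(avg('AverageTemperature'))` with `avg` from `pyspark.sql.functions`, which skips nulls.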

### Step 3: Define the Data Model
#### Database Schema Design
* A star schema is used for the database because it is simple and its table relationships are easy to understand. A star schema contains one or more fact tables and several dimension tables connected to them: the primary keys of the dimension tables act as foreign keys in the fact table. In this project, the star schema contains one fact table and four dimension tables connected to it, one for each of the four datasets, linked through the city name.

##### fact table
* factTable
##### dimension tables
* df_I94immigration
* df_temp
* df_airport
* df_usdemo
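To make the star layout concrete, here is a small in-memory SQLite sketch. The table names, columns and rows are hypothetical and trimmed down; as in this notebook's joins, the city name plays the role of the key that links the fact table to the dimension tables.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# dimension tables: one row per city, keyed by the city name
cur.execute("CREATE TABLE dim_temperature (city TEXT PRIMARY KEY, avg_temp REAL)")
cur.execute("CREATE TABLE dim_demographics (city TEXT PRIMARY KEY, state TEXT, median_age REAL)")
# fact table: one row per immigration record; city points at the dimensions' primary key
cur.execute("""CREATE TABLE fact_immigration (
    cicid INTEGER PRIMARY KEY,
    city TEXT REFERENCES dim_demographics(city),
    visatype TEXT)""")

cur.executemany("INSERT INTO dim_temperature VALUES (?, ?)",
                [("atlanta", 17.0), ("new york", 12.5)])
cur.executemany("INSERT INTO dim_demographics VALUES (?, ?, ?)",
                [("atlanta", "Georgia", 33.0), ("new york", "New York", 36.0)])
cur.executemany("INSERT INTO fact_immigration VALUES (?, ?, ?)",
                [(1, "atlanta", "F1"), (2, "new york", "B2"), (3, "new york", "B2")])

# a typical star query: facts joined to their dimensions through the key
query = """
SELECT d.state, COUNT(*) AS arrivals, t.avg_temp
FROM fact_immigration AS f
JOIN dim_demographics AS d ON f.city = d.city
JOIN dim_temperature AS t ON f.city = t.city
GROUP BY d.state, t.avg_temp
ORDER BY d.state
"""
result = cur.execute(query).fetchall()
print(result)  # [('Georgia', 1, 17.0), ('New York', 2, 12.5)]
```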


### Step 4: Run Pipelines to Model the Data 

The ETL pipeline is designed to load the datasets into the respective dimension and fact tables with the required columns:

* Loaded the data into the respective Spark dataframes
* Filtered the data and extracted the columns required for the database design
* Cleaned the data (missing values, null values, duplicates, etc.)
* Registered the cleaned dimension tables as Spark SQL temporary views
* Created the fact table by joining the dimension tables


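```python
from pyspark.sql import SQLContext

# SQLContext expects a SparkContext; spark.sql(...) is the SparkSession equivalent
sqlContext = SQLContext(spark.sparkContext)
```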

```python
# registering the cleaned dimension tables as Spark SQL temporary views
df_I94immigration.createOrReplaceTempView('df_I94immigration')
df_temp.createOrReplaceTempView('df_temp')
df_airport.createOrReplaceTempView('df_airport')
df_usdemo.createOrReplaceTempView('df_usdemo')
```

```python
# creating the fact table
# backticks quote column names that contain spaces
factTable = sqlContext.sql(
    """SELECT a.cicid, a.i94yr, a.i94mon, a.arrdate,
    a.i94mode, a.depdate, a.i94bir, a.i94visa, a.biryear, a.visatype,
    a.i94City, b.AverageTemperature, b.Latitude, b.Longitude,
    c.State, c.`Median Age`, c.`Total Population`, c.Race,
    d.type, d.name, d.elevation_ft
    FROM df_I94immigration AS a
    INNER JOIN df_temp AS b ON a.i94City = b.City
    INNER JOIN df_usdemo AS c ON a.i94City = c.City
    INNER JOIN df_airport AS d ON a.i94City = d.municipality"""
)
```
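In Spark SQL, as in SQLite, a name in single quotes is a string literal, while backticks quote an identifier, which column names containing spaces such as `Median Age` require. Writing `'c.Median Age'` therefore returns the text itself in every row instead of the column's value. A quick check of the difference with Python's built-in `sqlite3` (SQLite also accepts backtick-quoted identifiers; the table and value are hypothetical):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE usdemo (City TEXT, `Median Age` REAL)")
conn.execute("INSERT INTO usdemo VALUES ('atlanta', 33.0)")

# first column: a string literal; second column: the actual column value
row = conn.execute(
    "SELECT 'c.Median Age', c.`Median Age` FROM usdemo AS c"
).fetchone()
print(row)  # ('c.Median Age', 33.0)
```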

#### 4.2 Data Quality Checks
Data quality checks help ensure that the pipeline ran as expected. They can include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/count checks to ensure completeness

The check implemented here is a count check: a table fails if it contains zero records.

```python
# quality checks
def quality_check(df, tablename):
    '''
    Input: Spark dataframe, table name
    Output: prints the result of a row-count data quality check
    '''
    rows = df.count()
    if rows == 0:
        print("Data quality check is failed for {} with zero records".format(tablename))
    else:
        print("Data quality check is success for {} with {} records".format(tablename, rows))
```

```python
quality_check(df_I94immigration, "immigration table")
quality_check(factTable, "fact table")
```

```
Data quality check is success for immigration table with 2953808 records
```
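The count check covers completeness only. For the integrity-constraint item in the list of checks, one option is to verify that the key column is non-null and unique; for a Spark table this amounts to comparing `df.count()` with `df.select('cicid').distinct().count()` and counting `df.filter(df.cicid.isNull())`. A plain-Python sketch of that logic, with hypothetical rows and a hypothetical helper name `unique_key_check`:

```python
def unique_key_check(rows, key, tablename):
    """Return True if every row has a non-null, unique value in the key column."""
    values = [row.get(key) for row in rows]
    non_null = [v for v in values if v is not None]
    nulls = len(values) - len(non_null)
    duplicates = len(non_null) - len(set(non_null))
    if nulls or duplicates:
        print("Key check failed for {}: {} null and {} duplicate {} values".format(
            tablename, nulls, duplicates, key))
        return False
    print("Key check passed for {}: {} unique {} values".format(tablename, len(non_null), key))
    return True


# hypothetical rows
good = [{"cicid": 6.0}, {"cicid": 7.0}]
bad = [{"cicid": 6.0}, {"cicid": 6.0}, {"cicid": None}]

unique_key_check(good, "cicid", "immigration table")  # passes
unique_key_check(bad, "cicid", "immigration table")   # fails: 1 null, 1 duplicate
```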


#### 4.3 Data Dictionary
##### Fact Table

* cicid: unique identifier of the immigrant
* i94yr: 4-digit year
* i94mon: numeric month
* arrdate: arrival date
* i94mode: mode of travel code (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported; missing values are filled with 5 in this pipeline)
* depdate: departure date
* i94bir: age of the immigrant in years
* i94visa: visa category code (1 = Business, 2 = Pleasure, 3 = Student)
* biryear: 4-digit year of birth
* visatype: visa type (class of admission, e.g. B2, F1)
* i94City: city of the port of entry
* AverageTemperature: average temperature of the city (°C)
* Latitude: city latitude
* Longitude: city longitude
* State: state in which the city is located
* Median Age: median age in the city
* Total Population: total population of the city
* Race: race category
* type: airport type
* name: name of an airport in the city
* elevation_ft: airport elevation in feet

##### Dimension Tables

###### I94 Immigration
* cicid: unique identifier of the immigrant
* i94yr: 4-digit year
* i94mon: numeric month
* arrdate: arrival date
* i94mode: mode of travel code
* depdate: departure date
* i94bir: age of the immigrant in years
* i94visa: visa category code
* biryear: 4-digit year of birth
* visatype: visa type
* i94City: city of the port of entry

###### Temperature
* AverageTemperature: average temperature (°C)
* City: city name
* Latitude: latitude
* Longitude: longitude

###### City Demographics
* City: city
* State: state
* Median Age: median age in the city
* Total Population: total population of the city
* Race: race category

###### Airports
* type: airport type
* name: airport name
* elevation_ft: airport elevation in feet
* municipality: city where the airport is located (lowercased)


### Step 5: Complete Project Write Up
#### Clearly state the rationale for the choice of tools and technologies for the project:
* For this project, I used Spark because it can process large amounts of data. Spark SQL is used to create the fact and dimension tables.
#### Propose how often the data should be updated and why:
* The I94 data is delivered as one file per month, so the data should be loaded monthly.
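With monthly loads, each run would pick up one month's file. Assuming every month follows the naming of the April file read earlier (`i94_apr16_sub.sas7bdat`: a three-letter lowercase month and a two-digit year), the paths for a year could be generated as below; the pattern for months other than April is an assumption, not something the notebook confirms.

```python
DATA_DIR = "../../data/18-83510-I94-Data-2016"
# assumed month abbreviations, following "apr" in the April file name
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]


def monthly_i94_paths(year=2016):
    """One path per month, assuming the i94_<mon><yy>_sub.sas7bdat pattern."""
    yy = "{:02d}".format(year % 100)
    return ["{}/i94_{}{}_sub.sas7bdat".format(DATA_DIR, mon, yy) for mon in MONTHS]


paths = monthly_i94_paths()
print(paths[3])  # ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
```

Each monthly run would then pass one of these paths to the same read step used for April.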

#### Write a description of how you would approach the problem differently under the following scenarios:
##### The data was increased by 100x.
* Use a data warehouse built for read-heavy analytical workloads on large data, for example Amazon Redshift.
##### The data populates a dashboard that must be updated on a daily basis by 7 am every day.
* Use a scheduling framework to run the pipeline on a daily schedule.
* For example, use an Airflow DAG scheduled to finish before 7 am, with task retries and email alerts on failure.
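In Airflow, retries and failure alerts are configured per task, for example through the `retries`, `retry_delay`, `email_on_failure` and `email` entries of a DAG's `default_args`, and a daily run ahead of 7 am could use a cron schedule such as `0 5 * * *` (the 5 am start is an assumed buffer, not a requirement). The dependency-free sketch below only illustrates the retry-then-alert behavior those settings provide; `run_with_retries` and the `notify` callback are hypothetical stand-ins, not Airflow APIs.

```python
import time


def run_with_retries(task, retries=3, retry_delay=0.0, notify=print):
    """Run task(); retry up to `retries` times, then call notify and re-raise."""
    for attempt in range(retries + 1):
        try:
            return task()
        except Exception as exc:
            if attempt == retries:
                notify("task failed after {} attempts: {}".format(attempt + 1, exc))
                raise
            time.sleep(retry_delay)


# usage with a hypothetical task that fails twice before succeeding
calls = {"n": 0}


def flaky_load():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("source not ready")
    return "loaded"


print(run_with_retries(flaky_load, retries=3))  # loaded
```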
##### The database needed to be accessed by 100+ people.
* Add more nodes and use distributed data storage to spread the load, so that more people can access the database at the same time.
