# Capstone Project

This project models the US immigration (I94) data for April 2016 so that analysts can query it easily.


### 1 - Data 

- I94 Immigration: [Origin](https://travel.trade.gov/research/reports/i94/historical/2016.html)
- US City Demographics: [Origin](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
- Airport Codes: [Origin](https://datahub.io/core/airport-codes#data)

The I94 immigration data for the US includes details about each arrival, such as age, visa type, and mode of transportation.
The demographics table describes US cities: population by gender and race, median age, and each city's state code.
The airports table describes individual airports: type, name, location, and identifying codes.

#### Import the necessary libraries

```python
from datetime import datetime, timedelta

import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, IntegerType, LongType, StringType

# display all columns in pandas DataFrames
pd.set_option('display.max_columns', None)

# directory where the Parquet tables are written
output_path = 'data/'
```

### 2 - Load the data

```python
# The spark-sas7bdat package lets Spark read the raw SAS files; the files read below are CSV.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

immi_df = spark.read.csv('immigration_data_sample.csv', sep=',', header=True, inferSchema=True)
cit_df = spark.read.csv('us-cities-demographics.csv', sep=';', header=True, inferSchema=True)
air_df = spark.read.csv('airport-codes_csv.csv', sep=',', header=True, inferSchema=True)
```

##### Explore immigration data

```python
immi_df.toPandas().describe(include='all')
```

| | _c0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 1000.0 | 1000.0 | 1000.0 | 1000.0 | 1000.0 | 1000.0 | 1000 | 1000.0 | 1000.0 | 941 | 951.0 | 1000.0 | 1000.0 | 1000.0 | 1000.0 | 382 | 4 | 1000 | 954 | 0.0 | 954 | 1000.0 | 1000.0 | 859 | 35.0 | 967 | 1000.0 | 992 | 1000 |
| unique | | | | | | | 70 | | | 51 | | | | | | 97 | 3 | 9 | 10 | 0.0 | 1 | | 99.0 | 3 | | 101 | | 502 | 10 |
| top | | | | | | | NYC | | | FL | | | | | | MEX | STU | G | O | | M | | 7152016.0 | M | | AA | | LAND | WT |
| freq | | | | | | | 155 | | | 188 | | | | | | 28 | 2 | 757 | 800 | | 954 | | 27.0 | 471 | | 109 | | 19 | 443 |
| mean | 1542097.0 | 3040461.0 | 2016.0 | 4.0 | 302.928 | 298.262 | | 20559.68 | 1.078 | | 20575.037855 | 42.382 | 1.859 | 1.0 | 20160420.0 | | | | | | | 1973.618 | | | 3826.857143 | | 69372370000.0 | | |
| std | 915287.9 | 1799818.0 | 0.0 | 0.0 | 206.485285 | 202.12039 | | 8.995027 | 0.485955 | | 24.211234 | 17.903424 | 0.386353 | 0.0 | 49.51657 | | | | | | | 17.903424 | | | 221.742583 | | 23381340000.0 | | |
| min | 10925.0 | 13208.0 | 2016.0 | 4.0 | 103.0 | 103.0 | | 20545.0 | 1.0 | | 20547.0 | 1.0 | 1.0 | 1.0 | 20160400.0 | | | | | | | 1923.0 | | | 3468.0 | | 0.0 | | |
| 25% | 721442.2 | 1412170.0 | 2016.0 | 4.0 | 135.0 | 131.0 | | 20552.0 | 1.0 | | 20561.0 | 30.75 | 2.0 | 1.0 | 20160410.0 | | | | | | | 1961.0 | | | 3668.0 | | 55993010000.0 | | |
| 50% | 1494568.0 | 2941176.0 | 2016.0 | 4.0 | 213.0 | 213.0 | | 20560.0 | 1.0 | | 20570.0 | 42.0 | 2.0 | 1.0 | 20160420.0 | | | | | | | 1974.0 | | | 3887.0 | | 59314770000.0 | | |
| 75% | 2360901.0 | 4694151.0 | 2016.0 | 4.0 | 438.0 | 438.0 | | 20567.25 | 1.0 | | 20580.0 | 55.0 | 2.0 | 1.0 | 20160420.0 | | | | | | | 1985.25 | | | 3943.0 | | 93436230000.0 | | |
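The `arrdate` and `depdate` columns are numeric SAS dates, i.e. days counted from 1960-01-01 (the same epoch the pipeline's `get_date` UDF uses), so the minimum `arrdate` of 20545 falls on 2016-04-01, consistent with an April 2016 extract. `dtaddto`, by contrast, looks like an MMDDYYYY value that lost its leading zero when it was read as a number (top value 7152016.0). The sketch below decodes both in plain Python; the MMDDYYYY reading of `dtaddto` is an assumption based on the values above, and the helper names are hypothetical.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this day


def sas_to_iso(days: Optional[float]) -> Optional[str]:
    """Convert a SAS day count (e.g. arrdate) to an ISO date string."""
    if days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(days))).isoformat()


def dtaddto_to_iso(value: Optional[float]) -> Optional[str]:
    """Decode an assumed MMDDYYYY number, restoring a lost leading zero."""
    if value is None:
        return None
    text = f"{int(value):08d}"  # 7152016 -> "07152016"
    month, day, year = int(text[:2]), int(text[2:4]), int(text[4:])
    return date(year, month, day).isoformat()


print(sas_to_iso(20545.0))        # 2016-04-01 (minimum arrdate)
print(sas_to_iso(20547.0))        # 2016-04-03 (minimum depdate)
print(dtaddto_to_iso(7152016.0))  # 2016-07-15 (most frequent dtaddto)
```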


##### Explore city data

```python
cit_df.toPandas().describe(include='all')
```

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 2891 | 2891 | 2891.0 | 2888.0 | 2888.0 | 2891.0 | 2878.0 | 2878.0 | 2875.0 | 2891 | 2891 | 2891.0 |
| unique | 567 | 49 | | | | | | | | 49 | 5 | |
| top | Springfield | California | | | | | | | | CA | Hispanic or Latino | |
| freq | 15 | 676 | | | | | | | | 676 | 596 | |
| mean | | | 35.494881 | 97328.43 | 101769.6 | 198966.8 | 9367.832523 | 40653.6 | 2.742543 | | | 48963.77 |
| std | | | 4.401617 | 216299.9 | 231564.6 | 447555.9 | 13211.219924 | 155749.1 | 0.433291 | | | 144385.6 |
| min | | | 22.9 | 29281.0 | 27348.0 | 63215.0 | 416.0 | 861.0 | 2.0 | | | 98.0 |
| 25% | | | 32.8 | 39289.0 | 41227.0 | 80429.0 | 3739.0 | 9224.0 | 2.43 | | | 3435.0 |
| 50% | | | 35.3 | 52341.0 | 53809.0 | 106782.0 | 5397.0 | 18822.0 | 2.65 | | | 13780.0 |
| 75% | | | 38.0 | 86641.75 | 89604.0 | 175232.0 | 9368.0 | 33971.75 | 2.95 | | | 54447.0 |
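The summary shows 2,891 rows but only 567 distinct city names and 5 distinct races, which suggests the file holds one row per city, state, and race, with the city-level population columns repeated on each race row. If so, summing `Total Population` over raw rows overcounts. The sketch below shows a safe aggregation on a few rows whose population figures are invented for illustration; the function name is hypothetical.

```python
from typing import Dict, Iterable, Tuple

# Invented figures, shaped like the demographics file: one row per
# (City, State, Race), with Total Population repeated on every race row.
ROWS = [
    {"City": "Springfield", "State": "Illinois", "Race": "White",
     "Total Population": 116000, "Count": 80000},
    {"City": "Springfield", "State": "Illinois", "Race": "Asian",
     "Total Population": 116000, "Count": 3000},
    {"City": "Springfield", "State": "Missouri", "Race": "White",
     "Total Population": 167000, "Count": 140000},
]


def population_by_state(rows: Iterable[dict]) -> Dict[str, int]:
    """Sum Total Population per state, counting each (City, State) once."""
    per_city: Dict[Tuple[str, str], int] = {}
    for row in rows:
        # Race rows repeat the city figure, so keep a single value per city.
        per_city[(row["City"], row["State"])] = row["Total Population"]
    totals: Dict[str, int] = {}
    for (_, state), population in per_city.items():
        totals[state] = totals.get(state, 0) + population
    return totals


naive = sum(row["Total Population"] for row in ROWS)
print(naive)                      # 399000: the Illinois city is counted twice
print(population_by_state(ROWS))  # {'Illinois': 116000, 'Missouri': 167000}
```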


##### Explore airport data

```python
air_df.toPandas().describe(include='all')
```

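| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 55075 | 55075 | 55075 | 48069.0 | 55075.0 | 55075 | 55075 | 49399 | 41030 | 9189.0 | 28686 | 55075 |
| unique | 55075 | 7 | 52144 | | 7.0 | 244 | 2810 | 27133 | 40850 | 9042.0 | 27436 | 54874 |
| top | SCDI | small_airport | Centre Hospitalier Heliport | | | US | US-TX | Seoul | MBAC | 0.0 | LAN | 0, 0 |
| freq | 1 | 33965 | 85 | | 27719.0 | 22757 | 2277 | 404 | 3 | 80.0 | 5 | 53 |
| mean | | | | 1240.789677 | | | | | | | | |
| std | | | | 1602.363459 | | | | | | | | |
| min | | | | -1266.0 | | | | | | | | |
| 25% | | | | 205.0 | | | | | | | | |
| 50% | | | | 718.0 | | | | | | | | |
| 75% | | | | 1497.0 | | | | | | | | |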


#### Cleaning 

##### Keep only records whose i94addr matches a State Code in the city table and whose i94port matches a local_code in the airport table

```python
# Left semi joins filter immi_df without adding any city or airport columns.
immigration_df = (
    immi_df
    .join(cit_df, immi_df['i94addr'] == cit_df['State Code'], how='leftsemi')
    .join(air_df, immi_df['i94port'] == air_df['local_code'], how='leftsemi')
)
```
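A `leftsemi` join keeps each left-hand row that has at least one match on the right, returns only the left-hand columns, and never duplicates a row even when several right-hand rows match. That matters here because `local_code` is not unique in the airport table (27,436 distinct values across 28,686 non-null rows), so an inner join on it could multiply immigration records. The plain-Python sketch below mimics both behaviours on toy rows; it illustrates the semantics only, is not how Spark implements joins, and every row value is hypothetical.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def left_semi_join(left: List[Row], right: List[Row], lkey: str, rkey: str) -> List[Row]:
    """Keep left rows whose key appears on the right; null keys never match."""
    right_keys = {r[rkey] for r in right if r[rkey] is not None}
    return [row for row in left if row[lkey] is not None and row[lkey] in right_keys]


def inner_join(left: List[Row], right: List[Row], lkey: str, rkey: str) -> List[Row]:
    """Emit one merged row per matching (left, right) pair."""
    return [
        {**l_row, **r_row}
        for l_row in left
        for r_row in right
        if l_row[lkey] is not None and l_row[lkey] == r_row[rkey]
    ]


# Toy data: two airports share local_code "ABC"; one arrival has no port.
toy_immigration = [
    {"cicid": 1, "i94port": "ABC"},
    {"cicid": 2, "i94port": "XYZ"},
    {"cicid": 3, "i94port": None},
]
toy_airports = [
    {"ident": "A1", "local_code": "ABC"},
    {"ident": "A2", "local_code": "ABC"},
    {"ident": "A3", "local_code": None},
]

print(left_semi_join(toy_immigration, toy_airports, "i94port", "local_code"))
# [{'cicid': 1, 'i94port': 'ABC'}]
print(len(inner_join(toy_immigration, toy_airports, "i94port", "local_code")))
# 2: cicid 1 appears once per matching airport
```

Since null keys never match, dropping rows with a null `State Code` or `local_code` does not change the result of these semi joins; the later `dropna` calls only clean the dimension tables themselves.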

##### Drop records from the city table with null State Code

```python
city_df = cit_df.dropna(how='any', subset=['State Code'])
```

##### Drop records from airport table with null local_code

```python
airport_df = air_df.dropna(how='any', subset=['local_code'])
```

### 3 - Data Model
##### Fact table:

- <b>Immigrations</b>:
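    - <b>cicid</b>: unique record identifier
    - <b>year</b>: year 
    - <b>month</b>: month
    - <b>city</b>: country-of-citizenship code (from i94cit)
    - <b>resid</b>: country-of-residence code (from i94res)
    - <b>port</b>: port-of-entry code (from i94port)
    - <b>arrivdate</b>: arrival date (yyyy-mm-dd)
    - <b>mode</b>: travel mode code
    - <b>addr</b>: state code of the address in the US (from i94addr)
    - <b>depdate</b>: departure date (yyyy-mm-dd; missing values are stored as 1960-01-01)
    - <b>age</b>: age in years
    - <b>visa</b>: visa category code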
    - <b>visapost</b>: post that issued visa
    - <b>occup</b>: occupation
    - <b>entdepa</b>: arrival flag
    - <b>entdepd</b>: departure flag
    - <b>entdepu</b>: update flag
    - <b>matflag</b>: match flag
    - <b>biryear</b>: year of birth
    - <b>dtaddto</b>: date to which admitted to U.S.
    - <b>gender</b>: gender
    - <b>insnum</b>: INS number
    - <b>airline</b>: airline used
    - <b>admnum</b>: admission number
    - <b>fltno</b>: flight number
    - <b>visatype</b>: type of visa
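The `mode` and `visa` columns hold numeric codes. The I-94 data dictionary maps them as encoded below; a small lookup like this one (the function name is hypothetical) lets analysts label query results without memorising the codes.

```python
from typing import Optional

# Code tables as listed in the I-94 data dictionary.
I94_MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94_VISA = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode_code(code: Optional[float], table: dict) -> str:
    """Return the label for a code, or 'Unknown' for nulls and unlisted codes."""
    if code is None:
        return "Unknown"
    return table.get(int(code), "Unknown")  # codes may arrive as floats


print(decode_code(1, I94_MODE))     # Air
print(decode_code(2.0, I94_VISA))   # Pleasure
print(decode_code(None, I94_VISA))  # Unknown
```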

##### Dimension tables:

- <b> Airport </b>:
    - <b>ident</b>: unique airport identifier
    - <b>type</b>: airport type (e.g. small_airport)
    - <b>name</b>: airport name
    - <b>elevation_ft</b>: elevation in feet
    - <b>continent</b>: continent code
    - <b>iso_country</b>: ISO country code
    - <b>iso_region</b>: ISO region code (e.g. US-TX)
    - <b>municipality</b>: municipality
    - <b>gps_code</b>: GPS code
    - <b>iata_code</b>: IATA airport code
    - <b>local_code</b>: local airport code, matched against the immigration port
    - <b>coordinates</b>: coordinate pair stored as a comma-separated string

- <b> City </b>:
    - <b>city</b>: City
    - <b>state</b>: State
    - <b>median_age</b>: Median Age
    - <b>male_population</b>: Male Population
    - <b>female_population</b>: Female Population
    - <b>total_population</b>: Total Population
    - <b>number_of_veterans</b>: Number of Veterans
    - <b>foreign_born</b>: Foreign-born
    - <b>average_household_size</b>: Average Household Size
    - <b>state_code</b>: State Code
    - <b>race</b>: race category (each city has one row per race)
    - <b>count</b>: number of residents in that race category
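With the fact table linked to the Airport dimension through `port` = `local_code`, analysts can answer questions with ordinary joins and aggregations. The sketch below uses Python's built-in `sqlite3` with a handful of invented rows purely to show the query shape; the same SQL should also run through Spark SQL once the Parquet tables are registered as views. Because `local_code` is not unique in the airport data, the query first keeps one airport type per code (picking `MIN(type)` is an arbitrary tie-break chosen for this sketch) so duplicate codes cannot multiply arrivals.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE immigration (cicid INTEGER, port TEXT, mode INTEGER, visa INTEGER);
    CREATE TABLE airport (ident TEXT, type TEXT, local_code TEXT);

    -- Invented rows, for illustration only.
    INSERT INTO immigration VALUES (1, 'ABC', 1, 2), (2, 'ABC', 1, 1), (3, 'DEF', 1, 2);
    INSERT INTO airport VALUES
        ('A1', 'large_airport', 'ABC'),
        ('A2', 'large_airport', 'ABC'),  -- duplicate local_code
        ('A3', 'small_airport', 'DEF');
    """
)

# Arrivals per airport type, deduplicating the dimension on local_code first.
QUERY = """
WITH airport_by_code AS (
    SELECT local_code, MIN(type) AS type
    FROM airport
    GROUP BY local_code
)
SELECT a.type, COUNT(*) AS arrivals
FROM immigration AS i
JOIN airport_by_code AS a ON i.port = a.local_code
GROUP BY a.type
ORDER BY arrivals DESC
"""

for airport_type, arrivals in conn.execute(QUERY):
    print(airport_type, arrivals)
# large_airport 2
# small_airport 1
```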

### 4 - Run Pipelines to Model the Data 

```python
# arrdate/depdate are SAS dates (days since 1960-01-01); convert them to ISO strings.
# Missing dates are stored as '1960-01-01', the SAS epoch itself.
get_date = F.udf(
    lambda x: (datetime(1960, 1, 1).date() + timedelta(days=x)).isoformat()
    if x is not None else '1960-01-01',
    StringType(),
)

immigration = immigration_df.select(
    F.col("cicid").cast(IntegerType()),
    F.col("i94yr").cast(IntegerType()).alias('year'),
    F.col("i94mon").cast(IntegerType()).alias('month'),
    F.col("i94cit").cast(IntegerType()).alias('city'),   # country-of-citizenship code
    F.col("i94res").cast(IntegerType()).alias('resid'),  # country-of-residence code
    F.col("i94port").cast(StringType()).alias('port'),
    get_date(F.col("arrdate")).alias('arrivdate'),
    F.col("i94mode").cast(IntegerType()).alias('mode'),
    F.col("i94addr").cast(StringType()).alias('addr'),
    get_date(F.col("depdate")).alias('depdate'),
    F.col("i94bir").cast(IntegerType()).alias('age'),
    F.col("i94visa").cast(IntegerType()).alias('visa'),
    F.col("visapost").cast(StringType()).alias('visapost'),
    F.col("occup").cast(StringType()),
    F.col("entdepa").cast(StringType()),
    F.col("entdepd").cast(StringType()),
    F.col("entdepu").cast(StringType()),
    F.col("matflag").cast(StringType()),
    F.col("biryear").cast(IntegerType()),
    F.col("dtaddto").cast(StringType()),
    F.col("gender").cast(StringType()),
    F.col("insnum").cast(IntegerType()),
    F.col("airline").cast(StringType()),
    F.col("admnum").cast(LongType()),   # 11-digit admission numbers exceed float precision
    F.col("fltno").cast(StringType()),  # flight numbers can be alphanumeric (e.g. 'LAND')
    F.col("visatype").cast(StringType()),
)

airport = airport_df.select(
    F.col('ident').cast(StringType()),
    F.col('type').cast(StringType()),
    F.col('name').cast(StringType()),
    F.col('elevation_ft').cast(IntegerType()),
    F.col('continent').cast(StringType()),
    F.col('iso_country').cast(StringType()),
    F.col('iso_region').cast(StringType()),
    F.col('municipality').cast(StringType()),
    F.col('gps_code').cast(StringType()),
    F.col('iata_code').cast(StringType()),
    F.col('local_code').cast(StringType()),
    F.col('coordinates').cast(StringType()),
)

city = city_df.select(
    F.col('City').cast(StringType()).alias('city'),
    F.col('State').cast(StringType()).alias('state'),
    F.col('Median Age').cast(FloatType()).alias('median_age'),  # fractional, e.g. 35.3
    F.col('Male Population').cast(IntegerType()).alias('male_population'),
    F.col('Female Population').cast(IntegerType()).alias('female_population'),
    F.col('Total Population').cast(IntegerType()).alias('total_population'),
    F.col('Number of Veterans').cast(IntegerType()).alias('number_of_veterans'),
    F.col('Foreign-born').cast(IntegerType()).alias('foreign_born'),
    F.col('Average Household Size').cast(FloatType()).alias('average_household_size'),
    F.col('State Code').cast(StringType()).alias('state_code'),
    F.col('Race').cast(StringType()).alias('race'),
    F.col('Count').cast(IntegerType()).alias('count'),
)

immigration.write.mode('overwrite').parquet(output_path + 'immigration')
airport.write.mode('overwrite').parquet(output_path + 'airport')
city.write.mode('overwrite').parquet(output_path + 'city')
```
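Admission numbers in the sample are 11-digit values (the 75th percentile is about 9.3e10), which a 32-bit float cannot store exactly: between 2^35 and 2^36, adjacent single-precision values are 4,096 apart. A `FloatType` column can therefore silently change an admission number and even make two different admissions compare equal, whereas a 64-bit integer (Spark's `LongType`) or a double keeps every digit. The standard-library check below illustrates this with a hypothetical admission number; the helper names are illustrative only.

```python
import struct


def to_float32(value: float) -> float:
    """Round-trip a number through IEEE 754 single precision (what FloatType stores)."""
    return struct.unpack("f", struct.pack("f", value))[0]


def to_float64(value: float) -> float:
    """Round-trip a number through double precision (what DoubleType stores)."""
    return struct.unpack("d", struct.pack("d", value))[0]


admnum = 59314770123  # hypothetical 11-digit admission number

print(to_float32(admnum) == admnum)                   # False: low digits are lost
print(to_float64(admnum) == admnum)                   # True: exact below 2**53
print(to_float32(admnum) == to_float32(admnum + 1))   # True: two admissions collide
```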

#### Data Quality 

These checks confirm that each table was written to the output path with the same number of rows as its source DataFrame, and that no table contains duplicate rows.

```python
# Row counts of the written Parquet tables should match the source DataFrames.
print('immigration data ok:', immigration_df.count() == spark.read.parquet(output_path + 'immigration').count())
print('airport data ok:', airport_df.count() == spark.read.parquet(output_path + 'airport').count())
print('city data ok:', city_df.count() == spark.read.parquet(output_path + 'city').count())
```

```text
immigration data ok: True
airport data ok: True
city data ok: True
```


```python
# A table has no duplicate rows when its row count equals its distinct row count.
print('No immigration duplicates:', immigration.count() == immigration.distinct().count())
print('No airport duplicates:', airport.count() == airport.distinct().count())
print('No city duplicates:', city.count() == city.distinct().count())
```

```text
No immigration duplicates: True
No airport duplicates: True
No city duplicates: True
```
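The checks above print `True` or `False`, which is easy to miss in an unattended run. A sketch of the same two checks written to raise an exception instead, so a scheduler marks the run as failed, follows. The functions are hypothetical helpers that only assume the object passed in has `count()` and `distinct()` methods, which Spark DataFrames provide; `FakeFrame` stands in for a DataFrame so the sketch runs without Spark.

```python
from typing import Any, List


def check_row_count(name: str, source: Any, written: Any) -> None:
    """Raise ValueError if the written table's row count differs from its source."""
    expected, actual = source.count(), written.count()
    if expected != actual:
        raise ValueError(f"{name}: expected {expected} rows, found {actual}")


def check_no_duplicates(name: str, table: Any) -> None:
    """Raise ValueError if the table contains fully duplicated rows."""
    total, distinct = table.count(), table.distinct().count()
    if total != distinct:
        raise ValueError(f"{name}: found {total - distinct} duplicate row(s)")


class FakeFrame:
    """Stand-in for a Spark DataFrame that supports only count() and distinct()."""

    def __init__(self, rows: List[tuple]):
        self.rows = rows

    def count(self) -> int:
        return len(self.rows)

    def distinct(self) -> "FakeFrame":
        return FakeFrame(list(dict.fromkeys(self.rows)))  # order-preserving dedupe


sample = FakeFrame([("Springfield", "IL"), ("Boston", "MA")])
check_row_count("sample", sample, FakeFrame(list(sample.rows)))  # passes silently
check_no_duplicates("sample", sample)                             # passes silently

try:
    check_no_duplicates("sample", FakeFrame([("Boston", "MA"), ("Boston", "MA")]))
except ValueError as err:
    print(err)  # sample: found 1 duplicate row(s)
```

Against the real tables the calls would look like `check_row_count('city', city_df, spark.read.parquet(output_path + 'city'))` and `check_no_duplicates('city', city)`.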


### 5 - Write Up
* PySpark was chosen for two main reasons: it is open source, so it does not tie the project to a vendor, and its DataFrame abstraction makes pipelines easy to develop.
* The data should be updated monthly, which gives analysts time to build reports for the month and gives developers time to handle possible errors in the new data.
* If the data grew 100x, more worker nodes would be added to the Spark cluster until the required processing speed was reached.
* Apache Airflow would be used if the pipeline had to run on a schedule, for example every morning at 7am.
* If 100+ people needed to access the data, more machines would be added to the cluster, as with the growth in data volume.