# Immigration & Airport Data

## Project Summary
**Aggregate the immigration data by airport in order to find out the relationship between the number of immigrants and the airports.**

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import os
import numpy as np
import pandas as pd
import configparser

# Acquire keys to access AWS S3
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['KEY']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEY']['AWS_SECRET_ACCESS_KEY']
os.environ['AWS_DEFAULT_REGION']=config['KEY']['AWS_DEFAULT_REGION']

# pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
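
The cell above expects a `dl.cfg` file with a `[KEY]` section. A minimal sketch of that layout follows, parsed from an in-memory string so it runs without a real file. The credential values are placeholders and the region is an assumption; `SAMPLE_CFG`, `sample_config`, and `aws_settings` are names made up for this illustration.

```python
import configparser

# Hypothetical contents of dl.cfg; every value here is a placeholder.
SAMPLE_CFG = """
[KEY]
AWS_ACCESS_KEY_ID = EXAMPLE_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = EXAMPLE_SECRET_ACCESS_KEY
AWS_DEFAULT_REGION = us-west-2
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

# Same section and option names that the notebook cell reads.
aws_settings = {
    name: sample_config['KEY'][name]
    for name in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_DEFAULT_REGION')
}
print(sorted(aws_settings))
```

Keeping the keys in a separate file that stays out of version control avoids hard-coding credentials in the notebook.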

## Step 1: Scope of project and Data Source

### Scope 
- Aggregate the immigration data by airport in order to find out the relationship between the number of immigrants and the airports.

### Data Source
1. (New) Airport codes data downloaded from https://datahub.io/core/airport-codes
    - 6 different types of airports
    - Location for each type of airport in the area
1. Immigration data from i94 https://travel.trade.gov/research/reports/i94/historical/2016.html
    - 12 files for each month, each with around 3 million rows
    - Contains each immigrant's port of entry and basic information

### Solution
- The solution is an aggregated dataset, partitioned by year and month, that shows the total number of immigrants and the number of airports in each city.

### Technical Tools
- Jupyter Notebook (Python): Test code to read/process airport/immigration files and read/write parquet files using PySpark
- Spark: PySpark was used to read and process the data (each month of immigration data has ~3 million rows)
  - Writing the parquet files to S3 partitioned by year and month lets later reads skip partitions they don't need, which speeds up processing
- AWS: Used S3 to store and save data, easily scalable
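
To illustrate why partitioning helps: Spark's `partitionBy('year', 'month')` writes one `year=.../month=...` directory per partition, so a reader that filters on month only has to open the matching directory. The sketch below builds those directory names in plain Python; `partition_dir` is a hypothetical helper, not part of the pipeline.

```python
from posixpath import join

BASE = 's3a://immigration-airport-data/immigration'

def partition_dir(base, year, month):
    """Directory name that partitionBy('year', 'month') uses for one partition."""
    return join(base, f'year={year}', f'month={month}')

# One directory per month of 2016.
all_dirs = [partition_dir(BASE, 2016, m) for m in range(1, 13)]

# A query for April only needs the single matching directory.
april_only = [d for d in all_dirs if d.endswith('/month=4')]
print(april_only)
```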

#### Discarded Data

- US Cities Demographics: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/
    - 2015 data: has fewer than a million rows, and the year doesn't match the 2016 immigration data
- Temperature by City: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data
    - Covers 1743 to 2013, so it doesn't overlap with the 2016 immigration data

In [3]:
# Old data
airport_df = pd.read_csv('airport-codes_csv.csv')
print(airport_df.shape)

(55075, 12)


In [17]:
# Download updated airport data from ourairports.com
!wget http://ourairports.com/data/airports.csv

--2020-04-22 21:57:34--  http://ourairports.com/data/airports.csv
Resolving ourairports.com (ourairports.com)... 206.71.179.167
Connecting to ourairports.com (ourairports.com)|206.71.179.167|:80... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://ourairports.com/data/airports.csv [following]
--2020-04-22 21:57:34--  https://ourairports.com/data/airports.csv
Connecting to ourairports.com (ourairports.com)|206.71.179.167|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8470475 (8.1M) [text/csv]
Saving to: ‘airports.csv’


2020-04-22 21:57:35 (12.3 MB/s) - ‘airports.csv’ saved [8470475/8470475]



In [20]:
# 1079 more rows than old data
airports_csv = pd.read_csv('airports.csv')
print(airports_csv.shape)
airports_csv.isnull().sum()

(56154, 18)


id                       0
ident                    0
type                     0
name                     0
latitude_deg             0
longitude_deg            0
elevation_ft          7239
continent            28082
iso_country            246
iso_region               0
municipality          5801
scheduled_service        0
gps_code             15081
iata_code            46919
local_code           27024
home_link            53089
wikipedia_link       46143
keywords             46099
dtype: int64

#### Save new airport code data to S3

In [19]:
# Load updated raw data into S3
output = 's3a://immigration-airport-data/airport'
airport_spark = spark.read.format('csv').options(header='true', inferSchema='true').load('airports.csv')
airport_spark.write.parquet(output, mode='overwrite')

In [21]:
# April 2016 immigration data
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
i94_df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")
# temp_dropna = temp_df.dropna()
print(i94_df.shape)
i94_df.head()

(3096313, 28)


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [33]:
# year, month, city, res, port - No NULL values
i94_df.isnull().sum()

cicid             0
i94yr             0
i94mon            0
i94cit            0
i94res            0
i94port           0
arrdate           0
i94mode         239
i94addr      152372
depdate      142457
i94bir          802
i94visa           0
count             0
dtadfile          1
visapost    1881250
occup       3088187
entdepa         238
entdepd      138429
entdepu     3095921
matflag      138429
biryear         802
dtaddto         477
gender       414269
insnum      2982605
airline       83627
admnum            0
fltno         19549
visatype          0
dtype: int64

In [None]:
i94_df['i94port'].value_counts().to_dict()

### Immigration Data
- Save all 12 raw files into S3 as parquet using PySpark

In [3]:
import os
import glob
# Walk the data directory and collect the file paths with glob.
# Note: file_path_list is reassigned on every iteration, so after the loop
# it holds only the contents of the last directory visited by os.walk.
for root, dirs, files in os.walk('../../data/'):
    file_path_list = glob.glob(os.path.join(root,'*'))
print(file_path_list)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import isnan, when, count, col, udf, split, upper

spark = SparkSession.builder.\
        config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2")\
        .enableHiveSupport().getOrCreate()

In [None]:
# Upload raw immigration data to S3
output = 's3a://immigration-airport-data/immigration'
for file in file_path_list[:-1]:
    df = spark.read.format('com.github.saurfang.sas.spark').load(file)
    df = df.withColumn("year", col("i94yr").cast(IntegerType()))
    df = df.withColumn("month", col("i94mon").cast(IntegerType()))
    print(f'Loading {file} to {output}')
    df.write.partitionBy('year', 'month').parquet(output, mode='append')
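
Before uploading, it can help to confirm that all 12 monthly files are present. The sketch below parses the year and month from a file name. It assumes every monthly file follows the same `i94_<mon><yy>_sub.sas7bdat` pattern as the April file loaded earlier; `year_month_from_path` and `missing_months` are hypothetical helpers.

```python
import os
import re

MONTHS = ['jan', 'feb', 'mar', 'apr', 'may', 'jun',
          'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
# Assumed naming pattern, based on 'i94_apr16_sub.sas7bdat'.
NAME_RE = re.compile(r'^i94_([a-z]{3})(\d{2})_sub\.sas7bdat$')

def year_month_from_path(path):
    """Return (year, month) parsed from a file name, or None if it doesn't match."""
    match = NAME_RE.match(os.path.basename(path))
    if match is None or match.group(1) not in MONTHS:
        return None
    return 2000 + int(match.group(2)), MONTHS.index(match.group(1)) + 1

def missing_months(paths, year=2016):
    """Months of `year` for which no matching file was found."""
    parsed = (year_month_from_path(p) for p in paths)
    found = {ym[1] for ym in parsed if ym is not None and ym[0] == year}
    return sorted(set(range(1, 13)) - found)

print(year_month_from_path('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'))
```

An empty list from `missing_months(file_path_list)` would indicate that every month has a file.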

## Step 2: Explore and Assess the Data
### Explore the Data 
- Aggregate airport types by city
- Aggregate immigrants by port

## Airport Data
### Clean data
- Remove rows with null values in the `municipality` column
### Feature Engineering
- Split the `iso_region` column into separate `country` and `state` columns

In [65]:
airport_spark = spark.read.format('csv').options(header='true', inferSchema='true').load('airports.csv')

# Drop NaN values in municipality
airport_spark = airport_spark.filter(airport_spark.municipality.isNotNull())
# Upper case for merging later
airport_spark = airport_spark.withColumn('municipality', upper(col('municipality'))) 
# Split into country and state
region_split = split(col('iso_region'), '-')
airport_spark = airport_spark.withColumn('country', region_split.getItem(0))
airport_spark = airport_spark.withColumn('state', region_split.getItem(1))
print(airport_spark.count())
airport_spark.limit(3).toPandas()

50353


Unnamed: 0,id,ident,type,name,latitude_deg,longitude_deg,elevation_ft,continent,iso_country,iso_region,municipality,scheduled_service,gps_code,iata_code,local_code,home_link,wikipedia_link,keywords,country,state
0,6523,00A,heliport,Total Rf Heliport,40.070801,-74.933601,11,,US,US-PA,BENSALEM,no,00A,,00A,,,,US,PA
1,323361,00AA,small_airport,Aero B Ranch Airport,38.704022,-101.473911,3435,,US,US-KS,LEOTI,no,00AA,,00AA,,,,US,KS
2,6524,00AK,small_airport,Lowell Field,59.9492,-151.695999,450,,US,US-AK,ANCHOR POINT,no,00AK,,00AK,,,,US,AK


## Table 1: Airport Types Dimension Table
- Stores all different types of airports in each location

In [66]:
airport_types = airport_spark.select('continent','country','state','municipality','type',
                                     'name','latitude_deg','longitude_deg','elevation_ft')
airport_types.limit(3).toPandas()

Unnamed: 0,continent,country,state,municipality,type,name,latitude_deg,longitude_deg,elevation_ft
0,,US,PA,BENSALEM,heliport,Total Rf Heliport,40.070801,-74.933601,11
1,,US,KS,LEOTI,small_airport,Aero B Ranch Airport,38.704022,-101.473911,3435
2,,US,AK,ANCHOR POINT,small_airport,Lowell Field,59.9492,-151.695999,450


In [67]:
airport_types.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in airport_types.columns]).toPandas()

Unnamed: 0,continent,country,state,municipality,type,name,latitude_deg,longitude_deg,elevation_ft
0,0,0,0,0,0,0,0,0,4492


### Save airport_types to S3
- Confirmed that continent, country, state, and municipality have no missing values

In [37]:
output = 's3a://immigration-airport-data/airport_types'
airport_types.write.parquet(output, mode='overwrite')

### Table 2: aggregating airport types
- Use `groupBy` and `pivot` in PySpark to create a new DataFrame that counts the airports of each type in each city
- Drop the `closed` column so that closed airports are not counted

In [68]:
# Create a new table showing the number of each kind of airport in each city
airport_pivot = airport_spark.groupBy('country','state','municipality').pivot('type').count().drop('closed')
airport_pivot.limit(3).toPandas()

Unnamed: 0,country,state,municipality,balloonport,heliport,large_airport,medium_airport,seaplane_base,small_airport
0,FR,HDF,BOIS BERNARD,,1,,,,
1,US,DE,DOVER,,5,1.0,,,1.0
2,CZ,VY,JIHLAVA,,1,,,,1.0
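
For readers less familiar with `groupBy(...).pivot(...).count()`, here is a plain-Python sketch of the same idea: one counter per (country, state, municipality), with one entry per airport type. The rows are toy values chosen for illustration, not taken from the dataset.

```python
from collections import Counter, defaultdict

# Toy rows: (country, state, municipality, type). Values are illustrative only.
rows = [
    ('US', 'DE', 'DOVER', 'heliport'),
    ('US', 'DE', 'DOVER', 'heliport'),
    ('US', 'DE', 'DOVER', 'large_airport'),
    ('US', 'DE', 'DOVER', 'closed'),
    ('CZ', 'VY', 'JIHLAVA', 'small_airport'),
]

pivot = defaultdict(Counter)
for country, state, municipality, airport_type in rows:
    if airport_type != 'closed':  # plays the role of .drop('closed')
        pivot[(country, state, municipality)][airport_type] += 1

for key, counts in pivot.items():
    # .get() returns None for a type the city lacks, like the nulls in the pivot output.
    print(key, counts.get('heliport'), counts.get('large_airport'))
```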


##### Changing column types

In [69]:
for port_type in airport_pivot.columns[3:]:
    airport_pivot=airport_pivot.withColumn(port_type, airport_pivot[port_type].cast('int'))

In [70]:
airport_pivot.printSchema()

root
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- balloonport: integer (nullable = true)
 |-- heliport: integer (nullable = true)
 |-- large_airport: integer (nullable = true)
 |-- medium_airport: integer (nullable = true)
 |-- seaplane_base: integer (nullable = true)
 |-- small_airport: integer (nullable = true)



In [71]:
# Make sure country, state, and municipality don't have NULL values
airport_pivot.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in airport_pivot.columns]).toPandas()

Unnamed: 0,country,state,municipality,balloonport,heliport,large_airport,medium_airport,seaplane_base,small_airport
0,0,0,0,31882,25356,31331,28022,31222,9251


### Saving aggregation of airports table to S3

In [39]:
output = 's3a://immigration-airport-data/airport_aggregate'
airport_pivot.write.parquet(output, mode='overwrite')

### Created module to convert i94 codes
- i94_codes.py

In [44]:
from i94_codes import port_codes_dict, city_res_codes_dict

port_codes_dict.get('FMY'), city_res_codes_dict.get(101)

('FORT MYERS, FL', 'ALBANIA')
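
The contents of `i94_codes.py` are not shown in this notebook. As a rough sketch of how such lookup dictionaries could be built, the code below parses label lines of the form `'CODE' = 'DESCRIPTION'`. The line format, `parse_labels`, and the two `*_sketch` names are assumptions for illustration; only the two example entries come from the output above.

```python
import re

# Assumed label format: "'CODE' = 'DESCRIPTION'" or "123 = 'DESCRIPTION'".
# Descriptions containing an apostrophe would be cut short by this pattern.
LINE_RE = re.compile(r"^\s*'?(\w+)'?\s*=\s*'([^']*)'")

def parse_labels(lines, numeric_keys=False):
    """Build a code -> description dict from label lines."""
    mapping = {}
    for line in lines:
        match = LINE_RE.match(line)
        if match:
            code, description = match.group(1), match.group(2).strip()
            mapping[int(code) if numeric_keys else code] = description
    return mapping

port_codes_sketch = parse_labels(["   'FMY'\t=\t'FORT MYERS, FL        '"])
city_res_codes_sketch = parse_labels(["   101 =  'ALBANIA'"], numeric_keys=True)
print(port_codes_sketch.get('FMY'), city_res_codes_sketch.get(101))
```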

## Explore and Clean Immigration Data

In [6]:
df_i94 = spark.read.parquet("s3a://immigration-airport-data/immigration")
df_i94.count()
# 40 million total rows

40790529

In [45]:
df_i94 = df_i94.withColumn("year", col("i94yr").cast(IntegerType())) \
               .withColumn("month", col("i94mon").cast(IntegerType()))
df_i94.count()
df_i94.limit(3).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,year,month
0,7.0,2016.0,1.0,101.0,101.0,BOS,20465.0,1.0,MA,,20.0,3.0,1.0,,,,T,,,,1996.0,D/S,M,,LH,346608285.0,424,F1,2016,1
1,8.0,2016.0,1.0,101.0,101.0,BOS,20465.0,1.0,MA,,20.0,3.0,1.0,,,,T,,,,1996.0,D/S,M,,LH,346627585.0,424,F1,2016,1
2,9.0,2016.0,1.0,101.0,101.0,BOS,20469.0,1.0,CT,20480.0,17.0,2.0,1.0,,,,T,N,,M,1999.0,07152016,F,,AF,381092385.0,338,B2,2016,1


## Table 3: Convert City and Res code into new table
- Used the code converter dictionary to feature engineer city and res names

In [46]:
convert_city_res = udf(lambda x: city_res_codes_dict.get(x), StringType())

city_res = df_i94.groupBy('year','month','i94res','i94cit','i94yr','i94mon').count()
city_res = city_res.withColumn('city_name', convert_city_res(col('i94cit'))) \
                   .withColumn('res_name', convert_city_res(col('i94res')))
city_res.limit(3).toPandas()

Unnamed: 0,year,month,i94res,i94cit,i94yr,i94mon,count,city_name,res_name
0,2016,1,112.0,113.0,2016.0,1.0,96,GREECE,GERMANY


In [58]:
# The code dictionary does not match all city codes
city_res.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in city_res.columns]).toPandas()

Unnamed: 0,year,month,i94res,i94cit,i94yr,i94mon,count,city_name,res_name
0,0,0,0,0,0,0,0,515,0
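
The 515 nulls in `city_name` come from codes that are missing from the dictionary: `dict.get` returns `None` for a key it does not contain. A minimal sketch of a lookup with an explicit fallback label follows. `code_to_name` is a hypothetical helper, the one-entry dictionary stands in for `city_res_codes_dict`, and code 999 is an assumed unmapped value.

```python
city_res_codes_sketch = {101: 'ALBANIA'}  # stand-in for city_res_codes_dict

def code_to_name(code, mapping, default=None):
    """Look up a numeric code; fall back to `default` when it is unmapped."""
    if code is None:
        return default
    # The i94 codes are stored as doubles (e.g. 101.0), so normalise to int first.
    return mapping.get(int(code), default)

print(code_to_name(101.0, city_res_codes_sketch))
print(code_to_name(999.0, city_res_codes_sketch))
print(code_to_name(999.0, city_res_codes_sketch, 'UNKNOWN'))
```

A fallback such as `'UNKNOWN'` keeps unmapped codes visible in the table instead of leaving them as nulls.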


In [None]:
output = 's3a://immigration-airport-data/immigration_city_res'
city_res.write.partitionBy('year', 'month').parquet(output, mode='append')

## Table 4: Immigration Port code (dimension table)
- Using the code converter to feature engineer port names

In [81]:
convert_port = udf(lambda x: port_codes_dict.get(x), StringType())

port_i94 = df_i94.groupBy('year','month','i94port','i94yr','i94mon').count()
port_i94 = port_i94.withColumn('port_name', convert_port(col('i94port')))
port_i94.limit(3).toPandas()

Unnamed: 0,year,month,i94port,i94yr,i94mon,count,port_name
0,2016,1,LAN,2016.0,1.0,8,"LANCASTER, MN"
1,2016,1,BLA,2016.0,1.0,11433,"BLAINE, WA"
2,2016,1,MAS,2016.0,1.0,103,"MASSENA, NY"


In [75]:
port_i94.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in port_i94.columns]).toPandas()

Unnamed: 0,year,month,i94port,i94yr,i94mon,count,port_name
0,0,0,0,0,0,0,0


In [54]:
output = 's3a://immigration-airport-data/immigration_port'
port_i94.write.partitionBy('year', 'month').parquet(output, mode='append')

### Table 5: Fact table - aggregate immigrants and airports

In [48]:
# Separate port_name into city and state
city_state = split(col('port_name'), ', ')
port_city_state = port_i94.withColumn('city', city_state.getItem(0)) \
                          .withColumn('state', city_state.getItem(1))
port_city_state.limit(3).toPandas()

Unnamed: 0,year,month,i94port,i94yr,i94mon,count,port_name,city,state
0,2016,1,LAN,2016.0,1.0,8,"LANCASTER, MN",LANCASTER,MN
1,2016,1,BLA,2016.0,1.0,11433,"BLAINE, WA",BLAINE,WA
2,2016,1,MAS,2016.0,1.0,103,"MASSENA, NY",MASSENA,NY


In [73]:
airport_pivot.limit(1).toPandas()

Unnamed: 0,country,state,municipality,balloonport,heliport,large_airport,medium_airport,seaplane_base,small_airport
0,FR,HDF,BOIS BERNARD,,1,,,,


### Joining airport_aggregate (pivot) table and immigration_port table

In [52]:
df_joined = (port_city_state.alias('a')
             .join(airport_pivot.alias('b'),
                  (col('a.city') == col('b.municipality'))&
                  (col('a.state') == col('b.state')),
                   how='left')
             .select('year','month','i94port',
                     col('count').alias('i94_count'),
                     'port_name','city',col('a.state'),
                     'small_airport','medium_airport','large_airport',
                     'seaplane_base','heliport','balloonport',
                     col('year').alias('i94yr'),
                     col('month').alias('i94mon'))
            )
df_joined.limit(3).toPandas()

Unnamed: 0,year,month,i94port,i94_count,port_name,city,state,small_airport,medium_airport,large_airport,seaplane_base,heliport,balloonport,i94yr,i94mon
0,2016,1,LAN,8,"LANCASTER, MN",LANCASTER,MN,,,,,,,2016,1
1,2016,1,BLA,11433,"BLAINE, WA",BLAINE,WA,,,,,2.0,,2016,1
2,2016,1,MAS,103,"MASSENA, NY",MASSENA,NY,,1.0,,,1.0,,2016,1
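
The left join keeps every port row and fills the airport columns with nulls when no municipality matches. Here is a plain-Python sketch of that behaviour, using two of the sample rows above. `left_join` is a hypothetical helper and only two airport columns are carried over to keep it short. Note that the join key is (city, state) without the country, so a municipality in another country that shares the same name and state code would also match.

```python
AIRPORT_COLS = ('medium_airport', 'heliport')

ports = [
    {'i94port': 'LAN', 'city': 'LANCASTER', 'state': 'MN', 'i94_count': 8},
    {'i94port': 'MAS', 'city': 'MASSENA', 'state': 'NY', 'i94_count': 103},
]
airports = [
    {'municipality': 'MASSENA', 'state': 'NY', 'medium_airport': 1, 'heliport': 1},
]

def left_join(ports, airports):
    """Left join ports to airports on (city, state); unmatched ports get None counts."""
    lookup = {(a['municipality'], a['state']): a for a in airports}
    joined = []
    for port in ports:
        match = lookup.get((port['city'], port['state']), {})
        joined.append({**port, **{c: match.get(c) for c in AIRPORT_COLS}})
    return joined

joined = left_join(ports, airports)
for row in joined:
    print(row)
```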


In [76]:
# Count NaN values per column. isnan() does not flag nulls, so unmatched airports from the left join still show as 0 here
df_joined.select([count(when(isnan(c), c)).alias(c) for c in df_joined.columns]).toPandas()

Unnamed: 0,year,month,i94port,i94_count,port_name,city,state,small_airport,medium_airport,large_airport,seaplane_base,heliport,balloonport,i94yr,i94mon
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
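
NaN and null are different things: NaN is a floating-point value, while null means the value is absent, which is why the earlier cells combine `isnan(c)` with `col(c).isNull()`. The plain-Python sketch below shows the distinction with a helper that counts both. `is_missing` and `missing_counts` are hypothetical names, and the rows are toy values.

```python
import math

def is_missing(value):
    """True for None (null) and for float NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def missing_counts(rows, columns):
    """Number of missing values per column across a list of row dicts."""
    return {c: sum(is_missing(r.get(c)) for r in rows) for c in columns}

rows = [
    {'i94port': 'LAN', 'heliport': None},  # unmatched in the left join
    {'i94port': 'BLA', 'heliport': 2},
]
print(missing_counts(rows, ['i94port', 'heliport']))
```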


In [53]:
output = 's3a://immigration-airport-data/immigration_airport_aggregate'
df_joined.write.partitionBy('year', 'month').parquet(output, mode='append')

## Step 3: Data Model
### Fact Table
- `immigration_airport_aggregate`
  - stores the number of immigrants in each port for each month of the year
  - also shows the number of different types of airports for each location

### 4 Dimension Tables
- `airport_types`: shows the location and types of airports in each city
- `airport_aggregate`: stores the number of types of airports in each city
- `immigration_city_res`: stores the number of immigrants coming from a given city
- `immigration_port`: stores the number of immigrants to each port

## Step 4: Run Pipelines to Model the Data
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
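
One possible shape for these checks, sketched in plain Python over a list of row dicts so it runs without a cluster. The function names are hypothetical and the sample rows reuse values from the port table above. On Spark DataFrames the same ideas map to `df.count()`, `df.groupBy(*keys).count()`, and `col(c).isNull()`.

```python
from collections import Counter

def check_not_empty(rows, table):
    """Completeness check: the table must contain at least one row."""
    if not rows:
        raise ValueError(f'{table}: no rows')

def check_unique_key(rows, key_cols, table):
    """Integrity check: the key columns must identify each row uniquely."""
    counts = Counter(tuple(r[c] for c in key_cols) for r in rows)
    dupes = [key for key, n in counts.items() if n > 1]
    if dupes:
        raise ValueError(f'{table}: duplicate keys {dupes[:5]}')

def check_no_nulls(rows, cols, table):
    """Integrity check: the given columns must not contain nulls."""
    for c in cols:
        if any(r.get(c) is None for r in rows):
            raise ValueError(f'{table}: null values in {c}')

sample = [
    {'year': 2016, 'month': 1, 'i94port': 'LAN', 'i94_count': 8},
    {'year': 2016, 'month': 1, 'i94port': 'BLA', 'i94_count': 11433},
]
check_not_empty(sample, 'immigration_airport_aggregate')
check_unique_key(sample, ['year', 'month', 'i94port'], 'immigration_airport_aggregate')
check_no_nulls(sample, ['i94_count'], 'immigration_airport_aggregate')
print('all checks passed')
```

The key uniqueness check is worth running on the joined table in particular, since a join that matches one port to several municipalities would duplicate port rows.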

## Data dictionary
- Airport data from https://datahub.io/core/airport-codes
- Immigration data from https://travel.trade.gov/research/reports/i94/historical/2016.html
### airport_types
| Field         | Type    | Constraints |
|---------------|---------|-------------|
| continent     | varchar | PK          |
| country       | varchar | PK          |
| state         | varchar | PK          |
| municipality  | varchar | PK          |
| type          | varchar | NOT NULL    |
| name          | varchar | NOT NULL    |
| latitude_deg  | float   |             |
| longitude_deg | float   |             |
| elevation_ft  | float   |             |

### airport_aggregate
| Field          | Type    | Constraints |
|----------------|---------|-------------|
| country        | varchar | PK          |
| state          | varchar | PK          |
| municipality   | varchar | PK          |
| balloonport    | int     |             |
| heliport       | int     |             |
| large_airport  | int     |             |
| medium_airport | int     |             |
| seaplane_base  | int     |             |
| small_airport  | int     |             |

### immigration_city_res
| Field     | Type    | Constraints |
|-----------|---------|-------------|
| year      | int     | PK          |
| month     | int     | PK          |
| i94res    | int     | PK          |
| i94cit    | int     | PK          |
| count     | int     | NOT NULL    |
| city_name | varchar |             |
| res_name  | varchar |             |

### immigration_port
| Field     | Type    | Constraints |
|-----------|---------|-------------|
| year      | int     | PK          |
| month     | int     | PK          |
| i94port   | varchar | PK          |
| count     | int     | NOT NULL    |
| port_name | varchar |             |

### immigration_airport_aggregate
| Field          | Type    | Constraints |
|----------------|---------|-------------|
| year           | int     | PK          |
| month          | int     | PK          |
| i94port        | varchar | PK          |
| i94_count      | int     | NOT NULL    |
| port_name      | varchar |             |
| city           | varchar |             |
| state          | varchar |             |
| small_airport  | int     |             |
| medium_airport | int     |             |
| large_airport  | int     |             |
| seaplane_base  | int     |             |
| heliport       | int     |             |
| balloonport    | int     |             |


## Scenarios

### The data was increased by 100x
Depending on the required turnaround time, the Spark jobs should run on an EMR cluster with additional nodes so that the larger volume is processed in time. S3 places no limit on the total size of a bucket, so storage itself should not need to change.

### The pipelines would be run on a daily basis by 7 am every day
Schedule an Airflow DAG to trigger the pipeline every day, timed so that the run completes by 7 am.

### The database needed to be accessed by 100+ people
Create a dedicated IAM role or group for the people who need access to the S3 bucket, and keep the data in the S3 Standard storage class rather than Glacier, since Glacier retrieval is much slower.

## Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * The database needed to be accessed by 100+ people.

## Data Pipeline
How often should the data be updated?
- The data should be updated monthly, since it is aggregated by month.