# Project Title
### Data Engineering Capstone Project

#### Project Summary
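This project uses PySpark to combine the I94 Immigration Data with the Airport Code Table, so that each I94 record can be enriched with details of the US airport at its port of entry. The cleaned tables are written to S3 as Parquet and then joined on the port code.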

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import configparser
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

output_data = config['S3']['S3_BUCKET']
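The cell above expects a `dl.cfg` file with an `[AWS]` section and an `[S3]` section. The following is a minimal sketch of that layout. The section and key names come from the code above, while the values (including the `s3a://` bucket path) are hypothetical placeholders. It also checks for missing keys up front, so that a broken config fails here rather than at S3 write time.

```python
import configparser

# Hypothetical dl.cfg contents: section/key names match the notebook,
# the values are placeholders only.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY

[S3]
S3_BUCKET = s3a://your-bucket/capstone/
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

# Fail early if a required key is missing.
required = {"AWS": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"], "S3": ["S3_BUCKET"]}
for section, keys in required.items():
    for key in keys:
        if not config.has_option(section, key):
            raise KeyError(f"dl.cfg is missing [{section}] {key}")

output_data = config["S3"]["S3_BUCKET"]
print(output_data)
```

In the notebook itself, `config.read('dl.cfg')` reads the same structure from disk.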

In [3]:
spark = SparkSession.builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .enableHiveSupport() \
    .getOrCreate()
spark._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")

### Step 1: Scope the Project and Gather Data

#### Scope 
This project uses the I94 Immigration Data and the Airport Code Table to build a dataset that merges both sources. For example, the merged data can show the name of the US airport at the port of entry where a visitor arrived. The processing is done with PySpark.

#### Describe and Gather Data 
- I94 Immigration Data: This data comes from the US National Travel and Tourism Office. A data dictionary is included in the workspace, and a sample file in CSV format makes it possible to inspect the data before reading in the full dataset.

- Airport Code Table: A simple table of airport codes and the corresponding cities.

##### I94 Immigration Data

In [4]:
df_immigration = spark.read.load('./sas_data')
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [5]:
df_immigration.limit(5).toPandas().head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


In [6]:
df_immigration.show()

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [7]:
df_immigration.count()

3096313

##### Airport Code Table

In [8]:
df_airport = spark.read.csv('airport-codes_csv.csv', header=True)
df_airport.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [9]:
df_airport.limit(5).toPandas().head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [10]:
df_airport.count()

55075

### Step 2: Explore and Assess the Data
#### Explore the Data 
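The main data quality issues to check are missing values and duplicates in the join keys: `i94port` in the I94 Immigration Data and `iata_code` in the Airport Code Table.

#### Cleaning Steps
The cleaning steps for each dataset are documented below.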

##### I94 Immigration Data

Check that the join key `i94port` contains no null values, and inspect its distinct codes.

In [11]:
df_immigration.filter(df_immigration['i94port'].isNull()).count()

0

In [12]:
df_immigration.select(df_immigration['i94port']).drop_duplicates().toPandas()

Unnamed: 0,i94port
0,FMY
1,BGM
2,HEL
3,DNS
4,MOR
5,FOK
6,HVR
7,SNA
8,PTK
9,CLG


##### Airport Code Table

Keep only US airports, check the join key `iata_code` for null values, and drop the rows where it is null.
Then remove rows with duplicate `iata_code` values.

In [13]:
df_airport_cleaned = df_airport.filter(df_airport["iso_country"]=="US")
df_airport_cleaned.count()

22757

In [14]:
df_airport_cleaned.filter(df_airport_cleaned["iata_code"].isNull()).count()

20738

In [15]:
df_airport_cleaned = df_airport_cleaned.filter(df_airport_cleaned["iata_code"].isNotNull())
df_airport_cleaned.count()

2019

In [16]:
df_airport_cleaned.drop_duplicates(['iata_code']).count()

2014

In [17]:
df_airport_cleaned = df_airport_cleaned.drop_duplicates(['iata_code'])
df_airport_cleaned.count()

2014
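`drop_duplicates(['iata_code'])` keeps one row per code, but Spark does not guarantee which of the duplicate rows survives, so a `closed` entry could win over an active airport. Below is a plain-Python sketch of a deterministic alternative. It assumes a hypothetical preference order by airport `type` and uses hypothetical sample rows. In Spark, the same idea could be expressed with `row_number()` over a `Window.partitionBy("iata_code")` ordered by that preference.

```python
# Hypothetical preference order: larger airports win when rows share an IATA code.
TYPE_RANK = {"large_airport": 0, "medium_airport": 1, "small_airport": 2}


def clean_airports(rows):
    """Keep US rows that have an IATA code, one row per code, chosen deterministically."""
    best = {}  # iata_code -> (sort key, row)
    for row in rows:
        if row.get("iso_country") != "US" or not row.get("iata_code"):
            continue
        # Types outside TYPE_RANK (heliport, closed, ...) rank last; ties fall back to ident.
        key = (TYPE_RANK.get(row.get("type"), len(TYPE_RANK)), row["ident"])
        code = row["iata_code"]
        if code not in best or key < best[code][0]:
            best[code] = (key, row)
    return [best[code][1] for code in sorted(best)]


# Hypothetical sample rows in the shape of the Airport Code Table.
rows = [
    {"ident": "KAAA", "type": "medium_airport", "iso_country": "US", "iata_code": "AAA"},
    {"ident": "XAAA", "type": "closed", "iso_country": "US", "iata_code": "AAA"},
    {"ident": "00A", "type": "heliport", "iso_country": "US", "iata_code": None},
    {"ident": "CBBB", "type": "large_airport", "iso_country": "CA", "iata_code": "BBB"},
]
cleaned = clean_airports(rows)
print([r["ident"] for r in cleaned])  # ['KAAA']
```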

In [18]:
df_airport_cleaned.toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,KBGM,medium_airport,Greater Binghamton/Edwin A Link field,1636,,US,US-NY,Binghamton,KBGM,BGM,BGM,"-75.97979736, 42.20869827"
1,2TE0,small_airport,Eagle Air Park,15,,US,US-TX,Brazoria,2TE0,BZT,2TE0,"-95.579696655273, 28.982200622559"
2,KCNU,medium_airport,Chanute Martin Johnson Airport,1002,,US,US-KS,Chanute,KCNU,CNU,CNU,"-95.4850997925, 37.668800354"
3,KCRS,small_airport,C David Campbell Field Corsicana Municipal Air...,449,,US,US-TX,Corsicana,KCRS,CRS,CRS,"-96.4005966187, 32.0280990601"
4,KFMY,medium_airport,Page Field,17,,US,US-FL,Fort Myers,KFMY,FMY,FMY,"-81.86329650879999, 26.58659935"
5,HYL,seaplane_base,Hollis Clark Bay Seaplane Base,0,,US,US-AK,Hollis,HYL,HYL,HYL,"-132.645996094, 55.4816017151"
6,KEB,small_airport,Nanwalek Airport,27,,US,US-AK,Nanwalek,KEB,KEB,KEB,"-151.925003052, 59.3521003723"
7,KLEB,medium_airport,Lebanon Municipal Airport,603,,US,US-NH,Lebanon,KLEB,LEB,LEB,"-72.30419921880001, 43.626098632799994"
8,KOXC,small_airport,Waterbury Oxford Airport,726,,US,US-CT,Oxford,KOXC,OXC,OXC,"-73.135200500488, 41.47859954834"
9,KRKP,small_airport,Aransas County Airport,24,,US,US-TX,Rockport,KRKP,RKP,RKP,"-97.0446014404, 28.0867996216"


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
I94 Immigration Data

Only the relevant columns are kept from the original data.
The SAS date fields `arrdate` and `depdate`, which store the number of days since 1960-01-01, are converted to calendar dates.

- cicid
- i94yr
- i94mon
- i94port
- arrdate
- i94addr
- depdate
- i94bir
- i94visa
- biryear
- gender
- airline
- admnum
- fltno
- visatype

Airport Code Table

The `coordinates` field is split into latitude and longitude.

- ident
- type
- name
- elevation_ft
- continent
- iso_country
- iso_region
- municipality
- gps_code
- iata_code
- local_code
- latitude
- longitude

#### 3.2 Mapping Out Data Pipelines

I94 Immigration Data

- Add columns that convert `arrdate` and `depdate` from SAS date values to calendar dates.
- Extract selected items.

Airport Code Table

- Split `coordinates` into `latitude` and `longitude` columns.
- Extract selected items.
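The sample rows in Step 1 show that `coordinates` stores the longitude first and the latitude second. For example, Bensalem, PA appears as `-74.93360137939453, 40.07080078125`. The following plain-Python sketch of the split assumes that this order holds for every row.

```python
def split_coordinates(coordinates):
    """Split a "longitude, latitude" string into a (latitude, longitude) pair of floats."""
    if not coordinates:
        return None, None
    lon_str, lat_str = coordinates.split(",")
    # float() tolerates the space that follows the comma.
    return float(lat_str), float(lon_str)


lat, lon = split_coordinates("-74.93360137939453, 40.07080078125")  # Total Rf Heliport
```

Getting the order right matters because swapping the two items would store a latitude of about -75 for an airport in Pennsylvania.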

##### I94 Immigration Data

In [19]:
from datetime import datetime
from datetime import timedelta
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

def convert_datetime(num_days):
    """Convert a SAS date value (days since 1960-01-01) to a date; None if missing."""
    try:
        start = datetime(1960, 1, 1)
        return (start + timedelta(days=int(num_days))).date()
    except (TypeError, ValueError, OverflowError):
        # int(None) raises TypeError; int(float('nan')) raises ValueError
        return None

udf_datetime_from_sas = udf(convert_datetime, DateType())
df_immigration_new = df_immigration \
    .withColumn("arrival_date", udf_datetime_from_sas("arrdate")) \
    .withColumn("departure_date", udf_datetime_from_sas("depdate"))
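The conversion can be checked without Spark. The following sketch applies the same rule as `convert_datetime` (days since 1960-01-01, with missing values mapped to `None`), and its expected values are taken from the outputs in this notebook: `arrdate` 20574.0 becomes 2016-04-30 and `depdate` 20582.0 becomes 2016-05-08. As an untested alternative, Spark SQL's built-in `date_add` could probably do the same without a Python UDF, for example `expr("date_add('1960-01-01', CAST(arrdate AS INT))")`.

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this date


def sas_to_date(num_days):
    """Convert a SAS date value (days since 1960-01-01) to a date, or None if missing."""
    if num_days is None or (isinstance(num_days, float) and math.isnan(num_days)):
        return None
    return SAS_EPOCH + timedelta(days=int(num_days))


print(sas_to_date(20574.0))  # 2016-04-30
```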

In [20]:
immigration_table = df_immigration_new.select(
    col("cicid").cast("int"),
    col("i94yr").cast("int"),
    col("i94mon").cast("int"),
    col("i94port"),
    col("arrival_date").alias("arrdate"),
    col("i94addr"),
    col("departure_date").alias("depdate"),
    col("i94bir").cast("int"),
    col("i94visa").cast("int"),
    col("biryear").cast("int"),
    col("gender"),
    col("airline"),
    col("admnum").cast("int"),
    col("fltno"),
    col("visatype")
)
immigration_table.limit(5).toPandas().head()

Unnamed: 0,cicid,i94yr,i94mon,i94port,arrdate,i94addr,depdate,i94bir,i94visa,biryear,gender,airline,admnum,fltno,visatype
0,5748517,2016,4,LOS,2016-04-30,CA,2016-05-08,40,1,1976,F,QF,2147483647,11,B1
1,5748518,2016,4,LOS,2016-04-30,NV,2016-05-17,32,1,1984,F,VA,2147483647,7,B1
2,5748519,2016,4,LOS,2016-04-30,WA,2016-05-08,29,1,1987,M,DL,2147483647,40,B1
3,5748520,2016,4,LOS,2016-04-30,WA,2016-05-14,29,1,1987,F,DL,2147483647,40,B1
4,5748521,2016,4,LOS,2016-04-30,WA,2016-05-14,28,1,1988,M,DL,2147483647,40,B1
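Every `admnum` value above is 2147483647, which is the largest 32-bit integer. The source values exceed that range (the `show()` output in Step 1 has `9.495387003E10` for the first row), and the output suggests that `cast("int")` clamps them to the maximum. In that case, the admission numbers are lost. Casting to `"long"` (64-bit) instead would likely preserve them. The following plain-Python sketch mimics the clamping behaviour seen here and is not Spark's own code.

```python
INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1
INT64_MAX = 2**63 - 1


def saturating_int32(value):
    """Clamp a float's integer part to the 32-bit range, as the output above suggests."""
    return max(INT32_MIN, min(INT32_MAX, int(value)))


admnum = 9.495387003e10  # first row's admnum in the show() output
print(saturating_int32(admnum))  # 2147483647
print(int(admnum) <= INT64_MAX)  # True: fits in a 64-bit long
```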


In [21]:
immigration_table.count()

3096313

In [22]:
immigration_table.write.partitionBy("i94yr", "i94mon").mode('overwrite').parquet(os.path.join(output_data, 'immigration'))
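`partitionBy("i94yr", "i94mon")` writes one Hive-style directory per year and month (`name=value` segments). A filter on these columns therefore only needs to read the matching directories. Because of this layout, the read-back in Step 4 lists `i94yr` and `i94mon` as the last columns: Spark restores partition columns from the directory names. The following sketch builds such a path in plain Python, and the bucket path is a hypothetical placeholder.

```python
import posixpath


def partition_path(base, table, **partitions):
    """Build the Hive-style directory that partitionBy() writes for one partition."""
    segments = [f"{name}={value}" for name, value in partitions.items()]
    return posixpath.join(base, table, *segments)


print(partition_path("s3a://your-bucket/capstone", "immigration", i94yr=2016, i94mon=4))
# s3a://your-bucket/capstone/immigration/i94yr=2016/i94mon=4
```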

##### Airport Code Table

In [23]:
# `coordinates` is stored as "longitude, latitude", so item 1 is the latitude
# and item 0 is the longitude.
df_airport_new = df_airport_cleaned \
    .withColumn('latitude', split(df_airport_cleaned['coordinates'], ',').getItem(1)) \
    .withColumn('longitude', split(df_airport_cleaned['coordinates'], ',').getItem(0))

airport_table = df_airport_new.select(
    col("ident"),
    col("type"),
    col("name"),
    col("elevation_ft").cast("int"),
    col("continent"),
    col("iso_country"),
    col("iso_region"),
    col("municipality"),
    col("gps_code"),
    col("iata_code"),
    col("local_code"),
    col("latitude").cast("float"),
    col("longitude").cast("float")
)
airport_table.limit(5).toPandas().head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,latitude,longitude
0,KBGM,medium_airport,Greater Binghamton/Edwin A Link field,1636,,US,US-NY,Binghamton,KBGM,BGM,BGM,42.208698,-75.979797
1,2TE0,small_airport,Eagle Air Park,15,,US,US-TX,Brazoria,2TE0,BZT,2TE0,28.982201,-95.579697
2,KCNU,medium_airport,Chanute Martin Johnson Airport,1002,,US,US-KS,Chanute,KCNU,CNU,CNU,37.6688,-95.4851
3,KCRS,small_airport,C David Campbell Field Corsicana Municipal Air...,449,,US,US-TX,Corsicana,KCRS,CRS,CRS,32.028099,-96.400597
4,KFMY,medium_airport,Page Field,17,,US,US-FL,Fort Myers,KFMY,FMY,FMY,26.586599,-81.863297


In [24]:
airport_table.count()

2014

In [25]:
airport_table.coalesce(1).write.mode('overwrite').parquet(os.path.join(output_data, 'airport'))

### Step 4: Run Pipelines to Model the Data
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### I94 Immigration Data

In [26]:
df_immigration_step4 = spark.read.parquet(os.path.join(output_data, 'immigration'))

In [27]:
df_immigration_step4.limit(5).toPandas().head()

Unnamed: 0,cicid,i94port,arrdate,i94addr,depdate,i94bir,i94visa,biryear,gender,airline,admnum,fltno,visatype,i94yr,i94mon
0,5748517,LOS,2016-04-30,CA,2016-05-08,40,1,1976,F,QF,2147483647,11,B1,2016,4
1,3344386,NEW,2016-04-17,NJ,2016-04-25,67,2,1949,M,UA,2147483647,865,WT,2016,4
2,5748518,LOS,2016-04-30,NV,2016-05-17,32,1,1984,F,VA,2147483647,7,B1,2016,4
3,3344387,NEW,2016-04-17,NJ,2016-04-25,49,2,1967,F,UA,2147483647,865,WT,2016,4
4,5748519,LOS,2016-04-30,WA,2016-05-08,29,1,1987,M,DL,2147483647,40,B1,2016,4


In [28]:
df_immigration_step4.count()

3096313

##### Airport Code Table

In [29]:
df_airport_step4 = spark.read.parquet(os.path.join(output_data, 'airport'))

In [30]:
df_airport_step4.limit(5).toPandas().head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,latitude,longitude
0,KBGM,medium_airport,Greater Binghamton/Edwin A Link field,1636,,US,US-NY,Binghamton,KBGM,BGM,BGM,42.208698,-75.979797
1,2TE0,small_airport,Eagle Air Park,15,,US,US-TX,Brazoria,2TE0,BZT,2TE0,28.982201,-95.579697
2,KCNU,medium_airport,Chanute Martin Johnson Airport,1002,,US,US-KS,Chanute,KCNU,CNU,CNU,37.6688,-95.4851
3,KCRS,small_airport,C David Campbell Field Corsicana Municipal Air...,449,,US,US-TX,Corsicana,KCRS,CRS,CRS,32.028099,-96.400597
4,KFMY,medium_airport,Page Field,17,,US,US-FL,Fort Myers,KFMY,FMY,FMY,26.586599,-81.863297


In [31]:
df_airport_step4.count()

2014

##### Join

In [32]:
df_immigration_and_airport = df_immigration_step4.join(df_airport_step4, df_immigration_step4.i94port == df_airport_step4.iata_code, 'left_outer')

In [33]:
df_immigration_and_airport.limit(5).toPandas().head()

Unnamed: 0,cicid,i94port,arrdate,i94addr,depdate,i94bir,i94visa,biryear,gender,airline,...,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,latitude,longitude
0,5748517,LOS,2016-04-30,CA,2016-05-08,40,1,1976,F,QF,...,,,,,,,,,,
1,3344386,NEW,2016-04-17,NJ,2016-04-25,67,2,1949,M,UA,...,8.0,,US,US-LA,New Orleans,KNEW,NEW,NEW,30.0424,-90.028297
2,5748518,LOS,2016-04-30,NV,2016-05-17,32,1,1984,F,VA,...,,,,,,,,,,
3,3344387,NEW,2016-04-17,NJ,2016-04-25,49,2,1967,F,UA,...,8.0,,US,US-LA,New Orleans,KNEW,NEW,NEW,30.0424,-90.028297
4,5748519,LOS,2016-04-30,WA,2016-05-08,29,1,1987,M,DL,...,,,,,,,,,,


In [34]:
df_immigration_and_airport.show()

+-------+-------+----------+-------+----------+------+-------+-------+------+-------+----------+-----+--------+-----+------+-----+--------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------+---------+
|  cicid|i94port|   arrdate|i94addr|   depdate|i94bir|i94visa|biryear|gender|airline|    admnum|fltno|visatype|i94yr|i94mon|ident|          type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|latitude|longitude|
+-------+-------+----------+-------+----------+------+-------+-------+------+-------+----------+-----+--------+-----+------+-----+--------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------+---------+
|5748517|    LOS|2016-04-30|     CA|2016-05-08|    40|      1|   1976|     F|     QF|2147483647|00011|      B1| 2016|     4| null|          null|                null|        n

In [35]:
df_immigration_and_airport.count()

3096313
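The join leaves the row count unchanged because it is a left outer join and `iata_code` was deduplicated in Step 2. As a result, every immigration row appears exactly once, with nulls where no airport matches (as for `LOS` above). The following plain-Python sketch of those semantics uses hypothetical rows. It also shows why the deduplication matters: a second airport row with the same code would duplicate the matching immigration rows.

```python
def left_join(left_rows, right_rows, left_key, right_key):
    """Left outer join of two lists of dicts on a single key column."""
    index = {}
    for row in right_rows:
        index.setdefault(row[right_key], []).append(row)
    # Unmatched left rows get every right-side column set to None.
    empty = dict.fromkeys(col for row in right_rows for col in row)
    joined = []
    for row in left_rows:
        for match in index.get(row[left_key], [empty]):
            joined.append({**row, **match})
    return joined


# Hypothetical rows in the shape of the two tables.
immigration = [{"cicid": 1, "i94port": "LOS"}, {"cicid": 2, "i94port": "NEW"}]
airports = [{"iata_code": "NEW", "municipality": "New Orleans"}]

joined = left_join(immigration, airports, "i94port", "iata_code")
print(len(joined), joined[0]["municipality"])  # 2 None

# A duplicate airport code multiplies the matching immigration row.
dup_airports = airports + [{"iata_code": "NEW", "municipality": "Hypothetical"}]
print(len(left_join(immigration, dup_airports, "i94port", "iata_code")))  # 3
```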

#### 4.2 Data Quality Checks
The data quality checks below verify that the pipeline ran as expected. Such checks can include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

The key column `cicid` contains no null values.

In [36]:
df_immigration_and_airport.filter(df_immigration_and_airport['cicid'].isNull()).count()

0
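The null check above does not show that `cicid` is unique. In PySpark, uniqueness could be tested by comparing `df_immigration_and_airport.count()` with `df_immigration_and_airport.select("cicid").distinct().count()`. The following plain-Python sketch reports both nulls and duplicated keys, and the key values are hypothetical.

```python
from collections import Counter


def key_report(keys):
    """Return (null_count, sorted list of duplicated keys) for a sequence of key values."""
    null_count = sum(1 for k in keys if k is None)
    counts = Counter(k for k in keys if k is not None)
    duplicates = sorted(k for k, n in counts.items() if n > 1)
    return null_count, duplicates


nulls, dups = key_report([5748517, 5748518, 5748518, None])
print(nulls, dups)  # 1 [5748518]
```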

The row count of the final joined data matches the row count of the I94 Immigration Data.

In [37]:
df_immigration_and_airport.count()

3096313

In [38]:
df_immigration_step4.count()

3096313
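Equal counts are expected from a left outer join on a deduplicated key, so they do not show how many rows actually found an airport. The I94 port codes come from the I94 data dictionary and may not always coincide with IATA codes; for example, `LOS` finds no airport in the sample above. A match-rate check would make the join's coverage visible. In PySpark, the matched rows could be counted with `df_immigration_and_airport.filter(col("iata_code").isNotNull()).count()`. The following plain-Python sketch uses the five sample rows from the join output.

```python
def match_rate(joined_rows, right_key="iata_code"):
    """Fraction of joined rows whose right-side key was filled in by the join."""
    if not joined_rows:
        return 0.0
    matched = sum(1 for row in joined_rows if row.get(right_key) is not None)
    return matched / len(joined_rows)


# The five sample rows shown after the join: three LOS rows without a match.
sample = [
    {"i94port": "LOS", "iata_code": None},
    {"i94port": "NEW", "iata_code": "NEW"},
    {"i94port": "LOS", "iata_code": None},
    {"i94port": "NEW", "iata_code": "NEW"},
    {"i94port": "LOS", "iata_code": None},
]
print(match_rate(sample))  # 0.4
```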

#### 4.3 Data dictionary 
Each field of the data model is described below, together with the dataset it comes from.

- cicid: Unique record identifier (I94 Immigration Data)
- i94port: 3-character code of the port of entry (destination city) (I94 Immigration Data)
- arrdate: Arrival date in the USA, converted from a SAS date value (I94 Immigration Data)
- i94addr: US state of address (I94 Immigration Data)
- depdate: Departure date from the USA, converted from a SAS date value (I94 Immigration Data)
- i94bir: Age of the respondent in years (I94 Immigration Data)
- i94visa: Visa category, i.e. the reason for the visit (1 = Business, 2 = Pleasure, 3 = Student) (I94 Immigration Data)
- biryear: Year of birth (I94 Immigration Data)
- gender: Gender (I94 Immigration Data)
- airline: Airline used to arrive in the USA (I94 Immigration Data)
- admnum: Admission number (I94 Immigration Data)
- fltno: Flight number (I94 Immigration Data)
- visatype: Class of admission legally admitting the nonimmigrant to temporarily stay in the USA, e.g. B1 or WT (I94 Immigration Data)
- i94yr: 4-digit year of arrival (I94 Immigration Data)
- i94mon: Numeric month of arrival (I94 Immigration Data)
- ident: The text identifier used in the OurAirports URL (Airport Code Table)
- type: The type of the airport (Airport Code Table)
- name: The official airport name, including "Airport", "Airstrip", etc. (Airport Code Table)
- elevation_ft: The airport elevation above mean sea level (MSL) in feet, not metres (Airport Code Table)
- continent: The code for the continent where the airport is (primarily) located (Airport Code Table)
- iso_country: The two-character ISO 3166-1 alpha-2 code for the country where the airport is (primarily) located (Airport Code Table)
- iso_region: An alphanumeric code for the high-level administrative subdivision of the country where the airport is primarily located (Airport Code Table)
- municipality: The primary municipality that the airport serves, when available (Airport Code Table)
- gps_code: The code that an aviation GPS database (such as Jeppesen's or Garmin's) would normally use for the airport (Airport Code Table)
- iata_code: The three-letter IATA code for the airport, if it has one (Airport Code Table)
- local_code: The local country code for the airport, if different from the gps_code and iata_code fields; used mainly for US airports (Airport Code Table)
- latitude: Latitude in decimal degrees (Airport Code Table)
- longitude: Longitude in decimal degrees (Airport Code Table)

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
  * Spark is used because it can process large amounts of data quickly and in parallel.
* Propose how often the data should be updated and why.
  * The I94 Immigration Data should be updated whenever new data is published.
  * The Airport Code Table needs to be updated when new airports are added.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
    * Increase the number of nodes in the Spark cluster to handle the additional data.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * Use a tool that supports scheduled execution, such as Apache Airflow, and schedule the pipeline so that it finishes before 7am.
  * The database needed to be accessed by 100+ people.
    * Increase the number of nodes in the Spark cluster to handle the concurrent load.