# US Immigration
### Data Engineering Capstone Project

#### Project Summary
The immigration data comes from the US National Tourism and Trade Office; the project focuses on the "Immigration" and "U.S. State Demographics" files.

##### The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import os
import glob
import psycopg2
import pandas as pd
import numpy as np
from sqlalchemy import column, Integer
import pyspark.sql.functions as F
import pyspark.sql.types as T

### Step 1: Scope the Project and Gather Data

#### Scope 
We have four datasets. We will load each one and examine its rows to get a clear picture of what it contains.

#### Describe and Gather Data 
<b>Datasets</b>
* <b> 1- I94 Immigration Data:</b>


    - This data comes from the US National Tourism and Trade Office.
    - A data dictionary is included in the workspace.
    - There's a sample file so you can take a look at the data in csv format before reading it all in.
    - You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
    - You can access the immigration data in a folder with the following path: ../../data/18-83510-I94-Data-2016/.
    - There's a file for each month of the year. An example file name is i94_apr16_sub.sas7bdat.
    - Each file has a three-letter abbreviation for the month name. So a full file path for June would look like this: ../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat
    
* <b> 2- World Temperature Data:</b>


    - This dataset came from Kaggle.
    - We will explore its contents later.
    
* <b> 3- U.S. City Demographic Data:</b>


    - This data comes from OpenSoft.
    
    
* <b> 4- Airport Code Table:</b>


    - This is a simple table of airport codes and corresponding cities.
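Because each monthly I94 file follows the naming pattern described above (`i94_<mon>16_sub.sas7bdat` under `../../data/18-83510-I94-Data-2016/`), the twelve paths for 2016 can be generated rather than typed by hand. This is a minimal sketch that assumes every month uses the same lowercase three-letter abbreviation as the `apr` and `jun` examples; the helper `i94_paths` is hypothetical.

```python
# Folder named in the dataset description; assumed to be the same for every month
I94_DIR = "../../data/18-83510-I94-Data-2016/"
# Lowercase three-letter month abbreviations, as in i94_apr16 / i94_jun16 (assumed for all months)
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]

def i94_paths(year_suffix="16", base_dir=I94_DIR):
    """Return the twelve monthly .sas7bdat paths for one year."""
    return [f"{base_dir}i94_{mon}{year_suffix}_sub.sas7bdat" for mon in MONTHS]

print(i94_paths()[5])  # the June path given as an example above
```

With Spark, each path could be passed to the same `spark.read.format('com.github.saurfang.sas.spark').load(...)` call used below. Whether all twelve months are needed depends on the scope chosen in Step 1.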

In [2]:
# Read in the data here

# 1- Immigration Data Sample
df_data = pd.read_csv('immigration_data_sample.csv')
df_data.head(2)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2


In [3]:
# 2- US Cities Demographics

demo_data = pd.read_csv('us-cities-demographics.csv', sep = ';')
demo_data.head(2)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723


In [4]:
# 3- Airport Codes

port_data = pd.read_csv('airport-codes_csv.csv')
port_data.head(2)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"


In [5]:
# 4- Temperature Data

temp_data = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
temp_data.head(2)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
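The temperature file stores latitude and longitude as strings with a hemisphere suffix (`57.05N`, `10.33E`). If these coordinates are needed for joins or maps, a small parser can turn them into signed decimal degrees. This is a minimal sketch that assumes the suffix is always one of N, S, E or W; `parse_coord` is a hypothetical helper, not part of the dataset's tooling.

```python
def parse_coord(value: str) -> float:
    """Convert '57.05N' / '10.33E' style strings to signed decimal degrees."""
    value = value.strip()
    number, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in "NSEW":
        raise ValueError(f"unexpected hemisphere suffix in {value!r}")
    # South latitudes and West longitudes are negative by convention
    return -number if hemisphere in "SW" else number

print(parse_coord("57.05N"), parse_coord("10.33E"))  # → 57.05 10.33 (Århus row above)
```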


In [6]:
# Reading SAS data (.sas7bdat) with Spark

from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [7]:
df_spark.limit(2).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1


#### The project follows the follow steps:
* Step 1: Scope the Project and Gather Data

<b> * Step 2: Explore and Assess the Data </b>
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up


#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

##### Cleaning datasets

<b> fact_immigration:</b>

- Convert Dates to DateTime
- Add Visa Categories (1=Business - 2=Pleasure - 3=Student)
- Add travel modes (1=Air - 2=Sea - 3=Land - 9=Not Reported)

<b> dim_city_demographics:</b>

- Check the number of cities and states.
- Drop Race and Count columns.

<b> dim_airports:</b>

- Filter on 'US' in iso_country column.
- Merge with city_demographics table.
- Save cleaned data.


In [8]:
df_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
insnum        35 non-null float64
airline       967 non

In [9]:
#Convert Dates to DateTime

df_data['arrdate'] = pd.to_datetime(df_data['arrdate'], unit='D', origin='1960-01-01')
df_data['depdate'] = pd.to_datetime(df_data['depdate'], unit='D', origin='1960-01-01')
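The `origin='1960-01-01'` argument works because SAS stores dates as a count of days since 1 January 1960. Here is the same arithmetic in plain Python, useful for sanity-checking individual values; the helper name `sas_to_date` is illustrative.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 in SAS date values

def sas_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date."""
    if days is None or days != days:  # None or NaN means the date is missing
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(20566.0))  # arrdate of the first sample row → 2016-04-22
```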


In [10]:
df_data['dtadfile'] = df_data['dtadfile'].astype(object)

In [11]:
df_data['dtadfile'] = pd.to_datetime(df_data['dtadfile'])

In [12]:
# Check the result 
df_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null datetime64[ns]
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null datetime64[ns]
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null datetime64[ns]
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
insnum        35 non-null float6

In [13]:

with pd.option_context('display.max_columns', None):
    display(df_data.head())

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,2016-04-22,1.0,HI,2016-04-29,61.0,2.0,1.0,1970-01-01 00:00:00.020160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,2016-04-23,1.0,TX,2016-04-24,26.0,2.0,1.0,1970-01-01 00:00:00.020160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,2016-04-07,1.0,FL,2016-04-27,76.0,2.0,1.0,1970-01-01 00:00:00.020160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,2016-04-28,1.0,CA,2016-05-07,25.0,2.0,1.0,1970-01-01 00:00:00.020160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,2016-04-06,3.0,NY,2016-04-09,19.0,2.0,1.0,1970-01-01 00:00:00.020160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT
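The `dtadfile` column above did not convert correctly. Values such as `1970-01-01 00:00:00.020160422` show that pandas read the integer `20160422` as nanoseconds since 1970; casting to `object` first does not change that. The integer is really a `YYYYMMDD` date, so pandas needs an explicit format, for example `pd.to_datetime(df_data['dtadfile'].astype(str), format='%Y%m%d')`. The `dtaddto` column has a related shape: month, day and year digits with the month's leading zero dropped (`7202016`), or the text `D/S`, which is commonly used for "duration of status" (see the F1 row of the Spark sample). The stdlib sketch below implements both parsers. It assumes these are the only formats present and that only the month's leading zero is ever dropped; the function names are hypothetical.

```python
from datetime import datetime

def parse_yyyymmdd(value):
    """Parse dtadfile-style values such as 20160422 (int, float or str)."""
    if value is None or value != value:  # missing / NaN
        return None
    return datetime.strptime(str(int(value)), "%Y%m%d").date()

def parse_dtaddto(value):
    """Parse dtaddto-style MMDDYYYY strings; return None for 'D/S' or blanks."""
    if value is None:
        return None
    text = str(value).strip()
    if not text.isdigit():  # e.g. 'D/S' carries no calendar date
        return None
    # '7202016' lost its leading zero: pad back to MMDDYYYY (assumption)
    return datetime.strptime(text.zfill(8), "%m%d%Y").date()

print(parse_yyyymmdd(20160422), parse_dtaddto("7202016"), parse_dtaddto("D/S"))
```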


In [14]:
# Check for fully duplicated rows (identical in every column)

df_data.duplicated().sum()

0
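`df_data.duplicated()` only flags rows that are identical in every column, so it does not prove that admission numbers are unique. If `admnum` is meant to act as a key, pandas can check that directly with `df_data.duplicated(subset=['admnum']).sum()` or `df_data['admnum'].is_unique`. The stdlib sketch below shows the same logic over a plain list of values. The values are illustrative, and `duplicate_keys` is a hypothetical helper.

```python
from collections import Counter

def duplicate_keys(keys):
    """Return {key: count} for every non-null key that appears more than once."""
    counts = Counter(k for k in keys if k is not None)
    return {k: n for k, n in counts.items() if n > 1}

print(duplicate_keys([101, 102, 101, 103]))  # → {101: 2}
```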

In [15]:
# Check the visa post values

df_data.visapost.value_counts().head()

MEX    28
BNS    21
BGT    14
SPL    14
SHG    13
Name: visapost, dtype: int64

In [16]:
# Number of states

df_data.i94addr.nunique()

51

In [17]:
# Check the travel modes in "i94mode" column

df_data.i94mode.value_counts()

1.0    962
3.0     26
2.0     10
9.0      2
Name: i94mode, dtype: int64

In [18]:
# Replace the numeric codes in "i94mode" with their travel-mode names

df_data['i94mode'] = df_data['i94mode'].replace(to_replace=(1, 2, 3, 9), value=('Air', 'Sea', 'Land', 'Not reported'))

In [19]:
#Check the new values in "i94mode"

df_data.i94mode.value_counts()

Air             962
Land             26
Sea              10
Not reported      2
Name: i94mode, dtype: int64
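`replace` leaves any code that is not in its list untouched, so an unexpected value would remain a number next to the text labels. The Spark version of the visa step uses `ELSE 'N/A'` for that case. To get the same behaviour, one option is to keep the codes in a dictionary and map with a default; in pandas that would be something like `df_data['i94visa'].map(I94VISA).fillna('N/A')`. The plain-Python sketch below mirrors the codes listed under the cleaning steps; `label_for` is hypothetical.

```python
# Codes as listed in the cleaning steps of this notebook
I94MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA = {1: "Business", 2: "Pleasure", 3: "Student"}

def label_for(code, mapping, default="N/A"):
    # 2.0 finds the key 2 because equal int and float values hash alike in Python.
    # Missing values (None, NaN) and unknown codes fall back to `default`.
    return mapping.get(code, default)

print([label_for(c, I94MODE) for c in (1.0, 3.0, 9.0, 7.0)])
print(label_for(float("nan"), I94VISA))
```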

In [20]:
#Check Visa Categories in "i94visa" column

df_data.i94visa.value_counts()

2.0    831
1.0    155
3.0     14
Name: i94visa, dtype: int64

In [21]:
# Replace the numeric codes in "i94visa" with their category names, this time using Spark SQL

from pyspark.sql.functions import col, isnan, when, count, desc

df_spark.createOrReplaceTempView('immigration')

In [22]:
# Add Visa Categories (Business - Pleasure - Student)

sql_expr = """
CASE  WHEN i94visa = 1.0 THEN 'Business' 
      WHEN i94visa = 2.0 THEN 'Pleasure'
      WHEN i94visa = 3.0 THEN 'Student'
      ELSE 'N/A'  
END              
"""

spark.sql("SELECT *," + sql_expr + "AS visa_category FROM immigration").createOrReplaceTempView("immigration2")
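The `CASE` expression above hard-codes the visa codes a second time. An alternative is to generate it from a dictionary, so the labels are defined in one place. This sketch only builds the SQL string; `case_expression` is a hypothetical helper. Its result could then be used in the same `spark.sql("SELECT *, " + ... + " AS visa_category FROM immigration")` call.

```python
I94VISA = {1: "Business", 2: "Pleasure", 3: "Student"}

def case_expression(column, mapping, default="N/A"):
    """Build a SQL CASE expression mapping numeric codes to string labels."""
    def quote(text):
        return "'" + text.replace("'", "''") + "'"  # escape single quotes for SQL literals

    lines = ["CASE"]
    for code, label in sorted(mapping.items()):
        # float(code) renders as 1.0, 2.0, ... to match the double i94visa column
        lines.append(f"  WHEN {column} = {float(code)} THEN {quote(label)}")
    lines.append(f"  ELSE {quote(default)}")
    lines.append("END")
    return "\n".join(lines)

print(case_expression("i94visa", I94VISA))
```

When concatenating the generated expression into a query, add explicit spaces around it. The original `sql_expr` works only because it starts and ends with a newline.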

In [23]:
with pd.option_context('display.max_columns', None):
    display(spark.sql("""
                SELECT *
                FROM immigration2
                LIMIT 5
    """).toPandas())

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,visa_category
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2,Pleasure
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1,Student
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2,Pleasure
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2,Pleasure
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2,Pleasure


<b> dim_city_demographics:</b>

In [24]:
demo_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [25]:
demo_data.duplicated().sum()

0

In [26]:
# Number of cities

demo_data.City.nunique()

567

In [27]:
#Number of states

demo_data.State.nunique()

49

In [28]:
demo_data.Race.value_counts()

Hispanic or Latino                   596
White                                589
Black or African-American            584
Asian                                583
American Indian and Alaska Native    539
Name: Race, dtype: int64

In [29]:
# Count is broken down by Race within the same city and state,
# so we drop both columns and keep only the city-level figures

demo_data.drop(columns=['Race', 'Count'], inplace=True)
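Dropping `Race` and `Count` leaves the other columns repeated once per race group: the file has 2,891 rows but only 567 distinct city names. Those repeated rows would multiply every later join, so it is likely worth collapsing them to one row per city and state, for example with `demo_data.drop_duplicates(subset=['City', 'State'])` in pandas. The stdlib sketch below shows the logic over a list of dicts. It assumes the remaining columns are identical within each city/state pair; `one_row_per_city` is hypothetical, and the rows are illustrative values taken from the sample above.

```python
def one_row_per_city(rows, keys=("City", "State")):
    """Keep the first row for each (City, State) pair, preserving order."""
    seen = set()
    unique = []
    for row in rows:
        key = tuple(row[k] for k in keys)
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique

rows = [
    {"City": "Quincy", "State": "Massachusetts", "Total Population": 93629},
    {"City": "Quincy", "State": "Massachusetts", "Total Population": 93629},
    {"City": "Hoover", "State": "Alabama", "Total Population": 84839},
]
print(len(one_row_per_city(rows)))  # → 2
```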

In [30]:
demo_data.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ


<b> dim_airports:</b>


In [31]:
port_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [32]:
port_data.duplicated().sum()

0

In [33]:
port_data.iso_country.value_counts().head()

US    22757
BR     4334
CA     2784
AU     1963
KR     1376
Name: iso_country, dtype: int64

In [34]:
# Filter on US country and create a new dataset 

USport_data = port_data[port_data.iso_country == 'US']

In [35]:
USport_data.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [36]:
USport_data.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 22757 entries, 0 to 54896
Data columns (total 12 columns):
ident           22757 non-null object
type            22757 non-null object
name            22757 non-null object
elevation_ft    22518 non-null float64
continent       1 non-null object
iso_country     22757 non-null object
iso_region      22757 non-null object
municipality    22655 non-null object
gps_code        20984 non-null object
iata_code       2019 non-null object
local_code      21236 non-null object
coordinates     22757 non-null object
dtypes: float64(1), object(11)
memory usage: 2.3+ MB


In [37]:
USport_data.type.value_counts().sort_values(ascending=False) # small_airport is by far the most common type

small_airport     13720
heliport           6265
closed             1326
medium_airport      692
seaplane_base       566
large_airport       170
balloonport          18
Name: type, dtype: int64

In [38]:
USport_data.continent.value_counts()

AS    1
Name: continent, dtype: int64
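Only one US airport has a continent value, and that value is `AS`. The most likely reason is that the file uses `NA` for North America, which `pd.read_csv` treats as missing by default. The same default probably also explains some of the missing `iso_country` values, since `NA` is Namibia's ISO code. To keep these codes, turn off the default NA strings with `keep_default_na=False` and still treat empty fields as missing via `na_values`. The demonstration below uses a small inline CSV whose three rows are made up.

```python
import io

import pandas as pd

csv_text = "ident,continent\nA1,NA\nB2,AS\nC3,\n"

# Default parsing: the string 'NA' becomes NaN
default = pd.read_csv(io.StringIO(csv_text))

# Only empty fields count as missing; 'NA' stays a continent code
fixed = pd.read_csv(io.StringIO(csv_text), keep_default_na=False, na_values=[""])

print(default["continent"].isna().sum(), fixed["continent"].tolist())
```

Applied to the real file, this would mean `pd.read_csv('airport-codes_csv.csv', keep_default_na=False, na_values=[''])`. Other NA-like strings such as `null` would then also stay as text, so the remaining columns are worth re-checking.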

In [39]:
# merge with "city_demographics" to get local airport code

USport_demo = pd.merge(demo_data, USport_data[['municipality', 'name', 'local_code']], how='left', 
                     left_on='City', right_on='municipality')

USport_demo.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,municipality,name,local_code
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Silver Spring,Dow Jones & Co. Inc. Heliport,MD67
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,Quincy,Blessing Hospital At 11th St Heliport,0IS8
2,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,Quincy,S and S Field,1OH1
3,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,Quincy,Gadsden Memorial Hospital Heliport,29FD
4,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,Quincy,Quincy Municipal Airport,2J9
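The merged output shows that joining on the city name alone is too loose. The Massachusetts row for Quincy picks up airports such as `Blessing Hospital At 11th St Heliport` (`0IS8`), `S and S Field` (`1OH1`) and `Gadsden Memorial Hospital Heliport` (`29FD`), which appear to belong to Quincys in other states. Because `iso_region` has the form `US-PA`, the state code can be split off and used as a second join key. In pandas that is roughly `USport_data.iso_region.str.split('-').str[1]`, followed by a merge with `left_on=['City', 'State Code']` and `right_on=['municipality', 'state_code']`, where the `state_code` column name is an assumption. The stdlib sketch below shows the join logic. The helpers and the example airport names are hypothetical.

```python
from collections import defaultdict

def state_from_region(iso_region):
    """'US-PA' -> 'PA'; returns None if there is no '-' separator."""
    parts = iso_region.split("-", 1)
    return parts[1] if len(parts) == 2 else None

def join_city_state(cities, airports):
    """Left-join city rows to airports on (city name, state code)."""
    by_key = defaultdict(list)
    for ap in airports:
        by_key[(ap["municipality"], state_from_region(ap["iso_region"]))].append(ap)
    joined = []
    for city in cities:
        matches = by_key.get((city["City"], city["State Code"]), [])
        # Keep the city even without a match, like how='left'
        for ap in matches or [None]:
            joined.append({**city, "airport": ap["name"] if ap else None})
    return joined

cities = [{"City": "Quincy", "State Code": "MA"}]
airports = [  # made-up rows for illustration
    {"municipality": "Quincy", "iso_region": "US-IL", "name": "Example IL Heliport"},
    {"municipality": "Quincy", "iso_region": "US-MA", "name": "Example MA Heliport"},
]
print(join_city_state(cities, airports))
```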


In [40]:
USport_demo.to_csv("us-states-demographics.csv", index=False)

#### The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* <b>Step 3: Define the Data Model</b>
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up


#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model
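The cleaning steps already name the intended tables: `fact_immigration`, `dim_city_demographics` and `dim_airports`. Below is one possible star-schema layout, written as SQLite DDL through Python's `sqlite3` so it runs anywhere. The column choices are assumptions drawn from the fields explored above, and the model actually built by `spark_etl.py` may differ.

```python
import sqlite3

# Hypothetical star schema; columns chosen from fields seen in the exploration step
SCHEMA = """
CREATE TABLE dim_city_demographics (
    city TEXT NOT NULL,
    state_code TEXT NOT NULL,
    median_age REAL,
    total_population INTEGER,
    foreign_born INTEGER,
    PRIMARY KEY (city, state_code)
);
CREATE TABLE dim_airports (
    ident TEXT PRIMARY KEY,
    name TEXT,
    type TEXT,
    municipality TEXT,
    state_code TEXT,
    local_code TEXT
);
CREATE TABLE fact_immigration (
    cicid INTEGER PRIMARY KEY,
    i94yr INTEGER,
    i94mon INTEGER,
    i94port TEXT,
    arrdate TEXT,
    depdate TEXT,
    travel_mode TEXT,
    i94addr TEXT,  -- state code, joinable to the state_code columns of the dimensions
    visa_category TEXT,
    visatype TEXT
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
tables = [r[0] for r in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")]
print(tables)
```

A star layout keeps the fact table narrow and lets arrivals be filtered by state-level attributes with a single join on the state code.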

#### 3.2 Mapping Out Data Pipelines

###### - First, extract the data by loading the CSV and SAS files.

###### - Then, apply the following cleaning steps:

<b> fact_immigration:</b>

- Convert Dates to DateTime
- Add Visa Categories (1=Business - 2=Pleasure - 3=Student)
- Add travel modes (1=Air - 2=Sea - 3=Land - 9=Not Reported)

<b> dim_city_demographics:</b>

- Check the number of cities and states.
- Drop Race and Count columns.

<b> dim_airports:</b>

- Filter on 'US' in iso_country column.
- Merge with city_demographics table.
- Save cleaned data.


#### The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* <b>Step 4: Run ETL to Model the Data</b>
* Step 5: Complete Project Write Up


#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [41]:
# - %run -i '.py'
# - !python .py

!python spark_etl.py

https://repos.spark-packages.org/ added as a remote repository with the name: repo-1
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark-2.4.3-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
saurfang#spark-sas7bdat added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e1b7db36-7647-4020-81cf-23aadfa5e4d4;1.0
	confs: [default]
	found saurfang#spark-sas7bdat;2.0.0-s_2.11 in repo-1
	found com.epam#parso;2.0.8 in central
	found org.slf4j#slf4j-api;1.7.5 in central
	found org.apache.logging.log4j#log4j-api-scala_2.11;2.7 in central
	found org.scala-lang#scala-reflect;2.11.8 in central
:: resolution report :: resolve 386ms :: artifacts dl 7ms
	:: modules in use:
	com.epam#parso;2.0.8 from central in [default]
	org.apache.logging.log4j#log4j-api-scala_2.11;2.7 from central in [default]
	org.scala-lang#scala-reflect;2.11.8 from central 

In [42]:
# get filepath to immigration data file
path = "sas_data/immigration"
    
# read immigration data file
df_immigration = spark.read.parquet(path)

df_immigration.limit(5).toPandas()

AnalysisException: 'Path does not exist: file:/workspace/home/sas_data/immigration;'

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
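The checks listed above could start as small assertion helpers run against each output table. This minimal sketch assumes each table can be reduced to a row count and a list of key values; with Spark that could be `df.count()` and a collected key column, and here plain Python values stand in for them. The helper names are hypothetical. The example keys are the `cicid` values from the Spark sample above.

```python
def check_not_empty(table, row_count):
    """Source/count check: fail if a table loaded zero rows."""
    if row_count <= 0:
        raise ValueError(f"{table}: expected rows, found {row_count}")
    return f"{table}: {row_count} rows"

def check_unique(table, keys):
    """Integrity check: fail if the key column has nulls or duplicates."""
    if any(k is None for k in keys):
        raise ValueError(f"{table}: key column contains nulls")
    if len(set(keys)) != len(keys):
        raise ValueError(f"{table}: key column contains duplicates")
    return f"{table}: {len(keys)} unique keys"

print(check_not_empty("fact_immigration", 1000))
print(check_unique("fact_immigration", [6, 7, 15, 16, 17]))
```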

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* <b>Step 5: Complete Project Write Up</b>


* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.