# US Immigration 
### Data Engineering Capstone Project

#### Project Summary
This project processes several data sets on US Immigration, US City Demographics, Airports, and Global Temperature. The goal is to prepare the data and save it in a data warehouse as a source for a Data Analyst or Data Scientist to do further analysis. 

The aim is to find correlations between the i94 immigration data, e.g. *visa type*, *visa category*, or *age*, and the destination city demographic data such as *median age*, *average household size*, or *gender distribution*.

I decided to not use the data sets for temperature and airport at this point.

The project follows these steps:
* Step 1: Scope of the Project
* Step 2: Data Import and Exploration
* Step 3: Definition of the Data Model
* Step 4: Provide an ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, StringType
from pyspark.sql.functions import col, sum, udf, when, round
from pyspark.sql.functions import monotonically_increasing_id

pd.set_option('display.max_columns', None)

### Step 1: Scope of the Project

#### Scope 
In order to prepare the data for an analytics process, the following steps were taken:
- Import of the data sets from various sources
    - csv import with pandas
    - sas import with pyspark
- Exploratory data analysis and data cleaning using pandas and pyspark
- Data Modeling and creation of the necessary tables
- Saving tables in parquet format

### Step 2: Data Import and Exploration

---
#### Data Set 1: I94 Immigration Data
This data comes from the [US National Travel and Tourism Office](https://www.trade.gov/national-travel-and-tourism-office). It contains records about U.S. immigration including flight data, visa status, and personal details. A data dictionary is included in the workspace.

#### inspect the small dataset

In [2]:
# Immigration Data Sample
df_imm_small = pd.read_csv('raw_data/immigration_data_sample.csv')
print(f'rows: {df_imm_small.shape[0]}, columns: {df_imm_small.shape[1]}')
df_imm_small.head(3)

rows: 1000, columns: 29


Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT


In [3]:
df_imm_small.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [4]:
df_imm_small['i94port'].unique()

array(['HHW', 'MCA', 'OGG', 'LOS', 'CHM', 'ATL', 'SFR', 'NYC', 'CHI',
       'PHI', 'FTL', 'BOS', 'SAI', 'NAS', 'SEA', 'ORL', 'PSP', 'HOU',
       'NEW', 'BAL', 'SNJ', 'DET', 'AGA', 'LVG', 'MIA', 'SDP', 'VCV',
       'DUB', 'PEM', 'TAM', 'BLA', 'WAS', 'KOA', 'DAL', 'SHA', 'SPM',
       'NIA', 'PHR', 'MIL', 'SLC', 'CLT', 'EPI', 'SNA', 'MON', 'DLR',
       'SFB', 'OPF', 'X96', 'CLM', 'LIH', 'DEN', 'PHO', 'POO', 'NOL',
       'WPB', 'PBB', 'TOR', 'MAA', 'RNO', 'FMY', 'HIG', 'OAK', 'OTM',
       'ONT', 'SRQ', 'LLB', 'NCA', 'SUM', 'STR', 'HAM'], dtype=object)

---
#### now import the large data set from sas format

In [5]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

In [6]:
df_i94 = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat')
df_i94.count()

3574989

In [7]:
df_i94_pandas = df_i94.limit(5).toPandas()
df_i94_pandas

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,validres,delete_days,delete_mexl,delete_dup,delete_visa,delete_recdup,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4.0,2016.0,6.0,135.0,135.0,XXX,20612.0,,,,59.0,2.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,Z,,U,,1957.0,10032016,,,,14938460000.0,,WT
1,5.0,2016.0,6.0,135.0,135.0,XXX,20612.0,,,,50.0,2.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,Z,,U,,1966.0,10032016,,,,17460060000.0,,WT
2,6.0,2016.0,6.0,213.0,213.0,XXX,20609.0,,,,27.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1989.0,D/S,,,,1679298000.0,,F1
3,7.0,2016.0,6.0,213.0,213.0,XXX,20611.0,,,,23.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1993.0,D/S,,,,1140963000.0,,F1
4,16.0,2016.0,6.0,245.0,245.0,XXX,20632.0,,,,24.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1992.0,D/S,,,,1934535000.0,,F1


In [8]:
df_i94.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- validres: double (nullable = true)
 |-- delete_days: double (nullable = true)
 |-- delete_mexl: double (nullable = true)
 |-- delete_dup: double (nullable = true)
 |-- delete_visa: double (nullable = true)
 |-- delete_recdup: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- mat

In [9]:
df_i94.select('i94yr').distinct().collect()

[Row(i94yr=2016.0)]

In [10]:
# count the missing values by summing the boolean output of the isNull() method, after converting it to type integer
df_i94.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df_i94.columns)).show(truncate=False)

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|dtadfile|visapost|occup  |entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum |airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|0    |0    |0     |520   |0     |0      |0      |61187  |186064 |287071 |639   |0      |0    |0       |0          |0          |0         |0     

---
#### convert date from sas numeric format to datetime format

In [11]:
# function to convert date column into a date format
@udf(DateType())
def convert_to_date(date):
    if date is not None:
        return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')

In [12]:
df_i94 = df_i94.withColumn('arrdate', convert_to_date(col('arrdate')))
df_i94 = df_i94.withColumn('depdate', convert_to_date(col('depdate')))

In [13]:
df_i94_pandas = df_i94.limit(5).toPandas()
df_i94_pandas

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,validres,delete_days,delete_mexl,delete_dup,delete_visa,delete_recdup,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4.0,2016.0,6.0,135.0,135.0,XXX,2016-06-07,,,,59.0,2.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,Z,,U,,1957.0,10032016,,,,14938460000.0,,WT
1,5.0,2016.0,6.0,135.0,135.0,XXX,2016-06-07,,,,50.0,2.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,Z,,U,,1966.0,10032016,,,,17460060000.0,,WT
2,6.0,2016.0,6.0,213.0,213.0,XXX,2016-06-04,,,,27.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1989.0,D/S,,,,1679298000.0,,F1
3,7.0,2016.0,6.0,213.0,213.0,XXX,2016-06-06,,,,23.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1993.0,D/S,,,,1140963000.0,,F1
4,16.0,2016.0,6.0,245.0,245.0,XXX,2016-06-27,,,,24.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1992.0,D/S,,,,1934535000.0,,F1


In [14]:
type(df_i94_pandas['arrdate'][0])

datetime.date
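
The conversion above relies on SAS storing dates as the number of days since 1960-01-01. As a minimal sketch of the same arithmetic in plain Python (the helper name `sas_days_to_date` is hypothetical and not part of the notebook), the value 20612.0 from the sample rows maps to 2016-06-07:

```python
from datetime import date, timedelta

# SAS date values count days from this epoch
SAS_EPOCH = date(1960, 1, 1)

def sas_days_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date.

    Hypothetical helper for illustration; returns None for missing values.
    """
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_days_to_date(20612.0))  # 2016-06-07, as in the converted arrdate column
print(sas_days_to_date(None))     # None
```

This is only meant to make the epoch offset explicit; the notebook's UDF performs the equivalent calculation with pandas inside Spark.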

---
#### convert double fields to integer

In [15]:
# round each double column and cast it to integer
for c in ['i94bir', 'i94yr', 'i94mon', 'i94visa', 'i94mode', 'i94cit', 'i94res']:
    df_i94 = df_i94.withColumn(c, round(col(c)).cast('integer'))
df_i94.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: double (nullable = true)
 |-- validres: double (nullable = true)
 |-- delete_days: double (nullable = true)
 |-- delete_mexl: double (nullable = true)
 |-- delete_dup: double (nullable = true)
 |-- delete_visa: double (nullable = true)
 |-- delete_recdup: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- 

In [16]:
df_i94_pandas = df_i94.limit(5).toPandas()
df_i94_pandas

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,validres,delete_days,delete_mexl,delete_dup,delete_visa,delete_recdup,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4.0,2016,6,135,135,XXX,2016-06-07,,,,59,2,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,Z,,U,,1957.0,10032016,,,,14938460000.0,,WT
1,5.0,2016,6,135,135,XXX,2016-06-07,,,,50,2,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,Z,,U,,1966.0,10032016,,,,17460060000.0,,WT
2,6.0,2016,6,213,213,XXX,2016-06-04,,,,27,3,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1989.0,D/S,,,,1679298000.0,,F1
3,7.0,2016,6,213,213,XXX,2016-06-06,,,,23,3,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1993.0,D/S,,,,1140963000.0,,F1
4,16.0,2016,6,245,245,XXX,2016-06-27,,,,24,3,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1992.0,D/S,,,,1934535000.0,,F1


---
#### import i94 description data from sas format
and save it in a dictionary

In [17]:
with open('raw_data/I94_SAS_Labels_Descriptions.SAS') as file:
    I94_descriptions = file.readlines()

In [18]:
# extract country info: used in i94CIT and i94RES
# '236': 'AFGHANISTAN'
country_codes = {}
for line in I94_descriptions[10:298]:
    code_country = line.split('=')
    code = code_country[0].strip()
    country = code_country[1].strip().strip("'")
    country_codes[code] = country

In [19]:
# extract port info: used in i94PORT (airport or harbour info)
# 'ANC': 'ANCHORAGE, AK' and delete state info. result: 'ANC': 'ANCHORAGE'
port_codes = {}
for line in I94_descriptions[303:962]:
    code_port = line.split('=')
    code = code_port[0].strip().strip("'")
    port = code_port[1].strip().strip("'").strip()[:-4]
    port_codes[code] = port

In [20]:
# extract state info: used in i94ADDR
# 'AK': 'ALASKA'
state_codes = {}
for line in I94_descriptions[982:1036]:
    code_state = line.split('=')
    code = code_state[0].strip().strip("'")
    state = code_state[1].strip().strip("'")
    state_codes[code] = state

In [21]:
# extract travelmode info: used in i94mode
# '1': 'AIR'
travelmode_codes = {}
for line in I94_descriptions[972:976]:
    code_travelmode = line.split('=')
    code = int(code_travelmode[0].strip())
    travelmode = code_travelmode[1].strip().strip("'")
    travelmode_codes[code] = travelmode

In [22]:
# extract visacategory info: used in i94visa
# '1': 'Business'
visacat_codes = {}
for line in I94_descriptions[1046:1049]:
    code_visacat = line.split('=')
    code = int(code_visacat[0].strip())
    visacat = code_visacat[1].strip()#.strip("'")
    visacat_codes[code] = visacat
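
The cells above all follow the same pattern: split each `code = label` line at the equals sign and strip whitespace and quotes. A minimal sketch of that pattern as one reusable function follows. The function names and the sample lines are assumptions for illustration (the sample lines only mimic the layout suggested by the comments above and are not copied from the label file). Splitting the port label at the last comma is shown as an alternative to slicing off the last four characters, since it also leaves labels without a state suffix intact.

```python
def parse_labels(lines):
    """Parse lines of the form  code = 'label'  into a dict (hypothetical helper)."""
    mapping = {}
    for line in lines:
        if '=' not in line:
            continue  # skip lines that carry no code/label pair
        code, label = line.split('=', 1)
        code = code.strip().strip("'")
        label = label.strip().rstrip(';').strip().strip("'").strip()
        mapping[code] = label
    return mapping

def drop_state_suffix(port_label):
    """Return the city part of a label like 'ANCHORAGE, AK'."""
    return port_label.rsplit(',', 1)[0].strip()

# Assumed sample lines, for illustration only
sample = [
    "   236 =  'AFGHANISTAN'",
    "   'ANC'\t=\t'ANCHORAGE, AK         '",
    "   1 = Business",
]
labels = parse_labels(sample)
print(labels)
print(drop_state_suffix(labels['ANC']))
```

Note that the codes stay strings here; the notebook converts the travel mode and visa category codes to `int` so that they match the integer columns of the fact table.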

---
#### Data Set 2: U.S. City Demographic Data
This data comes from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey.

Findings: Each city has up to five entries, one for each 'Race' with its corresponding 'Count' value. The rest of the columns are the same for that city. 

In [23]:
# US Cities Demographics
df_cities = pd.read_csv('raw_data/us-cities-demographics.csv', delimiter=';')
df_cities.head(3)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759


In [24]:
print(f'rows: {df_cities.shape[0]}, columns: {df_cities.shape[1]}')

rows: 2891, columns: 12


In [25]:
df_cities.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [26]:
df_cities.loc[df_cities['City'] == 'Silver Spring']

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
592,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,White,37756
1678,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Black or African-American,21330
2123,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,American Indian and Alaska Native,1084
2162,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Asian,8841


In [27]:
df_cities['City'].loc[df_cities['State'] == 'Maryland'].unique()

array(['Silver Spring', 'Frederick', 'Columbia', 'Waldorf',
       'Ellicott City', 'German', 'Rockville', 'Baltimore', 'Gaithersburg',
       'Glen Burnie'], dtype=object)

In [28]:
df_cities['Race'].unique()

array(['Hispanic or Latino', 'White', 'Asian', 'Black or African-American',
       'American Indian and Alaska Native'], dtype=object)

In [29]:
print(f'rows: {df_cities.shape[0]}, columns: {df_cities.shape[1]}')

rows: 2891, columns: 12


In [30]:
# drop_duplicates must be called and its result assigned, otherwise nothing happens
df_cities = df_cities.drop_duplicates()
print(f'rows: {df_cities.shape[0]}, columns: {df_cities.shape[1]}')

rows: 2891, columns: 12


---
#### Data Set 3: World Temperature Data
This dataset came from [Kaggle](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data). Kaggle has repackaged the data from a compilation put together by [Berkeley Earth](http://berkeleyearth.org/about/), which is affiliated with Lawrence Berkeley National Laboratory. The Berkeley Earth Surface Temperature Study combines 1.6 billion temperature reports from 16 pre-existing archives. The data set used here is Global Land Temperatures by City.

Findings: This data set only provides records until 2013. Since the immigration data is from 2016, I am not going to use it here.

In [31]:
# Temperature Data
df_temperature = pd.read_csv('raw_data/GlobalLandTemperaturesByCity.csv')
df_temperature.head(3)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E


In [32]:
print(f'rows: {df_temperature.shape[0]}, columns: {df_temperature.shape[1]}')

rows: 86085, columns: 7


In [33]:
df_temperature['year'] = df_temperature['dt'].str.extract(r'(\d{4})').astype('int32')
df_temperature.head(3)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,year
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E,1743
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E,1743
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E,1744


In [34]:
df_temperature['year'].max()

2013

In [35]:
df_temperature.isnull().sum()

dt                                  0
AverageTemperature               3782
AverageTemperatureUncertainty    3783
City                                1
Country                             1
Latitude                            1
Longitude                           1
year                                0
dtype: int64

---
#### Data Set 4: Airport Codes
This is a simple table of airport codes and corresponding cities provided by [Data Hub](https://datahub.io/core/airport-codes#data). The airport codes may refer to either the IATA airport code, a three-letter code which is used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code, a four-letter code used by ATC systems and for airports that do not have an IATA airport code (from Wikipedia).

Findings: After reducing the data set to US airports only, over 91% of them have a missing IATA code, and 1,326 are marked as closed. 

There seems to be no use for this data set for the purpose of the project, so I am not going to include it.

In [36]:
# US Airport Codes
df_airports = pd.read_csv('raw_data/airport-codes_csv.csv')
df_airports.head(3)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"


In [37]:
print(f'rows: {df_airports.shape[0]}, columns: {df_airports.shape[1]}')

rows: 55075, columns: 12


In [38]:
df_airports.isnull().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

In [39]:
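# drop_duplicates must be called and its result assigned, otherwise nothing happens
df_airports = df_airports.drop_duplicates()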
print(f'rows: {df_airports.shape[0]}, columns: {df_airports.shape[1]}')

rows: 55075, columns: 12


In [40]:
df_airports['iso_country'].unique()

array(['US', 'PR', 'MH', 'MP', 'GU', 'SO', 'AQ', 'GB', 'PG', 'AD', 'SD',
       'SA', 'AE', 'SS', 'ES', 'CN', 'AF', 'LK', 'SB', 'CO', 'AU', 'MG',
       'TD', 'AL', 'AM', 'MX', 'MZ', 'PW', 'NR', 'AO', 'AR', 'AS', 'AT',
       'ZZ', 'GA', 'AZ', 'BA', 'BB', 'BE', 'DE', 'BF', 'BG', 'GL', 'BH',
       'BI', 'IS', 'BJ', 'OM', 'XK', 'BM', 'KE', 'PH', 'BO', 'BR', 'BS',
       'CV', 'BW', 'FJ', 'BY', 'UA', 'LR', 'BZ', 'CA', 'CD', 'CF', 'CG',
       'MR', 'CH', 'CL', 'CM', 'MA', 'CR', 'CU', 'CY', 'CZ', 'SK', 'PA',
       'DZ', 'ID', 'GH', 'RU', 'CI', 'DK', 'NG', 'DO', 'NE', 'HR', 'TN',
       'TG', 'EC', 'EE', 'FI', 'EG', 'GG', 'JE', 'IM', 'FK', 'EH', 'NL',
       'IE', 'FO', 'LU', 'NO', 'PL', 'ER', 'MN', 'PT', 'SE', 'ET', 'LV',
       'LT', 'ZA', 'SZ', 'GQ', 'SH', 'MU', 'IO', 'ZM', 'FM', 'KM', 'YT',
       'RE', 'TF', 'ST', 'FR', 'SC', 'ZW', 'MW', 'LS', nan, 'ML', 'GM',
       'GE', 'GF', 'SL', 'GW', 'GN', 'SN', 'GR', 'GT', 'TZ', 'GY', 'SR',
       'DJ', 'HK', 'LY', 'HN', 'VN', 'KZ', 'RW', 'HT

In [41]:
df_airports_us = df_airports.loc[df_airports['iso_country'] == 'US']
print(f'rows: {df_airports_us.shape[0]}, columns: {df_airports_us.shape[1]}')

rows: 22757, columns: 12


In [42]:
df_airports_us.isnull().sum()

ident               0
type                0
name                0
elevation_ft      239
continent       22756
iso_country         0
iso_region          0
municipality      102
gps_code         1773
iata_code       20738
local_code       1521
coordinates         0
dtype: int64

In [43]:
# percentage of US airports without an IATA code
df_airports_us['iata_code'].isnull().sum() * 100 / df_airports_us.shape[0]

91.128004570022412

In [44]:
df_airports_us.loc[df_airports_us['iata_code'].isnull()]['ident'].isnull().sum()

0

In [45]:
df_airports_us.loc[(df_airports_us['type'] == 'closed')].shape[0]

1326

In [46]:
df_airports_us['type'].unique()

array(['heliport', 'small_airport', 'closed', 'seaplane_base',
       'balloonport', 'medium_airport', 'large_airport'], dtype=object)

In [47]:
df_airports_us.loc[df_airports_us['type'] != 'closed'].isnull().sum()

ident               0
type                0
name                0
elevation_ft      200
continent       21430
iso_country         0
iso_region          0
municipality       67
gps_code          550
iata_code       19475
local_code        306
coordinates         0
dtype: int64

---
### Step 3: Definition of the Data Model
#### 3.1 Conceptual Data Model
In order to separate the main i94 immigration data from any descriptive information, the project uses a star schema. There will be one fact table: 
1. **fact_i94** containing all necessary immigration information. 

And there will be 6 dimension tables. The first 5 relate directly to the fact table, i.e. they contain the information behind the codes used in the fact table. The last one holds the city demographic data: 
1. **dim_country** to match the i94cit and i94res country code  
2. **dim_city** to match the i94port city code 
3. **dim_state** to match the i94addr state code 
4. **dim_travelmode** to match the i94mode travel method code 
5. **dim_visacat** to match the i94visa visa category
6. **dim_demographics** with all necessary city demographic information
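
To illustrate how an analyst might use this star schema, here is a minimal sketch that joins a tiny fact table to the visa category dimension. It uses an in-memory SQLite database purely for illustration (the project itself stores parquet files and does not use SQLite); the five fact rows reuse the visa codes and ages from the sample output shown earlier, and only two of the fact columns are included.

```python
import sqlite3

# In-memory database as a stand-in for the warehouse (an assumption for this sketch)
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_visacat (i94visa INTEGER PRIMARY KEY, visacat TEXT);
    CREATE TABLE fact_i94 (id INTEGER PRIMARY KEY, i94visa INTEGER, i94bir INTEGER);
    INSERT INTO dim_visacat VALUES (1, 'Business'), (2, 'Pleasure'), (3, 'Student');
    INSERT INTO fact_i94 VALUES (0, 2, 59), (1, 2, 50), (2, 3, 27), (3, 3, 23), (4, 3, 24);
""")

# Resolve the visa code through the dimension table and aggregate per category
query = """
    SELECT d.visacat, COUNT(*) AS arrivals, AVG(f.i94bir) AS avg_age
    FROM fact_i94 AS f
    JOIN dim_visacat AS d ON d.i94visa = f.i94visa
    GROUP BY d.visacat
    ORDER BY d.visacat
"""
result = conn.execute(query).fetchall()
for visacat, arrivals, avg_age in result:
    print(visacat, arrivals, avg_age)
conn.close()
```

The same join pattern applies to the other code columns (`i94mode`, `i94cit`, `i94res`, `i94port`, `i94addr`) and their dimension tables.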

#### 3.2 Mapping Out Data Pipelines
The following steps are necessary to pipeline the data into the chosen data model.
- import raw data
- clean data: delete duplicates and convert types
- extract information for fact and dimension tables
- upload table to the Data Lake

### Step 4: (Run Pipelines to) Model the Data
This step includes the creation and the upload of the tables. The full pipeline, which includes actions from step 2, can be found in `etl.py`.

#### 4.1 Create the data model

#### 4.1.1 create fact tables

In [48]:
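# create fact table: drop duplicate rows first, then add the surrogate key
# (adding the unique id first would make every row distinct, so nothing would be dropped)
fact_cols = ['cicid', 'arrdate', 'i94yr', 'i94mon', 'depdate', 'i94port', 'i94addr', 'i94mode',
             'i94cit', 'i94res', 'i94visa', 'visatype', 'i94bir', 'gender', 'insnum', 'admnum']
fact_i94 = df_i94.select(fact_cols).drop_duplicates()\
    .withColumn('id', monotonically_increasing_id())\
    .select('id', *fact_cols)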
fact_i94.printSchema()

root
 |-- id: long (nullable = false)
 |-- cicid: double (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- visatype: string (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- admnum: double (nullable = true)



#### 4.1.2 convert dictionaries with sas Description code into dimension tables

In [49]:
# country info: used in i94CIT and i94RES
dim_country = pd.DataFrame.from_dict(country_codes, orient="index").reset_index()
dim_country.columns=['country_code', 'country_name']
dim_country.head(3)

Unnamed: 0,country_code,country_name
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA


In [50]:
# port info: used in i94PORT (airport or harbour info)
# 'ANC': 'ANCHORAGE, AK'
dim_city = pd.DataFrame.from_dict(port_codes, orient="index").reset_index()
dim_city.columns=['city_code', 'city']
dim_city.head(3)

Unnamed: 0,city_code,city
0,ANC,ANCHORAGE
1,BAR,BAKER AAF - BAKER ISLAND
2,DAC,DALTONS CACHE


In [51]:
# state info: used in i94ADDR
# 'AK': 'ALASKA'
dim_state = pd.DataFrame.from_dict(state_codes, orient="index").reset_index()
dim_state.columns=['state', 'state_name']
dim_state.head(3)

Unnamed: 0,state,state_name
0,AK,ALASKA
1,AZ,ARIZONA
2,AR,ARKANSAS


In [52]:
# travelmode info: used in i94mode
# '1': 'AIR'
dim_travelmode = pd.DataFrame.from_dict(travelmode_codes, orient="index").reset_index()
dim_travelmode.columns=['i94mode', 'travelmode']
dim_travelmode

Unnamed: 0,i94mode,travelmode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [53]:
# visacategory info: used in i94visa
# '1': 'Business'
dim_visacat = pd.DataFrame.from_dict(visacat_codes, orient="index").reset_index()
dim_visacat.columns=['i94visa', 'visacat']
dim_visacat

Unnamed: 0,i94visa,visacat
0,1,Business
1,2,Pleasure
2,3,Student


---
#### 4.1.3 create city demographic dimension table

In [54]:
df_cities.columns

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')

In [55]:
# select needed columns and drop duplicate rows
dim_demographics = df_cities[['City', 'State', 'Median Age', 'Average Household Size'\
                              , 'Total Population', 'Female Population', 'Male Population', 'Foreign-born']]
dim_demographics = dim_demographics.drop_duplicates()
dim_demographics.head()

Unnamed: 0,City,State,Median Age,Average Household Size,Total Population,Female Population,Male Population,Foreign-born
0,Silver Spring,Maryland,33.8,2.6,82463,41862.0,40601.0,30908.0
1,Quincy,Massachusetts,41.0,2.39,93629,49500.0,44129.0,32935.0
2,Hoover,Alabama,38.5,2.58,84839,46799.0,38040.0,8229.0
3,Rancho Cucamonga,California,34.5,3.18,175232,87105.0,88127.0,33878.0
4,Newark,New Jersey,34.6,2.73,281913,143873.0,138040.0,86253.0


In [56]:
# rename columns
new_columns = ['City', 'State', 'Median_Age', 'Average_Household_Size'\
                              , 'Total_Population', 'Female_Population', 'Male_Population', 'Foreign_born']
dim_demographics.columns = new_columns
dim_demographics.head()

Unnamed: 0,City,State,Median_Age,Average_Household_Size,Total_Population,Female_Population,Male_Population,Foreign_born
0,Silver Spring,Maryland,33.8,2.6,82463,41862.0,40601.0,30908.0
1,Quincy,Massachusetts,41.0,2.39,93629,49500.0,44129.0,32935.0
2,Hoover,Alabama,38.5,2.58,84839,46799.0,38040.0,8229.0
3,Rancho Cucamonga,California,34.5,3.18,175232,87105.0,88127.0,33878.0
4,Newark,New Jersey,34.6,2.73,281913,143873.0,138040.0,86253.0


In [57]:
print(f'size of original data set: {df_cities.shape[0]}')

size of original data set: 2891


In [58]:
print(f'size of reduced dataset: {dim_demographics.shape[0]}')

size of reduced dataset: 596
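
The reduction from 2891 to 596 rows happens because the race-specific columns are left out before duplicates are dropped, so the repeated rows of a city collapse into one. A minimal sketch of that idea in plain Python follows; the function name is hypothetical and the three rows reuse values from the outputs above with most columns omitted.

```python
# One row per (city, race), as in the raw demographics data
rows = [
    {"City": "Silver Spring", "State": "Maryland", "Median Age": 33.8, "Race": "White", "Count": 37756},
    {"City": "Silver Spring", "State": "Maryland", "Median Age": 33.8, "Race": "Asian", "Count": 8841},
    {"City": "Quincy", "State": "Massachusetts", "Median Age": 41.0, "Race": "White", "Count": 58723},
]

def one_row_per_city(rows, drop=("Race", "Count")):
    """Remove the race-specific columns, then keep the first copy of each remaining row."""
    seen = {}
    for row in rows:
        reduced = {k: v for k, v in row.items() if k not in drop}
        key = tuple(reduced.items())      # hashable fingerprint of the reduced row
        seen.setdefault(key, reduced)     # first occurrence wins
    return list(seen.values())

cities = one_row_per_city(rows)
print(len(cities))  # 2
print(cities[0])
```

The pandas version in the notebook does the same thing with a column selection followed by `drop_duplicates()`.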


---
#### 4.2 save tables in parquet format

In [59]:
output_data = 'data_warehouse/'

In [60]:
# writing fact_i94 table
fact_i94.write \
    .partitionBy('i94yr', 'i94mon') \
    .mode('overwrite') \
    .parquet('{}fact_i94/'.format(output_data))
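
Writing with `partitionBy` stores each combination of partition values in its own sub-directory named `column=value`. The sketch below only builds the expected directory path for one partition, which can help when loading a single month later; the helper name `partition_path` is hypothetical.

```python
from pathlib import PurePosixPath

def partition_path(output_data, table, **partitions):
    """Build the directory path of one partition, e.g. .../i94yr=2016/i94mon=6."""
    path = PurePosixPath(output_data) / table
    for column, value in partitions.items():
        path = path / f"{column}={value}"
    return str(path)

june_2016 = partition_path("data_warehouse/", "fact_i94", i94yr=2016, i94mon=6)
print(june_2016)  # data_warehouse/fact_i94/i94yr=2016/i94mon=6
```

Reading the table from its root directory instead lets Spark restore `i94yr` and `i94mon` as regular columns.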

In [61]:
# writing dim_country table
# convert pandas df to spark df in order to save it as parquet
dim_country=spark.createDataFrame(dim_country) 
dim_country.write \
    .mode('overwrite') \
    .parquet('{}dim_country/'.format(output_data))

In [62]:
# writing dim_city table
dim_city=spark.createDataFrame(dim_city) 
dim_city.write \
    .mode('overwrite') \
    .parquet('{}dim_city/'.format(output_data))

In [63]:
# writing dim_state table
dim_state=spark.createDataFrame(dim_state) 
dim_state.write \
    .mode('overwrite') \
    .parquet('{}dim_state/'.format(output_data))

In [64]:
# writing dim_travelmode table
dim_travelmode=spark.createDataFrame(dim_travelmode) 
dim_travelmode.write \
    .mode('overwrite') \
    .parquet('{}dim_travelmode/'.format(output_data))

In [65]:
# writing dim_visacat table
dim_visacat=spark.createDataFrame(dim_visacat) 
dim_visacat.write \
    .mode('overwrite') \
    .parquet('{}dim_visacat/'.format(output_data))

In [66]:
# writing dim_demographics table
dim_demographics=spark.createDataFrame(dim_demographics) 
dim_demographics.write \
    .mode('overwrite') \
    .parquet('{}dim_demographics/'.format(output_data))

#### 4.3 Data Quality Checks 
This step includes data quality checks to ensure that the ETL process has been executed successfully.

1. Import tables from parquet storage and compare the size with the original dataframes.

In [67]:
def check_file_size(file_name, df_original):
    '''import the parquet file and compare its size with the size of the original dataframe'''
    test_file = spark.read.parquet('{}{}'.format(output_data, file_name))
    if test_file.count() == df_original.count():
        print(f'Data check for {file_name} size successful.')
    else:
        print(f'Data check for {file_name} size not successful. Please check the pipeline.')

In [68]:
check_file_size('dim_demographics', dim_demographics)

Data check for dim_demographics size successful.


In [69]:
check_file_size('dim_country', dim_country)
check_file_size('dim_city', dim_city)
check_file_size('dim_state', dim_state)
check_file_size('dim_travelmode', dim_travelmode)
check_file_size('dim_visacat', dim_visacat)

Data check for dim_country size successful.
Data check for dim_city size successful.
Data check for dim_state size successful.
Data check for dim_travelmode size successful.
Data check for dim_visacat size successful.


2. Import the fact table and compare its size with the original dataframe. (The helper function is not used here so that the imported dataframe can be reused in the next data check.)

In [70]:
# check size for i94 fact table
test_fact_i94 = spark.read.parquet('{}fact_i94'.format(output_data))
if test_fact_i94.count() == fact_i94.count():
    print(f'Data check for fact_i94 size successful.')
else:
    print(f'Data check for fact_i94 size not successful. Please check the pipeline.')

Data check for fact_i94 size successful.


3. Compare some datatypes of the newly imported fact_i94 with the original dataframe.

In [71]:
# save data types in a list of tuples
origin_dt = fact_i94.dtypes
new_dt = test_fact_i94.dtypes

In [72]:
def test_datatype_match(column_name, origin_dt, new_dt):
    '''check for a data type match between the original and the newly imported data frames'''
    try:
        origin = [item for item in origin_dt if item[0] == column_name][0][1]
        new = [item for item in new_dt if item[0] == column_name][0][1]
        print(f'Data type match for column {column_name}: {origin == new}')
    except IndexError:
        print(f'{column_name} does not seem to exist in the dataframe')

In [73]:
test_datatype_match('id', origin_dt, new_dt)
test_datatype_match('arrdate', origin_dt, new_dt)
test_datatype_match('depdate', origin_dt, new_dt)
test_datatype_match('i94yr', origin_dt, new_dt)
test_datatype_match('i94visa', origin_dt, new_dt)

Data type match for column id: True
Data type match for column arrdate: True
Data type match for column depdate: True
Data type match for column i94yr: True
Data type match for column i94visa: True
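
The checks above print their result, which is convenient in a notebook. In an automated pipeline it may be preferable for a failed check to stop the run. A minimal sketch of that variant follows, assuming the row counts and the `dtypes` lists (name and type pairs) have already been collected; the function names and the sample values are hypothetical.

```python
def check_row_count(table_name, expected, actual):
    """Raise if the reloaded table does not have the expected number of rows."""
    if expected != actual:
        raise ValueError(f"{table_name}: expected {expected} rows, found {actual}")
    return True

def check_dtypes(table_name, origin_dt, new_dt, columns):
    """Raise if any listed column is missing or changed its type after the reload."""
    origin, new = dict(origin_dt), dict(new_dt)
    problems = [
        c for c in columns
        if c not in origin or c not in new or origin[c] != new[c]
    ]
    if problems:
        raise TypeError(f"{table_name}: missing or mismatched column(s): {problems}")
    return True

# Hypothetical values standing in for df.count() and df.dtypes
origin_dt = [("id", "bigint"), ("arrdate", "date"), ("i94yr", "int")]
new_dt = [("id", "bigint"), ("arrdate", "date"), ("i94yr", "int")]
print(check_row_count("dim_visacat", 3, 3))
print(check_dtypes("fact_i94", origin_dt, new_dt, ["id", "arrdate", "i94yr"]))
```

Raising an exception makes a failed check visible to whatever schedules the pipeline, instead of relying on someone reading the printed output.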


#### 4.4 Data dictionary 

FACT_i94
- **id**: Primary key
- **cicid**: CIC identification number
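- **arrdate**: arrival date
- **i94yr**: 4-digit year of arrival (partition column)
- **i94mon**: numeric month of arrival (partition column)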
- **depdate**: departure date
- **i94port**: port of entry (city and state)
- **i94addr**: state during stay
- **i94mode**: travel method code
- **i94cit**: citizenship country code
- **i94res**: residency country code
- **i94visa**: visa category code
- **visatype**: visa type
- **i94bir**: age at entry
- **gender**: gender
- **insnum**: INS number
- **admnum**: admission number

DIM_CITY
- **city_code**: 3-character port code for the port of entry, found in i94port
- **city**: corresponding city name

DIM_STATE
- **state**: 2-character state abbreviation found in i94addr
- **state_name**: corresponding state name

DIM_COUNTRY
- **country_code**: 3-digit country code, found in i94cit and i94res
- **country_name**: corresponding country name

DIM_TRAVELMODE
- **i94mode**: 1-digit code referring to the mode of travel, found in i94mode
- **travelmode**: corresponding name (e.g. "Air")

DIM_VISACAT
- **i94visa**: 1-digit code referring to the visa category, found in i94visa
- **visacat**: corresponding visa category (e.g. "Business")

DIM_DEMOGRAPHICS
- **city_id**: primary key
- **city**: name of the city
- **state**: name of the state
- **median_age**: the median age of the city's population
- **average_household_size**: the city's average household size
- **total_population**: the city's total population
- **female_population**: the city's female population
- **male_population**: the city's male population
- **foreign_born**: number of people born in a foreign country 

### Step 5: Complete Project Write Up

#### 5.1 Choice of tools and technologies
The i94 data set was available in the *sas7bdat* format, a database storage format created by the Statistical Analysis System (SAS) software. I chose Spark to open and process the data because it handles large data sets well, scales easily, and is fast for analytical processing.
The small data sets were imported with pandas to introduce a second tool into the project. The tables were again saved with Spark, writing to parquet files. I chose parquet for all tables to keep the data format consistent. 

I chose the star schema to separate the numerical data in the fact table from the descriptive information in the dimension tables.

#### 5.2 Different Scenarios
1. **If the data was increased by 100x**: I would move the parquet storage to an S3 bucket in the cloud with scalable data storage.
2. **If the data populates a dashboard that must be updated on a daily basis by 7am every day**: I would move the pipeline to Airflow, and set a schedule to have it run every morning to update the dashboard.
3. **If the database needed to be accessed by 100+ people**: As in point 1, moving the storage to an S3 bucket would be sufficient to support many users and frequent access.