# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project prepares an in-house datastore, used mainly by analysts, for discovering relationships between U.S. immigration data and U.S. city demographic data. Other supplementary datasets may be added to the datastore at the analysts' request.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import datetime
import glob
import os

from pyspark.sql import SparkSession, types
from pyspark.sql.functions import col, udf, to_date

In [2]:
# Default limits when displaying Pandas DataFrame.info()
# Can be overridden by setting arguments verbose=True and show_counts=True
#print(pd.options.display.max_info_columns) # 100
#print(pd.options.display.max_info_rows) # 1690785

# Default limits when displaying Pandas DataFrame itself
#print(pd.options.display.max_columns) # 20
#print(pd.options.display.max_rows) # 60

# Set options to fully display data contents of Pandas DataFrames on screen
# WARNING - Only suitable for small Pandas DataFrames
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [3]:
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.html
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

In this project, a local Apache Spark installation serves as the main datastore. All tables are created as in-memory PySpark DataFrames, and data extracted from the original datasets is loaded into them. Transformations and other processing are carried out once the data has been loaded successfully.

By using Apache Spark and PySpark DataFrames, the final datastore not only provides SQL querying capabilities, but also gives analysts the flexibility to wrangle the original raw data if needed.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

The following datasets are used in the project:

* [I94 Immigration Data](https://www.trade.gov/i-94-arrivals-historical-data) (This is the main dataset in the project. The [original URL](https://travel.trade.gov/research/reports/i94/historical/2016.html) of the dataset mentioned in the project is now broken, but an archived copy of the website is still available through the [Wayback Machine](https://web.archive.org/web/20210328153411/https://travel.trade.gov/research/reports/i94/historical/2016.html).)
* [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/) (This is the additional dataset in the project. It is maintained and provided by [Opendatasoft](https://www.opendatasoft.com/).)

Also, these optional datasets are listed here for informational purposes, and will not be utilized in the project.

* [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)
* [Airport Code Table](https://datahub.io/core/airport-codes#data)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

- I94 Immigration Data
    * Wrong data types in various columns. Some columns hold integer or string codes, but are reported as floating-point numbers instead
    * Some of the date columns are [SAS date values](https://v8doc.sas.com/sashtml/lrcon/zenid-63.htm) (days counted from January 1, 1960). These columns should be converted to regular dates
    * NULL values found in some columns. These NULL values will be preserved, as they have valid meanings in immigration data


- U.S. City Demographic Data
    * Wrong data types in several population-related columns. These columns should be of int type, but most of them are floating-point numbers
    * NULL values found in some columns. These NULL values will be filled with a default value of zero, since the columns are numerical by nature and the data is easier to manipulate once NULL handling is out of the way
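The SAS date issue noted above can be illustrated with a minimal, standard-library sketch. It assumes the usual SAS convention of counting days from January 1, 1960; the helper name `sas_to_date` is hypothetical and is not part of the notebook. Missing values (`None` or NaN) are passed through unchanged, which matches the decision to preserve NULLs in the immigration data.

```python
import datetime
import math
from typing import Optional

# SAS date values count days from this date.
SAS_EPOCH = datetime.date(1960, 1, 1)

def sas_to_date(days: Optional[float]) -> Optional[datetime.date]:
    """Convert a SAS date value to a calendar date; keep missing values missing."""
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return SAS_EPOCH + datetime.timedelta(days=int(days))

# 20566.0 is the arrdate of the first row in the I94 sample shown in this notebook.
print(sas_to_date(20566.0))  # 2016-04-22
print(sas_to_date(None))     # None
```

The converted value agrees with the `dtadfile` value (20160422) recorded for the same sample row.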

In [4]:
# Read in the data here (we will be using a small sample to explore and assess I94 data)
df_i94_data_sample = pd.read_csv('immigration_data_sample.csv')
df_us_city_demo = pd.read_csv('us-cities-demographics.csv', sep=';')

In [5]:
# Print out first 5 lines of each Pandas DataFrame for a brief check
display(df_i94_data_sample.head())
display(df_us_city_demo.head())

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [6]:
# Check data types and NULL values of I94 immigration data sample
df_i94_data_sample.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
insnum        35 non-null float64
airline       967 non

In [7]:
# Check data types and NULL values of US city demographic data
df_us_city_demo.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [8]:
# DROP Pandas DataFrames we created to free memory
# https://stackoverflow.com/questions/32247643/how-to-delete-multiple-pandas-python-dataframes-from-memory-to-save-ram
del df_i94_data_sample

# df_us_city_demo will not be dropped, since it will be used as a dataset to load into a Pyspark DataFrame
spark_df_us_city_demo = spark.createDataFrame(df_us_city_demo)

In [9]:
# Combine I94 SAS data files into parquet files
# (code snippet borrowed from Project 1 ETL script)
i94_filepath='/data/18-83510-I94-Data-2016/'
all_files = []
for root, dirs, files in os.walk(i94_filepath):
    files = glob.glob(os.path.join(root,'*.sas7bdat'))
    for f in files :
        all_files.append(os.path.abspath(f))

for file in all_files:
    # Load I94 SAS files into a temp Pyspark DataFrame and write it to parquet files in append mode, one by one
    spark_df_sas_temp = spark.read.format('com.github.saurfang.sas.spark').load(file)
    spark_df_sas_temp.write.mode('append').parquet('./sas_data')
    print("File {} appended.".format(file))

File /data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat appended.
File /data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat appended.
File /data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat appended.
File /data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat appended.
File /data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat appended.
File /data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat appended.
File /data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat appended.
File /data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat appended.
File /data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat appended.
File /data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat appended.
File /data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat appended.
File /data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat appended.


In [10]:
# Load combined I94 parquet data files into Pyspark DataFrame
spark_df_i94_data = spark.read.parquet('./sas_data')

In [11]:
spark_df_i94_data.show(1)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+-------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+-------------+-----+--------+
|5680949.0|2016.0|   7.0| 117.0| 117.0|    NYC|20659.0|    1.0|     NY|   null|  30.0|    3.0|  1.0|20160724|     NPL| null|      G|   null|   null|   null| 1986.0|    D/S|     F|  null|     IG|2.947450085E9| 3940|      F1|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+----

In [12]:
spark_df_i94_data.take(1)

[Row(cicid=5680949.0, i94yr=2016.0, i94mon=7.0, i94cit=117.0, i94res=117.0, i94port='NYC', arrdate=20659.0, i94mode=1.0, i94addr='NY', depdate=None, i94bir=30.0, i94visa=3.0, count=1.0, dtadfile='20160724', visapost='NPL', occup=None, entdepa='G', entdepd=None, entdepu=None, matflag=None, biryear=1986.0, dtaddto='D/S', gender='F', insnum=None, airline='IG', admnum=2947450085.0, fltno='3940', visatype='F1')]

In [13]:
spark_df_i94_data.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [14]:
# https://sparkbyexamples.com/pyspark/pyspark-find-count-of-null-none-nan-values/
from pyspark.sql.functions import col, isnan, when, count
spark_df_i94_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in spark_df_i94_data.columns]).show()

# The full I94 dataset does contain NULL values in the column 'depdate'. This is
# a reasonable result, as some visitors may still have been in the U.S. when the data
# was recorded. However, these NULL values cause trouble if SAS date values are converted
# to regular dates uniformly, so the SAS date conversion needs to handle NULL values
# conditionally.

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+--------+-------+-------+--------+-------+-------+-------+-------+--------+-------+------+------+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|   occup|entdepa|entdepd| entdepu|matflag|biryear|dtaddto| gender|  insnum|airline|admnum| fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+--------+-------+-------+--------+-------+-------+-------+-------+--------+-------+------+------+--------+
|    0|    0|     0| 28575|     0|      0|      0|  73949|2027926|3308012|  9517|      0|    0|  131050|24032175|40597574|   2404|3287909|40777323|3219581|   9517| 101551|4079983|35678095|1308066|     0|333922|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------

In [15]:
# Load raw data from I94_SAS_Labels_Descriptions.SAS file into Pyspark DataFrames in 3 steps
# Step 1 : Save raw data as Python dict objects (code snippets borrowed from Udacity Knowledge)
# https://knowledge.udacity.com/questions/801811
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')
def code_mapper(file, idx):
    # Slice the label file text from the section name `idx` up to the terminating ';'
    f_content2 = file[file.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    # Skip the header line, then split each "code = description" pair
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic
i94cit_res = code_mapper(f_content, "i94cntyl")
i94port = code_mapper(f_content, "i94prtl")
i94mode = code_mapper(f_content, "i94model")
i94addr = code_mapper(f_content, "i94addrl")
i94visa_cat = {
'1':'Business', 
'2': 'Pleasure', 
'3' : 'Student'}

# https://www.trade.gov/i-94-arrivals-program
i94visa_type = {
'B1':'Visa Holder: Non-Immigrant Temporary Visitor for Business', 
'WB':'Visa Waiver Program: Temporary Visitor for Business admitted without a Visa', 
'GB':'Visa Waiver Program: Guam Visa Waiver Business', 
'GMB':'Guam Marianas Business', 
'I':'Visa Holder: Foreign Professional Journalist, Information Media, including Spouse and Child', 
'I1':'Visa Holder: Foreign Professional Journalist, Information Media, including Spouse and Child', 
'E1':'Visa Holder: Treaty Trader based on the Trade Treaty between the U.S. and Home Country', 
'E2':'Visa Holder: Treaty Investor based on the Treaty between the U.S. and Home Country', 
'B2':'Visa Holder: Non-Immigrant Temporary Visitor for Pleasure', 
'WT':'Visa Waiver Program: Temporary Visitor for Pleasure admitted without a Visa', 
'GT':'Visa Waiver Program: Guam Visa Waiver Tourist', 
'GMT':'Guam Marianas Tourist', 
'CP':'Parolee (Public Interest – Headquarters) (urgent, medical, family needs) (country code not equal to 584 “Cuba”)', 
'CPL':'Silent Parolee (do not disclose)', 
'SBP':'Silent Parolee at POE – CBP', 
'F1':'Visa Holder: Non-Immigrant Student and Exchange Visitor - Academic Student', 
'F2':'Visa Holder: Spouse or Child of Academic Student', 
'M1':'Visa Holder: Student pursuing a full course of study at an established vocational or other recognized non-academic institution (other than in a language training program)', 
'M2':'Visa Holder: Spouse or Child of M-1 Vocational Student'}

In [16]:
# Step 2 : Convert Python dict objects to Pandas DataFrames
df_i94cit_res   = pd.DataFrame(i94cit_res, index=[0]).transpose().reset_index().rename(columns={'index':'country_code', 0:'country_name'})
df_i94port      = pd.DataFrame(i94port, index=[0]).transpose().reset_index().rename(columns={'index':'airport_code', 0:'airport_location'})
df_i94mode      = pd.DataFrame(i94mode, index=[0]).transpose().reset_index().rename(columns={'index':'entry_code', 0:'entry_mode'})
df_i94addr      = pd.DataFrame(i94addr, index=[0]).transpose().reset_index().rename(columns={'index':'state_code', 0:'state_name'})
df_i94visa_cat  = pd.DataFrame(i94visa_cat, index=[0]).transpose().reset_index().rename(columns={'index':'visa_cat_code', 0:'visa_cat_desc'})
df_i94visa_type = pd.DataFrame(i94visa_type, index=[0]).transpose().reset_index().rename(columns={'index':'visa_type_code', 0:'visa_type_desc'})

In [17]:
# Step 3 : Convert Pandas DataFrames to PySpark DataFrames
# https://www.geeksforgeeks.org/how-to-convert-pandas-to-pyspark-dataframe/
spark_df_i94cit_res   = spark.createDataFrame(df_i94cit_res)
spark_df_i94port      = spark.createDataFrame(df_i94port)
spark_df_i94mode      = spark.createDataFrame(df_i94mode)
spark_df_i94addr      = spark.createDataFrame(df_i94addr)
spark_df_i94visa_cat  = spark.createDataFrame(df_i94visa_cat)
spark_df_i94visa_type = spark.createDataFrame(df_i94visa_type)
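The label-file parsing in Step 1 above can be tried in isolation with a small sketch. The `SAMPLE` text below is a made-up excerpt written in the style of `I94_SAS_Labels_Descriptions.SAS`, and `parse_labels` is a hypothetical regex-based variant of `code_mapper`, not the function used in the notebook. It assumes each section ends at the first `;` and that descriptions contain no embedded single quotes.

```python
import re

# Made-up excerpt in the style of the SAS labels file (assumption, not the real file).
SAMPLE = """
value i94model
    1 = 'Air'
    2 = 'Sea'
    3 = 'Land'
    9 = 'Not reported' ;

value $i94addrl
    'AL'='ALABAMA'
    'AK'='ALASKA'
;
"""

# Matches lines such as  1 = 'Air'  or  'AL'='ALABAMA'
PAIR = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'([^']*)'")

def parse_labels(text, name):
    """Return {code: description} for the section that starts at `name`."""
    start = text.index(name)
    block = text[start:text.index(';', start)]
    mapping = {}
    for line in block.splitlines()[1:]:  # skip the header line holding the section name
        match = PAIR.match(line)
        if match:
            mapping[match.group(1).strip()] = match.group(2).strip()
    return mapping

print(parse_labels(SAMPLE, "i94model"))
print(parse_labels(SAMPLE, "i94addrl"))
```

Anchoring the search for `;` at the section start keeps one section's pairs from leaking into the next.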

#### Cleaning Steps
Document steps necessary to clean the data

* I94 Immigration Data
    - Correcting the data types in the columns
    - Changing SAS date to YYYY-MM-DD format


* U.S. City Demographic Data
    - Correcting the data types in the columns
    - Changing null values in numerical fields to 0
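The type-correction and NULL-filling rules listed above can be sketched on single records in plain Python before applying them at scale. The helper names (`is_missing`, `code_to_str`, `clean_demo_row`) are hypothetical, and the choice of count columns is an assumption based on the demographic columns shown earlier.

```python
import math

# Count columns of the demographic data that should hold whole numbers (assumption).
INT_FIELDS = ("Male Population", "Female Population", "Number of Veterans", "Foreign-born")

def is_missing(value):
    """True for None and for the NaN that pandas uses for missing floats."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def code_to_str(value):
    """Turn a float-coded identifier such as 209.0 into the string '209'; keep NULLs."""
    return None if is_missing(value) else str(int(value))

def clean_demo_row(row):
    """Cast count columns to int and fill missing counts with 0."""
    cleaned = dict(row)
    for field in INT_FIELDS:
        value = cleaned.get(field)
        cleaned[field] = 0 if is_missing(value) else int(value)
    return cleaned

row = {"City": "Silver Spring", "Male Population": 40601.0, "Female Population": float("nan"),
       "Number of Veterans": None, "Foreign-born": 30908.0}
print(clean_demo_row(row))
print(code_to_str(209.0), code_to_str(None))
```

Note the asymmetry: immigration codes keep their NULLs, while demographic counts are filled with zero.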

In [18]:
# Performing cleaning tasks here

In [19]:
# Perform type conversion on i94_data
# https://sparkbyexamples.com/pyspark/pyspark-cast-column-type/
# https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html
spark_df_i94_data = spark_df_i94_data.\
                  withColumn("cicid", spark_df_i94_data.cicid.cast('int')).\
                  withColumn("i94yr", spark_df_i94_data.i94yr.cast('int')).\
                  withColumn("i94mon", spark_df_i94_data.i94mon.cast('int')).\
                  withColumn("i94cit", spark_df_i94_data.i94cit.cast('int').cast('string')).\
                  withColumn("i94res", spark_df_i94_data.i94res.cast('int').cast('string')).\
                  withColumn("arrdate", spark_df_i94_data.arrdate.cast('int')).\
                  withColumn("i94mode", spark_df_i94_data.i94mode.cast('int').cast('string')).\
                  withColumn("depdate", spark_df_i94_data.depdate.cast('int')).\
                  withColumn("i94bir", spark_df_i94_data.i94bir.cast('int').cast('string')).\
                  withColumn("i94visa", spark_df_i94_data.i94visa.cast('int').cast('string')).\
                  withColumn("count", col("count").cast('int')).\
                  withColumn("dtadfile", to_date(col("dtadfile"), "yyyyMMdd")).\
                  withColumn("biryear", spark_df_i94_data.biryear.cast('int')).\
                  withColumn("dtaddto", to_date(col("dtaddto"), "MMddyyyy")).\
                  withColumn("admnum", spark_df_i94_data.admnum.cast('long').cast('string'))  # admnum exceeds the 32-bit int range, so cast to long

In [20]:
# Perform SAS date format conversion on i94_data
# https://knowledge.udacity.com/questions/741863
# https://stackoverflow.com/questions/26923564/convert-sas-numeric-to-python-datetime
# https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.udf.html
sasdate_convert = udf(lambda x: datetime.date(1960, 1, 1) + datetime.timedelta(days=x) if x is not None else None, returnType=types.DateType())

spark_df_i94_data = spark_df_i94_data.\
                  withColumn("arrdate", sasdate_convert(spark_df_i94_data.arrdate)).\
                  withColumn("depdate", sasdate_convert(spark_df_i94_data.depdate))

In [21]:
# Perform type conversion, NULL value filling, column renaming on us_city_demo
spark_df_us_city_demo = spark_df_us_city_demo.\
                      withColumn("Male Population", col("Male Population").cast('int')).\
                      withColumn("Female Population", col("Female Population").cast('int')).\
                      withColumn("Number of Veterans", col("Number of Veterans").cast('int')).\
                      withColumn("Foreign-born", col("Foreign-born").cast('int'))
# "Average Household Size" is a fractional average (e.g. 2.6), so it stays a double instead of being truncated to int

spark_df_us_city_demo = spark_df_us_city_demo.fillna(
    {'Male Population':0, \
     'Female Population':0, \
     'Number of Veterans':0, \
     'Foreign-born':0, \
     'Average Household Size':0}
)

spark_df_us_city_demo = spark_df_us_city_demo.\
                      withColumnRenamed("City", "city").\
                      withColumnRenamed("State", "state").\
                      withColumnRenamed("Median Age", "median_age").\
                      withColumnRenamed("Male Population", "male_population").\
                      withColumnRenamed("Female Population", "female_population").\
                      withColumnRenamed("Total Population", "total_population").\
                      withColumnRenamed("Number of Veterans", "number_of_veterans").\
                      withColumnRenamed("Foreign-born", "foreign_born").\
                      withColumnRenamed("Average Household Size", "average_household_size").\
                      withColumnRenamed("State Code", "state_code").\
                      withColumnRenamed("Race", "race").\
                      withColumnRenamed("Count", "count")

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

We will create a star schema to optimize queries focused on the analysis of U.S. immigration data and U.S. city demographic data. The following tables will be created.

Fact Table

**i94_data** - I94 Immigration Data
* *cicid, i94yr, i94mon, i94cit, i94res, i94port, arrdate, i94mode, i94addr, depdate, i94bir, i94visa, count, dtadfile, visapost, occup, entdepa, entdepd, entdepu, matflag, biryear, dtaddto, gender, insnum, airline, admnum, fltno, visatype*

Dimension Tables

**i94cit_res** - country codes of COC (Country of Citizenship) and COR (Country of Residence)
* *country_code, country_name*

**i94port** - simplified version of airport code list
* *airport_code, airport_location*

**i94mode** - the mode of transport by which a traveler enters the United States
* *entry_code, entry_mode*

**i94addr** - list of U.S. state codes and names (a simplified version of demographic data)
* *state_code, state_name*

**i94visa_cat** - list of visa categories
* *visa_cat_code, visa_cat_desc*

**i94visa_type** - class of admission legally admitting the non-immigrant to temporarily stay in U.S.
* *visa_type_code, visa_type_desc*

**us_city_demo** - U.S. City Demographic Data
* *city, state, median_age, male_population, female_population, total_population, number_of_veterans, foreign_born, average_household_size, state_code, race, count*
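To illustrate how the fact table relates to its dimension tables, here is a minimal sketch that uses the standard-library `sqlite3` module as a stand-in for Spark SQL. Only a few columns are modeled, the rows are made up, and the country names are placeholders rather than the real label values.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE i94_data    (cicid INTEGER PRIMARY KEY, i94res TEXT, i94addr TEXT, i94visa TEXT);
CREATE TABLE i94cit_res  (country_code TEXT PRIMARY KEY, country_name TEXT);
CREATE TABLE i94visa_cat (visa_cat_code TEXT PRIMARY KEY, visa_cat_desc TEXT);
""")

# Made-up rows for illustration only.
conn.executemany("INSERT INTO i94_data VALUES (?, ?, ?, ?)",
                 [(1, "209", "HI", "2"), (2, "582", "TX", "2"), (3, "209", "CA", "1")])
conn.executemany("INSERT INTO i94cit_res VALUES (?, ?)",
                 [("209", "COUNTRY A"), ("582", "COUNTRY B")])
conn.executemany("INSERT INTO i94visa_cat VALUES (?, ?)",
                 [("1", "Business"), ("2", "Pleasure"), ("3", "Student")])

# Star-schema query: the fact table joins directly to each dimension it references.
rows = conn.execute("""
SELECT c.country_name, v.visa_cat_desc, COUNT(*) AS arrivals
FROM i94_data f
JOIN i94cit_res  c ON f.i94res  = c.country_code
JOIN i94visa_cat v ON f.i94visa = v.visa_cat_code
GROUP BY c.country_name, v.visa_cat_desc
ORDER BY c.country_name, v.visa_cat_desc
""").fetchall()

for row in rows:
    print(row)
```

Every join goes from the fact table straight to one dimension table, which is what keeps analytical queries on a star schema short.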

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Gather necessary datasets (all the raw data files that may be needed by the analysts)
2. Load (Extract) the raw data into some staging area. Depending on the choice of the software, it may be some temporary database tables, or some in-memory dataframes
3. Assess the loaded data and perform any necessary cleaning/conversion tasks (Transform) before making the data public
4. Move the prepared, ready-for-use data to permanent, non-volatile storage and make it available for normal use (the "Load" phase of the ETL process). Depending on the choice of software, this step may save the data to another set of permanent database tables, or write the in-memory dataframes to data files in a portable format (e.g. Parquet)
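The four steps above can be sketched as a tiny extract/transform/load skeleton using only the standard library. The CSV text is a made-up excerpt (the second row is invented to show a missing value), the function names are hypothetical, and JSON Lines is used purely as a stand-in for Parquet.

```python
import csv
import io
import json
import tempfile
from pathlib import Path

# Step 1 (gather): a made-up, semicolon-separated excerpt in the style of the demographic file.
RAW_CSV = "City;State Code;Male Population\nSilver Spring;MD;40601.0\nExample City;XX;\n"

def extract(text):
    """Step 2: load raw rows into an in-memory staging list."""
    return list(csv.DictReader(io.StringIO(text), delimiter=";"))

def transform(rows):
    """Step 3: rename columns, cast counts to int, and fill missing counts with 0."""
    cleaned = []
    for row in rows:
        raw = row["Male Population"]
        cleaned.append({
            "city": row["City"],
            "state_code": row["State Code"],
            "male_population": int(float(raw)) if raw else 0,
        })
    return cleaned

def load(rows, path):
    """Step 4: persist the prepared rows and return how many were written."""
    path.write_text("\n".join(json.dumps(row) for row in rows))
    return len(rows)

with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "table_us_city_demo.jsonl"
    written = load(transform(extract(RAW_CSV)), target)
    print(written, "rows written to", target.name)
```

Keeping each phase in its own function makes it possible to test the transform step without touching storage.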

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [22]:
# Write code here

In [23]:
spark_df_i94_data.createOrReplaceTempView("i94_data")

table_i94_data = spark.sql("""
SELECT cicid, i94yr, i94mon, i94cit, i94res, i94port, arrdate, i94mode, i94addr, depdate, i94bir, i94visa, count, dtadfile, visapost, occup, entdepa, entdepd, entdepu, matflag, biryear, dtaddto, gender, insnum, airline, admnum, fltno, visatype
FROM i94_data
""")

table_i94_data.write.parquet('/home/workspace/parquet/table_i94_data', mode='overwrite')

In [24]:
spark_df_i94cit_res.createOrReplaceTempView("i94cit_res")

table_i94cit_res = spark.sql("""
SELECT country_code, country_name
FROM i94cit_res
""")

table_i94cit_res.write.parquet('/home/workspace/parquet/table_i94cit_res', mode='overwrite')

In [25]:
spark_df_i94port.createOrReplaceTempView("i94port")

table_i94port = spark.sql("""
SELECT airport_code, airport_location
FROM i94port
""")

table_i94port.write.parquet('/home/workspace/parquet/table_i94port', mode='overwrite')

In [26]:
spark_df_i94mode.createOrReplaceTempView("i94mode")

table_i94mode = spark.sql("""
SELECT entry_code, entry_mode
FROM i94mode
""")

table_i94mode.write.parquet('/home/workspace/parquet/table_i94mode', mode='overwrite')

In [27]:
spark_df_i94addr.createOrReplaceTempView("i94addr")

table_i94addr = spark.sql("""
SELECT state_code, state_name
FROM i94addr
""")

table_i94addr.write.parquet('/home/workspace/parquet/table_i94addr', mode='overwrite')

In [28]:
spark_df_i94visa_cat.createOrReplaceTempView("i94visa_cat")

table_i94visa_cat = spark.sql("""
SELECT visa_cat_code, visa_cat_desc
FROM i94visa_cat
""")

table_i94visa_cat.write.parquet('/home/workspace/parquet/table_i94visa_cat', mode='overwrite')

In [29]:
spark_df_i94visa_type.createOrReplaceTempView("i94visa_type")

table_i94visa_type = spark.sql("""
SELECT visa_type_code, visa_type_desc
FROM i94visa_type
""")

table_i94visa_type.write.parquet('/home/workspace/parquet/table_i94visa_type', mode='overwrite')

In [30]:
spark_df_us_city_demo.createOrReplaceTempView("us_city_demo")

table_us_city_demo = spark.sql("""
SELECT city, state, median_age, male_population, female_population, total_population, number_of_veterans, foreign_born, average_household_size, state_code, race, count
FROM us_city_demo
""")

table_us_city_demo.write.parquet('/home/workspace/parquet/table_us_city_demo', mode='overwrite')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
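The kinds of checks listed above can be sketched in plain Python on a list of records. The function name `run_quality_checks` and its return convention (a list of failure messages) are assumptions made for illustration; the notebook itself runs its checks on PySpark DataFrames.

```python
from collections import Counter

def run_quality_checks(table_name, rows, key_columns):
    """Return a list of failure messages; an empty list means every check passed."""
    failures = []
    # Count check: the table must contain at least one row.
    if not rows:
        failures.append(f"{table_name}: table is empty")
        return failures
    # Integrity check: key columns must not contain NULLs.
    null_keys = sum(1 for row in rows if any(row.get(k) is None for k in key_columns))
    if null_keys:
        failures.append(f"{table_name}: {null_keys} row(s) with NULL in key columns")
    # Integrity check: key values must be unique.
    key_counts = Counter(tuple(row.get(k) for k in key_columns) for row in rows)
    duplicates = sum(1 for n in key_counts.values() if n > 1)
    if duplicates:
        failures.append(f"{table_name}: {duplicates} duplicated key value(s)")
    return failures

visa_cat = [
    {"visa_cat_code": "1", "visa_cat_desc": "Business"},
    {"visa_cat_code": "2", "visa_cat_desc": "Pleasure"},
    {"visa_cat_code": "3", "visa_cat_desc": "Student"},
]
# Deliberately broken copy: one NULL key and one repeated key.
broken = visa_cat + [
    {"visa_cat_code": None, "visa_cat_desc": "Unknown"},
    {"visa_cat_code": "1", "visa_cat_desc": "Business"},
]

print(run_quality_checks("i94visa_cat", visa_cat, ("visa_cat_code",)))
print(run_quality_checks("i94visa_cat", broken, ("visa_cat_code",)))
```

Returning messages instead of raising on the first failure lets a single run report every problem in a table.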
 
Run Quality Checks

In [None]:
# Perform quality checks here

In [30]:
# Check the number of records loaded into each dataframe, should be greater than zero
# https://www.geeksforgeeks.org/get-number-of-rows-and-columns-of-pyspark-dataframe/
print('Total number of rows in table {} is: {}'.format('i94_data', spark_df_i94_data.count()))

print('Total number of rows in table {} is: {}'.format('i94cit_res', spark_df_i94cit_res.count()))
print('Total number of rows in table {} is: {}'.format('i94port', spark_df_i94port.count()))
print('Total number of rows in table {} is: {}'.format('i94mode', spark_df_i94mode.count()))
print('Total number of rows in table {} is: {}'.format('i94addr', spark_df_i94addr.count()))
print('Total number of rows in table {} is: {}'.format('i94visa_cat', spark_df_i94visa_cat.count()))
print('Total number of rows in table {} is: {}'.format('i94visa_type', spark_df_i94visa_type.count()))

print('Total number of rows in table {} is: {}'.format('us_city_demo', spark_df_us_city_demo.count()))

Total number of rows in table i94_data is: 40790529
Total number of rows in table i94cit_res is: 289
Total number of rows in table i94port is: 660
Total number of rows in table i94mode is: 4
Total number of rows in table i94addr is: 55
Total number of rows in table i94visa_cat is: 3
Total number of rows in table i94visa_type is: 19
Total number of rows in table us_city_demo is: 2891


In [31]:
# Check if there is any NULL value found in the columns which would be declared as primary keys in a SQL database
# https://sparkbyexamples.com/pyspark/pyspark-filter-rows-with-null-values/
spark_df_i94_data.filter("cicid IS NULL").show()

spark_df_i94cit_res.filter("country_code IS NULL").show()
spark_df_i94port.filter("airport_code IS NULL").show()
spark_df_i94mode.filter("entry_code IS NULL").show()
spark_df_i94addr.filter("state_code IS NULL").show()
spark_df_i94visa_cat.filter("visa_cat_code IS NULL").show()
spark_df_i94visa_type.filter("visa_type_code IS NULL").show()

spark_df_us_city_demo.filter("city IS NULL OR state IS NULL").show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+

+------------+------------+
|country_code|country_name|
+------------+------------+
+------------+------------+

+------------+----------------+
|airpo

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Table *i94_data*:

| Field      | Description |
| ---------- | ----------- |
| cicid      | CIC Record ID |
| i94yr      | 4 digit year |
| i94mon     | Numeric month |
| i94cit     | Country of citizenship (COC) |
| i94res     | Country of residence (COR) |
| i94port    | Airport code |
| arrdate    | Arrival Date |
| i94mode    | Mode of Transport |
| i94addr    | First Address |
| depdate    | Departure Date |
| i94bir     | Age of Respondent in Years |
| i94visa    | Visa codes collapsed into three categories |
| count      | Used for summary statistics |
| dtadfile   | Character Date Field - Date added to I-94 Files - CIC does not use |
| visapost   | Department of State where Visa was issued - CIC does not use |
| occup      | Occupation that will be performed in U.S. - CIC does not use |
| entdepa    | Arrival Flag - admitted or paroled into the U.S. - CIC does not use |
| entdepd    | Departure Flag - Departed, lost I-94 or is deceased - CIC does not use |
| entdepu    | Update Flag - Either apprehended, overstayed, adjusted to perm residence - CIC does not use |
| matflag    | Match flag - Match of arrival and departure records |
| biryear    | 4 digit year of birth |
| dtaddto    | Character Date Field - Date to which admitted to U.S. (allowed to stay until) - CIC does not use |
| gender     | Non-immigrant sex |
| insnum     | INS number |
| airline    | Airline used to arrive in U.S. |
| admnum     | Admission Number |
| fltno      | Flight number of Airline used to arrive in U.S. |
| visatype   | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |

Table *us_city_demo*:

| Field              | Description |
| ------------------ | ----------- |
| city               | US city name |
| state              | US state name |
| median_age         | Median age |
| male_population    | Male population count |
| female_population  | Female population count |
| total_population   | Total population count |
| num_of_veterans    | Veteran count |
| foreign_born       | Foreign-born count |
| avg_household_size | Average household size |
| state_code         | US state abbreviation |
| race               | Race identifier |
| count              | Count of individuals of each race |

Table *i94cit_res*:

| Field      | Description |
| ---------- | ----------- |
| country_code | 3-digit code of the country |
| country_name | Name of country |

Table *i94port*:

| Field      | Description |
| ---------- | ----------- |
| airport_code | 3-letter code of the airport |
| airport_location | Detailed location of the airport |

Table *i94mode*:

| Field      | Description |
| ---------- | ----------- |
| entry_code | 1-digit code of the mode of transport |
| entry_mode | Detailed description of mode of transport |

Table *i94addr*:

| Field      | Description |
| ---------- | ----------- |
| state_code | US state abbreviation |
| state_name | Name of US state |

Table *i94visa_cat*:

| Field      | Description |
| ---------- | ----------- |
| visa_cat_code | 1-digit code of the visa category |
| visa_cat_desc | Description of the visa category |

Table *i94visa_type*:

| Field      | Description |
| ---------- | ----------- |
| visa_type_code | Alphanumeric code of the visa type, 1 to 3 characters long |
| visa_type_desc | Description of the visa type |
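
A data dictionary is most useful when it stays in sync with the tables it describes. The following is a minimal sketch, not part of the original pipeline, of how the lookup tables above could be kept as a plain Python mapping and compared with the column names of a loaded DataFrame. The names `DATA_DICTIONARY` and `check_columns` are hypothetical helpers introduced here for illustration; only the table and field names come from the dictionary above.

```python
# Hypothetical mapping of table name -> documented field names,
# copied from the lookup tables in the data dictionary above.
DATA_DICTIONARY = {
    "i94cit_res": ["country_code", "country_name"],
    "i94port": ["airport_code", "airport_location"],
    "i94mode": ["entry_code", "entry_mode"],
    "i94addr": ["state_code", "state_name"],
    "i94visa_cat": ["visa_cat_code", "visa_cat_desc"],
    "i94visa_type": ["visa_type_code", "visa_type_desc"],
}


def check_columns(table_name, actual_columns, dictionary=DATA_DICTIONARY):
    """Return (missing, undocumented) column names for one table.

    missing:      documented in the dictionary but absent from the data
    undocumented: present in the data but absent from the dictionary
    """
    expected = dictionary[table_name]
    missing = [c for c in expected if c not in actual_columns]
    undocumented = [c for c in actual_columns if c not in expected]
    return missing, undocumented


# Plain lists are used here so the sketch runs without Spark.
missing, undocumented = check_columns("i94port", ["airport_code", "airport_location"])
print(missing, undocumented)
```

With a Spark DataFrame, the second argument would be its `columns` attribute, for example `check_columns("i94visa_type", spark_df_i94visa_type.columns)`.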

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

In this project a local Apache Spark installation is used as the main datastore. Spark offers SQL-like querying, keeps the raw data available, and provides fast in-memory DataFrames, which makes it versatile for prototyping ETL pipelines. The ETL logic can be ported to other database solutions (an SQL-based RDBMS or a data warehouse) with moderate effort when necessary.

* Propose how often the data should be updated and why.

The I94 Immigration Data should be updated monthly, because the raw data files appear to be released on a monthly basis. Refer to [the official website of the dataset](https://www.trade.gov/i-94-arrivals-program) for details of the release schedule.

The U.S. City Demographic Data appears to be updated [every 10 years](https://demographics.coopercenter.org/guide-to-publicly-available-demographic-data), which implies that this dataset is relatively static and may not change for years.

The other supplementary datasets used in this project depend on the release schedule of the International Trade Administration (ITA), U.S. Department of Commerce, because these data (embedded in the I94_SAS_Labels_Descriptions.SAS file) were part of the paid dataset purchased from ITA.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

If the data were **increased by 100x**, storage and processing capacity would have to scale accordingly, and a single local Spark installation would probably no longer be enough. Some technologies allow near-unlimited storage expansion while also providing adequate scalability, such as the [Apache Cassandra](https://cassandra.apache.org/_/case-studies.html) NoSQL database, which can be deployed on-premises. If cloud-based services are preferred, AWS EMR (Spark in cluster mode) and AWS S3 (cloud-based storage) would be a good start.

When the data populates a dashboard that must be **updated by 7am every day**, the ETL pipelines need to be scheduled and automated. [Apache Airflow](https://airflow.apache.org/), introduced in the previous courses, should serve this situation well; whether Airflow is deployed on-premises or in the cloud depends on the available budget. The ETL tasks should be scheduled based on the recorded execution time of the current pipelines. For example, if the ETL pipelines take more than 1 hour to finish, the tasks may need to start at least 2 hours before 7am every day.
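
The rule of thumb above (leave roughly twice the measured runtime before the deadline) can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: the function names `latest_start` and `to_daily_cron` and the default safety factor of 2 are hypothetical choices, not part of the project's code. The result is expressed as a standard five-field cron expression, a format that schedulers such as Airflow accept.

```python
from datetime import datetime, timedelta


def latest_start(deadline, expected_runtime, safety_factor=2.0):
    """Latest start time that leaves safety_factor * runtime before the deadline."""
    return deadline - expected_runtime * safety_factor


def to_daily_cron(start):
    """Five-field cron expression that fires every day at start's hour and minute."""
    return f"{start.minute} {start.hour} * * *"


# The date itself is arbitrary; only the 07:00 time of day matters here.
deadline = datetime(2024, 1, 1, 7, 0)
start = latest_start(deadline, timedelta(hours=1))

print(start.strftime("%H:%M"))  # 05:00
print(to_daily_cron(start))     # 0 5 * * *
```

Recomputing the start time from recent run durations, rather than hard-coding it, keeps the schedule valid as the data volume grows.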

When the database needs to be **accessed by more than 100 people**, concurrency control becomes an issue that needs to be addressed. It may be necessary to migrate part or all of the data to an [ACID](https://en.wikipedia.org/wiki/ACID)-compliant database so that all online users get consistent, expected results whenever they access the database.
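
To illustrate what ACID compliance provides, the sketch below shows atomicity: a batch load either succeeds completely or leaves the table untouched, so readers never see a half-loaded table. It uses Python's built-in `sqlite3` module purely because it runs without any setup; this is an assumption made for the example, and a server-based database would be the realistic choice for 100+ concurrent users. The `load_batch` helper and the sample rows are hypothetical; only the table and column names follow the *i94mode* table in the data dictionary.

```python
import sqlite3

# In-memory database with a small lookup table shaped like i94mode.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE i94mode (entry_code INTEGER PRIMARY KEY, entry_mode TEXT)")
conn.execute("INSERT INTO i94mode VALUES (1, 'Air')")  # illustrative sample row
conn.commit()


def load_batch(conn, rows):
    """Insert all rows or none of them. Returns True on success."""
    try:
        # As a context manager, the connection commits if the block succeeds
        # and rolls back if the block raises an exception.
        with conn:
            conn.executemany("INSERT INTO i94mode VALUES (?, ?)", rows)
        return True
    except sqlite3.IntegrityError:
        return False


# The second row repeats primary key 1, so the whole batch is rolled back,
# including the valid first row.
ok = load_batch(conn, [(2, "Sea"), (1, "Duplicate")])
row_count = conn.execute("SELECT COUNT(*) FROM i94mode").fetchone()[0]
print(ok, row_count)  # False 1
```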