# Analysis of United States Immigration Data and Creation of a Spark-warehouse
### Data Engineering Capstone Project

#### Project Summary
The goal of the project is to create a spark-warehouse with fact and dimension tables in a star schema. This is achieved by integrating data from different sources for data analysis and future backend querying. Additionally, U.S. Customs and Border Protection could use this source-of-truth database to expose the solution through a web API, so that backend web services could query the warehouse for information about international visitors.

The project consists of the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run Pipelines to Model the Data
* Step 5: Complete Project Write Up

In [3]:
# Do all imports and installs here
import configparser
import os
import re
from datetime import datetime, timedelta  # `datetime` refers to the class, as before

import pandas as pd
import pyspark
from pyspark.sql import SparkSession
# Deduplicated Spark function and type imports
from pyspark.sql.functions import (
    col, date_add, date_format, dayofmonth, dayofweek, hour, isnull, lit,
    lower, monotonically_increasing_id, month, to_date, udf, upper,
    weekofyear, when, year,
)
from pyspark.sql.types import (
    DateType, DoubleType, IntegerType, LongType, StringType, StructField, StructType,
)

### Step 1: Scope the Project and Gather Data

#### Scope 
The goal of the project is to integrate different data sources into a spark-warehouse of fact and dimension tables in a star schema. The star schema lets BI and data analysts query the data with a minimum number of SQL JOINs, thereby improving query performance.

##### Data Sources:
   * **I94 Immigration Dataset:** This data comes from the U.S. National Travel and Tourism Office. Source: [I94 Immigration Data](https://www.trade.gov/national-travel-and-tourism-office)
   * **World Temperature Data:** This dataset comes from Kaggle. Source: [World Temperature Data](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data)
   * **U.S. City Demographic Data:** This data comes from OpenDataSoft. Source: [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

The code descriptions in the I94_SAS_Labels_Descriptions.SAS file are also used.

##### Tools and Pre-requisites:
* Pandas is used for exploratory data analysis on the smaller datasets, since it loads and manipulates data efficiently.
* PySpark is used for exploratory data analysis on the large dataset. In the future, Spark on Amazon EMR could be used for distributed processing.
* A Jupyter Notebook is used to show the data structure and the need for data cleaning. Python is used because it is the language I am most comfortable with, and SQL is also used to assess the data.
* Python 3 with the configparser module is needed to run the Python scripts.


#### Describe and Gather Data 

* **I94 Immigration Data Set:** U.S. immigration officers issued the I-94 Form (Arrival/Departure Record) to foreign visitors (e.g., business visitors, tourists and foreign students) who lawfully entered the United States. The I-94 was a white paper form that a foreign visitor received from cabin crews on arrival flights and from U.S. Customs and Border Protection at the time of entry into the United States. It listed the traveler's immigration category, port of entry, date of entry into the United States and status expiration date, and had a unique 11-digit identifying number assigned to it. Its purpose is to record the traveler's lawful admission to the United States. This is the main dataset: there is one file for each month of 2016 in the directory ../../data/18-83510-I94-Data-2016/, stored in the SAS binary format sas7bdat. Combined, the 12 files have more than 40 million rows (40,790,529) and 28 columns. Most of the work uses only the April 2016 file, which has more than three million records (3,096,313).

* **World Temperature Data:** This dataset provides monthly average land temperatures for cities around the world over a long period (from 1743 to 2013). Berkeley Earth, which is affiliated with Lawrence Berkeley National Laboratory, repackaged the data from a newer compilation. The Berkeley Earth Surface Temperature Study combines 1.6 billion temperature reports from 16 pre-existing archives. The data is neatly packaged and can be sliced into interesting subsets (for example, by country). Berkeley Earth publishes the source data and the code for the transformations it applied, and uses methods that allow weather observations from shorter time series to be included, so fewer observations need to be discarded. Of the several files in the original Kaggle dataset, only GlobalLandTemperaturesByCity.csv is used in this capstone project.

* **U.S. City Demographic Data:** This data comes from the US Census Bureau's 2015 American Community Survey. It contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. The us-cities-demographics.csv file is used.

##### Reading immigration dataset here: 
The immigration dataset is large (more than three million records for April 2016), so a sample of approximately 1,000 rows stored in a CSV file is used for initial exploration.

In [4]:
df_immigration_subset = pd.read_csv("immigration_data_sample.csv")

In [5]:
pd.set_option('display.max_columns', 50)
df_immigration_subset.head(10)

| |Unnamed: 0|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|2027561|4084316.0|2016.0|4.0|209.0|209.0|HHW|20566.0|1.0|HI|20573.0|61.0|2.0|1.0|20160422|||G|O||M|1955.0|7202016|F||JL|56582670000.0|00782|WT|
|1|2171295|4422636.0|2016.0|4.0|582.0|582.0|MCA|20567.0|1.0|TX|20568.0|26.0|2.0|1.0|20160423|MTR||G|R||M|1990.0|10222016|M||\*GA|94362000000.0|XBLNG|B2|
|2|589494|1195600.0|2016.0|4.0|148.0|112.0|OGG|20551.0|1.0|FL|20571.0|76.0|2.0|1.0|20160407|||G|O||M|1940.0|7052016|M||LH|55780470000.0|00464|WT|
|3|2631158|5291768.0|2016.0|4.0|297.0|297.0|LOS|20572.0|1.0|CA|20581.0|25.0|2.0|1.0|20160428|DOH||G|O||M|1991.0|10272016|M||QR|94789700000.0|00739|B2|
|4|3032257|985523.0|2016.0|4.0|111.0|111.0|CHM|20550.0|3.0|NY|20553.0|19.0|2.0|1.0|20160406|||Z|K||M|1997.0|7042016|F|||42322570000.0|LAND|WT|
|5|721257|1481650.0|2016.0|4.0|577.0|577.0|ATL|20552.0|1.0|GA|20606.0|51.0|2.0|1.0|20160408|||T|N||M|1965.0|10072016|M||DL|736852600.0|910|B2|
|6|1072780|2197173.0|2016.0|4.0|245.0|245.0|SFR|20556.0|1.0|CA|20635.0|48.0|2.0|1.0|20160412|||T|O||M|1968.0|10112016|F||CX|786312200.0|870|B2|
|7|112205|232708.0|2016.0|4.0|113.0|135.0|NYC|20546.0|1.0|NY|20554.0|33.0|2.0|1.0|20160402|||G|O||M|1983.0|6302016|F||BA|55474490000.0|00117|WT|
|8|2577162|5227851.0|2016.0|4.0|131.0|131.0|CHI|20572.0|1.0|IL|20575.0|39.0|2.0|1.0|20160428|||O|O||M|1977.0|7262016|||LX|59413420000.0|00008|WT|
|9|10930|13213.0|2016.0|4.0|116.0|116.0|LOS|20545.0|1.0|CA|20553.0|35.0|2.0|1.0|20160401|||O|O||M|1981.0|6292016|||AA|55449790000.0|00109|WT|


In [6]:
df_immigration_subset.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

The definitions of these fields are included in the I94_SAS_Labels_Descriptions.SAS file.

##### Reading global temperature dataset here

In [7]:
df_global_temperature = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')

In [8]:
df_global_temperature.shape

(8599212, 7)

In [9]:
df_global_temperature.head(5)

| |dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
|---|---|---|---|---|---|---|---|
|0|1743-11-01|6.068|1.737|Århus|Denmark|57.05N|10.33E|
|1|1743-12-01|||Århus|Denmark|57.05N|10.33E|
|2|1744-01-01|||Århus|Denmark|57.05N|10.33E|
|3|1744-02-01|||Århus|Denmark|57.05N|10.33E|
|4|1744-03-01|||Århus|Denmark|57.05N|10.33E|


##### Reading demographics dataset here

In [10]:
df_demographics = pd.read_csv('us-cities-demographics.csv', delimiter = ';')

In [11]:
df_demographics.columns

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')

In [12]:
df_demographics.head(5)

| |City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|Silver Spring|Maryland|33.8|40601.0|41862.0|82463|1562.0|30908.0|2.6|MD|Hispanic or Latino|25924|
|1|Quincy|Massachusetts|41.0|44129.0|49500.0|93629|4147.0|32935.0|2.39|MA|White|58723|
|2|Hoover|Alabama|38.5|38040.0|46799.0|84839|4819.0|8229.0|2.58|AL|Asian|4759|
|3|Rancho Cucamonga|California|34.5|88127.0|87105.0|175232|5821.0|33878.0|3.18|CA|Black or African-American|24437|
|4|Newark|New Jersey|34.6|138040.0|143873.0|281913|5829.0|86253.0|2.73|NJ|White|76402|


### Step 2: Explore and Assess the Data
#### Explore the Data 
* PySpark's SparkSession is used to explore the dataset stored in the SAS binary format (sas7bdat)
* The datasets have been explored using pandas DataFrames and SQL
* These datasets will be used to create the fact and dimension tables, so each dataset will be split, and the cleaning steps will be chosen based on the exploratory data analysis

##### Exploring Immigration dataset using Spark: i94_apr16_sub.sas7bdat

In [13]:
from pyspark.sql import SparkSession

# Build a Spark session that can read SAS7BDAT files through the spark-sas7bdat package
spark = (
    SparkSession.builder
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

# Load the April 2016 I94 data
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [14]:
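# One-time conversion to Parquet for faster reads (run once, then commented out)
# df_immigration.write.parquet("sas_data")
# Read the Parquet copy of the April 2016 data
df_immigration = spark.read.parquet("sas_data")
df_immigration.head(2)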

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1'),
 Row(cicid=5748518.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='NV', depdate=20591.0, i94bir=32.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1984.0, dtaddto='10292016', gender='F', insnum=None, airline='VA', admnum=94955622830.0, fltno='00007', visatype='B1')]

In [15]:
df_immigration.createOrReplaceTempView("immigration_table")

In [16]:
df_immigration.count()

3096313

In [17]:
# Confirming if 'cicid' column has unique values
spark.sql("""
SELECT COUNT(DISTINCT cicid)
FROM immigration_table
"""
).show()

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              3096313|
+---------------------+



In [18]:
# In the provided "I94_SAS_Labels_Descriptions.SAS" file, i94port is a three-character code.
# Confirming whether the same applies to "immigration_table"
spark.sql("""
SELECT DISTINCT LENGTH(i94port) AS i94Port_code_length
FROM immigration_table
""").show()

+-------------------+
|i94Port_code_length|
+-------------------+
|                  3|
+-------------------+



In [19]:
# Confirming the distinct values present within "i94visa" column
spark.sql("""
SELECT DISTINCT i94visa AS distinct_i94visa
FROM immigration_table
ORDER BY distinct_i94visa
""").show()

+----------------+
|distinct_i94visa|
+----------------+
|             1.0|
|             2.0|
|             3.0|
+----------------+



According to the "I94_SAS_Labels_Descriptions.SAS" file, the three categories are:

* 1 = Business
* 2 = Pleasure
* 3 = Student
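
These numeric codes can be decoded with a small lookup. The sketch below is a plain-Python illustration; the helper name `decode_i94visa` is hypothetical and not taken from etl_spark_process.py. It casts the value to `int` first because Spark reads these SAS columns as doubles such as `2.0`. In Spark itself, the same mapping could be expressed with `when(...).otherwise(...)` or a join against a small lookup DataFrame.

```python
from typing import Optional

# Category names from I94_SAS_Labels_Descriptions.SAS
I94VISA_CATEGORIES = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode_i94visa(code: Optional[float]) -> Optional[str]:
    """Return the visa category name, or None for a missing or unknown code."""
    if code is None:
        return None
    # Spark reads the SAS column as a double (1.0, 2.0, 3.0), so cast before lookup
    return I94VISA_CATEGORIES.get(int(code))


print(decode_i94visa(2.0))   # Pleasure
print(decode_i94visa(None))  # None
```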

In [20]:
# Confirming the types of values in "i94mode" column
spark.sql("""
SELECT i94mode, COUNT(i94mode)
FROM immigration_table
GROUP BY i94mode
""").show()

+-------+--------------+
|i94mode|count(i94mode)|
+-------+--------------+
|   null|             0|
|    1.0|       2994505|
|    3.0|         66660|
|    2.0|         26349|
|    9.0|          8560|
+-------+--------------+



According to the data dictionary, the i94mode codes are as listed below. The vast majority of records (2,994,505, about 96.7% of 3,096,313) relate to air travel. The other modes are not discarded, since they may be needed in the data warehouse for future reference and statistics. Note that the null row shows a count of 0 only because COUNT(i94mode) ignores NULLs: the non-null counts sum to 3,096,074, so 239 records have no mode recorded.

* 1 = 'Air'
* 2 = 'Sea'
* 3 = 'Land'
* 9 = 'Not reported'
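
Because `COUNT(i94mode)` skips NULLs, the number of records without a mode can be recovered by subtracting the non-null counts from the total row count. A small worked check using the numbers from the outputs above:

```python
# Non-null i94mode counts, copied from the GROUP BY output above
mode_counts = {1.0: 2994505, 3.0: 66660, 2.0: 26349, 9.0: 8560}
total_rows = 3096313  # result of df_immigration.count()

non_null = sum(mode_counts.values())
null_rows = total_rows - non_null          # rows hidden behind the "null | 0" line
air_share = mode_counts[1.0] / total_rows  # share of air arrivals

print(non_null)            # 3096074
print(null_rows)           # 239
print(f"{air_share:.1%}")  # 96.7%
```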

In [21]:
# Checking for null values in the "i94bir" column
spark.sql("""
SELECT COUNT(*)
FROM immigration_table
WHERE i94bir IS NULL
""").show()

+--------+
|count(1)|
+--------+
|     802|
+--------+



In [22]:
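# Since "i94bir" has null values, check for null values in the "biryear" column.
# Note: COUNT(biryear) skips NULLs, so this query returns 0 even if biryear has NULLs;
# COUNT(*) would count the matching rows instead.
spark.sql("""
SELECT COUNT(biryear) FROM immigration_table WHERE biryear IS NULL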
""").show()

+--------------+
|count(biryear)|
+--------------+
|             0|
+--------------+
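
The `biryear` query above cannot detect missing values: `COUNT(biryear)` ignores NULLs, and the `WHERE` clause keeps only NULL rows, so it returns 0 whatever the data looks like. Counting rows with `COUNT(*)` avoids this. The sketch below demonstrates the difference with Python's built-in `sqlite3` module and three hypothetical rows; Spark SQL's `COUNT` follows the same standard SQL rule, so in the notebook the corrected query would be `SELECT COUNT(*) FROM immigration_table WHERE biryear IS NULL`.

```python
import sqlite3

# Tiny in-memory table with hypothetical rows: two known birth years and one NULL
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immigration_table (cicid INTEGER, biryear REAL)")
conn.executemany(
    "INSERT INTO immigration_table VALUES (?, ?)",
    [(1, 1976.0), (2, 1984.0), (3, None)],
)

# COUNT(biryear) skips NULLs, so combined with IS NULL it always yields 0
count_col = conn.execute(
    "SELECT COUNT(biryear) FROM immigration_table WHERE biryear IS NULL"
).fetchone()[0]

# COUNT(*) counts rows, so it reports the missing value
count_star = conn.execute(
    "SELECT COUNT(*) FROM immigration_table WHERE biryear IS NULL"
).fetchone()[0]

print(count_col, count_star)  # 0 1
conn.close()
```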



In [23]:
# Confirming if the "gender" column is usable
spark.sql("""
SELECT gender, COUNT(gender) AS gender_count
FROM immigration_table
GROUP BY gender
""").show()

+------+------------+
|gender|gender_count|
+------+------------+
|     F|     1302743|
|  null|           0|
|     M|     1377224|
|     U|         467|
|     X|        1610|
+------+------------+



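Most records fall into the Female (F) and Male (M) categories, and a small number use the codes U and X, which may indicate that the visitor did not wish to reveal their gender. The null row shows a count of 0 only because COUNT(gender) ignores NULLs, so it does not mean the column is free of NULL values: the non-null counts sum to 2,682,044 of 3,096,313 records, which leaves 414,269 records with no gender recorded.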

In [24]:
# Confirming citizenship-"i94cit" and residence-"i94res" columns to see if any values are missing
spark.sql("""
SELECT COUNT(*) AS i94cit_null_count
FROM immigration_table
WHERE i94cit IS NULL
""").show()

spark.sql("""
SELECT COUNT(*) AS i94res_null_count
FROM immigration_table
WHERE i94res IS NULL
""").show()

+-----------------+
|i94cit_null_count|
+-----------------+
|                0|
+-----------------+

+-----------------+
|i94res_null_count|
+-----------------+
|                0|
+-----------------+



In [25]:
# Confirming the length of i94cit and i94res columns
spark.sql("""
SELECT DISTINCT LENGTH(CAST(i94cit AS BIGINT)) AS i94cit_len
FROM immigration_table
""").show()

spark.sql("""
SELECT DISTINCT LENGTH(CAST(i94res AS BIGINT)) AS i94res_len
FROM immigration_table
""").show()

+----------+
|i94cit_len|
+----------+
|         3|
+----------+

+----------+
|i94res_len|
+----------+
|         3|
+----------+



In [26]:
# Checking the relevance of the "visatype" column
spark.sql("""
SELECT COUNT(*) AS visatype_null_count
FROM immigration_table
WHERE visatype IS NULL
""").show()

+-------------------+
|visatype_null_count|
+-------------------+
|                  0|
+-------------------+



In [27]:
# Studying the distinct values in the "visatype" column
spark.sql(
"""
SELECT visatype, COUNT(visatype)
FROM immigration_table
GROUP BY visatype
ORDER BY visatype
"""
).show()

+--------+---------------+
|visatype|count(visatype)|
+--------+---------------+
|      B1|         212410|
|      B2|        1117897|
|      CP|          14758|
|     CPL|             10|
|      E1|           3743|
|      E2|          19383|
|      F1|          39016|
|      F2|           2984|
|     GMB|            150|
|     GMT|          89133|
|       I|           3176|
|      I1|            234|
|      M1|           1317|
|      M2|             49|
|     SBP|             11|
|      WB|         282983|
|      WT|        1309059|
+--------+---------------+



The I94_SAS_Labels_Descriptions.SAS file does not describe the different visa types; see https://travel.state.gov/content/travel/en/us-visas/visa-information-resources/all-visa-categories.html for details. This column is kept in the data model because it may be useful for future statistics.

In [28]:
# Analysing the relevance of "occup" column
spark.sql("""
SELECT occup, COUNT(*) occup_count
FROM immigration_table
GROUP BY occup
ORDER BY occup_count DESC, occup
""").show()

+-----+-----------+
|occup|occup_count|
+-----+-----------+
| null|    3088187|
|  STU|       4719|
|  OTH|        661|
|  NRR|        345|
|  MKT|        280|
|  EXA|        196|
|  GLS|        189|
|  ULS|        175|
|  ADM|        125|
|  TIE|        124|
|  MVC|        110|
|  ENO|         60|
|  CEO|         56|
|  TIP|         52|
|  RET|         50|
|  CMP|         47|
|  LLJ|         46|
|  PHS|         45|
|  UNP|         45|
|  HMK|         40|
+-----+-----------+
only showing top 20 rows



The vast majority of records (3,088,187 of 3,096,313) have a null "occup" value, and the remaining abbreviations are undocumented and therefore not useful, so this column is not included in the data model.

In [29]:
# Analysing the relevance of insnum column
spark.sql("""
SELECT insnum, COUNT(*) as insnum_count
FROM immigration_table
GROUP BY insnum
ORDER BY insnum_count DESC
""").show()

+------+------------+
|insnum|insnum_count|
+------+------------+
|  null|     2982605|
|  3692|        2155|
|  3697|        2033|
|  3703|        1986|
|  3893|        1866|
|  3661|        1820|
|  3693|        1690|
|  3939|        1680|
|  3672|        1678|
|  3882|        1673|
|  3943|        1662|
|  3679|        1635|
|  3949|        1584|
|  3945|        1568|
|  3691|        1535|
|  3690|        1488|
|  3954|        1470|
|  3694|        1450|
|  3890|        1394|
|  3941|        1331|
+------+------------+
only showing top 20 rows



Similarly, most records (2,982,605 of 3,096,313) have a null "insnum" value, and the remaining numbers are not useful, so this column is also excluded from the data model.

The other columns that are not used in the data model below were ignored because they have many missing values and lack relevant documentation.

##### Exploring Global Temperature dataset: GlobalLandTemperaturesByCity.csv

In [30]:
df_global_temperature['Country'].nunique()

159

This dataset records temperature information for 159 countries since 1743, but only the "United States" data is of interest, so the dataset will be filtered to that country.

In [31]:
# Checking the most recent date in the dataset
df_global_temperature['dt'].max()

'2013-09-01'

The most recent record is from September 2013, so none of the temperature data overlaps the 2016 immigration data, and the two cannot currently be joined on date. The temperature data is still included in the data model because newer data may become available in the future, which could then be joined with the immigration dataset.

In [32]:
# Checking the relevance of "latitude" and "longitude" columns
# Checking for a particular city and date
df_global_temperature[(df_global_temperature.City == 'Arlington') & (df_global_temperature.dt == '1950-02-01')]

| |dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
|---|---|---|---|---|---|---|---|
|402597|1950-02-01|11.144|0.199|Arlington|United States|32.95N|96.70W|
|405836|1950-02-01|1.655|0.057|Arlington|United States|39.38N|76.99W|


The analysis above returns two "Arlington" records for the same date with different coordinates (most likely two different U.S. cities that share the name), so the city name alone does not identify a location. The "latitude" and "longitude" columns are therefore also included in the data model.

##### Exploring U.S. City Demographic dataset: us-cities-demographics.csv

In [33]:
# Looking for missing values in this dataset
df_demographics.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

The dataset is fairly clean, with only a few missing values, so little cleaning is needed apart from converting the city and state columns to upper case.

In [34]:
# Checking if City and Race column would help us to find unique records
df_demographics[df_demographics[['City', 'Race']].duplicated()].head(5)

| |City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|177|Wilmington|Delaware|36.4|32680.0|39277.0|71957|3063.0|3336.0|2.45|DE|Asian|1193|
|210|Lakewood|California|39.9|41523.0|40069.0|81592|4094.0|18274.0|3.13|CA|Hispanic or Latino|24987|
|238|Glendale|California|42.1|98181.0|102844.0|201025|4448.0|111510.0|2.69|CA|White|146718|
|300|Springfield|Massachusetts|31.8|74744.0|79592.0|154336|5723.0|16226.0|2.81|MA|Asian|5606|
|549|Bloomington|Indiana|23.5|40588.0|43227.0|83815|2368.0|10033.0|2.33|IN|Asian|9801|


In [37]:
# Analysing a specific combination of City='LAKEWOOD' AND Race='Hispanic or Latino'
# (the City values were converted to upper case in an earlier cell that is not shown here)
df_demographics[(df_demographics.City == 'LAKEWOOD') & (df_demographics.Race == 'Hispanic or Latino')]

| |City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|27|LAKEWOOD|Colorado|37.7|76013.0|76576.0|152589|9988.0|14169.0|2.29|CO|Hispanic or Latino|33630|
|210|LAKEWOOD|California|39.9|41523.0|40069.0|81592|4094.0|18274.0|3.13|CA|Hispanic or Latino|24987|


The only difference between these two records is the State column, so State is added to the key to identify unique records.

In [38]:
# After adding State also, we check if any data is duplicated
df_demographics[df_demographics[['City', 'State', 'Race']].duplicated()].head(5)

| |City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
|---|---|---|---|---|---|---|---|---|---|---|---|---|


The result is empty, so City, State and Race together identify each record uniquely, and these three columns are used to remove duplicate records.
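
The duplicate check can be sketched in plain Python: count how often each key tuple occurs and report the keys seen more than once. Using the two LAKEWOOD rows shown above, the (City, Race) key produces a duplicate while (City, State, Race) does not. The helper `duplicate_keys` is hypothetical; in the notebook, pandas `DataFrame.duplicated` performs the equivalent check.

```python
from collections import Counter

# (City, State, Race) triples taken from the two LAKEWOOD rows shown above
rows = [
    ("LAKEWOOD", "Colorado", "Hispanic or Latino"),
    ("LAKEWOOD", "California", "Hispanic or Latino"),
]


def duplicate_keys(records, key_indices):
    """Return the key tuples that occur more than once for the given column positions."""
    counts = Counter(tuple(r[i] for i in key_indices) for r in records)
    return [key for key, n in counts.items() if n > 1]


print(duplicate_keys(rows, (0, 2)))     # [('LAKEWOOD', 'Hispanic or Latino')]
print(duplicate_keys(rows, (0, 1, 2)))  # []
```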

#### Cleaning Steps

##### Cleaning and Creating fact and dimension tables from Immigration dataset

##### **Fact table**
##### ***fact_immigration_i94:***
* Create the fact_immigration_i94 table by extracting the following columns:
    * 'cicid'
    * 'i94yr'
    * 'i94mon'
    * 'i94port'
    * 'i94addr'
    * 'arrdate'
    * 'depdate'
    * 'i94mode'
    * 'i94visa'
* Rename the columns to more understandable names:
    * 'cic_id'
    * 'i94_year'
    * 'i94_month'
    * 'city_code'
    * 'state_code'
    * 'arrival_date'
    * 'departure_date'
    * 'i94_mode'
    * 'i94_visa'
* Cast each column to an appropriate data type
* Drop duplicate rows
* Add a country column with the value 'United States'
* Convert arrival_date and departure_date from SAS date values (days since 1960-01-01) to "YYYY-MM-DD" format
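
SAS stores dates as the number of days since 1960-01-01, which is why arrdate values such as 20545.0 appear in the raw data; 20545 days after 1960-01-01 is 2016-04-01, the first day of the April file. The sketch below converts such a value to an ISO date string with the standard library; the helper name `sas_to_iso` is hypothetical. In Spark, one option would be a SQL expression such as `expr("date_add(to_date('1960-01-01'), CAST(arrdate AS INT))")`; the exact approach used in etl_spark_process.py is not shown in this notebook.

```python
from datetime import date, timedelta
from typing import Optional

# SAS dates count days from this epoch
SAS_EPOCH = date(1960, 1, 1)


def sas_to_iso(sas_days: Optional[float]) -> Optional[str]:
    """Convert a SAS day count (read by Spark as a double) to 'YYYY-MM-DD'."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()


print(sas_to_iso(20545.0))  # 2016-04-01
print(sas_to_iso(20574.0))  # 2016-04-30
```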

##### **Dimension tables**
##### ***dim_immigration_personal:***
* Create the dim_immigration_personal table by extracting the following columns:
    * 'cicid'
    * 'i94cit'
    * 'i94res'
    * 'biryear'
    * 'gender'
* Rename the columns to more understandable names:
    * 'cic_id'
    * 'citizen_of_country'
    * 'country_of_residence'
    * 'birth_year'
    * 'gender'
* Cast each column to an appropriate data type
* Drop duplicate rows

##### ***dim_immigration_airline:***
* Create the dim_immigration_airline table by extracting the following columns:
    * 'cicid'
    * 'airline'
    * 'admnum'
    * 'fltno'
    * 'visatype'
* Rename the columns to more understandable names:
    * 'cic_id'
    * 'airline'
    * 'admin_num'
    * 'flight_num'
    * 'visa_type'
* Cast each column to an appropriate data type
* Drop duplicate rows

##### Cleaning and Creating dimension table from World Temperature Dataset

##### ***dim_temperature:***
* Create the dim_temperature table by extracting the following columns:
    * 'dt'
    * 'AverageTemperature'
    * 'AverageTemperatureUncertainty'
    * 'City'
    * 'Country'
    * 'Latitude'
    * 'Longitude'
* Keep only the "United States" data
* Rename the columns to more understandable names:
    * 'date'
    * 'avg_temp'
    * 'avg_temp_uncertainty'
    * 'city'
    * 'country'
    * 'latitude'
    * 'longitude'
* Convert the date column to a date type and cast the other columns to appropriate data types
* Add a year column, which is useful for querying
* Drop duplicate rows
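
These dim_temperature steps can be sketched for a single CSV row in plain Python. The helper names are hypothetical, and converting Latitude/Longitude strings such as "96.70W" into signed decimal degrees is an assumption about what the "preferred datatype" is for those columns; the schema printed later for dim_temperature is truncated, so the actual types are not shown.

```python
from datetime import date
from typing import Optional


def parse_coordinate(value: str) -> float:
    """Turn '32.95N' or '96.70W' into signed decimal degrees (S and W are negative)."""
    number, hemisphere = float(value[:-1]), value[-1].upper()
    return -number if hemisphere in ("S", "W") else number


def clean_temperature_row(row: dict) -> Optional[dict]:
    """Apply the dim_temperature steps to one raw CSV row (a dict of strings)."""
    if row["Country"] != "United States":
        return None  # keep only U.S. records
    dt = date.fromisoformat(row["dt"])
    avg, unc = row["AverageTemperature"], row["AverageTemperatureUncertainty"]
    return {
        "date": dt,
        "year": dt.year,  # extra column for year-level aggregation
        "avg_temp": float(avg) if avg else None,  # empty string means missing
        "avg_temp_uncertainty": float(unc) if unc else None,
        "city": row["City"],
        "country": row["Country"],
        "latitude": parse_coordinate(row["Latitude"]),
        "longitude": parse_coordinate(row["Longitude"]),
    }


# One of the Arlington rows shown earlier
raw = {"dt": "1950-02-01", "AverageTemperature": "11.144",
       "AverageTemperatureUncertainty": "0.199", "City": "Arlington",
       "Country": "United States", "Latitude": "32.95N", "Longitude": "96.70W"}
cleaned = clean_temperature_row(raw)
print(cleaned["year"], cleaned["longitude"])  # 1950 -96.7
```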

##### Cleaning and Creating dimension table from U.S. City Demographic Dataset

##### ***dim_demographics:***
* Create the dim_demographics table by extracting the following columns:
    * 'City'
    * 'State'
    * 'Male Population'
    * 'Female Population'
    * 'Total Population'
    * 'Number of Veterans'
    * 'Foreign-born'
    * 'Race'
    * 'Median Age'
    * 'Average Household Size'
* Rename the columns to more understandable names:
    * 'city'
    * 'state'
    * 'male_population'
    * 'female_population'
    * 'total_population'
    * 'num_of_vetarans'
    * 'foreign_born'
    * 'race'
    * 'median_age'
    * 'avg_household_size'
* Convert the city and state columns to upper case
* Cast the columns to appropriate data types
* Drop duplicate rows
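
A plain-Python sketch of these dim_demographics steps for one row, covering only a subset of the columns for brevity. Empty strings (the few missing values found during exploration) become `None`; the helper names are hypothetical. Population values are parsed through `float` first, so strings such as "40601.0" also work.

```python
from typing import Optional


def to_float(value: str) -> Optional[float]:
    """Cast a CSV string to float, treating an empty string as missing."""
    value = value.strip()
    return float(value) if value else None


def to_int(value: str) -> Optional[int]:
    """Cast a CSV string to int via float, treating an empty string as missing."""
    number = to_float(value)
    return int(number) if number is not None else None


def clean_demographics_row(row: dict) -> dict:
    """Rename, upper-case and cast a subset of the dim_demographics columns."""
    return {
        "city": row["City"].upper(),
        "state": row["State"].upper(),
        "race": row["Race"],
        "total_population": to_int(row["Total Population"]),
        "male_population": to_int(row["Male Population"]),
        "median_age": to_float(row["Median Age"]),
        "avg_household_size": to_float(row["Average Household Size"]),
    }


# Values from the Silver Spring row shown earlier
raw = {"City": "Silver Spring", "State": "Maryland", "Race": "Hispanic or Latino",
       "Total Population": "82463", "Male Population": "40601",
       "Median Age": "33.8", "Average Household Size": "2.6"}
print(clean_demographics_row(raw)["city"])  # SILVER SPRING
```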

##### Parse I94_SAS_Labels_Descriptions.SAS to create the following lookup tables:
* ***country_code:***
    * 'code'
    * 'country'


* ***city_code:***
    * 'code'
    * 'city'


* ***state_code:***
    * 'code'
    * 'state'
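
One way to build these lookup tables is to parse the `value` blocks of I94_SAS_Labels_Descriptions.SAS with a regular expression. This sketch assumes each entry sits on one line in a form such as `236 = 'AFGHANISTAN'` or `'ALC' = 'ALCAN, AK'`; it is not the code in etl_spark_process.py, and the sample lines are illustrative rather than copied from the file. Calling it once per block (countries, ports, states) would give the rows of country_code, city_code and state_code.

```python
import re
from typing import Dict, Iterable

# Assumed entry shapes inside a "value ..." block:
#    236 =  'AFGHANISTAN'          (unquoted numeric code)
#   'ALC'  =  'ALCAN, AK      '    (quoted character code)
LABEL_LINE = re.compile(r"^\s*'?([^'=\s]+)'?\s*=\s*'([^']*)'")


def parse_label_block(lines: Iterable[str]) -> Dict[str, str]:
    """Map each code to its trimmed description, skipping lines that do not match."""
    mapping = {}
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:
            code, description = match.groups()
            mapping[code] = description.strip()
    return mapping


# Illustrative lines in the assumed format
countries = parse_label_block(["value i94cntyl", "   236 =  'AFGHANISTAN'", ";"])
ports = parse_label_block(["value $i94prtl", "\t'ALC'\t=\t'ALCAN, AK             '", ";"])
print(countries, ports)  # {'236': 'AFGHANISTAN'} {'ALC': 'ALCAN, AK'}
```

A description that itself contains an apostrophe would be cut short at that character, so the parsed tables are worth spot-checking against the file.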

These cleaning steps are implemented in the **etl_spark_process.py** file; refer to it for the detailed code that cleans and transforms the raw data sources. This notebook serves as documentation, whereas the actual ETL pipeline and data model creation live in etl_spark_process.py for code reusability and modularity.

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
As discussed in the Project Summary, the goal of the project is to create a spark-warehouse that OLAP and Business Intelligence applications can query easily and with high performance. The data model therefore follows a star schema.

For example, Business Intelligence users could join fact_immigration_i94 with the dim_immigration_personal and dim_immigration_airline tables on cic_id to aggregate the data.

Temperature and immigration data can be aggregated together by joining the date column of dim_temperature with the arrival_date or departure_date column of the fact table.

fact_immigration_i94 can also be joined with the dim_demographics table on city and state, using the city_code and state_code lookup tables to translate the codes in the fact table into names.

A year column has also been added to dim_temperature to enable aggregating data by year.

The Star Schema will have the following fact and dimension tables:
* Fact table:
    * fact_immigration_i94
* Dimension tables:
    * dim_immigration_personal
    * dim_immigration_airline
    * dim_temperature
    * dim_demographics
    * country_code
    * city_code
    * state_code

![Star schema diagram](StarSchema.png)

This star schema model lets the database be queried with a minimum number of SQL JOINs, which improves query performance.
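
To illustrate the kind of query the star schema supports, the sketch below joins the fact table to dim_immigration_personal on cic_id and to the state_code lookup, then counts visitors by state and gender. It uses Python's built-in `sqlite3` with a few hypothetical rows so it runs anywhere; against the warehouse, similar SQL could be run with `spark.sql` after registering the Parquet tables as temporary views.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Minimal versions of one fact table, one dimension table and one lookup table
conn.executescript("""
CREATE TABLE fact_immigration_i94 (cic_id INTEGER, state_code TEXT, i94_visa INTEGER);
CREATE TABLE dim_immigration_personal (cic_id INTEGER, gender TEXT, birth_year INTEGER);
CREATE TABLE state_code (code TEXT, state TEXT);
""")
# Hypothetical rows for illustration only
conn.executemany("INSERT INTO fact_immigration_i94 VALUES (?, ?, ?)",
                 [(1, "CA", 2), (2, "CA", 1), (3, "NY", 2)])
conn.executemany("INSERT INTO dim_immigration_personal VALUES (?, ?, ?)",
                 [(1, "F", 1976), (2, "M", 1984), (3, "F", 1990)])
conn.executemany("INSERT INTO state_code VALUES (?, ?)",
                 [("CA", "CALIFORNIA"), ("NY", "NEW YORK")])

# Fact table at the centre, one JOIN per dimension or lookup table
query = """
SELECT s.state, p.gender, COUNT(*) AS visitors
FROM fact_immigration_i94 AS f
JOIN dim_immigration_personal AS p ON f.cic_id = p.cic_id
JOIN state_code AS s ON f.state_code = s.code
GROUP BY s.state, p.gender
ORDER BY s.state, p.gender
"""
rows = conn.execute(query).fetchall()
print(rows)  # [('CALIFORNIA', 'F', 1), ('CALIFORNIA', 'M', 1), ('NEW YORK', 'F', 1)]
conn.close()
```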

#### 3.2 Mapping Out Data Pipelines
1. Stage all the datasets listed below:
    * `../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat` (already loaded by Udacity onto an attached disk)
    * `I94_SAS_Labels_Descriptions.SAS`
    * `../../data2/GlobalLandTemperaturesByCity.csv` (already loaded by Udacity onto an attached disk)
    * `us-cities-demographics.csv`

2. Perform the data cleaning described in the **Cleaning Steps** section of Step 2
3. Transform the i94_apr16_sub.sas7bdat dataset into:
    * **fact_immigration_i94**: fact table, partitioned by state_code
    * **dimension tables:**
        * **dim_immigration_personal:** partitioned by country_of_residence
        * **dim_immigration_airline**
4. Transform the I94_SAS_Labels_Descriptions.SAS dataset into lookup tables:
    * **country_code**
    * **city_code**
    * **state_code**
5. Transform the GlobalLandTemperaturesByCity.csv dataset into the **dim_temperature** dimension table
6. Transform the us-cities-demographics.csv dataset into the **dim_demographics** dimension table
7. Store all the fact and dimension tables in Parquet format, thereby creating a spark warehouse

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Creating the data model

The data model is created by the etl_spark_process.py file, which performs the following steps:
* Staging the raw data sources using Spark
* Cleaning the datasets and splitting them into the fact and dimension tables described in the **Conceptual Data Model** section
* Storing them as Parquet files, thereby creating a spark warehouse

The code for processing I94_SAS_Labels_Descriptions.SAS comes from https://knowledge.udacity.com/questions/125439.

In [39]:
# Creating the Star Schema data model
!python3 etl_spark_process.py

INFO:root:Creating Spark Session
https://repos.spark-packages.org/ added as a remote repository with the name: repo-1
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark-2.4.3-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
saurfang#spark-sas7bdat added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1083dee8-5c34-4159-b4f2-0463a8c9116a;1.0
	confs: [default]
	found saurfang#spark-sas7bdat;2.1.0-s_2.11 in repo-1
	found com.epam#parso;2.0.10 in central
	found org.slf4j#slf4j-api;1.7.5 in central
	found org.apache.logging.log4j#log4j-api-scala_2.11;2.7 in central
	found org.scala-lang#scala-reflect;2.11.8 in central
	found org.apache.hadoop#hadoop-aws;2.7.0 in central
	found org.apache.hadoop#hadoop-common;2.7.0 in central
	found org.apache.hadoop#hadoop-annotations;2.7.0 in centra

#### 4.2 Data Quality Checks
* **Primary data quality checks:**
    * Verify that the schema of every fact and dimension table produced by the ETL pipeline matches the conceptual data model
    * Verify that no table is empty after the ETL pipeline runs

* **Secondary data quality checks:**
    * Verify that the dim_immigration_personal folder contains 229 partitions
    * Verify that the fact_immigration_i94 folder contains 457 partitions
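
The partition checks can be automated by counting the `<column>=<value>` subdirectories that Spark's `partitionBy` writes. This sketch assumes the partition columns are `state_code` and `country_of_residence`, which is consistent with those columns appearing last in the schemas printed below (Spark appends partition columns when reading a partitioned Parquet table); the helper `count_partitions` is hypothetical. Spark writes NULL partition values to a `__HIVE_DEFAULT_PARTITION__` directory, which `COUNT(DISTINCT ...)` does not count, so the helper can optionally exclude it.

```python
import tempfile
from pathlib import Path

# Directory-name value Spark uses for NULL partition values
NULL_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def count_partitions(table_dir: str, column: str, include_null: bool = True) -> int:
    """Count the '<column>=<value>' subdirectories created by Spark's partitionBy."""
    prefix = f"{column}="
    names = [p.name for p in Path(table_dir).iterdir()
             if p.is_dir() and p.name.startswith(prefix)]
    if not include_null:
        names = [n for n in names if n != prefix + NULL_PARTITION]
    return len(names)


# Demo on a throwaway directory laid out like a partitioned Parquet table
with tempfile.TemporaryDirectory() as demo_dir:
    for value in ("CA", "NY", NULL_PARTITION):
        (Path(demo_dir) / f"state_code={value}").mkdir()
    (Path(demo_dir) / "_SUCCESS").touch()  # marker file, not a partition
    total = count_partitions(demo_dir, "state_code")
    non_null = count_partitions(demo_dir, "state_code", include_null=False)
print(total, non_null)  # 3 2
```

Against the warehouse, the expected values would be `count_partitions("fact_immigration_i94", "state_code") == 457` and `count_partitions("dim_immigration_personal", "country_of_residence") == 229`, depending on whether NULL partitions exist.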

In [40]:
# Performing data quality check to verify data schema of all fact and dimension tables matches the conceptual data model
path = ['fact_immigration_i94', 'dim_immigration_personal', 'dim_immigration_airline', 'dim_temperature', 'country_code', 'state_code', 'city_code', 'dim_demographics']
for file in path:
    df = spark.read.parquet(file)
    print("Table: " + file)
    df.printSchema()

Table: fact_immigration_i94
root
 |-- cic_id: integer (nullable = true)
 |-- i94_year: integer (nullable = true)
 |-- i94_month: integer (nullable = true)
 |-- city_code: string (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- departure_date: string (nullable = true)
 |-- i94_mode: integer (nullable = true)
 |-- i94_visa: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- state_code: string (nullable = true)

Table: dim_immigration_personal
root
 |-- cic_id: integer (nullable = true)
 |-- citizen_of_country: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- country_of_residence: integer (nullable = true)

Table: dim_immigration_airline
root
 |-- cic_id: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- admin_num: integer (nullable = true)
 |-- flight_num: string (nullable = true)
 |-- visa_type: string (nullable = true)

Table: dim_temperature
root
 |-- date: date (nul

In [41]:
# Performing data quality check to verify there are no empty tables after the ETL data pipeline process
path = ['fact_immigration_i94', 'dim_immigration_personal', 'dim_immigration_airline', 'dim_temperature', 'country_code', 'state_code', 'city_code', 'dim_demographics']
for file in path:
    df = spark.read.parquet(file)
    num_records = df.count()
    if num_records <= 0:
        # Name the offending table so the failure is actionable
        raise ValueError(f"Table {file} is empty!")
    print(f"{file} table has {num_records} records")

fact_immigration_i94 table has 3096313 records
dim_immigration_personal table has 3096313 records
dim_immigration_airline table has 3096313 records
dim_temperature table has 687289 records
country_code table has 289 records
state_code table has 55 records
city_code table has 660 records
dim_demographics table has 2891 records
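The cell above stops at the first empty table it finds. A variant that checks every table and reports all empty ones in a single error could look like the sketch below. check_non_empty is a hypothetical helper, and it assumes the row counts have already been collected, for example with {name: spark.read.parquet(name).count() for name in path}.

```python
def check_non_empty(record_counts):
    """Raise a single ValueError naming every table with no records.

    record_counts maps table name -> row count (hypothetical input, e.g. collected
    with {name: spark.read.parquet(name).count() for name in path}).
    """
    empty = [name for name, count in record_counts.items() if count <= 0]
    if empty:
        raise ValueError("Empty tables after ETL: " + ", ".join(empty))
```

Collecting all failures before raising means one pipeline run surfaces every empty table instead of one per rerun.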


In [43]:
df_immigration.createOrReplaceTempView("raw_immigration_table")

In [44]:
# Verifying number of unique i94addr which is mapped to "state_code" in our resultant "fact_immigration_i94" table
spark.sql("""
SELECT COUNT(DISTINCT i94addr)
FROM raw_immigration_table
""").show()

+-----------------------+
|count(DISTINCT i94addr)|
+-----------------------+
|                    457|
+-----------------------+



In [45]:
# Checking the number of partitions in "fact_immigration_i94" folder
# There should be 457 partitions
file_path = "fact_immigration_i94"
totalDir = 0
for base, dirs, files in os.walk(file_path):
    for directories in dirs:
        if directories != 'state_code=__HIVE_DEFAULT_PARTITION__':
            totalDir += 1
print(f"Number of partitions based on state_code is {totalDir}")

Number of partitions based on state_code is 457


In [46]:
# Verifying number of unique i94res which is mapped to "country_of_residence" in our resultant "dim_immigration_personal" table
spark.sql("""
SELECT COUNT(DISTINCT i94res)
FROM raw_immigration_table
""").show()

+----------------------+
|count(DISTINCT i94res)|
+----------------------+
|                   229|
+----------------------+



In [47]:
# Verifying the number of partitions in dim_immigration_personal
# There should be 229 partitions
file_path = "dim_immigration_personal"
totalDir = 0
for base, dirs, files in os.walk(file_path):
    for directories in dirs:
        totalDir += 1
print(f"Number of partitions based on country_of_residence is {totalDir}")

Number of partitions based on country_of_residence is 229
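The two partition-count cells above walk the whole directory tree with os.walk, so they would also count nested directories if a table were ever partitioned by more than one column, and only the first of them excludes the null partition. A one-level count restricted to the partition column could look like the sketch below. count_partitions is a hypothetical helper, and it assumes each table was written with a single partition column, as the state_code=__HIVE_DEFAULT_PARTITION__ directory name in the earlier cell suggests.

```python
import os

HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def count_partitions(table_dir, column):
    """Count the top-level `column=value` directories under a partitioned table.

    Only the first directory level is inspected, so nested sub-partitions and
    bookkeeping entries such as _SUCCESS or _temporary are not counted. The
    null partition (column=__HIVE_DEFAULT_PARTITION__) is excluded so the result
    can be compared with SQL COUNT(DISTINCT ...), which ignores NULLs.
    """
    prefix = f"{column}="
    with os.scandir(table_dir) as entries:
        return sum(
            1
            for entry in entries
            if entry.is_dir()
            and entry.name.startswith(prefix)
            and entry.name != prefix + HIVE_DEFAULT_PARTITION
        )
```

For example, count_partitions('fact_immigration_i94', 'state_code') would be compared with the COUNT(DISTINCT i94addr) result, and count_partitions('dim_immigration_personal', 'country_of_residence') with the COUNT(DISTINCT i94res) result.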


#### 4.3 Data dictionary 
Please refer to the data dictionary below:
![fact_immigration_i94.png](fact_immigration_i94.png)
![dim_immigration_personal.png](dim_immigration_personal.png)
![dim_immigration_airline.png](dim_immigration_airline.png)
![country_state_city_code.png](country_state_city_code.png)
![dim_temperature.png](dim_temperature.png)
![dim_demographics.png](dim_demographics.png)

#### Step 5: Project Write Up
* **Rationale for the choice of tools and technologies:**
    * The Pandas library is used for exploratory data analysis on the smaller datasets, since it loads and manipulates data efficiently
    * The PySpark library is used for exploratory data analysis on the large datasets. Spark on Amazon EMR could also be used for distributed processing
    * A Jupyter Notebook is used to show the data structure and the need for data cleaning. Python is used because it is the language I am most comfortable with; SQL is also used to assess the data
    * The Python 3 configparser module is needed to run the Python scripts
* **Frequency of Data Source:**
    * Since the raw immigration and temperature datasets are built monthly, the tables derived from them should be updated monthly
    * Since collecting demographic data takes time and frequent collection would be costly, the tables derived from the demographics dataset could be updated annually
* **Future Design Considerations:**
    * If the data were increased by 100x:
        * Store the raw data sources and the Parquet files produced by the ETL data pipeline in Amazon S3
        * Deploy this Spark solution on an AWS EMR cluster, which can be scaled out with additional nodes as the data grows
    * If the data populates a dashboard that must be updated by 7am every day:
        * Use Apache Airflow to run the pipeline on a schedule, e.g. a daily cron expression such as schedule_interval='0 5 * * *' that leaves enough time for the run to finish before 7am. Airflow can orchestrate the ETL data pipeline so that the data is updated regularly and the report is populated
        * Airflow also integrates well with Python and AWS
    * If the database needed to be accessed by 100+ people:
        * Amazon Redshift could be used, since its massively parallel processing architecture and Concurrency Scaling feature are designed to serve many concurrent queries

**Sample analysis using the created Data Model:**

The following is a sample analysis that could be performed with the created data model. It examines whether the states with the most immigrants are also the states with the most foreign-born people.

In [32]:
# Staging the fact_immigration_i94 table
fact_immigration_analysis = spark.read.parquet('fact_immigration_i94')

In [33]:
fact_immigration_analysis.createOrReplaceTempView('fact_immigration_i94_tbl')

In [34]:
# Staging the dim_demographics table
dim_demographics_analysis = spark.read.parquet('dim_demographics')

In [35]:
dim_demographics_analysis.createOrReplaceTempView('dim_demographics_tbl')

In [36]:
# Staging the state_code table
state_code_analysis = spark.read.parquet("state_code")

In [37]:
state_code_analysis.createOrReplaceTempView("state_code_tbl")
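The next cell joins dim_demographics_tbl with state_code_tbl on the full state name. Because it is an inner join, demographics rows whose state name has no exact match in the state_code table (for example, because of a difference in capitalization) are silently dropped. A quick pre-join check could be sketched as follows; unmatched_states is a hypothetical helper and the sample names are illustrative only.

```python
def unmatched_states(demographic_states, state_code_states):
    """Return the distinct demographics state names with no exact match in the
    state_code table; an inner JOIN on `state` would drop rows for these names."""
    return sorted(set(demographic_states) - set(state_code_states))


# Illustrative input: "florida" differs only in case, so it would not match.
print(unmatched_states(["Florida", "New York", "florida"], ["Florida", "New York"]))  # ['florida']
```

In the notebook the inputs could be collected with [r.state for r in dim_demographics_analysis.select('state').distinct().collect()] and [r.state for r in state_code_analysis.select('state').collect()]. A Spark SQL LEFT ANTI JOIN between the two views would return the same unmatched rows without collecting anything to the driver.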

In [38]:
# Joining dim_demographics_tbl with state_code_tbl to fetch the state_code column
spark.sql("""
SELECT dim_demographics_tbl.*, 
state_code_tbl.code AS state_code
FROM dim_demographics_tbl
JOIN state_code_tbl ON dim_demographics_tbl.state = state_code_tbl.state 
""").createOrReplaceTempView("dim_demographics_tbl")

In [39]:
# Creating a view with info of immigrants count per state
spark.sql("""
SELECT state_code, COUNT(cic_id) AS immigrants_count
FROM fact_immigration_i94_tbl
GROUP BY state_code
ORDER BY immigrants_count DESC
""").createOrReplaceTempView("immigrants_count_per_state")

In [40]:
# Creating a view with info of total foreign born per state
spark.sql("""
SELECT state_code, SUM(foreign_born) AS total_foreign_born
FROM dim_demographics_tbl
GROUP BY state_code
ORDER BY total_foreign_born DESC
""").createOrReplaceTempView("total_foreign_born_per_state")

In [45]:
# Joining the above two views for the purpose of analysing 
# number of immigrants per state versus number of foreign born per state and its related percentage
spark.sql("""
SELECT imm.state_code, immigrants_count, total_foreign_born,
ROUND((immigrants_count/total_foreign_born)*100,2) AS immigrant_to_foreign_born_ratio
FROM immigrants_count_per_state imm
JOIN total_foreign_born_per_state fb ON imm.state_code = fb.state_code
ORDER BY immigrants_count DESC
""").show()

+----------+----------------+------------------+-------------------------------+
|state_code|immigrants_count|total_foreign_born|immigrant_to_foreign_born_ratio|
+----------+----------------+------------------+-------------------------------+
|        FL|          621701|           7845566|                           7.92|
|        NY|          553677|          17186873|                           3.22|
|        CA|          470386|          37059662|                           1.27|
|        HI|          168764|            506560|                          33.32|
|        TX|          134321|          14498054|                           0.93|
|        NV|          114609|           2406685|                           4.76|
|        IL|           82126|           4632600|                           1.77|
|        NJ|           76531|           2327750|                           3.29|
|        MA|           70486|           2573815|                           2.74|
|        WA|           55792

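Even though the immigration dataset has approximately 3 million records, I could join it with the demographics dataset to compare immigrant counts with foreign-born counts in a short query run time, because both datasets are first aggregated per state and the final join only combines the two small per-state views.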

Based on the above data, we could infer the following:
* Although California, New York and Texas have the largest foreign-born populations, possibly due to their cosmopolitan culture, Florida has the largest number of immigrants in the dataset.
* States like Florida, Georgia and Nevada have a higher immigrant-to-foreign-born ratio than the other populous states. (Hawaii, with a ratio of 33.32%, is an exception, possibly because of cultural differences stemming from its late annexation by the US.)
* This suggests that where immigrants settle might not depend only on the existing foreign-born population of a state.
* Other factors, such as job opportunities, cost of living and healthcare benefits, may also drive immigration, but confirming this would require additional data sources.

However, more meaningful conclusions about immigrant behavior could only be drawn after analyzing more granular data.
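The sample analysis aggregates each side per state before joining. The same aggregate-then-join logic can be sketched in plain Python to make the SQL semantics explicit: COUNT(cic_id) skips NULL IDs, SUM(foreign_born) skips NULL values, and the inner join drops states missing from either side. The helper names below are hypothetical and the dictionaries stand in for the Spark views; this illustrates the query rather than replacing it.

```python
from collections import Counter, defaultdict


def immigrants_per_state(fact_rows):
    """COUNT(cic_id) ... GROUP BY state_code: count rows with a non-null cic_id per state."""
    return Counter(row["state_code"] for row in fact_rows if row["cic_id"] is not None)


def foreign_born_per_state(demographic_rows):
    """SUM(foreign_born) ... GROUP BY state_code, skipping null values like SQL SUM."""
    totals = defaultdict(int)
    for row in demographic_rows:
        if row["foreign_born"] is not None:
            totals[row["state_code"]] += row["foreign_born"]
    return dict(totals)


def immigrant_to_foreign_born(imm_counts, fb_totals):
    """Inner-join the two per-state aggregates and compute the percentage.

    Returns (state_code, immigrants_count, total_foreign_born, percentage) tuples
    ordered by immigrants_count descending. States with a zero foreign-born total
    are skipped here, whereas Spark SQL would return NULL for the division.
    Python's round() and Spark's ROUND (HALF_UP) may differ on values that sit
    exactly halfway between two results.
    """
    rows = []
    for state, count in imm_counts.items():
        if state is None:
            continue  # NULL keys never match in a SQL join
        total = fb_totals.get(state)
        if total:
            rows.append((state, count, total, round(count / total * 100, 2)))
    rows.sort(key=lambda r: r[1], reverse=True)
    return rows
```

Feeding the FL, HI and TX counts from the output table into immigrant_to_foreign_born reproduces their percentages of 7.92, 33.32 and 0.93.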