# United States Immigration study

## Data Engineering Capstone Project

### Project Summary
This project explores immigration into the United States. Key questions include how the flow of immigrants changes seasonally, whether destinations depend on temperature, and where the immigrants come from.

Data are extracted from three dataset sources: "I94 Immigration Data" from the U.S. National Tourism and Trade Office, "Global Land Temperatures By City" from Kaggle, and "US Cities Demographics" from Opendatasoft.

Apache Spark, Jupyter Notebook, and optionally Amazon Elastic MapReduce (EMR) are used to build this ETL pipeline, which writes Parquet files either to an Amazon S3 bucket or to local disk.

#### The project follows these steps:
- Step 1: Scope the Project and Gather Data
- Step 2: Explore and Assess the Data
- Step 3: Define the Data Model
- Step 4: Run ETL to Model the Data
- Step 5: Complete Project Write Up


In [1]:
# Imports and installs 
import pandas as pd
import re
import glob

from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, udf, desc
from pyspark.sql.types import StringType
from IPython.display import display
from IPython.core.display import HTML

In [2]:
# makes pandas dataframe show every column
pd.set_option('display.max_columns', 35)

## Step 1: Scope the Project and Gather Data

#### Scope 

The scope of this project is to create a Data Lake using an ETL pipeline. This Jupyter Notebook presents the whole process: describing and assessing the datasets and, at the end, running the complete ETL pipeline. Ideally, the notebook should run on an AWS EMR cluster, especially when processing the full immigration dataset. A star schema is chosen for this study so that the questions outlined below can be answered with straightforward queries.


### Choice of tools and technologies

Apache Spark and Jupyter Notebook are the primary processing tools for this ETL pipeline. Spark's ability to process large amounts of data in parallel, together with the fact that AWS EMR supports Spark out of the box, makes Spark a compelling choice. Jupyter Notebook is not only a convenient interactive environment for writing and running code but also works well as a documentation tool.


#### Describing the datasets

- **I94 Immigration Data**: comes from the [U.S. National Tourism and Trade Office](https://travel.trade.gov/research/reports/i94/historical/2016.html) and contains various statistics on international visitor arrivals in the USA. The dataset covers 2016.


- **Temperature Data**: comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) and is provided in CSV format. It includes information such as average temperature, date, and city.


- **Demographic Data:** comes from [Opendatasoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/) and contains information about the demographics of U.S. cities, such as race, male and female population, and median age. This dataset is from 2015.


#### This study builds an ETL pipeline and a Data Model to answer the following questions:

- Which U.S. cities are the most common destinations for arriving immigrants?
- How does immigration change throughout the year?
- Where do the immigrants come from?
- What is the gender ratio of the immigrants?
- What is the age profile of the immigrants?
- What is the legal status of immigrants in the U.S.?

### Gather Data

The data are gathered into pandas and Spark DataFrames.

In [3]:
# initializing SparkSession
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

In [4]:
# set the write path to local or S3 bucket
local = './results/'
s3 = ''

### Reading in Immigration Data to pandas

In [5]:
# read in the sample csv to pandas for easy assessing
pd_immi_df = pd.read_csv('immigration_data_sample.csv')
print('Finished reading Immigration csv file to pandas')

Finished reading Immigration csv file to pandas


In [6]:
# first look at the Immigration Data
pd_immi_df.head(3)

,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT


### Reading in Immigration Data to Spark
Spark is used here to take advantage of parallel processing, which pandas does not provide.

In [7]:
# reading in either the full immigration data or a sample to spark

# full immigration data (all .sas7bdat files for 2016), only use with an EMR cluster
# big_file = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")

# sample immigration file, only for April 2016
small_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# read in the data to Spark
spark_immi_df = spark.read.format('com.github.saurfang.sas.spark').load(small_file)

print('Finished reading Immigration sas.sample files to Spark.')

Finished reading Immigration sas.sample files to Spark.


In [8]:
# comparing with the spark Immigration Data
spark_immi_df.limit(3).toPandas()

,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2


It looks similar to immigration_data_sample.csv.

Next, a quick test that writing and reading work: the raw data are written to Parquet and read back, preferably to and from an S3 bucket.

In [9]:
# test writing and reading parquet files
spark_immi_df.write.parquet("sas_data", mode='overwrite')
spark_immi_df = spark.read.parquet("sas_data")
print('Finished writing and reading')

Finished writing and reading


### Reading in Temperature Data to pandas
If the original file is inaccessible, skip this step and read **Temperature_data.csv** instead; it is the US-only extract written to disk in Step 2.

In [10]:
# reading in the Temperature Data to pandas
pd_temp_df = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
print('Finished reading Temperature Data to pandas')

Finished reading Temperature Data to pandas


In [11]:
# first look at the Temperature Data
pd_temp_df.head(3)

,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E


### Reading in Demographic Data to pandas

In [12]:
# reading in the Demographic Data to pandas
pd_demo_df = pd.read_csv('us-cities-demographics.csv', sep = ';')
print('Finished reading Demographic Data to pandas')

Finished reading Demographic Data to pandas


In [13]:
# first look at the Demographic Data
pd_demo_df.head(3)

,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759


## Step 2: Explore and Assess the Data

### Exploration of the Immigration Data

In [14]:
# overview of the Immigration Data
pd_immi_df.head(10)

,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT
5,721257,1481650.0,2016.0,4.0,577.0,577.0,ATL,20552.0,1.0,GA,20606.0,51.0,2.0,1.0,20160408,,,T,N,,M,1965.0,10072016,M,,DL,736852600.0,910,B2
6,1072780,2197173.0,2016.0,4.0,245.0,245.0,SFR,20556.0,1.0,CA,20635.0,48.0,2.0,1.0,20160412,,,T,O,,M,1968.0,10112016,F,,CX,786312200.0,870,B2
7,112205,232708.0,2016.0,4.0,113.0,135.0,NYC,20546.0,1.0,NY,20554.0,33.0,2.0,1.0,20160402,,,G,O,,M,1983.0,6302016,F,,BA,55474490000.0,00117,WT
8,2577162,5227851.0,2016.0,4.0,131.0,131.0,CHI,20572.0,1.0,IL,20575.0,39.0,2.0,1.0,20160428,,,O,O,,M,1977.0,7262016,,,LX,59413420000.0,00008,WT
9,10930,13213.0,2016.0,4.0,116.0,116.0,LOS,20545.0,1.0,CA,20553.0,35.0,2.0,1.0,20160401,,,O,O,,M,1981.0,6292016,,,AA,55449790000.0,00109,WT


In [15]:
# counting non-NaNs for each column
pd_immi_df.count()

Unnamed: 0    1000
cicid         1000
i94yr         1000
i94mon        1000
i94cit        1000
i94res        1000
i94port       1000
arrdate       1000
i94mode       1000
i94addr        941
depdate        951
i94bir        1000
i94visa       1000
count         1000
dtadfile      1000
visapost       382
occup            4
entdepa       1000
entdepd        954
entdepu          0
matflag        954
biryear       1000
dtaddto       1000
gender         859
insnum          35
airline        967
admnum        1000
fltno          992
visatype      1000
dtype: int64

The column abbreviations are explained in the **I94_SAS_Labels_Descriptions.SAS** file and are as follows:

In [16]:
%%html
<style>
  table {margin-left: 0 !important;}
</style>

| Abbreviation | Description |
|------------|------------|
| cicid | Immigrant ID number |
| i94yr | Year |
| i94mon | Month |
| i94cit | Country of citizenship (numeric code) |
| i94res | Country of residence (numeric code) |
| i94port | Port of arrival (three-letter code) |
| arrdate | Arrival date in the USA (SAS date) |
| i94mode | Mode of transportation to the USA: <br /> 1 = Air <br /> 2 = Sea <br /> 3 = Land <br /> 9 = Not reported |
| i94addr | State of address in the USA |
| depdate | Departure date from the USA (SAS date) |
| i94bir | Age of respondent in years |
| i94visa | Reason for immigration: <br /> 1 = Business <br /> 2 = Pleasure <br /> 3 = Student |
| count | Used for summary statistics |
| dtadfile | Date added to I-94 files |
| visapost | Department of State office where the visa was issued |
| entdepa | Admitted or paroled into the U.S. |
| entdepd | Departed |
| matflag | Match of arrival and departure records |
| biryear | Year of birth |
| dtaddto | Date to which admitted to the U.S. (allowed to stay until) |
| gender | Gender |
| airline | Airline used to arrive in the U.S. |
| admnum | Admission number |
| fltno | Flight number of the airline used to arrive in the U.S. |
| visatype | Class of admission legally admitting the <br /> non-immigrant to temporarily stay in the U.S. |

Let's take a look at the **visatype** column. What types are there, and what do they mean?

In [17]:
# counting each visatype
spark_immi_df.groupBy('visatype').count().orderBy('count', ascending=False).show()

+--------+-------+
|visatype|  count|
+--------+-------+
|      WT|1309059|
|      B2|1117897|
|      WB| 282983|
|      B1| 212410|
|     GMT|  89133|
|      F1|  39016|
|      E2|  19383|
|      CP|  14758|
|      E1|   3743|
|       I|   3176|
|      F2|   2984|
|      M1|   1317|
|      I1|    234|
|     GMB|    150|
|      M2|     49|
|     SBP|     11|
|     CPL|     10|
+--------+-------+



where:
- WT - Visitors entering for pleasure under the Visa Waiver Program
- B2 - Visitors ("tourists"): Temporary Visitor for Pleasure or Medical Treatment
- WB - Visitors entering for business under the Visa Waiver Program
- B1 - Temporary visa for business travelers (also covers certain domestic employees)
...  
[See more here](https://travel.state.gov/content/travel/en/us-visas/visa-information-resources/all-visa-categories.html)

The **visatype** column looks clean, so it is left as it is.

In [18]:
# printing schema
spark_immi_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

The data types look acceptable. 

Checking the **i94mode** column. 

In [19]:
# creating a temporary spark sql table view
spark_immi_df.createOrReplaceTempView('immi_table')

In [20]:
spark.sql("""
SELECT i94mode, count(*)
FROM immi_table
GROUP BY 1
ORDER BY 2 DESC
""").show()

+-------+--------+
|i94mode|count(1)|
+-------+--------+
|    1.0| 2994505|
|    3.0|   66660|
|    2.0|   26349|
|    9.0|    8560|
|   null|     239|
+-------+--------+



where:

1 = Air <br />
2 = Sea <br />
3 = Land <br />
9 = Not reported  <br />

Since the analysis is built around airports, only arrivals by air are kept.

Checking to see if **cicid** can be used as a primary key.

In [21]:
# MINUS is a set difference (like EXCEPT), not arithmetic subtraction:
# if the total row count equals the distinct cicid count, the query returns no rows,
# which means cicid can be used as a primary key.
spark.sql("""
SELECT count(*) AS total_amount_rows
FROM immi_table

MINUS

SELECT COUNT(DISTINCT(cicid)) AS distinct_rows_cicid
FROM immi_table
""").show()

+-----------------+
|total_amount_rows|
+-----------------+
+-----------------+



The query returns no rows, so the total number of rows equals the number of distinct cicid values. Every cicid is therefore unique, and cicid can be used as a primary key.
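
A more direct way to phrase the same check is arithmetic: `COUNT(*) - COUNT(DISTINCT cicid)` returns 0 when every id is unique, instead of relying on an empty set difference. The sketch below demonstrates both forms with Python's built-in `sqlite3` rather than Spark, on a tiny made-up table; the helper `key_check` and the sample ids are hypothetical, and SQLite's `EXCEPT` plays the role of Spark's `MINUS`.

```python
import sqlite3


def key_check(cicids):
    """Return (except_rows, difference) for a hypothetical table of cicid values."""
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE immi_table (cicid REAL)")
    con.executemany("INSERT INTO immi_table VALUES (?)", [(c,) for c in cicids])
    # set difference (Spark: MINUS, SQLite: EXCEPT): empty when the two counts are equal
    except_rows = con.execute(
        """
        SELECT COUNT(*) FROM immi_table
        EXCEPT
        SELECT COUNT(DISTINCT cicid) FROM immi_table
        """
    ).fetchall()
    # arithmetic difference: 0 when every cicid is unique
    (difference,) = con.execute(
        "SELECT COUNT(*) - COUNT(DISTINCT cicid) FROM immi_table"
    ).fetchone()
    con.close()
    return except_rows, difference


print(key_check([6.0, 7.0, 15.0]))  # ([], 0)
print(key_check([6.0, 7.0, 7.0]))   # ([(3,)], 1)
```

The arithmetic query is plain SQL, so the same `SELECT COUNT(*) - COUNT(DISTINCT cicid) FROM immi_table` could be passed to `spark.sql` in this notebook. Because `COUNT(DISTINCT ...)` ignores NULLs, NULL ids would also show up as a non-zero difference.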

The **i94visa** column, or the reason for immigration, is divided into three categories: <br /> 1 = Business <br /> 2 = Pleasure <br /> 3 = Student  

In [22]:
# counting numbers of records for each reason for immigration
spark.sql("""
SELECT i94visa, count(*)
FROM immi_table
GROUP BY 1
ORDER BY 2 DESC
""").show()

+-------+--------+
|i94visa|count(1)|
+-------+--------+
|    2.0| 2530868|
|    1.0|  522079|
|    3.0|   43366|
+-------+--------+



In [23]:
# Let's check the gender column
spark_immi_df.groupby('gender').count().orderBy('count', ascending=False).show()

+------+-------+
|gender|  count|
+------+-------+
|     M|1377224|
|     F|1302743|
|  null| 414269|
|     X|   1610|
|     U|    467|
+------+-------+



Only records with **gender** 'M' or 'F' will be kept.

In [24]:
# counting the number of each age
spark_immi_df.groupby('i94bir').count().orderBy('i94bir', ascending=False).show(20)

+------+-----+
|i94bir|count|
+------+-----+
| 114.0|    1|
| 111.0|    1|
| 110.0|    1|
| 109.0|    2|
| 108.0|    2|
| 107.0|    1|
| 105.0|    2|
| 103.0|    1|
| 102.0|    4|
| 101.0|    2|
| 100.0|   24|
|  99.0|   19|
|  98.0|   26|
|  97.0|   52|
|  96.0|   46|
|  95.0|   88|
|  94.0|  104|
|  93.0|  185|
|  92.0|  241|
|  91.0|  319|
+------+-----+
only showing top 20 rows



A 114-year-old arrival seems implausible, so the age is capped at 105: records with an age above 105 are dropped.

The codes in the **i94cit** and **i94res** columns need to be replaced with the corresponding country names (country of citizenship and country of residence).

The **i94port** column holds the port where the immigrants arrived, as a three-letter code. To keep it compatible with the dimension tables, each port is mapped to two new columns, **'arrival_city'** and **'arrival_state'**.

### Explore the Temperature Data

In [25]:
# first look at the temperature data
pd_temp_df.head(10)

,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


In [26]:
# selecting the latest date
pd_temp_df['dt'].max()

'2013-09-01'

There appears to be one measurement per month, but the dataset ends at '2013-09-01', so it does not overlap with the 2016 immigration data. As a pragmatic workaround, only the 2012 records are kept, and four years are added to their dates so that they line up with the immigration records. This is an approximation (2012 temperatures stand in for 2016), but it makes the two datasets joinable.
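
The workaround can be sketched with the standard library alone: keep the 2012 measurements and move each date four years forward. The helper `shift_years` and the sample dates are hypothetical; the notebook itself does the same thing later with pandas and `pd.DateOffset(years=4)`. The `ValueError` branch only matters for 29 February shifted into a non-leap year, which cannot happen when going from 2012 to 2016 since both are leap years.

```python
from datetime import date


def shift_years(iso_date: str, years: int = 4) -> str:
    """Move an ISO date (YYYY-MM-DD) forward by `years`, keeping month and day."""
    d = date.fromisoformat(iso_date)
    try:
        return d.replace(year=d.year + years).isoformat()
    except ValueError:
        # 29 February shifted into a non-leap year: fall back to 28 February
        return d.replace(year=d.year + years, day=28).isoformat()


# monthly measurement dates in the same format as the dt column
measurements = ["2011-12-01", "2012-01-01", "2012-04-01", "2012-12-01", "2013-01-01"]

# keep only 2012, then shift the dates to 2016
shifted = [shift_years(dt) for dt in measurements if dt.startswith("2012")]
print(shifted)  # ['2016-01-01', '2016-04-01', '2016-12-01']
```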

In [27]:
# number of distinct countries in the data
pd_temp_df['Country'].nunique()

159

This study only looks at US data, so the other 158 countries should be dropped.

In [28]:
# filtering records for the US
pd_temp_df = pd_temp_df[pd_temp_df['Country'] == 'United States']

In [29]:
# checking NaNs
pd_temp_df.count()

dt                               687289
AverageTemperature               661524
AverageTemperatureUncertainty    661524
City                             687289
Country                          687289
Latitude                         687289
Longitude                        687289
dtype: int64

NaNs in 'AverageTemperature' need to be dropped. 

City names should be converted to upper case to match the city names in the immigration table.

In [30]:
# writing to csv for reproducibility
pd_temp_df.to_csv('Temperature_data.csv', index = False)
print('Finished writing csv file')

Finished writing csv file


### Explore the Demographics Data

In [31]:
# overview of the Demographic Data
pd_demo_df.head()

,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [32]:
# counting the non-NaN values for each column
pd_demo_df.count()

City                      2891
State                     2891
Median Age                2891
Male Population           2888
Female Population         2888
Total Population          2891
Number of Veterans        2878
Foreign-born              2878
Average Household Size    2875
State Code                2891
Race                      2891
Count                     2891
dtype: int64

As with the temperature data, city names should be converted to upper case to match the city names in the immigration table.

In [33]:
# dropping every row that has a NaN in any column
pd_demo_df = pd_demo_df.dropna()

In [34]:
# counting again to see how many NaNs got dropped
pd_demo_df.count()

City                      2875
State                     2875
Median Age                2875
Male Population           2875
Female Population         2875
Total Population          2875
Number of Veterans        2875
Foreign-born              2875
Average Household Size    2875
State Code                2875
Race                      2875
Count                     2875
dtype: int64

Dropping the NaNs removes only 16 rows (from 2891 to 2875), which keeps the data clean at little cost.  
Next, check whether City and State can be used as a composite key.

In [35]:
# dropping duplicates to see how many distinct values there are
pd_demo_df[['City', 'State']].drop_duplicates().count()

City     588
State    588
dtype: int64

In [36]:
# maybe three columns would be more suitable
pd_demo_df[['City', 'State', 'Race']].drop_duplicates().count()

City     2875
State    2875
Race     2875
dtype: int64

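City and State alone give only 588 distinct pairs, so they are not unique. City, State and Race together are unique across all 2875 rows, so they can serve as a composite key.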
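
The composite-key test boils down to comparing the number of rows with the number of distinct key tuples. A minimal sketch in plain Python follows; the helper `is_composite_key` is hypothetical, the first and third rows come from the demographic sample above, and the second row is made up to mimic a city that appears once per race.

```python
def is_composite_key(rows, columns):
    """True if the given columns uniquely identify every row (no duplicate key tuples)."""
    keys = [tuple(row[c] for c in columns) for row in rows]
    return len(keys) == len(set(keys))


rows = [
    {"City": "Silver Spring", "State": "Maryland", "Race": "Hispanic or Latino"},
    {"City": "Silver Spring", "State": "Maryland", "Race": "White"},  # hypothetical row
    {"City": "Quincy", "State": "Massachusetts", "Race": "White"},
]
print(is_composite_key(rows, ["City", "State"]))          # False
print(is_composite_key(rows, ["City", "State", "Race"]))  # True
```

In pandas, the equivalent test would be `not pd_demo_df.duplicated(subset=['City', 'State', 'Race']).any()`.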

## Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

As explained in the scope of the project, the Data Model is a Data Lake that feeds a star schema. Unlike a Data Warehouse, which requires data to conform to a predefined schema when they are written ('schema-on-write'), a Data Lake stores the data first and applies a schema when they are read ('schema-on-read'), which gives more flexibility in the queries that can be made.

To meet the requirements of the project scope, a star schema with one fact table and two dimension tables is created. The fact table holds facts about the immigrants, and the two dimension tables store temperatures per month per city and demographics for each city. This setup is intended to let analysts write queries that answer the questions presented in the scope of the project.

A diagram of the star schema is presented below.

![Star schema diagram](./Table_diagram.png)

#### Immigration fact table
- id
- arrival_city
- arrival_state
- arrival_date
- departure_date
- year
- month
- country_citizenship
- country_residence
- age
- gender
- reason_for_immigration
- visatype

#### Temperature dimension table
- city
- date
- year_plus_4_years
- month
- average_temperature
- average_temperature_uncertainty

#### Demographics dimension table
- city
- state_code
- race
- median_age
- male_population
- total_population
- number_of_veterans
- foreign_born
- average_household
- count
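
To illustrate how the star schema supports the questions in the scope (for example, whether destinations depend on temperature), the sketch below joins a trimmed-down fact table with a temperature dimension on city and month. It uses Python's built-in `sqlite3` instead of Spark, keeps only a few of the columns listed above, and the table name `dim_temperature` and all rows and temperatures are made up for illustration. A real query would probably also match the year.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE fact_immigration (id REAL, arrival_city TEXT, month INTEGER);
CREATE TABLE dim_temperature (city TEXT, month INTEGER, average_temperature REAL);
INSERT INTO fact_immigration VALUES (1, 'MIAMI', 4), (2, 'MIAMI', 4), (3, 'CHICAGO', 4);
INSERT INTO dim_temperature VALUES ('MIAMI', 4, 25.1), ('CHICAGO', 4, 9.8);
""")

# arrivals per city and month, next to that city's average temperature for the month
rows = con.execute("""
SELECT f.arrival_city, f.month, COUNT(*) AS arrivals, t.average_temperature
FROM fact_immigration f
JOIN dim_temperature t ON t.city = f.arrival_city AND t.month = f.month
GROUP BY f.arrival_city, f.month, t.average_temperature
ORDER BY arrivals DESC
""").fetchall()
con.close()
print(rows)  # [('MIAMI', 4, 2, 25.1), ('CHICAGO', 4, 1, 9.8)]
```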

#### 3.2 Mapping Out Data Pipelines

#### Transformation steps for the Immigration Data
- Date values need to be transformed to YYYY-MM-DD.
- Port values need validating.
- States need to correspond to the state codes in the Demographic Data.
- Filter the i94mode column (mode of transportation) to keep only travel by air.
- Set the reasons for immigration.
- Filter genders.
- Cap age at 105 (records above 105 are dropped).
- Convert i94res codes to a new country_residence column.
- Convert i94cit codes to a new country_citizenship column.

#### Temperature Data transforming steps
- Filter for only US using pandas
- Drop rows with NaN in AverageTemperature column
- Convert dt to datetime
- Only keep records from 2012
- Add four years to the dates so that they match the 2016 immigration records
- For each city, keep only one record per date
- Make city names upper case to match fact table

#### Demographic Data transforming steps
- Make city names upper case
- Drop NaNs

## Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

The ETL pipeline will consist of the following sections:
- #### Extraction
    - Extraction of csv and sas files
    - Extraction of dictionaries from I94_SAS_Labels_Descriptions
- #### Transformation
    - Transformation of the Immigration Data
    - Transformation of the Temperature Data
    - Transformation of the Demographic Data
- #### Loading
    - Loading of the data to S3 or locally

#### Extracting data

In [37]:
# extracting a fresh set of data

pd_temperature = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
pd_demographic = pd.read_csv('us-cities-demographics.csv', sep = ';')
spark_immigration = spark.read.format('com.github.saurfang.sas.spark')\
                             .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
print('Finished extracting data')

Finished extracting data


##### Extracting data from I94_SAS_Labels_Descriptions

In [38]:
# reading in the data from the Labels_Descriptions file
i94_sas_label_descriptions_fname = "I94_SAS_Labels_Descriptions.SAS"

# reading in all the lines
with open(i94_sas_label_descriptions_fname) as f:
    Labels_Descriptions = f.readlines()

In [39]:
# creating dictionaries


# making a dictionary with {ports: [city, state]} from the Labels_Descriptions file
re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for line in Labels_Descriptions[302:962]:
    results = re_compiled.search(line)
    key = results[1]
    value = results[2].split(",")
    if len(value) == 2:
        value[1] = value[1].strip(' ')
        valid_ports[key] = value
    else:
        valid_ports[key] = value

# valid_ports:

# {'ALC': ['ALCAN', 'AK'],
#  'ANC': ['ANCHORAGE', 'AK'],
#  'BAR': ['BAKER AAF - BAKER ISLAND', 'AK'],
#  'DAC': ['DALTONS CACHE', 'AK'], 
#   ...

        
# making a dictionary with country codes from the Labels_Descriptions file
re_compiled = re.compile(r"\   (.*)\ =.*\'(.*)\'")
country_codes = {}
for line in Labels_Descriptions[9:298]:
    results = re_compiled.search(line)
    country_codes[int(results[1])] = results[2]
    
# country_codes:    

# {582: 'MEXICO',
#  236: 'AFGHANISTAN',
#  101: 'ALBANIA',
#  316: 'ALGERIA',
#  ...
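
The port parsing in the cell above can be checked in isolation. The sketch below applies the same regular expression to single lines and derives the state the way `port_to_state` does further down. The line layout (`'CODE' = 'CITY, ST'` with padding spaces) is an assumption based on the dictionary output shown above, and unlike the cell above, the hypothetical helper `parse_port_line` strips whitespace from every part, not only from the state. The names `sample_ports` and `state_for` are also illustrative, chosen so they do not overwrite the notebook's `valid_ports`.

```python
import re

# same pattern as in the notebook: first quoted string = port code, second = "CITY, STATE"
PORT_RE = re.compile(r"\'(.*)\'.*\'(.*)\'")


def parse_port_line(line):
    """Return (code, parts) for a port line, or None if the line has no quoted pair."""
    match = PORT_RE.search(line)
    if match is None:
        return None
    parts = [part.strip() for part in match[2].split(",")]
    return match[1], parts


def state_for(parts):
    """Mirror port_to_state: use the state code if present, else the city name."""
    return parts[1] if len(parts) == 2 else parts[0]


lines = [
    "   'ALC'\t=\t'ALCAN, AK             '",
    "   'WAS'\t=\t'WASHINGTON DC'",
]
sample_ports = dict(parse_port_line(line) for line in lines)
print(sample_ports)                     # {'ALC': ['ALCAN', 'AK'], 'WAS': ['WASHINGTON DC']}
print(state_for(sample_ports["WAS"]))  # WASHINGTON DC
```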

#### Transforming the Immigration Data

##### Transformation steps for the Immigration Data
- Date values need to be transformed to YYYY-MM-DD.
- Port values need validating.
- States need to correspond to the state codes in the Demographic Data.
- Filter the i94mode column (mode of transportation) to keep only travel by air.
- Set the reasons for immigration.
- Filter genders.
- Cap age at 105 (records above 105 are dropped).
- Convert i94res codes to a new country_residence column.
- Convert i94cit codes to a new country_citizenship column.

In [40]:
# preview of the original data
# spark_immigration.limit(4).toPandas()

The SAS System represents dates as the [number of days since a reference date](http://v8doc.sas.com/sashtml/ets/chap2/sect5.htm#:~:text=SAS%20date%20values%20are%20written,value%20for%2017%20October%201991.). The reference date, or date zero, used for SAS date values is 1 January 1960. Thus, for example, 3 February 1960 is represented by the SAS System as 33.
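
The conversion is plain date arithmetic, so it can be checked outside Spark. This standalone sketch (the names `SAS_EPOCH` and `sas_to_iso` are illustrative) reproduces the 3 February 1960 example and converts an `arrdate` value from the sample rows shown earlier: 20566 comes out as 2016-04-22, which matches the `dtadfile` value 20160422 on the same row.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date zero


def sas_to_iso(sas_days):
    """Convert a SAS date (days since 1960-01-01) to 'YYYY-MM-DD'; None stays None."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=sas_days)).isoformat()


print(sas_to_iso(33))       # 1960-02-03
print(sas_to_iso(20566.0))  # 2016-04-22
print(sas_to_iso(0))        # 1960-01-01
```

Testing `is None` rather than truthiness matters for day zero: with `if sas_days:`, the value 0 would be treated as missing.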

In [41]:
# transforming the arrdate column

# create udf to convert SAS date to YYYY-MM-DD
@udf(StringType())
def date_to_isoformat(x):
    """
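    argument: SAS system date (number of days since 1960-01-01)
    return: ISO date string YYYY-MM-DD, or None for missing values
    
    This function takes the 'date zero' and adds the number of days
    delivered in the argument.
    """

    # test against None instead of truthiness, so day 0 (1960-01-01) is converted too
    if x is not None:
        return (datetime(1960, 1, 1).date() + timedelta(days=x)).isoformat()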
    return None

# convert sas date to spark dates in column arrdate and depdate
spark_immigration = spark_immigration.withColumn('arrdate', date_to_isoformat(spark_immigration.arrdate))
spark_immigration = spark_immigration.withColumn('depdate', date_to_isoformat(spark_immigration.depdate))

The regular expressions used to parse **I94_SAS_Labels_Descriptions.SAS** above can be tested at [regex101](https://regex101.com/).

In [42]:
# generating the arrival_city column

# valid_ports:
# {'ALC': ['ALCAN', 'AK'],
# ...


@udf(StringType())
def port_to_city(port):
    """
    argument: airport
    return: city
    
    This function takes a port abbreviation as argument and
    returns the corresponding city from the valid_ports dict (or None).
    
    """
    if port in valid_ports:
        return valid_ports[port][0]
    return None
    

# generating an arrival_city column with the city names from valid_ports dictionary
spark_immigration = spark_immigration.withColumn('arrival_city', port_to_city(spark_immigration.i94port))

In [43]:
# removing null values from arrival_city column
spark_immigration = spark_immigration.filter('arrival_city is not null')

In [44]:
# generating the arrival_state column

# valid_ports:
# {'ALC': ['ALCAN', 'AK'],
#  'WAS': ['WASHINGTON DC'],
#  ...



@udf(StringType())
def port_to_state(port):
    """
    argument: (string) airport
    return: (string) state
    
    This function maps the airport to state
    """
    
    if port in valid_ports:
        # exception: some ports, such as 'WAS' -> ['WASHINGTON DC'],
        # have no separate state code, so the city name is also used
        # as the state, i.e. if len(valid_ports[port]) == 1: state = city
        
        if len(valid_ports[port]) == 2:
            return valid_ports[port][1]
        else:
            return valid_ports[port][0]


# generating an arrival_state column with state names
spark_immigration = spark_immigration.withColumn('arrival_state', port_to_state(spark_immigration.i94port))

In [45]:
# creating a temporary spark sql table view
spark_immigration.createOrReplaceTempView('immigration_table')

In [46]:
# keep only those that travelled by air
spark.sql("""
SELECT *
FROM immigration_table
WHERE i94mode == 1.0
""").createOrReplaceTempView('immigration_table')

In [47]:
# set the reason for immigrating
spark.sql("""
SELECT *, CASE  
               WHEN i94visa = 1 THEN 'Business'
               WHEN i94visa = 2 THEN 'Pleasure'
               WHEN i94visa = 3 THEN 'Student'
               ELSE NULL END as reason_for_immigration
               
FROM immigration_table""").createOrReplaceTempView('immigration_table')

In [48]:
# filter gender for only male and female
spark.sql("""
SELECT *
FROM immigration_table
WHERE gender IN ('M', 'F')
""").createOrReplaceTempView('immigration_table')

In [49]:
# capping the age at 105 by dropping records with i94bir above 105 (NULL ages are dropped too)
spark.sql("""
SELECT *
FROM immigration_table
WHERE i94bir <= 105.0
""").createOrReplaceTempView('immigration_table')
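
The SQL filters and the CASE expression above act row by row, so their combined effect can be sketched in plain Python. The helper `clean_record` and the sample records are hypothetical, with values modeled on the sample rows shown earlier. As in SQL, a missing gender or age drops the record, because `NULL IN ('M', 'F')` and `NULL <= 105.0` are not true.

```python
REASONS = {1.0: "Business", 2.0: "Pleasure", 3.0: "Student"}


def clean_record(rec):
    """Apply the notebook's row-level filters to one record; return None if it is dropped."""
    if rec.get("i94mode") != 1.0:            # WHERE i94mode == 1.0 (air only)
        return None
    if rec.get("gender") not in ("M", "F"):  # WHERE gender IN ('M', 'F')
        return None
    age = rec.get("i94bir")
    if age is None or age > 105.0:           # WHERE i94bir <= 105.0
        return None
    # CASE WHEN i94visa = 1 THEN 'Business' ... ELSE NULL END
    return {**rec, "reason_for_immigration": REASONS.get(rec.get("i94visa"))}


records = [
    {"cicid": 1.0, "i94mode": 1.0, "gender": "F", "i94bir": 61.0, "i94visa": 2.0},
    {"cicid": 2.0, "i94mode": 3.0, "gender": "F", "i94bir": 19.0, "i94visa": 2.0},   # land
    {"cicid": 3.0, "i94mode": 1.0, "gender": None, "i94bir": 39.0, "i94visa": 2.0},  # no gender
    {"cicid": 4.0, "i94mode": 1.0, "gender": "M", "i94bir": 114.0, "i94visa": 1.0},  # age > 105
]
kept = [r for r in (clean_record(rec) for rec in records) if r is not None]
print([(r["cicid"], r["reason_for_immigration"]) for r in kept])  # [(1.0, 'Pleasure')]
```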

In [50]:
# creating a temporary table view of the country_codes  

# making a pandas df from the country_codes dict, a simple way to get it into a Spark DataFrame
pd_df_country_codes = pd.DataFrame(list(country_codes.items()),columns = ['country_codes', 'country']) 

# creating a spark dataframe from the pandas dataframe
spark_df_country_codes = spark.createDataFrame(pd_df_country_codes)

# creating a temporary spark sql table view
spark_df_country_codes.createOrReplaceTempView("country_codes_table")

In [51]:
# adding a country_residence column with country names from the i94res codes
spark.sql("""
SELECT im.*, cc.country AS country_residence
FROM immigration_table im
JOIN country_codes_table cc
ON im.i94res = cc.country_codes
""").createOrReplaceTempView('immigration_table')

In [52]:
# adding a country_citizenship column with country names from the i94cit codes
spark.sql("""
SELECT im.*, cc.country AS country_citizenship
FROM immigration_table im
JOIN country_codes_table cc
ON im.i94cit = cc.country_codes
""").createOrReplaceTempView('immigration_table')
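
Both joins above are inner joins, so a record whose `i94res` or `i94cit` code is missing from `country_codes` is silently dropped from the fact table. The sketch below mimics that behaviour in plain Python. The four codes come from the `country_codes` excerpt shown earlier, while code 999, the records, and the helper names `lookup` and `add_countries` are made up for illustration; `sample_codes` is used so the notebook's `country_codes` is not overwritten. A `LEFT JOIN` would instead keep such records with a NULL country.

```python
sample_codes = {582: "MEXICO", 236: "AFGHANISTAN", 101: "ALBANIA", 316: "ALGERIA"}


def lookup(codes, value):
    """Return the country for a numeric code (stored as a float), or None if unknown."""
    return codes.get(int(value)) if value is not None else None


def add_countries(records, codes):
    """Mimic the two inner JOINs: keep only records whose res and cit codes both match."""
    out = []
    for rec in records:
        residence = lookup(codes, rec["i94res"])
        citizenship = lookup(codes, rec["i94cit"])
        if residence is None or citizenship is None:
            continue  # no matching row on the other side, so an inner join drops the record
        out.append({**rec, "country_residence": residence, "country_citizenship": citizenship})
    return out


records = [
    {"cicid": 1.0, "i94cit": 582.0, "i94res": 582.0},
    {"cicid": 2.0, "i94cit": 101.0, "i94res": 999.0},  # 999 is not a known code
]
joined = add_countries(records, sample_codes)
print([(r["cicid"], r["country_citizenship"]) for r in joined])  # [(1.0, 'MEXICO')]
```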

In [53]:
# creating immigration fact table
fact_immigration = spark.sql("""
                                        SELECT 
                                                cicid AS id,
                                                arrival_city,
                                                arrival_state,
                                                arrdate AS arrival_date,
                                                depdate AS departure_date,
                                                i94yr AS year,
                                                i94mon AS month,
                                                country_citizenship,
                                                country_residence,
                                                i94bir AS age,
                                                gender,
                                                reason_for_immigration,
                                                visatype

                                        FROM immigration_table
                                   """)

In [54]:
# counting the number of rows
# fact_immigration.count()

In [55]:
# test to see how the fact_immigration_table turned out
fact_immigration.limit(5).toPandas()

## Transforming the Temperature Data

#### Temperature Data transforming steps
- Keep only US records (using pandas)
- Drop rows with NaN in the AverageTemperature column
- Convert dt to datetime
- Keep only temperatures from 2012
- Add four years to the records so they line up with the 2016 immigration records
- Add year and month columns
- For each city, keep only one record per date
- Make city names upper case to match the fact table
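The steps above can be mirrored in plain Python to make the order of operations explicit. This is a stdlib sketch over a few hypothetical rows, not the notebook's pandas code, and `transform_temperatures` is an illustrative name. Shifting with `date.replace(year=...)` matches `pd.DateOffset(years=4)` here only because 2012 and 2016 are both leap years.

```python
from datetime import date

# Hypothetical rows: (dt, City, Country, AverageTemperature); None marks a missing value
RECORDS = [
    ("2012-01-01", "Abilene", "United States", 7.1),
    ("2012-01-01", "Abilene", "United States", 7.3),   # same date and city again
    ("2012-02-01", "Abilene", "United States", None),  # missing temperature
    ("2011-12-01", "Abilene", "United States", 8.0),   # outside 2012
    ("2012-01-01", "Aachen", "Germany", 2.0),          # not a US city
]


def transform_temperatures(records):
    """Return (date, year, month, CITY, temperature) tuples after the listed steps."""
    seen = set()
    out = []
    for dt, city, country, temp in records:
        # keep only US rows that have a temperature
        if country != "United States" or temp is None:
            continue
        d = date.fromisoformat(dt)
        # keep only 2012 measurements
        if d.year != 2012:
            continue
        # shift four years; 2012 and 2016 are both leap years, so Feb 29 stays valid
        d = d.replace(year=d.year + 4)
        # keep the first row for each (date, city), like drop_duplicates
        if (d, city) in seen:
            continue
        seen.add((d, city))
        out.append((d, d.year, d.month, city.upper(), temp))
    return out


print(transform_temperatures(RECORDS))
```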

In [56]:
# pd_temperature.head(3)

In [57]:
# only keep the temperatures for US
pd_temperature = pd_temperature[pd_temperature['Country'] == 'United States']

In [58]:
# drop rows with NaN in AverageTemperature
pd_temperature = pd_temperature.dropna(how = 'any', subset = ['AverageTemperature'])

In [59]:
# convert dt to datetime
pd_temperature['date'] = pd.to_datetime(pd_temperature.dt)

In [60]:
# only keep temperatures from 2012 (the upper bound 2013-01-01 is exclusive)
dates = (pd_temperature['date'] >= '2012-01-01') & (pd_temperature['date'] < '2013-01-01')
pd_temperature = pd_temperature.loc[dates]

In [61]:
# adding 4 years to the temperature measurements to get them to 2016
pd_temperature['date'] = pd_temperature['date'] + pd.DateOffset(years=4)

In [62]:
# adding a year column
pd_temperature['year'] = pd.DatetimeIndex(pd_temperature['date']).year

In [63]:
# adding a month column
pd_temperature['month'] = pd.DatetimeIndex(pd_temperature['date']).month

In [64]:
# only keeping one temperature per day for each city
pd_temperature = pd_temperature.drop_duplicates(['date', 'City'])

In [65]:
# making the city names upper case
pd_temperature['City'] = pd_temperature['City'].str.upper()

In [66]:
# creating a temporary spark sql table view of the pandas dataframe
spark_temperature = spark.createDataFrame(pd_temperature)
spark_temperature.createOrReplaceTempView('temperature_table')

In [67]:
# creating the dimensional temperature table
dim_temperature = spark.sql("""
                                SELECT 
                                        DISTINCT
                                        date, 
                                        year AS year_plus_4_years,
                                        month,
                                        City AS city,
                                        AVG(AverageTemperature) OVER (PARTITION BY City, date) AS average_temperature,
                                        AVG(AverageTemperatureUncertainty) OVER (PARTITION BY City, date) AS average_temperature_uncertainty
                                        
                                FROM temperature_table
                            """)

In [68]:
# counting number of rows
# dim_temperature.count()

In [69]:
# Checking out the temperature table
dim_temperature.orderBy('city', 'date').limit(13).toPandas()

## Transforming the Demographic Data

#### Demographic Data transforming steps
- Make city names upper case
- Drop rows that contain NaN in any column

In [70]:
# pd_demographic.head()

In [71]:
# making the city names upper case
pd_demographic['City'] = pd_demographic['City'].str.upper()

In [72]:
# dropping NaNs in all columns
pd_demographic = pd_demographic.dropna()

In [73]:
# creating a temporary spark sql table view from pandas
spark_demographic = spark.createDataFrame(pd_demographic)
spark_demographic.createOrReplaceTempView('demographic_table')

In [74]:
# creating the dimensional demographic table
dim_demographic = spark.sql("""
                                SELECT 
                                        City AS city,
                                       `State Code` AS state_code,
                                        Race AS race,
                                       `Median Age` AS median_age,
                                       `Male Population` AS male_population,
                                       `Total Population` AS total_population,
                                       `Number of Veterans` AS number_of_veterans,
                                       `Foreign-born` AS foreign_born,
                                       `Average Household Size` AS average_household,
                                        Count AS count

                                FROM demographic_table
                            """)

In [75]:
# counting number of rows
# dim_demographic.count()

In [76]:
# Checking out the demographic table
dim_demographic.limit(3).toPandas()

## Loading the Data
The data should preferably be loaded into an S3 bucket for further analysis, but for this demonstration with only one month of data, writing it locally to disk will suffice. 

In [78]:
# writing the tables as parquet files, either locally or to S3 depending on the output path prefix

fact_immigration.write.mode('overwrite').parquet(local + 'fact_immigration.parquet')
dim_temperature.write.mode('overwrite').parquet(local + 'dim_temperature.parquet')
dim_demographic.write.mode('overwrite').parquet(local + 'dim_demographic.parquet')
        
print('Finished writing tables to disk')

Finished writing tables to disk


## 4.2 Data Quality Checks
Two unit tests and two data quality tests are run below. The unit tests check the functions 'date_to_isoformat', 'port_to_city', and 'port_to_state'.

The first data quality test checks that each table exists and contains records; the second checks the primary-key columns for missing (NULL) values.

In [79]:
# loading a sample of the Immigration Data into spark to use in the data quality checks
# inferSchema = "true" lets Spark infer each column's data type from its values instead of reading every column as a string
TEST_Im = spark.read.format('csv').options(header = 'true', inferSchema = "true")\
                             .load('immigration_data_sample.csv')

In [80]:
# overview of the original immigration data
TEST_Im.limit(4).toPandas()

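| | _c0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | 20573.0 | 61.0 | 2.0 | 1.0 | 20160422 | | | G | O | | M | 1955.0 | 7202016 | F | | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | 20568.0 | 26.0 | 2.0 | 1.0 | 20160423 | MTR | | G | R | | M | 1990.0 | 10222016 | M | | \*GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | 20571.0 | 76.0 | 2.0 | 1.0 | 20160407 | | | G | O | | M | 1940.0 | 7052016 | M | | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | 20581.0 | 25.0 | 2.0 | 1.0 | 20160428 | DOH | | G | O | | M | 1991.0 | 10272016 | M | | QR | 94789700000.0 | 00739 | B2 |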


In [81]:
# TESTING the function date_to_isoformat
print('TESTING the function date_to_isoformat:')

# replacing columns 'arrdate' and 'depdate' 
# with date_to_isoformat function
TEST_Im = TEST_Im.withColumn('arrdate', date_to_isoformat(TEST_Im.arrdate))
TEST_Im = TEST_Im.withColumn('depdate', date_to_isoformat(TEST_Im.depdate))

# fetching the first row once and reading its fields by name
row = TEST_Im.first()
if row['arrdate'] == '2016-04-22' and row['depdate'] == '2016-04-29':
    print('UNIT TEST PASSED!')
else:
    raise ValueError(f'UNIT TEST FAILED! Giving {row["arrdate"]} instead of "2016-04-22" '
                     f'and {row["depdate"]} instead of "2016-04-29"')
    
# TEST_Im.select('arrdate', 'depdate').limit(4).toPandas()

TESTING the function date_to_isoformat:
UNIT TEST PASSED!
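The 'date_to_isoformat' function is defined earlier in the notebook, so its exact implementation is not shown here. The sample values are consistent with SAS date numbers, that is, days since 1960-01-01 (20566 maps to 2016-04-22). Below is a hedged stdlib equivalent under that assumption; `sas_date_to_isoformat` is a hypothetical name, and in Spark such a function would be wrapped with `udf(..., StringType())`.

```python
import math
from datetime import date, timedelta

# Assumption: arrdate/depdate count days since the SAS epoch, 1960-01-01
SAS_EPOCH = date(1960, 1, 1)


def sas_date_to_isoformat(days):
    """Convert a SAS date number to 'YYYY-MM-DD'; None or NaN returns None."""
    if days is None or math.isnan(days):
        return None
    return (SAS_EPOCH + timedelta(days=int(days))).isoformat()


print(sas_date_to_isoformat(20566.0))  # 2016-04-22
print(sas_date_to_isoformat(20573.0))  # 2016-04-29
```

Returning None for missing values matters because many records have no departure date yet.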


In [82]:
# TESTING the function port_to_state
print('TESTING the function port_to_state:')

# creating columns 'arrival_city' and 'arrival_state' 
# with the port_to_city and port_to_state functions
TEST_Im = TEST_Im.withColumn('arrival_city', port_to_city(TEST_Im.i94port))
TEST_Im = TEST_Im.withColumn('arrival_state', port_to_state(TEST_Im.i94port))

# fetching the first row once and reading its fields by name
row = TEST_Im.first()
if row['arrival_city'] == 'HONOLULU' and row['arrival_state'] == 'HI':
    print('UNIT TEST PASSED!')
else:
    raise ValueError(f'UNIT TEST FAILED! Giving {row["arrival_city"]} instead of "HONOLULU" '
                     f'and {row["arrival_state"]} instead of "HI"')

# TEST_Im.select('arrival_city', 'arrival_state').limit(4).toPandas()

TESTING the function port_to_state:
UNIT TEST PASSED!
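The 'port_to_city' and 'port_to_state' functions are also defined earlier in the notebook. One plausible approach, assuming the i94port lookup maps each code to a "CITY, ST" string as the outputs suggest, is to split on the last comma. `PORT_LABELS` and `split_port` are hypothetical names, and the dictionary holds only the four ports from the sample.

```python
# Hypothetical excerpt of an i94port lookup, assuming "CITY, ST" labels
PORT_LABELS = {
    "HHW": "HONOLULU, HI",
    "MCA": "MCALLEN, TX",
    "OGG": "KAHULUI - MAUI, HI",
    "LOS": "LOS ANGELES, CA",
}


def split_port(code, labels=PORT_LABELS):
    """Return (city, state) for a port code, or (None, None) if it can't be parsed."""
    label = labels.get(code)
    if label is None or "," not in label:
        return None, None
    # split on the last comma so city names containing commas stay intact
    city, state = label.rsplit(",", 1)
    return city.strip(), state.strip()


print(split_port("HHW"))  # ('HONOLULU', 'HI')
print(split_port("OGG"))  # ('KAHULUI - MAUI', 'HI')
```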


In [84]:
# showing the results of the functions that were unit tested
TEST_Im.select('arrdate', 'depdate', 'arrival_city', 'arrival_state').limit(4).toPandas()

| | arrdate | depdate | arrival_city | arrival_state |
|---|---|---|---|---|
| 0 | 2016-04-22 | 2016-04-29 | HONOLULU | HI |
| 1 | 2016-04-23 | 2016-04-24 | MCALLEN | TX |
| 2 | 2016-04-07 | 2016-04-27 | KAHULUI - MAUI | HI |
| 3 | 2016-04-28 | 2016-05-07 | LOS ANGELES | CA |


In [83]:
# TESTING to check if the DataFrames have any rows

def exists(df, table_name):
    """
    Arguments: DataFrame, name_of_DataFrame 
    
    If the DataFrame has more than 0 rows, the test passes.
    If it has 0 rows, the test fails and a ValueError is raised.
    """
    res = df.count()
    if res > 0:
        return f'TEST PASSED! The {table_name} table exists with {res} number of records'
    else:
        raise ValueError(f'TEST FAILED! The {table_name} table does not exist or has {res} records')


print('Testing for tables existence.')
res1 = exists(dim_temperature, 'dim_temperature')
print(res1)
res2 = exists(dim_demographic, 'dim_demographic')
print(res2)
res3 = exists(fact_immigration, 'fact_immigration')
print(res3)

Testing for tables existence.
TEST PASSED! The dim_temperature table exists with 2976 number of records
TEST PASSED! The dim_demographic table exists with 2875 number of records
TEST PASSED! The fact_immigration table exists with 2263563 number of records


In [84]:
dim_temperature.createOrReplaceTempView('dim_temperature')
dim_demographic.createOrReplaceTempView('dim_demographic')
fact_immigration.createOrReplaceTempView('fact_immigration')

In [86]:
# TESTING to see if there are any NaNs in the primary keys

dict_of_tables = {'dim_temperature': ['city', 'date'],
                  'dim_demographic': ['city', 'state_code', 'race'],
                  'fact_immigration': ['id']}


def nan_in_column(spark, df_dict):
    """
    Arguments: spark_context, dictionary_of_str_tables_and_columns
    
    This function counts the number of NaNs in the tables 
    """
    for table in df_dict:
        for column in df_dict[table]:
            
            result = spark.sql(f'''SELECT count(*) AS number_of_nans 
                                   FROM {table} 
                                   WHERE {column} IS NULL
                                   ''')
            
            if result.head()[0] != 0:
                raise ValueError(f"TEST FAILED! The {column} column in {table} contains {result.head()[0]} NaNs")
            else:
                print(f"TEST PASSED! The {column} column in {table} passed with {result.head()[0]} NaNs")
    

print("Testing for NaNs in primary key columns")
nan_in_column(spark, dict_of_tables)

Testing for NaNs in primary key columns
TEST PASSED! The city column in dim_temperature passed with 0 NaNs
TEST PASSED! The date column in dim_temperature passed with 0 NaNs
TEST PASSED! The city column in dim_demographic passed with 0 NaNs
TEST PASSED! The state_code column in dim_demographic passed with 0 NaNs
TEST PASSED! The race column in dim_demographic passed with 0 NaNs
TEST PASSED! The id column in fact_immigration passed with 0 NaNs
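In Spark, NULL and floating-point NaN are different values: `WHERE col IS NULL` does not match NaN, while Spark SQL's `isnan` function does. The check above therefore catches NULLs but would not flag a NaN in a double column such as `id`. The plain-Python sketch below shows the difference; the values are hypothetical. A Spark version could add `OR isnan({column})` for numeric key columns.

```python
import math


def count_missing(values):
    """Count entries that are None (SQL NULL) or float NaN."""
    return sum(
        1 for v in values
        if v is None or (isinstance(v, float) and math.isnan(v))
    )


def count_nulls_only(values):
    """Count only None entries, which is what an IS NULL predicate matches."""
    return sum(1 for v in values if v is None)


# Hypothetical id column with one NULL and one NaN
ids = [4084316.0, None, float("nan"), 4422636.0]
print(count_nulls_only(ids), count_missing(ids))  # 1 2
```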


## 4.3 Data dictionary 
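In this data model, each table's columns come from the dataset its name suggests. No joins between the three source datasets are used to build these tables; the only joins are the lookups that turn the I94 country codes into country names.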



#### Immigration fact table
Primary key = id
- id = (double) unique record id, from cicid
- arrival_city = (string) name of city of arrival by air
- arrival_state = (string) code of state of arrival by air
- arrival_date = (string) date of arrival in ISO format (YYYY-MM-DD)
- departure_date = (string) date of departure in ISO format (YYYY-MM-DD)
- year = (double) year
- month = (double) month
- country_citizenship = (string) country of citizenship
- country_residence = (string) country of residence
- age = (double) age of the respondent in years
- gender = (string) gender, single character
- reason_for_immigration = (string) category of the reason for immigration
- visatype = (string) detailed visa type


#### Temperature dimension table
Primary composite key = city, date
- city = (string) name of city
- date = (timestamp) date of measurement, shifted four years forward
- year_plus_4_years = (int) year of measurement with four years added
- month = (int) month of measurement
- average_temperature = (double) average temperature
- average_temperature_uncertainty = (double) average temperature uncertainty


#### Demographics dimension table
Primary composite key = city, state_code, race
- city = (string) name of city
- state_code = (string) two-letter state code
- race = (string) race
- median_age = (double) median age for the city
- male_population = (double) male population
- total_population = (int) total population
- number_of_veterans = (double) number of veterans
- foreign_born = (double) number of foreign-born residents
- average_household = (double) average household size
- count = (int) total respondents of the population for the given race
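The quality checks above test the key columns for NULLs but do not confirm that a composite key such as (city, state_code, race) is actually unique. A `GROUP BY ... HAVING COUNT(*) > 1` query lists violating keys. The sketch below runs it with `sqlite3` on hypothetical rows; `duplicate_keys` is an illustrative helper, and the same SQL text should also work through `spark.sql` against the registered `dim_demographic` view.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_demographic "
             "(city TEXT, state_code TEXT, race TEXT, total_population INTEGER)")
conn.executemany("INSERT INTO dim_demographic VALUES (?, ?, ?, ?)", [
    ("SPRINGFIELD", "IL", "White", 100),
    ("SPRINGFIELD", "MA", "White", 200),
    ("SPRINGFIELD", "IL", "White", 100),  # duplicate composite key
])


def duplicate_keys(conn, table, key_columns):
    """Return key combinations that occur more than once, with their counts."""
    cols = ", ".join(key_columns)
    return conn.execute(
        f"SELECT {cols}, COUNT(*) AS n FROM {table} "
        f"GROUP BY {cols} HAVING COUNT(*) > 1"
    ).fetchall()


print(duplicate_keys(conn, "dim_demographic", ["city", "state_code", "race"]))
# [('SPRINGFIELD', 'IL', 'White', 2)]
```

An empty result means the composite key holds; a non-empty one could raise a ValueError in the same style as the other checks.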

## Step 5: Complete Project Write Up
* The rationale for the choice of tools and technologies is that Spark and EMR fit this project well. Spark's built-in parallelization makes it well suited to large amounts of data, and EMR provides managed clusters to run it on. In my opinion, a notebook like this is close to necessary, at least for setting up and exploring the ETL. The ETL itself could, of course, be a Python script submitted to the EMR cluster (for example with spark-submit), but that is outside the scope of this project. 


#### Scheduling
* Since the immigration data arrives in monthly batches, the ETL should be scheduled to run on the first or second day of every month. 


#### What would change if the data was increased by 100x?
* If the data were increased by 100x, it would still make sense to run the ETL as a script on AWS EMR. The EMR cluster would need more or larger nodes, and the data would need to be stored in S3 rather than on the EMR cluster itself. 
     

#### What would change if the data populates a dashboard that must be updated daily by 7 am every day?
* If the ETL needed to run every morning, it would make sense to define it as a job in Apache Airflow, scheduled early enough each morning to finish before 7 am. Airflow would make it easy to automate both the scheduling and the data quality validation. 


#### What would change if the database needed to be accessed by 100+ people?
* The more people access the data, the more compute resources are needed to keep queries fast. Instead of only increasing the size of the EMR cluster running Spark, it would be more reasonable to write partitioned parquet files to distributed storage, such as the Hadoop Distributed File System (HDFS), so that each query reads only the partitions it needs and users get faster results. 