# Immigration Analysis
### Data Engineering Capstone Project

#### Project Summary
This project intends to prepare an analytical database for the study of immigration into the United States based on the I94 data. This involves designing an ETL process, including data modeling (i.e. a database schema) and data cleansing. The analytical database will focus on:

* Mode of entrance (vehicle and visa type)
* Destination and origin of travel
* Date of entrance and duration of stay in the country


The project consists of the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import datetime

# We will be using PySpark, as will be justified later:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, expr, pandas_udf, year, month, dayofmonth
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType

### Step 1: Scope the Project and Gather Data

#### Scope 
*Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.*

We intend to design a relational database model (a star schema, see Step 3) and an ETL process that populates it.
Our main tools will be `Pandas` for the initial data exploration and Spark (in particular `PySpark`) for the heavy lifting. For data storage, we will use `Parquet`, a columnar format that allows for fast and efficient analytical querying.

#### Describe and Gather Data 
*Describe the data sets you're using. Where did it come from? What type of information is included?*

We will use US immigration (I94) data from the year 2016, which has been provided by the US National Tourism and Trade Office through its website. This dataset includes the port of entry, the visa type, the arrival and departure dates (and hence the duration of stay), and many other attributes.

## US immigration data from the year 2016 - Sample

In [2]:
df_pandas = pd.read_csv('immigration_data_sample.csv')
df_pandas.head(10)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT
5,721257,1481650.0,2016.0,4.0,577.0,577.0,ATL,20552.0,1.0,GA,...,,M,1965.0,10072016,M,,DL,736852600.0,910,B2
6,1072780,2197173.0,2016.0,4.0,245.0,245.0,SFR,20556.0,1.0,CA,...,,M,1968.0,10112016,F,,CX,786312200.0,870,B2
7,112205,232708.0,2016.0,4.0,113.0,135.0,NYC,20546.0,1.0,NY,...,,M,1983.0,6302016,F,,BA,55474490000.0,00117,WT
8,2577162,5227851.0,2016.0,4.0,131.0,131.0,CHI,20572.0,1.0,IL,...,,M,1977.0,7262016,,,LX,59413420000.0,00008,WT
9,10930,13213.0,2016.0,4.0,116.0,116.0,LOS,20545.0,1.0,CA,...,,M,1981.0,6292016,,,AA,55449790000.0,00109,WT


## Airport Codes

In [3]:
df_pandas = pd.read_csv('airport-codes_csv.csv')
df_pandas.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


## Demographics of US-Cities

In [4]:
# Despite the .csv extension, the file is semicolon-separated:
df_pandas = pd.read_csv('us-cities-demographics.csv', sep=';')
df_pandas.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


## Temperature Data

In [5]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_pandas = pd.read_csv(fname)
df_pandas.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


# Utilizing Spark

Since the data above is only a sample, we first try to load one full month of the raw data with Pandas. As per the instructions, the raw immigration data is stored in the directory `../../data/18-83510-I94-Data-2016/` and is partitioned into monthly files identified by three-letter month codes, e.g. `i94_jan16_sub.sas7bdat`.

In [6]:
fname = '../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat'
df_pandas = pd.read_sas(fname, format='sas7bdat', encoding="ISO-8859-1")

As expected, the processing takes too long, and we will have to utilize a genuine big data toolbox: *Spark*.

In [6]:
spark = SparkSession.builder.\
    config("spark.jars.repositories", "https://repos.spark-packages.org/").\
    config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
    enableHiveSupport().getOrCreate()

df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat')

To improve I/O performance during the exploration, we convert the raw immigration data into **Parquet** format.

In [7]:
# Work with parquet
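# mode="overwrite" lets the cell be re-run without failing on the existing directory
df.write.parquet("data_parquet/exploration", mode="overwrite")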
df = spark.read.parquet("data_parquet/exploration")

For reference, the number of rows in the raw January dataset:

In [9]:
df.count()

2847924

In [10]:
df.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|      admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+
|  7.0|2016.0|   1.0| 101.0| 101.0|    BOS|20465.0|    1.0|     MA|   null|  20.0|    3.0|  1.0|    null|    null| null|      T|   null|   null|   null| 1996.0|     D/S|     M|  null|     LH|3.46608285E8|  424|      F1|
|  8.0|2016.0|   1.0| 101.0| 101.0|    BOS|20465.0|    1.0|     MA|   null|  20.0|    3.0|  1.0|    null|    null| null|

### Step 2: Explore and Assess the Data
#### Explore the Data 
*Identify data quality issues, like missing values, duplicate data, etc.*

In [11]:
df.printSchema  # without () this only displays the bound method, whose repr still lists the column types

<bound method DataFrame.printSchema of DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string]>

#### Cleaning Steps
*Document steps necessary to clean the data*

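We keep only records in which the country of citizenship (`i94cit`) equals the country of residence (`i94res`), the destination state (`i94addr`) and the visa type are non-empty, and both the arrival and departure dates are present: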

In [12]:
df = df.filter("i94cit == i94res and i94addr != '' and visatype != '' and arrdate is not null and depdate is not null")
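The filter keeps a record only if all conditions hold. One subtlety is Spark SQL's three-valued logic: for a `NULL` value, `i94addr != ''` evaluates to `NULL`, which a filter treats as false, so missing values are dropped along with empty strings. The following plain-Python restatement is a minimal sketch for documenting and unit-testing these rules; the helper names `keep_record` and `_sql_not_empty` are hypothetical and not part of the project code.

```python
from typing import Any, Mapping, Optional


def _sql_not_empty(value: Optional[str]) -> bool:
    # Mirrors `col != ''` inside a Spark filter: NULL (None) compares as
    # NULL, which the filter treats as false, so the row is dropped.
    return value is not None and value != ""


def keep_record(rec: Mapping[str, Any]) -> bool:
    """Plain-Python restatement of the cleaning filter (a sketch)."""
    citizenship = rec.get("i94cit")
    return (
        # `i94cit == i94res` is NULL (i.e. false) if either side is NULL
        citizenship is not None
        and citizenship == rec.get("i94res")
        and _sql_not_empty(rec.get("i94addr"))
        and _sql_not_empty(rec.get("visatype"))
        and rec.get("arrdate") is not None
        and rec.get("depdate") is not None
    )
```

For example, the April sample row with `i94cit` 148 and `i94res` 112 would be dropped by this rule.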

In [13]:
df.count()

1786071

In [14]:
df.dropDuplicates()

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string]

In [15]:
df.count()

1786071

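Note that `dropDuplicates()` returns a new DataFrame instead of modifying `df` in place. Since the result above was not assigned, the unchanged count does not by itself show that there are no duplicates; the `clean_data` function in Step 4 includes `dropDuplicates()` in the dataframe it returns.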

In [16]:
df = (
    df.withColumn('immigration_id', expr('cast(cicid as int)'))
      .withColumn('origin_id', expr('cast(i94cit as int)'))
      .withColumn('visa_id', expr('cast(i94visa as int)'))
      .withColumn('mode_id', expr('cast(i94mode as int)'))
)

In [17]:
# SAS dates count days since 1960-01-01; convert them to Python datetimes
get_timestamp_from_sas = udf(lambda sas: datetime.datetime(1960, 1, 1) + datetime.timedelta(days=int(sas)), TimestampType())
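SAS stores dates as day counts relative to 1960-01-01, which is what the lambda above relies on; for example, the April sample's `arrdate` value 20545.0 corresponds to 2016-04-01. The lambda raises a `TypeError` on a missing value, which is harmless here only because nulls were filtered out beforehand. The sketch below (the name `sas_to_datetime` is hypothetical) adds explicit null handling and could be registered in place of the lambda, e.g. with `udf(sas_to_datetime, TimestampType())`.

```python
import datetime
from typing import Optional

# SAS dates are the number of days elapsed since this epoch.
SAS_EPOCH = datetime.datetime(1960, 1, 1)


def sas_to_datetime(sas_days: Optional[float]) -> Optional[datetime.datetime]:
    """Convert a SAS day count (e.g. arrdate, depdate) to a datetime.

    Missing values are passed through as None instead of raising a
    TypeError, as int(None) would inside the original lambda.
    """
    if sas_days is None:
        return None
    return SAS_EPOCH + datetime.timedelta(days=int(sas_days))
```

Alternatively, Spark's built-in `date_add` SQL function could probably replace the Python UDF and avoid its per-row serialization overhead; that variant has not been tested here.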

In [18]:
df = df.withColumn('arrival_date', get_timestamp_from_sas(df.arrdate)).withColumn('departure_date', get_timestamp_from_sas(df.depdate))

In [19]:
df.printSchema  # without () this only displays the bound method, whose repr still lists the column types

<bound method DataFrame.printSchema of DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string, immigration_id: int, origin_id: int, visa_id: int, mode_id: int, arrival_date: timestamp, departure_date: timestamp]>

In [20]:
df = df.selectExpr('immigration_id', 'origin_id', 'i94addr as destination',
                   'arrival_date', 'departure_date',
                   'dayofmonth(arrival_date) as day', 'month(arrival_date) as month', 'year(arrival_date) as year',
                   'mode_id', 'visa_id', 'visatype as visa_type_id')

In [21]:
df.head()

Row(immigration_id=9, origin_id=101, destination='CT', arrival_date=datetime.datetime(2016, 1, 16, 0, 0), departure_date=datetime.datetime(2016, 1, 27, 0, 0), day=16, month=1, year=2016, mode_id=1, visa_id=2, visa_type_id='B2')

`show()` renders a PySpark DataFrame as a more readable table:

In [23]:
df.show()

+--------------+---------+-----------+-------------------+-------------------+---+-----+----+-------+-------+------------+
|immigration_id|origin_id|destination|       arrival_date|     departure_date|day|month|year|mode_id|visa_id|visa_type_id|
+--------------+---------+-----------+-------------------+-------------------+---+-----+----+-------+-------+------------+
|             9|      101|         CT|2016-01-16 00:00:00|2016-01-27 00:00:00| 16|    1|2016|      1|      2|          B2|
|            10|      101|         CT|2016-01-16 00:00:00|2016-02-15 00:00:00| 16|    1|2016|      1|      2|          B2|
|            11|      101|         CT|2016-01-16 00:00:00|2016-02-15 00:00:00| 16|    1|2016|      1|      2|          B2|
|            15|      101|         MA|2016-01-24 00:00:00|2016-03-11 00:00:00| 24|    1|2016|      1|      3|          F1|
|            20|      101|         IL|2016-01-20 00:00:00|2016-01-29 00:00:00| 20|    1|2016|      1|      2|          B2|
|            21|
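The project focuses on the duration of stay, but the table above stores only the arrival and departure dates. Because `arrdate` and `depdate` are both day counts from the same epoch, the duration is simply their difference; for the first row above (2016-01-16 to 2016-01-27) it is 11 days. A minimal sketch with the hypothetical helper `stay_duration_days`; treating a departure before the arrival as invalid is an assumption. On the timestamp columns in Spark, `pyspark.sql.functions.datediff(departure_date, arrival_date)` should yield the same number.

```python
from typing import Optional


def stay_duration_days(arrdate: Optional[float], depdate: Optional[float]) -> Optional[int]:
    """Length of stay in days, computed directly from two SAS day counts.

    Both values count days since 1960-01-01, so no date conversion is needed.
    Returns None if either value is missing, or if the departure precedes
    the arrival (treated here as a data-quality problem, not a real stay).
    """
    if arrdate is None or depdate is None:
        return None
    days = int(depdate) - int(arrdate)
    return days if days >= 0 else None
```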

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
*Map out the conceptual data model and explain why you chose that model*

#### 3.2 Mapping Out Data Pipelines
*List the steps necessary to pipeline the data into the chosen data model*

We choose a star schema with a fact table at its center and four dimension tables. The fact table holds one row per immigration record and supports fast OLAP queries, while the much smaller dimension tables hold the descriptive information (countries, visa categories, modes of entry, and visa types) that can be joined in when needed.

The ETL pipeline is fairly simple, as the immigration dataset already contains all the information required for the fact table. As a result, no JOIN operations are required, and the pipeline mostly consists of filtering rows and selecting, casting, and renaming columns.

<img src='images/ER_diagram.png'>
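The star schema can also be written down as DDL. The sketch below uses Python's built-in `sqlite3` module as a lightweight stand-in for the eventual warehouse (the project itself stores Parquet files); table and column names follow the data dictionary in 4.3, while the SQL types and key constraints are assumptions. The query illustrates the typical star-schema access pattern: one join from the fact table to a dimension table, followed by an aggregation.

```python
import sqlite3

# Star schema: four dimension tables and one fact table (types are assumptions).
SCHEMA = """
CREATE TABLE country_of_origin (origin_id INTEGER PRIMARY KEY, description TEXT);
CREATE TABLE visa (visa_id INTEGER PRIMARY KEY, description TEXT);
CREATE TABLE immigration_modes (mode_id INTEGER PRIMARY KEY, type TEXT);
CREATE TABLE visa_types (
    visa_type_id TEXT PRIMARY KEY, description TEXT, class TEXT, subclass TEXT
);
CREATE TABLE immigrations (
    immigration_id INTEGER PRIMARY KEY,
    origin_id      INTEGER REFERENCES country_of_origin (origin_id),
    destination    TEXT,
    arrival_date   TEXT,
    departure_date TEXT,
    mode_id        INTEGER REFERENCES immigration_modes (mode_id),
    visa_id        INTEGER REFERENCES visa (visa_id),
    visa_type_id   TEXT REFERENCES visa_types (visa_type_id)
);
"""


def build_schema() -> sqlite3.Connection:
    """Create the schema in an in-memory SQLite database."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn


def arrivals_per_mode(conn: sqlite3.Connection) -> list:
    """Count fact rows per mode of entry via a single fact-to-dimension join."""
    return conn.execute(
        """
        SELECT m.type, COUNT(*) AS arrivals
        FROM immigrations AS i
        JOIN immigration_modes AS m ON i.mode_id = m.mode_id
        GROUP BY m.type
        ORDER BY arrivals DESC
        """
    ).fetchall()
```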

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
*Build the data pipelines to create the data model.*

## 4.1.1 Fact Table

As mentioned above, the ETL pipeline for the fact table is simple to implement, since no JOINs are needed.

In [37]:
def load_data(file):
    """
    Description: Load a monthly I94 file from the raw data directory
    Arguments:   file: Filename of the sas7bdat-file
    Returns:     (Spark-)dataframe
    """
    file = '../../data/18-83510-I94-Data-2016/{}'.format(file)
    df = spark.read.format('com.github.saurfang.sas.spark').load(file)
    return df

In [38]:
def clean_data(df):
    """
    Description: Clean the data
    Arguments:   df: (Spark-)dataframe
    Returns:     Processed (Spark-)dataframe
    """
    return df.filter("i94cit == i94res and i94addr != '' and visatype != '' and arrdate is not null and depdate is not null").dropDuplicates()

In [39]:
def get_ids_columns(df):
    """
    Description: Cast the identifier columns (cicid, i94cit, i94visa, i94mode) from double to int
    Arguments:   df: (Spark-)dataframe
    Returns:     Processed (Spark-)dataframe
    """
    return (
        df.withColumn('immigration_id', expr('cast(cicid as int)'))
          .withColumn('origin_id', expr('cast(i94cit as int)'))
          .withColumn('visa_id', expr('cast(i94visa as int)'))
          .withColumn('mode_id', expr('cast(i94mode as int)'))
    )


In [40]:
get_timestamp_from_sas = udf(lambda sas: datetime.datetime(1960, 1, 1) + datetime.timedelta(days=int(sas)), TimestampType())

In [46]:
def get_column_dates_from_sas_to_timestamp(df):
    """
    Description: Convert the SAS dates arrdate and depdate into the timestamp columns arrival_date and departure_date
    Arguments:   df: (Spark-)dataframe
    Returns:     df: Processed (Spark-)dataframe
    """
    return df.withColumn('arrival_date', get_timestamp_from_sas(df.arrdate)).withColumn('departure_date', get_timestamp_from_sas(df.depdate))

In [42]:
def select_columns(df):
    """
    Description: Select the fact-table columns, renaming i94addr to destination and visatype to visa_type_id
    Arguments:   df: (Spark-)dataframe
    Returns:     df: Processed (Spark-)dataframe
    """
    return df.selectExpr('immigration_id', 'origin_id', 'i94addr as destination', 'arrival_date', 'departure_date', 'mode_id', 'visa_id', 'visatype as visa_type_id')

In [52]:
def etl_pipeline(batch):
    """
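    Description: The ETL-Pipeline. Processes a batch of sas-files and writes each one to data_parquet/<num>/, partitioned by visa_type_id.
    Arguments:   batch: List of sas7bdat-filenames (relative to the I94 data directory)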
    Returns:     None
    """
    for num, name in enumerate(batch, start=1):
        df = load_data(name)

        df = clean_data(df)
        df = get_ids_columns(df)
        df = get_column_dates_from_sas_to_timestamp(df)
        df = select_columns(df)

        df.write.partitionBy('visa_type_id').parquet('data_parquet/{}/'.format(num), mode='overwrite')

        print("Processed file {}: {}".format(num, name))

In [53]:
# We compile the file batch manually, as there are only twelve months to consider:
batch = [
    'i94_jan16_sub.sas7bdat',
    'i94_feb16_sub.sas7bdat',
    'i94_mar16_sub.sas7bdat',
    'i94_apr16_sub.sas7bdat',
    'i94_may16_sub.sas7bdat',
    'i94_jun16_sub.sas7bdat',
    'i94_jul16_sub.sas7bdat',
    'i94_aug16_sub.sas7bdat',
    'i94_sep16_sub.sas7bdat',
    'i94_oct16_sub.sas7bdat',
    'i94_nov16_sub.sas7bdat',
    'i94_dec16_sub.sas7bdat'
]
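Because the file names follow the pattern `i94_<mon><yy>_sub.sas7bdat`, the batch could also be generated instead of typed out. A minimal sketch (the name `monthly_batch` is hypothetical); that data for other years uses the same naming scheme is an assumption.

```python
import calendar

# Lower-cased three-letter month codes: 'jan', 'feb', ..., 'dec'.
# calendar.month_abbr follows the LC_TIME locale; Python starts in the "C"
# locale unless setlocale() is called, which yields English abbreviations.
MONTH_CODES = [abbr.lower() for abbr in calendar.month_abbr[1:]]


def monthly_batch(year: int = 2016) -> list:
    """Build the monthly I94 file names, e.g. 'i94_jan16_sub.sas7bdat'."""
    yy = year % 100  # the file names use a two-digit year
    return [f"i94_{code}{yy:02d}_sub.sas7bdat" for code in MONTH_CODES]
```

With it, the pipeline call would read `etl_pipeline(monthly_batch(2016))`.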

Having prepared the pipeline above, we are set and ready to trigger the ETL-pipeline:

In [54]:
etl_pipeline(batch)

Processed file 1: i94_jan16_sub.sas7bdat
Processed file 2: i94_feb16_sub.sas7bdat
Processed file 3: i94_mar16_sub.sas7bdat
Processed file 4: i94_apr16_sub.sas7bdat
Processed file 5: i94_may16_sub.sas7bdat
Processed file 6: i94_jun16_sub.sas7bdat
Processed file 7: i94_jul16_sub.sas7bdat
Processed file 8: i94_aug16_sub.sas7bdat
Processed file 9: i94_sep16_sub.sas7bdat
Processed file 10: i94_oct16_sub.sas7bdat
Processed file 11: i94_nov16_sub.sas7bdat
Processed file 12: i94_dec16_sub.sas7bdat


#### 4.2 Data Quality Checks
*Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:*
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
*Run Quality Checks*

In [55]:
# Perform quality checks here

january_df = spark.read.parquet('data_parquet/1/')
january_df.show()

+--------------+---------+-----------+-------------------+-------------------+-------+-------+------------+
|immigration_id|origin_id|destination|       arrival_date|     departure_date|mode_id|visa_id|visa_type_id|
+--------------+---------+-----------+-------------------+-------------------+-------+-------+------------+
|          1153|      107|         IL|2016-01-16 00:00:00|2016-01-30 00:00:00|      1|      2|          B2|
|         16124|      131|         NY|2016-01-08 00:00:00|2016-01-12 00:00:00|      1|      2|          B2|
|         26940|      165|         CA|2016-01-01 00:00:00|2016-03-21 00:00:00|      1|      2|          B2|
|         28627|      209|         GU|2016-01-06 00:00:00|2016-01-09 00:00:00|      1|      2|          B2|
|         46913|      245|         IL|2016-01-08 00:00:00|2016-01-31 00:00:00|      1|      2|          B2|
|         47174|      245|         OH|2016-01-14 00:00:00|2016-04-14 00:00:00|      1|      2|          B2|
|         47442|      245|  

In [56]:
checks = [
    'SELECT * FROM immigrations WHERE (immigration_id IS NULL OR immigration_id = "")',
    'SELECT * FROM immigrations WHERE (origin_id IS NULL OR origin_id = "")',
    'SELECT * FROM immigrations WHERE (mode_id IS NULL OR mode_id = "")',
    'SELECT * FROM immigrations WHERE (visa_id IS NULL OR visa_id = "")',
    'SELECT * FROM immigrations WHERE (visa_type_id IS NULL OR visa_type_id = "")'
]

In [57]:
def data_quality_checks(df, tests):
    """
    Description: Runs the quality checks defined as a list of SQL expressions on a (Spark-)dataframe.
    Arguments:   df:    (Spark-)dataframe to be tested
                 tests: List of tests defined as SQL-expressions
    Returns:     None
    """
    df.createOrReplaceTempView('immigrations')
    for num, test in enumerate(tests):
        print(f"Test {num}:")
        spark.sql(test).show()

In [59]:
data_quality_checks(df, checks)  # runs on the exploration dataframe from Step 2; pass january_df to check the pipeline output

Test 0:
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
|immigration_id|origin_id|destination|arrival_date|departure_date|day|month|year|mode_id|visa_id|visa_type_id|
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+

Test 1:
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
|immigration_id|origin_id|destination|arrival_date|departure_date|day|month|year|mode_id|visa_id|visa_type_id|
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+
+--------------+---------+-----------+------------+--------------+---+-----+----+-------+-------+------------+

Test 2:
+--------------+---------+-----------+------------+--------------+---+-----+----+-----
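The SQL checks above only look for missing values. The list in 4.2 also mentions unique-key and source/count checks; the plain-Python sketch below shows their logic (the names `duplicate_keys` and `check_counts` are hypothetical). On the actual Spark dataframes, the same checks would more likely be written as something like `january_df.groupBy('immigration_id').count().filter('count > 1')` plus a comparison of `count()` values, to avoid collecting all keys on the driver.

```python
from typing import Iterable, List


def duplicate_keys(keys: Iterable[int]) -> List[int]:
    """Unique-key check: return the key values that occur more than once."""
    seen, dupes = set(), set()
    for key in keys:
        if key in seen:
            dupes.add(key)
        seen.add(key)
    return sorted(dupes)


def check_counts(source_count: int, target_count: int) -> None:
    """Count check: cleaning may drop rows, but must never add any."""
    if target_count == 0:
        raise ValueError("target table is empty")
    if target_count > source_count:
        raise ValueError(
            f"target has more rows ({target_count}) than source ({source_count})"
        )
```

For the January figures above, `check_counts(2847924, 1786071)` passes, since filtering only removed rows.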

#### 4.3 Data dictionary 
*Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.*

The data dictionary of the database is as follows:

#### Fact Table: `immigrations`

| Attribute | Description |
|-------|-------|
|immigration_id | unique identifier of the immigration record (cicid column, cast to integer) |
|origin_id | code of the country the person came from; equal to both the i94cit and i94res columns |
|destination | US state the immigrant is travelling to (i94addr column) |
|arrival_date| date on which the person arrived in the US (timestamp derived from the SAS date in column arrdate) |
|departure_date| date on which the person left the US (timestamp derived from the SAS date in column depdate) |
|mode_id| vehicle used to enter the country (i94mode column) |
|visa_id| category of immigrant (i94visa column) |
|visa_type_id| visa type (visatype column) |


#### Dim Table: `country_of_origin`

| Attribute | Description |
|-------|-------|
|origin_id| code (valid or invalid) indicating the location the person came from |
|description| location name |


#### Dim Table: `visa`

| Attribute | Description |
|-------|-------|
|visa_id| code indicating the category of immigrant |
|description| category name |

#### Dim Table: `immigration_modes`

| Attribute | Description |
|-------|-------|
|mode_id| code representing the vehicle used to enter the country |
|type| type of vehicle |

#### Dim Table: `visa_types`

| Attribute | Description |
|-------|-------|
|visa_type_id| code indicating the visa type |
|description| description of who is eligible for this visa |
|class| visa class |
|subclass| visa subclass |

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

Although we have used Parquet for the analytical data in the project, the main target should be a scalable warehouse solution like _AWS Redshift_.

In a realistic scenario, the provided data will be updated on a regular basis in a data lake. In order to make the most recent data available for fast analytical querying (e.g. for administrative or policy needs), the ETL process needs to update the database on a regular basis, for example on a monthly, weekly, or daily basis. One tool to realise this is _Apache Airflow_.

As for the update interval, we propose a nightly update. _Airflow_ (or alternatively _AWS Step Functions_) can be configured so that the imported data is available early in the morning: assuming that each day's data arrives in the data lake around midnight, the ETL process can be started automatically in the middle of the night.

The runtime of an incremental update on the given system (e.g. 2 or 3 hours) still has to be measured, so that the start time of the batch job can be chosen such that the data is available before office hours. To increase the import throughput, the data could be split into chunks that are processed in parallel, with the number of cluster nodes increased accordingly.
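Choosing the start time is simple arithmetic: subtract the measured runtime and a safety buffer from the deadline. A minimal sketch, assuming the 7am deadline from the dashboard scenario above; the function name `latest_start` and the one-hour default buffer are hypothetical.

```python
import datetime


def latest_start(deadline: datetime.time, runtime_hours: float,
                 buffer_hours: float = 1.0) -> datetime.time:
    """Latest start time for the nightly batch to finish before `deadline`."""
    # Anchor the deadline on an arbitrary date so the subtraction can wrap
    # back into the previous evening if the batch is long.
    anchor = datetime.datetime.combine(datetime.date(2000, 1, 2), deadline)
    start = anchor - datetime.timedelta(hours=runtime_hours + buffer_hours)
    return start.time()
```

With a 3-hour runtime, `latest_start(datetime.time(7, 0), 3)` gives 03:00. If the result lies before the time the data arrives in the data lake (around midnight, as assumed above), the runtime has to be reduced, for example by the chunking described above.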

Serving a larger user base (e.g. more than 100 analysts) would require scaling the database so that it can handle all incoming OLAP requests (all of which are read-only).