# US Immigration Data Analytics (Capstone Project)

## Overview

This project takes I94 immigration data from the US National Tourism and Trade Office (https://travel.trade.gov/research/reports/i94/historical/2016.html) and combines it with U.S. city demographic data (https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/) using airport code tables.
The resulting data schema makes it easy to run analytics queries on the immigration data and to discover demographic and geographic dependencies.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

## Step 1: Scope the Project and Gather Data

### Scope 

The project cleans and simplifies the I94 data and enriches it with airport and city data from the other two data sources. From the I94 dataset we use data about the immigrant, the destination, and the time and type of the immigration event. This is combined with city demographic data from the us-cities-demographics dataset and with the airport name and location for the destination city.
An ETL pipeline then extracts the raw data from the input files and converts it into a star schema in which the immigration fact table is separated from dimension tables such as city, immigrant and airport.

To get a general overview of the data, we first explore samples of it with the pandas library.

### Describe and Gather Data 

#### I94 Immigration Data

In [1]:
import pandas as pd

In [2]:
dfImmigration = pd.read_csv('./sample_data/immigration_data_sample.csv')
dfImmigration.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


Using _I94_SAS_Labels_Descriptions.SAS_ we can see that each immigration record has

- a **unique id** (cicid),
- the **date of arrival** in various forms and formats (i94yr, i94mon, arrdate, dtadfile),
- some information about the **immigrant** (biryear, i94res, gender),
- the **mode of travel** used (i94mode),
- the type of visa, i.e. the **reason for the immigration**, such as pleasure or business (i94visa),
- the **destination** (i94port, i94addr),
- the airline and **flight number** (airline, fltno).

All of this is interesting information that we can use in the final dataset. However, we can also see that

- some columns contain redundant information (i94yr and dtadfile),
- some columns are not usable (occup) because no description of their codes is available,
- some columns have the wrong data type (many floats that should be integers),
- some columns use codes (the corresponding values are in I94_SAS_Labels_Descriptions) that have to be translated to their final values,
- some columns might contain null values.

In [3]:
dfImmigration.isnull().sum()

Unnamed: 0       0
cicid            0
i94yr            0
i94mon           0
i94cit           0
i94res           0
i94port          0
arrdate          0
i94mode          0
i94addr         59
depdate         49
i94bir           0
i94visa          0
count            0
dtadfile         0
visapost       618
occup          996
entdepa          0
entdepd         46
entdepu       1000
matflag         46
biryear          0
dtaddto          0
gender         141
insnum         965
airline         33
admnum           0
fltno            8
visatype         0
dtype: int64
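
The `arrdate` values in the sample (for example 20566.0) are not directly readable as dates. A minimal sketch of one way to decode them follows, assuming they are SAS numeric dates, i.e. days counted from 1960-01-01. That assumption is not stated in the notebook, but it is consistent with the sample rows landing in April 2016. The helper name `sas_date_to_date` is hypothetical, and the notebook itself derives `arrival_date` from `dtadfile` instead.

```python
from datetime import date, timedelta

# Assumption: SAS numeric dates count days from this epoch.
SAS_EPOCH = date(1960, 1, 1)


def sas_date_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date.

    Missing values (None) are passed through unchanged.
    """
    if sas_days is None:
        return None
    # The sample stores the value as a float, so truncate to whole days first.
    return SAS_EPOCH + timedelta(days=int(sas_days))


# arrdate of the first row in the immigration sample
print(sas_date_to_date(20566.0))
```

A function like this could be applied column-wise in pandas or wrapped in a Spark UDF if `arrdate` were preferred over `dtadfile` as the source of the arrival date.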

#### US Cities Demographic Data

In [4]:
dfCities = pd.read_csv('./sample_data/us-cities-demographics.csv', sep=';')
dfCities.sort_values(by='City')

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
2727,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,Asian,2929
1403,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,Hispanic or Latino,33222
1533,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,White,95487
245,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,American Indian and Alaska Native,1813
2880,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,Black or African-American,14449
...,...,...,...,...,...,...,...,...,...,...,...,...
2859,Yuma,Arizona,33.4,48298.0,45847.0,94145,7182.0,19326.0,2.64,AZ,American Indian and Alaska Native,1228
1084,Yuma,Arizona,33.4,48298.0,45847.0,94145,7182.0,19326.0,2.64,AZ,Hispanic or Latino,57054
2105,Yuma,Arizona,33.4,48298.0,45847.0,94145,7182.0,19326.0,2.64,AZ,Black or African-American,3731
1619,Yuma,Arizona,33.4,48298.0,45847.0,94145,7182.0,19326.0,2.64,AZ,Asian,1180


As expected, this file contains demographic and population data for US cities. There is a catch, however: there are multiple rows per city, each containing the population count for one ethnic group.

This means we will later have to consolidate the data into just one row per city. Step 2 does this by keeping a single race row per city and storing its count as a share of the total population.

In [5]:
dfCities.City.unique().size == dfCities.City.size

False
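
The check above confirms that city names repeat. As an illustration of what "one row per city" could look like, here is a small pure-Python sketch that folds the per-race rows into a single record per (city, state), using the Abilene rows from the sample. The function name `consolidate` and the record layout are assumptions made for this example. The notebook takes a simpler route and keeps only the `White` row.

```python
# Rows copied from the Abilene sample above:
# (city, state_code, total_population, race, count)
rows = [
    ("Abilene", "TX", 125876, "Asian", 2929),
    ("Abilene", "TX", 125876, "Hispanic or Latino", 33222),
    ("Abilene", "TX", 125876, "White", 95487),
    ("Abilene", "TX", 125876, "American Indian and Alaska Native", 1813),
    ("Abilene", "TX", 125876, "Black or African-American", 14449),
]


def consolidate(rows):
    """Collapse per-race rows into one record per (city, state)."""
    cities = {}
    for city, state, total, race, count in rows:
        key = (city.lower(), state.lower())
        record = cities.setdefault(
            key, {"total_population": total, "race_share": {}}
        )
        # Store each group's count as a share of the total population.
        record["race_share"][race] = round(count / total, 2)
    return cities


result = consolidate(rows)
print(result[("abilene", "tx")])
```

The same reshaping could be expressed as a pivot in pandas or Spark. The dictionary version is only meant to make the target shape explicit.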

#### Airport Codes Data

The airport code dataset is similarly straightforward:

In [6]:
dfAirportCodes = pd.read_csv('./sample_data/airport-codes_csv.csv')

In [7]:
dfAirportCodes.head()


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


What we see is a list of airports worldwide, identified by code and name, together with their location (elevation, coordinates, municipality). The list also contains many small airports which are probably not used for immigration and are therefore not of interest here. The coordinates column, however, could be useful for visualizing immigration events on a map.

In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
# df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [9]:
#write to parquet
# df_spark.write.parquet("sas_data")
# df_spark=spark.read.parquet("sas_data")

## Step 2: Explore and Assess the Data
### Explore and clean immigration data
Identify data quality issues, like missing values, duplicate data, etc.

In [10]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from airflow.spark.udf import *

In [11]:
dfImmigrationRaw = spark.read.parquet('sample_data/sas_data/')

Let's start by selecting only the columns which will be interesting to us:

In [12]:
I94SelectedColumns = ['cicid', 'dtadfile', 'biryear', 'i94bir', 'gender', 'I94CIT', 'I94RES', 'i94port', 'i94mode', 'I94VISA', 'dtaddto', 'airline', 'fltno', 'i94addr']
I94IntegerColumns = ['cicid', 'biryear', 'i94bir', 'I94CIT', 'I94RES', 'i94mode', 'i94visa']

dfImmigration = dfImmigrationRaw.select(*I94SelectedColumns)

Then we convert the float columns to integer:

In [13]:
dfImmigration = correct_to_integer(dfImmigration, I94IntegerColumns)
dfImmigration.show(1)

+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+
|  cicid|dtadfile|biryear|i94bir|gender|I94CIT|I94RES|i94port|i94mode|i94visa| dtaddto|airline|fltno|i94addr|
+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+
|5748517|20160430|   1976|    40|     F|   245|   438|    LOS|      1|      1|10292016|     QF|00011|     CA|
+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+
only showing top 1 row



Now let's add the arrival date as a proper date column:

In [14]:
dfImmigration = dfImmigration.withColumn('arrival_date', to_date(col('dtadfile'), 'yyyyMMdd'))
dfImmigration.show(1)

+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+------------+
|  cicid|dtadfile|biryear|i94bir|gender|I94CIT|I94RES|i94port|i94mode|i94visa| dtaddto|airline|fltno|i94addr|arrival_date|
+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+------------+
|5748517|20160430|   1976|    40|     F|   245|   438|    LOS|      1|      1|10292016|     QF|00011|     CA|  2016-04-30|
+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+------------+
only showing top 1 row



Then we can add a meaningful replacement for the i94mode and i94visa columns

In [15]:
dfImmigration = dfImmigration \
                    .withColumn('mode', udfI94Mode(col('i94mode'))) \
                    .withColumn('reason', udfI94VISA(col('I94VISA')))
dfImmigration.show(1)

+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+------------+----+--------+
|  cicid|dtadfile|biryear|i94bir|gender|I94CIT|I94RES|i94port|i94mode|i94visa| dtaddto|airline|fltno|i94addr|arrival_date|mode|  reason|
+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+------------+----+--------+
|5748517|20160430|   1976|    40|     F|   245|   438|    LOS|      1|      1|10292016|     QF|00011|     CA|  2016-04-30| air|business|
+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+------------+----+--------+
only showing top 1 row
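
The helpers `udfI94Mode` and `udfI94VISA` come from a module that is not shown here. As a rough sketch of the kind of lookup they might perform, the following plain-Python function maps a numeric code to a label. The label tables reflect my reading of I94_SAS_Labels_Descriptions.SAS and should be checked against that file. The names `I94_MODE_LABELS`, `I94_VISA_LABELS` and `decode` are hypothetical.

```python
# Assumed label tables; verify against I94_SAS_Labels_Descriptions.SAS.
I94_MODE_LABELS = {1: "air", 2: "sea", 3: "land", 9: "not reported"}
I94_VISA_LABELS = {1: "business", 2: "pleasure", 3: "student"}


def decode(code, labels):
    """Return the label for a numeric code, or None if missing or unknown."""
    if code is None:
        return None
    try:
        # Codes may arrive as floats (1.0) in the raw data.
        return labels.get(int(code))
    except (TypeError, ValueError):
        return None


print(decode(1.0, I94_MODE_LABELS), decode(1, I94_VISA_LABELS))
```

Returning `None` for unknown codes keeps such rows visible in the later null check instead of silently assigning them a default label.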



Use the i94res column to add the immigrant's country of residence:

In [16]:
dfImmigration = dfImmigration \
                    .withColumn('resident', udfCityOrResident(col('I94RES')))
dfImmigration.show(1)

+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+------------+----+--------+---------+
|  cicid|dtadfile|biryear|i94bir|gender|I94CIT|I94RES|i94port|i94mode|i94visa| dtaddto|airline|fltno|i94addr|arrival_date|mode|  reason| resident|
+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+------------+----+--------+---------+
|5748517|20160430|   1976|    40|     F|   245|   438|    LOS|      1|      1|10292016|     QF|00011|     CA|  2016-04-30| air|business|australia|
+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+------------+----+--------+---------+
only showing top 1 row



Map i94port to the corresponding city and state

In [17]:
dfImmigration = dfImmigration \
                    .withColumn('destination_city', udfDestCity(col('i94port'))) \
                    .withColumn('destination_state', udfDestState(col('i94port')))
dfImmigration.show(1)

+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+------------+----+--------+---------+----------------+-----------------+
|  cicid|dtadfile|biryear|i94bir|gender|I94CIT|I94RES|i94port|i94mode|i94visa| dtaddto|airline|fltno|i94addr|arrival_date|mode|  reason| resident|destination_city|destination_state|
+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+------------+----+--------+---------+----------------+-----------------+
|5748517|20160430|   1976|    40|     F|   245|   438|    LOS|      1|      1|10292016|     QF|00011|     CA|  2016-04-30| air|business|australia|     los angeles|        ca       |
+-------+--------+-------+------+------+------+------+-------+-------+-------+--------+-------+-----+-------+------------+----+--------+---------+----------------+-----------------+
only showing top 1 row



Remove obsolete columns and rename columns for better understanding

In [18]:
dfImmigrationFinal = dfImmigration \
                    .select( \
                        col('cicid').alias('immigration_id'), 'arrival_date', 'mode', 'reason', col('i94bir').alias('age'), \
                        trim(col('destination_city')).alias('destination_city'), trim(col('destination_state')).alias('destination_state'), \
                       # lower(col('i94addr')), \
                        col('biryear').alias('person_birth'), lower(col('gender')).alias('person_gender'), \
                        col('resident').alias('person_country'),
                        lower(col('airline')).alias('airline'), col('fltno').alias('flightnumber')
                    )
dfImmigrationFinal.show(1)

+--------------+------------+----+--------+---+----------------+-----------------+------------+-------------+--------------+-------+------------+
|immigration_id|arrival_date|mode|  reason|age|destination_city|destination_state|person_birth|person_gender|person_country|airline|flightnumber|
+--------------+------------+----+--------+---+----------------+-----------------+------------+-------------+--------------+-------+------------+
|       5748517|  2016-04-30| air|business| 40|     los angeles|               ca|        1976|            f|     australia|     qf|       00011|
+--------------+------------+----+--------+---+----------------+-----------------+------------+-------------+--------------+-------+------------+
only showing top 1 row



Finally let's do a quick check for null values.

In [19]:
dfImmigrationFinal.select([count(when(isnull(c), c)).alias(c) for c in dfImmigrationFinal.columns]).show()

+--------------+------------+----+------+---+----------------+-----------------+------------+-------------+--------------+-------+------------+
|immigration_id|arrival_date|mode|reason|age|destination_city|destination_state|person_birth|person_gender|person_country|airline|flightnumber|
+--------------+------------+----+------+---+----------------+-----------------+------------+-------------+--------------+-------+------------+
|             0|           1|   0|     0|802|          100723|           100723|         802|       414269|             0|  83627|       19549|
+--------------+------------+----+------+---+----------------+-----------------+------------+-------------+--------------+-------+------------+



*arrival_date*, *mode* and *reason* should never be null, so let's drop the rows where any of them is missing:

In [20]:
dfImmigrationFinal = dfImmigrationFinal.dropna(subset=['arrival_date', 'mode', 'reason'])
dfImmigrationFinal.show(3)

+--------------+------------+----+--------+---+----------------+-----------------+------------+-------------+--------------+-------+------------+
|immigration_id|arrival_date|mode|  reason|age|destination_city|destination_state|person_birth|person_gender|person_country|airline|flightnumber|
+--------------+------------+----+--------+---+----------------+-----------------+------------+-------------+--------------+-------+------------+
|       5748517|  2016-04-30| air|business| 40|     los angeles|               ca|        1976|            f|     australia|     qf|       00011|
|       5748518|  2016-04-30| air|business| 32|     los angeles|               ca|        1984|            f|     australia|     va|       00007|
|       5748519|  2016-04-30| air|business| 29|     los angeles|               ca|        1987|            m|     australia|     dl|       00040|
+--------------+------------+----+--------+---+----------------+-----------------+------------+-------------+--------------+-------+------------+

Since destination_state (derived from i94port) has fewer null values than i94addr, we use destination_state. Apart from that, airline and flightnumber have quite a large number of null values, which makes sense since travel can also happen by land and sea. The person_gender column also has many null values; for analytical purposes it is probably still better to keep these rows.

### Explore and clean us city data

In [21]:
dfCities = spark.read.option('header', True).csv('./sample_data/us-cities-demographics.csv', sep=';')
dfCities.orderBy('City').show(3)

+-------+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+-------+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|Abilene|Texas|      31.3|          65212|            60664|          125876|              9367|        8129|                  2.64|        TX|American Indian a...| 1813|
|Abilene|Texas|      31.3|          65212|            60664|          125876|              9367|        8129|                  2.64|        TX|  Hispanic or Latino|33222|
|Abilene|Texas|      31.3|          65212|            60664|          125876|              9367|        8129|                  2.64|        TX|  

In [22]:
dfCities.select([count(when(isnull(c), c)).alias(c) for c in dfCities.columns]).show()

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|   0|    0|         0|              3|                3|               0|                13|          13|                    16|         0|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+



In [23]:
dfCitiesFinal = dfCities.filter(col('Race') == 'White').select(
    lower(col('City')).alias('city'), col('Median Age').alias('median_age'), \
    lower(col('State Code')).alias('state'), col('Total Population').alias('total_population'), \
    (round(col('Male Population')/col('Total Population'), 2)).alias('male_pct'), col('Average Household Size').alias('household_size'),
    (round(col('Count')/col('Total Population'), 2)).alias('white_pct')
).drop_duplicates()

dfCitiesFinal.show(3)

+------------+----------+-----+----------------+--------+--------------+---------+
|        city|median_age|state|total_population|male_pct|household_size|white_pct|
+------------+----------+-----+----------------+--------+--------------+---------+
|  richardson|      35.5|   tx|          110827|    0.49|          2.83|     0.69|
|murfreesboro|      30.2|   tn|          126121|    0.48|           2.6|     0.77|
|    alhambra|      41.0|   ca|           85572|    0.49|          2.89|     0.24|
+------------+----------+-----+----------------+--------+--------------+---------+
only showing top 3 rows



### Explore and clean airport data

In [24]:
dfAirports = spark.read.option('header', True).csv('./sample_data/airport-codes_csv.csv')
dfAirports.show(3)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+

Let's remove all the small airports and also keep only airports in the US

In [25]:
dfAirports = dfAirports.filter(col('type') == 'large_airport').filter(col('iso_country') == 'US')
dfAirports.show(3)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
| KABQ|large_airport|Albuquerque Inter...|        5355|       NA|         US|     US-NM| Albuquerque|    KABQ|      ABQ|       ABQ|-106.609001, 35.0...|
| KADW|large_airport|  Joint Base Andrews|         280|       NA|         US|     US-MD|Camp Springs|    KADW|      ADW|       ADW|-76.866997, 38.81...|
| KAFW|large_airport|Fort Worth Allian...|         722|       NA|         US|     US-TX|  Fort Worth|    KAFW|      AFW|       AFW|-97.3188018799000...|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+

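Split coordinates into latitude and longitude. Note that the raw `coordinates` values appear to be stored longitude first (a value such as -106.609 cannot be a latitude), so the `latitude` and `longitude` labels in the output below look swapped.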

In [26]:
dfAirports = dfAirports.withColumn('coord', udfExtractLatLongFromCoords(col('coordinates')))
dfAirports.show(3)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+-------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|              coord|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+-------------------+
| KABQ|large_airport|Albuquerque Inter...|        5355|       NA|         US|     US-NM| Albuquerque|    KABQ|      ABQ|       ABQ|-106.609001, 35.0...|{-106.609, 35.0402}|
| KADW|large_airport|  Joint Base Andrews|         280|       NA|         US|     US-MD|Camp Springs|    KADW|      ADW|       ADW|-76.866997, 38.81...| {-76.867, 38.8108}|
| KAFW|large_airport|Fort Worth Allian...|         722|       NA|         US|     US-TX|  Fort Worth|    KAFW|      AFW|       AFW|-97.
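
The implementation of `udfExtractLatLongFromCoords` is not shown. Below is a minimal sketch of a plain-Python parser that such a UDF could wrap, under the assumption that the `coordinates` string is ordered longitude first. The sample supports this, since values like -151.69 fall outside the valid latitude range. The function name `parse_coordinates` and the rounding to four decimals are choices made for this example.

```python
def parse_coordinates(raw):
    """Parse a 'longitude, latitude' string into rounded floats.

    Returns None when the input is missing, malformed or out of range.
    """
    if raw is None:
        return None
    parts = raw.split(",")
    if len(parts) != 2:
        return None
    try:
        # Assumption: the first value is the longitude, the second the latitude.
        longitude, latitude = (round(float(p), 4) for p in parts)
    except ValueError:
        return None
    if not (-90 <= latitude <= 90 and -180 <= longitude <= 180):
        return None
    return {"latitude": latitude, "longitude": longitude}


# Coordinates of the first row in the airport sample (Bensalem, US-PA)
print(parse_coordinates("-74.93360137939453, 40.07080078125"))
```

The range check is a cheap safeguard: if the two values are ever assigned the wrong way round, most US airports fail it immediately.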

Extract state from iso_region

In [27]:
dfAirports = dfAirports.withColumn('state', udfExtractStateFromRegion(col('iso_region')))
dfAirports.show(3)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+-------------------+-----+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|              coord|state|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+-------------------+-----+
| KABQ|large_airport|Albuquerque Inter...|        5355|       NA|         US|     US-NM| Albuquerque|    KABQ|      ABQ|       ABQ|-106.609001, 35.0...|{-106.609, 35.0402}|   nm|
| KADW|large_airport|  Joint Base Andrews|         280|       NA|         US|     US-MD|Camp Springs|    KADW|      ADW|       ADW|-76.866997, 38.81...| {-76.867, 38.8108}|   md|
| KAFW|large_airport|Fort Worth Allian...|         722|       NA|         US|     US-TX|  Fort Worth|    

Compute elevation in meters, since this is easier to use.

In [28]:
dfAirports = dfAirports.withColumn('elevation', (col('elevation_ft') * 0.3048).cast('integer'))
dfAirports.show(3)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+-------------------+-----+---------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|              coord|state|elevation|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+-------------------+-----+---------+
| KABQ|large_airport|Albuquerque Inter...|        5355|       NA|         US|     US-NM| Albuquerque|    KABQ|      ABQ|       ABQ|-106.609001, 35.0...|{-106.609, 35.0402}|   nm|     1632|
| KADW|large_airport|  Joint Base Andrews|         280|       NA|         US|     US-MD|Camp Springs|    KADW|      ADW|       ADW|-76.866997, 38.81...| {-76.867, 38.8108}|   md|       85|
| KAFW|large_airport|Fort Worth Allian...|         722|
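
The two derived columns above can be reproduced with very little logic. The following sketch shows plain-Python equivalents. `state_from_region` is a guess at what `udfExtractStateFromRegion` does (its source is not shown), and `elevation_in_meters` mirrors the multiplication by 0.3048 followed by an integer cast. Both function names are hypothetical.

```python
FEET_TO_METERS = 0.3048


def state_from_region(iso_region):
    """Return the lower-cased state part of a region code such as 'US-NM'."""
    if not iso_region or "-" not in iso_region:
        return None
    return iso_region.split("-", 1)[1].lower()


def elevation_in_meters(elevation_ft):
    """Convert feet to whole meters, truncating like an integer cast."""
    if elevation_ft in (None, ""):
        return None
    # CSV values arrive as strings, so convert to float before scaling.
    return int(float(elevation_ft) * FEET_TO_METERS)


print(state_from_region("US-NM"), elevation_in_meters("5355"))
```

Truncation rather than rounding is used here so that the results match the `elevation` values shown in the notebook output.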

Select final columns

In [29]:
dfAirportsFinal = dfAirports.select(lower(col('name')).alias('name'), lower(col('municipality')).alias('city'), lower(col('state')).alias('state'), 'coord.latitude', 'coord.longitude', 'elevation')
dfAirportsFinal.show(3)

+--------------------+------------+-----+--------+---------+---------+
|                name|        city|state|latitude|longitude|elevation|
+--------------------+------------+-----+--------+---------+---------+
|albuquerque inter...| albuquerque|   nm|-106.609|  35.0402|     1632|
|  joint base andrews|camp springs|   md| -76.867|  38.8108|       85|
|fort worth allian...|  fort worth|   tx|-97.3188|  32.9876|      220|
+--------------------+------------+-----+--------+---------+---------+
only showing top 3 rows



As the count below shows, several cities have more than one airport. This is a problem because we can only associate an airport with an immigration event by city. Note that the count groups by city name only, so same-named cities in different states may be counted together.

However, assuming that these airports are all fairly close to the city, we can simply pick any one of them when there is more than one. That way we can still make use of the elevation and latitude/longitude data.

In [30]:
dfAirportsFinal.groupBy('city').agg(count(col('name')).alias('count')).orderBy(desc(col('count'))).show(5)

+----------+-----+
|      city|count|
+----------+-----+
|  columbus|    3|
|  new york|    3|
|charleston|    2|
|   chicago|    2|
|   spokane|    2|
+----------+-----+
only showing top 5 rows



## Step 3: Define the Data Model
### 3.1 Conceptual Data Model

The following graphic shows the target data model:

![Immigration Data Model](data-model.png "Immigration Data Model")

With this data model we should be able to answer the following questions:

- Where are people primarily immigrating to the US from?
- What is the primary reason for immigration?
- What is the average age of immigrants?
- Which cities handle the most immigrants, and by which mode of travel?
- By which mode of travel do people come to the US?
- Is there a relationship between the mode of travel and the immigrants' age?
- How are immigration events distributed on a map, using the airports' latitude and longitude?
- Which airlines transport the most immigrants?


### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

#### Build dimPerson

Extract person columns from the *dfImmigrationFinal* dataset and generate a person_id by hashing all the columns.

In [31]:
dfPerson = dfImmigrationFinal \
            .select(col('person_birth').alias('birth_year'), col('person_gender').alias('gender'), col('person_country').alias('country')) \
            .drop_duplicates()

dfPerson = dfPerson.withColumn('person_id', sha2(concat(*(col(c).cast("string") for c in dfPerson.columns)), 256))

dfPerson.show(3)

+----------+------+------------+--------------------+
|birth_year|gender|     country|           person_id|
+----------+------+------------+--------------------+
|      1945|     f|saudi arabia|d26ccba4dab8785ac...|
|      1948|     f|      israel|e34309c483fb02c4f...|
|      1963|     m|      jordan|27834ff344cfd8099...|
+----------+------+------------+--------------------+
only showing top 3 rows
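
One caveat with hashing a plain concatenation: Spark's `concat` returns null as soon as any input is null, so rows with a missing gender (of which there are many, see the null check in Step 2) would end up with a null `person_id`. Concatenating without a separator also makes `("ab", "c")` and `("a", "bc")` indistinguishable. The sketch below illustrates a null-safe alternative in plain Python. The function `surrogate_key`, the separator and the null placeholder are assumptions made for this example and are not part of the notebook.

```python
import hashlib


def surrogate_key(*parts, sep="|", null_token="<null>"):
    """Build a SHA-256 hex key from the parts, tolerating missing values."""
    # Replace missing values with a placeholder so the key is never null.
    normalized = [null_token if p is None else str(p) for p in parts]
    # The separator keeps ("ab", "c") distinct from ("a", "bc").
    return hashlib.sha256(sep.join(normalized).encode("utf-8")).hexdigest()


print(surrogate_key(1945, "f", "saudi arabia"))
print(surrogate_key(1945, None, "saudi arabia"))
```

In Spark a similar effect could probably be achieved by wrapping each column in `coalesce` with a placeholder literal and joining with `concat_ws`. Whichever rule is chosen, the same one must be applied in the dimension and in the fact table so that the keys match.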



#### Build dimDestination

Extract destination columns from the *dfImmigrationFinal* dataset and join with the *dfCitiesFinal* dataset

In [32]:
dfDestination = dfImmigrationFinal \
                .select(col('destination_city').alias('city'), col('destination_state').alias('state')) \
                .drop_duplicates()

dfDestination = dfDestination.withColumn('destination_id', sha2(concat(*(col(c).cast("string") for c in dfDestination.columns)), 256)) \
                .join(dfCitiesFinal, ['city', 'state'], 'left')

In [33]:
dfDestination.filter(~dfDestination.median_age.isNull()).show(5)
dfDestination.count()

+---------+-----+--------------------+----------+----------------+--------+--------------+---------+
|     city|state|      destination_id|median_age|total_population|male_pct|household_size|white_pct|
+---------+-----+--------------------+----------+----------------+--------+--------------+---------+
|  ontario|   ca|ea618d825f855e9a9...|      31.0|          171200|     0.5|          3.52|     0.44|
|nashville|   tn|c991adef1dbe9df15...|      34.1|          654596|    0.48|          2.39|     0.66|
|   mobile|   al|66c26273272bf325d...|      38.0|          194305|    0.47|           2.4|     0.48|
|    omaha|   ne|5954c975c184568aa...|      34.2|          443887|    0.49|          2.47|      0.8|
|   fresno|   ca|7aa097d84d817b240...|      30.0|          520072|    0.49|          3.12|     0.63|
+---------+-----+--------------------+----------+----------------+--------+--------------+---------+
only showing top 5 rows



282

#### Build dimFlight

Extract the airline and flightnumber from the *dfImmigrationFinal* dataset and remove rows where airline is null.

In [34]:
dfFlight = dfImmigrationFinal \
            .select('airline', 'flightnumber') \
            .drop_duplicates() \
            .filter(~col('airline').isNull()) 
            
dfFlight = dfFlight.withColumn('flight_id', sha2(concat(*(col(c).cast("string") for c in dfFlight.columns)), 256))
dfFlight.show(3)
dfFlight.count()

+-------+------------+--------------------+
|airline|flightnumber|           flight_id|
+-------+------------+--------------------+
|     ua|       01011|24f95d9f73a966f6c...|
|     af|       00066|3ffda692354ff1379...|
|     tn|       00007|eba44be24ebb81276...|
+-------+------------+--------------------+
only showing top 3 rows



10393

#### Build dimAirport

Extract destination_city and destination_state and join with *dfAirportsFinal*. Since some cities have more than one airport, we drop duplicate (city, state) pairs. Note that we do not care about getting the "correct" airport here.

In [35]:
dfAirport = dfImmigrationFinal \
            .select(col('destination_city').alias('city'), col('destination_state').alias('state')) \
            .drop_duplicates()

dfAirport = dfAirport.withColumn('airport_id', sha2(concat(*(col(c).cast("string") for c in dfAirport.columns)), 256)) \
            .join(dfAirportsFinal, ['city', 'state']) \
            .drop_duplicates(['city', 'state']) \
            .drop('city', 'state')
dfAirport.orderBy('city').show(5)
dfAirport.count()

+--------------------+--------------------+--------+---------+---------+
|          airport_id|                name|latitude|longitude|elevation|
+--------------------+--------------------+--------+---------+---------+
|ee512688a10599a29...|albuquerque inter...|-106.609|  35.0402|     1632|
|f69324430dbf0a266...|ted stevens ancho...|-149.996|  61.1744|       46|
|67d6630d1a72c1b1b...|hartsfield jackso...|-84.4281|  33.6367|      312|
|46c42052de045246e...|austin bergstrom ...|-97.6699|  30.1945|      165|
|b2bb8df6c0f8da84c...|baltimore/washing...|-76.6683|  39.1754|       44|
+--------------------+--------------------+--------+---------+---------+
only showing top 5 rows



67
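
As far as I know, `drop_duplicates(['city', 'state'])` does not promise which of the duplicate rows survives, so the airport kept for a city can differ between runs. If a stable choice were needed, one option would be to pick by a fixed rule. The plain-Python sketch below uses "first airport by name" as that rule; the function name and the sample rows are made up for illustration and are not project data.

```python
from typing import Dict, Iterable, Tuple

Airport = Dict[str, str]


def one_airport_per_city(airports: Iterable[Airport]) -> Dict[Tuple[str, str], Airport]:
    """Keep exactly one airport per (city, state), chosen deterministically by name."""
    chosen: Dict[Tuple[str, str], Airport] = {}
    for airport in sorted(airports, key=lambda a: a["name"]):
        key = (airport["city"], airport["state"])
        # setdefault keeps the first airport seen for a key and ignores later ones
        chosen.setdefault(key, airport)
    return chosen


# Made-up sample rows: one city with two airports, one city with a single airport
sample = [
    {"city": "city x", "state": "aa", "name": "airport b"},
    {"city": "city x", "state": "aa", "name": "airport a"},
    {"city": "city y", "state": "bb", "name": "airport c"},
]
result = one_airport_per_city(sample)
```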

#### Build factImmigration

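Generate id columns for destination, airport, flight and person by hashing the appropriate columns. The airport id hashes the same columns as the destination id (destination_city, destination_state), so both columns hold the same value.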

In [36]:
dfFactImmigration = dfImmigrationFinal \
                    .withColumn('age', when(col('age') <= 0, None).otherwise(col('age'))) \
                    .withColumn('destination_id', sha2(concat(col('destination_city'), col('destination_state')), 256)) \
                    .withColumn('airport_id', sha2(concat(col('destination_city'), col('destination_state')), 256)) \
                    .withColumn('flight_id', sha2(concat(col('airline'), col('flightnumber')), 256)) \
                    .withColumn('person_id', sha2(concat(col('person_birth'), col('person_gender'), col('person_country')), 256)) \
                    .drop('destination_city', 'destination_state', 'person_birth', 'person_gender', 'person_country', 'airline', 'flightnumber')

dfFactImmigration.show(3)
dfFactImmigration.count()

+--------------+------------+----+--------+---+--------------------+--------------------+--------------------+--------------------+
|immigration_id|arrival_date|mode|  reason|age|      destination_id|          airport_id|           flight_id|           person_id|
+--------------+------------+----+--------+---+--------------------+--------------------+--------------------+--------------------+
|       5748517|  2016-04-30| air|business| 40|62cce3579bba26a81...|62cce3579bba26a81...|dff2727479de367c5...|76b293ad639b2757b...|
|       5748518|  2016-04-30| air|business| 32|62cce3579bba26a81...|62cce3579bba26a81...|9a3502d3d068dd909...|77032703ae77d5dd6...|
|       5748519|  2016-04-30| air|business| 29|62cce3579bba26a81...|62cce3579bba26a81...|293d30efcb1fc3ca7...|9c6c96e256cf1ee8a...|
+--------------+------------+----+--------+---+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows



3096312

#### Create tables

In [37]:
dfPerson.createOrReplaceTempView('person')
dfFlight.createOrReplaceTempView('flight')
dfAirport.createOrReplaceTempView('airport')
dfDestination.createOrReplaceTempView('destination')
dfFactImmigration.createOrReplaceTempView('immigration')

#### Example queries

##### Top immigration origins

In [38]:
spark.sql("""
select country, mode, reason, avg(age) as avg_age, count(*) as total
from immigration i
join person p on (i.person_id = p.person_id)
where arrival_date = '2016-04-30'
group by country, mode, reason
order by total desc
""").show(5)

+--------------------+----+--------+------------------+-----+
|             country|mode|  reason|           avg_age|total|
+--------------------+----+--------+------------------+-----+
|      united kingdom| air|pleasure| 42.41161616161616| 9114|
|               japan| air|pleasure|           39.9512| 7502|
|mexico air sea, a...| air|pleasure| 42.52473369327302| 6854|
|             germany| air|pleasure|41.653202140514416| 5793|
|          china, prc| air|pleasure| 44.87930726655954| 5601|
+--------------------+----+--------+------------------+-----+
only showing top 5 rows



##### Top immigration cities and countries of origin for sea travel

In [39]:
spark.sql("""
select city, state, country, count(*) as total
from immigration i
join destination d on (i.destination_id = d.destination_id)
join person p on (i.person_id = p.person_id)
where mode = 'sea' and arrival_date = '2016-04-30'
group by city, state, country
order by total desc
""").show(5)

+----------+------+--------------+-----+
|      city| state|       country|total|
+----------+------+--------------+-----+
|hakai pass|canada|     australia|  193|
|hakai pass|canada|united kingdom|   99|
|hakai pass|canada|    china, prc|   78|
|     miami|    fl|united kingdom|   53|
|hakai pass|canada|         japan|   47|
+----------+------+--------------+-----+
only showing top 5 rows



##### Top countries of origin

In [40]:
spark.sql("""
   select country, count(*) as total
   from immigration i
   join person p on (i.person_id = p.person_id)
   where arrival_date = '2016-04-15'
   group by country
   order by total desc
""").show(5)

+--------------------+-----+
|             country|total|
+--------------------+-----+
|      united kingdom| 9244|
|              france| 6945|
|mexico air sea, a...| 6430|
|          china, prc| 5736|
|               japan| 5736|
+--------------------+-----+
only showing top 5 rows



## Step 4: Run Pipelines to Model the Data 
### 4.1 Creating the data model with airflow

All pyspark code to build the target data model can be found in *airflow/spark*. The pyspark code will be copied to and executed on the spark cluster using an airflow SSH operator.

![Immigration Data Pipeline](data-pipeline.png "Immigration Data Pipeline")

### 4.2 Data Quality Checks


Data quality checks are implemented in *airflow/spark/data_quality_check.py*.

#### Airports must be unique

In [41]:
result = dfAirport.groupBy('name').count().orderBy(desc('count')).first()
assert result[1] == 1, "Airport name must be unique"

#### All age values must be > 0 or null

In [42]:
result = dfFactImmigration.where(col('age') <= 0).first()
assert result is None, "Age must not be <= 0"

#### Returns results for age > 30

In [43]:
result = dfFactImmigration.where(col('age') >  30).count()
assert result > 0, "Expected results for age > 30"

#### Airline and flightnumber must be unique

In [44]:
result = dfFlight.groupBy('airline', 'flightnumber').count().orderBy(desc('count')).first()
assert result[2] == 1, "Flight airline and flightnumber must be unique"

#### United Kingdom should be in top 10 of origin countries

Since the United Kingdom is the largest origin country in the example queries above, 'united kingdom' should always be in the top 10 of immigrant origin countries.

In [45]:
result = spark.sql("""
   select country, count(*) as total
   from immigration i
   join person p on (i.person_id = p.person_id)
   where arrival_date = '2016-04-20'
   group by country
   order by total desc
""")

first10 = result.take(10)

assert "united kingdom" in map(lambda x: x[0], first10), "Expected united kingdom in top 10 origin countries"
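
The uniqueness checks above share one pattern (group by the key columns, count, and expect no count above 1), and the same idea extends to checking that the id columns in the fact table point at existing dimension rows. The following plain-Python sketch shows both checks on small made-up rows. The helper names `duplicated_keys` and `orphaned_keys` are hypothetical and not part of *data_quality_check.py*; in spark the second check would correspond to a left anti join.

```python
from collections import Counter
from typing import Dict, Iterable, List, Sequence, Tuple

Row = Dict[str, object]


def duplicated_keys(rows: Iterable[Row], key_fields: Sequence[str]) -> List[Tuple]:
    """Return the key tuples that occur more than once."""
    counts = Counter(tuple(row[f] for f in key_fields) for row in rows)
    return sorted(key for key, n in counts.items() if n > 1)


def orphaned_keys(fact_rows: Iterable[Row], dim_rows: Iterable[Row], key_field: str) -> List:
    """Return non-null fact keys that have no matching dimension row."""
    dim_keys = {row[key_field] for row in dim_rows}
    return sorted(
        {
            row[key_field]
            for row in fact_rows
            if row[key_field] is not None and row[key_field] not in dim_keys
        }
    )


# Made-up rows for illustration only
flights = [
    {"flight_id": "f1", "airline": "ua", "flightnumber": "01011"},
    {"flight_id": "f2", "airline": "af", "flightnumber": "00066"},
]
facts = [
    {"immigration_id": 1, "flight_id": "f1"},
    {"immigration_id": 2, "flight_id": None},  # e.g. no airline recorded
    {"immigration_id": 3, "flight_id": "f9"},  # points at a missing flight
]

duplicates = duplicated_keys(flights, ["airline", "flightnumber"])
orphans = orphaned_keys(facts, flights, "flight_id")
```

Here `duplicates` is empty and `orphans` contains only `"f9"`; null keys are skipped on purpose because the fact table may hold null ids for rows without an airline.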


### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

In [52]:
def create_data_dictionary():    
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("type", StringType(), True),
        StructField("description", StringType(), True),
    ])
    data = [
        ("immigration.immigration_id", "string", "unique id identifying an immigration event. derived from: i94 cicid converted to integer"),
        ("immigration.arrival_date", "date", "date of arrival to us. derived from: i94 dtadfile"),
        ("immigration.mode", "string", "travel mode used in the immigration event. derived from: i94 mode. values are air, sea, land, not reported"),
        ("immigration.reason", "string", "the reason for the immigration. derived from: i94 visa. values are: business, pleasure, student"),
        ("immigration.age", "integer", "age of the immigrant. derived from: i94 bir. rows with values <= 0 have been converted to null."),
        ("immigration.destination_id", "string", "id referencing an immigration destination"),
        ("immigration.airport_id", "string", "(if mode == 'air') id referencing the destination airport. important note: the reference to the airport is only established via the destination city and state. therefore the airport might NOT reflect the actual airport used by the immigrant if the city has more than one large airport."),
        ("immigration.flight_id", "string", "(if mode == 'air') id referencing the flight taken by the immigrant"),
        ("immigration.person_id", "string", "id referencing immigrant person details"),
        
        ("person.person_id", "string", "unique id. sha2 hash of (birth_year, gender, country)"),
        ("person.birth_year", "integer", "year of birth. derived from: i94 biryear"),
        ("person.gender", "string", "gender. derived from: i94 gender. values are f (female) or m (male)"),
        ("person.country", "string", "origin country name of immigrant person. derived from: i94 res. values are taken from i94cntyl."),
        
        ("airport.airport_id", "string", "sha2 hash of multiple columns to obtain a unique id"),
        ("airport.name", "string", "airport name. derived from: airport-codes.csv name"),
        ("airport.latitude", "float", "latitude of airport location. derived from: airport-codes.csv coordinates"),
        ("airport.longitude", "float", "longitude of airport location. derived from: airport-codes.csv coordinates"),
        ("airport.elevation", "float", "elevation in meters of the airport. derived from: airport-codes.csv elevation_ft"),
        
        ("destination.destination_id", "string", "sha2 hash of (city, state) to obtain a unique id"),
        ("destination.city", "string", "city name. derived from: us-cities-demographics 'City'"),
        ("destination.state", "string", "state the city is located in. derived from: us-cities-demographics 'State Code'"),
        ("destination.median_age", "float", "median age. derived from us-cities-demographics 'Median Age'"),
        ("destination.total_population", "integer", "city total population. derived from us-cities-demographics 'Total Population'"),
        ("destination.male_pct", "float", "percentage of males. derived from: us-cities-demographics 'Male Population' / 'Total Population'"),
        ("destination.white_pct", "float", "percentage of white. derived from: us-cities-demographics 'White Count' / 'Total Population'"),
        ("destination.household_size", "float", "average size of household. derived from: us-cities-demographics 'Average Household Size'"),
         
        ("flight.flight_id", "string", "sha2 hash of (airline, flightnumber) to obtain a unique id"),
        ("flight.airline", "string", "airline iata code. derived from: i94 airline"),
        ("flight.flightnumber", "string", "flight number. derived from i94 fltno"),
    ]

    df = spark.createDataFrame(data=data, schema=schema)
    return df
  

In [53]:
dfDD = create_data_dictionary()
pd.set_option("max_colwidth", 100)
dfDD.toPandas()

Unnamed: 0,name,type,description
0,immigration.immigration_id,string,unique id identifying an immigration event. derived from: i94 cicid converted to integer
1,immigration.arrival_date,date,date of arrival to us. derived from: i94 dtadfile
2,immigration.mode,string,"travel mode used in the immigration event. derived from: i94 mode. values are air, sea, land, no..."
3,immigration.reason,string,"the reason for the immigration. derived from: i94 visa. values are: business, pleasure, student"
4,immigration.age,integer,age of the immigrant. derived from: i94 bir. rows with values <= 0 have been converted to null.
5,immigration.destination_id,string,id referencing an immigration destination
6,immigration.airport_id,string,(if mode == 'air') id referencing the destination airport. important note: the reference to the ...
7,immigration.flight_id,string,(if mode == 'air') id referencing the flight taken by the immigrant
8,immigration.person_id,string,id referencing immigrant person details
9,person.person_id,string,"unique id. sha2 hash of (birth_year, gender, country)"


## Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

### Choice of tools

This data pipeline project uses the following tools:
    
- spark and pyspark for ETL
- apache airflow to orchestrate the data pipeline
- AWS EMR as a cloud spark cluster
- AWS S3 as a data store for input and output files
    
I decided to use **pyspark** for data processing because of the disparate nature of the raw data sources (parquet and csv files) and the need for some more complex data transformations (e.g. parsing the i94 port codes). It seems to be a more flexible choice than the alternative, Amazon Redshift with sql scripts.
Furthermore, pyspark is very scalable and well suited for a data lake approach, which would be useful if new data sources (or just new data attributes) need to be incorporated into the pipeline or if further requirements regarding the target analytics schema arise.

**Airflow** is probably useful in any data pipeline project for pipeline orchestration, handling retries and backfilling or just as a graphical user interface to troubleshoot and monitor pipeline runs.


### Update frequency of data

Generally, the update frequency of the data depends on the analytical purpose. For a yearly report on immigration to the US, one update per year would be enough. If you want to know what is happening every day, or even every hour, the pipeline has to run correspondingly more often.

In [48]:
spark.sql("""
select arrival_date, count(*) as total from immigration
group by arrival_date
order by total desc
""").show(5)

+------------+------+
|arrival_date| total|
+------------+------+
|  2016-04-30|125570|
|  2016-04-29|120497|
|  2016-04-17|119296|
|  2016-04-28|116601|
|  2016-04-15|109746|
+------------+------+
only showing top 5 rows



Looking at the query above, we can see that the busiest days have roughly 110k to 126k immigration events, so about 130k is a reasonable upper bound for the number of events to process per day. Daily updates would therefore be no problem at all.

In [49]:
dfPerson.count(), dfDestination.count(), dfAirport.count(), dfFlight.count()

(37685, 282, 67, 10393)

The numbers above show that the dimension tables are quite small. The biggest is dfPerson (37,685 rows), which will certainly grow with more data. All in all, updating daily or even more frequently is still no problem.

### Different scenarios

#### Data increases 100x

A 100x increase means that instead of about 130k we would have about 13 million daily events. In this case we should definitely switch to daily processing to keep the amount of data per run relatively small and bounded.
Input immigration data would have to be partitioned by day and put into corresponding source parquet files.
We can then leverage airflow to process the daily input files and append new data to our target analytics database.

Given the scalability of AWS EMR (more and/or faster cluster nodes), updating the target database daily should pose no problem even with a 100x increase in data. However, processing time might increase from a couple of minutes to hours, depending on the compute resources in the cluster.
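
The daily partitioning described above can be sketched as follows. The example assumes Hive-style `arrival_date=YYYY-MM-DD` directories, which is the layout spark produces when writing with `partitionBy('arrival_date')`. The bucket name and the helper `daily_partition_path` are placeholders for illustration, not the project's actual paths.

```python
from datetime import date

# Upper bound for daily events taken from the query result above
CURRENT_DAILY_EVENTS = 130_000


def daily_partition_path(base_path: str, arrival_date: date) -> str:
    """Build the directory that holds the input data of a single day."""
    return f"{base_path.rstrip('/')}/arrival_date={arrival_date.isoformat()}/"


# 100x scenario: about 13 million events per day
scaled_daily_events = CURRENT_DAILY_EVENTS * 100

# Hypothetical bucket; each pipeline run would only read the partition of its own day
path = daily_partition_path("s3://example-bucket/immigration", date(2016, 4, 30))
```

With this layout a daily airflow run only needs to read one directory, so the amount of data per run stays bounded no matter how much history has accumulated.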

#### Populate a dashboard daily

In order to populate a dashboard daily with immigration data, we should switch to daily processing as described above. We can then configure our airflow dag to run nightly, early enough that the up-to-date data is ready in the dashboard by 7am.
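
How early the nightly run has to start depends on how long the pipeline takes. The small sketch below works backwards from the 7am deadline; the two-hour runtime and the one-hour safety buffer are assumed numbers for illustration, not measurements from this project.

```python
from datetime import datetime, timedelta


def latest_start(deadline: datetime, expected_runtime: timedelta, safety_buffer: timedelta) -> datetime:
    """Latest moment the pipeline can start and still finish before the deadline."""
    return deadline - expected_runtime - safety_buffer


# Assumed values: 2 hours of processing plus 1 hour of buffer for retries
deadline = datetime(2016, 5, 1, 7, 0)
start = latest_start(deadline, timedelta(hours=2), timedelta(hours=1))
```

Under these assumptions the run would have to start by 04:00, which corresponds to a cron schedule such as `0 4 * * *` in the timezone the scheduler uses.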

#### Database accessed by 100+ people

In the extreme case our database would have to handle 100 analytical queries concurrently. Given the scalability of AWS EMR, every user could simply start their own EMR cluster, load the target data and start analyzing (assuming all users use jupyter notebooks). This would definitely scale, but it would not be very cost efficient. To improve the situation, multiple users (say 10) could share an EMR cluster for their analytics purposes, which seems like a more cost-effective solution.

A different scenario would be a web application serving 100+ users, or 100+ users with SQL clients, accessing the target database. In such cases spark is not the best tool. We could additionally start up an Amazon Redshift cluster and import the target database from the output parquet files on Amazon S3. Our users would then access Redshift and perform their queries there. Since Redshift is also very scalable, this should again be no problem.