# US Immigration Data - warehouse
### Data Engineering Capstone Project

#### Project Summary
In this project we gather data for a sample data warehouse for analysing US immigration data in a simple star schema. The main aim is to provide analysts the possibility to answer business questions using simple tools. The final data is stored in parquet files in local store.. 

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Importing all required python packages
import pandas as pd
import os
import glob

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

A spark local session is initiated and utilised for data exploration, cleaning and implementation of the ETL process. Parquet files are exported locally after the data analytics and wrangling is done to create dim and fact tables.

In [2]:
#Spark session
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of this project to integrate I94 immigration data with other data sources including world temperature data, US demographic data and airport codes to create a simple star schema data warehouse for analytical purposes. 

Should this task be utilised my a large number of people and loaded regularly, all the necessary steps outlined here should be implemented in a separate .py file.

In this first step data is read into Spark and also to a pandas dataframe for analysis

#### Datasets
1. I94 Immigration data [link](https://www.trade.gov/national-travel-and-tourism-office)
2. World Temperature Data [link](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data)
3. US City Demographic Data [link](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
4. Airport Codes table [link](https://datahub.io/core/airport-codes#data)
5. Reference tables for abbreviations of country, city and state and lookup tables for abbreviations

#### 1. I94 Immigration Data 
This data comes from the US National Tourism and Trade Office.

In [3]:
# Read in I94 Immigration Data
i94_path = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
# Pandas df for analytical purposes
df_i94 = pd.read_sas(i94_path, 'sas7bdat', encoding="ISO-8859-1")
# Spark df as staging
spark_i94 = spark.read.format('com.github.saurfang.sas.spark').load(i94_path)
spark_i94.createOrReplaceTempView("stage_immigration")

In [4]:
df_i94.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [5]:
spark_i94.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

#### 2. World Temperature Data
This dataset came from Kaggle. The data is by date, country and state as this matched the requirements the most

In [6]:
WT_path = './GlobalLandTemperaturesByState.csv'
# Pandas df for analytical purposes
df_WT = pd.read_csv(WT_path)
# Spark df as staging
spark_WT = spark.read.format("csv").option("header", "true").load(WT_path)
spark_WT.createOrReplaceTempView("stage_temperature")

In [7]:
df_WT.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,State,Country
0,1855-05-01,25.544,1.171,Acre,Brazil
1,1855-06-01,24.228,1.103,Acre,Brazil
2,1855-07-01,24.371,1.044,Acre,Brazil
3,1855-08-01,25.427,1.073,Acre,Brazil
4,1855-09-01,25.675,1.014,Acre,Brazil


In [8]:
spark_WT.show(5)

+----------+------------------+-----------------------------+-----+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|State|Country|
+----------+------------------+-----------------------------+-----+-------+
|1855-05-01|            25.544|                        1.171| Acre| Brazil|
|1855-06-01|            24.228|                        1.103| Acre| Brazil|
|1855-07-01|            24.371|                        1.044| Acre| Brazil|
|1855-08-01|            25.427|                        1.073| Acre| Brazil|
|1855-09-01|            25.675|                        1.014| Acre| Brazil|
+----------+------------------+-----------------------------+-----+-------+
only showing top 5 rows



#### 3. U.S. City Demographic Data
This data comes from OpenSoft.

In [9]:
DD_path = './us-cities-demographics.csv'
# Pandas df for analytical purposes
df_DD = pd.read_csv(DD_path, delimiter=";")
# Spark df as staging
spark_DD = spark.read.format("csv").option("header", "true").option("delimiter", ";").load(DD_path)
spark_DD.createOrReplaceTempView("stage_demography")

In [10]:
df_DD.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [11]:
spark_DD.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

#### 4. Airport Code Table
This is a simple table of airport codes and corresponding cities. 

In [12]:
AC_path = './airport-codes_csv.csv'
# Pandas df for analytical purposes
df_AC = pd.read_csv(AC_path)
# Spark df as staging
spark_AC = spark.read.format("csv").option("header", "true").load(AC_path)
spark_AC.createOrReplaceTempView("stage_airport")

In [13]:
df_AC.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [14]:
spark_AC.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

#### 5. Reference Tables
Lookup tables for further use in datamodel. These are partly provided or derived from the data or created by me.

In [15]:
# read description file
RT_path = './I94_SAS_Labels_Descriptions.SAS'
with open(RT_path) as fl:
    SAS_file = fl.readlines()

In [16]:
# country reference file
country_reference = dict()
for countries in SAS_file[10:298]:
    values = countries.split('=')
    code, country = values[0].strip(), values[1].strip().strip("'")
    country_reference[code] = country

spark_RT_country = spark.createDataFrame(country_reference.items(), ['code', 'country'])

spark_RT_country.createOrReplaceTempView("ref_country")

In [17]:
spark_RT_country.show(5)

+----+-----------+
|code|    country|
+----+-----------+
| 236|AFGHANISTAN|
| 101|    ALBANIA|
| 316|    ALGERIA|
| 102|    ANDORRA|
| 324|     ANGOLA|
+----+-----------+
only showing top 5 rows



In [18]:
# city reference file
city_reference = dict()
for cities in SAS_file[303:962]:
    values = cities.split('=')
    code, city = values[0].strip("\t").strip().strip("'"), values[1].strip("\t").strip().strip("''")
    city_reference[code] = city

spark_RT_city = spark.createDataFrame(city_reference.items(), ['code', 'city'])

spark_RT_city.createOrReplaceTempView("ref_city")

In [19]:
spark_RT_city.show(5)

+----+--------------------+
|code|                city|
+----+--------------------+
| ANC|ANCHORAGE, AK    ...|
| BAR|BAKER AAF - BAKER...|
| DAC|DALTONS CACHE, AK...|
| PIZ|DEW STATION PT LA...|
| DTH|DUTCH HARBOR, AK ...|
+----+--------------------+
only showing top 5 rows



In [20]:
# state reference file
state_reference = dict()
for states in SAS_file[982:1036]:
    values = states.split('=')
    code, state = values[0].strip("\t").strip("'"), values[1].strip().strip("'")
    state_reference[code] = state

spark_RT_state = spark.createDataFrame(state_reference.items(), ['code', 'state'])

spark_RT_state.createOrReplaceTempView("ref_state")

In [21]:
spark_RT_state.show(5)

+----+----------+
|code|     state|
+----+----------+
|  AK|    ALASKA|
|  AZ|   ARIZONA|
|  AR|  ARKANSAS|
|  CA|CALIFORNIA|
|  CO|  COLORADO|
+----+----------+
only showing top 5 rows



In [22]:
iata_reference = spark.sql(
"""
    SELECT DISTINCT 
        a.iata_code
    FROM stage_airport AS a
    WHERE a.iata_code IS NOT NULL
        AND a.iso_country LIKE 'US'
        AND a.type IN ('large_airport', 'medium_airport', 'small_airport')
"""
)

In [23]:
iata_reference.show(5)

+---------+
|iata_code|
+---------+
|      BZT|
|      BGM|
|      CNU|
|      CRS|
|      KEB|
+---------+
only showing top 5 rows



In [24]:
iata_reference.createOrReplaceTempView("tmp_iata_airline")

In [25]:
iata_reference = spark.sql(
"""
    SELECT DISTINCT 
        a.i94port
    FROM stage_immigration AS a
    WHERE a.i94port IN (
        SELECT b.iata_code
        FROM tmp_iata_airline AS b)

"""
)

In [26]:
iata_reference.show(5)

+-------+
|i94port|
+-------+
|    FMY|
|    BGM|
|    DNS|
|    FOK|
|    HVR|
+-------+
only showing top 5 rows



In [27]:
iata_reference.createOrReplaceTempView("tmp_iata_immigration")

As the last step the initial data loads to stage tables are checked - the simple row counts are gathered

In [28]:
# stage & reference tables check
rows =spark.sql(
"""
    SELECT
        'stage_immigration' AS tbl
        , count(*) AS rowcount
    FROM stage_immigration AS a 
    
    UNION
    
    SELECT
        'stage_temperature' AS tbl
        , count(*) AS rowcount
    FROM stage_temperature AS b 
    
    UNION
    
    SELECT
        'stage_airport' AS tbl
        , count(*) AS rowcount
    FROM stage_airport AS b 
    
    UNION
    
    SELECT
        'stage_demography' AS tbl
        , count(*) AS rowcount
    FROM stage_demography AS b 
    
    UNION
    
    SELECT
        'ref_country' AS tbl
        , count(*) AS rowcount
    FROM ref_country AS b 
    
    UNION
    
    SELECT
        'ref_city' AS tbl
        , count(*) AS rowcount
    FROM ref_city AS b 
    
    UNION
    
    SELECT
        'ref_state' AS tbl
        , count(*) AS rowcount
    FROM ref_state AS b 
""")

In [29]:
rows.show()

+-----------------+--------+
|              tbl|rowcount|
+-----------------+--------+
|stage_immigration| 3096313|
|         ref_city|     659|
|stage_temperature|  645675|
|    stage_airport|   55075|
|      ref_country|     288|
| stage_demography|    2891|
|        ref_state|      54|
+-----------------+--------+



In [30]:
# write stage tables to parquet files
#fact_immigration.write.mode("overwrite").partitionBy('state_code')\
#                    .parquet(path=output_data + 'fact_immigration')
#spark_i94.write.mode("overwrite").parquet("stage_immigration")
#spark_WT.write.mode("overwrite").parquet("stage_temperature")
#spark_DD.write.mode("overwrite").parquet("stage_demography")
#spark_AC.write.mode("overwrite").parquet("stage_airport")

### Step 2: Explore and Assess the Data
#### 2.1 Explore the Data 
Data exploration is carried out here.

#### 2.1.1 Explore i94 Data
Explore i94 data

How many rows are there?

In [31]:
tmp =spark.sql(
"""
    SELECT count(*) AS rownum
    FROM stage_immigration AS a 
""")

In [32]:
tmp.show()

+-------+
| rownum|
+-------+
|3096313|
+-------+



What dates does this data run for?

In [33]:
tmp =spark.sql(
"""
    SELECT min(date_add('1960-01-01', a.arrdate)) AS min_arrdate, max(date_add('1960-01-01', a.arrdate)) AS max_arrdate
    FROM stage_immigration AS a 
""")

In [34]:
tmp.show()

+-----------+-----------+
|min_arrdate|max_arrdate|
+-----------+-----------+
| 2016-04-01| 2016-04-30|
+-----------+-----------+



How many rows have invalid state codes?

In [41]:
# number of rows with invalid state codes
tmp = spark.sql(
"""
    SELECT COUNT(*) FROM (
    SELECT *
    FROM stage_immigration AS a
    WHERE a.i94addr NOT IN (
        SELECT DISTINCT
            b.`State Code`
        FROM stage_demography AS b
        )
        ) s
"""       
)

In [42]:
tmp.show()

+--------+
|count(1)|
+--------+
|  123652|
+--------+



what are these? is it safe to delete?

In [43]:
# invalid rows
tmp = spark.sql(
"""

    SELECT a.i94addr
        , count(*)
    FROM stage_immigration AS a
    WHERE a.i94addr NOT IN (
        SELECT DISTINCT
            b.`State Code`
        FROM stage_demography AS b
        )
    GROUP BY a.i94addr
    ORDER BY 2 DESC
    

"""       
)

In [44]:
tmp.show(5)

+-------+--------+
|i94addr|count(1)|
+-------+--------+
|     GU|   94107|
|     US|    7421|
|     MP|    4505|
|     VQ|    4503|
|     UN|    1522|
+-------+--------+
only showing top 5 rows



are there rows with null departure date?

In [66]:
tmp =spark.sql(
"""
    SELECT count(*) AS rownum
    FROM stage_immigration AS a 
    WHERE a.arrdate IS NULL
""")

In [67]:
tmp.show(5)

+------+
|rownum|
+------+
|     0|
+------+



are there rows where dept date is later than arrival date?

In [68]:
tmp =spark.sql(
"""
    SELECT count(*) AS rownum
    FROM stage_immigration AS a 
    WHERE a.arrdate > a.depdate
""")

In [69]:
tmp.show(5)

+------+
|rownum|
+------+
|   375|
+------+



#### 2.1.2 Explore World Temperature Data
Explore World Temperature Data

In [53]:
spark_WT.show(5)

+----------+------------------+-----------------------------+-----+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|State|Country|
+----------+------------------+-----------------------------+-----+-------+
|1855-05-01|            25.544|                        1.171| Acre| Brazil|
|1855-06-01|            24.228|                        1.103| Acre| Brazil|
|1855-07-01|            24.371|                        1.044| Acre| Brazil|
|1855-08-01|            25.427|                        1.073| Acre| Brazil|
|1855-09-01|            25.675|                        1.014| Acre| Brazil|
+----------+------------------+-----------------------------+-----+-------+
only showing top 5 rows



number of relevant data (size)

In [56]:
tmp =spark.sql(
"""
    SELECT count(*) AS rowcount
    FROM stage_temperature AS a 
    WHERE Country LIKE 'United States'
        AND dt > '2000-01-01'
""")

In [57]:
tmp.show()

+--------+
|rowcount|
+--------+
|    8364|
+--------+



Check the minimum and maximum dates that the dataset runs for

In [48]:
tmp =spark.sql(
"""
    SELECT min(a.dt), max(a.dt)
    FROM stage_temperature AS a 
""")

In [49]:
tmp.show()

+----------+----------+
|   min(dt)|   max(dt)|
+----------+----------+
|1743-11-01|2013-09-01|
+----------+----------+



The WT data is only up until Q4 2013. As this is a demo database, in order to match the i94 data, I have selected 2012 and shifted it to 2016. Later a new database source for WT data should be researched and utilised

Does this data seem feasible? Does it have outliers? 

In [50]:
tmp =spark.sql(
"""
    SELECT min(a.AverageTemperature), max(a.AverageTemperature)
    FROM stage_temperature AS a 
""")

In [51]:
tmp.show()

+-----------------------+-----------------------+
|min(AverageTemperature)|max(AverageTemperature)|
+-----------------------+-----------------------+
|   -0.00099999999999...|                  9.999|
+-----------------------+-----------------------+



Looks like no outliers. Also mark, that the data is for months, so the keys should be created for that. 

The data will be aggregated using GROUP BY so no need to check for duplicates - it will be handled automatically.

#### 2.1.4 Explore Airline Data
Explore Airline Data

Sample data

In [31]:
tmp = spark.sql(
    """
    SELECT *
    FROM stage_airport AS a
    WHERE iso_country LIKE 'US'
    """
)

In [32]:
tmp.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

Number of relevant rows

In [58]:
tmp = spark.sql(
    """
    SELECT count(*)
    FROM stage_airport AS a
    WHERE iso_country LIKE 'US'
    """
)

In [59]:
tmp.show()

+--------+
|count(1)|
+--------+
|   22757|
+--------+



Types of airports

In [33]:
tmp = spark.sql(
"""
    SELECT DISTINCT 
        a.type
    FROM stage_airport AS a
    WHERE a.iata_code IS NOT NULL
        AND a.iso_country LIKE 'US'

"""
)

In [34]:
tmp.show()

+--------------+
|          type|
+--------------+
| large_airport|
| seaplane_base|
|      heliport|
|        closed|
|medium_airport|
| small_airport|
+--------------+



Missing airline codes

In [None]:
# how many records have no valid airline code?
tmp = spark.sql(
"""
    SELECT count(*)
    FROM stage_immigration b
    WHERE b.i94port NOT IN (
        SELECT DISTINCT 
            c.i94port
        FROM tmp_iata_immigration AS c
        )

"""
)

In [37]:
tmp.show()

+--------+
|count(1)|
+--------+
| 2059350|
+--------+



In [55]:
# what are these?
tmp = spark.sql(
"""
    SELECT b.i94port, 
        count(*)
    FROM stage_immigration b
    WHERE b.i94port NOT IN (
        SELECT DISTINCT 
            c.i94port
        FROM tmp_iata_immigration AS c
        )
    GROUP BY b.i94port
    ORDER BY 2 DESC

"""
)

In [56]:
tmp.show(10)

+-------+--------+
|i94port|count(1)|
+-------+--------+
|    NYC|  485916|
|    LOS|  310163|
|    SFR|  152586|
|    ORL|  149195|
|    HHW|  142720|
|    CHI|  130564|
|    FTL|   95977|
|    LVG|   89280|
|    AGA|   80919|
|    WAS|   74835|
+-------+--------+
only showing top 10 rows



#### 2.1.4 Explore Demography Data
Explore Demography Data

number of relevant rows

In [60]:
tmp = spark.sql(
    """
    SELECT count(*)
    FROM stage_demography AS a
    """
)

In [61]:
tmp.show()

+--------+
|count(1)|
+--------+
|    2891|
+--------+



are there missing data in total population?

In [64]:
tmp = spark.sql(
    """
    SELECT count(*) AS rownum
    FROM stage_demography AS a
        WHERE a.`Total Population` IS NULL
    """
)

In [65]:
tmp.show()

+------+
|rownum|
+------+
|     0|
+------+



#### 2.2 Cleaning Steps
Document steps necessary to clean the data

#### 2.2.1 Cleaning i94 Immigration Data
Cleaning steps for i94 data

The i94 data arrival and departure dates should be converted to dates

In [35]:
# dates are created
spark_i94 = spark_i94.withColumn('arrdate', f.expr("date_add('1960-01-01', arrdate)"))
spark_i94 = spark_i94.withColumn('depdate', f.expr("date_add('1960-01-01', depdate)"))

In [36]:
spark_i94 = spark_i94.withColumn('reference_date', f.expr("date_add(arrdate, -day(arrdate)+1)"))

In [37]:
spark_i94.show(5)

+-----+------+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+--------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|reference_date|
+-----+------+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+--------------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|2016-04-29|   null|   null|      null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|    2016-04-01|
|  7.0|2016.0|   4.0| 254.0|

In [38]:
spark_i94.createOrReplaceTempView("stage_immigration")

In [39]:
# remove data with invalid state codes
spark_i94 = spark.sql(
"""
    SELECT *
    FROM stage_immigration AS a
    WHERE a.i94addr IN (
        SELECT DISTINCT
            b.`State Code`
        FROM stage_demography AS b
        )
"""       
)

In [40]:
spark_i94.createOrReplaceTempView("stage_immigration")

In [41]:
# remove data with invalid arrival and departure dates
spark_i94 = spark.sql(
"""
    SELECT *
    FROM stage_immigration AS a
    WHERE a.arrdate < a.depdate
"""       
)

In [42]:
spark_i94.createOrReplaceTempView("stage_immigration")

In [43]:
tmp = spark.sql(
"""
    SELECT count(*)
    FROM stage_immigration

"""
)

In [44]:
tmp.show()

+--------+
|count(1)|
+--------+
| 2703145|
+--------+



#### 2.2.2 Cleaning World Temperature Data
Cleaning steps for World Temperature Data

The World Temperature Data should be filtered for the reference date and for United States. Also as demonstration purposes the dates are shifted to match i94 data. This should not be done in a live database though!

Any duplicates should be handled in dim_table creation - aggregation is done there

In [45]:
# filter for US and for a given year
spark_WT =spark.sql(
"""
    SELECT
        add_months(b.dt, 48) as dt
        , b.AverageTemperature
        , b.AverageTemperatureUncertainty
        , b.State
        , b.Country
    FROM (
        SELECT a.dt
            , a.AverageTemperature
            , a.AverageTemperatureUncertainty
            , a.State
            , a.Country
        FROM stage_temperature AS a 
        WHERE a.Country LIKE 'United States'
            AND year(a.dt) = 2012
        ) b
""")

In [46]:
spark_WT.show(5)

+----------+------------------+-----------------------------+-------+-------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|  State|      Country|
+----------+------------------+-----------------------------+-------+-------------+
|2016-01-01|             10.29|                        0.303|Alabama|United States|
|2016-02-01|            11.405|                        0.201|Alabama|United States|
|2016-03-01|            18.401|                        0.124|Alabama|United States|
|2016-04-01|            18.675|                        0.265|Alabama|United States|
|2016-05-01|            23.512|                        0.225|Alabama|United States|
+----------+------------------+-----------------------------+-------+-------------+
only showing top 5 rows



In [47]:
spark_WT.createOrReplaceTempView("stage_temperature")

States need to be matched with that of i94 as the spelling is different. A lookup table is loaded to the workspace for this purpose.

In [48]:
# match states with i94
distinct_states =spark.sql(
"""
    SELECT DISTINCT
        a.State
    FROM stage_temperature AS a 
    WHERE a.Country LIKE 'United States'
    ORDER BY 1
""")

In [49]:
distinct_states.show(5)

+----------+
|     State|
+----------+
|   Alabama|
|    Alaska|
|   Arizona|
|  Arkansas|
|California|
+----------+
only showing top 5 rows



In [50]:
WT_lookup_path = './states_i94_weather.csv'
# Pandas df for analytical purposes
df_lookup_WT = pd.read_csv(WT_lookup_path)
# Spark df as staging
spark_lookup_WT = spark.read.format("csv").option("header", "true").option("delimiter", ";").load(WT_lookup_path)
spark_lookup_WT.createOrReplaceTempView("ref_temperature_states")

In [51]:
spark_lookup_WT.show(5)

+----+----------+-------------+
|code| state_i94|state_weather|
+----+----------+-------------+
|  AL|   ALABAMA|      Alabama|
|  AK|    ALASKA|       Alaska|
|  AZ|   ARIZONA|      Arizona|
|  AR|  ARKANSAS|     Arkansas|
|  CA|CALIFORNIA|   California|
+----+----------+-------------+
only showing top 5 rows



In [52]:
spark_WT =spark.sql(
"""
    SELECT a.dt
        , b.state_i94 as state
        , b.code as state_code
        , a.Country
        , a.AverageTemperature
        , a.AverageTemperatureUncertainty
    FROM stage_temperature AS a
    LEFT JOIN ref_temperature_states AS b
        ON a.State = b.state_weather
""")

In [53]:
spark_WT.show(5)

+----------+-------+----------+-------------+------------------+-----------------------------+
|        dt|  state|state_code|      Country|AverageTemperature|AverageTemperatureUncertainty|
+----------+-------+----------+-------------+------------------+-----------------------------+
|2016-01-01|ALABAMA|        AL|United States|             10.29|                        0.303|
|2016-02-01|ALABAMA|        AL|United States|            11.405|                        0.201|
|2016-03-01|ALABAMA|        AL|United States|            18.401|                        0.124|
|2016-04-01|ALABAMA|        AL|United States|            18.675|                        0.265|
|2016-05-01|ALABAMA|        AL|United States|            23.512|                        0.225|
+----------+-------+----------+-------------+------------------+-----------------------------+
only showing top 5 rows



In [54]:
spark_WT.createOrReplaceTempView("stage_temperature")

#### 2.2.4 Cleaning Airport Data
Cleaning steps for Airport Data

Only need US Airports that are large or medium and that have a match in immigration data

In [55]:
spark_AC = spark.sql(
    """
    SELECT *
    FROM stage_airport AS a
    WHERE a.type IN ('large_airport', 'medium_airport', 'small_airport')
        AND a.iso_country LIKE 'US'
        AND a.iata_code IS NOT NULL
   
    """
    )

In [56]:
spark_AC.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region| municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
| 07FA|small_airport|Ocean Reef Club A...|           8|       NA|         US|     US-FL|    Key Largo|    07FA|      OCA|      07FA|-80.274803161621,...|
|  0AK|small_airport|Pilot Station Air...|         305|       NA|         US|     US-AK|Pilot Station|    null|      PQS|       0AK|-162.899994, 61.9...|
| 0CO2|small_airport|Crested Butte Air...|        8980|       NA|         US|     US-CO|Crested Butte|    0CO2|      CSE|      0CO2|-106.928341, 38.8...|
| 0TE7|small_airport|   LBJ Ranch Airport|        1515|       NA|         US

In [57]:
spark_AC.createOrReplaceTempView("stage_airport")

#### 2.2.5 Cleaning Demography Data
Cleaning steps for Demography Dataset

No steps needed

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The selected data model for business analysis is the star schema. Star schema is the most suitable analytical data model for analysing a single business process - in this case the immigration to the US. This methodology is developed as per Kimball methodology.

What is the business process? What is the smallest granularity? What are the factors that impact upon the business process itself?

A single Fact table is created with the immigration event as granularity.

Multiple Dimension tables are created that allow business analysts and professionals to analyse the process of immigration. Dimension tables are for filtering and slicing and dicing of events.

```
├── FACT
│   └── immigration
└── DIMENSIONS
    ├── immigrant
    ├── airline
    ├── demography
    ├── temperature
    └── airport

```
The schema of the final data modell is displayed in the project markdown document.

#### 3.2 Mapping Out Data Pipelines
The steps for the data pipeline is mapped as follows:

* Load the data into Spark staging tables
* Clean data using Spark
* Create Dimension tables using Spark SQL and create unique IDs for Dimension tables
* Create Fact table with uniqe IDs and map Dimension table IDs as foreign keys.
* Write data into parquet files
* Perform data quality checks

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Dimension tables and Fact table is created using Spark. Data is exported as parquet files for analytical purposes. 

#### 4.1.1 Dim Immigrant
The Dim Immigrant table is created and loaded as a spark temp view

In [58]:
# select required dim data for immigrant
dim_immigrant = spark.sql(
"""
    SELECT DISTINCT
        int(a.cicid) AS cic_id
        , b.country AS citizen_country
        , c.country AS residence_country
        , int(a.biryear) AS birth_year
        , a.gender AS gender
        , int(a.insnum) AS ins_num
    FROM stage_immigration AS a 
    LEFT JOIN ref_country b ON
        a.i94cit = b.code
    LEFT JOIN ref_country c ON
        a.i94res = c.code
    
""")

# add primary key as running integer
dim_immigrant = dim_immigrant.withColumn('immigrant_id', f.monotonically_increasing_id())

In [59]:
dim_immigrant.show(5)

+------+---------------+-----------------+----------+------+-------+------------+
|cic_id|citizen_country|residence_country|birth_year|gender|ins_num|immigrant_id|
+------+---------------+-----------------+----------+------+-------+------------+
|304334|       MONGOLIA|         MONGOLIA|      1967|     F|   null|           0|
|213969|        ECUADOR|          ECUADOR|      1944|     M|   null|           1|
|214420|        ECUADOR|          ECUADOR|      1973|     F|   null|           2|
|214688|        ECUADOR|          ECUADOR|      1991|     F|   null|           3|
|214811|        ECUADOR|          ECUADOR|      1997|     F|   null|           4|
+------+---------------+-----------------+----------+------+-------+------------+
only showing top 5 rows



In [60]:
dim_immigrant.createOrReplaceTempView("dim_immigrant")

In [61]:
dim_immigrant.printSchema()

root
 |-- cic_id: integer (nullable = true)
 |-- citizen_country: string (nullable = true)
 |-- residence_country: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- ins_num: integer (nullable = true)
 |-- immigrant_id: long (nullable = false)



In [63]:
# Saving the data in parquet format
dim_immigrant.write.mode("overwrite").parquet("dim_immigrant")

#### 4.1.2 Dim Airline
The Dim Airline table is created and loaded as a spark temp view

In [64]:
# select required dim data for airline
dim_airline = spark.sql(
"""
    SELECT DISTINCT
        int(a.cicid) AS cic_id
        , a.airline AS airline
        , int(a.admnum) AS admin_num
        , a.fltno AS flight_number
        , a.visatype AS visa_type
    FROM stage_immigration AS a 
""")

# add primary key as running integer
dim_airline = dim_airline.withColumn('airline_id', f.monotonically_increasing_id())

In [65]:
dim_airline.show(5)

+------+-------+----------+-------------+---------+----------+
|cic_id|airline| admin_num|flight_number|visa_type|airline_id|
+------+-------+----------+-------------+---------+----------+
|   118|     LH|2147483647|        00402|       WT|         0|
|   375|     AA|2147483647|        00039|       WT|         1|
|   693|     AA|2147483647|        00717|       WT|         2|
|  1175|     UA|2147483647|        00914|       WT|         3|
|  1447|     BA|2147483647|        00175|       WT|         4|
+------+-------+----------+-------------+---------+----------+
only showing top 5 rows



In [66]:
dim_airline.createOrReplaceTempView("dim_airline")

In [67]:
dim_airline.printSchema()

root
 |-- cic_id: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- admin_num: integer (nullable = true)
 |-- flight_number: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- airline_id: long (nullable = false)



In [68]:
# Saving the data in parquet format
dim_airline.write.mode("overwrite").parquet("dim_airline")

#### 4.1.3 Dim Temperature
The Dim Temperature table is created and loaded as a spark temp view

In [69]:
# select and aggregate required dim data for temperature
dim_temperature = spark.sql(
"""
    SELECT 
        a.dt AS reference_date
        , a.state AS state
        , a.state_code AS state_code
        , a.Country AS country
        , round(avg(a.AverageTemperature), 2) AS avg_temp
        , round(avg(a.AverageTemperatureUncertainty), 2) AS avg_temp_uncertainty
    FROM stage_temperature AS a 
    GROUP BY a.dt, year(a.dt), month(a.dt), a.state, a.state_code, a.country

""")

# add primary key as running integer
dim_temperature = dim_temperature.withColumn('temperature_id', f.monotonically_increasing_id())

In [70]:
dim_temperature.show(5)

+--------------+-----------------+----------+-------------+--------+--------------------+--------------+
|reference_date|            state|state_code|      country|avg_temp|avg_temp_uncertainty|temperature_id|
+--------------+-----------------+----------+-------------+--------+--------------------+--------------+
|    2016-11-01|          ALABAMA|        AL|United States|   11.22|                0.13|             0|
|    2016-10-01|DIST. OF COLUMBIA|        DC|United States|   13.61|                0.33|             1|
|    2016-10-01|            MAINE|        ME|United States|     9.1|                0.27|    8589934592|
|    2016-01-01|       NEW MEXICO|        NM|United States|    3.43|                0.27|    8589934593|
|    2016-06-01|        N. DAKOTA|        ND|United States|   19.07|                0.35|    8589934594|
+--------------+-----------------+----------+-------------+--------+--------------------+--------------+
only showing top 5 rows



In [71]:
dim_temperature.createOrReplaceTempView("dim_temperature")

In [72]:
dim_temperature.printSchema()

root
 |-- reference_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- avg_temp: double (nullable = true)
 |-- avg_temp_uncertainty: double (nullable = true)
 |-- temperature_id: long (nullable = false)



In [73]:
# Saving the data in parquet format
dim_temperature.write.mode("overwrite").parquet("dim_temperature")

#### 4.1.4 Dim Airport
The Dim Airport table is created and loaded as a spark temp view

In [74]:
dim_airport = spark.sql(
    """
    SELECT DISTINCT 
        a.iata_code
        , a.name
        , a.type
        , a.elevation_ft
        , float(trim(substr(a.coordinates, locate(',', a.coordinates)+1))) AS lat
        , float(trim(left(a.coordinates, locate(',', a.coordinates)-1))) AS lng

    FROM stage_airport a

    """
    )

# add primary key as running integer
dim_airport = dim_airport.withColumn('airport_id', f.monotonically_increasing_id())

In [75]:
dim_airport.show(5)

+---------+--------------------+--------------+------------+-------+--------+----------+
|iata_code|                name|          type|elevation_ft|    lat|     lng|airport_id|
+---------+--------------------+--------------+------------+-------+--------+----------+
|      AAF|Apalachicola Regi...| small_airport|          20|29.7275|-85.0275|         0|
|      CAE|Columbia Metropol...| large_airport|         236|33.9388|-81.1195|         1|
|      EKX|     Addington Field| small_airport|         775| 37.686| -85.925|         2|
|      EUF|        Weedon Field| small_airport|         285|31.9513|-85.1289|         3|
|      FTK|Godman Army Air F...|medium_airport|         756|37.9071|-85.9721|         4|
+---------+--------------------+--------------+------------+-------+--------+----------+
only showing top 5 rows



In [76]:
dim_airport.createOrReplaceTempView("dim_airport")

In [77]:
dim_airport.printSchema()

root
 |-- iata_code: string (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- lng: float (nullable = true)
 |-- airport_id: long (nullable = false)



In [78]:
# Saving the data in parquet format
dim_airport.write.mode("overwrite").parquet("dim_airport")

#### 4.1.5 Dim Demography
The Dim Demography table is created and loaded as a spark temp view

In [79]:
dim_demography = spark.sql(
"""
    SELECT a.`State Code` AS state_code
        , round(avg(a.`Median Age`), 2) AS median_age
        , sum(a.`Male Population`) AS  male_population
        , sum(a.`Female Population`) AS female_population
        , sum(a.`Total Population`) AS total_population
        , sum(a.`Number of Veterans`) AS veteran_population
        , sum(a.`Foreign-born`) AS foreign_population
        , round(avg(a.`Average Household Size`), 2) AS average_household_size
    FROM stage_demography AS a
    GROUP BY a.`State Code` 
    ORDER BY 1

"""
)

# add primary key as running integer
dim_demography = dim_demography.withColumn('demography_id', f.monotonically_increasing_id())

In [80]:
dim_demography.show(5)

+----------+----------+---------------+-----------------+----------------+------------------+------------------+----------------------+-------------+
|state_code|median_age|male_population|female_population|total_population|veteran_population|foreign_population|average_household_size|demography_id|
+----------+----------+---------------+-----------------+----------------+------------------+------------------+----------------------+-------------+
|        AK|      32.2|       764725.0|         728750.0|       1493475.0|          137460.0|          166290.0|                  2.77|            0|
|        AL|     36.16|      2448200.0|        2715106.0|       5163306.0|          352896.0|          252541.0|                  2.43|   8589934592|
|        AR|     32.74|      1400724.0|        1482165.0|       2882889.0|          154390.0|          307753.0|                  2.53|  17179869184|
|        AZ|     35.04|    1.1137275E7|      1.1360435E7|      2.249771E7|         1322525.0|       

In [81]:
dim_demography.createOrReplaceTempView("dim_demography")

In [82]:
dim_demography.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: double (nullable = true)
 |-- female_population: double (nullable = true)
 |-- total_population: double (nullable = true)
 |-- veteran_population: double (nullable = true)
 |-- foreign_population: double (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- demography_id: long (nullable = false)



In [83]:
# Saving the data in parquet format
dim_demography.write.mode("overwrite").parquet("dim_demography")

#### 4.1.7 Fact Immigration
The Fact Immigration table is created and loaded as a spark temp view. The necessary columns are selected and renamed, and Foreign Keys are added from Dim tables. A running ID is added as a Primary Key.

In [84]:
# data is selected, foreign keys are added and an unique runnig ID is created
fact_immigration = spark.sql(
"""
    SELECT DISTINCT
        int(a.cicid) AS cic_id
        , int(a.i94yr) AS year
        , int(a.i94mon) AS month
        , a.i94port AS city_code
        , a.arrdate AS arrive_date
        , int(a.i94mode) AS mode
        , a.i94addr AS state_code
        , a.depdate AS departure_date
        , int(a.i94visa) AS visa
        , 'United States' AS country
        , b.immigrant_id AS immigrant_id
        , c.airline_id AS airline_id
        , d.temperature_id
        , e.demography_id
        , f.airport_id
    FROM stage_immigration AS a 
        LEFT JOIN dim_immigrant AS b ON 
            int(a.cicid) = b.cic_id
        LEFT JOIN dim_airline AS c ON
            int(a.cicid) = c.cic_id
        LEFT JOIN dim_temperature AS d ON
            a.i94addr = d.state_code AND
            a.reference_date = d.reference_date
        LEFT JOIN dim_demography AS e ON
            a.i94addr = e.state_code
        LEFT JOIN dim_airport AS f ON
            a.i94port = f.iata_code
""")

# add primary key as running integer
fact_immigration = fact_immigration.withColumn('immigration_id', f.monotonically_increasing_id())

In [85]:
fact_immigration.show(5)

+-------+----+-----+---------+-----------+----+----------+--------------+----+-------------+-------------+-------------+--------------+-------------+-------------+--------------+
| cic_id|year|month|city_code|arrive_date|mode|state_code|departure_date|visa|      country| immigrant_id|   airline_id|temperature_id|demography_id|   airport_id|immigration_id|
+-------+----+-----+---------+-----------+----+----------+--------------+----+-------------+-------------+-------------+--------------+-------------+-------------+--------------+
|2327332|2016|    4|      DEN| 2016-04-13|   1|        CO|    2016-04-23|   2|United States| 420906804890| 214748369994| 1116691496960|  42949672960| 360777252865|             0|
|4293112|2016|    4|      SFB| 2016-04-23|   1|        CO|    2016-05-07|   2|United States|1425929152664| 197568505430| 1116691496960|  42949672960|1657857376261|             1|
| 762307|2016|    4|      DEN| 2016-04-04|   1|        CO|    2016-04-08|   2|United States| 146028892202

In [86]:
fact_immigration.createOrReplaceTempView("fact_immigration")

In [87]:
fact_immigration.printSchema()

root
 |-- cic_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- city_code: string (nullable = true)
 |-- arrive_date: date (nullable = true)
 |-- mode: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- visa: integer (nullable = true)
 |-- country: string (nullable = false)
 |-- immigrant_id: long (nullable = true)
 |-- airline_id: long (nullable = true)
 |-- temperature_id: long (nullable = true)
 |-- demography_id: long (nullable = true)
 |-- airport_id: long (nullable = true)
 |-- immigration_id: long (nullable = false)



In [88]:
# Saving the data in parquet format
fact_immigration.write.mode("overwrite").parquet("fact_immigration")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness

#### 4.2.1 Simple row count check

See if the Fact and Dim tables have data

In [89]:
# dim & fact table check
rows =spark.sql(
"""
    SELECT
        'fact_immigration' AS tbl
        , count(*) AS rowcount
    FROM fact_immigration AS a 
    
    UNION
    
    SELECT
        'dim_temperature' AS tbl
        , count(*) AS rowcount
    FROM dim_temperature AS b 
    
    UNION
    
    SELECT
        'dim_airport' AS tbl
        , count(*) AS rowcount
    FROM dim_airport AS b 
    
    UNION
    
    SELECT
        'dim_demography' AS tbl
        , count(*) AS rowcount
    FROM dim_demography AS b 
    
    UNION
    
    SELECT
        'dim_airline' AS tbl
        , count(*) AS rowcount
    FROM dim_airline AS b 
    
    UNION
    
    SELECT
        'dim_immigrant' AS tbl
        , count(*) AS rowcount
    FROM dim_immigrant AS b 

""")

In [90]:
rows.show()

+----------------+--------+
|             tbl|rowcount|
+----------------+--------+
|  dim_demography|      49|
|   dim_immigrant| 2703145|
|     dim_airline| 2703145|
| dim_temperature|     612|
|fact_immigration| 2703145|
|     dim_airport|    1865|
+----------------+--------+



#### Data table Primary Key - Foreign Key Check

See if Dim table keys match Fact table keys

airport FK check

In [91]:
tmp = spark.sql(
"""
    SELECT count(b.airport_id)
    FROM fact_immigration AS a 
    LEFT JOIN dim_airport AS b 
        ON a.airport_id = b.airport_id

"""
)

In [92]:
# expected close to row num of fact_immigration (~2.7 million)
tmp.show()

+-----------------+
|count(airport_id)|
+-----------------+
|          1149202|
+-----------------+



temperature FK check

In [93]:
tmp = spark.sql(
"""
    SELECT count(b.temperature_id)
    FROM fact_immigration AS a 
    LEFT JOIN dim_temperature AS b 
        ON a.temperature_id = b.temperature_id

"""
)

In [94]:
# expected close to row num of fact_immigration (~2.7 million)
tmp.show()

+---------------------+
|count(temperature_id)|
+---------------------+
|              2694191|
+---------------------+



immigrant FK check

In [95]:
tmp = spark.sql(
"""
    SELECT count(b.immigrant_id)
    FROM fact_immigration AS a 
    LEFT JOIN dim_immigrant AS b 
        ON a.immigrant_id = b.immigrant_id

"""
)

In [96]:
# expected close to row num of fact_immigration (~2.7 million)
tmp.show()

+-------------------+
|count(immigrant_id)|
+-------------------+
|            2703145|
+-------------------+



airline FK check

In [97]:
tmp = spark.sql(
"""
    SELECT count(b.airline_id)
    FROM fact_immigration AS a 
    LEFT JOIN dim_airline AS b 
        ON a.airline_id = b.airline_id

"""
)

In [98]:
# expected close to row num of fact_immigration (~2.7 million)
tmp.show()

+-----------------+
|count(airline_id)|
+-----------------+
|          2703145|
+-----------------+



demography FK check

In [99]:
tmp = spark.sql(
"""
    SELECT count(b.demography_id)
    FROM fact_immigration AS a 
    LEFT JOIN dim_demography AS b 
        ON a.demography_id = b.demography_id

"""
)

In [100]:
# expected close to row num of fact_immigration (~2.7 million)
tmp.show()

+--------------------+
|count(demography_id)|
+--------------------+
|             2703145|
+--------------------+



#### 4.3 Data dictionary 
The data dictionary contains the data definitions for the final data model. The following tables contain the data definitions and data types of the resulting data model.

#### 4.3.1 Fact table
Fact table focuses on the occurrence of an immigration event.

| Table   name     | Column name    | Description            | Data type           | Primari key |
|------------------|----------------|------------------------|---------------------|-------------|
| fact_immigration | immigration_id | PK                     | INT (autogenerated) | PRIMARY KEY |
| fact_immigration | immigrant_id   | key to immigrant id    | INT                 | FOREIGN KEY |
| fact_immigration | airline_id     | key to airline_id      | INT                 | FOREIGN KEY |
| fact_immigration | temperature_id | key to temperature_id  | INT                 | FOREIGN KEY |
| fact_immigration | demography_id  | key to demography_id   | INT                 | FOREIGN KEY |
| fact_immigration | airport_id     | key to demography_id   | INT                 | FOREIGN KEY |
| fact_immigration | cic_id         | cicid                  | BIGINT              |             |
| fact_immigration | year           | 4 digit year           | INT                 |             |
| fact_immigration | month          | numeric month          | INT                 |             |
| fact_immigration | city_code      | USA City abbreviation  | CHAR(3)             |             |
| fact_immigration | arrive_date    | Arrive date            | TIMESTAMP           |             |
| fact_immigration | mode           | Traffic method         | INT                 |             |
| fact_immigration | state_code     | USA State abbreviation | CHAR(2)             |             |
| fact_immigration | departure_date | Departure date         | TIMESTAMP           |             |
| fact_immigration | visa           | Visa category          | INT                 |             |

#### 4.3.2 Dim tables
Dim tables are filter tables and provide reasoning behind the immigration events 

| Table   name  | Column name       | Description            | Data type           | Primari key |
|---------------|-------------------|------------------------|---------------------|-------------|
| dim_immigrant | immigrant_id      | PK                     | INT (autogenerated) | PRIMARY KEY |
| dim_immigrant | cic_id            | cicid                  | BIGINT              |             |
| dim_immigrant | citizen_country   | country of citizenship | VARCHAR             |             |
| dim_immigrant | residence_country | country of residence   | VARCHAR             |             |
| dim_immigrant | birth_year        | birth year             | INT                 |             |
| dim_immigrant | gender            | gender                 | CHAR(1)             |             |
| dim_immigrant | ins_num           | INS number             | INT                 |             |

| Table name  | Column name   | Description              | Data type           | Primari key |
|-------------|---------------|--------------------------|---------------------|-------------|
| dim_airline | airline_id    | PK                       | INT (autogenerated) | PRIMARY KEY |
| dim_airline | cic_id        | cicid                    | BIGINT              |             |
| dim_airline | airline       | airline used on arrival  | VARCHAR             |             |
| dim_airline | admin_num     | admission number         | BIGINT              |             |
| dim_airline | flight_number | flight number on arrival | VARCHAR             |             |
| dim_airline | visa_type     | class of legal admission | CHAR(2)             |             |

| Table   name    | Column name          | Description                 | Data type          | Primari key |
|-----------------|----------------------|-----------------------------|--------------------|-------------|
| dim_temperature | temperature_id       | PK                          | INT(autogenerated) | PRIMARY KEY |
| dim_temperature | reference_date       | date of temperature data    | TIMESTAMP          |             |
| dim_temperature | state                | reference city code         | VARCHAR            |             |
| dim_temperature | state_code           | state code                  | VARCHAR            |             |
| dim_temperature | country              | reference country           | VARCHAR            |             |
| dim_temperature | avg_temp             | monthly average temperature | FLOAT              |             |
| dim_temperature | avg_temp_uncertainty | temperature uncertainty     | FLOAT              |             |

| Table   name   | Column name            | Description                  | Data type  | Primari key |
|----------------|------------------------|------------------------------|------------|-------------|
| dim_demography | demography_id          | INT(autogenerate)            | INT        | PRIMARY KEY |
| dim_demography | state_code             | State code                   | VARCHAR(2) |             |
| dim_demography | median_age             | Median age                   | FLOAT      |             |
| dim_demography | male_population        | Total males                  | INT        |             |
| dim_demography | female_population      | Total females                | INT        |             |
| dim_demography | total_population       | Total population             | INT        |             |
| dim_demography | veteran_population     | Sum of veterans              | INT        |             |
| dim_demography | foreign_population     | Sum of foreign born citizens | INT        |             |
| dim_demography | average_household_size | Average household size       | FLOAT      |             |

| Table name  | Column name  | Description                  | Data type | Primari key |
|-------------|--------------|------------------------------|-----------|-------------|
| dim_airport | airport_id   | INT(autogenerate)            | INT       | PRIMARY KEY |
| dim_airport | iata_code    | code abbreviaton for airport | VARCHAR   |             |
| dim_airport | name         | name of airport              | VARCHAR   |             |
| dim_airport | type         | type of airport              | VARCHAR   |             |
| dim_airport | elevation_ft | elevation of airport         | FLOAT     |             |
| dim_airport | lat          | latitude of location         | FLOAT     |             |
| dim_airport | lng          | longitude of location        | FLOAT     |             |

#### Step 5: Complete Project Write Up
#### 5.1 Rationale for choice of tools
Clearly state the rationale for the choice of tools and technologies for the project.
The project uses Apache Spark engine. Spark is an simple and fast and also scalable analytics engine for large scale data processing.  It has an ability to process and analyse massive ammounts of data using PySpark interface.


#### 5.2 Data updates
Propose how often the data should be updated and why.
The fact table is created on a monthly basis - as such the data set could be updated on a monthly basis. 

#### 5.3 Scenario analysis
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### 5.3.1 Data volume increase
Current implementation is not suitable for handling such a large volumen increase. Should the volume increase, more sophisticated data framework should be implemented. S3 data storage should be used. An Amazon EMR cluster could be utilized  behind Apache Spark. Amazon Redshift distributed datase should be utilized.

#### 5.3.2 Data update frequency increase
Current ETL process is implemented in Jupyter Notebook. This is not suitable for an automated data pipeline. In such a case the ETL pipeline should be implemented in Apache Airflow using predefined DAGs. Quality checks and timing could be implemented using Airflow.

#### 5.3.3 Data user increase
Currently no database is created behind the solution as this is designed for analytical purposes for a small scale of users. Should the number of users increase, a distributed database solution should be implemented like Amazon Redshift - this database could handle massive data for a large number of users even if located in different parts of the world.

#### 5.3 Further Improvements 
* The weather data only runs until 2013 - this needs to be improved as the fact for immigration is for 2016