# Data Engineering for Georgia's Employee Visa Immigrations Analytics
### Data Engineering Capstone Project

#### Project Summary
This project is the capstone for Udacity's Data Engineering Nanodegree. The scope is largely open-ended, subject to a few requirements. It uses data provided by Udacity together with additional data. The goal is a data engineering pipeline that builds and updates a data warehouse, which will help the Government of the State of Georgia with analytics and business intelligence on employment-visa immigration to the state. The project draws on concepts and tools from the Nanodegree, including data modeling, data warehousing, ETL, data pipelines, PySpark, Pandas, PostgreSQL and Apache Airflow.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType
from sql_queries import *
from create_tables import *
from os.path import exists
import time

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

#### Scope and Datasets:

This project is based on the Udacity-provided project and data for the Data Engineering Nanodegree capstone, which has semi-open-ended requirements.

The following scope was defined for the project:

The Government of the State of Georgia is interested in performing analytics on people immigrating to Georgia, USA, for employment on an employee visa, i.e. E1, E2 and E3.

They want to aggregate employee immigration analytics on a monthly and yearly basis for Georgia cities and airports, broken down by visa type and demographic information such as age group and gender. They are also interested in airline and flight information.

The following three datasets are used and combined in the project:

1. Airport Code Table, a CSV file containing information about airports, their codes and corresponding cities. It comes from datahub.io.

2. I94 Immigration Data, SAS (sas7bdat) files containing monthly records of immigration to the USA. It comes from the US National Tourism and Trade Office.

3. Airlines Data, as a JSON file. It comes from https://github.com/npow/airline-codes.

The end solution will be a Data Warehouse whose tables are used for OLAP as required and are updated monthly by running ETL on the raw data.

The project will utilize Nanodegree lessons and topics related to Data Modeling, Data Lakes with Spark and Data Pipelines with Airflow.

The tools and libraries used in this project include Python, PostgreSQL, Psycopg, Pandas, PySpark and Apache Airflow.

It is assumed that new SAS data will become available for ingestion every month as SAS files, and that it has to be converted to CSV files for analysis and easy use with PySpark. The data engineering pipeline will run the ETL and load the data into the database every month.

This Jupyter notebook performs the ETL for the first month and also creates the database and tables. After it has run, the etl.py script can be run every month to ingest and process the data for the following months. The script is similar to this notebook, with small changes to reuse already processed data and keep track of time periods. dags/airflow_pipeline.py automates the process as an Airflow DAG: it schedules the code used in etl.py to run monthly and enables monitoring through Airflow.
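
The monthly bookkeeping described above can be pictured with a minimal sketch. It assumes the `F_{month}_{year}.csv` naming pattern used later in this notebook; the helper names `csv_name` and `next_period` are hypothetical and are not taken from etl.py.

```python
def csv_name(year, month):
    """Build the per-month CSV file name, e.g. F_1_2016.csv."""
    return "F_{month}_{year}.csv".format(month=month, year=year)


def next_period(year, month):
    """Return the (year, month) pair that follows the given period."""
    if not 1 <= month <= 12:
        raise ValueError("month must be between 1 and 12")
    # December rolls over into January of the next year
    return (year + 1, 1) if month == 12 else (year, month + 1)


year, month = 2016, 1
print(csv_name(year, month))                 # file for the first run
print(csv_name(*next_period(year, month)))   # file for the following run
```

Keeping the period arithmetic in one small function makes the year rollover explicit, which is the case most likely to be missed when the month counter is advanced by hand.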

Dimensional Modeling was done and a Star Schema was created. The tables are stored in a PostgreSQL relational database.

The test.ipynb Jupyter notebook can be used for testing and viewing the final results. It includes sample usage of the loaded data in the form of data visualizations and aggregated statistics.

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

#### Data Exploration

In [115]:
timerx = time.time()

In [3]:
cur_year = 2016
cur_month = 1

In [4]:
# variables used for quality checks
allnull_row_check = True
empty_check = True
date_check = True
init_rows_fact_table = 0

In [5]:
# Read in the data here
airports_df = pd.read_csv('airport-codes_csv.csv')
immig_sample_df = pd.read_csv('immigration_data_sample.csv')
airlines_df = pd.read_json('airlines.json')
print('immigration sample df shape : ',immig_sample_df.shape)
print('airports df shape : ' ,airports_df.shape)
print('airlines df shape : ' ,airlines_df.shape)

immigration sample df shape :  (1000, 29)
airports df shape :  (55075, 12)
airlines df shape :  (6065, 8)


In [6]:
print(list(immig_sample_df.columns))

['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline', 'admnum', 'fltno', 'visatype']


In [7]:
immig_sample_df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [8]:
print(list(airports_df.columns))

['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country', 'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code', 'coordinates']


In [9]:
airports_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [10]:
print(list(airlines_df.columns))

['active', 'alias', 'callsign', 'country', 'iata', 'icao', 'id', 'name']


In [11]:
airlines_df.head()

Unnamed: 0,active,alias,callsign,country,iata,icao,id,name
0,Y,\N,,,-,,1,Private flight
1,N,\N,GENERAL,United States,,GNL,2,135 Airways
2,Y,\N,NEXTIME,South Africa,1T,RNX,3,1Time Airline
3,N,\N,,United Kingdom,,WYT,4,2 Sqn No 1 Elementary Flying Training School
4,N,\N,,Russia,,TFU,5,213 Flight Unit


In [12]:
airlines_df = airlines_df[ airlines_df.active=='Y' ][ ['iata','name','country'] ]

In [13]:
airlines_df.shape

(1178, 3)

In [14]:
airlines_df.head(10)

Unnamed: 0,iata,name,country
0,-,Private flight,
2,1T,1Time Airline,South Africa
9,Q5,40-Mile Air,United States
12,AN,Ansett Australia,Australia
13,1B,Abacus International,Singapore
20,ZI,Aigle Azur,France
21,AQ,Aloha Airlines,United States
23,AA,American Airlines,United States
27,OZ,Asiana Airlines,Republic of Korea
28,4K,Askari Aviation,Pakistan


In [15]:
print(list(airports_df.type.unique()))

['heliport', 'small_airport', 'closed', 'seaplane_base', 'balloonport', 'medium_airport', 'large_airport']


In [16]:
print(len(airports_df.iso_region.unique()))

2810


In [17]:
# getting only airports in Georgia, USA (iso_region 'US-GA')
gaports = airports_df[airports_df.iso_region=='US-GA']

In [18]:
gaports.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
14,00GA,small_airport,Lt World Airport,700.0,,US,US-GA,Lithonia,00GA,,00GA,"-84.06829833984375, 33.76750183105469"
15,00GE,heliport,Caffrey Heliport,957.0,,US,US-GA,Hiram,00GE,,00GE,"-84.73390197753906, 33.88420104980469"
77,01GA,heliport,Medical Center Heliport,319.0,,US,US-GA,Columbus,01GA,,01GA,"-84.9791030883789, 32.47930145263672"
78,01GE,small_airport,The Farm Airport,375.0,,US,US-GA,Wrightsville,01GE,,01GE,"-82.77110290527344, 32.674400329589844"
139,02GA,small_airport,Doug Bolton Field,884.0,,US,US-GA,Commerce,02GA,,02GA,"-83.42900085449219, 34.202598571777344"


In [19]:
# getting medium to large airports only, where immigrants come, and selecting useful columns
gaports = gaports[ ((gaports.type=='large_airport') | (gaports.type=='medium_airport')) ][ ['name','type','municipality','local_code'] ]

In [20]:
gaports.head()

Unnamed: 0,name,type,municipality,local_code
26009,Southwest Georgia Regional Airport,medium_airport,Albany,ABY
26043,Augusta Regional At Bush Field,large_airport,Augusta,AGS
26047,Athens Ben Epps Airport,medium_airport,Athens,AHN
26128,Hartsfield Jackson Atlanta International Airport,large_airport,Atlanta,ATL
26261,Brunswick Golden Isles Airport,medium_airport,Brunswick,BQK


In [21]:
gaports.dtypes

name            object
type            object
municipality    object
local_code      object
dtype: object

In [22]:
print(list(gaports.municipality.unique()))

['Albany', 'Augusta', 'Athens', 'Atlanta', 'Brunswick', 'Columbus', 'Fort Benning(Columbus)', 'Macon', 'Marietta', 'Rome', 'Savannah', 'Valdosta', 'Warner Robins', 'Conyers']


In [23]:
print(list(gaports.local_code.unique()))

['ABY', 'AGS', 'AHN', 'ATL', 'BQK', 'CSG', 'DNL', 'FTY', 'LSF', 'MCN', 'MGE', 'PDK', 'RMG', 'RYY', 'SAV', 'SSI', 'SVN', 'VAD', 'VLD', 'WRB', nan]


In [24]:
immig_sample_df.occup.value_counts()

STU    2
OTH    1
PHA    1
Name: occup, dtype: int64

In [25]:
immig_sample_df = immig_sample_df[ ['cicid','i94yr','i94mon','i94port','visatype','gender','biryear','arrdate','airline'] ]

In [26]:
immig_sample_df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94port,visatype,gender,biryear,arrdate,airline
0,4084316.0,2016.0,4.0,HHW,WT,F,1955.0,20566.0,JL
1,4422636.0,2016.0,4.0,MCA,B2,M,1990.0,20567.0,*GA
2,1195600.0,2016.0,4.0,OGG,WT,M,1940.0,20551.0,LH
3,5291768.0,2016.0,4.0,LOS,B2,M,1991.0,20572.0,QR
4,985523.0,2016.0,4.0,CHM,WT,F,1997.0,20550.0,


In [27]:
immig_sample_df.dtypes

cicid       float64
i94yr       float64
i94mon      float64
i94port      object
visatype     object
gender       object
biryear     float64
arrdate     float64
airline      object
dtype: object

In [28]:
immig_sample_df.gender.unique()

array(['F', 'M', nan, 'X'], dtype=object)

In [29]:
immig_sample_df.gender.value_counts()

M    471
F    386
X      2
Name: gender, dtype: int64

In [30]:
sas_path = '../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat'
csv_path = '/home/workspace/immigration_csvs/F_1_2016.csv'
if not exists(csv_path):
    immigCur_df = pd.read_sas(sas_path, 'sas7bdat', encoding="ISO-8859-1")
    immigCur_df.to_csv(csv_path,index = False)
    print('CSV Created at ',csv_path)
else:
    print('CSV Exists')

CSV Exists


In [31]:
immi_schema = StructType([
        StructField("cicid", DoubleType()),
        StructField("i94yr", DoubleType()),
        StructField("i94mon", DoubleType()),
        StructField("i94cit", DoubleType()),
        StructField("i94res", DoubleType()),
        StructField("i94port", StringType()),    
        StructField("arrdate", DoubleType()),
        StructField("i94mode", DoubleType()),
        StructField("i94addr", StringType()),
        StructField("depdate", DoubleType()),
        StructField("i94bir", DoubleType()),
        StructField("i94visa", DoubleType()),
        StructField("count", DoubleType()),
        StructField("dtadfile", StringType()),
        StructField("visapost", StringType()),
        StructField("occup", StringType()),
        StructField("entdepa", StringType()),
        StructField("entdepd", StringType()),
        StructField("entdepu", StringType()),
        StructField("matflag", StringType()),
        StructField("biryear", DoubleType()),
        StructField("dtaddto", StringType()),
        StructField("gender", StringType()),
        StructField("insnum", StringType()),
        StructField("airline", StringType()),
        StructField("admnum", StringType()),
        StructField("fltno", StringType()),
        StructField("visatype", StringType()),
    ])

In [32]:
month_str = str(cur_month)
filename = "F_{month}_{year}.csv".format(month=month_str,year=str(cur_year))
print('Loading ',filename)

Loading  F_1_2016.csv


In [33]:
tim = time.time()
spark = SparkSession.builder.master("local[1]").appName("EmpImmig").getOrCreate()
immig_df = spark.read.csv('/home/workspace/immigration_csvs/'+filename,schema = immi_schema, header=True)
immig_df.show(n=10)
print( 'Loaded in ',round(time.time()-tim,2),' sec' )

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-----------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|     admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-----------+-----+--------+
|  7.0|2016.0|   1.0| 101.0| 101.0|    BOS|20465.0|    1.0|     MA|   null|  20.0|    3.0|  1.0|    null|    null| null|      T|   null|   null|   null| 1996.0|     D/S|     M|  null|     LH|346608285.0|  424|      F1|
|  8.0|2016.0|   1.0| 101.0| 101.0|    BOS|20465.0|    1.0|     MA|   null|  20.0|    3.0|  1.0|    null|    null| null|    

In [34]:
immig_df = immig_df.drop("i94cit","i94res","i94mode","i94bir","i94addr","i94visa","count","depdate",\
                         "dtadfile","visapost","occup","entdepa","entdepd","entdepu","matflag","dtaddto","insnum")

In [35]:
immig_df.show(n=10)

+-----+------+------+-------+-------+-------+------+-------+-----------+-----+--------+
|cicid| i94yr|i94mon|i94port|arrdate|biryear|gender|airline|     admnum|fltno|visatype|
+-----+------+------+-------+-------+-------+------+-------+-----------+-----+--------+
|  7.0|2016.0|   1.0|    BOS|20465.0| 1996.0|     M|     LH|346608285.0|  424|      F1|
|  8.0|2016.0|   1.0|    BOS|20465.0| 1996.0|     M|     LH|346627585.0|  424|      F1|
|  9.0|2016.0|   1.0|    BOS|20469.0| 1999.0|     F|     AF|381092385.0|  338|      B2|
| 10.0|2016.0|   1.0|    BOS|20469.0| 1971.0|     F|     AF|381087885.0|  338|      B2|
| 11.0|2016.0|   1.0|    BOS|20469.0| 2004.0|     M|     AF|381078685.0|  338|      B2|
| 12.0|2016.0|   1.0|    BOS|20474.0| 1983.0|     M|     LH|406155985.0|  424|      B2|
| 15.0|2016.0|   1.0|    BOS|20477.0| 1988.0|     F|     LH|417363085.0|  424|      F1|
| 17.0|2016.0|   1.0|    BOS|20480.0| 1938.0|     M|     TK|428558285.0|   81|      B2|
| 18.0|2016.0|   1.0|    BOS|204

In [36]:
(immig_df.count(), len(immig_df.columns))

(2847924, 11)

In [37]:
# quality check for empty dataframe
empty_check = immig_df.count() > 0

In [38]:
immig_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- biryear: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



#### Data Cleaning

Based on the data exploration and anticipated future needs, the following data cleaning steps were implemented:

1. Removing Null values in airports data.

2. Dropping airline data rows with Null or empty iata codes.

3. Dropping private flights row from airlines.

4. Dropping any row of immigration DataFrame where important fields are Null.

5. Dropping any rows of immigration DataFrame with duplicate CICID.

6. Filling Missing/Null values of gender in immigration DataFrame with "U" (unknown).

7. Filling Missing airline values with "Unknown".
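
To make the effect of steps 4 to 7 concrete before they are applied to the real data, here is a minimal sketch on a handful of toy records. It uses plain Python dictionaries instead of a Spark DataFrame so that it runs without any setup; the function name `clean_rows` and the sample rows are invented for illustration. Unlike Spark's `dropDuplicates`, which keeps an arbitrary row per key, this sketch keeps the first row it sees.

```python
# Fields that must be present for a row to be usable (same list as in the notebook)
IMPORTANT_FIELDS = ("visatype", "i94port", "i94yr", "i94mon", "admnum")


def clean_rows(rows):
    """Drop incomplete and duplicate rows, then fill missing gender/airline."""
    seen_cicids = set()
    cleaned = []
    for row in rows:
        # step 4: skip rows where an important field is missing
        if any(row.get(field) is None for field in IMPORTANT_FIELDS):
            continue
        # step 5: skip rows whose CICID was already seen
        if row["cicid"] in seen_cicids:
            continue
        seen_cicids.add(row["cicid"])
        fixed = dict(row)  # copy so the input rows are left untouched
        # steps 6 and 7: fill missing gender and airline
        if fixed.get("gender") is None:
            fixed["gender"] = "U"
        if fixed.get("airline") is None:
            fixed["airline"] = "Unknown"
        cleaned.append(fixed)
    return cleaned


base = {"visatype": "E2", "i94port": "ATL", "i94yr": 2016, "i94mon": 1, "admnum": "100"}
toy_rows = [
    dict(base, cicid=1, gender=None, airline="DL"),   # gender missing
    dict(base, cicid=1, gender="M", airline="DL"),    # duplicate CICID
    dict(base, cicid=2, gender="F", airline="LH", visatype=None),  # important field missing
    dict(base, cicid=3, gender="F", airline=None),    # airline missing
]
cleaned = clean_rows(toy_rows)
for row in cleaned:
    print(row["cicid"], row["gender"], row["airline"])
```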

In [39]:
# Performing cleaning tasks here

In [40]:
# checking nan code row
gaports[gaports['local_code'].isna()]

Unnamed: 0,name,type,municipality,local_code
49100,Jim's Private Airport,medium_airport,Conyers,


In [41]:
# removing row with NaN code
gaports = gaports[gaports['local_code'].notna()]

In [42]:
gaports.head()

Unnamed: 0,name,type,municipality,local_code
26009,Southwest Georgia Regional Airport,medium_airport,Albany,ABY
26043,Augusta Regional At Bush Field,large_airport,Augusta,AGS
26047,Athens Ben Epps Airport,medium_airport,Athens,AHN
26128,Hartsfield Jackson Atlanta International Airport,large_airport,Atlanta,ATL
26261,Brunswick Golden Isles Airport,medium_airport,Brunswick,BQK


In [43]:
airlines_df.shape

(1178, 3)

In [44]:
# removing airline rows with null or empty iata codes
airlines_df = airlines_df[  (airlines_df['iata'].notna()) & (airlines_df['iata']!='')  ]

In [45]:
# removing airline rows with null or empty names
airlines_df = airlines_df[  (airlines_df['name'].notna()) & (airlines_df['name']!='')  ]

In [46]:
airlines_df.shape

(974, 3)

In [47]:
airlines_df.columns = ['Airline_IATA', 'Airline_Name', 'Airline_Country']

In [48]:
airlines_df.head(5)

Unnamed: 0,Airline_IATA,Airline_Name,Airline_Country
0,-,Private flight,
2,1T,1Time Airline,South Africa
9,Q5,40-Mile Air,United States
12,AN,Ansett Australia,Australia
13,1B,Abacus International,Singapore


In [49]:
# removing the private flight row by name rather than by position
airlines_df = airlines_df[airlines_df['Airline_Name'] != 'Private flight']

In [50]:
# removing all rows where important fields are null
immig_df = immig_df.filter(immig_df.visatype.isNotNull())
immig_df = immig_df.filter(immig_df.i94port.isNotNull())
immig_df = immig_df.filter(immig_df.i94yr.isNotNull())
immig_df = immig_df.filter(immig_df.i94mon.isNotNull())
immig_df = immig_df.filter(immig_df.admnum.isNotNull())

In [51]:
immig_df.count()

2847924

In [52]:
# filtering to keep only E1, E2 and E3 visa type rows
immig_df = immig_df.filter( (immig_df.visatype == "E1") | (immig_df.visatype  == "E2") | (immig_df.visatype  == "E3") )

In [53]:
immig_df.show(n=10)

+-----+------+------+-------+-------+-------+------+-------+-----------+-----+--------+
|cicid| i94yr|i94mon|i94port|arrdate|biryear|gender|airline|     admnum|fltno|visatype|
+-----+------+------+-------+-------+-------+------+-------+-----------+-----+--------+
|136.0|2016.0|   1.0|    CHI|20463.0| 1981.0|     M|     LH|331311985.0|  430|      E1|
|170.0|2016.0|   1.0|    LOS|20478.0| 1991.0|     M|     LH|421880385.0|  452|      E2|
|208.0|2016.0|   1.0|    NYC|20456.0| 1986.0|     M|     SU|294869885.0|  102|      E2|
|294.0|2016.0|   1.0|    HOU|20459.0| 1976.0|     M|     UA|310484285.0| 1478|      E2|
|309.0|2016.0|   1.0|    HOU|20461.0| 1946.0|     M|     UA|321556785.0|   47|      E2|
|328.0|2016.0|   1.0|    NEW|20474.0| 1978.0|     M|     LH|404250085.0|  402|      E2|
|350.0|2016.0|   1.0|    WAS|20471.0| 1986.0|     M|     LH|390833885.0|  418|      E2|
|352.0|2016.0|   1.0|    WPB|20484.0| 1952.0|     F|    *GA|443178185.0|N831B|      E2|
|434.0|2016.0|   1.0|    HOU|204

In [54]:
immig_df.count()

34195

In [55]:
ga_portcodes = list(gaports.local_code.unique())

In [56]:
print(ga_portcodes)

['ABY', 'AGS', 'AHN', 'ATL', 'BQK', 'CSG', 'DNL', 'FTY', 'LSF', 'MCN', 'MGE', 'PDK', 'RMG', 'RYY', 'SAV', 'SSI', 'SVN', 'VAD', 'VLD', 'WRB']


In [57]:
# filtering to keep only rows with Georgia airports
immig_df = immig_df.filter( immig_df.i94port.isin(ga_portcodes) )

In [58]:
immig_df.count()

2200

In [59]:
immig_df.show(n=10)

+-------+------+------+-------+-------+-------+------+-------+-----------+-----+--------+
|  cicid| i94yr|i94mon|i94port|arrdate|biryear|gender|airline|     admnum|fltno|visatype|
+-------+------+------+-------+-------+-------+------+-------+-----------+-----+--------+
| 2485.0|2016.0|   1.0|    ATL|20469.0| 1968.0|     M|     DL|381514085.0|  368|      E2|
| 4394.0|2016.0|   1.0|    ATL|20475.0| 1968.0|     M|     DL|410773085.0|   85|      E2|
| 4395.0|2016.0|   1.0|    ATL|20476.0| 1959.0|     M|     DL|411482685.0|  104|      E2|
| 8376.0|2016.0|   1.0|    ATL|20477.0| 1977.0|     M|     KL|417874585.0|  621|      E2|
|17762.0|2016.0|   1.0|    ATL|20460.0| 1963.0|     M|     DL|315553485.0|  698|      E2|
|21354.0|2016.0|   1.0|    ATL|20458.0| 1948.0|     M|     DL|304807385.0|   29|      E2|
|21384.0|2016.0|   1.0|    ATL|20470.0| 1961.0|     M|     BA|386182685.0|  227|      E2|
|23382.0|2016.0|   1.0|    ATL|20464.0| 1966.0|     M|     LH|338377585.0|  444|      E2|
|28676.0|2

In [60]:
# dropping duplicate cicid if any
immig_df = immig_df.dropDuplicates(["cicid"])

In [61]:
immig_df.count()

2200

In [62]:
immig_df = immig_df.withColumn("i94yr", col("i94yr").cast(IntegerType()) )
immig_df = immig_df.withColumn("i94mon", col("i94mon").cast(IntegerType()) )

In [63]:
# quality check for date
date_check = ( immig_df.filter(  (immig_df.i94yr != cur_year) | (immig_df.i94mon != cur_month) )  ).count() == 0

In [64]:
# filling missing/NaN values of gender with U (unknown)
immig_df = immig_df.na.fill(value='U',subset=["gender"])

In [65]:
# filling missing airline values with unknown
immig_df = immig_df.na.fill(value='Unknown',subset=["airline"])

In [66]:
immig_df.count()

2200

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

#### Data Model:

The data is modeled in a relational database, PostgreSQL.
A relational database was chosen for several reasons: flexibility and ease of use, easy Joins and aggregations as needed for the project, the small data volume, low availability requirements and a well-defined tabular schema.

The objective of this project is analytics and BI, which calls for an OLAP system. The data is loaded into a data warehouse to support that.

The data is denormalized to make analytics faster and easier. Denormalization reduces the number of Joins a query needs and speeds up the read-heavy operations required for the task. Write operations are rare, usually happening only once a month and for a short time.

Dimensional Modeling was done for this project, producing dimension tables and a fact table.
The fact table contains the immigration events, where each row is one immigration. The dimension tables cover locations, time, demographics, airlines and so on. Joined to the fact table, they provide aggregations, descriptive information and analytics. They also list the available categories. The time dimensions are additionally used for tracking time periods.

The data warehouse will hold historical immigration data, represented as those tables.
A Star Schema was chosen for the data marts of this data warehouse for several reasons: it is simple, it is denormalized, it simplifies the expected queries, it makes aggregations faster, and the requirements do not call for strict normalization.

The tables and fields were designed according to the business requirements and dimensional modeling.
The dimension tables are the following:

airports, visatypes, agegroups, genders, municipalities, years, months, monthlytimes, airlines

The fact table is immigrationfacts.

The complete data dictionary for the data model is provided in section 4.3
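
As a rough illustration of how a star schema keeps the expected queries short, the sketch below builds a two-table miniature of the warehouse and runs one aggregate over it. SQLite from the Python standard library stands in for PostgreSQL only so that the example is self-contained. The table names come from the list above, but the column lists are assumptions made for this sketch (the actual definitions are in the data dictionary in section 4.3), and all rows are toy data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # throwaway in-memory database

# Assumed, simplified column lists: one dimension table and the fact table
conn.executescript("""
CREATE TABLE airports (
    Airport_Code TEXT PRIMARY KEY,
    Airport_Name TEXT
);
CREATE TABLE immigrationfacts (
    CICID        INTEGER PRIMARY KEY,
    Year         INTEGER,
    Month        INTEGER,
    Visa_Type    TEXT,
    Airport_Code TEXT REFERENCES airports (Airport_Code)
);
""")

conn.executemany(
    "INSERT INTO airports VALUES (?, ?)",
    [("ATL", "Hartsfield Jackson Atlanta International Airport"),
     ("SAV", "Savannah Hilton Head International Airport")],
)
# Toy fact rows, not real immigration records
conn.executemany(
    "INSERT INTO immigrationfacts VALUES (?, ?, ?, ?, ?)",
    [(1, 2016, 1, "E2", "ATL"),
     (2, 2016, 1, "E2", "ATL"),
     (3, 2016, 1, "E1", "SAV")],
)

# One Join from the fact table to a dimension is enough for this aggregate
rows = conn.execute("""
    SELECT a.Airport_Name, f.Visa_Type, COUNT(*) AS arrivals
    FROM immigrationfacts AS f
    JOIN airports AS a ON a.Airport_Code = f.Airport_Code
    WHERE f.Year = 2016 AND f.Month = 1
    GROUP BY a.Airport_Name, f.Visa_Type
    ORDER BY arrivals DESC, a.Airport_Name
""").fetchall()

for airport_name, visa_type, arrivals in rows:
    print(airport_name, visa_type, arrivals)
conn.close()
```

Every dimension sits exactly one Join away from the fact table, so adding another breakdown (for example by gender or age group) means adding one Join and one `GROUP BY` column rather than walking a chain of normalized tables.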

#### Data Pipeline Steps:

The following steps are carried out in the pipeline:

1. Data for the current month is converted from SAS to CSV format and saved. The current month is determined from the time tables, except on the first run, and the time tables are updated.

2. Loading the dimension data from the CSV and JSON files and cleaning it, if it is the first run. On later runs, some of this data is loaded from the database instead.

3. Extracting immigration data for the current month into a PySpark DataFrame according to the schema and cleaning it.

4. Filtering immigration data to get the smaller required data of Georgia employee immigrations.

5. Extracting Day of Month information from numeric SAS Date Format.

6. Age is derived from Birth Year and also classified into Age Group.

7. Immigration DataFrame is Joined with airports data.

8. Immigration DataFrame is Joined with airlines data.

9. Creating database and tables if it's first run and inserting data into dimension tables. 

10. Inserting the data into fact table after checks.

11. Additional quality checks.
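
One way to act on the quality checks in steps 10 and 11 is to collect the boolean flags this notebook computes (`allnull_row_check`, `empty_check`, `date_check`) and refuse to load the fact table if any of them failed. The sketch below is only an assumption about how such a gate could look; the helper names `failed_checks` and `assert_quality` are hypothetical and do not come from etl.py.

```python
def failed_checks(checks):
    """Return the names of all checks whose result is False."""
    return [name for name, passed in checks.items() if not passed]


def assert_quality(checks):
    """Raise ValueError naming every failed check; do nothing if all passed."""
    failed = failed_checks(checks)
    if failed:
        raise ValueError("Quality checks failed: " + ", ".join(sorted(failed)))


# Example with the flag names used in this notebook, all passing
checks = {
    "allnull_row_check": True,
    "empty_check": True,
    "date_check": True,
}
assert_quality(checks)  # no exception, so the load may proceed
print("all quality checks passed")
```

Raising an exception rather than printing a warning means that, when the same code runs as a scheduled task, the run is marked as failed and the problem is visible in monitoring.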

The steps for running and testing the data pipeline are:

1. Make sure that PostgreSQL, Psycopg, Pandas and PySpark are installed.

2. Run the complete "Capstone_Project_Imroze.ipynb" Jupyter Notebook.

3. Run "etl.py" multiple times to do ETL on the data for the following months. The SAS data for the full year 2016 is available in the workspace provided for the project.

4. An alternative to step 3 is installing Airflow, running "dags/airflow_pipeline.py" and enabling the DAG through the Airflow UI. It is scheduled to run monthly, but can be scheduled to run every 15 minutes for testing by replacing line 349 with the commented line 348.

5. Run the "test.ipynb" Jupyter notebook to view the results, data visualizations and sample queries. It can be run any time after the first run of "Capstone_Project_Imroze.ipynb".

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [67]:
cur, conn = create_database()
drop_tables(cur, conn)
create_tables(cur, conn)
conn.close()

In [68]:
# Write code here

# user defined function (udf) to convert SAS date to day of month integer
@udf(returnType=IntegerType())
def sasdate_day(date):
    # SAS dates count days since 1960-01-01; return null for a missing date
    if date is None:
        return None
    pd_timestamp = pd.Timestamp('1960-01-01') + pd.to_timedelta(date, unit='D')
    return int(pd_timestamp.day)
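
The conversion inside the UDF can be checked outside Spark. SAS stores a date as the number of days since 1 January 1960, so the same arithmetic works with the standard library alone. This is a minimal sketch for verifying individual values; the name `sas_to_date` is made up for the example.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date scale


def sas_to_date(sas_days):
    """Convert a numeric SAS date (days since 1960-01-01) to a date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


arrival = sas_to_date(20454.0)
print(arrival, arrival.day)   # 2016-01-01 1
print(sas_to_date(20465.0))   # 2016-01-12
```

Both values appear in the January 2016 data shown in this notebook, where `arrdate` 20454.0 maps to day 1 and 20465.0 to day 12.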

In [69]:
immig_df = immig_df.withColumn("day", sasdate_day(col("arrdate")))

In [70]:
immig_df.show(n=10)

+---------+-----+------+-------+-------+-------+------+-------+-------------+-----+--------+---+
|    cicid|i94yr|i94mon|i94port|arrdate|biryear|gender|airline|       admnum|fltno|visatype|day|
+---------+-----+------+-------+-------+-------+------+-------+-------------+-----+--------+---+
| 180973.0| 2016|     1|    ATL|20454.0| 1967.0|     M|     LH|84156749530.0|00444|      E2|  1|
| 190508.0| 2016|     1|    ATL|20454.0| 1954.0|     M|     DL|84101047330.0|00146|      E2|  1|
| 736433.0| 2016|     1|    ATL|20457.0| 1985.0|     M|     VS|84437768330.0|00109|      E2|  4|
| 930673.0| 2016|     1|    ATL|20458.0| 1959.0|     M|     DL|84561130630.0|00356|      E2|  5|
|1117816.0| 2016|     1|    ATL|20459.0| 1958.0|     M|     KL|84659434930.0|00621|      E2|  6|
|2144971.0| 2016|     1|    ATL|20464.0| 1990.0|     M|     DL|85150780030.0|00296|      E2| 11|
|2317313.0| 2016|     1|    ATL|20465.0| 1960.0|     F|     DL|85228115630.0|00117|      E2| 12|
|3333491.0| 2016|     1|    AT

In [71]:
immig_df = immig_df.drop("arrdate")

In [72]:
# user defined function (udf) to remove decimal in admnum string
@udf(returnType=StringType())
def parse_admnum(admnum):
    # admnum is read as a string such as "84156749530.0"; keep the part before the dot
    if admnum is None:
        return None
    return admnum.split('.')[0]

In [73]:
immig_df = immig_df.withColumn("cicid", col("cicid").cast(IntegerType()) )
immig_df = immig_df.withColumn("biryear", col("biryear").cast(IntegerType()) )
immig_df = immig_df.withColumn("admnum", parse_admnum(col("admnum")) )

In [74]:
immig_df = immig_df.withColumn("age", (cur_year - col("biryear")) )

In [75]:
immig_df.show(n=10)

+-------+-----+------+-------+-------+------+-------+-----------+-----+--------+---+---+
|  cicid|i94yr|i94mon|i94port|biryear|gender|airline|     admnum|fltno|visatype|day|age|
+-------+-----+------+-------+-------+------+-------+-----------+-----+--------+---+---+
| 180973| 2016|     1|    ATL|   1967|     M|     LH|84156749530|00444|      E2|  1| 49|
| 190508| 2016|     1|    ATL|   1954|     M|     DL|84101047330|00146|      E2|  1| 62|
| 736433| 2016|     1|    ATL|   1985|     M|     VS|84437768330|00109|      E2|  4| 31|
| 930673| 2016|     1|    ATL|   1959|     M|     DL|84561130630|00356|      E2|  5| 57|
|1117816| 2016|     1|    ATL|   1958|     M|     KL|84659434930|00621|      E2|  6| 58|
|2144971| 2016|     1|    ATL|   1990|     M|     DL|85150780030|00296|      E2| 11| 26|
|2317313| 2016|     1|    ATL|   1960|     F|     DL|85228115630|00117|      E2| 12| 56|
|3333491| 2016|     1|    ATL|   1965|     M|     DL|85804591730|00117|      E2| 18| 51|
|5719159| 2016|     1

In [76]:
# user defined function (udf) to convert age to 5 age-groups (0 = unknown age)
@udf(returnType=IntegerType())
def get_agegroup(age):
    # a missing birth year gives a null age, which cannot be compared to a number
    if age is None:
        return 0
    if age <= 25:
        return 1
    if age <= 35:
        return 2
    if age <= 45:
        return 3
    if age <= 59:
        return 4
    return 5
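
The age brackets can also be written as a lookup over the upper bound of each group, which keeps the boundaries in one place if they ever change. The sketch below is an alternative formulation in plain Python, not the notebook's implementation; it assumes integer ages and maps a missing age to group 0, and the names `AGE_UPPER_BOUNDS` and `age_group` are invented for the example.

```python
from bisect import bisect_left

# Inclusive upper bounds of groups 1 to 4; anything above the last bound is group 5
AGE_UPPER_BOUNDS = [25, 35, 45, 59]


def age_group(age):
    """Map an integer age to group 1..5, or 0 when the age is unknown."""
    if age is None:
        return 0
    # bisect_left counts the bounds strictly below age, so age == bound stays in that group
    return bisect_left(AGE_UPPER_BOUNDS, age) + 1


for age in (25, 26, 49, 59, 60, None):
    print(age, age_group(age))
```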

In [77]:
immig_df = immig_df.withColumn("agegroup", get_agegroup(col("age")))

In [78]:
immig_df.show(n=10)

+-------+-----+------+-------+-------+------+-------+-----------+-----+--------+---+---+--------+
|  cicid|i94yr|i94mon|i94port|biryear|gender|airline|     admnum|fltno|visatype|day|age|agegroup|
+-------+-----+------+-------+-------+------+-------+-----------+-----+--------+---+---+--------+
| 180973| 2016|     1|    ATL|   1967|     M|     LH|84156749530|00444|      E2|  1| 49|       4|
| 190508| 2016|     1|    ATL|   1954|     M|     DL|84101047330|00146|      E2|  1| 62|       5|
| 736433| 2016|     1|    ATL|   1985|     M|     VS|84437768330|00109|      E2|  4| 31|       2|
| 930673| 2016|     1|    ATL|   1959|     M|     DL|84561130630|00356|      E2|  5| 57|       4|
|1117816| 2016|     1|    ATL|   1958|     M|     KL|84659434930|00621|      E2|  6| 58|       4|
|2144971| 2016|     1|    ATL|   1990|     M|     DL|85150780030|00296|      E2| 11| 26|       2|
|2317313| 2016|     1|    ATL|   1960|     F|     DL|85228115630|00117|      E2| 12| 56|       4|
|3333491| 2016|     

In [79]:
immig_df = immig_df.withColumn("month_year", lit(str(cur_month)+'_'+str(cur_year))  )

In [80]:
immig_df.show(n=10)

+-------+-----+------+-------+-------+------+-------+-----------+-----+--------+---+---+--------+----------+
|  cicid|i94yr|i94mon|i94port|biryear|gender|airline|     admnum|fltno|visatype|day|age|agegroup|month_year|
+-------+-----+------+-------+-------+------+-------+-----------+-----+--------+---+---+--------+----------+
| 180973| 2016|     1|    ATL|   1967|     M|     LH|84156749530|00444|      E2|  1| 49|       4|    1_2016|
| 190508| 2016|     1|    ATL|   1954|     M|     DL|84101047330|00146|      E2|  1| 62|       5|    1_2016|
| 736433| 2016|     1|    ATL|   1985|     M|     VS|84437768330|00109|      E2|  4| 31|       2|    1_2016|
| 930673| 2016|     1|    ATL|   1959|     M|     DL|84561130630|00356|      E2|  5| 57|       4|    1_2016|
|1117816| 2016|     1|    ATL|   1958|     M|     KL|84659434930|00621|      E2|  6| 58|       4|    1_2016|
|2144971| 2016|     1|    ATL|   1990|     M|     DL|85150780030|00296|      E2| 11| 26|       2|    1_2016|
|2317313| 2016|    

In [81]:
gaports_spark = spark.createDataFrame(gaports) 

In [82]:
gaports_spark.show(n=10)

+--------------------+--------------+--------------------+----------+
|                name|          type|        municipality|local_code|
+--------------------+--------------+--------------------+----------+
|Southwest Georgia...|medium_airport|              Albany|       ABY|
|Augusta Regional ...| large_airport|             Augusta|       AGS|
|Athens Ben Epps A...|medium_airport|              Athens|       AHN|
|Hartsfield Jackso...| large_airport|             Atlanta|       ATL|
|Brunswick Golden ...|medium_airport|           Brunswick|       BQK|
|Columbus Metropol...|medium_airport|            Columbus|       CSG|
|        Daniel Field|medium_airport|             Augusta|       DNL|
|Fulton County Air...|medium_airport|             Atlanta|       FTY|
|Lawson Army Air F...|medium_airport|Fort Benning(Colu...|       LSF|
|Middle Georgia Re...|medium_airport|               Macon|       MCN|
+--------------------+--------------+--------------------+----------+
only showing top 10 

In [83]:
# Joining immigration data with airports data
fact_df = immig_df.join( gaports_spark, immig_df.i94port == gaports_spark.local_code, how = 'inner' )\
                            .select( col("cicid").alias("CICID"), col("i94yr").alias("Year"),\
                                    col("i94mon").alias("Month"), col("day").alias("Day"),\
                                    col("month_year").alias("Monthly_Time"),\
                                    col("visatype").alias("Visa_Type"), col("gender").alias("Gender"),\
                                    col("age").alias("Age"), col("agegroup").alias("Age_Group"),\
                                    col("local_code").alias("Airport_Code"), col("municipality").alias("Airport_Municipality"),\
                                    col("admnum").alias("Adm_Number"), col("airline").alias("Airline_Code"),\
                                    col("fltno").alias("Flight_Number") )

In [84]:
fact_df.show(n=10)

+-------+----+-----+---+------------+---------+------+---+---------+------------+--------------------+-----------+------------+-------------+
|  CICID|Year|Month|Day|Monthly_Time|Visa_Type|Gender|Age|Age_Group|Airport_Code|Airport_Municipality| Adm_Number|Airline_Code|Flight_Number|
+-------+----+-----+---+------------+---------+------+---+---------+------------+--------------------+-----------+------------+-------------+
| 180973|2016|    1|  1|      1_2016|       E2|     M| 49|        4|         ATL|             Atlanta|84156749530|          LH|        00444|
| 190508|2016|    1|  1|      1_2016|       E2|     M| 62|        5|         ATL|             Atlanta|84101047330|          DL|        00146|
| 736433|2016|    1|  4|      1_2016|       E2|     M| 31|        2|         ATL|             Atlanta|84437768330|          VS|        00109|
| 930673|2016|    1|  5|      1_2016|       E2|     M| 57|        4|         ATL|             Atlanta|84561130630|          DL|        00356|
|11178

In [85]:
airlines_spark = spark.createDataFrame(airlines_df) 

In [86]:
# Joining immigration data with airlines data
fact_df = fact_df.join(airlines_spark,fact_df.Airline_Code == airlines_spark.Airline_IATA, how='left')

In [87]:
# drop the airline lookup columns brought in by the join
fact_df = fact_df.drop("Airline_IATA", "Airline_Name", "Airline_Country")

In [88]:
fact_df.printSchema()

root
 |-- CICID: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Monthly_Time: string (nullable = false)
 |-- Visa_Type: string (nullable = true)
 |-- Gender: string (nullable = false)
 |-- Age: integer (nullable = true)
 |-- Age_Group: integer (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Airport_Municipality: string (nullable = true)
 |-- Adm_Number: string (nullable = true)
 |-- Airline_Code: string (nullable = false)
 |-- Flight_Number: string (nullable = true)



In [89]:
fact_df.show(n=10)

+-------+----+-----+---+------------+---------+------+---+---------+------------+--------------------+-----------+------------+-------------+
|  CICID|Year|Month|Day|Monthly_Time|Visa_Type|Gender|Age|Age_Group|Airport_Code|Airport_Municipality| Adm_Number|Airline_Code|Flight_Number|
+-------+----+-----+---+------------+---------+------+---+---------+------------+--------------------+-----------+------------+-------------+
|5002378|2016|    1| 27|      1_2016|       E2|     M| 50|        4|         ATL|             Atlanta|86590964230|          EV|        05009|
|2675382|2016|    1| 14|      1_2016|       E2|     M| 34|        2|         ATL|             Atlanta|85429452230|          EV|        05002|
| 936867|2016|    1|  5|      1_2016|       E1|     F| 40|        3|         ATL|             Atlanta|84517594430|          EV|        05498|
|5311468|2016|    1| 29|      1_2016|       E2|     M| 38|        3|         ATL|             Atlanta|86770747330|          EV|        05009|
| 5490

In [90]:
(fact_df.count(), len(fact_df.columns))

(2341, 14)

In [91]:
print( round((time.time() - timerx) / 60.0, 2) , 'min' )

4.01 min


In [92]:
# connecting to database and inserting data to tables
conn = psycopg2.connect("host=127.0.0.1 dbname=immigdb user=student password=student")
cur = conn.cursor()

In [93]:
E1_desc = "Employment First Preference (E1): Priority Worker and Persons of Extraordinary Ability"
E2_desc = "Employment Second Preference (E2): Professionals Holding Advanced Degrees and Persons of Exceptional Ability"
E3_desc = "Employment Third Preference (E3): Skilled Workers, Professionals, and Unskilled Workers (Other Workers)"
cur.execute(visatypes_table_insert,["E1",E1_desc] )
cur.execute(visatypes_table_insert,["E2",E2_desc] )
cur.execute(visatypes_table_insert,["E3",E3_desc] )
conn.commit()

In [94]:
cur.execute(genders_table_insert,["M","Male"] )
cur.execute(genders_table_insert,["F","Female"] )
cur.execute(genders_table_insert,["X","X-Gender"] )
cur.execute(genders_table_insert,["U","Unspecified"] )
conn.commit()

In [95]:
cur.execute(agegroups_table_insert,[1,"up to 25"] )
cur.execute(agegroups_table_insert,[2,"26-35"] )
cur.execute(agegroups_table_insert,[3,"36-45"] )
cur.execute(agegroups_table_insert,[4,"46-59"] )
cur.execute(agegroups_table_insert,[5,"60 and above"] )
conn.commit()

In [96]:
gaports.rename(columns = {'name':'airport_name'}, inplace = True)

In [97]:
for index, row in gaports.iterrows():
    cur.execute(airports_table_insert,[row.local_code,row.type,row.airport_name,row.municipality] )
    cur.execute(municipalities_table_insert,[row.municipality] )
conn.commit()

In [98]:
for index, row in airlines_df.iterrows():
    cur.execute(airlines_table_insert,[row.Airline_IATA,row.Airline_Name,row.Airline_Country] )
conn.commit()

In [99]:
monthDict = {1:'Jan', 2:'Feb', 3:'Mar', 4:'Apr', 5:'May', 6:'Jun', \
            7:'Jul', 8:'Aug', 9:'Sep', 10:'Oct', 11:'Nov', 12:'Dec'}

In [100]:
cur.execute(years_table_insert,[cur_year] )
cur.execute(months_table_insert,[cur_month, monthDict[cur_month] ] )
cur.execute(monthlytimes_table_insert,[ str(cur_month)+'_'+str(cur_year) ,cur_year,cur_month] )
conn.commit()

In [101]:
fact_dfpd = fact_df.toPandas() 

In [102]:
fact_dfpd.head(5)

|   | CICID | Year | Month | Day | Monthly_Time | Visa_Type | Gender | Age | Age_Group | Airport_Code | Airport_Municipality | Adm_Number | Airline_Code | Flight_Number |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5002378 | 2016 | 1 | 27 | 1_2016 | E2 | M | 50 | 4 | ATL | Atlanta | 86590964230 | EV | 5009 |
| 1 | 2675382 | 2016 | 1 | 14 | 1_2016 | E2 | M | 34 | 2 | ATL | Atlanta | 85429452230 | EV | 5002 |
| 2 | 936867 | 2016 | 1 | 5 | 1_2016 | E1 | F | 40 | 3 | ATL | Atlanta | 84517594430 | EV | 5498 |
| 3 | 5311468 | 2016 | 1 | 29 | 1_2016 | E2 | M | 38 | 3 | ATL | Atlanta | 86770747330 | EV | 5009 |
| 4 | 549091 | 2016 | 1 | 3 | 1_2016 | E2 | M | 3 | 1 | ATL | Atlanta | 84290278530 | EV | 5481 |


In [103]:
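# quality check for fully null columns
# (loop variable is named col_name so it does not shadow pyspark's col function)
allnull_row_check = True
for col_name in fact_dfpd.columns:
    if fact_dfpd[col_name].isnull().all():
        print('All Null rows in ', col_name)
        allnull_row_check = False
        break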

In [104]:
if allnull_row_check:
    for index, row in fact_dfpd.iterrows():
        vals = [row.CICID,row.Year,row.Month,row.Day,row.Monthly_Time,row.Visa_Type,row.Gender,\
               row.Age,row.Age_Group,row.Airport_Code,row.Airport_Municipality,row.Adm_Number,\
               row.Airline_Code,row.Flight_Number]
        cur.execute(immigrationfacts_table_insert,vals)

In [105]:
conn.commit()
conn.close()

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
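
A source/count check can be sketched as a small pure function. The version below is a minimal illustration, not the check used in etl.py: the function name `check_row_counts` and its return structure are hypothetical. The figures in the usage line are taken from this notebook's output (2341 rows in `fact_df`, 0 rows in the table before the load, 2200 after).

```python
def check_row_counts(source_rows, rows_before, rows_after):
    """Compare the rows offered for loading with the rows actually added.

    Hypothetical helper for illustration; it returns a dict instead of raising
    so the caller can decide whether a shortfall is acceptable.
    """
    added = rows_after - rows_before
    return {
        "added": added,
        "missing": source_rows - added,
        "complete": added == source_rows,
    }


# Figures from this notebook: fact_df.count() == 2341, table went from 0 to 2200 rows.
result = check_row_counts(source_rows=2341, rows_before=0, rows_after=2200)
print(result)
```

With these figures the check reports 141 rows that were offered but not added. The notebook does not say why; one possibility, stated here only as an assumption, is that the insert statement skips rows whose key already exists.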
 
Run Quality Checks

In [106]:
# Perform quality checks here
conn = psycopg2.connect("host=127.0.0.1 dbname=immigdb user=student password=student")
cur = conn.cursor()

In [107]:
# checking that all dimension tables have been created and populated
dim_tables = ['visatypes','genders','agegroups','airports','municipalities',\
              'years','months','monthlytimes','airlines']
for dim_tbl in dim_tables:
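    try:
        cur.execute("SELECT COUNT(*) FROM "+dim_tbl)
    except psycopg2.Error:
        # only a database error means the table is missing or unreadable
        raise IOError( dim_tbl+" table not found")
    res = cur.fetchone()[0]
    if res==0:
        # raised outside the try block so it is not reported as "table not found"
        raise ValueError("Data quality check failed. No rows in "+dim_tbl)
    print(dim_tbl,' table found, having',res,' rows')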

visatypes  table found, having 3  rows
genders  table found, having 4  rows
agegroups  table found, having 5  rows
airports  table found, having 20  rows
municipalities  table found, having 13  rows
years  table found, having 1  rows
months  table found, having 1  rows
monthlytimes  table found, having 1  rows
airlines  table found, having 941  rows


In [108]:
#implemented during ETL above
# checking if data loaded was not empty
assert empty_check, "Failed check for loading csv data as PySpark DataFrame"
print('Passed check for loading csv data as PySpark DataFrame')

Passed check for loading csv data as PySpark DataFrame


In [109]:
#implemented during ETL above
# checking if data being added is of correct time period
assert date_check, 'Failed check for time period in loaded data'
print('Passed check for time period in loaded data')

Passed check for time period in loaded data


In [110]:
#implemented during ETL above
#Full Columns are Null if data for column is not loaded correctly in Spark DataFrame due to mismatch with schema types
assert allnull_row_check, 'Failed check for no fully Null column in new portion of fact table'
print('Passed check for no fully Null column in new portion of fact table')

Passed check for no fully Null column in new portion of fact table
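
The same all-null check can be expressed without an explicit loop. This is a sketch on a small made-up DataFrame (the values are illustrative, not project data), using `isnull().all()` to find columns in which every value is null:

```python
import pandas as pd

# Tiny illustrative frame: "Gender" is entirely null, as could happen after a schema mismatch.
sample = pd.DataFrame({
    "CICID": [1, 2, 3],
    "Gender": [None, None, None],
    "Age": [49, None, 31],
})

# isnull().all() yields one boolean per column, True only where every row is null.
mask = sample.isnull().all()
all_null_cols = mask[mask].index.tolist()
allnull_row_check = len(all_null_cols) == 0

print(all_null_cols)
print(allnull_row_check)
```

Unlike the loop with `break`, this form lists every fully null column rather than only the first one found.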


In [111]:
# implementing quality check for new records being added in fact table, after checking that table exists
try:
    cur.execute("SELECT COUNT(*) FROM immigrationfacts")
    print('immigrationfacts table found')
except psycopg2.Error:
    raise IOError("immigrationfacts table not found")
new_rows_fact_table = cur.fetchone()[0]
print('Row Count Before: ',init_rows_fact_table)
print('Row Count After: ',new_rows_fact_table)
print('New Rows Added: ',new_rows_fact_table-init_rows_fact_table)
assert new_rows_fact_table > init_rows_fact_table, 'Failed check for new records added in fact table'
print('Passed check for new records added in fact table')

immigrationfacts table found
Row Count Before:  0
Row Count After:  2200
New Rows Added:  2200
Passed check for new records added in fact table


In [112]:
cur.execute("SELECT * FROM immigrationfacts LIMIT 1",[])
no_cols = len(cur.fetchone())
print(no_cols,'columns in immigrationfacts table')
assert no_cols==14, 'Failed check for number of columns in immigrationfacts'
print('Passed check for number of columns in immigrationfacts')

14 columns in immigrationfacts table
Passed check for number of columns in immigrationfacts


In [113]:
conn.close()

See etl.py for more quality checks in the automated pipeline.

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Dimension Tables

#### airports
source : from create_tables_csv

airport_code - varchar : Code for Airport

airport_type - varchar : Whether airport is small, medium or large 

name - varchar :  Full name of Airport

municipality - varchar : City or administrative unit

#### visatypes
source : self created

visa_code - varchar : Visa Type code

description - varchar : 1-2 lines Description of visa type

#### agegroups
source : self created

agegroup_num - int : Age Group Number

description - varchar : Description of range for Age Group 

#### genders
source : self created

gender_code - varchar : Code for gender

description - varchar : Description of gender code

#### municipalities
source : from create_tables_csv

municipality - varchar : City or administrative unit

#### years
source : self created

year - int : Year

#### months
source : self created

month_num - int : Month Number

month_name - varchar : Name of month

#### monthlytimes
source : self created

monthlytime - varchar : String representation of month and year

year - int : Year

month_num - int : Month Number
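
The `monthlytime` key seen in the notebook output (for example `1_2016`) joins the month number and the year with an underscore. A small helper keeps that format in one place; `monthly_time_key` is a hypothetical name and is not part of the original code:

```python
def monthly_time_key(month, year):
    """Build the month_year string used as the monthlytimes key, e.g. 1_2016."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be in 1..12, got {month}")
    return f"{month}_{year}"


print(monthly_time_key(1, 2016))
```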

#### airlines
source : from airlines.json

airline_code - varchar : IATA Code for Airline

airline_name - varchar : Full Name of Airline

airline_country - varchar : Country of Airline

#### Fact Table

#### immigrationfacts
source : from immigration_data csvs created from SAS files

CICID - varchar : ID for the immigration

Year - int : Year

Month - int : Month Number

Day - int : Day of Month

Monthly_Time - varchar : String representation of month and year

Visa_Type - varchar : Visa Type code

Gender - varchar : Code for gender

Age - int : Age in years

Age_Group - int : Age Group Number

Airport_Code - varchar : Code for Airport

Airport_Municipality - varchar : City or administrative unit of Airport

Adm_Number - varchar : Admission Number Code

Airline_Code - varchar : IATA Code for Airline

Flight_Number - varchar : Flight Number Code

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Choice of Tools

A relational database was chosen for its flexibility and ease of use, for the joins and aggregations the project needs, and because the data volume is small, availability requirements are low and the schema is well defined and tabular.

PostgreSQL was chosen as the database system because it is free, open-source, fast, popular and easy to work with. The psycopg2 Python driver was used to access the database because it is easy and flexible to use and supports concurrency.

The business requirements of the project indicate the following: ETL runs only once a month and each run is short, a small number of users will query the data infrequently, and most processing will be on a relatively small amount of data.

Therefore, specialized distributed cloud-based solutions like Amazon Redshift were not used for staging in the data warehouse.

There are several small datasets used in the project. Pandas was chosen for easily doing data analysis, data wrangling and data cleaning on them.

The larger immigration data is loaded from files that each hold only one month of data. A single file is not too big, and it is filtered down significantly before the heavier operations, so the ETL for immigration data could also be done in Pandas. However, PySpark was chosen to process that data for two reasons: (1) future scalability of the code and system if requirements change in a way that reduces the filtering or makes ETL more frequent; (2) application of the PySpark material learned in the Nanodegree. PySpark currently runs in local mode, which does not provide Spark's distributed computing, but it can be set up on a cluster in future if needed. Amazon EMR can be used for easy cluster setup and speed.

The pipeline only needs to run monthly, so the etl.py script can be run once a month. It includes quality checks, and the ETL task runs only if the ingestion task succeeds. Automation of the pipeline is also provided through Apache Airflow. It is used because it allows tasks to be scheduled programmatically, provides easy monitoring of tasks and pipelines via its UI, and applies the Airflow material learned in the Nanodegree, which can later be extended to more complex pipelines and hooks. Because the current requirements are simple and etl.py is modular and carries its own quality checks, a cron job can also be used to schedule etl.py monthly.
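
The rule that the ETL task runs only when ingestion succeeds can be sketched in plain Python. This is an illustration under stated assumptions, not the contents of etl.py or the Airflow DAG: `run_pipeline`, `failing_ingest` and the lambda tasks are hypothetical, and tasks are assumed to signal failure by raising an exception.

```python
def run_pipeline(ingest, transform_and_load):
    """Run two tasks in order; skip the second if the first raises.

    Returns a list of (task_name, status) tuples for simple monitoring.
    """
    log = []
    try:
        staged = ingest()
        log.append(("ingest", "success"))
    except Exception as exc:  # any ingestion failure stops the pipeline
        log.append(("ingest", f"failed: {exc}"))
        log.append(("etl", "skipped"))
        return log
    try:
        transform_and_load(staged)
        log.append(("etl", "success"))
    except Exception as exc:
        log.append(("etl", f"failed: {exc}"))
    return log


def failing_ingest():
    # Stand-in for a month whose source file has not arrived yet.
    raise FileNotFoundError("no file for this month")


print(run_pipeline(lambda: ["row1", "row2"], lambda rows: None))
print(run_pipeline(failing_ingest, lambda rows: None))
```

A scheduler such as Airflow or cron would only decide when this sequence starts; the ordering and the skip-on-failure behaviour stay the same.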

The system is meant to be deployed on a cloud VM. APIs and dashboards can be developed for access and analytics.

#### Data Update

The business requirement is for analytics on monthly data, so the fact table and the time dimension tables are updated only once per month, when the data to ingest is available.

The dimension tables other than time can be assumed to be constant. However, data for new airlines may be updated in the table yearly, because only a few, smaller airlines are added each year.

#### If the data was increased by 100x

If the data is increased by 100x, the PySpark code already in use can be set up on a cluster using Amazon EMR. For a smaller increase, increasing the specs of the cloud VM can also help.

#### If the data populates a dashboard that must be updated on a daily basis by 7am every day

The data is not updated daily in this case. However, if that is needed, it can be supported by changing the "schedule_interval" value in dags/airflow_pipeline.py so that Apache Airflow schedules a daily run.
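
Meeting a 7am deadline means starting the run early enough for it to finish. The sketch below derives a daily cron expression from a deadline and an expected runtime; cron expressions are one of the formats Airflow accepts for `schedule_interval`. The helper name `daily_cron_before`, the 30-minute buffer, and the reuse of the 4.01-minute runtime measured earlier in this notebook are assumptions for illustration; a daily load may take a different amount of time.

```python
from datetime import datetime, timedelta
import math


def daily_cron_before(deadline_hour, deadline_minute, runtime_minutes, buffer_minutes=30):
    """Return a daily cron expression that starts early enough to finish by the deadline."""
    # The date is arbitrary; only the time of day matters for a daily schedule.
    deadline = datetime(2000, 1, 2, deadline_hour, deadline_minute)
    start = deadline - timedelta(minutes=math.ceil(runtime_minutes) + buffer_minutes)
    return f"{start.minute} {start.hour} * * *"


print(daily_cron_before(7, 0, runtime_minutes=4.01))
```

Airflow evaluates schedules in UTC unless configured otherwise, so the hour may need shifting to match the local 7am deadline.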

#### If the database needed to be accessed by 100+ people

The data warehouse can be migrated to Amazon Redshift, where multiple queries can run quickly in parallel and which internally uses a modified PostgreSQL. Other options could be PostgreSQL server as a service or Amazon RDS for PostgreSQL.