# Project Title
### Data Engineering Capstone Project

#### Project Summary
The purpose of this project is to create an open analytical database for the US Government to understand immigration patterns, so that it can make and monitor immigration policy decisions and so that industries that depend on immigration, such as tourism, business and international education, can thrive. Immigration patterns can also inform infrastructure decisions. The analytical database should serve decision-making needs ranging from daily to long-term.
  
The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
from datetime import datetime, timedelta
from i94MetadataMappings import visa_codes, ports_codes, cit_and_res_codes, mode_codes
from helperFunctions import explore_dataframe, clean_latitude_longitude, clean_date, split_extract
from pyspark.sql import SparkSession, Window, SQLContext
from pyspark.sql.functions import (col, concat, date_format, datediff, dayofmonth, dayofweek,
                                   first, hour, isnan, last, lit, ltrim, max as Max, mean,
                                   min as Min, month, row_number, rtrim, split, sum as Sum,
                                   to_date, udf, upper, weekofyear, when, year)
from pyspark.sql.types import TimestampType, IntegerType, StringType, FloatType
import pandas as pd
import numpy as np
import os
from IPython.display import Image
pd.set_option('display.max_columns', None)

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project an analytical database will be made available for the USA government, so they can quickly gather insights from the data which may enable them to:

1. Make policy decisions on immigration.
2. Monitor their policy decisions, and the impact of policy decisions in the USA.
3. Optimise infrastructure spending based on immigration patterns.


##### Data
The following data was leveraged to build the database:

* I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace.
* World Temperature Data: This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
* U.S. City Demographic Data: This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
* Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

##### Solution

A Spark ETL job was created to read the data and write it to an S3 bucket using an EMR cluster on AWS. A second job, which is outside the scope of this project, can then be run to pick up the data from the S3 bucket and load it into a Redshift cluster. The cluster endpoint will be made available to the government's analytics team.

The analytical database will leverage a star schema design for query optimisation, with immigration data held in the fact table and the remaining data in dimension tables. The following tables are created, each with a brief summary:


* regions_dm: (State/region wise aggregated demographic information)
* airports_dm: (airport codes, names, types and locations)
* airlines_dm: (airlines and flights information)
* regions_dm: (countries/regions with monthly temperature data and latitude longitude)
* ports_dm: (port codes and names)
* modes_dm: (mode of transport names)
* visas_dm: (visa codes and purpose)
* immi_dates_dm: (arrival and departure dates, with other date related information such as days, day of week, year, month, etc)
* immigration_ft: Immigration fact table for daily immigration data
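To illustrate why a star schema suits this kind of analysis, the following is a minimal sketch that uses an in-memory SQLite database rather than Redshift. The tables are heavily reduced, the column names are borrowed from later cells of this notebook, and the rows are made up purely for illustration.

```python
import sqlite3

# Hypothetical, heavily reduced versions of the tables listed above.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE visas_dm (code INTEGER PRIMARY KEY, travel_purpose TEXT);
    CREATE TABLE modes_dm (code INTEGER PRIMARY KEY, travel_mode TEXT);
    CREATE TABLE immigration_ft (
        cicid INTEGER PRIMARY KEY,
        visa_code INTEGER REFERENCES visas_dm(code),
        mode INTEGER REFERENCES modes_dm(code),
        counter INTEGER
    );
    -- Illustrative rows only, not taken from the real data set.
    INSERT INTO visas_dm VALUES (1, 'Business'), (2, 'Pleasure');
    INSERT INTO modes_dm VALUES (1, 'Air'), (3, 'Land');
    INSERT INTO immigration_ft VALUES
        (101, 2, 1, 1), (102, 2, 1, 1), (103, 1, 3, 1);
""")


def arrivals_by_purpose_and_mode(connection):
    """Aggregate the fact table after joining it to two dimension tables."""
    query = """
        SELECT v.travel_purpose, m.travel_mode, SUM(f.counter) AS arrivals
        FROM immigration_ft AS f
        JOIN visas_dm AS v ON f.visa_code = v.code
        JOIN modes_dm AS m ON f.mode = m.code
        GROUP BY v.travel_purpose, m.travel_mode
        ORDER BY v.travel_purpose, m.travel_mode
    """
    return connection.execute(query).fetchall()


summary = arrivals_by_purpose_and_mode(conn)
```

Each analytical question then becomes one aggregation over the fact table plus a small join per dimension, which is the access pattern the design is meant to keep cheap.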

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
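The `explore_dataframe` helper used in the cells below is imported from `helperFunctions` and is not shown in this notebook. As a rough, dependency-free sketch of the kind of checks it reports (row count, missing values and unique values per column), something like the following could be used. The function name `profile_rows` and the sample rows are assumptions for illustration only.

```python
import csv
import io


def profile_rows(rows):
    """Return (row_count, missing_per_column, unique_per_column) for a list of dicts."""
    columns = list(rows[0].keys()) if rows else []
    missing = {c: 0 for c in columns}
    unique = {c: set() for c in columns}
    for row in rows:
        for c in columns:
            value = row[c]
            if value is None or value.strip() == "":
                missing[c] += 1       # empty cells count as missing
            else:
                unique[c].add(value)  # missing cells are not counted as a unique value
    return len(rows), missing, {c: len(v) for c, v in unique.items()}


# Illustrative sample shaped like the airport file (not real profiling output).
sample_csv = io.StringIO(
    "ident,type,iata_code\n"
    "00A,heliport,\n"
    "00AA,small_airport,\n"
    "KLAX,large_airport,LAX\n"
)
row_count, missing_counts, unique_counts = profile_rows(list(csv.DictReader(sample_csv)))
```

A column whose unique count equals the row count, with no missing values, is a candidate key; a column with many missing values needs a decision before it is used in a join.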

In [2]:
# Read in the data here
airport_data = pd.read_csv("airport-codes_csv.csv")
immigration_data = pd.read_csv("immigration_data_sample.csv")
demographics_data = pd.read_csv("us-cities-demographics.csv", sep=";")
climate_file_path = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_data = pd.read_csv(climate_file_path)

#### Running the pandas explore script function

##### Exploring the airports data

In [3]:
explore_dataframe(airport_data)

The data has 55075 rows and 12 columns

The data types are : ident            object
type             object
name             object
elevation_ft    float64
continent        object
iso_country      object
iso_region       object
municipality     object
gps_code         object
iata_code        object
local_code       object
coordinates      object
dtype: object

Showing number of missing records per column
ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

Length of dataframe is 55075
The number of unique rows for column ident are 55075
The number of unique rows for column type are 7
The number of unique rows for column name are 52144
The number of unique rows for column elevation_ft are 5450
The number of unique rows for column continent are 7
The number of uniqu

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100.0,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810.0,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038.0,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87.0,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350.0,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


##### Exploring the Immigration data

In [4]:
explore_dataframe(immigration_data)

The data has 1000 rows and 29 columns

The data types are : Unnamed: 0      int64
cicid         float64
i94yr         float64
i94mon        float64
i94cit        float64
i94res        float64
i94port        object
arrdate       float64
i94mode       float64
i94addr        object
depdate       float64
i94bir        float64
i94visa       float64
count         float64
dtadfile        int64
visapost       object
occup          object
entdepa        object
entdepd        object
entdepu       float64
matflag        object
biryear       float64
dtaddto        object
gender         object
insnum        float64
airline        object
admnum        float64
fltno          object
visatype       object
dtype: object

Showing number of missing records per column
Unnamed: 0       0
cicid            0
i94yr            0
i94mon           0
i94cit           0
i94res           0
i94port          0
arrdate          0
i94mode          0
i94addr         59
depdate         49
i94bir           0
i94visa       

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT
5,721257,1481650.0,2016.0,4.0,577.0,577.0,ATL,20552.0,1.0,GA,20606.0,51.0,2.0,1.0,20160408,,,T,N,,M,1965.0,10072016,M,,DL,736852600.0,910,B2
6,1072780,2197173.0,2016.0,4.0,245.0,245.0,SFR,20556.0,1.0,CA,20635.0,48.0,2.0,1.0,20160412,,,T,O,,M,1968.0,10112016,F,,CX,786312200.0,870,B2
7,112205,232708.0,2016.0,4.0,113.0,135.0,NYC,20546.0,1.0,NY,20554.0,33.0,2.0,1.0,20160402,,,G,O,,M,1983.0,6302016,F,,BA,55474490000.0,00117,WT
8,2577162,5227851.0,2016.0,4.0,131.0,131.0,CHI,20572.0,1.0,IL,20575.0,39.0,2.0,1.0,20160428,,,O,O,,M,1977.0,7262016,,,LX,59413420000.0,00008,WT
9,10930,13213.0,2016.0,4.0,116.0,116.0,LOS,20545.0,1.0,CA,20553.0,35.0,2.0,1.0,20160401,,,O,O,,M,1981.0,6292016,,,AA,55449790000.0,00109,WT


##### Exploring the Demographics data

In [5]:
explore_dataframe(demographics_data)

The data has 2891 rows and 12 columns

The data types are : City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

Showing number of missing records per column
City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

Length of dataframe is 2891
The number of unique rows for column City are 567
The number of unique rows for co

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229.0,62432.0,118661,6634.0,7517.0,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712.0,41971.0,80683,4815.0,8355.0,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629.0,56860.0,108489,3800.0,37038.0,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762.0,43270.0,85032,5783.0,3269.0,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751.0,58077.0,109828,5204.0,16315.0,2.65,NC,Asian,11060


##### Exploring the climate data

In [6]:
explore_dataframe(temperature_data)

The data has 8599212 rows and 7 columns

The data types are : dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object

Showing number of missing records per column
dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
dtype: int64

Length of dataframe is 8599212
The number of unique rows for column dt are 3239
The number of unique rows for column AverageTemperature are 111995
The number of unique rows for column AverageTemperatureUncertainty are 10903
The number of unique rows for column City are 3448
The number of unique r

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


#### Cleaning Steps
Document steps necessary to clean the data and 

### Creating Spark Session

In [7]:
#Creating or getting the spark session
spark = SparkSession.builder\
        .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .config("spark.sql.broadcastTimeout", "36000")\
        .config("spark.sql.autoBroadcastJoinThreshold","-1")\
        .enableHiveSupport()\
        .appName("SparkImmigration")\
        .getOrCreate()

In [8]:
#Defining the SQL context (SQLContext expects the SparkContext first and the session second)
sc = SQLContext(spark.sparkContext, spark)

In [9]:
#Checking if the data exists , else getting data from another location
folder_name = "sas_data"
if not os.path.exists(os.getcwd() + f"/{folder_name}"):
    print("Reading data from source")
    df_immigration_data = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
    df_immigration_data.write.parquet(f"{folder_name}")
else:
    print(f"Folder already exists, hence reading data from directory {folder_name}")
    df_immigration_data = spark.read.parquet(f"{folder_name}")

Folder already exists, hence reading data from directory sas_data


In [10]:
#Getting the mappings for visa, port and mode of transport codes and loading each into its own dataframe
df_visa_codes = spark.createDataFrame(list(map(list, visa_codes.items())),
                                        ["code","travel_purpose"])
                                        
df_ports_codes = spark.createDataFrame(list(map(list, ports_codes.items())),
                                         ["code", "port_name"])

#Lets clean the port names and remove any leading and trailing spaces
df_ports_codes = df_ports_codes.withColumn("port_name", ltrim(col("port_name")))\
                               .withColumn("port_name", rtrim(col("port_name")))
                                         
df_mode_codes = spark.createDataFrame(list(map(list, mode_codes.items())),
                                        ["code","travel_mode"])                       

In [11]:
#user defined functions and window function definition

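get_datetime = udf(lambda date: (datetime(1960, 1, 1) + timedelta(days=date)) if date is not None and date > 0.0 else None, TimestampType())

#Returning None rather than np.NaN (a float) so that the IntegerType column receives a proper null
cast_integer = udf(lambda val: int(val) if val is not None and val != 0 else None, IntegerType())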

to_split_extract_string = udf(split_extract, StringType())

to_split_extract_float = udf(split_extract,FloatType())

cast_lat_lon = udf(clean_latitude_longitude, FloatType())

window = Window.orderBy(col("airline_code"), col("flight_number"))

In [12]:
#Checking the schema of the immigration data
df_immigration_data.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

##### The dates are stored as doubles and need to be converted to dates for better analysis. Several columns are not used by the government and will be dropped. These are listed below:

* DTADFILE
* VISAPOST
* OCCUP
* ENTDEPA
* ENTDEPD
* ENTDEPU
* DTADDTO
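The `arrdate` and `depdate` doubles are day counts from 1 January 1960, which is the epoch the `get_datetime` UDF above adds them to. The conversion can be sketched in plain Python as follows; the function name `sas_days_to_datetime` is hypothetical and only mirrors the UDF's logic.

```python
from datetime import datetime, timedelta

SAS_EPOCH = datetime(1960, 1, 1)  # same epoch the get_datetime UDF uses


def sas_days_to_datetime(days):
    """Convert a day count since 1960-01-01 to a datetime; None or non-positive gives None."""
    if days is None or days <= 0:
        return None
    return SAS_EPOCH + timedelta(days=days)


# 20545.0 is the arrdate of a sample row whose dtadfile value is 20160401.
arrival = sas_days_to_datetime(20545.0)
```

Treating zero and negative values as missing matches how the notebook encodes null departure dates as 0.0 before conversion.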

In [13]:
#looking at number of cicid unique records
df_immigration_data.select("cicid").distinct().count()

3096313

In [14]:
#checking if unique records are same as total records
df_immigration_data.count()

3096313

##### Since the number of unique cicid values equals the number of records, we can assume there is no duplicate data.

Let's clean and process the immigration data.

In [15]:
#cleaning the arrival and departure dates, and if dates are null assigning 0 date
df_immigration_data = df_immigration_data.withColumn("arrdate", get_datetime(df_immigration_data.arrdate))\
                               .withColumn("depdate", when((df_immigration_data.depdate.isNull()) , 0.0)\
                               .otherwise(df_immigration_data.depdate))

df_immigration_data = df_immigration_data.withColumn("depdate",get_datetime(df_immigration_data.depdate))

#converting double columns to integer
float_immi_columns = {_[0]:_[1] for _ in df_immigration_data.dtypes if _[1] == 'double'}
columns = [_ for _ in float_immi_columns.keys()]
df_immigration_data = df_immigration_data.fillna(0, subset=columns)
for _ in columns:
    df_immigration_data = df_immigration_data.withColumn(_, cast_integer(df_immigration_data[_]))

#extracting unique flights data and creating a flights table with the unique identifier
df_flights_data = df_immigration_data.selectExpr("airline as airline_code","fltno as flight_number")\
                           .dropDuplicates()\
                           .na\
                           .drop(subset=["airline_code","flight_number"])

df_flights_data = df_flights_data.withColumn("flight_id", row_number().over(window) )


#joining the immigration data back to flights data
df_immigration_data = df_flights_data.selectExpr("airline_code as airline","flight_number as fltno","flight_id").join(df_immigration_data, on=["airline", "fltno"], how="left")

df_immigration_data = df_immigration_data.selectExpr("cicid",\
                         "i94cit as city_code",\
                         "i94res as res_code",\
                         "i94port as port",\
                         "arrdate as arr_date",\
                         "i94mode as mode",\
                         "i94addr as addr",\
                         "depdate as dept_date",\
                         "i94bir as age",\
                         "i94visa as visa_code",\
                         "count as counter",\
                         "matflag as matflag",\
                         "biryear as birth_year",\
                         "gender",\
                         "flight_id",\
                         "i94yr as date_year",\
                         "i94mon as date_month",\
                         "visatype as visa_type",\
                         "admnum as admission_number") 



In [16]:
#Lets explore how many values by year
df_immigration_data.select(year("dept_date").alias("dept_year"),year("arr_date").alias("arr_year"))\
                   .groupBy("dept_year","arr_year").count().toPandas()

Unnamed: 0,dept_year,arr_year,count
0,2012.0,2016,2
1,2015.0,2016,3
2,,2016,124587
3,2016.0,2016,2887203


The extract should be limited to arrivals in April 2016, but dept_year contains some departure dates in 2012 and 2015, which precede the arrival dates. These records should be filtered out.

In [17]:
df_immigration_data = df_immigration_data.filter("year(dept_date) = 2016 or dept_date IS NULL")

We should also ensure that dept_date is not before arr_date, while keeping records with a null dept_date, since these people are likely still in the country.
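The rule can be expressed as a small predicate. This is a plain-Python sketch of the same logic the Spark filter applies, with a hypothetical function name and made-up records:

```python
from datetime import date


def keep_record(arr_date, dept_date):
    """Keep open stays (no departure yet) and stays that do not end before they start."""
    if dept_date is None:
        return True
    return (dept_date - arr_date).days >= 0


# Illustrative records only.
records = [
    (date(2016, 4, 1), date(2016, 4, 3)),    # normal stay
    (date(2016, 4, 21), None),               # no departure recorded: kept
    (date(2016, 4, 18), date(2016, 4, 10)),  # departs before arriving: dropped
]
valid_records = [r for r in records if keep_record(*r)]
```

Same-day departures are kept, because a difference of zero days is still a valid stay.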

In [18]:
df_immigration_data = df_immigration_data.withColumn("diff", datediff("dept_date","arr_date"))\
                                        .filter("diff >= 0 or dept_date is NULL")\
                                        .drop("diff")

In [19]:
#confirming we have 2016 dates and NaN date
df_immigration_data.select(year("dept_date").alias("year")).groupBy("year").count().toPandas()

Unnamed: 0,year,count
0,,124587
1,2016.0,2886939


In [21]:
#Extracting unique dates from arrival and departure to be used for further analysis
df_immigration_data.select("arr_date","dept_date")\
              .filter("dept_date IS NOT NULL")\
              .dropDuplicates(subset=["arr_date","dept_date"])\
              .createOrReplaceTempView("temp_date_table")

In [22]:
#View the top 5 rows of the final immigration data
df_immigration_data.limit(5).toPandas()

Unnamed: 0,cicid,city_code,res_code,port,arr_date,mode,addr,dept_date,age,visa_code,counter,matflag,birth_year,gender,flight_id,date_year,date_month,visa_type,admission_number
0,74495,254,276,ANC,2016-04-01,1,AK,2016-04-03,44,1,1,M,1972,M,200,2016,4,WB,-429925015
1,77160,254,276,ANC,2016-04-01,1,AK,2016-04-03,38,1,1,M,1978,M,200,2016,4,B1,-2069752582
2,4013911,687,687,FTL,2016-04-21,1,FL,2016-04-23,24,1,1,M,1992,M,222,2016,4,B1,-339484682
3,4013912,687,687,FTL,2016-04-21,1,FL,2016-04-23,51,1,1,M,1965,M,222,2016,4,B1,-339606582
4,6072603,582,582,LAR,2016-04-18,1,TX,2016-06-24,60,2,1,M,1956,M,303,2016,4,B2,-570672994


In [23]:
#View the 5 rows of flight data extracted
df_flights_data.limit(5).toPandas()

Unnamed: 0,airline_code,flight_number,flight_id
0,*FF,1,1
1,*FF,52,2
2,*FF,11626,3
3,*FF,354,4
4,*FF,4520,5


#### The airport data explored with pandas above has no duplicates, as the number of unique ident values matches the length of the dataframe. However, a few columns have many missing values. The column that will be used to map to the immigration data, IATA_CODE, is missing in 45886 of 55075 rows, so we will remove all rows where it is null.

In [24]:
#Read in airports data into spark dataframe
df_airports_data = spark.read.format('csv').option("header",True).load("airport-codes_csv.csv")

In [25]:
#Dropping the null values for iata_code
df_airports_data = df_airports_data.dropna(subset=["iata_code"])

In [26]:
#Lets summarise the airports data to see if after removing iata_code with nulls, data looks better, and less missing records across other fields
df_airports_data.describe().toPandas()

Unnamed: 0,summary,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,count,9189,9189,9189,8819.0,9189,9189,9189,8423,8538,9189,2987,9189
1,mean,,,,1146.67161809729,,,,,40.0,0.0,22077.5,
2,stddev,,,,1715.6977018420007,,,,,56.568542494923804,0.0,41040.010051168356,
3,min,03N,closed,"""Aeropuerto """"General Tomas de Heres"""". Ciudad...",-11.0,AF,AD,AD-07,108 Mile,00F,-,-,"-0.005456000100821257, 16.24839973449707"
4,max,rjns,small_airport,Žilina Airport,999.0,SA,ZW,ZW-MW,Žilina,ZYYK,ZZV,ZZV,"99.951499939, 12.6361999512"


#### The missing values have decreased significantly for the other columns. ISO_COUNTRY, CONTINENT, ISO_REGION and COORDINATES, which might be essential for analytics, are fully populated. Let's now confirm that IATA_CODE has no duplicates.

In [27]:
#printing the unique values for IATA_CODE
df_airports_data.select("iata_code").distinct().count()

9042

In [28]:
#Since there are fewer unique values than total values, some codes are duplicated.
#Lets explore and check which codes are duplicated most
df_airports_data.select("iata_code","iso_country").groupBy("iata_code","iso_country")\
                        .count().where(col("count") > 1).sort(col("count").desc()).limit(10).toPandas()

Unnamed: 0,iata_code,iso_country,count
0,0,BR,80
1,OHE,CN,3
2,PRI,SC,3
3,NWT,PG,2
4,BVW,AU,2
5,CMN,MA,2
6,TFY,MA,2
7,GGC,AO,2
8,PCO,MX,2
9,IZA,BR,2


In [29]:
#Since country code Brazil has 0 iata code replicated 80 times, lets explore what is happening in Brazil
df_airports_data.where(col("iata_code")=="0").groupBy("type").count().toPandas()

Unnamed: 0,type,count
0,small_airport,80


In [30]:
#All the 0 code airports are small airports 
#Lets check the remaining airports and by types
df_airports_data.select("iata_code","type").groupBy("iata_code","type")\
                        .count().where((col("count") > 1) & (col("iata_code") != '0')).sort(col("iata_code")).limit(50).toPandas()

Unnamed: 0,iata_code,type,count
0,AHT,closed,2
1,CMN,large_airport,2
2,DDU,small_airport,2
3,DLR,small_airport,2
4,DZI,small_airport,2
5,GGC,small_airport,2
6,GVA,large_airport,2
7,HLA,medium_airport,2
8,IST,large_airport,2
9,IZA,medium_airport,2


There seems to be no direct relationship between airport type and duplication. Let's drop duplicates on **iata_code** and **type**, filter out iata_code "0" since these are all small airports that are not part of the **i94PORT** code mapping, and also remove any airports that are closed.
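A plain-Python sketch of this cleaning rule is shown below, with made-up rows and a hypothetical function name. One difference to keep in mind: this sketch keeps the first row it sees for each pair, whereas Spark's `dropDuplicates` makes no promise about which duplicate survives.

```python
def clean_airports(rows):
    """Drop placeholder and closed airports, then keep one row per (iata_code, type)."""
    seen = set()
    cleaned = []
    for row in rows:
        if row["iata_code"] == "0" or row["type"] == "closed":
            continue
        key = (row["iata_code"], row["type"])
        if key in seen:
            continue  # later duplicates of the same pair are discarded
        seen.add(key)
        cleaned.append(row)
    return cleaned


# Illustrative rows only; names are placeholders.
airports = [
    {"iata_code": "0", "type": "small_airport", "name": "Placeholder strip"},
    {"iata_code": "CMN", "type": "large_airport", "name": "Entry A"},
    {"iata_code": "CMN", "type": "large_airport", "name": "Entry B"},
    {"iata_code": "AHT", "type": "closed", "name": "Closed field"},
    {"iata_code": "SGL", "type": "small_airport", "name": "Air base"},
    {"iata_code": "SGL", "type": "heliport", "name": "Air base"},
]
cleaned = clean_airports(airports)
```

Because the key includes the type, a code that appears under two different types survives twice, so a further uniqueness check on `iata_code` alone is still needed afterwards.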

In [31]:
df_airports_data = df_airports_data.filter((col("iata_code")!="0") & (col("type") != "closed"))\
                            .dropDuplicates(subset=["iata_code","type"])

In [32]:
#printing the unique values for IATA_CODE
df_airports_data.select("iata_code").distinct().count()

8797

In [33]:
#lets compare it with total number of records
df_airports_data.count()

8803

In [34]:
#Since there are still fewer unique values than total rows, lets find the duplicated airport iata_codes
df_airports_data.select("iata_code").groupBy("iata_code")\
                        .count().where(col("count") > 1).sort(col("iata_code")).limit(50).toPandas()

Unnamed: 0,iata_code,count
0,LHG,2
1,MRE,2
2,PRM,2
3,RCH,2
4,SGL,2
5,YMX,2


In [35]:
#Finally lets break it down for each IATA code and see if there is any reason for duplication
df_airports_data.where(col("iata_code").isin(["MRE","YMX","SGL","RCH","PRM","LHG"])).orderBy(col("iata_code")).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,YLRD,medium_airport,Lightning Ridge Airport,540.0,OC,AU,AU-NSW,,YLRD,LHG,,"147.98399353027344, -29.45669937133789"
1,EFDD,large_airport,LOAN OFFER,,AS,AE,AE-DU,dubai,EFDD,LHG,NY33,"0, 0"
2,KE-MRE,small_airport,Mara Lodges Airport,5706.0,AF,KE,KE-700,Mara Lodges,,MRE,,"35.11130905151367, -1.1782759428024292"
3,HKMS,medium_airport,Mara Serena Lodge Airstrip,5200.0,AF,KE,KE-700,Masai Mara,HKMS,MRE,,"35.008057, -1.406111"
4,CO-0039,heliport,Proma Heliport,,SA,CO,CO-CUN,Bogota,,PRM,,"-74.084444, 4.728056"
5,LPPM,medium_airport,Portimão Airport,5.0,EU,PT,PT-08,Portimão,LPPM,PRM,,"-8.58396, 37.1493"
6,UY-0002,small_airport,Rocha Airport,91.0,SA,UY,UY-RO,Rocha,,RCH,,"-54.27989959716797, -34.47809982299805"
7,SKRH,medium_airport,Almirante Padilla Airport,43.0,SA,CO,CO-LAG,Riohacha,SKRH,RCH,RCH,"-72.926, 11.5262"
8,RPLS,small_airport,Danilo Atienza Air Base,8.0,AS,PH,PH-CAV,Cavite City,RPLS,SGL,,"120.90399932861, 14.495400428772"
9,SGL,heliport,Danilo Atienza Air Base,,AS,PH,PH-U-A,Cavite,RPLS,SGL,,"120.906987, 14.49562"


In [36]:
#Since these airports show no pattern for duplication and their IATA codes don't exist in the i94PORT codes, it is better to filter them out
df_airports_data = df_airports_data.filter(~col("iata_code").isin(["MRE","YMX","SGL","RCH","PRM","LHG"]))

In [37]:
#Final check of unique values
(df_airports_data.select("iata_code").distinct().count()/df_airports_data.count())*100

100.0

Since the number of unique iata_codes now matches the number of rows exactly, we have achieved data integrity. The ident field is no longer needed and can be dropped.

In [38]:
#The column iso_region has a hyphen, so we can clean it and extract the region code
#Also lets extract the latitude and longitude from the coordinates column, and convert them to float for geo plotting and analytics
#Note: coordinates are stored as "longitude, latitude", so index 0 is the longitude and index 1 is the latitude
df_airports_data = df_airports_data.withColumn("region_code", to_split_extract_string(df_airports_data.iso_region,lit("-"),lit(1)))\
                         .withColumn("latitude", to_split_extract_float(df_airports_data.coordinates,lit(","),lit(1),lit("float")))\
                         .withColumn("longitude", to_split_extract_float(df_airports_data.coordinates,lit(","),lit(0),lit("float")))\
                         .drop("coordinates")

#Finally subsetting the dataframe where iata_code will be the main identifier
df_airports_data = df_airports_data.selectExpr("iata_code as id",\
                                     "type",\
                                     "name",\
                                     "elevation_ft",\
                                     "continent",\
                                     "iso_country",\
                                     "iso_region",\
                                     "municipality",\
                                     "gps_code",\
                                     "region_code",\
                                     "latitude",\
                                     "longitude")

In [39]:
#View the 5 rows of airport data extracted
df_airports_data.limit(5).toPandas()

Unnamed: 0,id,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,region_code,latitude,longitude
0,BCA,medium_airport,Gustavo Rizo Airport,26,,CU,CU-14,Baracoa,MUBA,14,20.365299,-74.506203
1,BGA,medium_airport,Palonegro Airport,3897,SA,CO,CO-SAN,Bucaramanga,SKBG,SAN,7.1265,-73.184799
2,BWB,small_airport,Barrow Island Airport,26,OC,AU,AU-WA,,YBWX,WA,-20.864401,115.405998
3,CFM,small_airport,Conklin (Leismer) Airport,1930,,CA,CA-AB,Conklin,CET2,AB,55.695301,-111.278999
4,CGY,large_airport,Laguindingan Airport,190,AS,PH,PH-MSR,Cagayan de Oro City,RPMY,MSR,8.612203,124.456497
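The `split_extract` helper comes from `helperFunctions` and is not shown in this notebook, so the following is only a guess at its behaviour, written to match the three- and four-argument calls used above. It also makes the ordering of the coordinates column explicit: the sample row for the heliport in Bensalem, Pennsylvania reads "-74.93..., 40.07...", which suggests the longitude comes first.

```python
def split_extract(value, delimiter, index, cast=None):
    """Split value on delimiter and return the piece at index, optionally cast to float.

    Hypothetical stand-in for the helper imported from helperFunctions.
    """
    if value is None:
        return None
    parts = [part.strip() for part in value.split(delimiter)]
    if index >= len(parts):
        return None
    piece = parts[index]
    if cast == "float":
        try:
            return float(piece)
        except ValueError:
            return None
    return piece


region_code = split_extract("US-PA", "-", 1)
# Coordinates appear to be stored as "longitude, latitude".
longitude = split_extract("-74.93360137939453, 40.07080078125", ",", 0, "float")
latitude = split_extract("-74.93360137939453, 40.07080078125", ",", 1, "float")
```

Returning `None` for unparsable input keeps a single bad cell from failing the whole Spark job when the function is wrapped in a UDF.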


##### Dates can provide additional features that lead to insightful information, so let's extract them and use them for analytics
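As a rough cross-check of features such as week number and day of week, the same values can be derived in plain Python. This sketch assumes ISO 8601 week numbers and a weekday numbering of 1 = Sunday through 7 = Saturday, which is how Spark's `weekofyear` and `dayofweek` behave; the function name `date_features` is hypothetical.

```python
from datetime import date

MONTH_NAMES = ("January", "February", "March", "April", "May", "June", "July",
               "August", "September", "October", "November", "December")
DAY_ABBREVIATIONS = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")


def date_features(d):
    """Return calendar features for a date, numbering weekdays 1 = Sunday ... 7 = Saturday."""
    return {
        "day": d.day,
        "week": d.isocalendar()[1],             # ISO 8601 week number
        "month": d.month,
        "month_name": MONTH_NAMES[d.month - 1],
        "year": d.year,
        "day_of_week": d.isoweekday() % 7 + 1,  # shift Monday=1..Sunday=7 to Sunday=1..Saturday=7
        "day_of_week_name": DAY_ABBREVIATIONS[d.weekday()],
    }


features = date_features(date(2016, 4, 10))
```

Hard-coding the English names avoids any dependence on the locale of the machine running the check.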

In [40]:
#Query to create unique dates from departure and arrival date of immigration fact table
query = """
           SELECT distinct date_date FROM
           (
           SELECT arr_date AS date_date FROM  temp_date_table
           UNION ALL
           SELECT dept_date AS date_date FROM temp_date_table)
        """

In [41]:
#running the query and creating a spark dataframe
df_date_data = sc.sql(sqlQuery=query)

#Lets create some date features such as month, day, week number,day of week,etc.
df_date_data = df_date_data.select("date_date",\
               dayofmonth("date_date").alias("day"),\
               weekofyear("date_date").alias("week"),\
               month("date_date").alias("month"),\
               date_format("date_date", 'MMMM').alias("month_name"),\
               year("date_date").alias("year"),\
               dayofweek("date_date").alias("day_of_week"),\
               date_format("date_date", 'E').alias("day_of_week_name"))

In [42]:
#View the 5 rows of dates data with new features
df_date_data.limit(5).toPandas()

Unnamed: 0,date_date,day,week,month,month_name,year,day_of_week,day_of_week_name
0,2016-08-21,21,33,8,August,2016,1,Sun
1,2016-05-31,31,22,5,May,2016,3,Tue
2,2016-08-14,14,32,8,August,2016,1,Sun
3,2016-04-10,10,14,4,April,2016,1,Sun
4,2016-04-30,30,17,4,April,2016,7,Sat


As explored above with pandas, the temperature data has records dating back to 1743. However, since the immigration data is mostly from 2016, we can filter out all data before 2000.

In [43]:
#Loading the climate/temperature data in spark
climate_file_path = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature_data = spark.read.format("csv").option("header",True).load(climate_file_path)

In [44]:
#Lets see the data types
df_temperature_data.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [45]:
#Since dt is a string, let's convert it to a date column. It is also worth adding a month feature to enable better insights
# and filtering out data before 2000
df_temperature_data = df_temperature_data.withColumn("dt",to_date("dt",'yyyy-MM-dd'))\
                                         .withColumn("month",date_format("dt","MMMM"))\
                                         .filter(col("dt") >= "2000-01-01")
                                    
                                    

#The latitude and longitude are in string format, which will make analytics and geo plotting difficult, so lets cast them to float
df_temperature_data = df_temperature_data.withColumn("Latitude",cast_lat_lon(df_temperature_data.Latitude))\
                                         .withColumn("Longitude",cast_lat_lon(df_temperature_data.Longitude))\
                                         .withColumn("Country", upper(df_temperature_data.Country))

In [46]:
#Getting min and max date of temperature data
df_temperature_data.selectExpr("max(dt) as max_date", "min(dt) as min_date").toPandas()

Unnamed: 0,max_date,min_date
0,2013-09-01,2000-01-01


Since city names can repeat across countries, and sometimes across states, it is difficult to relate them directly to cities in the immigration data, so let's focus on country-level temperature data. The dates of the immigration and climate data also do not overlap: the temperature data ends in September 2013, roughly three years before the 2016 immigration data, so we use the monthly average temperature per country instead.

In [47]:
#Lets drop cities, dt and AverageTemperatureUncertainty 

df_temperature_data = df_temperature_data.drop("AverageTemperatureUncertainty","City","dt")


#Lets first group on country and month
df_temperature_data = df_temperature_data.groupby("Country", "month")\
                   .agg(mean("AverageTemperature").alias("AverageTemperature"),\
                        mean("Latitude").alias("Latitude"),\
                        mean("Longitude").alias("Longitude"))

#Since each country now appears once per month, and country and month together are unique, we can pivot on month to get one row
#per country

#First grouping on country
df_temperature_lat_lon = df_temperature_data.groupby("Country")\
                        .agg(first("Latitude").alias("Latitude"),\
                             first("Longitude").alias("Longitude"))

#Pivoting on months and extracting average temperatures
df_temperature_data = df_temperature_data.groupby("Country")\
                        .pivot("Month")\
                        .agg(mean("AverageTemperature").alias("AverageTemperature"))

#Finally joining back to get the final data 
df_temperature_data = df_temperature_data.join(df_temperature_lat_lon, on="Country",how="inner")
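The group-then-pivot step above can be pictured with a small plain-Python sketch. It is only an illustration of the reshaping, not the Spark implementation: the rows are made up, and the helper name `pivot_monthly_means` is hypothetical.

```python
from collections import defaultdict
from statistics import mean

# Made-up long-format rows: (country, month name, average temperature)
readings = [
    ("ARMENIA", "January", -4.0),
    ("ARMENIA", "January", -2.0),
    ("ARMENIA", "July", 22.0),
    ("BAHAMAS", "January", 21.0),
]


def pivot_monthly_means(rows):
    """Average per (country, month), then reshape to one dict per country."""
    grouped = defaultdict(list)
    for country, month_name, temperature in rows:
        grouped[(country, month_name)].append(temperature)

    wide = defaultdict(dict)
    for (country, month_name), temperatures in grouped.items():
        wide[country][month_name] = mean(temperatures)
    return dict(wide)


temperature_by_country = pivot_monthly_means(readings)
print(temperature_by_country)
```

A month with no readings for a country is simply absent from that country's dict here; in the Spark pivot the corresponding cell would be null.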

We can use the i94cit and i94res mappings to join with the temperature data, and put it all in a regions table.

In [48]:
#Getting the i94cit and res mappings
df_regions_data = spark.createDataFrame(list(map(list, cit_and_res_codes.items())),
                                               ["cit_res_code","cit_res_name"])

#Creating a column to join to temperature table
df_regions_data = df_regions_data.withColumn("Country", df_regions_data.cit_res_name)

#Joining back to temperature data and adding column names
df_regions_data = df_regions_data.join(df_temperature_data, on="Country", how="left")
df_regions_data = df_regions_data.selectExpr("cit_res_code",\
                                             "cit_res_name",\
                                             "Latitude as latitude",\
                                             "Longitude as longitude",\
                                             "January as temp_january",\
                                             "February as temp_february",\
                                             "March as temp_march",\
                                             "April as temp_april",\
                                             "May as temp_may",\
                                             "June as temp_june",\
                                             "July as temp_july",\
                                             "August as temp_august",\
                                             "September as temp_september",\
                                             "October as temp_october",\
                                             "November as temp_november",\
                                             "December as temp_december")
                                            

In [49]:
#View the 5 rows of the data
df_regions_data.limit(5).toPandas()

Unnamed: 0,cit_res_code,cit_res_name,latitude,longitude,temp_january,temp_february,temp_march,temp_april,temp_may,temp_june,temp_july,temp_august,temp_september,temp_october,temp_november,temp_december
0,151,ARMENIA,40.990002,44.73,-3.168214,-1.168286,3.802357,9.074071,13.886571,18.552286,21.878857,22.051214,17.523769,11.301154,4.420923,-1.065769
1,512,BAHAMAS,24.92,-78.029999,21.002071,22.173214,23.392714,24.8145,26.678,28.3245,29.111214,29.229,28.370357,26.490923,23.826154,22.132385
2,739,INVALID: DRONNING MAUD LAND (ANTARCTICA-NORWAY),,,,,,,,,,,,,,
3,373,SOUTH AFRICA,-28.1294,27.3316,21.170151,21.256719,19.966431,16.898169,13.993247,11.345843,10.889649,13.48891,16.55442,18.634994,19.675094,20.693595
4,914,No Country Code (914),,,,,,,,,,,,,,
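The empty rows in this output are what a left join produces when a code's name has no counterpart in the temperature data. A minimal plain-Python sketch of that behaviour follows, assuming an exact match on the country name; the two mapping entries are taken from the output above, the temperature value is rounded for illustration, and `left_join_regions` is a hypothetical helper.

```python
# Two entries of the code-to-name mapping, as seen in the output above
cit_res_codes = {151: "ARMENIA", 914: "No Country Code (914)"}

# Illustrative, rounded January average keyed by country name
january_temperature = {"ARMENIA": -3.17}


def left_join_regions(codes, temperatures):
    """Keep every code; the temperature is None when the name has no match."""
    return [
        {
            "cit_res_code": code,
            "cit_res_name": name,
            "temp_january": temperatures.get(name),
        }
        for code, name in codes.items()
    ]


regions = left_join_regions(cit_res_codes, january_temperature)
for region in regions:
    print(region)
```

Keeping the unmatched codes matters here because the regions table is used as a dimension: an inner join would drop codes that still appear in the immigration data.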


Exploring the demographics data above using pandas, the column with the most unique values was Count. Let's explore whether there is duplication in the other columns.

In [50]:
#Importing the demographics data
df_demographics_data = spark.read.csv("us-cities-demographics.csv", header=True, sep=";")

In [51]:
df_demographics_data.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [52]:
#Viewing the 5 rows
df_demographics_data.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [53]:
#Lets filter by a city name
df_demographics_data.where(col("City") == "Silver Spring").toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,White,37756
2,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Black or African-American,21330
3,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,American Indian and Alaska Native,1084
4,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Asian,8841


Since the data is duplicated for most columns, except for the **Race** and **Count** columns, we may need to split the data set, 
remove duplicates and join it back

In [54]:
#Let's explore how many matches we would get by joining with i94PORT on the port name
df_demographics_data.dropDuplicates(subset=["City","State","State Code"])\
                    .withColumn("City", upper(col("City")))\
                    .withColumn("port_name", concat(col("City"),lit(", "),col("State Code")))\
                    .join(df_ports_codes, on="port_name", how="left")\
                    .groupBy("code").count().toPandas()

Unnamed: 0,code,count
0,FMY,1
1,SNA,1
2,GRB,1
3,EVE,1
4,RIV,1
5,OAK,1
6,FAR,1
7,DET,1
8,REN,1
9,TAC,1


Since the majority of rows have no match (483 are None), it is worth dropping the city so that we can join with the immigration data on **State Code** instead

In [55]:
#Subsetting the data without race and count, and aggregating after removing duplicates
df_demographics_state_city_data = df_demographics_data.dropDuplicates(subset=["City","State","State Code"])\
                                                      .groupby("State Code","State")\
                                                      .agg(mean("Median Age").alias("median_age"),\
                                                            Sum("Male Population").alias("male_population"),\
                                                            Sum("Female Population").alias("female_population"),\
                                                            Sum("Total Population").alias("total_population"),\
                                                            Sum("Number of Veterans").alias("veterans_population"),\
                                                            Sum("Foreign-born").alias("foreign_born_population"),\
                                                            mean("Average Household Size").alias("household_size_avg"))

#Subsetting out the race data and splitting race into individual columns by doing a pivot
df_race_state_city_data = df_demographics_data.select("State Code", "Race", "Count")\
                                   .groupby("State Code")\
                                   .pivot("Race")\
                                   .agg(Sum("Count"))

#Once the data is transformed and normalised, can join and bring back together
df_demographics_data = df_demographics_state_city_data.join(df_race_state_city_data,\
                                                 on="State Code",\
                                                 how="inner")
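The reason for de-duplicating before summing can be shown with a short plain-Python sketch: city-level figures repeat on every race row, so they must be counted once per city, while the race counts are summed across all rows. The three rows are taken from the outputs above; `summarise_by_state` is a hypothetical helper, not the notebook's Spark code.

```python
from collections import defaultdict

# Rows taken from the demographics output above (one row per city and race)
rows = [
    {"City": "Silver Spring", "State Code": "MD", "Total Population": 82463,
     "Race": "Hispanic or Latino", "Count": 25924},
    {"City": "Silver Spring", "State Code": "MD", "Total Population": 82463,
     "Race": "White", "Count": 37756},
    {"City": "Quincy", "State Code": "MA", "Total Population": 93629,
     "Race": "White", "Count": 58723},
]


def summarise_by_state(records):
    """Sum city totals once per city and race counts across all rows."""
    seen_cities = set()
    totals = defaultdict(int)
    race_counts = defaultdict(lambda: defaultdict(int))
    for record in records:
        state = record["State Code"]
        city_key = (record["City"], state)
        if city_key not in seen_cities:
            # City-level columns repeat on every race row, so count them once
            seen_cities.add(city_key)
            totals[state] += record["Total Population"]
        race_counts[state][record["Race"]] += record["Count"]
    return {
        state: {"total_population": totals[state], **race_counts[state]}
        for state in totals
    }


state_summary = summarise_by_state(rows)
print(state_summary)
```

Without the de-duplication, Silver Spring's population would be added once per race row and the Maryland total would be doubled in this example.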

In [56]:
#Lets view the data again
df_demographics_data.limit(5).toPandas()

Unnamed: 0,State Code,State,median_age,male_population,female_population,total_population,veterans_population,foreign_born_population,household_size_avg,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,AZ,Arizona,35.0375,2227455.0,2272087.0,4499542.0,264505.0,682313.0,2.774375,129708.0,229183.0,296222.0,1508157.0,3591611.0
1,SC,South Carolina,34.18,260944.0,272713.0,533657.0,33463.0,27744.0,2.472,3705.0,13355.0,175064.0,29863.0,343764.0
2,LA,Louisiana,34.625,626998.0,673597.0,1300595.0,69771.0,83419.0,2.465,8263.0,38739.0,602377.0,87133.0,654578.0
3,MN,Minnesota,35.618182,702157.0,720246.0,1422403.0,64894.0,215873.0,2.500909,25242.0,151544.0,216731.0,103229.0,1050239.0
4,NJ,New Jersey,35.125,705736.0,723172.0,1428908.0,30195.0,477028.0,2.965833,11350.0,116844.0,452202.0,600437.0,615083.0


In [57]:
#Let's count the rows to confirm the duplicates have been removed; there should be no more than 49 rows
df_demographics_data.count()

49

In [58]:
#Creating the alias for few columns in demographics
df_demographics_data = df_demographics_data.select(col("State Code").alias("state_code"),\
                                col("State").alias("state_name"),\
                                col("median_age"),\
                                col("male_population"),\
                                col("female_population"),\
                                col("total_population"),\
                                col("veterans_population"),\
                                col("foreign_born_population"),\
                                col("household_size_avg"),\
                                col("American Indian and Alaska Native")\
                                .alias("american_indian_and_alaskan_native_population"),\
                                col("Asian").alias("asian_population"),\
                                col("Black or African-American")\
                                .alias("black_or_african_american_populaton"),\
                                col("Hispanic or Latino")\
                                .alias("hispanic_latino_population"),\
                                col("White").alias("white_population")\
                                )

In [59]:
#Viewing the 5 rows of the demographics data
df_demographics_data.limit(5).toPandas()

Unnamed: 0,state_code,state_name,median_age,male_population,female_population,total_population,veterans_population,foreign_born_population,household_size_avg,american_indian_and_alaskan_native_population,asian_population,black_or_african_american_populaton,hispanic_latino_population,white_population
0,AZ,Arizona,35.0375,2227455.0,2272087.0,4499542.0,264505.0,682313.0,2.774375,129708.0,229183.0,296222.0,1508157.0,3591611.0
1,SC,South Carolina,34.18,260944.0,272713.0,533657.0,33463.0,27744.0,2.472,3705.0,13355.0,175064.0,29863.0,343764.0
2,LA,Louisiana,34.625,626998.0,673597.0,1300595.0,69771.0,83419.0,2.465,8263.0,38739.0,602377.0,87133.0,654578.0
3,MN,Minnesota,35.618182,702157.0,720246.0,1422403.0,64894.0,215873.0,2.500909,25242.0,151544.0,216731.0,103229.0,1050239.0
4,NJ,New Jersey,35.125,705736.0,723172.0,1428908.0,30195.0,477028.0,2.965833,11350.0,116844.0,452202.0,600437.0,615083.0


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model


##### Immigration Analytical Engine Data Model
The analytical database will leverage a star schema design for query optimisation, with the immigration data held in the fact table and the other data sets in dimension tables. Normalising the data further could make analysis slower. It is recommended practice for analytical databases (OLAP) to use a star schema with one or a few fact tables and multiple dimension tables, to ensure fast query results and reduce joins, especially when the key audience is the Business Analysts and Data Science teams working for the government.

![title](img/DataModel.png)
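To make the benefit of the star schema concrete, the sketch below runs a typical analytical query with a single join from the fact table to a dimension. It is an illustration only: it uses an in-memory SQLite database rather than Redshift, the table and column names follow this notebook (`immigration_fct`, `immi_dates_dm`, `cicid`, `arr_date`, `date_date`, `month_name`), and the three fact rows are made up.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.executescript("""
    CREATE TABLE immi_dates_dm (
        date_date TEXT PRIMARY KEY,
        month_name TEXT,
        day_of_week_name TEXT
    );
    CREATE TABLE immigration_fct (
        cicid INTEGER PRIMARY KEY,
        arr_date TEXT REFERENCES immi_dates_dm (date_date)
    );
    INSERT INTO immi_dates_dm VALUES
        ('2016-04-10', 'April', 'Sun'),
        ('2016-04-30', 'April', 'Sat'),
        ('2016-05-31', 'May', 'Tue');
    -- Made-up fact rows for illustration
    INSERT INTO immigration_fct VALUES
        (1, '2016-04-10'),
        (2, '2016-04-30'),
        (3, '2016-05-31');
""")

# One join from the fact table to the date dimension answers the question
arrivals_by_month = connection.execute("""
    SELECT d.month_name, COUNT(*) AS arrivals
    FROM immigration_fct AS f
    JOIN immi_dates_dm AS d ON f.arr_date = d.date_date
    GROUP BY d.month_name
    ORDER BY d.month_name
""").fetchall()

print(arrivals_by_month)
connection.close()
```

Each further dimension (ports, visas, regions and so on) would add one more join on its key, which keeps analyst queries short and predictable.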

#### 3.2 Mapping Out Data Pipelines
The pipeline steps are in etl.py. Running etl.py on an EMR cluster on AWS generates the data and loads it into S3, from where it can be consumed by Redshift.
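As a rough sketch of the last hop, the snippet below builds a Redshift `COPY` statement for one table. It rests on assumptions that are not stated in this notebook: that etl.py writes Parquet files, and that Redshift reads them through an IAM role. The bucket path, role ARN and the helper `build_copy_statement` are all hypothetical placeholders.

```python
def build_copy_statement(table_name, s3_path, iam_role_arn):
    """Build a Redshift COPY statement for Parquet files (assumed format)."""
    return (
        f"COPY {table_name}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS PARQUET;"
    )


# Hypothetical bucket and role, for illustration only
copy_statement = build_copy_statement(
    table_name="immigration_fct",
    s3_path="s3://example-bucket/immigration_fct/",
    iam_role_arn="arn:aws:iam::123456789012:role/ExampleRedshiftRole",
)
print(copy_statement)
```

The statement would be executed against the Redshift cluster once per table after the EMR job has finished writing to S3.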

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

The code is in the etl.py file

#### 4.2 Data Quality Checks
The data quality checks performed are:
 * The identity/primary key of the fact table and of each dimension table is unique and contains no nulls
 * The date columns of the immigration fact table and of the date dimension table have the expected date type
 * Each table has more than 0 rows

In [60]:
def check_unique_keys(df,table_name="", primary_key=""):
    """
    Function to check that a table's primary key is unique by comparing
    the number of distinct primary key values with the total number of rows.
    A value error is raised if duplicates are found
    """
    num_rows = df.count()
    num_unique_identifier_rows = df.dropDuplicates(subset=[primary_key]).count()
    if num_rows == num_unique_identifier_rows:
        print(f"{table_name} has unique rows, and primary key constraint is not violated")
    else:
        raise ValueError(f"{table_name} uniqueness violated , duplicated data detected") 

In [61]:
def ensure_no_nulls(df, column=""):
    """
    Function to check for null counts on spark dataframe columns, 
    a value error is raised if column has null values
    """
    null_counts = df.filter(f"{column} is NULL").count()
    if null_counts == 0:
        print(f"{column} doesnt have any nulls")
    else:
        raise ValueError(f"{column} violated the null constraint, cannot contain nulls")

In [62]:
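def check_data_type(df, column, datatype):
    """
    Function to check that a column of a spark dataframe has the expected
    datatype. A value error is raised on a mismatch, and a key error is
    raised if the column is not found
    """
    for field in df.schema.fields:
        if field.name == column:
            # Compare on the class name (e.g. "TimestampType") so the check does not depend on how the type prints
            if type(field.dataType).__name__ != datatype:
                raise ValueError(f"datatype mismatch detected for column {column}. Expected {datatype} but got {field.dataType}")
            print(f"datatype match for column {column} having {datatype} values")
            break
    else:
        # The loop finished without finding the column
        raise KeyError(f"column {column} not found in the dataframe")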

In [63]:
def check_greater_that_zero(df):
    """
    Function to perform greater than 0 rows check for spark df.
    If check fails a value error is raised
    """
    if df.count() > 0:
        print(f"Greater than 0 test passed for the table")
    else:
        raise ValueError(f"Table has 0 rows, data may not have loaded correctly")

In [64]:
def match_source_input(df_input, df_output):
    """
    Function to check if data pushed to a location matches 
    data before pushing, to ensure data completeness, else
    value error is raised
    """
    if df_input.count() == df_output.count():
        print(f"Data pushed has complete data")
    else:
        raise ValueError(f"Data at source doesnt match data at destination")

In [65]:
def run_quality_checks():
    """
    Main function for running quality checks on the fact table and all
    dimension tables
    """
    
    print("Running quality test for immigration_fct table")
    check_unique_keys(df_immigration_data, "immigration_fct", "cicid")
    ensure_no_nulls(df_immigration_data, "cicid")
    check_data_type(df_immigration_data, "dept_date", "TimestampType")
    check_data_type(df_immigration_data, "arr_date", "TimestampType")
    check_greater_that_zero(df_immigration_data)

    print("\nRunning quality test for airports_dm table")
    check_unique_keys(df_airports_data, "airports_dm", "id")
    ensure_no_nulls(df_airports_data, "id")
    check_greater_that_zero(df_airports_data)

    print("\nRunning quality test for regions_dm table")
    check_unique_keys(df_regions_data, "regions_dm", "cit_res_code")
    ensure_no_nulls(df_regions_data, "cit_res_code")
    check_greater_that_zero(df_regions_data)
    
    print("\nRunning quality test for modes_dm table")
    check_unique_keys(df_mode_codes, "modes_dm", "code")
    ensure_no_nulls(df_mode_codes,  "code")
    check_greater_that_zero(df_mode_codes)

    print("\nRunning quality test for immi_dates_dm table")
    check_unique_keys(df_date_data, "immi_dates_dm", "date_date")
    ensure_no_nulls(df_date_data, "date_date")
    check_data_type(df_date_data, "date_date", "TimestampType")
    check_greater_that_zero(df_date_data)

    print("\nRunning quality test for ports_dm table")
    check_unique_keys(df_ports_codes, "ports_dm", "code")
    ensure_no_nulls(df_ports_codes, "code")
    check_greater_that_zero(df_ports_codes)

    print("\nRunning quality test for visas_dm table")
    check_unique_keys(df_visa_codes, "visas_dm", "code")
    ensure_no_nulls(df_visa_codes, "code")
    check_greater_that_zero(df_visa_codes)

    print("\nRunning quality test for airlines_dm table")
    check_unique_keys(df_flights_data, "airlines_dm", "flight_id")
    ensure_no_nulls(df_flights_data, "flight_id")
    check_greater_that_zero(df_flights_data)

    print("\nRunning quality test for demographics_dm table")
    check_unique_keys(df_demographics_data, "demographics_dm", "state_code")
    ensure_no_nulls(df_demographics_data, "state_code")
    check_greater_that_zero(df_demographics_data)

In [66]:
run_quality_checks()

Running quality test for immigration_fct table
immigration_fct has unique rows, and primary key constraint is not violated
cicid doesnt have any nulls
datatype match for column dept_date having TimestampType values
datatype match for column arr_date having TimestampType values
Greater than 0 test passed for the table

Running quality test for airports_dm table
airports_dm has unique rows, and primary key constraint is not violated
id doesnt have any nulls
Greater than 0 test passed for the table

Running quality test for regions_dm table
regions_dm has unique rows, and primary key constraint is not violated
cit_res_code doesnt have any nulls
Greater than 0 test passed for the table

Running quality test for modes_dm table
modes_dm has unique rows, and primary key constraint is not violated
code doesnt have any nulls
Greater than 0 test passed for the table

Running quality test for immi_dates_dm table
immi_dates_dm has unique rows, and primary key constraint is not violated
date_date d

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

This is available in dataDictionary.txt file

### Step 5: Complete Project Write Up
* Rationale for the choice of tools and technologies for the project:
    - Apache Spark was run on an EMR cluster, as EMR is easy to set up and takes care of the Spark dependencies.
    - Spark was chosen for its ability to process large volumes of data quickly and in memory, and for its distributed computing model.
    - Spark also supports ML, which keeps future analytical use cases open. Given its robust framework and interoperability with other tools, Spark is unlikely to go away in the near future, which makes it a good fit for big data workloads.

* Propose how often the data should be updated and why.
     - The I94 immigration data is updated monthly, so the data processing and ETL can also run on a monthly basis.


* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
    - Spark can handle the increase, but we would consider adding nodes to the cluster and increasing the memory of the worker nodes.
    - If a tipping point is reached, the pipeline design can change: since the data is needed monthly for analytical purposes, pre-processing and transformation can run daily, followed by a batch job every month.
    - Cassandra could also be explored, but it is a beast of its own, so the effort needs to be weighed against the reward.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - A pipeline orchestration and scheduling framework such as Airflow or Luigi can be used.
    - SLAs can be set up in Airflow so that, for example, a run that has not completed within an hour is flagged and raises an alert.
    - If, for example, two days of data are delayed, both Airflow and Luigi should be able to handle the backfills.
 * The database needed to be accessed by 100+ people.
    - Redshift suits this scenario since it is a scalable database, and it can be scaled further as demand grows.
    - If demand increases further, a NoSQL database in the cloud such as Cassandra, or a managed service, could be the way to go, but cost and engineering effort need to be taken into account.
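
For the daily dashboard scenario, the backfill idea boils down to working out which daily runs are missing. The sketch below shows that calculation in plain Python; it is not how Airflow or Luigi implement backfills, and the helper `missing_run_dates` and the example dates are hypothetical.

```python
from datetime import date, timedelta


def missing_run_dates(last_loaded, today):
    """List every daily run date after last_loaded, up to and including today."""
    days_behind = (today - last_loaded).days
    return [last_loaded + timedelta(days=offset)
            for offset in range(1, days_behind + 1)]


# Example: the last successful load was two days ago
to_backfill = missing_run_dates(date(2016, 4, 28), date(2016, 4, 30))
print(to_backfill)
```

An orchestrator would then trigger one pipeline run per returned date, oldest first, so that the dashboard catches up before the 7am deadline.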