# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Import libs

In [2]:
# Do all imports and installs here
import pandas as pd
import os
import configparser

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear, to_date
from pyspark.sql.types import *
from utils.create_tables import *
from utils.tools import *


### Create Spark Session

In [3]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
The following datasets are used in this project:
* I94 immigration sample data. This data is used for the main analysis.
* Airport Code Table
* US Cities Demographic Data

In this project I gather data from three sources and load it into staging dataframes. I then clean the raw data and run an ETL process with Spark that builds the fact and dimension tables of a star schema, writes them to Parquet files, and runs data quality checks. The star schema can be used for analytics, correlation analysis, and ad-hoc reporting.

#### Describe and Gather Data 
* [The I94 immigration Sample data](https://www.trade.gov/national-travel-and-tourism-office)
The I-94 is the Arrival/Departure Record, in either paper or electronic format, issued by a Customs and Border Protection (CBP) Officer to foreign visitors entering the United States.
* [Airport Code Table](http://ourairports.com/data/)
Airport data includes the IATA airport code. An IATA airport code, also known as an IATA location identifier, IATA station code or simply a location identifier, is a three-letter geocode designating many airports and metropolitan areas around the world, defined by the International Air Transport Association (IATA). IATA codes are used in passenger reservation, ticketing and baggage-handling systems (https://en.wikipedia.org/wiki/IATA_airport_code).
* [US Cities Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
This data comes from OpenDataSoft and contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. The original data comes from the US Census Bureau's 2015 American Community Survey.


### Step 2: Explore and Assess the Data

### Explore I94 immigration sample data
The I94 immigration sample data contains 1000 records. 

In [4]:
df = spark.read.csv("immigration_data_sample.csv", header=True, inferSchema=True)

In [5]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: integer (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: integer (nullable = true)
 |-- airline: string (nullable = 

In [6]:
df.limit(5).toPandas().head()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


### Cleaning the dataset
* Identify null and NaN values
* Remove duplicate rows
* Find errors

#### Identify null and NaN values
Count the NaN values in each column, excluding the date-like columns dtadfile, dtaddto, arrdate and depdate, because the isnan function only works with numerical types.

In [7]:
df.select([count(when(isnan(colname), 1)).alias(colname) for colname in df.drop('dtadfile','dtaddto','arrdate','depdate').columns]).toPandas().head()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,i94mode,i94addr,i94bir,...,entdepd,entdepu,matflag,biryear,gender,insnum,airline,admnum,fltno,visatype
0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


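Count the rows that have at least one null value in the dataset.

In [8]:
# Rows with at least one null = all rows minus the rows that have no nulls
df.count() - df.na.drop(how="any").count()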

1000

The number of rows with nulls equals the total number of rows, which means every row has at least one null value.
 - Now count the null values in each column.

In [9]:
df.select([count(when(col(colname).isNull(),1)).alias(colname) for colname in df.columns]).toPandas().head()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,0,0,0,0,0,0,0,0,0,59,...,1000,46,0,0,141,965,33,0,8,0


Several columns contain many nulls, but these look like fields that were simply left empty at the source, so there is no need to correct or fill them.
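
The checks above measure different things: NaN is a floating-point value, while null means the value is missing, and "rows with any null" is not the same as "nulls per column". The following plain-Python sketch illustrates the three counts on a few made-up rows. The rows and the helper names are illustrative assumptions, not part of this notebook, and `None` stands in for a Spark null.

```python
import math

# Illustrative rows only; None plays the role of a Spark null.
rows = [
    {"cicid": 1.0, "gender": "F", "insnum": None},
    {"cicid": 2.0, "gender": None, "insnum": None},
    {"cicid": float("nan"), "gender": "M", "insnum": 3.0},
]


def null_counts(rows):
    """Count missing (None) values per column."""
    return {name: sum(r[name] is None for r in rows) for name in rows[0]}


def nan_counts(rows):
    """Count NaN values per column; only floats can be NaN."""
    return {
        name: sum(isinstance(r[name], float) and math.isnan(r[name]) for r in rows)
        for name in rows[0]
    }


def rows_with_any_null(rows):
    """Count rows in which at least one column is missing."""
    return sum(any(value is None for value in r.values()) for r in rows)


print(null_counts(rows))
print(nan_counts(rows))
print(rows_with_any_null(rows))
```

A column that is null in every row, such as entdepu in the per-column counts above, is enough to make every row count as "has a null".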

#### Dropping duplicate rows

In [10]:
count_before = df.count()
print(f"Before dropped: {count_before}")
df = df.drop_duplicates()
count_after = df.count()
print(f"After dropped: {count_after}")

Before dropped: 1000
After dropped: 1000


The number of rows does not change, so the dataset has no duplicate rows.

#### Find errors

In [11]:
# Every row should report 2016 in the i94yr column
is_consistent = df.where(col('i94yr') == 2016).count() == df.count()
if is_consistent:
    print("All rows in column i94yr have value 2016 in this dataset.")
else:
    print("Found rows in column i94yr that do not have value 2016 in this dataset.")

All rows in column i94yr have value 2016 in this dataset.


### The Airport codes dataset

In [12]:
df_airport_codes = spark.read.csv('airport-codes_csv.csv', header=True, inferSchema=True)

In [13]:
df_airport_codes.limit(5).toPandas().head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [14]:
# df_airport_codes schema:
df_airport_codes.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



Number of rows

In [15]:
df_airport_codes.count()

55075

Remove duplicate rows

In [16]:
# drop_duplicates() returns a new DataFrame, so assign the result for the next count to reflect it
df_airport_codes = df_airport_codes.drop_duplicates()

In [17]:
df_airport_codes.count()

55075

In [18]:
df_airport_codes = df_airport_codes.toPandas()

There are no duplicate rows, so this dataset is clean.

### US Cities Demographic Data

In [19]:
df_cities_demographic = spark.read.csv('us-cities-demographics.csv', header=True, inferSchema=True, sep=';')

In [20]:
df_cities_demographic.limit(20).toPandas().head(20)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC,Asian,11060


In [21]:
df_cities_demographic.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



Number of rows

In [22]:
df_cities_demographic.count()

2891

Drop rows that have null values

In [23]:
new_df_cities_demographic = df_cities_demographic.dropna()
rows_dropped = df_cities_demographic.count() - new_df_cities_demographic.count()
print(f"Number of row dropped with nan value: {rows_dropped}")

Number of row dropped with nan value: 16


Remove duplicate rows

In [24]:
# drop_duplicates() returns a new DataFrame, so assign the result for the next count to reflect it
new_df_cities_demographic = new_df_cities_demographic.drop_duplicates()

In [25]:
new_df_cities_demographic.count()

2875

In [26]:
df_cities_demographic = new_df_cities_demographic.toPandas()

The data is clean, with no duplicate rows.

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

![Conceptual Data Model](conceptual_model.png)
This project models the data as a star schema with one fact table and seven dimension tables:
- *fact_immigration*: the heart of the data model. This table's data comes from the immigration dataset and contains the keys that link to the dimension tables.
- *dim_airport*: This table's data comes from the airport code dataset and links to the *fact_immigration* via *ident* key.
- *dim_status*: This table's data comes from the immigration dataset and links to the *fact_immigration* via *status_flag_id* key.
- *dim_person*: This table's data comes from the immigration dataset and links to the *fact_immigration* via *person_id* key.
- *dim_visa*: This table's data comes from the immigration dataset and links to the *fact_immigration* via *visa_id* key.
- *dim_time*: This table's data comes from the immigration dataset and links to the *fact_immigration* via *arrdate* key.
- *dim_state*: This table's data comes from the US Cities Demographic Data and links to the *fact_immigration* via *state_code* key.
- *dim_country*: This table's data comes from the immigration dataset and links to the *fact_immigration* via *i94res* key.
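
To illustrate how the fact table's keys resolve against the dimensions, here is a minimal sketch that uses Python's built-in `sqlite3` module. This is only a self-contained stand-in: the notebook itself writes the tables to Parquet with Spark. Only a subset of columns is modeled, and the fact rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# A reduced version of the star schema: one fact table and two dimensions.
conn.executescript("""
CREATE TABLE dim_country (code INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE dim_time (arrdate INTEGER PRIMARY KEY, arrival_date TEXT);
CREATE TABLE fact_immigration (
    cicid   INTEGER PRIMARY KEY,
    arrdate INTEGER REFERENCES dim_time(arrdate),
    i94res  INTEGER REFERENCES dim_country(code)
);
""")

conn.executemany(
    "INSERT INTO dim_country VALUES (?, ?)",
    [(236, "AFGHANISTAN"), (101, "ALBANIA")],
)
conn.executemany(
    "INSERT INTO dim_time VALUES (?, ?)",
    [(20559, "2016-04-15"), (20555, "2016-04-11")],
)
# Hypothetical fact rows, invented for this example.
conn.executemany(
    "INSERT INTO fact_immigration VALUES (?, ?, ?)",
    [(1, 20559, 236), (2, 20559, 101), (3, 20555, 236)],
)

# Typical star-schema query: join the fact to its dimensions, then aggregate.
query = """
SELECT c.name, t.arrival_date, COUNT(*) AS arrivals
FROM fact_immigration AS f
JOIN dim_country AS c ON f.i94res = c.code
JOIN dim_time AS t ON f.arrdate = t.arrdate
GROUP BY c.name, t.arrival_date
ORDER BY c.name, t.arrival_date
"""
result = conn.execute(query).fetchall()
print(result)
```

The same pattern applies to the other dimensions: each one is joined to *fact_immigration* on the key named in the list above.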

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
1. Load the data into staging tables
2. Create fact and dimension tables
3. Write data into parquet files
4. Perform data quality checks

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [27]:
output_path = "tables/"

In [28]:
df = df.toPandas()
df.head()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,1401028,2867437.0,2016.0,4.0,691.0,691.0,MIA,20559.0,1.0,FL,...,,M,2012.0,10142016,M,,AV,93608020000.0,30,B2
1,547206,1147049.0,2016.0,4.0,690.0,690.0,MIA,20550.0,1.0,FL,...,,M,1959.0,7042016,M,,AV,55702030000.0,6,WT
2,2058357,4167704.0,2016.0,4.0,576.0,576.0,TAM,20566.0,1.0,CA,...,,M,1982.0,10212016,F,,AA,94237890000.0,2294,B2
3,1522896,3112694.0,2016.0,4.0,130.0,130.0,CHI,20561.0,1.0,NV,...,,M,1972.0,7152016,M,,SK,56344630000.0,945,WB
4,2510445,5072687.0,2016.0,4.0,213.0,213.0,HOU,20571.0,1.0,TX,...,,M,1985.0,10262016,M,,QR,94689060000.0,713,B2


In [29]:
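# create immigration schema
# Note: FloatType is 32-bit, so large values such as admnum lose precision
# (compare the admnum column in the outputs before and after); DoubleType would preserve them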
immigration_schema = StructType([StructField("_c0", IntegerType(), True)\
                          ,StructField("cicid", FloatType(), True)\
                          ,StructField("i94yr", FloatType(), True)\
                          ,StructField("i94mon", FloatType(), True)\
                          ,StructField("i94cit", FloatType(), True)\
                          ,StructField("i94res", FloatType(), True)\
                          ,StructField("i94port", StringType(), True)\
                          ,StructField("arrdate", FloatType(), True)\
                          ,StructField("i94mode", FloatType(), True)\
                          ,StructField("i94addr", StringType(), True)\
                          ,StructField("depdate", FloatType(), True)\
                          ,StructField("i94bir", FloatType(), True)\
                          ,StructField("i94visa", FloatType(), True)\
                          ,StructField("count", FloatType(), True)\
                          ,StructField("dtadfile", StringType(), True)\
                          ,StructField("visapost", StringType(), True)\
                          ,StructField("occup", StringType(), True)\
                          ,StructField("entdepa", StringType(), True)\
                          ,StructField("entdepd", StringType(), True)\
                          ,StructField("entdepu", StringType(), True)\
                          ,StructField("matflag", StringType(), True)\
                          ,StructField("biryear", FloatType(), True)\
                          ,StructField("dtaddto", StringType(), True)\
                          ,StructField("gender", StringType(), True)\
                          ,StructField("insnum", FloatType(), True)\
                          ,StructField("airline", StringType(), True)\
                          ,StructField("admnum", FloatType(), True)\
                          ,StructField("fltno", StringType(), True)\
                          ,StructField("visatype", StringType(), True)])

immigration_spark = spark.createDataFrame(df, schema=immigration_schema)

immigration_spark.toPandas().head()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,1401028,2867437.0,2016.0,4.0,691.0,691.0,MIA,20559.0,1.0,FL,...,,M,2012.0,10142016,M,,AV,93608030000.0,30,B2
1,547206,1147049.0,2016.0,4.0,690.0,690.0,MIA,20550.0,1.0,FL,...,,M,1959.0,7042016,M,,AV,55702030000.0,6,WT
2,2058357,4167704.0,2016.0,4.0,576.0,576.0,TAM,20566.0,1.0,CA,...,,M,1982.0,10212016,F,,AA,94237880000.0,2294,B2
3,1522896,3112694.0,2016.0,4.0,130.0,130.0,CHI,20561.0,1.0,NV,...,,M,1972.0,7152016,M,,SK,56344630000.0,945,WB
4,2510445,5072687.0,2016.0,4.0,213.0,213.0,HOU,20571.0,1.0,TX,...,,M,1985.0,10262016,M,,QR,94689070000.0,713,B2


In [30]:
# create airport code schema
airport_codes_schema = StructType([StructField("ident", StringType(), True)\
                        ,StructField("type", StringType(), True)\
                        ,StructField("name", StringType(), True)\
                        ,StructField("elevation_ft", FloatType(), True)\
                        ,StructField("continent", StringType(), True)\
                        ,StructField("iso_country", StringType(), True)\
                        ,StructField("iso_region", StringType(), True)\
                        ,StructField("municipality", StringType(), True)\
                        ,StructField("gps_code", StringType(), True)\
                        ,StructField("iata_code", StringType(), True)\
                        ,StructField("local_code", StringType(), True)\
                        ,StructField("coordinates", StringType(), True)])

airport_codes_spark = spark.createDataFrame(df_airport_codes, schema=airport_codes_schema)

airport_codes_spark.toPandas().head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [31]:
# create demographics schema
demographics_schema = StructType([StructField("City", StringType(), True)\
                        ,StructField("State", StringType(), True)\
                        ,StructField("Median Age", FloatType(), True)\
                        ,StructField("Male Population", IntegerType(), True)\
                        ,StructField("Female Population", IntegerType(), True)\
                        ,StructField("Total Population", IntegerType(), True)\
                        ,StructField("Number of Veterans", IntegerType(), True)\
                        ,StructField("Foreign-born", IntegerType(), True)\
                        ,StructField("Average Household Size", FloatType(), True)\
                        ,StructField("State Code", StringType(), True)\
                        ,StructField("Race", StringType(), True)\
                        ,StructField("Count", IntegerType(), True)])

demographics_spark = spark.createDataFrame(df_cities_demographic, schema=demographics_schema)

demographics_spark.toPandas().head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.799999,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.599998,138040,143873,281913,5829,86253,2.73,NJ,White,76402


1. Create dim_airport

In [32]:
airport_codes = create_dim_airport(airport_codes_spark, output_path)

Writing table airport to tables/airport
Write complete!


In [33]:
airport = spark.read.parquet("tables/airport")
airport.toPandas().head()

Unnamed: 0,ident,type,name,elevation_ft,iso_country,iso_region,municipality,gps_code,iata_code,coordinates
0,00W,small_airport,Lower Granite State Airport,719.0,US,US-WA,Colfax,00W,,"-117.44300079346, 46.673500061035"
1,04SD,heliport,Cheyenne River Health Center Heliport,2437.0,US,US-SD,Eagle Butte,04SD,,"-101.243011, 44.993124"
2,0GA3,small_airport,Ayresouth Airport,1287.0,US,US-GA,Temple,0GA3,,"-85.06079864501953, 33.77009963989258"
3,0IA0,heliport,Knoxville Area Community Hospital Heliport,927.0,US,US-IA,Knoxville,0IA0,,"-93.09600067138672, 41.316898345947266"
4,0IL4,heliport,Good Samaritan Hospital Heliport,772.0,US,US-IL,Downers Grove,0IL4,,"-88.00779724121094, 41.81890106201172"


2. Create dim_status

In [34]:
status = create_dim_status(immigration_spark, output_path)

Writing table status to tables/status
Write complete!


In [35]:
status = spark.read.parquet("tables/status")
status.toPandas().head()

Unnamed: 0,status_flag_id,arrival_flag,departure_flag,match_flag
0,5,O,O,M
1,74,A,D,M
2,210,Z,O,M
3,984,T,I,M
4,147,O,,


3. Create dim_person

In [36]:
person = create_dim_person(immigration_spark, output_path)

Writing table person to tables/person
Write complete!


In [37]:
person = spark.read.parquet("tables/person")
person.toPandas().head()

Unnamed: 0,person_id,birth_year,gender
0,25,1991.0,M
1,99,1977.0,M
2,602,2012.0,
3,640,1965.0,
4,76,1965.0,F


4. Create dim_visa

In [38]:
visa = create_dim_visa(immigration_spark, output_path)

Writing table visa to tables/visa
Write complete!


In [39]:
visa = spark.read.parquet("tables/visa")
visa.toPandas().head()

Unnamed: 0,visa_id,i94visa,visatype,visapost
0,107,2.0,B2,MNL
1,143,2.0,B2,NRB
2,168,2.0,B2,GTM
3,970,2.0,B2,CDJ
4,0,2.0,B2,BGT


5. Create dim_country

In [40]:
country_mapping_df = spark.read.csv("./i94res.csv", header=True, inferSchema=True)

In [41]:
country_mapping_df.limit(5).toPandas().head()

Unnamed: 0,code,Name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [42]:
country = create_dim_country(country_mapping_df, output_path)

Writing table country to tables/country
Write complete!


In [43]:
country = spark.read.parquet("tables/country")
country.toPandas().head()

Unnamed: 0,code,Name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


6. Create dim_state

In [44]:
state = create_dim_state(demographics_spark, output_path)

Writing table state to tables/state
Write complete!


In [45]:
state = spark.read.parquet("tables/state")
state.toPandas().head()

Unnamed: 0,state_code,state,median_age,total_population,male_population,female_population,foreign_born,average_household_size
0,DC,District of Columbia,33.8,3361140,1598525,1762615,475585,2.24
1,AR,Arkansas,32.74,2882889,1400724,1482165,307753,2.53
2,TN,Tennessee,34.31,10690165,5124189,5565976,900149,2.46
3,LA,Louisiana,34.63,6502975,3134990,3367985,417095,2.47
4,AZ,Arizona,35.04,22497710,11137275,11360435,3411565,2.77
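
The demographics source has one row per city and race (see the Race and Count columns earlier), and Total Population repeats on every row for the same city. The implementation of `create_dim_state` is not shown in this notebook, but if it sums the population columns across all rows, each city is counted once per race and the state totals above would be inflated. The sketch below, in plain Python, shows one way to count each city once before aggregating. The function name is an assumption and the second Rancho Cucamonga row is hypothetical.

```python
from collections import defaultdict

# (city, state_code, race, total_population)
rows = [
    ("Rancho Cucamonga", "CA", "Black or African-American", 175232),
    ("Rancho Cucamonga", "CA", "White", 175232),  # hypothetical second race row
    ("West Covina", "CA", "Asian", 108489),
    ("Hoover", "AL", "Asian", 84839),
]


def state_population(rows):
    """Sum city populations per state, counting each city only once."""
    per_city = {(city, state): pop for city, state, _race, pop in rows}
    totals = defaultdict(int)
    for (_city, state), pop in per_city.items():
        totals[state] += pop
    return dict(totals)


# Naive aggregation for comparison: sums every row, so cities repeat per race.
naive = defaultdict(int)
for _city, state, _race, pop in rows:
    naive[state] += pop

print(state_population(rows))
print(dict(naive))
```

In Spark, the equivalent idea would be to drop duplicates on the city and state columns before grouping by state.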


7. Create dim_time

In [46]:
time = create_dim_time(immigration_spark, output_path)

Writing table time to tables/time
Write complete!


In [47]:
time = spark.read.parquet("tables/time")
time.toPandas().head()

Unnamed: 0,arrdate,arrival_date,day,month,year,week,weekday
0,20559.0,2016-04-15,15,4,2016,15,6
1,20555.0,2016-04-11,11,4,2016,15,2
2,20551.0,2016-04-07,7,4,2016,14,5
3,20558.0,2016-04-14,14,4,2016,15,5
4,20571.0,2016-04-27,27,4,2016,17,4
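
The arrdate values are consistent with SAS dates, that is, a count of days since 1960-01-01. The implementation of `create_dim_time` is not shown here, so the following is only a plain-Python sketch of how the columns above could be derived. The helper names are assumptions. The weekday column follows the convention visible in the output (Sunday = 1 through Saturday = 7) and the week column is the ISO week number.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS day count (days since 1960-01-01) to a date."""
    return SAS_EPOCH + timedelta(days=int(days))


def time_row(arrdate):
    """Build one dim_time-style row from a SAS arrival date."""
    d = sas_days_to_date(arrdate)
    return {
        "arrdate": arrdate,
        "arrival_date": d.isoformat(),
        "day": d.day,
        "month": d.month,
        "year": d.year,
        "week": d.isocalendar()[1],         # ISO week number
        "weekday": d.isoweekday() % 7 + 1,  # Sunday=1 ... Saturday=7
    }


print(time_row(20559.0))
```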


8. Create fact_immigration

In [48]:
immigration = create_fact_immigration(immigration_spark, output_path, spark)

Writing table immigration to tables/immigration
Write complete!


In [49]:
immigration = spark.read.parquet("tables/immigration")
immigration.toPandas().head()

Unnamed: 0,cicid,ident,status_flag_id,person_id,visa_id,arrdate,state_code,i94res,depdate,i94mode,i94port,i94cit,i94addr,airline,fltno
0,2867437.0,,0.0,0.0,0.0,20559.0,FL,691.0,20590.0,1.0,MIA,691.0,FL,AV,30
1,1147049.0,,0.0,1.0,,20550.0,FL,690.0,20558.0,1.0,MIA,690.0,FL,AV,6
2,4167704.0,,0.0,2.0,2.0,20566.0,CA,576.0,20576.0,1.0,TAM,576.0,CA,AA,2294
3,3112694.0,,0.0,3.0,,20561.0,NV,130.0,20565.0,1.0,CHI,130.0,NV,SK,945
4,5072687.0,,0.0,4.0,4.0,20571.0,TX,213.0,20572.0,1.0,HOU,213.0,TX,QR,713


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

1. Check table columns

In [50]:
airport = spark.read.parquet("tables/airport")
airport.toPandas().head()

Unnamed: 0,ident,type,name,elevation_ft,iso_country,iso_region,municipality,gps_code,iata_code,coordinates
0,00W,small_airport,Lower Granite State Airport,719.0,US,US-WA,Colfax,00W,,"-117.44300079346, 46.673500061035"
1,04SD,heliport,Cheyenne River Health Center Heliport,2437.0,US,US-SD,Eagle Butte,04SD,,"-101.243011, 44.993124"
2,0GA3,small_airport,Ayresouth Airport,1287.0,US,US-GA,Temple,0GA3,,"-85.06079864501953, 33.77009963989258"
3,0IA0,heliport,Knoxville Area Community Hospital Heliport,927.0,US,US-IA,Knoxville,0IA0,,"-93.09600067138672, 41.316898345947266"
4,0IL4,heliport,Good Samaritan Hospital Heliport,772.0,US,US-IL,Downers Grove,0IL4,,"-88.00779724121094, 41.81890106201172"


In [51]:
status = spark.read.parquet("tables/status")
status.toPandas().head()

Unnamed: 0,status_flag_id,arrival_flag,departure_flag,match_flag
0,5,O,O,M
1,74,A,D,M
2,210,Z,O,M
3,984,T,I,M
4,147,O,,


In [52]:
person = spark.read.parquet("tables/person")
person.toPandas().head()

Unnamed: 0,person_id,birth_year,gender
0,25,1991.0,M
1,99,1977.0,M
2,602,2012.0,
3,640,1965.0,
4,76,1965.0,F


In [53]:
visa = spark.read.parquet("tables/visa")
visa.toPandas().head()

Unnamed: 0,visa_id,i94visa,visatype,visapost
0,107,2.0,B2,MNL
1,143,2.0,B2,NRB
2,168,2.0,B2,GTM
3,970,2.0,B2,CDJ
4,0,2.0,B2,BGT


In [54]:
country = spark.read.parquet("tables/country")
country.toPandas().head()

Unnamed: 0,code,Name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [55]:
state = spark.read.parquet("tables/state")
state.toPandas().head()

Unnamed: 0,state_code,state,median_age,total_population,male_population,female_population,foreign_born,average_household_size
0,DC,District of Columbia,33.8,3361140,1598525,1762615,475585,2.24
1,AR,Arkansas,32.74,2882889,1400724,1482165,307753,2.53
2,TN,Tennessee,34.31,10690165,5124189,5565976,900149,2.46
3,LA,Louisiana,34.63,6502975,3134990,3367985,417095,2.47
4,AZ,Arizona,35.04,22497710,11137275,11360435,3411565,2.77


In [56]:
time = spark.read.parquet("tables/time")
time.toPandas().head()

Unnamed: 0,arrdate,arrival_date,day,month,year,week,weekday
0,20559.0,2016-04-15,15,4,2016,15,6
1,20555.0,2016-04-11,11,4,2016,15,2
2,20551.0,2016-04-07,7,4,2016,14,5
3,20558.0,2016-04-14,14,4,2016,15,5
4,20571.0,2016-04-27,27,4,2016,17,4


In [57]:
immigration = spark.read.parquet("tables/immigration")
immigration.toPandas().head()

Unnamed: 0,cicid,ident,status_flag_id,person_id,visa_id,arrdate,state_code,i94res,depdate,i94mode,i94port,i94cit,i94addr,airline,fltno
0,2867437.0,,0.0,0.0,0.0,20559.0,FL,691.0,20590.0,1.0,MIA,691.0,FL,AV,30
1,1147049.0,,0.0,1.0,,20550.0,FL,690.0,20558.0,1.0,MIA,690.0,FL,AV,6
2,4167704.0,,0.0,2.0,2.0,20566.0,CA,576.0,20576.0,1.0,TAM,576.0,CA,AA,2294
3,3112694.0,,0.0,3.0,,20561.0,NV,130.0,20565.0,1.0,CHI,130.0,NV,SK,945
4,5072687.0,,0.0,4.0,4.0,20571.0,TX,213.0,20572.0,1.0,HOU,213.0,TX,QR,713


2. Check Record Count

In [58]:
tables = {
    "airport": airport,
    "status": status,
    "person": person,
    "visa": visa,
    "country": country,
    "state": state,
    "time": time,
    "immigration": immigration
}

for table_name, table in tables.items():
    perform_quality_check(table, table_name)

Data quality check passed for airport with record_count: 55075 records.
Data quality check passed for status with record_count: 34 records.
Data quality check passed for person with record_count: 215 records.
Data quality check passed for visa with record_count: 149 records.
Data quality check passed for country with record_count: 289 records.
Data quality check passed for state with record_count: 48 records.
Data quality check passed for time with record_count: 30 records.
Data quality check passed for immigration with record_count: 1000 records.
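
The `perform_quality_check` helper comes from `utils.tools` and its source is not shown in this notebook. A minimal sketch of what a record-count check of this kind might look like is given below. The function name, the failure behavior, and the `FakeTable` stand-in are assumptions; the only requirement on the table is a `count()` method, which a Spark DataFrame provides.

```python
def perform_quality_check_sketch(table, table_name):
    """Fail if the table is empty; otherwise report its record count."""
    record_count = table.count()
    if record_count == 0:
        raise ValueError(
            f"Data quality check failed for {table_name}: zero records."
        )
    print(
        f"Data quality check passed for {table_name} "
        f"with record_count: {record_count} records."
    )
    return record_count


class FakeTable:
    """Stand-in for a Spark DataFrame; only count() is needed here."""

    def __init__(self, n_rows):
        self._n_rows = n_rows

    def count(self):
        return self._n_rows


checked = perform_quality_check_sketch(FakeTable(30), "time")
```

A count check only proves that a table is not empty. Checks on key uniqueness and on nulls in key columns would cover the integrity constraints listed at the start of this section.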


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

1. Clearly state the rationale for the choice of tools and technologies for the project.
 - This project uses the following tools:
     - Apache Spark, because of its ability to process massive amounts of data as well as the use of its unified analytics engine and convenient APIs.
     - Pandas, due to its convenient dataframe manipulation functions.

2. Propose how often the data should be updated and why:
    - The immigration (i94) data set is updated monthly, hence all relevant data should be updated monthly as well.

3. Write a description of how you would approach the problem differently under the following scenarios:
- The data was increased by 100x: If the data was increased by 100x, I would move processing and storage to services built for that scale, such as Amazon EMR for the Spark jobs and Amazon Redshift or Apache Cassandra for storage.
- The data populates a dashboard that must be updated on a daily basis by 7am every day: If the data had to populate a dashboard daily, I would manage the ETL pipeline in a DAG from Apache Airflow. This would ensure that the pipeline runs in time, that data quality checks pass, and provide a convenient means of notification should the pipeline fail.
- The database needed to be accessed by 100+ people: If the data needed to be accessed by many people simultaneously, I would move the analytics database to Amazon Redshift which can handle massive request volumes and is easily scalable.