# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import psycopg2
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *


### Step 1: Scope the Project and Gather Data

#### Scope 
The project goal is to enrich the US I94 immigration data with more data i.e.airport data, city demographics to be more insightful during  analysis of the immigration data.


#### I94 Immigration Data
Data  is from the US National Tourism and Trade Office. The data comes from the US National Tourism and Trade Office. This table is used for the fact table in this project.A data dictionary is included in the workspace.

In [2]:
# Read in the data here
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()


In [3]:
df_I94=spark.read.parquet("sas_data")

In [18]:
df_I94.limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1
5,5748522.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20579.0,...,,M,1959.0,10292016,M,,NZ,94981800000.0,10,B2
6,5748523.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,...,,M,1950.0,10292016,F,,NZ,94979690000.0,10,B2
7,5748524.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,...,,M,1975.0,10292016,F,,NZ,94979750000.0,10,B2
8,5748525.0,2016.0,4.0,245.0,464.0,HOU,20574.0,1.0,FL,20581.0,...,,M,1989.0,10292016,M,,NZ,94973250000.0,28,B2
9,5748526.0,2016.0,4.0,245.0,464.0,LOS,20574.0,1.0,CA,20581.0,...,,M,1990.0,10292016,F,,NZ,95013550000.0,2,B2


#### Data Dictionary
- cicid - float64 - ID that uniquely identify one record in the dataset
- i94yr - float64 - 4 digit year
- i94mon- float64 - Numeric month
- i94cit - float64 - 3 digit code of source city for immigration (Born country)
- i94res - float64 - 3 digit code of source country for immigration (Residence country)
- i94port - object - Port addmitted through
- arrdate - float64 - Arrival date in the USA
- i94mode - float64 - Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
- i94addr - object - State of arrival
- depdate -float64 - Departure date
- i94bir - float64 - Age of Respondent in Years
- i94visa - float64 - Visa codes collapsed into three categories: (1 = Business; 2 = Pleasure; 3 = Student)
- count - float64 - Used for summary statistics
- dtadfile - object - Character Date Field
- visapost - object - Department of State where where Visa was issued
- occup - object - Occupation that will be performed in U.S.
- entdepa - object - Arrival Flag. Whether admitted or paroled into the US
- entdepd - object - Departure Flag. Whether departed, lost visa, or deceased
- entdepu - object - Update Flag. Update of visa, either apprehended, overstayed, or updated to PR
- matflag - object - Match flag
- biryear - float64 - 4 digit year of birth
- dtaddto - object - Character date field to when admitted in the US
- gender - object - Gender
- insnum - object - INS number
- airline - object - Airline used to arrive in U.S.
- admnum - float64 - Admission number, should be unique and not nullable
- fltno - object - Flight number of Airline used to arrive in U.S.
- visatype - object - Class of admission legally admitting the non-immigrant to temporarily stay in U.S.


#### U.S. City Demographic Data¶

This data comes from OpenSoft.The dataset contains population details of all US Cities and census-designated places.


In [4]:
df_city_demo=spark.read.csv("us-cities-demographics.csv",inferSchema=True,header=True,sep=';')

In [12]:
df_city_demo.limit(10).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC,Asian,11060


#### Data Dictionary
- City - Name of the city
- State - US state of the city
- Median Age - The median of the age of the population
- Male Population - Number of the male population
- Female Population - Number of the female population
- Total Population - Number of the total population
- Number of Veterans - Number of veterans living in the city
- Foreign-born - Number of residents of the city that were not born in the city
- Average Household Size - Average size of the houses in the city
- State Code - Code of the state of the city
- Race - Race class
- Count - Number of individual of each race

#### Airport Codes
This is a simple table of airport codes and corresponding cities

In [5]:
df_airport=spark.read.csv("airport-codes_csv.csv",inferSchema=True,header=True)

In [18]:
df_airport.limit(10).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


#### Data Dictionary
- ident -Unique identifier
- type - Type of the airport
- name - Airport Name
- elevation_ft - Altitude of the airport
- continent - Continent
- iso_country -ISO code of the country of the airport
- iso_region - ISO code for the region of the airport
- municipality - City where the airport is located
- gps_code - GPS code of the airport
- iata_code - IATA code of the airport
- local_code - Local code of the airport
- coordinates - GPS coordinates of the airport

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.





In [22]:

df_I94.describe().toPandas()

Unnamed: 0,summary,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,count,3096313.0,3096313.0,3096313.0,3096313.0,3096313.0,3096313,3096313.0,3096074.0,2943721,...,392,2957884,3095511.0,3095836,2682044,113708,3012686,3096313.0,3076764,3096313
1,mean,3078651.879075533,2016.0,4.0,304.9069344733559,303.28381949757664,,20559.84854179794,1.0736897761487614,51.652482269503544,...,,,1974.2323855415148,8291120.333841449,,4131.050016327899,59.477601493233784,70828850111.50484,1360.2463696420555,
2,stddev,1763278.0997499449,0.0,0.0,210.02688853063205,208.58321292789532,,8.777339475317723,0.5158963131657106,42.97906231370983,...,,,17.420260534589556,1656502.4244925722,,8821.743471773654,172.6333995206175,22154415947.558968,5852.676345633695,
3,min,6.0,2016.0,4.0,101.0,101.0,5KE,20545.0,1.0,..,...,U,M,1902.0,/ 183D,F,0,*FF,0.0,00000,B1
4,max,6102785.0,2016.0,4.0,999.0,760.0,YSL,20574.0,9.0,ZU,...,Y,M,2019.0,D/S,X,YM0167,ZZ,99915565930.0,ZZZ,WT


In [14]:
df_airport.describe().toPandas()

Unnamed: 0,summary,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,count,55075,55075,55075,48069.0,55075,55075,55075,49399,41030,9189,28686,55075
1,mean,2.3873375337777779E8,,,1240.7896773388254,,,,,2.1920446610204083E8,0.0,8.580556178571428E7,
2,stddev,9.492375382267495E8,,,1602.3634593484142,,,,,9.1123224377024E8,0.0,5.747026415216715E8,
3,min,00A,balloonport,"""""""Der Dingel"""" Airfield""",-1266.0,AF,AD,AD-04,'S Gravenvoeren,0000,-,-,"-0.004722000099718571, 9.425000190734863"
4,max,spgl,small_airport,Çá¸¾á¸á¸ á¸®á¸Ç{+91-9680118734} GiRLFRieNd...,22000.0,SA,ZZ,ZZ-U-A,Å½ocene,ZYYY,ZZV,ZZV,"99.9555969238, 8.47115039825"


In [9]:
df_city_demo = df_city_demo.withColumn("Ccount",df_city_demo.Count)
df_city_demo = df_city_demo.drop(df_city_demo.Count)
df_city_demo.describe().toPandas()

Unnamed: 0,summary,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Ccount
0,count,2891,2891,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891,2891,2891.0
1,mean,,,35.49488066413016,97328.4262465374,101769.6308864266,198966.77931511588,9367.832522585128,40653.59867963864,2.742542608695655,,,48963.77447250087
2,stddev,,,4.401616730099886,216299.93692873296,231564.5725714828,447555.9296335903,13211.21992386408,155749.1036650984,0.4332910878973046,,,144385.58856460615
3,min,Abilene,Alabama,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,AK,American Indian and Alaska Native,98.0
4,max,Yuma,Wisconsin,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,WI,White,3835726.0


#### Summary of shapes of the datasets

In [12]:
print("City Demographics")
df_city_demo_count=df_city_demo.count()
print(f"Rows: {df_city_demo_count}")
print(f"Columns: {len(df_city_demo.columns)}")
print()
print("Airport Codes")
df_airport_count=df_airport.count()
print(f"Rows: {df_airport_count}")
print(f"Columns: {len(df_airport.columns)}")
print()
print("I94 Immigration")
df_I94_count=df_I94.count()
print(f"Rows: {df_I94_count}")
print(f"Columns: {len(df_I94.columns)}")

City Demographics
Rows: 2891
Columns: 12

Airport Codes
Rows: 55075
Columns: 12

I94 Immigration
Rows: 3096313
Columns: 28


#### Checking for Nulls

In [10]:
def countNulls(df):
    nulls_list=[]
    for col in df.columns:
        nulls_d={}
        number_nulls=df.select(col).filter(F.col(col).isNull()).count()
        if number_nulls > 0:
            nulls_d["Column"]=col
            nulls_d["Nulls"]=number_nulls
            nulls_list.append(nulls_d)
    if len(nulls_list)>0:
        display(pd.DataFrame(nulls_list))
    else: print("No Nulls")



In [11]:
countNulls(df_airport)

Unnamed: 0,Column,Nulls
0,elevation_ft,7006
1,municipality,5676
2,gps_code,14045
3,iata_code,45886
4,local_code,26389


In [13]:
countNulls(df_city_demo)

Unnamed: 0,Column,Nulls
0,Male Population,3
1,Female Population,3
2,Number of Veterans,13
3,Foreign-born,13
4,Average Household Size,16


In [14]:
countNulls(df_I94)

Unnamed: 0,Column,Nulls
0,i94mode,239
1,i94addr,152592
2,depdate,142457
3,i94bir,802
4,dtadfile,1
5,visapost,1881250
6,occup,3088187
7,entdepa,238
8,entdepd,138429
9,entdepu,3095921


In [13]:
def count_distinct_values(df):
    for column in df:
            
            distinct_v = df.select(column).distinct().rdd.map(lambda r: r[0]).collect()
            
            print ("'{}' has {} distinct values" .format(column,len(distinct_v)))
            if (len(distinct_v) > 2):
                print("-----------Listing up to 2 distinct values---------")
            display(pd.DataFrame(distinct_v[0:2]))
            print ("\n-----------------------------------------------------------------------\n")


In [14]:
count_distinct_values(df_airport)

'Column<b'ident'>' has 55075 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,06IN
1,06VA



-----------------------------------------------------------------------

'Column<b'type'>' has 7 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,large_airport
1,balloonport



-----------------------------------------------------------------------

'Column<b'name'>' has 52144 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,Mc Kenzie Bridge State Airport
1,Clarks Dream Strip



-----------------------------------------------------------------------

'Column<b'elevation_ft'>' has 5450 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,6620
1,6397



-----------------------------------------------------------------------

'Column<b'continent'>' has 7 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,
1,SA



-----------------------------------------------------------------------

'Column<b'iso_country'>' has 244 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,DZ
1,LT



-----------------------------------------------------------------------

'Column<b'iso_region'>' has 2810 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,US-TN
1,AF-KAP



-----------------------------------------------------------------------

'Column<b'municipality'>' has 27134 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,Sandy Valley
1,Agawam



-----------------------------------------------------------------------

'Column<b'gps_code'>' has 40851 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,06VA
1,0LA0



-----------------------------------------------------------------------

'Column<b'iata_code'>' has 9043 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,BZT
1,YUL



-----------------------------------------------------------------------

'Column<b'local_code'>' has 27437 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,06VA
1,0LA0



-----------------------------------------------------------------------

'Column<b'coordinates'>' has 54874 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,"-88.56900024414062, 40.1786003112793"
1,"-117.28399658203125, 48.84090042114258"



-----------------------------------------------------------------------



In [15]:
count_distinct_values(df_city_demo)

'Column<b'City'>' has 567 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,Saint George
1,Worcester



-----------------------------------------------------------------------

'Column<b'State'>' has 49 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,Utah
1,Hawaii



-----------------------------------------------------------------------

'Column<b'Median Age'>' has 180 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,37.1
1,44.8



-----------------------------------------------------------------------

'Column<b'Male Population'>' has 594 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,70863
1,93948



-----------------------------------------------------------------------

'Column<b'Female Population'>' has 595 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,89574
1,76110



-----------------------------------------------------------------------

'Column<b'Total Population'>' has 594 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,100884
1,166624



-----------------------------------------------------------------------

'Column<b'Number of Veterans'>' has 578 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,4519
1,18051



-----------------------------------------------------------------------

'Column<b'Foreign-born'>' has 588 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,11317
1,26755



-----------------------------------------------------------------------

'Column<b'Average Household Size'>' has 162 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,2.86
1,3.26



-----------------------------------------------------------------------

'Column<b'State Code'>' has 49 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,AZ
1,SC



-----------------------------------------------------------------------

'Column<b'Race'>' has 5 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,Black or African-American
1,Hispanic or Latino



-----------------------------------------------------------------------

'Column<b'Count'>' has 2785 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,7240
1,23271



-----------------------------------------------------------------------



In [16]:
count_distinct_values(df_I94)

'Column<b'cicid'>' has 3096313 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,5748877.0
1,5749231.0



-----------------------------------------------------------------------

'Column<b'i94yr'>' has 1 distinct values


Unnamed: 0,0
0,2016.0



-----------------------------------------------------------------------

'Column<b'i94mon'>' has 1 distinct values


Unnamed: 0,0
0,4.0



-----------------------------------------------------------------------

'Column<b'i94cit'>' has 243 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,299.0
1,692.0



-----------------------------------------------------------------------

'Column<b'i94res'>' has 229 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,692.0
1,299.0



-----------------------------------------------------------------------

'Column<b'i94port'>' has 299 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,FMY
1,BGM



-----------------------------------------------------------------------

'Column<b'arrdate'>' has 30 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,20550.0
1,20556.0



-----------------------------------------------------------------------

'Column<b'i94mode'>' has 5 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,
1,1.0



-----------------------------------------------------------------------

'Column<b'i94addr'>' has 458 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,.N
1,RG



-----------------------------------------------------------------------

'Column<b'depdate'>' has 236 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,20593.0
1,20689.0



-----------------------------------------------------------------------

'Column<b'i94bir'>' has 113 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,8.0
1,67.0



-----------------------------------------------------------------------

'Column<b'i94visa'>' has 3 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,1.0
1,3.0



-----------------------------------------------------------------------

'Column<b'count'>' has 1 distinct values


Unnamed: 0,0
0,1.0



-----------------------------------------------------------------------

'Column<b'dtadfile'>' has 118 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,20160615
1,20160825



-----------------------------------------------------------------------

'Column<b'visapost'>' has 531 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,CRS
1,KGL



-----------------------------------------------------------------------

'Column<b'occup'>' has 112 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,PHA
1,REL



-----------------------------------------------------------------------

'Column<b'entdepa'>' has 14 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,K
1,F



-----------------------------------------------------------------------

'Column<b'entdepd'>' has 13 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,K
1,Q



-----------------------------------------------------------------------

'Column<b'entdepu'>' has 3 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,
1,Y



-----------------------------------------------------------------------

'Column<b'matflag'>' has 2 distinct values


Unnamed: 0,0
0,
1,M



-----------------------------------------------------------------------

'Column<b'biryear'>' has 113 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,1988.0
1,1976.0



-----------------------------------------------------------------------

'Column<b'dtaddto'>' has 778 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,9282016
1,2282017



-----------------------------------------------------------------------

'Column<b'gender'>' has 5 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,F
1,



-----------------------------------------------------------------------

'Column<b'insnum'>' has 1914 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,5325
1,3414



-----------------------------------------------------------------------

'Column<b'airline'>' has 535 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,MM
1,DZ



-----------------------------------------------------------------------

'Column<b'admnum'>' has 3075579 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,907737600.0
1,94947800000.0



-----------------------------------------------------------------------

'Column<b'fltno'>' has 7153 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,456
1,556



-----------------------------------------------------------------------

'Column<b'visatype'>' has 17 distinct values
-----------Listing up to 2 distinct values---------


Unnamed: 0,0
0,F2
1,GMB



-----------------------------------------------------------------------



#### Cleaning Steps
Document steps necessary to clean the data

#### Checking for broKen records in each data set

In [7]:
# Performing cleaning tasks here


df_airport.withColumn('nullsNum', sum(df_airport[col].isNull().cast('int') for col in df_airport.columns))\
.orderBy(F.desc("nullsNum")).groupBy("nullsNum").count().toPandas()


Unnamed: 0,nullsNum,count
0,5,1508
1,4,5020
2,3,6479
3,2,12623
4,1,26699
5,0,2746


In [17]:
df_airport.withColumn('nullsNum', sum(df_airport[col].isNull().cast('int') for col in df_airport.columns))\
.orderBy(F.desc("nullsNum")).limit(10).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,nullsNum
0,AE-0005,heliport,Kempinski Emirates Palace Twin Heliport,,AS,AE,AE-UQ,,,,,"54.3205904961, 24.462267882799996",5
1,AE-0017,heliport,Ghantoot Hotel 2 Helipad,,AS,AE,AE-AZ,,,,,"54.882283, 24.865417",5
2,AE-0007,heliport,Al Ghuwaifat Border Post helipad,,AS,AE,AE-DU,,,,,"51.600595, 24.120421",5
3,AE-0003,small_airport,Dubai Skydive,,AS,AE,AE-DU,,,,,"55.1366257668, 25.0898742332",5
4,AE-0008,heliport,Al Ghuwaifat Customs Post helipad,,AS,AE,AE-DU,,,,,"51.616767, 24.128339",5
5,AE-0013,heliport,Oceana Palm Jumeirah Helipad,,AS,AE,AE-U-A,,,,,"55.135169, 25.110083",5
6,AE-0014,heliport,Ghantoot Racing & Polo Club Helipad,,AS,AE,AE-AZ,,,,,"54.905556, 24.86824",5
7,AE-0004,heliport,Sheikh Sultan Bin Khalifa bin Zayed Al Nahyan ...,,AS,AE,AE-DU,,,,,"55.1746809483, 25.1225658976",5
8,AE-0015,heliport,Golden Tulip Al Jazira Helipad,,AS,AE,AE-AZ,,,,,"54.894996, 24.858628",5
9,AE-0012,heliport,Waldorf Astoria Dubai Palm Jumeirah Helipad,,AS,AE,AE-DU,,,,,"55.150379, 25.133372",5


So there is no single record that has all columns null, the highest amount null columns for each record is not alarming, as there are a lot of column values which could possibly not be filled

In [8]:
df_I94.withColumn('nullsNum', sum(df_I94[col].isNull().cast('int') for col in df_I94.columns))\
.orderBy(F.desc("nullsNum")).groupBy("nullsNum").count().toPandas()


Unnamed: 0,nullsNum,count
0,12,3
1,11,308
2,10,289
3,9,3721
4,8,17196
5,7,38935
6,6,106841
7,5,460335
8,4,1346942
9,3,1115984


In [9]:
df_I94.withColumn('nullsNum', sum(df_I94[col].isNull().cast('int') for col in df_I94.columns))\
.orderBy(F.desc("nullsNum")).limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,nullsNum
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,,1979.0,10282016.0,,,,1897628000.0,,B2,12
1,6020025.0,2016.0,4.0,252.0,582.0,PSP,20553.0,9.0,,,...,,,7072016.0,U,,,34605310000.0,,WT,12
2,5973645.0,2016.0,4.0,252.0,209.0,SAI,20549.0,9.0,,,...,,,5192016.0,F,,,57551700000.0,,GMT,12
3,5901492.0,2016.0,4.0,582.0,582.0,XXX,20556.0,,,20575.0,...,M,1986.0,,,,,78081390000.0,,B2,11
4,5901490.0,2016.0,4.0,582.0,582.0,XXX,20554.0,,,20575.0,...,M,1986.0,,,,,84332940000.0,,B2,11
5,5901928.0,2016.0,4.0,112.0,112.0,HHW,20559.0,9.0,,,...,,,7142016.0,M,3517.0,,47114220000.0,,WT,11
6,5879643.0,2016.0,4.0,582.0,582.0,XXX,20555.0,,,20574.0,...,M,1951.0,,,,,89677700000.0,,B2,11
7,5901488.0,2016.0,4.0,582.0,582.0,XXX,20553.0,,,20574.0,...,M,1978.0,,,,,89095960000.0,,B2,11
8,5901491.0,2016.0,4.0,582.0,582.0,XXX,20554.0,,,20575.0,...,M,1989.0,,,,,89885530000.0,,B2,11
9,5901728.0,2016.0,4.0,111.0,111.0,CHM,20553.0,9.0,,,...,,,7072016.0,U,5522.0,,42348700000.0,,WT,11


So there is no single record that has all columns null, the highest amount null columns for each record is not alarming, as there are a lot of column values which could possibly not be filled

In [18]:
df_city_demo.withColumn('nullsNum', sum(df_city_demo[col].isNull().cast('int') for col in df_city_demo.columns))\
.orderBy(F.desc("nullsNum")).groupBy("nullsNum").count().toPandas()


Unnamed: 0,nullsNum,count
0,3,16
1,0,2875


In [19]:
df_city_demo.withColumn('nullsNum', sum(df_city_demo[col].isNull().cast('int') for col in df_city_demo.columns))\
.orderBy(F.desc("nullsNum")).limit(10).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count,nullsNum
0,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,Hispanic or Latino,335559,3
1,Ponce,Puerto Rico,40.5,56968.0,64615.0,121583,,,,PR,Hispanic or Latino,120705,3
2,Caguas,Puerto Rico,40.4,34743.0,42265.0,77008,,,,PR,Hispanic or Latino,76349,3
3,Carolina,Puerto Rico,42.0,64758.0,77308.0,142066,,,,PR,American Indian and Alaska Native,12143,3
4,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,Hispanic or Latino,1066,3
5,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,Black or African-American,331,3
6,Carolina,Puerto Rico,42.0,64758.0,77308.0,142066,,,,PR,Hispanic or Latino,139967,3
7,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,White,72211,3
8,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,American Indian and Alaska Native,4031,3
9,Mayagüez,Puerto Rico,38.1,30799.0,35782.0,66581,,,,PR,Asian,235,3


So there is no single record that has all columns null, the highest amount null columns for each record is not alarming, as there are a lot of column values which could possibly not be filled

##### Only keep records with valid gender for I94 Immigration dataset

In [6]:
df_I94=df_I94.filter(df_I94.gender.isNotNull())

In [8]:
print(f"Number of rows in I94 dataset after cleaning out invalid gender values: {df_I94.count()}")


Number of rows in I94 dataset after cleaning out invalid gender values: 2682044


##### Only keep records where continent is NA in city demographics datasets 

In [47]:
df_airport.count()

55075

In [7]:
df_airport=df_airport.filter(df_airport.continent == "NA" )

In [50]:
print(f"Number of rows in airport dataset after removing all aairports that are not in North America: {df_airport.count()}")



Number of rows in airport dataset after removing all aairports that are not in North America: 27719


##### Only keep records where airport type is either "small_airport","medium_airport", or "large_airport"

In [8]:
df_airport = df_airport.filter('type == "small_airport" or type == "medium_airport" or type == "large_airport" ')

In [11]:
print(f"Number of rows in airport dataset after filtering out some airport types: {df_airport.count()}")



Number of rows in airport dataset after filtering out some airport types: 17637


##### Drop Duplicates by city in city demographics dataset

In [9]:
df_city_demo.count()

2891

In [9]:

df_city_demo.dropDuplicates(subset=['city']).count()


567

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

I will be using a star schema as my data model(dimensional modelling)
   - Why:
        - Simplified Queries
        - Fast aggregations
        - You can easily denormalize your tables.
        
A star schema consists of fact and dimensional tables.
- Fact table: Consists of facts, metrics, measurements.
- Dimension table: Table containg business elements. It consists of basically descriptions.

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

- Transform the staged data into their appropriate data types
- Perform the necessary joins
- Write the tables(dataframes) to different parquet files. One parquet file per table.
- These tables should form a star schema when created and saved to the respective parquet files.
- For every parquet file created, run a count statement to confirm the existence of the data in the table(dataframe) stored in the respective parquet file.


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [10]:
output_path = "output_data/"
def write_to_parquet(table, file_name):
    """
    Write the table as parquet file
    
    Params:
        table
        filename
    Return value:
        Outputs the dataframe/table as parquet file to a folder
    """
    # write artists table to parquet files
    file_output = output_path + file_name
    table.write.mode("overwrite").parquet(file_output)
    

def customUnion(df1, df2):
    """
    Combines two dataframes 
    
    Parameters:
         dataframe1
         dataframe2
     
     Returns:
         A combined dataframe
    """
    cols1 = df1.columns
    cols2 = df2.columns
    total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
    def expr(mycols, allcols):
        def processCols(colname):
            if colname in mycols:
                return colname
            else:
                return lit(None).alias(colname)
        cols = map(processCols, allcols)
        return list(cols)
    appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
    return appended

##### Airport table(dimension)

In [11]:
airport_table = df_airport.select(['ident', 'type', 'name', 'iso_region', 'municipality','gps_code','local_code','coordinates']) \
               .dropDuplicates().dropna()
write_to_parquet(airport_table,"airport_table.parquet" )

In [15]:
airport_table.limit(3).toPandas()

Unnamed: 0,ident,type,name,iso_region,municipality,gps_code,local_code,coordinates
0,0LA3,small_airport,Belcher Airpatch Airport,US-LA,Belcher,0LA3,0LA3,"-93.87349700927734, 32.749298095703125"
1,0OR8,small_airport,Sutton on Rogue Airport,US-OR,White City,0OR8,0OR8,"-122.86599731445312, 42.484798431396484"
2,0R2,small_airport,Lincoln Municipal Airport,US-MO,Lincoln,0R2,0R2,"-93.33300018310547, 38.403900146484375"


##### Demographics Table(DImension)

In [16]:
print("Demographics data schema:")
df_city_demo.printSchema()

Demographics data schema:
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [13]:
from pyspark.sql.functions import *

In [14]:
dem_table = df_city_demo \
.groupBy(col("State Code").alias("stateCode"), col("State").alias("state")).agg(
round(mean('Median Age'), 2).alias("medianAge"),\
sum("Total Population").alias("totalPopulation"),\
sum("Male Population").alias("malePopulation"), \
sum("Female Population").alias("femalePopulation"),\
sum("Number of Veterans").alias("numberOfVeterans"),\
sum("Foreign-born").alias("foreignBorn"), \
round(mean("Average Household Size"),2).alias("averageHouseholdSize")
).dropna()

In [19]:
dem_table.limit(3).toPandas()

Unnamed: 0,stateCode,state,medianAge,totalPopulation,malePopulation,femalePopulation,numberOfVeterans,foreignBorn,averageHouseholdSize
0,MT,Montana,35.5,906470,438535,467935,69270,29885,2.27
1,NC,North Carolina,33.79,15300995,7330525,7970470,830730,1896635,2.48
2,MD,Maryland,36.37,6560645,3139755,3420890,320715,1148970,2.66


In [15]:
print("Demographics table schema:")
dem_table.printSchema()

Demographics table schema:
root
 |-- stateCode: string (nullable = true)
 |-- state: string (nullable = true)
 |-- medianAge: double (nullable = true)
 |-- totalPopulation: long (nullable = true)
 |-- malePopulation: long (nullable = true)
 |-- femalePopulation: long (nullable = true)
 |-- numberOfVeterans: long (nullable = true)
 |-- foreignBorn: long (nullable = true)
 |-- averageHouseholdSize: double (nullable = true)



In [21]:
write_to_parquet(dem_table,"dem_table.parquet" )

##### I94 Immigration dataset

In [22]:

print("I94 schema:")
df_I94.printSchema()

I94 schema:
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double 

##### Changing 'double' to 'integer'

In [16]:
df_I94 = df_I94 \
.withColumn("cicid", col("cicid").cast("integer")) \
.withColumn("year", col("i94yr").cast("integer")) \
.drop("i94yr") \
.withColumn("month", col("i94mon").cast("integer")) \
.drop("i94mon") \
.withColumn("bornCountry", col("i94cit").cast("integer")) \
.drop("i94cit") \
.withColumn("residentCountry", col("i94res").cast("integer")) \
.drop("i94res") \
.withColumnRenamed("i94port", "arrivalPort") \
.withColumn("mode", col("i94mode").cast("integer")) \
.drop("i94mode") \
.withColumnRenamed("i94addr", "arrivalAddress") \
.withColumn("age", col("i94bir").cast("integer")) \
.drop("i94bir") \
.withColumn("visa", col("i94visa").cast("integer")) \
.drop("i94visa") \
.withColumn("birthYear", col("biryear").cast("integer")) \
.drop("biryear") \
.withColumnRenamed("fltno", "flightNumber") \
.withColumnRenamed("visaType", "visaType") \
.withColumn("sasDate", to_date(lit("01/01/1960"), "MM/dd/yyyy")) \
.withColumn("arrivalDate", expr("date_add(sasDate, arrdate)")) \
.withColumn("departureDate", expr("date_add(sasDate, depdate)")) \
.drop("sasDate", "arrdate", "depdate", "count", "admnum", "dtadfile", "visapost", "occup", "dtaddto", "insnum")


In [17]:
print("New I94 schema:")
df_I94.printSchema()

New I94 schema:
root
 |-- cicid: integer (nullable = true)
 |-- arrivalPort: string (nullable = true)
 |-- arrivalAddress: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- flightNumber: string (nullable = true)
 |-- visaType: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- bornCountry: integer (nullable = true)
 |-- residentCountry: integer (nullable = true)
 |-- mode: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- visa: integer (nullable = true)
 |-- birthYear: integer (nullable = true)
 |-- arrivalDate: date (nullable = true)
 |-- departureDate: date (nullable = true)



##### Make  sure age is zero years plus, and drop duplicates

In [18]:
df_I94 = df_I94.where((col("age") >= 0) & (col("cicid").isNotNull())) \
     .dropDuplicates(['cicid'])

In [19]:
print((df_I94.count(), len(df_I94.columns)))

(2681242, 21)


In [20]:
df_I94.limit(2).toPandas()

Unnamed: 0,cicid,arrivalPort,arrivalAddress,entdepa,entdepd,entdepu,matflag,gender,airline,flightNumber,...,year,month,bornCountry,residentCountry,mode,age,visa,birthYear,arrivalDate,departureDate
0,148,NEW,NY,G,O,,M,F,OS,89,...,2016,4,103,103,1,21,2,1995,2016-04-01,2016-04-08
1,471,MIA,,G,O,,M,M,VES,91285,...,2016,4,103,103,2,63,2,1953,2016-04-01,2016-04-03


##### Time table(Dimension)

In [21]:
time_table = df_I94.select(['arrivalDate'])\
                    .withColumnRenamed('arrivalDate','time') 

time_table = time_table \
             .withColumn('day', F.dayofmonth('time')) \
             .withColumn('month', F.month('time')) \
             .withColumn('year', F.year('time')) \
             .withColumn('week', F.weekofyear('time')) \
             .withColumn('weekday', F.dayofweek('time'))\
             .dropDuplicates()
write_to_parquet(time_table,"time_table.parquet" )

In [29]:
print("Time table schema:")
time_table.printSchema()

Time table schema:
root
 |-- time: date (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [30]:
time_table.limit(2).toPandas()

Unnamed: 0,time,day,month,year,week,weekday
0,2016-04-01,1,4,2016,13,6
1,2016-04-06,6,4,2016,14,4


##### Person table(Dimension)

In [22]:
person_table = df_I94.select(['birthYear','gender','age']) \
                 .dropDuplicates() \
                 .withColumn("personId", \
                        monotonically_increasing_id())
print((person_table.count(), len(person_table.columns)))

(375, 4)


In [32]:
person_table.limit(2).toPandas()

Unnamed: 0,birthYear,gender,age,personId
0,1965,F,51,0
1,2004,U,12,1


In [33]:
print("Person table schema:")
person_table.printSchema()

Person table schema:
root
 |-- birthYear: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- personId: long (nullable = false)



In [34]:
write_to_parquet(person_table,"person_table.parquet" )

##### Building the fact table

In [23]:
fact_table = df_I94.select(['cicid', 'arrivalDate','departureDate','mode','bornCountry', 'airline','flightNumber','visa','visaType',
                         'gender','arrivalPort']) \
                .dropna() \
                .dropDuplicates(['cicid'])
fact_table = fact_table.withColumnRenamed("arrivalDate", "time")

In [24]:

fact_table =fact_table.join(person_table.select(['personId', 'gender']), (fact_table.gender == person_table.gender) , how = 'inner') \
                      .drop('gender') \
                      .dropna() \
                      .dropDuplicates()

In [25]:
fact_table.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- time: date (nullable = true)
 |-- departureDate: date (nullable = true)
 |-- mode: integer (nullable = true)
 |-- bornCountry: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- flightNumber: string (nullable = true)
 |-- visa: integer (nullable = true)
 |-- visaType: string (nullable = true)
 |-- arrivalPort: string (nullable = true)
 |-- personId: long (nullable = false)



In [26]:
fact_table.count()

265736828

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

- Since I have been saving my fact and dimensional tables in parquet files, all one has to do is look at the file size to see if there is data in it. 
- If there is need to constantly check the files, without having to check for file size, one can just follow the steps that I took in from the beginning while exploring the raw datasets, then have these steps in an apache airflow customized plugin to avoid unnecessary repetition.
- In brief, I have been performing data quality checks as I explored the data.



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    - Python: Readable Programming Language that has many libraries that support data analytical activities. Also, many technologies provide a pyton api to make use of.
    - Spark: Fast parallel processing. Fast data transformations. No need to first create tables and connect to a database in order to analyse the data. Flexible schema manipulation. Distributed system. Big data procesing.
    
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
     - Use Redshift (https://aws.amazon.com/redshift/),allows querying petabytes of structured and semi-structured data across the data warehouse, and also provides quality and fast query perfomance since it provides columnar data storage and massive parallel processing.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     - Use Apache Airflow which is a tool that is used for easy and efficient maintanibility of data pipelnes. Hence the data pipelines can be scheduled to run daily by 7am.
 * The database needed to be accessed by 100+ people.
     - More cpu resources will be needed to get a fast experience. By using a distributed database you can to improve your replications and partitioning to get faster query results for each user.