# Project Title
### Data Engineering Capstone Project

#### Project Summary
We will take an I94 immigration dataset from the Department of Homeland Security and turn it into the fact table of a star schema, with demographic data from the US Census Bureau and airport data from OurAirports as dimension tables. We clean the data and, where possible, add a name field alongside each code field.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import configparser
import datetime
import os

import pandas as pd
from boto3 import client
from boto3.s3.transfer import S3Transfer
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, regexp_replace, split, udf, unix_timestamp, upper, when
from pyspark.sql.types import DateType, IntegerType

# show all columns when a DataFrame is displayed
pd.set_option('display.max_columns', None)

In [71]:
# read the credentials file once instead of once per key
aws_credentials = pd.read_csv('aws_credentials.csv')
aws_access_key_id = aws_credentials.loc[0, 'AWS_ACCESS_KEY_ID']
aws_secret_access_key = aws_credentials.loc[0, 'AWS_SECRET_ACCESS_KEY']

client_obj = client('s3',
                    aws_access_key_id=aws_access_key_id,
                    aws_secret_access_key=aws_secret_access_key)
transfer = S3Transfer(client_obj)
os.environ["AWS_ACCESS_KEY_ID"] = aws_access_key_id
os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret_access_key

In [72]:
# spark.jars.packages takes a single comma-separated list; repeating .config()
# with the same key overwrites the earlier values and keeps only the last one
aws_credentials = pd.read_csv('aws_credentials.csv')
spark = SparkSession.builder\
                     .config("spark.jars.packages",
                             "saurfang:spark-sas7bdat:2.0.0-s_2.11,"
                             "org.apache.hadoop:hadoop-aws:2.7.5,"
                             "com.amazonaws:aws-java-sdk:1.7.4")\
                     .enableHiveSupport()\
                     .config("spark.hadoop.fs.s3a.access.key", aws_credentials.loc[0, 'AWS_ACCESS_KEY_ID'])\
                     .config("spark.hadoop.fs.s3a.secret.key", aws_credentials.loc[0, 'AWS_SECRET_ACCESS_KEY'])\
                     .getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project we will take I94 Immigration data from the US National Tourism and Trade Office and enrich it with airport data and demographic data.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

We are using the following data sets:  
- *I94 Immigration data from the US National Tourism and Trade Office, which describes immigrants by attributes such as visa type, origin, destination and age.*
- *Demographic data from the US Census Bureau's 2015 American Community Survey, which covers cities with a population greater than or equal to 65,000.*
- *Airport data, taken from ourairports.com, which contains information about the type, location and elevation of airports.*

Because our aim is to generate a rich, combined dataset, we will use as many fields as are available. Where possible, we augment fields which only contain codes by adding a field with the accompanying name.

In [73]:
immigration_df=spark.read.parquet("../workspace//immigration-parquet/")
print(immigration_df.count())
immigration_df.limit(2).toPandas()

40790529


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5680949.0,2016.0,7.0,117.0,117.0,NYC,20659.0,1.0,NY,,30.0,3.0,1.0,20160724,NPL,,G,,,,1986.0,D/S,F,,IG,2947450000.0,3940,F1
1,5680950.0,2016.0,7.0,245.0,245.0,DET,20659.0,1.0,IL,20679.0,46.0,2.0,1.0,20160813,,,G,O,,M,1970.0,01232017,M,78652.0,DL,2947451000.0,188,B2


In [5]:
airportcodes_df = spark.read.csv("../workspace//airport-codes_csv.csv",
                                 header = True)
print(airportcodes_df.count())
airportcodes_df.limit(2).toPandas()

55075


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"


In [6]:
demographic_df = spark.read.csv("../workspace/us-cities-demographics.csv",
                                sep = ';',
                                header = True)
print(demographic_df.count())
demographic_df.limit(2).toPandas()

2891


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723


In our final dataset, we will use names in addition to codes. Let's import some mapping files to map state codes to state names, city codes to city names, and country codes to country names.

In [74]:
state_mapping = spark.read.csv("../workspace/state_mapping.csv",
                                  header = True)
country_mapping = spark.read.csv("../workspace/country_mapping.csv",
                                 header = True)
destination_city_mapping = spark.read.csv("../workspace/destination_city_mapping.csv",
                                 header = True)

### Step 2: Explore, Assess and Clean the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [26]:
print(immigration_df.dtypes)
immigration_df.limit(2).toPandas()

[('cicid', 'double'), ('i94yr', 'double'), ('i94mon', 'double'), ('i94cit', 'double'), ('i94res', 'double'), ('i94port', 'string'), ('arrdate', 'double'), ('i94mode', 'double'), ('i94addr', 'string'), ('depdate', 'double'), ('i94bir', 'double'), ('i94visa', 'double'), ('count', 'double'), ('dtadfile', 'string'), ('visapost', 'string'), ('occup', 'string'), ('entdepa', 'string'), ('entdepd', 'string'), ('entdepu', 'string'), ('matflag', 'string'), ('biryear', 'double'), ('dtaddto', 'string'), ('gender', 'string'), ('insnum', 'string'), ('airline', 'string'), ('admnum', 'double'), ('fltno', 'string'), ('visatype', 'string')]


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5680949.0,2016.0,7.0,117.0,117.0,NYC,20659.0,1.0,NY,,30.0,3.0,1.0,20160724,NPL,,G,,,,1986.0,D/S,F,,IG,2947450000.0,3940,F1
1,5680950.0,2016.0,7.0,245.0,245.0,DET,20659.0,1.0,IL,20679.0,46.0,2.0,1.0,20160813,,,G,O,,M,1970.0,01232017,M,78652.0,DL,2947451000.0,188,B2


The immigration dataset contains a litany of issues:  
- the column header names are non-descriptive


In [27]:
# let's give the dataset some more descriptive names
immigration_df2 = immigration_df.withColumnRenamed('cicid', 'cicid2')\
                    .withColumnRenamed('i94yr', 'year')\
                    .withColumnRenamed('i94mon', 'month_number')\
                    .withColumnRenamed('i94cit', 'origin_city')\
                    .withColumnRenamed('i94port', 'destination_city')\
                    .withColumnRenamed('arrdate', 'arrival_date')\
                    .withColumnRenamed('i94mode', 'travel_type')\
                    .withColumnRenamed('i94addr', 'state')\
                    .withColumnRenamed('depdate', 'departure_date')\
                    .withColumnRenamed('i94bir', 'age')\
                    .withColumnRenamed('i94visa', 'visa_category')\
                    .withColumnRenamed('dtadfile', 'character_date_field')\
                    .withColumnRenamed('visapost', 'department_visa_issues')\
                    .withColumnRenamed('occup', 'occupation')\
                    .withColumnRenamed('entdepa', 'arrival_flag')\
                    .withColumnRenamed('entdepd', 'departure_flag')\
                    .withColumnRenamed('entdepu', 'update_flag')\
                    .withColumnRenamed('matflag', 'match_arrival_departure_flag')\
                    .withColumnRenamed('biryear', 'year_of_birth')\
                    .withColumnRenamed('dtaddto', 'allowed_to_stay_until')\
                    .withColumnRenamed('gender', 'gender')\
                    .withColumnRenamed('insnum', 'ins_number')\
                    .withColumnRenamed('airline', 'airline_name')\
                    .withColumnRenamed('admnum', 'admission_number')\
                    .withColumnRenamed('fltno', 'flight_number')\
                    .withColumnRenamed('visatype', 'visa_type')

immigration_df2 = immigration_df2.withColumn("cicid2",immigration_df2["cicid2"].cast('int'))\
                     .withColumn("year",immigration_df2["year"].cast('int'))\
                     .withColumn("month_number",immigration_df2["month_number"].cast('int'))\
                     .withColumn("origin_city",immigration_df2["origin_city"].cast('int'))\
                     .withColumn("i94res",immigration_df2["i94res"].cast('int'))\
                     .withColumn("age",immigration_df2["age"].cast('int'))\
                     .withColumn("count",immigration_df2["count"].cast('int'))\
                     .withColumn("year_of_birth",immigration_df2["year_of_birth"].cast('int'))\
                     .withColumn("flight_number",immigration_df2["flight_number"].cast('int'))

print(immigration_df2.count())
immigration_df2.limit(1).toPandas()

40790529


Unnamed: 0,cicid2,year,month_number,origin_city,i94res,destination_city,arrival_date,travel_type,state,departure_date,age,visa_category,count,character_date_field,department_visa_issues,occupation,arrival_flag,departure_flag,update_flag,match_arrival_departure_flag,year_of_birth,allowed_to_stay_until,gender,ins_number,airline_name,admission_number,flight_number,visa_type
0,5680949,2016,7,117,117,NYC,20659.0,1.0,NY,,30,3.0,1,20160724,NPL,,G,,,,1986,D/S,F,,IG,2947450000.0,3940,F1


- the date fields `arrdate`, `depdate`, `dtadfile`, `dtaddto` are in different and unusual formats
- the field `allowed_to_stay_until` contains many entries which aren't dates
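
To make the two date encodings concrete, here is a minimal plain-Python sketch that runs without Spark. It assumes, as the sample rows suggest, that `arrdate` and `depdate` count days since 1960-01-01 (the SAS epoch) and that valid `dtaddto` values are `MMDDYYYY` strings. The helper names `sas_days_to_date` and `parse_mmddyyyy` and the default year range are illustrative assumptions, not part of the notebook.

```python
import datetime

SAS_EPOCH = datetime.date(1960, 1, 1)  # SAS numeric dates count days from this day


def sas_days_to_date(days):
    """Convert a SAS day count such as 20659.0 to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + datetime.timedelta(days=int(days))


def parse_mmddyyyy(value, min_year=1931, max_year=2020):
    """Parse an MMDDYYYY string; return None for non-dates and out-of-range years."""
    if value is None or len(value) != 8 or not value.isdigit():
        return None
    try:
        parsed = datetime.datetime.strptime(value, "%m%d%Y").date()
    except ValueError:  # for example month 13 or day 00
        return None
    return parsed if min_year <= parsed.year <= max_year else None


print(sas_days_to_date(20659.0))   # arrdate of the first sample row
print(parse_mmddyyyy("01232017"))  # dtaddto of the second sample row
print(parse_mmddyyyy("D/S"))       # a non-date entry
```

Letting `strptime` reject impossible dates avoids having to enumerate every malformed value by hand.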

In [28]:
# define some functions to clean the date fields
get_date = udf(lambda x: (datetime.datetime(1960, 1, 1).date() + datetime.timedelta(x)).isoformat() if x else None)
get_travel = udf(lambda x: 'Air' if x == 1 else 'Sea' if x == 2 else 'Land' if x == 3 else 'Not reported' if x == 4 else None)
get_category = udf(lambda x: 'Business' if x == 1 else 'Pleasure' if x == 2 else 'Student' if x == 3 else None)
get_character_date = udf(lambda x: str(x)[:4] + "-" + str(x)[4:6] + "-" + str(x)[-2:] if len(str(x)) == 8 else None)
# values observed in `dtaddto` that are not MMDDYYYY dates
INVALID_STAY_VALUES = {'D/S', '183', '184', '-00-0000', '182', '146', '29', '/184D',
                       '181', '730', '77', 'DSS', '', 'Q4142016', '`1132017',
                       '/   183D', '6252018', '11 02003', '0000-00-00', '10 02003',
                       '08100101', '06 02002', '90'}

def clean_allowed_to_stay_until(x):
    """Return x if it looks like MMDDYYYY with a year in 1931-2020, else None."""
    # check for nulls and short strings first, so that int() never sees an empty slice
    if x is None or x in INVALID_STAY_VALUES or len(str(x)) < 8:
        return None
    year = str(x)[4:8]
    if not year.isdigit() or int(year) > 2020 or int(year) <= 1930:
        return None
    return x

transform_allowed_to_stay_until = udf(clean_allowed_to_stay_until)
get_allowed_to_stay_until = udf(lambda x: str(x)[4:8] + "-" + str(x)[:2] + "-" + str(x)[2:4] if x else None)

# apply the functions 
immigration_df2 = immigration_df2.withColumn("arrival_date", get_date("arrival_date"))\
                     .withColumn("departure_date", get_date("departure_date"))\
                     .withColumn("travel_type", get_travel("travel_type"))\
                     .withColumn("visa_category", get_category("visa_category"))\
                     .withColumn("allowed_to_stay_until", transform_allowed_to_stay_until("allowed_to_stay_until"))\
                     .withColumn("allowed_to_stay_until", get_allowed_to_stay_until("allowed_to_stay_until"))\
                     .withColumn("character_date_field", get_character_date("character_date_field"))

immigration_df2 = immigration_df2.withColumn('arrival_date', col("arrival_date").cast("date"))\
                     .withColumn('departure_date', col("departure_date").cast("date"))\
                     .withColumn('allowed_to_stay_until', col("allowed_to_stay_until").cast("date"))\
                     .withColumn('character_date_field', col("character_date_field").cast("date"))

print(immigration_df2.count())
immigration_df2.limit(1).toPandas()

40790529


Unnamed: 0,cicid2,year,month_number,origin_city,i94res,destination_city,arrival_date,travel_type,state,departure_date,age,visa_category,count,character_date_field,department_visa_issues,occupation,arrival_flag,departure_flag,update_flag,match_arrival_departure_flag,year_of_birth,allowed_to_stay_until,gender,ins_number,airline_name,admission_number,flight_number,visa_type
0,5680949,2016,7,117,117,NYC,2016-07-24,Air,NY,,30,Student,1,2016-07-24,NPL,,G,,,,1986,,F,,IG,2947450000.0,3940,F1


- the dataset contains several codes for `origin_city`, `i94res` and `state`, but no accompanying names
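
The enrichment is a left join from codes to names: every immigration row is kept, and a code without a mapping simply gets an empty name. A minimal plain-Python sketch of that behaviour follows. The helper `add_names` is hypothetical, the first two codes and names are taken from the sample rows, and the third row is invented to show an unmapped code.

```python
def add_names(rows, mapping, code_field, name_field):
    """Left-join style lookup: keep every row, add the name or None if unmapped."""
    return [{**row, name_field: mapping.get(row[code_field])} for row in rows]


country_names = {117: "ITALY", 245: "CHINA, PRC"}
rows = [
    {"cicid2": 5680949, "origin_city": 117},
    {"cicid2": 5680950, "origin_city": 245},
    {"cicid2": 1, "origin_city": 999},  # hypothetical row with an unmapped code
]

enriched = add_names(rows, country_names, "origin_city", "origin_city_country_name")
unmapped = [r["origin_city"] for r in enriched if r["origin_city_country_name"] is None]

print(len(enriched) == len(rows))  # a left join never drops rows
print(unmapped)                    # codes that still need a mapping entry
```

Listing the unmapped codes is a cheap way to spot gaps in the mapping files before the names are used downstream.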

In [29]:
# join the immigration table with the mapping tables
immigration_df = immigration_df2.join(country_mapping,
                           immigration_df2['origin_city'] == country_mapping.country_code,
                           how = 'left')\
                     .withColumnRenamed('country_name', 'origin_city_country_name')\
                     .drop('country_code')\
                     .drop('origin_city')\
                     .join(country_mapping,
                           immigration_df2['i94res'] == country_mapping.country_code,
                           how = 'left')\
                     .withColumnRenamed('country_name', 'i94res_country_name')\
                     .drop('country_code')\
                     .drop('i94res')\
                     .join(state_mapping,
                           immigration_df2['state'] == state_mapping['Statecode'],
                           how = 'left')\
                     .drop('Statecode')\
                     .drop('state')\
                     .join(destination_city_mapping,
                           immigration_df2['destination_city'] == destination_city_mapping['destination_city_code'],
                           how = 'left')\
                     .drop('destination_city_code')\
                     .drop('destination_city')
print(immigration_df.count())
immigration_df.limit(2).toPandas()

40790529


Unnamed: 0,cicid2,year,month_number,arrival_date,travel_type,departure_date,age,visa_category,count,character_date_field,department_visa_issues,occupation,arrival_flag,departure_flag,update_flag,match_arrival_departure_flag,year_of_birth,allowed_to_stay_until,gender,ins_number,airline_name,admission_number,flight_number,visa_type,origin_city_country_name,i94res_country_name,Statename,destination_city_name
0,5680949,2016,7,2016-07-24,Air,,30,Student,1,2016-07-24,NPL,,G,,,,1986,,F,,IG,2947450000.0,3940,F1,ITALY,ITALY,NEW YORK,"NEW YORK, NY"
1,5680950,2016,7,2016-07-24,Air,2016-08-13,46,Pleasure,1,2016-08-13,,,G,O,,M,1970,2017-01-23,M,78652.0,DL,2947451000.0,188,B2,"CHINA, PRC","CHINA, PRC",ILLINOIS,"DETROIT, MI"


Let's take a look at the airport codes dataset

In [30]:
print(airportcodes_df.dtypes)
airportcodes_df.limit(1).toPandas()

[('ident', 'string'), ('type', 'string'), ('name', 'string'), ('elevation_ft', 'string'), ('continent', 'string'), ('iso_country', 'string'), ('iso_region', 'string'), ('municipality', 'string'), ('gps_code', 'string'), ('iata_code', 'string'), ('local_code', 'string'), ('coordinates', 'string')]


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"


Note that elevation_ft has data type string. Let's change it to float

In [31]:
airportcodes_df = airportcodes_df.withColumn("elevation_ft",col("elevation_ft").cast("float"))
print(airportcodes_df.count())
airportcodes_df.limit(2).toPandas()

55075


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"


The end goal is to make a many-to-one star schema, so let's group the states to find the average elevation

In [32]:
# get the average elevation per state
df_elevation = airportcodes_df.groupBy('ident',"iso_country","iso_region")\
                              .avg("elevation_ft")\
                              .drop('iso_country')\
                              .drop('iso_region')
df_elevation.limit(2).toPandas()

Unnamed: 0,ident,avg(elevation_ft)
0,01WN,103.0
1,0AK3,250.0
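
Note that `ident` appears to identify a single airport, so a grouping key that includes it produces one group per airport and the "average" is that airport's own elevation. If a true per-region average is wanted, the grouping key would be the region alone. Here is a minimal plain-Python sketch of that aggregation; the helper name is illustrative and the third row is hypothetical, added so that one region has two airports.

```python
from collections import defaultdict


def average_elevation_by_region(airports):
    """Average elevation_ft per iso_region, skipping missing elevations."""
    totals = defaultdict(lambda: [0.0, 0])  # region -> [sum, count]
    for airport in airports:
        elevation = airport["elevation_ft"]
        if elevation is None:
            continue  # ignore missing values instead of treating them as 0
        entry = totals[airport["iso_region"]]
        entry[0] += elevation
        entry[1] += 1
    return {region: total / n for region, (total, n) in totals.items()}


airports = [
    {"ident": "00A", "iso_region": "US-PA", "elevation_ft": 11.0},
    {"ident": "00AA", "iso_region": "US-KS", "elevation_ft": 3435.0},
    {"ident": "XX1", "iso_region": "US-PA", "elevation_ft": 21.0},  # hypothetical
]

averages = average_elevation_by_region(airports)
print(averages)
```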


Note that the `iso_region` combines country and state. Since state names are easier to work with, we will extract the state code and map it to state name

In [33]:
# get state code from iso_region
split_col = split(airportcodes_df['iso_region'], '-')
airportcodes_df = airportcodes_df.withColumn('state', split_col.getItem(1))\
                                 .drop('iso_region')
airportcodes_df.limit(2).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,municipality,gps_code,iata_code,local_code,coordinates,state
0,00A,heliport,Total Rf Heliport,11.0,,US,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125",PA
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,Leoti,00AA,,00AA,"-101.473911, 38.704022",KS


Since we want a many-to-one relationship in our data model, we rank the airports by `type` (as a proxy for size) and keep the largest airport per `state`.

In [34]:
airportcodes_df = airportcodes_df.orderBy(when(col("type") == "large_airport", 1)
                                          .when(col("type") == "medium_airport", 2)
                                          .when(col("type") == "small_airport", 3)
                                          .when(col("type") == "seaplane_base", 4)
                                          .when(col("type") == "heliport", 5)
                                          .when(col("type") == "balloonport", 6)
                                          .when(col("type") == "closed", 7))\
                                 .drop_duplicates(subset = ['state'])
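
As far as we know, Spark does not promise that `drop_duplicates` keeps the first row of a preceding sort, so it can help to spell out the selection rule. Below is a minimal plain-Python sketch of "keep the best-ranked airport per state"; in Spark the same idea is commonly written with a window function and `row_number`. The ranking lists the types in the same order as the cell above, with `closed` last. The tie-break on `name` and the third row are assumptions made for illustration.

```python
TYPE_RANK = {"large_airport": 1, "medium_airport": 2, "small_airport": 3,
             "seaplane_base": 4, "heliport": 5, "balloonport": 6, "closed": 7}


def largest_airport_per_state(airports):
    """Return one airport per state: lowest type rank, ties broken by name."""
    best = {}
    for airport in airports:
        # unknown types sort after every known type
        key = (TYPE_RANK.get(airport["type"], len(TYPE_RANK) + 1), airport["name"])
        state = airport["state"]
        if state not in best or key < best[state][0]:
            best[state] = (key, airport)
    return {state: airport for state, (_, airport) in best.items()}


airports = [
    {"state": "PA", "type": "heliport", "name": "Total Rf Heliport"},
    {"state": "KS", "type": "small_airport", "name": "Aero B Ranch Airport"},
    {"state": "PA", "type": "large_airport", "name": "Hypothetical International Airport"},
]

selected = largest_airport_per_state(airports)
print({state: airport["name"] for state, airport in selected.items()})
```

With an explicit tie-break, the same input always yields the same airport per state, regardless of row order.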

Let's merge back the earlier dataframe with average elevation, to get average elevation per state

In [35]:
airportcodes_df = airportcodes_df.join(df_elevation,
                                       airportcodes_df['ident'] == df_elevation['ident'],
                                       how = 'left')\
                                  .drop('ident')
airportcodes_df.limit(1).toPandas()

Unnamed: 0,type,name,elevation_ft,continent,iso_country,municipality,gps_code,iata_code,local_code,coordinates,state,avg(elevation_ft)
0,large_airport,Abeid Amani Karume International Airport,54.0,AF,TZ,Zanzibar,HTZA,ZNZ,,"39.224899, -6.22202",7,54.0


We have the field `state` as a code field, but in addition we prefer the state name to make the dataset easier to use for our business users.  
Let's merge the `state` field with our state mapping table to generate `state_name`

In [36]:
# add the statename column
airportcodes_df = airportcodes_df.join(state_mapping,
                                       airportcodes_df['state'] == state_mapping['Statecode'],
                                       how = 'inner')\
                                 .drop('Statecode')\
                                 .withColumnRenamed('Statename', 'state_name')
airportcodes_df.limit(1).toPandas()

Unnamed: 0,type,name,elevation_ft,continent,iso_country,municipality,gps_code,iata_code,local_code,coordinates,state,avg(elevation_ft),state_name
0,large_airport,Luke Air Force Base,1085.0,,US,Glendale,KLUF,LUF,LUF,"-112.383003235, 33.534999847399995",AZ,1085.0,ARIZONA


Let's take a look at the demographics dataset

In [37]:
print(demographic_df.dtypes)
demographic_df.limit(1).toPandas()

[('City', 'string'), ('State', 'string'), ('Median Age', 'string'), ('Male Population', 'string'), ('Female Population', 'string'), ('Total Population', 'string'), ('Number of Veterans', 'string'), ('Foreign-born', 'string'), ('Average Household Size', 'string'), ('State Code', 'string'), ('Race', 'string'), ('Count', 'string')]


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924


`Median Age` and `Average Household Size` should be float instead of string.  
`Male Population`, `Female Population`, `Total Population`, `Number of Veterans`, `Foreign-born` and `Count` should be integer instead of string.

In [38]:
demographic_df = demographic_df.withColumn('Median Age', col("Median Age").cast("float"))\
                               .withColumn('Male Population', col("Male Population").cast("int"))\
                               .withColumn('Female Population', col("Female Population").cast("int"))\
                               .withColumn('Total Population', col("Total Population").cast("int"))\
                               .withColumn('Number of Veterans', col("Number of Veterans").cast("int"))\
                               .withColumn('Foreign-born', col("Foreign-born").cast("int"))\
                               .withColumn('Average Household Size', col("Average Household Size").cast("float"))\
                               .withColumn('Count', col("Count").cast("int"))
print(demographic_df.count())
demographic_df.limit(1).toPandas()

2891


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.799999,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924


We have `State` and `State Code`, but neither is in the all-caps format that the other datasets use.  
Let's join `State Code` with our mapping table to obtain `state name`.

In [39]:
# add the statename column
demographic_df = demographic_df.join(state_mapping,
                                     demographic_df['State Code'] == state_mapping['Statecode'],
                                     how = 'inner')\
                               .drop('Statecode')\
                               .drop('State')\
                               .withColumnRenamed('Statename', 'state name')
print(demographic_df.count())
demographic_df.limit(1).toPandas()

2891


Unnamed: 0,City,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count,state name
0,Silver Spring,33.799999,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924,MARYLAND


Finally, as the end goal is a many-to-one relationship between `immigration_df` and `demographic_df`, let's group by state and sum the population fields.  
For `Average Household Size` we take the average instead of the sum.

In [40]:
# since we want a many-to-one merge on state name, we group the results by state
demographic_df = demographic_df.groupBy(['state name']).agg(F.sum("Male Population"),
                                                            F.sum('Female Population'),
                                                            F.sum('Total Population'),
                                                            F.sum('Number of Veterans'),
                                                            F.sum('Foreign-born'),
                                                            F.avg('Average Household Size'))\
                                                       .withColumnRenamed('sum(Male Population)', 'Male Population')\
                                                       .withColumnRenamed('sum(Female Population)', 'Female Population')\
                                                       .withColumnRenamed('sum(Total Population)', 'Total Population')\
                                                       .withColumnRenamed('sum(Number of Veterans)', 'Number of Veterans')\
                                                       .withColumnRenamed('sum(Foreign-born)', 'Foreign-born')\
                                                       .withColumnRenamed('avg(Average Household Size)', 'Average Household Size')
demographic_df.limit(1).toPandas()

Unnamed: 0,state name,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size
0,NEW JERSEY,3423033,3507991,6931024,146632,2327750,2.960877
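
One caveat worth checking: the source file appears to hold one row per city and `Race` (see the `Race` and `Count` columns), so city-level fields such as `Total Population` would repeat on every race row of the same city. Summing them directly would then count a city once per race row. Here is a minimal plain-Python sketch of keeping one row per city before summing; the helper name is illustrative and the second Silver Spring row is hypothetical.

```python
def total_population_by_state(rows):
    """Sum Total Population per state, counting each (City, State Code) once."""
    seen = set()
    totals = {}
    for row in rows:
        city_key = (row["City"], row["State Code"])
        if city_key in seen:
            continue  # city-level values were already counted via another race row
        seen.add(city_key)
        state = row["State Code"]
        totals[state] = totals.get(state, 0) + row["Total Population"]
    return totals


rows = [
    {"City": "Silver Spring", "State Code": "MD", "Race": "Hispanic or Latino",
     "Total Population": 82463},
    {"City": "Silver Spring", "State Code": "MD", "Race": "White",
     "Total Population": 82463},  # hypothetical second race row for the same city
    {"City": "Quincy", "State Code": "MA", "Race": "White",
     "Total Population": 93629},
]

state_totals = total_population_by_state(rows)
print(state_totals)
```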


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model



![This is the data model](Datamodel.PNG)

The data model chosen is a star schema with `immigration_df` as the fact table and `airportcodes_df` and `demographic_df` as dimension tables.  
This setup is recommended by Ralph Kimball as ideal for data analysis.  

The datamodel allows us to answer questions such as:  
- *Is there a correlation between elevation and immigration?*  
- *Do states with higher immigration have higher average household size?*

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. *Merge the fact and dimension tables*
2. *Drop superfluous merge artifacts*

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
To generate our final data model, we merge `immigration_df` with `airportcodes_df` and `demographic_df`

In [41]:
final_table = immigration_df.join(airportcodes_df,
                                  immigration_df['Statename'] == airportcodes_df['state_name'],
                                  how = 'left')

final_table = final_table.join(demographic_df,
                                final_table['Statename'] == demographic_df['state name'],
                                how = 'left')

print(final_table.count())
final_table.limit(2).toPandas()

40790529


Unnamed: 0,cicid2,year,month_number,arrival_date,travel_type,departure_date,age,visa_category,count,character_date_field,department_visa_issues,occupation,arrival_flag,departure_flag,update_flag,match_arrival_departure_flag,year_of_birth,allowed_to_stay_until,gender,ins_number,airline_name,admission_number,flight_number,visa_type,origin_city_country_name,i94res_country_name,Statename,destination_city_name,type,name,elevation_ft,continent,iso_country,municipality,gps_code,iata_code,local_code,coordinates,state,avg(elevation_ft),state_name,state name,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size
0,5680950,2016,7,2016-07-24,Air,2016-08-13,46,Pleasure,1,2016-08-13,,,G,O,,M,1970,2017-01-23,M,78652.0,DL,2947451000.0,188,B2,"CHINA, PRC","CHINA, PRC",ILLINOIS,"DETROIT, MI",large_airport,Scott AFB/Midamerica Airport,459.0,,US,Belleville,KBLV,BLV,BLV,"-89.835197, 38.5452",IL,459.0,ILLINOIS,ILLINOIS,10943864,11570526,22514390,723049,4632600,2.731868
1,1283504,2016,9,2016-09-06,Air,2016-09-20,52,Pleasure,1,2016-09-06,,,O,O,,M,1964,2016-12-04,F,,VS,9175913000.0,75,WT,UNITED KINGDOM,UNITED KINGDOM,FLORIDA,"ORLANDO, FL",large_airport,Daytona Beach International Airport,34.0,,US,Daytona Beach,KDAB,DAB,DAB,"-81.058098, 29.179899",FL,34.0,FLORIDA,FLORIDA,15461937,16626425,32306132,1861951,7845566,2.760274


In [83]:
# DataFrames are immutable, so the result of drop() has to be assigned back
final_table = final_table.drop('state_name', 'state name')
final_table.limit(2).toPandas()

Unnamed: 0,cicid2,year,month_number,arrival_date,travel_type,departure_date,age,visa_category,count,character_date_field,department_visa_issues,occupation,arrival_flag,departure_flag,update_flag,match_arrival_departure_flag,year_of_birth,allowed_to_stay_until,gender,ins_number,airline_name,admission_number,flight_number,visa_type,origin_city_country_name,i94res_country_name,Statename,destination_city_name,type,name,elevation_ft,continent,iso_country,municipality,gps_code,iata_code,local_code,coordinates,state,avg(elevation_ft),Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size
0,5680950,2016,7,2016-07-24,Air,2016-08-13,46,Pleasure,1,2016-08-13,,,G,O,,M,1970,2017-01-23,M,78652.0,DL,2947451000.0,188,B2,"CHINA, PRC","CHINA, PRC",ILLINOIS,"DETROIT, MI",large_airport,Scott AFB/Midamerica Airport,459.0,,US,Belleville,KBLV,BLV,BLV,"-89.835197, 38.5452",IL,459.0,10943864,11570526,22514390,723049,4632600,2.731868
1,1283504,2016,9,2016-09-06,Air,2016-09-20,52,Pleasure,1,2016-09-06,,,O,O,,M,1964,2016-12-04,F,,VS,9175913000.0,75,WT,UNITED KINGDOM,UNITED KINGDOM,FLORIDA,"ORLANDO, FL",large_airport,Daytona Beach International Airport,34.0,,US,Daytona Beach,KDAB,DAB,DAB,"-81.058098, 29.179899",FL,34.0,15461937,16626425,32306132,1861951,7845566,2.760274


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [43]:
# check that the left outer joins did not increase the dataset size
if immigration_df.count() != final_table.count():
    print('The dataset size has increased due to a many-to-many merge')
else:
    print('The dataset size has remained unchanged')

The dataset size has remained unchanged
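
A left join keeps the fact table's row count only if the join key is unique in the dimension table, so the same guarantee can also be checked before joining. Here is a minimal plain-Python sketch of such a uniqueness check. The helper `duplicated_keys` is hypothetical, and the repeated `FLORIDA` entry is invented to show a failing case.

```python
from collections import Counter


def duplicated_keys(keys):
    """Return the join-key values that occur more than once, in sorted order."""
    return sorted(key for key, n in Counter(keys).items() if n > 1)


# state names as they would be collected from a dimension table's key column
dimension_keys = ["ILLINOIS", "FLORIDA", "NEW JERSEY", "FLORIDA"]

duplicates = duplicated_keys(dimension_keys)
print(duplicates)  # any value listed here would fan out rows in the join
```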


In [54]:
# check whether any of the tables are empty:
tables = [demographic_df,
          airportcodes_df,
          state_mapping,
          country_mapping,
          destination_city_mapping,
          immigration_df,
          final_table]

tablenames = ['demographic_df',
              'airportcodes_df',
              'state_mapping',
              'country_mapping',
              'destination_city_mapping',
              'immigration_df',
              'final_table']

for tablename, table in zip(tablenames, tables):
    if table.count() == 0:
        print(f'Table {tablename} is empty')
    else:
        print(f'Table {tablename} is not empty')

Table demographic_df is not empty
Table airportcodes_df is not empty
Table state_mapping is not empty
Table country_mapping is not empty
Table destination_city_mapping is not empty
Table immigration_df is not empty
Table final_table is not empty
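
The list at the start of this section also mentions data type constraints, which the checks above do not cover. As a rough sketch, the column types of a table could be compared against an expected mapping. `find_type_mismatches` is a hypothetical helper; it takes input shaped like Spark's `DataFrame.dtypes` (a list of `(column_name, type_string)` tuples). The expected types and sample values below are illustrative assumptions, not the types actually produced by this notebook.

```python
def find_type_mismatches(actual_dtypes, expected_types):
    """Compare (column, type) pairs against an expected {column: type} mapping.

    actual_dtypes has the shape of Spark's DataFrame.dtypes: a list of
    (column_name, type_string) tuples. Returns {column: (expected, found)}
    for every column whose type differs or that is missing (found is None).
    """
    actual = dict(actual_dtypes)
    mismatches = {}
    for column, expected in expected_types.items():
        found = actual.get(column)  # None when the column is missing
        if found != expected:
            mismatches[column] = (expected, found)
    return mismatches


# Hypothetical expectations; the real types depend on how the columns were cast.
expected_types = {"year": "int", "arrival_date": "date"}
# Illustrative stand-in for final_table.dtypes.
sample_dtypes = [("year", "int"), ("arrival_date", "string")]
print(find_type_mismatches(sample_dtypes, expected_types))
```

In the notebook, `sample_dtypes` would be replaced by `final_table.dtypes`, and an empty result would mean that all listed columns have the expected type.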


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

| Field  | Source | Description |
| ------- | ---------- | ----------- |
| cicid2 | I94 dataset | Unable to find the meaning of this field on the I94 website, the SAS label descriptions, or Google. Most likely it is a registration id.
| year | I94 dataset | Year of arrival in 4 digits
| month_number | I94 dataset | Month of arrival as a number (1-12)
| arrival_date | I94 dataset | Arrival date in YYYY-MM-DD format
| travel_type | I94 dataset | Mode of travel
| departure_date | I94 dataset | Departure date in YYYY-MM-DD format
| age | I94 dataset | Age
| visa_category | I94 dataset | Visa request type
| count | I94 dataset | Used for summary statistics
| character_date_field | I94 dataset | Character Date Field
| department_visa_issues | I94 dataset | Department of State where the visa was issued
| occupation | I94 dataset | Occupation that will be performed in U.S.
| arrival_flag | I94 dataset | Admitted or paroled into the U.S.
| departure_flag | I94 dataset | Departed, lost I-94 or is deceased 
| update_flag | I94 dataset | Apprehended, overstayed, or adjusted to permanent residence
| match_arrival_departure_flag | I94 dataset | Match of arrival and departure records
| year_of_birth | I94 dataset | Year of birth in 4 digits
| allowed_to_stay_until | I94 dataset | The date at which the Visa expires in YYYY-MM-DD format
| gender | I94 dataset | First letter of the gender (e.g. M or F)
| ins_number | I94 dataset | Immigration and Naturalization Service number
| airline_name | I94 dataset | Airline used to arrive in U.S.
| admission_number | I94 dataset | Admission number
| flight_number | I94 dataset | Flight number
| visa_type | I94 dataset | Class of admission legally admitting the non-immigrant to temporarily stay in U.S.
| origin_city_country_name | I94 dataset | Country of origin
| i94res_country_name | I94 dataset | Country of residence
| Statename | I94 dataset | Name of state in all caps
| destination_city_name | I94 dataset | City of destination
| type | Airport dataset | Type of port
| name | Airport dataset | Name of port
| elevation_ft | Airport dataset | Elevation of port in feet
| continent | Airport dataset | 2-letter code of continent
| iso_country | Airport dataset | ISO 3166 alpha-2 country code
| municipality | Airport dataset | Name of municipality
| gps_code | Airport dataset |  GPS identifier
| iata_code | Airport dataset | Code from International Air Transport Association
| local_code | Airport dataset | Local code
| coordinates | Airport dataset | GPS coordinates
| state | Airport dataset | Two-letter abbreviation of state
| avg(elevation_ft) | Airport dataset | Average elevation of the airports in the state
| Male Population | Demographic dataset | Male population in state
| Female Population | Demographic dataset | Female population in state
| Total Population | Demographic dataset | Total population in state
| Number of Veterans | Demographic dataset | Number of veterans in state
| Foreign-born | Demographic dataset | Number of foreign-born in state
| Average Household Size | Demographic dataset | Average size of households in state
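
A data dictionary is easiest to keep complete when it is compared against the columns of the table it describes. For instance, the CSV header of the final table shown earlier also contains `Unnamed: 0`, `state_name` and `state name`, which have no entry in the dictionary above. The sketch below illustrates such a comparison in plain Python; `undocumented_columns` is a hypothetical helper, and the two lists are a small subset chosen for illustration.

```python
def undocumented_columns(table_columns, documented_fields):
    """Return the table columns that have no data dictionary entry, in table order."""
    documented = set(documented_fields)
    return [column for column in table_columns if column not in documented]


# A few column names taken from the CSV header of the final table.
table_columns = ["cicid2", "year", "state", "state_name", "state name"]
# The matching subset of fields listed in the data dictionary.
documented_fields = ["cicid2", "year", "state"]
print(undocumented_columns(table_columns, documented_fields))
```

With a Spark DataFrame, `final_table.columns` would supply the first argument.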

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.  

##### Because the I94 dataset has over 40 million records, we need a technology that allows for distributed computation, such as Spark. We use PySpark, Spark's Python API, for its ease of use and because Spark dataframes convert easily to Pandas dataframes for a quick look at the first rows of each table.

* Propose how often the data should be updated and why.  

##### According to https://travel.trade.gov/view/m-2017-I-001/index.asp, the highest frequency at which I94 data is available is monthly, so the data should be updated on a monthly basis.


* Write a description of how you would approach the problem differently under the following scenarios:

 1. The data was increased by 100x.  
 
    ##### If the data were increased by 100x, the virtual machine I currently run it on would be insufficient. In that case I would upload the data to S3 and use Redshift to perform the ETL and generate the data model.
      
      
 2. The data populates a dashboard that must be updated on a daily basis by 7am every day.  
 
 ##### In this case, I would convert the current notebook to a Python file and use an Airflow pipeline with a daily schedule to run the tasks in this notebook.
 
 
 3. The database needed to be accessed by 100+ people.  
 
 ##### An Amazon Redshift cluster would still do the trick, but I would likely convert it into an HDFS-backed Amazon EMR cluster for faster access.
 
 