# US Immigration information
### Data Engineering Capstone Project

#### Project Summary
This project will provide information on US immigration like the most popular cities for immigration, the gender distribution of the immigrants, the average age per immigrants and so on. As more and more immigrants move to the US, officals want to reliably access information about their immigration, weather of the destination, demographics of destination. Also it is important to keep track of immigrants and data like their visa type, visa expire date, entry method to the US. Our dataset is from 3 different sources - the I94 immigration dataset of 2016, city temperature data from Kaggle and US city demographic data from OpenSoft. We defined our data model with 4 dimension tables: (Cities, immigrants, average temperature and time) and 1 fact table: Immigration. We use Spark for ETL jobs.

***The project includes the following steps:***
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
# Get required packages
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from pprint import pprint
import os
import re
import glob
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import count, col, udf, year, month, avg, round, dayofweek, weekofyear, isnull, countDistinct

In [3]:
# Basic checking of local csv files
df_immigration_test = pd.read_csv('immigration_data_sample.csv')
df_immigration_test.shape

(1000, 29)

In [5]:
pd.set_option('display.max_columns', 50)
df_immigration_test.head(5)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [4]:
df_countryCodes = pd.read_csv('airport-codes_csv.csv')
df_countryCodes.shape

(55075, 12)

In [7]:
df_countryCodes.head(5)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [5]:
df_demographics = pd.read_csv('us-cities-demographics.csv', sep=';')
df_demographics.shape

(2891, 12)

In [9]:
df_demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [6]:
df_temperature = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
df_temperature.shape

(8599212, 7)

In [11]:
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [4]:
# Create Spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
The goal of this project is to extract data from 3 different sources and create fact and dimension table to be able to do analysis on US immigration using factors of city monthly average temperature, city demographics and how things change with time. Finally perform some data quality checks.

#### Describe and Gather Data 
***I94 Immigration Data:*** This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. This is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.(https://travel.trade.gov/research/reports/i94/historical/2016.html)

***World Temperature Data:*** This dataset came from Kaggle. (https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)

***U.S. City Demographic Data:*** This data comes from OpenSoft. (https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

***Airport Code Table:*** This is a simple table of airport codes and corresponding cities.(https://datahub.io/core/airport-codes#data)

In [5]:
# Input in the data here
# Input immigration data

df_immigration = spark.read.format("com.github.saurfang.sas.spark")\
.load("../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat")
df_immigration.head(5)

[Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2'),
 Row(cicid=7.0, i94yr=2016.0, i94mon=4.0, i94cit=254.0, i94res=276.0, i94port='ATL', arrdate=20551.0, i94mode=1.0, i94addr='AL', depdate=None, i94bir=25.0, i94visa=3.0, count=1.0, dtadfile='20130811', visapost='SEO', occup=None, entdepa='G', entdepd=None, entdepu='Y', matflag=None, biryear=1991.0, dtaddto='D/S', gender='M', insnum=None, airline=None, admnum=3736796330.0, fltno='00296', visatype='F1'),
 Row(cicid=15.0, i94yr=2016.0, i94mon=4.0, i94cit=101.0, i94res=101.0, i94port='WAS', arrdate=20545.0, i94mode=1.0, i94addr='MI', depdate=20691.0, i94bir=55.0, i94visa=2.0, count=1.0, dtadfile=

In [9]:
# Input temperature data
df_temp = spark.read.format("csv").option("delimiter", ",").option("header", "true")\
.load("../../data2/GlobalLandTemperaturesByCity.csv")
df_temp

DataFrame[dt: string, AverageTemperature: string, AverageTemperatureUncertainty: string, City: string, Country: string, Latitude: string, Longitude: string]

In [10]:
# Input demographics data
df_demographics = spark.read.format("csv").option("delimiter", ";").option("header", "true")\
.load("us-cities-demographics.csv")
df_demographics.head(5)

[Row(City='Silver Spring', State='Maryland', Median Age='33.8', Male Population='40601', Female Population='41862', Total Population='82463', Number of Veterans='1562', Foreign-born='30908', Average Household Size='2.6', State Code='MD', Race='Hispanic or Latino', Count='25924'),
 Row(City='Quincy', State='Massachusetts', Median Age='41.0', Male Population='44129', Female Population='49500', Total Population='93629', Number of Veterans='4147', Foreign-born='32935', Average Household Size='2.39', State Code='MA', Race='White', Count='58723'),
 Row(City='Hoover', State='Alabama', Median Age='38.5', Male Population='38040', Female Population='46799', Total Population='84839', Number of Veterans='4819', Foreign-born='8229', Average Household Size='2.58', State Code='AL', Race='Asian', Count='4759'),
 Row(City='Rancho Cucamonga', State='California', Median Age='34.5', Male Population='88127', Female Population='87105', Total Population='175232', Number of Veterans='5821', Foreign-born='3387

In [11]:
# Input airpurt codes
df_airport_codes = spark.read.format("csv").option("delimiter", ";").option("header", "true")\
.load("airport-codes_csv.csv")
df_airport_codes.head(5)
# df_airport_codes = df_airport_codes.to_pandas()

[Row(ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates='00A,heliport,Total Rf Heliport,11,NA,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"'),
 Row(ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates='00AA,small_airport,Aero B Ranch Airport,3435,NA,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"'),
 Row(ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates='00AK,small_airport,Lowell Field,450,NA,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"'),
 Row(ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates='00AL,small_airport,Epps Airpark,820,NA,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"'),
 Row(ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_cod

### Step 2: Explore and Assess the Data
#### Explore the Data 
More than 3 million entries in the i94 df and 659 valid ports. The demo dataset have 49 valid states. UDF written for coversion to pyspark datetime format and validate state codes.
Temperature df has 8.6 million entries. UDF is written to map city full name to city port abbreviation


#### Cleaning Steps
Removed any missing values. Removed invalid states from US immigration dataset. Created staging data from i94 data.
Removed invalid ports, Only used temperatures from United States and mapped full name to city port abbrv. Create staging data from Temperature dataset.
Calculated the percentage of demographics column. Used a pivot for the Race column. Create staging data from Demographics dataset.


## Immigration data

In [17]:
df_immigration.count()
df_immigration.head(10)

[Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2'),
 Row(cicid=7.0, i94yr=2016.0, i94mon=4.0, i94cit=254.0, i94res=276.0, i94port='ATL', arrdate=20551.0, i94mode=1.0, i94addr='AL', depdate=None, i94bir=25.0, i94visa=3.0, count=1.0, dtadfile='20130811', visapost='SEO', occup=None, entdepa='G', entdepd=None, entdepu='Y', matflag=None, biryear=1991.0, dtaddto='D/S', gender='M', insnum=None, airline=None, admnum=3736796330.0, fltno='00296', visatype='F1'),
 Row(cicid=15.0, i94yr=2016.0, i94mon=4.0, i94cit=101.0, i94res=101.0, i94port='WAS', arrdate=20545.0, i94mode=1.0, i94addr='MI', depdate=20691.0, i94bir=55.0, i94visa=2.0, count=1.0, dtadfile=

In [6]:
# Input list of valid ports
with open( "I94_SAS_Labels_Descriptions.SAS") as f:
    lines = f.readlines()

airport_code = {}
for i in lines[302:962]:
    temp = re.compile(r"\'(.*)\'.*\'(.*)\'").search(i)
    airport_code[temp.group(1)] = temp.group(2)
print(len(airport_code))


660


In [13]:
# Create list of unique US states
states_US = df_demographics.toPandas()["State Code"].unique()
print(len(states_US))
print(states_US)

49
['MD' 'MA' 'AL' 'CA' 'NJ' 'IL' 'AZ' 'MO' 'NC' 'PA' 'KS' 'FL' 'TX' 'VA' 'NV'
 'CO' 'MI' 'CT' 'MN' 'UT' 'AR' 'TN' 'OK' 'WA' 'NY' 'GA' 'NE' 'KY' 'SC' 'LA'
 'NM' 'IA' 'RI' 'PR' 'DC' 'WI' 'OR' 'NH' 'ND' 'DE' 'OH' 'ID' 'IN' 'AK' 'MS'
 'HI' 'SD' 'ME' 'MT']


In [14]:
# UDF to check if code is a US state
@udf(StringType())
def check_US_state(x):  
    if x in states_US:
        return x
    return 'All Other Codes'

In [15]:
# UDF to check immigrant's visa type
@udf(StringType())
def check_visa(x):  
    if x == 1.0:
        return "Business"
    elif x == 2.0:
        return "Pleasure"
    elif x == 3.0:
        return "Student"
    else:
        return "N/A"

In [16]:
# convert SAS date to PySpark date 
@udf(StringType())
def date_reformat(x):
    if x:
        return (datetime(1960, 1, 1).date() + timedelta(x)).isoformat()
    return None

For the i94 immigration data, we dropped all entries where the destination city code i94port is not a valid value as described in I94_SAS_Labels_Description.SAS.

In [17]:
# Process and clean the data

# Drop missing data
df_immigration_processed = df_immigration.dropna(how="any", subset=["i94cit", "gender", "i94port"])

# check visa 
df_immigration_processed = df_immigration_processed.withColumn("i94visa", check_visa(df_immigration_processed.i94visa))

# Keep US immigration data only
df_immigration_processed = df_immigration_processed.withColumn("i94addr", check_US_state(df_immigration_processed.i94addr))
df_immigration_processed = df_immigration_processed.filter(df_immigration_processed.i94addr != 'All Other Codes')

# Convert arrival_date from SAS format to PySpark format
df_immigration_processed = df_immigration_processed.withColumn("depdate", date_reformat(df_immigration_processed.depdate))
df_immigration_processed = df_immigration_processed.withColumn("arrdate", date_reformat(df_immigration_processed.arrdate))

# Make sure departure date is before arrival date
df_immigration_processed = df_immigration_processed.filter\
(df_immigration_processed.depdate <= df_immigration_processed.arrdate)

# Keep only 'arrival by air' data
df_immigration_processed = df_immigration_processed.filter(df_immigration_processed.i94mode == 1)



In [25]:
print(df_immigration_processed.toPandas()["i94cit"].isnull().count())

63


In [26]:
print(df_immigration_processed.toPandas()["i94res"].isnull().count())

63


In [27]:
print(df_immigration_processed.toPandas()["i94addr"].isnull().count())

63


In [18]:
# df_demographics.City = df_demographics.toPandas().City.str.upper().str.strip()
df_demographics.toPandas()[df_demographics.toPandas()[['City', 'State','Race']].duplicated()]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count


In [19]:
# Remove countries other than US

df_countryCodes_processed = df_countryCodes[df_countryCodes.iso_country == 'US']

# clean up NaN from 'municipality', to use it as ident
df_countryCodes_processed.municipality = df_countryCodes_processed.municipality.str.upper()
df_countryCodes_processed = df_countryCodes_processed[~df_countryCodes_processed['municipality'].isna()]
df_countryCodes_processed.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  self[name] = value


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,BENSALEM,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,LEOTI,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,ANCHOR POINT,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,HARVEST,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,NEWPORT,,,,"-91.254898, 35.6087"


In [20]:


df_immigration_processed = df_immigration_processed.select(col("cicid"), col("arrdate"), col("depdate"),
    col("i94bir").alias("age"), col("i94cit").alias("citizenship"), col("i94addr").alias("State Code"),
    col("i94visa").alias("visa"), col("visatype"), col("i94port").alias("airport_code"), col("i94res").alias("residence"),
    col("gender"), col("count")).drop_duplicates()

df_immigration_processed.limit(5).toPandas()



Unnamed: 0,cicid,arrdate,depdate,age,citizenship,State Code,visa,visatype,airport_code,residence,gender,count
0,4816078.0,2016-04-25,2016-04-21,35.0,582.0,NV,Pleasure,B2,SNA,582.0,M,1.0
1,2614532.0,2016-04-14,2016-04-13,34.0,577.0,CA,Pleasure,B2,LOS,577.0,M,1.0
2,5699340.0,2016-04-30,2016-04-29,27.0,135.0,NV,Pleasure,WT,LOS,135.0,M,1.0
3,1289185.0,2016-04-07,2016-04-06,27.0,582.0,CA,Business,B1,SAC,582.0,M,1.0
4,5686199.0,2016-04-30,2016-04-29,62.0,129.0,NY,Pleasure,WT,MIA,129.0,F,1.0


In [47]:
df_immigration_processed.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

## Temperature data

In [32]:
# df_temp.count()
df_temp.head(5)

[Row(dt='1743-11-01', AverageTemperature='6.068', AverageTemperatureUncertainty='1.7369999999999999', City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1743-12-01', AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1744-01-01', AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1744-02-01', AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E'),
 Row(dt='1744-03-01', AverageTemperature=None, AverageTemperatureUncertainty=None, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E')]

In [21]:
# UDF to map city full name to city port abbreviation

@udf(StringType())
def get_airport_code(city):
    for key in airport_code:
        if city.lower() in airport_code[key].lower():
            return key

For the temperature data, we dropped all entries where AverageTemperature = NaN, then dropped duplicated values, and added the i94port of the location in each entry. Also, we used data from 2013.

In [22]:
# Process temp data to remove other countries, reformat month and year, and include airport codes
df_temp_processed = df_temp.filter(df_temp["Country"] == "United States") \
.withColumn("month", month(df_temp["dt"])) \
.withColumn("year", year(df_temp["dt"])) \
.withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
.withColumn("i94port", get_airport_code(df_temp["City"])) \
.dropna(how='any', subset=["i94port"])

# Keep temperatures from 2013
df_temp_processed = df_temp_processed.filter(df_temp_processed["year"] == 2013)

df_temp_processed = df_temp_processed.select(col("year"), col("month"), col("i94port").alias("airport_code"),
                                         round(col("AverageTemperature"), 1), round(col("AverageTemperatureUncertainty"),2),
                                           col("Latitude"), col("Longitude")).drop_duplicates()



In [36]:
# df_temp_processed.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- airport_code: string (nullable = true)
 |-- round(AverageTemperature, 1): float (nullable = true)
 |-- round(AverageTemperatureUncertainty, 2): double (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



## Demographics Data

In [39]:
df_demographics.count()
df_demographics.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [23]:
# Process demographics data to insert airport code
df_demo_processed = df_demographics.withColumn("airport_code", get_airport_code(df_demographics["City"])) \
    .dropna(how='any', subset=["airport_code"])

# df_demo_processed.limit(5).toPandas()
# df_demo_processed.count()

In [61]:
df_demo_processed.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)
 |-- airport_code: string (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Here we have chosen the star schema for the data modeling part.BY joining fact and dimension tables, we can query to analyze the data.

Here are the tables of the schema:

#### Staging Tables
df_immigration_processed: <br>
    cicid,
    arrdate,
    depdate,
    age,
    citizenship,
    state_code,
    Visa,
    visa_type,
    airport_code,
    residence,
    gender,
    count
    
df_temp_processed: <br>
    year,
    month,
    airport_code,
    city_name,
    AverageTemperature,
    AverageTemperatureUncertainty,
    lat,
    long

df_demo_processed: <br>    
    city_name,
    state,
    median_age,
    Male_population,
    Female_Population,
    Total_population,
    Number_of_Veterans,
    Foreign_Born,
    Average_Household_size,
    State Code,
    Race,
    Count,
    airport_code    
    
#### Dimension Tables

df_immigration_dim: <br>
    cicid<br>
    gender<br>
    age<br>
    visa<br>
    visa_type
    

df_time_dim: <br>
    arrDate<br>
    dayofweek<br>
    weekofyear<br>
    month

df_demographics_dim: <br>
    city_name<br>
    state<br>
    median_age<br>
    Male_population<br>
    Female_Population<br>
    Total_population<br>
    Number_of_Veterans<br>
    Foreign_Born<br>
    Average_Household_size<br>
    State Code<br>
    Race<br>
    Count<br>
    airport_code
    
df_temperature_dim: <br>
    year<br>
    month<br>
    airport_code<br>
    city_name<br>
    AverageTemperature<br>
    AverageTemperatureUncertainty<br>
    lat<br>
    long

    
#### Fact Table
df_immigration_fact: <br>
    cicid<br>
    State Code<br>
    airport_code<br>
    arrDate<br>
    Count

#### 3.2 Mapping Out Data Pipelines
List of steps necessary to pipeline the data into the data model:

1. Clean the data which includes remove nulls, invalid data types, duplicates, etc
2. Create staging tables for df_immigration_processed, df_temp_processed and df_demo_processed
3. Create dimension tables for df_immigration_dim, df_demographics_dim, df_temperature_dim and df_time_dim
4. Finally Create fact table df_immigration_fact with information on immigration count, mapping cicid in df_immigration_dim, airport_code in df_demographics_dim and State Code df_temperature_dim and arrdate in df_time_dim


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [24]:
# immigration dimension table 

df_immigration_dim = df_immigration_processed.select("cicid", "gender", "age", "visa","visatype").drop_duplicates()

In [25]:
# Create dimension table for monthly city temperature

df_temperature_dim = df_temp_processed.drop_duplicates()

In [59]:
# df_immigration_dim.count()

63

In [26]:
# demographic dimension table

df_demographics_dim = df_demo_processed.join(df_temp_processed, "airport_code").drop_duplicates()
#     .select("airport_code", "City", "State", "Median Age")



In [30]:
df_demographics_dim.count()

142

In [32]:
df_temperature_dim.count()

1043

In [27]:
# Create dimension table for time

df_time_dim = df_immigration_processed.withColumn("dayofweek", dayofweek("arrdate"))\
                .withColumn("weekofyear", weekofyear("arrdate"))\
                .withColumn("month", month("arrdate"))
                        
df_time_dim = df_time_dim.select("arrdate", "dayofweek", "weekofyear", "month").drop_duplicates()

In [34]:
# df_time_dim.count()

30

In [28]:
# Create fact table for immigration

df_immigration_fact = df_immigration_processed.select("cicid", "State Code", "airport_code", "arrdate", "count").drop_duplicates()

In [36]:
# df_immigration_fact.count()

2435922

#### 4.2 Data Quality Checks

Run Quality Checks to ensure the pipeline ran as expected

In [31]:
# Perform quality checks here

if (df_immigration_dim is not None) & (df_demographics_dim is not None) & (df_temperature_dim is not None) & (df_time_dim is not None) & (df_immigration_fact is not None):
    print("You have successfully passed your Data Quality Check")
    print("dimension tables and fact table exist")
    print()
else:
    print("missing files !!! - Please check your dimension and fact tables")

Your successfully passed your Data Quality Check
dimension tables and fact table exist



In [32]:

if (df_immigration_dim.count() != 0) &(df_demographics_dim.count() != 0 ) & (df_temperature_dim.count() != 0) & (df_time_dim.count() != 0) & (df_immigration_fact.count() != 0):
    print("data quality check passed!")
    print(" Good news !!! dimension tables and fact table shows data")
    
else:
    print("Sorry!!!  You have empty records ")

data quality check passed!
 Good news !!! dimension tables and fact table shows data


In [42]:
if (df_demographics_dim.agg(countDistinct("airport_code", "State Code")).collect() is not None) & (df_immigration_dim.agg(countDistinct("cicid")).collect() is not None) & (df_temperature_dim.agg(countDistinct("airport_code")).collect() is not None) & (df_time_dim.agg(countDistinct("arrdate")).collect() is not None) & (df_immigration_fact.agg(countDistinct("cicid", "arrdate", "State Code", "airport_code")).collect() is not None) :
    print("data quality check passed!")
    print(" Good news !!! dimension tables and fact table show UNIQUE data") 
        
else:
    print("Sorry!!!  You do not have unique records")

data quality check passed!
 Good news !!! dimension tables and fact table show UNIQUE data


#### 4.3 Data dictionary 

The following tables will be entered as dictionary:

##### Dimension Tables

DF_IMMIGRATION <br>
    CICID: id of immigrant <br>
    GENDER: immigrant's gender <br>        
    AGE: age of immigrant <br>
    VISA: immigrant's visa type  <br>
    VISA TYPE: immigrant's visa type in details <br>

DF_DEMOGRAPHICS <br>    
    CITY_name: name of the city <br>
    STATE: state of the city <br>
    MEDIAN_AGE: median age of the city <br>
    MALE_population: city's male population  <br>
    FEMALE_population: city's female population  <br>
    TOTAL_population: city's totall population <br>
    LATITUDE: latitude of the city <br>
    LONGITUDE: longitude of the city  <br>
    NUMBER_of_VETERAN: city's veteran population <br>
    FOREIGN_born: city's foreign born population <br>  
    AVERAGE_Household_size: Number of people in a household in Average <br>
    STATE_code: state code of the city <br>
    RACE: types of Race <br>
    COUNT: Number of people <br>
    AIRPORT_code: city port code <br>
    
    
DF_TEMPERATURE <br>    
    YEAR: year <br>
    MONTH: month 
    AIRPORT_code: city port code <br>
    AVERAGE_Temp: average temperature in city for given month <br>
    AVERAGE_Temperature_Uncertainty: Uncertainty in average temperature <br>
    city_name: Name of the city<br>        
    lat: Latitude<br>
    long: Longitude
    
DF_Time <br>
    ARRDATE: arrival date <br>
    MONTH: month <br>
    DAY_of_Week: day of the week <br>
    Week_of_Year: week of year 
    
    
##### Fact Table

DF_IMMIGRATION <br>
    CICID: id <br>
    ARRDATE: date of arrival <br>
    STATE_Code: state code of arrival city <br>
    AIRPORT_code: city port code of arrival city <br>
    COUNT: count of immigrant's entries into the US    

#### Step 5: Complete Project Write Up

I chose Spark for this project because it is known for processing large amount of data (with less compute time), scale easily with additional worker nodes and integrate appropriately with cloud storage like S3 and data warehouse like Redshift.Spark can also handle different file formats.

The data should be updated monthly in order to handle the files.

Answers:

1. ***If the data was increased by 100x ?***  We can store data in Amazon S3 bucket and scale up larger instances of EC2. Spark would still be the best pltform to use because it can handle ladrge datasets.

2. ***If the data populates a dashboard that must be updated on a daily basis by 7am every day?***  We can use Airflow to schedule and run the ETL during night.

3. ***If the database needed to be accessed by 100+ people?***  We can use data warehouse like Redshiift in the cloud, to hold larger capacity to serve more users, and support workload management and accessiblity for large number of users. We can also use parquet files in HDFS.