# The Immigration Data in the United States 
### Data Engineering Capstone Project

#### Project Summary
As the title indicates, this project explores immigration data in the US. It covers the following points:

- How travellers and immigrants are affected by temperature
- The duration and period of travellers' stays
- Compare and contrast arrivals at each airport with the overall number of travellers
- Compare and contrast the number of travellers with the overall demographics of the US

##### The following datasets are used to validate the discussion points above.

I94 Immigration Data: Records of I-94 entries into the US, recorded by the US National Tourism and Trade Office. A data dictionary is included in the workspace.

- countries.csv: Country codes extracted from the data dictionary.

- i94_port_codes.csv: City (port of entry) codes extracted from the data dictionary.

- World Temperature Data: Taken from Kaggle; it includes temperatures for various cities of the world
  from 1743 to 2013.

- US City Demographic Data: Derived from OpenSoft; it covers the demographics of US cities.
  The census data comes from the US Census Bureau's 2015 American Community Survey
  and includes all census areas with a population greater than or equal to 65,000.

- Airport Code Table: Contains airport codes and their corresponding cities.

##### Aggregation of the data to produce the desired output for the discussion points:

- Aggregate by time (year, month, day, etc.).
- Aggregate by city and airport.
- Observe the impact of temperature on travellers.
- Analyse the impact of regional demographics.



The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import datetime
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, date_add
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum



### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

#### Immigration Datasets

Because the immigration dataset is large, a CSV sample of roughly 1,000 rows is used for exploration.

In [3]:
# Read in the data here
df_immig_sample = pd.read_csv('immigration_data_sample.csv')

In [4]:
df_immig_sample.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

The discussion will be based on the following fields; in-depth descriptions are in I94_SAS_Labels_Descriptions.SAS.

i94cit : country of citizenship
i94res : country of residence
i94port: arrival airport
arrdate: arrival date.
i94mode
i94addr
depdate
i94bir
i94visa
occup
biryear
dtaddto
gender
insnum
airline
admnum
fltno
visatype

The following increases the number of displayed columns to give a better view of the dataset.

In [8]:
pd.set_option('display.max_columns', 50)
df_immig_sample.head(15)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT
5,721257,1481650.0,2016.0,4.0,577.0,577.0,ATL,20552.0,1.0,GA,20606.0,51.0,2.0,1.0,20160408,,,T,N,,M,1965.0,10072016,M,,DL,736852600.0,910,B2
6,1072780,2197173.0,2016.0,4.0,245.0,245.0,SFR,20556.0,1.0,CA,20635.0,48.0,2.0,1.0,20160412,,,T,O,,M,1968.0,10112016,F,,CX,786312200.0,870,B2
7,112205,232708.0,2016.0,4.0,113.0,135.0,NYC,20546.0,1.0,NY,20554.0,33.0,2.0,1.0,20160402,,,G,O,,M,1983.0,6302016,F,,BA,55474490000.0,00117,WT
8,2577162,5227851.0,2016.0,4.0,131.0,131.0,CHI,20572.0,1.0,IL,20575.0,39.0,2.0,1.0,20160428,,,O,O,,M,1977.0,7262016,,,LX,59413420000.0,00008,WT
9,10930,13213.0,2016.0,4.0,116.0,116.0,LOS,20545.0,1.0,CA,20553.0,35.0,2.0,1.0,20160401,,,O,O,,M,1981.0,6292016,,,AA,55449790000.0,00109,WT


#### Adding Data Dictionaries 

Originally the denormalized data was used. The details from the data dictionary are now included and will substitute for the codes in the model.

The dictionary for columns I94CIT and I94RES is added from countries.csv. These columns are assumed to hold each traveller's country of citizenship and country of residence, respectively.


In [12]:
df_country_codes = pd.read_csv('countries.csv')

In [13]:
df_country_codes.shape

(289, 2)

In [14]:
df_country_codes.head(10)

Unnamed: 0,code,country
0,582,MEXICO
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
5,324,ANGOLA
6,529,ANGUILLA
7,518,ANTIGUA-BARBUDA
8,687,ARGENTINA
9,151,ARMENIA
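
As a rough sketch of how the dictionary could later replace codes with names, the snippet below joins two immigration rows to country names. The two small frames are copied by hand from the outputs shown in this notebook. The `i94cit_code` column name and the cast to a nullable integer are assumptions made for illustration, not the project's actual ETL.

```python
import pandas as pd

# Two rows copied from the immigration sample output above
immig = pd.DataFrame({"cicid": [4422636.0, 4084316.0], "i94cit": [582.0, 209.0]})

# Two rows copied from the countries.csv output above
countries = pd.DataFrame({"code": [582, 236], "country": ["MEXICO", "AFGHANISTAN"]})

# i94cit is stored as a float, so cast both join keys to the same nullable integer type
immig["i94cit_code"] = immig["i94cit"].astype("Int64")
countries["code"] = countries["code"].astype("Int64")

# A left join keeps every immigration row, even when a code has no match in the dictionary
joined = immig.merge(countries, how="left", left_on="i94cit_code", right_on="code")
print(joined[["cicid", "i94cit_code", "country"]])
```

Code 209 is not in this two-row extract of the dictionary, so its country stays missing after the join; with the full file it would be looked up in the same way.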


Next, the i94port codes are loaded from i94_port_codes.csv, which maps each code to its port of entry.

In [16]:
i94_port_codes = pd.read_csv('i94_port_codes.csv')

In [17]:
i94_port_codes.shape

(660, 3)

In [18]:
i94_port_codes.head(10)

Unnamed: 0,code,location,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
5,DTH,DUTCH HARBOR,AK
6,EGL,EAGLE,AK
7,FRB,FAIRBANKS,AK
8,HOM,HOMER,AK
9,HYD,HYDER,AK


#### Demographic Data

The interesting part here is to see whether there is a connection between the demographics of US cities and the immigration data for travellers arriving in them.

In [21]:
# Read in the data here
df_demographics = pd.read_csv('us-cities-demographics.csv', sep=';')

In [22]:
df_demographics.shape

(2891, 12)

In [23]:
df_demographics.head(10)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229.0,62432.0,118661,6634.0,7517.0,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712.0,41971.0,80683,4815.0,8355.0,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629.0,56860.0,108489,3800.0,37038.0,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762.0,43270.0,85032,5783.0,3269.0,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751.0,58077.0,109828,5204.0,16315.0,2.65,NC,Asian,11060


In [24]:
# Inspect the columns available in the dataset
df_demographics.columns

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')

Next, the airport codes are loaded. This is important because it paves the way to connect the airport data to the airport codes.

#### Airport data


Airport information is included because airports are the point of entry for immigrants.

In [28]:
##### Reading of the data
df_airports = pd.read_csv('airport-codes_csv.csv')

In [29]:
df_airports.columns

Index(['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country',
       'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code',
       'coordinates'],
      dtype='object')

In [30]:
df_airports.head(10)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100.0,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810.0,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038.0,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87.0,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350.0,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


#### World Temperature

In [32]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(fname)

In [33]:
df_temperature.shape

(8599212, 7)

In [34]:
df_temperature.head(10)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


#### Full Immigration Dataset

In [36]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [37]:
df_immigration.count()

3096313

In [38]:
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [39]:
# The schema displayed above is the same as that of the sample dataset

In [None]:
#write to parquet
#df_immigration.write.parquet("sas_data")
#df_immigration=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

#### Temperature Dataset

Exploring the temperature dataset

In [44]:
df_temperature.shape

(8599212, 7)

In [45]:
df_temperature.head(10)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


In [46]:
df_temperature['Country'].nunique()

159

Since 1743, the dataset contains temperature data for 159 countries. The data will be reduced to make it more manageable.

In [48]:
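# Keep only data for the United States; .copy() avoids a SettingWithCopyWarning
# when a column is added to the filtered frame in the next cell
df_temperature = df_temperature[df_temperature['Country']=='United States'].copy()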


In [49]:
#Convert the date to datetime objects
df_temperature['convertedDate'] = pd.to_datetime(df_temperature.dt)

The focus is on air travel, so only data from 1950 onwards is kept.
The reason is that commercial air travel was not widespread before the 1950s.

In [51]:
# Remove all dates prior to 1950s
df_temperature=df_temperature[df_temperature['convertedDate']>"1950-01-01"].copy()

In [54]:
df_temperature.shape

(196348, 8)

In [55]:
# Let's check the most recent date in the dataset
df_temperature['convertedDate'].max()


Timestamp('2013-09-01 00:00:00')

The temperature data ends in September 2013, so no temperature records overlap with the immigration dataset.
For the sake of this project, we will assume that a temperature dataset is available to join with the immigration dataset.

In [57]:
# Check for missing values

In [58]:
# Now checking the null values.
df_temperature.isnull().sum()


dt                               0
AverageTemperature               1
AverageTemperatureUncertainty    1
City                             0
Country                          0
Latitude                         0
Longitude                        0
convertedDate                    0
dtype: int64

In [59]:
df_temperature[df_temperature.AverageTemperature.isnull()]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,convertedDate
287781,2013-09-01,,,Anchorage,United States,61.88N,151.13W,2013-09-01


The temperature dataset has a null value for Anchorage in September 2013. Two possible fixes are:

- Average the values for August and October 2013 for Anchorage;
- Use the historical September data for Anchorage.

Option two is feasible but less desirable, since temperatures in 2013 tend to be higher than in previous years, so a historical value
might create an outlier in the dataset.

Ideally the dataset would include data up to April 2016, so that it could be compared with the immigration dataset, and option one would be the convenient choice.
However, the dataset contains nothing beyond 2013-09-01, and the immigration dataset only covers April 2016.
For convenience, the missing value is left as it is.
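
The two imputation options discussed above can be sketched in plain Python. All numbers below are made up for illustration and are not taken from the dataset; the variable names are hypothetical as well.

```python
from statistics import mean

# Hypothetical monthly averages in degrees Celsius (NOT values from the dataset)
august_value = 14.0
october_value = 2.0
september_history = [8.1, 7.6, 8.4]  # made-up September values from earlier years

# Option one: midpoint of the neighbouring months
neighbour_estimate = mean([august_value, october_value])

# Option two: average of the same month in earlier years
historical_estimate = mean(september_history)

print(neighbour_estimate, historical_estimate)
```

Option one needs a value on both sides of the gap, which is why it cannot be applied directly when the missing month is the last one in the series.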

Next, check whether city and date can be used as a primary key, on the assumption that each row represents a unique combination of city and
date. Let's elaborate and check.

In [60]:
df_temperature.shape

(196348, 8)

In [61]:
df_temperature[['City','convertedDate']].drop_duplicates().shape

(189472, 2)

In [None]:
#### It seems there can be multiple entries for a given city and date. Let's examine an example of such entries

In [66]:
df_temperature[df_temperature[['City','convertedDate']].duplicated()].head(10)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,convertedDate
405836,1950-02-01,1.655,0.057,Arlington,United States,39.38N,76.99W,1950-02-01
405837,1950-03-01,3.871,0.232,Arlington,United States,39.38N,76.99W,1950-03-01
405838,1950-04-01,9.678,0.191,Arlington,United States,39.38N,76.99W,1950-04-01
405839,1950-05-01,16.786,0.234,Arlington,United States,39.38N,76.99W,1950-05-01
405840,1950-06-01,21.548,0.222,Arlington,United States,39.38N,76.99W,1950-06-01
405841,1950-07-01,23.133,0.279,Arlington,United States,39.38N,76.99W,1950-07-01
405842,1950-08-01,22.731,0.244,Arlington,United States,39.38N,76.99W,1950-08-01
405843,1950-09-01,17.849,0.171,Arlington,United States,39.38N,76.99W,1950-09-01
405844,1950-10-01,13.932,0.152,Arlington,United States,39.38N,76.99W,1950-10-01
405845,1950-11-01,7.164,0.354,Arlington,United States,39.38N,76.99W,1950-11-01


In [67]:
df_temperature[(df_temperature['City'] == 'Arlington') & (df_temperature.dt == '1950-02-01')]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,convertedDate
402597,1950-02-01,11.144,0.199,Arlington,United States,32.95N,96.70W,1950-02-01
405836,1950-02-01,1.655,0.057,Arlington,United States,39.38N,76.99W,1950-02-01


In [None]:
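It appears that the same city name occurs with more than one set of coordinates: the two Arlington rows above have different latitudes and longitudes, so they may be
measurements at different locations or different cities sharing a name. The average temperature and uncertainty per city will be computed when constructing the dimension table.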
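
A minimal sketch of that averaging step, using the two Arlington rows shown above. The `city_temperature` name is an assumption, and taking a plain mean of the uncertainties is a simplification rather than a statistically rigorous combination.

```python
import pandas as pd

# The two Arlington rows for 1950-02-01 from the output above
rows = pd.DataFrame({
    "City": ["Arlington", "Arlington"],
    "convertedDate": pd.to_datetime(["1950-02-01", "1950-02-01"]),
    "AverageTemperature": [11.144, 1.655],
    "AverageTemperatureUncertainty": [0.199, 0.057],
})

# Collapse duplicate (City, convertedDate) pairs into a single averaged row
city_temperature = rows.groupby(["City", "convertedDate"], as_index=False)[
    ["AverageTemperature", "AverageTemperatureUncertainty"]
].mean()

print(city_temperature)
```

If the two rows really are different cities that share a name, adding the state or the coordinates to the grouping key would keep them apart instead of blending them.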

#### Airport Data

Now let's look at the contents of the airport dataset.

In [68]:
df_airports.shape

(55075, 12)

In [69]:
df_airports.head(10)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100.0,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810.0,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038.0,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87.0,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350.0,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


Check the countries where the airports are located.

In [70]:
df_airports.groupby('iso_country')['iso_country'].count()

iso_country
AD        2
AE       57
AF       64
AG        3
AI        1
AL       13
AM       13
AO      104
AQ       27
AR      848
AS        4
AT      145
AU     1963
AW        1
AZ       35
BA       15
BB        6
BD       16
BE      146
BF       51
BG      134
BH        4
BI        7
BJ       10
BL        1
BM        3
BN        2
BO      197
BQ        3
BR     4334
      ...  
TM       21
TN       15
TO        6
TR      124
TT        3
TV        3
TW       65
TZ      207
UA      191
UG       38
UM        6
US    22757
UY       54
UZ      176
VA        1
VC        6
VE      592
VG        3
VI        9
VN       50
VU       32
WF        2
WS        4
XK        6
YE       25
YT        1
ZA      489
ZM      103
ZW      138
ZZ        7
Name: iso_country, Length: 243, dtype: int64

The dataset contains airport data for numerous countries. The immigration dataset only contains entries into the US,
which means arrivals through airports based in the US. The dataset will therefore be reduced.

Before doing so, make sure that no relevant rows are lost because of missing data in the iso_country field.

In [71]:
df_airports[df_airports['iso_country'].isna()].shape

(247, 12)

In [72]:
# Check the rows with a missing country to see whether the continent field is filled in
df_airports[df_airports['iso_country'].isna()].groupby('continent')['continent'].count()

continent
AF    247
Name: continent, dtype: int64

All the rows with a missing country are for airports based in Africa.

Thus, the dataset can be safely reduced.
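
One possible explanation for these gaps, offered as an assumption rather than something verified against this file: by default `pandas.read_csv` treats the literal string `NA` as a missing value, and `NA` is also the ISO 3166 country code for Namibia. If the continent column uses `NA` for North America, that would produce missing continents in the same way. The sketch below uses a tiny made-up CSV, not the real airport file, to show the default behaviour and the `keep_default_na=False` option.

```python
import io
import pandas as pd

# Tiny made-up CSV in the same spirit as the airport file (illustrative rows only)
csv_text = "ident,continent,iso_country\nAAA1,AF,NA\nBBB2,NA,US\n"

# Default parsing: the literal string "NA" is read as a missing value
default_df = pd.read_csv(io.StringIO(csv_text))

# keep_default_na=False keeps "NA" as an ordinary string
literal_df = pd.read_csv(io.StringIO(csv_text), keep_default_na=False)

print(default_df["iso_country"].isna().sum())
print(literal_df["iso_country"].tolist())
```

Since only US airports are kept afterwards, the reduction is unaffected either way; the option only matters if the country or continent codes are needed later.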

In [73]:
# The rows with a missing country are all in Africa, so no US airports are lost; keep US airports only.
df_airports = df_airports[df_airports['iso_country'].fillna('').str.upper().str.contains('US')].copy()

#### The type column contains several values. Let's explore them:

In [74]:
df_airports.groupby('type')['type'].count()

type
balloonport          18
closed             1326
heliport           6265
large_airport       170
medium_airport      692
seaplane_base       566
small_airport     13720
Name: type, dtype: int64

In [None]:
#### The following is assumed

#### 'closed' indicates that the airport is closed
Immigration data is not collected at balloonports, and seaplanes and helicopters are used only for recreational purposes or short distances.

#### Thus the rows with these values are filtered out.

In [75]:
excludedValues = ['closed', 'heliport', 'seaplane_base', 'balloonport']
df_airports = df_airports[~df_airports['type'].str.strip().isin(excludedValues)].copy()

#### Now looking at the other missing values 

In [76]:
# Check the missing values again
df_airports.isnull().sum()

ident               0
type                0
name                0
elevation_ft       63
continent       14582
iso_country         0
iso_region          0
municipality       50
gps_code          399
iata_code       12717
local_code        199
coordinates         0
dtype: int64

The immigration and airport datasets will not be joined on the ident code: the port codes defined in the data dictionary are very different from the ident and local code columns of the airport dataset.

From the check above, 50 municipality values are missing from the dataset. Let's look at some of these rows to see whether the municipality name can be obtained some other way.


In [78]:
# We also verify that the municipality field is available for all airports
df_airports[df_airports.municipality.isna()].head(10)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
7653,6XA4,small_airport,Zadow Airstrip,,,US,US-TX,,6XA4,,,"-95.954353809, 29.991738550900003"
7887,74xa,small_airport,Gun Barrel City Airpark,385.0,,US,US-TX,,74XA,,,"-96.1456650496, 32.3551499558"
8082,79ID,small_airport,Kooskia (Clear Creek Int) Airport,1800.0,,US,US-ID,,79ID,,,"-115.869691372, 46.0488642914"
8114,79WT,small_airport,Ellensburg (Rotor Ranch) Airport,1962.0,,US,US-WA,,79WT,,,"-120.589778423, 47.091426059499994"
9055,8FA4,small_airport,Samsula / Coe Field,40.0,,US,US-FL,,8FA4,,,"-81.1328315735, 29.0102045831"
9855,99XA,small_airport,Briggs / Skotz Airfield,1200.0,,US,US-TX,,99XA,,,"-98.0037117004, 30.863976076700002"
25272,K1C2,small_airport,Howell New Lenox Airport,753.0,,US,US-IL,,,,1C2,"-87.92130279541016, 41.479801177978516"
28253,KNQB,small_airport,Silverhill Nolf Airport,129.0,,US,US-AL,,KNQB,,KNQB,"-87.80970001220703, 30.563600540161133"
28633,KPRS,small_airport,Presidio Lely International Airport,,,US,US-TX,,KPRS,,PRS,"-104.361493, 29.634212"
29754,KRCP,small_airport,Stockton / Rooks County Regional,1998.0,,US,US-KS,,KRCP,,,"-99.304649, 39.346592"


If the pipeline is to be automated, none of these rows appear to be usable. They are therefore removed from the dataset.

In [79]:
df_airports = df_airports[~df_airports['municipality'].isna()].copy()

##### Municipality column is converted to upper case to enable join with other datasets

In [80]:
df_airports.municipality = df_airports.municipality.str.upper()

In [81]:
df_airports.groupby('iso_region')['iso_region'].count()

iso_region
US-AK      586
US-AL      197
US-AR      291
US-AZ      214
US-CA      551
US-CO      288
US-CT       56
US-DC        2
US-DE       36
US-FL      522
US-GA      365
US-HI       35
US-IA      232
US-ID      238
US-IL      579
US-IN      486
US-KS      372
US-KY      164
US-LA      281
US-MA       79
US-MD      157
US-ME      122
US-MI      379
US-MN      361
US-MO      411
US-MS      211
US-MT      255
US-NC      349
US-ND      297
US-NE      259
US-NH       54
US-NJ      116
US-NM      149
US-NV      113
US-NY      402
US-OH      492
US-OK      372
US-OR      357
US-PA      486
US-RI       10
US-SC      173
US-SD      162
US-TN      228
US-TX     1546
US-U-A       3
US-UT      103
US-VA      311
US-VT       66
US-WA      379
US-WI      457
US-WV       83
US-WY       95
Name: iso_region, dtype: int64

Looking at the state data, U-A seems to be an error. The state is used in combination with the city name to join with the city demographics.

In [82]:
# apply len to the iso_region field to see which ones are longer than 5 characters since the field is a combination of US and state code
df_airports['len'] = df_airports["iso_region"].apply(len)
# let's remove the codes that are incorrect.
df_airports = df_airports[df_airports['len']==5].copy()
# finally, let's extract the state code
df_airports['state'] = df_airports['iso_region'].str.strip().str.split("-", n = 1, expand = True)[1]
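
The same clean-up can be expressed as a small standalone function, shown here as a sketch. The `extract_state` name and the stricter shape check (exactly `US-` plus two letters) are assumptions for illustration; the cell above filters on string length instead.

```python
def extract_state(iso_region):
    """Return the two-letter state code from a value such as 'US-TX', or None."""
    parts = iso_region.strip().split("-", 1)
    # Accept only the 'US-XX' shape: country prefix plus a two-letter code
    if len(parts) == 2 and parts[0] == "US" and len(parts[1]) == 2 and parts[1].isalpha():
        return parts[1]
    return None

print(extract_state("US-TX"))
print(extract_state("US-U-A"))
```

Returning `None` for malformed values such as `US-U-A` makes it easy to drop those rows in a later step.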

#### Demographic Data

In [83]:
df_demographics.shape

(2891, 12)

Convert the city name to upper case and remove any leading or trailing spaces.

In [84]:
df_demographics.City = df_demographics.City.str.upper().str.strip()

Exploring missing values

In [85]:
df_demographics.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

The few missing values are not significant, so the dataset looks clean.

The missing data will not be fixed for now. Any issues with the affected rows will be handled when loading the dimension table.

In [86]:
# remove any leading or trailing spaces and convert to upper case
df_demographics.City = df_demographics.City.str.strip().str.upper()

Check to see whether city and race would work as a primary key for this table

In [87]:
#primary key will be the combination of city name and race
df_demographics[df_demographics[['City','Race']].duplicated()].head(10)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
177,WILMINGTON,Delaware,36.4,32680.0,39277.0,71957,3063.0,3336.0,2.45,DE,Asian,1193
210,LAKEWOOD,California,39.9,41523.0,40069.0,81592,4094.0,18274.0,3.13,CA,Hispanic or Latino,24987
238,GLENDALE,California,42.1,98181.0,102844.0,201025,4448.0,111510.0,2.69,CA,White,146718
300,SPRINGFIELD,Massachusetts,31.8,74744.0,79592.0,154336,5723.0,16226.0,2.81,MA,Asian,5606
549,BLOOMINGTON,Indiana,23.5,40588.0,43227.0,83815,2368.0,10033.0,2.33,IN,Asian,9801
597,JACKSONVILLE,Florida,35.7,419203.0,448828.0,868031,75432.0,85650.0,2.62,FL,Hispanic or Latino,80064
611,UNION CITY,California,38.5,38599.0,35911.0,74510,1440.0,32752.0,3.46,CA,Black or African-American,5508
698,BLOOMINGTON,Illinois,35.1,37972.0,40323.0,78295,3888.0,7287.0,2.34,IL,White,60652
732,SPRINGFIELD,Illinois,38.8,55639.0,62170.0,117809,7525.0,4264.0,2.22,IL,American Indian and Alaska Native,1602
800,RICHMOND,California,35.3,52615.0,57100.0,109715,3611.0,42215.0,2.87,CA,Asian,20082


As we can clearly see, the combination of city and race is not sufficient as a primary key. Let's look at a specific example.

In [88]:
df_demographics[(df_demographics.City == 'WILMINGTON') & (df_demographics.Race == 'Asian')]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
102,WILMINGTON,North Carolina,35.5,52346.0,63601.0,115947,5908.0,7401.0,2.24,NC,Asian,3152
177,WILMINGTON,Delaware,36.4,32680.0,39277.0,71957,3063.0,3336.0,2.45,DE,Asian,1193


The state is what differs between the two rows above, so it is added to the primary key combination.

In [89]:
df_demographics[df_demographics[['City', 'State','Race']].duplicated()]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count


Fair enough: there are no duplicates when the three columns are combined. This combination will be used as the primary key.
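
The key check used above can be wrapped in a small helper, sketched here with the two Wilmington rows from the earlier output. The `is_unique_key` name is hypothetical.

```python
import pandas as pd

# The two Wilmington rows shown above, reduced to the relevant columns
demo = pd.DataFrame({
    "City": ["WILMINGTON", "WILMINGTON"],
    "State": ["North Carolina", "Delaware"],
    "Race": ["Asian", "Asian"],
    "Count": [3152, 1193],
})

def is_unique_key(df, columns):
    """Return True when no two rows share the same values in `columns`."""
    return not df.duplicated(subset=columns).any()

print(is_unique_key(demo, ["City", "Race"]))
print(is_unique_key(demo, ["City", "State", "Race"]))
```

The same helper could be applied to the temperature and airport frames before they are loaded into dimension tables.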

#### Immigration Data

The dataset has a lot of missing values; the fields are described in the data dictionary.

In [90]:
df_immigration.show(10)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

Now let's check whether cicid can be used as a primary key.

In [91]:
# We create a view of the immigration dataset
df_immigration.createOrReplaceTempView("immig_table")

In [92]:
df_immigration.count()

3096313

In [93]:
spark.sql("""
SELECT COUNT (DISTINCT cicid)
FROM immig_table
""").show()

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              3096313|
+---------------------+



The data dictionary provided for i94port uses codes that are 3 characters long. Let's check whether the same applies to the codes in the dataset.

In [94]:
spark.sql("""
SELECT LENGTH (i94port) AS len
FROM immig_table
GROUP BY len
""").show()

+---+
|len|
+---+
|  3|
+---+



No processing is needed to join this field to the dictionary.

The next move is to convert the arrdate field into something that can be used.

Dates in SAS are stored as the number of days since 1960-01-01. The arrival date is therefore computed by adding arrdate to 1960-01-01.
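
The conversion can be checked outside Spark with a few lines of plain Python. This is a sketch: `sas_to_date` is a hypothetical helper, and the input value is the arrdate of the first row in the sample output shown earlier.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(sas_days):
    """Convert a SAS day count (possibly a float, possibly None) to a date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20566.0))  # 2016-04-22
```

The result agrees with the dtadfile value 20160422 on the same sample row, which supports the 1960-01-01 epoch.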

In [95]:
df_immigration = spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date FROM immig_table")
df_immigration.createOrReplaceTempView("immig_table")

Next, the codes in the I94VISA column are replaced with labels. The three categories are:

1 = Business
2 = Pleasure
3 = Student
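
For reference, the same mapping written as a plain Python lookup. This is only a sketch; `VISA_CATEGORIES` and `visa_label` are hypothetical names, and the fallback label `N/A` matches the one used in the SQL in this notebook.

```python
VISA_CATEGORIES = {1: "Business", 2: "Pleasure", 3: "Student"}

def visa_label(code):
    """Map an I94VISA code (stored as a float such as 2.0) to its label."""
    # Treat None and NaN (the only value not equal to itself) as unknown
    if code is None or code != code:
        return "N/A"
    return VISA_CATEGORIES.get(int(code), "N/A")

print(visa_label(2.0))
```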

In [96]:
spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'
                        ELSE 'N/A' END AS visa_type 
                        
                FROM immig_table""").createOrReplaceTempView("immig_table")

In [97]:
spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                        
                FROM immig_table""").createOrReplaceTempView("immig_table")

In [98]:
#Let's check the results from our previous query to make sure there are no N/A values
spark.sql("SELECT count(*) FROM immig_table WHERE departure_date = 'N/A'").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



Next, we make sure that departure_date is later than arrival_date.

In [99]:
spark.sql("""
SELECT COUNT(*)
FROM immig_table
WHERE departure_date <= arrival_date
""").show()

+--------+
|count(1)|
+--------+
|     375|
+--------+



In [100]:
spark.sql("""
SELECT arrival_date, departure_date
FROM immig_table
WHERE departure_date <= arrival_date
""").show(10)

+------------+--------------+
|arrival_date|departure_date|
+------------+--------------+
|  2016-04-01|    2016-03-31|
|  2016-04-02|    2016-03-19|
|  2016-04-02|    2016-01-26|
|  2016-04-02|    2016-04-01|
|  2016-04-02|    2016-01-31|
|  2016-04-02|    2016-04-01|
|  2016-04-03|    2016-04-02|
|  2016-04-04|    2016-03-12|
|  2016-04-05|    2016-04-04|
|  2016-04-05|    2016-04-04|
+------------+--------------+
only showing top 10 rows



There is no reliable way to fix these errors. Since the number of affected rows is quite small, we simply drop them.
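
The effect of such a filter can be sketched in plain Python. In SQL, a comparison against NULL is never true, so a condition of the form `departure_date >= arrival_date` also discards rows that have no departure date; the sketch mirrors that behaviour. The function name and the sample rows are illustrative assumptions.

```python
from datetime import date


def keep_valid_stays(rows):
    """Keep rows whose departure is on or after arrival.

    Rows with a missing departure date are dropped as well, mirroring how a
    SQL comparison against NULL evaluates to unknown and fails the filter.
    """
    return [
        (arrival, departure)
        for arrival, departure in rows
        if departure is not None and departure >= arrival
    ]


# Hypothetical sample rows: (arrival_date, departure_date)
rows = [
    (date(2016, 4, 1), date(2016, 3, 31)),  # departure before arrival: dropped
    (date(2016, 4, 2), date(2016, 4, 9)),   # valid stay: kept
    (date(2016, 4, 3), None),               # no departure recorded: dropped
]
print(len(keep_valid_stays(rows)))  # 1
```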

In [101]:
spark.sql("""
SELECT *
FROM immig_table
WHERE departure_date >= arrival_date
""").createOrReplaceTempView("immig_table")

Lastly, since both sets of dates will feed the time dimension table used in the model, we need to check how many distinct values there are among the arrival and departure dates.

In [102]:
#check distinct departure dates
spark.sql("SELECT COUNT (DISTINCT departure_date) FROM immig_table ").show()
#check distinct arrival dates
spark.sql("SELECT COUNT (DISTINCT arrival_date) FROM immig_table ").show()
#check the common values between the two sets
spark.sql("""   SELECT COUNT(DISTINCT departure_date) 
                FROM immig_table 
                WHERE departure_date IN (
                    SELECT DISTINCT arrival_date FROM immig_table
                ) 
                """).show()

+------------------------------+
|count(DISTINCT departure_date)|
+------------------------------+
|                           174|
+------------------------------+

+----------------------------+
|count(DISTINCT arrival_date)|
+----------------------------+
|                          30|
+----------------------------+

+------------------------------+
|count(DISTINCT departure_date)|
+------------------------------+
|                            29|
+------------------------------+



Since one arrival date does not appear among the departure dates, we will merge the two sets so that the dimension table includes both departure and arrival dates.
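
A minimal sketch of this merge using Python sets, with made-up dates standing in for the real columns. The variable names are assumptions for illustration only.

```python
from datetime import date

# Hypothetical sample values; the real sets come from the immigration table
arrival_dates = {date(2016, 4, 1), date(2016, 4, 2), date(2016, 4, 30)}
departure_dates = {date(2016, 4, 2), date(2016, 4, 30), date(2016, 5, 7)}

shared = arrival_dates & departure_dates             # dates present in both sets
missing = arrival_dates - departure_dates            # arrival dates absent from departures
all_dates = sorted(arrival_dates | departure_dates)  # one entry per distinct date

print(len(shared), len(missing), len(all_dates))  # 2 1 4
```

Taking the union guarantees that every date referenced by the fact table, whether as an arrival or a departure, has exactly one row in the time dimension.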

Next, we check the data for the various arrival modes.

In [103]:
spark.sql("""
SELECT i94mode, count(*)
FROM immig_table
GROUP BY i94mode
""").show()

+-------+--------+
|i94mode|count(1)|
+-------+--------+
|   null|     238|
|    1.0| 2871184|
|    3.0|   61572|
|    2.0|   17970|
|    9.0|    2517|
+-------+--------+



The arrival modes are defined in the dictionary as follows:

1 = 'Air'
2 = 'Sea'
3 = 'Land'
9 = 'Not reported'

We will keep only arrivals by air, since this data will be joined with the airport dataset.

Now let's check whether there are any missing values in the age column (i94bir).

In [104]:
spark.sql("""
SELECT COUNT(*)
FROM immig_table
WHERE i94bir IS NULL
""").show()

+--------+
|count(1)|
+--------+
|      46|
+--------+



Some values are missing here, so we check whether the birth year can be used instead.

In [105]:
spark.sql("SELECT COUNT(biryear) FROM immig_table WHERE biryear IS NULL").show()

+--------------+
|count(biryear)|
+--------------+
|             0|
+--------------+



Next, we check whether the years of birth are plausible.

In [106]:
spark.sql("SELECT MAX(biryear), MIN(biryear) FROM immig_table WHERE biryear IS NOT NULL").show()

+------------+------------+
|max(biryear)|min(biryear)|
+------------+------------+
|      2016.0|      1916.0|
+------------+------------+



Next, we look at the frequency of travellers who are at least 80 years old, i.e. born in 1936 or earlier.

In [108]:
# Number of travellers aged 80 or older (born in 1936 or earlier)
spark.sql("""
SELECT COUNT(*)
FROM immig_table 
WHERE biryear IS NOT NULL
AND biryear <= 1936
""").show()

# frequency of travellers by birth year
spark.sql("""
SELECT biryear, COUNT(*)
FROM immig_table 
WHERE biryear IS NOT NULL
AND biryear <= 1936
GROUP BY biryear
ORDER BY biryear ASC
""").show()

+--------+
|count(1)|
+--------+
|   24694|
+--------+

+-------+--------+
|biryear|count(1)|
+-------+--------+
| 1916.0|       8|
| 1917.0|      16|
| 1918.0|      21|
| 1919.0|      36|
| 1920.0|      34|
| 1921.0|      69|
| 1922.0|      89|
| 1923.0|     155|
| 1924.0|     209|
| 1925.0|     274|
| 1926.0|     414|
| 1927.0|     569|
| 1928.0|     792|
| 1929.0|    1073|
| 1930.0|    1442|
| 1931.0|    1794|
| 1932.0|    2239|
| 1933.0|    2688|
| 1934.0|    3442|
| 1935.0|    4194|
+-------+--------+
only showing top 20 rows



Only 8 observations (out of roughly 3 million) come from travellers born in 1916, who would be about 100 years old; these are outliers. Overall, roughly 0.8% of the travellers (24,694 rows) are aged 80 or older,
which is quite reasonable.

The birth year is available for every row, which lets us compute the age. Let's check that the computed values match the reported age.

In [109]:
spark.sql("SELECT (2016-biryear)-i94bir AS difference, count(*) FROM immig_table WHERE i94bir IS NOT NULL GROUP BY difference").show()

+----------+--------+
|difference|count(1)|
+----------+--------+
|       0.0| 2953435|
+----------+--------+



The computed value matches the reported age whenever that field is available. We will therefore compute the age from the birth year, which also fills in the missing values.
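
As a small sketch of this rule in plain Python: the reported age is kept when present and derived from the birth year otherwise. The function name `fill_age` is an assumption; the sample values echo one of the rows displayed earlier (age 37.0, birth year 1979.0).

```python
REFERENCE_YEAR = 2016  # year of the immigration data used in this notebook


def fill_age(reported_age, birth_year, reference_year=REFERENCE_YEAR):
    """Return the reported age, or derive it from the birth year when missing."""
    if reported_age is not None:
        return reported_age
    return reference_year - birth_year


print(fill_age(37.0, 1979.0))  # 37.0
print(fill_age(None, 1979.0))  # 37.0
```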

Next, we check whether the gender data is usable.

In [110]:
spark.sql("""
SELECT gender, count(*) 
FROM immig_table
GROUP BY gender
""").show()

+------+--------+
|gender|count(1)|
+------+--------+
|     F| 1228646|
|  null|  407456|
|     M| 1316305|
|     U|     238|
|     X|     836|
+------+--------+



Since we will be retaining the travellers' gender, we filter out all the rows where the gender is missing or invalid.

In [112]:
spark.sql("""SELECT * FROM immig_table WHERE gender IN ('F', 'M')""").createOrReplaceTempView("immig_table")

Now let's check whether there are any missing values in the citizenship and residence data.

In [113]:
#citizenship countries
spark.sql("""
SELECT count(*) 
FROM immig_table
WHERE i94cit IS NULL
""").show()

#residence countries
spark.sql("""
SELECT count(*) 
FROM immig_table
WHERE i94res IS NULL
""").show()

#reported address
spark.sql("""
SELECT count(*) 
FROM immig_table
WHERE i94addr IS NULL
""").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|  114019|
+--------+



The state of residence address is missing quite often. We therefore won't rely on this field
and will instead use the port of entry as a proxy for the traveller's address.

In [114]:
spark.sql("""
SELECT COUNT(*)
FROM immig_table
WHERE visatype IS NULL
""").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



The detailed visa type is available for all rows. Next, we aggregate to check that each detailed visa type belongs to a single category.
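
The property being checked can be expressed as a small plain-Python helper: collect, for each detailed visa type, the set of categories it appears under, and report any type with more than one. The function name and the sample pairs are illustrative assumptions.

```python
from collections import defaultdict


def ambiguous_visa_types(pairs):
    """Return detailed visa types that appear under more than one category."""
    categories = defaultdict(set)
    for category, detailed in pairs:
        categories[detailed].add(category)
    return {d: cats for d, cats in categories.items() if len(cats) > 1}


# Hypothetical (visa_type, visatype) pairs for illustration
pairs = [("Business", "B1"), ("Pleasure", "B2"), ("Student", "F1"), ("Business", "WB")]
print(ambiguous_visa_types(pairs))  # {}
```

An empty result means every detailed type maps to exactly one category.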

In [115]:
spark.sql("""
SELECT visa_type, visatype, count(*)
FROM immig_table
GROUP BY visa_type, visatype
ORDER BY visa_type, visatype
""").show()

+---------+--------+--------+
|visa_type|visatype|count(1)|
+---------+--------+--------+
| Business|      B1|  186610|
| Business|      E1|    3182|
| Business|      E2|   16227|
| Business|     GMB|     132|
| Business|       I|    2962|
| Business|      I1|     214|
| Business|      WB|  185857|
| Pleasure|      B2|  967988|
| Pleasure|      CP|   11785|
| Pleasure|     CPL|       8|
| Pleasure|     GMT|   79454|
| Pleasure|     SBP|       2|
| Pleasure|      WT| 1060229|
|  Student|      F1|   27789|
|  Student|      F2|    1774|
|  Student|      M1|     708|
|  Student|      M2|      30|
+---------+--------+--------+



The definitions of the detailed visa types are listed below. We could not find a definition for every visa type, so some remain unknown. We will retain the details since they might be of interest from a demographic standpoint.

- B1 visa is for business visits valid for up to a year
- B2 visa is for pleasure visits valid for up to a year
- CP could not find a definition
- E2 investor visas allows foreign investors to enter and work inside of the United States based on a substantial investment
- F1 visas are used by non-immigrant students for Academic and Language training Courses.
- F2 visas are used by the dependents of F1 visa holders
- GMT could not find a definition
- M1 visas are for students enrolled in non-academic or “vocational” study (mechanical, language, cooking classes, etc.)
- WB: Visa Waiver Program (WT/WB status), travel to the United States for business for stays of 90 days or less without obtaining a visa.
- WT: Visa Waiver Program (WT/WB status), travel to the United States for tourism for stays of 90 days or less without obtaining a visa.

Since we have little information besides the detailed visa type and the aggregate visa type, we will simply keep the information in our dimension table.

Next, we look at the occupation field.

In [116]:
spark.sql("""
SELECT occup, COUNT(*) AS n
FROM immig_table
GROUP BY occup
ORDER BY n DESC, occup
""").show()

+-----+-------+
|occup|      n|
+-----+-------+
| null|2538838|
|  STU|   3275|
|  OTH|    508|
|  NRR|    299|
|  MKT|    262|
|  EXA|    175|
|  ULS|    142|
|  ADM|    119|
|  GLS|    119|
|  TIE|    108|
|  MVC|     58|
|  ENO|     55|
|  CEO|     53|
|  TIP|     49|
|  LLJ|     45|
|  RET|     44|
|  CMP|     43|
|  PHS|     42|
|  UNP|     33|
|  HMK|     30|
+-----+-------+
only showing top 20 rows



The field is missing most of the time and the values provided are abbreviations. We won't be using it in our data model

Several other fields are missing a lot of values or simply not used or documented and will be dropped.

**Now that the analysis is done, it is time to build the conceptual model.**

In [117]:
df_immigration = spark.sql("""SELECT * FROM immig_table""") 

### Step 3: Define the Data Model

#### 3.1 Conceptual Data Model

Since we're interested in the flow of travellers through the United States, the i94 data will serve as our fact table. Our fact_immigration table will contain:

- cicid,
- citizenship_country,
- residence_country,
- city,
- state,
- arrival_date,
- departure_date,
- age,
- visa_type,
- detailed_visa_type,

For our dimension tables, since our dataset only contains one month of data, we will keep a record of the daily entries and provide the users with four dimensions to aggregate our data:

##### dim_time: to aggregate the data using various time units. The fields available will be:

- date,
- year,
- month,
- day,
- week,
- weekday,
- dayofyear
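
As a sketch, the fields listed above can be derived from a single date with Python's standard library. The function name `time_dimension_row` is an assumption. The week and weekday conventions are chosen to follow Spark SQL's `WEEKOFYEAR` (ISO-8601 weeks) and `DAYOFWEEK` (1 = Sunday through 7 = Saturday), which is also an assumption about how the table is meant to be read.

```python
from datetime import date


def time_dimension_row(d):
    """Derive the dim_time fields from a single date.

    Assumption: week follows ISO-8601 numbering and weekday runs from
    1 (Sunday) to 7 (Saturday).
    """
    return {
        "date": d,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": d.isocalendar()[1],
        "weekday": d.isoweekday() % 7 + 1,  # Monday=1..Sunday=7 -> Sunday=1..Saturday=7
        "dayofyear": d.timetuple().tm_yday,
    }


row = time_dimension_row(date(2016, 4, 1))  # a Friday
print(row["week"], row["weekday"], row["dayofyear"])  # 13 6 92
```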

##### dim_airports: Used to determine the areas with the largest flow of travelers. Fields included will be:

- ident,
- type,
- name,
- elevation_ft,
- state,
- municipality,
- iata_code

##### dim_city_demographics: To look at the demographic data of the areas with the most travelers and potentially look at the impact of the flow of travellers on the demographic data (if it were updated on a regular basis). The fields available will be:



- City,
- state,
- median_age,
- male_population,
- female_population,
- total population
- Foreign_born,
- Average_Household_Size,
- Race,
- Count,


##### dim_temperatures: to look at the temperature data of the cities where traveller entry and departure is being reported. The fields included will be:

- date,
- City,
- average temperature,
- average temperature uncertainty

#### 3.2 Mapping Out Data pipelines

Many of the data cleaning steps were documented in detail in Step 2. Here are the steps again:

Data Extraction:
- Load all the datasets from CSV and SAS data files;


#### Data Transformation and Loading:

fact_immigration:
- Drop rows where the mode of arrival is not air travel
- Drop rows with incorrect gender data
- convert arrival and departure dates;
- replace country codes with the character string equivalents
- replace visa_type with character string
- replace port of entry with city and state
- filter out any row where the port of entry is not in the US
- compute age in a new column using the birth year and the year of the data (2016).
- insert data into our fact table
- Write to parquet
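
The step that replaces codes with their string equivalents relies on inner-join semantics: a row whose code is absent from the dictionary is dropped. A minimal plain-Python sketch of that behaviour follows; the function name, the codes, and the country names are all hypothetical.

```python
def attach_country(rows, country_by_code, code_field, name_field):
    """Inner-join style lookup: rows with an unknown code are dropped."""
    joined = []
    for row in rows:
        country = country_by_code.get(row[code_field])
        if country is not None:
            joined.append({**row, name_field: country})
    return joined


# Hypothetical codes and rows for illustration only
country_by_code = {100: "COUNTRY A", 200: "COUNTRY B"}
rows = [{"cicid": 1, "i94cit": 100}, {"cicid": 2, "i94cit": 999}]
print(attach_country(rows, country_by_code, "i94cit", "citizenship_country"))
# [{'cicid': 1, 'i94cit': 100, 'citizenship_country': 'COUNTRY A'}]
```

Because unmatched rows disappear silently, comparing row counts before and after such a join is a useful sanity check.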

dim_temperature:
- For the temperature table, drop all data for cities outside the united states;
- For the temperature table, drop all data for dates before 1950 since commercial air travel was far less common before then;
- Convert city to upper case
- Compute the average temperature and uncertainty over date+city partitions
- Insert into the temperature table as is, since our dataset may include new cities in future dates;
- Write to parquet
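
The upper-casing and averaging steps above can be sketched in plain Python as a group-by over (date, city) keys. The function name and the readings are illustrative assumptions; missing temperatures are skipped, as SQL's `AVG` does with NULLs.

```python
from collections import defaultdict
from datetime import date


def average_by_date_and_city(records):
    """Average temperature readings per (date, CITY) key, ignoring missing values."""
    groups = defaultdict(list)
    for day, city, temperature in records:
        if temperature is not None:
            groups[(day, city.strip().upper())].append(temperature)
    return {key: sum(vals) / len(vals) for key, vals in groups.items()}


# Hypothetical readings: two of them map to the same city key on the same date
records = [
    (date(2013, 1, 1), "Springfield", 2.0),
    (date(2013, 1, 1), "springfield ", 4.0),
    (date(2013, 1, 1), "Boston", -1.0),
]
print(average_by_date_and_city(records)[(date(2013, 1, 1), "SPRINGFIELD")])  # 3.0
```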


dim_time:
- Get all the arrival and departure dates from the immigration dataset;
- extract year, month, day, week from the date and insert all the values in the dim_time table;
- Write to parquet

dim_airports:
- Remove all non-US airports
- Remove all invalid ports of entry, i.e. ['closed', 'heliport', 'seaplane_base', 'balloonport']
- Remove all rows where municipalities are missing.
- Convert municipality to upper case
- Insert to our table
- Write to parquet

dim_city_demographics:
- Convert city names to upper case
- Insert to our table
- Write to parquet

### Step 4: Run Pipelines to Model the Data 

#### 4.1 Create the data model

By default, Spark reads every field of a CSV file as a string, whereas pandas usually infers the data types correctly, as shown below.
We will therefore read the CSV files into pandas dataframes and then convert them to Spark dataframes.
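
To illustrate why raw CSV fields arrive as strings unless types are inferred or a schema is applied, here is a small sketch that uses Python's standard `csv` module as a stand-in for Spark and pandas. The sample row, the `cast_row` helper, and the `schema` mapping are hypothetical.

```python
import csv
import io

# Hypothetical extract using the same ';' delimiter as the demographics file
raw = "City;Median Age;Total Population\nExample City;30.5;100000\n"

row = next(csv.DictReader(io.StringIO(raw), delimiter=";"))
print(type(row["Total Population"]).__name__)  # str


def cast_row(row, schema):
    """Convert each field with the caster registered for its column name."""
    return {name: schema.get(name, str)(value) for name, value in row.items()}


schema = {"Median Age": float, "Total Population": int}
typed = cast_row(row, schema)
print(typed)
# {'City': 'Example City', 'Median Age': 30.5, 'Total Population': 100000}
```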

In [119]:
df_demographics_spark = spark.read.format("csv").option("header", "true").option("delimiter", ";").load('us-cities-demographics.csv')

In [120]:
df_demographics_spark.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [127]:
df_demographics.dtypes

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

### Staging the Data

In [128]:
# load dictionary data
df_countryCodes = pd.read_csv('countries.csv')
df_i94portCodes = pd.read_csv('i94portCodes.csv')

# load the various csv files into pandas dataframes
df_demographics = pd.read_csv('us-cities-demographics.csv', sep=';')
df_temperature = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')

# load the SAS data
df_immigration=spark.read.parquet("sas_data")

### Transforming the Data

In [129]:
# let's convert the data dictionaries to views in our spark context to perform SQL operations with them
spark_df_countryCodes = spark.createDataFrame(df_countryCodes)
spark_df_countryCodes .createOrReplaceTempView("countryCodes")

In [130]:
# remove all entries with null values as they are either unreported or outside the US
df_i94portCodes = df_i94portCodes[~df_i94portCodes.state.isna()].copy()

In [131]:
# We need to exclude values for airports outside of the US. 
nonUSstates = ['CANADA', 'Canada', 'NETHERLANDS', 'NETH ANTILLES', 'THAILAND', 'ETHIOPIA', 'PRC', 'BERMUDA', 'COLOMBIA', 'ARGENTINA', 'MEXICO', 
               'BRAZIL', 'URUGUAY', 'IRELAND', 'GABON', 'BAHAMAS', 'MX', 'CAYMAN ISLAND', 'SEOUL KOREA', 'JAPAN', 'ROMANIA', 'INDONESIA',
               'SOUTH AFRICA', 'ENGLAND', 'KENYA', 'TURK & CAIMAN', 'PANAMA', 'NEW GUINEA', 'ECUADOR', 'ITALY', 'EL SALVADOR']

In [132]:
df_i94portCodes = df_i94portCodes[~df_i94portCodes.state.isin(nonUSstates)].copy()

In [133]:
spark_df_i94portCodes = spark.createDataFrame(df_i94portCodes)
spark_df_i94portCodes .createOrReplaceTempView("i94portCodes")

In [134]:
df_immigration.createOrReplaceTempView("immig_table")

In [135]:
# Remove all entries into the united states that weren't via air travel
spark.sql("""
SELECT *
FROM immig_table
WHERE i94mode = 1
""").createOrReplaceTempView("immig_table")

In [136]:
# drop rows where the gender values entered is undefined
spark.sql("""SELECT * FROM immig_table WHERE gender IN ('F', 'M')""").createOrReplaceTempView("immig_table")

In [137]:
# convert the arrival dates into a useable value
spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date FROM immig_table").createOrReplaceTempView("immig_table")

In [138]:
# convert the departure dates into a useable value
spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                        
                FROM immig_table""").createOrReplaceTempView("immig_table")

In [139]:
# we use an inner join to drop invalid codes
#country of citizenship
spark.sql("""
SELECT im.*, cc.country AS citizenship_country
FROM immig_table im
INNER JOIN countryCodes cc
ON im.i94cit = cc.code
""").createOrReplaceTempView("immig_table")

In [140]:
#country of residence
spark.sql("""
SELECT im.*, cc.country AS residence_country
FROM immig_table im
INNER JOIN countryCodes cc
ON im.i94res = cc.code
""").createOrReplaceTempView("immig_table")

In [141]:
# Add visa character string aggregation
spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'
                        ELSE 'N/A' END AS visa_type 
                        
                FROM immig_table""").createOrReplaceTempView("immig_table")

In [142]:
# Add entry_port names and entry port states to the view
spark.sql("""
SELECT im.*, pc.location AS entry_port, pc.state AS entry_port_state
FROM immig_table im 
INNER JOIN i94portCodes pc
ON im.i94port = pc.code
""").createOrReplaceTempView("immig_table")


In [143]:
# Compute the age of each individual and add it to the view
spark.sql("""
SELECT *, (2016-biryear) AS age 
FROM immig_table
""").createOrReplaceTempView("immig_table")

In [144]:
# Insert the immigration fact data into a spark dataframe
fact_immigration = spark.sql("""
                        SELECT 
                            cicid, 
                            citizenship_country,
                            residence_country,
                            TRIM(UPPER (entry_port)) AS city,
                            TRIM(UPPER (entry_port_state)) AS state,
                            arrival_date,
                            departure_date,
                            age,
                            visa_type,
                            visatype AS detailed_visa_type

                        FROM immig_table
""")

In [145]:
# extract all distinct dates from arrival and departure dates to create dimension table
dim_time = spark.sql("""
SELECT DISTINCT arrival_date AS date
FROM immig_table
UNION
SELECT DISTINCT departure_date AS date
FROM immig_table
WHERE departure_date IS NOT NULL
""")
dim_time.createOrReplaceTempView("dim_time_table")

In [146]:
# extract year, month, day, weekofyear, dayofweek and dayofyear from the date and insert all the values in the dim_time table
dim_time = spark.sql("""
SELECT date, YEAR(date) AS year, MONTH(date) AS month, DAY(date) AS day, WEEKOFYEAR(date) AS week, DAYOFWEEK(date) as weekday, DAYOFYEAR(date) year_day
FROM dim_time_table
ORDER BY date ASC
""")

In [147]:
# Keep only data for the United States
df_temperature = df_temperature[df_temperature['Country']=='United States'].copy()

# Convert the date to datetime objects
df_temperature['date'] = pd.to_datetime(df_temperature.dt)

# Remove all dates prior to 1950
df_temperature=df_temperature[df_temperature['date']>"1950-01-01"].copy()

In [148]:
# convert the city names to upper case
df_temperature.City = df_temperature.City.str.strip().str.upper() 

In [149]:
# convert the dataframes from pandas to spark
spark_df_temperature = spark.createDataFrame(df_temperature)
spark_df_temperature .createOrReplaceTempView("temperature")

In [150]:
dim_temperature = spark.sql("""
SELECT
    DISTINCT date, city,
    AVG(AverageTemperature) OVER (PARTITION BY date, City) AS average_temperature, 
    AVG(AverageTemperatureUncertainty)  OVER (PARTITION BY date, City) AS average_termperature_uncertainty
    
FROM temperature
""")

In [151]:
df_demographics.City = df_demographics.City.str.strip().str.upper()
df_demographics['State Code'] = df_demographics['State Code'].str.strip().str.upper()
df_demographics.Race = df_demographics.Race.str.strip().str.upper()

In [152]:
# convert the dataframes from pandas to spark
spark_df_demographics = spark.createDataFrame(df_demographics)
spark_df_demographics.createOrReplaceTempView("demographics")

In [153]:
# insert data into the demographics dim table
dim_demographics = spark.sql("""
                                SELECT  City, 
                                        State, 
                                        `Median Age` AS median_age, 
                                        `Male Population` AS male_population, 
                                        `Female Population` AS female_population, 
                                        `Total Population` AS total_population, 
                                        `Foreign-born` AS foreign_born, 
                                        `Average Household Size` AS average_household_size, 
                                        `State Code` AS state_code, 
                                        Race, 
                                        Count
                                FROM demographics
""")

In [154]:
#The airport dataset contains a lot of nulls. We'll load the csv directly into a spark dataframe to avoid having to deal with converting pandas NaN into nulls
spark_df_airports = spark.read.format("csv").option("header", "true").load('airport-codes_csv.csv')
spark_df_airports.createOrReplaceTempView("airports")

In [155]:
#equivalent to the following pandas code:
# df_airports = df_airports[df_airports['iso_country'].fillna('').str.upper().str.contains('US')].copy()
spark.sql("""
SELECT *
FROM airports
WHERE iso_country IS NOT NULL
AND UPPER(TRIM(iso_country)) LIKE 'US'
""").createOrReplaceTempView("airports")

In [156]:
#equivalent to the following pandas code:
# excludedValues = ['closed', 'heliport', 'seaplane_base', 'balloonport']
# df_airports = df_airports[~df_airports['type'].str.strip().isin(excludedValues)].copy()
# df_airports = df_airports[~df_airports['municipality'].isna()].copy()
# df_airports['len'] = df_airports["iso_region"].apply(len)
# df_airports = df_airports[df_airports['len']==5].copy()

spark.sql("""
SELECT *
FROM airports
WHERE LOWER(TRIM(type)) NOT IN ('closed', 'heliport', 'seaplane_base', 'balloonport')
AND municipality IS NOT NULL
AND LENGTH(iso_region) = 5
""").createOrReplaceTempView("airports")

In [157]:
dim_airports = spark.sql("""
SELECT TRIM(ident) AS ident, type, name, elevation_ft, SUBSTR(iso_region, 4) AS state, TRIM(UPPER(municipality)) AS municipality, iata_code
FROM airports
""")

In [158]:
# Saving the data in parquet format
dim_demographics.write.parquet("dim_demographics")
dim_time.write.parquet("dim_time")
dim_airports.write.parquet("dim_airports")
dim_temperature.write.parquet("dim_temperature")
fact_immigration.write.parquet("fact_immigration")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [159]:
#Let's check some things in our data
dim_demographics.createOrReplaceTempView("dim_demographics")
dim_time.createOrReplaceTempView("dim_time")
dim_airports.createOrReplaceTempView("dim_airports")
dim_temperature.createOrReplaceTempView("dim_temperature")
fact_immigration.createOrReplaceTempView("fact_immigration")

First, let's make sure the columns used as primary keys don't contain any null values. We define a function that could be incorporated in an automated data pipeline

In [160]:
# we define the following function to check for null values
def nullValueCheck(spark_ctxt, tables_to_check):
    """
    This function performs null value checks on specific columns of given tables received as parameters and raises a ValueError exception when null values are encountered.
    It receives the following parameters:
    spark_ctxt: spark context where the data quality check is to be performed
    tables_to_check: A dictionary containing (table, columns) pairs specifying for each table, which column is to be checked for null values.   
    """  
    for table in tables_to_check:
        print(f"Performing data quality check on table {table}...")
        for column in tables_to_check[table]:
            returnedVal = spark_ctxt.sql(f"""SELECT COUNT(*) as nbr FROM {table} WHERE {column} IS NULL""")
            if returnedVal.head()[0] > 0:
                raise ValueError(f"Data quality check failed! Found NULL values in {column} column!")
        print(f"Table {table} passed.")

Run the data quality check on all the tables in the data model

In [161]:
#dictionary of tables and columns to be checked
tables_to_check = { 'fact_immigration' : ['cicid'], 'dim_time':['date'], 'dim_demographics': ['City','state_code'], 'dim_airports':['ident'], 'dim_temperature':['date','City']}

#We call our function on the spark context
nullValueCheck(spark, tables_to_check)

Performing data quality check on table fact_immigration...
Table fact_immigration passed.
Performing data quality check on table dim_time...
Table dim_time passed.
Performing data quality check on table dim_demographics...
Table dim_demographics passed.
Performing data quality check on table dim_airports...
Table dim_airports passed.
Performing data quality check on table dim_temperature...
Table dim_temperature passed.


The data quality check succeeded.

Next, we perform more detailed checks on each table.

In [162]:
#time dimension verification

#check the number of rows in our time table : 192 expected
spark.sql("""
SELECT COUNT(*) - 192
FROM dim_time
""").show()

# make sure each row has a distinct date key : 192 expected
spark.sql("""
SELECT COUNT(DISTINCT date) - 192
FROM dim_time
""").show()

# we could also subtract the result of one query from the other


# and make sure the time dimension holds no date that is absent from the fact data (an empty result is expected)
spark.sql("""
SELECT DISTINCT date
FROM dim_time

MINUS

(SELECT DISTINCT arrival_date AS date
FROM immig_table
UNION
SELECT DISTINCT departure_date AS date
FROM immig_table
WHERE departure_date IS NOT NULL)

""").show()

+--------------------------------+
|(count(1) - CAST(192 AS BIGINT))|
+--------------------------------+
|                               0|
+--------------------------------+

+--------------------------------------------+
|(count(DISTINCT date) - CAST(192 AS BIGINT))|
+--------------------------------------------+
|                                           0|
+--------------------------------------------+

+----+
|date|
+----+
+----+



In [163]:
#immigration verification

# The number of primary keys from the staging table (2165257 expected)
spark.sql("""
SELECT count(distinct cicid) - 2165257
FROM immig_table
""").show()

#should match the primary key count from the fact table (2165257 expected)
spark.sql("""
SELECT count(distinct cicid) - 2165257
FROM fact_immigration
""").show()

#and should match the row count from the fact table since it is also the primary key (2165257 expected)
spark.sql("""
SELECT count(*) - 2165257
FROM fact_immigration
""").show()

+-------------------------------------------------+
|(count(DISTINCT cicid) - CAST(2165257 AS BIGINT))|
+-------------------------------------------------+
|                                                0|
+-------------------------------------------------+

+-------------------------------------------------+
|(count(DISTINCT cicid) - CAST(2165257 AS BIGINT))|
+-------------------------------------------------+
|                                                0|
+-------------------------------------------------+

+------------------------------------+
|(count(1) - CAST(2165257 AS BIGINT))|
+------------------------------------+
|                                   0|
+------------------------------------+



In [164]:
# Let's check the demographics dimension table (2891 expected) 
spark.sql("""
SELECT count(*) - 2891
FROM dim_demographics
""").show()

spark.sql("""
SELECT COUNT(DISTINCT city, state, race) - 2891
FROM dim_demographics
""").show()

+---------------------------------+
|(count(1) - CAST(2891 AS BIGINT))|
+---------------------------------+
|                                0|
+---------------------------------+

+----------------------------------------------------------+
|(count(DISTINCT city, state, race) - CAST(2891 AS BIGINT))|
+----------------------------------------------------------+
|                                                         0|
+----------------------------------------------------------+



In [165]:
# Let's check the primary key for airports (expected 14529)
spark.sql("""
SELECT count(*) - 14529
FROM dim_airports
""").show()

spark.sql("""
SELECT COUNT(DISTINCT ident) - 14529
FROM dim_airports
""").show()

+----------------------------------+
|(count(1) - CAST(14529 AS BIGINT))|
+----------------------------------+
|                                 0|
+----------------------------------+

+-----------------------------------------------+
|(count(DISTINCT ident) - CAST(14529 AS BIGINT))|
+-----------------------------------------------+
|                                              0|
+-----------------------------------------------+



In [166]:
#finally, city + date is our primary key for the temperature (expected 189472)

spark.sql("""
SELECT count(*) - 189472
FROM dim_temperature
""").show()

spark.sql("""
SELECT COUNT(DISTINCT date, city) - 189472
FROM dim_temperature
""").show()


+-----------------------------------+
|(count(1) - CAST(189472 AS BIGINT))|
+-----------------------------------+
|                                  0|
+-----------------------------------+

+-----------------------------------------------------+
|(count(DISTINCT date, city) - CAST(189472 AS BIGINT))|
+-----------------------------------------------------+
|                                                    0|
+-----------------------------------------------------+



Next, we join the fact and dimension tables to see how well they match.

In [167]:
# First, look at the immigration and airport tables before joining them
fact_immigration.show(2)
dim_airports.show(2)

+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|    cicid|citizenship_country|residence_country|  city|state|arrival_date|departure_date| age|visa_type|detailed_visa_type|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|4041803.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|49.0| Business|                B1|
|4041804.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|          null|38.0| Business|                B1|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
only showing top 2 rows

+-----+-------------+--------------------+------------+-----+------------+---------+
|ident|         type|                name|elevation_ft|state|municipality|iata_code|
+-----+-------------+--------------------+------------+

A given city can have more than one airport, and airport data is not provided in the immigration dataset, so let's see how many city and state combinations are common to the two datasets.

We are looking at the influx of travellers by city, so we want to check whether the city and state combination works well for matching rows between dim_airports and fact_immigration.

In [168]:
#here are the distinct combinations of city and state in our fact table
spark.sql("""
SELECT COUNT(DISTINCT city, state)
FROM fact_immigration
""").show()

# and the combinations of city and state that are common to both
spark.sql("""
SELECT COUNT(*)
FROM
(
SELECT DISTINCT city, state
FROM fact_immigration
) fi
INNER JOIN 
(
SELECT DISTINCT municipality, state
FROM dim_airports 
) da
ON fi.city = da.municipality
AND fi.state = da.state
""").show(2)

+---------------------------+
|count(DISTINCT city, state)|
+---------------------------+
|                        151|
+---------------------------+

+--------+
|count(1)|
+--------+
|     102|
+--------+



Roughly two thirds of the city and state combinations in our fact table (102 of 151) can be paired with data in the airport dimension table. Considering that the immigration table only includes one month of data, this is quite good. We would normally use a left join, so that immigration rows without a matching airport are kept.
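
A left join of this kind could look like the sketch below. Because a city can have several airports, joining the raw airport rows would duplicate immigration rows, so this version first counts airports per city and state. The constant name and the `arrivals` and `airport_count` aliases are hypothetical. The table and column names are the ones used in the cells above.

```python
# Sketch only: arrivals per city with the number of airports in that city.
# Cities with no match in dim_airports are kept, with airport_count = 0.
AIRPORTS_PER_CITY_LEFT_JOIN = """
SELECT fi.city,
       fi.state,
       COUNT(*) AS arrivals,
       COALESCE(MAX(da.airport_count), 0) AS airport_count
FROM fact_immigration fi
LEFT JOIN (
    SELECT municipality, state, COUNT(*) AS airport_count
    FROM dim_airports
    GROUP BY municipality, state
) da
ON fi.city = da.municipality
AND fi.state = da.state
GROUP BY fi.city, fi.state
"""

# Usage in the notebook (assumes the `spark` session and the registered tables):
# spark.sql(AIRPORTS_PER_CITY_LEFT_JOIN).show(5)
```

Aggregating the airports before the join keeps one output row per city and state, which fits the question about travel volume versus the number of entry ports.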

Let's check the same thing with the demographics table. We expect fewer matches from this join, since the table doesn't include all cities in the United States.



In [169]:
fact_immigration.show(2)
dim_demographics.show(2)

+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|    cicid|citizenship_country|residence_country|  city|state|arrival_date|departure_date| age|visa_type|detailed_visa_type|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|4041803.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|49.0| Business|                B1|
|4041804.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|          null|38.0| Business|                B1|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
only showing top 2 rows

+-------------+-------------+----------+---------------+-----------------+----------------+------------+----------------------+----------+------------------+-----+
|         City|        State|median_age|male_population|femal

In [170]:
#here are the distinct combinations of city and state in our fact table
spark.sql("""
SELECT COUNT(DISTINCT city, state)
FROM fact_immigration
""").show()

# and the combinations of city and state that are common to both the fact table and the demographics table
spark.sql("""
SELECT COUNT(*)
FROM
(
SELECT DISTINCT city, state
FROM fact_immigration
) fi
INNER JOIN 
(
SELECT DISTINCT City, state_code
FROM dim_demographics 
) da
ON fi.city = da.City
AND fi.state = da.state_code
""").show(2)

+---------------------------+
|count(DISTINCT city, state)|
+---------------------------+
|                        151|
+---------------------------+

+--------+
|count(1)|
+--------+
|      69|
+--------+



A little less than half of the city and state combinations (69 of 151) are accounted for in our demographics table, which isn't surprising but is still quite good.

We have the option of filtering out city/state combinations that have no match in the airport table, using a query similar to the one below:

In [171]:
# We use a count to see how many rows we would keep using this strategy
spark.sql("""
SELECT COUNT(*)
FROM fact_immigration
WHERE CONCAT(city, state) IN (
    SELECT CONCAT(fi.city, fi.state)
    FROM
    (
        SELECT DISTINCT city, state
        FROM fact_immigration
    ) fi
    INNER JOIN 
    (
        SELECT DISTINCT municipality, state
        FROM dim_airports 
    ) da
    ON fi.city = da.municipality
    AND fi.state = da.state
)
""").show(2)

+--------+
|count(1)|
+--------+
| 1983869|
+--------+
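
As an alternative to matching on `CONCAT(city, state)`, the same filter can be sketched with Spark SQL's `LEFT SEMI JOIN`, which compares the two columns directly and returns only the fact rows that have a match. The constant name below is hypothetical, and the query has not been run against this dataset, so no result is claimed here.

```python
# Sketch only: count the immigration rows whose city and state
# also appear in dim_airports, without building concatenated keys.
FILTER_TO_KNOWN_AIRPORT_CITIES = """
SELECT COUNT(*)
FROM fact_immigration fi
LEFT SEMI JOIN (
    SELECT DISTINCT municipality, state
    FROM dim_airports
) da
ON fi.city = da.municipality
AND fi.state = da.state
"""

# Usage in the notebook (assumes the `spark` session and the registered tables):
# spark.sql(FILTER_TO_KNOWN_AIRPORT_CITIES).show()
```

A semi join never duplicates rows from the left table, even when a city has several airports, which is why it fits a pure filtering step.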



We drop from 2,165,257 rows to 1,983,869 rows, which is still quite good. However, we are assuming that our dimension datasets are incomplete, especially the demographic data, since it only includes cities with populations of at least 65,000 inhabitants. We therefore prefer to minimize the amount of data that is left out of our final result.
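
The proportions quoted in this section can be checked with a few lines of arithmetic. The sketch below uses only the counts printed in the outputs above. The helper name `share` is hypothetical.

```python
def share(part, total):
    """Return part/total as a percentage rounded to one decimal place."""
    return round(100 * part / total, 1)


# City and state combinations matched in each dimension table (out of 151)
airport_match = share(102, 151)
demographics_match = share(69, 151)

# Rows kept if we filter the fact table to cities found in dim_airports
rows_kept = share(1_983_869, 2_165_257)
rows_dropped = 2_165_257 - 1_983_869

print(f"airport match: {airport_match}%")
print(f"demographics match: {demographics_match}%")
print(f"rows kept: {rows_kept}% ({rows_dropped} rows dropped)")
```

This gives about 67.5% for airports, 45.7% for demographics, and 91.6% of rows kept, consistent with "roughly two thirds" and "a little less than half" above.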

#### 4.3 Data dictionary

 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up

Considering the significant size of the immigration dataset (~3 million rows) for only a month, combined with the temperature, airport and demographic datasets, the most sensible technology choice for such an approach is Spark, especially if we were to process data over a longer period of time.

We stated at the beginning of this project that we were interested in:

- the effects of temperature on the volume of travellers
- the seasonality of travel
- the connection between the volume of travel and the number of entry ports (i.e. airports)
- the connection between the volume of travel and the demographics of various cities

None of these phenomena requires a rapid update of our data. A monthly or quarterly update would be sufficient for the needs of this study.

#### Alternate requirement scenarios:

How would our approach change if the problem had the following requirements:

- The data was increased by 100x: Our data would be stored in an Amazon S3 bucket (instead of storing it in the EMR cluster along with the staging tables) and loaded into our staging tables. We would still use Spark as our data processing platform, since it is well suited to very large datasets.
- The data populates a dashboard that must be updated on a daily basis by 7am every day: We would use Apache Airflow to schedule and run the ETL and data quality validation.
- The database needed to be accessed by 100+ people: Once the data is ready to be consumed, it would be stored in an Amazon Redshift cluster, a PostgreSQL-compatible data warehouse that easily supports multi-user access.