# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project ultimately constitutes of a source-of-truth database consisting of general data of US cities and travel.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd

In [2]:
pd.set_option('display.max_columns', None)

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

#### In this project, I plan to use all three Udacity-provided datasets to create a source-of-truth database related to cities. More specifically, the connection between the amount of travel and the demographics of corresponding cities.

In [3]:
# Read in the data here
immigration_df = pd.read_csv('immigration_data_sample.csv')
airport_df = pd.read_csv('airport-codes_csv.csv')
cities_df = pd.read_csv('us-cities-demographics.csv', sep=';')
port_df = pd.read_csv('i94portCodes.csv') # Helper data for referencing port code and corresponding city

In [4]:
immigration_df.head(1)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,782,WT


In [5]:
airport_df.head(1)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"


In [6]:
cities_df.head(1)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924


In [7]:
port_df.head(1)

Unnamed: 0,code,location,state
0,ALC,ALCAN,AK


In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark_imm = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [9]:
#write to parquet
# df_spark.write.parquet("sas_data")
# df_spark=spark.read.parquet("sas_data")

In [10]:
df_spark_imm.count()

3096313

In [11]:
df_spark_imm.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### Let's analyze the immigration dataset:

In [12]:
immigration_df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [13]:
immigration_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
insnum        35 non-null float64
airline       967 non

#### The first column is unnamed, and therefore without any useful context for the column. Let's drop it:

In [14]:
immigration_df.drop(columns=immigration_df.columns[0], inplace=True)

In [15]:
immigration_df.head(2)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2


#### Let's check to see if ```cicid``` can be used as a Primary Key:

In [16]:
immigration_df.shape[0]

1000

In [17]:
immigration_df.shape[0] == immigration_df['cicid'].nunique() # Yes, it appears to be unique to each row

True

#### The columns ```i94cit``` and ```i94res``` appear to be identical. Let's see if they have fully identical data for all the 1000 rows:

In [18]:
immigration_df[immigration_df['i94cit'] != immigration_df['i94res']].shape[0] == 0 # False, certain rows have different values

False

#### Let's create a new column with country names, derived from the ```i94res``` column:

In [19]:
import json

with open('country_codes.json', 'r') as f:
    country_json_data = json.load(f)

In [20]:
def country_json_parse(x):
    try:
        return country_json_data[str(int(x))]
    except KeyError:
        return 'N/A'

In [21]:
immigration_df['countries'] = immigration_df['i94res'].apply(lambda x: country_json_parse(x))

In [22]:
immigration_df.head(2)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,countries
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT,JAPAN
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2,"MEXICO Air Sea, and Not Reported (I-94, no lan..."


In [23]:
immigration_df['countries'].unique()

array(['JAPAN',
       'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)',
       'GERMANY', 'QATAR', 'FRANCE', 'GUATEMALA', 'CHINA, PRC',
       'UNITED KINGDOM', 'SWITZERLAND', 'IRELAND', 'AUSTRALIA',
       'PHILIPPINES', 'BAHAMAS', 'BRAZIL', 'RUSSIA', 'ICELAND',
       'MARTINIQUE', 'ISRAEL', 'TAIWAN', 'DOMINICAN REPUBLIC', 'INDIA',
       'TURKEY', 'BERMUDA', 'ANGOLA', 'VENEZUELA', 'ITALY', 'ARGENTINA ',
       'SPAIN', 'HONDURAS', 'NETHERLANDS', 'PAKISTAN', 'COLOMBIA',
       'SWEDEN', 'POLAND', 'AUSTRIA', 'PERU', 'SOUTH KOREA', 'HONG KONG',
       'EGYPT', 'COSTA RICA', 'HAITI', 'LITHUANIA', 'JAMAICA', 'NORWAY',
       'MALAYSIA', 'ECUADOR', 'ESTONIA', 'NICARAGUA', 'BOSNIA-HERZEGOVINA',
       'PORTUGAL', 'THAILAND', 'NEW ZEALAND', 'SURINAME', 'LUXEMBOURG',
       'UKRAINE', 'BANGLADESH', 'CHILE', 'SINGAPORE', 'BELGIUM',
       'BRITISH VIRGIN ISLANDS', 'BULGARIA', 'ANTIGUA-BARBUDA', 'NIGERIA',
       'PANAMA', 'EL SALVADOR', 'KUWAIT', 'DENMARK', 'ROMANIA',
       'CZEC

### Let's analyze the airport dataset:

In [24]:
airport_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [25]:
airport_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [26]:
airport_df['iso_country'].value_counts()

US    22757
BR     4334
CA     2784
AU     1963
KR     1376
MX     1181
RU     1040
DE      947
GB      911
FR      850
AR      848
CO      706
IT      671
PG      593
VE      592
ZA      489
CL      474
ID      470
ES      416
CN      404
KE      372
IN      341
CD      285
PH      282
PL      278
CZ      269
JP      234
NO      228
SE      224
NZ      212
      ...  
SH        2
MF        2
MS        2
DM        2
BN        2
MC        2
WF        2
AD        2
RE        2
KN        2
SM        2
JE        1
AI        1
NU        1
MO        1
CW        1
GI        1
CX        1
CC        1
LI        1
GM        1
VA        1
BL        1
SX        1
MQ        1
IO        1
NR        1
NF        1
YT        1
AW        1
Name: iso_country, Length: 243, dtype: int64

#### This dataset includes airport data for numerous countries. Because our analysis is only within the scope of the US, let's drop the rows that contain non-US airports:

In [27]:
airport_df = airport_df[airport_df['iso_country'] == 'US']

#### Let's examine the types of ports provided in the dataset:

In [28]:
airport_df['type'].value_counts()

small_airport     13720
heliport           6265
closed             1326
medium_airport      692
seaplane_base       566
large_airport       170
balloonport          18
Name: type, dtype: int64

#### In this analysis, it is assumed that incoming travelers arrive by mainstream, regulated ports. Therefore, we'll drop rows containing values of ```heliport```, ```closed```, ```seaplane_base```, and ```balloonport```:

In [29]:
invalid_values = ['heliport', 'closed', 'seaplane_base', 'balloonport']
airport_df = airport_df[~airport_df['type'].isin(invalid_values)]

In [30]:
airport_df['type'].value_counts()

small_airport     13720
medium_airport      692
large_airport       170
Name: type, dtype: int64

#### Ultimately, we must join our airport dataset with out immigration dataset. It appears that the join will happen with the ```municipality``` field. Let's see if there are any missing values in that field:

In [31]:
airport_df.isnull().sum()

ident               0
type                0
name                0
elevation_ft       63
continent       14582
iso_country         0
iso_region          0
municipality       50
gps_code          399
iata_code       12717
local_code        199
coordinates         0
dtype: int64

#### Let's remove these 50 rows with missing ```municipality```:

In [32]:
airport_df.shape

(14582, 12)

In [33]:
airport_df = airport_df[~airport_df['municipality'].isnull()]

In [34]:
airport_df.shape

(14532, 12)

#### Convert the ```municipality``` column to uppercase for joining purposes:

In [35]:
airport_df['municipality'] = airport_df['municipality'].str.upper()

#### Let's examine the ```iso_region``` column:

In [36]:
airport_df['iso_region'].value_counts()

US-TX     1546
US-AK      586
US-IL      579
US-CA      551
US-FL      522
US-OH      492
US-IN      486
US-PA      486
US-WI      457
US-MO      411
US-NY      402
US-MI      379
US-WA      379
US-OK      372
US-KS      372
US-GA      365
US-MN      361
US-OR      357
US-NC      349
US-VA      311
US-ND      297
US-AR      291
US-CO      288
US-LA      281
US-NE      259
US-MT      255
US-ID      238
US-IA      232
US-TN      228
US-AZ      214
US-MS      211
US-AL      197
US-SC      173
US-KY      164
US-SD      162
US-MD      157
US-NM      149
US-ME      122
US-NJ      116
US-NV      113
US-UT      103
US-WY       95
US-WV       83
US-MA       79
US-VT       66
US-CT       56
US-NH       54
US-DE       36
US-HI       35
US-RI       10
US-U-A       3
US-DC        2
Name: iso_region, dtype: int64

#### As we can see, there is an error with the value ```US-U-A```. Upon verification, this doesn't apear to closely match any other State abbreviation, so we will remove rows containing that value:

In [37]:
airport_df = airport_df[airport_df['iso_region'] != 'US-U-A']

#### Let's create a new column with the extracted state abbreviations:

In [38]:
airport_df['state'] = airport_df['iso_region'].apply(lambda x: x[3:])

In [39]:
airport_df.head(2)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,state
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,LEOTI,00AA,,00AA,"-101.473911, 38.704022",KS
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,ANCHOR POINT,00AK,,00AK,"-151.695999146, 59.94919968",AK


### Let's inspect the demographic dataset:

In [40]:
cities_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


While there are null values present, there don't appear to be enough to thwart our analysis.

In [41]:
cities_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


#### Due to joining purposes, let's capitalize the ```City``` field:

In [42]:
cities_df['City'] = cities_df['City'].str.upper().str.strip()

##### To make sure that we can join on the ```city``` field, let's ensure its uniqueness within the dataset:

In [43]:
cities_df['City'].nunique()

567

In [44]:
cities_df.shape[0] == cities_df['City'].nunique()

False

In [45]:
cities_df['City'].value_counts()

COLUMBIA           15
BLOOMINGTON        15
SPRINGFIELD        15
COLUMBUS           10
PEORIA             10
UNION CITY         10
ALLEN              10
CONCORD            10
FAYETTEVILLE       10
JACKSONVILLE       10
ALBANY             10
AURORA             10
LAKEWOOD           10
GLENDALE           10
WILMINGTON         10
JACKSON            10
ROCHESTER          10
WESTMINSTER        10
PORTLAND           10
ARLINGTON          10
NORWALK            10
PASADENA           10
KANSAS CITY        10
RICHMOND           10
LAWRENCE            9
LAFAYETTE           9
GREEN BAY           5
TOLEDO              5
YORBA LINDA         5
INGLEWOOD           5
                   ..
CARMEL              4
SKOKIE              4
MISSOURI CITY       4
NEW BRITAIN         4
VICTORIA            4
PASSAIC             4
LAGUNA NIGUEL       4
SUGAR LAND          4
KISSIMMEE           4
CAMDEN              4
LEAGUE CITY         4
BOYNTON BEACH       4
AMES                4
PALATINE            4
SOUTH JORD

#### It appears that most cities are represented in multiple rows within this dataset. Because of this, ```City``` alone cannot act as a primary key. Let's consider composite primary keys:

In [46]:
cities_df[cities_df['City'] == 'EDINBURG']

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
203,EDINBURG,Texas,29.3,39422.0,45079.0,84501,2409.0,16569.0,3.31,TX,Hispanic or Latino,74698
1457,EDINBURG,Texas,29.3,39422.0,45079.0,84501,2409.0,16569.0,3.31,TX,White,77560
2019,EDINBURG,Texas,29.3,39422.0,45079.0,84501,2409.0,16569.0,3.31,TX,Asian,812


#### ```Race``` looks like it can distinguish rows with the same city. Let's ensure this:

In [47]:
cities_df[cities_df.duplicated(subset=['City', 'Race']) == True][:3]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
177,WILMINGTON,Delaware,36.4,32680.0,39277.0,71957,3063.0,3336.0,2.45,DE,Asian,1193
210,LAKEWOOD,California,39.9,41523.0,40069.0,81592,4094.0,18274.0,3.13,CA,Hispanic or Latino,24987
238,GLENDALE,California,42.1,98181.0,102844.0,201025,4448.0,111510.0,2.69,CA,White,146718


In [48]:
cities_df[(cities_df['City'] == 'GLENDALE') & (cities_df['Race'] == 'White')]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
227,GLENDALE,Arizona,34.4,116795.0,123319.0,240114,13241.0,44133.0,2.89,AZ,White,202539
238,GLENDALE,California,42.1,98181.0,102844.0,201025,4448.0,111510.0,2.69,CA,White,146718


In [49]:
cities_df[(cities_df['City'] == 'WILMINGTON') & (cities_df['Race'] == 'Asian')]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
102,WILMINGTON,North Carolina,35.5,52346.0,63601.0,115947,5908.0,7401.0,2.24,NC,Asian,3152
177,WILMINGTON,Delaware,36.4,32680.0,39277.0,71957,3063.0,3336.0,2.45,DE,Asian,1193


#### ```(City, Race)``` is not sufficient as a primary key. This is primarily because different states have identically-named cities. Let's add ```State``` to the composite key and check for duplicates:

In [50]:
cities_df[cities_df.duplicated(subset=['City', 'State', 'Race']) == True].shape

(0, 12)

#### Now we can conclde that ```(City, State, Race)``` is a sufficient primary key

### Complete immigrant dataset Analysis:

In [51]:
df_spark_imm.show(2)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| n

In [52]:
df_spark_imm.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

#### Let's double-check to see if cicid can be used as a Primary Key:

In [53]:
df_spark_imm.createOrReplaceTempView("imm_view")

In [54]:
df_spark_imm.count()

3096313

In [55]:
spark.sql("""
SELECT COUNT (DISTINCT cicid)
FROM imm_view
""").show()

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              3096313|
+---------------------+



#### ```cicid``` is a valid Primary Key candidate. 

#### We would like to analyze the date of arrival into the US as well. For this, let's convert the ```arrdate``` field into a useful datatype.<br>
#### The ```arrdate``` column represents the number of days after 1/1/1960. Therefore, let's add a column with the appropriate date, extracted from ```arrdate```:

In [56]:
df_spark_imm = spark.sql("""
SELECT *, date_add(to_date('1960-01-01'), arrdate) AS date_arrival 
FROM imm_view
""")

In [57]:
df_spark_imm.createOrReplaceTempView("imm_view")

In [58]:
df_spark_imm.show(1)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|date_arrival|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1.897628485E9| null|      B2|  2016-04-29|
+-----+------+------+------+------+-------+-------+-------+-----

#### The numeric values in the ```i94visa``` column indicates the type of visa of the incoming traveler. They include:
- 1: Business
- 2: Pleasure
- 3: Student

Let's replace the data in that column accordingly:

In [59]:
df_spark_imm = spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'
                        ELSE 'N/A' END AS visa_reason
             FROM imm_view""")
df_spark_imm.createOrReplaceTempView("imm_view")

#### Let's ensure there are no outlier departure date values:

In [60]:
df_spark_imm = spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS date_departure 
                        
                FROM imm_view""")
df_spark_imm.createOrReplaceTempView("imm_view")

#### For all our records, let's ensure that the arrival date comes before the departure date:

In [61]:
spark.sql("""
SELECT COUNT(*)
FROM imm_view
WHERE date_arrival > date_departure
""").show()

+--------+
|count(1)|
+--------+
|     375|
+--------+



#### There doesn't appear to be any way to fix this anomaly within the dataset. We have no choice but to drop these rows. 

In [62]:
df_spark_imm = spark.sql("""
                            SELECT *
                            FROM imm_view
                            WHERE date_departure >= date_arrival
                        """)
df_spark_imm.createOrReplaceTempView("imm_view")

#### Let's analyze the various modes of transport for our dataset:

In [63]:
spark.sql("""
SELECT i94mode, count(*)
FROM imm_view
GROUP BY i94mode
""").show()

+-------+--------+
|i94mode|count(1)|
+-------+--------+
|   null|     238|
|    1.0| 2871184|
|    3.0|   61572|
|    2.0|   17970|
|    9.0|    2517|
+-------+--------+



#### As per the dataset definition, the mode of arrivals are numerically represented as follows:

- 1: Air
- 2: Sea
- 3: Land
- 9: Not reported

#### We'll only keep Air arrivals since our analysis is in the scope of US airports only

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

#### Our dataset concerns the correspondence between the amount of travel and the demographics of corresponding US cities. Because of this, we have determined that the following data model makes the most sense:

##### Data from the immigration table will act as our fact table, due to the large amount and frequency of its available data. Our ```fact_immigration``` table will resemble the following:

```fact_immigration```
- cicid
- citizenship_country
- residence_country
- city
- state
- arrival_date
- departure_date
- age
- visa_type
- visa_detailed

##### The following dimension table ```dim_airports``` may be used to determine areas with the largest flow of travelers:

```dim_airports```
- ident
- type
- name
- elevation
- state
- municipality
- iata_code

##### The following dimension table ```dim_city``` can be used to analyze the areas with the most travelers, and its corresponding impact of the demographic makeup:

```dim_city```
- city
- state
- avg_age
- male_population
- female_population
- total_population
- born_outside_US
- Race
- Count

##### The following dimension table ```dim_time``` can assist with time-related calculations:

```dim_time```
- date
- year
- month
- day
- week
- weekday
- dayofyear

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [64]:
df_cities_spark = spark.read.format("csv").option("delimiter", ";").option("header", "true").load('us-cities-demographics.csv')

In [65]:
df_cities_spark.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [66]:
# Load the data
countryref_df = pd.read_csv('countries.csv')
i94port_df = pd.read_csv('i94portCodes.csv', index_col=False)
cities_df = pd.read_csv('us-cities-demographics.csv', sep=';')
df_spark_imm = spark.read.parquet("sas_data")

In [67]:
# Let's create a temporary view of the country code reference for easy access within Spark SQL
spark_countryref_df = spark.createDataFrame(countryref_df)
spark_countryref_df.createOrReplaceTempView("countryRef")

In [68]:
i94port_df['state'].unique()

array(['AK', 'AL', 'AR', 'AZ', 'CA', 'CO', 'CT', 'DC', 'DE', 'FL', 'GA',
       'GU', 'HI', 'IA', 'ID', 'IL', 'IN', 'KS', 'KY', 'LA', 'MA', 'WA',
       'MD', 'ME', 'MT', 'MI', 'MN', 'MO', 'MS', 'NC', 'ND', 'NE', 'NH',
       'NJ', 'NM', 'NV', 'NY', 'OH', 'OK', 'OR', 'PA', 'PR', 'RI', 'SC',
       'SD', 'SPN', 'TN', 'TX', 'VI', 'UT', 'VA', 'VT', 'WI', 'WV', 'WY',
       nan, 'CANADA', 'Canada', 'NETHERLANDS', 'NETH ANTILLES', 'THAILAND',
       'ETHIOPIA', 'PRC', 'BERMUDA', 'COLOMBIA', 'ARGENTINA', 'MEXICO',
       'BRAZIL', 'URUGUAY', 'IRELAND', 'GABON', 'BAHAMAS', 'MX',
       'CAYMAN ISLAND', 'SEOUL KOREA', 'JAPAN', 'ROMANIA', 'INDONESIA',
       'SOUTH AFRICA', 'ENGLAND', 'KENYA', 'TURK & CAIMAN', 'PANAMA',
       'NEW GUINEA', 'ECUADOR', 'ITALY', 'EL SALVADOR'], dtype=object)

In [69]:
# We need to remove non-US locations from the dataset:
non_US = ['CANADA', 'Canada', 'NETHERLANDS', 'NETH ANTILLES', 'THAILAND', 'ETHIOPIA', 'PRC', 'BERMUDA', 'COLOMBIA', 'ARGENTINA', 'MEXICO', 
               'BRAZIL', 'URUGUAY', 'IRELAND', 'GABON', 'BAHAMAS', 'MX', 'CAYMAN ISLAND', 'SEOUL KOREA', 'JAPAN', 'ROMANIA', 'INDONESIA',
               'SOUTH AFRICA', 'ENGLAND', 'KENYA', 'TURK & CAIMAN', 'PANAMA', 'NEW GUINEA', 'ECUADOR', 'ITALY', 'EL SALVADOR']
i94port_df = i94port_df[~i94port_df.state.isin(non_US)]

In [70]:
i94port_df.head(2)

Unnamed: 0,code,location,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK


In [71]:
i94port_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 604 entries, 0 to 659
Data columns (total 3 columns):
code        604 non-null object
location    604 non-null object
state       529 non-null object
dtypes: object(3)
memory usage: 18.9+ KB


In [72]:
i94port_df.fillna('', inplace=True)

In [73]:
spark_i94port_df = spark.createDataFrame(i94port_df)
spark_i94port_df.createOrReplaceTempView("i94portRef")

In [74]:
df_spark_imm.createOrReplaceTempView("imm_view")

In [75]:
# Drop all US entries that are not by airplane travel
spark.sql("""
SELECT *
FROM imm_view
WHERE i94mode = 1
""").createOrReplaceTempView("imm_view")

In [76]:
# Remove all rows with undefined gender values
spark.sql("""
SELECT * 
FROM imm_view
WHERE gender = 'F' OR gender = 'M'
""").createOrReplaceTempView("imm_view")

In [77]:
# Convert the arrival date values into a useable format
spark.sql("""
SELECT *, date_add(to_date('1960-01-01'), arrdate) AS date_arrival
FROM imm_view
""").createOrReplaceTempView("imm_view")

In [78]:
# Convert the departure date values into a useful format
spark.sql("""
SELECT *, CASE 
            WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
            WHEN depdate IS NULL THEN NULL
            ELSE 'N/A' END AS date_departure
FROM imm_view""").createOrReplaceTempView("imm_view")

In [79]:
# Join the citizenship data
spark.sql("""
SELECT imm.*, cr.country AS citizen_of_country
FROM imm_view imm
INNER JOIN countryRef cr
ON imm.i94cit = cr.code
""").createOrReplaceTempView("imm_view")

In [80]:
# Join the residence data
spark.sql("""
SELECT imm.*, cr.country AS residence_country
FROM imm_view imm
INNER JOIN countryRef cr
ON imm.i94res = cr.code
""").createOrReplaceTempView("imm_view")

In [81]:
# Replace visa column with corresponding appropriate value
spark.sql("""
SELECT *, CASE 
            WHEN i94visa = 1.0 THEN 'Business' 
            WHEN i94visa = 2.0 THEN 'Pleasure'
            WHEN i94visa = 3.0 THEN 'Student'
            ELSE 'N/A' END AS visa_type 
FROM imm_view""").createOrReplaceTempView("imm_view")

In [82]:
# Compute ages of individuals in dataset. This is a 2015 dataset.
spark.sql("""
SELECT *, (2015-biryear) AS age 
FROM imm_view
""").createOrReplaceTempView("imm_view")

In [83]:
spark.sql("""
SELECT * FROM imm_view LIMIT 1
""").show()

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+------------+--------------+------------------+-----------------+---------+----+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|      admnum|fltno|visatype|date_arrival|date_departure|citizen_of_country|residence_country|visa_type| age|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+------------+--------------+------------------+-----------------+---------+----+
|5761447.0|2016.0|   4.0| 299.0| 299.0|    LOS|20574.0|    1.0|     CA|20603.0|  44.0|  

In [84]:
# Add entry port names and states columns
spark.sql("""
SELECT imm.*, pf.location AS entry_port, pf.state AS entry_port_state
FROM imm_view imm 
INNER JOIN i94portRef pf
ON imm.i94port = pf.code
""").createOrReplaceTempView("imm_view")

In [85]:
# Add immigration data
fact_immigration = spark.sql("""
SELECT 
    cicid, 
    citizen_of_country,
    residence_country,
    UPPER (entry_port) AS city,
    UPPER (entry_port_state) AS state,
    date_arrival,
    date_departure,
    age,
    visa_type,
    visatype AS visa_detailed
FROM imm_view
""")

In [86]:
# Extracts dates for dim_time dimension table
dim_time = spark.sql("""
SELECT DISTINCT date_arrival AS date
FROM imm_view
UNION
SELECT DISTINCT date_departure AS date
FROM imm_view
WHERE date_departure IS NOT NULL
""")
dim_time.createOrReplaceTempView("dim_time_table")

In [87]:
# Convert appropriate fields to uppercase:
cities_df['State Code'] = cities_df['State Code'].str.upper()
cities_df['City'] = cities_df['City'].str.upper()
cities_df['Race'] = cities_df['Race'].str.upper()

In [88]:
cities_df.head(1)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,SILVER SPRING,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,HISPANIC OR LATINO,25924


In [89]:
# Convert pandas dataframes to Spark dataframes
spark_df_cities = spark.createDataFrame(cities_df)
spark_df_cities.createOrReplaceTempView("cities")

In [90]:
# Initialize the Demographics Dimension table:
dim_city = spark.sql("""
    SELECT  City, 
            State, 
            `Median Age` AS median_age, 
            `Male Population` AS male_population, 
            `Female Population` AS female_population, 
            `Total Population` AS total_population, 
            `Foreign-born` AS born_outside_US, 
            Race, 
            Count
    FROM cities
""")

In [91]:
# Load airport data
spark_df_airports = spark.read.format("csv").option("header", "true").load('airport-codes_csv.csv')
spark_df_airports.createOrReplaceTempView("airport_view")

spark.sql("""
SELECT *
FROM airport_view
WHERE iso_country IS NOT NULL
AND UPPER(iso_country) LIKE 'US'
""").createOrReplaceTempView("airport_view")

In [92]:
# Make sure to load only conventional airports
spark_df_airports = spark.sql("""
SELECT *
FROM airport_view
WHERE LOWER(type) NOT IN ('closed', 'heliport', 'seaplane_base', 'balloonport')
AND municipality IS NOT NULL
""")

In [93]:
spark_df_airports.show(1)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
only showing top 1 row



In [94]:
fact_immigration.write.parquet("fact_immigration")
dim_city.write.parquet("dim_demographics")
dim_time.write.parquet("dim_time")
spark_df_airports.write.parquet("dim_airports")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

#### First Data quality check:

In [95]:
# Immigration Fact Table Quality Check:
fact_immigration = spark.read.parquet("fact_immigration")
fact_immigration.count() > 0

True

In [96]:
 # Demographic dimension Table quality check:
dim_city = spark.read.parquet("dim_demographics")
dim_city.count() > 0

True

In [97]:
 # Airport dimension Table quality check: 
dim_airports = spark.read.parquet("dim_airports")
dim_airports.count() > 0

True

In [98]:
 # Time dimension Table quality check: 
dim_time = spark.read.parquet("dim_time")
dim_time.count() > 0

True

#### Second data quality check:

In [99]:
# Create temporary views for second null Data quality check:
fact_immigration.createOrReplaceTempView("fact_immigration")
dim_city.createOrReplaceTempView("dim_city")
dim_time.createOrReplaceTempView("dim_time")
spark_df_airports.createOrReplaceTempView("spark_df_airports")

In [100]:
# Define function to check for NULL values:
def nullCheck(spark, tables):
    for table in tables:
        for column in tables[table]:
            nullCount = spark.sql(f"""SELECT COUNT(*) as nullCount FROM {table} WHERE {column} IS NULL""")
            if nullCount.head()[0] > 0:
                raise ValueError(f"Data Quality Check failed: Found Null values in table: {table}, column: {column}")
        print(f"Table {table} passed.")

In [101]:
tables = {'fact_immigration':['cicid'], 'dim_time':['date'], 'dim_city':['City'], 'spark_df_airports':['ident']}
nullCheck(spark, tables)

Table fact_immigration passed.
Table dim_time passed.
Table dim_city passed.
Table spark_df_airports passed.


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Due to the large size of the data being dealt with during analysis, the best technology for processing this data on scale would be Spark. This is due to its ability to provision multiple nodes/executors within a cluster to process data in parallel.

#### Because the nature of demographic data is such that trends are only apparent over the course of years, if not decades, this data would not have to be updated often. Perhaps every few months would be sufficient. 

#### If the data was increased by 100x, I would migrate the staging area to an S3 bucket, for ingestion purposes. S3 is a convenntional storage service that is generally used for tasks such as storage of ingested data, which is usually updated in batches. 

#### If the data populates a dashboard that needs daily updates and several points of access, I would add a Data Warehouse technology to the pipeline, such as Amazon Redshift, which would be optimized for parallel execution and general Big Data storage in a normalized form. 

#### This data model may be used by the Business Intelligence team for important decision-making processes when testing certain markets for demographic appeal, or it may simply be used by Data Analysts or Data Scientists willing to explore greater insights within the data. 
- This data model can be used by analysts that work in security, who may be able to monitor vital people of interest who travel to the US
- This data model can be used by marketing teams to gauge if a product may sell well in a certain area based on the demographic makeup of the area
- This data model can be used by data scientists, who may work to find patterns in demographic trends and use it to make predictions of the cultural makeup of certain populations

#### Question: List all Nepalese travelers above 50 that visited the state of New Jersey:

In [102]:
spark.sql("""
    SELECT * 
    FROM fact_immigration F
    WHERE residence_country = 'NEPAL' AND state = 'NJ' AND age > 50
""").show()

+---------+------------------+-----------------+----------------+-----+------------+--------------+----+---------+-------------+
|    cicid|citizen_of_country|residence_country|            city|state|date_arrival|date_departure| age|visa_type|visa_detailed|
+---------+------------------+-----------------+----------------+-----+------------+--------------+----+---------+-------------+
|4744121.0|             NEPAL|            NEPAL|NEWARK/TETERBORO|   NJ|  2016-04-25|    2016-05-23|57.0| Pleasure|           B2|
|4744122.0|             NEPAL|            NEPAL|NEWARK/TETERBORO|   NJ|  2016-04-25|    2016-05-23|54.0| Pleasure|           B2|
| 297995.0|             NEPAL|            NEPAL|NEWARK/TETERBORO|   NJ|  2016-04-02|          null|55.0| Pleasure|           B2|
|4915685.0|             NEPAL|            NEPAL|NEWARK/TETERBORO|   NJ|  2016-04-26|    2016-05-02|55.0| Pleasure|           B2|
+---------+------------------+-----------------+----------------+-----+------------+-------------