### Capstone Project(Data Engineering)

#### Project Summary and outline
This project aims to be able to answers questions on US immigration trend
1. Most popular cities
2. Gender distribution of the immigration
3. visa type distribution
4. average age per immigrant 
5. average temperature per month per city

Data taken from three different sources 
1. I94 immigration dataset of 2016
2. City temperature
3. US city demographic data from openshoft

Design 4 dimention tables and 1 fact table
cities, immigrants, monthl average city temp and time, and immigration

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
#!pip install pyspark
from pyspark.sql  import SparkSession

import psycopg2
from datetime import datetime, timedelta
import re
from pyspark.sql.types import *
from pyspark.sql.functions import *
import numpy as np
import pandas as pd
import glob

### Step 1: Scope the Project and Gather Data

#### Scope 
The goal of this project is pull data from 3 different sources and create fact, dimention table to analyze US immigration using city demographisc, seasions, avg temperature.

#### Describe and Gather Data 

I94 Immigration Data: This data comes from the U.S. National Tourism and Trade Office and contains various statistics on international visitor arrival in USA.
World Temperature Data: This data comes from Kaggle and contains average weather temperatures by city. 
U.S. City Demographic Data: comes from OpenSoft and contains information about the demographics of all US cities such as average age, male and female population. 


# Load data from CSV file


### Follow below steps and repeat step 2, 3  to load Airport Codes,Immigration, US  Cities Demographic
1. Creat Spark Session(Set app name to Capstone)
2. Read Csv File
3. Show data frame

## IATA Airport Codes Data

In [2]:
# create pyspark session
pySparkSession = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()


In [3]:
# Load airport codes data 
airports_df = pySparkSession.read.csv("csv/airport-codes_csv.csv",header=True)
airports_df.toPandas().head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [4]:
airports_df.columns

['ident',
 'type',
 'name',
 'elevation_ft',
 'continent',
 'iso_country',
 'iso_region',
 'municipality',
 'gps_code',
 'iata_code',
 'local_code',
 'coordinates']

## Immigratin data

In [5]:
# Load immigration data through sas7bat fine
# This project only the i94_apr16_sub.sas7bdat will be used, in order to all process all of the available files, simple use i94_files
i94_all_files = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
i94_fname = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
i94_df = pySparkSession.read.format("com.github.saurfang.sas.spark").load(i94_fname)
i94_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [6]:
# Read sample data
immigration_df = pySparkSession.read.csv("csv/immigration_data_sample.csv",inferSchema=True, header=True)
immigration_df.toPandas().head()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


## Us cities demographics
Contains information about city demographics data

In [7]:
df_demogr = pySparkSession.read.csv("csv/us-cities-demographics.csv",inferSchema=True, header=True, sep=';')
df_demogr.toPandas().head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [8]:
df_demogr.describe()

DataFrame[summary: string, City: string, State: string, Median Age: string, Male Population: string, Female Population: string, Total Population: string, Number of Veterans: string, Foreign-born: string, Average Household Size: string, State Code: string, Race: string, Count: string]

In [9]:
df_demogr.columns

['City',
 'State',
 'Median Age',
 'Male Population',
 'Female Population',
 'Total Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size',
 'State Code',
 'Race',
 'Count']

In [10]:
### Load global temperature
path = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pySparkSession.read.csv(path,inferSchema=True, header=True)
df_temperature.limit(20).toPandas().head(10)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


In [11]:
df_temp_us = df_temperature.filter("Country == 'United States'")
df_temp_us.limit(10).toPandas().head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
1,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
2,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
3,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
4,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.



### Demographics

In [12]:
df_demogr.describe()

DataFrame[summary: string, City: string, State: string, Median Age: string, Male Population: string, Female Population: string, Total Population: string, Number of Veterans: string, Foreign-born: string, Average Household Size: string, State Code: string, Race: string, Count: string]

In [13]:
df_demogr.limit(10).toPandas().head(10)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC,Asian,11060


In [14]:
df_demogr.count()

2891

### temperature

In [15]:
df_temp_us.limit(20).toPandas().head(10)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
1,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
2,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
3,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
4,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W
5,1820-06-01,25.682,2.008,Abilene,United States,32.95N,100.53W
6,1820-07-01,26.268,1.802,Abilene,United States,32.95N,100.53W
7,1820-08-01,25.048,1.895,Abilene,United States,32.95N,100.53W
8,1820-09-01,22.435,2.216,Abilene,United States,32.95N,100.53W
9,1820-10-01,15.83,2.169,Abilene,United States,32.95N,100.53W


In [16]:
df_temperature.count()

8599212

In [17]:
df_temp_us.count()

687289

In [18]:
df_temperature.describe()

DataFrame[summary: string, AverageTemperature: string, AverageTemperatureUncertainty: string, City: string, Country: string, Latitude: string, Longitude: string]

In [19]:
# Convert date to datetime
df_temp_con = df_temperature.withColumn("convertedDate",to_date(df_temperature.dt))

In [20]:
df_temp_con.limit(10).toPandas().head(10)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,convertedDate
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E,1743-11-01
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E,1743-12-01
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E,1744-01-01
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E,1744-02-01
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E,1744-03-01
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E,1744-04-01
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E,1744-05-01
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E,1744-06-01
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E,1744-07-01
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E,1744-08-01


In [21]:
df_temp_con.select(min('convertedDate')).collect()

[Row(min(convertedDate)=datetime.date(1743, 11, 1))]

### Immigration data

In [22]:
i94_df.count()

3096313

In [23]:
i94_df.limit(10).toPandas().head(10)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,...,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1959.0,09302016,,,AZ,92471610000.0,602.0,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,...,,M,1970.0,09302016,,,AZ,92470800000.0,602.0,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,...,,M,1968.0,09302016,,,AZ,92478490000.0,608.0,B1


### Filter valid ports

In [24]:
i94_sas_label_des_filename = "I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_des_filename) as f:
    lines = f.readlines()

re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for line in lines[302:961]:
    results = re_compiled.search(line)
    valid_ports[results.group(1)] = results.group(2)


### valid states

In [25]:
valid_states = df_demogr.toPandas()["State Code"].unique().tolist()
type(valid_states)
print(valid_states)

['MD', 'MA', 'AL', 'CA', 'NJ', 'IL', 'AZ', 'MO', 'NC', 'PA', 'KS', 'FL', 'TX', 'VA', 'NV', 'CO', 'MI', 'CT', 'MN', 'UT', 'AR', 'TN', 'OK', 'WA', 'NY', 'GA', 'NE', 'KY', 'SC', 'LA', 'NM', 'IA', 'RI', 'PR', 'DC', 'WI', 'OR', 'NH', 'ND', 'DE', 'OH', 'ID', 'IN', 'AK', 'MS', 'HI', 'SD', 'ME', 'MT']


In [26]:
#valid_states = df_demogr.select('State Code').distinct().collect()
#print(valid_states.toPandas())

In [27]:
df_demogr.select('State Code').distinct().count()

49

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Following star scheam is designed , it is very simple and powerful.


### stag_i94_df
    id
    date
    city_code
    state_code
    age
    gender
    visa_type
    count

### stag_temp_df
    year
    month
    city_code
    city_name
    avg_temp
    lat
    long

### stag_demo_df
    city_code
    state_code
    city_name
    medianAge
    male_pop
    female_pop
    veterans
    foreign_born
    total_pop
### Dimension Tables
### imm_df
    id
    gender
    age
    visa_type

#### city_df
    city_code
    state_code
    city_name
    medianAge
    male_pop
    female_pop
    veterans
    foreign_born
    total_pop
    lat
    long
### monthly_city_temp_df
    city_code
    year
    month
    avg_temp

### time_df
    date
    dayofweek
    weekofyear
    month
### Fact Table
### immigration_df
    id
    state_code
    city_code
    date
    count

#### 3.2 Mapping Out Data Pipelines

### Steps necessary to pipeline the data into the chosen data model

1. Clean the data on nulls, data types, duplicates, etc
2. Load staging tables for stag_i94_df, stag_temp_df and stag_demo_df
3. Create dimension tables for imm_df, city_df, monthly_city_temp_df and time_df
4. Create fact table immigration_df with information on immigration count, mapping id in imm_df, city_code in city_df and monthly_city_temp_df and date in time_df to make sure  referential integrity
5. Save processed dimension and fact tables in parquet for downstream query

### clean immigraton data

In [28]:
# create a function
@udf(StringType())
def state_validation(st):
    print(st)
    if st in valid_states:
        return  st
    return 'None'

In [29]:
# convert date
@udf(StringType())
def conv_date(x):
    if x:
        return (datetime(1960,1,1).date() + timedelta(x)).isoformat()
    return None


In [30]:
# Remove any missing values ( any null value from columns i94port, i94addr, gender)
i94_c_d = i94_df.dropna(how="any", subset=["i94port","i94addr","gender"])

In [31]:
i94_c_d.limit(10).toPandas().head(10)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296,F1
1,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93,B2
2,27.0,2016.0,4.0,101.0,101.0,BOS,20545.0,1.0,MA,20549.0,...,,M,1958.0,04062016,M,,LH,92478760000.0,422,B1
3,28.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,MA,20549.0,...,,M,1960.0,04062016,F,,LH,92478900000.0,422,B1
4,29.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,MA,20561.0,...,,M,1954.0,09302016,M,,AZ,92503780000.0,614,B2
5,30.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,NJ,20578.0,...,,M,1967.0,09302016,M,,OS,92470210000.0,89,B2
6,31.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,NY,20611.0,...,,M,1973.0,09302016,M,,OS,92471290000.0,89,B2
7,33.0,2016.0,4.0,101.0,101.0,HOU,20545.0,1.0,TX,20554.0,...,,M,1963.0,09302016,F,,TK,92509300000.0,33,B2
8,34.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,CT,,...,,,1968.0,09302016,M,,AZ,92470420000.0,602,B2
9,35.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,CT,,...,,,1942.0,09302016,F,,TK,669712200.0,1,B2


In [32]:
i94_c_d = i94_c_d.withColumn("i94addr", state_validation(i94_c_d.i94addr))

In [33]:
i94_c_d= i94_c_d.withColumn("arrdate", conv_date(i94_c_d.arrdate))


In [34]:
i94_c_d = i94_c_d.filter(i94_c_d.i94addr != 'None')


In [35]:
i94_c_d.count()

2435922

In [36]:
### staging i94 df table
i94_s_t = i94_c_d.select(
col("cicid").alias("id"),
    col("arrdate").alias("date"),
    col("i94addr").alias("city_code"),
    col("i94bir").alias("age"),
    col("gender").alias("gender"),
    col("i94visa").alias("visa_type"), "count").drop_duplicates()



In [37]:
i94_s_t.limit(10).toPandas().head(10)

Unnamed: 0,id,date,city_code,age,gender,visa_type,count
0,279.0,2016-04-01,NY,24.0,M,2.0,1.0
1,590.0,2016-04-01,FL,1.0,M,2.0,1.0
2,823.0,2016-04-01,TX,29.0,M,1.0,1.0
3,1025.0,2016-04-01,NY,47.0,F,2.0,1.0
4,1358.0,2016-04-01,NY,45.0,M,2.0,1.0
5,1367.0,2016-04-01,NY,36.0,F,2.0,1.0
6,1488.0,2016-04-01,NY,41.0,F,2.0,1.0
7,1527.0,2016-04-01,NY,27.0,M,2.0,1.0
8,1570.0,2016-04-01,CA,31.0,F,2.0,1.0
9,1573.0,2016-04-01,CA,20.0,F,2.0,1.0


In [38]:
# Create udf to map city full name to city port
@udf(StringType())
def city_to_port(city):
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key

In [39]:
# Temperature clean up
df_temp_con.filter(df_temp_con["Country"] == "United States")

DataFrame[dt: timestamp, AverageTemperature: double, AverageTemperatureUncertainty: double, City: string, Country: string, Latitude: string, Longitude: string, convertedDate: date]

In [40]:
# Remove any missing values from temperature ( any null value from columns i94port)
df_temp_con_clean_up = df_temp_con.dropna(how="any", subset=["City"])

In [41]:
cleaned_temp_df = df_temp_con.\
withColumn("year", year(df_temp_con['dt'])) \
    .withColumn("month", month(df_temp_con["dt"])) \
    .withColumn("i94port", city_to_port(df_temp_con["City"])) \
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["i94port"])

cleaned_temp_df.limit(10).toPandas().head(10)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,convertedDate,year,month,i94port
0,1743-11-01,8.758,1.886,Aberdeen,United Kingdom,57.05N,1.48W,1743-11-01,1743,11,ABE
1,1743-12-01,,,Aberdeen,United Kingdom,57.05N,1.48W,1743-12-01,1743,12,ABE
2,1744-01-01,,,Aberdeen,United Kingdom,57.05N,1.48W,1744-01-01,1744,1,ABE
3,1744-02-01,,,Aberdeen,United Kingdom,57.05N,1.48W,1744-02-01,1744,2,ABE
4,1744-03-01,,,Aberdeen,United Kingdom,57.05N,1.48W,1744-03-01,1744,3,ABE
5,1744-04-01,6.07,2.934,Aberdeen,United Kingdom,57.05N,1.48W,1744-04-01,1744,4,ABE
6,1744-05-01,7.751,1.494,Aberdeen,United Kingdom,57.05N,1.48W,1744-05-01,1744,5,ABE
7,1744-06-01,10.62,1.574,Aberdeen,United Kingdom,57.05N,1.48W,1744-06-01,1744,6,ABE
8,1744-07-01,12.35,1.591,Aberdeen,United Kingdom,57.05N,1.48W,1744-07-01,1744,7,ABE
9,1744-08-01,,,Aberdeen,United Kingdom,57.05N,1.48W,1744-08-01,1744,8,ABE


In [42]:
#consider data only from 2013 year
cleaned_temp_df = cleaned_temp_df.filter(cleaned_temp_df["year"] == 2013)

In [43]:
stag_temp_df = cleaned_temp_df.select(col("year"), col("month"), col("i94port").alias("city_code"),
                                         round(col("AverageTemperature"), 1).alias("avg_temp"),
                                         col("Latitude").alias("lat"), col("Longitude").alias("long")).drop_duplicates()

In [44]:
print(stag_temp_df.count())
stag_temp_df.limit(5).toPandas()

1908


Unnamed: 0,year,month,city_code,avg_temperature,lat,long
0,2013,2,BHX,3.1,52.24N,2.63W
1,2013,3,ICT,-13.7,52.24N,112.99E
2,2013,4,COL,16.9,32.95N,85.21W
3,2013,1,DAB,0.5,39.38N,83.24W
4,2013,3,GEO,26.4,7.23N,57.57W


In [45]:
stag_temp_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- city_code: string (nullable = true)
 |-- avg_temperature: float (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)



In [46]:
c_demo_df = df_demogr.withColumn("medianAge", df_demogr['Median Age']) \
    .withColumn("male_pop", (df_demogr['Male Population'] / df_demogr['Total Population']) * 100) \
    .withColumn("female_pop", (df_demogr['Female Population'] / df_demogr['Total Population']) * 100) \
    .withColumn("veterans", (df_demogr['Number of Veterans'] / df_demogr['Total Population']) * 100) \
    .withColumn("foreign_born", (df_demogr['Foreign-born'] / df_demogr['Total Population']) * 100) \
    .withColumn("race", (df_demogr['Count'] / df_demogr['Total Population']) * 100) \
    .withColumn("city_code", city_to_port(df_demogr["City"])) \
    .dropna(how='any', subset=["city_code"])

c_demo_df.limit(10).toPandas().head(10)


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,race,Count,median_age,male_pop,female_pop,veterans,foreign_born,city_code
0,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,27.101269,76402,34.6,48.965461,51.034539,2.067659,30.595609,NEW
1,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,1.131796,1343,33.1,47.386252,52.613748,5.590716,6.334853,PIA
2,Philadelphia,Pennsylvania,34.1,741270,826172,1567442,61995,205339,2.61,PA,7.829381,122721,34.1,47.291702,52.708298,3.95517,13.100261,PHI
3,Fort Myers,Florida,37.3,36850,37165,74015,4312,15365,2.45,FL,67.782206,50169,37.3,49.787205,50.212795,5.825846,20.759306,FMY
4,Laredo,Texas,28.8,124305,131484,255789,4921,68427,3.66,TX,0.489857,1253,28.8,48.596695,51.403305,1.923851,26.751346,LCB
5,Allen,Pennsylvania,33.5,60626,59581,120207,5691,19652,2.67,PA,18.55466,22304,33.5,50.434667,49.565333,4.734333,16.348466,MCA
6,New Haven,Connecticut,29.9,63765,66545,130310,2567,25871,2.48,CT,1.692119,2205,29.9,48.933313,51.066687,1.969918,19.853426,NWH
7,Salt Lake City,Utah,32.1,98364,94296,192660,6829,32166,2.38,UT,6.827053,13153,32.1,51.055746,48.944254,3.544586,16.695733,SLC
8,Suffolk,Virginia,38.2,43048,45113,88161,10114,2829,2.72,VA,44.358617,39107,38.2,48.828847,51.171153,11.472193,3.208902,FOK
9,Los Angeles,California,35.0,1958998,2012898,3971896,85417,1485425,2.86,CA,54.826461,2177650,35.0,49.321483,50.678517,2.150535,37.398386,LOS


In [47]:
cleaned_demo_df = c_demo_df.select(col("City").alias("city_name"), \
                                   col("State Code").alias("state_code"), 
                                  "median_age", "male_pop", "female_pop","veterans", \
                                   "foreign_born", \
                                   col("Total Population").alias("total_pop"), \
                                   #col("Race").alias("race"), \
                                   "race").drop_duplicates()

cleaned_demo_df.count()

883

In [48]:
p_demo_df = cleaned_demo_df.groupBy("city_name", "state_code", "medianAge", "male_pop",
                                        "female_pop","veterans", "foreign_born", "total_pop").pivot("Race").avg("race")

p_demo_df = p_demo_df.withColumn("city_code", city_to_port(p_demo_df["city_name"])) \
    .dropna(how='any', subset=["city_code"])

p_demo_df.limit(10).toPandas().head(10)



Unnamed: 0,city_name,state_code,median_age,male_pop,female_pop,veterans,foreign_born,total_pop,0.18909789182195527,0.23130693513216083,...,94.88423937022502,94.98716610110503,95.07445589919816,95.37466518586267,96.17545996759253,96.28797282289149,96.3458162782606,98.04872062342763,99.27785956918319,city_code
0,Boise,ID,34.9,50.439344,49.560656,7.331867,6.143027,218280,,,...,,,,,,,,,,BOI
1,Jacksonville,NC,24.2,59.402046,40.597954,12.250048,5.540133,67363,,,...,,,,,,,,,,JAC
2,Norfolk,VA,30.2,52.321697,47.678303,11.950421,6.778602,246393,,,...,,,,,,,,,,NOR
3,Los Angeles,CA,35.0,49.321483,50.678517,2.150535,37.398386,3971896,,,...,,,,,,,,,,LOS
4,Rochester,MN,35.0,48.953803,51.046197,6.138162,15.829294,112216,,,...,,,,,,,,,,RST
5,Salinas,CA,30.4,49.410367,50.589633,2.55423,37.292389,157386,,,...,,,,,,,,,,SLS
6,South Bend,IN,32.4,48.655031,51.344969,3.573735,8.461116,103757,,,...,,,,,,,,,,SBN
7,Huntsville,AL,38.1,48.523113,51.476887,8.797339,6.710767,189114,,,...,,,,,,,,,,HSV
8,Medford,OR,38.6,49.634689,50.365311,8.311298,7.751112,79795,,,...,,,,,,,,,,MED
9,Longview,TX,36.8,49.105283,50.894717,6.75573,10.763574,81590,,,...,,,,,,,,,,LON


In [49]:
stag_demo_df = p_demo_df.select("city_code", "state_code", "city_name", "median-age", \
                                    round(col("male_pop"), 1).alias("male_pop"),\
                                    round(col("female_pop"), 1).alias("female_pop"),\
                                    round(col("veterans"), 1).alias("veterans"),\
                                    round(col("veterans"), 1).alias("foreign_born"), "total_pop")
stag_demo_df.limit(10).toPandas()
stag_demo_df.printSchema()

root
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_pop: double (nullable = true)
 |-- female_pop: double (nullable = true)
 |-- veterans: double (nullable = true)
 |-- foreign_born: double (nullable = true)
 |-- total_pop: integer (nullable = true)



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [50]:
im_df = i94_s_t.select("id", "gender", "age", "visa_type").drop_duplicates()

In [51]:
im_df.count()

2435922

In [52]:
c_df = stag_demo_df.join(stag_temp_df, "city_code") \
    .select("city_code", "state_code", "city_name", "medianAge", "male_pop", "female_pop", "veterans",
           "foreign_born", "total_pop", "lat", "long").drop_duplicates()
c_df.limit(10).toPandas().head(10)

Unnamed: 0,city_code,state_code,city_name,median_age,male_pop,female_pop,veterans,foreign_born,total_pop,lat,long
0,RDU,NC,Raleigh,32.8,48.5,51.5,3.7,3.7,451949,36.17N,79.56W
1,RST,MN,Rochester,35.0,49.0,51.0,6.1,6.1,112216,42.59N,78.55W
2,RCM,VA,Richmond,33.6,47.6,52.4,5.7,5.7,220289,37.78N,77.29W
3,OKC,OK,Oklahoma City,34.1,49.0,51.0,6.6,6.6,631263,36.17N,97.46W
4,BFL,CA,Bakersfield,30.6,48.8,51.2,3.3,3.3,373627,36.17N,119.34W
5,DAB,OH,Dayton,32.8,47.4,52.6,6.0,6.0,140597,39.38N,83.24W
6,HSV,WI,Madison,30.7,49.2,50.8,3.9,3.9,248956,34.56N,85.62W
7,OTM,AZ,Mesa,36.9,49.8,50.2,6.7,6.7,471833,32.95N,112.02W
8,CHI,IL,Chicago,34.2,48.5,51.5,2.6,2.6,2720556,42.59N,87.27W
9,ABQ,NM,Albuquerque,36.0,48.9,51.1,6.7,6.7,559131,34.56N,107.03W


In [53]:
m_df = stag_temp_df.select("city_code", "year", "month", "avg_temp").drop_duplicates()
m_df.limit(10).toPandas().head(10)

Unnamed: 0,city_code,year,month,avg_temperature
0,FRK,2013,5,27.0
1,SAA,2013,6,18.6
2,PHI,2013,5,16.6
3,LOS,2013,5,29.6
4,NPT,2013,8,16.6
5,BOS,2013,5,14.3
6,BUR,2013,3,14.5
7,RNO,2013,2,4.7
8,GRP,2013,4,6.8
9,GDL,2013,1,18.5


In [54]:
m_df.count()

1901

In [55]:
time_df = i94_s_t.withColumn("dayofweek", dayofweek("date"))\
                .withColumn("weekofyear", weekofyear("date"))\
                .withColumn("month", month("date"))
                        
time_df = time_df.select("date", "dayofweek", "weekofyear", "month").drop_duplicates()

In [56]:
time_df.count()

30

In [57]:
time_df.limit(5).toPandas().head(5)

Unnamed: 0,date,dayofweek,weekofyear,month
0,2016-04-23,7,16,4
1,2016-04-22,6,16,4
2,2016-04-08,6,14,4
3,2016-04-09,7,14,4
4,2016-04-26,3,17,4


In [None]:
# Write to dimension tables
im_df.write.mode("overwrite").partitionBy("gender", "age").parquet("immigrants")
c_df.write.mode("overwrite").partitionBy("state_code").parquet("cities")
m_df.write.mode("overwrite").parquet("monthly_city_temperatues")
time_df.write.mode("overwrite").parquet("time")

# Write to  fact table
im_df.write.mode("overwrite").partitionBy("state_code", "city_code").parquet("immigration")

### 4.2 Data Quality Checks

In [None]:
if (im_df is not None and im_df.count() != 0)  and \
   (c_df is not None and c_df.count() != 0) and \
   (m_df is not None and m_df.count() != 0) and \
   (time_df is not None and time_df.count() != 0):
    print("Data Quality Check Passed")
else:
    print ("Data Quality Check Failed")

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.


### Dimension Tables

### city_df
    city_code: represents city port code
    state_code: represents state code of the city
    city_name: represents name of the city
    medianAge: represents median age of the city
    male_pop: represents city male population in %
    female_pop: represents city's female population in %
    veterans: represents city's veteran population in %
    foreign_born: represents city's foreign born population in %
    total_pop: represents city's total population
    lat: represents latitude of the city
    long: represents longitude of the city
	
### imm_df
    id: represents id of immigrant
    gender: represents gender of immigrant
    age: represents age of immigrant
    visa_type: represents immigrant's visa type

### city_df
    city_code: represents city port code
    state_code: represents state code of the city
    city_name: represents name of the city
    medianAge: represents median age of the city
    male_pop: represents city's male population in %
    female_pop: represents city's female population in %
    veterans: represents city's veteran population in %
    foreign_born: represents city's foreign born population in %
    total_pop: represents city's total population
    lat: represents latitude of the city
    long: represents longitude of the city

### monthly_city_temp_df
    city_code: represents city port code
    year: represents year
    month: represents month 
    avg_temp: represents average temperature in city for given month

### time_df
    date: represents date
    dayofweek: represents day of the week
    weekofyear: represents week of year
    month: represents month
### Fact Table
### immigration_df
    id: represents id
    state_code: represents state code of arrival city
    city_code: represents city port code of arrival city
    date: represents date of arrival
    count: represents count of immigrant's entries into the US

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.

Apache Spark used  because of ability to process large set of data along with apis to read data and its convenient dataframe manipulation functions

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x: An Amazon EMR cluster can be useda with Apache Spark installed to process the increase in data easily prior to being stored on S3.S3 have capabiliity to auto scale at any speed. 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day: We can define airflow to run job every day at 7am on dela to make process more effective. 
 * The database needed to be accessed by 100+ people.
  We can use redshift to store staging, dimention and fact tables as it was cluster and improves performance, multiple people can case at any point of time. 