### Capstone Project(Data Engineering)

#### Project Summary
The Project is to build an Datawarehouse where data is taken from three different sources, build ETL pipleline for the I94 Immigration, U.S. City Demographic and World Temperature data datasets using Pyspark. We create the star schema with the four dimension tables and one fact table and the results being saved in paraquet files for the downstream analysis which is helpful to the analytical team.

Dimension Tables:

1. Cities
2. Immigrants
3. Monthly average city temperature 
4. Time

Fact Tables:
1. Immigration

Data taken from three different sources
1. I94 immigration dataset of 2016
2. City temperature
3. US city demographic data


The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
#!pip install pyspark
import pandas as pd
import os
import glob
import re
from datetime import datetime, timedelta
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, year, month, avg, round, dayofweek, weekofyear, isnull
from pyspark.sql.types import StringType, IntegerType
import Cleaned as Cleaned
import numpy as np
import pandas as pd
import glob 


### Step 1: Scope the Project and Gather Data

#### Scope 
The goal of this project is pull data from 3 different sources and createdimension, fact table to analyze US immigration using city demographics, season, avg temperature.

#### Describe and Gather Data 

I94 Immigration Data: This data comes from the U.S. National Tourism and Trade Office and contains various statistics on international visitor arrival in USA.
World Temperature Data: This data comes from Kaggle and contains average weather temperatures by city. 
U.S. City Demographic Data: comes from OpenSoft and contains information about the demographics of all US cities such as average age, male and female population. 


# Load data from CSV file


### Load Airport Codes,Immigration, US Cities Demographic
1. Creat Spark Session
2. Read CSV File
3. Display data frame

In [3]:
# create pyspark session
pySparkSession = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()


## Load airports information

In [4]:
# Load airport codes data 
airports_data_df = pySparkSession.read.csv("airport-codes_csv.csv",header=True)
airports_data_df.toPandas().head(5)


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [5]:
airports_data_df.columns
airports_data_df.describe()

DataFrame[summary: string, ident: string, type: string, name: string, elevation_ft: string, continent: string, iso_country: string, iso_region: string, municipality: string, gps_code: string, iata_code: string, local_code: string, coordinates: string]

## Load Immigration data

In [6]:
# Load immigration data through sas7bat files
#  To avoid memory errors in this project we use only the i94_may16_sub.sas7bdat will be used for this project
immi_i94_all_files = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
immi_i94_fname = "../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat"
immi_i94_df = pySparkSession.read.format("com.github.saurfang.sas.spark").load(immi_i94_fname)
immi_i94_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2.0,2016.0,5.0,207.0,207.0,XXX,20605.0,,,,...,U,,1989.0,D/S,,,,1141634000.0,,F1
1,3.0,2016.0,5.0,209.0,209.0,XXX,20598.0,,,,...,U,,1989.0,05232018,,,,1863211000.0,,E2
2,4.0,2016.0,5.0,213.0,213.0,XXX,20578.0,,,,...,U,,1938.0,11032016,,,,4696371000.0,,B2
3,5.0,2016.0,5.0,213.0,213.0,XXX,20601.0,,,,...,U,,1987.0,D/S,,,,1141260000.0,,F1
4,13.0,2016.0,5.0,213.0,213.0,CHI,20577.0,1.0,IL,20270.0,...,,M,1987.0,D/S,F,,EK,64792870000.0,235.0,F1


## Us cities demographics
Contains information about city demographics data

In [7]:
demog_df = pySparkSession.read.csv("us-cities-demographics.csv",inferSchema=True, header=True, sep=';')
demog_df.toPandas().head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [8]:
demog_df.describe()

DataFrame[summary: string, City: string, State: string, Median Age: string, Male Population: string, Female Population: string, Total Population: string, Number of Veterans: string, Foreign-born: string, Average Household Size: string, State Code: string, Race: string, Count: string]

In [9]:
demog_df.columns

['City',
 'State',
 'Median Age',
 'Male Population',
 'Female Population',
 'Total Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size',
 'State Code',
 'Race',
 'Count']

In [10]:
### Load global temperature
path = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = pySparkSession.read.csv(path,inferSchema=True, header=True)
temperature_df.limit(10).toPandas().head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.



### Duplicate/Empty columns data clean up for immigration,temperature and demographics data


In [11]:
immi_i94_df_cleanup = Cleaned.drop_empty_columns(immi_i94_df,["arrdate","i94addr","visatype","biryear","gender","depdate"])
immi_i94_df_cleanup = Cleaned.drop_duplicate_rows(immi_i94_df_cleanup)

Missing data is dropped..
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid|i94yr |i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto |gender|insnum|airline|admnum         |fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|13.0 |2016.0|5.0   |213.0 |213.0 |CHI    |20577.0|1.0    |IL     |20270.0|29.0  |3.0    |1.0  |20150619|null    |null |T      |O      |null   |M      |1987.0 |D/S     |F     |null  |EK     |6.479287483E10 |00235|F1      |
|22.0 |2016.0|5.0   |101.0 |101.0 |BOS    |20575.0|1.0    |NY     |20587.0|53.0  |

In [12]:
demog_df_cleanup = Cleaned.drop_empty_columns(demog_df,["city","state"])
demog_df_cleanup = Cleaned.drop_duplicate_rows(demog_df_cleanup)

Missing data is dropped..
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+------+
|City            |State         |Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race                             |Count |
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+------+
|Silver Spring   |Maryland      |33.8      |40601          |41862            |82463           |1562              |30908       |2.6                   |MD        |Hispanic or Latino               |25924 |
|Quincy          |Massachusetts |41.0      |44129          |49500            |93629           |4147              |32935       |2.39                  |MA        |W

In [13]:
temperature_df_cleanup = Cleaned.drop_empty_columns(temperature_df,["dt","AverageTemperature"])
temperature_df_cleanup = Cleaned.drop_duplicate_rows(temperature_df_cleanup)

Missing data is dropped..
+-------------------+-------------------+-----------------------------+-----+-------+--------+---------+
|dt                 |AverageTemperature |AverageTemperatureUncertainty|City |Country|Latitude|Longitude|
+-------------------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|6.068              |1.7369999999999999           |Århus|Denmark|57.05N  |10.33E   |
|1744-04-01 00:00:00|5.7879999999999985 |3.6239999999999997           |Århus|Denmark|57.05N  |10.33E   |
|1744-05-01 00:00:00|10.644             |1.2830000000000001           |Århus|Denmark|57.05N  |10.33E   |
|1744-06-01 00:00:00|14.050999999999998 |1.347                        |Århus|Denmark|57.05N  |10.33E   |
|1744-07-01 00:00:00|16.082             |1.396                        |Århus|Denmark|57.05N  |10.33E   |
|1744-09-01 00:00:00|12.780999999999999 |1.454                        |Århus|Denmark|57.05N  |10.33E   |
|1744-10-01 00:00:00|7.95    

In [14]:
demog_df_cleanup.describe()

DataFrame[summary: string, City: string, State: string, Median Age: string, Male Population: string, Female Population: string, Total Population: string, Number of Veterans: string, Foreign-born: string, Average Household Size: string, State Code: string, Race: string, Count: string]

In [15]:
demog_df_cleanup.limit(5).toPandas().head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Maple Grove,Minnesota,38.6,31780,36601,68381,2943,7645,2.64,MN,White,59683
1,Concord,California,39.6,62310,66358,128668,6287,37428,2.72,CA,White,92575
2,Highlands Ranch,Colorado,39.6,49186,53281,102467,4840,8827,2.72,CO,Asian,5650
3,Asheville,North Carolina,37.9,42100,46407,88507,4973,6630,2.18,NC,American Indian and Alaska Native,496
4,Westland,Michigan,39.9,37742,44253,81995,4756,6429,2.41,MI,Black or African-American,16422


In [16]:
demog_df_cleanup.count()

2891

### Temperature

In [17]:
temperature_df_cleanup.limit(5).toPandas().head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1850-04-01,,,Bontang,Indonesia,0.80N,118.13E
1,1859-05-01,,,Bontang,Indonesia,0.80N,118.13E
2,1862-09-01,,,Bontang,Indonesia,0.80N,118.13E
3,1863-09-01,,,Butembo,Congo (Democratic Republic Of The),0.80N,29.73E
4,1864-07-01,20.314,1.248,Butembo,Congo (Democratic Republic Of The),0.80N,29.73E


In [18]:
temperature_us_df_cleanup = temperature_df_cleanup.filter("Country == 'United States'")
temperature_us_df_cleanup.limit(10).toPandas().head(10)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1848-06-01,24.97,1.799,Abilene,United States,32.95N,100.53W
1,1892-05-01,21.656,0.501,Abilene,United States,32.95N,100.53W
2,1917-02-01,8.004,0.518,Abilene,United States,32.95N,100.53W
3,1937-04-01,17.291,0.307,Abilene,United States,32.95N,100.53W
4,1942-09-01,21.529,0.319,Abilene,United States,32.95N,100.53W
5,1948-11-01,9.912,0.472,Abilene,United States,32.95N,100.53W
6,1954-05-01,19.402,0.26,Abilene,United States,32.95N,100.53W
7,1970-07-01,28.667,0.203,Abilene,United States,32.95N,100.53W
8,1761-05-01,18.023,2.477,Akron,United States,40.99N,80.95W
9,1766-03-01,3.027,2.652,Akron,United States,40.99N,80.95W


In [19]:
temperature_us_df_cleanup.count()

687289

In [20]:
temperature_us_df_cleanup.describe()

DataFrame[summary: string, AverageTemperature: string, AverageTemperatureUncertainty: string, City: string, Country: string, Latitude: string, Longitude: string]

### Immigration data

In [21]:
# Count of immigration data
immi_i94_df_cleanup.count()

3444249

In [23]:
immi_i94_df_cleanup.limit(5).toPandas().head(5)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,244.0,2016.0,5.0,103.0,103.0,NYC,20575.0,1.0,NY,20584.0,...,,M,1964.0,07292016,F,,OS,59604030000.0,87,WT
1,315.0,2016.0,5.0,103.0,103.0,LOS,20575.0,1.0,CA,20586.0,...,,M,1982.0,07292016,M,,AF,59611250000.0,66,WT
2,630.0,2016.0,5.0,103.0,103.0,NYC,20575.0,1.0,NY,20622.0,...,,M,1995.0,D/S,M,,OS,95086870000.0,87,F1
3,784.0,2016.0,5.0,104.0,104.0,ATL,20575.0,1.0,TN,20579.0,...,,M,1977.0,07292016,M,,DL,59603700000.0,25,WT
4,1341.0,2016.0,5.0,104.0,104.0,ORL,20575.0,1.0,FL,20594.0,...,,M,2010.0,07292016,,,LH,59625960000.0,464,WT


### Create Valid port List

In [24]:
i94_sas_label_descriptions_fname = "I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_descriptions_fname) as f:
    lines = f.readlines()
re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for line in lines[302:961]:
    results = re_compiled.search(line)
    valid_ports[results.group(1)] = results.group(2)
print(len(valid_ports))


659


### Create Valid states List

In [25]:
valid_states = demog_df.toPandas()["State Code"].unique().tolist()
type(valid_states)
print(valid_states)

['MD', 'MA', 'AL', 'CA', 'NJ', 'IL', 'AZ', 'MO', 'NC', 'PA', 'KS', 'FL', 'TX', 'VA', 'NV', 'CO', 'MI', 'CT', 'MN', 'UT', 'AR', 'TN', 'OK', 'WA', 'NY', 'GA', 'NE', 'KY', 'SC', 'LA', 'NM', 'IA', 'RI', 'PR', 'DC', 'WI', 'OR', 'NH', 'ND', 'DE', 'OH', 'ID', 'IN', 'AK', 'MS', 'HI', 'SD', 'ME', 'MT']


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Following star scheam is designed , it is very simple and powerful.


### staging_immi_i94_df
    id
    date
    city_code
    state_code
    age
    gender
    visa_type
    count

### staging_temperature_df
    year
    month
    city_code
    city_name
    avg_temp
    lat
    long

### staging_demog_df
    city_code
    state_code
    city_name
    medianAge
    male_pop
    female_pop
    veterans
    foreign_born
    total_pop
### Dimension Tables
### dim_imm_df
    id
    gender
    age
    visa_type
    
#### dim_city_df
    city_code
    state_code
    city_name
    medianAge
    male_pop
    female_pop
    veterans
    foreign_born
    total_pop
    lat
    long
### dim_monthly_city_temp_df
    city_code
    year
    month
    avg_temp

### dim_time_df
    date
    dayofweek
    weekofyear
    month
### Fact Table
### fact_immigrations
    id
    state_code
    city_code
    date
    count

#### 3.2 Mapping Out Data Pipelines

### Steps necessary to pipeline the data into the chosen data model

1. Clean the data on nulls, data types, duplicates, etc
2. Load staging tables for staging_immi_i94_df, staging_temperature_df and staging_demog_df
3. Create dimension tables for dim_imm_df, dim_city_df, dim_monthly_city_temp_df and dim_time_df
4. Create fact table fact_immigration_df with information on immigration count, mapping id in imm_df, city_code in city_df and monthly_city_temp_df and date in time_df to make sure  referential integrity
5. Save processed dimension and fact tables in parquet for downstream query

In [26]:
# create a function
@udf(StringType())
def state_validation(st):
    print(st)
    if st in valid_states:
        return  st
    return 'None'

In [27]:
# convert date
@udf(StringType())
def conv_date(x):
    if x:
        return (datetime(1960,1,1).date() + timedelta(x)).isoformat()
    return None


In [28]:
## Clean immigraton data
# Remove any missing values ( any null value from columns i94port, i94addr, gender)
immi_i94_cleaned_df = immi_i94_df.dropna(how="any", subset=["i94port","i94addr","gender"])

In [29]:
immi_i94_cleaned_df.limit(10).toPandas().head(10)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,13.0,2016.0,5.0,213.0,213.0,CHI,20577.0,1.0,IL,20270.0,...,,M,1987.0,D/S,F,,EK,64792870000.0,235,F1
1,22.0,2016.0,5.0,101.0,101.0,BOS,20575.0,1.0,NY,20587.0,...,,M,1963.0,10312016,F,,BA,911023900.0,215,B2
2,28.0,2016.0,5.0,101.0,101.0,MIA,20575.0,1.0,FL,20596.0,...,,M,1984.0,10312016,F,,AZ,909366500.0,630,B2
3,29.0,2016.0,5.0,101.0,101.0,WAS,20575.0,1.0,NY,20589.0,...,,M,1988.0,10312016,F,,UA,95077050000.0,951,B2
4,34.0,2016.0,5.0,101.0,101.0,BOS,20575.0,1.0,MA,20583.0,...,,M,1957.0,10312016,M,,AZ,95108880000.0,614,B2
5,37.0,2016.0,5.0,101.0,101.0,WAS,20575.0,1.0,MD,20594.0,...,,M,1965.0,10312016,F,,UA,95115440000.0,933,B2
6,38.0,2016.0,5.0,101.0,101.0,WAS,20575.0,1.0,VA,20630.0,...,,M,1981.0,10312016,F,,UA,95115380000.0,933,B2
7,39.0,2016.0,5.0,101.0,101.0,NYC,20575.0,1.0,NY,,...,,,1997.0,D/S,F,,TK,95102220000.0,1,F1
8,42.0,2016.0,5.0,101.0,101.0,NYC,20575.0,1.0,NY,,...,,,1995.0,D/S,F,,AY,95090390000.0,5,F1
9,43.0,2016.0,5.0,101.0,101.0,LOS,20575.0,1.0,CA,20580.0,...,,M,1955.0,10312016,M,,BA,95093800000.0,283,B2


In [30]:
immi_i94_cleaned_df = immi_i94_cleaned_df.withColumn("i94addr", state_validation(immi_i94_cleaned_df.i94addr))

In [31]:
# Convert arrival_date (SAS format) to PySpark format
immi_i94_cleaned_df = immi_i94_cleaned_df.withColumn("arrdate", conv_date(immi_i94_cleaned_df.arrdate))

In [32]:
# Immigration data related to US
immi_i94_cleaned_df = immi_i94_cleaned_df.filter(immi_i94_cleaned_df.i94addr != 'other')


In [33]:
immi_i94_cleaned_df.count()

2795121

In [34]:
### Creating table staging immi i94:

staging_immi_i94_df = immi_i94_cleaned_df.select(col("cicid").alias("id"), 
                                       col("arrdate").alias("date"),
                                       col("i94port").alias("city_code"),
                                       col("i94addr").alias("state_code"),
                                       col("i94bir").alias("age"),
                                       col("gender").alias("gender"),
                                       col("i94visa").alias("visa_type"),
                                       "count").drop_duplicates()


In [35]:
staging_immi_i94_df.limit(5).toPandas().head()

Unnamed: 0,id,date,city_code,state_code,age,gender,visa_type,count
0,214.0,2016-05-01,NYC,NY,50.0,M,2.0,1.0
1,382.0,2016-05-01,MIA,FL,51.0,M,2.0,1.0
2,1308.0,2016-05-01,LOS,AZ,56.0,F,2.0,1.0
3,1416.0,2016-05-01,PHO,AZ,71.0,F,2.0,1.0
4,1470.0,2016-05-01,SLC,NV,49.0,M,1.0,1.0


In [36]:
# Create udf to map city full name to city port
@udf(StringType())
def city_to_port(city):
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key

In [37]:
# Temperature Data clean up
temperature_us_df_cleanup = temperature_df.filter(temperature_df["Country"] == "United States") \
    .withColumn("year", year(temperature_df['dt'])) \
    .withColumn("month", month(temperature_df["dt"])) \
    .withColumn("i94port", city_to_port(temperature_df["City"])) \
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["i94port"])

In [38]:
# Data from year 2013
temperature_us_df_cleanup = temperature_us_df_cleanup.filter(temperature_us_df_cleanup["year"] == 2013)

In [39]:
### Creating table staging i94 df 
staging_temperature_df = temperature_us_df_cleanup.select(col("year"), col("month"), col("i94port").alias("city_code"),
                                         round(col("AverageTemperature"), 1).alias("avg_temperature"),
                                         col("Latitude").alias("lat"), col("Longitude").alias("long")).drop_duplicates()
print(staging_temperature_df.count())
staging_temperature_df.limit(5).toPandas()

1044


Unnamed: 0,year,month,city_code,avg_temperature,lat,long
0,2013,4,COL,16.9,32.95N,85.21W
1,2013,1,DAB,0.5,39.38N,83.24W
2,2013,1,ONT,6.8,34.56N,116.76W
3,2013,2,POM,5.8,45.81N,123.46W
4,2013,5,PRO,14.3,42.59N,72.00W


In [40]:
staging_temperature_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- city_code: string (nullable = true)
 |-- avg_temperature: float (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)



In [41]:
# Cleaning the demographics data:

# Calculate percentages of numeric columns
cleaned_demog_df = demog_df.withColumn("median_age", demog_df['Median Age']) \
    .withColumn("pct_male_pop", (demog_df['Male Population'] / demog_df['Total Population']) * 100) \
    .withColumn("pct_female_pop", (demog_df['Female Population'] / demog_df['Total Population']) * 100) \
    .withColumn("pct_veterans", (demog_df['Number of Veterans'] / demog_df['Total Population']) * 100) \
    .withColumn("pct_foreign_born", (demog_df['Foreign-born'] / demog_df['Total Population']) * 100) \
    .withColumn("pct_race", (demog_df['Count'] / demog_df['Total Population']) * 100) \
    .withColumn("city_code", city_to_port(demog_df["City"])) \
    .dropna(how='any', subset=["city_code"])

cleaned_demog_df = cleaned_demog_df.select(col("City").alias("city_name"), col("State Code").alias("state_code"), 
                         "median_age", "pct_male_pop", "pct_female_pop","pct_veterans", 
                         "pct_foreign_born", col("Total Population").alias("total_pop"), 
                         col("Race").alias("race"), "pct_race").drop_duplicates()

cleaned_demog_df.count()

883

In [43]:
# Pivot
pivot_demog_df = cleaned_demog_df.groupBy("city_name", "state_code", "median_age", "pct_male_pop",
                                        "pct_female_pop","pct_veterans", "pct_foreign_born", "total_pop").pivot("Race").avg("pct_race")

pivot_demog_df = pivot_demog_df.withColumn("city_code", city_to_port(pivot_demog_df["city_name"])) \
    .dropna(how='any', subset=["city_code"])

pivot_demog_df.limit(10).toPandas().head(10)



Unnamed: 0,city_name,state_code,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,total_pop,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White,city_code
0,Jacksonville,NC,24.2,59.402046,40.597954,12.250048,5.540133,67363,2.584505,6.240815,19.674005,17.735255,76.072918,JAC
1,Boise,ID,34.9,50.439344,49.560656,7.331867,6.143027,218280,1.853583,4.492395,2.021715,8.924317,93.876214,BOI
2,Los Angeles,CA,35.0,49.321483,50.678517,2.150535,37.398386,3971896,1.605228,12.915721,10.193318,48.760894,54.826461,LOS
3,Norfolk,VA,30.2,52.321697,47.678303,11.950421,6.778602,246393,1.322684,4.811013,43.788988,7.613853,51.844411,NOR
4,Rochester,MN,35.0,48.953803,51.046197,6.138162,15.829294,112216,1.044414,8.767912,8.398981,5.896664,83.300955,RST
5,Salinas,CA,30.4,49.410367,50.589633,2.55423,37.292389,157386,1.814011,7.133417,2.154575,77.005579,60.540963,SLS
6,South Bend,IN,32.4,48.655031,51.344969,3.573735,8.461116,103757,0.396118,1.723257,28.918531,15.865917,68.80596,SBN
7,Huntsville,AL,38.1,48.523113,51.476887,8.797339,6.710767,189114,0.928012,3.47198,32.552323,5.756845,64.46059,HSV
8,Green Bay,WI,32.9,49.911139,50.088861,5.245151,9.326085,105221,6.222142,5.371551,7.185828,15.953089,77.962574,GRB
9,Medford,OR,38.6,49.634689,50.365311,8.311298,7.751112,79795,1.552729,2.675606,1.505107,17.913403,94.572342,MED


In [44]:
staging_demog_df = pivot_demog_df.select("city_code", "state_code", "city_name", "median_age",
                                    round(col("pct_male_pop"), 1).alias("pct_male_pop"),
                                    round(col("pct_female_pop"), 1).alias("pct_female_pop"),
                                    round(col("pct_veterans"), 1).alias("pct_veterans"),
                                    round(col("pct_veterans"), 1).alias("pct_foreign_born"),
                                    round(col("American Indian and Alaska Native"), 1).alias("pct_native_american"),
                                    round(col("Asian"), 1).alias("pct_asian"),
                                    round(col("Black or African-American"), 1).alias("pct_black"),
                                    round(col("Hispanic or Latino"), 1).alias("pct_hispanic_or_latino"),
                                    round(col("White"), 1).alias("pct_white"), "total_pop")

staging_demog_df.limit(10).toPandas()
staging_demog_df.printSchema()

root
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- pct_male_pop: double (nullable = true)
 |-- pct_female_pop: double (nullable = true)
 |-- pct_veterans: double (nullable = true)
 |-- pct_foreign_born: double (nullable = true)
 |-- pct_native_american: double (nullable = true)
 |-- pct_asian: double (nullable = true)
 |-- pct_black: double (nullable = true)
 |-- pct_hispanic_or_latino: double (nullable = true)
 |-- pct_white: double (nullable = true)
 |-- total_pop: integer (nullable = true)



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [45]:
# Creating Immigrant dimension Table
dim_immigrant_df = staging_immi_i94_df.select("id", "gender", "age", "visa_type").drop_duplicates()

In [46]:
dim_immigrant_df.count()

2795121

In [47]:
dim_immigrant_df.limit(5).toPandas()

Unnamed: 0,id,gender,age,visa_type
0,1308.0,F,56.0,2.0
1,73543.0,F,22.0,3.0
2,229938.0,M,46.0,1.0
3,362142.0,F,46.0,2.0
4,520305.0,M,36.0,2.0


In [48]:
# Creating City dimension table 
dim_city_df = staging_demog_df.join(staging_temperature_df, "city_code") \
    .select("city_code", "state_code", "city_name", "median_age", "pct_male_pop", "pct_female_pop", "pct_veterans",
           "pct_foreign_born", "pct_native_american", "pct_asian", "pct_black",
           "pct_hispanic_or_latino", "pct_white", "total_pop", "lat", "long").drop_duplicates()

In [49]:
dim_city_df.count()

142

In [50]:
dim_city_df.limit(5).toPandas()

Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop,lat,long
0,LLB,NE,Lincoln,32.3,50.0,50.0,5.3,5.3,1.5,5.5,5.9,7.3,88.9,277346,40.99N,95.86W
1,CHI,IL,Chicago,34.2,48.5,51.5,2.6,2.6,0.9,7.2,32.1,28.9,50.5,2720556,42.59N,87.27W
2,ATL,GA,Atlanta,33.8,48.3,51.7,4.0,4.0,1.0,5.2,52.9,4.0,42.3,463875,34.56N,83.68W
3,JAC,MS,Jackson,31.7,46.3,53.7,4.8,4.8,0.2,0.5,81.7,1.8,16.8,170811,32.95N,90.96W
4,SYR,NY,Syracuse,30.3,48.2,51.8,4.1,4.1,2.5,6.5,31.9,9.3,61.5,144152,42.59N,76.36W


In [51]:
# Creating monthly city temperature Dimension table
dim_monthly_city_temp_df = staging_temperature_df.select("city_code", "year", "month", "avg_temperature").drop_duplicates()

In [52]:
dim_monthly_city_temp_df.count()

1043

In [53]:
dim_monthly_city_temp_df.limit(5).toPandas()

Unnamed: 0,city_code,year,month,avg_temperature
0,SAA,2013,6,18.6
1,PHI,2013,5,16.6
2,BOS,2013,5,14.3
3,BUR,2013,3,14.5
4,RNO,2013,2,4.7


In [54]:
# Creating Time Dimesion table
dim_time_df = staging_immi_i94_df.withColumn("dayofweek", dayofweek("date"))\
                .withColumn("weekofyear", weekofyear("date"))\
                .withColumn("month", month("date"))
                        
dim_time_df = dim_time_df.select("date", "dayofweek", "weekofyear", "month").drop_duplicates()

In [55]:
dim_time_df.count()

31

In [56]:
dim_time_df.limit(5).toPandas().head(5)

Unnamed: 0,date,dayofweek,weekofyear,month
0,2016-05-25,4,21,5
1,2016-05-09,2,19,5
2,2016-05-02,2,18,5
3,2016-05-27,6,21,5
4,2016-05-13,6,19,5


In [57]:
# Creating Immigration fact table
fact_immigration_df = staging_immi_i94_df.select("id", "state_code", "city_code", "date", "count").drop_duplicates()

In [58]:
fact_immigration_df.count()

2795121

In [None]:
fact_immigration_df.limit(5).toPandas().head(5)

In [None]:
#Dimension Table written into the parquet
dim_immigrant_df.write.mode("overwrite").partitionBy("gender", "age").parquet("immigrants")
dim_city_df.write.mode("overwrite").partitionBy("state_code").parquet("cities")
dim_monthly_city_temp_df.write.mode("overwrite").parquet("monthly_city_temperatures")
dim_time_df.write.mode("overwrite").parquet("time")

# Fact written into the parquet
fact_immigration_df.write.mode("overwrite").partitionBy("state_code", "city_code").parquet("immigration")

### 4.2 Data Quality Checks

In [102]:

def load_parquets():
    # load immigration parquet file, create view and query
    read_im_df = pySparkSession.read.parquet("immigrants/")
    immigration = read_im_df.createOrReplaceTempView("immigrants")
    table_im_df = pySparkSession.sql("select * from immigrants limit 10");
    Cleaned.data_quality_check(table_im_df, "immigrants")
    table_im_df.printSchema()
    # load cities parquet file create view and query
    table_c_df = pySparkSession.read.parquet("cities/")
    cities = table_c_df.createOrReplaceTempView("cities")
    citi_table = pySparkSession.sql("select * from cities limit 10");
    Cleaned.data_quality_check(citi_table, "cities")
    table_m_df = pySparkSession.read.parquet("monthly_city_temperatues/")
    city_temperatures = table_m_df.createOrReplaceTempView("city_temperatures")
    city_temparature_table = pySparkSession.sql("select * from city_temperatures limit 10");
    Cleaned.data_quality_check(city_temparature_table, "city_temperatures")
    table_time = pySparkSession.read.parquet("time/")
    time = table_time.createOrReplaceTempView("time")
    time_table = pySparkSession.sql("select * from time limit 10");
    Cleaned.data_quality_check(time_table, "time")




In [None]:
load

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.


### Dimension Tables

### DIM_CITY_DF
    city_code: Information about city port code
    state_code:Information aboutstate code of the city
    city_name: Information about of the city
    medianAge: Information about median age of the city
    male_pop:  Information about city male population in %
    female_pop:Information about city's female population in %
    veterans:  Information about city's veteran population in %
    foreign_born: Information about city's foreign born population in %
    total_pop: It is city's total population
    lat: It is latitude of the city
    long: It is longitude of the city
    
### DIM_IMMIGRANT_df
    id: Contains information about id of immigrant
    gender:  Contains information about gender of immigrant
    age:  Contains information about age of immigrant
    visa_type:  Contains information about immigrant's visa type

### DIM_MONTHLY_CITY_TEMP_DF
    city_code:  Contains information about city port code
    year:  Contains information about year
    month:  Contains information about month 
    avg_temp: Contains information about average temperature in city for given month

### DIM_TIME_DF
    date:  Contains information about date
    dayofweek:  Contains information about day of the week
    weekofyear:  Contains information about week of year
    month:  Contains information about month
### Fact Table
### FACT_IMMIGRATION_DF
    id:  Contains information about id
    state_code:  Contains information about state code of arrival city
    city_code:  Contains information aboutcity port code of arrival city
    date:  Contains information about date of arrival
    count:  Contains information about count of immigrant's entries into the US

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.


Apache Spark used  because of ability to process large set of data along with apis to read data and its convenient dataframe manipulation functions

* Propose how often the data should be updated and why.

The immigration (i94) data set and relevant data can be updated montly as this is report can fetch mothly/seasonally. 

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x:
There are two different options I would try to use first one is try increase the nodes in the cluster to process large amount of data and second one we can use AWS EMR cluster.
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day:
 Airflow we can use to schedule daily at 7am and can be kicked of by spark jobs
 
 * The database needed to be accessed by 100+ people:
  Move  the database to cloud and AWS redshift has autoscaling capabilities which can take care.