# City Immigration, Demographics and Temperatures¶
### Data Engineering Capstone Project

#### Project Summary
In the Udacity provided project, you'll work with four datasets to complete the project. The main dataset will include data on immigration to the United States, and supplementary datasets will include data on airport codes, U.S. city demographics, and temperature data. You're also welcome to enrich the project with additional data if you'd like to set your project apart.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import os
import glob
import re
from datetime import datetime, timedelta
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, year, month, avg, round, dayofweek, weekofyear, isnull
from pyspark.sql.types import StringType, IntegerType

### Step 1: Scope the Project and Gather Data

#### Scope 
The following datasets are included in the project workspace. We purposely did not include a lot of detail about the data and instead point you to the sources. This is to help you get experience doing a self-guided project and researching the data yourself. If something about the data is unclear, make an assumption, document it, and move on. Feel free to enrich your project by gathering and including additional data sources.

#### Describe and Gather Data 
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. This is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
- World Temperature Data: This dataset came from Kaggle.
- U.S. City Demographic Data: This data comes from OpenSoft.
- Airport Code Table: This is a simple table of airport codes and corresponding cities.

In [None]:
# Read in the data here
#df1 = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
#immigration_data = pd.read_sas(df1, 'sas7bdat', encoding="ISO-8859-1")

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [3]:
#write to parquet
df_spark.write.parquet("data")
df_spark=spark.read.parquet("data")

In [8]:
df_i94 = df_spark

In [5]:
# Read temperature data
temperature = "../../data2/GlobalLandTemperaturesByCity.csv"
df_temperature = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(temperature)

In [7]:
# Read demographics data
demo = "us-cities-demographics.csv"
df_demo = spark.read.format("csv").option("delimiter", ";").option("header", "true").load(demo)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [9]:
# Performing cleaning tasks here

df_i94.count()

3096313

In [10]:
df_i94.limit(20).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1
5,5748522.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20579.0,...,,M,1959.0,10292016,M,,NZ,94981800000.0,10,B2
6,5748523.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,...,,M,1950.0,10292016,F,,NZ,94979690000.0,10,B2
7,5748524.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,...,,M,1975.0,10292016,F,,NZ,94979750000.0,10,B2
8,5748525.0,2016.0,4.0,245.0,464.0,HOU,20574.0,1.0,FL,20581.0,...,,M,1989.0,10292016,M,,NZ,94973250000.0,28,B2
9,5748526.0,2016.0,4.0,245.0,464.0,LOS,20574.0,1.0,CA,20581.0,...,,M,1990.0,10292016,F,,NZ,95013550000.0,2,B2


In [13]:
# Create list of valid ports
i94_sas_label_descriptions_fname = "I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_descriptions_fname) as f:
    lines = f.readlines()

re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for line in lines[302:961]:
    results = re_compiled.search(line)
    valid_ports[results.group(1)] = results.group(2)
print(len(valid_ports))

659


In [15]:
# Create list of valid states
valid_states = df_demo.toPandas()["State Code"].unique()
print(len(valid_states))
print(valid_states)

49
['MD' 'MA' 'AL' 'CA' 'NJ' 'IL' 'AZ' 'MO' 'NC' 'PA' 'KS' 'FL' 'TX' 'VA' 'NV'
 'CO' 'MI' 'CT' 'MN' 'UT' 'AR' 'TN' 'OK' 'WA' 'NY' 'GA' 'NE' 'KY' 'SC' 'LA'
 'NM' 'IA' 'RI' 'PR' 'DC' 'WI' 'OR' 'NH' 'ND' 'DE' 'OH' 'ID' 'IN' 'AK' 'MS'
 'HI' 'SD' 'ME' 'MT']


In [17]:
# Create udf to validate state
@udf(StringType())
def validate_state(x):  
    if x in valid_states:
        return x
    return 'other'

In [19]:
# Create udf to convert SAS date to PySpark date 
@udf(StringType())
def convert_datetime(x):
    if x:
        return (datetime(1960, 1, 1).date() + timedelta(x)).isoformat()
    return None

In [29]:
# Clean immigration data

# Remove any missing values
clean_df_i94 = df_i94.dropna(how="any", subset=["i94port", "i94addr", "gender"])

# Extract valid states 
clean_df_i94 = clean_df_i94.withColumn("i94addr", validate_state(clean_df_i94.i94addr))

# Convert arrival_date (SAS format) to PySpark format
clean_df_i94 = clean_df_i94.withColumn("arrdate", convert_datetime(clean_df_i94.arrdate))

# only keep us related immigration data
clean_df_i94 = clean_df_i94.filter(clean_df_i94.i94addr != 'other')

staging_df_i94 = clean_df_i94.select(col("cicid").alias("id"), 
                                       col("arrdate").alias("date"),
                                       col("i94port").alias("city_code"),
                                       col("i94addr").alias("state_code"),
                                       col("i94bir").alias("age"),
                                       col("gender").alias("gender"),
                                       col("i94visa").alias("visa_type"),
                                       "count").drop_duplicates()

staging_df_i94.limit(5).toPandas()

Unnamed: 0,id,date,city_code,state_code,age,gender,visa_type,count
0,5925997.0,2016-04-09,BUF,NY,,U,2.0,1.0
1,5928645.0,2016-04-25,HHW,HI,,U,2.0,1.0
2,158100.0,2016-04-01,MIA,FL,0.0,F,2.0,1.0
3,1868564.0,2016-04-10,NAS,FL,0.0,M,2.0,1.0
4,2686612.0,2016-04-15,MIA,FL,0.0,F,2.0,1.0


In [30]:
# Create udf to map city full name to city port abbreviation

@udf(StringType())
def city_to_port(city):
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key

In [31]:
# Clean temperature data

# Remove invalid ports
clean_df_temp = df_temperature.filter(df_temperature["Country"] == "United States") \
    .withColumn("year", year(df_temperature['dt'])) \
    .withColumn("month", month(df_temperature["dt"])) \
    .withColumn("i94port", city_to_port(df_temperature["City"])) \
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["i94port"])

# Only use temperatures from 2013 (the latest year in the dataset)
clean_df_temp = clean_df_temp.filter(clean_df_temp["year"] == 2013)

staging_df_temp = clean_df_temp.select(col("year"), col("month"), col("i94port").alias("city_code"),
                                         round(col("AverageTemperature"), 1).alias("avg_temperature"),
                                         col("Latitude").alias("lat"), col("Longitude").alias("long")).drop_duplicates()

print(staging_df_temp.count())
staging_df_temp.limit(5).toPandas()

1044


Unnamed: 0,year,month,city_code,avg_temperature,lat,long
0,2013,4,COL,16.9,32.95N,85.21W
1,2013,1,DAB,0.5,39.38N,83.24W
2,2013,1,ONT,6.8,34.56N,116.76W
3,2013,2,POM,5.8,45.81N,123.46W
4,2013,5,PRO,14.3,42.59N,72.00W


In [32]:
# Clean demographics data

# Calculate percentages of numeric columns and create new ones
clean_df_demo = df_demo.withColumn("median_age", df_demo['Median Age']) \
    .withColumn("pct_male_pop", (df_demo['Male Population'] / df_demo['Total Population']) * 100) \
    .withColumn("pct_female_pop", (df_demo['Female Population'] / df_demo['Total Population']) * 100) \
    .withColumn("pct_veterans", (df_demo['Number of Veterans'] / df_demo['Total Population']) * 100) \
    .withColumn("pct_foreign_born", (df_demo['Foreign-born'] / df_demo['Total Population']) * 100) \
    .withColumn("pct_race", (df_demo['Count'] / df_demo['Total Population']) * 100) \
    .withColumn("city_code", city_to_port(df_demo["City"])) \
    .dropna(how='any', subset=["city_code"])

clean_df_demo = clean_df_demo.select(col("City").alias("city_name"), col("State Code").alias("state_code"), 
                         "median_age", "pct_male_pop", "pct_female_pop","pct_veterans", 
                         "pct_foreign_born", col("Total Population").alias("total_pop"), 
                         col("Race").alias("race"), "pct_race").drop_duplicates()



In [33]:
# Pivot the race column
pivot_df_demo = clean_df_demo.groupBy("city_name", "state_code", "median_age", "pct_male_pop",
                                        "pct_female_pop","pct_veterans", "pct_foreign_born", "total_pop").pivot("Race").avg("pct_race")

pivot_df_demo = pivot_df_demo.withColumn("city_code", city_to_port(pivot_df_demo["city_name"])) \
    .dropna(how='any', subset=["city_code"])

staging_df_demo = pivot_df_demo.select("city_code", "state_code", "city_name", "median_age",
                                    round(col("pct_male_pop"), 1).alias("pct_male_pop"),
                                    round(col("pct_female_pop"), 1).alias("pct_female_pop"),
                                    round(col("pct_veterans"), 1).alias("pct_veterans"),
                                    round(col("pct_veterans"), 1).alias("pct_foreign_born"),
                                    round(col("American Indian and Alaska Native"), 1).alias("pct_native_american"),
                                    round(col("Asian"), 1).alias("pct_asian"),
                                    round(col("Black or African-American"), 1).alias("pct_black"),
                                    round(col("Hispanic or Latino"), 1).alias("pct_hispanic_or_latino"),
                                    round(col("White"), 1).alias("pct_white"), "total_pop")
print(staging_df_demo.count())
staging_df_demo.limit(10).toPandas()

180


Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop
0,TUC,AZ,Tucson,33.6,49.8,50.2,7.2,7.2,4.6,4.6,6.4,43.5,76.1,531674
1,MCA,TX,Allen,37.2,52.3,47.7,3.6,3.6,0.2,16.1,13.4,10.8,71.2,98138
2,CRP,TX,Corpus Christi,35.0,49.5,50.5,7.7,7.7,0.9,2.8,4.6,61.9,90.3,324082
3,FMY,FL,Fort Myers,37.3,49.8,50.2,5.8,5.8,,4.8,23.4,24.1,67.8,74015
4,ORL,FL,Orlando,33.1,48.3,51.7,4.7,4.7,0.9,4.1,25.1,33.0,66.1,270917
5,LOS,CA,Los Angeles,35.0,49.3,50.7,2.2,2.2,1.6,12.9,10.2,48.8,54.8,3971896
6,PRO,RI,Providence,29.9,49.7,50.3,2.8,2.8,2.3,7.5,17.1,43.5,54.6,179204
7,CID,IA,Cedar Rapids,36.2,48.4,51.6,6.0,6.0,1.0,4.1,9.1,4.1,89.6,130405
8,SPI,IL,Springfield,38.8,47.2,52.8,6.4,6.4,1.4,3.3,21.5,2.3,77.2,117809
9,POM,OR,Portland,36.7,49.6,50.4,4.7,4.7,2.4,10.2,7.3,9.7,82.9,632187


### Step 3: Define the Data Model

#### 3.1 Conceptual Data Model
The star schema is chosen as the data model because it is simple and yet effective. users can write simple queries by joing fact and dimension tables to analyze the data.

Here are the tables of the schema:

#### Staging Tables

- staging_df_i94

    - id
    - date
    - city_code
    - state_code
    - age
    - gender
    - visa_type
    - count

- staging_df_temp

    - year
    - month
    - city_code
    - avg_temperature
    - lat
    - long

- staging_df_demo
    - city_code
    - state_code
    - city_name
    - median_age
    - pct_male_pop
    - pct_female_pop
    - pct_veterans
    - pct_foreign_born
    - pct_native_american
    - pct_asian
    - pct_black
    - pct_hispanic_or_latino
    - pct_white
    - total_pop
    
#### Dimension Tables

- df_immigrant
    - id
    - gender
    - age
    - visa_type

- df_city
    - city_code
    - state_code
    - city_name
    - median_age
    - pct_male_pop
    - pct_female_pop
    - pct_veterans
    - pct_foreign_born
    - pct_native_american
    - pct_asian
    - pct_black
    - pct_hispanic_or_latino
    - pct_white
    - total_pop
    - lat
    - long

- df_monthly_city_temp
    - city_code
    - year
    - month
    - avg_temperature

- df_time
    - date
    - dayofweek
    - weekofyear
    - month
    
#### Fact Table

- df_immigration
    - id
    - state_code
    - city_code
    - date
    - count
    
#### 3.2 Mapping Out Data Pipelines
Steps necessary to pipeline the data into the chosen data model

- Clean the data on nulls, data types, duplicates, etc
Load staging tables for staging_df_i94, staging_df_temp and staging_df_demo

- Create dimension tables for df_immigrant, df_city, df_monthly_city_temp and df_time

- Create fact table df_immigration with information on immigration count, mapping id in df_immigrant, city_code in df_city and df_monthly_city_temp and date in df_time ensuring referential integrity

- Save processed dimension and fact tables in parquet for downstream query

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [34]:
# Create dimension table for immigrant

df_immigrant = staging_df_i94.select("id", "gender", "age", "visa_type").drop_duplicates()

df_immigrant.limit(20).toPandas()

In [36]:
# Create dimension table for city

df_city = staging_df_demo.join(staging_df_temp, "city_code") \
    .select("city_code", "state_code", "city_name", "median_age", "pct_male_pop", "pct_female_pop", "pct_veterans",
           "pct_foreign_born", "pct_native_american", "pct_asian", "pct_black",
           "pct_hispanic_or_latino", "pct_white", "total_pop", "lat", "long").drop_duplicates()

df_city.limit(20).toPandas()

Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop,lat,long
0,BRO,TX,Brownsville,30.6,47.7,52.3,2.3,2.3,0.6,0.9,0.7,92.5,95.0,183888,26.52N,96.72W
1,HSV,WI,Madison,30.7,49.2,50.8,3.9,3.9,0.9,9.6,8.2,7.9,82.1,248956,34.56N,85.62W
2,ATL,GA,Atlanta,33.8,48.3,51.7,4.0,4.0,1.0,5.2,52.9,4.0,42.3,463875,34.56N,83.68W
3,NEW,NJ,Newark,34.6,49.0,51.0,2.1,2.1,0.8,2.6,51.4,35.6,27.1,281913,40.99N,74.56W
4,NWH,CT,New Haven,29.9,48.9,51.1,2.0,2.0,1.7,6.1,33.3,33.4,43.2,130310,40.99N,72.43W
5,BFL,CA,Bakersfield,30.6,48.8,51.2,3.3,3.3,2.0,8.5,9.2,48.2,73.0,373627,36.17N,119.34W
6,SLS,CA,Salinas,30.4,49.4,50.6,2.6,2.6,1.8,7.1,2.2,77.0,60.5,157386,36.17N,121.33W
7,SDP,CA,San Diego,34.5,49.7,50.3,6.6,6.6,1.2,19.2,8.0,30.5,68.1,1394907,32.95N,117.77W
8,DET,MI,Detroit,34.8,47.2,52.8,4.4,4.4,0.9,1.6,80.6,8.0,15.4,677124,42.59N,82.91W
9,VAN,WA,Vancouver,37.2,48.0,52.0,7.2,7.2,2.6,6.3,4.4,13.4,86.7,172853,45.81N,123.46W


In [37]:
# Create dimension table for monthly city temperature

df_monthly_city_temp = staging_df_temp.select("city_code", "year", "month", "avg_temperature").drop_duplicates()

df_monthly_city_temp.limit(20).toPandas()

Unnamed: 0,city_code,year,month,avg_temperature
0,SAA,2013,6,18.6
1,PHI,2013,5,16.6
2,BOS,2013,5,14.3
3,BUR,2013,3,14.5
4,RNO,2013,2,4.7


In [38]:
# Create dimension table for time

df_time = staging_df_i94.withColumn("dayofweek", dayofweek("date"))\
                .withColumn("weekofyear", weekofyear("date"))\
                .withColumn("month", month("date"))
                        
df_time = df_time.select("date", "dayofweek", "weekofyear", "month").drop_duplicates()

df_time.limit(20).toPandas()

Unnamed: 0,date,dayofweek,weekofyear,month
0,2016-04-23,7,16,4
1,2016-04-22,6,16,4
2,2016-04-08,6,14,4
3,2016-04-09,7,14,4
4,2016-04-26,3,17,4


In [39]:

# Create fact table for immigration

df_immigration = staging_df_i94.select("id", "state_code", "city_code", "date", "count").drop_duplicates()

df_immigration.limit(20).toPandas()

Unnamed: 0,id,state_code,city_code,date,count
0,4750990.0,HI,HHW,2016-04-25,1.0
1,4308064.0,HI,HHW,2016-04-23,1.0
2,4232231.0,FL,ATL,2016-04-22,1.0
3,56083.0,HI,HHW,2016-04-01,1.0
4,3488147.0,NY,NYC,2016-04-19,1.0
5,2883445.0,FL,MIA,2016-04-16,1.0
6,2472975.0,NJ,NYC,2016-04-13,1.0
7,4028045.0,FL,MIA,2016-04-21,1.0
8,5650094.0,FL,MIA,2016-04-29,1.0
9,3648943.0,FL,FTL,2016-04-19,1.0


In [None]:
# Write dimension tables to parquet
df_immigrant.write.mode("overwrite").partitionBy("gender", "age").parquet("immigrants")
df_city.write.mode("overwrite").partitionBy("state_code").parquet("cities")
df_monthly_city_temp.write.mode("overwrite").parquet("monthly_city_temperatues")
df_time.write.mode("overwrite").parquet("time")

# Write fact table to parquet
df_immigration.write.mode("overwrite").partitionBy("state_code", "city_code").parquet("immigration")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

def table_exists(df):
    if df is not None:
        return True
    else:
        return False
        
if table_exists(df_immigrant) & table_exists(df_city) & table_exists(df_monthly_city_temp) & table_exists(df_time) & table_exists(df_immigration):
    print("data quality check passed")
    print("dimension tables and fact table exist")
    print()
else:
    print("data quality check failed")
    print("table missing...")

In [None]:
def table_not_empty(df):
    return df.count() != 0 

if table_not_empty(df_immigrant) & table_not_empty(df_city) & table_not_empty(df_monthly_city_temp) & table_not_empty(df_time) & table_not_empty(df_immigration):
    print("data quality check passed!")
    print("dimension tables and fact table contain records")
    print()
else:
    print("data quality check failed!")
    print("null records...")

#### 4.3 Data dictionary 

### Dimension Tables

- df_immigrant
    - id: id of immigrant
    - gender: gender of immigrant
    - age: age of immigrant
    - visa_type: immigrant's visa type

- df_city
    - city_code: city port code
    - state_code: state code of the city
    - city_name: name of the city
    - median_age: median age of the city
    - pct_male_pop: city's male population in percentage
    - pct_female_pop: city's female population in percentage
    - pct_veterans: city's veteran population in percentage
    - pct_foreign_born: city's foreign born population in percentage
    - pct_native_american: city's native american population in percentage
    - pct_asian: city's asian population in percentage
    - pct_black: city's black population in percentage
    - pct_hispanic_or_latino: city's hispanic or latino population in percentage
    - pct_white: city's white population in percentage
    - total_pop: city's total population
    - lat: latitude of the city
    - long: longitude of the city

- df_monthly_city_temp
    - city_code: city port code
    - year: year
    - month: month 
    - avg_temperature: average temperature in city for given month

- df_time
    - date: date
    - dayofweek: day of the week
    - weekofyear: week of year
    - month: month


### Fact Table

- df_immigration
    - id: id
    - state_code: state code of arrival city
    - city_code: city port code of arrival city
    - date: date of arrival
    - count: count of immigrant's entries into the US

#### Step 5: Complete Project Write Up

Spark is chosen for this project as it is known for processing a large amount of data quickly, with the ability to process different data formats (Parquet, CSV, among others) and integrate very well with cloud storage.

The data update cycle is typically chosen based on two criteria. One is the reporting cycle, the other is the availability of new data.

- If the data was increased by 100x: We can consider spinning up larger instances of EC2s hosting Spark and/or additional Spark work nodes. With added capacity arising from either vertical scaling or horizontal scaling, we should be able to accelerate processing time.

- If the data populates a dashboard that must be updated on a daily basis by 7am every day: We can consider using Airflow to schedule and automate the data pipeline jobs. Built-in retry and monitoring mechanism can enable us to meet user requirement.

- If the database needed to be accessed by 100+ people: We can consider hosting our solution in production scale data warehouse in the cloud, with larger capacity to serve more users, and workload management to ensure equitable usage of resources across users.