# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--


The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import os
import re
import glob
import pandas as pd
from pprint import pprint
from datetime import datetime, timedelta

from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType

In [2]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

In [3]:
# Create Spark session
spark = SparkSession.builder\
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 

- This project deals with data absorbed from three differenct resources.
- Fact and dimension tables are consructed to save the extracted data and peform the analysis.
- The analysis is related to immigration to the US interms of monthly average temperature in each city, city demographics and seasonality of immigrants.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

- The gathered data comes from 3 different places:
1. __I94 Immigration Data__: This data comes from the US National Tourism and Trade Office. This data exists already here.
2. __World Temperature Data__: This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data).
3. __U.S. City Demographic Data__: This data comes from OpenSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). 

#### Immigration dataset

In [4]:
# Read data
i94_files = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
i94_file = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
i94_immig_df = spark.read.format("com.github.saurfang.sas.spark").load(i94_file)

#### Temperature dataset

In [5]:
# Read data
temperature_file = "../../data2/GlobalLandTemperaturesByCity.csv"
temperature_df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(temperature_file)

#### Demographics dataset

In [6]:
# Read demographics data
demog_file = "us-cities-demographics.csv"
demog_df = spark.read.format("csv").option("delimiter", ";").option("header", "true").load(demog_file)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

#### Immigration dataset (exploring)

In [10]:
#Read data 
i94_immig_df.limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,...,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1959.0,09302016,,,AZ,92471610000.0,602.0,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,...,,M,1970.0,09302016,,,AZ,92470800000.0,602.0,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,...,,M,1968.0,09302016,,,AZ,92478490000.0,608.0,B1


In [11]:
# Create a list of valid ports
i94_sas_label_descriptions_fname = "I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_descriptions_fname) as f:
    lines = f.readlines()

re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for line in lines[302:961]:
    results = re_compiled.search(line)
    valid_ports[results.group(1)] = results.group(2)
print(len(valid_ports))

659


In [13]:
# Create a list of valid states
valid_states = demog_df.toPandas()["State Code"].unique()

In [14]:
print(valid_states)

['MD' 'MA' 'AL' 'CA' 'NJ' 'IL' 'AZ' 'MO' 'NC' 'PA' 'KS' 'FL' 'TX' 'VA' 'NV'
 'CO' 'MI' 'CT' 'MN' 'UT' 'AR' 'TN' 'OK' 'WA' 'NY' 'GA' 'NE' 'KY' 'SC' 'LA'
 'NM' 'IA' 'RI' 'PR' 'DC' 'WI' 'OR' 'NH' 'ND' 'DE' 'OH' 'ID' 'IN' 'AK' 'MS'
 'HI' 'SD' 'ME' 'MT']


In [15]:
@udf(StringType())
def convert_datetime(x):
    """
    To convert SAS date to PySpark date 
    """
    if x:
        return (datetime(1960, 1, 1).date() + timedelta(x)).isoformat()
    return None

In [16]:
@udf(StringType())
def validate_state(x): 
    """
    To validate state
    """
    if x in valid_states:
        return x
    return 'other'

### Immigration dataset (cleaning)

In [17]:
# Remove missing values
cleaned_i94_df = i94_immig_df.dropna(how="any", subset=["i94port", "i94addr", "gender"])

# Extract valid states 
cleaned_i94_df = cleaned_i94_df.withColumn("i94addr", validate_state(cleaned_i94_df.i94addr))

# Convert arrival_date (SAS format) to PySpark format
cleaned_i94_df = cleaned_i94_df.withColumn("arrdate", convert_datetime(cleaned_i94_df.arrdate))

# Keeping on 'US' related immigration data
cleaned_i94_df = cleaned_i94_df.filter(cleaned_i94_df.i94addr != 'other')

staging_i94_df = cleaned_i94_df.select(col("cicid").alias("id"), 
                                       col("arrdate").alias("date"),
                                       col("i94port").alias("city_code"),
                                       col("i94addr").alias("state_code"),
                                       col("i94bir").alias("age"),
                                       col("gender").alias("gender"),
                                       col("i94visa").alias("visa_type"),
                                       "count").drop_duplicates()

staging_i94_df.limit(10).toPandas()

Unnamed: 0,id,date,city_code,state_code,age,gender,visa_type,count
0,168.0,2016-04-01,WAS,DC,34.0,M,2.0,1.0
1,383.0,2016-04-01,MIA,FL,40.0,M,2.0,1.0
2,608.0,2016-04-01,TOR,TX,45.0,M,1.0,1.0
3,930.0,2016-04-01,NEW,NY,49.0,F,2.0,1.0
4,1229.0,2016-04-01,NYC,CT,32.0,M,1.0,1.0


#### Temperature dataset (exploring)

In [18]:
# Read data
temperature_df.limit(10).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.787999999999999,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.050999999999998,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


In [19]:
@udf(StringType())
def city_to_port(city):
    """
    To map city full name to city port abbreviation
    """
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key

#### Temperature dataset (cleaning)

In [20]:
# Use only temperatures from 'United States'
# Map full name to city port
# Remove invalid ports
cleaned_temp_df = temperature_df.filter(temperature_df["Country"] == "United States") \
    .withColumn("year", year(temperature_df['dt'])) \
    .withColumn("month", month(temperature_df["dt"])) \
    .withColumn("i94port", city_to_port(temperature_df["City"])) \
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["i94port"])

# Use only temperatures from 2013 (the latest year in the dataset)
cleaned_temp_df = cleaned_temp_df.filter(cleaned_temp_df["year"] == 2013)

staging_temp_df = cleaned_temp_df.select(col("year"), col("month"), col("i94port").alias("city_code"),
                                         round(col("AverageTemperature"), 1).alias("avg_temperature"),
                                         col("Latitude").alias("lat"), col("Longitude").alias("long")).drop_duplicates()

staging_temp_df.limit(10).toPandas()

Unnamed: 0,year,month,city_code,avg_temperature,lat,long
0,2013,4,COL,16.9,32.95N,85.21W
1,2013,1,DAB,0.5,39.38N,83.24W
2,2013,1,ONT,6.8,34.56N,116.76W
3,2013,2,POM,5.8,45.81N,123.46W
4,2013,5,PRO,14.3,42.59N,72.00W
5,2013,2,TUL,5.0,36.17N,95.47W
6,2013,1,BAL,1.8,39.38N,76.99W
7,2013,9,HSV,23.1,34.56N,85.62W
8,2013,9,RNO,19.200001,39.38N,120.69W
9,2013,3,SLC,6.0,40.99N,112.90W


#### Demographics dataset (exploring)

In [21]:
# Read data
demog_df.limit(10).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC,Asian,11060


#### Demographics dataset (cleaning)

In [24]:
# Calculate percentages of numeric columns and create new ones
cleaned_demo_df = demog_df.withColumn("median_age", demog_df['Median Age']) \
    .withColumn("pct_male_pop", (demog_df['Male Population'] / demog_df['Total Population']) * 100) \
    .withColumn("pct_female_pop", (demog_df['Female Population'] / demog_df['Total Population']) * 100) \
    .withColumn("pct_veterans", (demog_df['Number of Veterans'] / demog_df['Total Population']) * 100) \
    .withColumn("pct_foreign_born", (demog_df['Foreign-born'] / demog_df['Total Population']) * 100) \
    .withColumn("pct_race", (demog_df['Count'] / demog_df['Total Population']) * 100) \
    .withColumn("city_code", city_to_port(demog_df["City"])) \
    .dropna(how='any', subset=["city_code"])

cleaned_demo_df = cleaned_demo_df.select(col("City").alias("city_name"), col("State Code").alias("state_code"), 
                         "median_age", "pct_male_pop", "pct_female_pop","pct_veterans", 
                         "pct_foreign_born", col("Total Population").alias("total_pop"), 
                         col("Race").alias("race"), "pct_race").drop_duplicates()

In [25]:
# Pivot the race column
pivot_demo_df = cleaned_demo_df.groupBy("city_name", "state_code", "median_age", "pct_male_pop",
                                        "pct_female_pop","pct_veterans", "pct_foreign_born", "total_pop").pivot("Race").avg("pct_race")

pivot_demo_df = pivot_demo_df.withColumn("city_code", city_to_port(pivot_demo_df["city_name"])) \
    .dropna(how='any', subset=["city_code"])

staging_demo_df = pivot_demo_df.select("city_code", "state_code", "city_name", "median_age",
                                    round(col("pct_male_pop"), 1).alias("pct_male_pop"),
                                    round(col("pct_female_pop"), 1).alias("pct_female_pop"),
                                    round(col("pct_veterans"), 1).alias("pct_veterans"),
                                    round(col("pct_veterans"), 1).alias("pct_foreign_born"),
                                    round(col("American Indian and Alaska Native"), 1).alias("pct_native_american"),
                                    round(col("Asian"), 1).alias("pct_asian"),
                                    round(col("Black or African-American"), 1).alias("pct_black"),
                                    round(col("Hispanic or Latino"), 1).alias("pct_hispanic_or_latino"),
                                    round(col("White"), 1).alias("pct_white"), "total_pop")

In [26]:
# Read data
staging_demo_df.limit(10).toPandas()

Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop
0,TUC,AZ,Tucson,33.6,49.8,50.2,7.2,7.2,4.6,4.6,6.4,43.5,76.1,531674
1,MCA,TX,Allen,37.2,52.3,47.7,3.6,3.6,0.2,16.1,13.4,10.8,71.2,98138
2,CRP,TX,Corpus Christi,35.0,49.5,50.5,7.7,7.7,0.9,2.8,4.6,61.9,90.3,324082
3,FMY,FL,Fort Myers,37.3,49.8,50.2,5.8,5.8,,4.8,23.4,24.1,67.8,74015
4,ORL,FL,Orlando,33.1,48.3,51.7,4.7,4.7,0.9,4.1,25.1,33.0,66.1,270917
5,LOS,CA,Los Angeles,35.0,49.3,50.7,2.2,2.2,1.6,12.9,10.2,48.8,54.8,3971896
6,PRO,RI,Providence,29.9,49.7,50.3,2.8,2.8,2.3,7.5,17.1,43.5,54.6,179204
7,CID,IA,Cedar Rapids,36.2,48.4,51.6,6.0,6.0,1.0,4.1,9.1,4.1,89.6,130405
8,SPI,IL,Springfield,38.8,47.2,52.8,6.4,6.4,1.4,3.3,21.5,2.3,77.2,117809
9,POM,OR,Portland,36.7,49.6,50.4,4.7,4.7,2.4,10.2,7.3,9.7,82.9,632187


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

- We chose star schema for data modeling for simplicity by constructing staging, fact ,and dimension tables.

### Staging tables
1. __staging_i94_df__ >>>
('id', 'date', 'city_code', 'state_code', 'age', 'gender', 'visa_type', 'count')
2. __staging_temp_df__ >>>
('year', 'month', 'city_code', 'city_name', 'avg_temperature', 'lat', 'long')
3. __staging_demo_df__ >>>
('city_code', 'state_code', 'city_name', 'median_age', 'pct_male_pop', 'pct_female_pop', 'pct_veterans', 'pct_foreign_born', 'pct_native_american',  'pct_asian', 'pct_black', 'pct_hispanic_or_latino', 'pct_white', 'total_pop')

### Fact table
1. __immigration_df__ >>>
('id', 'state_code', 'city_code', 'date', 'count')

### Dimension tables
1. __immigrant_df__ >>>
('id', 'gender', 'age', 'visa_type')
2. __city_df__ >>>
('city_code', 'state_code', 'city_name', 'median_age', 'pct_male_pop', 'pct_female_pop', 'pct_veterans', 'pct_foreign_born', 'pct_native_american', 'pct_asian', 'pct_black', 'pct_hispanic_or_latino', 'pct_white', 'total_pop', 'lat', 'long')
3. __monthly_city_temp_df__ >>>
('city_code', 'year', 'month', 'avg_temperature')
4. __time_df__ >>>
('date', 'dayofweek', 'weekofyear', 'month')

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

1. Clean data (nulls, data types, duplicates, etc)
2. Load staging tables.
3. Create dimension tables.
4. Create fact table immigration_df with information on immigration count, mapping id in immigrant_df, city_code in city_df and monthly_city_temp_df and date in time_df ensuring data integrity.
6. Save processed tables in parquet for downstream query.

In [27]:
# Create immigrant dimension table
immigrant_df = staging_i94_df.select("id", "gender", "age", "visa_type")
immigrant_df = immigrant_df.drop_duplicates()

In [28]:
# Create city dimension table
city_df = staging_demo_df.join(staging_temp_df, "city_code") \
    .select("city_code", "state_code", "city_name", "median_age", "pct_male_pop", "pct_female_pop", "pct_veterans",
           "pct_foreign_born", "pct_native_american", "pct_asian", "pct_black",
           "pct_hispanic_or_latino", "pct_white", "total_pop", "lat", "long")
city_df = city_df.drop_duplicates()

In [29]:
# Create city temperature dimension table
monthly_city_temp_df = staging_temp_df.select("city_code", "year", "month", "avg_temperature")
monthly_city_temp_df = monthly_city_temp_df.drop_duplicates()

In [30]:
# Create dimension table for time
time_df = staging_i94_df.withColumn("dayofweek", dayofweek("date"))\
                .withColumn("weekofyear", weekofyear("date"))\
                .withColumn("month", month("date"))

In [31]:
# Drop duplicate
time_df = time_df.select("date", "dayofweek", "weekofyear", "month")
time_df = time_df.drop_duplicates()

In [32]:
# Create immigration fact table
immigration_df = staging_i94_df.select("id", "state_code", "city_code", "date", "count")
immigration_df = immigration_df.drop_duplicates()

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [33]:
# Perform quality checks here
def check_table_existing(dataframe):
    """
    To check if the dataframe exists or not
    """
    if dataframe:
        return True
    else:
        return False
        
if check_table_existing(immigrant_df) & check_table_existing(city_df) & check_table_existing(monthly_city_temp_df) &\
    check_table_existing(time_df) & check_table_existing(immigration_df):
    print("...checking data quality is good (all tables exist)...")
else:
    print("...checking data quality has issues (some tables may not exist)...")

...checking data quality is good...


In [None]:
# Perform quality checks here
def check_table_records(dataframe):
    """
    To check if the dataframe is empty or not
    """
    return dataframe.count() != 0 

if check_table_records(immigrant_df) & check_table_records(city_df) & check_table_records(monthly_city_temp_df) &\
   check_table_records(time_df) & check_table_records(immigration_df):
    print("...checking data quality is good (all tables contains records)...")
else:
    print("...checking data quality has issues (some tables contain no records)...")

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Fact Table
1. __immigration_df__ 

    - id: id
    - state_code: state code of arrival city
    - city_code: city port code of arrival city
    - date: date of arrival
    - count: count of immigrant's entries into the US

#### Dimension Tables
1. __immigrant_df__
    - id: id of immigrant
    - gender: gender of immigrant
    - age: age of immigrant
    - visa_type: immigrant's visa type
2. __city_df__
    - city_code: city port code
    - state_code: state code of the city
    - city_name: name of the city
    - median_age: median age of the city
    - pct_male_pop: city's male population in percentage
    - pct_female_pop: city's female population in percentage
    - pct_veterans: city's veteran population in percentage
    - pct_foreign_born: city's foreign born population in percentage
    - pct_native_american: city's native american population in percentage
    - pct_asian: city's asian population in percentage
    - pct_black: city's black population in percentage
    - pct_hispanic_or_latino: city's hispanic or latino population in percentage
    - pct_white: city's white population in percentage
    - total_pop: city's total population
    - lat: latitude of the city
    - long: longitude of the city
3. __monthly_city_temp_df__
    - city_code: city port code
    - year: year
    - month: month 
    - avg_temperature: average temperature in city for given month
4. __time_df__
    - date: date
    - dayofweek: day of the week
    - weekofyear: week of year
    - month: month

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

    We chose Spark in this project because its capabilities in reading and processing massive amounts of data and its integration with a large number of other services like AWS. We also used Pandas to display dataset in a readable way.


* Propose how often the data should be updated and why.

    The data can be updated each month to capture a noticable change in data or upon the insertion of new elements to capture the change instantaneously.

* Write a description of how you would approach the problem differently under the following scenarios:
 * __The data was increased by 100x__: We can move to cloud (AWS) and use powerful EC2 instances.
 * __The data populates a dashboard that must be updated on a daily basis by 7am every day__: We can use Ailflow for sceduling and monotoring tasks through a pipeline.
 * __The database needed to be accessed by 100+ people__: We can move to cloud (AWS) to manage all the users.