# Project Title
### Data Engineering Capstone Project

#### Project Summary

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

#### Import & Install required libraries

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
import configparser
import os

#### Read and Set AWS Config

In [2]:
config = configparser.ConfigParser()
config.read('aws.cfg')

['aws.cfg']

In [3]:
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
s3_staging_area = config['S3']['STAGING_AREA']
s3_dwh = config['S3']['DWH']
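
The notebook reads `aws.cfg` but does not show its contents. A minimal sketch of the layout implied by the keys used above, with placeholder values (the bucket paths and the `s3a://` scheme are assumptions, not taken from the project):

```python
import configparser

# Hypothetical contents of aws.cfg; every value below is a placeholder.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>

[S3]
STAGING_AREA = s3a://<your-bucket>/staging
DWH = s3a://<your-bucket>/dwh
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

# Same lookups as the cells above, run against the sample text.
sample_staging_area = sample_config["S3"]["STAGING_AREA"]
sample_dwh = sample_config["S3"]["DWH"]
print(sample_staging_area, sample_dwh)
```

Since the file holds credentials, it is usually kept out of version control.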

#### Create Spark session

In [5]:
spark = SparkSession.builder\
                    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11") \
                    .enableHiveSupport()\
                    .getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope
We build a fact table of US immigration events from the I94 immigration data and a dimension table of city populations from the US cities demographics data. Together, the two tables support queries on immigration events and on the population of the destination cities.

Technologies:
+ Spark to process data
+ AWS S3 as storage
+ AWS EMR to create Spark cluster

#### Describe and Gather Data 
I94 immigration data [[link](https://travel.trade.gov/research/reports/i94/historical/2016.html)], notable columns:
+ cicid
+ i94yr: event year
+ i94mon: event month
+ i94port: destination city
+ i94cit: origin country code of the immigrant
+ i94mode: transportation mode code of the immigrant
+ i94bir: age of the immigrant
+ i94visa: purpose code for immigration
+ visatype: visa type of the immigrant

US cities demographics data [[link](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)], notable columns:
+ City: city name
+ State Code
+ Total Population
+ Race
+ Count

City Code mapping (to map i94port to city):
+ city_code
+ city: city name
+ state_code

In [6]:
df = pd.read_csv('immigration_data_sample.csv')
df[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94cit', 'i94mode', 'i94bir', 'i94visa', 'visatype']].head()

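| | cicid | i94yr | i94mon | i94port | i94cit | i94mode | i94bir | i94visa | visatype |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | 209.0 | 1.0 | 61.0 | 2.0 | WT |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | 582.0 | 1.0 | 26.0 | 2.0 | B2 |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | 148.0 | 1.0 | 76.0 | 2.0 | WT |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | 297.0 | 1.0 | 25.0 | 2.0 | B2 |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | 111.0 | 3.0 | 19.0 | 2.0 | WT |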


In [7]:
df = pd.read_csv('us-cities-demographics.csv', delimiter=';')
df[['City', 'State Code', 'Total Population', 'Race', 'Count']].head()

| | City | State Code | Total Population | Race | Count |
|---|---|---|---|---|---|
| 0 | Silver Spring | MD | 82463 | Hispanic or Latino | 25924 |
| 1 | Quincy | MA | 93629 | White | 58723 |
| 2 | Hoover | AL | 84839 | Asian | 4759 |
| 3 | Rancho Cucamonga | CA | 175232 | Black or African-American | 24437 |
| 4 | Newark | NJ | 281913 | White | 76402 |


In [8]:
df = pd.read_csv('city_code_mapping.csv')
df.head()

| | city_code | city | state_code |
|---|---|---|---|
| 0 | ANC | Anchorage | AK |
| 1 | BAR | Baker Aaf - Baker Island | AK |
| 2 | DAC | Daltons Cache | AK |
| 3 | PIZ | Dew Station Pt Lay Dew | AK |
| 4 | DTH | Dutch Harbor | AK |


#### Gather data to staging area on S3 Bucket

* Gather I94 immigration data

In [9]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('./i94-immigration-data/i94_apr16_sub.sas7bdat')
df_spark.write.parquet(os.path.join(s3_staging_area, "sas_data"), mode='overwrite')

* Gather US cities demographics data

In [10]:
df_spark = spark.read.format('csv').option('header', 'true').option('delimiter', ';').load('us-cities-demographics.csv')
new_column_name_list = list(map(lambda x: x.lower().replace(" ", "_"), df_spark.columns))
df_spark = df_spark.toDF(*new_column_name_list)
df_spark.write.parquet(os.path.join(s3_staging_area, "us_cities_demographics"), mode='overwrite')
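
The renaming step is what makes the demographics columns line up with the mapping file's `city` and `state_code` columns for the join in Step 4. A small plain-Python sketch of the same rule, applied to the header names shown in the sample output above:

```python
# Column names as they appear in the demographics sample shown earlier.
raw_columns = ["City", "State Code", "Total Population", "Race", "Count"]

# Same rule as the cell above: lowercase, spaces replaced by underscores.
normalized = [name.lower().replace(" ", "_") for name in raw_columns]
print(normalized)
```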

* Gather city code mapping data

In [11]:
df_spark = spark.read.format('csv').option('header', 'true').load('city_code_mapping.csv')
df_spark.write.parquet(os.path.join(s3_staging_area, "city_code_mapping"), mode='overwrite')

### Step 2: Explore and Assess the Data
#### Explore the Data 
* Some rows in I94 immigration data don't have a valid i94port value

#### Cleaning Steps
* Remove rows in I94 immigration data which don't have a valid i94port value

In [12]:
df_mapping = pd.read_csv("city_code_mapping.csv")
valid_list = df_mapping['city_code'].tolist()

In [13]:
def clean_i94_immigration_data(df):
    """
    Cleaning I94 Immigration table
    
    Parameter:
    df: a Spark Dataframe
    
    Return:
    a cleaned Spark Dataframe
    """
    
    return df.filter(df.i94port.isin(valid_list))
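
To illustrate what this filter keeps and drops, here is a small plain-Python sketch of the same membership test. The rows and the set of valid codes are made up for illustration; only the column name `i94port` comes from the data.

```python
# Hypothetical port codes standing in for the city_code column of the mapping file.
valid_codes = {"ANC", "DTH"}

# Hypothetical immigration rows; only the i94port field matters here.
rows = [
    {"cicid": 1.0, "i94port": "ANC"},
    {"cicid": 2.0, "i94port": "XXX"},  # code not present in the mapping
    {"cicid": 3.0, "i94port": None},   # missing port
    {"cicid": 4.0, "i94port": "DTH"},
]

# Keep a row only when its port code is a known city code,
# mirroring df.filter(df.i94port.isin(valid_list)).
cleaned = [row for row in rows if row["i94port"] in valid_codes]
print([row["cicid"] for row in cleaned])
```

Rows with an unknown or missing port code are dropped, so every remaining event can be matched to a city code.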

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Immigration Event Fact table:
+ id
+ year: event year
+ month: event month
+ city: destination city code
+ origin: origin country code of the immigrant
+ transportation: transportation mode code of the immigrant
+ birth_year: age of the immigrant in years (taken from i94bir; despite its name, the column holds an age, not a year of birth)
+ purpose: purpose code for immigration
+ visatype: visa type of the immigrant

City Dimension table:
+ city_id
+ city_name
+ state
+ population
+ race
+ count
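
As a rough illustration of how the two tables are meant to be queried together, the sketch below loads a few made-up rows into an in-memory SQLite database. SQLite stands in for Spark SQL here, the column types are assumptions, and all values are invented. Because the city dimension holds one row per city and race, the query de-duplicates it before joining so that events are not counted once per race.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Table and column names follow the conceptual model above; types are assumptions.
conn.executescript(
    """
    CREATE TABLE immigration_events (
        id REAL, year REAL, month REAL, city TEXT, origin REAL,
        transportation REAL, birth_year REAL, purpose REAL, visatype TEXT
    );
    CREATE TABLE cities (
        city_id TEXT, city_name TEXT, state TEXT,
        population INTEGER, race TEXT, "count" INTEGER
    );
    """
)

# Invented rows: three events, and a dimension with one row per city and race.
conn.executemany(
    "INSERT INTO immigration_events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
    [
        (1, 2016, 4, "ANC", 209, 1, 61, 2, "WT"),
        (2, 2016, 4, "ANC", 582, 1, 26, 2, "B2"),
        (3, 2016, 4, "DTH", 148, 1, 76, 2, "WT"),
    ],
)
conn.executemany(
    "INSERT INTO cities VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("ANC", "Anchorage", "AK", 1000, "White", 600),
        ("ANC", "Anchorage", "AK", 1000, "Asian", 400),
        ("DTH", "Dutch Harbor", "AK", 500, "White", 500),
    ],
)

# Collapse the dimension to one row per city before joining, otherwise each
# Anchorage event would be counted twice (once per race row).
events_per_city = conn.execute(
    """
    SELECT c.city_name, c.population, COUNT(*) AS events
    FROM immigration_events AS e
    JOIN (SELECT DISTINCT city_id, city_name, population FROM cities) AS c
      ON e.city = c.city_id
    GROUP BY c.city_name, c.population
    ORDER BY c.city_name
    """
).fetchall()
print(events_per_city)
```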

#### 3.2 Mapping Out Data Pipelines
1. Read the staged data from the S3 bucket
2. Clean the I94 immigration data
3. Create the Immigration Event Fact table and write the result back to the S3 bucket
4. Create the City Dimension table and write the result back to the S3 bucket

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [14]:
immigration_events_table = spark.read.parquet(os.path.join(s3_staging_area, "sas_data"))
immigration_events_table = clean_i94_immigration_data(immigration_events_table)
immigration_events_table = immigration_events_table.selectExpr(
    "cicid AS id",
    'i94yr AS year',
    'i94mon AS month',
    'i94port AS city',
    'i94cit AS origin',
    'i94mode AS transportation',
    'i94bir AS birth_year',
    'i94visa AS purpose',
    'visatype'
)

immigration_events_table.write.mode('append').parquet(os.path.join(s3_dwh, "immigration_events_table"))

In [15]:
city_code_mapping_table = spark.read.parquet(os.path.join(s3_staging_area, "city_code_mapping"))
cities_table = spark.read.parquet(os.path.join(s3_staging_area, "us_cities_demographics"))
cities_table = cities_table.join(
    city_code_mapping_table, 
    ['city', 'state_code']
)
# selectExpr returns a new DataFrame, so assign the result back before writing.
cities_table = cities_table.selectExpr(
    'city_code AS city_id',
    'city AS city_name',
    'state_code AS state',
    'total_population AS population',
    'race',
    'count'
)

cities_table.write.mode('overwrite').parquet(os.path.join(s3_dwh, "city_table"))

#### 4.2 Data Quality Checks
Check that the ETL ran successfully by confirming that each output table contains at least one row.

In [16]:
def data_quality_check(df):
    """
    Run quality check on the input dataframe
    
    Parameter:
    df: A Spark Dataframe
    """
    
    if df.count() == 0:
        print("Data quality check failed!")
    else:
        print("Data quality check succeeded")

In [17]:
data_quality_check(immigration_events_table)

Data quality check succeeded


In [18]:
data_quality_check(cities_table)

Data quality check succeeded
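
The check above only confirms that a table is not empty. A slightly stricter check could also verify that the key column has no missing or duplicate values. The sketch below shows the idea on plain Python rows; the function name `check_rows` and the sample data are hypothetical, and in Spark the same conditions would be expressed with DataFrame operations rather than loops.

```python
from collections import Counter


def check_rows(rows, key):
    """Return a list of problems found in `rows`; an empty list means all checks passed."""
    problems = []
    if not rows:
        problems.append("table is empty")
        return problems

    keys = [row.get(key) for row in rows]

    # Missing keys make a row impossible to join or look up.
    if any(k is None for k in keys):
        problems.append(f"{key} has missing values")

    # A key that appears more than once is not a valid identifier.
    duplicates = [k for k, n in Counter(keys).items() if k is not None and n > 1]
    if duplicates:
        problems.append(f"{key} has duplicate values: {sorted(duplicates)}")

    return problems


good = [{"id": 1}, {"id": 2}]
bad = [{"id": 1}, {"id": 1}, {"id": None}]
print(check_rows(good, "id"))
print(check_rows(bad, "id"))
```

Returning the list of problems instead of printing a single pass/fail message makes it easier to see which check failed.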


#### 4.3 Data dictionary 
Immigration Event Fact table created from I94 immigration data:
+ id: id of the event
+ year: event year
+ month: event month
+ city: destination city code
+ origin: origin country code of the immigrant
+ transportation: transportation mode code of the immigrant
+ birth_year: age of the immigrant in years (taken from i94bir; despite its name, the column holds an age, not a year of birth)
+ purpose: purpose code for immigration
+ visatype: visa type of the immigrant

City Dimension table created from US cities demographics data and city code mapping table:
+ city_id: id of the city
+ city_name: name of the city
+ state: state code of the city
+ population: total population of the city
+ race: human race
+ count: the number of people from the race

### Step 5: Complete Project Write Up
* Technologies:
    - Spark to process data
    - AWS S3 as storage
    - AWS EMR for Spark cluster
* The data warehouse should be updated daily to keep track of new immigration events
* Future work:
    - If the data were increased by 100x: add more worker nodes to the Spark cluster so the data can still be processed efficiently
    - If the data populates a dashboard that must be updated by 7am every day: use a scheduler to run the ETL overnight so that it finishes before the 7am deadline
    - If the database needed to be accessed by 100+ people: grant those users permission to access the S3 location that holds the data warehouse