# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
import pandas as pd
from pyspark.sql import SparkSession

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, I use the provided datasets on immigration to the United States and U.S. city demographics.

The solution produces analytics tables for analyzing the relationship between immigration and city population. For example, which state had the highest immigration rate (immigration rate = immigration / population) in April 2016?

To do this, I created an ETL pipeline that reads the datasets into DataFrames using Spark and then extracts columns to create the tables. At the end, I can run analytics queries on the tables to get the information I want.

#### Describe and Gather Data 
I use two datasets in this project.

I94 Immigration Data: This data comes from the US National Tourism and Trade Office, which includes immigration records.

U.S. City Demographic Data: This data comes from OpenSoft, which includes city population.

In [5]:
# output immigration data sample
immigration_fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df = pd.read_sas(immigration_fname, 'sas7bdat', encoding="ISO-8859-1")
immigration_df.head()

In [None]:
# output demographics data sample
demographics_fname = 'us-cities-demographics.csv'
demographics_df = pd.read_csv(demographics_fname, delimiter=";")
demographics_df.head()

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
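
A minimal sketch of how missing values and duplicate rows could be counted with pandas. The helper `summarize_quality` and the toy rows are assumptions for illustration; they are not part of the original notebook or the real datasets.

```python
import pandas as pd


def summarize_quality(df: pd.DataFrame) -> dict:
    """Return simple data quality counts for a DataFrame (hypothetical helper)."""
    return {
        "rows": len(df),
        # fully identical rows beyond their first occurrence
        "duplicate_rows": int(df.duplicated().sum()),
        # number of missing values in each column
        "missing_by_column": df.isna().sum().to_dict(),
    }


# Toy rows for illustration only; the values are made up.
sample = pd.DataFrame({
    "cicid": [1.0, 2.0, 2.0, 3.0],
    "i94addr": ["CA", "NY", "NY", None],
})
report = summarize_quality(sample)
print(report)
```

The same helper could be called on `immigration_df` and `demographics_df` to decide which cleaning steps are needed.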

#### Cleaning Steps
Document steps necessary to clean the data

In [None]:
# Keep April 2016 records, select the needed columns, and remove duplicates
is_apr_2016 = (immigration_df['i94yr'] == 2016.0) & (immigration_df['i94mon'] == 4.0)
immigration_data = immigration_df.loc[is_apr_2016, [
    'cicid', 'i94yr', 'i94mon', 'i94addr',
    'arrdate', 'depdate', 'biryear', 'gender']]\
    .drop_duplicates()
immigration_data.head()

In [None]:
print(len(immigration_data))

In [None]:
# Remove duplicates for city demographics data
demographics_data = demographics_df[[\
    'City', 'State', 'Median Age', \
    'Male Population', 'Female Population', \
    'Total Population','State Code']].drop_duplicates()
demographics_data.head()

In [None]:
print(len(demographics_data))

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
There are two tables for the final analyses. The immigration table holds the immigration records and the city table holds city populations. The two tables can be joined on the state code: i94addr (renamed to address) in the immigration table and State Code (renamed to state_code) in the city table.
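
The join and the immigration rate from the scope section can be sketched in pandas as follows. The rows are made-up toy values, and the column names follow the analytics tables (`address`, `state_code`, `total_population`). Because the city table has one row per city, this sketch assumes population is first summed per state; that sum covers only the cities present in the dataset, not the whole state.

```python
import pandas as pd

# Toy rows for illustration only; the values are made up.
immigration = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "address": ["CA", "CA", "NY", "CA"],
})
city = pd.DataFrame({
    "city": ["Los Angeles", "San Diego", "New York"],
    "state_code": ["CA", "CA", "NY"],
    "total_population": [300, 100, 200],
})

# Count immigration records per state.
arrivals = (
    immigration.groupby("address").size()
    .rename("immigration").reset_index()
)

# The city table is per city, so sum population up to the state level first.
population = city.groupby("state_code", as_index=False)["total_population"].sum()

# Join on address = state_code and compute immigration / population.
rates = arrivals.merge(population, left_on="address", right_on="state_code")
rates["immigration_rate"] = rates["immigration"] / rates["total_population"]
print(rates.sort_values("immigration_rate", ascending=False))
```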


#### 3.2 Mapping Out Data Pipelines
Step 1: Read and load the data into staging DataFrames using Spark: immigration_df_spark and city_df_spark.

Step 2: Clean and transform the staging DataFrames into analytics tables: immigration_table and city_table.

Step 3: Output tables into parquet files.



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# create spark session
spark = SparkSession.builder.\
    config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
    .enableHiveSupport().getOrCreate()

In [None]:
# read immigration data
immigration_df_spark = spark.read\
    .format('com.github.saurfang.sas.spark')\
    .load(immigration_fname)

# extract columns to create immigration table
immigration_table = immigration_df_spark\
    .selectExpr(\
        "cicid as id", "i94yr as year", \
        "i94mon as month", \
        "i94addr as address", \
        "arrdate as arrive_date", \
        "depdate as depart_date", \
        "biryear as birth_year", "gender")\
    .dropDuplicates()

# write immigration table to parquet files
immigration_table\
    .write\
    .partitionBy("year", "month")\
    .parquet("sas_data", 'overwrite')

In [None]:
# read city data
city_df_spark = spark\
    .read\
    .format("csv")\
    .options(header='true', delimiter=';')\
    .load(demographics_fname)

# extract columns to create city table
city_table = city_df_spark\
    .selectExpr(\
        "City as city", \
        "State as state", \
        "`Median Age` as median_age", \
        "`Male Population` as male_population", \
        "`Female Population` as female_population", \
        "`Total Population` as total_population", \
        "`State Code` as state_code")\
    .dropDuplicates()

# write city table to parquet files
city_table\
    .write\
    .parquet("city_data", 'overwrite')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
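
As one possible shape for the count and unique key checks, the sketch below wraps them in two small functions that raise on failure instead of returning a boolean. The function names `check_row_count` and `check_unique_key` are hypothetical; they take plain integers so they work with counts from either Spark or pandas.

```python
def check_row_count(actual: int, expected: int, table: str) -> None:
    """Fail if the table is empty or its row count differs from the source."""
    if actual == 0:
        raise ValueError(f"{table}: table is empty")
    if actual != expected:
        raise ValueError(f"{table}: expected {expected} rows, found {actual}")


def check_unique_key(total_rows: int, distinct_keys: int, table: str) -> None:
    """Fail if the number of distinct key values differs from the row count."""
    if distinct_keys != total_rows:
        raise ValueError(
            f"{table}: {total_rows - distinct_keys} rows share a key with another row"
        )


# Illustrative calls with made-up counts.
check_row_count(actual=100, expected=100, table="immigration")
check_unique_key(total_rows=100, distinct_keys=100, table="immigration")
print("all checks passed")
```

In this notebook the arguments could come from `immigration_check.count()`, `len(immigration_data)`, and `immigration_check.select("id").distinct().count()`.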
 
Run Quality Checks

In [None]:
# read parquet data to perform quality checks
immigration_check = spark.read.parquet("sas_data")

city_check = spark.read.parquet("city_data")

In [None]:
# output sample data for immigration table
immigration_check.show(5)

In [None]:
# output sample data for city table
city_check.show(5)

In [None]:
# check that the row count matches the deduplicated pandas data (no duplicates, no lost rows)
immigration_check.count() == len(immigration_data)

In [None]:
# check that the row count matches the deduplicated pandas data (no duplicates, no lost rows)
city_check.count() == len(demographics_data)

In [None]:
# check unique key
immigration_check.select("id").distinct().count() == len(immigration_data)

In [None]:
# check unique key
city_check.select("city", "state").distinct().count() == len(demographics_data)

#### 4.3 Data dictionary 
immigration table:
* id - unique id for each record
* year - the year of data
* month - the month of data
* address - the U.S. state code of the address, used to join with the city table
* arrive_date - the arrival date
* depart_date - the departure date
* birth_year - the birth year which can be used to calculate age
* gender - the gender

city table:
* city - city name
* state - state name
* median_age - median age of population
* male_population - the male population count
* female_population - the female population count
* total_population - the total population count
* state_code - the code of state
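
The sketch below shows how arrive_date, depart_date, and birth_year could be turned into calendar dates and ages. It assumes the date columns are SAS numeric dates (days since 1960-01-01), which should be verified against the dataset's label descriptions. The helper names are hypothetical.

```python
from datetime import date, timedelta

# SAS numeric dates count days from this epoch.
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS day count to a date; return None for missing values."""
    if days is None or days != days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(days))


def age_in_year(birth_year, year):
    """Approximate age as the difference between the record year and birth year."""
    return int(year - birth_year)


print(sas_days_to_date(20545.0))   # 2016-04-01
print(age_in_year(1980.0, 2016.0))  # 36
```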

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

        Spark processes data in memory and in parallel across a cluster, so analytics can run much faster than on disk-based Hadoop MapReduce. This means more interactivity, faster experimentation, and increased productivity for analysts.


* Propose how often the data should be updated and why.

        Data should be updated on a daily or monthly basis, because the analytics jobs focus on the month and year dimensions.


* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.

        Parameterize the ETL pipeline so that it can be run by Date (YYYY/MM).


 * The data populates a dashboard that must be updated on a daily basis by 7am every day.

        Improve the ETL pipeline to load new data on a daily basis, and schedule it to run at 12am every day with a date parameter set to the current date.


 * The database needed to be accessed by 100+ people.

        Store data in separate tables keyed by the date parameter. For example, immigration data for 201604 will be stored in table immigration_201604. The goal is to reduce the chance that all queries run against one large table, which would be inefficient.
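
The date parameter mentioned in the scenarios above could drive both the input file and the output table name. This is a rough sketch with hypothetical helpers; the file name pattern is inferred from the single April 2016 file used in this notebook, so other months are assumed to follow it.

```python
from datetime import date

# Explicit list avoids locale-dependent month names.
MONTH_ABBR = ["jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec"]


def source_file_name(run_date: date) -> str:
    """Build the SAS file name for a month (pattern assumed from i94_apr16_sub)."""
    month = MONTH_ABBR[run_date.month - 1]
    return f"i94_{month}{run_date:%y}_sub.sas7bdat"


def immigration_table_name(run_date: date) -> str:
    """Build the per-month table name, e.g. immigration_201604."""
    return f"immigration_{run_date:%Y%m}"


run_date = date(2016, 4, 1)
print(source_file_name(run_date))        # i94_apr16_sub.sas7bdat
print(immigration_table_name(run_date))  # immigration_201604
```

A scheduler could pass the run date into the pipeline, which would then load only that month's file and write only that month's table.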