# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project builds a data warehouse that combines data from several sources into an analytics database for querying US immigration statistics. The output of the pipeline can then serve as the backend database for a web app.

#### The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 0: Import packages and Initiate the Spark Session

In [14]:
# Do all imports and installs here
import pandas as pd
import pyspark
from pyspark.sql import SparkSession


spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12").\
enableHiveSupport().getOrCreate()

---
### Step 1: Scope the Project and Gather Data

<!-- #### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included?  -->

### 1.1 Immigration Data

#### Scope:
* **Dataset**: [I94 Immigration data](https://travel.trade.gov/research/reports/i94/historical/2016.html)
* **Tools**: Pandas, PySpark
* **Description**: This step integrates I94 immigration data, world temperature data and US demographic data to set up a data warehouse with fact and dimension tables.

#### Description:
* **Format**: SAS
* **Description**: Data contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).

In [15]:
prod_env = True

In [16]:
if not prod_env:
    df_immigration = pd.read_csv("immigration_data_sample.csv")
else:
    df_immigration = spark.read.parquet("sas_data")
df_immigration.head(5)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1'),
 Row(cicid=5748518.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='NV', depdate=20591.0, i94bir=32.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1984.0, dtaddto='10292016', gender='F', insnum=None, airline='VA', admnum=94955622830.0, fltno='00007', visatype='B1'),
 Row(cicid=5748519.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='WA', depdate=20582.0, i94bir=29.

In [17]:
df_immigration.columns

['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'dtadfile',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'biryear',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype']

In [18]:
df_immigration.count()

3096313

### 1.2 Temperature Data


#### Description:
* **Format**: CSV 
* **Content**: This dataset is from Kaggle and contains monthly average temperatures for cities around the world.

In [19]:
fname = '../../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)
df_temp.head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


### 1.3 Demographic Data

#### Description:
* **Format**: CSV 
* **Content**: This dataset contains demographic information for all US cities and census-designated places with a population greater than or equal to 65,000.

In [20]:
df_demog = pd.read_csv('us-cities-demographics.csv', delimiter=';')
df_demog.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

1. Use pandas for exploratory data analysis to get an overview of these data sets
2. Split the data sets into dimension tables and rename columns so they are easier to understand
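
The two quality issues named above, missing values and duplicate data, can be counted before any cleaning starts. The sketch below is a minimal, dependency-free illustration, not the notebook's own code: `profile` and the toy rows are hypothetical, and only the column names (`cicid`, `gender`, `occup`) come from the immigration data.

```python
from collections import Counter

def profile(records, key):
    """Count missing values per column and key values that occur more than once."""
    missing = Counter()
    for row in records:
        for col, val in row.items():
            if val is None:
                missing[col] += 1
    key_counts = Counter(row[key] for row in records)
    duplicates = {k: n for k, n in key_counts.items() if n > 1}
    return dict(missing), duplicates

# Toy rows for illustration only (not taken from the real data set)
rows = [
    {"cicid": 1, "gender": "F", "occup": None},
    {"cicid": 2, "gender": None, "occup": None},
    {"cicid": 2, "gender": "M", "occup": None},
]
missing, duplicates = profile(rows, key="cicid")
print(missing)
print(duplicates)
```

On a pandas DataFrame the same counts are available through `df.isna().sum()` and `df.duplicated(subset=[...]).sum()`.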

#### Prepare

Build a function that converts SAS numeric dates (day counts since 1960-01-01) to datetimes

In [21]:
def SAS_to_datetime(date):
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
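
As a sanity check on the conversion, the same arithmetic can be done with the standard library alone. This is a sketch, not a replacement for `SAS_to_datetime`; the helper name `sas_days_to_date` is hypothetical, and the two inputs are the `arrdate` and `depdate` values of the first sample row shown in Step 1.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS stores dates as day counts from this day

def sas_days_to_date(days):
    """Convert a SAS day count to a date; a missing value stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_days_to_date(20574.0))  # 2016-04-30
print(sas_days_to_date(20582.0))  # 2016-05-08
```

The first result agrees with the `dtadfile` value `'20160430'` on the same row, which suggests the epoch is right.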

### 2.2 Immigration Data

#### Read description File

In [22]:
def code_mapper(file, idx):
    # `file` is the full text of the label file; `idx` names the block to parse
    block = file[file.index(idx):]
    # A block ends at the first ';'
    lines = block[:block.index(';')].split('\n')
    lines = [line.replace("'", "") for line in lines]
    # Skip the header line, then split each "code = label" pair
    pairs = [line.split('=') for line in lines[1:]]
    return {p[0].strip(): p[1].strip() for p in pairs if len(p) == 2}

In [23]:
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')
i94cit_res = code_mapper(f_content, "i94cntyl")
i94port = code_mapper(f_content, "i94prtl")
i94mode = code_mapper(f_content, "i94model")
i94addr = code_mapper(f_content, "i94addrl")
i94visa = {'1':'Business','2': 'Pleasure','3' : 'Student'}
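
To make the parsing step concrete, here is a small self-contained sketch of the same idea applied to an inline string. The function name `parse_label_block` and the sample text are assumptions for illustration; the sample only imitates the `code = 'label'` layout and is not copied from the real label file. Note that the resulting keys are strings, while the immigration rows store codes as floats such as `1.0`, so a lookup needs a conversion first.

```python
def parse_label_block(text, name):
    """Return {code: label} for the block that starts at `name` and ends at ';'."""
    block = text[text.index(name):]
    block = block[:block.index(';')]
    pairs = [line.replace("'", "").split('=') for line in block.split('\n')[1:]]
    return {p[0].strip(): p[1].strip() for p in pairs if len(p) == 2}

# Illustrative excerpt (hypothetical content in the expected layout)
sample = """value i94model
    1 = 'Air'
    2 = 'Sea'
    3 = 'Land'
;
"""

modes = parse_label_block(sample, "i94model")

# Codes in the data are floats, keys in the mapping are strings
raw_code = 1.0
label = modes.get(str(int(raw_code)), "Unknown")
print(modes)
print(label)
```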

#### Split Fact and Dimension Table

In [None]:
fact_immigration = df_immigration[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']].toPandas()
fact_immigration.columns = ['cic_id', 'year', 'month', 'city_code', 'state_code', 'arrive_date', 'departure_date', 'mode', 'visa']
fact_immigration['country'] = 'United States'
fact_immigration['arrive_date'] = SAS_to_datetime(fact_immigration['arrive_date'])
fact_immigration['departure_date'] = SAS_to_datetime(fact_immigration['departure_date'])
fact_immigration.head(5)

I94 Personal Data

In [None]:
dim_immi_personal = df_immigration[['cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum']].toPandas()
dim_immi_personal.columns = ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'ins_num']
dim_immi_personal.head(5)

I94 Airline Data

In [None]:
dim_immi_airline = df_immigration[['cicid', 'airline', 'admnum', 'fltno', 'visatype']].toPandas()
dim_immi_airline.columns = ['cic_id', 'airline', 'admin_num', 'flight_number', 'visa_type']
dim_immi_airline.head(5)

### 2.3 Temperature Data

#### Keep only US temperature data and drop Latitude and Longitude

In [None]:
# df_temp was read with pandas, so no .toPandas() call is needed
df_temp_usa = df_temp[df_temp['Country'] == 'United States']
df_temp_usa = df_temp_usa[['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country']].copy()
df_temp_usa.columns = ['dt', 'avg_temp', 'avg_temp_uncertnty', 'city', 'country']
df_temp_usa.head(5)

#### Split the date into year and month columns for matching purposes

In [None]:
df_temp_usa['dt'] = pd.to_datetime(df_temp_usa['dt'])
df_temp_usa['year'] = df_temp_usa['dt'].apply(lambda t: t.year)
df_temp_usa['month'] = df_temp_usa['dt'].apply(lambda t: t.month)
df_temp_usa.head()

### 2.4 Demographic Data

#### Split the population and city statistics data

In [None]:
dim_city_population = df_demog[['City', 'State', 'Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Race']]
dim_city_population.columns = ['city', 'state', 'male_pop', 'female_pop', 'num_vetarans', 'foreign_born', 'race']
dim_city_population.head(5)

In [None]:
dim_city_statistics = df_demog[['City', 'State', 'Median Age', 'Average Household Size']]
dim_city_statistics.columns = ['city', 'state', 'median_age', 'avg_household_size']
dim_city_statistics.head(5)
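
One thing to watch in this split: the `Race` and `Count` columns suggest the source file holds one row per city and race, in which case `median_age` and `avg_household_size` repeat for every race of the same city. Under that assumption, the statistics dimension would need de-duplication. The sketch below shows the idea in plain Python; `distinct_rows` is a hypothetical helper and the second Silver Spring row is invented to show the repetition.

```python
def distinct_rows(rows, columns):
    """Project rows onto `columns` and keep the first occurrence of each combination."""
    seen = set()
    result = []
    for row in rows:
        key = tuple(row[c] for c in columns)
        if key not in seen:
            seen.add(key)
            result.append(dict(zip(columns, key)))
    return result

# Toy rows: the same city appears once per race (second row is illustrative)
demog = [
    {"city": "Silver Spring", "state": "Maryland", "median_age": 33.8, "race": "Hispanic or Latino"},
    {"city": "Silver Spring", "state": "Maryland", "median_age": 33.8, "race": "White"},
    {"city": "Quincy", "state": "Massachusetts", "median_age": 41.0, "race": "White"},
]

city_stats = distinct_rows(demog, ["city", "state", "median_age"])
print(len(city_stats))
```

With pandas, calling `drop_duplicates()` on the selected columns has the same effect.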

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
* **Goal**: Map out the conceptual data model and explain why you chose that model.
* **Scope**: Because this data warehouse serves OLAP and BI workloads, the data sets are modeled as a star schema.
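
To illustrate why a star schema suits this kind of workload, the sketch below loads a cut-down fact table and one dimension into an in-memory SQLite database and runs a typical BI aggregate. It is only an illustration: SQLite stands in for the real warehouse, the rows are toy data, and only a few of the column names defined in Step 2 are used.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE fact_immigration (cic_id INTEGER PRIMARY KEY, state_code TEXT, visa INTEGER);
CREATE TABLE dim_immi_personal (cic_id INTEGER PRIMARY KEY, gender TEXT, birth_year INTEGER);
""")

# Toy rows for illustration only
con.executemany("INSERT INTO fact_immigration VALUES (?, ?, ?)",
                [(1, "CA", 1), (2, "NV", 1), (3, "CA", 2)])
con.executemany("INSERT INTO dim_immi_personal VALUES (?, ?, ?)",
                [(1, "F", 1976), (2, "F", 1984), (3, "M", 1990)])

# One join from the fact table to a dimension, then a group-by
query = """
SELECT f.state_code, p.gender, COUNT(*) AS arrivals
FROM fact_immigration AS f
JOIN dim_immi_personal AS p ON p.cic_id = f.cic_id
GROUP BY f.state_code, p.gender
ORDER BY f.state_code, p.gender
"""
result = con.execute(query).fetchall()
print(result)
```

Every analytical question follows the same shape: start at the fact table, join the dimensions that carry the attributes of interest, and aggregate.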

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model.  
Since the budget on my AWS account has been spent, I'll make assumptions for this step

1. Assume all data sets are stored in S3 bucket as below
    * [Source_S3_Bucket]/immigration/18-83510-I94-Data-2016/*.sas7bdat
    * [Source_S3_Bucket]/I94_SAS_Labels_Descriptions.SAS
    * [Source_S3_Bucket]/temperature/GlobalLandTemperaturesByCity.csv
    * [Source_S3_Bucket]/demography/us-cities-demographics.csv

2. Re-run step 2 to clean up
3. Split the dataset into dimensional and fact tables
4. Parse descriptive labels to the dataset
5. Store the dataframes back to S3 buckets

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

![Data Model](Data_Model.png)

#### 4.2 Data Quality Checks
1. The schema of every dimension table should match the data model
2. No table should be empty after the ETL pipeline runs
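
The first check can be automated by comparing each table's columns against an expected list. The sketch below is one possible way to do that, not the notebook's implementation: `EXPECTED` and `check_schema` are hypothetical names, and the column lists are the ones assigned in Step 2.

```python
# Expected columns per table, taken from the renaming done in Step 2
EXPECTED = {
    "dim_immi_airline": ["cic_id", "airline", "admin_num", "flight_number", "visa_type"],
    "dim_city_statistics": ["city", "state", "median_age", "avg_household_size"],
}

def check_schema(table, actual_columns, expected=EXPECTED):
    """Raise ValueError if the table's columns differ from the data model."""
    wanted = expected[table]
    missing = [c for c in wanted if c not in actual_columns]
    extra = [c for c in actual_columns if c not in wanted]
    if missing or extra:
        raise ValueError(f"{table}: missing={missing}, extra={extra}")
    return True

ok = check_schema("dim_city_statistics", ["city", "state", "median_age", "avg_household_size"])

try:
    check_schema("dim_immi_airline", ["cic_id", "airline"])
    failed = False
except ValueError as err:
    failed = True
    print(err)
```

For a Spark DataFrame, `df.columns` would supply the `actual_columns` argument.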

Check Data Model

In [22]:
import os
import configparser
from pathlib import Path
from pyspark.sql import SparkSession

In [None]:
config = configparser.ConfigParser()
config.read('capstone.cfg', encoding='utf-8-sig')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
SOURCE_S3_BUCKET = config['S3']['SOURCE_S3_BUCKET']
DEST_S3_BUCKET = config['S3']['DEST_S3_BUCKET']

spark = SparkSession.builder\
                    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")\
                    .enableHiveSupport().getOrCreate()

In [None]:
s3_bucket = Path(SOURCE_S3_BUCKET)

for file_dir in s3_bucket.iterdir():
    if file_dir.is_dir():
        path = str(file_dir)
        df = spark.read.parquet(path)
        print("Table: " + path.split('/')[-1])
        df.printSchema()  # prints the schema and returns None, so there is nothing to assign

Check empty tables

In [None]:
for file_dir in s3_bucket.iterdir():
    if file_dir.is_dir():
        path = str(file_dir)
        df = spark.read.parquet(path)
        record_num = df.count()
        if record_num <= 0:
            raise ValueError("This table is empty!")
        else:
            print("Table: " + path.split('/')[-1] + f" is not empty: total {record_num} records.")

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

![Data Dictionary](Data_Dictionary.png)

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    * **Tools**:
        1. AWS S3: raw data storage, chosen for its pay-as-you-go storage pricing
        2. Pandas: exploration and cleaning of data sets that fit in memory
        3. PySpark: processing and transforming the large immigration data set, which needs to be partitioned
* Propose how often the data should be updated and why.
    * **Solution**:
        1. Immigration and temperature data should be updated monthly, since they are published on a monthly basis
        2. Demographic data is updated annually, so it should be refreshed every year
        3. Everything else is updated on an ad-hoc basis
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
     * **Solution**: Redshift can scale compute elastically as the data grows, so it should handle a 100x increase well. Partitioning the dataset and distributing it across more Spark nodes would also keep the pipeline stable as volume grows
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     * **Solution**: We can schedule the pipeline with Apache Airflow so that each daily run finishes before 7am
 * The database needed to be accessed by 100+ people.
     * **Solution**: AWS Redshift can be opened to specific groups of users by adding inbound and outbound rules to the cluster's security group
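
For the 100x scenario above, partitioning by arrival year and month is one natural choice, since the fact table already carries both columns. The sketch below shows the grouping idea in plain Python using Hive-style `year=.../month=...` directory names; `partition_key` and `partition_rows` are hypothetical helpers and the rows are toy data.

```python
from collections import defaultdict

def partition_key(row):
    """Build a Hive-style partition path from the row's year and month."""
    return f"year={int(row['year'])}/month={int(row['month'])}"

def partition_rows(rows):
    """Group rows by partition path so each group can be processed independently."""
    parts = defaultdict(list)
    for row in rows:
        parts[partition_key(row)].append(row)
    return dict(parts)

# Toy rows for illustration only
rows = [
    {"cic_id": 1, "year": 2016.0, "month": 4.0},
    {"cic_id": 2, "year": 2016.0, "month": 4.0},
    {"cic_id": 3, "year": 2016.0, "month": 5.0},
]
parts = partition_rows(rows)
print(sorted(parts))
```

In PySpark the equivalent layout is produced by `df.write.partitionBy("year", "month").parquet(path)`, which lets later jobs read only the months they need.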