### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to build an ETL pipeline that creates a database from four different datasets. The database will ultimately be used to analyze immigration-related scenarios.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [126]:
# Import libraries
import pandas as pd
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import isnull
# Project-specific helper modules: state, port-city and country code lookups and their UDFs
from us_state_abbrev import abbrev_state, abbrev_state_udf, city_code_udf, city_codes
from immigration_codes import country_udf

In [127]:
import warnings
warnings.filterwarnings('ignore')

In [128]:
# Create Spark session (with the spark-sas7bdat package for reading SAS files)
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

In [129]:
# Build a SQLContext (it expects the SparkContext; passing the session reuses it)
sqlContext = SQLContext(spark.sparkContext, spark)

# Check Spark information
spark

### Step 1: Scope the Project and Gather Data

#### Scope 
We pull data from the four sources described below and use them to build fact and dimension tables.

#### Describe and Gather Data 
-   **U.S. City Demographic Data**: Comes from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

> Dataframe information (2891 records)
    
    City                       object  
    State                      object
    Median Age                float64
    Male Population           float64
    Female Population         float64
    Total Population            int64
    Number of Veterans        float64
    Foreign-born              float64
    Average Household Size    float64
    State Code                 object
    Race                       object
    Count                       int64

    
-   **Airport Code Table**: Comes from [DataHub.io](https://datahub.io/core/airport-codes#data).        

> Dataframe information    (55075  records)

    ident            object  
    type             object  
    name             object  
    elevation_ft    float64  
    continent        object  
    iso_country      object  
    iso_region       object  
    municipality     object  
    gps_code         object  
    iata_code        object  
    local_code       object  
    coordinates      object  


-   **World Temperature Data**: Comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).        

> Raw Data Information  (645675 records)   

    dt                                object  
    AverageTemperature               float64  
    AverageTemperatureUncertainty    float64  
    State                             object  
    Country                           object  
    
-   **Immigration I94 Data**: Comes from [the US National Travel and Tourism Office](https://travel.trade.gov/research/reports/i94/historical/2016.html).

> Raw Data Information

```
cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string
```
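The `arrdate` and `depdate` columns arrive as doubles. They are SAS date values, which SAS stores as a count of days since 1960-01-01. The pipeline below does not decode them, but if real dates were needed, a minimal plain-Python sketch could look like this (the helper name `sas_date_to_iso` is hypothetical, not part of the project):

```python
import math
from datetime import date, timedelta

# SAS stores a date as the number of days elapsed since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)


def sas_date_to_iso(sas_days):
    """Convert a SAS date value (such as an I94 arrdate/depdate) to 'YYYY-MM-DD'.

    Missing values (None or NaN, as they appear in the raw columns) map to None.
    """
    if sas_days is None or math.isnan(sas_days):
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()


# arrdate value of the first record in i94_jun16_sub
print(sas_date_to_iso(20612.0))  # → 2016-06-07
```

The decoded date falls in June 2016, which agrees with the `i94mon` value of the same record.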

In [130]:
# Read in the three CSV datasets
demographic = pd.read_csv("us-cities-demographics.csv", sep=';')
airport = pd.read_csv("airport-codes_csv.csv", sep=',')
temperatures = pd.read_csv("GlobalLandTemperaturesByState.csv", sep=',')

In [131]:
# Get basic information about each dataset
print('Dataset: demographic\n', '*************')
print(demographic.info())
print('Dataset: airport\n', '*************')
print(airport.info())
print('Dataset: temperature\n', '*************')
print(temperatures.info())

Dataset: demographic
 *************
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB
None
Dataset: airport
 *************
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
nam

In [132]:
df_spark=spark.read.format('com.github.saurfang.sas.spark').load("../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat")

In [133]:
df_spark.show(3)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|  4.0|2016.0|   6.0| 135.0| 135.0|    XXX|20612.0|   null|   null|   null|  59.0|    2.0|  1.0|     1.0|        0.0|    

### Step 2: Explore and Assess the Data
#### Explore the Data 
The goal of this section is to identify data quality issues such as missing values and duplicate records.

#### Cleaning Steps
**1) Immigration Data:**  
     <li>  map the country, port (city) and destination-state codes to readable names, using lookups derived from I94_SAS_Labels_Descriptions.SAS
     <li>  drop rows with a null i94addr or i94res, and keep only rows with recognized state and port codes
     <li>  cast i94yr and i94mon to integers
     <li>  select only the needed columns

In [134]:
SAS_LABELS_FILE = 'I94_SAS_Labels_Descriptions.SAS'

def parse_sas_section(section):
    """
    Parse the `value <section>` block of the SAS labels file into [code, label] pairs
    """
    with open(SAS_LABELS_FILE) as f:
        file_string = f.read()
    # Keep the text from the section name up to the ';' that closes the block
    f_read = file_string[file_string.index(section):].split(';')[0].split('\n')
    records = []
    for row in f_read:
        if '=' in row:
            code, val = row.split('=')[0].strip(), row.split('=')[1].strip()
            # Codes may be quoted ('AL') or bare (236); labels are quoted
            if code.startswith("'"):
                code = code[1:-1]
            records.append([code, val[1:-1]])
    return records

def process_sas_file_spark_dataframe(section, col):
    """
    Process SAS file to retrieve country, port and address data -> Spark dataframe
    """
    return sqlContext.createDataFrame(parse_sas_section(section), list(col))

def process_sas_file_pandas_dataframe(section, col):
    """
    Process SAS file to retrieve country, port and address data -> pandas dataframe
    """
    return pd.DataFrame(parse_sas_section(section), columns=list(col))

# Create Spark dataframes from the lookup sections
i94cntyl = process_sas_file_spark_dataframe('i94cntyl', ('code', 'country'))
i94port = process_sas_file_spark_dataframe('i94prtl', ('code', 'port'))
i94addr = process_sas_file_spark_dataframe('i94addrl', ('code', 'addr'))

# Create pandas dataframes for later use (state abbreviations for the temperature data)
i94addr_reference = process_sas_file_pandas_dataframe('i94addrl', ['code', 'addr'])
i94cntyl_reference = process_sas_file_pandas_dataframe('i94cntyl', ['code', 'country'])
i94port_reference = process_sas_file_pandas_dataframe('i94prtl', ['code', 'port'])
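Because these helpers read `I94_SAS_Labels_Descriptions.SAS` directly, they are awkward to test on their own. A minimal sketch of the same parsing rule, written against a string so it can be checked in isolation, is shown below; the function name `parse_sas_value_block` is hypothetical, and the sample text only imitates the file's `code = 'label'` and `'code'='label'` lines. Unlike the notebook version, it splits each line on the first `=` only.

```python
def parse_sas_value_block(text, section):
    """Return [(code, label), ...] from the `value <section>` block of a SAS labels text.

    The block runs from the section name to the next ';'. Codes may be quoted ('AL')
    or bare (236); surrounding quotes are removed. Lines without '=' are ignored.
    """
    block = text[text.index(section):].split(';')[0]
    records = []
    for line in block.splitlines():
        if '=' not in line:
            continue
        code, _, label = line.partition('=')  # split on the first '=' only
        records.append((code.strip().strip("'"), label.strip().strip("'")))
    return records


# Illustrative lines in the labels-file format
sample = """
  value i94cntyl
   236 =  'AFGHANISTAN'
   101 =  'ALBANIA'
;
  value i94addrl
   'AL'='ALABAMA'
   'DC'='DIST. OF COLUMBIA'
;
"""

print(parse_sas_value_block(sample, 'i94addrl'))
# → [('AL', 'ALABAMA'), ('DC', 'DIST. OF COLUMBIA')]
```

In the notebook, the text would come from reading the labels file, and the result can be passed straight to `pd.DataFrame(..., columns=['code', 'addr'])`.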

In [135]:
df_spark_sub = df_spark.filter(df_spark.i94addr.isNotNull()) \
    .filter(df_spark.i94res.isNotNull()) \
    .filter(df_spark.i94addr.isin(list(abbrev_state.keys()))) \
    .filter(df_spark.i94port.isin(list(city_codes.keys()))) \
    .withColumn("origin_country", country_udf(df_spark["i94res"])) \
    .withColumn("dest_state_name", abbrev_state_udf(df_spark["i94addr"])) \
    .withColumn("i94yr", df_spark.i94yr.cast("integer")) \
    .withColumn("i94mon", df_spark.i94mon.cast("integer")) \
    .withColumn("city_port_name", city_code_udf(df_spark["i94port"]))

immigration_df = df_spark_sub.select("cicid",
                                     df_spark_sub.i94yr.alias("year"),
                                     df_spark_sub.i94mon.alias("month"),
                                     "origin_country", "i94port", "city_port_name",
                                     df_spark_sub.i94addr.alias("state_code"),
                                     "dest_state_name")

In [136]:
immigration_df.show(5) 

+-----+----+-----+--------------+-------+------------------+----------+---------------+
|cicid|year|month|origin_country|i94port|    city_port_name|state_code|dest_state_name|
+-----+----+-----+--------------+-------+------------------+----------+---------------+
| 41.0|2016|    6|   SOUTH KOREA|    SFR|SAN FRANCISCO     |        CA|     California|
| 42.0|2016|    6|   SOUTH KOREA|    SFR|SAN FRANCISCO     |        CA|     California|
| 45.0|2016|    6|       ROMANIA|    HOU|HOUSTON           |        TX|          Texas|
| 52.0|2016|    6|       ALBANIA|    BOS|BOSTON            |        MA|  Massachusetts|
| 53.0|2016|    6|       ALBANIA|    NEW|NEWARK/TETERBORO  |        PA|   Pennsylvania|
+-----+----+-----+--------------+-------+------------------+----------+---------------+
only showing top 5 rows



**2) US Demographic Data:**  
     <li>  keep only the needed columns
     <li>  compute the foreign-born share of the total population
     <li>  express the male and female populations as shares of the total population, rounded to two decimals

In [137]:
# Keep the population columns needed for the percentages
demographic_df = demographic.loc[:, ['Male Population', 'Female Population',
                                     'Total Population', 'Foreign-born',
                                     'State Code']].copy()
# Express each group as a share of the total population, rounded to 2 decimals
demographic_df['pct_male'] = (demographic_df['Male Population'] / demographic_df['Total Population']).round(2)
demographic_df['pct_female'] = (demographic_df['Female Population'] / demographic_df['Total Population']).round(2)
demographic_df['pct_foreign'] = (demographic_df['Foreign-born'] / demographic_df['Total Population']).round(2)
demographic_df = demographic_df.loc[:, ['pct_male', 'pct_foreign', 'pct_female', 'State Code', 'Total Population']]
demographic_df.columns

In [203]:
demographic_df.columns =['pct_male', 'pct_foreign', 'pct_female', 'State_Code','Total_Population']

In [204]:
demographic_df.sample(3)

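|      | pct_male | pct_foreign | pct_female | State_Code | Total_Population |
|------|----------|-------------|------------|------------|------------------|
| 2842 | 0.48     | 0.06        | 0.52       | SC         | 81309            |
| 1876 | 0.5      | 0.08        | 0.5        | NE         | 277346           |
| 2748 | 0.48     | 0.32        | 0.52       | MA         | 92459            |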
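The demographics CSV holds one row per city and race (the `Race` and `Count` columns), so the city-level population figures repeat within a city and `demographic_df` keeps one row per city and race rather than one per state. A state code therefore matches many rows when this table is joined on `State_Code`. A minimal pandas sketch of one way to get a single row per state, assuming the raw column names of `us-cities-demographics.csv`, is to drop the race-level duplicates and sum the city figures by state; the helper `state_demographics` and the toy frame are hypothetical.

```python
import pandas as pd


def state_demographics(demographic):
    """Collapse the city/race-level demographics table to one row per state.

    sum() skips NaN, so a city with a missing male/female/foreign-born figure
    still adds to Total Population and slightly lowers that state's percentages.
    """
    # The population columns repeat on every Race row of a city: keep one row per city
    cities = demographic.drop_duplicates(subset=['City', 'State Code'])
    totals = cities.groupby('State Code')[
        ['Male Population', 'Female Population', 'Foreign-born', 'Total Population']
    ].sum().reset_index()
    total = totals['Total Population']
    # Same column names and order as demographic_df
    return pd.DataFrame({
        'pct_male': (totals['Male Population'] / total).round(2),
        'pct_foreign': (totals['Foreign-born'] / total).round(2),
        'pct_female': (totals['Female Population'] / total).round(2),
        'State_Code': totals['State Code'],
        'Total_Population': total,
    })


# Toy input: city A appears twice (two Race rows) with identical population figures
toy = pd.DataFrame({
    'City': ['A', 'A', 'B', 'C'],
    'State Code': ['AR', 'AR', 'AR', 'AZ'],
    'Race': ['White', 'Asian', 'White', 'White'],
    'Male Population': [40, 40, 10, 30],
    'Female Population': [60, 60, 10, 70],
    'Foreign-born': [5, 5, 15, 20],
    'Total Population': [100, 100, 20, 100],
})
print(state_demographics(toy))
```

Used in place of `demographic_df`, this would make the demographics table a one-row-per-state dimension, at the cost of re-running the fact-table query.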


**3) US Airport Data:**  
     <li>  keep only US airports (iso_country = 'US') that have an iata_code
     <li>  count these airports in each state (stored as number_of_flights)
     <li>  split iso_region into country and state columns

In [139]:
# Keep US airports that have an IATA code, then count them per iso_region (e.g. 'US-CA')
airport_df = airport.loc[airport.iata_code.notnull() & (airport.iso_country == 'US'), ['iso_country', 'iso_region']] \
                    .groupby(['iso_region'])['iso_country'] \
                    .count().reset_index().rename(columns={"iso_country": "number_of_flights"})

In [140]:
# retrieve state information from iso_region column 
airport_df['country'], airport_df['state'] = airport_df.iso_region.apply(lambda x: x.split('-')[0]), airport_df.iso_region.apply(lambda x: x.split('-')[1])
airport_df = airport_df.loc[:,[ 'country', 'state','number_of_flights']]

In [141]:
airport_df.sample(3)

|    | country | state | number_of_flights |
|----|---------|-------|-------------------|
| 1  | US      | AL    | 30                |
| 6  | US      | CT    | 9                 |
| 26 | US      | MT    | 27                |


**4) US Temperature Data:**  
     <li>  keep only US records and drop rows without a temperature value
     <li>  compute each state's historical average temperature for every calendar month
     <li>  add a column with the two-letter state abbreviation

In [164]:
# Keep US rows with a temperature value and add a month column
temperature = temperatures.loc[(temperatures.Country == 'United States') & (temperatures.AverageTemperature.notnull()), :].copy()
temperature['dt'] = pd.to_datetime(temperature['dt'])
temperature['month'] = temperature['dt'].dt.month

# Get the historical average temperature per state and calendar month
temperature_df = temperature.groupby(['Country', 'State', 'month'])['AverageTemperature'] \
                            .mean().round(2) \
                            .reset_index()
temperature_df['State'] = temperature_df['State'].str.upper()

In [165]:
# Get abbreviations for each state from the I94 address codes
temperature_df = temperature_df.merge(i94addr_reference.rename(columns={"addr": "State", "code": 'Code'}), how='left', on="State")

# Fill in the states whose names don't match i94addr_reference.
# NOTE: this pairs the unmatched names (alphabetical, as produced by the groupby)
# with patch_list by position, so patch_list must stay in the same order.
patch_list = ['DC', 'GA', 'NC', 'ND', 'SC', 'SD', 'WV', 'WI']
unmatched_states = temperature_df.loc[temperature_df.Code.isnull(), 'State'].drop_duplicates()
for state_name, code in zip(unmatched_states, patch_list):
    temperature_df.loc[temperature_df.State == state_name, 'Code'] = code
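The loop above pairs the unmatched state names with `patch_list` purely by position, so it would silently mislabel states if the set or order of unmatched names changed. A sturdier alternative, sketched here under the assumption that the state names are upper-case strings, uses an explicit name-to-code dictionary and fails loudly when a state is still unmatched. The helper `attach_state_codes` and the example dictionaries are hypothetical; the real patch entries would have to be read off the names the merge leaves unmatched.

```python
def attach_state_codes(state_names, reference, patch):
    """Return the two-letter code for each upper-case state name, in input order.

    reference: {state name: code} built from the I94 address labels (i94addrl)
    patch:     explicit {state name: code} entries for names spelled differently there
    Raises ValueError naming every state found in neither dictionary.
    """
    lookup = {**reference, **patch}  # explicit patch entries take precedence
    missing = sorted({name for name in state_names if name not in lookup})
    if missing:
        raise ValueError(f"No state code for: {missing}")
    return [lookup[name] for name in state_names]


# Made-up inputs for illustration
reference = {'ALABAMA': 'AL', 'ARKANSAS': 'AR'}
patch = {'DISTRICT OF COLUMBIA': 'DC'}
print(attach_state_codes(['ALABAMA', 'DISTRICT OF COLUMBIA', 'ALABAMA'], reference, patch))
# → ['AL', 'DC', 'AL']
```

With the notebook's DataFrames, the reference dictionary could be built as `dict(zip(i94addr_reference.addr, i94addr_reference.code))` and the result assigned to `temperature_df['Code']`, replacing both the merge and the position-based fill.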

In [166]:
temperature_df = temperature_df.loc[:,['Code','month','AverageTemperature'] ]

In [168]:
temperature_df.sample(3)

|     | Code | month | AverageTemperature |
|-----|------|-------|--------------------|
| 168 | IN   | 1     | -2.76              |
| 129 | GA   | 10    | 17.69              |
| 403 | NC   | 8     | 24.46              |


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
 
We chose a **star schema** for this database because the data splits cleanly into fact and dimension tables.  
The ultimate goal is to analyze the immigration data to answer questions such as:   
- 1: Are people more likely to arrive in states with a larger foreign-born population (a higher pct_foreign than other states)?
- 2: Do people prefer states that are warmer in winter and cooler in summer (for vacation)?
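Question 1 could be explored once `immigration_stats` exists by totalling the arrival counts per destination state and comparing those totals with `pct_foreign`. A minimal plain-Python sketch of that comparison follows; the function name and the sample rows are hypothetical, it assumes a single `pct_foreign` value per state, and a correlation by itself would not show that the foreign-born share causes the destination choice.

```python
from collections import defaultdict
from math import sqrt


def foreign_share_vs_arrivals(rows):
    """Pearson correlation between each state's foreign-born share and its total arrivals.

    rows: iterable of (state, pct_foreign, count) tuples; pct_foreign is assumed to be
    one value per state. Needs at least two states with different values.
    """
    arrivals = defaultdict(int)
    share = {}
    for state, pct_foreign, count in rows:
        arrivals[state] += count  # sum counts over origin countries and months
        share[state] = pct_foreign
    states = sorted(arrivals)
    xs = [share[s] for s in states]
    ys = [arrivals[s] for s in states]
    mean_x, mean_y = sum(xs) / len(xs), sum(ys) / len(ys)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    spread_x = sqrt(sum((x - mean_x) ** 2 for x in xs))
    spread_y = sqrt(sum((y - mean_y) ** 2 for y in ys))
    return cov / (spread_x * spread_y)


# Hypothetical (state, pct_foreign, count) rows, not real results
sample_rows = [('CA', 0.27, 500), ('CA', 0.27, 300), ('TX', 0.17, 400), ('AR', 0.05, 50)]
print(foreign_share_vs_arrivals(sample_rows))
```

On the real table, the rows could come from `immigration_stats.select('to_immig_state', 'pct_foreign', 'to_immig_state_count').collect()`, once `pct_foreign` has been reduced to one value per state.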

**Fact Table**

- immigration_stats
> year, immig_month, immig_origin, to_immig_state, to_immig_state_count, AverageTemperature, pct_foreign, number_of_flights

**Dimension Tables**

- airport_table
> state, country, number_of_flights

- demographic_table
> State_Code, pct_male, pct_female, pct_foreign, Total_Population

- temperature_table
> Code, month, AverageTemperature

- i94_table
> cicid, year, month, origin_country, i94port, city_port_name, state_code, dest_state_name

#### 3.2 Mapping Out Data Pipelines
Steps to pipeline the data into the chosen data model:
- Build the dimension tables from the processed datasets (three pandas DataFrames converted to Spark, plus one PySpark DataFrame).
- Create the fact table by joining the dimension tables.
- Write the fact table out as a Parquet file.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [218]:
# Convert the cleaned pandas DataFrames to Spark DataFrames
temperature_pyspark_DF = spark.createDataFrame(temperature_df)
airport_df_pyspark_DF = spark.createDataFrame(airport_df)
demographic_df_pyspark_DF = spark.createDataFrame(demographic_df)

# Register the dimension tables as temporary SQL views
demographic_df_pyspark_DF.createOrReplaceTempView("demographics")
airport_df_pyspark_DF.createOrReplaceTempView("airport")
temperature_pyspark_DF.createOrReplaceTempView("temperature")
immigration_df.createOrReplaceTempView("i94")

In [206]:
# Disable automatic broadcast joins (-1 turns them off)
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

# US immigration monthly view table
immigration_stats = spark.sql("""
SELECT
    m.year,
    m.month             AS immig_month,
    m.origin_country    AS immig_origin,
    m.dest_state_name   AS to_immig_state,
    COUNT(m.state_code) AS to_immig_state_count,
    t.AverageTemperature,
    d.pct_foreign,
    a.number_of_flights
FROM i94 m
JOIN temperature t
    ON m.state_code = t.Code AND m.month = t.month
JOIN demographics d
    ON d.State_Code = t.Code
JOIN airport a
    ON a.state = d.State_Code
GROUP BY
    m.year, m.month, m.origin_country, m.dest_state_name, m.state_code,
    t.AverageTemperature, a.number_of_flights, d.pct_foreign
ORDER BY m.origin_country, m.state_code
""")

In [208]:
immigration_stats.show(5)

+----+-----------+------------+--------------+--------------------+------------------+-----------+-----------------+
|year|immig_month|immig_origin|to_immig_state|to_immig_state_count|AverageTemperature|pct_foreign|number_of_flights|
+----+-----------+------------+--------------+--------------------+------------------+-----------+-----------------+
|2016|          6| AFGHANISTAN|      Arkansas|                  10|             24.55|       0.08|               29|
|2016|          6| AFGHANISTAN|      Arkansas|                   5|             24.55|       0.15|               29|
|2016|          6| AFGHANISTAN|      Arkansas|                   9|             24.55|       0.04|               29|
|2016|          6| AFGHANISTAN|      Arkansas|                   5|             24.55|       0.25|               29|
|2016|          6| AFGHANISTAN|       Arizona|                   5|             24.08|       0.19|               46|
+----+-----------+------------+--------------+------------------

In [209]:
# Write the fact table to Parquet (overwrite so the cell can be re-run)
immigration_stats.write.mode("overwrite").parquet("immigration_stats")

#### 4.2 Data Quality Checks
Data quality checks confirm that the pipeline ran as expected. Useful checks include:
 * Integrity constraints on the relational database (e.g., unique keys, data types)
 * Unit tests for the scripts to ensure they do the right thing
 * Source/count checks to ensure completeness

This notebook runs two of them: a record count for each table (a source/count check) and a null check on the key columns of the fact table.

Run Quality Checks

In [245]:
# Check the number of records in each table
Quality_check = spark.sql(
""" SELECT 'Airport' AS Table, COUNT(state) AS Records FROM airport
    UNION ALL
    SELECT 'temperature' AS Table, COUNT(Code) AS Records FROM temperature
    UNION ALL
    SELECT 'demographics' AS Table, COUNT(State_Code) AS Records FROM demographics
    UNION ALL
    SELECT 'i94' AS Table, COUNT(state_code) AS Records FROM i94""")

Quality_check.show()

+------------+-------+
|       Table|Records|
+------------+-------+
|         i94|3214208|
|demographics|   2891|
| temperature|    612|
|     Airport|     51|
+------------+-------+
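The count query reports numbers but does not fail when something is wrong. A small sketch that turns the counts into a pass/fail check is shown below. It assumes the rows have been collected into a `{table: count}` dictionary (for example with `{r['Table']: r['Records'] for r in Quality_check.collect()}`); the helper name `check_record_counts` is hypothetical.

```python
def check_record_counts(counts, required, expected=None):
    """Fail loudly if a required table is missing/empty or a count differs from what is expected.

    counts:   {table_name: record_count}
    required: table names that must have at least one record
    expected: optional {table_name: exact_count}
    """
    problems = [f"{name}: missing or empty" for name in required if counts.get(name, 0) <= 0]
    for name, want in (expected or {}).items():
        got = counts.get(name)
        if got != want:
            problems.append(f"{name}: expected {want}, got {got}")
    if problems:
        raise ValueError("; ".join(problems))
    return True


# Counts reported by the query above
counts = {'i94': 3214208, 'demographics': 2891, 'temperature': 612, 'Airport': 51}
# temperature should hold one row per state code per month: 51 codes (presumably 50 states + DC) x 12
print(check_record_counts(counts, required=counts.keys(), expected={'temperature': 51 * 12}))  # → True
```

Raising instead of printing means the check can stop a scheduled run before a bad fact table is written.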



In [210]:
# Check the nulls
immigration_stats.select(isnull('year').alias('year'),\
                 isnull('immig_month').alias('month'),\
                 isnull('immig_origin').alias('country'),\
                 isnull('to_immig_state').alias('state')).dropDuplicates().show()

+-----+-----+-------+-----+
| year|month|country|state|
+-----+-----+-------+-----+
|false|false|  false|false|
+-----+-----+-------+-----+



#### 4.3 Data dictionary 
The data dictionary below gives each field of the data model, its type, and a brief description of what the data is and where it came from.

**Dimension Tables** 

airport  
```
-  state (StringType) - two-letter state code
-  country (StringType) - ISO country code (US)
-  number_of_flights (LongType) - number of airports with an IATA code in that state, from the airport code table
```
temperature
```
- Code (StringType) - two-letter state code
- month (LongType) - month as a number (1-12)
- AverageTemperature (DoubleType) - the average historical temperature of that given month, in Celsius
```

demographic
```
- pct_male (DoubleType) - male share of the population of the city the row comes from
- pct_foreign (DoubleType) - foreign-born share of that city's population
- pct_female (DoubleType) - female share of that city's population
- State_Code (StringType) - two-letter state code
- Total_Population (LongType) - total population of that city
```

i94
```
- cicid (DoubleType) - ID number of each record
- year (IntegerType) - year of immigration
- month (IntegerType) - month of immigration
- i94port (StringType) - code of the port city where the immigrant entered
- city_port_name (StringType) - name of the port city
- state_code (StringType) - two-letter destination state code
- dest_state_name (StringType) - destination state name
```

**Fact Table**
immigration_stats  
- year (IntegerType) - year of immigration
- immig_month (IntegerType) - month of immigration
- immig_origin (StringType) - origin country (country of residence, from i94res)
- to_immig_state (StringType) - destination state name
- to_immig_state_count (LongType) - number of I94 records in each group (origin country, destination state, month)
- AverageTemperature (DoubleType) - the average historical temperature of the destination state in that month, in Celsius
- pct_foreign (DoubleType) - foreign-born population share, from the demographics table
- number_of_flights (LongType) - number of airports with an IATA code in the destination state

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

**Summary**
<li>  This project used pandas to wrangle and clean the three CSV datasets because of its functionality and ease of use, and Apache Spark for the extract, transform and load steps because the I94 data is much larger (over 3.2 million records for June 2016 alone after filtering).  
<li>  The data should be updated monthly, since the fact table (immigration_stats) presents a monthly overview of arrivals aggregated with temperature and other state-level information. A monthly update gives users new data in a timely way while keeping the load on the system light. 

**Changes by Scenario** 
<li>  If the data increased by 100x, I would run Spark on a Hadoop cluster with more nodes to scale out the processing. 
<li>  If the data had to be updated daily by 7am, I would orchestrate the pipeline with Apache Airflow, using a daily schedule_interval that starts early enough for the run to finish before 7am.
<li>  If the database needed to be accessed by 100+ people, I would store the data in S3 with replication and serve it through a web app running on AWS.  