# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project designs an ETL pipeline that integrates data from several sources, transforms it, and loads it into a data warehouse for analytics. The datasets are the ones provided by Udacity.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
import sys
!{sys.executable} -m pip install pandas

In [7]:
# Do all imports and installs here
import os
import configparser
from pathlib import Path 
import pandas as pd
import pyspark

from pyspark.sql import SparkSession

ModuleNotFoundError: No module named 'pyspark'

### Step 1: Scope the Project and Gather Data

#### Scope 
>Explain what you plan to do in the project in more detail.

* Datasets: These are provided by the Udacity Capstone project.
1. I94 Immigration Data
2. World Temperature Data
3. US City Demographic Data

* Tools:
- Pandas: Exploratory data analysis on the sample dataset.
- PySpark: Processing the full dataset.
- AWS: S3 for data storage.

#### Describe and Gather Data 

[I94 Immigration Data](https://www.trade.gov/national-travel-and-tourism-office) : (Format SAS) This data comes from the US National Travel and Tourism Office. The dataset contains records of international visitors entering the USA, including country of residence, year of birth, age, visa type, arrival and departure dates, state of arrival, etc.

[US City Demographic](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/) : (Format CSV) This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.

> World Temperature Data: (Format CSV) 

In [None]:
# from pyspark.sql import SparkSession

# spark = SparkSession.builder.\
# config("spark.jars.repositories", "https://repos.spark-packages.org/").\
# config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
# enableHiveSupport().getOrCreate()

# df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


### Step 2: Explore and Assess the Data
> #### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
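
As a minimal sketch of the two checks named above (missing values and duplicate rows), the snippet below runs them on a small made-up frame. The frame `sample` and its values are hypothetical and only mimic a few immigration columns; the same calls should apply to `df_immi` or `df_demo`.

```python
import pandas as pd

# Hypothetical four-row sample; values are illustrative, not taken from the real file.
sample = pd.DataFrame({
    "cicid": [1.0, 2.0, 2.0, 3.0],
    "gender": ["F", "M", "M", None],
    "i94addr": ["HI", "TX", "TX", "CA"],
})

# Number of missing values in each column.
missing_per_column = sample.isna().sum()

# Number of rows that are exact copies of an earlier row.
duplicate_rows = int(sample.duplicated().sum())

print(missing_per_column)
print(f"duplicate rows: {duplicate_rows}")
```

Columns with a high share of missing values, or a non-zero duplicate count, are natural candidates for the cleaning steps later in the notebook.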



### Immigration Dataset

In [None]:
# Read in the data here
df_immi = pd.read_csv('immigration_data_sample.csv')

In [None]:
pd.set_option("display.max_columns", None)

In [None]:
df_immi.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [None]:
df_immi.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [None]:
# Determine the fact table for immigration - 9 fields
fact_immigration = df_immi[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94mode', 'i94addr', 'i94visa','arrdate', 'depdate']]
fact_immigration.columns = ['cic_id', 'year', 'month', 'city_code', 'transportation','state_code', 'visa', 'arrival_date', 'departure_date']
fact_immigration.head()

Unnamed: 0,cic_id,year,month,city_code,transportation,state_code,visa,arrival_date,departure_date
0,4084316.0,2016.0,4.0,HHW,1.0,HI,2.0,20566.0,20573.0
1,4422636.0,2016.0,4.0,MCA,1.0,TX,2.0,20567.0,20568.0
2,1195600.0,2016.0,4.0,OGG,1.0,FL,2.0,20551.0,20571.0
3,5291768.0,2016.0,4.0,LOS,1.0,CA,2.0,20572.0,20581.0
4,985523.0,2016.0,4.0,CHM,3.0,NY,2.0,20550.0,20553.0


In [None]:
#Determine dim_person
dim_person = df_immi[['cicid', 'i94cit', 'i94res', 'biryear', 'gender']]
dim_person.columns = ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender']
dim_person.head()

Unnamed: 0,cic_id,citizen_country,residence_country,birth_year,gender
0,4084316.0,209.0,209.0,1955.0,F
1,4422636.0,582.0,582.0,1990.0,M
2,1195600.0,148.0,112.0,1940.0,M
3,5291768.0,297.0,297.0,1991.0,M
4,985523.0,111.0,111.0,1997.0,F


In [None]:
dim_airline = df_immi[['cicid', 'airline', 'fltno', 'visatype']]
dim_airline.columns = ['cic_id', 'airline', 'flight_no', 'visa_type']
dim_airline.head()

Unnamed: 0,cic_id,airline,flight_no,visa_type
0,4084316.0,JL,00782,WT
1,4422636.0,*GA,XBLNG,B2
2,1195600.0,LH,00464,WT
3,5291768.0,QR,00739,B2
4,985523.0,,LAND,WT


### US Cities Demographics

In [None]:
df_demo = pd.read_csv('us-cities-demographics.csv', delimiter= ';')

In [None]:
df_demo.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [None]:
df_demo[df_demo['City'] == 'Silver Spring']

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
592,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,White,37756
1678,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Black or African-American,21330
2123,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,American Indian and Alaska Native,1084
2162,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Asian,8841


In [None]:
dim_city_demo = df_demo[['City', 'State Code','Median Age', 'Male Population', 'Female Population', 'Foreign-born', 'Average Household Size']]
dim_city_demo.columns = ['City', 'Code','Median Age', 'Males', 'Females', 'Foreign-born', 'Avg. Household Size']
dim_city_demo.head()

Unnamed: 0,City,Code,Median Age,Males,Females,Foreign-born,Avg. Household Size
0,Silver Spring,MD,33.8,40601.0,41862.0,30908.0,2.6
1,Quincy,MA,41.0,44129.0,49500.0,32935.0,2.39
2,Hoover,AL,38.5,38040.0,46799.0,8229.0,2.58
3,Rancho Cucamonga,CA,34.5,88127.0,87105.0,33878.0,3.18
4,Newark,NJ,34.6,138040.0,143873.0,86253.0,2.73
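
The Silver Spring output above shows that the source file repeats each city once per race, so selecting only city-level columns leaves identical rows behind. A minimal sketch of one way to collapse them, assuming one row per city is wanted; `df_demo_sample`, `city_cols`, and `dim_city_sample` are hypothetical names, and the values are copied from the rows shown above:

```python
import pandas as pd

# Two Silver Spring rows and one Quincy row, as displayed in the outputs above.
df_demo_sample = pd.DataFrame({
    "City": ["Silver Spring", "Silver Spring", "Quincy"],
    "State Code": ["MD", "MD", "MA"],
    "Median Age": [33.8, 33.8, 41.0],
    "Race": ["Hispanic or Latino", "White", "White"],
    "Count": [25924, 37756, 58723],
})

# Keep only city-level attributes, then drop the rows that became identical
# once the per-race columns (Race, Count) were left out.
city_cols = ["City", "State Code", "Median Age"]
dim_city_sample = (
    df_demo_sample[city_cols]
    .drop_duplicates()
    .reset_index(drop=True)
)

print(dim_city_sample)
```

Whether to deduplicate here or to keep the per-race counts in a separate table is a modelling choice; this sketch only covers the first option.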


### Assess Data
- After exploration, the I94 Immigration dataset and the US Cities Demographics dataset were chosen as the main sources for the DWH, because they are the most relevant and appear to be the most useful for analytics.


> #### Cleaning Steps
- The exploration above flagged several fields that need to be cleaned before they are usable.

In [None]:
spark = SparkSession.builder.config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").enableHiveSupport().getOrCreate()

In [None]:
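# Note: without a trailing .load(<path>) this is a DataFrameReader, not a DataFrame;
# df_spark is overwritten by the parquet read in the next cell.
df_spark = spark.read.format('com.github.saurfang.sas.spark')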

In [None]:
df_spark = spark.read.parquet("sas_data")

In [17]:
df_spark.head(1)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')]

#####  1. Transform arrdate, depdate from SAS time format to regular datetime 

In [2]:
# Performing cleaning tasks here

def SAS_to_datetime(date):
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
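
The conversion above works because SAS stores a date as the number of days since 1960-01-01. As a sanity check, here is a small standard-library sketch of the same arithmetic for a single value; `sas_days_to_date` is a hypothetical helper, and passing missing values through as `None` is an assumption about how empty departure dates should be handled.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # day 0 in the SAS date format


def sas_days_to_date(days: Optional[float]) -> Optional[date]:
    """Convert a SAS day count to a calendar date; missing values become None."""
    # NaN is the only float that is not equal to itself.
    if days is None or days != days:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_days_to_date(20566.0))       # 2016-04-22, the first arrdate in the sample
print(sas_days_to_date(float("nan")))  # None
```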



In [19]:
fact_immigration['arrival_date'] = SAS_to_datetime(fact_immigration['arrival_date'])
fact_immigration['departure_date'] = SAS_to_datetime(fact_immigration['departure_date'])
fact_immigration.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  """Entry point for launching an IPython kernel.
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  


Unnamed: 0,cic_id,year,month,city_code,transportation,state_code,visa,arrival_date,departure_date
0,4084316.0,2016.0,4.0,HHW,1.0,HI,2.0,2016-04-22,2016-04-29
1,4422636.0,2016.0,4.0,MCA,1.0,TX,2.0,2016-04-23,2016-04-24
2,1195600.0,2016.0,4.0,OGG,1.0,FL,2.0,2016-04-07,2016-04-27
3,5291768.0,2016.0,4.0,LOS,1.0,CA,2.0,2016-04-28,2016-05-07
4,985523.0,2016.0,4.0,CHM,3.0,NY,2.0,2016-04-06,2016-04-09
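
The `SettingWithCopyWarning` above appears because `fact_immigration` was created as a column selection of `df_immi` and then modified. One common way to avoid it is to take an explicit copy before assigning. The sketch below shows this on a one-row frame; `df_sample` and `fact_sample` are hypothetical names, and the values come from the first sample row.

```python
import pandas as pd

# One row taken from the sample output; column names follow the source file.
df_sample = pd.DataFrame({
    "cicid": [4084316.0],
    "arrdate": [20566.0],
    "depdate": [20573.0],
})

# Select, rename, and copy so that later assignments modify an independent frame.
fact_sample = (
    df_sample[["cicid", "arrdate", "depdate"]]
    .rename(columns={
        "cicid": "cic_id",
        "arrdate": "arrival_date",
        "depdate": "departure_date",
    })
    .copy()
)

# SAS dates count days from 1960-01-01.
sas_epoch = pd.Timestamp("1960-01-01")
for col in ("arrival_date", "departure_date"):
    fact_sample[col] = sas_epoch + pd.to_timedelta(fact_sample[col], unit="D")

print(fact_sample)
```

The source frame `df_sample` keeps its original numeric columns, which makes it safe to derive other tables from it afterwards.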


##### 2. Parse I94 SAS Labels description to generate dim tables

In [20]:
with open('I94_SAS_Labels_Descriptions.SAS') as f:
    contents = f.readlines()

In [21]:
country_code = {}
for countries in contents[10:298]:
    pair = countries.split('=')
    code, country = pair[0].strip(), pair[1].strip().strip("'")
    country_code[code] = country
country_code

{'236': 'AFGHANISTAN',
 '101': 'ALBANIA',
 '316': 'ALGERIA',
 '102': 'ANDORRA',
 '324': 'ANGOLA',
 '529': 'ANGUILLA',
 '518': 'ANTIGUA-BARBUDA',
 '687': 'ARGENTINA ',
 '151': 'ARMENIA',
 '532': 'ARUBA',
 '438': 'AUSTRALIA',
 '103': 'AUSTRIA',
 '152': 'AZERBAIJAN',
 '512': 'BAHAMAS',
 '298': 'BAHRAIN',
 '274': 'BANGLADESH',
 '513': 'BARBADOS',
 '104': 'BELGIUM',
 '581': 'BELIZE',
 '386': 'BENIN',
 '509': 'BERMUDA',
 '153': 'BELARUS',
 '242': 'BHUTAN',
 '688': 'BOLIVIA',
 '717': 'BONAIRE, ST EUSTATIUS, SABA',
 '164': 'BOSNIA-HERZEGOVINA',
 '336': 'BOTSWANA',
 '689': 'BRAZIL',
 '525': 'BRITISH VIRGIN ISLANDS',
 '217': 'BRUNEI',
 '105': 'BULGARIA',
 '393': 'BURKINA FASO',
 '243': 'BURMA',
 '375': 'BURUNDI',
 '310': 'CAMEROON',
 '326': 'CAPE VERDE',
 '526': 'CAYMAN ISLANDS',
 '383': 'CENTRAL AFRICAN REPUBLIC',
 '384': 'CHAD',
 '690': 'CHILE',
 '245': 'CHINA, PRC',
 '721': 'CURACAO',
 '270': 'CHRISTMAS ISLAND',
 '271': 'COCOS ISLANDS',
 '691': 'COLOMBIA',
 '317': 'COMOROS',
 '385': 'CONGO',


In [22]:
dim_country_code = pd.DataFrame(list(country_code.items()), columns= ['code', 'country'])
dim_country_code.head()

Unnamed: 0,code,country
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA
3,102,ANDORRA
4,324,ANGOLA


In [23]:
city_code = {}
for cities in contents[303:962]:
    pair = cities.split('=')
    code, city = pair[0].strip().strip("'"), pair[1].strip().strip("' ")
    city_code[code] = city

In [24]:
city_code

{'ANC': 'ANCHORAGE, AK',
 'BAR': 'BAKER AAF - BAKER ISLAND, AK',
 'DAC': 'DALTONS CACHE, AK',
 'PIZ': 'DEW STATION PT LAY DEW, AK',
 'DTH': 'DUTCH HARBOR, AK',
 'EGL': 'EAGLE, AK',
 'FRB': 'FAIRBANKS, AK',
 'HOM': 'HOMER, AK',
 'HYD': 'HYDER, AK',
 'JUN': 'JUNEAU, AK',
 '5KE': 'KETCHIKAN, AK',
 'KET': 'KETCHIKAN, AK',
 'MOS': 'MOSES POINT INTERMEDIATE, AK',
 'NIK': 'NIKISKI, AK',
 'NOM': 'NOM, AK',
 'PKC': 'POKER CREEK, AK',
 'ORI': 'PORT LIONS SPB, AK',
 'SKA': 'SKAGWAY, AK',
 'SNP': 'ST. PAUL ISLAND, AK',
 'TKI': 'TOKEEN, AK',
 'WRA': 'WRANGELL, AK',
 'HSV': 'MADISON COUNTY - HUNTSVILLE, AL',
 'MOB': 'MOBILE, AL',
 'LIA': 'LITTLE ROCK, AR (BPS)',
 'ROG': 'ROGERS ARPT, AR',
 'DOU': 'DOUGLAS, AZ',
 'LUK': 'LUKEVILLE, AZ',
 'MAP': 'MARIPOSA AZ',
 'NAC': 'NACO, AZ',
 'NOG': 'NOGALES, AZ',
 'PHO': 'PHOENIX, AZ',
 'POR': 'PORTAL, AZ',
 'SLU': 'SAN LUIS, AZ',
 'SAS': 'SASABE, AZ',
 'TUC': 'TUCSON, AZ',
 'YUI': 'YUMA, AZ',
 'AND': 'ANDRADE, CA',
 'BUR': 'BURBANK, CA',
 'CAL': 'CALEXICO, CA',

In [25]:
dim_city_code = pd.DataFrame(list(city_code.items()), columns=['code', 'city'])
dim_city_code.head()

Unnamed: 0,code,city
0,ANC,"ANCHORAGE, AK"
1,BAR,"BAKER AAF - BAKER ISLAND, AK"
2,DAC,"DALTONS CACHE, AK"
3,PIZ,"DEW STATION PT LAY DEW, AK"
4,DTH,"DUTCH HARBOR, AK"


In [26]:
state_code = {}
for states in contents[982:1036]:
    pair = states.split('=')
    code, state = pair[0].strip().strip("'"), pair[1].strip().strip("'")
    state_code[code] = state
state_code

{'AK': 'ALASKA',
 'AZ': 'ARIZONA',
 'AR': 'ARKANSAS',
 'CA': 'CALIFORNIA',
 'CO': 'COLORADO',
 'CT': 'CONNECTICUT',
 'DE': 'DELAWARE',
 'DC': 'DIST. OF COLUMBIA',
 'FL': 'FLORIDA',
 'GA': 'GEORGIA',
 'GU': 'GUAM',
 'HI': 'HAWAII',
 'ID': 'IDAHO',
 'IL': 'ILLINOIS',
 'IN': 'INDIANA',
 'IA': 'IOWA',
 'KS': 'KANSAS',
 'KY': 'KENTUCKY',
 'LA': 'LOUISIANA',
 'ME': 'MAINE',
 'MD': 'MARYLAND',
 'MA': 'MASSACHUSETTS',
 'MI': 'MICHIGAN',
 'MN': 'MINNESOTA',
 'MS': 'MISSISSIPPI',
 'MO': 'MISSOURI',
 'MT': 'MONTANA',
 'NC': 'N. CAROLINA',
 'ND': 'N. DAKOTA',
 'NE': 'NEBRASKA',
 'NV': 'NEVADA',
 'NH': 'NEW HAMPSHIRE',
 'NJ': 'NEW JERSEY',
 'NM': 'NEW MEXICO',
 'NY': 'NEW YORK',
 'OH': 'OHIO',
 'OK': 'OKLAHOMA',
 'OR': 'OREGON',
 'PA': 'PENNSYLVANIA',
 'PR': 'PUERTO RICO',
 'RI': 'RHODE ISLAND',
 'SC': 'S. CAROLINA',
 'SD': 'S. DAKOTA',
 'TN': 'TENNESSEE',
 'TX': 'TEXAS',
 'UT': 'UTAH',
 'VT': 'VERMONT',
 'VI': 'VIRGIN ISLANDS',
 'VA': 'VIRGINIA',
 'WV': 'W. VIRGINIA',
 'WA': 'WASHINGTON',
 'WI': '

In [27]:
dim_state_code = pd.DataFrame(list(state_code.items()), columns = ['code', 'state'])
dim_state_code.head()

Unnamed: 0,code,state
0,AK,ALASKA
1,AZ,ARIZONA
2,AR,ARKANSAS
3,CA,CALIFORNIA
4,CO,COLORADO


In [28]:
transport_mode = {}
for transports in contents[973:976]:
    pair = transports.split('=')
    code, transport = pair[0].strip(), pair[1].strip()
    transport_mode[code] = transport
transport_mode

{'2': "'Sea'", '3': "'Land'", '9': "'Not reported' ;"}

In [29]:
dim_transport = pd.DataFrame(list(transport_mode.items()), columns = ['code', 'mode'])
dim_transport

Unnamed: 0,code,mode
0,2,'Sea'
1,3,'Land'
2,9,'Not reported' ;
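
The transport table above still carries the surrounding quotes and a trailing `;`, and the earlier country output contains a label with trailing whitespace (`'ARGENTINA '`). A regex-based parser is one way to clean these up without relying on exact strip calls. This is a sketch under assumptions: `SAMPLE_LINES` is a hypothetical excerpt whose layout only approximates the label file, and `parse_label_lines` is not part of the original notebook.

```python
import re

# Hypothetical lines in the style of the label file; codes and labels appear in
# the outputs above, but the exact spacing of the real file is an assumption.
SAMPLE_LINES = [
    "/* example comment line */\n",
    "   687 =  'ARGENTINA '\n",
    "   'AK'='ALASKA'\n",
    "   2 = 'Sea'\n",
    "   3 = 'Land'\n",
    "   9 = 'Not reported' ;\n",
]

# code (optionally quoted) = 'label', ignoring anything after the closing quote.
PAIR = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'([^']*)'")


def parse_label_lines(lines):
    """Return a {code: label} dict for every line that looks like code = 'label'."""
    mapping = {}
    for line in lines:
        match = PAIR.match(line)
        if match:
            code, label = match.groups()
            mapping[code.strip()] = label.strip()
    return mapping


print(parse_label_lines(SAMPLE_LINES))
```

Lines that do not match the pattern, such as comments or section headers, are skipped silently, so a wider slice of the file can be passed in.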


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data model for this DWH is a star schema with one fact table (fact_immigration), two dimension tables (dim_person, dim_airline), and four auxiliary tables (dim_city_demo, dim_city_code, dim_state_code, and dim_transport).
#### 3.2 Mapping Out Data Pipelines
- Assume the raw data already exists in an S3 bucket (the landing zone).
- Read the data on an EMR cluster and apply the transformations (change date formats, define data types and formats, define fact and dim tables).
- Parse the file I94_SAS_Labels_Descriptions.SAS to obtain the auxiliary tables.
- Load the processed data into another S3 bucket (the processed zone).
- From the processed zone, load the data into a Redshift cluster for use.
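
For the last step above, Redshift can ingest Parquet files from S3 with a `COPY` statement. The helper below only builds the SQL text, as a sketch; the function name, bucket prefix, and IAM role ARN are hypothetical placeholders, and running the statement would still need a database connection that is not shown here.

```python
def build_copy_statement(table: str, s3_prefix: str, iam_role_arn: str) -> str:
    """Build a Redshift COPY statement that loads Parquet files from S3."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_prefix}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS PARQUET;"
    )


# Placeholder values for illustration only.
statement = build_copy_statement(
    table="fact_immigration",
    s3_prefix="s3://example-processed-zone/fact_immigration/",
    iam_role_arn="arn:aws:iam::123456789012:role/example-redshift-role",
)
print(statement)
```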

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.
Details are provided in file etl.py

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
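
As an illustration of the first kind of check listed above, the sketch below tests a key column for missing and repeated values using only the standard library. `check_unique_key` and the sample rows are hypothetical; in the pipeline the same idea would more likely be expressed with Spark or SQL against the loaded tables.

```python
from collections import Counter


def check_unique_key(rows, key):
    """Return (number of missing keys, sorted list of duplicated key values)."""
    values = [row.get(key) for row in rows]
    null_count = sum(value is None for value in values)
    counts = Counter(value for value in values if value is not None)
    duplicates = sorted(value for value, n in counts.items() if n > 1)
    return null_count, duplicates


# Made-up rows: one repeated cic_id and one missing cic_id.
sample_rows = [
    {"cic_id": 4084316, "state_code": "HI"},
    {"cic_id": 4422636, "state_code": "TX"},
    {"cic_id": 4422636, "state_code": "TX"},
    {"cic_id": None, "state_code": "NY"},
]

nulls, dupes = check_unique_key(sample_rows, "cic_id")
print(f"missing keys: {nulls}, duplicated keys: {dupes}")
```

A table passes the check when both results are empty, that is, zero missing keys and no duplicated values.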
 
Run Quality Checks

In [None]:
# Perform quality checks here
config = configparser.ConfigParser()
config.read('AWS_Config.cfg', encoding='utf-8-sig')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']
SOURCE_S3_BUCKET = config['S3']['SOURCE_S3_BUCKET']
DEST_S3_BUCKET = config['S3']['DEST_S3_BUCKET']

In [None]:
spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")\
                        .enableHiveSupport().getOrCreate()

##### 4.2.1 Ensure no table is loaded empty

In [None]:
# Note: Path.iterdir() only lists a local or mounted filesystem path;
# it cannot list an s3:// URI directly.
s3_bucket = Path(SOURCE_S3_BUCKET)
for table_dir in s3_bucket.iterdir():
    path = str(table_dir)
    table_name = table_dir.name
    df = spark.read.parquet(path)
    records = df.count()
    if records <= 0:
        raise ValueError(f"Table {table_name} is empty!")
    else:
        print(f"Table {table_name} contains a total of {records} records.")


#### 4.3 Data dictionary 
The data dictionary will be provided in doc format for clarity.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.