# Project Title
### Data Engineering Capstone Project

#### Project Summary
* This project analyzes US immigration data for the year 2016, answering questions such as which 5 airports are the busiest by number of immigrants, or what the population and average temperature of a specific state are, ordered by city and race.
* As Data Engineers, our responsibility is to create and operate an ETL pipeline that processes the immigration data together with other datasets to fulfill these requirements. This project shows the full ETL process, from data gathering to data modeling, with the aim of producing an optimized data model that Business Analysts and Data Scientists can use properly.
* Note that the 2016 US immigration data alone is not enough to achieve this goal. Hence, in this project we also gather 3 more datasets that contribute to it. The 4 datasets are described below:

1. **I94 Immigration Data**: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. [This](https://travel.trade.gov/research/reports/i94/historical/2016.html) is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
2. **Airport Code Table**: This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).
3. **U.S. City Demographic Data**: This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
4. **World Temperature Data**: This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

* The steps we follow in this project are:
    * Step 1: Scope the Project and Gather Data
    * Step 2: Explore and Assess the Data
    * Step 3: Define the Data Model
    * Step 4: Run ETL to Model the Data
    * Step 5: Complete Project Write Up

In [1]:
import sys
!{sys.executable} -m pip install regex

Collecting regex
  Downloading https://files.pythonhosted.org/packages/a0/a0/6cbb1435fa3055307b442e1a87d8498f9c8e2bf741607c777b678be9838f/regex-2022.3.15-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (671kB)
Installing collected packages: regex
Successfully installed regex-2022.3.15


In [2]:
# Do all imports and installs here
import pyspark as ps
import pandas as pd
import regex as re
import numpy as np
import datetime as dt
import os

from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf, col, isnan, when, count, mean, lower, avg, round
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, monotonically_increasing_id, dayofweek
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, TimestampType

### Step 1: Scope the Project and Gather Data

#### Scope 
* For this project we will use 4 data sources:
    1. Main data: I-94 Immigration dataset
    2. Supplementary data, consisting of 3 tables:
        
        2.1. Airport code dataset
        
        2.2. US Cities Demographics dataset
        
        2.3. World Temperature dataset

#### Describe and Gather Data 

--------------------------------------------------------------------------------------------

#### Create a SparkSession

In [3]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

#### Load Immigration Data

In [4]:
imda_file_path= '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
imda_df = spark.read.\
format('com.github.saurfang.sas.spark').\
option('header', True).\
load(imda_file_path)

* Then we check the schema and the first 5 rows of this dataset

In [5]:
print(f'Total row count: {imda_df.count()}')
imda_df.printSchema()
imda_df.limit(5).toPandas()

Total row count: 3096313
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- ad

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


--------------------------------------------------------------------------------------------

#### Load Airport code data
* First, we define an explicit schema (column names and data types) for this dataset

In [6]:
ac_Schema= R([
    Fld('ident', Str()),
    Fld('type', Str()),
    Fld('name', Str()),
    Fld('elevation_ft', Dbl()),
    Fld('continent', Str()),
    Fld('iso_country', Str()),
    Fld('iso_region', Str()),
    Fld('municipality', Str()),
    Fld('gps_code', Str()),
    Fld('iata_code', Str()),
    Fld('local_code', Str()),
    Fld('coordinates', Str())
])

* Then, we load the dataset

In [7]:
ac_df= spark.read.\
format('csv').\
option('header', True).\
schema(ac_Schema).\
load('airport-codes_csv.csv')

* Then we check the schema and the first 5 rows of this dataset

In [8]:
ac_df.printSchema()
ac_df.limit(5).toPandas()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: double (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


--------------------------------------------------------------------------------------------

#### Load US Cities Demographics
* First, we define an explicit schema (column names and data types) for this dataset

In [9]:
uscd_Schema= R([
    Fld('City', Str()),
    Fld('State', Str()),
    Fld('Median Age', Dbl()),
    Fld('Male Population', Int()),
    Fld('Female Population', Int()),
    Fld('Total Population', Int()),
    Fld('Number of Veterans', Int()),
    Fld('Foreign-born', Int()),
    Fld('Average Household Size', Dbl()),
    Fld('State Code', Str()),
    Fld('Race', Str()),
    Fld('Count', Int())
])

* Then, we load the dataset

In [10]:
uscd_df= spark.read.\
format('csv').\
option('header', True).\
options(delimiter= ';').\
schema(uscd_Schema).\
load('us-cities-demographics.csv')

* Then we check the schema and the first 5 rows of this dataset

In [11]:
uscd_df.printSchema()
uscd_df.limit(5).toPandas()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


--------------------------------------------------------------------------------------------

#### Load World Temperature Data
* First, we define an explicit schema (column names and data types) for this dataset

In [12]:
wtd_Schema= R([
    Fld('dt', Str()),
    Fld('AverageTemperature', Dbl()),
    Fld('AverageTemperatureUncertainty', Dbl()),
    Fld('City', Str()),
    Fld('Country', Str()),
    Fld('Latitude', Str()),
    Fld('Longitude', Str())
])

* Then, we load the dataset

In [13]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
wtd_df= spark.read.\
format('csv').\
option('header', True).\
schema(wtd_Schema).\
load(fname)

* Then we check the schema and the first 5 rows of this dataset

In [14]:
wtd_df.printSchema()
wtd_df.limit(5).toPandas()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

* First, we create a function that checks for missing or null values and reports, per column, the count and the percentage of affected rows

In [15]:
# Function to check for missing values or null values
def missnull_check(df):
    # Get total rows of dataset
    get_all_rows= df.count()

    # Count if data is missing or null
    missnull_df= df.select(
        [count(when(isnan(column) | col(column).isNull(), column)).alias(column) for column in df.columns]
    ).toPandas()
    
    # Format data from wide to long
    long_missnull_df= pd.melt(missnull_df, var_name= 'headers', value_name= 'miss or null count')
    
    # Miss or Null ratio
    long_missnull_df['data lost ratio']= np.round((long_missnull_df['miss or null count'] / get_all_rows) * 100,3)
    
    # Sort dataframe
    final_df= long_missnull_df.sort_values(by= 'data lost ratio', ascending= False)
    
    print(final_df)
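
As a rough illustration of what `missnull_check` computes, the following plain-Python sketch applies the same per-column ratio to a small list of dictionaries. The function name `missing_ratio` and the sample rows are hypothetical and not part of the notebook; the Spark version above remains the one used in the pipeline.

```python
import math


def missing_ratio(rows, columns):
    """Return {column: percentage of rows where the value is None or NaN}."""
    total = len(rows)
    result = {}
    for column in columns:
        missing = 0
        for row in rows:
            value = row.get(column)
            # Treat both None and float NaN as missing, like isNull() | isnan()
            if value is None or (isinstance(value, float) and math.isnan(value)):
                missing += 1
        result[column] = round(missing / total * 100, 3) if total else 0.0
    return result


# Hypothetical sample rows, loosely modelled on the immigration columns
sample_rows = [
    {'cicid': 6.0, 'gender': None, 'occup': None},
    {'cicid': 7.0, 'gender': 'M', 'occup': None},
    {'cicid': 15.0, 'gender': 'M', 'occup': float('nan')},
    {'cicid': 16.0, 'gender': None, 'occup': 'STU'},
]

ratios = missing_ratio(sample_rows, ['cicid', 'gender', 'occup'])
print(ratios)
```

Columns with a ratio close to 100 are candidates for dropping, while columns with a small ratio can be filled or have their affected rows removed, which is the approach taken in the cleaning steps below.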

--------------------------------------------------------------------------------------------

#### Preprocess I94 Immigration

* Using the provided I94_SAS_Labels_Descriptions file, we derive the lists of valid port codes, valid country codes (used for both i94cit and i94res), and valid state addresses

In [16]:
# Read valid port from I94_SAS_Labels_Descriptions
get_valid_port= re.compile(r'\'(.*)\'.*\'(.*)\'')
valid_port_dict= {'port_code': [], 'port_name': []}
with open('i94validport.txt') as f:
    for line in f:
        match= get_valid_port.search(line)
        valid_port_dict['port_code'].append(match[1])
        valid_port_dict['port_name'].append(match[2].split(',')[0].lower())

valid_port= list(valid_port_dict['port_code'])

# Read valid CIT from I94_SAS_Labels_Descriptions
get_valid_cit= re.compile(r'(.*).*\'(.*)\'')
valid_cit_dict= {'origin_country_code': [], 'origin_country': []}
with open('i94validcit.txt') as f:
    for line in f:
        match= get_valid_cit.search(line)
        valid_cit_dict['origin_country_code'].append(float(match[1][:3]))
        valid_cit_dict['origin_country'].append(match[2].lower())

valid_cit= list(valid_cit_dict['origin_country_code'])  

# Read valid Address from I94_SAS_Labels_Descriptions
get_valid_addr= re.compile(r'\'(.*)\'.*\'(.*)\'')
valid_addr_dict= {'state_code': [], 'state': []}
with open('i94validaddr.txt') as f:
    for line in f:
        match= get_valid_addr.search(line)
        valid_addr_dict['state_code'].append(match[1][:4])
        valid_addr_dict['state'].append(match[2].lower())

valid_addr= list(valid_addr_dict['state_code'])

# Get valid dataframe
def data_validation(valid_port, valid_cit, valid_addr, df):
    #df_valid_port= df.filter(col('i94port').isin(valid_port))
    df_valid= df.filter(
        (col('i94port').isin(valid_port)) &
        (col('i94cit').isin(valid_cit)) & 
        (col('i94res').isin(valid_cit)) &
        (col('i94addr').isin(valid_addr))
    )
    return df_valid
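
To make the parsing step more concrete, here is a minimal sketch of how one line of the port list could be parsed. It uses the standard-library `re` module instead of the `regex` package, and the sample line is an assumption about the layout of `i94validport.txt` (two single-quoted fields per line). The helper name `parse_port_line`, the whitespace stripping, and the guard for non-matching lines are additions for illustration and are not in the notebook code.

```python
import re  # standard-library re; the notebook itself imports the regex package

# Same pattern as get_valid_port: two single-quoted fields per line
PORT_PATTERN = re.compile(r"'(.*)'.*'(.*)'")


def parse_port_line(line):
    """Return (port_code, port_name), or None when the line has no two quoted fields."""
    match = PORT_PATTERN.search(line)
    if match is None:
        return None
    code = match.group(1).strip()
    # Keep only the city part before the first comma, lower-cased
    name = match.group(2).split(',')[0].strip().lower()
    return code, name


# Hypothetical line, assumed to follow the layout of i94validport.txt
sample_line = "   'ALC'\t=\t'ALCAN, AK             '"
parsed = parse_port_line(sample_line)
skipped = parse_port_line('not a label line')
print(parsed, skipped)
```

Returning `None` for lines that do not match lets the caller skip blank or malformed lines instead of failing on them.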

* Keep only the rows whose port, country, and address values all appear in the valid lists

In [17]:
imda_df_valid= data_validation(valid_port, valid_cit, valid_addr, imda_df)

* Since cicid is the primary key for this dataset, it must be unique, so we drop all rows with a duplicate cicid

In [18]:
imda_df_dropdup= imda_df_valid.drop_duplicates(['cicid'])

* Check for missing and null values in the dataset

In [19]:
missnull_check(imda_df_dropdup)

     headers  miss or null count  data lost ratio
18   entdepu             2548903           99.987
15     occup             2542534           99.737
23    insnum             2537920           99.556
14  visapost             1448676           56.828
22    gender              330090           12.949
9    depdate              107102            4.201
19   matflag              106237            4.167
17   entdepd              106237            4.167
24   airline               49107            1.926
26     fltno                7985            0.313
20   biryear                 123            0.005
10    i94bir                 123            0.005
21   dtaddto                  78            0.003
25    admnum                   0            0.000
16   entdepa                   0            0.000
0      cicid                   0            0.000
1      i94yr                   0            0.000
13  dtadfile                   0            0.000
12     count                   0            0.000


* We can see that 3 columns (entdepu, occup, insnum) have over 99% missing or null values, so we drop these columns from the dataset

In [20]:
drop_columns= ['entdepu', 'occup', 'insnum']

imda_df_dropcols= imda_df_dropdup.drop(*drop_columns)

* To handle the 4th and 5th columns with the most missing or null values, visapost and gender, we replace missing or null values with **'Not mentioned'** to make the data more meaningful. For example, a query on gender will then return F (female), M (male), and Not mentioned, which is more informative than F, M, and Null

In [21]:
imda_df_fillna= imda_df_dropcols.na.fill('Not mentioned', subset= ['gender', 'visapost'])

* For the other columns that have missing or null values, we only drop the rows in which every column is missing or null

In [22]:
imda_df_final= imda_df_fillna.dropna(how='all')

* Next, we convert ```arrdate``` from the SAS date format (days since 1960-01-01) to a readable datetime, which we will also use to create a dimension table later.

In [23]:
sasdate_converter= udf(
    # SAS dates count days since 1960-01-01; a null input stays null
    lambda x: dt.datetime(1960, 1, 1) + dt.timedelta(days= int(x)) if x is not None else None,
    TimestampType()
)
imda_df_final= (
    imda_df_final
    .withColumn('arrival_date', sasdate_converter('arrdate'))
    .withColumn('arrival_year', year(sasdate_converter('arrdate')))
    .withColumn('arrival_month', month(sasdate_converter('arrdate')))
    .withColumn('arrival_day', dayofmonth(sasdate_converter('arrdate')))
)
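
The conversion can be checked outside Spark with a small sketch. SAS stores dates as the number of days since 1 January 1960, so adding that many days to the epoch gives the calendar date. The helper name `sas_to_date` is hypothetical; the input value is one of the `arrdate` values shown in the sample rows above.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS dates count days from 1 January 1960


def sas_to_date(sas_days):
    """Convert a SAS day count to a date; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + dt.timedelta(days=int(sas_days))


first_arrival = sas_to_date(20545.0)  # arrdate value taken from the sample rows
print(first_arrival)  # 2016-04-01
```

The result falls in April 2016, which agrees with the `i94yr` and `i94mon` values of the same rows.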


* Finally, let's check the data size after the cleaning process:

In [24]:
print(f'Raw I94 Immigration dataset size: {imda_df.count()}')
print(f'I94 Immigration dataset size after validate port, cit, and res: {imda_df_valid.count()}')
print(f'I94 Immigration dataset size after handling missing and null values: {imda_df_final.count()}')

Raw I94 Immigration dataset size: 3096313
I94 Immigration dataset size after validate port, cit, and res: 2549246
I94 Immigration dataset size after handling missing and null values: 2549246


--------------------------------------------------------------------------------------------

#### Preprocess Airport Code Dataset
* As with the I94 Immigration dataset, the column ident is the primary key, so we must check it for duplicates.
* Drop all duplicates in the primary key

In [25]:
ac_df_dropdup= ac_df.drop_duplicates(['ident'])

* As we inspect the iso_region column, we break down the iso_region format into country and region and keep only the region value

In [26]:
# Define UDF for splitting the iso_region column (e.g. 'US-PA' -> 'PA')
split_region= udf(
    lambda x: x.split('-')[1]
)

In [27]:
ac_df_region= ac_df_dropdup.withColumn('iso_region', split_region('iso_region'))

In [28]:
missnull_check(ac_df_region)

         headers  miss or null count  data lost ratio
9      iata_code               45886           83.315
10    local_code               26389           47.915
8       gps_code               14045           25.502
3   elevation_ft                7006           12.721
7   municipality                5676           10.306
0          ident                   0            0.000
1           type                   0            0.000
2           name                   0            0.000
4      continent                   0            0.000
5    iso_country                   0            0.000
6     iso_region                   0            0.000
11   coordinates                   0            0.000


* Based on the missing or null ratio and on how necessary each column is to this project, the columns 'iata_code', 'local_code', and 'gps_code' will be dropped from this dataset

In [29]:
drop_columns= ['iata_code', 'local_code', 'gps_code']

ac_df_dropcols= ac_df_region.drop(*drop_columns)

* Next, since 'elevation_ft' is numerical and has missing or null values, we fill them with the mean of all available values in the column

In [30]:
elevation_mean_df= ac_df_dropcols.select(mean(col('elevation_ft')).alias('elevation_mean')).collect()

elevation_mean= elevation_mean_df[0]['elevation_mean']
    
ac_elevation_fillna= ac_df_dropcols.na.fill(elevation_mean, subset= ['elevation_ft'])
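
The same mean imputation can be sketched in plain Python to show what happens to the missing entries. The function name `fill_with_mean` and the sample values are hypothetical; this is only an illustration of the technique, not the code used in the pipeline.

```python
def fill_with_mean(values):
    """Replace None entries with the mean of the non-missing values."""
    known = [v for v in values if v is not None]
    if not known:
        return list(values)  # nothing to average, leave the data untouched
    mean_value = sum(known) / len(known)
    return [mean_value if v is None else v for v in values]


# Hypothetical elevations in feet; None marks a missing value
elevations = [11.0, None, 450.0, 820.0, None]
filled = fill_with_mean(elevations)
print(filled)
```

Note that the mean is sensitive to extreme values, so for a strongly skewed column the median might be a reasonable alternative.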

* Then, we drop all rows in which every column is missing or null

In [31]:
ac_df_final= ac_elevation_fillna.dropna(how='all')

* Finally, let's check the data size after the cleaning process:

In [32]:
print(f'Raw Airport code dataset size: {ac_df.count()}')
print(f'Airport code dataset size after handling missing and null values: {ac_df_final.count()}')

Raw Airport code dataset size: 55075
Airport code dataset size after handling missing and null values: 55075


--------------------------------------------------------------------------------------------

#### Preprocess US Cities Demographics
* In this dataset, we notice that 3 columns can be considered for building a unique key: City, State, and Race. We call them the key columns
* First, we count all records in the dataset

In [33]:
uscd_df.count()

2891

* Then, we check for duplicated data in the dataset

In [34]:
uscd_df.distinct().count()

2891

* We see that there are no duplicated rows in the dataset, so every record should map to one unique key (primary key)

* For each key column, we check the number of distinct values

In [35]:
uscd_df.select('City').distinct().count()

567

In [36]:
uscd_df.select('State').distinct().count()

49

In [37]:
uscd_df.select('Race').distinct().count()

5

* No single key column can identify every record in the dataset, so we combine them into a composite key. City is the main level of detail here, but State and Race alone cannot distinguish all records, so we combine all 3 columns

In [38]:
uscd_df.select(['City', 'State', 'Race']).distinct().count()

2891
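
The idea behind this check is that a set of columns is a valid key when no two rows share the same combination of values. A minimal plain-Python sketch, with a hypothetical helper `is_unique_key` and made-up rows, could look like this:

```python
def is_unique_key(rows, key_columns):
    """Return True when the combination of key_columns identifies every row uniquely."""
    seen = set()
    for row in rows:
        key = tuple(row[column] for column in key_columns)
        if key in seen:
            return False  # two rows share the same key values
        seen.add(key)
    return True


# Hypothetical rows: the same city appears once per race
rows = [
    {'City': 'Newark', 'State': 'New Jersey', 'Race': 'White'},
    {'City': 'Newark', 'State': 'New Jersey', 'Race': 'Asian'},
    {'City': 'Quincy', 'State': 'Massachusetts', 'Race': 'White'},
]

city_state_unique = is_unique_key(rows, ['City', 'State'])
composite_unique = is_unique_key(rows, ['City', 'State', 'Race'])
print(city_state_unique, composite_unique)
```

In the sample, City and State together are not unique because each city has one row per race, while adding Race makes the key unique.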

* We can see that the 2891 unique combinations match the row count of the raw dataset, so we can conclude that the primary key for this dataset is the composite City-State-Race.
* Next, we check for missing or null values in the dataset

In [39]:
missnull_check(uscd_df)

                   headers  miss or null count  data lost ratio
8   Average Household Size                  16            0.553
6       Number of Veterans                  13            0.450
7             Foreign-born                  13            0.450
3          Male Population                   3            0.104
4        Female Population                   3            0.104
0                     City                   0            0.000
1                    State                   0            0.000
2               Median Age                   0            0.000
5         Total Population                   0            0.000
9               State Code                   0            0.000
10                    Race                   0            0.000
11                   Count                   0            0.000


* The data loss ratio is below 0.6% for every column, so we drop all rows that include missing or null values

In [40]:
uscd_df_final= uscd_df.dropna(
    subset=[
        'Average Household Size', 
        'Number of Veterans', 
        'Foreign-born', 
        'Male Population',
        'Female Population'
    ]
)

In [41]:
uscd_df_final= uscd_df_final.withColumn('City', lower(col('City')))

* Finally, let's check the data size after the cleaning process:

In [42]:
print(f'Raw US Cities Demographics dataset size: {uscd_df.count()}')
print(f'US Cities Demographics dataset size after handling missing and null values: {uscd_df_final.count()}')

Raw US Cities Demographics dataset size: 2891
US Cities Demographics dataset size after handling missing and null values: 2875


--------------------------------------------------------------------------------------------

#### Preprocess World Temperature Data
* Before diving into this dataset, we need to go back to the I94 Immigration dataset.
* Get the years that appear in the I94 Immigration dataset

In [43]:
imda_df_final.select('i94yr').distinct().show()

+------+
| i94yr|
+------+
|2016.0|
+------+



* We see that the I94 Immigration dataset only contains data for the year 2016. Thus, for the World Temperature dataset, we would like to keep only data for 2016.
* First, we break the dt column down into smaller parts: year, month, and day.

In [44]:
wtd_df_ymd= (
    wtd_df
    .withColumn('record_year', year('dt'))
    .withColumn('record_month', month('dt'))
    .withColumn('record_day', dayofmonth('dt'))
)

In [45]:
wtd_df_ymd.select('record_year').\
distinct().\
orderBy('record_year', ascending= False).\
limit(5).\
show()

+-----------+
|record_year|
+-----------+
|       2013|
|       2012|
|       2011|
|       2010|
|       2009|
+-----------+



* As we can see, there is no record for the year 2016. The most recent year is 2013.
* Also, according to this [question](https://knowledge.udacity.com/questions/293780), we can simply select the most recent years in the dataset, as close as possible to 2016.
* Thus, for this dataset, we first keep only the data from 2011 to 2013.
* Also, as we aim to merge this dataset with the US Cities Demographics dataset, we only need temperature data for the US.

In [46]:
wtd_df_filter= wtd_df_ymd.filter((col('record_year') >= 2011) & (col('Country')== 'United States')).drop('Country')

* Then we count all the records in this filtered dataset

In [47]:
wtd_df_filter.count()

8481

* Then, we check the filtered data for duplicates

In [48]:
wtd_df_filter.distinct().count()

8481

* We can confirm that there is no duplicated data in our dataset
* Also, for this dataset, we can determine that the two columns dt and City form the composite key
* Next, we check for missing or null values

In [49]:
missnull_check(wtd_df_filter)

                         headers  miss or null count  data lost ratio
1             AverageTemperature                   1            0.012
2  AverageTemperatureUncertainty                   1            0.012
0                             dt                   0            0.000
3                           City                   0            0.000
4                       Latitude                   0            0.000
5                      Longitude                   0            0.000
6                    record_year                   0            0.000
7                   record_month                   0            0.000
8                     record_day                   0            0.000


* This dataset is about temperature, and a record with a missing temperature carries no useful information. Since only 1 record has missing or null values, we drop this row.

In [50]:
wtd_df_dropna= wtd_df_filter.dropna(
    subset=[
        'AverageTemperature', 
        'AverageTemperatureUncertainty'
    ]
)

* Then, we lower-case the city names in the ```City``` column and calculate the mean of both the AverageTemperature and AverageTemperatureUncertainty columns by ```City```

In [51]:
wtd_df_final= wtd_df_dropna.select([lower(col('City')).alias('us_city'), 'AverageTemperature', 'AverageTemperatureUncertainty'])
wtd_df_final= wtd_df_final.groupBy('us_city').agg(round(avg('AverageTemperature'), 2).alias('avg_temperature'), round(avg('AverageTemperatureUncertainty'), 2).alias('avg_temperature_uncertainty'))
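
The aggregation above groups the readings by lower-cased city name and averages them. As a small illustration of that grouping logic, the plain-Python sketch below does the same for a single temperature column. The helper `average_by_city` and the readings are hypothetical.

```python
from collections import defaultdict


def average_by_city(records):
    """Average the temperature per lower-cased city name, rounded to 2 decimals."""
    grouped = defaultdict(list)
    for city, temperature in records:
        grouped[city.lower()].append(temperature)
    return {city: round(sum(temps) / len(temps), 2) for city, temps in grouped.items()}


# Hypothetical monthly readings as (city, average temperature) pairs
readings = [('Newark', 10.0), ('Newark', 12.5), ('Quincy', 8.0), ('NEWARK', 13.5)]
city_means = average_by_city(readings)
print(city_means)
```

Lower-casing before grouping ensures that differently capitalized spellings of the same city end up in one group, which also keeps the later join on city name consistent.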

* Finally, let's check the data size after the cleaning process:

In [52]:
print(f'Raw World Temperature dataset size: {wtd_df.count()}')
print(f'World Temperature dataset size after filtering, handling missing and null values: {wtd_df_dropna.count()}')
print(f'World Temperature dataset size after aggregated: {wtd_df_final.count()}')

Raw World Temperature dataset size: 8599212
World Temperature dataset size after filtering, handling missing and null values: 8480
World Temperature dataset size after aggregated: 248


#### This is the end for Step 2: Explore and Assess the Data

--------------------------------------------------------------------------------------------

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
* For this project, the data model is designed as 1 fact table (**fact_immigration**) and 3 dimension tables (**dim_uscd**, **dim_aircode**, **dim_arrivaldate**)

    ![Data model](data-model.png)


1. **Fact Table:** **fact_immigration**. This table is extracted from the I94 Immigration dataset.

| fact_immigration |
| --------- |
| cic_id (PK) |
| entry_year |
| entry_month |
| port_code |
| arrival_sas_date |
| entry_mode |
| address_state_code |
| origin_country_code |
| origin_resident_country_code |
| visa_mode |
| count |
| birth_year |
| gender |
| airline |
| addmission_number |
| flight_number |
| visatype |

2. **Dimension tables:**

* **Aircode table**

| dim_aircode |
| --------- |
| ident (PK) |
| type |
| name |
| elevation_ft |
| iso_country |
| iso_region (FK) |
| municipality |
| coordinates |

* **US Demographic table:** This table combines two datasets: the US Cities Demographics dataset and the World Temperature dataset

| dim_uscd |
| --------- |
| city (CK) |
| state (CK) |
| race (CK) |
| state_code (FK) |
| median_age |
| male_population |
| female_population |
| total_population |
| number_veterans |
| foreign_born |
| avg_household_size |
| count |
| avg_temperature |
| avg_temperature_uncertainty |
| latitude |
| longitude |

* **Arrival date table:**

| dim_arrivaldate |
| --------- |
| arrival_sas_date (PK) |
| arrival_date |
| arrival_year |
| arrival_month |
| arrival_day |

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

* The data pipeline steps are as follows:
1. Load the 4 datasets from source: I94 Immigration dataset, Aircode dataset, US Cities Demographics dataset, and World Temperature dataset
2. Clean the datasets and aggregate the World Temperature dataset by the attribute ```City``` (as performed in Step 2)
3. Create fact table ```fact_immigration``` from the I94 Immigration dataset and write to parquet file partitioned by ```entry_year``` and ```entry_month```
4. Create dimension table ```dim_aircode``` from the Aircode dataset and write to parquet file.
5. Create dimension table ```dim_uscd``` from the combined US Cities Demographics and World Temperature datasets, then write to parquet file partitioned by ```state```, ```city```, and ```race```
6. Create dimension table ```dim_arrivaldate``` from the I94 Immigration dataset's ```arrival_date```, then write to parquet file partitioned by year and month.

--------------------------------------------------------------------------------------------

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

0. First, we define the output data path

In [53]:
output_path= '/home/workspace/output_data/'

1. Create **fact_immigration** table from I-94 Immigration dataset

In [54]:
# Rename and re-define columns in the dataset
imda_df_final= imda_df_final.select(
    col('cicid').cast(Int()).alias('cic_id'),
    col('i94yr').cast(Int()).alias('entry_year'),
    col('i94mon').cast(Int()).alias('entry_month'),
    col('i94cit').cast(Int()).alias('origin_country_code'),
    col('i94res').cast(Int()).alias('origin_resident_country_code'),
    col('i94port').alias('port_code'),
    col('arrdate').cast(Int()).alias('arrival_sas_date'),
    col('i94mode').cast(Int()).alias('entry_mode'),
    col('i94addr').alias('address_state_code'),
    col('i94visa').cast(Int()).alias('visa_mode'),
    col('biryear').cast(Int()).alias('birth_year'),
    col('admnum').cast('long').alias('addmission_number'),  # admnum values exceed the 32-bit integer range
    col('fltno').cast(Int()).alias('flight_number'),
    col('count').cast(Int()).alias('count'),
    'arrival_date',
    'arrival_year',
    'arrival_month',
    'arrival_day',
    'gender',
    'airline',
    'visatype'
)

fact_immigration_table= imda_df_final.select(
    'cic_id',
    'entry_year',
    'entry_month',
    'port_code',
    'arrival_sas_date',
    'entry_mode',
    'address_state_code',
    'origin_country_code',
    'origin_resident_country_code',
    'visa_mode',
    'count',
    'birth_year',
    'gender',
    'airline',
    'addmission_number',
    'flight_number',
    'visatype'
)

fact_immigration_table.createOrReplaceTempView('fact_immigration_temp')

fact_immigration_table.\
write.\
mode('overwrite').\
partitionBy('entry_year', 'entry_month').\
parquet(path= os.path.join(output_path, 'fact_immigration.parquet'))
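
When Spark writes with `partitionBy`, it lays the files out in nested `column=value` directories under the output path. The sketch below only illustrates that layout for the fact table, so that a later job can locate one month of data. The helper name `partition_dir` is hypothetical and not part of the notebook, and the example assumes a POSIX-style path.

```python
import os


def partition_dir(table_path, partitions):
    """Build the directory for one partition of a partitioned parquet table.

    `partitions` is a list of (column, value) pairs given in the same
    order as the columns passed to partitionBy.
    """
    parts = [f'{column}={value}' for column, value in partitions]
    return os.path.join(table_path, *parts)


# Same output path and table name as used in the notebook
output_path = '/home/workspace/output_data/'
fact_path = os.path.join(output_path, 'fact_immigration.parquet')

# Directory expected to hold the April 2016 rows of the fact table
april_2016 = partition_dir(fact_path, [('entry_year', 2016), ('entry_month', 4)])
print(april_2016)
```

Partitioning the fact table by `entry_year` and `entry_month` means a query or an update that targets a single month only needs to touch one such directory.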

--------------------------------------------------------------------------------------------

2. Create **dim_aircode** table from the Airport Code dataset

In [55]:
dim_aircode_table= ac_df_final.filter(col('iso_country') == 'US')
dim_aircode_table.createOrReplaceTempView('dim_aircode_temp')
dim_aircode_table.write.mode('overwrite').parquet(path= os.path.join(output_path, 'dim_aircode.parquet'))
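
In the raw Airport Code dataset, `iso_region` values have the form `US-CA`, while `address_state_code` in the immigration data is a two-letter code such as `CA`. If `ac_df_final` still carries the raw form (this depends on the earlier cleaning steps), the prefix has to be removed before the two columns can be joined. A minimal sketch of that normalization in plain Python, where the helper name `state_code_from_iso_region` is hypothetical:

```python
def state_code_from_iso_region(iso_region):
    """Return the two-letter state code from a value such as 'US-CA'.

    Values without the 'US-' prefix are returned unchanged, and None
    stays None, so the function is safe to apply to already-clean data.
    """
    if iso_region is None:
        return None
    prefix = 'US-'
    if iso_region.startswith(prefix):
        return iso_region[len(prefix):]
    return iso_region


print(state_code_from_iso_region('US-CA'))
print(state_code_from_iso_region('CA'))
```

The same logic could be applied to the Spark column before writing `dim_aircode`, so that the join key matches `address_state_code` in the fact table.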

--------------------------------------------------------------------------------------------

3. Create **dim_uscd** table by using US Cities Demographics dataset JOIN World Temperature dataset

In [56]:
# Rename columns in the dataset
uscd_df_final= uscd_df_final.select(
    col('City').alias('city'),
    col('State').alias('state'),
    col('Race').alias('race'),
    col('State Code').alias('state_code'),
    col('Median Age').alias('median_age'),
    col('Male Population').alias('male_population'),
    col('Female Population').alias('female_population'),
    col('Total Population').alias('total_population'),
    col('Number of Veterans').alias('number_veterans'),
    col('Foreign-born').alias('foreign_born'),
    col('Average Household Size').alias('avg_household_size'),
    col('Count').alias('count')
)

dim_uscd_table= uscd_df_final.join(wtd_df_final, uscd_df_final.city == wtd_df_final.us_city, 'left')

dim_uscd_table.createOrReplaceTempView('dim_uscd_temp')

dim_uscd_table.write.\
mode('overwrite').\
partitionBy('state', 'city', 'race').\
parquet(path= os.path.join(output_path, 'dim_uscd.parquet'))

--------------------------------------------------------------------------------------------

4. Create **dim_arrivaldate** table

In [57]:
dim_arrivaldate= imda_df_final.select(
    'arrival_sas_date',
    'arrival_date',
    'arrival_year',
    'arrival_month',
    'arrival_day'
).distinct()

dim_arrivaldate.createOrReplaceTempView('dim_arrivaldate_temp')

dim_arrivaldate.write.\
mode('overwrite').\
partitionBy('arrival_year', 'arrival_month').\
parquet(path= os.path.join(output_path, 'dim_arrivaldate.parquet'))
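
The `arrival_sas_date` column stores a SAS date, which counts days since 1960-01-01. The sketch below shows how such a value maps to the calendar columns of `dim_arrivaldate`. It is a plain-Python illustration, not the conversion code used earlier in the notebook, and the helper names `sas_to_date` and `arrival_date_parts` are hypothetical.

```python
from datetime import date, timedelta

# SAS date values count days from this epoch
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS date (days since 1960-01-01) to a datetime.date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


def arrival_date_parts(sas_days):
    """Build one dim_arrivaldate-style row from a SAS date value."""
    d = sas_to_date(sas_days)
    return {
        'arrival_sas_date': int(sas_days),
        'arrival_date': d.isoformat(),
        'arrival_year': d.year,
        'arrival_month': d.month,
        'arrival_day': d.day,
    }


print(arrival_date_parts(20545))  # 20545 corresponds to 2016-04-01
```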

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

1. SQL check: run SQL queries to confirm that the data model is well formed

* List five airports ranked by the number of immigrants whose destination state matches the airport's region

In [None]:
spark.sql(
    """
    select distinct
        a.name as airport_name,
        count(i.cic_id) as immigrant_number
    from
        dim_aircode_temp as a
    inner join
        fact_immigration_temp as i
    on
        i.address_state_code == a.iso_region
    group by
        a.name
    order by
        immigrant_number desc
    limit
        5
    """
).show()

* Check the total population and average temperature of the cities in each state that immigrants move to

In [None]:
spark.sql(
    """
    select distinct
        i.address_state_code as state_code,
        u.state as state,
        u.city as city,
        u.race as race,
        u.total_population as total_population,
        u.avg_temperature as avg_temperature
    from
        fact_immigration_temp as i
    inner join
        dim_uscd_temp as u
    on
        i.address_state_code == u.state_code
    order by
        total_population
    """
).show()

2. Check for nulls in the primary key of each table:

In [60]:
def data_qualiity_check(quality_check_list, sqlcontext):
    
    failed_test_case= 0
    
    for sql in quality_check_list:
        query= sql.get('check_sql')
        expected_result= sql.get('expected_result')
        
        records= sqlcontext.sql(f'{query}').first()['count']
        
        if expected_result != records:
            print(f'Failed at query {query}')
            failed_test_case+= 1
        
    if failed_test_case > 0:
        print(f'{failed_test_case} table(s) test failed')
    else:
        print('All tables has passed the data quality check')

In [61]:
sqlcontext= SQLContext(spark)

quality_check_list= [
    {'check_sql': 'select count(*) as count from fact_immigration_temp where cic_id is null', 'expected_result': 0},
    {'check_sql': 'select count(*) as count from dim_aircode_temp where ident is null', 'expected_result': 0},
    {'check_sql': 'select count(*) as count from dim_uscd_temp where (city is null or state is null or race is null)', 'expected_result': 0},
    {'check_sql': 'select count(*) as count from dim_arrivaldate_temp where arrival_sas_date is null', 'expected_result': 0}
]

data_qualiity_check(quality_check_list, sqlcontext)

All tables has passed the data quality check
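
The null checks above can be complemented with a uniqueness check on each primary key, using the same `check_sql` / `expected_result` shape. The sketch below is an illustration under stated assumptions: it uses an in-memory `sqlite3` database as a stand-in for the Spark temp views so that it runs on its own, the toy rows are made up for the demonstration, and the names `extra_checks`, `run_checks`, and `run_sql` are hypothetical.

```python
import sqlite3

# Each query returns 0 when every key value appears exactly once
extra_checks = [
    {'check_sql': 'select count(*) - count(distinct cic_id) as count '
                  'from fact_immigration_temp', 'expected_result': 0},
    {'check_sql': 'select count(*) - count(distinct arrival_sas_date) as count '
                  'from dim_arrivaldate_temp', 'expected_result': 0},
]


def run_checks(checks, run_sql):
    """Return the queries whose result differs from the expected value."""
    failed = []
    for check in checks:
        if run_sql(check['check_sql']) != check['expected_result']:
            failed.append(check['check_sql'])
    return failed


# Stand-in tables with toy data; cic_id 2 is duplicated on purpose
conn = sqlite3.connect(':memory:')
conn.execute('create table fact_immigration_temp (cic_id integer)')
conn.execute('create table dim_arrivaldate_temp (arrival_sas_date integer)')
conn.executemany('insert into fact_immigration_temp values (?)', [(1,), (2,), (2,)])
conn.executemany('insert into dim_arrivaldate_temp values (?)', [(20545,), (20546,)])


def run_sql(query):
    """Run a query and return the single value in its first row."""
    return conn.execute(query).fetchone()[0]


failed_checks = run_checks(extra_checks, run_sql)
print(f'{len(failed_checks)} check(s) failed')
```

Against the real temp views, `run_sql` could instead wrap `spark.sql(query).first()['count']`, which is the call already used in the quality check function above.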


#### 4.3 Data dictionary 
1. fact_immigration
    * ```cic_id```: ID of the record for an immigrant entering the US, stored as **Primary Key**
    * ```entry_year```: year in which the entry into the US was recorded
    * ```entry_month```: month in which the entry into the US was recorded
    * ```port_code```: code of the port of entry
    * ```arrival_sas_date```: date of arrival in the US, SAS format, stored as **Foreign Key**, links to ```dim_arrivaldate```
    * ```entry_mode```: mode of entry into the US (1 for Air, 2 for Sea, 3 for Land and 9 for Not Reported)
    * ```address_state_code```: code of the state where the immigrant stays after entering the US, stored as **Foreign Key**, links to ```dim_aircode``` and ```dim_uscd```
    * ```origin_country_code```: code of the immigrant's country of origin
    * ```origin_resident_country_code```: code of the immigrant's country of residence
    * ```visa_mode```: immigrant's visa mode (1 for Business, 2 for Pleasure, and 3 for Student)
    * ```count```: number of immigrants covered by one record
    * ```birth_year```: birth year of the immigrant who filled in the record
    * ```gender```: gender of the immigrant who filled in the record
    * ```airline```: airline the immigrant used to arrive in the US
    * ```addmission_number```: admission number of the record
    * ```flight_number```: flight number of the airline
    * ```visatype```: class of admission legally admitting the non-immigrant to temporarily stay in the US
    
2. Dimension tables:
* dim_aircode
    * ```ident```: Airport ID, stored as **Primary Key**
    * ```type```: Airport type
    * ```name```: Airport name
    * ```elevation_ft```: Airport elevation, in feet
    * ```iso_country```: Country where the airport is located, ISO format
    * ```iso_region```: Region (depending on the country) where the airport is located, ISO format, stored as **Foreign Key**, links to ```fact_immigration```
    * ```municipality```: City or town (depending on the country and region) where the airport is located
    * ```coordinates```: Airport coordinates
    
* dim_uscd
    * ```state```: US state, part of the **Composite Key**
    * ```city```: US city, part of the **Composite Key**
    * ```race```: Race of the population group, part of the **Composite Key**
    * ```state_code```: US state code, stored as **Foreign Key**, links to ```fact_immigration```
    * ```median_age```: Median age of the population
    * ```male_population```: Male population
    * ```female_population```: Female population
    * ```total_population```: Total population across all genders
    * ```number_veterans```: Number of veterans in the population
    * ```foreign_born```: Number of people not born in the US
    * ```avg_household_size```: Average household size
    * ```count```: Number of people of the given race in the city
    * ```avg_temperature```: Average temperature for each city, based on the World Temperature dataset
    * ```avg_temperature_uncertainty```: Average temperature uncertainty for each city, based on the World Temperature dataset
    * ```latitude```: City latitude
    * ```longitude```: City longitude
    
* dim_arrivaldate
    * ```arrival_sas_date```: Arrival date, SAS format, stored as **Primary Key**
    * ```arrival_date```: Arrival date, Gregorian format
    * ```arrival_year```: Arrival year
    * ```arrival_month```: Arrival month
    * ```arrival_day```: Arrival day

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    * For this project, we use **Apache Spark** because it is well suited to batch processing and can handle large amounts of data.
    * For the data model, we chose a **star schema**. The star schema is the simplest and most fundamental of the data mart schemas, and it is widely used to build data warehouses and dimensional data marts. It consists of one or more fact tables that reference any number of dimension tables. As a result, not only Data Engineers but also the Business Analysts and Data Scientists who consume the output of the ETL process can understand the data just by looking at the data model.
    
* Propose how often the data should be updated and why.
    * Because the fact table holds immigration data, we propose updating the data monthly: a day-by-day view of arriving and departing immigrants is not needed, and a one-month interval gives each dataset enough new data to process.
    * The update schedule of the source data also matters. According to [this article](https://www.trade.gov/i-94-arrivals-program), the source I94 Immigration data is updated monthly, so a monthly update is recommended.
    * Still, the update interval should be reviewed against end-user needs so that it meets both the technical and the end-user requirements.
    
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
     * We would consider moving the pipeline to a more capable infrastructure, such as Amazon Web Services (AWS). Amazon S3 can store the much larger volume of data, and an Amazon Redshift cluster can speed up data processing by spreading the work across several nodes.
     
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     * To update the data automatically, we would consider two options: Amazon EventBridge or Apache Airflow.
         * EventBridge keeps the update on schedule by triggering an update function at a fixed time. The update function can be a Python script that refreshes the data, deployed as a Lambda function or as an Elastic Container Service (ECS) task.
         * Apache Airflow is an Apache project that automates a data pipeline by defining it as a Directed Acyclic Graph (DAG) of tasks and running it on a schedule.
     
 * The database needed to be accessed by 100+ people.
     * With AWS, we can create more than 100 users and set each user's permissions for any service through the IAM service.