# Project Title
### Data Engineering Capstone Project

#### Project Summary
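This project builds a data model from the I94 immigration data (April 2016), U.S. city demographics and airport codes provided by Udacity. PySpark is used to clean the data, load it into fact and dimension tables, and save the fact table as partitioned Parquet files so that it can be queried quickly.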

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# Do all imports and installs here
import datetime
import os
import re
import time

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, split, to_date, udf, unix_timestamp
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType
```



```python
# Create the Spark session with the spark-sas7bdat package so SAS files can be read
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
```

### Step 1: Scope the Project and Gather Data

#### Scope 
-   The project uses the data and template provided by Udacity: mainly the I94 immigration data, the U.S. City Demographic Data and the Airport Code data. The purpose of the project is to create a data model from these datasets so that we can quickly query, slice and dice the data. After the ETL process, the fact table is also saved in Parquet format, partitioned for fast reads. The main tool for this project is PySpark.



#### Describe and Gather Data
-   **I94 Immigration Data:**  This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace.  [This](https://travel.trade.gov/research/reports/i94/historical/2016.html)  is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project. 
    

   
  
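-   **U.S. City Demographic Data:**  This data comes from OpenDataSoft. You can read more about it  [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
    Columns: City, State, Median Age, Male Population, Female Population, Total Population, Number of Veterans, Foreign-born, Average Household Size, State Code, Race, Count.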
    
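-   **Airport Code Table:**  This is a simple table of airport codes and corresponding cities. It comes from  [here](https://datahub.io/core/airport-codes#data).
    Columns: ident, type, name, elevation_ft, continent, iso_country, iso_region, municipality, gps_code, iata_code, local_code, coordinates.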


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.


Checking for duplicate data (no duplicate rows were found in any of the datasets):

```python
# Check duplicates in the airport data
fn = 'airport-codes_csv.csv'
df_airport_check = spark.read.format('csv').options(header='true', inferSchema='true').load(fn)

if df_airport_check.count() > df_airport_check.dropDuplicates(df_airport_check.columns).count():
    raise ValueError('Data has duplicates')
```

```python
# Check duplicates in the demographic data
fn_demographics = 'us-cities-demographics.csv'
df_us_demographics_check = spark.read.csv(fn_demographics, header='true', sep=";")

if df_us_demographics_check.count() > df_us_demographics_check.dropDuplicates(df_us_demographics_check.columns).count():
    raise ValueError('Data has duplicates')
```

```python
# Check duplicates in the I94 immigration data
df_spark_immi_check = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
if df_spark_immi_check.count() > df_spark_immi_check.dropDuplicates(df_spark_immi_check.columns).count():
    raise ValueError('Data has duplicates')
```
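Each duplicate check above compares the row count with the count after `dropDuplicates` over all columns, so a row only counts as a duplicate if every column matches. A minimal plain-Python sketch of the same test, on hypothetical rows, is below; it also reports which rows repeat, which the Spark check does not.

```python
from collections import Counter


def row_key(row):
    """Hashable key for a row: a duplicate must match on every column."""
    return tuple(sorted(row.items()))


def duplicate_rows(rows):
    """Return {row_key: count} for rows that occur more than once."""
    counts = Counter(row_key(r) for r in rows)
    return {key: n for key, n in counts.items() if n > 1}


# Hypothetical sample rows, for illustration only
sample = [
    {"ident": "00A", "type": "heliport"},
    {"ident": "00AA", "type": "small_airport"},
    {"ident": "00A", "type": "heliport"},
]

# Same test as the notebook: total rows > distinct rows means duplicates exist
has_duplicates = len(sample) > len({row_key(r) for r in sample})
print(has_duplicates, duplicate_rows(sample))
```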

Checking for missing values:

```python
# Check missing values in the airport data
# https://stackoverflow.com/questions/44413132/count-the-number-of-missing-values-in-a-dataframe-spark
from pyspark.sql.functions import lit

rows = df_airport_check.count()
summary = df_airport_check.describe().filter(col("summary") == "count")
summary.select(*((lit(rows) - col(c)).alias(c) for c in df_airport_check.columns)).show()
```

```
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|ident|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|  0.0| 0.0| 0.0|      7006.0|      0.0|        0.0|       0.0|      5676.0| 14045.0|  45886.0|   26389.0|        0.0|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
```



```python
# Check missing values in the demographic data
rows = df_us_demographics_check.count()
summary = df_us_demographics_check.describe().filter(col("summary") == "count")
summary.select(*((lit(rows) - col(c)).alias(c) for c in df_us_demographics_check.columns)).show()
```

```
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
| 0.0|  0.0|       0.0|            3.0|              3.0|             0.0|              13.0|        13.0|                  16.0|       0.0| 0.0|  0.0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
```



```python
# Check missing values in the I94 immigration data
# No missing values in i94yr, i94mon, i94cit, i94res, i94port, arrdate and i94visa,
# so these columns can be used as foreign keys to join the dimension tables
rows = df_spark_immi_check.count()
summary = df_spark_immi_check.describe().filter(col("summary") == "count")
summary.select(*((lit(rows) - col(c)).alias(c) for c in df_spark_immi_check.columns)).show()
```

```
+-----+-----+------+------+------+-------+-------+-------+--------+--------+------+-------+-----+--------+---------+---------+-------+--------+---------+--------+-------+-------+--------+---------+-------+------+-------+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode| i94addr| depdate|i94bir|i94visa|count|dtadfile| visapost|    occup|entdepa| entdepd|  entdepu| matflag|biryear|dtaddto|  gender|   insnum|airline|admnum|  fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+--------+--------+------+-------+-----+--------+---------+---------+-------+--------+---------+--------+-------+-------+--------+---------+-------+------+-------+--------+
|  0.0|  0.0|   0.0|   0.0|   0.0|    0.0|    0.0|  239.0|152592.0|142457.0| 802.0|    0.0|  0.0|     1.0|1881250.0|3088187.0|  238.0|138429.0|3095921.0|138429.0|  802.0|  477.0|414269.0|2982605.0|83627.0|   0.0|19549.0|     0.0|
+-----+-----+------+------+------+-------+-------+-------+--------+--------+------+-------+-----+--------+---------+---------+-------+--------+---------+--------+-------+-------+--------+---------+-------+------+-------+--------+
```
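The missing-value cells compute, for each column, the total row count minus the non-null count that `describe()` reports. The same arithmetic in plain Python, on hypothetical rows where `None` stands in for a Spark null, is sketched below.

```python
def missing_counts(rows, columns):
    """Missing values per column = total rows - non-null count."""
    total = len(rows)
    non_null = {c: sum(1 for r in rows if r.get(c) is not None) for c in columns}
    return {c: total - non_null[c] for c in columns}


# Hypothetical rows; None stands in for a Spark null
rows = [
    {"i94port": "SFR", "i94addr": "AZ", "depdate": None},
    {"i94port": "NYC", "i94addr": None, "depdate": 20550.0},
    {"i94port": "LOS", "i94addr": None, "depdate": None},
]
print(missing_counts(rows, ["i94port", "i94addr", "depdate"]))
```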

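#### Cleaning Steps
The following cleaning steps are applied:
* The value labels in I94_SAS_Labels_Descriptions.SAS were extracted into text files in the label_mapping folder; each file is read with pandas, cleaned of quote characters and extra whitespace, and converted to a Spark DataFrame for the dimension tables.
* Country labels starting with INVALID, Collapsed or No Country Code are grouped as `others`.
* Port labels without a comma are dropped, and the rest are split into port name and state.
* The I94 arrival date (arrdate) is converted from a SAS date number to a date.
* The demographics CSV is read with an explicit schema.
* The airport data is filtered to US airports, the state is taken from iso_region, and the coordinates are split into latitude and longitude columns.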

##### Label mappings extracted from I94_SAS_Labels_Descriptions.SAS (saved in label_mapping) for the dimension tables

```python
# Get the US state code dimension and convert it to a Spark DataFrame
fn = '/home/workspace/label_mapping/i94addrl.txt'
df_us_state = pd.read_csv(fn, sep="=", header=None, engine='python', names=["state_code", "state"])
# Strip only the surrounding quotes and whitespace; removing every non-letter
# character would also delete spaces inside names ("NEW YORK" -> "NEWYORK")
df_us_state['state_code'] = df_us_state['state_code'].str.strip("' \n\t")
df_us_state['state'] = df_us_state['state'].str.strip("' \n\t")
df_us_state_cleaned = df_us_state
df_us_state_spark = spark.createDataFrame(df_us_state_cleaned)
```
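State labels can contain spaces and punctuation, so cleaning them by deleting every non-letter character also removes the inner space of a name such as `NEW YORK`. A minimal sketch of parsing one mapping line by splitting on the first `=` and stripping only the surrounding quotes and whitespace is below. The sample lines are assumptions about the layout of `i94addrl.txt` (a tab, then `'code'='label'`), not copied from the file.

```python
import re


def parse_label_line(line):
    """Split a "'code'='label'" line into (code, label).

    Only surrounding quotes and whitespace are stripped, so inner spaces
    and punctuation in the label are preserved.
    """
    code, label = line.split("=", 1)
    return code.strip("' \n\t"), label.strip("' \n\t")


# Hypothetical lines in the assumed label-file format
lines = ["\t'NY'='NEW YORK'\n", "\t'DC'='DIST. OF COLUMBIA'\n"]
parsed = [parse_label_line(line) for line in lines]

# Removing every non-letter character also removes the inner space
naive = re.sub("[^a-zA-Z]", "", "'NEW YORK'")
print(parsed, naive)
```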

```python
# Get the country code dimension and transform it
fn = '/home/workspace/label_mapping/i94cntyl.txt'
df_country_code = pd.read_csv(fn, sep="=", header=None, engine='python', names=["country_code", "country"])
df_country_code['country'] = df_country_code['country'].str.strip("' \n\t")
# Replace labels starting with INVALID, Collapsed or No Country Code with 'others'
pattern = '|'.join(['INVALID.*', 'Collapsed.*', 'No Country Code.*'])
df_country_code['country'] = df_country_code['country'].str.replace(pattern, 'others', regex=True)
df_country_code_cleaned = df_country_code
df_country_code_spark = spark.createDataFrame(df_country_code_cleaned)
```
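The country cleanup is meant to map every label that begins with `INVALID`, `Collapsed` or `No Country Code` to a single `others` bucket. The pattern in the cell is not anchored, so it would also rewrite a label that merely contains one of those words in the middle. A minimal sketch of an anchored version of the rule, using hypothetical labels, is below.

```python
import re

# Anchored: only labels that *start* with one of these prefixes are collapsed
OTHER_PATTERN = re.compile(r"^(?:INVALID|Collapsed|No Country Code).*$")


def collapse_country(label):
    """Return 'others' for catch-all labels, otherwise the label unchanged."""
    return "others" if OTHER_PATTERN.match(label) else label


# Hypothetical labels for illustration
labels = [
    "MEXICO",
    "INVALID: EXAMPLE",
    "Collapsed (example)",
    "No Country Code (example)",
    "LABEL WITH INVALID INSIDE",
]
print([collapse_country(label) for label in labels])
```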

```python
# Get the arrival mode dimension and transform it
fn = '/home/workspace/label_mapping/i94model.txt'
df_mode = pd.read_csv(fn, sep="=", header=None, engine='python', names=["arrival_code", "arrival_mode"])
df_mode["arrival_mode"] = df_mode["arrival_mode"].str.strip("' \n\t")
df_mode_cleaned = df_mode
df_mode_spark = spark.createDataFrame(df_mode_cleaned)
```

```python
# Get port info. After processing, the table still contains non-US ports,
# but for US ports the part after the last comma is the 2-letter state code
fn = '/home/workspace/label_mapping/i94prtl.txt'
df_port = pd.read_csv(fn, sep="=", header=None, engine='python', names=["port_code", "port_name"])
df_port["port_code"] = df_port["port_code"].str.strip("' \n\t")
df_port["port_name"] = df_port["port_name"].str.strip("' \n\t")
# Keep only records whose port_name contains a comma separator
# (.copy() avoids pandas' SettingWithCopyWarning when adding columns below)
df_port_valid = df_port[df_port["port_name"].str.contains(",", regex=False, na=False)].copy()
# Split port_name on the last comma into port name and state
tmp = df_port_valid["port_name"].str.rsplit(",", n=1, expand=True)
df_port_valid["state"] = tmp[1].str.strip()
df_port_valid["port_name"] = tmp[0].str.strip()
df_port_cleaned = df_port_valid
df_port_spark = spark.createDataFrame(df_port_cleaned)
```
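Port labels are split on the *last* comma, so a port name that itself contains a comma keeps it, and labels without a comma are dropped. For US ports the part after the comma is the two-letter state code; for other ports it is whatever follows the last comma, which is why the resulting `state` column also holds non-US values. A minimal sketch of the rule with hypothetical labels:

```python
def split_port(label):
    """Split 'PORT NAME, STATE' on the last comma; return None if there is no comma."""
    if "," not in label:
        return None
    name, state = label.rsplit(",", 1)
    return name.strip(), state.strip()


# Hypothetical port labels for illustration
examples = ["SAN FRANCISCO, CA", "SOME PORT, WITH COMMA, WA", "NO STATE GIVEN"]
print([split_port(e) for e in examples])
```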

```python
# Get the visa dimension and transform it
fn = '/home/workspace/label_mapping/I94VISA.txt'
df_visa = pd.read_csv(fn, sep="=", header=None, engine='python', names=["visa_code", "visa_type"])
df_visa_cleaned = df_visa
df_visa_spark = spark.createDataFrame(df_visa_cleaned)
```

```python
# Get the I94 immigration data for the fact table and transform it
# Read the April 2016 I94 data in SAS format
start_time = time.time()
df_spark_immi = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
# arrdate is a SAS date (days since 1960-01-01); convert it to an ISO date string
format_date = udf(lambda x: (datetime.datetime(1960, 1, 1).date() + datetime.timedelta(x)).isoformat() if x else None)
df_spark_immi = df_spark_immi.withColumn("arrdate", format_date(df_spark_immi.arrdate))
df_spark_immi = df_spark_immi.withColumn('arrdate_in_dateFormat', to_date(unix_timestamp(col('arrdate'), 'yyyy-MM-dd').cast("timestamp")))
# Spark transformations are lazy, so this timing mostly covers setting up the
# query plan, not reading or converting the rows
print("it took", time.time() - start_time, "to run")
```

```
it took 0.08245205879211426 to run
```
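The `format_date` UDF relies on SAS storing dates as the number of days since 1960-01-01. A plain-Python version of that conversion, which can be unit-tested without Spark, is sketched below. Unlike the UDF's `if x else None`, it treats only a missing value as null, so a value of 0 still maps to 1960-01-01.

```python
import datetime

SAS_EPOCH = datetime.date(1960, 1, 1)


def sas_to_iso(sas_days):
    """Convert a SAS date (days since 1960-01-01) to an ISO 'YYYY-MM-DD' string."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + datetime.timedelta(days=sas_days)).isoformat()


print(sas_to_iso(20545.0))  # first day of April 2016
```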


```python
# Read us-cities-demographics.csv with an explicit schema
# (population counts are integers, like total_population)
demographicsSchema = StructType([
    StructField("city", StringType()),
    StructField("state", StringType()),
    StructField("median_age", DoubleType()),
    StructField("male_population", IntegerType()),
    StructField("female_population", IntegerType()),
    StructField("total_population", IntegerType()),
    StructField("number_of_veterans", IntegerType()),
    StructField("number_of_foreign_born", IntegerType()),
    StructField("average_household_size", DoubleType()),
    StructField("state_code", StringType()),
    StructField("race", StringType()),
    StructField("count", IntegerType()),
])

fn = 'us-cities-demographics.csv'
df_us_demographics_spark = spark.read.csv(fn, header='true', sep=";", schema=demographicsSchema)
```



```python
# Read the airport-codes file and transform it
fn = 'airport-codes_csv.csv'
df_airport = spark.read.format('csv').options(header='true', inferSchema='true').load(fn)
# Keep US airports only, take the state from iso_region (e.g. "US-PA" -> "PA") and
# split coordinates ("longitude, latitude") into two double columns. Splitting on the
# comma avoids a trailing comma on the first value, which would make the cast return null.
coords = split(df_airport['coordinates'], r',\s*')
df_airport_spark = df_airport.filter(df_airport['iso_country'] == 'US')\
          .withColumn('state', split(df_airport['iso_region'], "-").getItem(1))\
          .withColumn('longitude', coords.getItem(0).cast(DoubleType()))\
          .withColumn('latitude', coords.getItem(1).cast(DoubleType()))\
          .drop('iso_region')\
          .drop('coordinates')
```
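The `coordinates` column holds two comma-separated numbers in one string. Assuming it follows the datahub airport-codes layout of `longitude, latitude`, splitting on a single space leaves a trailing comma on the first value, and casting such a string to a double gives null in Spark. The sketch below parses one value in plain Python; the sample string is an assumption about the column format.

```python
def parse_coordinates(coordinates):
    """Parse 'lon, lat' into (longitude, latitude) floats.

    Assumes longitude comes first, as in the datahub airport-codes CSV.
    """
    lon_str, lat_str = coordinates.split(",")
    return float(lon_str.strip()), float(lat_str.strip())


def state_from_region(iso_region):
    """'US-PA' -> 'PA' (same idea as split(iso_region, '-').getItem(1))."""
    return iso_region.split("-")[1]


# Hypothetical sample value in the assumed format
sample_coords = "-74.93360137939453, 40.07080078125"
lon, lat = parse_coordinates(sample_coords)
naive_first = sample_coords.split(" ")[0]  # keeps the trailing comma
print(lon, lat, state_from_region("US-PA"), naive_first)
```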


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

* The fact table will be based on the I94 Immigration Data because it contains the traveller records
* Dimension tables will be based on the I94 Immigration Data columns i94yr, i94mon, i94cit, i94res, i94port, i94visa and arrdate (i94yr and i94mon are also used, together with the state, to partition the fact table when it is saved in Parquet format)
* Dimension tables will also be based on us-cities-demographics and airport-codes




The proposed data model is shown below:

<a href="https://ibb.co/twGtZnH"><img src="https://i.ibb.co/4wqGsCF/datamodel.jpg" alt="datamodel" border="0"></a>



#### 3.2 Mapping Out Data Pipelines


* Prepare text files from I94_SAS_Labels_Descriptions.SAS, saved in the label_mapping folder, and create dimension tables from them
* Create the Dim_us_demographics table from us-cities-demographics.csv
* Create the Dim_airport table from airport-codes_csv.csv
* Create the Fact_immi table from the ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat data joined with Dim_mode and Dim_us_state
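The steps above have a simple dependency order: the dimension tables must exist before the fact table joins them, and the quality checks run last. A sketch of that ordering with Python's standard-library `graphlib` (Python 3.9+) is below; the step names are hypothetical labels for the notebook cells, not part of the original project.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it depends on (hypothetical names)
pipeline = {
    "dim_us_state": set(),
    "dim_country_code": set(),
    "dim_mode": set(),
    "dim_port": set(),
    "dim_visa_code": set(),
    "dim_us_demographics": set(),
    "dim_airport": set(),
    "fact_immi": {"dim_us_state", "dim_mode"},
    "quality_checks": {"fact_immi", "dim_country_code"},
}

# static_order() yields every step after all of its dependencies
order = list(TopologicalSorter(pipeline).static_order())
print(order)
```

An orchestrator such as Apache Airflow, mentioned in Step 5, would encode the same dependencies as task edges.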




### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

```python
# Register temp views so the data model can be built with PySpark SQL
df_us_state_spark.createOrReplaceTempView('df_us_state_spark')
df_country_code_spark.createOrReplaceTempView('df_country_code_spark')
df_mode_spark.createOrReplaceTempView('df_mode_spark')
df_spark_immi.createOrReplaceTempView('df_spark_immi')
df_port_spark.createOrReplaceTempView('df_port_spark')
df_us_demographics_spark.createOrReplaceTempView('df_us_demographics_spark')
df_airport_spark.createOrReplaceTempView('df_airport_spark')
df_visa_spark.createOrReplaceTempView('df_visa_spark')
```




```python
# Create a staging DataFrame of the I94 data with readable column names
df_spark_immi_temp = spark.sql("""
    SELECT i.cicid,
           i.arrdate_in_dateFormat AS arrival_date,
           i.i94yr AS year,
           i.i94mon AS month,
           i.i94cit AS birth_country,
           i.i94res AS residence_country,
           i.i94bir AS repondent_age,
           i.biryear AS birth_year,
           i.i94addr AS state_code,
           i.i94mode AS arrival_code,
           i.i94port AS port,
           i.i94visa AS visa_code,
           i.visatype AS visa_type
    FROM df_spark_immi i
""")
```


```python
# Create the dimension tables
Dim_us_state = spark.sql("SELECT * FROM df_us_state_spark")
Dim_country_code = spark.sql("SELECT * FROM df_country_code_spark")
Dim_mode = spark.sql("SELECT * FROM df_mode_spark")
Dim_port = spark.sql("SELECT * FROM df_port_spark")
Dim_us_demographics = spark.sql("SELECT * FROM df_us_demographics_spark")
Dim_airport = spark.sql("SELECT * FROM df_airport_spark")
Dim_visa_code = spark.sql("SELECT * FROM df_visa_spark")
```

```python
# Create the fact table by joining the staging data with the state and mode dimensions
df_spark_immi_temp.createOrReplaceTempView('df_spark_immi_temp')
Dim_us_state.createOrReplaceTempView('dim_us_state')
Dim_mode.createOrReplaceTempView('dim_mode')

fact_immi = spark.sql("""
    SELECT t.cicid,
           t.arrival_date,
           t.year,
           t.month,
           t.birth_country,
           t.residence_country,
           t.repondent_age,
           t.birth_year,
           t.state_code,
           t.arrival_code,
           t.port,
           t.visa_code,
           t.visa_type,
           coalesce(m.arrival_mode, 'Not reported') AS arrival_mode,
           coalesce(s.state_code, '99') AS state
    FROM df_spark_immi_temp t
    LEFT JOIN dim_us_state s ON t.state_code = s.state_code
    LEFT JOIN dim_mode m ON t.arrival_code = m.arrival_code
""")
```

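The fact query keeps every I94 record (left join) and fills in a default when no dimension row matches: `Not reported` for the arrival mode and `99` for the state. The plain-Python sketch below mirrors that `LEFT JOIN ... COALESCE` logic on hypothetical rows, to make explicit that unmatched records are kept rather than dropped.

```python
# Hypothetical dimension lookups (code -> value)
dim_mode = {1.0: "Air", 2.0: "Sea", 3.0: "Land"}
dim_us_state = {"AZ": "AZ", "NY": "NY"}  # state_code -> state_code, as in the SELECT


def coalesce(*values):
    """Return the first value that is not None, like SQL COALESCE."""
    return next((v for v in values if v is not None), None)


def build_fact_row(t):
    """Mirror LEFT JOIN + COALESCE: keep the row, use a default when no match."""
    return {
        **t,
        "arrival_mode": coalesce(dim_mode.get(t["arrival_code"]), "Not reported"),
        "state": coalesce(dim_us_state.get(t["state_code"]), "99"),
    }


staging = [
    {"cicid": 1.0, "arrival_code": 1.0, "state_code": "AZ"},
    {"cicid": 2.0, "arrival_code": None, "state_code": None},
]
fact = [build_fact_row(t) for t in staging]
print(fact)
```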
Save the fact table in Parquet format, both unpartitioned and partitioned:

```python
# Write the fact table in Parquet format to the sas_data folder
fact_immi.write.mode("overwrite").parquet("sas_data")
```

```python
# Write the fact table to Parquet, partitioned by year, month and state
fact_immi.write.mode("overwrite").partitionBy("year", "month", "state").parquet("partitioned_result")
```
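`partitionBy` writes one subdirectory per distinct combination of the partition columns, using Hive-style `column=value` names, and those columns are stored in the directory path rather than in the Parquet files. The sketch below builds the directory a row would land in; it only illustrates the naming scheme and ignores the escaping Spark applies to special characters. Because `year` and `month` are doubles in this fact table, their values appear as `2016.0` and `4.0`. The sample row uses values from the join check output further down.

```python
def partition_path(root, row, partition_cols):
    """Build the Hive-style directory a row is written to, e.g. root/year=2016.0/..."""
    parts = [f"{c}={row[c]}" for c in partition_cols]
    return "/".join([root, *parts])


row = {"cicid": 881270.0, "year": 2016.0, "month": 4.0, "state": "AZ"}
path = partition_path("partitioned_result", row, ["year", "month", "state"])
print(path)
```

Queries that filter on these columns, for example `spark.read.parquet("partitioned_result").filter("state = 'AZ'")`, can skip the non-matching directories (partition pruning).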


#### 4.2 Data Quality Checks
The following checks confirm that the pipeline ran as expected:
 * Source/count check: the Parquet output has the same number of rows as the raw April 2016 I94 file, so no records were lost
 * Join check: the fact table joins to the country dimension and country names are populated
 * Data type check: arrival_date is stored as a valid date

Run Quality Checks

```python
# Source/count check: no I94 records are lost when the fact table is created
df_spark_immi_then = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark_immi_now = spark.read.parquet("sas_data")
assert df_spark_immi_then.count() == df_spark_immi_now.count()
```

```python
# Data quality check: the fact and dimension tables join correctly and country names are loaded
fact_immi.createOrReplaceTempView('fact_immi')
Dim_country_code.createOrReplaceTempView('Dim_country_code')
result_country = spark.sql("""
select i.*, c.country from fact_immi i left join Dim_country_code c
on i.birth_country = c.country_code""")
result_country.show(3)
```

```
+---------+------------+------+-----+-------------+-----------------+-------------+----------+----------+------------+----+---------+---------+------------+-----+---------+--------+
|    cicid|arrival_date|  year|month|birth_country|residence_country|repondent_age|birth_year|state_code|arrival_code|port|visa_code|visa_type|arrival_mode|state|visa_code| country|
+---------+------------+------+-----+-------------+-----------------+-------------+----------+----------+------------+----+---------+---------+------------+-----+---------+--------+
| 881270.0|  2016-04-05|2016.0|  4.0|        299.0|            299.0|         34.0|    1982.0|        AZ|         1.0| SFR|      1.0|       B1|         Air|   AZ|        1|MONGOLIA|
|1048471.0|  2016-04-06|2016.0|  4.0|        299.0|            299.0|         51.0|    1965.0|        AZ|         1.0| SFR|      1.0|       B1|         Air|   AZ|        1|MONGOLIA|
|1048473.0|  2016-04-06|2016.0|  4.0|        299.0|            299.0|         45.0|    197
```

```python
# Data quality check: every arrival_date in the fact table is a valid date (day of week 1 to 7)
assert spark.sql("select arrival_date from fact_immi where dayofweek(arrival_date) < 1 or dayofweek(arrival_date) > 7").count() == 0
```
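Every non-null date has a `dayofweek` between 1 and 7, and `dayofweek(NULL)` is NULL (so such rows are filtered out), which means the check above passes as long as the column holds dates; it cannot catch missing or out-of-range arrival dates. A stricter check, sketched here in plain Python over a list of dates, also rejects nulls and dates outside the month being loaded. The function name is hypothetical, and the rule assumes every record in the April file arrived in April 2016.

```python
import datetime


def arrival_date_problems(dates, year, month):
    """Return (index, value) pairs for dates that are None or outside year/month."""
    problems = []
    for i, d in enumerate(dates):
        if d is None or (d.year, d.month) != (year, month):
            problems.append((i, d))
    return problems


dates = [
    datetime.date(2016, 4, 5),
    datetime.date(2016, 4, 6),
    None,
    datetime.date(2016, 5, 1),
]
print(arrival_date_problems(dates, 2016, 4))
```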


#### 4.3 Data dictionary 
The data dictionary for the data model is saved as datadiconary.txt.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

  <em>PySpark was chosen because of the data volume: each monthly I94 file is roughly 375 to 685 MB, and the full year of 2016 data is about 6 GB (see the directory listing below). Apache Spark is a popular open-source framework for fast, distributed data processing that supports several languages, including Python.</em>


* Propose how often the data should be updated and why.

  <em>Data should be updated at least daily, if not hourly, because immigration visitor data is critical information for a country's border security management.</em>


* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 
 <em>If the data increased by 100x, a cloud-based big data solution would be required; cloud storage such as AWS S3 or GCP Cloud Storage would be a good candidate for holding the data.</em>
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
 <em>This can be achieved with a combination of Apache Airflow, to schedule the pipeline so that it finishes before 7am, and AWS QuickSight, GCP Data Studio or third-party visualisation software for the dashboard.</em>
 
 * The database needed to be accessed by 100+ people.
 
 <em>AWS Redshift or GCP BigQuery can provide access for 100+ people.</em>

```
%ls -lh ../../data/18-83510-I94-Data-2016/
```

```
total 6.0G
-rw-r--r-- 1 root root 451M May 31  2018 i94_apr16_sub.sas7bdat
-rw-r--r-- 1 root root 597M May 31  2018 i94_aug16_sub.sas7bdat
-rw-r--r-- 1 root root 500M May 31  2018 i94_dec16_sub.sas7bdat
-rw-r--r-- 1 root root 374M May 31  2018 i94_feb16_sub.sas7bdat
-rw-r--r-- 1 root root 415M May 31  2018 i94_jan16_sub.sas7bdat
-rw-r--r-- 1 root root 620M May 31  2018 i94_jul16_sub.sas7bdat
-rw-r--r-- 1 root root 684M May 31  2018 i94_jun16_sub.sas7bdat
-rw-r--r-- 1 root root 459M May 31  2018 i94_mar16_sub.sas7bdat
-rw-r--r-- 1 root root 501M May 31  2018 i94_may16_sub.sas7bdat
-rw-r--r-- 1 root root 424M May 31  2018 i94_nov16_sub.sas7bdat
-rw-r--r-- 1 root root 531M May 31  2018 i94_oct16_sub.sas7bdat
-rw-r--r-- 1 root root 543M May 31  2018 i94_sep16_sub.sas7bdat
```
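The listing gives the sizing behind the tool choice in Step 5: the twelve monthly files add up to about 6 GB, so a single month is a few hundred MB. The arithmetic, using the sizes shown by `ls -lh` (which reports powers of 1024, so `M` is MiB), is sketched below.

```python
# Sizes in MiB, copied from the ls -lh output above
sizes_mib = {
    "jan": 415, "feb": 374, "mar": 459, "apr": 451, "may": 501, "jun": 684,
    "jul": 620, "aug": 597, "sep": 543, "oct": 531, "nov": 424, "dec": 500,
}

total_mib = sum(sizes_mib.values())
total_gib = total_mib / 1024
average_mib = total_mib / len(sizes_mib)
largest_month = max(sizes_mib, key=sizes_mib.get)
print(total_mib, round(total_gib, 2), round(average_mib, 1), largest_month)
```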
