# Project Title
### Data Engineering Capstone Project

#### Project Summary
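This project builds an ETL pipeline with Spark that combines I94 immigration data with city temperature data, models them as a star schema, and saves the resulting tables as Parquet files.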

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,regexp_replace,split

### Step 1: Scope the Project and Gather Data

#### Scope 

In this project, we use Spark and a data lake to build an ETL pipeline. We load immigration data and temperature data, process them into analytics tables with Spark, and save the tables as Parquet files.

#### Describe and Gather Data 

* I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace.

* Temperature Data: This dataset comes from Kaggle. You can read more about it at https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data


In [2]:
# Start Spark with the spark-sas7bdat package so the SAS file can be read
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
# Load the I94 immigration data (SAS) and the temperature data (CSV with a header row)
df_imm = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_temp = spark.read.option("header", True).csv('../../data2/GlobalLandTemperaturesByCity.csv')

In [3]:
df_imm.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [4]:
df_temp.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


### Step 2: Explore and Assess the Data
#### Explore the Data 
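The temperature data contains rows with a missing `AverageTemperature` (see the sample above), and the immigration data contains port codes, such as `XXX`, that do not appear in the port lookup file `i94port.txt`.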

#### Cleaning Steps
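* Drop temperature rows with a missing `AverageTemperature`.
* Parse `i94port.txt` into port code, port city, and state.
* Join the temperature data to the port lookup on city name to attach a port code.
* Keep only immigration records whose `i94port` appears in the port lookup.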

In [5]:
# Missing temperatures are read from the CSV as nulls, so drop them explicitly
df_temp = df_temp.filter(col('AverageTemperature').isNotNull())
df_temp.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1744-04-01,5.787999999999999,3.624,Århus,Denmark,57.05N,10.33E
2,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
3,1744-06-01,14.050999999999998,1.347,Århus,Denmark,57.05N,10.33E
4,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
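
The CSV reader above does not infer types, so `AverageTemperature` stays a string column (the data dictionary in section 4.3 lists it as `string`). The sketch below shows, in plain Python, the kind of conversion a later step might apply: empty or non-numeric values become `None`, and everything else becomes a float. The helper name `parse_temperature` is hypothetical and not part of this notebook; in Spark, casting the column with `col('AverageTemperature').cast('double')` serves the same purpose.

```python
from typing import Optional


def parse_temperature(raw: Optional[str]) -> Optional[float]:
    """Convert a raw temperature string to a float, or None if missing or invalid."""
    if raw is None:
        return None
    text = raw.strip()
    if not text:
        return None
    try:
        return float(text)
    except ValueError:
        return None


# Illustrative inputs: two values from the sample output plus missing/invalid cases
samples = ["6.068", "", None, "5.787999999999999", "n/a"]
parsed = [parse_temperature(s) for s in samples]
print(parsed)
```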


In [6]:
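# Read the tab-delimited port lookup: _c0 = code, _c1 = "=", _c2 = "city, state"
df_i94port = spark.read.option("header", False).option("delimiter", "\t").csv("i94port.txt")
# Strip quotes and spaces from the code, and quotes from the description
df_i94port = df_i94port.withColumn('_c0', regexp_replace('_c0', "'", "")).withColumn('_c0', regexp_replace('_c0', " ", ""))
df_i94port = df_i94port.withColumn('_c2', regexp_replace('_c2', "'", ""))
# Collect the valid port codes; they are used later to filter the immigration data
i94port = list(df_i94port.select('_c0').toPandas()['_c0'])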
df_i94port.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2
0,ALC,=,"ALCAN, AK"
1,ANC,=,"ANCHORAGE, AK"
2,BAR,=,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,=,"DALTONS CACHE, AK"
4,PIZ,=,"DEW STATION PT LAY DEW, AK"


In [7]:
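# Rename the code column and split "city, state" into separate columns
df_i94port = df_i94port.withColumnRenamed("_c0", "port_code")
df_i94port = df_i94port.withColumn("port_city", split('_c2', "\\,").getItem(0)).withColumn("state", split('_c2', "\\,").getItem(1))
# Drop the "=" separator column and the original description column
df_i94port = df_i94port.drop('_c1').drop('_c2')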
df_i94port.limit(5).toPandas()

Unnamed: 0,port_code,port_city,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
5,DTH,DUTCH HARBOR,AK
6,EGL,EAGLE,AK
7,FRB,FAIRBANKS,AK
8,HOM,HOMER,AK
9,HYD,HYDER,AK
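
The parsing done above with `regexp_replace` and `split` can be sketched for a single line in plain Python. The line layout (quoted code, tab, `=`, tab, quoted `city, state`) is inferred from the cell outputs above, the padding in the first sample line is an assumption, and `parse_port_line` is a hypothetical helper that is not part of this notebook. Unlike the Spark cells, this sketch also trims whitespace around the city and state.

```python
from typing import Optional, Tuple


def parse_port_line(line: str) -> Tuple[str, str, Optional[str]]:
    """Split one lookup line into (port_code, port_city, state).

    Assumes exactly three tab-separated fields: code, "=", description.
    """
    code_raw, _sep, desc_raw = line.rstrip("\n").split("\t")
    # Remove quotes and spaces from the code, as the Spark cell does
    port_code = code_raw.replace("'", "").replace(" ", "")
    # Remove quotes, then split the description on commas
    parts = desc_raw.replace("'", "").split(",")
    port_city = parts[0].strip()
    # Entries without a comma have no state
    state = parts[1].strip() if len(parts) > 1 else None
    return port_code, port_city, state


# Illustrative lines; the exact padding in the real file is an assumption
print(parse_port_line("   'ALC'\t=\t'ALCAN, AK             '"))
print(parse_port_line("'MAA'\t=\t'Abu Dhabi'"))
```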


In [8]:
df_temp=df_temp.join(df_i94port,df_temp.City==df_i94port.port_city,"inner")
df_temp=df_temp.drop('port_city')
df_temp.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,port_code,state
0,1843-01-01,18.874,2.017,Abu Dhabi,United Arab Emirates,24.92N,54.98E,MAA,
1,1843-02-01,19.993,1.79,Abu Dhabi,United Arab Emirates,24.92N,54.98E,MAA,
2,1843-03-01,21.656,1.912,Abu Dhabi,United Arab Emirates,24.92N,54.98E,MAA,
3,1843-04-01,25.205,1.853,Abu Dhabi,United Arab Emirates,24.92N,54.98E,MAA,
4,1843-05-01,28.203000000000003,1.551,Abu Dhabi,United Arab Emirates,24.92N,54.98E,MAA,
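
The join above matches `City` to `port_city` by exact string equality, so names that differ only in letter case or surrounding whitespace do not match. The sample port names shown earlier are upper-case (`ALCAN`, `ANCHORAGE`) while city names in the temperature data are mixed-case (`Århus`), so some intended matches may be lost. A minimal plain-Python sketch of normalizing both sides before matching follows; `normalize_city` and the sample rows are hypothetical. In Spark, the same idea could be applied with `upper` and `trim` from `pyspark.sql.functions` on both join keys.

```python
def normalize_city(name: str) -> str:
    """Return a case- and whitespace-insensitive join key for a city name."""
    return " ".join(name.split()).casefold()


# Hypothetical lookup rows: (port_code, port_city)
ports = [("ANC", "ANCHORAGE"), ("MAA", "Abu Dhabi")]
# Hypothetical city names as they might appear in the temperature data
cities = ["Anchorage", "Abu Dhabi ", "Århus"]

# Index the ports by normalized city name, then look up each city
port_by_city = {normalize_city(city): code for code, city in ports}
matches = {city: port_by_city.get(normalize_city(city)) for city in cities}
print(matches)
```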


In [9]:
df_imm=df_imm.filter(df_imm.i94port.isin(i94port))
df_imm.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296,F1
1,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93,B2
2,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199,B2
3,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199,B2
4,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,...,,M,1959.0,09302016,,,AZ,92471040000.0,602,B1


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

In this project, we use a star schema with one fact table and two dimension tables. The fact table, i94_fact, holds the I94 information for each record. It is the main table for immigration analysis and can be joined to the dimension tables for further insight. The temperature table is a dimension table that provides the average temperature by port code and country, which can be used to analyze the relationship between temperature and I94 records. The port_code table is a dimension table that provides port information for I94 records.

I94 fact table
* cicid
* i94yr
* i94mon
* i94cit
* i94res
* i94port
* i94addr
* arrdate
* depdate 
* visatype

Temperature dimension table 

* dt
* AverageTemperature
* Country
* port_code

Port_code dimension table 

* port_code
* port_city
* state

#### 3.2 Mapping Out Data Pipelines

* Data Loading
* Clean up Data
* Create Table and insert data into table
* Data Validation
* Save table


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [10]:
# Fact table: one row per I94 record, restricted to the modeled columns
i94_table=df_imm.select('cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port','i94addr', 'arrdate', 'depdate', 'visatype').dropDuplicates()

In [11]:
temperature_table=df_temp.select('dt','AverageTemperature','Country','port_code').dropDuplicates()

In [12]:
port_code_table = df_i94port

In [13]:
i94_table.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,i94addr,arrdate,depdate,visatype
0,174.0,2016.0,4.0,103.0,103.0,WAS,FL,20545.0,,WT
1,543.0,2016.0,4.0,103.0,103.0,SEA,AK,20545.0,20556.0,WB
2,567.0,2016.0,4.0,103.0,103.0,SFR,CA,20545.0,20558.0,WT
3,590.0,2016.0,4.0,103.0,103.0,TAM,FL,20545.0,20574.0,WT
4,790.0,2016.0,4.0,104.0,104.0,ATL,FL,20545.0,20559.0,WT


In [14]:
temperature_table.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,Country,port_code
0,1848-07-01,33.42,United Arab Emirates,MAA
1,1866-05-01,28.935,United Arab Emirates,MAA
2,1912-11-01,23.519,United Arab Emirates,MAA
3,1920-08-01,33.749,United Arab Emirates,MAA
4,1956-06-01,31.724,United Arab Emirates,MAA


In [15]:
port_code_table.limit(5).toPandas()

Unnamed: 0,port_code,port_city,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK


#### 4.2 Data Quality Checks
 
Each table must contain at least one row; the check raises an error for any empty table.

In [16]:
# Perform quality checks here
table = [i94_table,temperature_table,port_code_table]
for i in table: 
    cnt=i.count()
    if cnt < 1:
        raise ValueError(f'Table {i} is empty.')
    print (f"{i} is passed ")

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, i94addr: string, arrdate: double, depdate: double, visatype: string] is passed 
DataFrame[dt: string, AverageTemperature: string, Country: string, port_code: string] is passed 
DataFrame[port_code: string, port_city: string, state: string] is passed 
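
The check above prints each DataFrame's schema rather than a readable name, and it only tests for emptiness. A hedged sketch of a slightly broader check follows: it reports tables by name and verifies that a set of key columns has no duplicate rows. The function names are hypothetical; the code relies only on `count()` and `dropDuplicates(subset)`, both of which Spark DataFrames provide. Treating `cicid` as the unique key of the fact table is an assumption, not something the source states.

```python
def check_not_empty(tables):
    """Raise ValueError if any table has no rows; return row counts by name.

    `tables` maps a readable name to any object with a `.count()` method,
    such as a Spark DataFrame.
    """
    counts = {}
    for name, df in tables.items():
        n = df.count()
        if n < 1:
            raise ValueError(f"Table {name} is empty.")
        print(f"{name} passed the row-count check with {n} rows")
        counts[name] = n
    return counts


def check_unique_key(name, df, key_cols):
    """Raise ValueError if any combination of key_cols appears more than once."""
    total = df.count()
    distinct = df.dropDuplicates(key_cols).count()
    if total != distinct:
        raise ValueError(
            f"Table {name} has {total - distinct} duplicate rows on {key_cols}."
        )
    print(f"{name} passed the uniqueness check on {key_cols}")
    return True
```

In this notebook, the functions might be called as `check_not_empty({'i94_fact': i94_table, 'temperature_dim': temperature_table, 'port_code_dim': port_code_table})` followed by `check_unique_key('i94_fact', i94_table, ['cicid'])`.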


In [17]:
#Save table 
i94_table.write.parquet(os.path.join('./', 'i94.parquet'), 'overwrite')
temperature_table.write.parquet(os.path.join('./', 'temperature.parquet'), 'overwrite')
port_code_table.write.parquet(os.path.join('./', 'port_code.parquet'), 'overwrite')

#### 4.3 Data dictionary 

##### i94_fact  

* cicid: double 
* i94yr: double 
* i94mon: double
* i94cit: double
* i94res: double
* i94port: string 
* i94addr: string 
* arrdate: double 
* depdate: double 
* visatype: string

##### temperature_dim 

* dt: string
* AverageTemperature: string
* Country: string 
* port_code: string

##### port_code_dim
* port_code: string
* port_city: string
* state: string
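
The `arrdate` and `depdate` columns are stored as doubles. Assuming they are SAS date values, that is, day counts from 1960-01-01 (the source file is a SAS dataset, and the sample value 20545 then maps to 2016-04-01, consistent with `i94yr` 2016 and `i94mon` 4), they can be converted to calendar dates as sketched below. `sas_days_to_date` is a hypothetical helper and not part of this notebook.

```python
import math
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # day 0 of SAS date values


def sas_days_to_date(days: Optional[float]) -> Optional[date]:
    """Convert a SAS day count to a calendar date; missing values map to None."""
    if days is None or math.isnan(days):
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_days_to_date(20545.0))  # an arrdate value from the sample rows
print(sas_days_to_date(20556.0))  # a depdate value from the sample rows
print(sas_days_to_date(None))     # missing departure date
```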

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    * Spark is used in this project to load and clean data from different formats, such as SAS and CSV. It makes the data easy to process and is very efficient. We can also run this code on AWS with EMR and Apache Airflow: AWS can process the data very efficiently, and Airflow can be set up to schedule the process.
* Propose how often the data should be updated and why.
    * The data should be updated monthly because the I94 immigration data is updated monthly.
* Write a description of how you would approach the problem differently under the following scenarios:
    * If the data were increased by 100x, would you store the data in the same way? If your project is heavy on reading over writing, how do you store the data in a way to meet this requirement? What if the requirement is heavy on writing instead?
        * I would use Spark on EMR to process the data in a distributed way. For the EMR cluster, I can use one master node and three core nodes, which will be faster than a single node. The data will be stored in a Postgres database. If the data is large, we can use Cassandra, a NoSQL database, to handle the writes.
    * How do you run this pipeline on a daily basis by 7 am every day? If the DAG fails, how do you update the dashboard? Will the dashboard still work? Or will you populate the dashboard using the previous day's data?
        * I will use Airflow to trigger the DAG on the configured schedule. If the DAG fails, an email alert will remind me to check the process and fix the problem.
    * How do you make your database accessible to 100+ people? Can you come up with a more cost-effective approach? Does your project need to support 100+ connections at the same time?
        * I will use AWS Redshift if many people need to access the database. Redshift can be set to scale automatically, which allows many people to access the database. To keep it cost-effective, I can choose the cheapest plan. As built, my project does not support many connections at the same time.