# Project Title
### Data Engineering Capstone Project

#### Project Summary

* Data is money and in a day with thousand and thousand of data to process, there is some tool that will suit perfectly with a large amount of data such as [spark](https://spark.apache.org/) and [hadoop](https://hadoop.apache.org/) in this project i will be using Spark to transform [I94 Immigration Data](https://www.trade.gov/national-travel-and-tourism-office) and [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) to [star schema](https://searchdatamanagement.techtarget.com/definition/star-schema) and perform the nasscary step To be ready to query multiple analytics questions.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from functools import reduce
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DateType, StringType

### Step 1: Scope the Project and Gather Data

#### Scope 
* Dataset scope will be in two different datasets with more than one million records, and then create fact and dimension table to be able to analyze the data and find insigit by using spark to help process a large amount of data.
#### Describe and Gather Data 
* `I94 Immigration Data`: This data comes from the US National Tourism and Trade Office This is where the data comes from [here](https://travel.trade.gov/research/reports/i94/historical/2016.html).
* `World Temperature Data`: This dataset came from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) contain monthly average temperature for different cities

In [2]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [3]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [4]:
df.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [5]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df2 = pd.read_csv(fname)

In [6]:
df2.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()



In [3]:

imm_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [4]:
imm_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

### Step 2: Explore and Assess the Data
#### Explore the Data 
* Exploring the data set using pyspark to find any duplicated or null recored to clean them
#### Cleaning Steps

* Remove any duplicated data.
* Change the column name to clearer names.
* Transform SAS date to datetime.


#### Immigration Data

In [5]:
# Performing cleaning tasks here
fact_imm = imm_df.select(['cicid','i94yr','arrdate','depdate','i94mode','fltno']).dropDuplicates()

# fact_imm.show(5)

In [6]:
fact_imm.count()

3096313

In [7]:
fact_imm.schema.names

['cicid', 'i94yr', 'arrdate', 'depdate', 'i94mode', 'fltno']

In [6]:
# overwrite column name 
old_column = fact_imm.schema.names
new_columns = ['id', 'year', 'arrival_date', 'departure_date', 'mode','fltno']

fact_imm = reduce(lambda fact_imm, idx: fact_imm.withColumnRenamed(old_column[idx], new_columns[idx]), range(len(old_column)), fact_imm)


fact_imm.printSchema()

root
 |-- id: double (nullable = true)
 |-- year: double (nullable = true)
 |-- arrival_date: double (nullable = true)
 |-- departure_date: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- fltno: string (nullable = true)



In [7]:
# Transform SAS date to datetime.
def convert_SAS_to_date(date):
    if date is not None:
        return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
    
convert_SAS_to_date_udf = udf(convert_SAS_to_date, DateType())
fact_imm = fact_imm.withColumn('arrival_date', convert_SAS_to_date_udf(col('arrival_date')))
fact_imm = fact_imm.withColumn('departure_date', convert_SAS_to_date_udf(col('departure_date')))

In [10]:
fact_imm.show(5)

+-----+------+------------+--------------+----+-----+
|   id|  year|arrival_date|departure_date|mode|fltno|
+-----+------+------------+--------------+----+-----+
|314.0|2016.0|  2016-04-01|    2016-04-11| 1.0|00275|
|425.0|2016.0|  2016-04-01|    2016-04-02| 1.0|00097|
|496.0|2016.0|  2016-04-01|    2016-04-04| 1.0|00065|
|703.0|2016.0|  2016-04-01|    2016-06-27| 1.0|00246|
|881.0|2016.0|  2016-04-01|    2016-04-09| 1.0|02067|
+-----+------+------------+--------------+----+-----+
only showing top 5 rows



In [8]:
dim_immigrant = imm_df.select(['cicid','i94cit','i94res','gender','biryear']).dropDuplicates()

dim_immigrant.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- biryear: double (nullable = true)



In [10]:
dim_immigrant.schema.names

['cicid', 'i94cit', 'i94res', 'gender', 'biryear']

In [9]:
# change column name
old_column = dim_immigrant.schema.names
new_columns = ['id', 'citizen', 'residence', 'gender', 'year_of_birth']

dim_immigrant = reduce(lambda dim_immigrant, idx: dim_immigrant.withColumnRenamed(old_column[idx], new_columns[idx]), range(len(old_column)), dim_immigrant)


dim_immigrant.printSchema()

root
 |-- id: double (nullable = true)
 |-- citizen: double (nullable = true)
 |-- residence: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- year_of_birth: double (nullable = true)



In [14]:
dim_immigrant.show(5)

+-----+-------+---------+------+-------------+
|   id|citizen|residence|gender|year_of_birth|
+-----+-------+---------+------+-------------+
| 16.0|  101.0|    101.0|  null|       1988.0|
| 84.0|  103.0|    103.0|     M|       1994.0|
|536.0|  103.0|    103.0|     M|       1956.0|
|670.0|  103.0|    124.0|     M|       1979.0|
|681.0|  103.0|    112.0|     F|       1955.0|
+-----+-------+---------+------+-------------+
only showing top 5 rows



In [10]:
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()

In [11]:
def code_mapper(file, idx):
    f_content2 = f_content[f_content.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic

In [12]:
i94cit_res = code_mapper(f_content, "i94cntyl")
# i94cit_res

In [13]:
country_code = pd.DataFrame(list(i94cit_res.items()), columns=['code', 'country'])
dim_country= spark.createDataFrame(country_code)

In [15]:
dim_country.show(5)

+----+--------------------+
|code|             country|
+----+--------------------+
| 582|MEXICO Air Sea, a...|
| 236|         AFGHANISTAN|
| 101|             ALBANIA|
| 316|             ALGERIA|
| 102|             ANDORRA|
+----+--------------------+
only showing top 5 rows



In [14]:
dim_airline = imm_df.select(['fltno','airline']).dropDuplicates()

In [15]:
dim_airline.show(5)

+-----+-------+
|fltno|airline|
+-----+-------+
|00446|     LH|
|00623|     FI|
|00229|     UP|
|00008|     AV|
|04400|     YX|
+-----+-------+
only showing top 5 rows



In [20]:
dim_airline.count()

11942

#### Temperture

In [15]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
dim_temp = spark.read.option("header", True).csv(fname)


dim_temp.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [16]:
# remove record that contail null value in both AverageTemperature and AverageTemperatureUncertainty

dim_temp = dim_temp.filter(dim_temp.AverageTemperature.isNotNull() & dim_temp.AverageTemperatureUncertainty.isNotNull())

dim_temp = dim_temp.select(['dt','AverageTemperature','AverageTemperatureUncertainty','City','Country','Latitude', 'Longitude']).dropDuplicates()

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
* Star schema are work optimally for data warehouses, BI use and OLAP in that help our goal. 
#### 3.2 Mapping Out Data Pipelines
* 1. Extract the data from the provided sources above.
* 2. Clean and preprocess the data.
    * Remove any duplicated data.
    * Change the column name to clearer names.
    * Transform SAS date to datetime.
    * remove null in temperture if both AverageTemperature and AverageTemperatureUncertainty Null.
* 3. Transform imm_df to fact table and dimensional.
* 4. Transform temp_df to dimensional table.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model


In [24]:
# Write code here
fact_imm.show(5)

+-----+------+------------+--------------+----+-----+
|   id|  year|arrival_date|departure_date|mode|fltno|
+-----+------+------------+--------------+----+-----+
|314.0|2016.0|  2016-04-01|    2016-04-11| 1.0|00275|
|425.0|2016.0|  2016-04-01|    2016-04-02| 1.0|00097|
|496.0|2016.0|  2016-04-01|    2016-04-04| 1.0|00065|
|703.0|2016.0|  2016-04-01|    2016-06-27| 1.0|00246|
|881.0|2016.0|  2016-04-01|    2016-04-09| 1.0|02067|
+-----+------+------------+--------------+----+-----+
only showing top 5 rows



In [19]:
dim_immigrant.count()

3096313

In [47]:
dim_immigrant.show()

+------+-------+---------+------+-------------+
|    id|citizen|residence|gender|year_of_birth|
+------+-------+---------+------+-------------+
|  16.0|  101.0|    101.0|  null|       1988.0|
|  84.0|  103.0|    103.0|     M|       1994.0|
| 536.0|  103.0|    103.0|     M|       1956.0|
| 670.0|  103.0|    124.0|     M|       1979.0|
| 681.0|  103.0|    112.0|     F|       1955.0|
|1139.0|  104.0|    104.0|     M|       2012.0|
|1633.0|  104.0|    104.0|     M|       1958.0|
|2096.0|  105.0|    105.0|     F|       1968.0|
|2344.0|  107.0|    107.0|     F|       2010.0|
|2531.0|  107.0|    107.0|     M|       1983.0|
|2615.0|  107.0|    107.0|     M|       1990.0|
|2913.0|  108.0|    108.0|  null|       1966.0|
|2930.0|  108.0|    108.0|     M|       1969.0|
|3378.0|  108.0|    108.0|     M|       1967.0|
|3425.0|  108.0|    108.0|  null|       1985.0|
|3749.0|  108.0|    438.0|     M|       1963.0|
|4042.0|  110.0|    110.0|     F|       1991.0|
|4109.0|  110.0|    110.0|     F|       

In [48]:
dim_country.show()

+----+--------------------+
|code|             country|
+----+--------------------+
| 582|MEXICO Air Sea, a...|
| 236|         AFGHANISTAN|
| 101|             ALBANIA|
| 316|             ALGERIA|
| 102|             ANDORRA|
| 324|              ANGOLA|
| 529|            ANGUILLA|
| 518|     ANTIGUA-BARBUDA|
| 687|           ARGENTINA|
| 151|             ARMENIA|
| 532|               ARUBA|
| 438|           AUSTRALIA|
| 103|             AUSTRIA|
| 152|          AZERBAIJAN|
| 512|             BAHAMAS|
| 298|             BAHRAIN|
| 274|          BANGLADESH|
| 513|            BARBADOS|
| 104|             BELGIUM|
| 581|              BELIZE|
+----+--------------------+
only showing top 20 rows



In [17]:
fact_imm.createOrReplaceTempView("fact_imm")
dim_immigrant.createOrReplaceTempView("dim_immigrant")
dim_country.createOrReplaceTempView("dim_country")
dim_airline.createOrReplaceTempView("dim_airline")
dim_temp.createOrReplaceTempView("dim_temp")

In [21]:
spark.sql("""
SELECT  fa.id, 
        fa.arrival_date, 
        fa.departure_date, 
        di.gender, 
        dc.country as citizen_country, 
        dc2.country as residence_country, 
        da.airline
FROM 
     fact_imm      fa,
     dim_immigrant di,
     dim_country   dc,
     dim_country   dc2,
     dim_airline   da 
     WHERE 
            fa.id == di.id
            and Int(di.citizen) == dc.code
            and Int(di.residence) == dc2.code 
            and fa.fltno = da.fltno

""").limit(10).toPandas()

Unnamed: 0,id,arrival_date,departure_date,gender,citizen_country,residence_country,airline
0,1434764.0,2016-04-08,2016-05-19,M,CAYMAN ISLANDS,CAYMAN ISLANDS,YNT
1,1434763.0,2016-04-08,2016-04-23,M,CAYMAN ISLANDS,CAYMAN ISLANDS,YNT
2,669198.0,2016-04-04,2016-04-15,M,UNITED KINGDOM,UNITED KINGDOM,YNT
3,873485.0,2016-04-05,2016-04-17,M,ISRAEL,ISRAEL,
4,873485.0,2016-04-05,2016-04-17,M,ISRAEL,ISRAEL,LH
5,873485.0,2016-04-05,2016-04-17,M,ISRAEL,ISRAEL,CH
6,3372894.0,2016-04-18,2016-05-01,M,ISRAEL,ISRAEL,
7,3372894.0,2016-04-18,2016-05-01,M,ISRAEL,ISRAEL,LH
8,3372894.0,2016-04-18,2016-05-01,M,ISRAEL,ISRAEL,CH
9,3540945.0,2016-04-19,2016-05-04,F,ISRAEL,ISRAEL,


In [36]:
# we notice here that the most imiigration are citizen of UNITED KINGDOM 
spark.sql("""
SELECT  di.citizen, dc.country, count(di.citizen) as number_of_citizen
                            FROM dim_immigrant di,
                                 dim_country   dc
                            where dc.code = Int(di.citizen)
                            group by di.citizen ,dc.country
                            order by number_of_citizen desc

""").limit(1).toPandas()

Unnamed: 0,citizen,country,number_of_citizen
0,135.0,UNITED KINGDOM,360157


In [49]:
dim_airline.show()

+-----+-------+
|fltno|airline|
+-----+-------+
|00446|     LH|
|00623|     FI|
|00229|     UP|
|00008|     AV|
|04400|     YX|
|00502|     AA|
|  151|     EY|
|01117|     AA|
|00097|     3M|
|01520|     AA|
|  632|     AM|
|  450|     CM|
|6604C|     Y4|
| 5574|     UA|
|01193|     UA|
|01669|     UA|
|   46|    TOM|
|00151|     AA|
|  790|     AM|
|TGMYS|    *GA|
+-----+-------+
only showing top 20 rows



In [50]:
dim_temp.show()

+----------+--------------------+-----------------------------+-----+-------+--------+---------+
|        dt|  AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+--------------------+-----------------------------+-----+-------+--------+---------+
|1767-01-01|  -6.297999999999999|                        7.282|Århus|Denmark|  57.05N|   10.33E|
|1780-10-01|   8.931000000000001|           2.7680000000000002|Århus|Denmark|  57.05N|   10.33E|
|1781-05-01|                11.1|                        3.615|Århus|Denmark|  57.05N|   10.33E|
|1784-05-01|              11.536|                         2.89|Århus|Denmark|  57.05N|   10.33E|
|1785-04-01|               4.388|                        4.287|Århus|Denmark|  57.05N|   10.33E|
|1787-04-01|               4.598|           3.4960000000000004|Århus|Denmark|  57.05N|   10.33E|
|1822-06-01|              15.686|           1.9469999999999998|Århus|Denmark|  57.05N|   10.33E|
|1839-02-01|-0.00900000000000.

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [47]:
def check_dublicate(df):
    if df.count() == df.dropDuplicates(df.schema.names).count():
        return True
    else:
        return False
    
def check_record(df):
    if df.count() != 0:
        return True
    else:
        return False

In [48]:
if check_dublicate(fact_imm) and check_dublicate(dim_immigrant) and check_dublicate(dim_country) and check_dublicate(dim_airline) and check_dublicate(dim_temp):
    print("Fact and dim does not contain any dublicated records")
else:
    print("ther is dublicated record")

Fact and dim does not contain any dublicated records


In [49]:
if check_record(fact_imm) and check_record(dim_immigrant) and check_record(dim_country) and check_record(dim_airline) and check_record(dim_temp):
    print("pass")
else:
    print("No Record found")

pass


#### 4.3 Data dictionary 
## Immigration data

#### Fact Immigration (fact_imm)
     |-- id: id
     |-- year: year of the arrival
     |-- arrival_date: arrival date
     |-- departure_date: departure date
     |-- mode: mean of travil by (air, sea, land, or not reported)
     |-- fltno: flight number
     
#### Dim Immigrant (dim_immigrant)
     Contain immgrant personal information
     |-- id: id
     |-- citizen: the country of citizenship. in code
     |-- residence: the country of residence. in code
     |-- gender: person gender
     |-- year_of_birth: year of birth
     
#### Dim Country (dim_country)
     Contain code of the country
     |-- code: code represent spesifc country
     |-- country: country name
     
#### Dim Airline (dim_airline)
     |-- fltno: flight number 
     |-- airline: airline name
     
#### Dim Temperture (dim_temp)
     |-- dt: date 
     |-- AverageTemperature: Average Temperature for month
     |-- AverageTemperatureUncertainty: Average Temperature Uncertainty
     |-- City: city name 
     |-- Country: Country name
     |-- Latitude: Latitude
     |-- Longitude: Longitude

<img src="./erdigram1.png" />

### Step 5: Complete Project Write Up
##### Clearly state the rationale for the choice of tools and technologies for the project.
  * Spark has proven to work great with a large amounts of data, also with the simplicity of use.
    
##### Propose how often the data should be updated and why.
  * Could update monthly, Since the Immigration data get updated monthly.

### Write a description of how you would approach the problem differently under the following scenarios:

##### The data was increased by 100x.
  * With increased of data we can relay on Amazon services such as [EMR](https://aws.amazon.com/emr/).
  
##### The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * Using scheduling tool such as airflow will help autmated and scheduling the data pipeline.
  
##### The database needed to be accessed by 100+ people.
  * Consider Amazon service to handle handred or more request simultaneously such as [redshift](https://aws.amazon.com/redshift/) , [Amazon Aurora](https://aws.amazon.com/rds/aurora/?aurora-whats-new.sort-by=item.additionalFields.postDateTime&aurora-whats-new.sort-order=desc) 