# PySpark ETL Pipeline
### Data Engineering Capstone Project

#### Project Summary
This project builds an ETL pipeline for Immigration Data and City Temperature Data to form a final fact table to help answer questions on the relationships between the two datasets.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [87]:
# Do all imports and installs here
import pandas as pd, re
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lower, col

### Step 1: Scope the Project and Gather Data

#### Scope 

This project consumes and aggregates I94 Immigration Data from the US National Tourism and Trade Office and Temperature Data from Kaggle to determine if there is a correlation between immigration patterns and temperature averages in cities. Spark is used to process the data, and the final fact table is built by joining the two datasets on the destination city.

#### Describe and Gather Data 
Before combining, cleaning, and filtering the data, you need to understand what each dataset contains. The important columns in each imported dataset are listed below.

#### Immigration Data: US National Tourism and Trade Office (SAS7BDAT format)
I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace, along with a sample file in CSV format for previewing the data before reading it all in. Only the part of the dataset needed for the project goal is used; the relevant columns are listed below.

* cicid = record id
* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of the traveler's country of citizenship
* i94port = 3 character code of destination USA city
* arrdate = arrival date in the USA
* i94mode = 1 digit travel code
* depdate = departure date from the USA
* i94visa = reason for immigration
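
The `arrdate` and `depdate` columns hold plain numbers (for example `20545.0`) rather than calendar dates. Assuming they follow the standard SAS convention of counting days since 1960-01-01, a minimal sketch of the conversion could look like this. `sas_to_date` is a hypothetical helper name and is not part of the pipeline below.

```python
import math
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day


def sas_to_date(days: Optional[float]) -> Optional[date]:
    """Convert a SAS numeric date to a datetime.date; missing values stay None."""
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20545.0))  # 2016-04-01
```

The value `20545.0` appears as an `arrdate` in the April 2016 sample further down, which is consistent with this interpretation.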

#### Temperature data: Kaggle (csv format)
World Temperature Data: This dataset comes from Kaggle and is read from the file `GlobalLandTemperaturesByCity.csv`.

* AverageTemperature = average temperature
* AverageTemperatureUncertainty = average temperature uncertainty
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude
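
The `Latitude` and `Longitude` columns are stored as strings with a hemisphere suffix, such as `57.05N` and `10.33E`. If numeric coordinates are needed later, one possible approach is sketched below. `parse_coordinate` is a hypothetical helper, and it assumes every value ends in exactly one of `N`, `S`, `E`, or `W`.

```python
def parse_coordinate(value: str) -> float:
    """Turn '57.05N' / '10.33E' style strings into signed decimal degrees."""
    value = value.strip()
    magnitude, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in "NSEW":
        raise ValueError(f"unexpected hemisphere in {value!r}")
    # South and West are negative by the usual convention
    return -magnitude if hemisphere in "SW" else magnitude


print(parse_coordinate("57.05N"), parse_coordinate("10.33E"))  # 57.05 10.33
```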

#### Correct SAS Label Data
* i94port = 3 character port code
* city = city name

#### Scoping the project
This section answers the following initial questions about the data. 
1. How big is the data?
2. What columns are in the data?
3. What are the column formats?

In [88]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_i94 = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")


In [89]:
df_i94.shape

(3096313, 28)

In [90]:
df_i94.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [91]:
df_i94.head()

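| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |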


In [205]:
fname2 = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname2)

In [206]:
df_temp.shape

(8599212, 7)

In [207]:
df_temp.columns

Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude'],
      dtype='object')

In [208]:
df_temp.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


In [211]:
df_temp.groupby('Country')['City'].value_counts()

Country      City        
Afghanistan  Baglan          2169
             Gardez          2169
             Jalalabad       2169
             Kabul           2169
             Qunduz          2169
             Herat           2115
             Gazni           2112
             Qandahar        2053
Albania      Durrës          3239
             Elbasan         3239
             Tirana          3239
Algeria      Algiers         3129
             Constantine     3129
             Médéa           3129
             Wahran          3129
             Warqla          3129
Angola       Luanda          1893
             Benguela        1878
             Huambo          1878
             Kuito           1878
             Lobito          1878
             Lubango         1878
Argentina    Concordia       2181
             Corrientes      2181
             Formosa         2181
             Lambaré         2181
             Posadas         2181
             Resistencia     2181
             Bahia Bla

#### Read Data Directly into Spark
1. I94 Data read into pyspark df
2. Temperature Data read into pyspark df
3. I94 Correct Label File into pyspark df

In [100]:

from pyspark.sql import SparkSession

# Build a Spark session with the spark-sas7bdat package so SAS files can be read
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
df_i94_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [101]:
# write to parquet; overwrite so the cell can be re-run without a "path already exists" error
df_i94_spark.write.mode("overwrite").parquet("sas_data")
df_i94_spark = spark.read.parquet("sas_data")

In [102]:
df_i94_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [103]:
df_i94_spark.describe('i94cit').show()

+-------+------------------+
|summary|            i94cit|
+-------+------------------+
|  count|           3096313|
|   mean| 304.9069344733559|
| stddev|210.02688853063327|
|    min|             101.0|
|    max|             999.0|
+-------+------------------+



In [104]:
df_i94_spark.select('i94cit').distinct().count()

243

In [105]:
df_temp_spark = (spark.read.format("csv").options(header="true")
    .load(fname2))

In [106]:
df_temp_spark.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



### Step 2: Explore and Assess the Data
#### Explore the Data 
All valid entries for destination city codes are stored in `I94_SAS_Labels_Description_Valid.txt`. All cities from the I94 immigration data and the temperature data are filtered through this list of valid cities. Other cleaning steps are listed below.

#### Cleaning Steps
The cleaning steps address the following data quality issues:

1. Missing values (NaN)
2. Duplicate data
3. Invalid Data Values (SAS Label Descriptions)
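
To make the three issues concrete, here is a small plain-Python sketch that applies them to a handful of in-memory rows. It only illustrates the logic and is not the Spark code used in this notebook. `clean_records` is a hypothetical helper, and the sample rows (including treating `XXX` as an invalid port code) are assumptions chosen for the example.

```python
import math


def clean_records(records, valid_ports):
    """Apply the three cleaning steps to a list of dict rows."""
    seen, cleaned = set(), []
    for row in records:
        # 1. skip rows with a missing (None or NaN) value
        if any(v is None or (isinstance(v, float) and math.isnan(v)) for v in row.values()):
            continue
        # 3. skip rows whose port code is not in the valid list
        if row["i94port"] not in valid_ports:
            continue
        # 2. skip exact duplicates of a row that was already kept
        key = tuple(sorted(row.items()))
        if key in seen:
            continue
        seen.add(key)
        cleaned.append(row)
    return cleaned


sample = [
    {"i94port": "NYC", "arrdate": 20545.0},
    {"i94port": "NYC", "arrdate": 20545.0},       # duplicate
    {"i94port": "XXX", "arrdate": 20573.0},       # port code not in the valid list
    {"i94port": "ATL", "arrdate": float("nan")},  # missing value
]
print(clean_records(sample, valid_ports={"NYC", "ATL"}))
# [{'i94port': 'NYC', 'arrdate': 20545.0}]
```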



In [238]:
# Parse the label file into a dict of port code -> city list,
# plus a list with one {'i94port', 'portcity'} row per (code, city) pair
valid_port = {}
list_of_dicts = []

with open('I94_SAS_Labels_Description_Valid.txt') as file:
    for line in file:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        key, val = line.split('=', 1)
        key = key.strip()
        val_list = [city.strip() for city in val.split(',')]
        valid_port[key] = val_list
        for city in val_list:
            list_of_dicts.append({'i94port': key, 'portcity': city})



In [239]:
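# Build the port lookup DataFrame from the parsed rows, using the existing
# `spark` session (`sc` and `sqlContext` are not created in this notebook)
port_df = spark.createDataFrame(
    [(d['i94port'], d['portcity']) for d in list_of_dicts],
    ['i94port', 'portcity'],
)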

#### Clean and Filter I94 data

In [157]:
# I94 data (df_i94_spark) 
# df_i94_spark_test = df_i94_spark.limit(1000) # always start practice on smaller dataset

# drop any duplicates
df_i94_spark=df_i94_spark.dropDuplicates()

# lowercase all columns to standardize text format
df_i94_spark = df_i94_spark.toDF(*[c.lower() for c in df_i94_spark.columns])

# keep only rows whose port code appears in the valid port list
df_i94_spark = df_i94_spark.filter(df_i94_spark.i94port.isin(list(valid_port.keys())))

#### Clean and Filter Temperature data

In [241]:
# Temperature data (df_temp_spark)
# df_temp_spark_test = df_temp_spark.limit(1000) # always start practice on smaller dataset

# lowercase all column names first, so the cell can be re-run without an AttributeError
df_temp_spark = df_temp_spark.toDF(*[c.lower() for c in df_temp_spark.columns])

# keep rows with non-null values for averagetemperature and city
df_temp_spark = df_temp_spark.dropna(subset=['averagetemperature', 'city'])

# drop duplicates: keep one row per (city, country); which reading is kept is arbitrary
df_temp_spark = df_temp_spark.dropDuplicates(['city', 'country'])

# use inner join to filter down to cities with ports
temp_final = df_temp_spark.join(port_df, df_temp_spark.city == port_df.portcity)

In [242]:
temp_final.columns

['dt',
 'averagetemperature',
 'averagetemperatureuncertainty',
 'city',
 'country',
 'latitude',
 'longitude',
 'i94port',
 'portcity']
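
The inner join above matches `city` against `portcity` by exact string equality, so differences in letter case or surrounding whitespace between the two files would silently drop rows. Whether such differences exist in these files has not been checked here. If they do, a normalization step along the following lines could be applied to both sides before joining. `normalize_city` is a hypothetical helper and the sample names are made up.

```python
def normalize_city(name: str) -> str:
    """Collapse whitespace and ignore case so equal city names compare equal."""
    return " ".join(name.split()).casefold()


temperature_cities = ["Anchorage", "New  York "]  # made-up sample values
port_cities = ["ANCHORAGE", "NEW YORK"]           # made-up sample values

port_keys = {normalize_city(c) for c in port_cities}
matches = [c for c in temperature_cities if normalize_city(c) in port_keys]
print(matches)  # ['Anchorage', 'New  York ']
```

In Spark, the same idea can be expressed with the `lower` and `trim` functions from `pyspark.sql.functions`.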

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data model is a fact table (described below) that combines the Immigration Data and the Temperature Data, linked by city, after both have been cleaned and filtered as above. Only the columns that matter for the analysis are selected.

Fact Table: I94 Immigration Data and Temperature Data
* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of the traveler's country of citizenship
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration
* AverageTemperature = average temp

#### 3.2 Mapping Out Data Pipelines
The following steps are needed to pipeline the data into the data model:
1. Import all data and convert it into the same data format (Step 1 above)
2. Clean and filter the data for every month in the folder (Step 2 above)
3. Create Immigration and Temperature dimension tables by selecting specific important columns
4. Create the Fact Table above by joining the two dimension tables on the filtered i94port column
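
The final step is an inner join on the port code. The plain-Python sketch below shows what that join does to a few in-memory rows. It is an illustration only, not the Spark implementation, and it assumes at most one temperature row per port code. `join_on_port` is a hypothetical helper and the temperature value is made up.

```python
def join_on_port(immig_rows, temp_rows):
    """Inner-join immigration rows to temperature rows on the port code."""
    # index the temperature table by port code (assumes one row per code)
    temp_by_port = {row["i94port"]: row for row in temp_rows}
    fact_rows = []
    for immig in immig_rows:
        temp = temp_by_port.get(immig["i94port"])
        if temp is None:
            continue  # inner join: drop arrivals without a temperature match
        fact_rows.append({**immig, "averagetemperature": temp["averagetemperature"]})
    return fact_rows


immig_rows = [{"i94port": "NYC", "i94mon": 4.0}, {"i94port": "XXX", "i94mon": 4.0}]
temp_rows = [{"i94port": "NYC", "averagetemperature": 10.5}]  # made-up value
print(join_on_port(immig_rows, temp_rows))
# [{'i94port': 'NYC', 'i94mon': 4.0, 'averagetemperature': 10.5}]
```

Rows whose port code has no temperature record disappear from the result, which is why a count check after the join is useful.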

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [187]:
# Clean Immigration Data
# df_i94_spark

# Extract columns for immigration dimension table
immig_table = df_i94_spark.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# Write dimension table
immig_table.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")


In [243]:
# Clean Temperature Data
# df_temp_spark

# Extract columns  for temp dimension table
temp_table = temp_final.select(["averagetemperature", "city", "country", "latitude", "longitude", "i94port"])

# Write temperature dimension table
temp_table.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [264]:
# Select the immigration columns used in the fact table and give them readable names
immig_facts = immig_table.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "depdate", "i94visa"])
newColumns = ["year"
              , "month"
              , "city_immig"
              , "i94port_immig"
              , "arrival_date"
              , "departure_date"
              , "reason"]

# toDF renames all columns positionally
immig_dimen_table = immig_facts.toDF(*newColumns)

# Write the renamed immigration table. The port column is now i94port_immig, and the
# output must not go to the temperature path (the path name below is a placeholder).
immig_dimen_table.write.mode("append").partitionBy("i94port_immig").parquet("/results/immigration_renamed.parquet")


In [None]:
# FINAL TABLE: join the two tables on the port code (step 4 of the pipeline)
final_fact_table = immig_dimen_table.join(temp_table, immig_dimen_table.i94port_immig == temp_table.i94port)

# Write fact table to parquet files partitioned by i94port
final_fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks
Data quality checks confirm that the pipeline ran as expected. Useful checks include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [244]:
def quality_check(spark_df):
    data_num = spark_df.count()
    if data_num > 0:
        print("Quality check PASSED with record count:", data_num)
    else:
        print("Quality check FAILED with record count:", data_num)
        
quality_check(immig_table)
quality_check(temp_table)

Quality check PASSED with record count: 3
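
The `quality_check` function above only prints its result, so a failed check would not stop the pipeline. As a sketch of a stricter variant, the checks below raise an error instead, and add a unique-key check as an example of an integrity constraint. They operate on plain lists of dicts for illustration. The function names and sample rows are hypothetical.

```python
def check_not_empty(rows, table_name):
    """Completeness check: the table must contain at least one record."""
    if not rows:
        raise ValueError(f"{table_name}: no records")


def check_unique(rows, key, table_name):
    """Integrity check: the key column must not contain repeated values."""
    values = [row[key] for row in rows]
    if len(values) != len(set(values)):
        raise ValueError(f"{table_name}: duplicate values in {key!r}")


temp_rows = [
    {"i94port": "NYC", "city": "new york"},  # made-up sample rows
    {"i94port": "ATL", "city": "atlanta"},
]
check_not_empty(temp_rows, "temp_table")
check_unique(temp_rows, "i94port", "temp_table")
print("all checks passed")
```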


#### 4.3 Data dictionary 
Data dictionary for the data model providing a brief description of what the data is and where it came from. 

In [267]:
# use columns to verify and check each dimension and fact table
# immig_table.columns
# temp_table.columns
final_fact_table.columns

['year',
 'month',
 'city_immig',
 'i94port_immig',
 'arrival_date',
 'departure_date',
 'reason',
 'averagetemperature',
 'city',
 'country',
 'latitude',
 'longitude',
 'i94port']

#### First Dimension: Immigration Data: US National Tourism and Trade Office 
`immig_table`: pyspark dataframe

* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of the traveler's country of citizenship
* i94port = 3 character code of destination USA city
* arrdate = arrival date in the USA
* i94mode = 1 digit travel code
* depdate = departure date from the USA
* i94visa = reason for immigration

#### Second Dimension: Temperature data: Kaggle 
`temp_table`: pyspark dataframe
* averagetemperature = average temperature
* city = city name
* country = country name
* latitude = latitude
* longitude = longitude
* i94port = 3 character code of destination USA city

#### Fact Table: Aggregated I94 Immigration Data and Temperature Data
`final_fact_table`: pyspark dataframe
* year = 4 digit year
* month = numeric month
* city_immig = 3 digit code of the traveler's country of citizenship
* i94port_immig = 3 character code of destination USA city
* arrival_date = arrival date in the USA
* departure_date = departure date from the USA
* reason = reason for immigration
* averagetemperature  = average temp
* city = city name
* country = country name
* latitude = latitude
* longitude = longitude
* i94port = 3 character code of destination USA city

### Step 5: Complete Project Write Up
1. **Clearly state the rationale for the choice of tools and technologies for the project.**
* This project uses PySpark, which is designed for processing large structured and semi-structured datasets. That makes it a good fit for combining data that arrives in different formats (SAS7BDAT and CSV here).

2. **Propose how often the data should be updated and why.**
* Data should be updated monthly, because the I94 SAS data files are organized by month.

3. **Write a description of how you would approach the problem differently under the following scenarios:**
* The data was increased by 100x.
    * If the data increased by 100x, the tooling would need to be reconsidered. PySpark could still be used, but not on a single local machine; the job would most likely need to run on a cluster.

* The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * A dashboard with a daily deadline needs scheduled, automated runs rather than manual processing. If the data is stored in S3 buckets, the ETL can be triggered by a number of tools, such as Apache Airflow, SQS, or Lambda functions.

* The database needed to be accessed by 100+ people.
    * Currently, files are stored as parquet both between processing steps and in the final output. To give 100+ users access, the files would need to be stored in S3 buckets, as a data lake, so that users could query them with a tool such as Athena.