# ETL Pipeline for the i94 Immigration and Temperature Data Sets
### Data Engineering Capstone Project

#### Project Summary
This project provides a database of tables optimized for analytical queries. Examples of such queries include:
 - Determine the cities with the most immigration, their temperatures, and the months with the most immigration.
 - Determine how many females and males travel and the average temperature of the places they go to.
 - Determine the cities with the most immigration and the visa type of most people migrating to those cities.

#### Goals
Prepare an ETL pipeline that loads the i94 immigration and temperature data sets, models the data, and produces tables optimized for querying,
then writes the tables to parquet files saved locally.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

---

## Step 1: Project Scope and Data Gathering

### Scope
This project produces two dimension tables and one fact table. The first table, "i9immegration", is an aggregation of the i94 immigration data by origin country and destination city. The second table, "temperature", is an aggregation of the temperature data by city. The fact table, "immigration_temperature", is produced by joining the two data sets on the destination city.

### Describe and Gather Data
For each of the two data sets provided, we list the fields that will be selected from it.

From the i94 immigration data, the following fields are of interest:
 - i94mon: Numeric month
 - i94cit: 3-digit numeric code for the country of origin
 - i94port: 3-character code for the city in the US
 - i94visa: Visa code corresponding to three categories (1=Business, 2=Pleasure, 3=Student)
 - i94bir: Age of respondent in years
 - gender: Non-immigrant sex
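
As a small illustration of the i94visa field, the sketch below decodes the three categories listed above. The helper name `visa_category` is hypothetical and not part of the notebook; it assumes the code may arrive as a float (the schema used later stores i94visa as a double) or as a missing value.

```python
# Categories as listed in the field description above
VISA_CATEGORIES = {1: "Business", 2: "Pleasure", 3: "Student"}


def visa_category(i94visa):
    """Map an i94visa code (possibly a float such as 2.0) to its category label.

    Returns None for missing values (None or NaN) and for unknown codes.
    Hypothetical helper, shown for illustration only.
    """
    # NaN is the only float that is not equal to itself
    if i94visa is None or i94visa != i94visa:
        return None
    return VISA_CATEGORIES.get(int(i94visa))


labels = [visa_category(code) for code in (1.0, 2.0, 3.0, 9.0, None)]
```

Codes outside the three documented categories map to `None` rather than raising, so the helper can be applied to a whole column without special-casing bad rows.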
 
For the temperature data, the following fields are of interest:
 - AverageTemperature: average temperature
 - AverageTemperatureUncertainty: average temperature uncertainty
 - City: city name
 - Country: country name
 - Latitude: latitude
 - Longitude: longitude

In [1]:
## Do imports 
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *

In [2]:
## Set default options for pandas
pd.set_option('max_colwidth', 10)
pd.set_option('display.max_columns', None)

In [3]:
## Initialize spark instance 
spark = SparkSession \
        .builder \
        .getOrCreate()

#### Load i94migration data

In [4]:
migration_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

In [5]:
#pd_df =  pd.read_sas(migration_file, 'sas7bdat', encoding="ISO-8859-1")

In [6]:
pd_df = pd.read_csv('immigration_data_sample.csv')

In [10]:
# Define schema for immigration data
i94immigration_schema = StructType([
        StructField('', IntegerType()),
        StructField('cicid', DoubleType()),
        StructField('i94yr', DoubleType()),
        StructField('i94mon', DoubleType()),
        StructField('i94cit', DoubleType()),
        StructField('i94res', DoubleType()),
        StructField('i94port', StringType()),
        StructField('arrdate', DoubleType()),
        StructField('i94mode', DoubleType()),
        StructField('i94addr', StringType()),
        StructField('depdate', DoubleType()),
        StructField('i94bir', DoubleType()),
        StructField('i94visa', DoubleType()),
        StructField('count', DoubleType()),
        StructField('dtadfile', StringType()),
        StructField('visapost', StringType()),
        StructField('occup', StringType()),
        StructField('entdepa', StringType()),
        StructField('entdepd', StringType()),
        StructField('entdepu', StringType()),
        StructField('matflag', StringType()),
        StructField('biryear', DoubleType()),
        StructField('dtaddto', StringType()),
        StructField('gender', StringType()),
        StructField('insnum', StringType()),
        StructField('airline', StringType()),
        StructField('admnum', StringType()),
        StructField('fltno', StringType()),
        StructField('visatype', StringType())
    ])

In [11]:
# Convert the pandas df to a spark dataframe
imigrations_df = spark.createDataFrame(pd_df, schema=i94immigration_schema )

#### Load GlobalLandTemperaturesByCity data

In [13]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'

In [14]:
df = spark.read.csv(fname)

In [18]:
temperature_schema = StructType([
        StructField('dt', TimestampType()),
        StructField('AverageTemperature', DoubleType()),
        StructField('AverageTemperatureUncertainty', DoubleType()),
        StructField('City', StringType()),
        # Country is needed by the later selectExpr calls
        StructField('Country', StringType()),
        # Coordinates are kept as strings, assuming the CSV stores them
        # with a hemisphere suffix (for example '57.05N')
        StructField('Latitude', StringType()),
        StructField('Longitude', StringType())
])

In [19]:
temperature_data = spark.read.schema(temperature_schema).format('csv').options(header='true', inferSchema='false').load(fname)

---

## Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
- Read the valid codes for i94cit and i94port from the valid_i94cit.txt and valid_i94port.txt files
- Remove all invalid country codes from the i94cit column
- Remove all invalid codes from the i94port column
- Add columns for origin_country and destination_city
- Remove all NaN values

In [20]:
temperature_data.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [21]:
## Read valid country codes and names
validcountry_dic = {}
with open('valid_i94cit.txt', 'r') as file1:
    for line in file1:
        # Skip blank lines
        if not line.strip():
            continue
        # Each line has the form: code = 'COUNTRY NAME'
        code, country = line.strip().split('=', 1)
        code = int(code.strip())
        # Drop any trailing parenthesised note and the surrounding quotes
        country = country.rsplit('(', 1)[0].replace("'","").strip()
        validcountry_dic[code] = country
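
The parsing logic above can be checked on a couple of lines without the file at hand. This is a minimal sketch: `parse_label_line` is a hypothetical helper, and the two sample lines only assume the `code = 'LABEL'` layout that the cell above expects from valid_i94cit.txt.

```python
def parse_label_line(line):
    """Split one "code = 'LABEL'" line into (int code, cleaned label)."""
    code, label = line.strip().split("=", 1)
    # Drop a trailing parenthesised note, the quotes, and surrounding whitespace
    label = label.rsplit("(", 1)[0].replace("'", "").strip()
    return int(code.strip()), label


# Sample lines in the assumed file layout (for illustration only)
sample_lines = [
    "   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'",
    "   236 =  'AFGHANISTAN'",
]
lookup = dict(parse_label_line(line) for line in sample_lines)

# i94cit is stored as a double, so lookups arrive as floats such as 582.0.
# In Python a float key that equals an int key finds the same dict entry.
country_for_float_code = lookup.get(582.0)
```

Because `582.0 == 582` and both hash identically, the integer-keyed dictionary can be used directly with the double-typed i94cit column.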

In [31]:
@udf()
def get_i94cit(code):
    '''
    Description: This function takes an i94cit (country code) and returns the corresponding country name
    
    Parameters: 
        - Input: Country code
        - Output: Corresponding country, or None if the code is not valid
    '''
    try:
        # i94cit arrives as a float; a float such as 582.0 matches the integer key 582
        return validcountry_dic[code]
    except KeyError:
        return None

# Add the origin country name based on the i94cit code
imigrations_df = imigrations_df.withColumn("origin_country", get_i94cit(imigrations_df.i94cit))

In [None]:
## Read valid city codes and names
validcity_dic = {}
with open('valid_i94port.txt', 'r') as file1:
    for line in file1:
        # Skip blank lines
        if not line.strip():
            continue
        # Each line has the form: 'CODE' = 'CITY, STATE'
        code, city = line.strip().split('=', 1)
        code = code.replace("'","").strip()
        # Keep only the city name, dropping the state after the comma
        city = city.split(',')[0].replace("'","").strip()
        validcity_dic[code] = city

In [None]:
@udf()
def get_i94port(code):
    '''
    Description: This function takes an i94port (city code) and returns the corresponding city name
    
    Parameters: 
        - Input: City code
        - Output: Corresponding city, or None if the code is not valid
    '''
    try:
        return validcity_dic[code]
    except KeyError:
        return None

# Add the destination city name based on the i94port code
imigrations_df = imigrations_df.withColumn("destination_city", get_i94port(imigrations_df.i94port))

In [None]:
temperature_df = temperature_data.selectExpr("AverageTemperature as average_temperature", "City as city", "Country as country", "Latitude as latitude", "Longitude as longitude").dropDuplicates()

In [None]:
# Drop rows whose average temperature is null or NaN
temperature_df = temperature_df.dropna(subset=['average_temperature'])

---

## Step 3: Data Modeling 

### 3.1 Conceptual Data Model
Here we create our dimension and fact tables as described above.

From the i94 immigration data set we create the *i9immegration* dimension table:
 - i94mon: Numeric month
 - i94cit: 3-digit numeric code for the country of origin
 - i94port: 3-character code for the city in the US
 - i94visa: Visa code corresponding to three categories (1=Business, 2=Pleasure, 3=Student)
 - i94bir: Age of respondent in years
 - gender: Non-immigrant sex

 
From the temperature data, the following fields are extracted to make the *citytemperature* dimension table:
 - AverageTemperature: average temperature
 - City: city name
 - Country: country name
 - Latitude: latitude
 - Longitude: longitude
 
The fact table is obtained by joining the i94 immigration data set with the tables above, producing the fields below:
 - i94mon: Numeric month
 - i94cit: 3-digit numeric code for the country of origin
 - i94port: 3-character code for the city in the US
 - i94visa: Visa code corresponding to three categories (1=Business, 2=Pleasure, 3=Student)
 - i94bir: Age of respondent in years
 - gender: Non-immigrant sex
 - country_of_origin: Country represented by the i94cit code
 - dest_city: Destination city in the US
 - dest_temperature: Temperature of the destination city
 
### 3.2 Mapping Out Pipeline Steps
 The pipeline steps are listed below:
 - Import the data from the sources
 - Clean the data
 - Select the required fields from both the i94 immigration and temperature data sets to create the dimension tables
 - Create temporary views from the data sets and dimension tables
 - Create the fact table by joining the i94 immigration data with validcountry on i94cit, validcity on i94port, and citytemperature on city and country
 - Write all the created tables to parquet files

In [None]:
imigrations_df = imigrations_df.select(["cicid","i94mon", "i94cit", "i94port","i94visa", "i94bir","gender", "origin_country", "destination_city" ]).dropDuplicates()


In [None]:
# Write immigration data to parquet file
imigrations_df.write.mode("append").partitionBy("i94port").parquet("/parquets/i94immigration.parquet")

In [None]:
# Reuse the renamed, de-duplicated temperature DataFrame from Step 2 as the dimension table
temperature_data = temperature_df

In [None]:
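# Write temperature data to parquet file
# The temperature table has no i94port column, so it is partitioned by country;
# the output file name is an assumed sibling of the immigration output
temperature_data.write.mode("append").partitionBy("country").parquet("/parquets/temperature.parquet")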

## Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the pipelines that create the data model defined above.

In [None]:
# Create temporary view for immigration data
imigrations_df.createOrReplaceTempView('i9immegration')

In [None]:
# Create temporary view for temperature data 
temperature_data.createOrReplaceTempView('temperature')

In [None]:
# Create the fact table from the dimension tables
# The join is case-insensitive, assuming the port labels are upper case
# while the temperature city names are mixed case
immigration_and_temperature = spark.sql('''
            SELECT DISTINCT
            im.i94mon,
            im.i94cit,
            im.i94port,
            im.i94visa,
            im.i94bir,
            im.gender,
            im.origin_country AS country_of_origin,
            im.destination_city AS dest_city,
            t.average_temperature AS dest_temperature
            FROM i9immegration AS im
            JOIN temperature AS t ON UPPER(im.destination_city) = UPPER(t.city)
        ''')

In [None]:
immigration_and_temperature.createOrReplaceTempView('immigration_temperature')

#### 4.2 Data Quality Checks
The data quality checks are run to ensure that all tables have entries.

In [None]:
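def data_quality_check(table):
    '''Raise a ValueError if the given table or view returns no rows.'''
    query = "SELECT * FROM {} LIMIT 5".format(table)
    res = spark.sql(query)
    # A Spark DataFrame has no len(); count the returned rows instead
    if res.count() < 1:
        raise ValueError(f"Data quality check failed. {table} returned no results")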

In [None]:
## Run check for tables
data_quality_check('i9immegration')
data_quality_check('temperature')
data_quality_check('immigration_temperature')
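
Beyond checking that each table has rows, a second check could verify that key columns contain no NULLs. The sketch below is one possible shape for such a check, not part of the notebook: `check_no_nulls` and `run_scalar` are hypothetical names, and an in-memory SQLite table with made-up rows stands in for the Spark views so the example runs on its own.

```python
import sqlite3


def check_no_nulls(run_scalar, table, columns):
    """Raise ValueError if any of the given columns contains NULL values.

    run_scalar is any callable that executes a SQL string and returns a
    single number (hypothetical interface, chosen for illustration).
    """
    for column in columns:
        nulls = run_scalar(f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL")
        if nulls > 0:
            raise ValueError(
                f"Data quality check failed. {table}.{column} has {nulls} NULL values"
            )


# Stand-in data: made-up rows in an in-memory SQLite table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE temperature (city TEXT, average_temperature REAL)")
conn.executemany(
    "INSERT INTO temperature VALUES (?, ?)",
    [("Los Angeles", 15.9), ("Miami", None)],
)


def run_scalar(query):
    return conn.execute(query).fetchone()[0]


check_no_nulls(run_scalar, "temperature", ["city"])  # passes silently

try:
    check_no_nulls(run_scalar, "temperature", ["average_temperature"])
    null_check_message = None
except ValueError as err:
    null_check_message = str(err)
```

Against the Spark views, the same function could be driven by something like `lambda q: spark.sql(q).collect()[0][0]`, assuming the `spark` session created earlier in the notebook.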

#### 4.3 Data dictionary 
The data dictionary below lists each field of the data model with a brief description of what the data is and where it comes from.

##### i94immigration (Dimension Table)
Data here comes from the i94immigration data set. The fields used from this data set include:
 - i94mon: Numeric month
 - i94cit: 3-digit numeric code for the country of origin
 - i94port: 3-character code for the city in the US
 - i94visa: Visa code corresponding to three categories (1=Business, 2=Pleasure, 3=Student)
 - i94bir: Age of respondent in years
 - gender: Non-immigrant sex
 - origin_country: Origin country, looked up from the i94cit code
 - destination_city: Destination city, looked up from the i94port code

##### temperature (Dimension Table)
Data here comes from the GlobalLandTemperaturesByCity data set. The fields used from this data set include:
 - average_temperature: average temperature
 - city: city name
 - country: country name
 - latitude: latitude
 - longitude: longitude
 
##### immigration_temperature (Fact Table)
Data here comes from the i94immigration and GlobalLandTemperaturesByCity data sets joined on the city field. The fields used include:
 - i94mon: Numeric month
 - i94cit: 3-digit numeric code for the country of origin
 - i94port: 3-character code for the city in the US
 - i94visa: Visa code corresponding to three categories (1=Business, 2=Pleasure, 3=Student)
 - i94bir: Age of respondent in years
 - gender: Non-immigrant sex
 - country_of_origin: Country represented by the i94cit code
 - dest_city: Destination city in the US
 - dest_temperature: Temperature of the destination city
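
The fact table is meant to answer questions like those in the project summary. The sketch below shows two such queries under stated assumptions: the rows are made up for illustration, and an in-memory SQLite table stands in for the Spark view so the example is self-contained. The same SQL text should also be usable with `spark.sql` against the `immigration_temperature` view, since it only relies on `GROUP BY`, `COUNT`, `AVG`, `ORDER BY`, and `LIMIT`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE immigration_temperature (
        i94mon REAL, i94visa REAL, gender TEXT,
        dest_city TEXT, dest_temperature REAL
    )
""")
# Made-up rows, for illustration only
conn.executemany(
    "INSERT INTO immigration_temperature VALUES (?, ?, ?, ?, ?)",
    [
        (4.0, 2.0, "F", "MIAMI", 24.0),
        (4.0, 2.0, "M", "MIAMI", 24.0),
        (4.0, 1.0, "M", "MIAMI", 24.0),
        (4.0, 2.0, "F", "CHICAGO", 10.0),
        (4.0, 3.0, "F", "BOSTON", 8.0),
        (4.0, 1.0, "M", "BOSTON", 8.0),
    ],
)

# Destination cities with the most fact rows, with their average temperature
top_cities = conn.execute("""
    SELECT dest_city, COUNT(*) AS n_rows, AVG(dest_temperature) AS avg_temperature
    FROM immigration_temperature
    GROUP BY dest_city
    ORDER BY n_rows DESC, dest_city
    LIMIT 2
""").fetchall()

# Fact rows per gender, with the average destination temperature
by_gender = conn.execute("""
    SELECT gender, COUNT(*) AS n_rows, AVG(dest_temperature) AS avg_temperature
    FROM immigration_temperature
    GROUP BY gender
    ORDER BY gender
""").fetchall()
```

Note that these queries count rows of the fact table. Because the fact table is built with `SELECT DISTINCT`, a row count is not necessarily a count of individual arrivals.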


## Step 5: Complete Project Write Up
##### Clearly state the rationale for the choice of tools and technologies for the project.
Spark was chosen for the ETL in this project because it is fast, can perform advanced analytics, and can read a wide range of file formats.

##### Propose how often the data should be updated and why.
Because the analytical goals of this project relate to movement per month, the data should be updated monthly.

##### How the project would be approached under different scenarios:

1. The data was increased by 100x.

If the data increases by 100x, we would preferably move Spark to the cloud and run it on a service built for that purpose, such as AWS EMR, which handles large data sets by dividing them up and distributing the data across a cluster or clusters.

2. The data populates a dashboard that must be updated on a daily basis by 7am every day.

In this case we would use a tool like Airflow to automate the ETL pipeline, scheduling it to run overnight or on the evening of the previous day.

3. The database needed to be accessed by 100+ people.

In this case we could host the parquet files in an S3 bucket and grant read access externally.

---