# Weather Temperatures Vs. I-94 Immigration!

## Data Engineering Capstone Project
* By Abdullah Alqithmi
### Project Summary
The objective of this project is to build pipelines that pull data from several sources and load it into a data model designed for analytical queries. Specifically, the model supports studying the relationship between weather temperatures and I-94 immigration behavior.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import re
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

## Step 1: Scope the Project and Gather Data

### Scope 
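This project builds an ETL pipeline that combines the April 2016 I-94 immigration records with the Berkeley Earth city temperature data. pandas is used to explore the four source datasets, and Apache Spark (PySpark) is used to clean the immigration and temperature data, join them on the I-94 port code, and write a fact table and two dimension tables as Parquet files. The end result is an analytical model for studying how destination-city temperatures relate to I-94 arrivals.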

### Describe and Gather Data 


### I-94 Immigration Data

**Form I-94**, the Arrival-Departure Record Card, is a form used by U.S. Customs and Border Protection (CBP) to keep track of arrivals to and departures from the United States by people who are not U.S. citizens or lawful permanent residents.

*This data comes from the **US National Tourism and Trade Office**. It will be used as the **main dataset** for this project.*

In [2]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [3]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


### World Temperature Data

The data is from a newer compilation put together by **Berkeley Earth**, which is affiliated with Lawrence Berkeley National Laboratory. The Berkeley Earth Surface Temperature Study combines 1.6 billion temperature reports from 16 pre-existing archives. It is nicely packaged and allows for slicing into interesting subsets (for example, by country). Berkeley Earth publishes the source data and the code for the transformations it applied. It also uses methods that allow weather observations from shorter time series to be included, meaning fewer observations need to be thrown away.

*This dataset came from **Kaggle**.*

In [4]:
# Read in the data here
temName = '../../data2/GlobalLandTemperaturesByCity.csv'
dt = pd.read_csv(temName)

In [5]:
dt.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


### U.S. City Demographic Data

This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. The data came from the US Census Bureau's 2015 American Community Survey.

*This data comes from **OpenSoft**.*

In [6]:
# Read in the data here
usName = 'us-cities-demographics.csv'
du = pd.read_csv(usName, sep=";")

In [7]:
du.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


### Airport Code Data

This is a simple table of airport codes and corresponding cities. 

The airport codes may refer to either the IATA airport code, a three-letter code used in passenger reservation, ticketing, and baggage-handling systems, or the ICAO airport code, a four-letter code used by ATC systems and for airports that do not have an IATA airport code.

*This data comes from **Datahub**.*

In [8]:
# Read in the data here
airName = 'airport-codes_csv.csv'
da = pd.read_csv(airName)

In [9]:
da.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


## Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [10]:
#check the dataset shape
print('I-94 Immigration Dataset Shape: ',df.shape)

print('World Temperature Dataset Shape: ',dt.shape)

print('U.S. City Demographic Dataset Shape: ',du.shape)

print('Airport Code Dataset Shape: ',da.shape)


World Temperature Dataset Shape:  (8599212, 7)
U.S. City Demographic Dataset Shape:  (2891, 12)
Airport Code Dataset Shape:  (55075, 12)


In [11]:
# Percentage of null values per column (isnull() is an alias of isna())
print('I-94 Immigration Dataset null values: ')
(df.isna().mean() * 100).round(2)

I-94 Immigration Dataset null values: 


cicid        0.00
i94yr        0.00
i94mon       0.00
i94cit       0.00
i94res       0.00
i94port      0.00
arrdate      0.00
i94mode      0.01
i94addr      4.92
depdate      4.60
i94bir       0.03
i94visa      0.00
count        0.00
dtadfile     0.00
visapost    60.76
occup       99.74
entdepa      0.01
entdepd      4.47
entdepu     99.99
matflag      4.47
biryear      0.03
dtaddto      0.02
gender      13.38
insnum      96.33
airline      2.70
admnum       0.00
fltno        0.63
visatype     0.00
dtype: float64

In [12]:
# Percentage of null values per column (isnull() is an alias of isna())
print('World Temperature Dataset null values: ')
(dt.isna().mean() * 100).round(2)

World Temperature Dataset null values: 


dt                               0.00
AverageTemperature               4.23
AverageTemperatureUncertainty    4.23
City                             0.00
Country                          0.00
Latitude                         0.00
Longitude                        0.00
dtype: float64

In [13]:
# Percentage of null values per column (isnull() is an alias of isna())
print('U.S. City Demographic Dataset null values: ')
(du.isna().mean() * 100).round(2)

U.S. City Demographic Dataset null values: 


City                      0.00
State                     0.00
Median Age                0.00
Male Population           0.10
Female Population         0.10
Total Population          0.00
Number of Veterans        0.45
Foreign-born              0.45
Average Household Size    0.55
State Code                0.00
Race                      0.00
Count                     0.00
dtype: float64

In [14]:
# Percentage of null values per column (isnull() is an alias of isna())
print('Airport Code Dataset null values: ')
(da.isna().mean() * 100).round(2)

Airport Code Dataset null values: 


ident            0.00
type             0.00
name             0.00
elevation_ft    12.72
continent       50.33
iso_country      0.45
iso_region       0.00
municipality    10.31
gps_code        25.50
iata_code       83.32
local_code      47.91
coordinates      0.00
dtype: float64

## Cleaning Steps
The cleaning steps are:
* Removing rows with null values
* Removing duplicated rows
* Adding and removing columns

In [2]:
# Creating Spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### I-94 Immigration Data *Cleaning*

In [3]:
# Reading I-94 Immigration Dataset into Spark
df_imm =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
print('I-94 Immigration Dataset before Cleaning contains:',df_imm.count(),'rows' ,' || ',len(df_imm.columns), 'columns')

In [4]:
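# Parse lines of the form 'CODE' = 'DESCRIPTION' into a dict of valid port labels
compil = re.compile(r'\'(.*)\'.*\'(.*)\'')
I94_valid_labels = {}
with open('I94_valid_labels.txt') as f:
    for data in f:
        match = compil.search(data)
        # Skip lines that do not contain a quoted code/description pair
        if match:
            I94_valid_labels[match[1]] = [match[2]]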
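
To make the parsing step concrete, here is a minimal sketch run on three inline sample lines instead of the real file. The sample lines are an assumption about how `I94_valid_labels.txt` is laid out (a quoted code, an equals sign, and a quoted, space-padded description). `parse_labels` and `LABEL_PATTERN` are hypothetical names, and unlike the cell above this sketch stores each description as a stripped string rather than a one-element list.

```python
import re

# Assumed layout of a label line: 'CODE' = 'DESCRIPTION   '
LABEL_PATTERN = re.compile(r"'([^']*)'\s*=\s*'([^']*)'")


def parse_labels(lines):
    """Map each port code to its description, skipping lines that do not match."""
    labels = {}
    for line in lines:
        match = LABEL_PATTERN.search(line)
        if match:
            labels[match.group(1).strip()] = match.group(2).strip()
    return labels


# Hypothetical sample lines in the assumed format, plus one non-matching line
sample_lines = [
    "   'ATL'\t=\t'ATLANTA, GA            '",
    "   'NYC'\t=\t'NEW YORK, NY           '",
    "/* a comment line with no quoted pair */",
]

labels = parse_labels(sample_lines)
print(labels)
```

Stripping the padding at parse time means later comparisons against city names do not have to account for trailing spaces.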

In [5]:
# Clean immigration data
def I94DatasetClenser(file):

    '''    
    Input: I-94 Immigration dataset file's path
    Output:  I-94 Immigration data as Spark Dataframe
    '''    
    df_i94 = spark.read.format('com.github.saurfang.sas.spark').load(file)
    
    # Filter out entries where i94port is invalid
    df_i94 = df_i94.filter(df_i94.i94port.isin(list(I94_valid_labels.keys())))

    return df_i94

In [6]:
# Read in the data here
I94_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' 
df_immCleaned = I94DatasetClenser(I94_file)

In [7]:
#drop duplicated value for id
df_immCleaned=df_immCleaned.dropDuplicates(['cicid'])

In [8]:
#Removing columns with high percentage of null values
columns_to_drop = ['entdepu','occup','insnum'] 
df_immCleaned = df_immCleaned.drop(*columns_to_drop)
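
The three dropped columns are the ones whose null percentage exceeds 90% in the exploration output above. The sketch below shows how such a list could be derived rather than hard-coded. The 90% threshold is an assumption (the notebook does not state one), and the dictionary holds only a subset of the percentages, copied from that output.

```python
# Null percentages copied from the exploration output (subset of columns)
null_pct = {
    "visapost": 60.76,
    "occup": 99.74,
    "entdepu": 99.99,
    "gender": 13.38,
    "insnum": 96.33,
}

NULL_THRESHOLD = 90.0  # assumed cut-off, not stated in the notebook

# Keep every column whose null share is above the threshold
columns_over_threshold = sorted(
    col for col, pct in null_pct.items() if pct > NULL_THRESHOLD
)
print(columns_over_threshold)
```

With this threshold, `visapost` (60.76% null) stays in the dataset, which matches what the cell above does.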

In [22]:
print('I-94 Immigration Dataset After Cleaning contains:',df_immCleaned.count(),'rows' ,' || ',len(df_immCleaned.columns), 'columns')

I-94 Immigration Dataset After Cleaning contains: 3088544 rows  ||  25 columns


### World Temperature Dataset *Cleaning*

In [9]:
# Reading World Temperature Dataset into Spark
dt_temp=spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")
print('World Temperature Dataset before Cleaning contains:',dt_temp.count(),'rows' ,' || ',len(dt_temp.columns), 'columns')

In [10]:
#Removing duplicate values
dt_temp=dt_temp.dropDuplicates(['dt','Country','City'])

In [11]:
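# Removing rows with a missing AverageTemperature (null, or the literal string 'NaN')
dt_temp=dt_temp.filter(dt_temp.AverageTemperature.isNotNull() & (dt_temp.AverageTemperature != 'NaN'))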

In [26]:
# check dataset size after Cleaning
print('World Temperature Dataset after Cleaning contains:',dt_temp.count(),'rows' ,' || ',len(dt_temp.columns), 'columns')

World Temperature Dataset after Cleaning contains: 8190131 rows  ||  7 columns


### Adding Column

In [12]:
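@udf()
def AddColumn(city):
    '''
    Input: A city name from the temperature dataset
    Output: The code of the first I-94 port whose label contains the city name,
            or None when no label matches
    '''
    if city is None:
        return None
    for i in I94_valid_labels:
        if city.lower() in I94_valid_labels[i][0].lower():
            return i
    return None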

In [13]:
dt_temp = dt_temp.withColumn("i94port", AddColumn(dt_temp.City))
dt_temp.show()

+----------+------------------+-----------------------------+-------------------+--------------+--------+---------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|               City|       Country|Latitude|Longitude|i94port|
+----------+------------------+-----------------------------+-------------------+--------------+--------+---------+-------+
|1743-11-01|            12.686|                        2.051|             Durrës|       Albania|  40.99N|   19.17E|   null|
|1743-11-01| 5.167000000000001|                        1.774|            Bergamo|         Italy|  45.81N|   10.38E|   null|
|1743-11-01|             3.765|                        1.893|          Stavanger|        Norway|  58.66N|    6.15E|   null|
|1743-11-01|             6.678|                         1.81|          Stockport|United Kingdom|  53.84N|    1.36W|   null|
|1743-11-01| 8.129999999999999|                        2.245|            Atlanta| United States|  34.56N|   83.68W|    ATL|
|1744-04

In [14]:
# Remove rows where no i94port was matched
dt_temp = dt_temp.filter(dt_temp.i94port.isNotNull())
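
Because `AddColumn` uses a substring test, a short city name can match an unrelated port description. One possible alternative, sketched here in plain Python, is a reverse lookup keyed on the exact city name. It assumes each description has the form `CITY, STATE`. `build_city_lookup`, `substring_match`, and the sample labels are hypothetical, and descriptions are plain strings here rather than the one-element lists used in the notebook.

```python
def build_city_lookup(valid_labels):
    """Return a dict mapping lower-cased city name -> port code."""
    lookup = {}
    for code, description in valid_labels.items():
        city = description.split(",")[0].strip().lower()
        # Keep the first code seen for a city; later duplicates are ignored
        lookup.setdefault(city, code)
    return lookup


def substring_match(city, valid_labels):
    """Reproduce the substring test used by AddColumn, for comparison."""
    for code, description in valid_labels.items():
        if city.lower() in description.lower():
            return code
    return None


# Hypothetical sample in the assumed 'CITY, STATE' form
sample_labels = {"ATL": "ATLANTA, GA", "NYC": "NEW YORK, NY"}
city_to_port = build_city_lookup(sample_labels)

print(substring_match("York", sample_labels))  # substring test matches NEW YORK
print(city_to_port.get("york"))                # exact lookup finds no match
print(city_to_port.get("atlanta"))             # exact lookup finds ATL
```

An exact lookup trades recall for precision: it misses cities whose label spelling differs from the temperature dataset, so whether it is preferable depends on how the labels are actually written.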

## Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The chosen model consists of a fact table and two dimension tables. The two dimension tables take their data from the I-94 immigration and temperature datasets, respectively, and each is partitioned by the port code (`i94port`), which stands in for the destination city. The fact table is built by joining the two datasets on that port code.
___
Users can query this schema to analyze the relationship between weather temperatures and the destination cities chosen by I-94 applicants.
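
The join that produces the fact table can be illustrated on a few rows. The sketch below uses Python's built-in `sqlite3` module instead of Spark so that it runs anywhere. The table and column names follow the notebook, but every row value is a made-up toy value.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE DiEvents (i94port TEXT, arrdate REAL, i94visa REAL);
CREATE TABLE DiCityTemperature (i94port TEXT, dt TEXT, AverageTemperature REAL);
""")

# Toy rows: one arrival at ATL, two arrivals at NYC
conn.executemany("INSERT INTO DiEvents VALUES (?, ?, ?)", [
    ("ATL", 20551.0, 3.0),
    ("NYC", 20545.0, 2.0),
    ("NYC", 20545.0, 2.0),
])
# Toy rows: two temperature readings for ATL, one for NYC
conn.executemany("INSERT INTO DiCityTemperature VALUES (?, ?, ?)", [
    ("ATL", "1743-11-01", 8.13),
    ("ATL", "1744-04-01", 14.2),
    ("NYC", "1743-11-01", 3.3),
])

# Same join key as the notebook's fact-table query: the port code
fact_rows = conn.execute("""
SELECT e.i94port AS CityCode,
       e.arrdate AS ArrivalDate,
       t.AverageTemperature AS Temperature
FROM DiEvents e
JOIN DiCityTemperature t ON e.i94port = t.i94port
ORDER BY CityCode, ArrivalDate, Temperature
""").fetchall()

for row in fact_rows:
    print(row)
```

Joining on the port code alone pairs every arrival with every temperature reading for that port, so the row count is the sum over ports of arrivals times readings (here 1 × 2 + 2 × 1 = 4). That is worth keeping in mind when sizing the fact table.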

#### 3.2 Mapping Out Data Pipelines


<img src="Model.jpg" alt="ER" width="500" height="500">

## Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [16]:
# Creating a view to be used for the fact table
df_immCleaned.createOrReplaceTempView("DiEventsView")
dt_temp.createOrReplaceTempView("DiCityTemperatureView")


# Creating the I94_Immigration table *fact table*
I94_Immigration = spark.sql(
    '''
SELECT
       DiEventsView.i94port as CityCode,
       DiEventsView.arrdate as ArrivalDate,
       DiEventsView.depdate as DepartureDate,
       DiEventsView.i94visa as VisaCode,
       DiEventsView.i94yr as year,
       DiEventsView.i94mon as month,
       DiEventsView.i94cit as city,
       DiCityTemperatureView.AverageTemperature as Temperature,
       DiCityTemperatureView.Latitude,
       DiCityTemperatureView.Longitude
FROM
DiEventsView
JOIN DiCityTemperatureView ON (DiEventsView.i94port = DiCityTemperatureView.i94port)
    '''
)

# insert into I94_Immigration table
I94_Immigration.write.mode("append").partitionBy("CityCode").parquet("/results/I94_Immigration.parquet")

In [15]:
#Creating the DiEvents table *dimension table* 
DiEvents = df_immCleaned.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# insert into DiEvents table
DiEvents.write.mode("append").partitionBy("i94port").parquet("/results/DiEvents.parquet")

In [46]:
#Creating the DiCityTemperature table *dimension table* 
DiCityTemperature = dt_temp.select(["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"])

# insert into DiCityTemperature table
DiCityTemperature.write.mode("append").partitionBy("i94port").parquet("/results/DiCityTemperature.parquet")

## 4.2 Data Quality Checks

 * Source/Count checks to ensure completeness
 
Run Quality Checks


In [17]:
#Quality Check 1
#check the number of records
def dataquality_checks(df, table_name):
    '''
    Input:  Spark Dataframe and the table name used in the message
    Output: Prints the result of the quality check and returns 0
    '''
    count = df.count()

    if count == 0:
        print(f"Data quality failed. {table_name} has zero records!")
    else:
        print(f"Data quality passed. {table_name} has {count:,} records.")
    return 0

In [None]:
#Quality Check 1
#check the number of records by calling the predefined function
table_dfs = {
    'I94_Immigration': I94_Immigration,
    'DiEvents': DiEvents,
    'DiCityTemperature': DiCityTemperature,
}

for table_name, table_df in table_dfs.items():
    dataquality_checks(table_df, table_name)

In [18]:
#Quality Check 2
#check the number of distinct records 
def dataquality_checks2(df, table_name):
    '''
    Input:  Spark Dataframe and the table name used in the message
    Output: Prints the result of the quality check and returns 0
    '''
    count = df.distinct().count()

    # Zero or one distinct record means the table carries no usable variation
    if count <= 1:
        print(f"Data quality failed. {table_name} has an insufficient number of distinct records!")
    else:
        print(f"Data quality passed. {table_name} has {count:,} distinct records.")
    return 0

In [21]:
#Quality Check 2
#check the number of distinct records by calling the predefined function
table_dfs = {
    'I94_Immigration': I94_Immigration,
    'DiEvents': DiEvents,
    'DiCityTemperature': DiCityTemperature,
}

for table_name, table_df in table_dfs.items():
    dataquality_checks2(table_df, table_name)
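
The checks above report their result with `print`. In a scheduled pipeline it may be preferable for a failed check to stop the run. The sketch below shows one way to do that in plain Python. `DataQualityError` and `require_min_rows` are hypothetical names, and the function takes an integer count so that it can be tried without Spark (in the notebook the count would come from `df.count()`).

```python
class DataQualityError(Exception):
    """Raised when a table fails a data quality check (hypothetical helper)."""


def require_min_rows(row_count, table_name, minimum=1):
    """Raise DataQualityError unless the table has at least `minimum` rows."""
    if row_count < minimum:
        raise DataQualityError(
            f"{table_name} has {row_count:,} rows; expected at least {minimum:,}."
        )
    return row_count


# Example counts are illustrative integers, not results from the pipeline
require_min_rows(1000, "DiEvents")
try:
    require_min_rows(0, "I94_Immigration")
except DataQualityError as err:
    print(f"Check failed: {err}")
```

Raising an exception lets an orchestrator mark the task as failed instead of relying on someone reading the log output.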

## 4.3 Data dictionary 


### I-94 Immigration Dataset *Dictionary*

| Column Name | Description |
| :--- | :--- |
| CICID* | ID that uniquely identifies one record in the dataset |
| I94YR | 4-digit year |
| I94MON | Numeric month |
| I94CIT | 3-digit code of the visitor's country of citizenship |
| I94RES | 3-digit code of the visitor's country of residence |
| I94PORT | Code of the U.S. port of entry (destination city) |
| ARRDATE | Arrival date |
| I94MODE | Travel type code |
| I94ADDR | Destination State |
| DEPDATE | Departure date |
| I94BIR | Age of Respondent in Years |
| I94VISA | Visa codes collapsed into three categories |
| COUNT | Used for summary statistics |
| DTADFILE | Character Date Field |
| VISAPOST | The department that the Visa was issued from |
| OCCUP | Occupation that will be performed in U.S. |
| ENTDEPA | Arrival Flag - admitted or paroled into the U.S. |
| ENTDEPD | Departure Flag - Departed, lost I-94 or is deceased |
| ENTDEPU | Update Flag - Either apprehended, overstayed, adjusted to perm residence |
| MATFLAG | Match flag - Match of arrival and departure records |
| BIRYEAR | 4 digit year of birth |
| DTADDTO | Character Date Field - Date to which admitted to U.S. (allowed to stay until) |
| GENDER | Gender |
| INSNUM | INS number |
| AIRLINE | Airline used to arrive in U.S. |
| ADMNUM | Admission number |
| FLTNO | Flight number of Airline used to arrive in U.S. |
| VISATYPE | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |
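
The ARRDATE and DEPDATE values in the sample rows (for example `20545.0`) appear to be SAS date numbers, which count days from 1 January 1960. Under that assumption, the following sketch converts them to calendar dates with the standard library. `sas_to_date` is a hypothetical helper and is not part of the pipeline above.

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this date


def sas_to_date(sas_days):
    """Convert a SAS date number (float or int) to a date; None/NaN -> None."""
    if sas_days is None or (isinstance(sas_days, float) and math.isnan(sas_days)):
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))
print(sas_to_date(20573.0))
print(sas_to_date(float("nan")))
```

The first two values fall in April 2016, which is consistent with the `i94_apr16_sub` file name.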

### World Temperature Dataset *Dictionary*

| Column Name | Description |
| :--- | :--- |
| dt | Date |
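| AverageTemperature | Average temperature of the city, in degrees Celsius |
| AverageTemperatureUncertainty | 95% confidence interval around the average temperature |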
| City | The City Name |
| Country | The Country Name |
| Latitude | Latitude |
| Longitude | Longitude |

### U.S. City Demographic Dataset *Dictionary*

| Column Name | Description |
| :--- | :--- |
| City | The city name |
| State | The state name |
| Median Age | Median age of the city's population |
| Male Population | Male population |
| Female Population | Female population |
| Total Population | Total population |
| Number of Veterans | Number of veterans in the city |
| Foreign-born | Number of foreign-born residents in the city |
| Average Household Size | Average number of people per household in the city |
| State Code | Code of the state |
| Race | Race or ethnicity category |
| Count | Number of residents in that race category |

### Airport Code Dataset *Dictionary*

| Column Name | Description |
| :--- | :--- |
| ident | Identifier |
| type | Type of the airport |
| name | Airport Name |
| elevation_ft | Elevation of the airport, in feet |
| continent | Continent |
| iso_country | ISO code of the airport country |
| iso_region | ISO code for the airport region |
| municipality | The airport city |
| gps_code | GPS code of the airport |
| iata_code | IATA code of the airport |
| local_code | Local code of the airport |
| coordinates | The coordinates of the airport |

## Step 5: Complete Project Write Up


#### Clearly state the rationale for the choice of tools and technologies for the project.

We used Apache Spark for several reasons:
<ol>
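    <li>Fast, distributed, in-memory processing of datasets with millions of rows</li>
    <li>Ease of use through its DataFrame and SQL APIs</li>
    <li>Support for sophisticated analytics</li>
    <li>Support for a variety of file formats (in this project: SAS, CSV, and Parquet)</li>
    <li>Flexibility to run the same code on a single machine or on a cluster</li>
    <li>An active and expanding community</li>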
</ol>  


#### Propose how often the data should be updated and why.

We propose updating the data once a month, loading each month's data as a single batch, because one of our main data sources (the I-94 data) is updated monthly.
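
A monthly load needs to locate the right input file for each run. The sketch below builds a file name for a given month, assuming that every monthly extract follows the same naming pattern as `i94_apr16_sub.sas7bdat`. That pattern is inferred from the single file used in this notebook, and `monthly_i94_filename` is a hypothetical helper.

```python
from datetime import date

# Fixed English abbreviations so the result does not depend on the locale
MONTH_ABBR = ["jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec"]


def monthly_i94_filename(month_start):
    """Build the assumed file name of the I-94 extract for a given month."""
    month = MONTH_ABBR[month_start.month - 1]
    two_digit_year = f"{month_start.year % 100:02d}"
    return f"i94_{month}{two_digit_year}_sub.sas7bdat"


print(monthly_i94_filename(date(2016, 4, 1)))
```

A scheduler could pass the run date to this function so that each monthly batch reads only its own file.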

### The Appropriate Approach to The Following Scenarios:



<ol>
  <li>The data was increased by 100x.</li>
</ol>    
We can add more nodes to the Spark cluster to increase its capacity, and load the data incrementally (for example, one month at a time) instead of as a single batch.
<hr />

<ol start ='2'>

  <li>The data populates a dashboard that must be updated on a daily basis by 7am every day.</li>

</ol>

We can use Apache Airflow to build a scheduled pipeline that runs every day and finishes before 7 am, or we can use Informatica PowerCenter, another well-known ETL tool.
<hr />

<ol start ='3'>

  <li>The database needed to be accessed by 100+ people.</li>
</ol>

Adding more nodes to the cluster can also handle this scenario. Alternatively, we can serve the data from Amazon Redshift instead of our current database.

<hr />