# Project Capstone - Udacity

## US Immigration Data, Airport, US Demographics and Temperature ETL Pipeline


#### Project Summary

This project augments the US I94 immigration data with further datasets (US airport data, US city demographics and world temperature data) to provide a broader basis for analysing the immigration data.

The environment created for the project is as follows:
- Spark 3.0.1: https://www.apache.org/dyn/closer.lua/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
- Hadoop 3.2, pre-bundled with Spark
- PySpark 3, matching the Spark version
- spark-sas7bdat for reading SAS data: https://github.com/saurfang/spark-sas7bdat
- Python Anaconda v1.9.12

All the Python dependencies needed to run the project were installed.

An ETL pipeline feeds the 'dwh' (data warehouse) database in Spark's built-in metastore. This database holds the fact and dimension tables needed to study immigration behaviour and can be used as a BI analytics backend.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [371]:
import boto3
import re
import os
import pandas as pd
import io
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import *

## Step 1: Scope the Project and Gather Data

### Scope 
This project augments the US I94 immigration data with further datasets (US airport data, US city demographics and world temperature data) to provide a broader basis for analysing the immigration data.
An ETL pipeline feeds directly into the Spark warehouse database 'dwh', which can be used as a BI analytics backend for gaining deeper insight into immigration behaviour.

#### I94 Immigration Data
I94 immigration data comes from the US National Tourism and Trade Office website. It is provided in SAS7BDAT format, SAS's binary dataset storage format.

#### World Temperature Data
This is defined in the project resources. The dataset comes from Kaggle and includes temperatures in cities around the world. It can be found here: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

#### Data for US Demographics
This is defined in the project resources. This data comes from OpenDataSoft.

#### Airport Data
This is a simple table of airport codes and corresponding cities.




In [360]:
# Data was copied from the Udacity workspace to the local workstation
%ls data/

Data-dict.jpeg                    i94.csv/
GlobalLandTemperaturesByCity.csv* i94_apr16_sub.sas7bdat*
airport-codes_csv.csv*            immigration/
airport.jpeg                      temperature.jpeg
demographics.jpeg                 us-cities-demographics.csv*


In [182]:
# Create a Hive-enabled SparkSession that loads the spark-sas7bdat package for reading SAS files
spark = SparkSession.builder.\
    config("spark.jars.packages","saurfang:spark-sas7bdat:3.0.0-s_2.12").\
    enableHiveSupport().getOrCreate()

In [183]:
# Defining data source
files = [
    'GlobalLandTemperaturesByCity.csv',
    'i94_apr16_sub.sas7bdat',
    'airport-codes_csv.csv',
    'us-cities-demographics.csv'
]

airport_data = 'data/' + files[2]
demographics_data = 'data/' + files[3]
temperature_data = 'data/' + files[0]
immigration_data = 'data/' + files[1]

In [184]:
#df_immigration = pd.read_sas(immigration_data, 'sas7bdat', encoding="ISO-8859-1")
#df_temperature = pd.read_csv(temperature_data)
#df_demographics = pd.read_csv(demographics_data, sep=';')
#df_airports = pd.read_csv(airport_data)

In [185]:
spark

## Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Temperature Data
Load the CSV file into a Spark DataFrame.

Normalise the column headers and save the temperature data in Parquet format. Normalising means replacing non-alphanumeric characters with underscores and lower-casing the names.



In [364]:
df_temperature = spark.read.csv(temperature_data,header=True,inferSchema=True)
# Normalise headers: replace non-alphanumeric characters with '_' and lower-case
for name in list(df_temperature.schema.names):
    df_temperature = df_temperature.withColumnRenamed(name, re.sub(r'[^a-zA-Z0-9]', '_', name).lower())
    
df_temperature.write.mode('overwrite').format('parquet').option('path', 'output/temperature').saveAsTable('temperature')
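The same renaming loop is repeated for every dataset in this notebook. The sketch below wraps the `re.sub` rule in a reusable helper. `normalise_column` and `normalise_columns` are hypothetical names, not part of the original code. The helper also raises an error if two different headers would collapse to the same name, for example `'a b'` and `'a-b'`.

```python
import re


def normalise_column(name):
    """Replace each non-alphanumeric character with '_' and lower-case the result."""
    return re.sub(r'[^a-zA-Z0-9]', '_', name).lower()


def normalise_columns(names):
    """Normalise a list of column names, raising if two names collide."""
    new_names = [normalise_column(n) for n in names]
    seen = {}
    for old, new in zip(names, new_names):
        if new in seen:
            # Two different source headers would end up with the same name.
            raise ValueError(f"{old!r} and {seen[new]!r} both become {new!r}")
        seen[new] = old
    return new_names


headers = ["City", "Median Age", "Foreign-born", "Average Household Size", "State Code"]
print(normalise_columns(headers))
# ['city', 'median_age', 'foreign_born', 'average_household_size', 'state_code']
```

In Spark, all columns could then be renamed in one call with `df.toDF(*normalise_columns(df.columns))`. `DataFrame.toDF` accepts the new column names positionally, so this would replace the per-column `withColumnRenamed` loop.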

In [370]:
df_temperature.show()

+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|        dt| averagetemperature|averagetemperatureuncertainty| city|country|latitude|longitude|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01| 14.050999999999998|        

#### Immigration data


##### Cleaning steps
Fixing data quality and other issues:

- parse dtadfile (format yyyyMMdd) and dtaddto (format MMddyyyy) as dates
- cast columns read as floats to integers: i94yr, i94cit, i94res, i94mon, arrdate, i94mode, depdate, i94bir, i94visa, biryear and count
- cast the identifiers cicid and admnum to long integers
- drop duplicate records by cicid
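After the integer cast, arrdate and depdate still hold SAS date values, i.e. day counts from 1 January 1960. The sample row with arrdate 20545 also has dtadfile 2016-04-01, which fits this reading. The sketch below shows both date conversions in plain Python using two hypothetical helpers, `sas_to_date` and `parse_dtaddto`. The assumption that non-date dtaddto values should become None is part of this sketch, not the original pipeline.

```python
from datetime import date, datetime, timedelta

# SAS counts dates as days since 1 January 1960.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(days):
    """Convert a SAS date value such as arrdate or depdate to datetime.date."""
    if days is None:
        return None  # keep missing values missing
    return SAS_EPOCH + timedelta(days=int(days))


def parse_dtaddto(value):
    """Parse a dtaddto string in MMddyyyy form; non-date values become None."""
    if value is None:
        return None
    try:
        return datetime.strptime(value, "%m%d%Y").date()
    except ValueError:
        return None


print(sas_to_date(20545))         # 2016-04-01
print(sas_to_date(20561))         # 2016-04-17
print(parse_dtaddto("09302016"))  # 2016-09-30
```

In Spark, the Spark SQL `date_add` function could express the same conversion, for example `df_imm.withColumn('arrival_date', expr("date_add(to_date('1960-01-01'), arrdate)"))`. This is a suggestion to verify against your Spark version rather than tested code.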

In [188]:
# Read the SAS file, normalise the headers and save it as Parquet in the output folder
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(immigration_data)
for name in list(df_immigration.schema.names):
    df_immigration = df_immigration.withColumnRenamed(name, re.sub(r'[^a-zA-Z0-9]', '_', name).lower())
df_immigration.write.mode('overwrite').format('parquet').option('path', 'output/immigration').saveAsTable('immigration')

In [466]:
# fixing data quality issues
df_imm = (df_immigration
                  .withColumn('dtadfile', to_date(df_immigration.dtadfile, 'yyyyMMdd'))
                  .withColumn('dtaddto', to_date(df_immigration.dtaddto, 'MMddyyyy'))
                  .withColumn('i94yr', df_immigration.i94yr.cast(IntegerType()))
                  .withColumn('i94cit', df_immigration.i94cit.cast(IntegerType()))
                  .withColumn('i94res', df_immigration.i94res.cast(IntegerType()))
                  .withColumn('i94mon', df_immigration.i94mon.cast(IntegerType()))
                  .withColumn('arrdate', df_immigration.arrdate.cast(IntegerType()))
                  .withColumn('i94mode', df_immigration.i94mode.cast(IntegerType()))
                  .withColumn('depdate', df_immigration.depdate.cast(IntegerType()))
                  .withColumn('i94bir', df_immigration.i94bir.cast(IntegerType()))
                  .withColumn('i94visa', df_immigration.i94visa.cast(IntegerType()))
                  .withColumn('biryear', df_immigration.biryear.cast(IntegerType()))
                  .withColumn('count', df_immigration['count'].cast(IntegerType()))
                  .withColumn('cicid', df_immigration.cicid.cast(LongType()))
                  .withColumn('admnum', df_immigration.admnum.cast(LongType()))
                  )

df_imm.count()

# Drop duplicate records by cicid; the result is used to stage the immigration data
staging_immigration = df_imm.dropDuplicates(['cicid'])

In [467]:
staging_immigration.show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+----------+------+------+-------+-----------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|  dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|   dtaddto|gender|insnum|airline|     admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+----------+------+------+-------+-----------+-----+--------+
|   29| 2016|     4|   101|   101|    ATL|  20545|      1|     MA|  20561|    62|      2|    1|2016-04-01|     TIA| null|      G|      O|   null|      M|   1954|2016-09-30|     M|  null|     AZ|92503781430|00614|      B2|
|  474| 2016|     4|   103|   103|    NEW|  20545|      2|   null|  20547|    25|      2|    1|2016-04-01|    nu

#### Demographics data

##### Cleaning
Normalise the column headers and save the data as Parquet.

In [191]:
df_demographics = spark.read.csv(demographics_data, header=True, sep=';', inferSchema=True)
for name in list(df_demographics.schema.names):
    df_demographics = df_demographics.withColumnRenamed(name, re.sub(r'[^a-zA-Z0-9]', '_', name).lower())
df_demographics.write.mode('overwrite').format('parquet').option('path', 'output/demographics').saveAsTable('demographics')

In [385]:
df_demographics.select('city').show()
df_demographics.select('city').count()

+----------------+
|            city|
+----------------+
|   Silver Spring|
|          Quincy|
|          Hoover|
|Rancho Cucamonga|
|          Newark|
|          Peoria|
|        Avondale|
|     West Covina|
|        O'Fallon|
|      High Point|
|          Folsom|
|          Folsom|
|    Philadelphia|
|         Wichita|
|         Wichita|
|      Fort Myers|
|      Pittsburgh|
|          Laredo|
|        Berkeley|
|     Santa Clara|
+----------------+
only showing top 20 rows



2891
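City names repeat in the output above: Folsom and Wichita each appear twice. The demographics file appears to hold one row per city and race, with `race` and `count` carrying the per-race figures. As a result, city-level columns such as `total_population` are repeated across rows. The sketch below collapses the race rows into one record per city in plain Python. `pivot_race_counts` is a hypothetical helper, and the rows are toy values, not taken from the dataset.

```python
from collections import defaultdict


def pivot_race_counts(rows):
    """Collapse one-row-per-(city, race) records into one entry per city.

    rows: iterable of dicts with the normalised keys 'city', 'state_code',
    'race' and 'count'. Returns {(city, state_code): {race: count}}.
    """
    pivoted = defaultdict(dict)
    for row in rows:
        races = pivoted[(row["city"], row["state_code"])]
        # Add rather than overwrite in case a (city, race) pair repeats.
        races[row["race"]] = races.get(row["race"], 0) + row["count"]
    return dict(pivoted)


# Toy rows with made-up counts, for illustration only.
sample = [
    {"city": "Town A", "state_code": "CA", "race": "White", "count": 100},
    {"city": "Town A", "state_code": "CA", "race": "Asian", "count": 20},
    {"city": "Town B", "state_code": "KS", "race": "White", "count": 300},
]
print(pivot_race_counts(sample))
# {('Town A', 'CA'): {'White': 100, 'Asian': 20}, ('Town B', 'KS'): {'White': 300}}
```

In Spark, `df_demographics.groupBy('city', 'state_code').pivot('race').sum('count')` should produce the equivalent wide table, with one column per race.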

In [537]:
df_demographics.show()

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|            city|         state|median_age|male_population|female_population|total_population|number_of_veterans|foreign_born|average_household_size|state_code|                race| count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino| 25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White| 58723|
|          Hoover|       Alabama|      38.5|      

#### Airports data

##### Cleaning
- normalise the column headers
- strip the 'US-' prefix from iso_region so that US rows hold the state code
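An unanchored pattern such as `US-` removes that text wherever it occurs in the value. Anchoring the pattern with `^` restricts the change to a genuine prefix. Non-US regions such as `FI-ES`, which appears in the staged airport data, are then left untouched. `strip_us_prefix` below is a hypothetical plain-Python helper illustrating the anchored rule.

```python
import re

# Anchored pattern: only a leading "US-" is removed.
US_PREFIX = re.compile(r"^US-")


def strip_us_prefix(iso_region):
    """Map an ISO 3166-2 region such as 'US-PA' to the state code 'PA'.

    Non-US regions (e.g. 'FI-ES') and None are returned unchanged.
    """
    if iso_region is None:
        return None
    return US_PREFIX.sub("", iso_region)


print([strip_us_prefix(r) for r in ["US-PA", "US-KS", "FI-ES"]])  # ['PA', 'KS', 'FI-ES']
```

The same anchored pattern can be passed to Spark's `regexp_replace('iso_region', r'^US-', '')`.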

In [391]:
df_airport = spark.read.csv(airport_data, header=True, inferSchema=True)
for name in list(df_airport.schema.names):
    df_airport = df_airport.withColumnRenamed(name, re.sub(r'[^a-zA-Z0-9]', '_', name).lower())

df_airport = df_airport.withColumn('iso_region', regexp_replace('iso_region', r'^US-', ''))  # strip only a leading 'US-'
df_airport.write.mode('overwrite').format('parquet').option('path', 'output/airport').saveAsTable('airport')


In [407]:
df_airport.select('iso_region').show()


+----------+
|iso_region|
+----------+
|        PA|
|        KS|
|        AK|
|        AL|
|        AR|
|        OK|
|        AZ|
|        CA|
|        CA|
|        CA|
|        CO|
|        FL|
|        FL|
|        FL|
|        GA|
|        GA|
|        HI|
|        ID|
|        KS|
|        IN|
+----------+
only showing top 20 rows



In [538]:
df_airport.show()

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|        PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|        KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|        AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

### Data Dictionary for Immigration

<img src="data/Data-dict.jpeg" width=600 height=600 />

### Data Dictionary for Temperature

<img src="data/temperature.jpeg" width=400 height=600 />

### Data Dictionary for Airports

<img src="data/airport.jpeg" width=500 height=600 />

### Data Dictionary for Demographics

<img src="data/demographics.jpeg" width=500 height=600 />

In [195]:
#df_immigration.createGlobalTempView("immigration")
#df_temperature.createGlobalTempView("temperature")
#df_airport.createGlobalTempView("airport")
#df_demographics.createGlobalTempView("demographics")

In [400]:
spark.sql('show tables').show()

+--------+------------+-----------+
|database|   tableName|isTemporary|
+--------+------------+-----------+
| default|     airport|      false|
| default|demographics|      false|
| default| immigration|      false|
| default| temperature|      false|
+--------+------------+-----------+



## Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

The immigration data holds a wealth of information on immigration into the US, so it serves as the fact table.
Several dimension tables are derived from it, providing insights into the influx of immigrants, the visas they hold, etc.
The demographics and airport tables serve as further dimensions, joined on state_code and iata_code respectively.
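In this model the fact table reaches the airport dimension through i94port = iata_code, the same match used when staging the airport data. It reaches the demographics dimension through the state code. Matching i94addr to state_code is an assumption of this sketch. The demographics data appears to hold one row per city and race, so joining it directly on state_code would multiply fact rows. One option is to aggregate it to one row per state first. The sketch uses Python's built-in sqlite3 as a stand-in for Spark SQL, with small toy tables and made-up values.

```python
import sqlite3

# sqlite3 stands in for Spark SQL; tables and rows are toy stand-ins for
# dwh.staging_immigration, dwh.staging_airport and dwh.staging_demographics.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE immigration (cicid INTEGER, i94port TEXT, i94addr TEXT);
CREATE TABLE airport (iata_code TEXT, name TEXT);
CREATE TABLE demographics (city TEXT, state_code TEXT, total_population INTEGER);
""")
conn.executemany("INSERT INTO immigration VALUES (?, ?, ?)",
                 [(29, "ATL", "MA"), (474, "NEW", None)])
conn.executemany("INSERT INTO airport VALUES (?, ?)", [("ATL", "Airport A")])
# One row per (city, race) in the real data, so city totals repeat.
conn.executemany("INSERT INTO demographics VALUES (?, ?, ?)",
                 [("Town A", "MA", 100), ("Town A", "MA", 100), ("Town B", "MA", 50)])

query = """
WITH state_pop AS (
    -- collapse the race rows to one row per city, then sum per state
    SELECT state_code, SUM(total_population) AS city_population
    FROM (SELECT DISTINCT city, state_code, total_population FROM demographics) AS c
    GROUP BY state_code
)
SELECT i.cicid, a.name AS airport_name, i.i94addr, s.city_population
FROM immigration AS i
LEFT JOIN airport AS a ON i.i94port = a.iata_code
LEFT JOIN state_pop AS s ON i.i94addr = s.state_code
ORDER BY i.cicid
"""
rows = conn.execute(query).fetchall()
print(rows)  # [(29, 'Airport A', 'MA', 150), (474, None, None, None)]
```

The LEFT JOINs keep fact rows that have no matching dimension row, such as a missing i94addr. The query only uses CTEs, subqueries and LEFT JOIN, which Spark SQL also supports. It should therefore translate to `spark.sql` with the table names changed.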


#### 3.2 ETL Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Run 'python create_tables.py' to create the tables.
2. Join the immigration data to the airport data in SQL on i94port = iata_code.
3. Finally, insert the data using the main notebook.

### Fact table
- Immigration (joined with airport data on iata_code)

### Dimension tables
- Influx by country of citizenship (i94cit), state (i94addr) and port (i94port), made from the immigration table
- Demographics
- Visa type (made from the immigration table)
- Airport (made from the airport data)
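The influx and visa dimensions are GROUP BY aggregations over the staged immigration table. The sketch below reproduces the influx aggregation on toy rows, again with sqlite3 standing in for Spark SQL. It highlights one subtlety: `count(i94cit)`, as used in the notebook query, skips rows whose i94cit is NULL. A group with a missing citizenship code therefore reports 0, while `COUNT(*)` counts every arrival.

```python
import sqlite3

# sqlite3 stands in for Spark SQL; the rows are toy values.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE staging_immigration (cicid INTEGER, i94cit INTEGER, i94addr TEXT, i94port TEXT)"
)
conn.executemany(
    "INSERT INTO staging_immigration VALUES (?, ?, ?, ?)",
    [(1, 101, "MA", "ATL"), (2, 101, "MA", "ATL"), (3, 103, None, "NEW"), (4, None, "NY", "NYC")],
)

query = """
SELECT i94cit, i94addr, i94port,
       COUNT(i94cit) AS cit_count,  -- skips rows whose i94cit is NULL
       COUNT(*)      AS arrivals    -- counts every row in the group
FROM staging_immigration
GROUP BY i94cit, i94addr, i94port
ORDER BY arrivals DESC, i94port
"""
rows = conn.execute(query).fetchall()
for row in rows:
    print(row)
# (101, 'MA', 'ATL', 2, 2)
# (103, None, 'NEW', 1, 1)
# (None, 'NY', 'NYC', 0, 1)
```

Spark SQL follows the same NULL rules for `count(column)`. Using `count(*)` in the influx and visa queries may therefore better reflect the number of arrivals.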

In [497]:
#Creating a data warehouse

spark.sql("create database if not exists dwh").show()

spark.sql("show databases").show()

+---------+
|namespace|
+---------+
|  default|
|      dwh|
+---------+



In [498]:
# Creating staging table for immigration
staging_immigration.write.mode('overwrite').format('parquet').option('path', 'dwh/s_immigration').saveAsTable('dwh.staging_immigration')


In [499]:
# Stage airport data: keep only airports whose iata_code appears as an i94port
# in the staged immigration data; this feeds the airport dimension table.
df_staging_airport = spark.sql("select * from airport where iata_code in (select distinct(i94port) from dwh.staging_immigration)")
staging_airport = df_staging_airport.dropDuplicates(['iata_code'])
staging_airport.write.mode('overwrite').format('parquet').option('path', 'dwh/s_airport').saveAsTable('dwh.staging_airport')




In [500]:
staging_airport.show()

+------+--------------+--------------------+------------+----------+-----------------+--------+---------+----------+--------------------+
| ident|          type|                name|elevation_ft|iso_region|     municipality|gps_code|iata_code|local_code|         coordinates|
+------+--------------+--------------------+------------+----------+-----------------+--------+---------+----------+--------------------+
|  KBGM|medium_airport|Greater Binghamto...|        1636|        NY|       Binghamton|    KBGM|      BGM|       BGM|-75.97979736, 42....|
|  KFMY|medium_airport|          Page Field|          17|        FL|       Fort Myers|    KFMY|      FMY|       FMY|-81.8632965087999...|
|  KDNS| small_airport|Denison Municipal...|        1274|        IA|          Denison|    KDNS|      DNS|       DNS|-95.38069916, 41....|
|  EFHK| large_airport|Helsinki Vantaa A...|         179|     FI-ES|         Helsinki|    EFHK|      HEL|      null|24.963300704956, ...|
|  KFOK| small_airport|Francis S G

In [501]:
# Stage influx data: arrival counts by citizenship, state and port
staging_influx = spark.sql("SELECT i94cit, count(i94cit) count, i94addr, i94port FROM dwh.staging_immigration group by i94cit, i94port, i94addr order by count desc")

staging_influx.write.mode('overwrite').format('parquet').option('path', 'dwh/s_influx').saveAsTable('dwh.staging_influx')


In [502]:
#Staging data for visa info

staging_visa = spark.sql("SELECT i94port, count(i94port) count, i94visa, visapost, visatype FROM dwh.staging_immigration group by visatype, i94visa, i94port, visapost order by count desc")

staging_visa.write.mode('overwrite').format('parquet').option('path', 'dwh/s_visa').saveAsTable('dwh.staging_visa')


In [503]:
# Staging data for demographics
staging_demographics = df_demographics
staging_demographics.write.mode('overwrite').format('parquet').option('path', 'dwh/s_demographics').saveAsTable('dwh.staging_demographics')

In [504]:
staging_visa.dtypes


[('i94port', 'string'),
 ('count', 'bigint'),
 ('i94visa', 'int'),
 ('visapost', 'string'),
 ('visatype', 'string')]

In [505]:
staging_influx.dtypes


[('i94cit', 'int'),
 ('count', 'bigint'),
 ('i94addr', 'string'),
 ('i94port', 'string')]

In [506]:
staging_airport.dtypes


[('ident', 'string'),
 ('type', 'string'),
 ('name', 'string'),
 ('elevation_ft', 'int'),
 ('iso_region', 'string'),
 ('municipality', 'string'),
 ('gps_code', 'string'),
 ('iata_code', 'string'),
 ('local_code', 'string'),
 ('coordinates', 'string')]

In [507]:
staging_immigration.dtypes


[('cicid', 'bigint'),
 ('i94yr', 'int'),
 ('i94mon', 'int'),
 ('i94cit', 'int'),
 ('i94res', 'int'),
 ('i94port', 'string'),
 ('arrdate', 'int'),
 ('i94mode', 'int'),
 ('i94addr', 'string'),
 ('depdate', 'int'),
 ('i94bir', 'int'),
 ('i94visa', 'int'),
 ('count', 'int'),
 ('dtadfile', 'date'),
 ('visapost', 'string'),
 ('occup', 'string'),
 ('entdepa', 'string'),
 ('entdepd', 'string'),
 ('entdepu', 'string'),
 ('matflag', 'string'),
 ('biryear', 'int'),
 ('dtaddto', 'date'),
 ('gender', 'string'),
 ('insnum', 'string'),
 ('airline', 'string'),
 ('admnum', 'bigint'),
 ('fltno', 'string'),
 ('visatype', 'string')]

In [508]:
staging_demographics.dtypes

[('city', 'string'),
 ('state', 'string'),
 ('median_age', 'double'),
 ('male_population', 'int'),
 ('female_population', 'int'),
 ('total_population', 'int'),
 ('number_of_veterans', 'int'),
 ('foreign_born', 'int'),
 ('average_household_size', 'double'),
 ('state_code', 'string'),
 ('race', 'string'),
 ('count', 'int')]

In [542]:
spark.sql("use dwh").show()
spark.sql("show tables").show()

++
||
++
++

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|     dwh|     staging_airport|      false|
|     dwh|staging_demographics|      false|
|     dwh| staging_immigration|      false|
|     dwh|      staging_influx|      false|
|     dwh|        staging_visa|      false|
+--------+--------------------+-----------+



## Step 4: Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
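A source/count check can compare each staged table with the DataFrame it was built from, rather than only checking that it is non-empty. `reconcile_counts` below is a hypothetical plain-Python helper that takes the two counts, for example from `DataFrame.count()`. It allows fewer staged rows only for steps that deduplicate.

```python
def reconcile_counts(source_count, staged_count, table_name, allow_fewer=False):
    """Compare the row count of a staged table with its source.

    allow_fewer=True suits steps that deduplicate, where the staged table may
    hold fewer rows than the source, but never more and never zero.
    Returns the number of rows dropped by the step.
    """
    if staged_count == 0:
        raise ValueError(f"{table_name}: staged table is empty")
    if staged_count > source_count:
        raise ValueError(
            f"{table_name}: {staged_count} staged rows exceed {source_count} source rows"
        )
    if not allow_fewer and staged_count != source_count:
        raise ValueError(
            f"{table_name}: expected {source_count} rows, found {staged_count}"
        )
    return source_count - staged_count


print(reconcile_counts(10, 8, "toy_table", allow_fewer=True))  # 2
```

For example, `reconcile_counts(df_demographics.count(), staging_demographics.count(), 'staging_demographics')` would expect an exact match, since staging_demographics is a straight copy. `reconcile_counts(df_imm.count(), staging_immigration.count(), 'staging_immigration', allow_fewer=True)` would tolerate the rows removed by `dropDuplicates(['cicid'])`.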
 
Run Quality Checks

In [530]:
def dq_checks(df, staging_table):
    """Count the number of records for data completeness.
    :param df: spark dataframe to check counts on
    :param staging_table: corresponding name of table
    """
    total_count = df.count()

    if total_count == 0:
        print(f"Data quality check failed for {staging_table} with zero records!")
    else:
        print(f"Data quality check passed for {staging_table} with {total_count} records.")
    return 0

In [533]:
tables = {
    'staging_immigration': staging_immigration,
    'staging_visa': staging_visa,
    'staging_influx': staging_influx,
    'staging_demographics': staging_demographics,
    'staging_airport': staging_airport
}
for table_name, table_df in tables.items():
    dq_checks(table_df,table_name)

Data quality check passed for staging_immigration with 3096313 records.
Data quality check passed for staging_visa with 29372 records.
Data quality check passed for staging_influx with 89678 records.
Data quality check passed for staging_demographics with 2891 records.
Data quality check passed for staging_airport with 252 records.
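The count check above does not cover the integrity constraints listed earlier, such as a unique key. The sketch below counts null and duplicated key values over rows represented as dicts. `key_checks` and `assert_key_integrity` are hypothetical helpers, not part of the original pipeline.

```python
def key_checks(rows, key):
    """Count null and duplicated (non-null) values of `key` in a list of dict rows."""
    null_keys = 0
    duplicates = 0
    seen = set()
    for row in rows:
        value = row.get(key)
        if value is None:
            null_keys += 1
        elif value in seen:
            duplicates += 1
        else:
            seen.add(value)
    return null_keys, duplicates


def assert_key_integrity(rows, key, table_name):
    """Raise ValueError if `key` is not a non-null, unique key of the table."""
    null_keys, duplicates = key_checks(rows, key)
    if null_keys or duplicates:
        raise ValueError(
            f"{table_name}.{key}: {null_keys} null and {duplicates} duplicate values"
        )
    return True


rows = [{"cicid": 1}, {"cicid": 2}, {"cicid": 2}, {"cicid": None}]
print(key_checks(rows, "cicid"))  # (1, 1)
```

On the Spark tables, both counts map onto existing DataFrame calls:

- **Nulls:** `df.filter(col(key).isNull()).count()`.
- **Duplicates among non-null keys:** `non_null.count() - non_null.dropDuplicates([key]).count()`, where `non_null = df.filter(col(key).isNotNull())`.

Natural candidates are cicid in staging_immigration and iata_code in staging_airport, since both tables were deduplicated on those columns.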
