# Data Engineering Capstone Project

## US I94 Immigration Data Lake

### Project Summary
This project performs ETL on the Udacity-provided I94 immigration and US demographics datasets using PySpark. It produces a star schema stored as Parquet files, following the schema-on-read semantics of a data lake.

This notebook performs exploratory data analysis on the datasets used.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Check readme for installation and env setup
import configparser
import logging

import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from etl import (
    get_spark_session,
    get_immigration_data,
    get_us_demographics_data,
    get_i94_countries,
    get_i94_ports,
    get_i94_states,
    get_i94_travel_modes,
    get_i94_visas,
    clean_immigration_data,
    clean_us_demographics_data,
    clean_ports_data,
    clean_countries_data,
    clean_states_data,
    create_immigration_fact_table,
    create_city_demographics_dim_table,
    logger
)

logger.setLevel(logging.WARN) # To suppress etl logs in notebook

### Step 1: Scope the Project and Gather Data

#### Scope 
*Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.*

I plan to build a data lake with PySpark describing immigrants' destinations in the US. To achieve this, I used the I94 immigration dataset along with the US city demographics and ISO-3166 country codes datasets. The processed data lake can be used to analyse immigration trends over particular time periods and by travelers' country of origin. Output is written in the `Apache Parquet` columnar format for better performance on aggregation queries.

**Tools/Tech Used**: Python, Apache Spark (PySpark), Pandas

#### Describe and Gather Data 
*Describe the data sets you're using. Where did it come from? What type of information is included?*

The following datasets are used for this project:
- **I94 Immigration Data 2016:** This data comes from the US National Tourism and Trade Office.
    - Source: https://travel.trade.gov/research/reports/i94/historical/2016.html
    - Note: this data is behind a paywall and is provided by Udacity for this project.
    - The dataset consists of 12 files, one per month. Each file has around 3 million rows and 28 columns. A data dictionary describing the columns is included at `data/I94_SAS_Labels_Descriptions.SAS`.
    - Sample CSV: `data/input/immigration_data_sample.csv`
    - NOTE: I used the sample SAS dataset that Udacity provides in the `sas_data` directory of the workspace. It contains ~3 million rows, which satisfies the requirement of at least 1 million rows, and covers April 2016 only.
- **U.S. City Demographic Data:** This data comes from OpenDataSoft.
    - Source: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
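The full I94 dataset has one file per month of 2016, but the only file name that appears in this notebook is April's (`i94_apr16_sub.sas7bdat`). The sketch below builds all 12 monthly paths, **assuming** every month follows the same `i94_<mon>16_sub.sas7bdat` naming; the helper `monthly_i94_paths` is hypothetical and not part of `etl.py`.

```python
# Hypothetical helper: ASSUMES each monthly file follows the
# i94_<mon><yy>_sub.sas7bdat naming seen for April 2016.
MONTH_ABBREVIATIONS = ("jan", "feb", "mar", "apr", "may", "jun",
                       "jul", "aug", "sep", "oct", "nov", "dec")


def monthly_i94_paths(base_dir, year=2016):
    """Return one SAS file path per month of the given year."""
    yy = str(year)[-2:]
    return [f"{base_dir}/i94_{mon}{yy}_sub.sas7bdat" for mon in MONTH_ABBREVIATIONS]


paths = monthly_i94_paths("../../data/18-83510-I94-Data-2016")
print(paths[3])  # ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
```

Whether the other months really use this naming should be checked against the workspace before looping over the paths.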

##### 1.1 Read config and load sample from datasets

In [2]:
# Because the immigration data has 28 columns
pd.set_option('display.max_columns', 28)


# Read config
config = configparser.ConfigParser()
with open('capstone.cfg') as cfg_file:  # close the file once it has been parsed
    config.read_file(cfg_file)

I94_DATA_FILE_PATH = config['DATA']['I94_DATA_FILE_PATH']
print(I94_DATA_FILE_PATH)
# df = pd.read_sas(I94_DATA_FILE_PATH, format='sas7bdat', encoding="ISO-8859-1")

../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
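The notebook reads two keys from the `[DATA]` section of `capstone.cfg`, but the file itself is not shown. A minimal sketch of what it might contain, assuming only these two keys and reusing the values this notebook prints, can be parsed the same way with `configparser.read_string`:

```python
import configparser

# Hypothetical capstone.cfg contents: only the two [DATA] keys this notebook
# reads, with the values it prints. The real file may hold more settings.
SAMPLE_CFG = """
[DATA]
I94_DATA_FILE_PATH = ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
I94_LOCAL_DATA_DIR = data/input/sas_data/
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

I94_DATA_FILE_PATH = config["DATA"]["I94_DATA_FILE_PATH"]
I94_LOCAL_DATA_DIR = config["DATA"]["I94_LOCAL_DATA_DIR"]
print(I94_DATA_FILE_PATH)
print(I94_LOCAL_DATA_DIR)
```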


In [3]:
# Read local parquet dataset in `df`
I94_LOCAL_DATA_DIR = config['DATA']['I94_LOCAL_DATA_DIR']
print("I94_LOCAL_DATA_DIR: ", I94_LOCAL_DATA_DIR)

# To reduce memory usage locally
# df = pd.read_parquet(I94_LOCAL_DATA_DIR).sample(n=1000)
# df.describe()

I94_LOCAL_DATA_DIR:  data/input/sas_data/


In [4]:
# df.sample(n=10)

##### 1.2 Configure Spark session

In [5]:
# spark = SparkSession.builder \
#             .appName("Capstone Project - Immigration Dataset") \
#             .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
#             .enableHiveSupport() \
#             .getOrCreate()
spark = get_spark_session()
print("Spark session created")


Spark session created


##### 1.3 Show sample immigration data

Read the April 2016 file into a Spark DataFrame (the same data as the pandas DataFrame above).

In [6]:
# Read the April SAS file into a Spark DataFrame

# immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(I94_DATA_FILE_PATH)

immigration_df = get_immigration_data(spark)
immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [7]:
print("Partitions: ", immigration_df.rdd.getNumPartitions())
df_pd = pd.DataFrame(immigration_df.head(10))
df_pd

Partitions:  4


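|   | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | 24 | 25 | 26 | 27 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | CA | 20582.0 | 40.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1976.0 | 10292016 | F |  | QF | 94953870000.0 | 11 | B1 |
| 1 | 5748518.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | NV | 20591.0 | 32.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1984.0 | 10292016 | F |  | VA | 94955620000.0 | 7 | B1 |
| 2 | 5748519.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20582.0 | 29.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1987.0 | 10292016 | M |  | DL | 94956410000.0 | 40 | B1 |
| 3 | 5748520.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | 29.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1987.0 | 10292016 | F |  | DL | 94956450000.0 | 40 | B1 |
| 4 | 5748521.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | 28.0 | 1.0 | 1.0 | 20160430 | SYD |  | G | O |  | M | 1988.0 | 10292016 | M |  | DL | 94956390000.0 | 40 | B1 |
| 5 | 5748522.0 | 2016.0 | 4.0 | 245.0 | 464.0 | HHW | 20574.0 | 1.0 | HI | 20579.0 | 57.0 | 2.0 | 1.0 | 20160430 | ACK |  | G | O |  | M | 1959.0 | 10292016 | M |  | NZ | 94981800000.0 | 10 | B2 |
| 6 | 5748523.0 | 2016.0 | 4.0 | 245.0 | 464.0 | HHW | 20574.0 | 1.0 | HI | 20586.0 | 66.0 | 2.0 | 1.0 | 20160430 | ACK |  | G | O |  | M | 1950.0 | 10292016 | F |  | NZ | 94979690000.0 | 10 | B2 |
| 7 | 5748524.0 | 2016.0 | 4.0 | 245.0 | 464.0 | HHW | 20574.0 | 1.0 | HI | 20586.0 | 41.0 | 2.0 | 1.0 | 20160430 | ACK |  | G | O |  | M | 1975.0 | 10292016 | F |  | NZ | 94979750000.0 | 10 | B2 |
| 8 | 5748525.0 | 2016.0 | 4.0 | 245.0 | 464.0 | HOU | 20574.0 | 1.0 | FL | 20581.0 | 27.0 | 2.0 | 1.0 | 20160430 | ACK |  | G | O |  | M | 1989.0 | 10292016 | M |  | NZ | 94973250000.0 | 28 | B2 |
| 9 | 5748526.0 | 2016.0 | 4.0 | 245.0 | 464.0 | LOS | 20574.0 | 1.0 | CA | 20581.0 | 26.0 | 2.0 | 1.0 | 20160430 | ACK |  | G | O |  | M | 1990.0 | 10292016 | F |  | NZ | 95013550000.0 | 2 | B2 |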
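Columns 6 and 9 of the sample (`arrdate` and `depdate` in the schema) hold numbers such as `20574.0`. These are SAS date values, i.e. day counts from 1960-01-01, which is consistent with `dtadfile` being `20160430` on the same rows. The fact table exposes `arrival_date` as an ISO date string, so `etl.py` presumably performs a conversion of this kind; the plain-Python helper below is an illustrative sketch, not the project's implementation.

```python
import math
from datetime import date, timedelta

# SAS date values count days from 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_date_to_iso(days):
    """Convert a SAS numeric date such as 20574.0 to an ISO 'YYYY-MM-DD' string.

    Missing values (None or NaN) are returned as None.
    """
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return (SAS_EPOCH + timedelta(days=int(days))).isoformat()


print(sas_date_to_iso(20574.0))  # arrdate of every sample row -> 2016-04-30
print(sas_date_to_iso(20582.0))  # depdate of row 0 -> 2016-05-08
```

In Spark SQL the same arithmetic can be expressed with the built-in `date_add` function, for example `date_add('1960-01-01', CAST(arrdate AS INT))`, which keeps the conversion inside the engine instead of a Python UDF.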


### Step 2: Explore and Assess the Data
#### 2.1 Explore the Data
*Identify data quality issues, like missing values, duplicate data, etc.*
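Two of the issues named above, missing values and duplicates, can be quantified with a few lines of code. The sketch below uses plain Python over a list of dict rows with toy values, so it runs without Spark; the helper names are hypothetical. With PySpark, the per-column null count is commonly written as `df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])`.

```python
from collections import Counter


def missing_value_counts(rows, columns):
    """Count None (missing) values per column across a list of dict rows."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            if row.get(c) is None:
                counts[c] += 1
    return counts


def duplicate_keys(rows, key_columns):
    """Return key tuples that occur more than once, with their counts."""
    keys = Counter(tuple(row.get(c) for c in key_columns) for row in rows)
    return {k: n for k, n in keys.items() if n > 1}


# Toy rows (hypothetical values, not taken from the real dataset)
rows = [
    {"cicid": 1.0, "i94addr": "CA", "gender": "F"},
    {"cicid": 2.0, "i94addr": None, "gender": "M"},
    {"cicid": 2.0, "i94addr": "WA", "gender": None},
]
print(missing_value_counts(rows, ["cicid", "i94addr", "gender"]))
# {'cicid': 0, 'i94addr': 1, 'gender': 1}
print(duplicate_keys(rows, ["cicid"]))
# {(2.0,): 2}
```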

In [52]:
# Read data sets
us_demographics_df = get_us_demographics_data(spark)
countries_df = get_i94_countries(spark)
ports_df = get_i94_ports(spark)
states_df = get_i94_states(spark)
travel_modes_df = get_i94_travel_modes(spark)
visa_categories_df = get_i94_visas(spark)

In [53]:
us_demographics_df.select("city").distinct().count()

567

#### 2.2 Cleaning Steps
Document steps necessary to clean the data

The cleaning steps are explained in the README and implemented in `etl.py`.

In [54]:
# Performing cleaning tasks here
cleaned_immigration_df = clean_immigration_data(immigration_df)
cleaned_us_demographics_df = clean_us_demographics_data(us_demographics_df)
cleaned_ports_df = clean_ports_data(ports_df)
cleaned_countries_df = clean_countries_data(countries_df)
cleaned_states_df = clean_states_data(states_df)

In [70]:
cleaned_us_demographics_df.createOrReplaceTempView("cleaned_us_demographics_df")
us_demographics_df.createOrReplaceTempView("us_demographics_df")

In [71]:
cleaned_us_demographics_df.printSchema()
cleaned_us_demographics_df.select("state_code").distinct().count()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- race: string (nullable = true)
 |-- count: integer (nullable = true)



48

In [72]:
us_demographics_df.printSchema()
us_demographics_df.select("state_code").distinct().count()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- race: string (nullable = true)
 |-- count: integer (nullable = true)



49

In [73]:
cleaned_ports_df.printSchema()
cleaned_ports_df.select("state_code").distinct().count()

root
 |-- port_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)



112

In [59]:
spark.sql("""
    SELECT COUNT(DISTINCT city)
    FROM cleaned_us_demographics_df
""").show()

+--------------------+
|count(DISTINCT city)|
+--------------------+
|                 559|
+--------------------+



In [66]:
spark.sql("""
    SELECT DISTINCT demo.city, demo.state
    FROM us_demographics_df demo
        LEFT OUTER JOIN cleaned_us_demographics_df cdf 
            ON lower(demo.city) = lower(cdf.city)
    WHERE cdf.city IS NULL
        
""").show()

+------------+-----------+
|        city|      state|
+------------+-----------+
|    San Juan|Puerto Rico|
|The Villages|    Florida|
|    Mayagüez|Puerto Rico|
|    Guaynabo|Puerto Rico|
|       Ponce|Puerto Rico|
|      Caguas|Puerto Rico|
|     Bayamón|Puerto Rico|
|    Carolina|Puerto Rico|
+------------+-----------+
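The query above is an anti-join: it keeps raw rows whose city has no case-insensitive match in the cleaned data. The same rule in plain Python, with toy rows and a hypothetical `dropped_cities` helper, makes the matching explicit. Because only city names are compared, a city removed in one state but kept in another would not be reported.

```python
def dropped_cities(raw_rows, cleaned_rows):
    """Return (city, state) pairs from raw_rows whose city has no
    case-insensitive match among cleaned_rows (LEFT JOIN ... IS NULL)."""
    kept = {row["city"].lower() for row in cleaned_rows}
    return sorted({(row["city"], row["state"])
                   for row in raw_rows
                   if row["city"].lower() not in kept})


# Toy rows, not the real dataset
raw = [
    {"city": "San Juan", "state": "Puerto Rico"},
    {"city": "Seattle", "state": "Washington"},
]
cleaned = [{"city": "seattle", "state": "Washington"}]
print(dropped_cities(raw, cleaned))  # [('San Juan', 'Puerto Rico')]
```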



In [81]:
# spark.sql("""
#     SELECT
#         COUNT(sud.city)
#     FROM cleaned_us_demographics_df sud
#     GROUP BY sud.city, sud.state_code
# """).count()

aggregated_df = spark.sql("""
    SELECT
        sud.city,
        sud.state_code,
        -- City-level figures repeat on every race row, so take each one once
        MAX(sud.male_population) AS male_population,
        MAX(sud.female_population) AS female_population,
        MAX(sud.total_population) AS total_population,
        MAX(sud.number_of_veterans) AS number_of_veterans,
        MAX(sud.foreign_born) AS num_foreign_born
    FROM cleaned_us_demographics_df sud
    GROUP BY sud.city, sud.state_code
""")
aggregated_df.createOrReplaceTempView('combined_demographics')
spark.sql("""
    SELECT
        COUNT(*)
    FROM combined_demographics
""").show()

+--------+
|count(1)|
+--------+
|     588|
+--------+
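The `race` and `count` columns in the demographics schema indicate one row per city and race. Assuming the city-level columns (`male_population`, `total_population`, and so on) are repeated on each of those rows, summing them across a city's rows counts that city several times. The hypothetical helper below collapses rows to one record per `(city, state_code)` and takes each city-level figure once; the Spark SQL analogue is `MAX` or `FIRST` per group.

```python
def city_level_totals(rows, metrics):
    """Collapse rows to one record per (city, state_code).

    Assumes each city-level metric is repeated on every race row, so the
    first non-null value is kept instead of summing across rows.
    """
    totals = {}
    for row in rows:
        key = (row["city"], row["state_code"])
        record = totals.setdefault(key, {m: None for m in metrics})
        for m in metrics:
            if record[m] is None and row.get(m) is not None:
                record[m] = row[m]
    return totals


# Toy input: one city listed once per race with the same city-level total
rows = [
    {"city": "Springfield", "state_code": "XX", "race": "A", "total_population": 1000},
    {"city": "Springfield", "state_code": "XX", "race": "B", "total_population": 1000},
]
print(city_level_totals(rows, ["total_population"]))
# {('Springfield', 'XX'): {'total_population': 1000}}
```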



In [85]:
cleaned_ports_df.createOrReplaceTempView("cleaned_ports_df")
spark.sql("""
    SELECT
        sp.port_code AS port_code,
        cd.*
    FROM cleaned_ports_df sp
        JOIN combined_demographics cd 
            ON lower(cd.city) = lower(sp.city) AND cd.state_code = sp.state_code
""").count()

113
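The join keeps a port only when both the lower-cased city name and the state code match a demographics row. Since several ports can sit in the same city, the joined row count depends on the number of ports per matched city, not only on the number of cities. A plain-Python sketch with hypothetical port codes and toy values:

```python
def join_ports_to_demographics(ports, demographics):
    """Inner-join ports to city demographics on (lower(city), state_code)."""
    index = {(d["city"].lower(), d["state_code"]): d for d in demographics}
    joined = []
    for port in ports:
        match = index.get((port["city"].lower(), port["state_code"]))
        if match is not None:  # unmatched ports are dropped, as in an inner JOIN
            joined.append({"port_code": port["port_code"], **match})
    return joined


# Toy data: two ports in the same city and one port with no demographics row
ports = [
    {"port_code": "P1", "city": "SPRINGFIELD", "state_code": "XX"},
    {"port_code": "P2", "city": "SPRINGFIELD", "state_code": "XX"},
    {"port_code": "P3", "city": "NOWHERE", "state_code": "XX"},
]
demographics = [{"city": "Springfield", "state_code": "XX", "total_population": 1000}]
result = join_ports_to_demographics(ports, demographics)
print([row["port_code"] for row in result])  # ['P1', 'P2']
```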

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
*Map out the conceptual data model and explain why you chose that model*

Added in readme.

#### 3.2 Mapping Out Data Pipelines
*List the steps necessary to pipeline the data into the chosen data model*

Added in readme.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [35]:
# Create the immigrations fact table
immigration_fact_table = create_immigration_fact_table(spark, cleaned_immigration_df, cleaned_countries_df,
                                                           cleaned_states_df, cleaned_ports_df, visa_categories_df,
                                                           travel_modes_df)

In [36]:
# Create the city demographics dimension table, linked to ports via port_code
city_demographics_table = create_city_demographics_dim_table(spark, cleaned_us_demographics_df, cleaned_ports_df)

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
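The three kinds of check listed above can be sketched as small functions: a completeness (non-empty) check, a unique-key check, and a referential-integrity check between a fact foreign key and its dimension. The helpers below are hypothetical and work on plain Python rows with toy values; in this project the same checks could be written as SQL against the `fact_immigrations` and `dim_*` views.

```python
def check_not_empty(name, rows):
    """Fail if a table has no rows (a basic completeness check)."""
    if not rows:
        raise ValueError(f"{name} is empty")


def check_unique(name, rows, key):
    """Fail if the key column contains duplicates."""
    seen = set()
    for row in rows:
        if row[key] in seen:
            raise ValueError(f"{name}: duplicate {key} {row[key]!r}")
        seen.add(row[key])


def orphan_keys(fact_rows, fk, dim_rows, pk):
    """Return non-null fact foreign-key values with no matching dimension key."""
    valid = {row[pk] for row in dim_rows}
    return sorted({row[fk] for row in fact_rows
                   if row[fk] is not None and row[fk] not in valid})


# Toy tables using the column names of fact_immigrations and dim_country
dim_country = [{"country_code": "135"}, {"country_code": "209"}]
fact = [{"origin_country_code": "135"}, {"origin_country_code": "999"}]

check_not_empty("dim_country", dim_country)
check_unique("dim_country", dim_country, "country_code")
print(orphan_keys(fact, "origin_country_code", dim_country, "country_code"))  # ['999']
```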
 
Run Quality Checks

In [40]:
spark.conf.set("spark.sql.shuffle.partitions", 50)  # Fewer partitions than the default 200 cut task overhead on a local machine

In [37]:
# Register the fact and dimension tables as temporary views
immigration_fact_table.createOrReplaceTempView("fact_immigrations")
city_demographics_table.createOrReplaceTempView("dim_city_demographics")
ports_df.createOrReplaceTempView("dim_ports")
countries_df.createOrReplaceTempView("dim_country")
states_df.createOrReplaceTempView("dim_us_state")
visa_categories_df.createOrReplaceTempView("dim_visa_category")
travel_modes_df.createOrReplaceTempView("dim_travel_mode")

In [38]:
spark.sql("""
    SELECT 
        COUNT(*)
    FROM fact_immigrations
""").show()

+--------+
|count(1)|
+--------+
| 2823272|
+--------+



In [39]:
# Note: it's quite an expensive operation locally
immigration_fact_table.head()

Row(cicid=3386206.0, entry_year=2016.0, entry_month=4.0, origin_country_code='438', port_code='FMY', arrival_date='2016-04-18', travel_mode_code='1', us_state_code='AZ', departure_date='2016-04-24', age=54.0, visa_category_code='1', occupation=None, gender='M', birth_year='1962.0', entry_date='07162016', airline='QF', admission_number=56359932933.0, flight_number='00093', visa_type='WB')

In [43]:
spark.sql("""
    SELECT count(*) FROM dim_ports
""").show()

+--------+
|count(1)|
+--------+
|     660|
+--------+



In [45]:
spark.sql("""
    SELECT count(*) FROM dim_city_demographics
""").show()

+--------+
|count(1)|
+--------+
|     113|
+--------+



In [44]:
spark.sql("""
    SELECT count(*) FROM dim_country
""").show()

+--------+
|count(1)|
+--------+
|     289|
+--------+



In [46]:
spark.sql("""
    SELECT count(*) FROM dim_us_state
""").show()

+--------+
|count(1)|
+--------+
|      55|
+--------+



In [47]:
spark.sql("""
    SELECT count(*) FROM dim_visa_category
""").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [48]:
spark.sql("""
    SELECT count(*) FROM dim_travel_mode
""").show()

+--------+
|count(1)|
+--------+
|       4|
+--------+



In [86]:
immigration_fact_table.select("visa_category_code").distinct().show()

+------------------+
|visa_category_code|
+------------------+
|                 3|
|                 2|
|                 1|
+------------------+



In [91]:
city_demographics_table.printSchema()

root
 |-- port_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- male_population: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- total_population: long (nullable = true)
 |-- number_of_veterans: long (nullable = true)
 |-- num_foreign_born: long (nullable = true)



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * The database needed to be accessed by 100+ people.

Check readme.


#### The data model is appropriate for the identified purpose.
Queries:
- Which city was most visited in a specific month?
- From which countries do travelers originate? Top countries of origin.
- Which countries do most students come from?
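Each of these questions follows a group, count, and rank pattern (`GROUP BY ... ORDER BY COUNT(*) DESC LIMIT n`). A plain-Python sketch of that pattern with toy rows, using `collections.Counter.most_common`; the `top_n` helper is hypothetical. The student filter compares against the string `'3'` because the fact row shown earlier stores `visa_category_code` as a string.

```python
from collections import Counter


def top_n(rows, column, n=10, where=None):
    """Count rows per value of `column` and return the n most common.

    `where` is an optional predicate, mirroring a SQL WHERE clause.
    """
    values = (row[column] for row in rows if where is None or where(row))
    return Counter(values).most_common(n)


# Toy fact rows (hypothetical); visa_category_code '3' means Student
rows = [
    {"origin_country_code": "245", "visa_category_code": "3"},
    {"origin_country_code": "245", "visa_category_code": "3"},
    {"origin_country_code": "213", "visa_category_code": "3"},
    {"origin_country_code": "135", "visa_category_code": "2"},
]
print(top_n(rows, "origin_country_code", n=1))  # [('245', 2)]
print(top_n(rows, "origin_country_code",
            where=lambda r: r["visa_category_code"] == "3"))  # [('245', 2), ('213', 1)]
```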

In [None]:
# Which city was most visited in a specific month?
# The data covers April 2016 only, so query that month
# (the numeric code for April is 4)

spark.sql("""
    SELECT
        tvc.port_code,
        tvc.immigrant_visits,
        dcd.city,
        dcd.state_code,
        dcd.total_population
    FROM
        (SELECT 
            fi.port_code AS port_code, 
            COUNT(*) AS immigrant_visits
        FROM fact_immigrations fi 
        WHERE fi.entry_month = 4
        GROUP BY fi.port_code
        ORDER BY immigrant_visits DESC
        LIMIT 10
        ) AS tvc 
    JOIN dim_city_demographics dcd
        ON dcd.port_code = tvc.port_code
    ORDER BY tvc.immigrant_visits DESC
""").show()
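# Shows the top 10 ports by visits; ports without a dim_city_demographics match are dropped by the inner join, so fewer than 10 rows may appear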

In [94]:
# From which countries do travelers originate? Top countries of origin.
spark.sql("""
    SELECT *
    FROM
        (SELECT 
            fi.origin_country_code AS origin_country_code, 
            COUNT(*) AS country_visitors
        FROM fact_immigrations fi 
        GROUP BY fi.origin_country_code
        ORDER BY country_visitors DESC
        LIMIT 10
        ) AS tcv
    JOIN dim_country dc
        ON tcv.origin_country_code = dc.country_code
    ORDER BY country_visitors DESC
""").show()

+-------------------+----------------+------------+--------------------+
|origin_country_code|country_visitors|country_code|        country_name|
+-------------------+----------------+------------+--------------------+
|                135|          339928|         135|      UNITED KINGDOM|
|                209|          230840|         209|               JAPAN|
|                111|          175445|         111|              FRANCE|
|                582|          163778|         582|MEXICO Air Sea, a...|
|                245|          159887|         245|          CHINA, PRC|
|                112|          146230|         112|             GERMANY|
|                689|          128960|         689|              BRAZIL|
|                276|          116534|         276|         SOUTH KOREA|
|                438|           99731|         438|           AUSTRALIA|
|                213|           87988|         213|               INDIA|
+-------------------+----------------+------------+--------------------+

In [95]:
# Which countries do most students come from?
# visa_category_code 3 = Student (I94VISA codes in the SAS labels file)
spark.sql("""
    SELECT *
    FROM
        (SELECT 
            fi.origin_country_code AS origin_country_code, 
            COUNT(*) AS student_visitors
        FROM fact_immigrations fi 
        WHERE visa_category_code = 3
        GROUP BY fi.origin_country_code
        ORDER BY student_visitors DESC
        LIMIT 10
        ) AS tcv
    JOIN dim_country dc
        ON tcv.origin_country_code = dc.country_code
    ORDER BY student_visitors DESC
""").show()

+-------------------+----------------+------------+--------------------+
|origin_country_code|student_visitors|country_code|        country_name|
+-------------------+----------------+------------+--------------------+
|                245|            9760|         245|          CHINA, PRC|
|                213|            2683|         213|               INDIA|
|                276|            2498|         276|         SOUTH KOREA|
|                209|            2157|         209|               JAPAN|
|                582|            1817|         582|MEXICO Air Sea, a...|
|                689|            1677|         689|              BRAZIL|
|                261|            1425|         261|        SAUDI ARABIA|
|                268|            1060|         268|              TAIWAN|
|                696|             890|         696|           VENEZUELA|
|                691|             756|         691|            COLOMBIA|
+-------------------+----------------+------------+--------------------+