# IMMIGRATION ANALYSIS IN THE US!
### Data Engineering Capstone Project

#### Project Summary
This project uses several US datasets covering immigration over a fixed period of time and builds data pipelines that clean the raw data and create a data model for analytics, specifically to answer the following questions:
* Which state has the most registered immigrants?
* How many airports does each of the top 10 cities with the most immigrants have?
* Which state has the highest ratio of immigrants to population?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# Imports needed for the program
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col
from pyspark.sql.types import DoubleType, IntegerType
```

### Step 1: Scope the Project and Gather Data

#### Scope 
The project reads the following datasets in order to answer the analytical questions stated above:
* immigration_data (SAS data stored as Parquet files)
* airport-codes_csv (CSV format)
* us-cities-demographics (CSV format)

These datasets are combined into a star schema with one fact table (immigration data) and three dimension tables (immigrant information, airports and city demographics). Spark reads all input files (Parquet and CSV) into DataFrames, and the star schema is built on top of them with the Spark SQL module.

#### Describe and Gather Data

* I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the repository. This data consists of records of immigrants arriving in the US.
* U.S. City Demographic Data: This data comes from OpenSoft. It contains demographic information for US cities, such as population (male, female and total), race, number of veterans and foreign-born residents, among others.
* Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from datahub: https://datahub.io/core/airport-codes#data

```python
def getSparkSession():
    """
    Returns a Spark session, creating it if it does not exist yet

    Parameters:
        None

    Returns:
        spark: Spark session
    """
    # Create (or reuse) a Spark session with the SAS reader package available
    return (SparkSession.builder
            .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
            .enableHiveSupport()
            .getOrCreate())
```

```python
def loadDFs(spark):
    """
    Loads and returns all DataFrames needed, using the given Spark session

    Parameters:
        spark: Spark session

    Returns:
        immigration_df: Immigration DataFrame
        airports_df:    Airports DataFrame
        cities_df:      Cities DataFrame
    """
    # Read immigration data (from Parquet files)
    immigration_df = spark.read.parquet("sas_data")

    # Read airport codes (from CSV) and cast the elevation to an integer
    airports_df = spark.read.format("csv").options(header="true").load("airport-codes_csv.csv")
    airports_df = airports_df.withColumn("elevation_ft", airports_df["elevation_ft"].cast(IntegerType()))

    # Read US cities demographics (from CSV). The file is semicolon-delimited, so with the
    # default comma separator each line is read into a single column, renamed to "data"
    cities_df = spark.read.format("csv").options(header="true").load("us-cities-demographics.csv")
    cities_df = cities_df.toDF("data")

    # Split every line on ";" and cast each field to its type before naming it
    fields = split(col("data"), ";")
    cities_df = cities_df.select(
        fields.getItem(0).alias("city"),
        fields.getItem(1).alias("state"),
        fields.getItem(2).cast(DoubleType()).alias("median_age"),
        fields.getItem(3).cast(IntegerType()).alias("male_pop"),
        fields.getItem(4).cast(IntegerType()).alias("female_pop"),
        fields.getItem(5).cast(IntegerType()).alias("total_pop"),
        fields.getItem(6).cast(IntegerType()).alias("num_veterans"),
        fields.getItem(7).cast(IntegerType()).alias("foreign_born"),
        fields.getItem(8).cast(DoubleType()).alias("avg_household_size"),
        fields.getItem(9).alias("state_code"),
        fields.getItem(10).alias("race"),
        fields.getItem(11).cast(IntegerType()).alias("count"))

    return immigration_df, airports_df, cities_df
```
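The cities CSV is semicolon-delimited, which is why `loadDFs` reads each line as a single `data` column and splits it on `;`. The sketch below reproduces that split-and-cast step in plain Python with the standard `csv` module, so the logic can be checked without a Spark cluster. The header is assumed to match the file's header, the data row is made up for illustration, and `safe_cast` is a hypothetical helper that roughly mimics Spark's default behavior of turning unparsable values into NULL.

```python
import csv
import io

# Assumed header of us-cities-demographics.csv; the data row is made up.
SAMPLE = (
    "City;State;Median Age;Male Population;Female Population;Total Population;"
    "Number of Veterans;Foreign-born;Average Household Size;State Code;Race;Count\n"
    "Sampletown;Examplestate;35.5;40000;42000;82000;3000;5000;2.5;XX;Asian;1200\n"
)

# Column names and types, in the same order as the getItem(0..11) calls in loadDFs
FIELDS = [
    ("city", str), ("state", str), ("median_age", float), ("male_pop", int),
    ("female_pop", int), ("total_pop", int), ("num_veterans", int),
    ("foreign_born", int), ("avg_household_size", float), ("state_code", str),
    ("race", str), ("count", int),
]


def safe_cast(value, type_):
    """Hypothetical helper: cast a string, returning None (NULL) if it cannot be parsed."""
    if type_ is str:
        return value
    try:
        return type_(value)
    except (TypeError, ValueError):
        return None


def parse_cities(text):
    """Split each semicolon-delimited row and cast every field to its column type."""
    reader = csv.reader(io.StringIO(text), delimiter=";")
    next(reader)  # skip the header row
    return [
        {name: safe_cast(raw, type_) for (name, type_), raw in zip(FIELDS, row)}
        for row in reader
    ]


rows = parse_cities(SAMPLE)
print(rows[0]["city"], rows[0]["total_pop"], rows[0]["median_age"])  # Sampletown 82000 35.5
```

As an alternative, Spark's CSV reader accepts a `sep` option, so `spark.read.options(header='true', sep=';').csv('us-cities-demographics.csv')` would produce separate columns directly, named after the file's header rather than the snake_case aliases used in `loadDFs`.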

```python
# Schemas of DataFrames read from files
def dispSchema(df, title):
    """
    Prints the schema of the given Spark DataFrame

    Parameters:
        df: Spark DataFrame
        title: Table name

    Returns:
        None
    """
    print(title, "Table Schema:")
    df.printSchema()
```

### Step 2: Explore and Assess the Data
#### Explore the Data 
After inspecting the datasets and the sample CSV files, the key fields needed to answer the analytical queries must not contain NULL values, so records with NULLs in those fields are removed entirely to leave clean datasets for the queries. Duplicate rows are also not allowed on specific columns, and they are removed as described next.

#### Cleaning Steps
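The cleaning steps are as follows:
* Immigration data: drop records with NULL values in `i94addr`, `depdate` or `gender`.
* Airport codes: drop records with a NULL `municipality`, and keep only one record per `ident`.
* US cities demographics: drop records with a NULL `total_pop`, and keep only one record per (`city`, `state`) pair.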

```python
def cleanDFs(immigration_df, airports_df, cities_df):
    """
    Cleans the given DataFrames according to the rules defined

    Parameters:
        immigration_df: Immigration DataFrame
        airports_df:    Airports DataFrame
        cities_df:      Cities DataFrame

    Returns:
        immigration_df: Cleaned immigration DataFrame
        airports_df:    Cleaned airports DataFrame
        cities_df:      Cleaned cities DataFrame
    """
    # Immigration DF
    # This DataFrame cannot include NULL values for i94addr, depdate or gender
    immigration_df = immigration_df.na.drop(subset=["i94addr", "depdate", "gender"])

    # Airport codes DF
    # This DataFrame cannot have duplicates for ident or NULL values for municipality
    airports_df = airports_df.na.drop(subset=["municipality"]).dropDuplicates(subset=["ident"])

    # US cities demographics DF
    # This DataFrame cannot have NULL values for total_pop, and each (city, state) pair must be unique
    cities_df = cities_df.na.drop(subset=["total_pop"]).dropDuplicates(subset=["city", "state"])

    return immigration_df, airports_df, cities_df
```
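To make the cleaning rules concrete, the following plain-Python sketch mimics what `na.drop(subset=...)` and `dropDuplicates(subset=...)` do on a handful of made-up rows. One difference to keep in mind: this sketch keeps the first row seen for each key, while Spark's `dropDuplicates` does not guarantee which of the duplicates survives.

```python
def drop_nulls(rows, subset):
    """Keep rows whose columns in `subset` are all non-null (like df.na.drop(subset=...))."""
    return [row for row in rows if all(row.get(c) is not None for c in subset)]


def drop_duplicates(rows, subset):
    """Keep one row per distinct key in `subset` (like df.dropDuplicates(subset=...)).

    This sketch keeps the first row seen; Spark gives no such ordering guarantee.
    """
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row.get(c) for c in subset)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


# Made-up city rows: the same city twice (two races) and one row with a NULL total_pop
cities = [
    {"city": "A", "state": "S1", "total_pop": 100, "race": "Asian"},
    {"city": "A", "state": "S1", "total_pop": 100, "race": "White"},
    {"city": "B", "state": "S1", "total_pop": None, "race": "Asian"},
]

clean = drop_duplicates(drop_nulls(cities, ["total_pop"]), ["city", "state"])
print(len(clean))  # 1
```

If a city appears once per race in the source file, deduplicating on (city, state) keeps only one of those rows, so the `race` value that survives is effectively arbitrary.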

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The database schema is shown below. It consists of one fact table holding the immigration data needed for the queries and three dimension tables holding information about the immigrants, the airports and the city demographics.
![](db_images/db_immigration.png)
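As a text companion to the diagram, the sketch below lists the columns that `buildDataModel` selects for each table (the diagram itself may use different names) and computes which column names the fact table shares with each dimension table, i.e. the candidate join keys. The dictionary is an illustration, not part of the project code.

```python
# Columns selected for each table in buildDataModel (illustrative copy)
STAR_SCHEMA = {
    "immigration_table": ["imm_id", "year", "country_code", "airport_code", "state", "departure_date"],
    "immigrants_info_table": ["imm_id", "birth_year", "gender", "visa_type"],
    "airports_table": ["id", "type", "name", "elevation_ft", "state", "city"],
    "cities_demographics_table": ["city", "state_name", "male_pop", "female_pop",
                                  "total_pop", "state", "race"],
}


def shared_columns(schema, fact):
    """Return, for each dimension table, the column names it shares with the fact table."""
    fact_cols = set(schema[fact])
    return {
        table: sorted(fact_cols & set(cols))
        for table, cols in schema.items()
        if table != fact
    }


for table, keys in shared_columns(STAR_SCHEMA, "immigration_table").items():
    print(table, keys)
```

With these column names, the immigrants table joins on `imm_id`, and both the airports and the city demographics tables join on `state`; the fact table's `airport_code` does not share a column name with the airports table's `id`.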

#### 3.2 Mapping Out Data Pipelines
The data pipeline that builds the data model consists of the following steps:
* Create and load the immigrants info table from the immigration DataFrame, casting some double columns to integers for better performance
* Create and load the airports table from the airports DataFrame, keeping only US airports
* Create and load the city demographics table from the cities DataFrame, selecting the relevant city features
* Create and load the main fact table (immigration) from the immigration DataFrame, casting some double columns to integers for better performance
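Two of the per-row transformations in this pipeline are easy to miss in the Spark code: double columns such as `cicid` are cast to integers, and the airports' `iso_region` (for example `US-CA`) is split on `-` to keep only the state part. The plain-Python sketch below mirrors both, together with the filter that keeps only US airports. The sample airport rows are made up.

```python
def to_int(value):
    """Cast a double such as 6.0 to an int, truncating toward zero; None (NULL) stays None."""
    return None if value is None else int(value)


def region_to_state(iso_region):
    """Mirror split(col("iso_region"), "-").getItem(1): "US-CA" -> "CA", None if there is no second part."""
    parts = iso_region.split("-")
    return parts[1] if len(parts) > 1 else None


# Made-up airport rows using the column names of airport-codes_csv
airports = [
    {"ident": "X1", "iso_country": "US", "iso_region": "US-CA", "municipality": "Town A"},
    {"ident": "X2", "iso_country": "CA", "iso_region": "CA-ON", "municipality": "Town B"},
]

# Keep only US airports and derive the state code, as in buildDataModel
us_airports = [
    {"id": a["ident"], "state": region_to_state(a["iso_region"]), "city": a["municipality"]}
    for a in airports
    if a["iso_country"] == "US"
]

print(us_airports)  # [{'id': 'X1', 'state': 'CA', 'city': 'Town A'}]
print(to_int(6.0))  # 6
```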

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

```python
# Data pipeline to create the fact table and the 3 dimension tables
def buildDataModel():
    """
    Builds the data model using the complete pipeline defined

    Parameters:
        None

    Returns:
        immigrants_info_table:      Immigrants dimension table
        airports_table:             Airports dimension table
        cities_demographics_table:  Cities dimension table
        immigration_table:          Immigration fact table
    """
    # First, create a Spark session
    spark = getSparkSession()

    # Second, load the DataFrames
    immigration_df, airports_df, cities_df = loadDFs(spark)

    # Third, clean the DataFrames
    immigration_df, airports_df, cities_df = cleanDFs(immigration_df, airports_df, cities_df)

    # Fourth, create the tables following the star schema described
    # Immigrants info table (dim)
    immigrants_info_table = immigration_df.select(
        col("cicid").cast(IntegerType()).alias("imm_id"),
        col("biryear").cast(IntegerType()).alias("birth_year"),
        "gender",
        col("visatype").alias("visa_type"))

    # Airport codes table (dim), keeping only US airports; the state comes from iso_region ("US-CA" -> "CA")
    airports_table = airports_df.filter("iso_country == 'US'").select(
        col("ident").alias("id"), "type", "name", "elevation_ft",
        split(col("iso_region"), "-").getItem(1).alias("state"),
        col("municipality").alias("city"))

    # US cities demographics table (dim)
    cities_demographics_table = cities_df.select(
        "city", col("state").alias("state_name"), "male_pop", "female_pop",
        "total_pop", col("state_code").alias("state"), "race")

    # Immigration fact table
    immigration_table = immigration_df.select(
        col("cicid").cast(IntegerType()).alias("imm_id"),
        col("i94yr").cast(IntegerType()).alias("year"),
        col("i94cit").cast(IntegerType()).alias("country_code"),
        col("i94port").alias("airport_code"),
        col("i94addr").alias("state"),
        col("depdate").cast(IntegerType()).alias("departure_date"))

    # Fifth, print the schemas of the tables created
    dispSchema(immigrants_info_table, "Immigrants")
    dispSchema(airports_table, "Airports")
    dispSchema(cities_demographics_table, "Cities Demographics")
    dispSchema(immigration_table, "Immigration (fact)")

    return immigrants_info_table, airports_table, cities_demographics_table, immigration_table

# Call to build the data model
immigrants_info_table, airports_table, cities_demographics_table, immigration_table = buildDataModel()
```

```
Immigrants Table Schema:
root
 |-- imm_id: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa_type: string (nullable = true)

Airports Table Schema:
root
 |-- id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)

Cities Demographics Table Schema:
root
 |-- city: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- male_pop: integer (nullable = true)
 |-- female_pop: integer (nullable = true)
 |-- total_pop: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- race: string (nullable = true)

Immigration (fact) Table Schema:
root
 |-- imm_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- country_code: integer (nullable = true)
 |-- airport_code: string (nullable = true)
 |-- state: string (nullable = true)
```
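The fact table stores `departure_date` as an integer. Assuming `depdate` is a SAS date numeric, i.e. the number of days since 1960-01-01 (which is how SAS stores dates), it can be turned into a calendar date as in the sketch below. `sas_to_date` is a hypothetical helper, not part of the project.

```python
from datetime import date, timedelta

# SAS dates count days from this epoch
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(days):
    """Convert a SAS date numeric (days since 1960-01-01) to a date; None (NULL) stays None."""
    return None if days is None else SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20545))  # 2016-04-01
```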

#### 4.2 Data Quality Checks
To check the integrity of the tables, the following tests are run:
 * The immigration fact table has no duplicate entries for the same person (`imm_id`)
 * The immigrants table has no duplicate entries for the same person (`imm_id`)
 * The city demographics table has unique (city, state) pairs

```python
# Check that imm_id is unique in both the immigration fact table and the immigrants info table
if immigrants_info_table.groupBy("imm_id").count().filter("count > 1").count() > 0:
    raise ValueError("Immigrants table violates restriction of unique imm_id")
if immigration_table.groupBy("imm_id").count().filter("count > 1").count() > 0:
    raise ValueError("Main immigration table violates restriction of unique imm_id")

# Check that the cities table has unique (city, state) composite entries
if cities_demographics_table.groupBy("city", "state").count().filter("count > 1").count() > 0:
    raise ValueError("Cities table violates restriction of unique (city, state) entries")

print("All tests passed!")
```

```
All tests passed!
```
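The checks above repeat the same group-and-count pattern for each table. A small reusable helper could express the rule once; the plain-Python sketch below shows the idea on lists of made-up rows and also adds a non-empty check, which is an extra assumption not part of the tests above. `duplicate_keys` and `check_table` are hypothetical names.

```python
from collections import Counter


def duplicate_keys(rows, keys):
    """Return the key tuples that appear in more than one row."""
    counts = Counter(tuple(row[k] for k in keys) for row in rows)
    return [key for key, n in counts.items() if n > 1]


def check_table(rows, keys, name):
    """Raise ValueError if the table is empty or has duplicate values for `keys`."""
    if not rows:
        raise ValueError(f"{name} table is empty")
    duplicates = duplicate_keys(rows, keys)
    if duplicates:
        raise ValueError(f"{name} table has duplicate {keys} values: {duplicates[:5]}")


# Made-up rows: passes for imm_id, fails for the repeated (city, state) pair
check_table([{"imm_id": 1}, {"imm_id": 2}], ["imm_id"], "Immigrants")
try:
    check_table([{"city": "A", "state": "S1"}, {"city": "A", "state": "S1"}],
                ["city", "state"], "Cities")
except ValueError as err:
    print(err)
```

In Spark, the same duplicate rule is the `groupBy(...).count().filter("count > 1")` pattern used above, and an emptiness check could compare `df.count()` with 0.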


#### 4.3 Data dictionary 
For a description of every field in the database, refer to the `data_dictionary.txt` file.

### Step 5: Complete Project Write Up
The project was developed mainly with Spark, which reads and loads all files into DataFrames (used as the initial staging elements). Spark was chosen for its ability to scale to large data volumes when needed and to read Parquet files directly into DataFrames. Spark can also read input files from storage such as Amazon S3 or any HDFS path.

The star schema was chosen for the data model because it fits the available datasets well: the main (fact) table holds the immigration records, and its columns can be joined and aggregated with the dimension tables to run the required analytical queries with single-hop joins, which keeps them fast. OLAP cubes can also be built efficiently on top of this schema, both for the purposes described here and for further analysis.

For the analysis to remain valid over time, the datasets should be updated once a year so that all conclusions reflect new immigration data. With the current data model, the pipeline can support analytical queries such as:
* Which state has the most registered immigrants?
* How many airports does each of the top 10 cities with the most immigrants have?
* Which state has the highest ratio of immigrants to population?
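The first and third questions reduce to a count per state and a per-state ratio. The plain-Python sketch below shows that logic on made-up rows that use the `state` column of the immigration fact table and the `state` and `total_pop` columns of the city demographics table. It assumes that summing `total_pop` over the cities in the dataset is an acceptable proxy for a state's population, since only the listed cities are covered. The second question is left out because it needs a mapping from `airport_code` to cities.

```python
from collections import Counter


def immigrants_per_state(fact_rows):
    """Number of immigration records per state (question 1)."""
    return Counter(row["state"] for row in fact_rows)


def population_per_state(city_rows):
    """Sum of total_pop over the cities of each state."""
    totals = Counter()
    for row in city_rows:
        totals[row["state"]] += row["total_pop"]
    return totals


def top_state_by_count(fact_rows):
    return immigrants_per_state(fact_rows).most_common(1)[0][0]


def top_state_by_rate(fact_rows, city_rows):
    """State with the highest immigrants / population ratio (question 3)."""
    counts = immigrants_per_state(fact_rows)
    populations = population_per_state(city_rows)
    # Skip states without population data to avoid dividing by zero
    rates = {s: counts[s] / populations[s] for s in counts if populations.get(s)}
    return max(rates, key=rates.get)


# Made-up rows
fact = [{"state": "AA"}, {"state": "AA"}, {"state": "AA"}, {"state": "BB"}]
cities = [{"state": "AA", "total_pop": 1000}, {"state": "BB", "total_pop": 100}]

print(top_state_by_count(fact))         # AA
print(top_state_by_rate(fact, cities))  # BB
```

In Spark, the same steps would map to `groupBy("state").count()` on `immigration_table` and `groupBy("state").sum("total_pop")` on `cities_demographics_table`, followed by a join on `state`.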

#### Different Possible Scenarios
 * The data was increased by 100x: Spark should run on a cloud cluster that reads the larger input files from services such as S3 or HDFS. A cluster lets Spark distribute both the storage and the processing of the data across its nodes, so the workload can be spread as the data grows.
 
 * The data populates a dashboard that must be updated daily by 7am: the data pipeline can be automated with an Airflow DAG scheduled to run once a day, early enough to finish before 7am. This DAG should include:
   1. Load the available datasets
   2. Clean the data and delete the dashboard's current results
   3. Transform the data into staging tables
   4. Load the defined data models to create the database schema
   5. Run data quality checks to ensure database integrity
 
 * The database needs to be accessed by 100+ people: to cover the increased demand, the data models should be stored on cloud resources that can serve many concurrent queries. In addition, more data integrity checks should be added to prevent data loss and integrity violations, and copies (backups) of the database can be kept to preserve earlier states of the main database.
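The daily dashboard scenario above describes a linear chain of tasks. The sketch below models those five steps as a dependency graph and derives their run order with Python's standard `graphlib` module (Python 3.9+), which is the same idea an Airflow DAG expresses with task dependencies such as `load >> clean`. The task names are hypothetical and this is not Airflow code; in Airflow, a cron schedule like `0 5 * * *` (an assumed 5am start) would leave time for the run to finish before 7am.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each task maps to the set of tasks it depends on
DAILY_DAG = {
    "load_datasets": set(),
    "clean_data": {"load_datasets"},
    "transform_to_staging": {"clean_data"},
    "load_data_model": {"transform_to_staging"},
    "quality_checks": {"load_data_model"},
}


def run_order(dag):
    """Return an order in which every task runs after all of its dependencies."""
    return list(TopologicalSorter(dag).static_order())


print(run_order(DAILY_DAG))
```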