# Immigration Data Warehouse
### Data Engineering Project

#### Project Summary
This project aims to answer questions about US immigration, such as:

* What are the most popular cities for immigration?
* What is the count of immigrants by visa type and city, in descending order of count?
* What is the average temperature of the cities where the immigrant count is low or high in a particular year?
* What percentage of the population is foreign-born in the city an immigrant is moving to?

We extract data from three sources: the I94 immigration dataset for 2016, city temperature data from Kaggle, and US city demographic data from OpenSoft. We have designed three dimension tables (dimTemperature, dimImmigration, dimDemography) and one fact table (factImmigration).



In [1]:
import pandas as pd
import os
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, year, month, avg,monotonically_increasing_id
from pyspark.sql.types import StringType, IntegerType

In [2]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

In [3]:
spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0").getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
The goal of this project is to ingest data from three different sources and create fact and dimension tables that support analysis of US immigration by city, average temperature, city demographics, and seasonality. We use Spark for the ETL jobs and store the results in Parquet for downstream analysis by a data analyst or data scientist.

#### Describe and Gather Data 

**I94 Immigration Data**: comes from the US National Tourism and Trade Office and contains various statistics on international visitor arrivals in the USA. The dataset contains data from 2016.<br>
**World Temperature Data**: comes from Kaggle and contains average temperatures by city.<br>
**U.S. City Demographic Data**: comes from OpenSoft and contains demographic information about US cities, such as median age and male and female population.


In [4]:
def readDataFromSource():
    """Read the three source datasets and return them as Spark DataFrames."""
    # Read temperature data (CSV with a header row)
    temparatureData = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")
    # Read immigration data (load() without a format uses Spark's default source, Parquet)
    immigrationData = spark.read.load('./sas_data')
    # Read demographics data (semicolon-delimited CSV with a header row)
    demographyData = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("us-cities-demographics.csv")
    return temparatureData, immigrationData, demographyData


### Step 2: Explore and Assess the Data


In [5]:
@udf(StringType())
def convert_datetime(x):
    """Convert a SAS date (days since 1960-01-01) to an ISO-8601 date string."""
    # Test against None explicitly so that day 0 (1960-01-01) is still converted
    if x is not None:
        return (datetime(1960, 1, 1).date() + timedelta(days=x)).isoformat()
    return None
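
The conversion inside the UDF can be tried without Spark. The following is a minimal plain-Python sketch of the same arithmetic; the helper name `sas_days_to_iso` and the sample day counts are illustrative and do not appear elsewhere in this notebook.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day


def sas_days_to_iso(days):
    """Return the ISO-8601 date string for a SAS day count, or None if missing."""
    if days is None:
        return None
    return (SAS_EPOCH + timedelta(days=days)).isoformat()


print(sas_days_to_iso(0))        # 1960-01-01
print(sas_days_to_iso(20545.0))  # 2016-04-01
print(sas_days_to_iso(None))     # None
```

The float input mirrors the fact that `arrdate` is stored as a floating-point number of days.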


In [6]:
def immigrationStaging(immigrationData):
    """Drop rows with a null i94addr or cicid, cast year and month to integers, and rename columns. Returns the immigration staging DataFrame."""
    immigrationDatadf = immigrationData.filter(immigrationData.i94addr.isNotNull()).filter(immigrationData.cicid.isNotNull()).withColumn("Year",col("i94yr").cast("integer")).withColumn("Month",col("i94mon").cast("integer"))
    cleaned_immigrationData = immigrationDatadf.withColumn("arrdate", convert_datetime(immigrationDatadf.arrdate))
    staging_immigrationData = cleaned_immigrationData.select(col("cicid").alias("id"), 
                                           col("arrdate").alias("date"),
                                           col("i94port").alias("city_code"),
                                           col("i94addr").alias("state_code"),
                                           col("i94bir").alias("age"),
                                           col("Year").alias("year_of_arrival"),
                                           col("Month").alias("month_of_arrival"),
                                           col("gender").alias("gender"),
                                           col("visatype").alias("visa_type"),
                                           "count").drop_duplicates()

    
    print("immigrationStaging successful")
    return staging_immigrationData


In [7]:
def temperatureStaging(temparatureData):
    """Keep US rows with non-null temperature values and average them per city. Returns the temperature staging DataFrame."""
    temparatureData_Cleansed = temparatureData.filter(temparatureData.Country=='United States').filter(temparatureData.AverageTemperatureUncertainty.isNotNull()).filter(temparatureData.AverageTemperature.isNotNull())
    temparatureData_Aggrigated=temparatureData_Cleansed.withColumn("AvgTemp",col("AverageTemperature").cast("float")).withColumn("AvgDifferenceinTemp",col("AverageTemperatureUncertainty").cast("float"))
    stageTemperature=temparatureData_Aggrigated.groupBy("Country","City").agg({'AvgTemp':'avg', 'AvgDifferenceinTemp':'avg'})
    staging_TemperatureData=stageTemperature.select("Country","City",col("avg(AvgTemp)").alias("AvgTemp"),col("avg(AvgDifferenceinTemp)").alias("AvgDifferenceinTemp"))
    staging_TemperatureData.createOrReplaceTempView("dimTemperature")
    print("temperatureStaging successful")
    
    return staging_TemperatureData

In [8]:
def demographyStaging(demographyData):
    """Derive percentage columns (male, female and foreign-born share of total population) and rename key columns. Returns the demography staging DataFrame."""
    stage_demographyData = demographyData.withColumn("median_age", demographyData['Median Age']) \
        .withColumn("pcnt_male_pop", (demographyData['Male Population'] / demographyData['Total Population']) * 100) \
        .withColumn("pcnt_female_pop", (demographyData['Female Population'] / demographyData['Total Population']) * 100) \
        .withColumn("pcnt_foreign_born", (demographyData['Foreign-born'] / demographyData['Total Population']) * 100) \
        .withColumn("state_code", demographyData['State Code']) \
        .withColumn("total_pop", demographyData['Total Population'])

    print("demographyStaging successful")
    return stage_demographyData
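
The percentage columns above are plain ratios against `Total Population`. A minimal sketch of that arithmetic in plain Python follows, assuming a hypothetical helper `percent_of_total` and a made-up sample row. Because the CSV is read without a schema, the values arrive as strings, so the sketch converts them explicitly.

```python
def percent_of_total(part, total):
    """Return part as a percentage of total, or None when it cannot be computed."""
    if part is None or total is None:
        return None
    total = float(total)
    if total == 0:
        return None  # avoid dividing by zero for an empty population
    return float(part) / total * 100


# Hypothetical row; the numbers are illustrative, not taken from the dataset.
row = {
    "Male Population": "50",
    "Female Population": "50",
    "Foreign-born": "25",
    "Total Population": "100",
}

pcnt_male_pop = percent_of_total(row["Male Population"], row["Total Population"])
pcnt_foreign_born = percent_of_total(row["Foreign-born"], row["Total Population"])
print(pcnt_male_pop, pcnt_foreign_born)
```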


In [9]:
def immigrationDimension(staging_immigrationData):
    """Load staging_immigrationData into the dimImmigration dimension table."""
    staging_immigrationData.createOrReplaceTempView("dimImmigration")
    dimImmigration = spark.sql('''SELECT id,state_code,city_code,visa_type,year_of_arrival,month_of_arrival FROM dimImmigration''')
    print("immigrationDimension successful")
    return dimImmigration

In [10]:
def temperatureDimension(staging_TemperatureData):
    """Load the aggregated staging_TemperatureData into the dimTemperature dimension table."""
    staging_TemperatureData.createOrReplaceTempView("dimTemperature")
    dimTemperature = spark.sql('''SELECT * FROM dimTemperature''')
    print("temperatureDimension successful")
    return dimTemperature

In [11]:

def demographyDimension(stage_demographyData):
    """ This function loads stage_demographyData into dimDemography dimension table"""
    stage_demographyData.createOrReplaceTempView("dimDemography")
    dimDemography = spark.sql('''SELECT state_code,state,city,median_age,pcnt_male_pop,pcnt_female_pop,pcnt_foreign_born,total_pop FROM dimDemography''')
    print("demographyDimension successful")
    return dimDemography


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

**DIMENSION AND FACT TABLES**

1. **dimImmigration**: contains immigration events

* id               - Unique identifier
* state_code       - State code
* city_code        - City code
* visa_type        - Type of visa issued
* year_of_arrival  - Year of arrival in the US
* month_of_arrival - Month of arrival in the US

2. **dimTemperature**: contains the average temperature of each US city

* Country             - Country name
* City                - City name
* AvgTemp             - Average temperature in the city
* AvgDifferenceinTemp - Average of the temperature uncertainty (AverageTemperatureUncertainty) for the city

3. **dimDemography**: contains demographic statistics

* state_code         - State code
* state              - State name
* city               - City name
* median_age         - Median age of people in the city
* pcnt_male_pop      - Percentage of male population
* pcnt_female_pop    - Percentage of female population
* pcnt_foreign_born  - Percentage of people born outside the US
* total_pop          - Total population

4. **factImmigration**: the fact table gives the count of entries into the US

* factImmigration_pk  - Unique identifier
* id                  - Unique identifier of dimImmigration
* city                - City name
* state_code          - State code
* count               - Count column carried over from the immigration staging data




#### 3.2 Mapping Out Data Pipelines
1. Three staging tables are created from the raw data: staging_immigrationData, staging_TemperatureData, and stage_demographyData.

2. dimTemperature is created from staging_TemperatureData, which holds the cleaned and aggregated data read from the temperature CSV file.

3. dimImmigration is created from staging_immigrationData, which holds the cleaned data read from the sas_data files.

4. dimDemography is created from stage_demographyData, which holds the cleaned data read from the demographics CSV file.

5. factImmigration is created by joining staging_immigrationData, dimDemography, and dimTemperature.
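
The ordering implied by these steps can be written down as a small dependency graph. The sketch below is only an illustration in plain Python (it needs Python 3.9 or later for `graphlib`). The `raw_*` names are hypothetical labels for the three source datasets, and the notebook itself simply calls the functions in order.

```python
from graphlib import TopologicalSorter

# Each table maps to the set of tables it is built from (taken from the steps above).
# The "raw_*" names are hypothetical labels for the three source datasets.
dependencies = {
    "staging_immigrationData": {"raw_immigration"},
    "staging_TemperatureData": {"raw_temperature"},
    "stage_demographyData": {"raw_demographics"},
    "dimImmigration": {"staging_immigrationData"},
    "dimTemperature": {"staging_TemperatureData"},
    "dimDemography": {"stage_demographyData"},
    "factImmigration": {"staging_immigrationData", "dimDemography", "dimTemperature"},
}

# static_order() yields every table after all of the tables it depends on.
build_order = list(TopologicalSorter(dependencies).static_order())
print(build_order)
```

Any order produced this way builds factImmigration only after both dimension tables it joins are available.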

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [12]:
# factImmigration is created by joining staging_immigrationData,dimDemography and dimTemperature tables

def immigrationFact(staging_immigrationData,stage_demographyData,dimTemperature):
    """Create factImmigration by joining staging_immigrationData, dimDemography and dimTemperature."""
    staging_immigrationData.createOrReplaceTempView("stgImmigration")
    stage_demographyData.createOrReplaceTempView("dimDemography")
    dimTemperature.createOrReplaceTempView("dimTemperature")
    factImmigration = spark.sql('''
    SELECT stgImmigration.id,
           dimTemperature.City,
           dimDemography.state_code,
           stgImmigration.count
    FROM dimTemperature  
    JOIN dimDemography  ON (dimTemperature.city = dimDemography.city)
    JOIN stgImmigration ON ( dimDemography.state_code=stgImmigration.state_code)

    ''')
    # Add a surrogate key to the fact table (monotonically_increasing_id() is unique and increasing, but not consecutive)
    factImmigration=factImmigration.withColumn("factImmigration_pk", monotonically_increasing_id()) 
    print("factImmigration successful")
    return factImmigration

In [13]:
def writeTableAsParquet(dimTemperature,dimDemography,dimImmigration,factImmigration):
    """Write all fact and dimension tables to Parquet files."""
    spark.conf.set("spark.sql.parquet.compression.codec", "gzip")
    # Write the dimTemperature dimension table, partitioned by city
    dimTemperature.write.mode("append").partitionBy("City").parquet("/target/dimTemperature.parquet")
    # Write the dimDemography dimension table, partitioned by city
    dimDemography.write.mode("append").partitionBy("city").parquet("/target/dimDemography.parquet")
    # Write the dimImmigration dimension table, partitioned by city code
    dimImmigration.write.mode("append").partitionBy("city_code").parquet("/target/dimImmigration.parquet")
    # Write the factImmigration fact table, partitioned by city
    factImmigration.write.mode("append").partitionBy("City").parquet("/target/factImmigration.parquet")
    print("writeTableAsParquet successful")
    print("Data Written Successfully")
    

## PIPELINE 
#### P.1 Data Reading and Cleaning

In [14]:
temparatureData,immigrationData,demographyData = readDataFromSource()
staging_immigrationData = immigrationStaging(immigrationData)
staging_TemperatureData = temperatureStaging(temparatureData)
stage_demographyData=demographyStaging(demographyData)

immigrationStaging successful
temperatureStaging successful
demographyStaging successful


#### P.2 Fact and Dimension Table Creation

In [15]:
dimImmigration = immigrationDimension(staging_immigrationData)
dimTemperature = temperatureDimension(staging_TemperatureData)
dimDemography = demographyDimension(stage_demographyData)
factImmigration = immigrationFact(staging_immigrationData,stage_demographyData,dimTemperature)
writeTableAsParquet(dimTemperature,dimDemography,dimImmigration,factImmigration)

immigrationDimension successful
temperatureDimension successful
demographyDimension successful
factImmigration successful
writeTableAsParquet successful
Data Written Successfully


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [16]:
def dataQualityCountCheck():
    """Validate that each staging table and its dimension table have the same row count."""
    count=0
    if staging_immigrationData.count() == dimImmigration.count():
        count +=1
    if stage_demographyData.count() == dimDemography.count():
        count +=1
    if staging_TemperatureData.count() == dimTemperature.count():
        count +=1
    if count ==3:
        print("Data Quality Check Successful ! Data at Source and Target Equal")
    else:
        print("Data Quality Check Failed ! Data at Source and Target Are Not Equal")

In [17]:
def DataQualityNullCheck():
    """Validate that the key columns of the dimension tables contain no nulls."""
    count=0
    if dimImmigration.where(col("id").isNull()).count() == 0:
        count +=1
    if dimTemperature.where(col("City").isNull()).count() == 0:
        count +=1
    if dimDemography.where(col("state_code").isNull()).count() == 0:
        count +=1
    if count == 3:
        print("Data Quality Check Successful ! No Null present in Primary Key Columns")
    else:
        print("Data Quality Check Failed ! Null present in Primary Key Columns")
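
The checks above cover row counts and nulls, but not the unique-key constraint mentioned in the list. A minimal plain-Python sketch of such a check follows. The helper `find_duplicate_keys` and the sample rows are hypothetical and are not part of this pipeline.

```python
from collections import Counter


def find_duplicate_keys(rows, key):
    """Return the key values that appear more than once in `rows`."""
    counts = Counter(row[key] for row in rows)
    return sorted(value for value, n in counts.items() if n > 1)


# Hypothetical sample rows standing in for dimImmigration records.
sample_rows = [
    {"id": 1, "visa_type": "B2"},
    {"id": 2, "visa_type": "WT"},
    {"id": 2, "visa_type": "B1"},
]

duplicates = find_duplicate_keys(sample_rows, "id")
print(duplicates)  # [2]
```

On a Spark DataFrame, a comparable check could compare `df.count()` with `df.select("id").distinct().count()`.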

In [18]:
dataQualityCountCheck()

Data Quality Check Successful ! Data at Source and Target Equal


In [19]:
DataQualityNullCheck()

Data Quality Check Successful ! No Null present in Primary Key Columns


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. 

**DIMENSION AND FACT TABLES**

1. **dimImmigration**: contains immigration events (source: I94 immigration data)

* id               - Unique identifier
* state_code       - State code
* city_code        - City code
* visa_type        - Type of visa issued
* year_of_arrival  - Year of arrival in the US
* month_of_arrival - Month of arrival in the US

2. **dimTemperature**: contains the average temperature of each US city (source: world temperature data)

* Country             - Country name
* City                - City name
* AvgTemp             - Average temperature in the city
* AvgDifferenceinTemp - Average of the temperature uncertainty (AverageTemperatureUncertainty) for the city

3. **dimDemography**: contains demographic statistics (source: US city demographic data)

* state_code         - State code
* state              - State name
* city               - City name
* median_age         - Median age of people in the city
* pcnt_male_pop      - Percentage of male population
* pcnt_female_pop    - Percentage of female population
* pcnt_foreign_born  - Percentage of people born outside the US
* total_pop          - Total population

4. **factImmigration**: the fact table gives the count of entries into the US (source: join of staging_immigrationData, dimDemography, and dimTemperature)

* factImmigration_pk  - Unique identifier
* id                  - Unique identifier of dimImmigration
* city                - City name
* state_code          - State code
* count               - Count column carried over from the immigration staging data


### Step 5: Complete Project Write Up
* **Clearly state the rationale for the choice of tools and technologies for the project.**
PySpark is chosen for this project because it processes large amounts of data quickly (with in-memory compute), scales easily with additional worker nodes, reads many data formats (e.g. SAS, Parquet, CSV), and integrates well with cloud storage such as S3 and warehouses such as Redshift. Python also gives access to a much larger library ecosystem than Spark's native Scala. The fact and dimension tables follow a star schema, whose goal is to speed up read queries and analysis over large amounts of data drawn from sources with different schemas. The star schema achieves this by denormalizing the data in the dimension tables.
* **Propose how often the data should be updated and why.**
It depends on data availability at the source and on the reporting cycle. If the source data is available twice a month, we can perform a complete refresh once a fortnight.
* **Write a description of how you would approach the problem differently under the following scenarios:**
 * **The data was increased by 100x.**
We can consider spinning up larger EC2 instances to host Spark and/or adding Spark worker nodes. The added capacity from vertical or horizontal scaling should reduce processing time.
 * **The data populates a dashboard that must be updated on a daily basis by 7am every day.**
We can consider using Airflow/Oozie to schedule and automate the data pipeline jobs.
 * **The database needed to be accessed by 100+ people.**
We can use AWS Redshift with its Concurrency Scaling feature, which can support virtually unlimited concurrent users and queries with consistently fast query performance. When concurrency scaling is enabled, Amazon Redshift automatically adds cluster capacity when it is needed to process an increase in concurrent read queries. Write operations continue as normal on the main cluster. Users always see the most current data, whether queries run on the main cluster or on a concurrency scaling cluster.

#### Sample Query - What is the count of immigrants by visa type and city, in descending order of count?

In [20]:
factImmigration.createOrReplaceTempView("factImmigration")
dimImmigration.createOrReplaceTempView("dimImmigration")

Query = spark.sql('''
    SELECT factImmigration.city, dimImmigration.visa_type, count(factImmigration.id)
    FROM dimImmigration
    JOIN factImmigration ON (dimImmigration.id = factImmigration.id)
    GROUP BY dimImmigration.visa_type, factImmigration.city
    ORDER BY count(factImmigration.id) DESC
''')

Query.show()

+----------------+---------+---------+
|            city|visa_type|count(id)|
+----------------+---------+---------+
|    Jacksonville|       B2|  1633525|
|       Rochester|       WT|  1597890|
|Port Saint Lucie|       B2|  1595460|
|      Clearwater|       B2|  1595460|
|         Orlando|       B2|  1595460|
|      Cape Coral|       B2|  1595460|
|     Tallahassee|       B2|  1595460|
|   Coral Springs|       B2|  1595460|
| Fort Lauderdale|       B2|  1595460|
|     Gainesville|       B2|  1595460|
|  Pembroke Pines|       B2|  1595460|
|           Miami|       B2|  1595460|
|           Tampa|       B2|  1595460|
|Saint Petersburg|       B2|  1595460|
|       Hollywood|       B2|  1595460|
|         Yonkers|       WT|  1585910|
|        Syracuse|       WT|  1585910|
|        New York|       WT|  1585910|
|         Buffalo|       WT|  1585910|
|    Jacksonville|       WT|  1278305|
+----------------+---------+---------+
only showing top 20 rows
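
Many cities in the output above share exactly the same count (for example, 1595460 for most Florida cities with visa type B2). One possible explanation, offered as an assumption rather than a verified diagnosis, is that immigrationFact joins stgImmigration on state_code only, so each arrival is matched to every city in its state. The sketch below uses hypothetical miniature tables in plain Python to show that effect.

```python
from collections import Counter

# Hypothetical miniature tables; the values are illustrative, not from the datasets.
cities = [  # stands in for dimTemperature joined to dimDemography on city
    {"city": "Miami", "state_code": "FL"},
    {"city": "Tampa", "state_code": "FL"},
    {"city": "Buffalo", "state_code": "NY"},
]
arrivals = [  # stands in for stgImmigration
    {"id": 1, "state_code": "FL", "visa_type": "B2"},
    {"id": 2, "state_code": "FL", "visa_type": "B2"},
    {"id": 3, "state_code": "NY", "visa_type": "WT"},
]

# Inner join on state_code only, mirroring the second JOIN in immigrationFact.
joined = [
    {**arrival, "city": city["city"]}
    for city in cities
    for arrival in arrivals
    if city["state_code"] == arrival["state_code"]
]

# Count rows per (city, visa_type), as the sample query does.
counts = Counter((row["city"], row["visa_type"]) for row in joined)
print(len(arrivals), len(joined))
print(counts)
```

In this toy data, three arrivals become five joined rows, and both Florida cities receive the same B2 count. If per-city counts are needed, the immigration side would need a city-level key to join on. The notebook does not define such a mapping.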

