# Immigration Study Case
### Data Engineering Capstone Project

#### Project Summary
This project is aimed to create databases for immigration in USA and local tempeatures in USA. Starting from gathering data from multiple sources then applying some data cleaning and wrangling steps to make data more suitable for our purposes. At The end all data are stored in parquet files partitioned by suitable columns, which can be later easily retrieved and further processed or integrated. 

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *
from pyspark.sql.functions import udf, col, to_timestamp, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, from_unixtime
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType, IntegerType
from pyspark.sql import functions as f
from pyspark.sql import types as t
import numpy as np
import pandas as pd

In [20]:
from pyspark.sql import functions as f
from pyspark.sql import types as t

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project we will study the immigration to USA and answer two question at the end:
1. The top 5 nationalities immigrated to USA
2. The best 5 cities to immigrants in USA

Tools: Apache Spark 

#### Describe and Gather Data 
* I94 Immigration Data: This data comes from the US National Tourism and Trade Office  (https://travel.trade.gov/research/reports/i94/historical/2016.html).
* World Temperature Data: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

In [3]:
# create spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [4]:
# read immig. data in spark
fpath1 = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_imm =spark.read.format('com.github.saurfang.sas.spark').load(fpath1)

In [5]:
# read temperature data in spark
fpath2 = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp =spark.read.format('csv').option("header", "true").load(fpath2)

In [6]:
df_imm.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [7]:
df_temp.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [8]:
df_imm.count()

3096313

In [9]:
df_temp.count()

8599212

In [10]:
df_imm.show(n=3)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| n

In [11]:
df_temp.show(n=3)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 3 rows



In [12]:
# open SAS file to extract data from it
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')

In [13]:
def code_mapper(file, idx):
    """
    create dictionaries for all abbreviations in SAS _description file
    Arguments: 
        file: opened file
        idx: index name
    """
    f_content2 = f_content[f_content.index(idx):]

    f_content2 = f_content2[:f_content2.index(';')].split('\n')

    f_content2 = [i.replace("'", "") for i in f_content2]

    dic = [i.split('=') for i in f_content2[1:]]

    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)

    return dic

In [14]:
# create dictionaries
i94cit_res = code_mapper(f_content, "i94cntyl")

i94port = code_mapper(f_content, "i94prtl")

i94mode = code_mapper(f_content, "i94model")

i94addr = code_mapper(f_content, "i94addrl")

i94visa = {'1':'Business',

    '2': 'Pleasure',

    '3' : 'Student'}

In [15]:
# Create UDFs to look up the entities from the df in the dictionaries set

origin_code_udf= udf(lambda x: i94cit_res[str(x)],StringType())
port_name_code_udf= udf(lambda x: i94port[str(x)],StringType())
destin_code_udf= udf(lambda x: i94addr[str(x)],StringType())
transport_code_udf= udf(lambda x: i94mode[str(x)],StringType())
visa_udf= udf(lambda x: i94visa[str(x)],StringType())
upper_udf= udf(lambda x: x.upper(),StringType())


In [50]:
# function to return key for any value 
@udf()
def get_abbrev(val): 
    for key, value in i94addr.items(): 
        if str(val).upper() == value: 
            return key 

    return "key doesn't exist"


### Step 2: Explore and Assess the Data
#### Explore the Data 
we will filter data and remove duplicates for each dataset. In addidion missing data will be dropped
#### Cleaning Steps
* Dropping missing data and duplicates 
* Filteration of data based on the country and othe aspects of this study
* Casting new created or already defined columns with the right data types

In [19]:
# dropping null, empty and duplicates in the immigration dataframe
# Also casting columns with right data type
df_imm_valid= df_imm.dropna(how="any", subset= ["i94addr", "i94port","cicid","i94res"]).dropDuplicates(["cicid"])\
.withColumn("i94res",df_imm["i94res"].cast(IntegerType()))\
.withColumn("i94visa",df_imm["i94visa"].cast(IntegerType()))\
.withColumn("i94mode",df_imm["i94mode"].cast(IntegerType()))\
.withColumn("i94yr",df_imm["i94yr"].cast(IntegerType()))\
.withColumn("cicid",df_imm["cicid"].cast(IntegerType()))\
.withColumn("biryear",df_imm["biryear"].cast(IntegerType()))\
.withColumn("i94bir",df_imm["i94bir"].cast(IntegerType()))\
.withColumn("i94mon",df_imm["i94mon"].cast(IntegerType()))

In [21]:
# filteration of data based on the SAS data and extracting the date from arrdate and depdate columns
# Also, applying the defined UDFs to the dataframe
i94_immig= df_imm_valid.filter(col("i94res").isin(list(i94cit_res.keys())))\
.filter(col("i94addr").isin(list(i94addr.keys())))\
.filter(col("i94port").isin(list(i94port.keys())))\
.filter(col("i94mode").isin(list(i94mode.keys())))\
.filter(col("i94visa").isin(list(i94visa.keys())))\
.withColumn("origin_country",origin_code_udf(df_imm_valid["i94res"]))\
.withColumn("destination",destin_code_udf(df_imm_valid["i94addr"]))\
.withColumn("city_port_name",port_name_code_udf(df_imm_valid["i94port"]))\
.withColumn("transportation",transport_code_udf(df_imm_valid["i94mode"]))\
.withColumn("visa",visa_udf(df_imm_valid["i94visa"]))\
.withColumn("arrdate", f.date_format(df_imm_valid["arrdate"].cast(dataType=t.TimestampType()), "yyyy-MM-dd"))\
.withColumn("depdate", f.date_format(df_imm_valid["depdate"].cast(dataType=t.TimestampType()), "yyyy-MM-dd"))

In [25]:
# Create new dataset to view immigration data by state and origin country
I94_Data= i94_immig.select("cicid",col("i94yr").alias("year"),col("i94mon").alias("month"), "origin_country",\
                          "i94port", "city_port_name", "destination", "transportation","depdate", "arrdate",\
                           col("count").alias("number_of_entry"), "visa", col("i94bir").alias("age"),\
                          "gender",col("biryear").alias("year_of_birth"))

In [26]:
# to speed up spark with defined df
I94_Data.cache()

DataFrame[cicid: int, year: int, month: int, origin_country: string, i94port: string, city_port_name: string, destination: string, transportation: string, depdate: string, arrdate: string, number_of_entry: double, visa: string, age: int, gender: string, year_of_birth: int]

In [27]:
I94_Data.show(5)

+-----+----+-----+--------------+-------+-----------------+-----------+--------------+----------+----------+---------------+--------+---+------+-------------+
|cicid|year|month|origin_country|i94port|   city_port_name|destination|transportation|   depdate|   arrdate|number_of_entry|    visa|age|gender|year_of_birth|
+-----+----+-----+--------------+-------+-----------------+-----------+--------------+----------+----------+---------------+--------+---+------+-------------+
|  299|2016|    4|       AUSTRIA|    NYC|     NEW YORK, NY|   NEW YORK|           Air|1970-01-01|1970-01-01|            1.0|Pleasure| 54|  null|         1962|
|  305|2016|    4|       AUSTRIA|    NYC|     NEW YORK, NY|   NEW YORK|           Air|1970-01-01|1970-01-01|            1.0|Pleasure| 63|  null|         1953|
|  496|2016|    4|       AUSTRIA|    CHI|      CHICAGO, IL|   ILLINOIS|           Air|1970-01-01|1970-01-01|            1.0|Business| 64|  null|         1952|
|  558|2016|    4|       AUSTRIA|    SFR|SAN F

In [28]:
#dropping null, empty and duplicates in the immigration dataframe
df_temp_valid= df_temp.dropna(how="any", subset= ["Country", "City","AverageTemperature"]).dropDuplicates()

In [29]:
# filteration of data based on United States data and appling udf function
df_temp= df_temp.filter(df_temp["Country"]=="United States")\
.filter(df_temp["AverageTemperature"]!="null")\
.withColumn("City",upper_udf(df_temp["City"]))

In [30]:
Temperatures=df_temp.filter(df_temp["City"].isin(list(i94addr.values())))\
.filter(col("City").isNotNull())\
.withColumn("year",year(df_temp["dt"]))\
.withColumn("month",month(df_temp["dt"]))\
.withColumn("avg_temp_fahrenheit",df_temp["AverageTemperature"]*9/5+32)\
.withColumn("i94port",get_abbrev(df_temp["City"]))

In [31]:
Temperatures=Temperatures.select("year","month",round(col("AverageTemperature"),1).alias("avg_temp_celcius"),\
                                      round(col("avg_temp_fahrenheit"),1).alias("avg_temp_fahrenheit"),
                                       "i94port","City","Country").dropDuplicates()

In [32]:
Temperatures.cache()

DataFrame[year: int, month: int, avg_temp_celcius: double, avg_temp_fahrenheit: double, i94port: string, City: string, Country: string]

### Step 3: Define the Data Model
#### Fact table: immigration with columns
* cicid 
* i94yr 
* i94mon
* i94res (origin_country)
* i94port
* arrdate 
* i94addr (destination)
* depdate
* i94mode (transportation)

#### Dimension tables: 
1. immigrant_table
* cicid
* i94bir
* count
* i94visa
* biryear
* gender

2. temperature
* year
* month
* avg_temp_celcius
* avg_temp_fahrenheit
* state_abbrev (i94port)
* City
* Country

In [None]:
I94_Data.show(5)

In [None]:
# Write immigration dimension table to parquet files partitioned by i94port
I94_Data.write.mode("append").partitionBy("i94port").parquet("output/immigration.parquet")

In [None]:
#write to parquet
#df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data/part-00000-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet")

In [None]:
Temperatures.show(20)

In [None]:
Temperatures.write.mode("append").partitionBy("i94port").parquet("output/temperature.parquet")

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
We need a data model which enbales flexible queries to be run. I chose the relational model (SQL) to build a star schema to store our data. It is very common to use SQL since we might change our queries in the future. 
##### At the beginning we would like to know the most visited cities in USA and the top nationalities immigrating to USA.
* Query1: The top 5 nationalities immigrated to USA.
* Query2: The best 5 cities to immigrants in USA.

In [33]:
# Creating tables based on the previous cleaned dataframes  
I94_Data.createOrReplaceTempView("immigration")
Temperatures.createOrReplaceTempView("temperature")

In [34]:
# Create the fact table by joining the immigration and temperature views
immigration_table = spark.sql('''
SELECT immigration.cicid as cicid,
       immigration.year as year,
       immigration.month as month,
       immigration.origin_country as origin_country,
       immigration.i94port as i94port,
       immigration.destination as destination,
       immigration.arrdate as arrival_date,
       immigration.depdate as departure_date,
       immigration.transportation as transportation
FROM immigration
''')

In [36]:
# Write immigration_table to parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("output/immigration_table.parquet")


In [37]:
# create dimension table immigrant
immigrant_table = spark.sql('''
SELECT immigration.cicid as cicid,
       immigration.age as age,
       immigration.number_of_entry as number_of_entry,
       immigration.visa as reason,
       immigration.year_of_birth as year_of_birth,
       immigration.gender as gender
FROM immigration
''')

In [41]:
# Write immigrant_table to parquet files partitioned by reason
immigrant_table.write.mode("append").partitionBy("reason").parquet("output/immigrant_table.parquet")


In [39]:
# create dimension table temperature
temperature_table= spark.sql('''
SELECT temperature.year as year,
       temperature.month as month,
       temperature.avg_temp_celcius as avg_temp_celcius,
       temperature.avg_temp_fahrenheit as avg_temp_fahrenheit,
       temperature.i94port as i94port,
       temperature.City as city,
       temperature.Country as country
FROM temperature
''')

In [40]:
# Write temperature_table to parquet files partitioned by year
temperature_table.write.mode("append").partitionBy("year").parquet("output/temperature_table.parquet")

In [42]:
# Query1: The top 5 nationalities immigrated to USA
spark.sql("""
SELECT COUNT(*) as total_no_immig, origin_country
FROM immigration  
Group by origin_country
ORDER BY total_no_immig DESC """).show(5)

+--------------+--------------------+
|total_no_immig|      origin_country|
+--------------+--------------------+
|        350055|      UNITED KINGDOM|
|        234093|               JAPAN|
|        180334|              FRANCE|
|        165762|MEXICO Air Sea, a...|
|        163555|          CHINA, PRC|
+--------------+--------------------+
only showing top 5 rows



In [43]:
# Query2: The best 5 cities to immigrants in USA
spark.sql("""
SELECT COUNT(*) as top_cities, destination
FROM immigration  
Group by destination
ORDER BY top_cities DESC """).show(5)

+----------+-----------+
|top_cities|destination|
+----------+-----------+
|    621701|    FLORIDA|
|    553677|   NEW YORK|
|    470386| CALIFORNIA|
|    168764|     HAWAII|
|    134321|      TEXAS|
+----------+-----------+
only showing top 5 rows



#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [47]:
# Perform quality checks here
def quality_check(df, table):
    '''
    Input: Spark dataframe, description of Spark datafram
    Output: Print outcome of data quality check
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(table))
    else:
        print("Data quality check passed for {} with {} records".format(table, result))
    return 

In [48]:
quality_check(Temperatures, "temperature")

Data quality check passed for temperature with 6238 records


In [49]:
quality_check(I94_Data, "immigration")

Data quality check passed for immigration with 2917199 records


#### 4.3 Data dictionary 
#### Fact table: immigration with columns
* cicid >> a unique number for the immigrants.
* i94yr >> 4 digit year
* i94mon >> Numeric month
* i94res >> is country from where one has travelled.
* i94port >> 3 character code of destination USA city.
* arrdate >> is date of arrrival.
* i94addr >> is where the immigrants resides in USA.
* depdate >> the Departure Date from the USA
* i94mode >> 1 digit travel code (transportation)

#### Dimension tables: 
1. immigrant_table
* cicid >> a unique number for the immigrants.
* i94bir >> Age of respondent in years.
* count >> no. of entry.
* i94visa >> is the type of visa which one owns.
* biryear >> 4 digit year of birth
* gender >> gender

2. temperature
* year >> 4 digit year
* month >> Numeric month
* avg_temp_celcius >> temperature in °C
* avg_temp_fahrenheit >>  temperature in °F
* i94port >> 3 character code of destination USA city
* City >> city name
* Country >> country name

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
###### In this project I have mainly used:
1. Apache Spark: a very powerful data-handling tool, which enables processing big data from various data sources (e.g. SAS, CSV, JSON, XML ...etc.) in a convenient and easy way. In addition it enables distribution of data on mulitple CPUs and well-suited for expanding data on AWS with EMR clusters.
2. Jupyter Notebook: It has been applied in this project for buliding data pipelines because it has the power to display the data in a quick and beautiful way with less effort. It can also be used to integrate many libraries to serve our target (e.g. SQL, Spark, Pandas, Numpy .... etc.) 

* Propose how often the data should be updated and why.
Data should be monthly updated since the data is defined mothly in the big dataframe

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
    * It would be a good solution to still using spark but move it on AWS by creating EMR cluster.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * We could use Airflow for this purpose with defined DAGs to be updated daily.
 * The database needed to be accessed by 100+ people.
    * Move the project to AWS and create user credentials for each user and defininig appropriate roles.