# US Immigration Data
### Data Engineering Capstone Project

#### Project Summary
This project uses the I94 immigration data, the Global Land Temperature data and the US demographic data to build an ETL pipeline. The pipeline loads the data from S3, processes it with Spark into fact and dimension tables, and writes those tables back to S3.

After loading, the datasets are cleaned of missing values and duplicates, and the fact and dimension tables are created from the cleaned data. These tables support the following analyses:

1. Analysis on which port and travel mode the immigrants use to enter US
2. Distribution of Visa type and age of the immigrants
3. What the arrival dates look like for the immigrants, and whether there is a sudden inflow of immigrants on any particular date


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import os
import configparser
import datetime as dt

import pandas as pd
pd.options.display.max_columns = 100

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
# explicit imports instead of `import *`, so Python builtins are not shadowed
from pyspark.sql.functions import (
    col, count, dayofmonth, dayofweek, isnan, monotonically_increasing_id,
    month, udf, upper, weekofyear, when, year,
)



In [2]:
config = configparser.ConfigParser()
config.read('config.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
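The notebook assumes a `config.cfg` file with an `[AWS]` section holding the two keys read above; the file itself is not shown. A minimal sketch of its assumed layout, parsed with `configparser.read_string` so it runs without the real file (the placeholder values are hypothetical):

```python
import configparser

# Hypothetical config.cfg contents; placeholder values only, the real file is not shown
SAMPLE_CONFIG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CONFIG)

# Same lookups as the cell above (option names are case-insensitive in configparser)
access_key = config['AWS']['AWS_ACCESS_KEY_ID']
secret_key = config['AWS']['AWS_SECRET_ACCESS_KEY']
print(access_key, secret_key)
```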

In [3]:
def create_spark_session():
    """
    Create and return a Spark session
    Args:
        None
    Returns:
        spark: SparkSession
    """
    spark = SparkSession.builder.getOrCreate()
    return spark

In [4]:
spark = create_spark_session()

### Step 1: Scope the Project and Gather Data

#### Scope 
The project uses the I94 immigration data, the Global Land Temperature data and the US demographic data to build an ETL pipeline that loads the cleaned data into S3 and creates fact and dimension tables with Spark for analysis.

Tools and data model:
1. Amazon S3 - to store the datasets
2. Apache Spark - to read the source data (Parquet and CSV files), clean it and build the tables
3. Data model - star schema

#### Describe and Gather Data 
The primary datasets are the I94 immigration data, the Global Land Temperature data and the US demographic data; the airport codes data is a secondary dataset.

In [5]:
#### I94 immigration data: This data comes from the US National Tourism and Trade Office.

In [6]:
# load() reads Parquet, Spark's default data source format
df_immigration = spark.read.load('./sas_data')

In [7]:
df_immigration.head()

Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')

In [8]:
df_immigration.show(n=5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [9]:
df_immigration.limit(5).toPandas()

| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | CA | 20582.0 | 40.0 | 1.0 | 1.0 | 20160430 | SYD | | G | O | | M | 1976.0 | 10292016 | F | | QF | 94953870000.0 | 11 | B1 |
| 1 | 5748518.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | NV | 20591.0 | 32.0 | 1.0 | 1.0 | 20160430 | SYD | | G | O | | M | 1984.0 | 10292016 | F | | VA | 94955620000.0 | 7 | B1 |
| 2 | 5748519.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20582.0 | 29.0 | 1.0 | 1.0 | 20160430 | SYD | | G | O | | M | 1987.0 | 10292016 | M | | DL | 94956410000.0 | 40 | B1 |
| 3 | 5748520.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | 29.0 | 1.0 | 1.0 | 20160430 | SYD | | G | O | | M | 1987.0 | 10292016 | F | | DL | 94956450000.0 | 40 | B1 |
| 4 | 5748521.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | 28.0 | 1.0 | 1.0 | 20160430 | SYD | | G | O | | M | 1988.0 | 10292016 | M | | DL | 94956390000.0 | 40 | B1 |


In [10]:
#### World Temperature Data: This dataset comes from Kaggle. It contains monthly average land temperatures for cities in countries around the world.

In [11]:
# read the temperature data
tempfile = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = spark.read.csv(tempfile, header=True, inferSchema=True)

In [12]:
df_temp.show(n=5)

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [13]:
#### US Demographic data: This data comes from OpenSoft. It contains demographic information (e.g. population, median age, race) for all US cities with a population of 65,000 or more.

In [14]:
# read in US cities demographics data
population_file = 'us-cities-demographics.csv'
df_population = spark.read.csv(population_file, inferSchema=True, header=True, sep=';')

In [15]:
df_population.limit(5).toPandas()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


In [16]:
#### Airport Code: This contains airport codes (ident, IATA, GPS and local codes) and the municipality each airport is in.

In [17]:
# read in airport codes data
airport_file = 'airport-codes_csv.csv'
df_airportCodes = spark.read.csv(airport_file, inferSchema=True, header=True, sep=',')

In [18]:
df_airportCodes.show(n=5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [19]:
df_airportCodes.limit(5).toPandas()

| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11 | NA | US | US-PA | Bensalem | 00A | | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435 | NA | US | US-KS | Leoti | 00AA | | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450 | NA | US | US-AK | Anchor Point | 00AK | | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820 | NA | US | US-AL | Harvest | 00AL | | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237 | NA | US | US-AR | Newport | | | | -91.254898, 35.6087 |


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
I94 Immigration data:
The dataset has many columns, and only some of them are useful for the analyses in scope, so we first select the columns of interest: cicid, i94mon, i94cit, i94res, i94port, arrdate, i94mode, i94addr, depdate, i94bir, visatype, count, visapost, occup, biryear, gender and airline.


In [20]:
df_immigration.columns


['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'dtadfile',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'biryear',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype']

In [21]:
df_selected_immi = df_immigration[['cicid', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
                                          'i94mode', 'i94addr', 'depdate', 'i94bir', 'visatype',
                                          'count', 'visapost', 'occup', 'biryear','gender', 'airline']]

In [22]:
df_selected_immi.limit(5).toPandas()

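| | cicid | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | visatype | count | visapost | occup | biryear | gender | airline |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | CA | 20582.0 | 40.0 | B1 | 1.0 | SYD | | 1976.0 | F | QF |
| 1 | 5748518.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | NV | 20591.0 | 32.0 | B1 | 1.0 | SYD | | 1984.0 | F | VA |
| 2 | 5748519.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20582.0 | 29.0 | B1 | 1.0 | SYD | | 1987.0 | M | DL |
| 3 | 5748520.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | 29.0 | B1 | 1.0 | SYD | | 1987.0 | F | DL |
| 4 | 5748521.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | 28.0 | B1 | 1.0 | SYD | | 1988.0 | M | DL |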


In [23]:

# check the missing values
nrows = df_selected_immi.count()
df_missing = df_selected_immi.select([(count(when(isnan(c) | col(c).isNull(), c))/nrows).alias(c) for c in df_selected_immi.columns]).toPandas()

# display the missing value info
df_missing = pd.melt(df_missing, var_name='cols', value_name='values')
df_missing

| | cols | values |
|---|---|---|
| 0 | cicid | 0.0 |
| 1 | i94mon | 0.0 |
| 2 | i94cit | 0.0 |
| 3 | i94res | 0.0 |
| 4 | i94port | 0.0 |
| 5 | arrdate | 0.0 |
| 6 | i94mode | 7.7e-05 |
| 7 | i94addr | 0.049282 |
| 8 | depdate | 0.046009 |
| 9 | i94bir | 0.000259 |


In [24]:
# get columns to drop which have missing values over 50%
drop_cols = list(df_missing[df_missing['values']>0.5]['cols'])
drop_cols

['visapost', 'occup']
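The 50% rule above can be illustrated without Spark. This plain-Python sketch counts a value as missing when it is `None` or a float NaN (mirroring `isnan(c) | col(c).isNull()`) and drops columns whose missing ratio is strictly above the threshold. The helper names and toy rows are hypothetical, not taken from the I94 data.

```python
import math

def missing_ratio(rows, columns):
    """Fraction of rows in which each column is None or NaN."""
    n = len(rows)
    ratios = {}
    for c in columns:
        missing = sum(
            1 for r in rows
            if r.get(c) is None or (isinstance(r.get(c), float) and math.isnan(r.get(c)))
        )
        ratios[c] = missing / n if n else 0.0
    return ratios

def cols_to_drop(ratios, pct=0.5):
    """Columns whose missing ratio is strictly greater than pct."""
    return [c for c, v in ratios.items() if v > pct]

# Toy rows for illustration only
rows = [
    {"i94port": "LOS", "visapost": None, "occup": None},
    {"i94port": "NYC", "visapost": "SYD", "occup": None},
    {"i94port": "MIA", "visapost": None, "occup": float("nan")},
]
ratios = missing_ratio(rows, ["i94port", "visapost", "occup"])
print(cols_to_drop(ratios))  # ['visapost', 'occup']
```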

In [25]:
# drop the columns which have missing data > 50% since it won't be helpful for analysis
df_immi_cols_dropped = df_selected_immi.drop(*drop_cols)

In [26]:
df_immi_cols_dropped.columns

['cicid',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'visatype',
 'count',
 'biryear',
 'gender',
 'airline']

In [27]:
# drop duplicates if any exist
df_immi_dropDuplicates = df_immi_cols_dropped.drop_duplicates()

# drop rows which have missing value in all columns
df_immi_dropDuplicates = df_immi_dropDuplicates.dropna(how='all')

In [28]:
def check_missing_values(df):
    """Check the missing values in dataframe
    
    Args:
        df: spark dataframe to check
    
    Returns:
        df_missing: pandas dataframe with the fraction of missing (null or NaN) values per column
    """
    
    nrows = df.count()
    # get missing value ratio for each column
    df_missing = df.select([(count(when(isnan(c) | col(c).isNull(), c))/nrows).alias(c) for c in df.columns]).toPandas()
    
    # format the missing value info
    df_missing = pd.melt(df_missing, var_name='cols', value_name='values')
    
    return df_missing

In [29]:
def get_cols_to_drop(df_missing, pct):
    """return the cols to drop based on missing value percentage
    
    Args:
        df_missing: dataframe showing the missing values 
        pct: the percentage to decide which col to drop
    
    Returns:
        drop_cols: columns names to drop
    """
    drop_cols = list(df_missing[df_missing['values']>pct]['cols'])
    
    return drop_cols

In [30]:
def clean_immigration_data(input_data):
    """Clean immigration dataframe
    
    Args:
        input_data: spark dataframe with monthly immigration data
    
    Returns:
        df_immigration_clean: cleaned dataframe
    """
    
    df_immigration_selected = input_data[['cicid', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
                                          'i94mode', 'i94addr', 'depdate', 'i94bir', 'visatype',
                                          'count', 'visapost', 'occup', 'gender', 'airline']]
    # check missing values on the selected columns only
    df_missing = check_missing_values(df_immigration_selected)
    
    # get columns to drop which have missing values over 50%
    drop_cols = get_cols_to_drop(df_missing, 0.5)
    
    # drop those columns from the full input; the other columns (e.g. i94yr) are kept
    # because later steps (the i94yr check and the fact table) still use them
    df_immigration_clean = input_data.drop(*drop_cols)
    
    # drop rows where all elements are missing
    df_immigration_clean = df_immigration_clean.dropna(how='all')
   
    # drop duplicates if any exist
    df_immigration_clean = df_immigration_clean.drop_duplicates()

    return df_immigration_clean

In [31]:
def clean_temperature_data(input_data):
    """Clean temperatures dataset
    
    Args:
        input_data: spark dataframe of global land temperatures by city
    
    Returns:
        df_temperature_clean: cleaned spark dataframe
    """
    
    # convert dt column type to string
    df_temperature = input_data.withColumn("dt",col("dt").cast(StringType())) 
    
    # convert Country column to upper case so it matches the upper-case names in i94.csv
    df_temperature = df_temperature.withColumn("Country",upper(col("Country"))) 

    # drop rows with missing average temperature
    df_temperature_clean = df_temperature.dropna(subset=['AverageTemperature','AverageTemperatureUncertainty'])
    
    # drop duplicate rows
    df_temperature_clean = df_temperature_clean.drop_duplicates(subset=['dt', 'City', 'Country'])
    
    return df_temperature_clean
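As a rough plain-Python illustration of what `clean_temperature_data` does per row: upper-case `Country`, drop rows missing either temperature field, and keep one row per `(dt, City, Country)`. The toy rows reuse the Århus values shown earlier plus one hypothetical duplicate, and the helper name is hypothetical. One difference to keep in mind: this sketch keeps the first duplicate, whereas Spark's `dropDuplicates` does not guarantee which duplicate survives.

```python
def clean_temperature_rows(rows):
    """Plain-Python sketch of clean_temperature_data on a list of dicts."""
    seen = set()
    cleaned = []
    for r in rows:
        # upper-case Country so it can later match the names in i94.csv
        r = dict(r, Country=r["Country"].upper())
        # drop rows missing either temperature field
        if r["AverageTemperature"] is None or r["AverageTemperatureUncertainty"] is None:
            continue
        # keep one row per (dt, City, Country); here the first occurrence wins
        key = (r["dt"], r["City"], r["Country"])
        if key in seen:
            continue
        seen.add(key)
        cleaned.append(r)
    return cleaned

rows = [
    {"dt": "1743-11-01 00:00:00", "AverageTemperature": 6.068,
     "AverageTemperatureUncertainty": 1.737, "City": "Århus", "Country": "Denmark"},
    {"dt": "1743-12-01 00:00:00", "AverageTemperature": None,
     "AverageTemperatureUncertainty": None, "City": "Århus", "Country": "Denmark"},
    # hypothetical duplicate of the first row
    {"dt": "1743-11-01 00:00:00", "AverageTemperature": 6.068,
     "AverageTemperatureUncertainty": 1.737, "City": "Århus", "Country": "Denmark"},
]
cleaned = clean_temperature_rows(rows)
print(len(cleaned), cleaned[0]["Country"])  # 1 DENMARK
```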

In [32]:
def clean_demographics_data(input_data):
    """Clean the US demographics dataset
    
    Args:
        input_data: spark dataframe of US city demographics
    
    Returns:
        df_demographics_clean: cleaned spark dataframe
    """
    
    # check missing values
    df_missing = check_missing_values(input_data)
    
    # get columns to drop which have missing values over 50%
    drop_cols = get_cols_to_drop(df_missing, 0.5)
    
    df_demographics_dropped = input_data
    if drop_cols:
        # drop the columns which have missing data > 50% since they won't be helpful for analysis
        df_demographics_dropped = df_demographics_dropped.drop(*drop_cols)

    # drop rows with missing values (applied to the column-dropped frame so that step is kept)
    df_demographics_dropped = df_demographics_dropped.dropna()
    
    # drop duplicate rows: keep one row per city, state and race
    df_demographics_clean = df_demographics_dropped.dropDuplicates(subset=['City', 'State', 'State Code', 'Race'])
    
    return df_demographics_clean

In [33]:
df_immi_dropDuplicates = clean_immigration_data(df_immigration)

In [34]:
df_immi_dropDuplicates.limit(5).toPandas()

| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 474.0 | 2016.0 | 4.0 | 103.0 | 103.0 | NEW | 20545.0 | 2.0 | | 20547.0 | 25.0 | 2.0 | 1.0 | 20160401 | G | O | | M | 1991.0 | 6292016 | F | | VES | 55410440000.0 | 91285 | WT |
| 1 | 1508.0 | 2016.0 | 4.0 | 104.0 | 104.0 | NYC | 20545.0 | 1.0 | NY | 20552.0 | 16.0 | 2.0 | 1.0 | 20160401 | G | O | | M | 2000.0 | 6292016 | F | | LX | 55416410000.0 | 16 | WT |
| 2 | 1669.0 | 2016.0 | 4.0 | 104.0 | 104.0 | NYC | 20545.0 | 1.0 | FL | 20561.0 | 57.0 | 2.0 | 1.0 | 20160401 | G | O | | M | 1959.0 | 6292016 | M | | AA | 55457750000.0 | 39 | WT |
| 3 | 2025.0 | 2016.0 | 4.0 | 104.0 | 104.0 | NYC | 20545.0 | 1.0 | NY | 20549.0 | 51.0 | 2.0 | 1.0 | 20160401 | O | O | | M | 1965.0 | 6292016 | | | SN | 55419980000.0 | 1401 | WT |
| 4 | 2048.0 | 2016.0 | 4.0 | 104.0 | 104.0 | MIA | 20545.0 | 1.0 | FL | 20554.0 | 3.0 | 2.0 | 1.0 | 20160401 | O | O | | M | 2013.0 | 6292016 | | | UX | 55456900000.0 | 97 | WT |


In [35]:
# convert dt column type to string
df_temp = df_temp.withColumn("dt",col("dt").cast(StringType()))

In [36]:
# check the missing values
df_missing = check_missing_values(df_temp)
df_missing

| | cols | values |
|---|---|---|
| 0 | dt | 0.0 |
| 1 | AverageTemperature | 0.042345 |
| 2 | AverageTemperatureUncertainty | 0.042345 |
| 3 | City | 0.0 |
| 4 | Country | 0.0 |
| 5 | Latitude | 0.0 |
| 6 | Longitude | 0.0 |


In [37]:
df_i94yr = df_immi_dropDuplicates[['i94yr']]

In [38]:
df_i94yr.show()

+------+
| i94yr|
+------+
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
|2016.0|
+------+
only showing top 20 rows



In [39]:
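#### As we see, the i94yr values are 2016, while the latest year in the temperature data is 2013, so the two datasets do not overlap in time; the temperatures are therefore averaged per country rather than matched by date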

In [40]:
df_temp_dropped = clean_temperature_data(df_temp)

In [41]:
df_temp_dropped.limit(5).toPandas()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 00:00:00 | 7.76 | 1.973 | Bilbao | SPAIN | 42.59N | 2.18W |
| 1 | 1743-11-01 00:00:00 | 6.44 | 1.605 | Göttingen | GERMANY | 52.24N | 10.51E |
| 2 | 1744-04-01 00:00:00 | 14.251 | 2.169 | Coimbra | PORTUGAL | 40.99N | 8.52W |
| 3 | 1744-04-01 00:00:00 | 16.463 | 1.904 | Palma | SPAIN | 39.38N | 2.08E |
| 4 | 1744-04-01 00:00:00 | 8.807 | 2.252 | Sterling Heights | UNITED STATES | 42.59N | 82.91W |


In [42]:
df_missing = check_missing_values(df_population)
df_missing

| | cols | values |
|---|---|---|
| 0 | City | 0.0 |
| 1 | State | 0.0 |
| 2 | Median Age | 0.0 |
| 3 | Male Population | 0.001038 |
| 4 | Female Population | 0.001038 |
| 5 | Total Population | 0.0 |
| 6 | Number of Veterans | 0.004497 |
| 7 | Foreign-born | 0.004497 |
| 8 | Average Household Size | 0.005534 |
| 9 | State Code | 0.0 |


In [43]:
df_population_dropped = clean_demographics_data(df_population)

In [44]:
df_population_dropped.limit(5).toPandas()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 1 | Wilmington | North Carolina | 35.5 | 52346 | 63601 | 115947 | 5908 | 7401 | 2.24 | NC | Asian | 3152 |
| 2 | Tampa | Florida | 35.3 | 175517 | 193511 | 369028 | 20636 | 58795 | 2.47 | FL | Hispanic or Latino | 95154 |
| 3 | Gastonia | North Carolina | 36.9 | 35527 | 39023 | 74550 | 3537 | 5715 | 2.67 | NC | Asian | 2788 |
| 4 | Tyler | Texas | 33.9 | 50422 | 53283 | 103705 | 4813 | 8225 | 2.59 | TX | American Indian and Alaska Native | 1057 |


In [45]:
df_us_airport = df_airportCodes[df_airportCodes['iso_country']=='US']

In [46]:
df_missing = check_missing_values(df_us_airport)
df_missing

| | cols | values |
|---|---|---|
| 0 | ident | 0.0 |
| 1 | type | 0.0 |
| 2 | name | 0.0 |
| 3 | elevation_ft | 0.010502 |
| 4 | continent | 0.0 |
| 5 | iso_country | 0.0 |
| 6 | iso_region | 0.0 |
| 7 | municipality | 0.004482 |
| 8 | gps_code | 0.07791 |
| 9 | iata_code | 0.91128 |


### Step 3: Define the Data Model

This project uses a star schema: a central fact table of immigration records surrounded by dimension tables. The main advantage of a star schema is its simplicity: users can answer most questions with a simple inner join between the fact table and a small number of dimensions, and the database can process such queries efficiently. With few tables and clear join paths, queries run faster and business insights are easier to derive.


#### 3.1 Conceptual Data Model
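The model has one fact table and four dimension tables:
1. immigration_fact - one row per I94 record; it joins to visa_dim on visa_type_key, to country_dim on the country of residence code, to arrdate_dim on arrdate and to demographic_dim on state_code
2. country_dim - country code, country name and the country's average temperature
3. demographic_dim - demographics of US cities (population, median age, race, etc.)
4. visa_dim - visa types, each with a surrogate visa_type_key
5. arrdate_dim - arrival date broken down into day, week, month, year and weekday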
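To show how the star schema serves the analyses in scope, here is a small sketch that uses Python's built-in `sqlite3` in place of Spark SQL. Column names follow the code in Step 4 (`immigration_fact` with `visa_type_key`, and a visa dimension with `visatype` and `visa_type_key`), but the table name `visa_dim` is assumed and all rows are toy values.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Dimension: one row per visa type, identified by a surrogate key
cur.execute("CREATE TABLE visa_dim (visa_type_key INTEGER PRIMARY KEY, visatype TEXT)")
# Fact: one row per I94 record, pointing at the dimension through visa_type_key
cur.execute("""CREATE TABLE immigration_fact (
    cicid REAL, i94port TEXT, i94mode REAL, visa_type_key INTEGER, i94bir REAL)""")

cur.executemany("INSERT INTO visa_dim VALUES (?, ?)", [(1, "B1"), (2, "B2"), (3, "WT")])
cur.executemany(
    "INSERT INTO immigration_fact VALUES (?, ?, ?, ?, ?)",
    [(1.0, "LOS", 1.0, 1, 40.0), (2.0, "NYC", 1.0, 3, 16.0),
     (3.0, "NYC", 1.0, 3, 57.0), (4.0, "MIA", 1.0, 2, 3.0)],
)

# Analysis 2: visa type distribution and average age (one join from fact to dimension)
visa_counts = cur.execute("""
    SELECT v.visatype, COUNT(*) AS n, AVG(f.i94bir) AS avg_age
    FROM immigration_fact f
    JOIN visa_dim v ON f.visa_type_key = v.visa_type_key
    GROUP BY v.visatype
    ORDER BY n DESC, v.visatype
""").fetchall()

# Analysis 1: entries per port and travel mode (fact table alone)
port_mode = cur.execute("""
    SELECT i94port, i94mode, COUNT(*) AS n
    FROM immigration_fact
    GROUP BY i94port, i94mode
    ORDER BY n DESC, i94port
""").fetchall()

print(visa_counts)  # [('WT', 2, 36.5), ('B1', 1, 40.0), ('B2', 1, 3.0)]
print(port_mode)    # [('NYC', 1.0, 2), ('LOS', 1.0, 1), ('MIA', 1.0, 1)]
```

The same queries could be run in Spark SQL against temporary views of the Parquet tables; only the engine differs.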

#### 3.2 Mapping Out Data Pipelines
1. Load the datasets from the sources (Parquet for the I94 data, CSV for the others)
2. Clean the datasets by removing missing values and duplicates
3. Create the fact and dimension tables from the cleaned data and write them to Parquet


In [47]:
### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
##### Build the data pipelines to create the data model.

In [48]:
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
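The `get_date` UDF relies on SAS dates being day counts since 1960-01-01. A plain-Python version of the same conversion (the function name `sas_to_iso` is hypothetical) makes the arithmetic easy to check against the first I94 record, whose `arrdate` of 20574 falls on the same day as its `dtadfile` value 20160430. Unlike the lambda above, it tests `is None`, so a value of 0 (1960-01-01) is not mistaken for a missing date.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS dates count days since this date

def sas_to_iso(days):
    """Convert a SAS day count (e.g. arrdate, depdate) to an ISO date string."""
    if days is None:
        return None
    return (SAS_EPOCH + dt.timedelta(days=days)).isoformat()

print(sas_to_iso(20574.0))  # 2016-04-30 (arrdate of the first record)
print(sas_to_iso(20582.0))  # 2016-05-08 (its depdate)
```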

In [49]:
# creating date dimension table
df_arrdate_dim = df_immi_dropDuplicates.select(['arrdate']).withColumn("arrdate", get_date(df_immi_dropDuplicates.arrdate)).distinct()

In [50]:
# add the other date attributes
df_arrdate_dim = df_arrdate_dim.withColumn('arrival_day', dayofmonth('arrdate'))
df_arrdate_dim = df_arrdate_dim.withColumn('arrival_week', weekofyear('arrdate'))
df_arrdate_dim = df_arrdate_dim.withColumn('arrival_month', month('arrdate'))
df_arrdate_dim = df_arrdate_dim.withColumn('arrival_year', year('arrdate'))
df_arrdate_dim = df_arrdate_dim.withColumn('arrival_weekday', dayofweek('arrdate'))
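The date attributes above follow Spark's conventions: `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), and `weekofyear` returns the ISO-8601 week number. A plain-Python sketch of one dimension row (the helper `date_dim_row` is hypothetical) reproduces the first row of the arrival date dimension output (2016-04-22: week 16, weekday 6).

```python
import datetime as dt

def date_dim_row(iso_date):
    """Derive the arrival-date attributes for one ISO date string, using Spark's conventions."""
    d = dt.date.fromisoformat(iso_date)
    return {
        "arrdate": iso_date,
        "arrival_day": d.day,
        "arrival_week": d.isocalendar()[1],  # ISO-8601 week, like weekofyear
        "arrival_month": d.month,
        "arrival_year": d.year,
        # isoweekday is Mon=1..Sun=7; shift to Spark's dayofweek Sun=1..Sat=7
        "arrival_weekday": d.isoweekday() % 7 + 1,
    }

print(date_dim_row("2016-04-22"))
```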

In [51]:
# add a unique id column
df_arrdate_dim = df_arrdate_dim.withColumn('id', monotonically_increasing_id())

In [52]:
# write to parquet file
partition_columns = ['arrival_year', 'arrival_month', 'arrival_week']
df_arrdate_dim.write.parquet("tables/" + "immigration_arrdate", partitionBy=partition_columns, mode="overwrite")
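`partitionBy` writes Hive-style directories named `column=value`, nested in the order of `partition_columns`. A small plain-Python sketch of the resulting path for one row (the helper `partition_path` is hypothetical):

```python
def partition_path(base, row, partition_columns):
    """Directory that a row lands in when written with partitionBy(*partition_columns)."""
    parts = [f"{c}={row[c]}" for c in partition_columns]
    return "/".join([base.rstrip("/")] + parts)

row = {"arrdate": "2016-04-22", "arrival_year": 2016, "arrival_month": 4, "arrival_week": 16}
print(partition_path("tables/immigration_arrdate", row,
                     ["arrival_year", "arrival_month", "arrival_week"]))
# tables/immigration_arrdate/arrival_year=2016/arrival_month=4/arrival_week=16
```

Spark generally leaves the partition columns out of the Parquet data files and reconstructs them from the directory names when the table is read back.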

In [53]:
def create_arrdate_dimension(input_data, output_data):
    """Create the arrival date dimension table using arrdate in immigration data
    
    Args:
        input_data: spark dataframe of immigration data
        output_data: path to write the dimension table to
    
    Returns:
        df_arrdate_dim: spark dataframe of the arrival date dimension table
    """
    
    # format the sas date in arrdate column
    get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
    
    # creating date dimension table 
    df_arrdate_dim = input_data.select(['arrdate']).withColumn("arrdate", get_date(input_data.arrdate)).distinct()
    
    # add the other date attributes
    df_arrdate_dim = df_arrdate_dim.withColumn('arrival_day', dayofmonth('arrdate'))
    df_arrdate_dim = df_arrdate_dim.withColumn('arrival_week', weekofyear('arrdate'))
    df_arrdate_dim = df_arrdate_dim.withColumn('arrival_month', month('arrdate'))
    df_arrdate_dim = df_arrdate_dim.withColumn('arrival_year', year('arrdate'))
    df_arrdate_dim = df_arrdate_dim.withColumn('arrival_weekday', dayofweek('arrdate'))

    # add a unique id column
    df_arrdate_dim = df_arrdate_dim.withColumn('id', monotonically_increasing_id())
    
    # write to parquet file
    partition_columns = ['arrival_year', 'arrival_month', 'arrival_week']
    df_arrdate_dim.write.parquet(output_data + "immigration_arrdate", partitionBy=partition_columns, mode="overwrite")
    
    return df_arrdate_dim

In [54]:
def create_country_dimension(spark, input_data, df_temperature, output_data, country_cd):
    
    """Extract the country dimension using the immigration and land temperature datasets
    
    Args:
        spark: spark session object
        input_data: spark dataframe of immigration events
        df_temperature: spark dataframe of global land temperatures data
        output_data: path to write dimension dataframe
        country_cd: csv file which has country codes and country names mapping
    
    Returns:
        df_country: spark dataframe output dimension dataframe
    """
    
    # create temporary view for immigration data
    input_data.createOrReplaceTempView("immigration_view")

    # create temporary view for countries codes data
    country_cd.createOrReplaceTempView("country_codes_view")

    # aggregate the temperature data
    agg_temp = df_temperature.select(['Country', 'AverageTemperature']).groupby('Country').avg()
    agg_temp = agg_temp.withColumnRenamed('avg(AverageTemperature)', 'average_temperature')
    
    # create temporary view for average temperature data
    agg_temp.createOrReplaceTempView("average_temperature_view")
    
    # extract country dimension
    df_country_dim = spark.sql(
        """
        SELECT 
            i94res as country_code,
            Name as country_name
        FROM immigration_view
        LEFT JOIN country_codes_view
        ON immigration_view.i94res=country_codes_view.Code
        """
    ).distinct()
    
    # create country view
    df_country_dim.createOrReplaceTempView("country_view")

    df_country_dim = spark.sql(
        """
        SELECT 
            country_code,
            country_name,
            average_temperature
        FROM country_view
        LEFT JOIN average_temperature_view
        ON country_view.country_name=average_temperature_view.Country
        """
    ).distinct()
    
    # write the dimension to a parquet file
    df_country_dim.write.parquet(output_data + "country", mode="overwrite")

    return df_country_dim

In [55]:
def create_visa_dimension(input_data, output_data):
    """Create the visa dimension table using immigration data
    
    Args:
        input_data: spark dataframe of immigration events
        output_data: path to write dimension dataframe
    
    Returns:
        df_visa: spark dataframe of visa dimension table
    """
    
    # create visa dimension dataframe using visatype column
    df_visa = input_data.select(['visatype']).distinct()

    # add a unique id column
    df_visa_dim = df_visa.withColumn('visa_type_key', monotonically_increasing_id())

    # write dimension to parquet file
    df_visa_dim.write.parquet(output_data + "visa", mode="overwrite")

    return df_visa_dim

In [56]:
def create_demographics_dimension(input_data, output_data):
    """Create demographic dimension tables using US demographics dataset
    
    Args:
        input_data: spark dataframe of us demographics survey data
        output_data: path to write dimension dataframe to
    
    Returns:
        df_demographics: spark dataframe of demographics dimension
    """
    df_demographics_dim = input_data.withColumnRenamed('Median Age', 'median_age') \
        .withColumnRenamed('Male Population', 'male_population') \
        .withColumnRenamed('Female Population', 'female_population') \
        .withColumnRenamed('Total Population', 'total_population') \
        .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
        .withColumnRenamed('Foreign-born', 'foreign_born') \
        .withColumnRenamed('Average Household Size', 'average_household_size') \
        .withColumnRenamed('State Code', 'state_code')
    # add a unique id column
    df_demographics_dim = df_demographics_dim.withColumn('id', monotonically_increasing_id())

    # write dimension to parquet file
    df_demographics_dim.write.parquet(output_data + "demographics", mode="overwrite")

    return df_demographics_dim
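The chain of `withColumnRenamed` calls above converts the demographics headers to snake_case. As a possible alternative, the mapping can be generated; this sketch (the `to_snake_case` helper is hypothetical) reproduces the eight renames used above. Applied to every column it would also lowercase `City`, `State`, `Race` and `Count`, which the notebook leaves unchanged, so it is restricted to the same eight names here.

```python
import re

def to_snake_case(name):
    """'Median Age' -> 'median_age', 'Foreign-born' -> 'foreign_born'."""
    return re.sub(r"[^0-9a-zA-Z]+", "_", name.strip()).strip("_").lower()

renames = {c: to_snake_case(c) for c in [
    "Median Age", "Male Population", "Female Population", "Total Population",
    "Number of Veterans", "Foreign-born", "Average Household Size", "State Code",
]}
print(renames)
```

In the notebook the dict could drive a loop such as `for old, new in renames.items(): df = df.withColumnRenamed(old, new)`.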

In [57]:
def create_immigration_fact(spark, input_data, output_data):
    """Create the immigration fact table from immigration data and the visa dimension
    
    Args:
        spark: spark session
        input_data: spark dataframe of immigration events
        output_data: path to read the visa dimension from and write the fact table to
    
    Returns:
        df_fact: spark dataframe of the immigration fact table
    """
    # load visa dimension
    df_visa = spark.read.parquet(output_data + "visa")

    # create a view for visa type dimension
    df_visa.createOrReplaceTempView("visa_view")

    # convert arrival date in SAS format (days since 1960-01-01) to an ISO date string
    get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)

    # rename columns to align with data model
    df_fact = input_data.withColumnRenamed('cicid', 'record_id') \
        .withColumnRenamed('i94res', 'country_residence_code') \
        .withColumnRenamed('i94addr', 'state_code')

    # create an immigration view
    df_fact.createOrReplaceTempView("immigration_view")

    # create visa_type key
    df_fact = spark.sql(
        """
        SELECT 
            immigration_view.*, 
            visa_view.visa_type_key
        FROM immigration_view
        LEFT JOIN visa_view ON visa_view.visatype=immigration_view.visatype
        """
    )

    # convert arrival date into an ISO date string
    df_fact = df_fact.withColumn("arrdate", get_date(df_fact.arrdate))

    # drop visatype column; visa_type_key now links to the visa dimension
    df_fact = df_fact.drop(df_fact.visatype)

    # write fact table to parquet file
    df_fact.write.parquet(output_data + "immigration_fact", mode="overwrite")

    return df_fact

In [58]:
# test 
output_data = 'tables/'
df_arrdate_dim = create_arrdate_dimension(df_immi_dropDuplicates, output_data)

In [59]:
df_arrdate_dim.limit(5).toPandas()

| | arrdate | arrival_day | arrival_week | arrival_month | arrival_year | arrival_weekday | id |
|---|---|---|---|---|---|---|---|
| 0 | 2016-04-22 | 22 | 16 | 4 | 2016 | 6 | 8589934592 |
| 1 | 2016-04-15 | 15 | 15 | 4 | 2016 | 6 | 25769803776 |
| 2 | 2016-04-18 | 18 | 16 | 4 | 2016 | 2 | 42949672960 |
| 3 | 2016-04-09 | 9 | 14 | 4 | 2016 | 7 | 68719476736 |
| 4 | 2016-04-11 | 11 | 15 | 4 | 2016 | 2 | 85899345920 |
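The `id` values above are unique but not consecutive. Spark's `monotonically_increasing_id` documentation describes the layout: the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. Decoding the five ids shown (the helper name is hypothetical) indicates that each is the first record of a different partition. Because the values depend on partitioning, they can change if the DataFrame is recomputed, so it is safer to persist a dimension (as the pipeline does by writing it to Parquet) before using its ids as join keys.

```python
def split_monotonic_id(row_id):
    """Split a monotonically_increasing_id value into (partition_id, record_number)."""
    partition_id = row_id >> 33               # upper 31 bits
    record_number = row_id & ((1 << 33) - 1)  # lower 33 bits
    return partition_id, record_number

for row_id in [8589934592, 25769803776, 42949672960, 68719476736, 85899345920]:
    print(row_id, split_monotonic_id(row_id))
# partitions 1, 3, 5, 8 and 10, each with record number 0
```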


In [60]:
# country codes mapping
country_cd = pd.read_csv('i94.csv')
country_cd.head()

| | Code | Name |
|---|---|---|
| 0 | 582 | MEXICO Air Sea, and Not Reported (I-94, no lan... |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |
| 3 | 316 | ALGERIA |
| 4 | 102 | ANDORRA |


In [61]:
# aggregate the temperature data
agg_temp = df_temp_dropped.select(['Country', 'AverageTemperature']).groupby('Country').avg()
agg_temp = agg_temp.withColumnRenamed('avg(AverageTemperature)', 'average_temperature').toPandas()
agg_temp.head()

| | Country | average_temperature |
|---|---|---|
| 0 | SOUTH AFRICA | 16.360849 |
| 1 | ARMENIA | 8.375597 |
| 2 | BAHAMAS | 24.786978 |
| 3 | BURMA | 26.01684 |
| 4 | CAMBODIA | 26.918136 |


In [62]:
df_country_dim = df_immi_dropDuplicates.select(['i94res']).distinct().withColumnRenamed('i94res', 'country_code')
df_country_dim.limit(5).toPandas()

| | country_code |
|---|---|
| 0 | 692.0 |
| 1 | 299.0 |
| 2 | 576.0 |
| 3 | 735.0 |
| 4 | 206.0 |


In [63]:
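# add country name
# map the country name using country_cd (pandas dataframe loaded from i94.csv);
# build a plain dict once so the UDF does a dict lookup instead of filtering a dataframe per row
code_to_name = {int(code): name for code, name in zip(country_cd['Code'], country_cd['Name'])}

@udf('string')
def map_country_name(code):
    # i94res is a double such as 692.0 while the codes in i94.csv are integers;
    # unknown codes return None instead of raising an IndexError
    if code is None:
        return None
    return code_to_name.get(int(code))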


df_country_dim = df_country_dim.withColumn('country_name', map_country_name(df_country_dim.country_code))
df_country_dim.limit(5).toPandas()

| | country_code | country_name |
|---|---|---|
| 0 | 692.0 | ECUADOR |
| 1 | 299.0 | MONGOLIA |
| 2 | 576.0 | EL SALVADOR |
| 3 | 735.0 | MONTENEGRO |
| 4 | 206.0 | HONG KONG |


In [64]:
# add country average temperature
# map the country average temperature using agg_temp (pandas dataframe of per-country averages)
@udf('string')
def map_average_temperature(country_name):
    average_temperature = agg_temp[agg_temp['Country']==country_name]['average_temperature']
    
    if not average_temperature.empty:
        return str(average_temperature.iloc[0])
    
    return None

df_country_dim = df_country_dim.withColumn('average_temperature', map_average_temperature(df_country_dim.country_name))
df_country_dim.limit(5).toPandas()

| | country_code | country_name | average_temperature |
|---|---|---|---|
| 0 | 692.0 | ECUADOR | 20.5391705374 |
| 1 | 299.0 | MONGOLIA | -3.36548531952 |
| 2 | 576.0 | EL SALVADOR | 25.2628525509 |
| 3 | 735.0 | MONTENEGRO | |
| 4 | 206.0 | HONG KONG | 21.4236961538 |


In [65]:
output_data = 'tables/'
df_country_dim.write.parquet(output_data + "country", mode="overwrite")

In [66]:
country_cd = spark.read.csv('i94.csv', header=True, inferSchema=True)

In [67]:
# test pipeline function
output_data = 'tables/'
df_country_dim = create_country_dimension(spark, df_immi_dropDuplicates, df_temp_dropped, output_data, country_cd)

In [68]:
# quickly check the data
df_country_dim.limit(5).toPandas()

Unnamed: 0,country_code,country_name,average_temperature
0,151.0,ARMENIA,8.375597
1,512.0,BAHAMAS,24.786978
2,373.0,SOUTH AFRICA,16.360849
3,735.0,MONTENEGRO,
4,243.0,BURMA,26.01684


In [69]:
# create visa dimension dataframe using visatype column
df_visa_dim = df_immi_dropDuplicates.select(['visatype']).distinct()

# add a unique (but not consecutive) id column
df_visa_dim = df_visa_dim.withColumn('visa_type_key', monotonically_increasing_id())

In [70]:
output_data = 'tables/'
df_visa_dim.write.parquet(output_data + "visa", mode="overwrite")

In [71]:
# test pipeline function
output_data = 'tables/'
df_visa_dim = create_visa_dimension(df_immi_dropDuplicates, output_data)

In [72]:
# check the data
df_visa_dim.limit(5).toPandas()

Unnamed: 0,visatype,visa_type_key
0,F2,103079215104
1,GMB,352187318272
2,B2,369367187456
3,F1,498216206336
4,CPL,601295421440


In [73]:
# create demographics dimension table
df_demographics_dim = df_population_dropped.withColumnRenamed('Median Age', 'median_age') \
    .withColumnRenamed('Male Population', 'male_population') \
    .withColumnRenamed('Female Population', 'female_population') \
    .withColumnRenamed('Total Population', 'total_population') \
    .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
    .withColumnRenamed('Foreign-born', 'foreign_born') \
    .withColumnRenamed('Average Household Size', 'average_household_size') \
    .withColumnRenamed('State Code', 'state_code')

# add a unique (but not consecutive) id column
df_demographics_dim = df_demographics_dim.withColumn('id', monotonically_increasing_id())
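Spark's Parquet writer rejects column names containing spaces, the characters `,;{}()=`, newlines or tabs, which is why headers such as `Median Age` have to be renamed before writing. Instead of listing every rename, a helper can derive snake_case names. `to_snake_case` below is a hypothetical helper, not part of the project; in PySpark it could be applied with `df.toDF(*[to_snake_case(c) for c in df.columns])`. Note that it would also lowercase `City`, `State`, `Race` and `Count`, which the explicit renames above leave unchanged.

```python
import re

# Whitespace plus the characters Spark refuses in Parquet column names;
# hyphens are allowed by Spark but are normalised here for consistency.
_SEPARATORS = re.compile(r"[\s,;{}()=\-]+")


def to_snake_case(name: str) -> str:
    """Turn a header like 'Average Household Size' into 'average_household_size'."""
    return _SEPARATORS.sub("_", name.strip()).strip("_").lower()


for column in ["Median Age", "Foreign-born", "State Code", "Number of Veterans"]:
    print(column, "->", to_snake_case(column))
# Median Age -> median_age
# Foreign-born -> foreign_born
# State Code -> state_code
# Number of Veterans -> number_of_veterans
```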

In [74]:
output_data = 'tables/'
df_demographics_dim.write.parquet(output_data + "demographics", mode="overwrite")

In [75]:
# test pipeline function
output_data = 'tables/'
df_demographics_dim = create_demographics_dimension(df_population_dropped, output_data)

In [76]:
# check the data
df_demographics_dim.limit(5).toPandas()

Unnamed: 0,City,State,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,Race,Count,id
0,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723,0
1,Wilmington,North Carolina,35.5,52346,63601,115947,5908,7401,2.24,NC,Asian,3152,1
2,Tampa,Florida,35.3,175517,193511,369028,20636,58795,2.47,FL,Hispanic or Latino,95154,2
3,Gastonia,North Carolina,36.9,35527,39023,74550,3537,5715,2.67,NC,Asian,2788,3
4,Tyler,Texas,33.9,50422,53283,103705,4813,8225,2.59,TX,American Indian and Alaska Native,1057,4


In [77]:
# Immigration Fact Table

In [79]:
# Get visa data
df_visa = df_visa_dim.toPandas()
df_visa

Unnamed: 0,visatype,visa_type_key
0,F2,103079215104
1,GMB,352187318272
2,B2,369367187456
3,F1,498216206336
4,CPL,601295421440
5,I1,704374636544
6,WB,738734374912
7,M1,747324309504
8,B1,807453851648
9,WT,884763262976


In [80]:
# map the visa type code
@udf('string')
def map_visa_key(visa_type):
    keys = df_visa[df_visa['visatype']==visa_type]['visa_type_key']
    
    if not keys.empty:
        return str(keys.iloc[0])
    
    return None

# convert the SAS arrival date (days since 1960-01-01) to an ISO 'YYYY-MM-DD' string
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(days=x)).isoformat() if x is not None else None)
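SAS stores dates as the number of days since 1960-01-01, which is what `get_date` relies on. A plain-Python version of the same conversion makes the arithmetic easy to test outside Spark; `sas_to_iso` is a hypothetical name, and the check value was worked out by hand (20545 days after the SAS epoch is 2016-04-01).

```python
import datetime as dt
from typing import Optional

SAS_EPOCH = dt.date(1960, 1, 1)  # day 0 in SAS date values


def sas_to_iso(days: Optional[float]) -> Optional[str]:
    """Convert a SAS date (days since 1960-01-01) to an ISO 'YYYY-MM-DD' string."""
    if days is None:  # keep nulls as nulls; note that day 0 is a valid date
        return None
    return (SAS_EPOCH + dt.timedelta(days=days)).isoformat()


print(sas_to_iso(20545.0))  # 2016-04-01
print(sas_to_iso(0))        # 1960-01-01
print(sas_to_iso(None))     # None
```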

In [81]:
# rename columns to align with the data model
# (the fact table is built from the deduplicated I94 immigration data)
df_fact = df_immi_dropDuplicates.withColumnRenamed('cicid', 'record_id') \
    .withColumnRenamed('i94res', 'country_residence_code') \
    .withColumnRenamed('i94addr', 'state_code')

In [82]:
df_fact.limit(5).toPandas()


In [105]:
df_immi_dropDuplicates.select("visatype").show()

+--------+
|visatype|
+--------+
|      WT|
|      WT|
|      WT|
|      WT|
|      WT|
|      B2|
|      B2|
|      WB|
|      WT|
|      WT|
|      B2|
|      WT|
|      WT|
|      WT|
|      WT|
|      WT|
|      WT|
|      WT|
|      WT|
|      WT|
+--------+
only showing top 20 rows



In [101]:
# add visa_type_key by joining with the visa dimension on visatype;
# a left join keeps every immigration record even if its visa type is unknown
df_fact = df_fact.join(df_visa_dim, on='visatype', how='left')

In [100]:
# convert the SAS arrival date into an ISO date string
df_fact = df_fact.withColumn("arrdate", get_date(df_fact.arrdate))

In [106]:
output_data = 'tables/'
df_fact.write.parquet(output_data + "immigration_fact", mode="overwrite")

In [107]:
# test pipeline function
df_fact = create_immigration_fact(spark, df_immi_dropDuplicates, output_data)

#### 4.2 Data Quality Checks
The following data quality checks ensure that the pipeline ran as expected:
 * Count checks: each table has the same number of rows as its source table, to ensure completeness.
 * Loading checks: each table is loaded without errors.

In [None]:
import importlib
importlib.reload(clean_data)
importlib.reload(utils)
importlib.reload(create_tables)
importlib.reload(quality_checks)

In [None]:
# Perform quality checks

tables = {
    'immigration_fact': df_fact,
    'visa_dim': df_visa_dim,
    'arrdate_dim': df_arrdate_dim,
    'demographics_dim': df_demographics_dim,
    'country_dim': df_country_dim
}

# check if the table is loaded without error
for table_name, df_table in tables.items():
    # check if the tables are loaded successfully
    quality_checks.loading_checks(df_table, table_name)

In [None]:
# check that the table counts match the source tables
quality_checks.count_checks(df_immi_dropDuplicates, df_fact)
quality_checks.count_checks(df_population_dropped, df_demographics_dim)
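The `quality_checks` module itself is not shown in this notebook. The sketch below is one possible shape for such checks, written as hypothetical helpers `loading_check` and `count_check` (modelled on the calls above, not the project's implementation) against any object with a `count()` method, so they can be exercised without Spark. With Spark DataFrames each `count()` triggers a full job, so each table is counted only once per check.

```python
class QualityCheckError(Exception):
    """Raised when a table fails a data quality check."""


def loading_check(df, table_name: str) -> int:
    """Fail if the table is empty after loading; return its row count."""
    row_count = df.count()
    if row_count == 0:
        raise QualityCheckError(f"{table_name}: no rows loaded")
    return row_count


def count_check(source_df, target_df, table_name: str) -> int:
    """Fail if the target table does not have the same row count as its source."""
    source_count, target_count = source_df.count(), target_df.count()
    if source_count != target_count:
        raise QualityCheckError(
            f"{table_name}: {target_count} rows, expected {source_count} from source"
        )
    return target_count


class FakeTable:
    """Stand-in for a Spark DataFrame: only count() is needed here."""

    def __init__(self, rows: int) -> None:
        self.rows = rows

    def count(self) -> int:
        return self.rows


print(loading_check(FakeTable(10), "visa_dim"))                     # 10
print(count_check(FakeTable(5), FakeTable(5), "immigration_fact"))  # 5
```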

#### 4.3 Data dictionary
The following tables summarize the data dictionary.

**Fact Table:**
This table is the primary table, derived from the I94 immigration dataset.

Column | Description
--- | ---
record_id | Unique record ID (renamed from cicid)
country_residence_code | 3-digit code for the immigrant's country of residence (renamed from i94res)
visa_type_key | Numerical key that links to the visa dimension table
state_code | US state of arrival (renamed from i94addr)
i94yr | 4-digit year
i94mon | Numeric month
i94port | Port of admission
arrdate | Arrival date in the USA
i94mode | Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
depdate | Departure date from the USA
i94bir | Age of respondent in years
i94visa | Visa codes collapsed into three categories (1 = Business; 2 = Pleasure; 3 = Student)
count | Field used for summary statistics
dtadfile | Character date field: date added to I-94 files
visapost | Department of State office where the visa was issued
occup | Occupation that will be performed in the U.S.
entdepa | Arrival flag: admitted or paroled into the U.S.
entdepd | Departure flag: departed, lost I-94, or is deceased
entdepu | Update flag: apprehended, overstayed, or adjusted to permanent residence
matflag | Match flag: match of arrival and departure records
biryear | 4-digit year of birth
dtaddto | Character date field: date to which admitted to the U.S. (allowed to stay until)
gender | Non-immigrant sex

*Country Dimension Table*
This table stores the average temperature of each country.

Column | Description
--- | ---
country_code | Unique country code
country_name | Name of the country
average_temperature | Average temperature of the country

*Visa Dimension Table*
This table links the visa records in the I94 immigration dataset to their visa type.

Column | Description
--- | ---
visa_type_key | Unique key for each visa type
visatype | Visa type code (e.g. B2, F1, WT)

*Arrival Date Dimension Table*
This table holds the arrival date and its components for time-based analysis.

Column | Description
--- | ---
id | Unique id
arrdate | Arrival date into the US
arrival_year | Arrival year
arrival_month | Arrival month
arrival_day | Arrival day of month
arrival_week | Arrival week of year
arrival_weekday | Arrival day of week


*Demographics Dimension Table*
This table describes the population of each US city: median age, male and female population, number of veterans, foreign-born residents, household size and racial breakdown.

Column | Description
--- | ---
id | Unique record id
state_code | US state code
City | City name
State | US state where the city is located
median_age | Median age of the population
male_population | Count of male population
female_population | Count of female population
total_population | Count of total population
number_of_veterans | Count of veterans
foreign_born | Count of foreign-born residents
average_household_size | Average household size in the city
Race | Race category
Count | Number of city residents of the given race

#### Step 5: Complete Project Write Up
This project uses the I94 immigration data, the Global Land Temperature data and the US demographic data to build an ETL pipeline. The pipeline loads the data from S3, processes it with Spark into fact and dimension tables, and writes them back to S3.

The data is loaded from the I94 immigration, Global Land Temperature and US demographic datasets, cleaned of missing values, and used to create fact and dimension tables. With these tables, the following analyses are in scope:

1. Which ports and travel modes immigrants use to enter the US
2. The distribution of visa types and ages of immigrants
3. The pattern of arrival dates, including any sudden inflow of immigrants on a particular date

The rationale for the choice of tools and technologies follows.

**Technology and tools**
The project stores the data on Amazon S3 and uses Apache Spark to read the source data from staging tables, extract the columns needed for analysis and populate the fact and dimension tables. Spark then writes the tables back to S3 if needed. For data modeling, the project uses a dimensional model, which makes the data easy for business users to work with and improves the performance of analytical queries. Specifically, it uses a star schema, which suits OLAP (online analytical processing) workloads well.
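As an illustration of how the star schema serves the in-scope analyses, the sketch below answers the "distribution of visa type" question by joining fact rows to the visa dimension on `visa_type_key` and counting arrivals per visa type. The key values come from the visa dimension output shown earlier; the fact rows are made up. In Spark SQL the same query would be a join on `visa_type_key` followed by a `GROUP BY visatype`.

```python
from collections import Counter
from typing import Dict, List, Optional

# visa dimension: visa_type_key -> visatype (keys taken from the notebook output)
visa_dim: Dict[int, str] = {
    103079215104: "F2",
    369367187456: "B2",
    884763262976: "WT",
}

# Hypothetical fact rows: only the columns this query needs
fact_rows: List[Dict[str, Optional[int]]] = [
    {"record_id": 1, "visa_type_key": 884763262976},
    {"record_id": 2, "visa_type_key": 884763262976},
    {"record_id": 3, "visa_type_key": 369367187456},
    {"record_id": 4, "visa_type_key": None},  # no matching visa type
]


def visa_type_distribution(facts: List[Dict[str, Optional[int]]], dim: Dict[int, str]) -> Counter:
    """Left-join facts to the visa dimension and count rows per visa type."""
    return Counter(dim.get(row["visa_type_key"], "UNKNOWN") for row in facts)


print(visa_type_distribution(fact_rows, visa_dim).most_common())
# [('WT', 2), ('B2', 1), ('UNKNOWN', 1)]
```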

**Why S3 and Spark**
Spark supports both batch and streaming processing, so it handles the large datasets in this project well and could be extended to near-real-time analysis if required. It also makes it easy to add new types of data sources or larger volumes later. Storing the data on S3 removes the need to invest in costly hardware and lets storage scale on demand. The tables are written as Parquet, a columnar format that is well suited both to storing large datasets and to analytical queries. Spark reads data from S3 efficiently and provides a full set of data analytics and machine learning libraries. Because it distributes processing across a cluster, Spark scales to large datasets with good performance.

**How often the data should be updated and why**
The I94 immigration data used in this project is updated monthly, so the data model should also be refreshed monthly.
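One way to support monthly refreshes is to partition the fact table by year and month, so a new month adds a directory instead of rewriting the whole table. Spark's `DataFrameWriter.partitionBy('i94yr', 'i94mon')` writes Hive-style `column=value` sub-directories; the sketch below only reproduces that path layout in plain Python to show where one month's data would land. The helper names and base path are assumptions. Two caveats: numeric I94 columns such as `i94res` appear as doubles here, so casting `i94yr`/`i94mon` to integers first keeps the directory names free of `.0`; and `mode="overwrite"` replaces all partitions unless `spark.sql.sources.partitionOverwriteMode` is set to `dynamic`.

```python
from typing import Tuple


def partition_path(base: str, year: int, month: int) -> str:
    """Hive-style directory for one month, as partitionBy('i94yr', 'i94mon') lays it out."""
    return f"{base.rstrip('/')}/i94yr={year}/i94mon={month}"


def next_month(year: int, month: int) -> Tuple[int, int]:
    """The (year, month) to load in the next monthly run."""
    return (year + 1, 1) if month == 12 else (year, month + 1)


print(partition_path("tables/immigration_fact", 2016, 4))
# tables/immigration_fact/i94yr=2016/i94mon=4
print(next_month(2016, 12))  # (2017, 1)
```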

**Other scenarios**
 * **The data was increased by 100x.**
   Spark is designed for large datasets and distributes the work across a cluster, so the main change would be to scale the cluster settings, such as the number of nodes and their compute power.
 * **The data populates a dashboard that must be updated on a daily basis by 7am every day.**
   Apache Airflow can schedule the pipeline to run daily, early enough to finish before 7am.
 * **The database needed to be accessed by 100+ people.**
   The tables could be loaded into a cloud data warehouse such as Amazon Redshift, which is built to serve many concurrent users.
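For the 7am dashboard scenario, the pipeline has to start early enough to finish in time. In Airflow this would typically be a DAG scheduled with a daily cron expression such as `0 5 * * *`; the 5am start and the runtimes below are assumptions. The plain-Python sketch computes the next start time and checks whether an expected runtime meets the deadline.

```python
import datetime as dt

START = dt.time(5, 0)     # assumed daily start time (cron "0 5 * * *")
DEADLINE = dt.time(7, 0)  # dashboard must be refreshed by 7am


def next_start(now: dt.datetime) -> dt.datetime:
    """Next daily run at START: today if it has not passed yet, else tomorrow."""
    candidate = dt.datetime.combine(now.date(), START)
    return candidate if candidate > now else candidate + dt.timedelta(days=1)


def meets_deadline(start: dt.datetime, expected_runtime: dt.timedelta) -> bool:
    """True if a run beginning at `start` finishes by DEADLINE on the same day."""
    return start + expected_runtime <= dt.datetime.combine(start.date(), DEADLINE)


run = next_start(dt.datetime(2016, 4, 1, 6, 30))
print(run)  # 2016-04-02 05:00:00
print(meets_deadline(run, dt.timedelta(minutes=90)))  # True
```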