# Project Title
### Data Engineering Capstone Project

#### Project Summary
This capstone project contains a data pipeline implementation that transforms source data into the target data model for the US immigration data analysis.

The project uses an AWS S3 data lake to store both low- and high-priority data, and a Spark cluster that runs the ETL pipeline logic, transforming the original data into analytics tables for subsequent use by the analytics teams.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [22]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession

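# Build the Spark session; the spark-sas7bdat package lets Spark read the
# SAS (.sas7bdat) files that hold the full I94 dataset
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()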

### Step 1: Scope the Project and Gather Data

#### Scope 
This project implements the following tasks:
1. Gather raw data from multiple sources.
2. Clean and validate the source data.
3. Store the source data, both high-value and low-value, in the data lake.
4. Transform the source data into the target data model: a set of fact and dimension tables for future use by the analytics team.

The target data model lets us analyze the data at several aggregation levels: by time, by location (city, airport), by climate conditions, etc.

We chose AWS S3 as the main storage for our data lake.
All ETL computations are implemented with Apache Spark.

#### Describe and Gather Data 
As input data we use four datasets:

**Main dataset**: I94 Immigration Data from the US National Tourism and Trade Office. It contains statistics on immigration to the USA (point of entry and detailed information on the immigrants).

Source: https://travel.trade.gov/research/reports/i94/historical/2016.html

**Supplementary datasets**:

_U.S. City Demographic Data_: This data comes from OpenDataSoft and contains various population statistics for US cities.

Source: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/

_Airport Code Table_: This dataset contains airport codes, geo references and corresponding cities.

Source: https://datahub.io/core/airport-codes#data

_World Temperature Data_: This dataset comes from Kaggle and contains various temperature statistics for a large number of world cities.

Source: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

**Additional tables**:
The following support tables were derived from the data dictionary of the I94 Immigration Data (file **I94_SAS_Labels_Descriptions.SAS**):

1. _countries.csv_: Dictionary of country names for the codes in the **i94cit** and **i94res** fields.

2. _i94ports.csv_: Dictionary linking the **i94port** code to the city (and state) of entry.

3. _i94mode.csv_: Dictionary for the text representation of the border crossing method in the **i94mode** field.

4. _i94visa.csv_: Dictionary for the text representation of the visa type in the **i94visa** field.

5. _us_states.csv_: Dictionary of full names of US states.

Below are short samples from each data source.


**I94 Immigration Data**

We load a sample of the immigration data from the file **immigration_data_sample.csv** and join it to the dictionary tables.
The full I94 immigration dataset is stored in SAS (`.sas7bdat`) format.

Let's look at the most interesting fields of the immigration dataset.

Description of all I94 fields can be found in **I94_SAS_Labels_Descriptions.SAS** dictionary file.
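The date fields **arrdate** and **depdate** are SAS numeric dates, i.e. the number of days since 1960-01-01, which is why the queries in this notebook convert them with `date_add(to_date('1960-01-01'), ...)`. A minimal plain-Python sketch of the same conversion; the helper name `sas_date_to_date` is hypothetical and not part of the project code:

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)


def sas_date_to_date(days):
    """Convert a SAS numeric date (possibly a float such as 20566.0) to a date.

    The fractional part is truncated, like CAST(arrdate AS LONG) in Spark SQL.
    Missing values stay missing, like a NULL depdate in the Spark query.
    """
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# The full SAS file stores arrdate/depdate as doubles
print(sas_date_to_date(20566.0))  # 2016-04-22
```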

In [2]:
# Read in sample data file 'immigration_data_sample.csv'
i94_df = spark.read.option("header", True).csv('immigration_data_sample.csv')
# Read in country dictionary from 'countries.csv'
country_df = spark.read.option("header", True).option("delimiter", ";").csv('countries.csv')
# Read in port dictionary from 'i94ports.csv'
port_df = spark.read.option("header", True).option("delimiter", ";").csv('i94ports.csv')
# Read in border crossing mode dictionary from 'i94mode.csv'
mode_df = spark.read.option("header", True).option("delimiter", ";").csv('i94mode.csv')
# Read in visa type dictionary from 'i94visa.csv'
visa_df = spark.read.option("header", True).option("delimiter", ";").csv('i94visa.csv')
# Read in US states dictionary from 'us_states.csv'
states_df = spark.read.option("header", True).option("delimiter", ";").csv('us_states.csv')
# create temporary views against which you can run SQL queries
i94_df.createOrReplaceTempView("i94_table")
country_df.createOrReplaceTempView("country_table")
port_df.createOrReplaceTempView("port_table")
mode_df.createOrReplaceTempView("mode_table")
visa_df.createOrReplaceTempView("visa_table")
states_df.createOrReplaceTempView("states_df")

i94_source_table = spark.sql('''
        SELECT CAST(cicid AS LONG) id, UPPER(c1.country_name) country_of_residence, UPPER(c2.country_name) country_of_citizenship,
                UPPER(p.port_location) city_of_entry, UPPER(p.state) state_of_entry_code, UPPER(s.state) state_of_entry_full, 
                m.border_cross_method border_cross_method, v.visa_type type_of_visa,
                i.visatype class_of_admission, i.gender gender, CAST(i.i94bir AS LONG) age,
                date_add(to_date('1960-01-01'), CAST(arrdate AS LONG)) arr_date, date_add(to_date('1960-01-01'), CAST(depdate AS LONG)) dep_date,
                YEAR(CAST(date_add(to_date('1960-01-01'), CAST(arrdate AS LONG)) AS DATE)) year,
                MONTH(CAST(date_add(to_date('1960-01-01'), CAST(arrdate AS LONG)) AS DATE)) month,
                DAY(CAST(date_add(to_date('1960-01-01'), CAST(arrdate AS LONG)) AS DATE)) day
        FROM i94_table i
        JOIN country_table c1
        ON CAST(i.i94res AS LONG) = c1.code
        JOIN country_table c2
        ON CAST(i.i94cit AS LONG) = c2.code
        JOIN port_table p
        ON i.i94port = p.code
        JOIN mode_table m
        ON CAST(i94mode AS LONG) = m.code
        JOIN visa_table v
        ON CAST(i94visa AS LONG) = v.code
        JOIN states_df s
        ON p.state = s.code
''')
i94_source_table.createOrReplaceTempView("i94_source_table")
# convert to pandas df and display first 5 rows
display(spark.sql('''
        SELECT *
        FROM i94_source_table
        LIMIT 5
''').toPandas())

| | id | country_of_residence | country_of_citizenship | city_of_entry | state_of_entry_code | state_of_entry_full | border_cross_method | type_of_visa | class_of_admission | gender | age | arr_date | dep_date | year | month | day |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 4084316 | JAPAN | JAPAN | HONOLULU | HI | HAWAII | Air | Pleasure | WT | F | 61 | 2016-04-22 | 2016-04-29 | 2016 | 4 | 22 |
| 1 | 4422636 | MEXICO AIR SEA AND NOT REPORTED (I-94 NO LAND... | MEXICO AIR SEA AND NOT REPORTED (I-94 NO LAND... | MCALLEN | TX | TEXAS | Air | Pleasure | B2 | M | 26 | 2016-04-23 | 2016-04-24 | 2016 | 4 | 23 |
| 2 | 5291768 | QATAR | QATAR | LOS ANGELES | CA | CALIFORNIA | Air | Pleasure | B2 | M | 25 | 2016-04-28 | 2016-05-07 | 2016 | 4 | 28 |
| 3 | 985523 | FRANCE | FRANCE | CHAMPLAIN | NY | NEW YORK | Land | Pleasure | WT | F | 19 | 2016-04-06 | 2016-04-09 | 2016 | 4 | 6 |
| 4 | 1481650 | GUATEMALA | GUATEMALA | ATLANTA | GA | GEORGIA | Air | Pleasure | B2 | M | 51 | 2016-04-08 | 2016-06-01 | 2016 | 4 | 8 |


Let's test access to the full dataset in SAS format.

In [3]:
i94_full_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
i94_full_df.count()

3096313

In [4]:
i94_full_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [5]:
# read full SAS data and create a preprocessed source table for immigration data
i94_full_df.createOrReplaceTempView("i94_table")
i94_full_source_table = spark.sql('''
        SELECT CAST(cicid AS LONG) id, UPPER(c1.country_name) country_of_residence, UPPER(c2.country_name) country_of_citizenship,
                UPPER(p.port_location) city_of_entry, UPPER(p.state) state_of_entry_code, UPPER(s.state) state_of_entry_full, 
                m.border_cross_method border_cross_method, v.visa_type type_of_visa,
                i.visatype class_of_admission, i.gender gender, CAST(i.i94bir AS LONG) age,
                date_add(to_date('1960-01-01'), CAST(arrdate AS LONG)) arr_date, date_add(to_date('1960-01-01'), CAST(depdate AS LONG)) dep_date,
                YEAR(CAST(date_add(to_date('1960-01-01'), CAST(arrdate AS LONG)) AS DATE)) year,
                MONTH(CAST(date_add(to_date('1960-01-01'), CAST(arrdate AS LONG)) AS DATE)) month,
                DAY(CAST(date_add(to_date('1960-01-01'), CAST(arrdate AS LONG)) AS DATE)) day
        FROM i94_table i
        JOIN country_table c1
        ON CAST(i.i94res AS LONG) = c1.code
        JOIN country_table c2
        ON CAST(i.i94cit AS LONG) = c2.code
        JOIN port_table p
        ON i.i94port = p.code
        JOIN mode_table m
        ON CAST(i94mode AS LONG) = m.code
        JOIN visa_table v
        ON CAST(i94visa AS LONG) = v.code
        JOIN states_df s
        ON p.state = s.code
''')
i94_full_source_table.createOrReplaceTempView("i94_source_table")

# convert to pandas df and display first 5 rows
display(spark.sql('''
        SELECT *
        FROM i94_source_table
        LIMIT 5
''').toPandas())


| | id | country_of_residence | country_of_citizenship | city_of_entry | state_of_entry_code | state_of_entry_full | border_cross_method | type_of_visa | class_of_admission | gender | age | arr_date | dep_date | year | month | day |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 16 | ALBANIA | ALBANIA | NEW YORK | NY | NEW YORK | Air | Pleasure | B2 | | 28 | 2016-04-01 | 2016-04-23 | 2016 | 4 | 1 |
| 1 | 17 | ALBANIA | ALBANIA | NEW YORK | NY | NEW YORK | Air | Pleasure | B2 | | 4 | 2016-04-01 | 2016-04-23 | 2016 | 4 | 1 |
| 2 | 18 | ALBANIA | ALBANIA | NEW YORK | NY | NEW YORK | Air | Business | B1 | | 57 | 2016-04-01 | 2016-04-11 | 2016 | 4 | 1 |
| 3 | 19 | ALBANIA | ALBANIA | NEW YORK | NY | NEW YORK | Air | Pleasure | B2 | | 63 | 2016-04-01 | 2016-04-14 | 2016 | 4 | 1 |
| 4 | 20 | ALBANIA | ALBANIA | NEW YORK | NY | NEW YORK | Air | Pleasure | B2 | | 57 | 2016-04-01 | 2016-04-14 | 2016 | 4 | 1 |


In [6]:
i94_full_source_table.count()

2515336
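The preprocessing query for the full dataset is a verbatim copy of the one used for the sample file; only the contents of the `i94_table` view differ. One way to keep the two in sync is to build the SQL text once in a function and pass it to `spark.sql`. A minimal sketch; the function name `build_i94_source_sql` and its `view_name` parameter are hypothetical:

```python
def build_i94_source_sql(view_name):
    """Return the I94 preprocessing query for the given temporary view.

    view_name is interpolated into the SQL text, so it is restricted to a
    plain identifier to avoid injecting arbitrary SQL.
    """
    if not view_name.isidentifier():
        raise ValueError(f"not a valid view name: {view_name!r}")
    # SAS dates count days from 1960-01-01; date_add already returns a DATE
    arr = "date_add(to_date('1960-01-01'), CAST(arrdate AS LONG))"
    dep = "date_add(to_date('1960-01-01'), CAST(depdate AS LONG))"
    return f'''
        SELECT CAST(cicid AS LONG) id, UPPER(c1.country_name) country_of_residence,
               UPPER(c2.country_name) country_of_citizenship,
               UPPER(p.port_location) city_of_entry, UPPER(p.state) state_of_entry_code,
               UPPER(s.state) state_of_entry_full,
               m.border_cross_method border_cross_method, v.visa_type type_of_visa,
               i.visatype class_of_admission, i.gender gender, CAST(i.i94bir AS LONG) age,
               {arr} arr_date, {dep} dep_date,
               YEAR({arr}) year, MONTH({arr}) month, DAY({arr}) day
        FROM {view_name} i
        JOIN country_table c1 ON CAST(i.i94res AS LONG) = c1.code
        JOIN country_table c2 ON CAST(i.i94cit AS LONG) = c2.code
        JOIN port_table p ON i.i94port = p.code
        JOIN mode_table m ON CAST(i94mode AS LONG) = m.code
        JOIN visa_table v ON CAST(i94visa AS LONG) = v.code
        JOIN states_df s ON p.state = s.code
    '''
```

In the notebook this would be used as `i94_full_source_table = spark.sql(build_i94_source_sql('i94_table'))`, for both the sample and the full data.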

**U.S. City Demographic Data**

Let's load the source CSV file, cast its columns to types suitable for subsequent processing and joining with other sources, and display the first five rows.

In [7]:
# Read in data file 'us-cities-demographics.csv'
cities_pop_df = spark.read.option("header", True).option("delimiter", ";").csv('us-cities-demographics.csv')
cities_pop_df = cities_pop_df.toDF('city', 'state', 'median_age', 'male_population', 'female_population', 'total_population', 'number_of_veterans',
                                  'foreign_born', 'avg_household_size', 'state_code', 'race', 'count')
# create a temporary view against which you can run SQL queries
cities_pop_df.createOrReplaceTempView("cities_pop_table")
# read data and create a preprocessed source table for demographic data
cities_pop_source_table = spark.sql('''
        SELECT UPPER(city) city, UPPER(state) state, CAST(median_age AS DOUBLE) median_age, CAST(male_population AS LONG) male_population,
                CAST(female_population AS LONG) female_population, CAST(total_population AS LONG) total_population, CAST(number_of_veterans AS LONG) number_of_veterans,
                CAST(foreign_born AS LONG) foreign_born, CAST(avg_household_size AS DOUBLE) avg_household_size, state_code, race, CAST(count AS LONG) count
        FROM cities_pop_table c
''')
cities_pop_source_table.createOrReplaceTempView("cities_pop_source_table")
# convert to pandas df and display first 5 rows
display(spark.sql('''
        SELECT *
        FROM cities_pop_source_table
        LIMIT 5
''').toPandas())

| | city | state | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | avg_household_size | state_code | race | count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | SILVER SPRING | MARYLAND | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | QUINCY | MASSACHUSETTS | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | HOOVER | ALABAMA | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | RANCHO CUCAMONGA | CALIFORNIA | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | NEWARK | NEW JERSEY | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


We can use **city** and **state** to link the main immigration data with the demographic statistics for the points of entry.

**Airport Code Table**

Let's load the source CSV file, cast its columns to types suitable for subsequent processing and joining with other sources, and display the first five rows.

For the purposes of the data model, we'll keep only US airports that are not closed.

In [8]:
# Read in data file 'airport-codes_csv.csv'
airports_df = spark.read.option("header", True).csv('airport-codes_csv.csv')
# create a temporary view against which you can run SQL queries
airports_df.createOrReplaceTempView("airports_table")
# read data and create a preprocessed source table for airport code data
airports_source_table = spark.sql('''
        SELECT type, name, CAST(elevation_ft AS LONG) elevation_ft, SPLIT(iso_region, '-')[1] iso_region,
                UPPER(municipality) municipality, gps_code, CAST(SPLIT(coordinates,',')[0] AS DOUBLE) lon, CAST(SPLIT(coordinates,',')[1] AS DOUBLE) lat
        FROM airports_table a
        WHERE iso_country = 'US' AND type != 'closed'
''')
airports_source_table.createOrReplaceTempView("airports_source_table")
# convert to pandas df and display first 5 rows
display(spark.sql('''
        SELECT *
        FROM airports_source_table
        LIMIT 5
''').toPandas())

| | type | name | elevation_ft | iso_region | municipality | gps_code | lon | lat |
|---|---|---|---|---|---|---|---|---|
| 0 | heliport | Total Rf Heliport | 11 | PA | BENSALEM | 00A | -74.933601 | 40.070801 |
| 1 | small_airport | Aero B Ranch Airport | 3435 | KS | LEOTI | 00AA | -101.473911 | 38.704022 |
| 2 | small_airport | Lowell Field | 450 | AK | ANCHOR POINT | 00AK | -151.695999 | 59.9492 |
| 3 | small_airport | Epps Airpark | 820 | AL | HARVEST | 00AL | -86.770302 | 34.864799 |
| 4 | small_airport | Fulton Airport | 1100 | OK | ALEX | 00AS | -97.818019 | 34.942803 |


We can use **iso_region** and **municipality** to link this data set with points of entry and main immigration dataset.

**World Temperature Data**

Let's load the source CSV file, cast its columns to types suitable for subsequent processing and joining with other sources, and display the first five rows.

For the purposes of the data model, we'll keep only US records from 1960 onward.
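In the source file, latitude and longitude are strings with a hemisphere suffix (for example `32.95N` and `100.53W`). The query in the next cell strips the last character and negates every longitude, which assumes that all US cities in this dataset lie north of the equator and west of Greenwich. A more general parser, sketched here in plain Python (the helper `parse_coordinate` is hypothetical), uses the suffix to choose the sign:

```python
def parse_coordinate(value):
    """Convert a string like '32.95N' or '100.53W' to signed decimal degrees.

    North and East are positive; South and West are negative.
    """
    value = value.strip()
    number, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere in ("N", "E"):
        return number
    if hemisphere in ("S", "W"):
        return -number
    raise ValueError(f"unknown hemisphere suffix in {value!r}")


print(parse_coordinate("100.53W"), parse_coordinate("32.95N"))  # -100.53 32.95
```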

In [9]:
# Read in data file 'GlobalLandTemperaturesByCity.csv'
weather_df = spark.read.option("header", True).csv('../../data2/GlobalLandTemperaturesByCity.csv')
# create a temporary view against which you can run SQL queries
weather_df.createOrReplaceTempView("weather_table")
# read data and create a preprocessed source table for world temperature data
weather_source_table = spark.sql('''
        SELECT CAST(dt AS DATE) dt, YEAR(CAST(dt AS DATE)) year, MONTH(CAST(dt AS DATE)) month, DAY(CAST(dt AS DATE)) day, CAST(AverageTemperature AS DOUBLE) avg_temperature,
        UPPER(city) city, UPPER(country) country, CAST(SUBSTRING(longitude,1,LENGTH(longitude)-1) AS DOUBLE)*-1 lon, CAST(SUBSTRING(latitude,1,LENGTH(latitude)-1) AS DOUBLE) lat
        FROM weather_table w
        WHERE country = 'United States' AND YEAR(CAST(dt AS DATE)) >= 1960
''')
weather_source_table.createOrReplaceTempView("weather_source_table")
# convert to pandas df and display first 5 rows
display(spark.sql('''
        SELECT *
        FROM weather_source_table
        LIMIT 5
''').toPandas())


| | dt | year | month | day | avg_temperature | city | country | lon | lat |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1960-01-01 | 1960 | 1 | 1 | 5.243 | ABILENE | UNITED STATES | -100.53 | 32.95 |
| 1 | 1960-02-01 | 1960 | 2 | 1 | 4.995 | ABILENE | UNITED STATES | -100.53 | 32.95 |
| 2 | 1960-03-01 | 1960 | 3 | 1 | 8.575 | ABILENE | UNITED STATES | -100.53 | 32.95 |
| 3 | 1960-04-01 | 1960 | 4 | 1 | 18.452 | ABILENE | UNITED STATES | -100.53 | 32.95 |
| 4 | 1960-05-01 | 1960 | 5 | 1 | 21.709 | ABILENE | UNITED STATES | -100.53 | 32.95 |


We can try to link entry events from the main dataset with these average monthly temperatures using **year**, **month** and **city**.

**Save source tables in the data lake**

Let's save the preprocessed source tables in our data lake.

In [None]:
# State your output s3 bucket for source data and processed fact and dimension tables 
output_data = ""

#write to parquet
i94_full_source_table.write.partitionBy('year', 'month').mode('overwrite').\
                    parquet(output_data + 'source/i94/i94_records.parquet')

cities_pop_source_table.write.partitionBy('state_code').mode('overwrite').\
                    parquet(output_data + 'source/demographic/demographic.parquet')

weather_source_table.write.partitionBy('year', 'month').mode('overwrite').\
                    parquet(output_data + 'source/weather/weather.parquet')

airports_source_table.write.partitionBy('iso_region').mode('overwrite').\
                    parquet(output_data + 'source/airports/airports.parquet')
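`partitionBy(...)` makes Spark write one Hive-style sub-directory per distinct combination of partition values, named `column=value`; the partition columns are not stored inside the Parquet files and are restored from the path when the table is read back. A minimal plain-Python sketch of the resulting layout; the helper `partition_path` and the bucket `s3a://my-bucket/` are hypothetical (the notebook leaves `output_data` empty):

```python
def partition_path(base, table_path, partitions):
    """Return the Hive-style directory Spark uses for one partition.

    partitions is an ordered list of (column, value) pairs, in the same
    order as the columns passed to partitionBy(). An empty base gives a
    relative path, matching output_data = "".
    """
    prefix = [base.rstrip("/")] if base else []
    parts = [f"{column}={value}" for column, value in partitions]
    return "/".join(prefix + [table_path] + parts) + "/"


example_bucket = "s3a://my-bucket/"  # hypothetical bucket name
print(partition_path(example_bucket, "source/i94/i94_records.parquet",
                     [("year", 2016), ("month", 4)]))
# s3a://my-bucket/source/i94/i94_records.parquet/year=2016/month=4/
```

Partitioning the I94 and weather tables by year and month lets later queries that filter on those columns skip whole directories (partition pruning).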

### Step 2: Explore and Assess the Data
#### Explore the Data 
Let's explore our data in more detail.

In this step we'll prepare our data for transformation into the target data model by completing the following steps:
1. Identify and drop missing values
2. Identify primary and foreign keys for fact and dimension tables and eliminate duplicate data for them by grouping, averaging, etc.

In the previous step we already reduced the size of the datasets by dropping irrelevant records.

#### Cleaning Steps

**I94 Immigration Data**

Let's check the full dataset for null values.

In [10]:
i94_full_source_table.toPandas().isnull().sum()

id                             0
country_of_residence           0
country_of_citizenship         0
city_of_entry                  0
state_of_entry_code            0
state_of_entry_full            0
border_cross_method            0
type_of_visa                   0
class_of_admission             0
gender                    339908
age                          174
arr_date                       0
dep_date                  109375
year                           0
month                          0
day                            0
dtype: int64

We can see that the most important fields for linking with other datasets (**arr_date**, **state_of_entry_code**, **city_of_entry**) have no missing values.

We'll leave the missing values in **gender**, **age** and **dep_date** as is.

Records with missing or unmatched codes in the dictionary-based fields (**country_of_residence**, **country_of_citizenship**, **city_of_entry**, **border_cross_method**, **type_of_visa**, etc.) were already dropped in the previous stage by the inner joins with the dictionary tables, which reduced the full dataset from 3,096,313 to 2,515,336 records.

Now let's check whether we can use the **id** field as the primary key of our future fact table.

In [11]:
display(spark.sql('''
        SELECT COUNT(c.id) total_ids
        FROM i94_source_table c
''').toPandas())

| | total_ids |
|---|---|
| 0 | 2515336 |


In [12]:
display(spark.sql('''
        SELECT COUNT(DISTINCT c.id) unique_ids
        FROM i94_source_table c
''').toPandas())

| | unique_ids |
|---|---|
| 0 | 2515336 |


As we can see, the **id** field contains no duplicate values.

Finally, let's check the consistency of the **arr_date** and **dep_date** fields.

In [13]:
display(spark.sql('''
        SELECT COUNT(*)
        FROM i94_source_table c
        WHERE dep_date < arr_date
''').toPandas())

| | count(1) |
|---|---|
| 0 | 180 |


In [14]:
display(spark.sql('''
        SELECT arr_date, dep_date
        FROM i94_source_table c
        WHERE dep_date < arr_date
        LIMIT 5
''').toPandas())

| | arr_date | dep_date |
|---|---|---|
| 0 | 2016-04-01 | 2016-03-31 |
| 1 | 2016-04-02 | 2016-03-19 |
| 2 | 2016-04-02 | 2016-01-26 |
| 3 | 2016-04-02 | 2016-04-01 |
| 4 | 2016-04-02 | 2016-01-31 |


There are 180 records whose departure date precedes the arrival date. We'll drop them and form the final preprocessed dataset.
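The filter keeps rows with a missing **dep_date** as well as rows with a consistent date pair. The explicit `dep_date IS NULL` branch matters: in SQL, comparing NULL with a date yields NULL rather than true, so a bare `WHERE dep_date >= arr_date` would also silently drop the 109,375 records that have no departure date. A plain-Python sketch of the same predicate, with `None` standing in for NULL (the helper `keep_record` is hypothetical):

```python
from datetime import date


def keep_record(arr_date, dep_date):
    """Mirror of WHERE dep_date IS NULL OR dep_date >= arr_date."""
    if dep_date is None:  # unknown departure: keep the record
        return True
    return dep_date >= arr_date


rows = [
    (date(2016, 4, 1), date(2016, 4, 23)),  # normal stay
    (date(2016, 4, 1), date(2016, 3, 31)),  # departure before arrival
    (date(2016, 4, 2), None),               # no departure recorded
]
print([keep_record(arr, dep) for arr, dep in rows])  # [True, False, True]
```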

In [15]:
i94_full_source_table = spark.sql('''
        SELECT *
        FROM i94_source_table i
        WHERE dep_date IS NULL OR dep_date >= arr_date
''')
i94_full_source_table.createOrReplaceTempView("i94_source_table")

display(spark.sql('''
        SELECT COUNT(*) total_num_of_records
        FROM i94_source_table a
''').toPandas())

| | total_num_of_records |
|---|---|
| 0 | 2515156 |


In [16]:
display(spark.sql('''
        SELECT *
        FROM i94_source_table c
        LIMIT 5
''').toPandas())

| | id | country_of_residence | country_of_citizenship | city_of_entry | state_of_entry_code | state_of_entry_full | border_cross_method | type_of_visa | class_of_admission | gender | age | arr_date | dep_date | year | month | day |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 16 | ALBANIA | ALBANIA | NEW YORK | NY | NEW YORK | Air | Pleasure | B2 | | 28 | 2016-04-01 | 2016-04-23 | 2016 | 4 | 1 |
| 1 | 17 | ALBANIA | ALBANIA | NEW YORK | NY | NEW YORK | Air | Pleasure | B2 | | 4 | 2016-04-01 | 2016-04-23 | 2016 | 4 | 1 |
| 2 | 18 | ALBANIA | ALBANIA | NEW YORK | NY | NEW YORK | Air | Business | B1 | | 57 | 2016-04-01 | 2016-04-11 | 2016 | 4 | 1 |
| 3 | 19 | ALBANIA | ALBANIA | NEW YORK | NY | NEW YORK | Air | Pleasure | B2 | | 63 | 2016-04-01 | 2016-04-14 | 2016 | 4 | 1 |
| 4 | 20 | ALBANIA | ALBANIA | NEW YORK | NY | NEW YORK | Air | Pleasure | B2 | | 57 | 2016-04-01 | 2016-04-14 | 2016 | 4 | 1 |


**U.S. City Demographic Data**

In [17]:
display(spark.sql('''
        SELECT COUNT(*) total_num_of_records
        FROM cities_pop_source_table c
''').toPandas())

| | total_num_of_records |
|---|---|
| 0 | 2891 |


Let's check for null values.

In [18]:
cities_pop_source_table.toPandas().isnull().sum()

city                   0
state                  0
median_age             0
male_population        3
female_population      3
total_population       0
number_of_veterans    13
foreign_born          13
avg_household_size    16
state_code             0
race                   0
count                  0
dtype: int64

Since the key fields (**city** and **state**) have no missing values, we'll leave the other fields as is.

The remaining missing values are spread across several statistics fields; they will be handled when building analytics reports on top of the fact and dimension tables.

Now let's check whether the combination of city and state is unique.

In [19]:
display(spark.sql('''
        SELECT city, state, COUNT(*) cnt
        FROM cities_pop_source_table c
        GROUP BY city, state
        HAVING COUNT(*) > 1
        LIMIT 5
''').toPandas())

| | city | state | cnt |
|---|---|---|---|
| 0 | BOLINGBROOK | ILLINOIS | 5 |
| 1 | WESTMINSTER | CALIFORNIA | 5 |
| 2 | LITTLE ROCK | ARKANSAS | 5 |
| 3 | DECATUR | ILLINOIS | 5 |
| 4 | THE WOODLANDS | TEXAS | 5 |


In [20]:
display(spark.sql('''
        SELECT *
        FROM cities_pop_source_table c
        WHERE city = 'BOLINGBROOK' AND state = 'ILLINOIS'
''').toPandas())

| | city | state | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | avg_household_size | state_code | race | count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | BOLINGBROOK | ILLINOIS | 33.7 | 36295 | 35801 | 72096 | 2951 | 15212 | 3.42 | IL | American Indian and Alaska Native | 323 |
| 1 | BOLINGBROOK | ILLINOIS | 33.7 | 36295 | 35801 | 72096 | 2951 | 15212 | 3.42 | IL | Black or African-American | 12671 |
| 2 | BOLINGBROOK | ILLINOIS | 33.7 | 36295 | 35801 | 72096 | 2951 | 15212 | 3.42 | IL | Asian | 9788 |
| 3 | BOLINGBROOK | ILLINOIS | 33.7 | 36295 | 35801 | 72096 | 2951 | 15212 | 3.42 | IL | White | 40458 |
| 4 | BOLINGBROOK | ILLINOIS | 33.7 | 36295 | 35801 | 72096 | 2951 | 15212 | 3.42 | IL | Hispanic or Latino | 16904 |


In [21]:
display(spark.sql('''
        SELECT DISTINCT race
        FROM cities_pop_source_table a
''').toPandas())

| | race |
|---|---|
| 0 | Black or African-American |
| 1 | Hispanic or Latino |
| 2 | White |
| 3 | Asian |
| 4 | American Indian and Alaska Native |


As we can see, each city has a separate record for every race. All statistics other than the **count** field are identical across the race records of a given city.

Let's pivot **race** into columns and drop the duplicate city rows.
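This is a long-to-wide pivot: the per-race rows of a city collapse into one row with a column per race. A minimal plain-Python sketch of the idea, using the BOLINGBROOK counts shown above; the helper `pivot_race_counts` is hypothetical. On Spark DataFrames, `groupBy(...).pivot('race')` is an alternative to the five self-joins in the next cell, though that is not what this notebook uses.

```python
def pivot_race_counts(rows, key_fields, race_field="race", value_field="count"):
    """Collapse one row per (city, race) into one row per city.

    Rows are grouped on the values of key_fields; each distinct race becomes
    a new column holding that row's count.
    """
    pivoted = {}
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        target = pivoted.setdefault(key, {f: row[f] for f in key_fields})
        target[row[race_field]] = row[value_field]
    return list(pivoted.values())


bolingbrook = [
    {"city": "BOLINGBROOK", "state": "ILLINOIS", "race": race, "count": count}
    for race, count in [
        ("American Indian and Alaska Native", 323),
        ("Black or African-American", 12671),
        ("Asian", 9788),
        ("White", 40458),
        ("Hispanic or Latino", 16904),
    ]
]
result = pivot_race_counts(bolingbrook, key_fields=("city", "state"))
print(len(result), result[0]["White"])  # 1 40458
```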

In [22]:
cities_pop_source_table = spark.sql('''
        SELECT DISTINCT c.city city, c.state state, c.state_code state_code, c.median_age median_age, 
                        c.male_population male_population, c.female_population female_population,
                        c.total_population total_population, c.number_of_veterans number_of_veterans,
                        c.foreign_born foreign_born, c.avg_household_size avg_household_size,
                        c1.american_indian_and_alaska_native american_indian_and_alaska_native,
                        c2.asian asian, c3.white white, c4.hispanic_or_latino hispanic_or_latino,
                        c5.black_or_african_american black_or_african_american
        FROM cities_pop_source_table c
        LEFT JOIN (
            SELECT city, state, count as american_indian_and_alaska_native
            FROM cities_pop_source_table
            WHERE race = "American Indian and Alaska Native"
        ) c1
        ON c.city = c1.city AND c.state = c1.state
        LEFT JOIN (
            SELECT city, state, count as asian
            FROM cities_pop_source_table
            WHERE race = "Asian"
        ) c2
        ON c.city = c2.city AND c.state = c2.state
        LEFT JOIN (
            SELECT city, state, count as white
            FROM cities_pop_source_table
            WHERE race = "White"
        ) c3
        ON c.city = c3.city AND c.state = c3.state
        LEFT JOIN (
            SELECT city, state, count as hispanic_or_latino
            FROM cities_pop_source_table
            WHERE race = "Hispanic or Latino"
        ) c4
        ON c.city = c4.city AND c.state = c4.state
        LEFT JOIN (
            SELECT city, state, count as black_or_african_american
            FROM cities_pop_source_table
            WHERE race = "Black or African-American"
        ) c5
        ON c.city = c5.city AND c.state = c5.state
''')
cities_pop_source_table.createOrReplaceTempView("cities_pop_source_table")

display(spark.sql('''
        SELECT COUNT(*) total_num_of_records
        FROM cities_pop_source_table a
''').toPandas())

| | total_num_of_records |
|---|---|
| 0 | 596 |


Now we can join the demographic data to other statistics by city and state.

In [23]:
display(spark.sql('''
        SELECT *
        FROM cities_pop_source_table a
        LIMIT 5
''').toPandas())

| | city | state | state_code | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | avg_household_size | american_indian_and_alaska_native | asian | white | hispanic_or_latino | black_or_african_american |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | POMPANO BEACH | FLORIDA | FL | 42.5 | 56569 | 51202 | 107771 | 5145 | 32733 | 2.55 | 2138 | 1089 | 67360 | 23225 | 38653 |
| 1 | HAWTHORNE | CALIFORNIA | CA | 33.7 | 43682 | 44762 | 88444 | 2123 | 32107 | 2.97 | 906 | 7601 | 23665 | 50115 | 21566 |
| 2 | MELBOURNE | FLORIDA | FL | 43.4 | 39180 | 40956 | 80136 | 8363 | 9685 | 2.37 | 251 | 5186 | 65397 | 6060 | 11721 |
| 3 | OREM | UTAH | UT | 26.1 | 48695 | 45762 | 94457 | 2828 | 12808 | 3.26 | 976 | 3862 | 85512 | 18342 | 1837 |
| 4 | CHICAGO | ILLINOIS | IL | 34.2 | 1320015 | 1400541 | 2720556 | 72042 | 573463 | 2.53 | 23323 | 195084 | 1374535 | 785725 | 873316 |


**Airport Code Table**

In [24]:
display(spark.sql('''
        SELECT COUNT(*) total_num_of_records
        FROM airports_source_table a
''').toPandas())

| | total_num_of_records |
|---|---|
| 0 | 21431 |


In [25]:
display(spark.sql('''
        SELECT COUNT(type), COUNT(name), COUNT(iso_region), COUNT(municipality)
        FROM airports_source_table a
        WHERE type is NULL OR name is NULL OR iso_region is NULL OR municipality is NULL
''').toPandas())

| | count(type) | count(name) | count(iso_region) | count(municipality) |
|---|---|---|---|---|
| 0 | 67 | 67 | 67 | 0 |


In [26]:
display(spark.sql('''
        SELECT *
        FROM airports_source_table a
        WHERE municipality is NULL
        LIMIT 5
''').toPandas())

| | type | name | elevation_ft | iso_region | municipality | gps_code | lon | lat |
|---|---|---|---|---|---|---|---|---|
| 0 | heliport | Watertown / Brownlee Heliport | 1720.0 | SD | | 15SD | -97.10809 | 44.883265 |
| 1 | heliport | Nordman / Phillabaum Heliport | 2440.0 | ID | | 21ID | -116.871175 | 48.631483 |
| 2 | seaplane_base | Peru / Destiny Cove SPB | 580.0 | ME | | 3ME7 | -70.396957 | 44.460597 |
| 3 | small_airport | Zadow Airstrip | | TX | | 6XA4 | -95.954354 | 29.991739 |
| 4 | small_airport | Gun Barrel City Airpark | 385.0 | TX | | 74XA | -96.145665 | 32.35515 |


The source dataset contains 67 missing values in the **municipality** field. Let's drop these records.

In [27]:
airports_source_table = spark.sql('''
        SELECT *
        FROM airports_source_table a
        WHERE municipality IS NOT NULL
''')
airports_source_table.createOrReplaceTempView("airports_source_table")

display(spark.sql('''
        SELECT COUNT(*) total_num_of_records
        FROM airports_source_table a
''').toPandas())

| | total_num_of_records |
|---|---|
| 0 | 21364 |


Now let's check whether the combination of **municipality** and **iso_region** is unique.

In [28]:
display(spark.sql('''
        SELECT iso_region, municipality, COUNT(*) cnt
        FROM airports_source_table a
        GROUP BY iso_region, municipality
        HAVING COUNT(*) > 1
        LIMIT 5
''').toPandas())

| | iso_region | municipality | cnt |
|---|---|---|---|
| 0 | TX | PEARLAND | 9 |
| 1 | NM | RESERVE | 2 |
| 2 | SD | CUSTER | 5 |
| 3 | VA | BEDFORD | 6 |
| 4 | MT | HELENA | 9 |


As we can see, cities often have more than one airport, so for now we can't use **iso_region** and **municipality** as a key for joining to other tables.

We'll transform this dataset into the new schema later, after cleaning the world temperature data.

**World Temperature Data**

In [29]:
display(spark.sql('''
        SELECT COUNT(*) total_num_of_records
        FROM weather_source_table w
''').toPandas())

| | total_num_of_records |
|---|---|
| 0 | 165765 |


Let's check the dataset for null values.

In [30]:
display(spark.sql('''
        SELECT COUNT(dt), COUNT(avg_temperature), COUNT(city)
        FROM weather_source_table w
        WHERE dt is NULL OR avg_temperature is NULL OR city is NULL
''').toPandas())

| | count(dt) | count(avg_temperature) | count(city) |
|---|---|---|---|
| 0 | 1 | 0 | 1 |


In [31]:
display(spark.sql('''
        SELECT *
        FROM weather_source_table w
        WHERE avg_temperature is NULL
''').toPandas())

| | dt | year | month | day | avg_temperature | city | country | lon | lat |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2013-09-01 | 2013 | 9 | 1 | | ANCHORAGE | UNITED STATES | -151.13 | 61.88 |


The source dataset contains one missing value in the **avg_temperature** field. Let's drop this record.

In [32]:
weather_source_table = spark.sql('''
        SELECT *
        FROM weather_source_table w
        WHERE avg_temperature IS NOT NULL
''')
weather_source_table.createOrReplaceTempView("weather_source_table")

display(spark.sql('''
        SELECT COUNT(*) total_num_of_records
        FROM weather_source_table w
''').toPandas())

| | total_num_of_records |
|---|---|
| 0 | 165764 |


Now let's check whether the combination of city and date is unique.

In [33]:
display(spark.sql('''
        SELECT dt, city, COUNT(*) cnt
        FROM weather_source_table w
        GROUP BY dt, city
        HAVING COUNT(*) > 1
        LIMIT 5
''').toPandas())

| | dt | city | cnt |
|---|---|---|---|
| 0 | 1961-08-01 | ARLINGTON | 2 |
| 1 | 2010-06-01 | ARLINGTON | 2 |
| 2 | 1981-04-01 | AURORA | 2 |
| 3 | 1991-03-01 | AURORA | 2 |
| 4 | 1966-10-01 | COLUMBUS | 2 |


In [34]:
display(spark.sql('''
        SELECT dt, city, lat, lon
        FROM weather_source_table w
        WHERE city = 'ARLINGTON' AND dt = '1961-08-01'
''').toPandas())

| | dt | city | lat | lon |
|---|---|---|---|---|
| 0 | 1961-08-01 | ARLINGTON | 32.95 | -96.7 |
| 1 | 1961-08-01 | ARLINGTON | 39.38 | -76.99 |


As the lat/lon references show, these are two different cities with the same name in different states.

Let's repeat the check, adding latitude and longitude to the key.

In [35]:
display(spark.sql('''
        SELECT dt, city, lat, lon, COUNT(*) cnt
        FROM weather_source_table w
        GROUP BY dt, city, lat, lon
        HAVING COUNT(*) > 1
        LIMIT 5
''').toPandas())

| | dt | city | lat | lon | cnt |
|---|---|---|---|---|---|


Now there are no duplicate records.
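All the duplicate checks in this section follow the same `GROUP BY ... HAVING COUNT(*) > 1` pattern. A plain-Python sketch of that check (the helper `duplicate_keys` is hypothetical), applied to the two ARLINGTON rows shown above, reproduces both results: (dt, city) is not unique, while (dt, city, lat, lon) is.

```python
from collections import Counter


def duplicate_keys(rows, key_fields):
    """Return {key: count} for key combinations that occur more than once,
    like GROUP BY key_fields HAVING COUNT(*) > 1."""
    counts = Counter(tuple(row[f] for f in key_fields) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


arlington = [
    {"dt": "1961-08-01", "city": "ARLINGTON", "lat": 32.95, "lon": -96.7},
    {"dt": "1961-08-01", "city": "ARLINGTON", "lat": 39.38, "lon": -76.99},
]
print(duplicate_keys(arlington, ("dt", "city")))
# {('1961-08-01', 'ARLINGTON'): 2}
print(duplicate_keys(arlington, ("dt", "city", "lat", "lon")))
# {}
```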

However, to link this dataset to the other sources, we need to determine the state of each city.

Let's compute the distance between each city in the weather dataset and the airports, using their geographic coordinates. If the distance is below a threshold, we'll link the records of both datasets.

In [36]:
tmp_loc_table = spark.sql('''
        SELECT DISTINCT w.city city, w.lat lat, w.lon lon
        FROM weather_source_table w
''')
tmp_loc_table.createOrReplaceTempView("tmp_loc_table")

Take a closer look at the city of Philadelphia.

In [37]:
display(spark.sql('''
        SELECT w1.city, w1.lon, w1.lat, a.municipality, a.iso_region, a.name, a.lon airport_lon, a.lat airport_lat,
        SQRT((w1.lat-a.lat)*(w1.lat-a.lat)+(w1.lon-a.lon)*(w1.lon-a.lon)) dist_to_airport
        FROM tmp_loc_table w1
        JOIN airports_source_table a
        ON w1.city = a.municipality
        WHERE w1.city = 'PHILADELPHIA'
        ORDER BY dist_to_airport
        LIMIT 15
''').toPandas())

| | city | lon | lat | municipality | iso_region | name | airport_lon | airport_lat | dist_to_airport |
|---|---|---|---|---|---|---|---|---|---|
| 0 | PHILADELPHIA | -74.91 | 39.38 | PHILADELPHIA | PA | Sports Complex N Lot Heliport | -75.161344 | 39.901544 | 0.578949 |
| 1 | PHILADELPHIA | -74.91 | 39.38 | PHILADELPHIA | PA | Peco Oregon Shop Heliport | -75.1399 | 39.912601 | 0.580102 |
| 2 | PHILADELPHIA | -74.91 | 39.38 | PHILADELPHIA | PA | S & C Distribution Center Heliport | -75.228798 | 39.879299 | 0.592395 |
| 3 | PHILADELPHIA | -74.91 | 39.38 | PHILADELPHIA | PA | Philadelphia International Airport | -75.241096 | 39.871899 | 0.59295 |
| 4 | PHILADELPHIA | -74.91 | 39.38 | PHILADELPHIA | PA | Penn's Landing Heliport | -75.141296 | 39.937302 | 0.603393 |
| 5 | PHILADELPHIA | -74.91 | 39.38 | PHILADELPHIA | PA | Federal Reserve Bank Heliport | -75.150497 | 39.936798 | 0.606517 |
| 6 | PHILADELPHIA | -74.91 | 39.38 | PHILADELPHIA | PA | Atlantic Refining & Marketing Corp Heliport | -75.196602 | 39.9198 | 0.611166 |
| 7 | PHILADELPHIA | -74.91 | 39.38 | PHILADELPHIA | PA | Thomas Jefferson University Hospital Heliport | -75.158501 | 39.949001 | 0.620899 |
| 8 | PHILADELPHIA | -74.91 | 39.38 | PHILADELPHIA | PA | S & C 8th & Market Helistop | -75.149597 | 39.9534 | 0.621445 |
| 9 | PHILADELPHIA | -74.91 | 39.38 | PHILADELPHIA | PA | Hahnemann Heliport | -75.162697 | 39.9571 | 0.63 |


The example above shows that this approach lets us determine the state code for a given city name: all of the nearby Philadelphia airports are in PA.

Cities that don't have matching records in the airports dataset can be dropped as irrelevant to the analysis of immigration flows.
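
The matching rule applied in the next cell can be sketched in plain Python to make it easier to test in isolation. This is an illustrative sketch, not code from etl.py: the helper names `degree_distance` and `match_state` are hypothetical. Like the SQL, it measures straight-line distance in degrees of latitude/longitude rather than in kilometres, which is only an approximation because a degree of longitude covers less ground than a degree of latitude away from the equator.

```python
import math

def degree_distance(lat1, lon1, lat2, lon2):
    """Straight-line distance in degrees, mirroring the SQL expression
    SQRT((lat1-lat2)^2 + (lon1-lon2)^2) used in the notebook."""
    return math.sqrt((lat1 - lat2) ** 2 + (lon1 - lon2) ** 2)

def match_state(city, lat, lon, airports, threshold=1.0):
    """Hypothetical helper: collect the states of airports in a municipality
    with the same name that lie within `threshold` degrees of the city."""
    states = set()
    for airport in airports:
        if airport["municipality"] != city:
            continue
        if degree_distance(lat, lon, airport["lat"], airport["lon"]) < threshold:
            states.add(airport["iso_region"])
    return states  # an empty set means the city would be dropped

# Coordinates copied from the Philadelphia query output above.
airports = [
    {"municipality": "PHILADELPHIA", "iso_region": "PA",
     "name": "Philadelphia International Airport",
     "lat": 39.871899, "lon": -75.241096},
]
print(degree_distance(39.38, -74.91, 39.871899, -75.241096))
print(match_state("PHILADELPHIA", 39.38, -74.91, airports))
```

A city that matches more than one state would return several codes here; the SQL version would likewise produce one row per matching state.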

In [38]:
DISTANCE_THRESHOLD = 1  # maximum straight-line distance, in degrees of latitude/longitude
tmp_loc_table = spark.sql(f'''
        SELECT DISTINCT w1.city city, a.iso_region state, w1.lat lat, w1.lon lon
        FROM tmp_loc_table w1
        JOIN airports_source_table a
        ON SQRT((w1.lat-a.lat)*(w1.lat-a.lat)+(w1.lon-a.lon)*(w1.lon-a.lon)) < {DISTANCE_THRESHOLD} AND w1.city = a.municipality
''')
tmp_loc_table.createOrReplaceTempView("tmp_loc_table")

display(spark.sql('''
        SELECT *
        FROM tmp_loc_table w1
        WHERE w1.city = 'PHILADELPHIA'
        LIMIT 5
''').toPandas())

| | city | state | lat | lon |
|---|---|---|---|---|
| 0 | PHILADELPHIA | PA | 39.38 | -74.91 |


Now we can add the state code to the main temperature dataset.

In [39]:
weather_source_table = spark.sql('''
        SELECT w.dt dt, w.year year, w.month month, w.day day, w.city, w1.state,
        w.lat, w.lon, AVG(w.avg_temperature) avg_temperature
        FROM weather_source_table w
        JOIN tmp_loc_table w1
        ON w.city = w1.city AND w.lat = w1.lat AND w.lon = w1.lon
        GROUP BY w.dt, w.year, w.month, w.day, w.city, w1.state, w.lat, w.lon    
''')
weather_source_table.createOrReplaceTempView("weather_source_table")

display(spark.sql('''
        SELECT *
        FROM weather_source_table w
        LIMIT 5
''').toPandas())

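| | dt | year | month | day | city | state | lat | lon | avg_temperature |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1960-01-01 | 1960 | 1 | 1 | MOBILE | AL | 31.35 | -88.59 | 8.497 |
| 1 | 1960-02-01 | 1960 | 2 | 1 | MOBILE | AL | 31.35 | -88.59 | 8.325 |
| 2 | 1960-03-01 | 1960 | 3 | 1 | MOBILE | AL | 31.35 | -88.59 | 10.549 |
| 3 | 1960-04-01 | 1960 | 4 | 1 | MOBILE | AL | 31.35 | -88.59 | 19.022 |
| 4 | 1960-05-01 | 1960 | 5 | 1 | MOBILE | AL | 31.35 | -88.59 | 20.958 |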


In [40]:
display(spark.sql('''
        SELECT COUNT(*) total_num_of_records
        FROM weather_source_table w
''').toPandas())

| | total_num_of_records |
|---|---|
| 0 | 139320 |


Now let's transform the airports dataset into a new schema: the number of airports per city.

In [41]:
airports_source_table = spark.sql('''
        SELECT iso_region, municipality, COUNT(*) number_of_airports
        FROM airports_source_table a
        GROUP BY iso_region, municipality
''')
airports_source_table.createOrReplaceTempView("airports_source_table")

display(spark.sql('''
        SELECT COUNT(*) total_num_of_records
        FROM airports_source_table a
''').toPandas())

| | total_num_of_records |
|---|---|
| 0 | 11510 |


Now we can join the total number of airports at each point of entry with other statistics.

In [42]:
display(spark.sql('''
        SELECT *
        FROM airports_source_table a
        LIMIT 5
''').toPandas())

| | iso_region | municipality | number_of_airports |
|---|---|---|---|
| 0 | TX | PEARLAND | 9 |
| 1 | NM | RESERVE | 2 |
| 2 | SD | CUSTER | 5 |
| 3 | VA | BEDFORD | 6 |
| 4 | MI | FRASER | 1 |


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
For this project, we use a star schema optimized for queries on immigration analysis.
According to the selected schema we'll build one fact table which contains measures for the immigration data and several dimension tables which contain attributes describing our target entities.

For our fact table, **fact_i94_history**, we'll take the i94 immigration records and combine them with the average temperature in the city of entry for the month of arrival.

We will use two dimensions of aggregation for our fact data:
1. **dim_time** - timestamps of records in our **fact_i94_history** table broken down into specific units (day, week, month, year, etc.)
2. **dim_cities** - list of U.S. cities with their most important statistical attributes (demographics, number of airports, etc.)


Using this data model we'll be able to build some useful analytical queries:
- Finding relationships between the flow of immigrants and the demographic composition of the population
- Finding seasonal changes in immigrant flows depending on time of the year and average temperatures
- etc.
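
To make the star schema concrete, here is a minimal, runnable sketch of the first kind of query, using Python's built-in `sqlite3` as a stand-in for Spark SQL. The table names follow the data model above, but the reduced column sets, the `total_population` column name and all rows are hypothetical toy values, not project data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Reduced versions of the fact table and the city dimension (toy columns only).
cur.execute("""CREATE TABLE fact_i94_history (
    id INTEGER PRIMARY KEY, city_of_entry TEXT, state_of_entry_code TEXT,
    arr_date TEXT, avg_temperature REAL)""")
cur.execute("""CREATE TABLE dim_cities (
    city TEXT, state_code TEXT, total_population INTEGER,
    number_of_airports INTEGER, PRIMARY KEY (city, state_code))""")

# Hypothetical rows for illustration only.
cur.executemany("INSERT INTO fact_i94_history VALUES (?, ?, ?, ?, ?)", [
    (1, "NEW YORK", "NY", "2016-04-01", None),
    (2, "NEW YORK", "NY", "2016-04-02", None),
    (3, "MIAMI", "FL", "2016-04-01", None),
])
cur.executemany("INSERT INTO dim_cities VALUES (?, ?, ?, ?)", [
    ("NEW YORK", "NY", 8000000, 20),
    ("MIAMI", "FL", 450000, 10),
])

# Arrivals per city of entry, joined with city attributes on the composite key.
rows = cur.execute("""
    SELECT c.city, c.state_code, c.total_population, COUNT(*) AS arrivals
    FROM fact_i94_history f
    JOIN dim_cities c
      ON f.city_of_entry = c.city AND f.state_of_entry_code = c.state_code
    GROUP BY c.city, c.state_code, c.total_population
    ORDER BY arrivals DESC
""").fetchall()
for row in rows:
    print(row)
```

The same query shape carries over to Spark SQL against the real tables; only the column names would need to match the data dictionary.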

#### 3.2 Mapping Out Data Pipelines
Let's summarize the pipeline steps, discussed earlier in Steps 1 and 2, that move our source data into the target data model.

1. **Data extraction from external sources**
    - Load all required datasets in various formats from external data sources
    - Narrow datasets and create schema for source tables
    - Save raw source data tables to the S3 data lake (in parquet format)

2. **Data cleaning, aligning and source quality checks**
    - Load source data tables from S3 data lake
    - Drop rows with missing values critical for the consistency
    - Further process source data (transform table shapes, data fields etc.)
    - Quality checks: eliminating duplicate rows and rows with incorrect combination of field values
    - Save preprocessed source tables to the S3 data lake (in parquet format)
    
3. **Forming the target data model and final quality checks**
    - Create the fact table (fact_i94_history) and dimension tables (dim_time and dim_cities) from the prepared source tables
    - Save the final fact and dimension tables to the S3 data lake (in parquet format)


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
All the logic of the data pipeline for creation of the data model is provided in the **etl.py** file.

In [23]:
import etl

input_data = {
        "i_94_immig": "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat",
        "demographic": "",
        "airports": "",
        "temperature": "../../data2/GlobalLandTemperaturesByCity.csv",
        "dict_tables": ""
}

output_data = ""


**Step 1: Data extraction from external sources** is implemented in the function **load_source_data(spark, input_data, output_data)**.

In this step we perform the following tasks:
- Load all required datasets in various formats from external data sources
- Narrow datasets and create schema for source tables
- Save raw source data tables to the S3 data lake (in parquet format)

In [25]:
etl.load_source_data(spark, input_data, output_data)

Step 1 - Data extraction from external sources - started.

Reading dictionary tables...
289 records were successfully loaded from countries.csv
660 records were successfully loaded from i94ports.csv
4 records were successfully loaded from i94mode.csv
3 records were successfully loaded from i94visa.csv
55 records were successfully loaded from us_states.csv
Reading i94 immigration dataset...
3,096,313 records were successfully loaded from i94 immigration dataset.
Reading U.S. cities demographics dataset...
2,891 records were successfully loaded from U.S. cities demographics dataset.
Reading airport codes dataset...
55,075 records were successfully loaded from airport codes dataset.
Reading global temperatures by cities dataset...
8,599,212 records were successfully loaded from global temperatures by cities dataset.

Narrowing datasets and creating correct schema for each source table...
2,515,336 records were successfully processed into i94_source_table table.
2,891 records were successf

For **i94 immigration** dataset we perform the following tasks:
1. Set correct data types for the fields
2. Translate _i94res_ and _i94cit_ codes into country names
3. Translate _i94port_ code into city name, state name and state abbreviation
4. Convert _country_of_residence_, _country_of_citizenship_, _city_of_entry_, _state_of_entry_code_ and _state_of_entry_full_ into capital letters
5. Translate _i94mode_ code into border cross method text name
6. Translate _i94visa_ code into type of visa text name
7. Convert dates from SAS format (number of days since 1960-01-01)
8. Add _year_, _month_ and _day_ fields for parquet partitions
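
Items 7 and 8 can be illustrated with a short plain-Python sketch. SAS stores a date as the number of days since 1960-01-01, so the conversion is a simple offset from that base date. The function names are hypothetical and this is not the code in etl.py; the float input reflects the assumption that numeric SAS columns may be read as floating-point values.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this base date

def sas_to_date(sas_days):
    """Convert a SAS date (days since 1960-01-01) to a datetime.date; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

def partition_fields(d):
    """Return the year/month/day values used as parquet partition columns."""
    return {"year": d.year, "month": d.month, "day": d.day}

# Assumption: the value may arrive as a float, e.g. 20545.0.
arrival = sas_to_date(20545.0)
print(arrival, partition_fields(arrival))
```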

For **U.S. cities demographics** dataset we perform the following tasks:
1. Set correct data types for the fields 
2. Convert _city_, _state_ and _state_code_ into capital letters

For **airport codes** dataset we perform the following tasks:
1. Set correct data types for the fields
2. Extract state abbreviation from _iso_region_ field
3. Split coordinates field into _lat_ and _lon_ fields
4. Convert _municipality_ and _iso_region_ into capital letters
5. Drop closed airports and those that are not in the US

For **global temperatures** dataset we perform the following tasks:
1. Set correct data types for the fields
2. Convert _country_ and _city_ into capital letters 
3. Drop cities that are not in the US
4. Drop records before 1960
5. Convert _longitude_ and _latitude_ from their string representation (e.g., 39.38N and 89.48W) into real numbers. For simplicity, we assume that every city in the dataset has a north latitude (> 0) and a west longitude (< 0), since all cities outside the U.S. have already been dropped.
6. Add _year_, _month_ and _day_ fields for parquet partitions
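
A minimal sketch of the coordinate conversion in item 5, in plain Python (the function name is hypothetical). Unlike the simplification described above, it reads the hemisphere letter instead of assuming N and W, so it would also handle S and E values if they ever appeared.

```python
def parse_coordinate(value):
    """Convert strings such as '39.38N' or '74.91W' to signed decimal degrees."""
    value = value.strip().upper()
    number, hemisphere = float(value[:-1]), value[-1]
    # South latitudes and west longitudes are negative by convention.
    if hemisphere in ("S", "W"):
        return -number
    if hemisphere in ("N", "E"):
        return number
    raise ValueError(f"unexpected hemisphere in {value!r}")

print(parse_coordinate("39.38N"), parse_coordinate("74.91W"))
```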

**Step 2: Data cleaning, aligning and source quality checks** is implemented in the function **process_source_data(spark, input_data, output_data)**.

In this step we perform the following tasks:
- Load source data tables from S3 data lake
- Drop rows with missing values critical for the consistency
- Further process source data (transform table shapes, data fields etc.)
- Quality checks: eliminating duplicate rows and rows with incorrect combination of field values
- Save preprocessed source tables to the S3 data lake (in parquet format)

In [26]:
etl.process_source_data(spark, input_data, output_data)

Step 2 - Data cleaning, aligning and source quality checks - started.

Loading source tables from the data lake...
2,515,336 records were successfully loaded from the data lake into i94_source_table table.
2,891 records were successfully loaded from the data lake into cities_pop_source_table table.
104,085 records were successfully loaded from the data lake into weather_source_table table.
21,431 records were successfully loaded from the data lake into airports_source_table table.

Cleaning and transforming source tables...
i94_source_table was successfully preprocessed.
180 records were dropped from the table.
Total number of rows in preprocessed i94_source_table - 2,515,156.
cities_pop_source_table was successfully preprocessed.
2,295 records were dropped from the table.
Total number of rows in preprocessed cities_pop_source_table - 596.
airports_source_table was successfully preprocessed.
67 records were dropped from the table.
Total number of rows in preprocessed airports_source_ta

For **i94_source_table** we perform the following tasks:
1. Drop rows with duplicate _id_ field
2. Drop rows with missing values in _id, state_of_entry_code, city_of_entry, arr_date_
3. Drop rows with _dep_date < arr_date_
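
The three rules for **i94_source_table** can be sketched on a list of dict records. This is a hypothetical illustration, not the Spark code in etl.py, and it makes two assumptions that the text does not state: for duplicate ids the first valid row is kept, and a row with a missing _dep_date_ (no recorded departure) is kept rather than dropped.

```python
from datetime import date

REQUIRED_FIELDS = ("id", "state_of_entry_code", "city_of_entry", "arr_date")

def clean_i94(rows):
    """Apply the three i94 cleaning rules to a list of dict records."""
    seen_ids = set()
    cleaned = []
    for row in rows:
        # Drop rows with a missing value in any required field.
        if any(row.get(field) is None for field in REQUIRED_FIELDS):
            continue
        # Drop rows that depart before they arrive.
        # Assumption: a missing dep_date (no recorded departure) is kept.
        dep_date = row.get("dep_date")
        if dep_date is not None and dep_date < row["arr_date"]:
            continue
        # Drop duplicate ids, keeping the first row that passed the checks above.
        if row["id"] in seen_ids:
            continue
        seen_ids.add(row["id"])
        cleaned.append(row)
    return cleaned

def record(i, city, arr, dep):
    # Hypothetical helper to keep the toy sample short.
    return {"id": i, "state_of_entry_code": "NY", "city_of_entry": city,
            "arr_date": arr, "dep_date": dep}

sample = [
    record(1, "NEW YORK", date(2016, 4, 1), date(2016, 4, 10)),
    record(1, "NEW YORK", date(2016, 4, 1), date(2016, 4, 10)),  # duplicate id
    record(2, None, date(2016, 4, 2), None),                      # missing city
    record(3, "NEW YORK", date(2016, 4, 5), date(2016, 4, 1)),    # departs before arrival
    record(4, "NEW YORK", date(2016, 4, 6), None),                # no departure yet
]
print([r["id"] for r in clean_i94(sample)])
```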

For **cities_pop_source_table** we perform the following tasks:
1. Drop rows with missing values in _city_ and _state_code_
2. Build pivot columns for different races for each city + state combination
3. Drop rows with duplicate combination of _city_ and _state_code_ fields
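
The demographics source has one row per city, state and race, so the pivot turns each race into its own column. A minimal plain-Python sketch, assuming lower-cased column names `race` and `count` and keeping only the key and the race counts (the real table carries more attributes); the counts below are hypothetical. Because the pivot is keyed on (city, state_code), the duplicate check in rule 3 holds by construction here.

```python
from collections import defaultdict

def pivot_races(rows):
    """Turn one-row-per-race records into one row per (city, state_code)."""
    pivoted = defaultdict(dict)
    for row in rows:
        # Rule 1: skip rows without the composite key.
        if not row.get("city") or not row.get("state_code"):
            continue
        # Rule 2: each race becomes a column holding its count.
        pivoted[(row["city"], row["state_code"])][row["race"]] = row["count"]
    # Rule 3: the dict key guarantees one row per (city, state_code).
    return [{"city": city, "state_code": state, **races}
            for (city, state), races in pivoted.items()]

sample = [
    {"city": "AUSTIN", "state_code": "TX", "race": "Asian", "count": 100},
    {"city": "AUSTIN", "state_code": "TX", "race": "White", "count": 500},
    {"city": None, "state_code": "TX", "race": "White", "count": 7},
]
print(pivot_races(sample))
```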

For **airports_source_table** we perform the following tasks:
1. Drop rows with missing values in _iso_region_, _municipality_, _lat_ and _lon_
2. Drop duplicate rows

We don't need to check for duplicate combinations of the _iso_region_, _municipality_ and _name_ fields, because we'll aggregate this table into the number of airports per city + state combination.

For **weather_source_table** we perform the following tasks:
1. Drop rows with missing values in _dt_, _avg_temperature_, _city_, _lat_ and _lon_
2. Drop duplicate rows
3. Join this table with **airports_source_table**, using the distance between the city coordinates (lat-lon) and the airport coordinates (lat-lon), to determine the state code for every city

We drop all cities without a state code from the resulting table, as they cannot be used unambiguously in further analysis.

**Step 3: Forming the target data model and final quality checks** is implemented in the function **build_data_model(spark, input_data, output_data)**.

In this step we perform the following tasks:
- Load preprocessed source data tables from S3 data lake
- Create fact table (**fact_i94_history**) and dimension tables (**dim_time** and **dim_cities**)
- Perform quality checks on the data model
- Save final fact and dimension tables to the S3 data lake (in parquet format)

In [27]:
etl.build_data_model(spark, input_data, output_data)

Step 3 - Forming the target data model and final quality checks - started.

Loading preprocessed source tables from the data lake...
2,515,156 records were successfully loaded from the data lake into i94_clean_source_table table.
596 records were successfully loaded from the data lake into cities_pop_clean_source_table table.
87,480 records were successfully loaded from the data lake into weather_clean_source_table table.
21,364 records were successfully loaded from the data lake into airports_clean_source_table table.

Building dimension tables...
Dimension table dim_time with 578 records was successfully created .
Dimension table dim_cities with 596 records was successfully created .

Building fact table...
Fact table fact_i94_history with 2,515,156 records was successfully created .

Step 4 - Final quality checks for the data model - started.

Step 4 - Final quality checks for the data model - successfully completed.

Quality checks on the data model have successfully passed.

Writi

For **dim_time** dimension table we perform the following tasks:
1. Take the union of the unique date values from the _arr_date_ and _dep_date_ fields of **i94_clean_source_table** and the _dt_ field of **weather_clean_source_table**
2. Add separate fields for _year_, _month_, _day_, _weekday_ and _week_ parts of date
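
The dim_time construction can be sketched in plain Python with the standard `datetime` module. The function name is hypothetical, and the numbering convention is an assumption: the sketch uses ISO 8601 weeks and ISO weekdays (Monday = 1), whereas Spark's `dayofweek` numbers Sunday as 1, so etl.py may use a different convention.

```python
from datetime import date

def time_dimension(dates):
    """Build dim_time-style rows from an iterable of dates (None values skipped)."""
    rows = []
    # Union of unique dates, in chronological order.
    for d in sorted({d for d in dates if d is not None}):
        iso_year, iso_week, iso_weekday = d.isocalendar()
        rows.append({
            "date": d, "year": d.year, "month": d.month, "day": d.day,
            "weekday": iso_weekday,  # assumption: ISO numbering, Monday=1 ... Sunday=7
            "week": iso_week,        # ISO 8601 week number
        })
    return rows

arrivals = [date(2016, 4, 1), date(2016, 4, 1), None]
departures = [date(2016, 4, 3)]
weather_dates = [date(2016, 4, 1)]
for row in time_dimension(arrivals + departures + weather_dates):
    print(row)
```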

For **dim_cities** dimension table we perform the following tasks:
1. Extract all attributes from **cities_pop_clean_source_table**
2. Transform **airports_clean_source_table** into pivot table with number of airports per city + state combination
3. Add a _number_of_airports_ field to the resulting **dim_cities** table by joining the _municipality_ & _iso_region_ composite key with the _city_ & _state_code_ composite key
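
Steps 2 and 3 can be sketched together in plain Python: count airports per (municipality, iso_region) pair, then attach the count to each city through the composite key. The function name and toy rows are hypothetical. The sketch assumes a city without airports gets 0, whereas a SQL left join would give NULL; the FRASER rows show why both parts of the key matter.

```python
from collections import Counter

def add_airport_counts(cities, airports):
    """Attach number_of_airports to each city row via (city, state_code)."""
    # Pivot: count airports for each (municipality, iso_region) pair.
    counts = Counter((a["municipality"], a["iso_region"]) for a in airports)
    enriched = []
    for city in cities:
        key = (city["city"], city["state_code"])
        # Assumption: a city with no airports gets 0 (a LEFT JOIN would give NULL).
        enriched.append({**city, "number_of_airports": counts.get(key, 0)})
    return enriched

# Hypothetical toy rows.
cities = [{"city": "PEARLAND", "state_code": "TX"}, {"city": "FRASER", "state_code": "CO"}]
airports = [
    {"municipality": "PEARLAND", "iso_region": "TX", "name": "Airport A"},
    {"municipality": "PEARLAND", "iso_region": "TX", "name": "Airport B"},
    {"municipality": "FRASER", "iso_region": "MI", "name": "Airport C"},
]
for row in add_airport_counts(cities, airports):
    print(row)
```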

For the **fact_i94_history** fact table we perform the following tasks:
1. Extract the following attributes from **i94_clean_source_table**: _id, country_of_residence, country_of_citizenship, city_of_entry, state_of_entry_code, border_cross_method, type_of_visa, class_of_admission, gender, age, arr_date, dep_date_
2. Add fields for the _year_, _month_ and _day_ parts of the _arrival_date_
3. Add an _avg_temperature_ field to the resulting **fact_i94_history** table by joining the _city_of_entry_ & _state_of_entry_code_ & _YEAR(arr_date)_ & _MONTH(arr_date)_ composite key with the _city_ & _state_ & _YEAR(dt)_ & _MONTH(dt)_ composite key. (The sample i94 and temperature datasets used here don't cover overlapping periods of time, so the _avg_temperature_ field will be null.)
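
The temperature lookup in step 3 behaves like a left join on the (city, state, year, month) key, which is why non-overlapping periods produce nulls. A minimal plain-Python sketch with a hypothetical function name: the weather row is copied from the MOBILE output earlier in this notebook, while the fact rows are toy values. It assumes at most one temperature per key; if there were several, the last one read would win.

```python
from datetime import date

def attach_avg_temperature(facts, weather):
    """Left-join avg_temperature onto fact rows by (city, state, year, month)."""
    # Index weather rows by the composite key.
    temps = {(w["city"], w["state"], w["dt"].year, w["dt"].month): w["avg_temperature"]
             for w in weather}
    joined = []
    for fact in facts:
        key = (fact["city_of_entry"], fact["state_of_entry_code"],
               fact["arr_date"].year, fact["arr_date"].month)
        # No matching month gives None, the Python analogue of NULL.
        joined.append({**fact, "avg_temperature": temps.get(key)})
    return joined

weather = [{"city": "MOBILE", "state": "AL", "dt": date(1960, 1, 1), "avg_temperature": 8.497}]
facts = [
    {"id": 1, "city_of_entry": "MOBILE", "state_of_entry_code": "AL", "arr_date": date(1960, 1, 15)},
    {"id": 2, "city_of_entry": "MOBILE", "state_of_entry_code": "AL", "arr_date": date(2016, 4, 1)},
]
result = attach_avg_temperature(facts, weather)
print([r["avg_temperature"] for r in result])
```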

After forming target tables we perform final quality checks (see below).

#### 4.2 Data Quality Checks
We perform data quality checks through all three steps of data transformation into the target data model.

During **Step 3** we ran the following final checks in the function **check_model_quality(spark, model_data)** to ensure the quality of our fact and dimension tables:
- Check that the **fact** and **dimension** tables each contain at least one record
- Check that the **fact** table doesn't contain dates missing from the **dim_time** table
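
The two checks can be expressed on in-memory rows as a plain-Python sketch. The function `check_model_quality_sketch` is hypothetical and is not the `check_model_quality` function from etl.py; it assumes that both _arr_date_ and _dep_date_ must be present in dim_time, since dim_time is built from both fields, and it raises `ValueError` on failure.

```python
from datetime import date

def check_model_quality_sketch(fact_rows, dim_time_rows, dim_city_rows):
    """Hypothetical plain-Python version of the two checks; raises on failure."""
    # Check 1: every table must contain at least one record.
    tables = (("fact_i94_history", fact_rows), ("dim_time", dim_time_rows),
              ("dim_cities", dim_city_rows))
    for name, rows in tables:
        if len(rows) == 0:
            raise ValueError(f"{name} is empty")
    # Check 2: every arrival/departure date in the fact table must exist in dim_time.
    known_dates = {r["date"] for r in dim_time_rows}
    fact_dates = {f[col] for f in fact_rows for col in ("arr_date", "dep_date")
                  if f.get(col) is not None}
    missing = fact_dates - known_dates
    if missing:
        raise ValueError(f"{len(missing)} fact dates missing from dim_time")
    return True

# Toy rows for illustration.
dim_time = [{"date": date(2016, 4, 1)}, {"date": date(2016, 4, 3)}]
dim_cities = [{"city": "NEW YORK", "state_code": "NY"}]
facts = [{"id": 1, "arr_date": date(2016, 4, 1), "dep_date": date(2016, 4, 3)}]
print(check_model_quality_sketch(facts, dim_time, dim_cities))
```

In Spark SQL, the second check maps naturally onto a LEFT ANTI JOIN between the fact table and dim_time, where any returned row signals a missing date.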

We ran this function in Step 3 above.

Integrity constraints (e.g., unique keys and data types) were enforced in the earlier steps of the pipeline.

#### 4.3 Data dictionary 
Data dictionary for the target data model is provided in the file **dictionary.pdf**.

Schemas for all tables used in our pipeline are provided in the file **dictionary_Schema.txt**.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

In this project we are dealing with a large amount of data. The immigration sample for April 2016 alone contains over 3 million event rows (3,096,313 records) for just 30 days, which is clearly a Big Data scenario.
Therefore, we need tools designed for scalability, so we chose Apache Spark for scalable processing and AWS S3 for scalable storage.


Monthly updates of our data would be adequate for the needs of the analytics teams, since the i94 source data comes as monthly files (e.g., i94_apr16_sub for April 2016).
If needed, we can increase the update frequency, for example to weekly or daily, without making architectural changes to the pipeline.

If data volumes grow rapidly (by 100x, for example), we should consider two options:
- scale the underlying Spark cluster hardware;
- implement Steps 1 and 2 of our pipeline in Apache Airflow using data partitioning. We could use logical partitioning to parallelize processing, so that different source tables are processed independently of each other. We could also partition the data itself, so that each run of our DAG processes only a smaller segment based on the year/month partitioning of our source data.
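
The year/month segmentation can be sketched in plain Python. This is not Airflow code: the helper `monthly_partitions` is hypothetical, and the idea is that each DAG run would take one (year, month) segment from such a list and process only the matching partition. The printed paths follow the Hive-style `year=.../month=...` directory layout that Spark's `DataFrameWriter.partitionBy("year", "month")` produces.

```python
def monthly_partitions(start_year, start_month, end_year, end_month):
    """List (year, month) segments from start to end, inclusive."""
    year, month = start_year, start_month
    segments = []
    while (year, month) <= (end_year, end_month):
        segments.append((year, month))
        month += 1
        if month == 13:  # roll over into the next year
            year, month = year + 1, 1
    return segments

# One directory per segment, in Hive-style partition naming.
for y, m in monthly_partitions(2016, 11, 2017, 2):
    print(f"year={y}/month={m}")
```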

If we need to update the target fact and dimension tables on a regular schedule (e.g., daily by 7am), we can implement Step 3 of our pipeline as an independent Airflow DAG and schedule it to run daily, early enough to finish before 7am.

Since we use AWS S3 as the main storage for our data lake, we can either query the data in place or load it into a multi-user data warehouse such as Amazon Redshift.
Therefore, access by 100+ people shouldn't be an issue.
