# Capstone Project - Udacity Data Engineer Nanodegree

#### Project Summary

The main goal of this project is to build a [data warehouse](https://en.wikipedia.org/wiki/Data_warehouse) that serves as a [single-source-of-truth](https://en.wikipedia.org/wiki/Single_source_of_truth) database by integrating data from several sources, both for data analytics and for future backend use.


The project follows these steps:

-   [Step 1: Scope the Project and Gather Data](#Step-1:-Scope-the-Project-and-Gather-Data)
    -   [Describe and Gather Data](#Describe-and-Gather-Data)

---

-   [Step 2: Explore and Assess the Data](#Step-2:-Explore-and-Assess-the-Data)
    1.  [Explore immigration data set](#Explore-immigration-data-set)
    1.  [Clean immigration data set](#Clean-immigration-data-set)
    1.  [Explore Temperature data set](#Explore-temperature-data-set)
    1.  [Clean Temperature data set](#Clean-temperature-data-set)
    1.  [Explore Demography data set](#Explore-demography-data-set)
    1.  [Clean Demography data set](#Clean-demography-data-set)

---

-   [Step 3: Define the Data Model](#Step-3:-Define-the-Data-Model)

---

-   [Step 4: Run Pipelines to Model the Data](#Step-4:-Run-Pipelines-to-Model-the-Data)

---

-   [Step 5: Complete Project Write Up](#Step-5:-Complete-Project-Write-Up)


In [1]:
import pyspark
from pyspark.sql import SparkSession

import wrangle
import importlib


import helper
from helper import *


In [2]:
import warnings

warnings.filterwarnings(action="ignore")


### Step 1: Scope the Project and Gather Data


#### Scope

In this project we integrate the I94 immigration, world temperature and US demographic data sets to set up a data warehouse with fact and dimension tables (star schema).

-   Data Sets

    1. [I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)
    2. [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)
    3. [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

-   Tools
    -   Python for data processing
        -   PySpark - data processing on large data set
    -   AWS S3: data storage


#### Describe and Gather Data

| Data Set                                                                                                     | Format | Brief description                                                                                                                                                                                                                               |
| ------------------------------------------------------------------------------------------------------------ | ------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| [I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)                   | SAS    | Data contains international visitor arrival statistics by world regions and select countries, type of visa, transportation method, states visited (first intended address only), age groups, and the top ports of entry (for select countries). |
| [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) | CSV    | This dataset is from Kaggle and contains monthly average temperatures for locations around the world since 1750. |
| [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/) | CSV    | This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. |


### Step 2: Explore and Assess the Data


#### Explore the data

1. Use pandas for exploratory data analysis to get an overview of these data sets
2. Split the data sets into dimension tables and rename columns so they are easier to understand
3. Use PySpark on one of the SAS data sets to test the ETL pipeline logic


Create the Spark session.


In [3]:
# spark = (
#     SparkSession.builder.config(
#         "spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11"
#     )
#     .enableHiveSupport()
#     .getOrCreate()
# )
spark = (
    SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    .enableHiveSupport()
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")


##### Explore immigration data set


In [4]:
# Read in the data here
data_path = "sample_data/immigration_data_sample.csv"
df_immigration = spark.read.csv(data_path, header=True, inferSchema=True)


In [5]:
df_immigration.limit(5).toPandas()


Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [6]:
df_immigration.columns


['_c0',
 'cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'dtadfile',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'biryear',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype']

| column   | Description                                                                        |
| -------- | ---------------------------------------------------------------------------------- |
| cicid    | Unique record ID                                                                   |
| i94yr    | 4 digit year                                                                       |
| i94mon   | Numeric month                                                                      |
| i94cit   | 3 digit code for immigrant country of birth                                        |
| i94res   | 3 digit code for immigrant country of residence                                    |
| i94port  | Port of admission                                                                  |
| arrdate  | Arrival Date in the USA                                                            |
| i94mode  | Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)              |
| i94addr  | USA State of arrival                                                               |
| depdate  | Departure Date from the USA                                                        |
| i94bir   | Age of Respondent in Years                                                         |
| i94visa  | Visa codes collapsed into three categories                                         |
| count    | Field used for summary statistics                                                  |
| dtadfile | Character Date Field - Date added to I-94 Files                                    |
| visapost | Department of State where Visa was issued                                          |
| occup    | Occupation that will be performed in U.S                                           |
| entdepa  | Arrival Flag - admitted or paroled into the U.S.                                   |
| entdepd  | Departure Flag - Departed, lost I-94 or is deceased                                |
| entdepu  | Update Flag - Either apprehended, overstayed, adjusted to perm residence           |
| matflag  | Match flag - Match of arrival and departure records                                |
| biryear  | 4 digit year of birth                                                              |
| dtaddto  | Character Date Field - Date to which admitted to U.S. (allowed to stay until)      |
| gender   | Non-immigrant sex                                                                  |
| insnum   | INS number                                                                         |
| airline  | Airline used to arrive in U.S.                                                     |
| admnum   | Admission Number                                                                   |
| fltno    | Flight number of Airline used to arrive in U.S.                                    |
| visatype | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |


In [7]:
print_number_of_records(df_immigration.count())


number of records:	1,000


In [8]:
missing_values_as_df(df_immigration)


Unnamed: 0,column_name,missing_count,missing_percentage
5,entdepu,1000,100.0
3,occup,996,99.6
8,insnum,965,96.5
2,visapost,618,61.8
7,gender,141,14.1
0,i94addr,59,5.9
1,depdate,49,4.9
4,entdepd,46,4.6
6,matflag,46,4.6
9,airline,33,3.3


##### Clean immigration data set


Some columns have more than 90% missing values, so we drop them.

> Drop all columns with more than 90 percent missing values.

> Drop all duplicates and empty rows.

> Convert `arrive_date` and `departure_date` in the immigration data from the SAS date format to a datetime (timestamp) format.
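
The date conversion in the last rule can be sketched in plain Python. SAS stores dates as a count of days since 1960-01-01, so the conversion is an offset from that epoch. This is a minimal illustration, not the code in `wrangle`; the name `sas_days_to_date` is hypothetical. Note that the `fact_immigration` sample output in this notebook shows `1970-01-01 05:42:46` for the first row, which is exactly 20566 seconds after the Unix epoch, so it may be worth checking whether the pipeline treats the day count as seconds.

```python
from datetime import date, timedelta

# SAS counts dates as days elapsed since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS day count (int or float) to a date; None passes through."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# arrdate of the first sample row in the immigration data
print(sas_days_to_date(20566.0))  # prints 2016-04-22
```

The result falls in April 2016, which agrees with the `i94yr` and `i94mon` values of that row.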


In [9]:
df_immigration = wrangle.drop_immigration_empty_columns(df_immigration)
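
The body of `wrangle.drop_immigration_empty_columns` is not shown in this notebook. As a rough plain-Python illustration of the 90 percent rule, the sketch below applies a threshold to the missing-value percentages reported earlier; the function name `columns_over_threshold` is hypothetical.

```python
# Missing-value percentages copied from the missing_values_as_df output above.
missing_percentage = {
    "entdepu": 100.0,
    "occup": 99.6,
    "insnum": 96.5,
    "visapost": 61.8,
    "gender": 14.1,
    "i94addr": 5.9,
    "depdate": 4.9,
    "entdepd": 4.6,
    "matflag": 4.6,
    "airline": 3.3,
}


def columns_over_threshold(percentages, threshold=90.0):
    """Return the columns whose missing percentage exceeds the threshold."""
    return [name for name, pct in percentages.items() if pct > threshold]


columns_to_drop = columns_over_threshold(missing_percentage)
print(columns_to_drop)  # prints ['entdepu', 'occup', 'insnum']
```

These three columns are the ones that no longer appear in the missing-value report after the cleaning step.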


In [10]:
missing_values_as_df(df_immigration)


Unnamed: 0,column_name,missing_count,missing_percentage
2,visapost,618,61.8
5,gender,141,14.1
0,i94addr,59,5.9
1,depdate,49,4.9
3,entdepd,46,4.6
4,matflag,46,4.6
6,airline,33,3.3
7,fltno,8,0.8


In [11]:
fact_immigration = df_immigration[
    [
        "cicid",
        "i94yr",
        "i94mon",
        "i94port",
        "i94addr",
        "arrdate",
        "depdate",
        "i94mode",
        "i94visa",
    ]
]


In [12]:
fact_immigration = wrangle.fact_immigration(fact_immigration)
fact_immigration.limit(5).toPandas()


Unnamed: 0,cic_id,year,month,city_code,state_code,arrive_date,departure_date,mode,visa,country,immigration_id
0,4084316,2016,4,HHW,HI,1970-01-01 05:42:46,1970-01-01 05:42:53,1,2,United States,0
1,4422636,2016,4,MCA,TX,1970-01-01 05:42:47,1970-01-01 05:42:48,1,2,United States,1
2,1195600,2016,4,OGG,FL,1970-01-01 05:42:31,1970-01-01 05:42:51,1,2,United States,2
3,5291768,2016,4,LOS,CA,1970-01-01 05:42:52,1970-01-01 05:43:01,1,2,United States,3
4,985523,2016,4,CHM,NY,1970-01-01 05:42:30,1970-01-01 05:42:33,3,2,United States,4


In [13]:
fact_immigration.columns


['cic_id',
 'year',
 'month',
 'city_code',
 'state_code',
 'arrive_date',
 'departure_date',
 'mode',
 'visa',
 'country',
 'immigration_id']

In [14]:
fact_immigration.createOrReplaceTempView("fact_immigration")
fact_immigration.printSchema()

root
 |-- cic_id: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- arrive_date: timestamp (nullable = true)
 |-- departure_date: timestamp (nullable = true)
 |-- mode: integer (nullable = true)
 |-- visa: integer (nullable = true)
 |-- country: string (nullable = false)
 |-- immigration_id: integer (nullable = false)



In [15]:
spark.sql("select * from fact_immigration limit 10").show()

+-------+----+-----+---------+----------+-------------------+-------------------+----+----+-------------+--------------+
| cic_id|year|month|city_code|state_code|        arrive_date|     departure_date|mode|visa|      country|immigration_id|
+-------+----+-----+---------+----------+-------------------+-------------------+----+----+-------------+--------------+
|4084316|2016|    4|      HHW|        HI|1970-01-01 05:42:46|1970-01-01 05:42:53|   1|   2|United States|             0|
|4422636|2016|    4|      MCA|        TX|1970-01-01 05:42:47|1970-01-01 05:42:48|   1|   2|United States|             1|
|1195600|2016|    4|      OGG|        FL|1970-01-01 05:42:31|1970-01-01 05:42:51|   1|   2|United States|             2|
|5291768|2016|    4|      LOS|        CA|1970-01-01 05:42:52|1970-01-01 05:43:01|   1|   2|United States|             3|
| 985523|2016|    4|      CHM|        NY|1970-01-01 05:42:30|1970-01-01 05:42:33|   3|   2|United States|             4|
|1481650|2016|    4|      ATL|  

In [16]:
dim_immigration_personal = df_immigration[
    ["cicid", "i94cit", "i94res", "biryear", "gender"]
]


In [17]:
importlib.reload(wrangle)
dim_immigration_personal = wrangle.dim_immigration_personal(dim_immigration_personal)
dim_immigration_personal.limit(5).toPandas()


Unnamed: 0,cic_id,citizen_country,residence_country,birth_year,gender,immi_personal_id
0,4084316,209,209,1955,F,0
1,4422636,582,582,1990,M,1
2,1195600,148,112,1940,M,2
3,5291768,297,297,1991,M,3
4,985523,111,111,1997,F,4


In [18]:
dim_immigration_personal.createOrReplaceTempView("dim_immigration_personal")
dim_immigration_personal.printSchema()

root
 |-- cic_id: long (nullable = true)
 |-- citizen_country: integer (nullable = true)
 |-- residence_country: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- immi_personal_id: integer (nullable = false)



In [19]:
spark.sql("select * from dim_immigration_personal limit 10").show()

+-------+---------------+-----------------+----------+------+----------------+
| cic_id|citizen_country|residence_country|birth_year|gender|immi_personal_id|
+-------+---------------+-----------------+----------+------+----------------+
|4084316|            209|              209|      1955|     F|               0|
|4422636|            582|              582|      1990|     M|               1|
|1195600|            148|              112|      1940|     M|               2|
|5291768|            297|              297|      1991|     M|               3|
| 985523|            111|              111|      1997|     F|               4|
|1481650|            577|              577|      1965|     M|               5|
|2197173|            245|              245|      1968|     F|               6|
| 232708|            113|              135|      1983|     F|               7|
|5227851|            131|              131|      1977|  null|               8|
|  13213|            116|              116|      198

In [20]:
dim_immigration_airline = df_immigration[
    ["cicid", "airline", "admnum", "fltno", "visatype"]
]


In [21]:
dim_immigration_airline = wrangle.dim_immigration_airline(dim_immigration_airline)
dim_immigration_airline.limit(5).toPandas()


Unnamed: 0,cic_id,airline,admin_num,flight_number,visa_type,immi_airline_id
0,4084316,JL,56582674633,00782,WT,0
1,4422636,*GA,94361995930,XBLNG,B2,1
2,1195600,LH,55780468433,00464,WT,2
3,5291768,QR,94789696030,00739,B2,3
4,985523,,42322572633,LAND,WT,4


In [22]:
dim_immigration_airline.createOrReplaceTempView("dim_immigration_airline")
dim_immigration_airline.printSchema()

root
 |-- cic_id: long (nullable = true)
 |-- airline: string (nullable = true)
 |-- admin_num: long (nullable = true)
 |-- flight_number: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- immi_airline_id: integer (nullable = false)



In [23]:
spark.sql("select * from dim_immigration_airline limit 10").show()

+-------+-------+-----------+-------------+---------+---------------+
| cic_id|airline|  admin_num|flight_number|visa_type|immi_airline_id|
+-------+-------+-----------+-------------+---------+---------------+
|4084316|     JL|56582674633|        00782|       WT|              0|
|4422636|    *GA|94361995930|        XBLNG|       B2|              1|
|1195600|     LH|55780468433|        00464|       WT|              2|
|5291768|     QR|94789696030|        00739|       B2|              3|
| 985523|   null|42322572633|         LAND|       WT|              4|
|1481650|     DL|  736852585|          910|       B2|              5|
|2197173|     CX|  786312185|          870|       B2|              6|
| 232708|     BA|55474485033|        00117|       WT|              7|
|5227851|     LX|59413424733|        00008|       WT|              8|
|  13213|     AA|55449792933|        00109|       WT|              9|
+-------+-------+-----------+-------------+---------+---------------+



##### Explore temperature data set


In [24]:
# data_path = "../../data2/GlobalLandTemperaturesByCity.csv"
# df_temperature = spark.read.csv(data_path, header=True, inferSchema=True)


Create a sample data set for better performance.

In [25]:
# df_temperature = df_temperature[df_temperature["Country"] == "United States"]
# df_temperature.limit(5000).toPandas().to_csv("sample_data/GlobalLandTemperaturesByCity_sample.csv", encoding='utf-8',index=False)

In [26]:
data_path = "sample_data/GlobalLandTemperaturesByCity_sample.csv"
df_temperature = spark.read.csv(data_path, header=True, inferSchema=True)


In [27]:
df_temperature.columns


['dt',
 'AverageTemperature',
 'AverageTemperatureUncertainty',
 'City',
 'Country',
 'Latitude',
 'Longitude']

In [28]:
df_temperature.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
1,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
2,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
3,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
4,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


In [29]:
missing_values_as_df(df_temperature)

Unnamed: 0,column_name,missing_count,missing_percentage
0,AverageTemperature,121,2.42
1,AverageTemperatureUncertainty,121,2.42


| Column                        | Description                                |
| ----------------------------- | ------------------------------------------ |
| dt                            | Date                                       |
| AverageTemperature            | Average land temperature in Celsius        |
| AverageTemperatureUncertainty | 95% confidence interval around the average |
| City                          | Name of City                               |
| Country                       | Name of Country                            |
| Latitude                      | City Latitude                              |
| Longitude                     | City Longitude                             |


##### Clean temperature data set

> Drop all rows with missing average temperature

> Drop duplicate rows
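
The two rules above can be sketched on plain Python dictionaries. This is only an illustration of the logic, not the `wrangle` implementation; the function name and the duplicated and missing sample rows are hypothetical. In PySpark the same effect is usually obtained with `DataFrame.dropna(subset=[...])` followed by `DataFrame.dropDuplicates()`.

```python
def drop_nulls_and_duplicates(rows, required="AverageTemperature"):
    """Drop rows where `required` is missing, then drop exact duplicate rows."""
    seen = set()
    cleaned = []
    for row in rows:
        if row.get(required) is None:
            continue  # rule 1: missing average temperature
        key = tuple(sorted(row.items()))
        if key in seen:
            continue  # rule 2: exact duplicate of an earlier row
        seen.add(key)
        cleaned.append(row)
    return cleaned


sample_rows = [
    {"dt": "1820-01-01", "AverageTemperature": 2.101, "City": "Abilene"},
    {"dt": "1820-01-01", "AverageTemperature": 2.101, "City": "Abilene"},  # hypothetical duplicate
    {"dt": "1820-02-01", "AverageTemperature": None, "City": "Abilene"},   # hypothetical missing value
    {"dt": "1820-02-01", "AverageTemperature": 6.926, "City": "Abilene"},
]

cleaned_rows = drop_nulls_and_duplicates(sample_rows)
print(len(cleaned_rows))  # prints 2
```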


In [30]:
df_temperature = wrangle.drop_temperature_nulls_and_duplicates_rows(df_temperature)


In [31]:
df_temperature.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1848-12-01,4.274,4.167,Abilene,United States,32.95N,100.53W
1,1870-07-01,28.283,1.63,Abilene,United States,32.95N,100.53W
2,1873-12-01,6.623,0.843,Abilene,United States,32.95N,100.53W
3,1879-12-01,7.3,1.702,Abilene,United States,32.95N,100.53W
4,1893-07-01,29.273,0.82,Abilene,United States,32.95N,100.53W


In [32]:
# df_temperature_usa = df_temperature[df_temperature["Country"] == "United States"]
dim_temperature = df_temperature[
    ["dt", "AverageTemperature", "AverageTemperatureUncertainty", "City", "Country"]
]


In [33]:
dim_temperature.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)



In [34]:
dim_temperature.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country
0,1848-12-01,4.274,4.167,Abilene,United States
1,1870-07-01,28.283,1.63,Abilene,United States
2,1873-12-01,6.623,0.843,Abilene,United States
3,1879-12-01,7.3,1.702,Abilene,United States
4,1893-07-01,29.273,0.82,Abilene,United States


In [35]:
dim_temperature = wrangle.dim_temperature(dim_temperature)
dim_temperature.limit(5).toPandas()


Unnamed: 0,dt,avg_temp,avg_temp_uncertainty,city,country,year,month
0,1848-12-01,4.274,4.167,ABILENE,UNITED STATES,1848,12
1,1870-07-01,28.283001,1.63,ABILENE,UNITED STATES,1870,7
2,1873-12-01,6.623,0.843,ABILENE,UNITED STATES,1873,12
3,1879-12-01,7.3,1.702,ABILENE,UNITED STATES,1879,12
4,1893-07-01,29.273001,0.82,ABILENE,UNITED STATES,1893,7


In [36]:
dim_temperature.createOrReplaceTempView("dim_temperature")
dim_temperature.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- avg_temp: float (nullable = true)
 |-- avg_temp_uncertainty: float (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [37]:
spark.sql("select * from dim_temperature limit 10").show()

+-------------------+--------+--------------------+-------+-------------+----+-----+
|                 dt|avg_temp|avg_temp_uncertainty|   city|      country|year|month|
+-------------------+--------+--------------------+-------+-------------+----+-----+
|1848-12-01 00:00:00|   4.274|               4.167|ABILENE|UNITED STATES|1848|   12|
|1870-07-01 00:00:00|  28.283|                1.63|ABILENE|UNITED STATES|1870|    7|
|1873-12-01 00:00:00|   6.623|               0.843|ABILENE|UNITED STATES|1873|   12|
|1879-12-01 00:00:00|     7.3|               1.702|ABILENE|UNITED STATES|1879|   12|
|1893-07-01 00:00:00|  29.273|                0.82|ABILENE|UNITED STATES|1893|    7|
|1903-01-01 00:00:00|   5.756|               0.749|ABILENE|UNITED STATES|1903|    1|
|1931-06-01 00:00:00|  26.742|               0.362|ABILENE|UNITED STATES|1931|    6|
|1979-08-01 00:00:00|  26.237|               0.223|ABILENE|UNITED STATES|1979|    8|
|2003-08-01 00:00:00|  29.062|               0.354|ABILENE|UNITED

##### Explore demography data set


In [38]:
file_path = "sample_data/us-cities-demographics.csv"
df_demog = spark.read.csv(file_path, header=True, inferSchema=True, sep=";")


In [39]:
df_demog.columns


['City',
 'State',
 'Median Age',
 'Male Population',
 'Female Population',
 'Total Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size',
 'State Code',
 'Race',
 'Count']

| Column                 | Description                                                   |
| ---------------------- | ------------------------------------------------------------- |
| City                   | City Name                                                     |
| State                  | US State where city is located                                |
| Median Age             | Median age of the population                                  |
| Male Population        | Count of male population                                      |
| Female Population      | Count of female population                                    |
| Total Population       | Count of total population                                     |
| Number of Veterans     | Count of total Veterans                                       |
| Foreign-born           | Count of city residents not born in the United States         |
| Average Household Size | Average city household size                                   |
| State Code             | Code of the US state                                          |
| Race                   | Respondent race                                               |
| Count                  | Count of city residents per race                              |


In [40]:
df_demog.limit(5).toPandas()


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


##### Clean demography data set

> Drop duplicate rows

> Drop rows with missing values

> Parse the I94_SAS_Labels_Descriptions.SAS file to get the auxiliary dimension tables: country_code, city_code, state_code
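
The label-file parsing in the last rule might look like the sketch below. It assumes each mapping line has the form `code = 'label'`, with the code optionally quoted; the sample lines are a hypothetical excerpt, and `parse_labels` is not the function used in `wrangle`. The real parser also has to select the right block of lines for each table, which is omitted here.

```python
import re

# Matches lines such as:   236 =  'AFGHANISTAN'   or   'AK'='ALASKA'
LABEL_LINE = re.compile(r"^\s*'?([^'=\s]+)'?\s*=\s*'([^']*)'")


def parse_labels(lines):
    """Build a {code: label} dict from the lines that look like mappings."""
    labels = {}
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:
            code, label = match.groups()
            labels[code] = label.strip()
    return labels


# Hypothetical excerpt; comment and terminator lines are ignored by the regex.
sample_lines = [
    "/* hypothetical excerpt */",
    "   236 =  'AFGHANISTAN'",
    "   101 =  'ALBANIA'",
    "\t'AK'='ALASKA'",
    "\t'AZ'='ARIZONA'",
    ";",
]

labels = parse_labels(sample_lines)
print(labels)
```

A dictionary of this shape can be turned into a Spark DataFrame with `spark.createDataFrame(list(labels.items()), schema=[...])`, which is how the cells below consume the parsed codes.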


In [41]:
df_demog = wrangle.drop_demographics_nulls_and_duplicates_rows(df_demog)


In [42]:
dim_demog_population = df_demog[
    [
        "City",
        "State",
        "Male Population",
        "Female Population",
        "Number of Veterans",
        "Foreign-born",
        "Race",
    ]
]


In [43]:
dim_demog_population = wrangle.dim_demog_population(dim_demog_population)
dim_demog_population.limit(5).toPandas()


Unnamed: 0,city,state,male_population,female_population,num_veterans,foreign_born,race,demog_pop_id
0,Quincy,Massachusetts,44129,49500,4147,32935,White,0
1,Wilmington,North Carolina,52346,63601,5908,7401,Asian,1
2,Tampa,Florida,175517,193511,20636,58795,Hispanic or Latino,2
3,Gastonia,North Carolina,35527,39023,3537,5715,Asian,3
4,Tyler,Texas,50422,53283,4813,8225,American Indian and Alaska Native,4


In [44]:
dim_demog_population.createOrReplaceTempView("dim_demog_population")
dim_demog_population.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- num_veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- race: string (nullable = true)
 |-- demog_pop_id: integer (nullable = false)



In [45]:
spark.sql("select * from dim_demog_population limit 10 ").show()

+------------+--------------+---------------+-----------------+------------+------------+--------------------+------------+
|        city|         state|male_population|female_population|num_veterans|foreign_born|                race|demog_pop_id|
+------------+--------------+---------------+-----------------+------------+------------+--------------------+------------+
|      Quincy| Massachusetts|          44129|            49500|        4147|       32935|               White|           0|
|  Wilmington|North Carolina|          52346|            63601|        5908|        7401|               Asian|           1|
|       Tampa|       Florida|         175517|           193511|       20636|       58795|  Hispanic or Latino|           2|
|    Gastonia|North Carolina|          35527|            39023|        3537|        5715|               Asian|           3|
|       Tyler|         Texas|          50422|            53283|        4813|        8225|American Indian a...|           4|
|      R

In [46]:
dim_demog_statistics = df_demog[
    ["City", "State", "Median Age", "Average Household Size"]
]


In [47]:
dim_demog_statistics = wrangle.dim_demog_statistics(dim_demog_statistics)
dim_demog_statistics.limit(5).toPandas()


Unnamed: 0,city,state,median_age,avg_household_size,demog_stat_id
0,QUINCY,MASSACHUSETTS,41,2.39,0
1,WILMINGTON,NORTH CAROLINA,35,2.24,1
2,TAMPA,FLORIDA,35,2.47,2
3,GASTONIA,NORTH CAROLINA,36,2.67,3
4,TYLER,TEXAS,33,2.59,4


In [48]:
dim_demog_statistics.createOrReplaceTempView("dim_demog_statistics")
dim_demog_statistics.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: integer (nullable = true)
 |-- avg_household_size: float (nullable = true)
 |-- demog_stat_id: integer (nullable = false)



In [49]:
spark.sql("select * from dim_demog_statistics limit 10 ").show()

+------------+--------------+----------+------------------+-------------+
|        city|         state|median_age|avg_household_size|demog_stat_id|
+------------+--------------+----------+------------------+-------------+
|      QUINCY| MASSACHUSETTS|        41|              2.39|            0|
|  WILMINGTON|NORTH CAROLINA|        35|              2.24|            1|
|       TAMPA|       FLORIDA|        35|              2.47|            2|
|    GASTONIA|NORTH CAROLINA|        36|              2.67|            3|
|       TYLER|         TEXAS|        33|              2.59|            4|
|      RIALTO|    CALIFORNIA|        31|              3.83|            5|
|       SANDY|          UTAH|        34|              3.22|            6|
|ARDEN-ARCADE|    CALIFORNIA|        41|              2.18|            7|
|      UPLAND|    CALIFORNIA|        39|              2.77|            8|
|  CAPE CORAL|       FLORIDA|        45|              2.85|            9|
+------------+--------------+---------

In [50]:
with open("I94_SAS_Labels_Descriptions.SAS") as file:
    contents = file.readlines()


In [51]:
country_code = wrangle.country_code(contents)


In [52]:
df_country_code = spark.createDataFrame(
    data=list(country_code.items()), schema=["code", "country"]
)
df_country_code.limit(5).toPandas()


Unnamed: 0,code,country
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA
3,102,ANDORRA
4,324,ANGOLA


In [53]:
df_country_code.createOrReplaceTempView("country_code")
df_country_code.printSchema()

root
 |-- code: string (nullable = true)
 |-- country: string (nullable = true)



In [54]:
spark.sql("select * from country_code limit 10 ").show()

+----+---------------+
|code|        country|
+----+---------------+
| 236|    AFGHANISTAN|
| 101|        ALBANIA|
| 316|        ALGERIA|
| 102|        ANDORRA|
| 324|         ANGOLA|
| 529|       ANGUILLA|
| 518|ANTIGUA-BARBUDA|
| 687|     ARGENTINA |
| 151|        ARMENIA|
| 532|          ARUBA|
+----+---------------+



In [55]:
city_code = wrangle.city_code(contents)


In [56]:
df_city_code = spark.createDataFrame(
    list(city_code.items()), schema=["code", "city"]
)
df_city_code.limit(5).toPandas()


Unnamed: 0,code,city
0,ANC,ANCHORAGE
1,BAR,BAKER AAF - BAKER ISLAND
2,DAC,DALTONS CACHE
3,PIZ,DEW STATION PT LAY DEW
4,DTH,DUTCH HARBOR


In [57]:
df_city_code.createOrReplaceTempView("city_code")
df_city_code.printSchema()

root
 |-- code: string (nullable = true)
 |-- city: string (nullable = true)



In [58]:
spark.sql("select * from city_code limit 10 ").show()

+----+--------------------+
|code|                city|
+----+--------------------+
| ANC|           ANCHORAGE|
| BAR|BAKER AAF - BAKER...|
| DAC|       DALTONS CACHE|
| PIZ|DEW STATION PT LA...|
| DTH|        DUTCH HARBOR|
| EGL|               EAGLE|
| FRB|           FAIRBANKS|
| HOM|               HOMER|
| HYD|               HYDER|
| JUN|              JUNEAU|
+----+--------------------+



In [59]:
state_code = wrangle.state_code(contents)


In [60]:
df_state_code = spark.createDataFrame(
    list(state_code.items()), schema=["code", "state"]
)
df_state_code.limit(5).toPandas()


Unnamed: 0,code,state
0,AK,ALASKA
1,AZ,ARIZONA
2,AR,ARKANSAS
3,CA,CALIFORNIA
4,CO,COLORADO


In [61]:
df_state_code.createOrReplaceTempView("state_code")
df_state_code.printSchema()

root
 |-- code: string (nullable = true)
 |-- state: string (nullable = true)



In [62]:
spark.sql("select * from state_code limit 10 ").show()

+----+-----------------+
|code|            state|
+----+-----------------+
|  AK|           ALASKA|
|  AZ|          ARIZONA|
|  AR|         ARKANSAS|
|  CA|       CALIFORNIA|
|  CO|         COLORADO|
|  CT|      CONNECTICUT|
|  DE|         DELAWARE|
|  DC|DIST. OF COLUMBIA|
|  FL|          FLORIDA|
|  GA|          GEORGIA|
+----+-----------------+



### Step 3: Define the Data Model

#### Conceptual Data Model

Because this data warehouse is meant for online analytical processing (OLAP) and business intelligence (BI) applications, we model these data sets with a [star schema](https://en.wikipedia.org/wiki/Star_schema).

-   Star Schema
    ![alt text](images/conceptual_data_model.png)


#### Data Pipeline Build Up Steps

1. Assume all data sets are stored in `[Source]` (a local workspace or an S3 bucket) as below:
    - `[Source]/18-83510-I94-Data-2016/*.sas7bdat`
    - `[Source]/I94_SAS_Labels_Descriptions.SAS`
    - `[Source]/GlobalLandTemperaturesByCity.csv`
    - `[Source]/us-cities-demographics.csv`
2. Clean the data sets as described in Step 2 (all required functions are in the [wrangle.py](wrangle.py) module)
   ![alt](./images/split-datasets.png)
3. Transform the immigration data into 1 fact table and 2 dimension tables; the fact table is partitioned by state
4. Parse the label description file to get 3 auxiliary tables
5. Transform the global temperature data set into a dimension table
6. Split the demography data set into 2 dimension tables
7. Store these tables in the target S3 bucket
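
Step 4 can be sketched as follows. This is a minimal illustration and not the actual `wrangle.py` implementation: it assumes the label description file contains lines of the form `'CODE' = 'LABEL'`, and the function name `parse_label_pairs` and the sample text are hypothetical.

```python
import re

# Matches lines such as:  'AK' = 'ALASKA'
# (assumed layout of the code/label pairs in the description file)
PAIR_PATTERN = re.compile(r"'(?P<code>[^']+)'\s*=\s*'(?P<label>[^']*)'")


def parse_label_pairs(text):
    """Return a dict mapping each code to its whitespace-stripped label."""
    pairs = {}
    for line in text.splitlines():
        match = PAIR_PATTERN.search(line)
        if match:
            pairs[match.group("code").strip()] = match.group("label").strip()
    return pairs


# Hypothetical excerpt, used only to exercise the parser
sample = """
   'AK'='ALASKA'
   'AZ'  =  'ARIZONA   '
"""
state_code = parse_label_pairs(sample)
print(state_code)
```

A dictionary of this shape can then be turned into an auxiliary table with `spark.createDataFrame(list(state_code.items()), schema=["code", "state"])`, as in the cells above.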


### Step 4: Run Pipelines to Model the Data


#### 4.1 Create the data model

Data processing and the data model were created with Spark.

The notebook version is [Capstone Project.ipynb](Capstone_Project.ipynb).

You can also run [etl.py](etl.py), which provides 2 functions to start the ETL process:

-   `process_datasets` for AWS S3
-   `process_datasets_local` (default) for the local workspace


#### 4.2 Data Quality Checks

Data quality checks include:

1. No table is empty after running the ETL data pipeline.
2. The data schema of every dimensional table matches the data model.

The model schema is also saved as JSON in [schema.json](schema.json) and, as a *StructType* for every table, in the Python module [schema.py](schema.py).

Data quality notebook: [test.ipynb](test.ipynb).
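
The two checks could look roughly like the sketch below. It is not the code from `test.ipynb`: the function names are hypothetical, and the functions rely only on the `take` method and the `dtypes` attribute of a Spark DataFrame.

```python
def check_not_empty(df, table_name):
    """Raise ValueError if the Spark DataFrame `df` contains no rows."""
    # take(1) fetches at most one row, so it avoids a full count()
    if len(df.take(1)) == 0:
        raise ValueError(f"Data quality check failed: {table_name} is empty")


def check_schema(df, expected_dtypes, table_name):
    """Raise ValueError if column names or types differ from the expectation.

    `expected_dtypes` is a list of (column_name, type_string) pairs in the
    same form as `df.dtypes`, for example [("code", "string")].
    """
    actual = list(df.dtypes)
    expected = list(expected_dtypes)
    if actual != expected:
        raise ValueError(
            f"Data quality check failed: {table_name} has schema {actual}, "
            f"expected {expected}"
        )
```

For example, `check_schema(spark.table("state_code"), [("code", "string"), ("state", "string")], "state_code")` would pass for the temporary view registered earlier in this notebook.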


Test some queries (the relationships between tables):

In [63]:
query = """
    select distinct city, foreign_born, (male_population + female_population) as population
    from dim_demog_population
    order by population desc
    limit 10
"""
spark.sql(query).show()

+------------+------------+----------+
|        city|foreign_born|population|
+------------+------------+----------+
|    New York|     3212500|   8550405|
| Los Angeles|     1485425|   3971896|
|     Chicago|      573463|   2720556|
|     Houston|      696210|   2298628|
|Philadelphia|      205339|   1567442|
|     Phoenix|      300702|   1563001|
| San Antonio|      208046|   1469824|
|   San Diego|      373842|   1394907|
|      Dallas|      326825|   1300082|
|    San Jose|      401493|   1026919|
+------------+------------+----------+



In [64]:
query = """
    select visa_type, count(visa) visa
    from dim_immigration_airline dia
    join fact_immigration fi
    on dia.cic_id = fi.cic_id
    group by visa_type
    order by visa desc
    limit 10
"""
spark.sql(query).show()

+---------+----+
|visa_type|visa|
+---------+----+
|       WT| 443|
|       B2| 356|
|       WB|  91|
|       B1|  61|
|      GMT|  27|
|       F1|  10|
|       CP|   5|
|       F2|   3|
|       E2|   3|
|       M1|   1|
+---------+----+



In this sample, *WT* is the most common visa type among the immigration records.

In [65]:
query = """
    select cc.code, cc.city from
    city_code cc
    limit 10
"""
spark.sql(query).show()

+----+--------------------+
|code|                city|
+----+--------------------+
| ANC|           ANCHORAGE|
| BAR|BAKER AAF - BAKER...|
| DAC|       DALTONS CACHE|
| PIZ|DEW STATION PT LA...|
| DTH|        DUTCH HARBOR|
| EGL|               EAGLE|
| FRB|           FAIRBANKS|
| HOM|               HOMER|
| HYD|               HYDER|
| JUN|              JUNEAU|
+----+--------------------+



In [66]:
query = """
    select *
    from fact_immigration fi
    join city_code cc
    on fi.city_code = cc.code
    limit 10
"""
spark.sql(query).show()

+-------+----+-----+---------+----------+-------------------+-------------------+----+----+-------------+--------------+----+-------+
| cic_id|year|month|city_code|state_code|        arrive_date|     departure_date|mode|visa|      country|immigration_id|code|   city|
+-------+----+-----+---------+----------+-------------------+-------------------+----+----+-------------+--------------+----+-------+
|5433584|2016|    4|      PHO|        WA|1970-01-01 05:42:53|1970-01-01 05:42:57|   1|   2|United States|           961| PHO|PHOENIX|
|2546113|2016|    4|      PHO|        IL|1970-01-01 05:42:38|1970-01-01 05:42:47|   1|   2|United States|           909| PHO|PHOENIX|
|1215382|2016|    4|      PHO|        CA|1970-01-01 05:42:31|1970-01-01 05:42:49|   1|   1|United States|           892| PHO|PHOENIX|
|2928372|2016|    4|      PHO|        NY|1970-01-01 05:42:40|1970-01-01 05:42:45|   1|   1|United States|           841| PHO|PHOENIX|
| 813390|2016|    4|      PHO|        NC|1970-01-01 05:42:29|1
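
The `arrive_date` and `departure_date` values in the output above all fall on 1970-01-01. That would be consistent with SAS date numbers having been interpreted as seconds since the Unix epoch, although this is an assumption because the conversion code is not shown here. SAS stores dates as the number of days since 1960-01-01, so a conversion could be sketched as below (the function name is hypothetical).

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS date number to a calendar date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# 05:42:53 after midnight is 20573 seconds, so under the assumption above
# the first row would correspond to the SAS date number 20573
print(sas_days_to_date(20573))  # 2016-04-29
```

The result falls in April 2016, which matches the `year` and `month` columns of that row. In Spark the same arithmetic can be expressed with the SQL function `date_add`, starting from `'1960-01-01'`.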

In [67]:
query = """
   select * from dim_temperature order by avg_temp desc limit 10 
"""
spark.sql(query).show()

+-------------------+--------+--------------------+-------+-------------+----+-----+
|                 dt|avg_temp|avg_temp_uncertainty|   city|      country|year|month|
+-------------------+--------+--------------------+-------+-------------+----+-----+
|2011-08-01 00:00:00|   32.11|               0.201|ABILENE|UNITED STATES|2011|    8|
|2011-07-01 00:00:00|  31.954|               0.148|ABILENE|UNITED STATES|2011|    7|
|2011-06-01 00:00:00|  31.135|               0.342|ABILENE|UNITED STATES|2011|    6|
|1998-07-01 00:00:00|  31.125|               0.148|ABILENE|UNITED STATES|1998|    7|
|2001-07-01 00:00:00|  30.964|               0.292|ABILENE|UNITED STATES|2001|    7|
|1980-07-01 00:00:00|  30.795|               0.394|ABILENE|UNITED STATES|1980|    7|
|1952-08-01 00:00:00|  30.791|               0.396|ABILENE|UNITED STATES|1952|    8|
|1943-08-01 00:00:00|  30.321|               0.218|ABILENE|UNITED STATES|1943|    8|
|1953-06-01 00:00:00|  30.262|               0.358|ABILENE|UNITED

In [68]:
query = """
   select *
   from dim_temperature t
   join 
   city_code cc
   on t.city = cc.city
   order by avg_temp
"""
spark.sql(query).show()

+-------------------+--------+--------------------+-----+-------------+----+-----+----+-----+
|                 dt|avg_temp|avg_temp_uncertainty| city|      country|year|month|code| city|
+-------------------+--------+--------------------+-----+-------------+----+-----+----+-----+
|1918-01-01 00:00:00| -10.192|               0.315|AKRON|UNITED STATES|1918|    1| AKR|AKRON|
|1918-01-01 00:00:00| -10.192|               0.315|AKRON|UNITED STATES|1918|    1| CAK|AKRON|
|1857-01-01 00:00:00|  -9.802|               1.123|AKRON|UNITED STATES|1857|    1| AKR|AKRON|
|1857-01-01 00:00:00|  -9.802|               1.123|AKRON|UNITED STATES|1857|    1| CAK|AKRON|
|1912-01-01 00:00:00|  -8.887|               0.662|AKRON|UNITED STATES|1912|    1| AKR|AKRON|
|1912-01-01 00:00:00|  -8.887|               0.662|AKRON|UNITED STATES|1912|    1| CAK|AKRON|
|1856-01-01 00:00:00|  -8.846|               0.956|AKRON|UNITED STATES|1856|    1| AKR|AKRON|
|1856-01-01 00:00:00|  -8.846|               0.956|AKRON|UNI

In [69]:
query = """
   select city, avg(avg_temp) average_temp
   from dim_temperature
   group by city
"""
spark.sql(query).show()

+-------+------------------+
|   city|      average_temp|
+-------+------------------+
|ABILENE|16.892499994154758|
|  AKRON| 9.460787474547391|
+-------+------------------+



In [70]:
query = """
   select t.avg_temp, t.city, fi.country
   from dim_temperature t
   join city_code cc
   on t.city = cc.city
   join fact_immigration fi
   on cc.code = fi.city_code
"""
spark.sql(query).show()

+--------+----+-------+
|avg_temp|city|country|
+--------+----+-------+
+--------+----+-------+



In [71]:
query = """
   select cc.city, fi.visa
   from city_code cc
   join fact_immigration fi
   on  cc.code = fi.city_code
   where fi.city_code='AKR'
"""
spark.sql(query).show()

+----+----+
|city|visa|
+----+----+
+----+----+



#### 4.3 Data dictionary

The data dictionary is in [data_dictionary.ipynb](data_dictionary.ipynb).


### Step 5: Complete Project Write Up

#### Tools and Technologies

1. _AWS S3_ for data storage
2. _PySpark_ for exploratory data analysis on the sample data sets and for large-scale processing that transforms staging tables into dimensional tables

#### Data Update Frequency

1. The immigration and temperature tables need to be updated every month, since the raw data sets are built up monthly.
2. Tables created from the demography data set could be updated once a year, since collecting demography data takes time; more frequent updates would be costly and could lead to wrong conclusions.
3. All tables need to be updated in append-only mode.
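
An append-only update of the state-partitioned fact table could be written as in the sketch below. The function name and the default partition column are assumptions made for illustration; `mode`, `partitionBy` and `parquet` are standard methods of Spark's `DataFrameWriter`.

```python
def append_partitioned(df, target_path, partition_col="state_code"):
    """Append the rows of a Spark DataFrame to a partitioned Parquet data set.

    mode("append") adds new files and leaves the existing ones untouched.
    """
    (
        df.write
        .mode("append")
        .partitionBy(partition_col)
        .parquet(target_path)
    )
```

Here `df` would hold only the newly arrived month. Append mode does not deduplicate, so running the same month twice would store its rows twice.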

#### Future Design Considerations

1. The data was increased by 100x.

    If Spark in standalone server mode cannot process a data set that is 100x larger, we could consider moving the workload to one of the following:

    - [AWS EMR](https://aws.amazon.com/emr/)
    - [Azure Data Lake Storage Gen2](https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction)
    - [Snowflake](https://www.snowflake.com/en/)

    Each of them is built on distributed cloud infrastructure designed for large data sets.

---

2. The data populates a dashboard that must be updated daily by 7am.

    [Apache Airflow](https://airflow.apache.org) could be used to build an ETL data pipeline that regularly updates the data and populates the report. Airflow pipelines are written in Python and can also run tasks implemented in other languages.

---

3. The database needs to be accessed by 100+ people.

    We could use Amazon Redshift, Google BigQuery or Snowflake, which are managed data warehouses designed for many concurrent users.
