# Data Engineering Capstone Project

#### Project Summary
The goal of this project is to create a single-source-of-truth data warehouse that can serve as the foundation for creating logical data marts for analytics purposes on I94 immigration data, city temperatures data and U.S. demographics data.

The project contains the following steps:
1. Scope the Project and Data
2. Explore and Assess the Data
3. Define the Data Model
4. Run ETL to Model the Data
5. Complete Project Write Up

In [1]:
import pandas as pd
import pyspark
import os

## 1. Project Scope and Data
---

#### Scope 
In order to create a single-source-of-truth data warehouse, the following steps are executed:

* Load dataset into Spark dataframes
* Exploratory data analysis of I94 immigration dataset to identify missing values, empty records, etc, informing the data preprocessing step downstream
* Exploratory data analysis of U.S. city demographics dataset to identify missing values, empty records, etc, informing the data preprocessing step downstream 
* Exploratory data analysis of world temperatures dataset to identify missing values, empty records, etc, informing the data preprocessing step downstream 
* Execute data preprocessing tasks for all datasets
* Create immigration fact table from preprocessed I94 immigration dataset 
* Create dimension tables:
    * Create immigrant demographics dimension table from preprocessed I94 immigration dataset. Relates to immigration fact table by `cic_id` (unique record id) 
    * Create us city demographics dimension table from U.S. city demographics dataset. Relates to immigration fact table by `state_code`
    * Create world temperature dimension table from preprocessed world temperature dataset. Relates to immigration fact table by key `city_name`
    * Create country dimension table from `i94cit_i94res` data in the I94_SAS_Labels_Descriptions.SAS file
    * Create city dimension table from `dim_i94port` data in the I94_SAS_Labels_Descriptions.SAS file
    * Create state dimension table from `dim_i94addr` data in I94_SAS_Labels_Descriptions.SAS file
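
To illustrate how a dimension table relates to the fact table, here is a minimal sketch that joins a toy fact table to a toy state dimension on `state_code`. It uses pandas rather than Spark for brevity, and the rows are made-up samples, not actual pipeline output.

```python
import pandas as pd

# Toy fact rows; column names follow the scope description above
fact_immigration = pd.DataFrame(
    {"cic_id": [15, 16, 17], "state_code": ["MI", "MA", "MA"]}
)

# Toy state dimension keyed by state_code
dim_state = pd.DataFrame(
    {"state_code": ["MI", "MA"], "state_name": ["Michigan", "Massachusetts"]}
)

# Left join keeps every fact row, even when a state code has no match
arrivals = fact_immigration.merge(dim_state, on="state_code", how="left")

# Example data-mart style aggregate: number of arrivals per state
arrivals_per_state = arrivals.groupby("state_name")["cic_id"].count()
print(arrivals_per_state)
```

A left join is used so that fact rows with an unknown or missing state code are kept rather than silently dropped.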
    
##### Datasets:

| Data Set | Format  | Description |
|  :-     |  :-    |  :-        |
|[I94 Immigration Data](https://www.trade.gov/national-travel-and-tourism-office)| SAS | Dataset contains international visitor arrival statistics by world regions, mode of transportation, port of entry, demographics, visa type, etc.|
|[World Temperature Data](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data)| CSV | Dataset contains monthly average temperatures by city.|
|[U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)| CSV | Dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.|

##### Tech Stack:
We've made use of the following technologies in this project: 
- [AWS S3](https://aws.amazon.com/s3/): data storage
- Apache Spark ([PySpark](https://spark.apache.org/docs/latest/api/python/#:~:text=PySpark%20is%20an%20interface%20for,data%20in%20a%20distributed%20environment.)): reading data from the source (e.g. customer systems, internal systems), preprocessing the data, creating fact and dimension tables, and writing them to S3.

### I94 Immigration Data
---

##### Read I94 Immigration data

In [2]:
# Read in the data here
df_immi_data = pd.read_csv("immigration_data_sample.csv")

In [3]:
df_immi_data.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [None]:
# write to parquet
df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

##### Data dictionary

| Field Name | Description |
|  :-      |  :-        |
| cicid    | Unique record ID |
|i94yr     | 4  digit year|
|i94mon| Numeric month |
|i94cit|3 digit code for immigrant country of birth|
|i94res|3 digit code for immigrant country of residence|
|i94port|Port of admission|
|arrdate|Arrival Date in the USA|
|i94mode|Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)|
|i94addr|USA State of arrival|
|depdate|Departure Date from the USA|
|i94bir|Age of Respondent in Years|
|i94visa|Visa codes collapsed into three categories|
|count|Field used for summary statistics|
|dtadfile|Character Date Field - Date added to I-94 Files|
|visapost|Department of State where Visa was issued|
|occup|Occupation that will be performed in U.S|
|entdepa|Arrival Flag - admitted or paroled into the U.S.|
|entdepd|Departure Flag - Departed, lost I-94 or is deceased|
|entdepu|Update Flag - Either apprehended, overstayed, adjusted to perm residence|
|matflag|Match flag - Match of arrival and departure records|
|biryear|4 digit year of birth|
|dtaddto|Character Date Field - Date to which admitted to U.S. (allowed to stay until)|
|gender|Non-immigrant sex|
|insnum|INS number|
|airline|Airline used to arrive in U.S.|
|admnum|Admission Number|
|fltno|Flight number of Airline used to arrive in U.S.|
|visatype|Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|
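
The date fields in this dataset are not stored as regular dates. `arrdate` and `depdate` are SAS numeric dates (days since 1960-01-01), and `dtaddto` is a character field that appears to use the `MMDDYYYY` layout, with non-date values such as `D/S` in some rows. The helpers below are a minimal sketch of how these could be converted; the function names are hypothetical and the `MMDDYYYY` layout is an assumption based on the sample rows shown in this notebook.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS counts dates as days since this day


def sas_days_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a date, or None."""
    if days is None or days != days:  # None or NaN
        return None
    return SAS_EPOCH + dt.timedelta(days=int(days))


def parse_dtaddto(value):
    """Parse an MMDDYYYY value such as '09302016'; return None if it is not a date."""
    try:
        # zfill restores a leading zero lost when the field was read as a number
        return dt.datetime.strptime(str(value).zfill(8), "%m%d%Y").date()
    except ValueError:
        return None  # e.g. 'D/S'


print(sas_days_to_date(20545.0))  # 2016-04-01
print(sas_days_to_date(20566.0))  # 2016-04-22
print(parse_dtaddto("09302016"))  # 2016-09-30
print(parse_dtaddto("D/S"))       # None
```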

### World Temperature Data
---

##### Read World Temperature data

In [5]:
file_name = '../../data2/GlobalLandTemperaturesByCity.csv'
df_world_temp_data = pd.read_csv(file_name)
df_world_temp_data.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [6]:
f"Number of data points: {df_world_temp_data.shape[0]}"

'Number of data points: 8599212'


##### Data dictionary
| Field Name | Description |
|  :-      |  :-        |
|dt|Date|
|AverageTemperature|Average city temperature in degrees Celsius|
|AverageTemperatureUncertainty|95% confidence interval around the average|
|City|Name of city|
|Country|Name of country|
|Latitude|City latitude|
|Longitude|City longitude|
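
`Latitude` and `Longitude` are stored as strings with a hemisphere suffix (for example `57.05N`, `10.33E`). If numeric coordinates are needed downstream, a conversion along the following lines could be used. This is a sketch only; `parse_coordinate` is a hypothetical helper and is not part of the pipeline in this notebook.

```python
def parse_coordinate(value):
    """Convert a string such as '57.05N' or '10.33E' to a signed float."""
    magnitude, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in "NSEW":
        raise ValueError(f"Unexpected hemisphere suffix in {value!r}")
    # South and West are negative by convention
    return -magnitude if hemisphere in "SW" else magnitude


print(parse_coordinate("57.05N"))  # 57.05
print(parse_coordinate("10.33E"))  # 10.33
print(parse_coordinate("74.56W"))  # -74.56
```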

### U.S. City Demographic data
---

##### Read U.S. City Demographic data

In [8]:
file_name = "us-cities-demographics.csv"
df_dmg_data = pd.read_csv(file_name, sep=';')
df_dmg_data.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


##### Data dictionary
| Field Name | Description |
|  :-      |  :-        |
|City|City Name|
|State|US State where city is located|
|Median Age|Median age of the population|
|Male Population|Count of male population|
|Female Population|Count of female population|
|Total Population|Count of total population|
|Number of Veterans|Count of total Veterans|
|Foreign-born|Count of city residents who were born outside the U.S.|
|Average Household Size|Average city household size|
|State Code|Code of the US state|
|Race|Respondent race|
|Count|Count of the city's residents per race|

In [9]:
f"Number of data points: {df_dmg_data.shape[0]}"

'Number of data points: 2891'

### Airport Code data
---

##### Read Airport Code data

In [2]:
file_name = "airport-codes_csv.csv"
df_ac_data = pd.read_csv(file_name)
df_ac_data.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


##### Data dictionary
| Field Name | Description |
|  :-      |  :-        |
|ident| Unique ID |
|type |Type of airport|
|name |Airport name|
|elevation_ft |Airport elevation in feet|
|continent | Continent|
|iso_country| ISO country code|
|iso_region|ISO region code|
|municipality|Municipality name|
|gps_code | GPS code|
|iata_code|Three-character alphanumeric geocode designating airport |
|local_code| Local code|
|coordinates| Airport Longitude and Latitude|

In [3]:
f"Number of data points: {df_ac_data.shape[0]}"

'Number of data points: 55075'

In [8]:
df_ac_data.loc[df_ac_data['iso_country'] == 'US'].shape

(22757, 12)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
174,02PR,small_airport,Cuylers Airport,15.0,,PR,PR-U-A,Vega Baja,02PR,,02PR,"-66.36689758300781, 18.45330047607422"
223,03N,small_airport,Utirik Airport,4.0,OC,MH,MH-UTI,Utirik Island,K03N,UTK,03N,"169.852005, 11.222"
1111,0TT8,heliport,Dynasty Heliport,150.0,OC,MP,MP-U-A,"San Jose, Tinian",0TT8,,0TT8,"145.64199829101562, 14.963299751281738"
1360,12PR,heliport,Villamil-304 Ponce De Leon Heliport,148.0,,PR,PR-U-A,San Juan,12PR,,12PR,"-66.05699920654297, 18.42259979248047"
1484,14PR,heliport,Emp. Coco Beach Golf Club LLC Heliport,11.0,,PR,PR-U-A,Rio Grande,14PR,,14PR,"-65.798751, 18.407301"


## 2. Explore and Assess the Data

#### Exploratory data analysis to identify any data quality issues such as missing values, duplicate data etc

### I94 Immigration Data
---

In [19]:
# list all files in the customer repository
files = os.listdir('../../data/18-83510-I94-Data-2016/')
files

['i94_apr16_sub.sas7bdat',
 'i94_sep16_sub.sas7bdat',
 'i94_nov16_sub.sas7bdat',
 'i94_mar16_sub.sas7bdat',
 'i94_jun16_sub.sas7bdat',
 'i94_aug16_sub.sas7bdat',
 'i94_may16_sub.sas7bdat',
 'i94_jan16_sub.sas7bdat',
 'i94_oct16_sub.sas7bdat',
 'i94_jul16_sub.sas7bdat',
 'i94_feb16_sub.sas7bdat',
 'i94_dec16_sub.sas7bdat']

In [20]:
# Read in the data for April 2016
file_name = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immi_data = pd.read_sas(file_name, 'sas7bdat', encoding="ISO-8859-1")

In [21]:
df_immi_data.shape

(3096313, 28)

In [22]:
df_immi_data.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


##### Find columns with more than 85% missing values

In [14]:
def count_column_null_values(df):
    nulls_dict = df.isnull().sum().to_dict()  
    return nulls_dict

def find_dropping_feaures(nulls_dict, count_rows):
    nulls_df = pd.DataFrame(list(nulls_dict.items()), columns=['Field Name', 'Count'])
    nulls_df['% missing values'] = 100*nulls_df['Count']/count_rows
    return nulls_df[nulls_df['% missing values']>85]    

In [23]:
# Note: this cell takes a long time to run
count_rows = 0
nulls_dict = {k: 0 for k in df_immi_data.columns}

for f in files[:1]:
    file_name = f'../../data/18-83510-I94-Data-2016/{f}'
    
    # Read data
    df_immi_data = pd.read_sas(file_name, 'sas7bdat', encoding="ISO-8859-1")
    
    # Add row count to existing counter
    count_rows = count_rows + df_immi_data.shape[0]
    
    # Count null values
    nulls_dict_tmp = count_column_null_values(df_immi_data)
    
    # Add null values to existing counter
    nulls_dict = {k: nulls_dict.get(k, 0) + nulls_dict_tmp.get(k, 0) for k in set(nulls_dict)}

In [24]:
df_drop_immigration_cols = find_dropping_feaures(nulls_dict, count_rows)
df_drop_immigration_cols

Unnamed: 0,Field Name,Count,% missing values
5,occup,3088187,99.737559
8,entdepu,3095921,99.98734
25,insnum,2982605,96.327632


##### Check for empty rows

In [25]:
df_immi_data = df_immi_data.dropna(how="all")
df_immi_data.shape

(3096313, 28)

##### Conclusion: 

- Columns ```occup```, ```entdepu```, ```insnum``` have more than 85% of their values missing
- No empty rows

### World Temperature data
---

In [12]:
df_world_temp_data.shape

(8599212, 7)

##### Find columns with more than 85% missing values

In [13]:
dict_nulls_temperature_data = count_column_null_values(df_world_temp_data)
df_drop_temperature_cols = find_dropping_feaures(dict_nulls_temperature_data, df_world_temp_data.shape[0])
df_drop_temperature_cols

Unnamed: 0,Field Name,Count,% missing values


##### Check for missing values in average temperature column and duplicate rows

In [14]:
def preprocess_temperature_data(df):
    """Preprocess world temperature dataset to remove rows with missing values and duplicates
    
    :param df: pandas dataframe with world temperature data
    :return: processed dataframe
    """
    # Remove rows with missing average temperature
    df = df.dropna(subset=['AverageTemperature'])
    
    # Remove duplicate rows on date, city and country
    df = df.drop_duplicates(subset=['dt', 'City', 'Country'])
    
    # Remove empty rows (dropna returns a new dataframe, so reassign it)
    df = df.dropna(how="all")
    
    return df

In [16]:
df_world_temp_data_clean = preprocess_temperature_data(df_world_temp_data)
df_world_temp_data_clean.shape

(8190783, 7)

##### Conclusion:
- No columns with substantial missing values
- The dataset contains duplicate rows and/or rows with missing average temperature values: cleaning reduces it from 8,599,212 to 8,190,783 rows

### U.S. City Demographic data
---

In [114]:
df_dmg_data.shape

(2891, 12)

##### Find columns with more than 85% missing values

In [115]:
dict_nulls_demo_data = count_column_null_values(df_dmg_data)
df_drop_demo_cols = find_dropping_feaures(dict_nulls_demo_data, df_dmg_data.shape[0])
df_drop_demo_cols

Unnamed: 0,Field Name,Count,% missing values


In [116]:
def preprocess_demographics_data(df):
    """Preprocess US demographics dataset to remove rows with missing values and duplicates
    
    :param df: pandas dataframe with us demographics data
    :return: processed dataframe
    """
    # Remove duplicate rows on city, state and race
    df = df.drop_duplicates(subset=['City', 'State', 'Race'])
    
    # Remove empty rows (dropna returns a new dataframe, so reassign it)
    df = df.dropna(how="all")
    
    return df

In [47]:
df_dmg_data_clean = preprocess_demographics_data(df_dmg_data)
df_dmg_data_clean.shape

(2891, 12)

##### Conclusion:
- No columns with substantial missing values
- No empty rows or duplicate rows by city, state and race

### Airport Code data
---

In [12]:
df_ac_data.shape

(55075, 12)

##### Find columns with more than 85% missing values

In [16]:
dict_nulls_airport_data = count_column_null_values(df_ac_data)
df_drop_airport_cols = find_dropping_feaures(dict_nulls_airport_data, df_ac_data.shape[0])
df_drop_airport_cols

Unnamed: 0,Field Name,Count,% missing values


In [17]:
def preprocess_airport_data(df):
    """Preprocess airport dataset to remove rows with missing values and duplicates
    
    :param df: pandas dataframe with airport data
    :return: processed dataframe
    """
    # Remove duplicate rows
    df = df.drop_duplicates()
    
    # Remove empty rows (dropna returns a new dataframe, so reassign it)
    df = df.dropna(how="all")
    
    return df

In [19]:
df_ac_data_clean = preprocess_airport_data(df_ac_data)
df_ac_data_clean.shape

(55075, 12)

##### Conclusion:
- No columns with substantial missing values
- No empty rows or duplicate rows

#### 2.2 Cleaning Steps

The following cleaning steps are to be performed before definition of the data model and mapping out of the data pipeline: 

- I94 Immigration data: remove columns ```occup```, ```entdepu```, ```insnum``` due to a significant share (> 85%) of missing values
- World Temperature data: remove duplicate rows and rows with missing average temperature values
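
The column-removal step can be expressed as a small reusable function. The sketch below uses pandas and a toy dataframe; `drop_sparse_columns` and its `threshold_pct` parameter are hypothetical names introduced for illustration, and the production pipeline would apply the equivalent logic to Spark dataframes.

```python
import pandas as pd


def drop_sparse_columns(df, threshold_pct=85.0):
    """Drop columns whose share of missing values exceeds threshold_pct."""
    missing_pct = df.isnull().mean() * 100
    sparse_cols = missing_pct[missing_pct > threshold_pct].index.tolist()
    return df.drop(columns=sparse_cols)


# Toy frame: 'occup' is entirely missing, 'gender' is half missing
sample = pd.DataFrame(
    {
        "cicid": [6.0, 7.0, 15.0, 16.0],
        "occup": [None, None, None, None],
        "gender": [None, "M", "M", None],
    }
)

cleaned = drop_sparse_columns(sample)
print(list(cleaned.columns))  # ['cicid', 'gender']
```

Computing the threshold from the data, rather than hard-coding column names, keeps the step valid if a later monthly file has a different set of sparse columns.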

## 3. Define the Data Model
#### 3.1 Conceptual Data Model
The conceptual data model for our single-source-of-truth data warehouse looks as follows:

<img src="erd_data_warehouse.png" alt="Conceptual model" width="1500" height="1500" />

#### 3.2 Mapping Out Data Pipelines
The data pipeline is as follows:

1. Load datasets stored in S3 buckets into Spark dataframes:
    - [Source_S3_Bucket]/immigration_data/18-83510-I94-Data-2016/*.sas7bdat
    - [Source_S3_Bucket]/I94_SAS_labels_data/I94_SAS_Labels_Descriptions.SAS
    - [Source_S3_Bucket]/temperature_data/GlobalLandTemperaturesByCity.csv
    - [Source_S3_Bucket]/us_demographics_data/us-cities-demographics.csv
    

2. Create helper dimension tables from I94_SAS_Labels_Descriptions.SAS file
    - Create country dimension table from `i94cit_i94res` data in the I94_SAS_Labels_Descriptions.SAS file
    - Create city dimension table from `dim_i94port` data in the I94_SAS_Labels_Descriptions.SAS file
    - Create state dimension table from `dim_i94addr` data in I94_SAS_Labels_Descriptions.SAS file

3. Preprocess I94 Immigration data
4. Create I94 Immigration fact table - `fact_immigration` - from preprocessed I94 Immigration data  
5. Create I94 Immigration demographics dimension table - `dim_immigrant_demographics` - from preprocessed I94 Immigration data 
6. Create U.S. City Demographic dimension table - `dim_city_demographics` - from U.S. City Demographic data
7. Preprocess World Temperature data
8. Create World Temperature dimension table - `dim_city_temperature` - from preprocessed World Temperature data 

Create `dim_country` table from `i94cit_i94res` data in I94_SAS_Labels_Descriptions.SAS file

In [10]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

In [11]:
i94cit_i94res = {}
for countries in contents[9:298]:
    pair = countries.split('=')
    country_code, country_name = pair[0].strip(), pair[1].strip().strip("'")
    i94cit_i94res[country_code] = country_name

In [12]:
df_i94cit_i94res = pd.DataFrame(list(i94cit_i94res.items()), columns=['country_code', 'country_name'])
df_i94cit_i94res.head(5)

Unnamed: 0,country_code,country_name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


Create `dim_city` table from `dim_i94port` data in I94_SAS_Labels_Descriptions.SAS file

In [13]:
i94port = {}
for cities in contents[302:962]:
    pair = cities.split('=')
    city_code, city_name = pair[0].strip("\t").strip().strip("'"), pair[1].strip('\t').strip().strip("''")
    i94port[city_code] = city_name

In [14]:
df_i94port = pd.DataFrame(list(i94port.items()), columns=['city_code', 'city_name'])

In [15]:
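# Split "CITY, STATE" on the first comma only; pass n by keyword (recent pandas versions reject it positionally)
df_i94port[['city_name', 'state_code']] = df_i94port['city_name'].str.split(',', n=1, expand=True)
df_i94port['city_name'] = df_i94port['city_name'].str.title()
df_i94port['state_code'] = df_i94port['state_code'].str.strip()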
df_i94port.head()

Unnamed: 0,city_code,city_name,state_code
0,ALC,Alcan,AK
1,ANC,Anchorage,AK
2,BAR,Baker Aaf - Baker Island,AK
3,DAC,Daltons Cache,AK
4,PIZ,Dew Station Pt Lay Dew,AK


Create `dim_state` table from `dim_i94addr` data in I94_SAS_Labels_Descriptions.SAS file

In [16]:
i94addr = {}
for states in contents[981:1036]:
    pair = states.split('=')
    state_code, state_name = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
    i94addr[state_code] = state_name.title()

In [17]:
df_i94addr = pd.DataFrame(list(i94addr.items()), columns=['state_code', 'state_name'])
df_i94addr.head()

Unnamed: 0,state_code,state_name
0,AL,Alabama
1,AK,Alaska
2,AZ,Arizona
3,AR,Arkansas
4,CA,California
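
The cells above rely on hard-coded line ranges (for example `contents[9:298]`), which break if the labels file changes. As a possible alternative, code/label pairs could be recognised with a regular expression. The sketch below is an assumption-laden illustration: `parse_label_lines` is a hypothetical helper, and the sample lines only imitate the layout implied by the parsing code above.

```python
import re

# Matches lines such as   236 =  'AFGHANISTAN'   or   'AL'='ALABAMA'
PAIR_PATTERN = re.compile(r"^\s*'?([^'=\s]+)'?\s*=\s*'([^']*)'")


def parse_label_lines(lines):
    """Return {code: label} for every line that looks like a code/label pair."""
    pairs = {}
    for line in lines:
        match = PAIR_PATTERN.match(line)
        if match:
            code, label = match.group(1), match.group(2).strip()
            pairs[code] = label
    return pairs


# Hypothetical excerpt imitating the layout of the labels file
sample_lines = [
    "/* I94ADDR */\n",
    "value i94addrl\n",
    "\t'AL'='ALABAMA'\n",
    "\t'AK'='ALASKA'\n",
    "   236 =  'AFGHANISTAN'\n",
]

print(parse_label_lines(sample_lines))
# {'AL': 'ALABAMA', 'AK': 'ALASKA', '236': 'AFGHANISTAN'}
```

Note that this sketch merges all pairs into one dictionary. Separating countries, ports and states would still require tracking which section of the file each line belongs to, which is not shown here.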


## 4. Run Pipelines to Model the Data
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Create demographics dimension table
---

In [None]:
from pyspark.sql.functions import monotonically_increasing_id


def create_city_demographics_dimension_table(df, output_data):
    """Creates a us city demographics dimension table from the U.S. City Demographic dataset.
    
    :param df: spark dataframe of us city demographics data
    :param output_data: write path
    :return: spark dataframe with demographics data
    """
    dim_df = df.withColumnRenamed('City', 'city_code') \
            .withColumnRenamed('State Code', 'state_code') \
            .withColumnRenamed('Median Age','median_age') \
            .withColumnRenamed('Male Population', 'male_population') \
            .withColumnRenamed('Female Population', 'female_population') \
            .withColumnRenamed('Total Population', 'total_population') \
            .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
            .withColumnRenamed('Foreign-born', 'foreign_born_num') \
            .withColumnRenamed('Average Household Size', 'avg_household_size') \
            .withColumnRenamed('Race', 'race') \
            .withColumnRenamed('Count', 'count')

    dim_df = dim_df.withColumn('id', monotonically_increasing_id())
    
    # write dimension to parquet file
    dim_df.write.mode("overwrite").parquet(path=output_data + "dim_city_demographics")
    
    return dim_df

In [None]:
demographics_dim_df = create_city_demographics_dimension_table(new_demographics_df, output_data)
demographics_dim_df.limit(5).toPandas()

##### Create Immigration fact table
---

In [None]:
import datetime as dt

from pyspark.sql.functions import monotonically_increasing_id, udf


def create_immigration_fact_table(df, output_data):
    """Creates an immigration fact table from I94 Immigration data.
    
    :param df: spark dataframe of immigration data
    :param output_data: write path
    :return: spark dataframe with immigration fact data
    """    
    # UDF to convert a SAS date (days since 1960-01-01) to an ISO date string
    get_datetime = udf(
        lambda x: (dt.date(1960, 1, 1) + dt.timedelta(days=int(x))).isoformat()
        if x is not None else None
    )
    
    # withColumnRenamed takes (existing_name, new_name)
    fact_df = df.withColumnRenamed('cicid', 'cic_id') \
            .withColumnRenamed('i94yr', 'year') \
            .withColumnRenamed('i94mon', 'month') \
            .withColumnRenamed('i94port', 'city_code') \
            .withColumnRenamed('i94addr', 'state_code') \
            .withColumnRenamed('arrdate', 'arrival_date') \
            .withColumnRenamed('depdate', 'departure_date') \
            .withColumnRenamed('i94mode', 'mode') \
            .withColumnRenamed('i94visa', 'visa') \
            .withColumnRenamed('visatype', 'visa_type')
    
    # convert SAS numeric dates into ISO date strings
    fact_df = fact_df.withColumn("arrival_date", get_datetime(fact_df.arrival_date))
    fact_df = fact_df.withColumn("departure_date", get_datetime(fact_df.departure_date))
    
    # add a surrogate key to the fact table
    fact_df = fact_df.withColumn('immigration_id', monotonically_increasing_id())
    
    # write fact table to parquet file partitioned by state
    fact_df.write.mode("overwrite").partitionBy('state_code').parquet(path=output_data + "fact_immigration")
    
    return fact_df

In [None]:
immigration_fact_df = create_immigration_fact_table(new_immigration_df, output_data)
immigration_fact_df.limit(5).toPandas()

##### Create Immigrant Demographics dimension table
---

In [None]:
from pyspark.sql.functions import monotonically_increasing_id


def create_immi_demographics_dim_table(df, output_data):
    """Creates an immigrant demographics dimension table from I94 Immigration data.
    
    :param df: spark dataframe of immigration data
    :param output_data: write path
    :return: spark dataframe with immigrant demographics dimension data
    """    
    # withColumnRenamed takes (existing_name, new_name) and is a no-op if the column is absent
    dim_df = df.withColumnRenamed('cicid', 'cic_id') \
            .withColumnRenamed('i94cit', 'country_of_birth') \
            .withColumnRenamed('i94res', 'country_of_residence') \
            .withColumnRenamed('biryear', 'year_of_birth') \
            .withColumnRenamed('insnum', 'ins_num')
    
    # add a surrogate key to the dimension table
    dim_df = dim_df.withColumn('immi_demographics_id', monotonically_increasing_id())
    
    # write dimension to parquet file
    dim_df.write.mode("overwrite").parquet(path=output_data + "dim_immigrant_demographics")
    
    return dim_df

In [None]:
immi_demographics_dim_df = create_immi_demographics_dim_table(new_immi_demograpgics_df, output_data)
immi_demographics_dim_df.limit(5).toPandas()

#### 4.2 Data Quality Checks
The following data quality checks can help confirm that the pipeline ran as expected:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
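
As an illustration of an integrity constraint, the sketch below checks that a key column is populated and unique. It is written with pandas for brevity and uses a toy table; `check_key_integrity` is a hypothetical helper, and the same logic would need to be ported to Spark dataframes to run against the actual fact and dimension tables.

```python
import pandas as pd


def check_key_integrity(df, key, table_name):
    """Return a list of human-readable problems found for the key column."""
    problems = []
    if len(df) == 0:
        problems.append(f"{table_name}: table is empty")
    null_keys = int(df[key].isnull().sum())
    if null_keys:
        problems.append(f"{table_name}: {null_keys} null value(s) in {key}")
    duplicate_keys = int(df[key].dropna().duplicated().sum())
    if duplicate_keys:
        problems.append(f"{table_name}: {duplicate_keys} duplicate value(s) in {key}")
    return problems


# Toy table with one duplicated and one missing key
toy = pd.DataFrame({"cic_id": [15, 16, 16, None]})
for problem in check_key_integrity(toy, "cic_id", "fact_immigration"):
    print(problem)
```

Returning the list of problems, instead of only printing, lets a caller fail the pipeline run when the list is not empty.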
 
Run Quality Checks

In [None]:
def run_quality_checks(df, table_name):
    """Check for non-empty fact and dimension tables.
    :param df: spark dataframe
    :param table_name: table name
    """
    total_count = df.count()

    if total_count == 0:
        print(f"Data quality check failed for {table_name} with zero records!")
    else:
        print(f"Data quality check passed for {table_name} with {total_count:,} records.")
    return 0

In [None]:
tables_dict = {
    'immigration_fact': immigration_fact_df,
    'immigrant_demographics_dim': immi_demographics_dim_df,
    'demographics_dim': demographics_dim_df
}

for table_name, table_df in tables_dict.items():
    run_quality_checks(table_df, table_name)

#### 4.3 Data Dictionary of the Data Model 


<img src="data_dictionary.png" width="1500" height="1500" />

## 5. Project Write Up

##### 5.1 The rationale for the chosen tools and technologies
* [AWS S3](https://aws.amazon.com/s3/) for data storage.
* Apache Spark ([PySpark](https://spark.apache.org/docs/latest/api/python/#:~:text=PySpark%20is%20an%20interface%20for,data%20in%20a%20distributed%20environment.)) for processing the data and creating fact and dimension tables.

##### 5.2 Data update frequency
* The immigration fact table, the immigrant demographics dimension table, and the temperature table should be updated monthly, since the raw data is aggregated by month.
* The US city demographics table can be updated whenever the raw data is refreshed. Given how involved it is to update census data, that is probably annually.

##### 5.3 Future work
5.3.1 The data was increased by 100x
* A 100x increase in data size is unlikely to be processed efficiently by Apache Spark in standalone server mode. To scale, a cloud big data platform for running large-scale distributed processing jobs, such as [Amazon EMR](https://aws.amazon.com/emr/), should be considered.

5.3.2 The data populates a dashboard that must be updated on a daily basis by 7am every day.
* [Apache Airflow](https://airflow.apache.org/) can be used for building out an ETL data pipeline that automates the tasks of processing fresh data and updating the dashboard on a daily basis by 7am.   
 
5.3.3 The database needed to be accessed by 100+ people.
* In this scenario we would move our single-source-of-truth database to a cloud data warehouse such as [Amazon Redshift](https://aws.amazon.com/redshift/).