# U.S. Immigration Data Project
### Data Engineering Capstone Project

#### Project Summary

The project follows these steps:
* Step 1: Project Scope
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### STEP 1: Project Scope

This capstone project uses the datasets provided by Udacity to create a schema that supports analysis and trend identification. For example, the data can be used to identify immigration trends by country of residence, or to find the peak travel seasons for each visa type.

Datasets under consideration:
* (1) i94_apr16_sub.sas7bdat (SAS dataset for immigration)
* (2) us-cities-demographics (.csv)
* (3) Port_data (.txt)
* (4) Country_codes (.csv)

* To create the database, the following steps will be taken:

    - Firstly, data will be loaded from the above sources to dataframes using Spark.
    - Then, columns that require conversion from the source will be identified and transformed.
    - An exploratory data analysis will be conducted to identify columns with significant amounts of missing and duplicate data.
    - Dimension tables will be created using the following data:
      - visa_dim, i94_mode_dim, status_dim, status_lookup_dim, and date_dim will be constructed from the immigration data.
      - demog_bystate_dim will be created from the demographics data.
      - i94_port_dim will be derived from the port data.
      - country_dim will be derived from the country data.
    - The fact table will be created from the immigration dataset, visa_dim, and status_dim.
    
    
* Choice of Technologies used in the project: Apache Spark, Amazon S3

A detailed explanation of the project research and implementation steps is contained in this notebook, while the 'etl.py' script contains the actual code.

In [1]:
! pip install -U numpy
# ! pip install missingno

Collecting numpy
  Downloading https://files.pythonhosted.org/packages/45/b2/6c7545bb7a38754d63048c7696804a0d947328125d81bf12beaa692c3ae3/numpy-1.19.5-cp36-cp36m-manylinux1_x86_64.whl (13.4MB)
    100% |████████████████████████████████| 13.4MB 2.7MB/s
tensorflow 1.3.0 requires tensorflow-tensorboard<0.2.0,>=0.1.0, which is not installed.
Installing collected packages: numpy
  Found existing installation: numpy 1.12.1
    Uninstalling numpy-1.12.1:
      Successfully uninstalled numpy-1.12.1
Successfully installed numpy-1.19.5


In [2]:
# Do all the necessary imports and installs here

import pandas as pd
import numpy as np
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import configparser
import pyspark.sql.functions as F
from pyspark.sql import types as T
from datetime import datetime, timedelta, date
import datetime as dt
import boto3  #to copy files to S3
import create_tables as create_tables
import Utilities as util

try:
    %load_ext autotime
except:
    !pip install ipython-autotime
    %load_ext autotime
    
pd.set_option('display.max_colwidth',-1, 'display.max_rows',200, 'display.max_columns',None)

Collecting ipython-autotime
  Downloading https://files.pythonhosted.org/packages/b4/c9/b413a24f759641bc27ef98c144b590023c8038dfb8a3f09e713e9dff12c1/ipython_autotime-0.3.1-py2.py3-none-any.whl
Installing collected packages: ipython-autotime
Successfully installed ipython-autotime-0.3.1
time: 16.4 ms (started: 2023-04-05 01:19:16 +00:00)


In [3]:
#Initializing spark session
# util.create_spark_session()

spark = SparkSession\
.builder\
.config("spark.jars.repositories", "https://repos.spark-packages.org/")\
.config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate() 

time: 13.4 s (started: 2023-04-05 01:19:16 +00:00)


In [4]:
#(1): Reading immigration data

path = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
# Read the full immigration dataset.
# To work with a 5,000-row sample instead, append .limit(5000) to the load call.
immigration_spark_df = spark.read.format('com.github.saurfang.sas.spark').load(path)

print("type: {}".format(type(immigration_spark_df)), "\n")

print("Immigration Schema:", "\n")
immigration_spark_df.printSchema()

type: <class 'pyspark.sql.dataframe.DataFrame'> 

Immigration Schema: 

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)

In [5]:
#total row count
print(f'total row counts in immigration: {immigration_spark_df.count()}', "\n")

total row counts in immigration: 3096313 

time: 42.4 s (started: 2023-04-05 01:19:32 +00:00)


In [6]:
immigration_spark_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


time: 673 ms (started: 2023-04-05 01:20:15 +00:00)


In [7]:
#(2): Reading demographics data
path = 'us-cities-demographics.csv'
demographics_spark_df = spark.read.format("csv").option("header", "true").option("delimiter", ";").load(path)

time: 2.07 s (started: 2023-04-05 01:20:15 +00:00)


In [8]:
print("type: {}".format(type(demographics_spark_df)))

print("Demographics Schema:" , "\n")
demographics_spark_df.printSchema()

type: <class 'pyspark.sql.dataframe.DataFrame'>
Demographics Schema: 

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)

time: 13 ms (started: 2023-04-05 01:20:17 +00:00)


In [9]:
demographics_spark_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


time: 337 ms (started: 2023-04-05 01:20:17 +00:00)


In [10]:
#(3): Reading i94 port data
path = 'Port_data.txt'
schema = T.StructType([T.StructField("port_code", T.StringType(), True),\
                       T.StructField("port", T.StringType(), True)])

i94port_spark_df = spark.read.format('csv').option("delimiter", "=").schema(schema).load(path)
i94port_spark_df.show(5, truncate = False)

+---------+-----------------------------+
|port_code|port                         |
+---------+-----------------------------+
|ALC'     |ALCAN, AK '                  |
|ANC'     |ANCHORAGE, AK '              |
|BAR'     |BAKER AAF - BAKER ISLAND, AK'|
|DAC'     |DALTONS CACHE, AK '          |
|PIZ'     |DEW STATION PT LAY DEW, AK'  |
+---------+-----------------------------+
only showing top 5 rows

time: 289 ms (started: 2023-04-05 01:20:18 +00:00)


In [11]:
#(4): Reading Country data
path = 'Country_codes.csv'
schema = T.StructType([T.StructField("country_code", T.StringType(), True),\
                       T.StructField("country_name", T.StringType(), True)])

country_spark_df = spark.read.format('csv').option("delimiter", "=").schema(schema).load(path)
country_spark_df.show(5, truncate = False)

+------------+-------------------------------------------------------------+
|country_code|country_name                                                 |
+------------+-------------------------------------------------------------+
|582         |  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'|
|236         |  'AFGHANISTAN'                                              |
|101         |  'ALBANIA'                                                  |
|316         |  'ALGERIA'                                                  |
|102         |  'ANDORRA'                                                  |
+------------+-------------------------------------------------------------+
only showing top 5 rows

time: 190 ms (started: 2023-04-05 01:20:18 +00:00)


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

##### (1): Cleaning - immigration

In [12]:
immigration_spark_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


time: 276 ms (started: 2023-04-05 01:20:18 +00:00)



* The sample above shows that 'arrdate' and 'depdate' are SAS numeric dates (days since 1960-01-01) and need to be converted to calendar dates. 'dtadfile' (YYYYMMDD) and 'dtaddto' (MMDDYYYY) are imported as strings and need to be converted as well.
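
`util.clean_SAS_dates` comes from the project's `Utilities` module, which is not shown in this notebook. As a plain-Python sketch of the arithmetic it presumably performs (an assumption), a SAS date is the number of days elapsed since 1960-01-01, so adding that many days to the SAS epoch recovers the calendar date. The sample values are taken from the 'arrdate' and 'depdate' columns above, and the results agree with the 'dt_arrival' and 'dt_departure' values produced by the conversion in this section.

```python
from datetime import date, timedelta
from typing import Optional

# SAS stores dates as a day count relative to this epoch.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS numeric date (e.g. 20545.0) to a datetime.date.

    Missing values (None) stay missing, mirroring Spark's null handling.
    """
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01 (arrdate of the third sample row)
print(sas_to_date(20691.0))  # 2016-08-25 (depdate of the same row)
print(sas_to_date(None))     # None
```

Inside Spark the same arithmetic can stay in the JVM rather than in a Python UDF, for example `F.expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")`; whether the project's helper is written this way is unknown.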

In [13]:
# Converting SAS dates to date format (arrdate & depdate)
immigration = util.clean_SAS_dates(immigration_spark_df)

time: 582 ms (started: 2023-04-05 01:20:19 +00:00)


In [14]:
# Convert dates from string format to date (dtadfile & dtaddto)
immigration = util.string_to_date_conv(immigration)

time: 853 ms (started: 2023-04-05 01:20:19 +00:00)


In [15]:
immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,dt_arrival,dt_departure,dt_add_tofile,dt_stay_until
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2,2016-04-29,,,2016-10-28
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1,2016-04-07,,2013-08-11,
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2,2016-04-01,2016-08-25,2016-04-01,2016-09-30
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2,2016-04-01,2016-04-23,2016-04-01,2016-09-30
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2,2016-04-01,2016-04-23,2016-04-01,2016-09-30


time: 2.36 s (started: 2023-04-05 01:20:20 +00:00)


* checking that garbage values in 'dtaddto' end up as null in 'dt_stay_until':

In [16]:
# Validate 
vals = ['183', '10 02003', 'D/S', '06 02002', '/   183D', '12319999'] 

immigration.select(['dtaddto','dt_stay_until']).filter(F.col('dtaddto').isin(vals)).distinct().show()

+--------+-------------+
| dtaddto|dt_stay_until|
+--------+-------------+
|     183|         null|
|10 02003|         null|
|     D/S|         null|
|06 02002|         null|
|/   183D|         null|
|12319999|         null|
+--------+-------------+

time: 42.7 s (started: 2023-04-05 01:20:22 +00:00)
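
The cell above confirms that malformed 'dtaddto' values end up as null. Below is a plain-Python sketch of that behaviour, assuming the helper parses 'dtaddto' as MMDDYYYY and 'dtadfile' as YYYYMMDD (the layouts visible in the sample rows): anything `strptime` rejects becomes `None`. A strict parse would accept `12319999` as 9999-12-31, yet the output above shows null for it, so the sketch also treats year 9999 as a placeholder. That rule is an assumption made to reproduce the output, not something taken from `util.string_to_date_conv`.

```python
from datetime import date, datetime
from typing import Optional

# Assumption: year 9999 marks a placeholder "no end date", not a real date.
PLACEHOLDER_YEAR = 9999


def parse_date_str(value: Optional[str], fmt: str) -> Optional[date]:
    """Parse `value` with a strptime format; return None when it is missing or malformed."""
    if value is None:
        return None
    try:
        parsed = datetime.strptime(value, fmt).date()
    except ValueError:
        return None  # e.g. 'D/S', '183', '10 02003'
    return None if parsed.year == PLACEHOLDER_YEAR else parsed


garbage = ['183', '10 02003', 'D/S', '06 02002', '/   183D', '12319999']
print([parse_date_str(v, '%m%d%Y') for v in garbage])  # [None, None, None, None, None, None]
print(parse_date_str('09302016', '%m%d%Y'))             # dtaddto  -> 2016-09-30
print(parse_date_str('20160401', '%Y%m%d'))             # dtadfile -> 2016-04-01
```

In Spark, the usual counterpart is `F.to_date(F.col('dtaddto'), 'MMddyyyy')`.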


In [17]:
immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

* renaming columns to descriptive names and casting them to appropriate data types

In [18]:
#Handling data types and renaming columns 
immigration_pre = util.immig_dtypes(immigration)
immigration_pre.printSchema()

root
 |-- count: double (nullable = true)
 |-- occup: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- dt_arrival: date (nullable = true)
 |-- dt_departure: date (nullable = true)
 |-- dt_add_tofile: date (nullable = true)
 |-- dt_stay_until: date (nullable = true)
 |-- i94port_code: string (nullable = true)
 |-- arrival_state_code: string (nullable = true)
 |-- arrival_flag: string (nullable = true)
 |-- departure_flag: string (nullable = true)
 |-- update_flag: string (nullable = true)
 |-- match_flag: string (nullable = true)
 |-- flight_no: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- cicid: integer (nullable = true)
 |-- entry_year: integer (nullable = true)
 |-- entry_month: integer (nullable = true)
 |-- country_of_birth: integer (nullable = true)
 |-- country_of_residence: integer (nullable = true)
 |-- i94_mode: integer (nullable = true)
 |-- age: integer (

In [19]:
#total row count
print(f'total row counts in immigration: {immigration_pre.count()}', "\n")

total row counts in immigration: 3096313 

time: 36 s (started: 2023-04-05 01:21:06 +00:00)


* checking for missing values across all columns in the immigration dataset

In [20]:
# null value counts by column 
nulls = util.null_val_bycol(immigration_pre)
nulls.limit(2).toPandas()
# Mostly missing; to be dropped from the fact table: occup, insnum, update_flag (entdepu)

Unnamed: 0,count,occup,insnum,gender,airline,dt_arrival,dt_departure,dt_add_tofile,dt_stay_until,i94port_code,arrival_state_code,arrival_flag,departure_flag,update_flag,match_flag,flight_no,visa_type,cicid,entry_year,entry_month,country_of_birth,country_of_residence,i94_mode,age,visa_cat_code,visa_issued_state,birth_year,admission_num
0,0,3088187,2982605,414269,83627,0,142457,1,45826,0,152592,238,138429,3095921,138429,19549,0,0,0,0,0,0,239,802,0,3095709,802,0


time: 5min 19s (started: 2023-04-05 01:21:42 +00:00)
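
Counting nulls per column in PySpark is commonly written as `df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])`; `util.null_val_bycol` may or may not work this way. Raw counts are easier to judge as fractions of the 3,096,313 rows. The sketch below turns a subset of the counts printed above into null fractions and flags columns above a cutoff; the 90% threshold is an arbitrary choice for illustration.

```python
from typing import Dict

# Null counts copied from the output above; total row count from In [19].
TOTAL_ROWS = 3_096_313
null_counts = {
    "occup": 3_088_187,
    "insnum": 2_982_605,
    "gender": 414_269,
    "airline": 83_627,
    "dt_departure": 142_457,
    "arrival_state_code": 152_592,
    "update_flag": 3_095_921,
    "visa_issued_state": 3_095_709,
}


def mostly_null(counts: Dict[str, int], total: int, threshold: float = 0.9) -> Dict[str, float]:
    """Return {column: null fraction} for columns whose null fraction exceeds `threshold`."""
    return {
        col: round(n / total, 4)
        for col, n in counts.items()
        if n / total > threshold
    }


for col, frac in mostly_null(null_counts, TOTAL_ROWS).items():
    print(f"{col}: {frac:.2%} null")
```

At that cutoff, 'visa_issued_state' (3,095,709 nulls) is flagged alongside the three columns named in the comment above.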


* counting distinct values in each column

In [21]:
# count distinct values by column 
unique = util.unique_val_bycol(immigration_pre)

unique.limit(2).toPandas()

Unnamed: 0,count,occup,insnum,gender,airline,dt_arrival,dt_departure,dt_add_tofile,dt_stay_until,i94port_code,arrival_state_code,arrival_flag,departure_flag,update_flag,match_flag,flight_no,visa_type,cicid,entry_year,entry_month,country_of_birth,country_of_residence,i94_mode,age,visa_cat_code,visa_issued_state,birth_year,admission_num
0,1,111,1913,4,534,30,235,117,771,299,457,13,12,2,1,7152,17,3096313,1,1,243,229,4,112,3,1,112,3075579


time: 3min 57s (started: 2023-04-05 01:27:01 +00:00)


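* dropping duplicates by 'admission_num': its distinct count (3,075,579) is lower than the total row count (3,096,313) even though 'cicid' is unique, so deduplicating on it removes 20,734 rows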

In [22]:
immigration_cleaned = immigration_pre.dropDuplicates(['admission_num'])

immigration_cleaned.count()

3075579

time: 41.4 s (started: 2023-04-05 01:30:59 +00:00)
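
`dropDuplicates(['admission_num'])` keeps one row per admission number, but Spark does not guarantee which of the duplicate rows survives. If that matters (for example, to keep the most recently added record), rank the rows explicitly with a window function such as `F.row_number()` over `Window.partitionBy('admission_num').orderBy(...)` before filtering. A plain-Python sketch of deterministic keep-first deduplication, on hypothetical records:

```python
from typing import Dict, Iterable, List


def drop_duplicates(rows: Iterable[Dict], key: str) -> List[Dict]:
    """Keep the first row seen for each value of `key`, preserving input order."""
    seen = set()
    kept: List[Dict] = []
    for row in rows:
        k = row[key]
        if k not in seen:
            seen.add(k)
            kept.append(row)
    return kept


# Hypothetical records: cicid 1 and 3 share admission_num 111.
rows = [
    {"cicid": 1, "admission_num": 111},
    {"cicid": 2, "admission_num": 222},
    {"cicid": 3, "admission_num": 111},
]
deduped = drop_duplicates(rows, "admission_num")
print([r["cicid"] for r in deduped])  # [1, 2]
print(len(rows) - len(deduped))       # 1
```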


In [23]:
immigration_cleaned.limit(5).toPandas()

Unnamed: 0,count,occup,insnum,gender,airline,dt_arrival,dt_departure,dt_add_tofile,dt_stay_until,i94port_code,arrival_state_code,arrival_flag,departure_flag,update_flag,match_flag,flight_no,visa_type,cicid,entry_year,entry_month,country_of_birth,country_of_residence,i94_mode,age,visa_cat_code,visa_issued_state,birth_year,admission_num
0,1.0,,,F,DL,2016-04-01,2016-05-09,2016-04-01,,DET,MI,U,O,,M,158,F2,74750,2016,4,254,276,1,35,3,,1981,209395185
1,1.0,,,M,,2016-04-13,2016-04-24,2016-04-13,2018-01-08,OTM,,Z,O,,M,415,E2,2322334,2016,4,129,129,3,51,1,,1965,324673385
2,1.0,,,M,85J,2016-04-05,2016-04-06,2016-04-05,2016-07-08,LAR,AL,H,R,,M,0687C,B1,935651,2016,4,582,582,1,34,1,,1982,324770585
3,1.0,,,F,B6,2016-04-29,2016-05-22,2016-04-29,,NAS,MA,H,N,,M,00910,F1,5442902,2016,4,129,129,1,23,3,,1993,334434085
4,1.0,,3669.0,M,UA,2016-04-10,,2016-06-08,2016-05-24,AGA,GU,A,,,,00136,GMT,5945017,2016,4,252,209,1,69,2,,1947,543561533


time: 2min 54s (started: 2023-04-05 01:31:40 +00:00)


##### (2): Cleaning - demographics

In [24]:
demographics = util.col_names_conversion(demographics_spark_df)
demographics.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- male_population: string (nullable = true)
 |-- female_population: string (nullable = true)
 |-- total_population: string (nullable = true)
 |-- number_of_veterans: string (nullable = true)
 |-- foreign_born: string (nullable = true)
 |-- average_household_size: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- race: string (nullable = true)
 |-- count: string (nullable = true)

time: 83.8 ms (started: 2023-04-05 01:34:35 +00:00)


In [25]:
demographics.limit(5).toPandas()

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,race,count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


time: 181 ms (started: 2023-04-05 01:34:35 +00:00)


* pivoting the race data from rows to columns so that each city has a single row (aggregation by state happens later, when demog_bystate_dim is created)

In [26]:
# Pivot Race from rows to columns across the DF
demographics_pivot = util.pivot_demog(demographics)

demographics_pivot.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- male_population: string (nullable = true)
 |-- female_population: string (nullable = true)
 |-- total_population: string (nullable = true)
 |-- number_of_veterans: string (nullable = true)
 |-- foreign_born: string (nullable = true)
 |-- average_household_size: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- American Indian and Alaska Native: integer (nullable = false)
 |-- Asian: integer (nullable = false)
 |-- Black or African-American: integer (nullable = false)
 |-- Hispanic or Latino: integer (nullable = false)
 |-- White: integer (nullable = false)

time: 2.01 s (started: 2023-04-05 01:34:36 +00:00)
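
In the raw CSV each city appears once per race, with the city-level columns (populations, median age, household size) repeated on every row; pivoting 'race' into columns leaves one row per city (596 rows, per the count in this section). `util.pivot_demog` is not shown; in PySpark this is typically `groupBy(<city-level columns>).pivot('race').sum('count')` followed by `fillna(0)`. The plain-Python sketch below does the same for the five sample rows shown earlier. Filling absent races with 0 is an assumption suggested by the race columns being non-nullable.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

RACES = [
    "American Indian and Alaska Native",
    "Asian",
    "Black or African-American",
    "Hispanic or Latino",
    "White",
]


def pivot_race(rows: List[Tuple[str, str, str, int]]) -> Dict[Tuple[str, str], Dict[str, int]]:
    """Turn (city, state_code, race, count) rows into one dict of race counts per city.

    Races with no row for a city get 0 (assumed fill value).
    """
    pivoted: Dict[Tuple[str, str], Dict[str, int]] = defaultdict(lambda: dict.fromkeys(RACES, 0))
    for city, state_code, race, count in rows:
        pivoted[(city, state_code)][race] += count
    return dict(pivoted)


sample = [
    ("Silver Spring", "MD", "Hispanic or Latino", 25924),
    ("Quincy", "MA", "White", 58723),
    ("Hoover", "AL", "Asian", 4759),
    ("Rancho Cucamonga", "CA", "Black or African-American", 24437),
    ("Newark", "NJ", "White", 76402),
]
result = pivot_race(sample)
print(result[("Quincy", "MA")]["White"])  # 58723
print(result[("Quincy", "MA")]["Asian"])  # 0
```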


In [27]:
# Renaming columns and changing data types in dataframe
demographics_cleaned = util.demog_dtypes(demographics_pivot)
demographics_cleaned.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- native_pop: integer (nullable = false)
 |-- asian_pop: integer (nullable = false)
 |-- african_american_pop: integer (nullable = false)
 |-- hispanic_pop: integer (nullable = false)
 |-- white_pop: integer (nullable = false)
 |-- median_age: float (nullable = true)
 |-- male_pop: integer (nullable = true)
 |-- female_pop: integer (nullable = true)
 |-- total_pop: integer (nullable = true)
 |-- number_of_veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- average_household_size: float (nullable = true)

time: 110 ms (started: 2023-04-05 01:34:38 +00:00)


* checking for missing values across demographics_cleaned

In [28]:
# null counts across the dataframe
nulls = util.null_val_bycol(demographics_cleaned)
nulls.limit(2).toPandas()

Unnamed: 0,city,state,state_code,native_pop,asian_pop,african_american_pop,hispanic_pop,white_pop,median_age,male_pop,female_pop,total_pop,number_of_veterans,foreign_born,average_household_size
0,0,0,0,0,0,0,0,0,0,1,1,0,7,7,8


time: 11.7 s (started: 2023-04-05 01:34:38 +00:00)


In [29]:
#total row count
print(f'total row counts in demographics: {demographics_cleaned.count()}', "\n")

total row counts in demographics: 596 

time: 5.65 s (started: 2023-04-05 01:34:49 +00:00)


##### (3): Cleaning - i94port

In [30]:
i94port_cleaned = util.clean_i94port_names(i94port_spark_df)
i94port_cleaned.columns

['port_code', 'port', 'port_city', 'port_state']

time: 234 ms (started: 2023-04-05 01:34:55 +00:00)
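
The raw port labels carry trailing quotes and padding (for example `ALC'` and `ALCAN, AK '` in the preview above), and 'port_city'/'port_state' come from splitting the label. `util.clean_i94port_names` is not shown, so this is a plain-Python sketch of one way to do it, assuming the state is whatever follows the last comma and that labels without a comma get a null 'port_state'. That assumption would be consistent with the 76 null 'port_state' values below, and the 103 distinct values suggest the trailing part is not always a U.S. state code.

```python
from typing import NamedTuple, Optional


class Port(NamedTuple):
    port_code: str
    port: str
    port_city: str
    port_state: Optional[str]


def _unquote(raw: str) -> str:
    """Strip padding and single quotes from both ends of a raw label."""
    return raw.strip().strip("'").strip()


def clean_port(raw_code: str, raw_port: str) -> Port:
    """Split a label such as "ALCAN, AK '" into city and state parts."""
    code, port = _unquote(raw_code), _unquote(raw_port)
    if "," in port:
        city, state = port.rsplit(",", 1)  # split on the last comma only
        return Port(code, port, city.strip(), state.strip())
    return Port(code, port, port, None)  # no comma: state unknown


print(clean_port("ALC'", "ALCAN, AK '"))
print(clean_port("BAR'", "BAKER AAF - BAKER ISLAND, AK'"))
```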


* checking for missing values across port data

In [31]:
# null counts across the dataframe
nulls = util.null_val_bycol(i94port_cleaned)
nulls.limit(2).toPandas()

Unnamed: 0,port_code,port,port_city,port_state
0,0,0,0,76


time: 525 ms (started: 2023-04-05 01:34:55 +00:00)


In [32]:
#total row count
print(f'total row counts in i94port: {i94port_cleaned.count()}', "\n")

# count distinct values by column 
unique = util.unique_val_bycol(i94port_cleaned)
unique.limit(2).toPandas()

total row counts in i94port: 660 



Unnamed: 0,port_code,port,port_city,port_state
0,660,657,634,103


time: 2.17 s (started: 2023-04-05 01:34:56 +00:00)


##### (4): Cleaning - country

In [33]:
country_cleaned = util.col_names_conversion(country_spark_df)
country_cleaned.columns

['country_code', 'country_name']

time: 30.9 ms (started: 2023-04-05 01:34:58 +00:00)


In [34]:
# null counts across the dataframe
nulls = util.null_val_bycol(country_cleaned)
nulls.limit(2).toPandas()

Unnamed: 0,country_code,country_name
0,0,0


time: 357 ms (started: 2023-04-05 01:34:58 +00:00)


In [35]:
#total row count
print(f'total row counts in country {country_cleaned.count()}', "\n")

# count distinct values by column 
unique = util.unique_val_bycol(country_cleaned)
unique.limit(2).toPandas()

total row counts in country 289 



Unnamed: 0,country_code,country_name
0,289,287


time: 2.02 s (started: 2023-04-05 01:34:58 +00:00)


In [36]:
country_cleaned.limit(3).toPandas()

Unnamed: 0,country_code,country_name
0,582,"'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'"
1,236,'AFGHANISTAN'
2,101,'ALBANIA'


time: 117 ms (started: 2023-04-05 01:35:00 +00:00)
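
The preview shows that 'country_name' still carries leading spaces and surrounding single quotes after `col_names_conversion`, which appears to normalize only the column names. Also, 'country_code' is a string while 'country_of_residence' and 'country_of_birth' in the immigration data are integers, so one side needs a cast before the two can be joined. A plain-Python sketch of both fixes (how the project handles this downstream is not shown):

```python
from typing import Tuple


def clean_country(raw_code: str, raw_name: str) -> Tuple[int, str]:
    """Return (country_code, country_name) ready to join on the integer i94 country columns."""
    code = int(raw_code.strip())                # '582' -> 582, same type as country_of_residence
    name = raw_name.strip().strip("'").strip()  # drop padding and the surrounding quotes
    return code, name


print(clean_country("236", "  'AFGHANISTAN'"))  # (236, 'AFGHANISTAN')
print(clean_country("582", "  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'"))
```

In Spark the equivalent would be along the lines of `F.regexp_replace(F.trim('country_name'), "^'|'$", '')` together with `F.col('country_code').cast('int')`.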


### Step 3: Define the Data Model

3.1- Data Model
* Fact table: immigration_fact
* Dimension tables: visa_dim, i94_mode_dim, status_dim, status_lookup_dim, date_dim, demog_bystate_dim, i94_port_dim, country_dim

With this design, users can analyze immigration patterns against each state's demographic profile (population by race, foreign-born population, median age, household size) as well as its overall population, which gives a more complete picture of the immigration landscape than the immigration records alone.

Beyond identifying patterns by visa category and country of residence, the model can reveal seasonal peaks in arrivals for each visa type. These trends help explain what drives immigration and support more targeted, better-informed decisions.

3.2- Steps to pipeline the data into the model:
- Load the relevant datasets.
- Clean and transform the data to prepare it for analysis.
- Create dimension tables that give the facts context: visa category, mode of transportation, immigration status, status lookups, dates, state demographics, ports, and countries.
- Create a fact table holding the key measures from the immigration data, with keys that link it to the relevant dimension tables.
- Verify that the fact table's keys join correctly to the dimension tables, so that analysis and reporting queries return complete results.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.


In [37]:
# Path where the output files are saved 
output_path = "data_outputs/"

time: 1.24 ms (started: 2023-04-05 01:35:01 +00:00)


##### 1.1- Create dimension table: visa_dim

In [38]:
# Source - immigration_cleaned
visa_dim = create_tables.create_visa_dim(immigration_cleaned, output_path)

Exporting visa_dim to data_outputs/visa_dim
Parquet file is now available in: data_outputs/visa_dim
time: 55 s (started: 2023-04-05 01:35:01 +00:00)


In [39]:
# Reading data from output parquet 
visa_dim = spark.read.parquet("data_outputs/visa_dim")
visa_dim.printSchema()

root
 |-- visa_cat_code: integer (nullable = true)
 |-- visa_category: string (nullable = true)
 |-- visa_type: string (nullable = true)

time: 210 ms (started: 2023-04-05 01:35:56 +00:00)


In [40]:
visa_dim.limit(5).toPandas()

Unnamed: 0,visa_cat_code,visa_category,visa_type
0,2,Pleasure,CP
1,2,Pleasure,SBP
2,1,Business,GMB
3,2,Pleasure,GMT
4,2,Pleasure,CPL


time: 286 ms (started: 2023-04-05 01:35:56 +00:00)


##### 1.2- Create dimension table: i94_mode_dim

In [41]:
# Source - immigration_cleaned
i94_mode_dim = create_tables.create_i94mode_dim(immigration_cleaned, output_path)

Exporting i94_mode_dim to data_outputs/i94_mode_dim
Parquet file is now available in: data_outputs/i94_mode_dim
time: 46 s (started: 2023-04-05 01:35:56 +00:00)


In [42]:
# Reading data from output parquet 
i94_mode_dim = spark.read.parquet("data_outputs/i94_mode_dim")
i94_mode_dim.printSchema()

root
 |-- i94_mode: integer (nullable = true)
 |-- transport_mode: string (nullable = true)

time: 132 ms (started: 2023-04-05 01:36:42 +00:00)


In [43]:
i94_mode_dim.distinct().toPandas()

Unnamed: 0,i94_mode,transport_mode
0,3.0,Land
1,,
2,1.0,Air
3,2.0,Sea
4,9.0,Not Reported


time: 1.14 s (started: 2023-04-05 01:36:42 +00:00)
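
'i94_mode' is an integer column, yet the pandas preview shows `3.0`, `1.0` and so on: `toPandas()` turns an integer column that contains nulls into `float64`, because NaN has no integer representation. The null row comes from records without an 'i94_mode' (239 in the null counts earlier). When decoding codes outside Spark, a small mapping that tolerates both float codes and missing values is enough; the sketch below uses the labels from the table above.

```python
import math
from typing import Optional

# Code-to-label mapping as listed in i94_mode_dim above.
I94_MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not Reported"}


def transport_mode(code: Optional[float]) -> Optional[str]:
    """Map an i94_mode code (int, float such as 3.0, None or NaN) to its label."""
    if code is None or (isinstance(code, float) and math.isnan(code)):
        return None
    return I94_MODE.get(int(code))


print(transport_mode(3.0))           # Land
print(transport_mode(float("nan")))  # None
```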


##### 1.3- Create dimension table: status_dim

In [44]:
# Source - immigration_cleaned
status_dim = create_tables.create_status_dim(immigration_cleaned, output_path)

Exporting status_dim to data_outputs/status_dim
Parquet file is now available in: data_outputs/status_dim
time: 56.3 s (started: 2023-04-05 01:36:43 +00:00)


In [45]:
status_dim.count()

99

time: 50.1 s (started: 2023-04-05 01:37:40 +00:00)


In [46]:
# Reading data from output parquet 
status_dim = spark.read.parquet("data_outputs/status_dim")
status_dim.printSchema()

root
 |-- status_flag_id: long (nullable = true)
 |-- arrival_flag: string (nullable = true)
 |-- departure_flag: string (nullable = true)
 |-- match_flag: string (nullable = true)

time: 130 ms (started: 2023-04-05 01:38:30 +00:00)
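
'status_dim' holds the 99 distinct combinations of arrival, departure and match flags, each with a 'status_flag_id' surrogate key that the fact table can reference instead of three flag columns. The `long` type suggests the key comes from `F.monotonically_increasing_id()`, which yields unique but non-consecutive ids; `create_tables.create_status_dim` is not shown, so that is an assumption. The plain-Python sketch below assigns consecutive ids instead, using the flag triples from the deduplicated sample rows earlier plus one repeat.

```python
from typing import Dict, Iterable, Optional, Tuple

# (arrival_flag, departure_flag, match_flag)
Flags = Tuple[Optional[str], Optional[str], Optional[str]]


def build_status_keys(rows: Iterable[Flags]) -> Dict[Flags, int]:
    """Give each distinct flag combination a surrogate key, in order of first appearance."""
    status_ids: Dict[Flags, int] = {}
    for combo in rows:
        if combo not in status_ids:
            status_ids[combo] = len(status_ids)
    return status_ids


# Flag triples from the immigration_cleaned sample, with the first one repeated.
sample = [("U", "O", "M"), ("Z", "O", "M"), ("H", "R", "M"),
          ("H", "N", "M"), ("A", None, None), ("U", "O", "M")]
status_keys = build_status_keys(sample)
print(len(status_keys))                # 5
print(status_keys[("A", None, None)])  # 4
```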


##### 1.3.a- Create dimension table: status_lookup_dim

In [47]:
schema = T.StructType([T.StructField("status_flag", T.StringType(), True)])

# Flag values as listed in the data dictionary
data = [("G",), ("O",), ("R",), ("K",), ("N",), ("T",), ("Z",), (None,)]

status_lookup = spark.createDataFrame(data = data, schema = schema)

time: 103 ms (started: 2023-04-05 01:38:30 +00:00)


In [48]:
status_lookup_dim = create_tables.create_status_lookup_dim(status_lookup,output_path)

Exporting status_lookup_dim to data_outputs/status_lookup_dim
Parquet file is now available in: data_outputs/status_lookup_dim
time: 3.37 s (started: 2023-04-05 01:38:30 +00:00)


In [49]:
# Reading data from output parquet 
status_lookup_dim = spark.read.parquet("data_outputs/status_lookup_dim")
status_lookup_dim.printSchema()

root
 |-- status_flag: string (nullable = true)
 |-- definition: string (nullable = true)

time: 88.1 ms (started: 2023-04-05 01:38:33 +00:00)


In [50]:
status_lookup_dim.limit(10).show(truncate = False)

+-----------+--------------------------+
|status_flag|definition                |
+-----------+--------------------------+
|Z          |Adjusted to perm residence|
|K          |Lost I 94 or is deceased  |
|G          |Admitted into US          |
|O          |Paroled into US           |
|N          |Apprehended               |
|T          |Overstayed                |
|R          |Departed                  |
|null       |null                      |
+-----------+--------------------------+

time: 168 ms (started: 2023-04-05 01:38:33 +00:00)
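
Joining the status flags to 'status_lookup_dim' should be a left join: the lookup covers only the codes listed in the data dictionary, while the sample rows earlier carry arrival flags such as `U`, `H` and `A` that have no definition, and an inner join would silently drop those records. A plain-Python sketch of left-join semantics, using the definitions from the table above:

```python
from typing import Dict, Optional

# Definitions copied from status_lookup_dim above.
STATUS_LOOKUP: Dict[str, str] = {
    "G": "Admitted into US",
    "O": "Paroled into US",
    "R": "Departed",
    "K": "Lost I 94 or is deceased",
    "N": "Apprehended",
    "T": "Overstayed",
    "Z": "Adjusted to perm residence",
}


def describe(flag: Optional[str]) -> Optional[str]:
    """Left-join semantics: unknown or missing flags map to None instead of being dropped."""
    if flag is None:
        return None
    return STATUS_LOOKUP.get(flag)


# Arrival flags of the five immigration_cleaned sample rows.
print([describe(f) for f in ["U", "Z", "H", "H", "A"]])
# [None, 'Adjusted to perm residence', None, None, None]
```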


##### 1.4- Create dimension table: date_dim

In [51]:
# Source - immigration_cleaned
date_dim = create_tables.create_date_dim(immigration_cleaned, output_path)

Exporting date_dim to data_outputs/date_dim
Parquet file is now available in: data_outputs/date_dim
time: 1min 5s (started: 2023-04-05 01:38:34 +00:00)


In [52]:
# Reading data from output parquet 
date_dim = spark.read.parquet("data_outputs/date_dim")
date_dim.printSchema()

root
 |-- dt_arrival: date (nullable = true)
 |-- arr_year: integer (nullable = true)
 |-- arr_month: integer (nullable = true)
 |-- arr_weekofyear: integer (nullable = true)
 |-- arr_dayofweek: integer (nullable = true)
 |-- arr_dayofmonth: integer (nullable = true)

time: 121 ms (started: 2023-04-05 01:39:40 +00:00)


In [53]:
date_dim.toPandas().head(5)

Unnamed: 0,dt_arrival,arr_year,arr_month,arr_weekofyear,arr_dayofweek,arr_dayofmonth
0,2016-04-16,2016,4,15,7,16
1,2016-04-18,2016,4,16,2,18
2,2016-04-09,2016,4,14,7,9
3,2016-04-28,2016,4,17,5,28
4,2016-04-15,2016,4,15,6,15


time: 307 ms (started: 2023-04-05 01:39:40 +00:00)
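
The preview is consistent with Spark's `weekofyear` (ISO-8601 weeks starting on Monday) and `dayofweek` (1 = Sunday through 7 = Saturday): 2016-04-16, a Saturday, has 'arr_dayofweek' 7. `create_tables.create_date_dim` is not shown, so the following plain-Python sketch only reproduces those conventions; it can be checked against the five rows above.

```python
from datetime import date
from typing import Dict


def date_parts(d: date) -> Dict[str, int]:
    """Derive the date_dim attributes for one arrival date.

    arr_weekofyear uses ISO-8601 weeks, and arr_dayofweek numbers days
    1 = Sunday ... 7 = Saturday, matching Spark's weekofyear()/dayofweek().
    """
    return {
        "arr_year": d.year,
        "arr_month": d.month,
        "arr_weekofyear": d.isocalendar()[1],
        "arr_dayofweek": d.isoweekday() % 7 + 1,  # Mon=1..Sun=7 -> Sun=1..Sat=7
        "arr_dayofmonth": d.day,
    }


print(date_parts(date(2016, 4, 16)))  # matches the first row of the preview
```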


##### 2- Create dimension table: demog_bystate_dim

In [54]:
# Aggregate the demographics data by state into a Spark DataFrame and export it to Parquet
demog_bystate_dim = create_tables.create_demog_dim(demographics_cleaned, output_path)

Exporting demog_bystate_dim to data_outputs/demog_bystate_dim
Parquet file is now available in: data_outputs/demog_bystate_dim
time: 12.9 s (started: 2023-04-05 01:39:40 +00:00)


In [55]:
# Filter on a single state to confirm the DataFrame is at state grain (one row per state)
demog_bystate_dim.filter(F.col('state_code') == "CA").show()

+----------+----------+--------+----------+---------+------------+--------------------+---------+------------+---------+----------+------------------+----------+
|state_code|     state|male_pop|female_pop|total_pop|foreign_born|african_american_pop|asian_pop|hispanic_pop|white_pop|native_pop|avg_household_size|median_age|
+----------+----------+--------+----------+---------+------------+--------------------+---------+------------+---------+----------+------------------+----------+
|        CA|California|12278281|  12544179| 24822460|     7448257|             2047009|  4543730|     9856464| 14905129|    401386|               3.1|     36.18|
+----------+----------+--------+----------+---------+------------+--------------------+---------+------------+---------+----------+------------------+----------+

time: 9.79 s (started: 2023-04-05 01:39:53 +00:00)


In [56]:
# Reading data from output parquet 
demog_bystate_dim = spark.read.parquet("data_outputs/demog_bystate_dim")
demog_bystate_dim.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- state: string (nullable = true)
 |-- male_pop: long (nullable = true)
 |-- female_pop: long (nullable = true)
 |-- total_pop: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- african_american_pop: long (nullable = true)
 |-- asian_pop: long (nullable = true)
 |-- hispanic_pop: long (nullable = true)
 |-- white_pop: long (nullable = true)
 |-- native_pop: long (nullable = true)
 |-- avg_household_size: double (nullable = true)
 |-- median_age: double (nullable = true)

time: 124 ms (started: 2023-04-05 01:40:03 +00:00)


In [57]:
demog_bystate_dim.toPandas().head(5)

| | state_code | state | male_pop | female_pop | total_pop | foreign_born | african_american_pop | asian_pop | hispanic_pop | white_pop | native_pop | avg_household_size | median_age |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | LA | Louisiana | 626998 | 673597 | 1300595 | 83419.0 | 602377 | 38739 | 87133 | 654578 | 8263 | 2.47 | 34.63 |
| 1 | TN | Tennessee | 1031836 | 1120724 | 2152560 | 181405.0 | 810758 | 73024 | 167962 | 1253142 | 17638 | 2.47 | 34.4 |
| 2 | AR | Arkansas | 286479 | 303400 | 589879 | 62108.0 | 149608 | 22062 | 77813 | 384733 | 9381 | 2.53 | 32.77 |
| 3 | KS | Kansas | 564145 | 584129 | 1148274 | 118645.0 | 141075 | 68689 | 165678 | 927169 | 24204 | 2.59 | 34.83 |
| 4 | AZ | Arizona | 2227455 | 2272087 | 4499542 | 682313.0 | 296222 | 229183 | 1508157 | 3591611 | 129708 | 2.77 | 35.04 |


time: 525 ms (started: 2023-04-05 01:40:03 +00:00)
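The grouping logic inside `create_tables.create_demog_dim` is not shown here. Assuming the cleaned demographics data keeps a layout of one row per city and race (so city-wide columns such as total population repeat across a city's race rows), the plain-Python sketch below shows why city-wide columns should be taken once per city before summing to state grain, while race counts can be summed directly. The function name, the reduced column set and the race labels in `RACE_COLUMNS` are hypothetical; the notebook itself works on Spark DataFrames.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical race labels mapped to state-level output columns.
RACE_COLUMNS = {
    "Asian": "asian_pop",
    "Hispanic or Latino": "hispanic_pop",
    "White": "white_pop",
}


def aggregate_by_state(rows):
    """Roll city-level rows (one row per city and race) up to one row per state."""
    cities = {}  # (state_code, city) -> first row seen; city-wide columns repeat per race
    race_totals = defaultdict(lambda: defaultdict(int))
    for row in rows:
        cities.setdefault((row["state_code"], row["city"]), row)
        column = RACE_COLUMNS.get(row["race"])
        if column is not None:
            race_totals[row["state_code"]][column] += row["count"]

    cities_by_state = defaultdict(list)
    for (state_code, _city), row in cities.items():
        cities_by_state[state_code].append(row)

    result = {}
    for state_code, city_rows in cities_by_state.items():
        result[state_code] = {
            # Population counts are additive, so each city is counted once and summed.
            "total_pop": sum(r["total_pop"] for r in city_rows),
            # Median age is not additive; a plain mean across cities is only an approximation.
            "median_age": round(mean(r["median_age"] for r in city_rows), 2),
            **race_totals[state_code],
        }
    return result
```

Summing `total_pop` over every raw row instead would count each city once per race, which is the overcounting the per-city deduplication avoids.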


##### 3- Create dimension table: i94_port_dim

In [58]:
i94_port_dim = create_tables.create_i94port_dim(i94port_cleaned, output_path)

Exporting i94_port_dim to data_outputs/i94_port_dim
Parquet file is now available in: data_outputs/i94_port_dim
time: 5.52 s (started: 2023-04-05 01:40:03 +00:00)


In [59]:
# Reading data from output parquet
i94_port_dim = spark.read.parquet("data_outputs/i94_port_dim")
i94_port_dim.printSchema()

root
 |-- port_code: string (nullable = true)
 |-- port_city: string (nullable = true)
 |-- port_state: string (nullable = true)

time: 133 ms (started: 2023-04-05 01:40:09 +00:00)
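The sample `i94_port_dim` rows in this section (`AMT`, `EUR`, `RNO`) are consistent with splitting each port description on its last comma: "EUREKA, MT (BPS)" gives the city `EUREKA` and the state `MT (BPS)`, while "No PORT Code (AMT)" has no comma and therefore no state. The cleaning code that builds `i94port_cleaned` is not shown, so both the raw description strings and the helper `split_port_description` below are assumptions, sketched in plain Python.

```python
def split_port_description(description: str):
    """Split an I94 port description into (city, state).

    Everything before the last comma is treated as the city and everything
    after it as the state; descriptions without a comma get no state.
    """
    city, sep, state = description.rpartition(",")
    if not sep:
        # rpartition returns ('', '', description) when no comma is present
        return description.strip(), None
    return city.strip(), state.strip()
```

Note that this rule keeps qualifiers such as "(BPS)" in the state value, as the `EUR` row shows. In Spark, `F.substring_index(col, ",", -1)` is one way to take the text after the last comma.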


In [60]:
i94_port_dim.toPandas().head(3)

| | port_code | port_city | port_state |
|---|---|---|---|
| 0 | AMT | No PORT Code (AMT) | |
| 1 | EUR | EUREKA | MT (BPS) |
| 2 | RNO | CANNON INTL - RENO/TAHOE | NV |


time: 870 ms (started: 2023-04-05 01:40:09 +00:00)


##### 4- Create dimension table: country_dim

In [61]:
country_dim = create_tables.create_country_dim(country_cleaned, output_path)

Exporting country_dim to data_outputs/country_dim
Parquet file is now available in: data_outputs/country_dim
time: 5.46 s (started: 2023-04-05 01:40:10 +00:00)


In [62]:
# Reading data from output parquet 
country_dim = spark.read.parquet("data_outputs/country_dim")
country_dim.printSchema()

root
 |-- country_code: integer (nullable = true)
 |-- country_name: string (nullable = true)

time: 122 ms (started: 2023-04-05 01:40:15 +00:00)


In [63]:
country_dim.limit(5).toPandas()

| | country_code | country_name |
|---|---|---|
| 0 | 719 | 'INVALID: BOUVET ISLAND (ANTARCTICA/NORWAY TERR.)' |
| 1 | 739 | 'INVALID: DRONNING MAUD LAND (ANTARCTICA-NORWAY)' |
| 2 | 311 | 'Collapsed Tanzania (should not show)' |
| 3 | 471 | 'INVALID: MARIANA ISLANDS, NORTHERN' |
| 4 | 720 | 'INVALID: CANTON AND ENDERBURY ISLS' |


time: 98.7 ms (started: 2023-04-05 01:40:16 +00:00)


In [64]:
immigration_cleaned.printSchema()

root
 |-- count: double (nullable = true)
 |-- occup: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- dt_arrival: date (nullable = true)
 |-- dt_departure: date (nullable = true)
 |-- dt_add_tofile: date (nullable = true)
 |-- dt_stay_until: date (nullable = true)
 |-- i94port_code: string (nullable = true)
 |-- arrival_state_code: string (nullable = true)
 |-- arrival_flag: string (nullable = true)
 |-- departure_flag: string (nullable = true)
 |-- update_flag: string (nullable = true)
 |-- match_flag: string (nullable = true)
 |-- flight_no: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- cicid: integer (nullable = true)
 |-- entry_year: integer (nullable = true)
 |-- entry_month: integer (nullable = true)
 |-- country_of_birth: integer (nullable = true)
 |-- country_of_residence: integer (nullable = true)
 |-- i94_mode: integer (nullable = true)
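 |-- age: integer (nullable = true)
 |-- visa_cat_code: integer (nullable = true)
 |-- visa_issued_state: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- admission_num: decimal(15,0) (nullable = true)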

In [65]:
immigration_cleaned.limit(5).toPandas()

| | count | occup | insnum | gender | airline | dt_arrival | dt_departure | dt_add_tofile | dt_stay_until | i94port_code | arrival_state_code | arrival_flag | departure_flag | update_flag | match_flag | flight_no | visa_type | cicid | entry_year | entry_month | country_of_birth | country_of_residence | i94_mode | age | visa_cat_code | visa_issued_state | birth_year | admission_num |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | | | F | DL | 2016-04-01 | 2016-05-09 | 2016-04-01 | | DET | MI | U | O | | M | 158 | F2 | 74750 | 2016 | 4 | 254 | 276 | 1 | 35 | 3 | | 1981 | 209395185 |
| 1 | 1.0 | | | M | | 2016-04-13 | 2016-04-24 | 2016-04-13 | 2018-01-08 | OTM | | Z | O | | M | 415 | E2 | 2322334 | 2016 | 4 | 129 | 129 | 3 | 51 | 1 | | 1965 | 324673385 |
| 2 | 1.0 | | | M | 85J | 2016-04-05 | 2016-04-06 | 2016-04-05 | 2016-07-08 | LAR | AL | H | R | | M | 0687C | B1 | 935651 | 2016 | 4 | 582 | 582 | 1 | 34 | 1 | | 1982 | 324770585 |
| 3 | 1.0 | | | F | B6 | 2016-04-29 | 2016-05-22 | 2016-04-29 | | NAS | MA | H | N | | M | 00910 | F1 | 5442902 | 2016 | 4 | 129 | 129 | 1 | 23 | 3 | | 1993 | 334434085 |
| 4 | 1.0 | | 3669.0 | M | UA | 2016-04-10 | | 2016-06-08 | 2016-05-24 | AGA | GU | A | | | | 00136 | GMT | 5945017 | 2016 | 4 | 252 | 209 | 1 | 69 | 2 | | 1947 | 543561533 |


time: 2min 57s (started: 2023-04-05 01:40:16 +00:00)


In [66]:
# immigration_cleaned.createOrReplaceTempView('immigration_cleaned')
# immigration_cleaned.createOrReplaceTempView('immigration_filtered')
# visa_dim.createOrReplaceTempView('visa_dim')
# i94_mode_dim.createOrReplaceTempView('i94_mode_dim')
# date_dim.createOrReplaceTempView('date_dim')
# i94_port_dim.createOrReplaceTempView('i94_port_dim')
# country_dim.createOrReplaceTempView('country_dim')
# status_dim.createOrReplaceTempView('status_dim')
# demog_bystate_dim.createOrReplaceTempView('demog_bystate_dim')

time: 3.43 ms (started: 2023-04-05 01:43:13 +00:00)


##### Creating Fact Table: immig_fact

In [67]:
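# Create the fact table: left-join the cleaned immigration records to every dimension,
# keep only rows with a non-null cicid, and select the fact columns plus status_flag_id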

immigration_fact = immigration_cleaned\
.join(visa_dim,
      (visa_dim.visa_cat_code == immigration_cleaned.visa_cat_code) &
      (visa_dim.visa_type == immigration_cleaned.visa_type),
      how= "left")\
.join(i94_mode_dim,
      (i94_mode_dim.i94_mode == immigration_cleaned.i94_mode),
      how= "left")\
.join(status_dim,
      (status_dim.arrival_flag == immigration_cleaned.arrival_flag) &
      (status_dim.departure_flag == immigration_cleaned.departure_flag) &
      (status_dim.match_flag == immigration_cleaned.match_flag),
      how= "left")\
.join(date_dim, (date_dim.dt_arrival == immigration_cleaned.dt_arrival), how= "left")\
.join(demog_bystate_dim, demog_bystate_dim.state_code == immigration_cleaned.arrival_state_code
      ,how= "left")\
.join(i94_port_dim, (i94_port_dim.port_code == immigration_cleaned.i94port_code), how= "left")\
.join(country_dim, (country_dim.country_code == immigration_cleaned.country_of_residence), how= "left")\
.where(F.col('cicid').isNotNull())\
.select("cicid"
        ,immigration_cleaned["dt_arrival"]
        ,"entry_year"
        ,"entry_month"
        ,"arrival_state_code" 
        ,immigration_cleaned["visa_type"]
        ,immigration_cleaned["visa_cat_code"]
        ,"visa_issued_state"
        ,immigration_cleaned["i94_mode"]
        ,immigration_cleaned["i94port_code"]
        ,"country_of_residence"
        ,"country_of_birth"
        ,"age"
        ,"birth_year"
        ,"gender"
        ,"admission_num"
        ,"occup"
        ,immigration_cleaned["arrival_flag"]
        ,immigration_cleaned["departure_flag"]
        ,immigration_cleaned["match_flag"]
        ,immigration_cleaned["dt_departure"]
        ,immigration_cleaned["dt_add_tofile"]
        ,immigration_cleaned["dt_stay_until"]
        ,status_dim['status_flag_id'])       

time: 307 ms (started: 2023-04-05 01:43:13 +00:00)


In [68]:
immigration_fact.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- dt_arrival: date (nullable = true)
 |-- entry_year: integer (nullable = true)
 |-- entry_month: integer (nullable = true)
 |-- arrival_state_code: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- visa_cat_code: integer (nullable = true)
 |-- visa_issued_state: integer (nullable = true)
 |-- i94_mode: integer (nullable = true)
 |-- i94port_code: string (nullable = true)
 |-- country_of_residence: integer (nullable = true)
 |-- country_of_birth: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- admission_num: decimal(15,0) (nullable = true)
 |-- occup: string (nullable = true)
 |-- arrival_flag: string (nullable = true)
 |-- departure_flag: string (nullable = true)
 |-- match_flag: string (nullable = true)
 |-- dt_departure: date (nullable = true)
 |-- dt_add_tofile: date (nullable = true)
 |-- dt_stay_until: date (nullable = true)

In [69]:
immigration_fact.count()

3075579

time: 1min 25s (started: 2023-04-05 01:43:13 +00:00)


In [70]:
immigration_fact.limit(5).toPandas()

| | cicid | dt_arrival | entry_year | entry_month | arrival_state_code | visa_type | visa_cat_code | visa_issued_state | i94_mode | i94port_code | country_of_residence | country_of_birth | age | birth_year | gender | admission_num | occup | arrival_flag | departure_flag | match_flag | dt_departure | dt_add_tofile | dt_stay_until | status_flag_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 74750 | 2016-04-01 | 2016 | 4 | MI | F2 | 3 | | 1 | DET | 276 | 254 | 35 | 1981 | F | 209395185 | | U | O | M | 2016-05-09 | 2016-04-01 | | 1451699000000.0 |
| 1 | 2322334 | 2016-04-13 | 2016 | 4 | | E2 | 1 | | 3 | OTM | 129 | 129 | 51 | 1965 | M | 324673385 | | Z | O | M | 2016-04-24 | 2016-04-13 | 2018-01-08 | 730144400000.0 |
| 2 | 935651 | 2016-04-05 | 2016 | 4 | AL | B1 | 1 | | 1 | LAR | 582 | 582 | 34 | 1982 | M | 324770585 | | H | R | M | 2016-04-06 | 2016-04-05 | 2016-07-08 | 1056562000000.0 |
| 3 | 5442902 | 2016-04-29 | 2016 | 4 | MA | F1 | 3 | | 1 | NAS | 129 | 129 | 23 | 1993 | F | 334434085 | | H | N | M | 2016-05-22 | 2016-04-29 | | 532575900000.0 |
| 4 | 5945017 | 2016-04-10 | 2016 | 4 | GU | GMT | 2 | | 1 | AGA | 209 | 252 | 69 | 1947 | M | 543561533 | | A | | | | 2016-06-08 | 2016-05-24 | |


time: 2min 50s (started: 2023-04-05 01:44:39 +00:00)
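The fifth sample row (cicid 5945017) has null `departure_flag` and `match_flag`, and after the left join its `status_flag_id` is also null. That follows from SQL equality semantics: `NULL = NULL` evaluates to null rather than true, so the join condition in In [67] cannot match such a record even if `status_dim` contained a row with the same null flags. The plain-Python sketch below emulates that behaviour; the `status_ids` mapping and its ids are hypothetical. If these records should resolve to a status, PySpark's `Column.eqNullSafe` (SQL `<=>`) treats two nulls as equal and could replace `==` in the join condition.

```python
from typing import Dict, Optional, Tuple

FlagKey = Tuple[Optional[str], Optional[str], Optional[str]]


def lookup_status_id(
    status_ids: Dict[FlagKey, int],
    arrival_flag: Optional[str],
    departure_flag: Optional[str],
    match_flag: Optional[str],
) -> Optional[int]:
    """Emulate a left join on (arrival_flag, departure_flag, match_flag).

    Under SQL semantics NULL = NULL is not true, so a record with any
    null key column matches no dimension row and gets a null id.
    """
    key = (arrival_flag, departure_flag, match_flag)
    if any(part is None for part in key):
        return None
    # Unmatched keys also yield None, like the null columns of a left join
    return status_ids.get(key)
```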


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [72]:
# Data quality checks on dimension and fact tables
tables = {"immigration_fact": immigration_fact,
          "visa_dim" : visa_dim,
          "i94_mode_dim": i94_mode_dim, 
          "status_dim": status_dim,
          "status_lookup_dim": status_lookup_dim,
          "date_dim": date_dim,
          "demog_bystate_dim": demog_bystate_dim,
          "i94_port_dim": i94_port_dim,
          "country_dim": country_dim }

for tab_name, df in tables.items():
    util.check_row_counts(df,tab_name)

Table immigration_fact has 3075579 records
Table visa_dim has 17 records
Table i94_mode_dim has 5 records
Table status_dim has 99 records
Table status_lookup_dim has 8 records
Table date_dim has 30 records
Table demog_bystate_dim has 49 records
Table i94_port_dim has 660 records
Table country_dim has 289 records
time: 1min 19s (started: 2023-04-05 01:49:02 +00:00)
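`util.check_row_counts` is not shown in this section, and its output suggests it reports each table's count. A check that can stop the pipeline should also fail when a table comes back empty. A minimal plain-Python sketch of that idea follows; `assert_tables_not_empty` is a hypothetical helper that takes a dictionary of counts, which in the notebook would come from `df.count()` for each entry in `tables`.

```python
def assert_tables_not_empty(counts):
    """Raise if any table is empty; otherwise return one summary line per table.

    `counts` maps table name -> row count.
    """
    empty = [name for name, n in counts.items() if n == 0]
    if empty:
        raise ValueError(f"Data quality check failed, empty tables: {', '.join(empty)}")
    return [f"Table {name} has {n} records" for name, n in counts.items()]
```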


In [71]:
# Primary-key check: the distinct and total counts of cicid should be equal,
# which means there are no duplicate cicid values
immigration_fact.select(F.countDistinct('cicid').alias('cicid_dist_cnt')
                  ,F.count('cicid').alias('cicid_cnt')).show()

+--------------+---------+
|cicid_dist_cnt|cicid_cnt|
+--------------+---------+
|       3075579|  3075579|
+--------------+---------+

time: 1min 31s (started: 2023-04-05 01:47:30 +00:00)
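Both `F.countDistinct('cicid')` and `F.count('cicid')` ignore nulls, so this check would still pass if some `cicid` values were null; here that case is ruled out by the `.where(F.col('cicid').isNotNull())` filter in In [67]. A stricter primary-key check reports the total row count, the number of nulls and the number of duplicated values separately. The plain-Python sketch below computes those three figures for a list of key values; `check_primary_key` is a hypothetical helper, and in Spark the same figures could come from `df.count()`, a null filter and `countDistinct`.

```python
from collections import Counter
from typing import Hashable, Iterable, Optional


def check_primary_key(values: Iterable[Optional[Hashable]]) -> dict:
    """Summarise primary-key health for a column of key values.

    The key is valid only when there are no nulls and no duplicated
    non-null values.
    """
    values = list(values)
    nulls = sum(1 for v in values if v is None)
    counts = Counter(v for v in values if v is not None)
    # Each extra occurrence of a value beyond the first is one duplicate
    duplicates = sum(c - 1 for c in counts.values() if c > 1)
    return {
        "total": len(values),
        "nulls": nulls,
        "duplicates": duplicates,
        "valid": nulls == 0 and duplicates == 0,
    }
```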


In [75]:
visa_dim.select(F.countDistinct('visa_type').alias('dist_cnt_visa_type')
                  ,F.count('visa_type').alias('cnt_visa_type')).show()

+------------------+-------------+
|dist_cnt_visa_type|cnt_visa_type|
+------------------+-------------+
|                17|           17|
+------------------+-------------+

time: 1.47 s (started: 2023-04-05 02:05:59 +00:00)


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

- The data dictionary is included in the workspace.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

##### Solutions:
* Clearly state the rationale for the choice of tools and technologies for the project.
  * Choice of tools for this project:
    * Apache Spark was chosen as the processing engine because it can process large volumes of data in parallel, is easy to use, and handles the different data formats in this project (SAS, CSV, and Parquet).
    
    
* Propose how often the data should be updated and why.
  * Frequency of data updates:
    * Since the source data is updated monthly, the tables should be refreshed on the same monthly schedule so that they stay in sync with it.
    

* The data was increased by 100x.
    * If the data increased 100x, a more appropriate approach would be to run the Spark processing on an Amazon EMR cluster and load the resulting tables into Amazon Redshift.
    
    

* The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * If the data populates a dashboard that must be refreshed by 7am every day, Apache Airflow can manage the ETL pipeline as a scheduled Directed Acyclic Graph (DAG) that starts early enough to finish before 7am. Each run processes and loads the data automatically, with data quality and completeness checks as tasks in the DAG. If a task fails, Airflow can alert the appropriate stakeholders (for example by email) so they can take corrective action before the dashboard is due. This keeps the dashboard accurate, reliable, and up to date.
     
 
* The database needed to be accessed by 100+ people
  * If 100+ people need concurrent access, loading the tables into Amazon Redshift would make concurrent querying more efficient. Redshift is a scalable, columnar data warehouse built for fast analytical queries, and features such as workload management and concurrency scaling help it serve many simultaneous queries. It also provides access control (users, groups, and grants) and encryption to help protect sensitive data.
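The Airflow recommendation rests on one idea: the pipeline is a DAG of tasks that run in dependency order, and a failing task stops its downstream work and triggers an alert. To illustrate that idea without requiring an Airflow installation, the stdlib-only sketch below runs a hypothetical chain of tasks (`extract`, `transform`, `quality_checks`, `load`) with `graphlib.TopologicalSorter` (Python 3.9+) and calls a notification hook on failure. It is not Airflow's API: in Airflow the daily schedule (for example a cron expression such as `0 5 * * *`, chosen here only as an illustration), retries and failure emails are declared on the DAG and its operators.

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Callable, Dict, List, Set


def run_dag(
    tasks: Dict[str, Callable[[], None]],
    dependencies: Dict[str, Set[str]],
    notify: Callable[[str, Exception], None],
) -> List[str]:
    """Run tasks in dependency order and notify on the first failure.

    `dependencies` maps every task name (including tasks with no upstream
    work) to the set of tasks it depends on. Returns the names of the
    tasks that completed successfully.
    """
    completed = []
    for name in TopologicalSorter(dependencies).static_order():
        try:
            tasks[name]()
        except Exception as exc:
            # Simplification: stop the whole run at the first failure. Airflow
            # would mark only the downstream tasks as upstream_failed.
            notify(name, exc)
            break
        completed.append(name)
    return completed
```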

credits: ER diagram: https://dbdiagram.io/d