# Project Title
### Data Engineering Capstone Project

#### Project Summary
In this project a database was created of the immigration data in the United States from 2016. The database is structure with a star schema to simplify the querying.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
from datetime import date, datetime, timedelta
import pandas as pd
import os
import shutil
import wikipedia
import re
import numpy as np

from config import LocalConfigurations
from schemas import demographics_schema, airport_schema, world_temp_schema, immigrations_schema

from pyspark.sql.functions import udf, col, monotonically_increasing_id, lit, split, isnull, to_timestamp
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType, DateType


config = LocalConfigurations()

### Scope the Project and Gather Data

#### Scope
In this project the goal is to organize and combine four different datasets into one database. The database is organized to optimize for querying statistics of immigrations in the U.S. in the year 2016. The data will first all be stored in a raw format on AWS S3. The raw data will then be extracted from S3 with Apache Spark and transformed into 7 dimensional tables and 1 fact table following a star schema. A star schema was chosen both because of its simplicity and because of faster aggregations than many other data models. The database will again be stored in S3 using parquet files, but Amazon Athena can then be used to analyse the data further.

#### Data
- [I94 Immigration Data](https://www.trade.gov/national-travel-and-tourism-office) - Dataset that contains information on every immigration to the U.S. in 2016.
- [World Temperature Data](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data?resource=download) - The temperature in different cities of the world collected on a daily basis.
- [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/) - Demographic data of cities in the U.S..
- [Airport Code Table](https://datahub.io/core/airport-codes#data) - Data related to different airports in the world.

![Source Data](../assets/udacity-data-sources.png "Source Data")

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12").\
enableHiveSupport().getOrCreate()

https://repos.spark-packages.org/ added as a remote repository with the name: repo-1
Ivy Default Cache set to: /Users/elias/.ivy2/cache
The jars for the packages stored in: /Users/elias/.ivy2/jars
saurfang#spark-sas7bdat added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-53bbf16e-3439-4d99-9e26-c7743896cde6;1.0
	confs: [default]
	found saurfang#spark-sas7bdat;3.0.0-s_2.12 in spark-packages
	found com.epam#parso;2.0.11 in central


:: loading settings :: url = jar:file:/Users/elias/Projects/Udacity-DataEngineering/capstone-project/venv/lib/python3.7/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.slf4j#slf4j-api;1.7.5 in central
	found org.apache.logging.log4j#log4j-api-scala_2.12;12.0 in central
	found org.scala-lang#scala-reflect;2.12.10 in central
	found org.apache.logging.log4j#log4j-api;2.13.2 in central
:: resolution report :: resolve 231ms :: artifacts dl 11ms
	:: modules in use:
	com.epam#parso;2.0.11 from central in [default]
	org.apache.logging.log4j#log4j-api;2.13.2 from central in [default]
	org.apache.logging.log4j#log4j-api-scala_2.12;12.0 from central in [default]
	org.scala-lang#scala-reflect;2.12.10 from central in [default]
	org.slf4j#slf4j-api;1.7.5 from central in [default]
	saurfang#spark-sas7bdat;3.0.0-s_2.12 from spark-packages in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   6

In [3]:
df_demographic = spark.read.options(header='True', delimiter=';').csv(config.DEMOGRAPHICS_DATA_PATH, schema=demographics_schema)
df_demographic.show(n=5, truncate=False, vertical=True)
df_demographic.printSchema()

-RECORD 0-------------------------------------------
 city                   | Silver Spring             
 state                  | Maryland                  
 median_age             | 33.8                      
 male_population        | 40601.0                   
 female_population      | 41862.0                   
 total_population       | 82463                     
 number_of_veterans     | 1562.0                    
 foreign_born           | 30908.0                   
 average_household_size | 2.6                       
 state_code             | MD                        
 race                   | Hispanic or Latino        
 count                  | 25924                     
-RECORD 1-------------------------------------------
 city                   | Quincy                    
 state                  | Massachusetts             
 median_age             | 41.0                      
 male_population        | 44129.0                   
 female_population      | 49500.0             

23/02/26 21:02:37 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: City, State, Median Age, Male Population, Female Population, Total Population, Number of Veterans, Foreign-born, Average Household Size, State Code, Race, Count
 Schema: city, state, median_age, male_population, female_population, total_population, number_of_veterans, foreign_born, average_household_size, state_code, race, count
Expected: median_age but found: Median Age
CSV file: file:///Users/elias/Projects/Udacity-DataEngineering/capstone-project/data/us-cities-demographics.csv


In [4]:
df_airport = spark.read.options(header='True', delimiter=',').csv(config.AIRPORT_DATA_PATH, schema = airport_schema)
df_airport.show(n=5, truncate=False, vertical=True)
df_airport.printSchema()

-RECORD 0---------------------------------------------
 ident        | 00A                                   
 type         | heliport                              
 name         | Total Rf Heliport                     
 elevation_ft | 11                                    
 continent    | NA                                    
 iso_country  | US                                    
 iso_region   | US-PA                                 
 municipality | Bensalem                              
 gps_code     | 00A                                   
 iata_code    | null                                  
 local_code   | 00A                                   
 coordinates  | -74.93360137939453, 40.07080078125    
-RECORD 1---------------------------------------------
 ident        | 00AA                                  
 type         | small_airport                         
 name         | Aero B Ranch Airport                  
 elevation_ft | 3435                                  
 continent

In [5]:
df_temperature = spark.read.options(header='True', delimiter=',').csv(config.TEMPERATURE_DATA_PATH, schema = world_temp_schema)
df_temperature.show(n=5, truncate=False, vertical=True)
df_temperature.printSchema()

-RECORD 0---------------------------------------------
 dt                              | 1743-11-01         
 average_temperature             | 6.068              
 average_temperature_uncertainty | 1.7369999999999999 
 city                            | Århus              
 country                         | Denmark            
 latitude                        | 57.05N             
 longitude                       | 10.33E             
-RECORD 1---------------------------------------------
 dt                              | 1743-12-01         
 average_temperature             | null               
 average_temperature_uncertainty | null               
 city                            | Århus              
 country                         | Denmark            
 latitude                        | 57.05N             
 longitude                       | 10.33E             
-RECORD 2---------------------------------------------
 dt                              | 1744-01-01         
 average_t

23/02/26 21:02:37 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: dt, AverageTemperature, AverageTemperatureUncertainty, City, Country, Latitude, Longitude
 Schema: dt, average_temperature, average_temperature_uncertainty, city, country, latitude, longitude
Expected: average_temperature but found: AverageTemperature
CSV file: file:///Users/elias/Projects/Udacity-DataEngineering/capstone-project/data/GlobalLandTemperaturesByCity.csv


In [6]:
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(os.path.join(config.SAS_DATA_PATH, 'i94_apr16_sub.sas7bdat'), schema=immigrations_schema)


In [7]:
#write to parquet
folder = "./sas_data"

if (os.path.exists(folder)):
    shutil.rmtree(folder, ignore_errors=False, onerror=None)
df_immigration.write.parquet(folder)
df_immigration=spark.read.parquet(folder)

23/02/26 21:02:38 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/02/26 21:02:39 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/02/26 21:02:39 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/02/26 21:02:39 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
23/02/26 21:02:39 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 67.58% for 10 writers
23/02/26 21:02:39 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 61.43% for 11 writers
23/02/26 21:02:39 WARN MemoryManager: Total allocatio

In [8]:
df_immigration.printSchema()
print(f"Number of entries are: {df_immigration.count()}")
df_immigration.show(n=5, truncate=False, vertical=True)

root
 |-- cic_id: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- sas_arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- sas_depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nu

### Explore and Assess the Data
#### Explore the Data 



Check out the **Data Exploration Notebook** for the data explorations steps.

#### Cleaning Steps
Document steps necessary to clean the data

In [9]:
from utils import read_sas_description
sas_data = read_sas_description(config.SAS_DESC_DATA_PATH)

In [24]:
sas_ref_date = datetime(1960, 1, 1)

@udf
def to_city(id):
    try:
        return sas_data['I94CIT'][int(id)]
    except:
        return "No Country Code"

@udf
def to_port(port_id):
    try:
        return sas_data['I94PORT'][port_id]
    except:
        return "No PORT Code"

@udf
def to_travel_mode(mode_id):
    try:
        return sas_data['I94MODE'][int(mode_id)]
    except:
        None
        
@udf
def to_transport_no(transport_no, company_name, mode):
    if transport_no:
        return str(transport_no)
    no = 'no'
    if company_name:
        return no
    if company_name:
        no += str(company_name[0:3])
    if mode:
        no += str(mode)
    return no

@udf
def to_visa_type(visa_id):
    try:
        return sas_data['I94VISA'][visa_id]
    except:
        None

@udf
def to_int(number):
    if number:
        try:
            return int(number)
        except:
            print(number)
            return 0
    return None

@udf(TimestampType())
def to_timestamp_sas(days):
    try:
        return sas_ref_date + timedelta(days=int(days))
    except:
        return None

@udf(DateType())
def to_date_sas(days):
    try:
        return sas_ref_date + timedelta(days=int(days))
    except:
        return None
    
@udf(StringType())
def upper(val):
    return val.upper()

@udf(StringType())
def city_sas(val):
    return sas_data['I94PORT'][val][0]

@udf(StringType())
def state_sas(val):
    return sas_data['I94PORT'][val][1]

### Define the Data Model
#### Conceptual Data Model




For this project it was decided to use star schema to model the data. The star schema was chosen for fast and simple way of analysing the data.

![Conceptual Data model](../assets/udacity-star-schema.png "Conceptual Data Model")

### Run Pipelines to Model the Data 
#### Create the data model
Build the data pipelines to create the data model.

### Demographics Dimension Table

In [11]:
# 1. Augment data
df_immigration_aug = df_immigration \
    .withColumn('arrdate', to_date_sas('sas_arrdate')) \
    .withColumn('depdate', to_date_sas('sas_depdate')) \
    .withColumn('transport_no',  to_transport_no('fltno', 'airline', 'i94mode')) \
    .withColumn('city', city_sas(df_immigration.i94port)) \
    .withColumn('state', state_sas(df_immigration.i94port)) \
    .na.drop(subset=["admnum"])
df_immigration_aug.head()

[Stage 9:>                                                          (0 + 1) / 1]                                                                                

Row(cic_id=5749531.0, i94yr=2016.0, i94mon=4.0, i94cit=251.0, i94res=251.0, i94port='NYC', sas_arrdate=20574.0, i94mode=1.0, i94addr='NV', sas_depdate=20579.0, i94bir=42.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='TLV', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1974.0, dtaddto='10292016', gender='F', insnum=None, airline='DL', admnum=94931887530.0, fltno='00469', visatype='B1', arrdate=datetime.date(2016, 4, 30), depdate=datetime.date(2016, 5, 5), transport_no='00469', city='NEW YORK', state='NY')

In [12]:
%%time
# city_id, name, state, median_age, male_population, female_population, total_population,
# number_of_veterans, foreign_born, average_household_size, state_code, most_common_race, country, avg_temp
from pyspark.sql.functions import isnan, when, count, col
# 1. Cleaning demographic data
#      1.1 Fill NaNs and remove duplicated rows if exists
col_names = ['male_population', 'female_population', 'number_of_veterans', 'foreign_born']
df_demographic_structured = df_demographic \
    .na.fill(value=0, subset=col_names) \
    .dropDuplicates()

df_demographic_structured.printSchema()

# 1. Cleaning temperature data
#      2.1 Average Temperature in the US
df_temperature_structured = df_temperature \
    .select('city', 'country', 'average_temperature') \
    .where(df_temperature.country == 'United States') \
    .groupBy('city', 'country').mean() \
    .withColumnRenamed('avg(average_temperature)', 'avg_temp') \
    .drop('country')


# 3. Reconstruction
#      3.1 - Find most common race for each city-state pair.
df_demographic_structured.createOrReplaceTempView("demographic_race_count")
df_demographic_count = spark.sql("select city, state, count from demographic_race_count") \
    .groupby('city', 'state') \
    .max() \
    .withColumnRenamed('max(count)', 'count')
df_demographic_structured = df_demographic_structured \
    .join(df_demographic_count, on=['city', 'state', 'count'], how='inner') \
    .withColumnRenamed('race', 'most_common_race') \
    .withColumnRenamed('city', 'name') \
    .withColumn('name', upper(col('name'))) \
    .withColumn('country', lit('US')) \
    .drop('count')

df_demographic_temp = df_demographic_structured \
    .join(df_temperature_structured,df_demographic_structured.name ==  df_temperature_structured.city,"left") \
    .drop('city')

city_table = df_demographic_temp \
        .join(df_immigration_aug.select(col('state').alias('state_code'), col('city').alias('name')).dropDuplicates(), ['state_code', 'name'], "fullouter") \
        .withColumn('city_id', monotonically_increasing_id())
city_table.printSchema()
city_table.head()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: double (nullable = false)
 |-- female_population: double (nullable = false)
 |-- total_population: integer (nullable = true)
 |-- number_of_veterans: double (nullable = false)
 |-- foreign_born: double (nullable = false)
 |-- average_household_size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- race: string (nullable = true)
 |-- count: integer (nullable = true)

root
 |-- state_code: string (nullable = true)
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: double (nullable = true)
 |-- female_population: double (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number_of_veterans: double (nullable = true)
 |-- foreign_born: double (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- most_c

23/02/26 21:02:54 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: City, State, Median Age, Male Population, Female Population, Total Population, Number of Veterans, Foreign-born, Average Household Size, State Code, Race, Count
 Schema: city, state, median_age, male_population, female_population, total_population, number_of_veterans, foreign_born, average_household_size, state_code, race, count
Expected: median_age but found: Median Age
CSV file: file:///Users/elias/Projects/Udacity-DataEngineering/capstone-project/data/us-cities-demographics.csv
23/02/26 21:02:54 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: AverageTemperature, City, Country
 Schema: average_temperature, city, country
Expected: average_temperature but found: AverageTemperature
CSV file: file:///Users/elias/Projects/Udacity-DataEngineering/capstone-project/data/GlobalLandTemperaturesByCity.csv

CPU times: user 44.1 ms, sys: 15.5 ms, total: 59.6 ms
Wall time: 8.07 s


                                                                                

Row(state_code='AK', name='ALCAN', state=None, median_age=None, male_population=None, female_population=None, total_population=None, number_of_veterans=None, foreign_born=None, average_household_size=None, most_common_race=None, country=None, avg_temp=None, city_id=0)

### Port Dimension Table

In [13]:
%%time
# 1. Cleaning ports
# port_id, port_code, name, city, country, elevation_ft, longitude, latitude
#      1.1 Only U.S. airports
port_table = df_airport \
    .join(df_immigration_aug.select(col('i94port').alias('local_code')).dropDuplicates(), ['local_code'], "fullouter") \
    .select('local_code', 'name', 'municipality', 'iso_country', 'elevation_ft'
            , split('coordinates', ',').getItem(0).alias("longitude")
            , split('coordinates', ',').getItem(1).alias("latitude")
            , lit('NA').alias('continent')
            , 'type') \
    .where(df_airport['iso_country'] == "US") \
    .withColumnRenamed('local_code', 'port') \
    .withColumnRenamed('city', 'municipality') \
    .withColumnRenamed('iso_country', 'country') \
    .withColumn('port_id', monotonically_increasing_id())
port_table.printSchema()
port_table.head()

root
 |-- port: string (nullable = true)
 |-- name: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- country: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- continent: string (nullable = false)
 |-- type: string (nullable = true)
 |-- port_id: long (nullable = false)





CPU times: user 15.5 ms, sys: 5.3 ms, total: 20.8 ms
Wall time: 1.07 s


                                                                                

Row(port='00A', name='Total Rf Heliport', municipality='Bensalem', country='US', elevation_ft=11, longitude='-74.93360137939453', latitude=' 40.07080078125', continent='NA', type='heliport', port_id=0)

### Alien Dimension Table

In [32]:
%%time
# 1.1 Create alien table
alien_table = df_immigration_aug.select(
    to_int(df_immigration_aug.admnum).alias('admission_id'),
    to_city(df_immigration.i94cit).alias('citizenship_origin'),
    to_city(df_immigration.i94res).alias('residency_origin'),
    df_immigration.gender,
    to_int(df_immigration.i94bir).alias('age'),
    df_immigration.visatype,
    to_visa_type(df_immigration.i94visa).alias('i94visa'),
    df_immigration.entdepa,
    df_immigration.entdepd,
    df_immigration.matflag,
    to_int(df_immigration.biryear).alias('year_of_birth'),
).dropDuplicates(['admission_id']) \
.na.drop(subset=["admission_id"])
alien_table.printSchema()
alien_table.head()

root
 |-- admission_id: string (nullable = true)
 |-- citizenship_origin: string (nullable = true)
 |-- residency_origin: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- year_of_birth: string (nullable = true)





CPU times: user 53 ms, sys: 9.96 ms, total: 62.9 ms
Wall time: 14.2 s


                                                                                

Row(admission_id='1023352085', citizenship_origin='MEXICO Air Sea, and Not Reported (I-94, no land arrivals)', residency_origin='MEXICO Air Sea, and Not Reported (I-94, no land arrivals)', gender='F', age='16', visatype='E2', i94visa='Business', entdepa='T', entdepd='O', matflag='M', year_of_birth='2000')

### Time Dimension Table

In [15]:
%%time

# extract columns to create time table
arr_time_table = df_immigration_aug.select(
    to_timestamp_sas(df_immigration_aug.sas_arrdate).alias('time'),
    hour(df_immigration_aug.arrdate).alias('hour'),
    dayofmonth(df_immigration_aug.arrdate).alias('day'),
    weekofyear(df_immigration_aug.arrdate).alias('week'),
    month(df_immigration_aug.arrdate).alias('month'),
    year(df_immigration_aug.arrdate).alias('year'),
    dayofweek(df_immigration_aug.arrdate).alias('weekday')
)
dep_time_table = df_immigration_aug.select(
    to_timestamp_sas(df_immigration_aug.sas_depdate).alias('time'),
    hour(df_immigration_aug.depdate).alias('hour'),
    dayofmonth(df_immigration_aug.depdate).alias('day'),
    weekofyear(df_immigration_aug.depdate).alias('week'),
    month(df_immigration_aug.depdate).alias('month'),
    year(df_immigration_aug.depdate).alias('year'),
    dayofweek(df_immigration_aug.depdate).alias('weekday')
)

expiration_time_table = df_immigration_aug.select(
    to_timestamp(df_immigration_aug.dtaddto).alias('time'),
    hour(df_immigration_aug.dtaddto).alias('hour'),
    dayofmonth(df_immigration_aug.dtaddto).alias('day'),
    weekofyear(df_immigration_aug.dtaddto).alias('week'),
    month(df_immigration_aug.dtaddto).alias('month'),
    year(df_immigration_aug.dtaddto).alias('year'),
    dayofweek(df_immigration_aug.dtaddto).alias('weekday')
)
expiration_time_table.head()
time_table = arr_time_table \
    .union(dep_time_table) \
    .union(expiration_time_table) \
    .dropDuplicates() \
    .na.drop()
time_table.printSchema()
time_table.head()

root
 |-- time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)





CPU times: user 75.5 ms, sys: 28.1 ms, total: 104 ms
Wall time: 3min 42s


                                                                                

Row(time=datetime.datetime(2016, 4, 16, 0, 0), hour=0, day=16, week=15, month=4, year=2016, weekday=7)

### Visa Issuer Dimension Table

In [16]:
%%time

from pyspark.sql.functions import monotonically_increasing_id
visa_issuer_table = df_immigration.select(
    df_immigration.visapost.alias('name'),
).dropDuplicates()
visa_issuer_table = visa_issuer_table.withColumn("visa_issuer_id", monotonically_increasing_id())

visa_issuer_table.printSchema()
visa_issuer_table.head()

root
 |-- name: string (nullable = true)
 |-- visa_issuer_id: long (nullable = false)

CPU times: user 9.1 ms, sys: 4.53 ms, total: 13.6 ms
Wall time: 768 ms


Row(name='CRS', visa_issuer_id=0)

### Logistic Dimension Table

In [17]:
%%time

logistic_table = df_immigration_aug.select(
    df_immigration_aug.transport_no,
    df_immigration_aug.airline.alias('company'),
    df_immigration_aug.i94mode.alias('mode'),
    
) \
.dropDuplicates(['transport_no']) \
.withColumn("transport_id", monotonically_increasing_id())

logistic_table.printSchema()
logistic_table.head()

root
 |-- transport_no: string (nullable = true)
 |-- company: string (nullable = true)
 |-- mode: double (nullable = true)
 |-- transport_id: long (nullable = false)





CPU times: user 14.3 ms, sys: 6.24 ms, total: 20.5 ms
Wall time: 4.66 s


                                                                                

Row(transport_no='00000', company='JL', mode=1.0, transport_id=0)

### Immigration Fact Table

In [29]:
%%time

immigration_fact_table = df_immigration_aug \
    .join(city_table, ((df_immigration_aug.city == city_table.name) & (df_immigration_aug.state == city_table.state_code)), "left") \
    .join(port_table, (df_immigration_aug.i94port == port_table.port), "left") \
    .join(visa_issuer_table, (df_immigration_aug.visapost == visa_issuer_table.name), "left") \
    .join(logistic_table, (df_immigration_aug.transport_no == logistic_table.transport_no), "left") \
    .select(
        col('cic_id'),
        col('city_id').alias('arrival_city'),
        to_timestamp(col('dtaddto')).alias('expiration_time'),
        to_int(col('admnum')).alias('admission_id'),
        to_timestamp(col('sas_arrdate')).alias('arrdate'),
        to_timestamp(col('sas_depdate')).alias('depdate'),
        col('transport_id'),
        col('visa_issuer_id'),
        col('i94Yr').alias('i94_year'),
        col('i94Mon').alias('i94_month')
    )

immigration_fact_table.printSchema()
immigration_fact_table.head()

root
 |-- cic_id: double (nullable = true)
 |-- arrival_city: long (nullable = true)
 |-- expiration_time: timestamp (nullable = true)
 |-- admission_id: string (nullable = true)
 |-- arrdate: timestamp (nullable = true)
 |-- depdate: timestamp (nullable = true)
 |-- transport_id: long (nullable = true)
 |-- visa_issuer_id: long (nullable = true)
 |-- i94_year: double (nullable = true)
 |-- i94_month: double (nullable = true)



23/02/26 22:45:57 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: City, State, Median Age, Male Population, Female Population, Total Population, Number of Veterans, Foreign-born, Average Household Size, State Code, Race, Count
 Schema: city, state, median_age, male_population, female_population, total_population, number_of_veterans, foreign_born, average_household_size, state_code, race, count
Expected: median_age but found: Median Age
CSV file: file:///Users/elias/Projects/Udacity-DataEngineering/capstone-project/data/us-cities-demographics.csv
                                                                                

CPU times: user 34 ms, sys: 13.2 ms, total: 47.1 ms
Wall time: 8.37 s


Row(cic_id=5749531.0, arrival_city=557, expiration_time=None, admission_id='94931887530', arrdate=datetime.datetime(1970, 1, 1, 5, 42, 54), depdate=datetime.datetime(1970, 1, 1, 5, 42, 59), transport_id=2572, visa_issuer_id=209, i94_year=2016.0, i94_month=4.0)

#### Data Quality Checks
The quality checks are pretty simple but the checks fail if there is a null value in a primary key

In [19]:
# Perform quality checks here
def raise_if_null(table, col_name):
    if table.filter(col(col_name).isNull()).count() > 0:
        raise ValueError(f"The column {col_name} in table {table} had a NULL value!")

raise_if_null(immigration_fact_table, 'cic_id')
raise_if_null(logistic_table, 'transport_id')
raise_if_null(visa_issuer_table, 'visa_issuer_id')
raise_if_null(time_table, 'time')
raise_if_null(alien_table, 'admission_id')
raise_if_null(city_table, 'city_id')
raise_if_null(port_table, 'port_id')




23/02/26 21:07:16 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: City, State, Median Age, Male Population, Female Population, Total Population, Number of Veterans, Foreign-born, Average Household Size, State Code, Race, Count
 Schema: city, state, median_age, male_population, female_population, total_population, number_of_veterans, foreign_born, average_household_size, state_code, race, count
Expected: median_age but found: Median Age
CSV file: file:///Users/elias/Projects/Udacity-DataEngineering/capstone-project/data/us-cities-demographics.csv
                                                                                

#### Data dictionary 

![Data dictionary](../assets/udacity-data-dictionary.png "Data dictionary") 

[I94 Immigration Data](https://www.trade.gov/national-travel-and-tourism-office): This data was extracted from the US National Tourism and Trade Office.

[World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data): This dataset is from Kaggle an online community platform for data scientists and machine learning enthusiasts.

[U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/): Comes from OpenSoft.

[Airport Code Table](https://datahub.io/core/airport-codes#data): Extracted from datahub.io.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

In this project, a ETL process is implemented where the data is extracted from S3, transformed to a star schema using Pyspark on an EMR cluster, and then loaded back to S3

S3 is used as a data source because it is easy to use, the performance is far better than needed for this project and the cost is reasonable. In this project, a significant amount of data is being used and therefore it can take hours to days for a single computer to transform the data. To speed up the processing time a network of computers is used to process the data. [Amazon EMR](https://aws.amazon.com/emr/) pre-configured with [Apache Spark](https://spark.apache.org/) was used as the tool for the ETL process, but EMR is very good for managing the cluster and Spark for doing distributed processing.

Python 3.7 was chosen as the programming language, and therefore Pyspark as well, but python is really good for fast development and for learning new things.


The frequency of updates on this database should be on approximately monthly basis, but this tool is mostly built for people interested in analysing immigration data for historical reasons but not for user needing live updates.

If the data was increased by 100x more resources and more powerful instances should be used. This could all be set in the `create-cluser.sh` with the flags `instance-count` and `instance-type`.

If the data had to be updated on a daily basis by 7am it would be very good to use a scheduling tool like [Apache Airflow](https://airflow.apache.org/).

If the database needed to be accessed by 100+ people it would be good to load the data from S3 to Amazon Redshift, but with Amazon Redshift it is easy to do SQL queries that most business analytics should be familiar with.