# Project Title
### Data Engineering Capstone Project

#### Project Summary

In this project, I will use the I94 immigration, global land temperature, U.S. city demographics and airport code datasets to design a data warehouse schema and build an ETL pipeline that populates it.

The project consists of five steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Imports

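```python
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, IntegerType, TimestampType
# Note: the star import shadows Python built-ins such as sum, min, max and round
# with their Spark column versions; the aggregations in Step 4 rely on this.
from pyspark.sql.functions import *
import pyspark.sql.functions as f
import pandas as pd
```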

### Initialize SparkSession

```python
# The spark-sas7bdat package lets Spark read the .sas7bdat immigration files
spark = SparkSession.builder \
    .config("spark.jars.repositories", "https://repos.spark-packages.org/") \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport().getOrCreate()
```

### Step 1: Scope the Project and Gather Data

#### 1.1 Scope 

##### 1.1.1 Data format
I use the provided datasets, which are in SAS (`.sas7bdat`) or CSV format.

##### 1.1.2 Technology used
Language in this project: Python  
Processing on datasets: PySpark  
Visualize data: Pandas


#### 1.2 Dataset description

| Dataset                                                                                                       | Format | Description|
|:---------------------------------------------------------------------------------------------------------------|:--------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| [I94 Immigration Data](https://www.trade.gov/national-travel-and-tourism-office)                              | SAS    | This data comes from the US National Travel and Tourism Office. It contains records of individuals entering the USA in 2016. A data dictionary is included in the workspace, and `immigration_data_sample.csv` contains sample data.|
| [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) | CSV    | This dataset comes from Kaggle. It contains average land temperatures by city for each country.|
| [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographic/export/)  | CSV    | This data comes from OpenDataSoft. It contains demographics of all US cities and census-designated places with a population greater than or equal to 65,000, taken from the US Census Bureau's 2015 American Community Survey.|
| [Airport Code Data](https://datahub.io/core/airport-codes#data)                                              | CSV    | This data comes from OurAirports. It is a table of airport codes and corresponding cities, with the attributes described in the datapackage description. Some columns identify the airport's location; others hold codes (IATA, and a local code if one exists) used to identify the airport.|

##### 1.2.1. I94 Immigration Data Sample

In the immigration dataset, the June file has different columns from the other months, so I read the June data separately and keep only the columns shared with the other months before combining (union) it with the rest.
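
Spark's `DataFrame.union` matches columns by position, not by name, which is why the June data is projected onto the other months' column list before the union. The plain-Python sketch below illustrates the idea with dictionaries standing in for rows; `align_row`, `positional_union` and the column `extra_june_col` are hypothetical names that only mirror the notebook's `select(*nonjune_immigration_df.columns)` step.

```python
from typing import Any, Dict, List, Sequence, Tuple


def align_row(row: Dict[str, Any], columns: Sequence[str]) -> Tuple[Any, ...]:
    """Return the row's values in the order given by `columns`.

    Extra keys (like June-only columns) are dropped; a missing column raises
    KeyError instead of silently shifting values into the wrong position.
    """
    return tuple(row[c] for c in columns)


def positional_union(base: List[Tuple[Any, ...]],
                     extra_rows: List[Dict[str, Any]],
                     columns: Sequence[str]) -> List[Tuple[Any, ...]]:
    """UNION ALL: append `extra_rows` to `base` after aligning them to `columns`."""
    return base + [align_row(r, columns) for r in extra_rows]


# Hypothetical example: the June row has its keys in another order plus an extra column.
columns = ["cicid", "i94mon", "i94port"]
other_months = [(46.0, 12.0, "HOU")]
june = [{"i94port": "NYC", "extra_june_col": "x", "cicid": 7.0, "i94mon": 6.0}]
combined = positional_union(other_months, june, columns)
print(combined)  # [(46.0, 12.0, 'HOU'), (7.0, 6.0, 'NYC')]
```

If the column order could differ between files, Spark's `unionByName` is the name-based alternative.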

```python
# File paths of the monthly I94 files (June is handled separately)
all_file_list = [
    '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat',
    '../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat']

june_file_path = '../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat'
```

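```python
# Read the non-June immigration files and union them together.
# Unpacking (instead of all_file_list.pop()) leaves the list intact when the
# cell is re-run; the last file is still read first, as before.
*other_files, last_file = all_file_list
nonjune_immigration_df = spark.read.format("com.github.saurfang.sas.spark").load(last_file)
for file in other_files:
    nonjune_immigration_df = nonjune_immigration_df.union(
        spark.read.format("com.github.saurfang.sas.spark").load(file))
```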

```python
# Read June immigration_df
june_immigration_df = spark.read.format("com.github.saurfang.sas.spark").load(june_file_path)
```

```python
# Combine (union) June with the other months, keeping only the shared columns in the same order
immigration_df = nonjune_immigration_df.union(june_immigration_df.select(*nonjune_immigration_df.columns))
```

```python
# First 5 records of the dataset
immigration_df.limit(5).toPandas()
```

| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 46.0 | 2016.0 | 12.0 | 129.0 | 129.0 | HOU | 20789.0 | 1.0 | TX | 20802.0 | ... | | M | 1970.0 | 05262018 | M | | RS | 97554140000.0 | 7715.0 | E2 |
| 1 | 56.0 | 2016.0 | 12.0 | 245.0 | 245.0 | NEW | 20789.0 | 1.0 | OH | 20835.0 | ... | | M | 1988.0 | D/S | F | | CA | 90623720000.0 | 819.0 | F1 |
| 2 | 67.0 | 2016.0 | 12.0 | 512.0 | 512.0 | PEV | 20789.0 | 2.0 | MD | 20794.0 | ... | | M | 1968.0 | 06012017 | M | 5920.0 | | 80105030000.0 | | B2 |
| 3 | 68.0 | 2016.0 | 12.0 | 512.0 | 512.0 | PEV | 20789.0 | 2.0 | FL | 20792.0 | ... | | M | 1970.0 | 06012017 | F | 5920.0 | | 80105110000.0 | | B2 |
| 4 | 69.0 | 2016.0 | 12.0 | 512.0 | 512.0 | PEV | 20789.0 | 2.0 | HI | 20792.0 | ... | | M | 1968.0 | 06012017 | M | 5920.0 | | 80105110000.0 | | B2 |


```python
# Schema of dataset
immigration_df.printSchema()
```

```
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 
```

##### 1.2.2. World Temperature data

```python
# Read dataset
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = spark.read.csv(fname, header=True, inferSchema=True)
```

```python
# First 5 records of the dataset
temperature_df.limit(5).toPandas()
```

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


```python
# Schema of dataset
temperature_df.printSchema()
```

```
root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
```



##### 1.2.3. U.S. City demographic Data

```python
# Read dataset (the file is semicolon-separated)
fname = "us-cities-demographics.csv"
demographic_df = spark.read.csv(fname, inferSchema=True, header=True, sep=';')
```

```python
# First 5 records of the dataset
demographic_df.limit(5).toPandas()
```

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


```python
# Schema of dataset
demographic_df.printSchema()
```

```
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)
```



##### 1.2.4. Airport Code Data

```python
# Read dataset
fname = "airport-codes_csv.csv"
airport_df = spark.read.csv(fname, inferSchema=True, header=True)
```

```python
# First 5 records of the dataset
airport_df.limit(5).toPandas()
```

| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11 | | US | US-PA | Bensalem | 00A | | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435 | | US | US-KS | Leoti | 00AA | | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450 | | US | US-AK | Anchor Point | 00AK | | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820 | | US | US-AL | Harvest | 00AL | | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237 | | US | US-AR | Newport | | | | -91.254898, 35.6087 |


```python
# Schema of dataset
airport_df.printSchema()
```

```
root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)
```



### Step 2: Explore and Assess the Data

#### 2.1 Explore the Data  
The schemas of all datasets are printed above.

The file I94_SAS_Labels_Descriptions.SAS contains additional code mappings, so I parse it to extract the following lookup tables:
- Port code
- Visa type
- Country code
- State code

```python
# Read I94 Immigration description file
with open("I94_SAS_Labels_Descriptions.SAS") as raw_data:
    lines = raw_data.readlines()
```

```python
# Extract port data (code -> city); the line range is specific to this copy of the file
raw_port_data = lines[302:961]
port_ls = []

for line in raw_port_data:
    temp = line.split("=")
    # Keep only the city part of values such as 'ALCAN, AK'
    data = [temp[0].strip().strip("'"),
            temp[1].strip().strip("'").split(",")[0]]
    port_ls.append(data)

port_df = spark.createDataFrame(pd.DataFrame(port_ls, columns=["port_cd", "port_city"]))
```

```python
# Extract country data
raw_country_data = lines[9:298]
country_ls = []

for line in raw_country_data:
    temp = line.split("=")
    data = [temp[0].strip(), temp[1].strip().strip("'").strip(";")]
    country_ls.append(data)

country_df = spark.createDataFrame(
    pd.DataFrame(country_ls, columns=["country_cd", "country_name"]))
```

```python
# Extract visa data
raw_visa_data = lines[1046:1049]
visa_ls = []

for line in raw_visa_data:
    temp = line.split("=")
    data = [temp[0].strip(), temp[1].strip().strip("'").strip(";")]
    visa_ls.append(data)

visa_df = spark.createDataFrame(pd.DataFrame(visa_ls, columns=["visa_cd", "visa_type"]))
```

```python
# Extract state data
raw_state_data = lines[981:1036]
state_ls = []

for line in raw_state_data:
    temp = line.split("=")
    data = [temp[0].strip().strip("'"), temp[1].strip().strip("'").strip(";")]
    state_ls.append(data)

state_df = spark.createDataFrame(pd.DataFrame(state_ls, columns=["state_cd", "state_name"]))
```
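
The four extraction cells above repeat the same split-and-strip logic on hard-coded line ranges. Below is a minimal plain-Python sketch of that logic as one reusable function; `parse_label_line` and `parse_label_block` are hypothetical helper names, and the sample lines only imitate the file's `code = 'value'` layout. Unlike `line.split("=")`, it splits on the first `=` only, so an `=` inside a value would survive. The resulting pairs could be passed to `pd.DataFrame` and `spark.createDataFrame` as the notebook does.

```python
from typing import Iterable, List, Optional, Tuple


def parse_label_line(line: str) -> Optional[Tuple[str, str]]:
    """Parse one `code = 'value'` line into (code, value).

    Quotes, padding and a trailing ';' are removed. Lines without '=' (blank
    lines, comments, block headers) return None.
    """
    if "=" not in line:
        return None
    # Split on the first '=' only, so an '=' inside the value is preserved.
    code, value = line.split("=", 1)
    code = code.strip().strip("'").strip()
    value = value.strip().rstrip(";").strip().strip("'").strip()
    if not code:
        return None
    return code, value


def parse_label_block(lines: Iterable[str]) -> List[Tuple[str, str]]:
    """Parse every code/value line in `lines`, skipping lines that are not one."""
    pairs = []
    for line in lines:
        parsed = parse_label_line(line)
        if parsed is not None:
            pairs.append(parsed)
    return pairs


# Illustrative lines in the style of I94_SAS_Labels_Descriptions.SAS
sample = [
    "   'ALC'\t=\t'ALCAN, AK             '",
    "   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'",
    "\t'AL'='ALABAMA'",
    "   1 = Business",
    "",
    "   3 = Student ;",
]
pairs = parse_label_block(sample)
port_city = pairs[0][1].split(",")[0]  # "ALCAN", as in the port extraction cell
```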

#### 2.2 Cleaning Steps

##### 2.2.1 I94 Immigration Data
- Keep only records that have an arrival date (`arrdate`) and arrived by air (`i94mode == 1`).
- Cast numeric code columns (stored as doubles) to integers.
- Convert the SAS arrival/departure dates (days since 1960-01-01) to `yyyy-mm-dd` dates.
- Rename columns to friendlier names.
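
The rules above can be sketched for a single record in plain Python. This is an illustration, not the Spark code: `sas_to_date` and `clean_i94_record` are hypothetical names, the records are taken from the sample rows shown earlier, and the `i94visa` value is hypothetical because that column is elided in the sample output. The key point is that a SAS date is a day count from 1960-01-01, so `20789.0` is 2016-12-01, which matches the December sample rows.

```python
import datetime
from typing import Any, Dict, Optional

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = datetime.date(1960, 1, 1)


def sas_to_date(days: Optional[float]) -> Optional[datetime.date]:
    """Convert a SAS day count (e.g. 20789.0) to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + datetime.timedelta(days=int(days))


def clean_i94_record(rec: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Apply the cleaning rules to one raw record; return None if it is filtered out."""
    # Keep only records with an arrival date that arrived by air (i94mode == 1).
    if rec.get("arrdate") is None or rec.get("i94mode") != 1:
        return None
    return {
        "cicid": int(rec["cicid"]),
        "year": int(rec["i94yr"]),
        "month": int(rec["i94mon"]),
        "departed_cd": int(rec["i94res"]),
        "arrived_cd": rec["i94port"],
        "state_cd": rec["i94addr"],
        "arrived_date": sas_to_date(rec["arrdate"]),
        "departed_date": sas_to_date(rec.get("depdate")),
        "visa_cd": rec["i94visa"],
    }


# Sample rows 0 (air) and 2 (sea) from the immigration preview above.
air_arrival = {"cicid": 46.0, "i94yr": 2016.0, "i94mon": 12.0, "i94res": 129.0,
               "i94port": "HOU", "arrdate": 20789.0, "i94mode": 1.0,
               "i94addr": "TX", "depdate": 20802.0,
               "i94visa": 1.0}  # hypothetical: i94visa is not shown in the sample
sea_arrival = {"cicid": 67.0, "i94port": "PEV", "arrdate": 20789.0, "i94mode": 2.0}

air = clean_i94_record(air_arrival)
sea = clean_i94_record(sea_arrival)  # None: not an air arrival
print(air["arrived_date"], air["departed_date"])  # 2016-12-01 2016-12-14
```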

```python
# Clean I94 Immigration Data
# arrdate/depdate are SAS dates: the number of days since 1960-01-01
proc_immigration_df = immigration_df \
    .filter((col("arrdate").isNotNull()) & (col("i94mode") == 1)) \
    .withColumn("arrived_date", f.expr("date_add('1960-01-01', arrdate)")) \
    .withColumn("departed_date", f.expr("date_add('1960-01-01', depdate)")) \
    .select(
        col("cicid").cast(IntegerType()).alias("cicid"),
        col("i94yr").cast(IntegerType()).alias("year"),
        col("i94mon").cast(IntegerType()).alias("month"),
        col("i94res").cast(IntegerType()).alias("departed_cd"),
        col("i94port").alias("arrived_cd"),
        col("i94addr").alias("state_cd"),
        col("arrived_date"),
        col("departed_date"),
        col("i94visa").alias("visa_cd")
    )
```

##### 2.2.2 World Temperature Data
- Keep only rows where `Country == "United States"` and the date falls between 2006 and 2016.
- Rename columns to friendlier names.
- Convert latitude/longitude from strings (e.g. `57.05N`) to floats.
- Drop records without a temperature.
- Drop duplicate (date, city) records.
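
The cleaning cell below converts coordinates by splitting on `N` and `W` and negating the longitude, which is correct for US rows only. A hypothetical, more general plain-Python helper (`parse_coordinate` is not part of the notebook) shows the underlying rule: north and east are positive, south and west negative. The first two inputs come from the sample rows above; the others are made-up values.

```python
def parse_coordinate(text: str) -> float:
    """Convert a hemisphere-suffixed coordinate such as '57.05N' or '95.35W' to a signed float."""
    text = text.strip()
    value, hemisphere = float(text[:-1]), text[-1].upper()
    if hemisphere in ("N", "E"):
        return value
    if hemisphere in ("S", "W"):
        return -value
    raise ValueError(f"unknown hemisphere in {text!r}")


print(parse_coordinate("57.05N"), parse_coordinate("10.33E"))  # 57.05 10.33
print(parse_coordinate("95.35W"))  # -95.35 (hypothetical US longitude)
```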

```python
# Cleaning World Temperature Data
proc_temperature_df = temperature_df\
    .filter(
        (col("Country") == "United States")
        & (col("dt") >= "2006-01-01")
        & (col("dt") <= "2016-12-31")
    )\
    .select(
        to_date(col("dt")).alias("date"),
        col("AverageTemperature").cast(FloatType()).alias("avg_temp"),
        col("City").alias("city"),
        col("Country").alias("country_name"),
        # US latitudes are all north and US longitudes all west
        split(col("Latitude"), "N")
            .getItem(0).cast(FloatType()).alias("latitude"),
        split(col("Longitude"), "W")
            .getItem(0).cast(FloatType()).alias("longitude")) \
    .withColumn("longitude", col("longitude") * -1) \
    .filter(col("avg_temp").isNotNull())\
    .dropDuplicates(["date", "city"])
# Missing temperatures are removed before de-duplicating, so a NULL row
# cannot be the one kept for a (date, city) pair.
```

##### 2.2.3 U.S. City demographic Data
- Cast columns to integer/float types.
- Rename columns to friendlier names.
- Fill missing values in Male Population, Female Population, Total Population, Number of Veterans and Foreign-born with 0.
- Drop duplicate (state, city) records.
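
The demographics file has one row per city and race, and the city-level population columns repeat on every one of those rows, so summing them without de-duplication counts each city once per race. The plain-Python sketch below uses hypothetical towns and numbers to show the effect of the fill and de-duplication steps; `fill_missing` and `dedupe_by_city` are illustrative names. Spark's `dropDuplicates` keeps an arbitrary row per key, whereas this sketch keeps the first.

```python
from typing import Any, Dict, List

POPULATION_COLS = ["male_population", "female_population", "total_population",
                   "num_veterans", "foreign_born"]


def fill_missing(row: Dict[str, Any]) -> Dict[str, Any]:
    """Replace missing population-style values with 0, like `na.fill`."""
    return {k: (0 if k in POPULATION_COLS and v is None else v) for k, v in row.items()}


def dedupe_by_city(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep one row per (state_cd, city); this sketch keeps the first one seen."""
    seen = set()
    result = []
    for row in rows:
        key = (row["state_cd"], row["city"])
        if key not in seen:
            seen.add(key)
            result.append(row)
    return result


# Hypothetical data: Town A appears once per race with the same city totals.
rows = [
    {"city": "Town A", "state_cd": "AA", "race": "White", "total_population": 1000, "num_veterans": None},
    {"city": "Town A", "state_cd": "AA", "race": "Asian", "total_population": 1000, "num_veterans": None},
    {"city": "Town B", "state_cd": "AA", "race": "White", "total_population": 500, "num_veterans": 20},
]
naive_total = sum(r["total_population"] for r in rows)  # 2500: Town A counted twice
cleaned = dedupe_by_city([fill_missing(r) for r in rows])
deduped_total = sum(r["total_population"] for r in cleaned)  # 1500
veterans_total = sum(r["num_veterans"] for r in cleaned)  # 20 (None filled with 0)
```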

```python
# ===========U.S. City demographic Data=========== #
demographic_COLS = ["city", "state", "median_age",
                    "male_population", "female_population", "total_population",
                    "num_veterans", "foreign_born", "avg_household_size",
                    "state_code", "race", "count"]

# Rename columns and convert types
proc_demographic_df = demographic_df.toDF(*demographic_COLS)
proc_demographic_df = proc_demographic_df.select(
    col("city"),
    col("state").alias("state_name"),
    col("median_age").cast(FloatType()),
    col("male_population").cast(IntegerType()),
    col("female_population").cast(IntegerType()),
    col("total_population").cast(IntegerType()),
    col("num_veterans").cast(IntegerType()),
    col("foreign_born").cast(IntegerType()),
    col("avg_household_size").cast(FloatType()),
    col("state_code").alias("state_cd"),
    col("race")
)
# Fill missing values in Male Population, Female Population, Total Population,
# Number of Veterans and Foreign-born with 0
proc_demographic_df = proc_demographic_df.na.fill({
    "male_population": 0,
    "female_population": 0,
    "total_population": 0,
    "num_veterans": 0,
    "foreign_born": 0,
})

# The source has one row per (city, race) and repeats the city-level population
# figures on every row, so keep one row per (state_cd, city) to avoid counting a
# city several times. Spark keeps an arbitrary row, so the surviving `race` is arbitrary too.
proc_demographic_df = proc_demographic_df.dropDuplicates(["state_cd", "city"])
```

##### 2.2.4 Airport Code Data
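- Keep only rows where `iso_country == "US"`.
- Keep only airports, i.e. rows whose `type` ends with `airport` (`small_airport`, `medium_airport`, `large_airport`).
- Split `coordinates` (stored as `"longitude, latitude"`) into latitude and longitude.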
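
A plain-Python sketch of these three rules applied to rows from the airport sample above; `clean_airport` is a hypothetical helper, not part of the notebook. It relies on the same facts as the Spark cell: `LIKE '%airport'` is an "ends with" test, `coordinates` lists the longitude first, and `iso_region` looks like `US-PA`.

```python
from typing import Dict, Optional


def clean_airport(row: Dict[str, str]) -> Optional[Dict[str, object]]:
    """Apply the airport cleaning rules to one raw row; None if it is filtered out."""
    # Keep US rows whose type ends with "airport", like LIKE '%airport'.
    if row["iso_country"] != "US" or not row["type"].endswith("airport"):
        return None
    # `coordinates` is "longitude, latitude"; float() ignores the leading space.
    lon_text, lat_text = row["coordinates"].split(",")
    return {
        "id": row["ident"],
        "name": row["name"],
        "type": row["type"],
        "state_cd": row["iso_region"].split("-")[1],  # "US-KS" -> "KS"
        "latitude": float(lat_text),
        "longitude": float(lon_text),
    }


# Rows 0, 1 and 4 of the airport sample above (only the columns used here).
raw = [
    {"ident": "00A", "type": "heliport", "name": "Total Rf Heliport", "iso_country": "US",
     "iso_region": "US-PA", "coordinates": "-74.93360137939453, 40.07080078125"},
    {"ident": "00AA", "type": "small_airport", "name": "Aero B Ranch Airport", "iso_country": "US",
     "iso_region": "US-KS", "coordinates": "-101.473911, 38.704022"},
    {"ident": "00AR", "type": "closed", "name": "Newport Hospital & Clinic Heliport",
     "iso_country": "US", "iso_region": "US-AR", "coordinates": "-91.254898, 35.6087"},
]
cleaned = [c for c in (clean_airport(r) for r in raw) if c is not None]
print([c["id"] for c in cleaned])  # ['00AA']
```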

```python
# Cleaning Airport Code Data
proc_airport_df = airport_df\
    .filter(
        (col("iso_country") == "US")
        & (col("type").like("%airport"))
    )\
    .withColumn("longitude", split(col("coordinates"), ",")   # longitude comes first
                .getItem(0).cast(FloatType())) \
    .withColumn("latitude", split(col("coordinates"), ",")
                .getItem(1).cast(FloatType())) \
    .select(
        col("ident").alias("id"),
        col("name"),
        col("type"),
        col("elevation_ft").alias("elevation"),
        split(col("iso_region"), "-").getItem(1).alias("state_cd"),  # "US-PA" -> "PA"
        col("latitude"),
        col("longitude")
    )
```

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
I chose a star schema because it keeps queries simple: the fact table joins directly to each dimension table, without complex multi-level joins.  
![Fig 1. Database schema](./images/diagram.jpg)

#### 3.2 Mapping Out Data Pipelines
##### 3.2.1 state_temperature as a dimension table
I will join the World Temperature Data with the U.S. City demographic Data on the city name to find each city's state, then calculate the min/average/max temperature of each state by month and year.
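
A plain-Python sketch of this aggregation, assuming the temperatures are already cleaned and using hypothetical cities and values; `state_monthly_temps` is an illustrative name. It also makes one property of the Spark join visible: joining on the city name alone drops cities with no match and assigns a city name to every state that has a city of that name.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Tuple

Key = Tuple[int, int, str]  # (month, year, state_cd)


def state_monthly_temps(temps: List[Tuple[str, int, int, float]],
                        city_states: Dict[str, List[str]]) -> Dict[Key, Tuple[float, float, float]]:
    """Aggregate (city, year, month, avg_temp) rows to (month, year, state_cd) -> (min, avg, max)."""
    groups: Dict[Key, List[float]] = defaultdict(list)
    for city, year, month, temp in temps:
        # Inner join on the city name: no match -> dropped, several states -> one row each.
        for state in city_states.get(city, []):
            groups[(month, year, state)].append(temp)
    return {key: (min(v), mean(v), max(v)) for key, v in groups.items()}


# Hypothetical inputs
temps = [("Austin", 2013, 1, 10.0), ("Houston", 2013, 1, 14.0),
         ("Portland", 2013, 1, 5.0), ("Nowhere", 2013, 1, 1.0)]
city_states = {"Austin": ["TX"], "Houston": ["TX"], "Portland": ["OR", "ME"]}
result = state_monthly_temps(temps, city_states)
print(result[(1, 2013, "TX")])  # (10.0, 12.0, 14.0)
```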

##### 3.2.2 state_demographic as a dimension table
I will aggregate the U.S. City demographic Data by state and race and add a "sex_ratio" column (males per 100 females) with the formula below:
```
sex_ratio = male_population / female_population * 100
```
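
As a worked example, here is the formula applied to the Silver Spring, MD row from the demographics sample (40,601 males, 41,862 females). `sex_ratio` is a hypothetical plain-Python helper; returning `None` when there are no females mirrors the NULL that Spark's default (non-ANSI) division returns for a zero divisor. Python's `round` and Spark's `round` can differ on exact half-way ties, which does not matter here.

```python
from typing import Optional


def sex_ratio(male_population: int, female_population: int) -> Optional[float]:
    """Males per 100 females, rounded to 2 decimals; None when there are no females."""
    if not female_population:
        return None
    return round(male_population / female_population * 100, 2)


print(sex_ratio(40601, 41862))  # 96.99
```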

##### 3.2.3 state_airport as a dimension table
I will join the Airport Code Data with the state codes extracted from I94_SAS_Labels_Descriptions.SAS to add the "state_name".

##### 3.2.4 immigration as a fact table
I will join the Immigration Data with the lookup tables extracted from I94_SAS_Labels_Descriptions.SAS to add the "port", "country", "state" and "visa" names.
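
The chain of inner joins that builds the fact table behaves like a series of dictionary lookups in which a record is dropped if any code has no match. The plain-Python sketch below assumes lookup tables keyed by strings (as produced by the label parsing) and numeric codes in the fact rows, so it converts them with `str()` explicitly where Spark casts implicitly. `enrich_immigration` and the country name are hypothetical.

```python
from typing import Any, Dict, List, Mapping


def enrich_immigration(rows: List[Dict[str, Any]],
                       ports: Mapping[str, str],
                       countries: Mapping[str, str],
                       states: Mapping[str, str],
                       visas: Mapping[str, str]) -> List[Dict[str, Any]]:
    """Attach port, country, state and visa names; drop rows with any unmatched code."""
    result = []
    for r in rows:
        try:
            enriched = {
                **r,
                "arrived_port": ports[r["arrived_cd"]],
                "departed_country": countries[str(r["departed_cd"])],
                "state_name": states[r["state_cd"]],
                "visa_type": visas[str(int(r["visa_cd"]))],
            }
        except KeyError:
            continue  # no match in at least one lookup, like an inner join
        result.append(enriched)
    return result


# Hypothetical lookup tables and fact rows
ports = {"HOU": "HOUSTON"}
countries = {"129": "COUNTRY_129"}
states = {"TX": "TEXAS"}
visas = {"1": "Business"}
rows = [
    {"cicid": 46, "arrived_cd": "HOU", "departed_cd": 129, "state_cd": "TX", "visa_cd": 1.0},
    {"cicid": 99, "arrived_cd": "HOU", "departed_cd": 129, "state_cd": None, "visa_cd": 1.0},
]
facts = enrich_immigration(rows, ports, countries, states, visas)
print([(f["cicid"], f["state_name"], f["visa_type"]) for f in facts])  # [(46, 'TEXAS', 'Business')]
```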

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.
##### 4.1.1 Construct state_temperature as a dimension table

```python
# Create state_temperature data
# Note: joining on city alone matches a city name in every state where it appears.
state_temperature_df = proc_temperature_df\
    .withColumn("month", month("date"))\
    .withColumn("year", year("date"))\
    .join(proc_demographic_df, "city")\
    .groupby(["month", "year", "state_cd", "state_name"])\
    .agg(
        avg("avg_temp").cast(FloatType()).alias("avg_temp"),
        max("avg_temp").alias("max_temp"),
        min("avg_temp").alias("min_temp")
    )
```

```python
state_temperature_df.printSchema()
```

```
root
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- state_cd: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- avg_temp: float (nullable = true)
 |-- max_temp: float (nullable = true)
 |-- min_temp: float (nullable = true)
```



##### 4.1.2 Construct state_demographic as a dimension table

```python
# Create state_demographic data
state_demographic_df = proc_demographic_df\
    .groupby(["state_cd", "state_name", "race"])\
    .agg(
        round(avg("median_age"), 2).cast(FloatType()).alias("median_age"),
        sum("male_population").alias("male_population"),
        sum("female_population").alias("female_population"),
        sum("total_population").alias("total_population"),
        sum("num_veterans").alias("num_veterans"),
        sum("foreign_born").alias("foreign_born"),
        round(avg("avg_household_size"), 2).cast(FloatType()).alias("avg_household_size")
    )\
    .withColumn("sex_ratio",
                round(col("male_population") / col("female_population") * 100, 2).cast(FloatType()))
```

```python
state_demographic_df.printSchema()
```

```
root
 |-- state_cd: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- race: string (nullable = true)
 |-- median_age: float (nullable = true)
 |-- male_population: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- total_population: long (nullable = true)
 |-- num_veterans: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- avg_household_size: float (nullable = true)
 |-- sex_ratio: float (nullable = true)
```



##### 4.1.3 Construct state_airport as a dimension table

```python
# Create state_airport data
state_airport_df = proc_airport_df.alias("proc_airport_df")\
    .join(state_df.alias("state_df"),
          col("proc_airport_df.state_cd") == col("state_df.state_cd"))\
    .select(
        col("proc_airport_df.id"),
        col("proc_airport_df.name").alias("airport_name"),
        col("proc_airport_df.state_cd"),
        col("state_df.state_name"),
        col("proc_airport_df.type"),
        col("proc_airport_df.elevation"),
        col("proc_airport_df.latitude"),
        col("proc_airport_df.longitude")
    )
```

```python
state_airport_df.printSchema()
```

```
root
 |-- id: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- state_cd: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- elevation: integer (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
```



##### 4.1.4 Construct immigration as a fact table

```python
# Create immigration data
immigration_df = proc_immigration_df.alias("org_data")\
    .join(port_df.alias("port"),
          col("org_data.arrived_cd") == col("port.port_cd"))\
    .join(country_df.alias("country"),
          col("org_data.departed_cd") == col("country.country_cd"))\
    .join(state_df.alias("state"),
          col("org_data.state_cd") == col("state.state_cd"))\
    .join(visa_df.alias("visa"),
          col("org_data.visa_cd") == col("visa.visa_cd"))\
    .filter(col("state_name").isNotNull())\
    .select(
        col("org_data.cicid"),
        col("org_data.month"),
        col("org_data.year"),
        col("org_data.state_cd"),
        col("state.state_name"),
        col("org_data.departed_date"),
        col("org_data.departed_cd"),
        col("country.country_name").alias("departed_country"),
        col("org_data.arrived_date"),
        col("org_data.arrived_cd"),
        col("port.port_city").alias("arrived_port"),
        col("visa.visa_type")
    )
```

```python
immigration_df.printSchema()
```

```
root
 |-- cicid: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- state_cd: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- departed_date: date (nullable = true)
 |-- departed_cd: integer (nullable = true)
 |-- departed_country: string (nullable = true)
 |-- arrived_date: date (nullable = true)
 |-- arrived_cd: string (nullable = true)
 |-- arrived_port: string (nullable = true)
 |-- visa_type: string (nullable = true)
```



#### 4.2 Data Quality Checks
The following checks verify that the pipeline ran as expected:
 * Source/count checks: each table must contain records (completeness).
 * Integrity checks: key columns must not contain NULL or NaN values.
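
The checks below print counts for manual inspection. A pipeline would more likely fail loudly instead, so here is a plain-Python sketch of the same two checks that raises an error; `null_counts` and `check_table` are hypothetical helpers working on lists of dicts, mirroring the `count(when(isnan(c) | col(c).isNull(), c))` expression used in the cells.

```python
import math
from typing import Any, Dict, List, Sequence


def null_counts(rows: List[Dict[str, Any]], columns: Sequence[str]) -> Dict[str, int]:
    """Count None/NaN values (or absent keys) per column."""
    def is_missing(v: Any) -> bool:
        return v is None or (isinstance(v, float) and math.isnan(v))
    return {c: sum(1 for r in rows if is_missing(r.get(c))) for c in columns}


def check_table(name: str, rows: List[Dict[str, Any]], required: Sequence[str]) -> None:
    """Raise ValueError if the table is empty or a required column has missing values."""
    if not rows:
        raise ValueError(f"{name}: no records")
    bad = {c: n for c, n in null_counts(rows, required).items() if n}
    if bad:
        raise ValueError(f"{name}: missing values in {bad}")


# Hypothetical usage on a tiny table
check_table("state_airport", [{"id": "00AA", "state_cd": "KS"}], ["id", "state_cd"])  # passes
```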

##### 4.2.1 Verify state_temperature data

```python
# Count total records of table
print("Total records: {}".format(state_temperature_df.count()))
```

```
Total records: 3905
```


```python
# Count invalid (NaN or NULL) values in every column
state_temperature_df.select(
    [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in state_temperature_df.columns]
).toPandas()
```

| | month | year | state_cd | state_name | avg_temp | max_temp | min_temp |
|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |


##### 4.2.2 Verify state_airport data

```python
# Count total records of table
print("Total records: {}".format(state_airport_df.count()))
```

```
Total records: 14575
```


```python
# Count invalid (NaN or NULL) values in the key columns id and state_cd
lst_columns = ["id", "state_cd"]
state_airport_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in lst_columns]).toPandas()
```

| | id | state_cd |
|---|---|---|
| 0 | 0 | 0 |


##### 4.2.3 Verify state_demographic data

```python
# Count total records of table
print("Total records: {}".format(state_demographic_df.count()))
```

```
Total records: 174
```


```python
# Count invalid (NaN or NULL) values in the columns race, state_cd and sex_ratio
lst_columns = ["race", "state_cd", "sex_ratio"]
state_demographic_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in lst_columns]).toPandas()
```

| | race | state_cd | sex_ratio |
|---|---|---|---|
| 0 | 0 | 0 | 0 |


##### 4.2.4 Verify immigration data

```python
# Count total records of table
print("Total records: {}".format(immigration_df.count()))
```

```
Total records: 37227531
```


```python
# Count NULL values in the key columns
lst_columns = ["cicid", "month", "year", "departed_date", "departed_cd", "arrived_date", "arrived_cd"]
immigration_df.select([count(when(col(c).isNull(), c)).alias(c) for c in lst_columns]).toPandas()
```

| | cicid | month | year | departed_date | departed_cd | arrived_date | arrived_cd |
|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 2798756 | 0 | 0 | 0 |


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

##### 5.1 Tools and technologies

In this project, I use Spark for the ETL process for the following reasons:
- It can read many data formats (CSV, JSON, Avro, Parquet, etc., and SAS files through the spark-sas7bdat package used here).
- It is a distributed data processing engine, so the pipeline can scale out when the data grows.

##### 5.2 Update frequency
- The immigration data should be updated daily, appending only new records to the fact table.
- state_temperature only needs a monthly update: temperatures do not change enough day to day to matter here, and the table is aggregated by month anyway.
- state_demographic and state_airport change slowly, so these tables can be updated monthly or quarterly.

##### 5.3 Resolving scenarios
###### 5.3.1 The data was increased by 100x
To handle this, the raw data could be stored in a cloud object store (e.g. AWS S3 or Google Cloud Storage) and processed on a managed Spark cluster such as AWS EMR, which can be sized to the data volume at a reasonable cost.

###### 5.3.2 The data populates a dashboard that must be updated on a daily basis by 7am every day
Use Apache Airflow to run the pipeline on a daily schedule that starts early enough to finish before 7am.

###### 5.3.3 The database needed to be accessed by 100+ people
Load the final tables into AWS Redshift, which is built for many concurrent users; its workload management and concurrency scaling features help serve 100+ people.