# US Immigration Data

### Data Engineering Capstone Project

#### Project Summary
This project aims to use techniques learned in the Udacity Nanodegree course on Data Engineer to create a fact and dimension tables regarding US immigration data, together with analysis on temperature and airport characteristics. Here we will use two formats of data (SAS and .csv) to build FACT and dimension tables, splitting the US immigration dataset in regards to what are its informations about (flight? passenger?), and add informtion on temperature on the dates observed.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up



In [1]:
import pandas as pd
import psycopg2
import os
import re

from pyspark.sql import SparkSession
import pyspark.sql.functions as F  
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum
from pyspark.sql import Window
from pyspark.sql.functions import udf

import datetime

import numpy as np

from src.queries_new import immigration_insert, temperature_insert, passenger_insert, \
time_insert, status_insert
from datetime import datetime, timedelta
from pyspark.sql import SparkSession

In [2]:
# Session Spark
spark = SparkSession.builder.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
                            .master("local[*]")\
                            .enableHiveSupport()\
                            .getOrCreate()

In [3]:
# Immigration dataset
df_immigration = spark.read.parquet('sas_data/')

# Dataset 1: I94 Immigration Data from the US National Tourism and Trade Office 

- Is the data that will be used to build our FACT table. It contains information on airline, visatipe, destination of people immigrating to the US.
- Firstly we will check for missing data, check if the data is behaving as expected and transform the data where necessary.
- This dataset has around 3 million lines and 28 columns. We will limit the columns to those we are interested in and explore how to break it into new tables when relevant
- This data is updated monthly
- For this project I opted for deleting columns with more than 30% of its values being null. However, it is expected that cases with null values happen in data engineering projects, and this is one of many possible approaches.
- In this dataset we have a lot of infomations, we can split it in regards to what the informations are about.
- We make use of draw.io to build visualization for the schema
- I will allow nan values to continue as nan, without replacement techniques

# Dataset 2:Temperature 

# Technologies and Tools 

Here I opted for using pyspark and take a vantage of parallem processing.

### Step 2: Explore and Assess the Data

#### Cleaning Steps
Document steps necessary to clean the data

In [4]:
# Print to have an overview
df_immigration.show()

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [5]:
# Create udf to convert SAS date to PySpark date 
def convert(x):
    if x:
        return (datetime(1960, 1, 1).date() + timedelta(x)).isoformat()
    return None
    

convertUDF = udf(lambda x: convert(x), StringType())

df_immigration = df_immigration.withColumn("arrdate", convertUDF(df_immigration.arrdate))

In [6]:
# Checking if we need to change the datatype or if it corresponds to our desired type
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [7]:
%%time
# Checking how many different values we have for each column and other statistics
df_immigration.select(*(F.countDistinct(F.col(c)).alias(c) for c in df_immigration.columns)).show(vertical = True)

-RECORD 0-----------
 cicid    | 3096313 
 i94yr    | 1       
 i94mon   | 1       
 i94cit   | 243     
 i94res   | 229     
 i94port  | 299     
 arrdate  | 30      
 i94mode  | 4       
 i94addr  | 457     
 depdate  | 235     
 i94bir   | 112     
 i94visa  | 3       
 count    | 1       
 dtadfile | 117     
 visapost | 530     
 occup    | 111     
 entdepa  | 13      
 entdepd  | 12      
 entdepu  | 2       
 matflag  | 1       
 biryear  | 112     
 dtaddto  | 777     
 gender   | 4       
 insnum   | 1913    
 airline  | 534     
 admnum   | 3075579 
 fltno    | 7152    
 visatype | 17      

CPU times: user 32.8 ms, sys: 19 ms, total: 51.8 ms
Wall time: 24 s


- We only have data on year 2016 month 04
- 243 different cities
- 534 different airline values

In [8]:
# Do we have misspells / typos on airlines?
df_immigration.select(F.collect_set("airline").alias("airline")).first()["airline"]

['GA',
 'R0E',
 'FC',
 'BSK',
 'NK',
 'DE',
 'BS',
 'MP',
 'M0M',
 'UX',
 'LE',
 'B01',
 'G5A',
 'K4',
 'Q5U',
 '3',
 'T8B',
 '9V',
 '001',
 'D2V',
 'LU',
 'XD',
 'CL',
 'EY',
 'K8',
 '9K',
 'Q2',
 'TN',
 'YL',
 '734',
 '*GA',
 'IG',
 'G5J',
 'EI',
 '2U',
 '5X',
 '79A',
 'DL',
 '3M',
 'LO',
 'AZ',
 'AS',
 'VW',
 'IB',
 'SJ',
 'B73',
 'A3R',
 '35L',
 '0ZD',
 'PY',
 'PS',
 'G3',
 '278',
 'CV',
 'VX',
 'TC',
 'ZW',
 'MU',
 '0FR',
 'TOS',
 '99',
 'AD',
 '5Y',
 'E1Q',
 'T4A',
 'DZ',
 'SN',
 'Q0P',
 'ON',
 'KR',
 'AJ',
 'IJW',
 '605',
 'ARU',
 'PZ',
 '0FF',
 'BB',
 '537',
 '6R',
 'TOM',
 'EL',
 'RJD',
 'H0P',
 'G1A',
 '813',
 '18',
 'EW',
 'TIW',
 'Y6Y',
 'B3D',
 'OO',
 'PP',
 'V6D',
 'C5X',
 '3X',
 '351',
 'ET',
 'B2H',
 'A1B',
 '848',
 'PL',
 'C7',
 'FI',
 'WU',
 'BC',
 '0L7',
 'CSQ',
 'P8K',
 'LP',
 'LY',
 'CH',
 'KQ',
 'T0',
 '031',
 'HA',
 'EA',
 'HB',
 '0AZ',
 'LM',
 'I0H',
 '35T',
 'YCK',
 'N9',
 'TYW',
 'E8D',
 '15',
 'WK',
 'QT',
 '0AF',
 '81',
 'D0S',
 'NJ',
 'W4J',
 'LA',
 'B57',


In [9]:
#are there duplicate values that would help me reduce my dataset size?
print(f'Before remove duplicates  -> {df_immigration.count()}')
df_immigration = df_immigration.drop_duplicates()
print(f'After remove duplicates -> {df_immigration.count()}')

Before remove duplicates  -> 3096313
After remove duplicates -> 3096313


In [10]:
# Creating a unique ID 
window = Window.orderBy(F.col('cicid'))
df_immigration = df_immigration.withColumn('person_id', F.row_number().over(window))

In [11]:
# now I will import the temperature data
fname = 'data/df_temp.csv'
df_temp = spark.read.option("header", "True").option("charset", "utf-8").csv(fname)

In [12]:
df_temp.show()

+---+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|_c0|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+---+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|  0|1743-11-01|              6.068|                        1.737|Århus|Denmark|  57.05N|   10.33E|
|  1|1743-12-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|  2|1744-01-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|  3|1744-02-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|  4|1744-03-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|  5|1744-04-01|  5.787999999999999|                        3.624|Århus|Denmark|  57.05N|   10.33E|
|  6|1744-05-01|             10.644|                        1.283|Århus|Denmark|  57.05N|   10.33E|


In [13]:
# Rename first column to "id"
df_temp = df_temp.withColumnRenamed('_c0', 'id')

In [14]:
# Checking data types
df_temp.printSchema()

root
 |-- id: string (nullable = true)
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [15]:
# Convert Temperature to double type
df_temp = df_temp.withColumn('AverageTemperature', F.col('AverageTemperature').cast(DoubleType())) \
                 .withColumn('AverageTemperatureUncertainty', F.col('AverageTemperatureUncertainty').cast(DoubleType()))

In [16]:
# Checking data types
df_temp.printSchema()

root
 |-- id: string (nullable = true)
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



- Since we are using immigration data in the US, we can limit country here in the dataset

In [17]:
df_temp.show(3)

+---+----------+------------------+-----------------------------+-----+-------+--------+---------+
| id|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+---+----------+------------------+-----------------------------+-----+-------+--------+---------+
|  0|1743-11-01|             6.068|                        1.737|Århus|Denmark|  57.05N|   10.33E|
|  1|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|  2|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+---+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 3 rows



In [18]:
#limiting only to the United States
df_temp = df_temp.filter(df_temp.Country == 'United States')

In [19]:
# Checking filters result
df_temp.groupBy('Country').count().show()

+-------------+------+
|      Country| count|
+-------------+------+
|United States|687289|
+-------------+------+



In [20]:
# Checking null values 
df_temp.select(*(F.sum(F.col(c).isNull().cast("int") / df_temp.count()).alias(c) for c in df_temp.columns)).show(vertical = True)

-RECORD 0---------------------------------------------
 id                            | 0.0                  
 dt                            | 0.0                  
 AverageTemperature            | 0.037487869004161394 
 AverageTemperatureUncertainty | 0.037487869004161394 
 City                          | 0.0                  
 Country                       | 0.0                  
 Latitude                      | 0.0                  
 Longitude                     | 0.0                  



In [24]:
# Create list of valid ports
i94_sas_label_descriptions = "data/I94_SAS_Labels_Descriptions.SAS"

with open(i94_sas_label_descriptions) as f:
    lines = f.readlines()

result_ports = {}
for line in lines[302:961]:
    results = re.compile(r"\'(.*)\'.*\'(.*)\'").search(line)
    result_ports[results.group(1)] = results.group(2)

# Create udf to convert city to port 
def convert_city_to_port(city):
    for key in result_ports:
        if city.lower() in valid_ports[key].lower():
            return key

convertCity = udf(lambda x: city_to_port(x), StringType())

df_temp =  df_temp \
    .withColumn("i94port", convertCity(F.col("city")))

In [26]:
# There are duplicates values?
print(f'Before remove duplicates -> {df_temp.count()}')
df_temp = df_temp.drop_duplicates()
print(f'After remove duplicates -> {df_temp.count()}')

Before remove duplicates -> 687289
After remove duplicates -> 687289


In [27]:
# Now we have to load airport data
df_airport = spark.read.option("header", "True").option("charset", "utf-8").csv('data/i94portCodes.csv')

In [28]:
df_airport.show()

+----+--------------------+-----+
|code|            location|state|
+----+--------------------+-----+
| ALC|               ALCAN|   AK|
| ANC|           ANCHORAGE|   AK|
| BAR|BAKER AAF - BAKER...|   AK|
| DAC|       DALTONS CACHE|   AK|
| PIZ|DEW STATION PT LA...|   AK|
| DTH|        DUTCH HARBOR|   AK|
| EGL|               EAGLE|   AK|
| FRB|           FAIRBANKS|   AK|
| HOM|               HOMER|   AK|
| HYD|               HYDER|   AK|
| JUN|              JUNEAU|   AK|
| 5KE|           KETCHIKAN|   AK|
| KET|           KETCHIKAN|   AK|
| MOS|MOSES POINT INTER...|   AK|
| NIK|             NIKISKI|   AK|
| NOM|                 NOM|   AK|
| PKC|         POKER CREEK|   AK|
| ORI|      PORT LIONS SPB|   AK|
| SKA|             SKAGWAY|   AK|
| SNP|     ST. PAUL ISLAND|   AK|
+----+--------------------+-----+
only showing top 20 rows



In [30]:
# Lowerin column names for both datasets
df_temp = df_temp.select([F.col(x).alias(x.lower()) for x in df_temp.columns])

In [31]:
df_immigration = df_immigration.select([F.col(x).alias(x.lower()) for x in df_immigration.columns])

In [32]:
df_airport = df_airport.select([F.col(x).alias(x.lower()) for x in df_airport.columns])

In [33]:
print(f'Checking results -> {df_temp.columns}')
print(f'Checking results -> {df_immigration.columns}')
print(f'Checking results -> {df_airport.columns}')

Checking results -> ['id', 'dt', 'averagetemperature', 'averagetemperatureuncertainty', 'city', 'country', 'latitude', 'longitude', 'i94port']
Checking results -> ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline', 'admnum', 'fltno', 'visatype', 'person_id']
Checking results -> ['code', 'location', 'state']


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

# Data Model

- For this project I opted for a simpler version (less dimensions tables than possible), in order to be more goal-oriented and improve time management.
- Out fact table contains all ids used in the dimension tables
- We have a reduced number of three dim tables, which are in regards of time, status, passengers data and temperature per city in the US.

![alt_text](new_datamodel.PNG)

# Pipeline Data

- Graun declaration: what we want the fact table to show.
- Identify dimensions: how we can split the data without losing a way to link them. Also, sort it by different 'themes'
- Load and clab the datasets
- Create fact and dim tables

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Check if we can find all ports in dim_airpot

def create_immigration(df_immigration=df_immigration):

    return df_immigration.select(
    F.col('cicid'),
    F.col('person_id').alias('personid'),
    F.col('arrdate').alias('timeid'),
    F.col('i94cit').alias('cit'),
    F.col('i94res').alias('res'),
    F.col('i94port').alias('port'),
    F.col('arrdate').alias('arrdate'),
    F.col('i94mode').alias('mode'),
    F.col('i94addr').alias('addr'),
    F.col('depdate'),
    F.col('i94bir').alias('bir'),
    F.col('i94visa').alias('visa'),
    F.col('count'),
    F.col('dtadfile'),
    F.col('entdepa'),
    F.col('entdepd'),
    F.col('entdepu'),
    F.col('matflag'),
    F.col('dtaddto'),
    F.col('airline'),
    F.col('admnum'),
    F.col('fltno'),
    F.col('visatype')).createOrReplaceTempView("immigration"), spark.sql("create table fact_immigration as select * from immigration")

In [37]:
# Now we have to create the fact model on Hive from a tempview
df_immigration.select(

    F.col('cicid'),
    F.col('person_id').alias('personid'),
    F.col('arrdate').alias('timeid'),
    F.col('i94cit').alias('cit'),
    F.col('i94res').alias('res'),
    F.col('i94port').alias('port'),
    F.col('arrdate').alias('arrdate'),
    F.col('i94mode').alias('mode'),
    F.col('i94addr').alias('addr'),
    F.col('depdate'),
    F.col('i94bir').alias('bir'),
    F.col('i94visa').alias('visa'),
    F.col('count'),
    F.col('dtadfile'),
    F.col('entdepa'),
    F.col('entdepd'),
    F.col('entdepu'),
    F.col('matflag'),
    F.col('dtaddto'),
    F.col('airline'),
    F.col('admnum'),
    F.col('fltno'),
    F.col('visatype')

).createOrReplaceTempView("immigration")

# Verificar quem é o person id

spark.sql("create table fact_immigration as select * from immigration")

DataFrame[]

In [38]:
# Checking the fact table
spark.sql('SELECT * FROM fact_immigration').show(1)

+-----+--------+----------+-----+-----+----+----------+----+----+-------+----+----+-----+--------+-------+-------+-------+-------+--------+-------+-------------+-----+--------+
|cicid|personid|    timeid|  cit|  res|port|   arrdate|mode|addr|depdate| bir|visa|count|dtadfile|entdepa|entdepd|entdepu|matflag| dtaddto|airline|       admnum|fltno|visatype|
+-----+--------+----------+-----+-----+----+----------+----+----+-------+----+----+-----+--------+-------+-------+-------+-------+--------+-------+-------------+-----+--------+
|  6.0|       1|2016-04-29|692.0|692.0| XXX|2016-04-29|null|null|   null|37.0| 2.0|  1.0|    null|      T|   null|      U|   null|10282016|   null|1.897628485E9| null|      B2|
+-----+--------+----------+-----+-----+----+----------+----+----+-------+----+----+-----+--------+-------+-------+-------+-------+--------+-------+-------------+-----+--------+
only showing top 1 row



In [None]:
def create_airport(df_airport=df_airport):

    return df_airport.createOrReplaceTempView("airport"), 
    spark.sql("create table dim_airport as select * from airport")


In [40]:
# Now we have to create airport dimension

df_airport.createOrReplaceTempView("airport")

spark.sql("create table dim_airport as select * from airport")

DataFrame[]

In [41]:
spark.sql("select * from dim_airport").show()

+----+--------------------+-----+
|code|            location|state|
+----+--------------------+-----+
| ALC|               ALCAN|   AK|
| ANC|           ANCHORAGE|   AK|
| BAR|BAKER AAF - BAKER...|   AK|
| DAC|       DALTONS CACHE|   AK|
| PIZ|DEW STATION PT LA...|   AK|
| DTH|        DUTCH HARBOR|   AK|
| EGL|               EAGLE|   AK|
| FRB|           FAIRBANKS|   AK|
| HOM|               HOMER|   AK|
| HYD|               HYDER|   AK|
| JUN|              JUNEAU|   AK|
| 5KE|           KETCHIKAN|   AK|
| KET|           KETCHIKAN|   AK|
| MOS|MOSES POINT INTER...|   AK|
| NIK|             NIKISKI|   AK|
| NOM|                 NOM|   AK|
| PKC|         POKER CREEK|   AK|
| ORI|      PORT LIONS SPB|   AK|
| SKA|             SKAGWAY|   AK|
| SNP|     ST. PAUL ISLAND|   AK|
+----+--------------------+-----+
only showing top 20 rows



In [None]:
def create_temp(df_temp=df_temp):
    return  df_temp.select(

    F.concat(F.split(F.col('dt'), "-")[0], F.lit("-"), F.split(F.col('dt'), "-")[1]).alias('dt'),
    F.col('i94port').alias('port'),
    F.col('city'),
    F.col('averagetemperature'),
    F.col('averagetemperatureuncertainty')).groupBy('dt', 'city', 'port')\
        .agg(F.avg('averagetemperature').alias("averagetemperature"),\
             F.avg('averagetemperatureuncertainty').alias("averagetemperatureuncertainty"))\
                 .createOrReplaceTempView("temperature"), spark.sql("create table dim_temperature as select * from temperature")

In [47]:
# Now we have to create temperature dimension

df_temp.select(

    F.concat(F.split(F.col('dt'), "-")[0], F.lit("-"), F.split(F.col('dt'), "-")[1]).alias('dt'),
    F.col('i94port').alias('port'),
    F.col('city'),
    F.col('averagetemperature'),
    F.col('averagetemperatureuncertainty')

).groupBy('dt', 'city', 'port')\
.agg(F.avg('averagetemperature').alias("averagetemperature"), F.avg('averagetemperatureuncertainty').alias("averagetemperatureuncertainty"))\
.createOrReplaceTempView("temperature")

spark.sql("create table dim_temperature as select * from temperature")

DataFrame[]

In [48]:
spark.sql('select * from dim_temperature').show()

+-------+--------------+----+------------------+-----------------------------+
|     dt|          city|port|averagetemperature|averagetemperatureuncertainty|
+-------+--------------+----+------------------+-----------------------------+
|1870-09|   Brownsville| BRO| 28.19100000000001|                        2.518|
|1821-03|       Norfolk| NOR|7.7360000000000015|                        3.464|
|1902-07|    Birmingham| BHX|            27.962|           0.4970000000000001|
|1766-11|       Jackson| JAC|              null|                         null|
|1928-12|   Springfield| SPI|1.1303333333333332|           0.4663333333333333|
|1928-01|       Buffalo| BUF|            -4.152|                        0.223|
|1933-04|       El Paso| ELP|            13.867|                        0.212|
|1945-10|    Carrollton|null|            17.969|                        0.249|
|1865-02|        Edison|null|            -3.153|           0.8240000000000001|
|1979-11|         Flint|null|             4.438|    

In [None]:
def create_passenger(df_immigration=df_immigration):
    return df_immigration.select(
    F.col('person_id'),
    F.col('biryear'),
    F.col('gender')).createOrReplaceTempView("passenger"), spark.sql("create table dim_passenger as select * from passenger")

In [52]:
# Now we have to create a passenger dimension
df_immigration.select(
    F.col('person_id'),
    F.col('biryear'),
    F.col('gender')
).createOrReplaceTempView("passenger")

spark.sql("create table dim_passenger as select * from passenger")

DataFrame[]

In [53]:
spark.sql('select * from dim_passenger').show(1)

+---------+-------+------+
|person_id|biryear|gender|
+---------+-------+------+
|        1| 1979.0|  null|
+---------+-------+------+
only showing top 1 row



In [None]:
def create_time(df_immigration=df_immigration):
    return df_immigration.select(
    F.col('arrdate'),
    F.split(F.col('arrdate'), "-")[0].alias('year'),
    F.split(F.col('arrdate'), "-")[1].alias('month'),
    F.split(F.col('arrdate'), "-")[2].alias('day'),
    ).createOrReplaceTempView("time"), spark.sql("create table dim_time as select * from time")

In [57]:
# Now we have to create a time dimension
df_immigration.select(
    F.col('arrdate'),
    F.split(F.col('arrdate'), "-")[0].alias('year'),
    F.split(F.col('arrdate'), "-")[1].alias('month'),
    F.split(F.col('arrdate'), "-")[2].alias('day'),
).createOrReplaceTempView("time")

spark.sql("create table dim_time as select * from time")

DataFrame[]

In [58]:
spark.sql('select * from dim_time').show(1)

+----------+----+-----+---+
|   arrdate|year|month|day|
+----------+----+-----+---+
|2016-04-01|2016|   04| 01|
+----------+----+-----+---+
only showing top 1 row



#### 4.2 Data Quality Checks

Here I will check if any of my tables do have rows with values.
 
Run Quality Checks

In [59]:
# Perform quality checks here
tables = ['fact_immigration', 'dim_temperature', 'dim_airport', 'dim_time', 'dim_passenger']

def check_rows(tables):
    for i in tables:
        if spark.sql(f'select count(*) from {i}').collect()[0][0] > 0:
            print(f'table {i} is not empty')
        else:
            print(f'table {i} is empty')

check_rows(tables)

table fact_immigration is not empty
table dim_temperature is not empty
table dim_airport is not empty
table dim_time is not empty
table dim_passenger is not empty


In [67]:
# Check if we can find all ports in dim_airpot

def check_relation():

    return spark.sql("""

    SELECT FA.port, AIR.location, AIR.state

    FROM FACT_IMMIGRATION FA
        INNER JOIN DIM_AIRPORT AIR ON (FA.port == AIR.code)


    """).select('port').distinct().count() == spark.sql('select port from fact_immigration').distinct().count()

check_relation()

True

In [85]:
spark.sql("SELECT * FROM dim_temperature").printSchema()

root
 |-- dt: string (nullable = true)
 |-- city: string (nullable = true)
 |-- port: string (nullable = true)
 |-- averagetemperature: double (nullable = true)
 |-- averagetemperatureuncertainty: double (nullable = true)



In [98]:
# Check if we can get the temperature avg in a specific port and for a specific date-month

spark.sql(f'''

SELECT avg(averagetemperature)

FROM fact_immigration FA 
    INNER JOIN dim_temperature TP

WHERE FA.PORT = "BUF"
AND TP.dt = "1821-03"

GROUP BY FA.port

''').show()

+-----------------------+
|avg(averagetemperature)|
+-----------------------+
|      6.764667728235528|
+-----------------------+



#### 4.3 Data dictionary 

## Fact Table schema:

 - |-- cicid: double (nullable = true) - Unique ID 
 - |-- personid: integer (nullable = true) - FK for passenger dimension
 - |-- timeid: string (nullable = true) - FK for time dimension
 - |-- cit: double (nullable = true)
 - |-- res: double (nullable = true)
 - |-- port: string (nullable = true) - FK for airport dimension and FK for temperature dimension 
 - |-- arrdate: string (nullable = true)
 - |-- mode: double (nullable = true)
 - |-- addr: string (nullable = true)
 - |-- depdate: double (nullable = true)
 - |-- bir: double (nullable = true)
 - |-- visa: double (nullable = true) - Visa codes collapsed into three categories: (1 = Business; 2 = Pleasure; 3 = Student)
 - |-- count: double (nullable = true)
 - |-- dtadfile: string (nullable = true)
 - |-- entdepa: string (nullable = true)
 - |-- entdepd: string (nullable = true)
 - |-- entdepu: string (nullable = true)
 - |-- matflag: string (nullable = true)
 - |-- dtaddto: string (nullable = true)
 - |-- airline: string (nullable = true) - Airline used to arrive in U. S.
 - |-- admnum: double (nullable = true)
 - |-- fltno: string (nullable = true)
 - |-- visatype: string (nullable = true) - Class of admission legally admitting the non-immigrant to temporarily stay in U.S.
 

## Dimension Tables

### Airport data chema

 - |-- code: string (nullable = true) - Airport code Primary Key 
 - |-- location: string (nullable = true) - Location 
 - |-- state: string (nullable = true) - State
 
### Person data schema:

 - |-- birthYear: integer (nullable = true) - 4 digit year of birth
 - |-- gender: string (nullable = true) - Gender
 - |-- personId: long (nullable = false) - Primary key
 
### Time data schema:

 |-- timeid: date (nullable = true) - Primary key
 |-- month: string (nullable = true) - Month
 |-- year: string (nullable = true) - year
 |-- day: string - day
 
 ### Temperature data schema:

 - |-- dt: string (nullable = true) - Date and month 
 - |-- city: string (nullable = true) - City 
 - |-- port: string (nullable = true) - Primary key
 - |-- averagetemperature: double (nullable = true) - Temperature
 - |-- averagetemperatureuncertainty: double (nullable = true) - Temperature


#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
-- (done above)
* Propose how often the data should be updated and why.
-- Since immigration dataset updates montly, we can also update our data monthly
* Write a description of how you would approach the problem differently under the following scenarios:  


 * The data was increased by 100x.
 -- If the data is increased we can rely on cloud resources, for instance, place the data in Redshift, since it allows for querying heavy datasets.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 -- We can schedule it using airflow
 * The database needed to be accessed by 100+ people.
 -- Redshift with auto-scaling capabilities and Elastic Search for more performance of searches