# DEND Udacity Project Capstone US Immigration
### Data Engineering Capstone Project

#### Project Summary
This project performs ETL operations on the Udacity-provided I94 Immigration and Demographics datasets using Spark, Python, and SQL. This notebook also performs exploratory data analysis on the datasets used.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import os
import tools as tools
import etl as etl
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

* I plan to create a data lake about immigrant destinations in the US. I will gather data from 4 sources, load it into staging dataframes, clean the raw data, write it to Parquet files, and perform an ETL process using a Spark cluster. I will then write the data into fact and dimension tables that form a star schema.

*Tools/Technologies*:
- `Python 3.7` is used for this project because it is the latest version compatible with the `PySpark` release in this workspace. I've used type hinting as well.
- `Apache Spark (PySpark)`: I've chosen Apache Spark for this project because of its parallelism and ability to handle large datasets. The full immigration dataset contains ~40MM rows. I've used a mix of PySpark's SQL and Python APIs in this project.
- Apart from these two main technologies, `Pandas` was used during the EDA phase, and `Parquet` is used to save the ETL output in a columnar format that suits aggregate operations.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 
* **I94 Immigration Data**: This data comes from the US National Tourism and Trade Office. The dataset consists of 12 files, one per month. Each file has around 3 million rows and 28 columns. A dictionary explaining the columns is included in I94_SAS_Labels_Descriptions.SAS. See immigration_data_sample.csv for sample data. **NOTE**: I've used the sample SAS dataset provided by Udacity in the sas_data directory of the workspace. This data contains ~3MM rows, which satisfies the requirement of at least 1MM rows. It contains data for April 2016 only.
* **World Temperature Data**: This dataset comes from Kaggle and includes the temperatures of various cities in the world.
* **U.S. City Demographic Data**: This data comes from OpenDataSoft.
* **Airport Code Table**: This is a simple table of airport codes and corresponding cities.

In [3]:
spark = SparkSession.builder\
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .config("spark.python.worker.memory", "15g")\
        .enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [4]:
df_immi_sample = pd.read_csv('immigration_data_sample.csv')
df_immi_sample.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [5]:
# World Temperature Data
temperaturefname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = pd.read_csv(temperaturefname)

In [5]:
temperature_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [6]:
# U.S. City Demographic Data
demographicfname = 'us-cities-demographics.csv'
demographic_df = pd.read_csv(demographicfname, sep=';')

In [7]:
demographic_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [8]:
# Airport Code Table
airportcodefname = 'airport-codes_csv.csv'
airport_df = pd.read_csv(airportcodefname)

In [9]:
airport_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [None]:
#write to parquet
df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

In [13]:
df_spark.createOrReplaceTempView("immigration_table")

### Step 2: Explore and Assess the Data
* Refer to the "Explore and Assess the Data" notebook for data exploration and analysis.

#### Cleaning Steps
* Drop columns containing over 95% missing values
* Drop duplicate values
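
The helpers in `tools` are not shown in this notebook. As a rough sketch of the two cleaning steps above, assuming plain pandas DataFrames and a 95% threshold, they might look like the following. The function names `drop_sparse_columns` and `drop_duplicates_report` are hypothetical stand-ins, not the actual `tools` API, and the sample frame is made up.

```python
import pandas as pd


def drop_sparse_columns(df: pd.DataFrame, threshold: float = 0.95) -> pd.DataFrame:
    """Return a copy of df without columns whose share of missing values exceeds threshold."""
    missing_share = df.isna().mean()  # fraction of missing values per column
    sparse = missing_share[missing_share > threshold].index
    return df.drop(columns=sparse)


def drop_duplicates_report(df: pd.DataFrame) -> pd.DataFrame:
    """Drop fully duplicated rows and print how many were removed."""
    deduped = df.drop_duplicates()
    print(f"Dropped {len(df) - len(deduped)} duplicate rows")
    return deduped


# Tiny illustrative frame (made-up values): 'mostly_empty' is 100% missing.
sample = pd.DataFrame({
    "city": ["A", "A", "B"],
    "value": [1.0, 1.0, 2.0],
    "mostly_empty": [None, None, None],
})
cleaned = drop_duplicates_report(drop_sparse_columns(sample))
```

Chaining the two calls, as in the last line, keeps the result of the first step, which is the pattern the cells below follow for each dataset.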

In [10]:
# Drop columns with over 95% missing values
clean_temperature = tools.eliminate_column_missing_data(temperature_df)

Dropping missing data...
Cleaning complete!


In [11]:
clean_temperature = tools.drop_duplicate_rows(clean_temperature)

Dropping duplicate data...
Drops 0 columns
Cleaning complete!


In [12]:
# Drop columns with over 95% missing values
clean_airport_codes = tools.eliminate_column_missing_data(airport_df)

Dropping missing data...
Cleaning complete!


In [13]:
# Deduplicate the output of the previous step so that its result is not discarded
clean_airport_codes = tools.drop_duplicate_rows(clean_airport_codes)

Dropping duplicate data...
Drops 0 columns
Cleaning complete!


In [14]:
# Running eliminate_column_missing_data on the full Spark DataFrame can fail with a Java heap error,
# so here we only deduplicate the pandas sample; the full dataset is cleaned with Spark tools.
clean_immigration = tools.drop_duplicate_rows(df_immi_sample)

Dropping duplicate data...
Drops 0 columns
Cleaning complete!


In [15]:
# Drop columns with over 95% missing values
clean_demographics = tools.eliminate_column_missing_data(demographic_df)

Dropping missing data...
Cleaning complete!


In [16]:
clean_demographics = tools.drop_duplicate_rows(clean_demographics)

Dropping duplicate data...
Drops 0 columns
Cleaning complete!


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model:

**The Business Process: The immigration department records each arrival of a traveler into the country. These arrival events are captured and translated into the fact table.**

**Identify the Dimensions:**
1. dim_visa
2. dim_temperature
3. dim_country
4. dim_state
5. dim_time
6. dim_airport
7. dim_port

**Identify the Facts:**
1. fact_immigration

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Load data into staging tables
2. Create Dimension tables
3. Create Fact tables
4. Write data into parquet files
5. Perform data quality checks
6. Perform data analysis

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [4]:
output_path = "data/"

In [22]:
# Staging data
df_countries = pd.read_csv('countries.csv')
df_ports = pd.read_csv('ports.csv')
df_visas = pd.read_csv('visas.csv')
df_states = pd.read_csv('states.csv')

In [34]:
# Create immigration schema
df_spark = df_spark.withColumn("cicid",df_spark.cicid.cast(FloatType()))\
                   .withColumn("i94yr",df_spark.i94yr.cast(FloatType()))\
                   .withColumn("i94mon",df_spark.i94mon.cast(FloatType()))\
                   .withColumn("i94cit",df_spark.i94cit.cast(FloatType()))\
                   .withColumn("i94res",df_spark.i94res.cast(FloatType()))\
                   .withColumn("arrdate",df_spark.arrdate.cast(FloatType()))\
                   .withColumn("i94mode",df_spark.i94mode.cast(FloatType()))\
                   .withColumn("depdate",df_spark.depdate.cast(FloatType()))\
                   .withColumn("i94bir",df_spark.i94bir.cast(FloatType()))\
                   .withColumn("i94visa",df_spark.i94visa.cast(FloatType()))\
                   .withColumn("count",col("count").cast(FloatType()))\
                   .withColumn("biryear",df_spark.biryear.cast(FloatType()))\
                   .withColumn("insnum",df_spark.insnum.cast(FloatType()))\
                   .withColumn("admnum",df_spark.admnum.cast(FloatType()))
df_spark.printSchema()

root
 |-- cicid: float (nullable = true)
 |-- i94yr: float (nullable = true)
 |-- i94mon: float (nullable = true)
 |-- i94cit: float (nullable = true)
 |-- i94res: float (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: float (nullable = true)
 |-- i94mode: float (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: float (nullable = true)
 |-- i94bir: float (nullable = true)
 |-- i94visa: float (nullable = true)
 |-- count: float (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: float (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: float (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: float (nullable = true)
 |-- flt
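
One caveat on the casts above: Spark's `FloatType` is a 32-bit float, which represents integers exactly only up to 2^24 (16,777,216). That is enough for `cicid` values of the size seen in the sample, but not for 11-digit values such as `admnum`. The snippet below illustrates the effect with the standard-library `struct` module. The admission number is a made-up value of the same magnitude as the sample data, and whether the rounding matters depends on how `admnum` is used downstream; `DoubleType` or `LongType` would avoid it.

```python
import struct


def to_float32(value: float) -> float:
    """Round-trip a Python float through IEEE 754 single precision."""
    return struct.unpack("f", struct.pack("f", value))[0]


cicid = 4084316.0        # from the sample data; below 2**24, so it survives exactly
admnum = 55425872433.0   # hypothetical 11-digit admission number

print(to_float32(cicid) == cicid)    # exact round trip
print(to_float32(admnum) - admnum)   # non-zero rounding error
```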

In [25]:
# create schema for temperature
temperature_schema = StructType([StructField("dt", StringType(), True)\
                          ,StructField("AverageTemperature", FloatType(), True)\
                          ,StructField("AverageTemperatureUncertainty", FloatType(), True)\
                          ,StructField("City", StringType(), True)\
                          ,StructField("Country", StringType(), True)\
                          ,StructField("Latitude", StringType(), True)\
                          ,StructField("Longitude", StringType(), True)])

In [26]:
# read temperature data with Spark; the schema is inferred here, so temperature_schema above is not applied
temperature_spark = spark.read.csv(temperaturefname, header=True, inferSchema=True)

In [35]:
temperature_spark = temperature_spark.withColumn("AverageTemperature",col("AverageTemperature").cast(FloatType()))\
                                     .withColumn("AverageTemperatureUncertainty",col("AverageTemperatureUncertainty").cast(FloatType()))
temperature_spark.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: float (nullable = true)
 |-- AverageTemperatureUncertainty: float (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [36]:
# create schema for demographics
demographics_schema = StructType([StructField("City", StringType(), True)\
                        ,StructField("State", StringType(), True)\
                        ,StructField("Median Age", FloatType(), True)\
                        ,StructField("Male Population", FloatType(), True)\
                        ,StructField("Female Population", FloatType(), True)\
                        ,StructField("Total Population", IntegerType(), True)\
                        ,StructField("Number of Veterans", FloatType(), True)\
                        ,StructField("Foreign-born", FloatType(), True)\
                        ,StructField("Average Household Size", FloatType(), True)\
                        ,StructField("State Code", StringType(), True)\
                        ,StructField("Race", StringType(), True)\
                        ,StructField("Count", IntegerType(), True)])

demographics_spark = spark.createDataFrame(clean_demographics, schema=demographics_schema)

In [37]:
demographics_spark.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: float (nullable = true)
 |-- Male Population: float (nullable = true)
 |-- Female Population: float (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: float (nullable = true)
 |-- Foreign-born: float (nullable = true)
 |-- Average Household Size: float (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [38]:
# create schema for airport
airport_codes_schema = StructType([StructField("ident", StringType(), True)\
                        ,StructField("type", StringType(), True)\
                        ,StructField("name", StringType(), True)\
                        ,StructField("elevation_ft", FloatType(), True)\
                        ,StructField("continent", StringType(), True)\
                        ,StructField("iso_country", StringType(), True)\
                        ,StructField("iso_region", StringType(), True)\
                        ,StructField("municipality", StringType(), True)\
                        ,StructField("gps_code", StringType(), True)\
                        ,StructField("iata_code", StringType(), True)\
                        ,StructField("local_code", StringType(), True)\
                        ,StructField("coordinates", StringType(), True)])
airport_codes_spark = spark.createDataFrame(clean_airport_codes, schema=airport_codes_schema)

In [39]:
airport_codes_spark.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: float (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [40]:
# create Spark DataFrame for countries (schema inferred from pandas)
country_spark = spark.createDataFrame(df_countries)

In [41]:
country_spark.printSchema()

root
 |-- code: long (nullable = true)
 |-- country: string (nullable = true)



In [42]:
# create schema for port
port_schema = StructType([StructField("code", StringType(), True)\
                        ,StructField("location", StringType(), True)\
                        ,StructField("state", StringType(), True)])
port_spark = spark.createDataFrame(df_ports, schema=port_schema)

In [43]:
port_spark.printSchema()

root
 |-- code: string (nullable = true)
 |-- location: string (nullable = true)
 |-- state: string (nullable = true)



***create dim_visa***

In [44]:
visa_df = etl.create_dim_visa(df_spark)

In [45]:
# write parquet files
tools.write_to_parquet(visa_df, output_path, 'visa')

Writing table visa to data/visa
Writing completed


In [5]:
visa = spark.read.parquet("data/visa")
visa.toPandas().head()

Unnamed: 0,visa_id,i94visa,visatype,visapost
0,8901,3.0,F1,PRS
1,37333,3.0,F1,OTT
2,52212,2.0,B2,ALB
3,69139,1.0,B1,LUS
4,159998,1.0,I,SHG
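
For reference, `i94visa` holds the collapsed visa category described in `I94_SAS_Labels_Descriptions.SAS` (1 = Business, 2 = Pleasure, 3 = Student), while `visatype` is the specific visa class. A small lookup, shown here as an illustrative sketch rather than part of `etl`, makes the sample rows easier to read. The names `I94VISA_CATEGORY` and `visa_category` are hypothetical.

```python
# Category codes as documented in the I94 SAS label file.
I94VISA_CATEGORY = {1: "Business", 2: "Pleasure", 3: "Student"}


def visa_category(i94visa: float) -> str:
    """Map the numeric i94visa code to its category name."""
    return I94VISA_CATEGORY.get(int(i94visa), "Unknown")


# (i94visa, visatype) pairs taken from the sample rows above.
sample_rows = [(3.0, "F1"), (2.0, "B2"), (1.0, "B1")]
for code, visatype in sample_rows:
    print(visatype, "->", visa_category(code))
```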


***create dim_country***

In [47]:
country_df = etl.create_dim_country(country_spark)

In [48]:
# write parquet files
tools.write_to_parquet(country_df, output_path, 'country')

Writing table country to data/country
Writing completed


In [6]:
country = spark.read.parquet("data/country")
country.printSchema()

root
 |-- country_code: long (nullable = true)
 |-- country_name: string (nullable = true)



***create dim_state***

In [50]:
state_df = etl.create_dim_state(demographics_spark)

In [51]:
# write parquet files
tools.write_to_parquet(state_df, output_path, 'state')

Writing table state to data/state
Writing completed


In [7]:
state = spark.read.parquet("data/state")
state.toPandas().head()

Unnamed: 0,state_code,state,median_age,total_population,male_population,female_population,foreign_born,average_household_size
0,DC,District of Columbia,33.8,3361140,1598525.0,1762615.0,475585.0,2.24
1,AR,Arkansas,32.74,2882889,1400724.0,1482165.0,307753.0,2.53
2,TN,Tennessee,34.31,10690165,5124189.0,5565976.0,900149.0,2.46
3,LA,Louisiana,34.63,6502975,3134990.0,3367985.0,417095.0,2.47
4,AZ,Arizona,35.04,22497710,11137275.0,11360435.0,3411565.0,2.77


***create dim_time***

In [53]:
time_df = etl.create_dim_time(df_spark)

In [54]:
# write parquet files
tools.write_to_parquet(time_df, output_path, 'time')

Writing table time to data/time
Writing completed


In [8]:
time = spark.read.parquet("data/time")
time.toPandas().head()

Unnamed: 0,arrdate,date,day,month,year,week,weekday
0,20571.0,2016-04-27,27,4,2016,17,4
1,20567.0,2016-04-23,23,4,2016,16,7
2,20564.0,2016-04-20,20,4,2016,16,4
3,20574.0,2016-04-30,30,4,2016,17,7
4,20560.0,2016-04-16,16,4,2016,15,7
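
The `arrdate` values are SAS dates, that is, day counts from 1960-01-01. How `etl.create_dim_time` derives the other columns is not shown here. The following standard-library sketch reproduces the rows of the table above, assuming ISO week numbers and Spark's `dayofweek` convention (Sunday = 1). The function name `sas_date_to_time_row` is illustrative.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS stores dates as days since this day


def sas_date_to_time_row(arrdate: float) -> dict:
    """Expand a SAS numeric date into the columns used by dim_time."""
    d = SAS_EPOCH + timedelta(days=int(arrdate))
    return {
        "arrdate": arrdate,
        "date": d.isoformat(),
        "day": d.day,
        "month": d.month,
        "year": d.year,
        "week": d.isocalendar()[1],         # ISO week of year
        "weekday": d.isoweekday() % 7 + 1,  # Sunday=1 ... Saturday=7
    }


print(sas_date_to_time_row(20571.0))
```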


***create dim_airport***

In [56]:
airport_df = etl.create_dim_airport(airport_codes_spark)

In [57]:
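# write parquet files
# NOTE: this writes the staging DataFrame airport_codes_spark, not the airport_df returned by etl.create_dim_airport above
tools.write_to_parquet(airport_codes_spark, output_path, 'airport')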

Writing table airport to data/airport
Writing completed


In [9]:
airport = spark.read.parquet("data/airport")
airport.toPandas().head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


***create dim_port***

In [59]:
port_df = etl.create_dim_port(port_spark)

In [60]:
# write parquet files
tools.write_to_parquet(port_df, output_path, 'port')

Writing table port to data/port
Writing completed


In [10]:
port = spark.read.parquet("data/port")
port.toPandas().head()

Unnamed: 0,port_code,city,state_code
0,EPT,EASTPORT MUNICIPAL,ME
1,SPA,ST PAMPILE,ME
2,GTF,Collapsed into INT,MN
3,WSB,WARROAD INTL,SPB
4,GRE,GREAT FALLS,MT


***create dim_temperature***

In [62]:
temperature_staging = etl.create_staging_temperature(temperature_spark)

In [63]:
# write parquet files
tools.write_to_parquet(temperature_staging, output_path, 'staging_temperature')

Writing table staging_temperature to data/staging_temperature
Writing completed


In [21]:
temperature_staging = spark.read.parquet("data/staging_temperature")
temperature_staging.toPandas().head()

Unnamed: 0,temperature_id,country,average_temperature,average_temperature_uncertainty
0,1675037245440,United Kingdom,9.1,1.59
1,1675037245441,Moldova,8.67,1.51
2,1675037245442,Congo (Democratic Republic Of The),23.24,0.7
3,1649267441664,Bosnia And Herzegovina,10.45,1.57
4,1236950581248,United Arab Emirates,26.57,0.84


In [22]:
# merge countries and temperature data.
country_temp = country.select(["*"])\
            .join(temperature_staging, (upper(trim(country.country_name)) == upper(trim(temperature_staging.country))), how='full')\
            .select([country.country_code, country.country_name, temperature_staging.temperature_id, temperature_staging.average_temperature, temperature_staging.average_temperature_uncertainty])\
            .dropna(subset="temperature_id")
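
The cell above does a full outer join on normalized country names and then drops rows without a `temperature_id`, which keeps the same rows a right join onto the temperature table would keep. A small pandas sketch illustrates the key normalization and that equivalence. The rows, the Japan temperature, and the `key` helper column are made up for illustration.

```python
import pandas as pd

country = pd.DataFrame({
    "country_code": [135, 209, 999],
    "country_name": ["UNITED KINGDOM", "JAPAN ", "ATLANTIS"],
})
temperature = pd.DataFrame({
    "temperature_id": [1, 2, 3],
    "country": ["United Kingdom", " japan", "Moldova"],
    "average_temperature": [9.1, 12.0, 8.67],
})

# Normalize the join key the same way as upper(trim(...)) in Spark.
country["key"] = country["country_name"].str.strip().str.upper()
temperature["key"] = temperature["country"].str.strip().str.upper()

full = country.merge(temperature, on="key", how="outer").dropna(subset=["temperature_id"])
right = country.merge(temperature, on="key", how="right")

# Both keep every temperature row, matched or not.
print(sorted(full["key"]) == sorted(right["key"]))
```

Temperature rows without a matching country (Moldova in this toy data) survive with a missing `country_code`, which is worth keeping in mind when joining the resulting table to the fact table on `country_code`.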

In [23]:
# write parquet files
tools.write_to_parquet(country_temp, output_path, 'temperature')

Writing table temperature to data/temperature
Writing completed


In [24]:
temperature = spark.read.parquet("data/temperature")
temperature.count()

159

***create fact_immigration***

In [25]:
# (spark, immigration_df, countries_df, states_df, ports_df, visas_df, temperature_df, airports_df, time_df)
immigration = etl.create_fact_immigration(spark, df_spark, country, state, port, visa, temperature, airport, time)

In [26]:
# Check the total row count of the immigration fact table
immigration.count()

3096313

In [27]:
# Cross-check: count distinct cicid values across the staging-table joins
spark.sql("""
    SELECT 
        count(DISTINCT si.cicid)
    FROM staging_immigration_data si 
    LEFT JOIN staging_states_data ss ON si.i94addr = ss.state_code 
    LEFT JOIN staging_visas_data sv ON 
    (si.i94visa = sv.i94visa
    AND si.visatype = sv.visatype
    AND si.visapost = sv.visapost)
    LEFT JOIN staging_ports_data sp ON sp.port_code = si.i94port 
    LEFT JOIN staging_temperature_data st ON si.i94res = st.country_code 
    LEFT JOIN staging_countries_data sc ON sc.country_code = si.i94res 
    LEFT JOIN staging_airports_data sa ON sa.ident = si.i94port 
    LEFT JOIN staging_time_data std ON std.arrdate = si.arrdate  
    """).show()

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              3096313|
+---------------------+



In [28]:
# write parquet files
tools.write_to_parquet(immigration, output_path, 'fact_immigration')

Writing table fact_immigration to data/fact_immigration
Writing completed


In [29]:
immigration = spark.read.parquet("data/fact_immigration")
immigration.count()

3096313

**For Analysis**

In [30]:
immigration.createOrReplaceTempView('fact_immigration')
temperature.createOrReplaceTempView('dim_temperature')
country.createOrReplaceTempView('dim_country')
state.createOrReplaceTempView('dim_state')
port.createOrReplaceTempView('dim_port')
visa.createOrReplaceTempView('dim_visa')
airport.createOrReplaceTempView('dim_airport')
time.createOrReplaceTempView('dim_time')

In [30]:
# Which cities were most visited in a specific month?
# This data comes from the April file, so we query April
# (the numeric code for April is 4)

spark.sql("""
    SELECT
        tvc.port_code,
        tvc.immigrant_visits,
        dp.city,
        dp.state_code
    FROM
        (SELECT 
            fi.port_code AS port_code, 
            COUNT(*) AS immigrant_visits
        FROM fact_immigration fi 
        WHERE fi.i94mon = 4
        GROUP BY fi.port_code
        ORDER BY immigrant_visits DESC
        LIMIT 10
        ) AS tvc 
    JOIN dim_port dp
        ON dp.port_code = tvc.port_code
    ORDER BY tvc.immigrant_visits DESC
""").show()

+---------+----------------+----------------+----------+
|port_code|immigrant_visits|            city|state_code|
+---------+----------------+----------------+----------+
|      NYC|          485916|        NEW YORK|        NY|
|      MIA|          343941|           MIAMI|        FL|
|      LOS|          310163|     LOS ANGELES|        CA|
|      SFR|          152586|   SAN FRANCISCO|        CA|
|      ORL|          149195|         ORLANDO|        FL|
|      HHW|          142720|        HONOLULU|        HI|
|      NEW|          136122|NEWARK/TETERBORO|        NJ|
|      CHI|          130564|         CHICAGO|        IL|
|      HOU|          101481|         HOUSTON|        TX|
|      FTL|           95977| FORT LAUDERDALE|        FL|
+---------+----------------+----------------+----------+



In [34]:
# Which countries do travelers originate from? Top countries of origin.
spark.sql("""
    SELECT *
    FROM
        (SELECT 
            fi.country_code AS origin_country_code, 
            COUNT(*) AS country_visitors
        FROM fact_immigration fi 
        GROUP BY fi.country_code
        ORDER BY country_visitors DESC
        LIMIT 10
        ) AS tcv
    JOIN dim_country dc
        ON tcv.origin_country_code = dc.country_code
    ORDER BY country_visitors DESC
""").show()

+-------------------+----------------+------------+--------------------+
|origin_country_code|country_visitors|country_code|        country_name|
+-------------------+----------------+------------+--------------------+
|                135|          368421|         135|      UNITED KINGDOM|
|                209|          249167|         209|               JAPAN|
|                245|          185609|         245|          CHINA, PRC|
|                111|          185339|         111|              FRANCE|
|                582|          179603|         582|MEXICO Air Sea, a...|
|                112|          156613|         112|             GERMANY|
|                276|          136312|         276|         SOUTH KOREA|
|                689|          134907|         689|              BRAZIL|
|                438|          112407|         438|           AUSTRALIA|
|                213|          107193|         213|               INDIA|
+-------------------+----------------+------------+--------------------+

In [49]:
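# Top countries of origin for visa_type = 'B1' (from the SAS file).
# NOTE: B1 is a business visitor visa; student visas are F1, so the student_visitors alias below actually counts B1 holders.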
spark.sql("""
    SELECT *
    FROM
        (SELECT 
            fi.country_code AS origin_country_code, 
            COUNT(*) AS student_visitors
        FROM fact_immigration fi
        WHERE fi.visa_type = 'B1'
        GROUP BY fi.country_code
        ORDER BY student_visitors DESC
        LIMIT 10
        ) AS tcv
    JOIN dim_country dc
        ON tcv.origin_country_code = dc.country_code
    ORDER BY student_visitors DESC
""").show()

+-------------------+----------------+------------+--------------------+
|origin_country_code|student_visitors|country_code|        country_name|
+-------------------+----------------+------------+--------------------+
|                245|           32284|         245|          CHINA, PRC|
|                582|           31031|         582|MEXICO Air Sea, a...|
|                213|           22485|         213|               INDIA|
|                689|           14044|         689|              BRAZIL|
|                691|            6748|         691|            COLOMBIA|
|                251|            6147|         251|              ISRAEL|
|                135|            4261|         135|      UNITED KINGDOM|
|                687|            4196|         687|           ARGENTINA|
|                107|            4167|         107|              POLAND|
|                264|            3892|         264|              TURKEY|
+-------------------+----------------+------------+--------------------+

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [32]:
tools.quality_check(immigration,
                  country,
                  time,
                  airport,
                  temperature,
                  visa,
                  port,
                  state,
                  spark)

Data quality check passed for Immigrations fact with record_count: 3096313 records.
Data quality check passed for Dim time with record_count: 30 records.
Data quality check passed for Dim airport with record_count: 55075 records.
Data quality check passed for Dim temperature with record_count: 159 records.
Data quality check passed for Dim port with record_count: 660 records.
Data quality check passed for Dim visa with record_count: 2101 records.
Data quality check passed for Dim state with record_count: 47 records.
Data quality check passed for Dim country with record_count: 289 records.
All tables passed.
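
`tools.quality_check` itself is not shown in this notebook. Judging only from the log above, a minimal row-count check might look like the sketch below. It relies on each table exposing a `count()` method, as Spark DataFrames do, and uses a small stand-in class so that it runs without a Spark session. All names here (`FakeTable`, `check_row_counts`) are hypothetical.

```python
from typing import Dict


class FakeTable:
    """Stand-in for a Spark DataFrame; only count() is needed."""

    def __init__(self, rows: int) -> None:
        self._rows = rows

    def count(self) -> int:
        return self._rows


def check_row_counts(tables: Dict[str, object]) -> bool:
    """Return True only if every table has at least one row."""
    all_passed = True
    for name, table in tables.items():
        n = table.count()
        if n > 0:
            print(f"Data quality check passed for {name} with record_count: {n} records.")
        else:
            print(f"Data quality check failed for {name}: no records.")
            all_passed = False
    return all_passed


ok = check_row_counts({"Dim time": FakeTable(30), "Dim state": FakeTable(47)})
empty_ok = check_row_counts({"Dim visa": FakeTable(0)})
```

A row-count check only catches empty tables; uniqueness of keys such as `cicid` would need a separate check.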


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

**refer to Data_Dictionary.txt**

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

**1. Clearly state the rationale for the choice of tools and technologies for the project.**
- `Python 3.7` is used for this project because it is the latest version compatible with the `PySpark` release in this workspace. I've used type hinting as well.
- `Apache Spark (PySpark)`: I've chosen Apache Spark for this project because of its parallelism and ability to handle large datasets. The full immigration dataset contains ~40MM rows. I've used a mix of PySpark's SQL and Python APIs in this project.
- Apart from these two main technologies, `Pandas` was used during the EDA phase for its convenient dataframe manipulation functions, and `Parquet` is used to save the ETL output in a columnar format that suits aggregate operations.

**2. Propose how often the data should be updated and why.**

- The immigration (I94) dataset is updated monthly, so all related data should be refreshed monthly as well.

**3. Write a description of how you would approach the problem differently under the following scenarios:**
- If the data were increased by 100x: the data would be stored in AWS S3. We would keep Spark as the data processing platform, since it is well suited to large datasets.
- If the data populates a dashboard that must be updated by 7am every day: we can schedule the pipeline with Apache Airflow, which would run the ETL and the data quality validation.
- If the database needs to be accessed by 100+ people: we can load the data into an Amazon Redshift cluster, a PostgreSQL-based data warehouse.