# Data Engineering for Analysis on i94Immigration Data from US
### Udacity Data Engineering Capstone Project

#### Project Summary
In this project, I worked with four datasets from different sources, designed a star schema for them, and prepared the data for analysis of immigration to the USA.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# imports and installs 
import pandas as pd
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession,Window
from pyspark.sql.types import *
from pyspark.sql.functions import *

### Step 1: Scope the Project and Gather Data (see README)

### Step 2: Explore and Assess the Data

In [2]:
# create the SparkSession
spark = SparkSession.builder.\
                    config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11"). \
                    enableHiveSupport().getOrCreate()

### 2.1 I94 Immigration Data

#### 2.1.1 Data Exploring

In [3]:
# load the I94 immigration data from the local SAS file
df_sas = spark.read.format('com.github.saurfang.sas.spark') \
            .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
# write to parquet (currently disabled)
#df_sas.write.parquet("sas_data")
# read the parquet copy back in
df_sas = spark.read.parquet("sas_data")

In [4]:
# explore data: check columns
df_sas.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [5]:
# explore data: show first 5 rows
df_sas.show(n=5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [6]:
# explore data: check the columns of interest
df_sas.select(col('airline'),col('i94port'),col('i94addr')).distinct().sort(df_sas.airline.desc()).show(truncate=False)

+-------+-------+-------+
|airline|i94port|i94addr|
+-------+-------+-------+
|ZZ     |MIA    |NC     |
|ZZ     |WAS    |NY     |
|ZZ     |SEA    |WA     |
|ZZ     |SFR    |DE     |
|ZZ     |NYC    |NJ     |
|ZZ     |ADW    |null   |
|ZZ     |SFR    |NY     |
|ZZ     |MIA    |FL     |
|ZZ     |BRO    |TX     |
|ZZ     |NYC    |CT     |
|ZZ     |ATL    |NY     |
|ZZ     |NEW    |HI     |
|ZZ     |ADW    |MD     |
|ZZ     |NYC    |NY     |
|ZZ     |HHW    |HI     |
|ZX     |WAS    |IN     |
|ZX     |FTL    |MN     |
|ZX     |TOR    |GA     |
|ZX     |FTL    |OH     |
|ZX     |HHW    |CT     |
+-------+-------+-------+
only showing top 20 rows



In [7]:
# explore data: count rows
df_sas.count()

3096313

In [8]:
# explore data: Get count of both null and missing values
df_sas.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_sas.columns]).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+---
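
The null check above runs in Spark. As a rough illustration of what it counts, here is a minimal plain-Python sketch. The helper `count_missing` and the sample rows are made up for this example and are not part of the notebook.

```python
import math


def count_missing(rows, columns):
    """Count values that are None or NaN, per column, in a list of dict rows."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            value = row.get(c)
            if value is None or (isinstance(value, float) and math.isnan(value)):
                counts[c] += 1
    return counts


# made-up sample rows (not taken from the I94 file)
sample_rows = [
    {"i94mode": 1.0, "i94addr": "CA", "occup": None},
    {"i94mode": float("nan"), "i94addr": None, "occup": None},
    {"i94mode": 1.0, "i94addr": "NV", "occup": "STU"},
]
missing = count_missing(sample_rows, ["i94mode", "i94addr", "occup"])
print(missing)
```

Columns with a very high missing count, such as `occup` and `insnum` in the output above, are the natural candidates to treat with care or drop during cleaning.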

#### 2.1.2 Data Cleaning

In [6]:
def clear_df_sas(read_format, input_data, output_data):
    """
    Load the I94 immigration data, clean it, and write the result to output_data.
    Params:
        read_format  : format of the input data
        input_data   : location of the raw I94 data
        output_data  : location where the cleaned data is saved
    """
    # load i94immigration data from local
    df_sas =spark.read.format(read_format) \
            .load(input_data)
    
    # convert SAS dates to Spark DateType, rename and cast columns, drop unused columns and duplicate rows
    df_sas = df_sas.withColumn("data_base_sas", to_date(lit("01/01/1960"), "MM/dd/yyyy")) \
            .withColumn("arrival_date", expr("date_add(data_base_sas, arrdate)")) \
            .withColumn("departure_date", expr("date_add(data_base_sas, depdate)")) \
            .drop("data_base_sas", "arrdate", "depdate") \
            .withColumn("cic_id",col("cicid").cast(IntegerType())).drop("cicid") \
            .withColumn("arrive_year",col('i94yr').cast(IntegerType())).drop("i94yr") \
            .withColumn("arrive_month",col('i94mon').cast(IntegerType())).drop("i94mon") \
            .withColumn("citizen_country",col('i94cit').cast(IntegerType())).drop("i94cit") \
            .withColumn("resident_country",col('i94res').cast(IntegerType())).drop("i94res") \
            .withColumn("age",col('i94bir').cast(IntegerType())).drop("i94bir") \
            .withColumn("birth_year",col('biryear').cast(IntegerType())).drop("biryear") \
            .withColumn("visa_class",col('i94visa').cast(IntegerType())).drop("i94visa") \
            .withColumn("mode",col('i94mode').cast(IntegerType())).drop("i94mode") \
            .withColumn("allowed_date", to_date("dtaddto", "MMddyyyy")) \
            .withColumnRenamed("i94port", "port") \
            .withColumnRenamed("i94addr","arrive_state") \
            .withColumnRenamed("visapost","visa_issue_state") \
            .withColumnRenamed("entdepa","arrive_flag") \
            .withColumnRenamed("entdepd","departure_flag") \
            .withColumnRenamed("matflag","match_flag") \
            .withColumnRenamed("entdepu","update_flag") \
            .withColumnRenamed("fltno","flight_num") \
            .withColumnRenamed("visatype","visa_type") \
            .withColumnRenamed("occup","occupation") \
            .drop('count','dtadfile','insnum','admnum','dtaddto') \
            .distinct()
    
    # write the cleaned data to parquet, replacing any output from an earlier run
    # (later cells read the relative path 'immigration', so output_data should be the working directory)
    df_sas.write.mode('overwrite').partitionBy('arrive_state').parquet(output_data + 'immigration')

In [7]:
# set the parameters, then call clear_df_sas to clean the I94 data and write the result to local storage
df_sas_format    = 'com.github.saurfang.sas.spark'
df_sas_read_from = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_sas_save_to   = 'file:/home/workspace/'
clear_df_sas(df_sas_format,df_sas_read_from,df_sas_save_to)
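
The date handling in `clear_df_sas` relies on SAS storing dates as a day count from 1960-01-01, while `dtaddto` is an `MMddyyyy` string. A minimal plain-Python sketch of the same two conversions follows. The helper name `sas_days_to_date` is hypothetical, and the input values come from the first row shown in 2.1.1.

```python
from datetime import date, datetime, timedelta

SAS_EPOCH = date(1960, 1, 1)  # same base date as "data_base_sas" in clear_df_sas


def sas_days_to_date(days):
    """Convert a SAS day count (float, int or None) to a calendar date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# arrdate of the first row shown in 2.1.1
arrival = sas_days_to_date(20574.0)
# dtaddto of the same row, stored as an MMddyyyy string
allowed = datetime.strptime("10292016", "%m%d%Y").date()
print(arrival, allowed)
```

The arrival date comes out as 2016-04-30, which agrees with the `dtadfile` value `20160430` in that row.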

In [8]:
# read in cleaned df_sas, check schema
df_sas_clear = spark.read.parquet('immigration')
df_sas_clear.printSchema()

root
 |-- port: string (nullable = true)
 |-- visa_issue_state: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- arrive_flag: string (nullable = true)
 |-- departure_flag: string (nullable = true)
 |-- update_flag: string (nullable = true)
 |-- match_flag: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- flight_num: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- cic_id: integer (nullable = true)
 |-- arrive_year: integer (nullable = true)
 |-- arrive_month: integer (nullable = true)
 |-- citizen_country: integer (nullable = true)
 |-- resident_country: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- visa_class: integer (nullable = true)
 |-- mode: integer (nullable = true)
 |-- allowed_date: date (nullable = true)
 |-- arrive_state: stri

In [14]:
# show first 5 rows of clean data
df_sas_clear.show(n=5)

+----+----------------+----------+-----------+--------------+-----------+----------+------+-------+----------+---------+------------+--------------+-------+-----------+------------+---------------+----------------+---+----------+----------+----+------------+------------+
|port|visa_issue_state|occupation|arrive_flag|departure_flag|update_flag|match_flag|gender|airline|flight_num|visa_type|arrival_date|departure_date| cic_id|arrive_year|arrive_month|citizen_country|resident_country|age|birth_year|visa_class|mode|allowed_date|arrive_state|
+----+----------------+----------+-----------+--------------+-----------+----------+------+-------+----------+---------+------------+--------------+-------+-----------+------------+---------------+----------------+---+----------+----------+----+------------+------------+
| AGA|            null|      null|          G|             O|       null|         M|     M|     UA|     00150|       WT|  2016-04-30|    2016-05-02|5750212|       2016|           4|   

### 2.2 U.S. City Demographic Data

#### 2.2.1 Data Exploring

In [15]:
#load us-cities-demographics.csv
df_demo = spark.read.csv("us-cities-demographics.csv",header=True,sep=";")
df_demo.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

In [16]:
df_demo.printSchema()
df_demo.count()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



2891

In [18]:
### Get count of both null and missing values
df_demo.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_demo.columns]).show()

+----+-----+----------+----+-----+----------+---------------+-----------------+----------------+------------+-----------------------+------------------+
|city|state|state_code|race|count|median_age|male_population|female_population|total_population|veterans_num|foreign_born_population|avg_household_size|
+----+-----+----------+----+-----+----------+---------------+-----------------+----------------+------------+-----------------------+------------------+
|   0|    0|         0|   0|    0|         0|              3|                3|               0|          13|                     13|                16|
+----+-----+----------+----+-----+----------+---------------+-----------------+----------------+------------+-----------------------+------------------+



In [19]:
# show rows with null values in 'male_population' column
df_demo.select('city','state','race','count','male_population','female_population','veterans_num','foreign_born_population','avg_household_size') \
    .where(col('male_population').isNull()).show()

+------------+-------+--------------------+-----+---------------+-----------------+------------+-----------------------+------------------+
|        city|  state|                race|count|male_population|female_population|veterans_num|foreign_born_population|avg_household_size|
+------------+-------+--------------------+-----+---------------+-----------------+------------+-----------------------+------------------+
|The Villages|Florida|               White|72211|           null|             null|       15231|                   4034|              null|
|The Villages|Florida|Black or African-...|  331|           null|             null|       15231|                   4034|              null|
|The Villages|Florida|  Hispanic or Latino| 1066|           null|             null|       15231|                   4034|              null|
+------------+-------+--------------------+-----+---------------+-----------------+------------+-----------------------+------------------+



In [20]:
# check distinct values in 'race' column
df_demo.select('race').distinct().show(truncate = False)

+---------------------------------+
|race                             |
+---------------------------------+
|Black or African-American        |
|Hispanic or Latino               |
|White                            |
|Asian                            |
|American Indian and Alaska Native|
+---------------------------------+



#### 2.2.2 Data Cleaning

In [18]:
def clean_df_demography(delimiter, input_data, output_data):
    """
    Clean the demographics data.
    Params:
        delimiter  : delimiter of the csv file
        input_data : location of the source data
        output_data: location where the cleaned data is saved

    """
    df_demo = spark.read.csv(input_data,header=True,sep = delimiter)

    
    # change datatypes, format column names and delete duplicates
    df_demo = df_demo.withColumn("median_age",col("Median Age").cast(FloatType())).drop("Median Age") \
        .withColumn("male_population",col("Male Population").cast(IntegerType())).drop("Male Population") \
        .withColumn("female_population",col("Female Population").cast(IntegerType())).drop("Female Population") \
        .withColumn("total_population",col("Total Population").cast(IntegerType())).drop("Total Population") \
        .withColumn("veterans_num",col("Number of Veterans").cast(IntegerType())).drop("Number of Veterans") \
        .withColumn("foreign_born_population",col("Foreign-born").cast(IntegerType())).drop("Foreign-born") \
        .withColumn("avg_household_size",col("Average Household Size").cast(FloatType())).drop("Average Household Size") \
        .withColumn("count",col("Count").cast(IntegerType())) \
        .withColumnRenamed("City", "city") \
        .withColumnRenamed("State", "state") \
        .withColumnRenamed("State Code", "state_code") \
        .withColumnRenamed("Race", "race") \
        .distinct()
    
    # pivot the table so that each race's population becomes a separate column, then rename the columns
    df_demo = df_demo.groupBy(col("city"),col("state"),col("median_age") \
                        ,col("male_population"),col("female_population") \
                        ,col("total_population"),col("veterans_num") \
                        ,col("foreign_born_population"),col("avg_household_size") \
                        ,col("state_code")) \
                    .pivot("race").agg(sum("count").cast("integer")) \
                    .fillna({"American Indian and Alaska Native": 0,
                     "Asian": 0,
                     "Black or African-American": 0,
                     "Hispanic or Latino": 0,
                     "White": 0}) \
                    .withColumnRenamed("American Indian and Alaska Native", "american_indian_alaska_native") \
                    .withColumnRenamed("Asian","asian") \
                    .withColumnRenamed("Black or African-American","african_american") \
                    .withColumnRenamed("Hispanic or Latino","hispanic_latino") \
                    .withColumnRenamed("White","white")
    
    # write the cleaned data to parquet, replacing any output from an earlier run
    # (later cells read the relative path 'demography', so output_data should be the working directory)
    df_demo.write.mode('overwrite').partitionBy('state','city').parquet(output_data + 'demography')

In [19]:
df_demo_read_from = 'us-cities-demographics.csv'
df_demo_save_to   = 'file:/home/workspace/'
delimiter         = ';'
clean_df_demography(delimiter,df_demo_read_from,df_demo_save_to)
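
The pivot step in `clean_df_demography` turns one row per city and race into one row per city with a column per race. A minimal plain-Python sketch of that reshaping follows. The function name `pivot_race_counts` is hypothetical; the first two counts come from the rows shown in 2.2.1 and the third row is made up.

```python
from collections import defaultdict

RACES = [
    "American Indian and Alaska Native",
    "Asian",
    "Black or African-American",
    "Hispanic or Latino",
    "White",
]


def pivot_race_counts(rows):
    """Turn one row per (city, state, race) into one row per (city, state)."""
    totals = defaultdict(lambda: defaultdict(int))
    for row in rows:
        totals[(row["city"], row["state"])][row["race"]] += row["count"]
    # races missing for a city are filled with 0, like fillna in the Spark code
    return {
        key: {race: counts.get(race, 0) for race in RACES}
        for key, counts in totals.items()
    }


rows = [
    {"city": "Silver Spring", "state": "Maryland", "race": "Hispanic or Latino", "count": 25924},
    {"city": "Quincy", "state": "Massachusetts", "race": "White", "count": 58723},
    {"city": "Quincy", "state": "Massachusetts", "race": "Asian", "count": 100},  # made-up row
]
pivoted = pivot_race_counts(rows)
print(pivoted[("Quincy", "Massachusetts")])
```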

In [9]:
# read in cleaned df_demo, check schema and first 5 rows
df_demo_clear = spark.read.parquet('demography')
df_demo_clear.printSchema()
df_demo_clear.show(5)

root
 |-- median_age: float (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- veterans_num: integer (nullable = true)
 |-- foreign_born_population: integer (nullable = true)
 |-- avg_household_size: float (nullable = true)
 |-- state_code: string (nullable = true)
 |-- american_indian_alaska_native: integer (nullable = true)
 |-- asian: integer (nullable = true)
 |-- african_american: integer (nullable = true)
 |-- hispanic_latino: integer (nullable = true)
 |-- white: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)

+----------+---------------+-----------------+----------------+------------+-----------------------+------------------+----------+-----------------------------+-----+----------------+---------------+------+----------+---------+
|median_age|male_population|female_population|total_population|veterans_num|fo

### 2.3 Airport Code Table

#### 2.3.1 Data Exploring

In [22]:
# load airport-codes_csv.csv
df_airport = spark.read.csv("airport-codes_csv.csv",header=True,sep=",")
df_airport.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [17]:
# count distinct countries
df_airport.select('iso_country').distinct().count()

244

In [201]:
# select rows for US
df_airport.filter(df_airport.iso_country == "US").show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [202]:
# count rows
df_airport.count()

55075

In [203]:
# count rows where country = US
#df_airport.select('iso_country' == 'US').distinct().count()
df_airport.select('iso_country').where("iso_country = 'US'").count()

22757

In [204]:
# count distinct iata_code values for the US
df_airport.select('iata_code').filter(df_airport.iso_country == "US").distinct().count()

2015

In [205]:
# count Null iata_code for US
df_airport.where(col("iata_code").isNull()).filter(df_airport.iso_country == "US").count()

20738

In [21]:
# check "code" and city columns to find out the relationship
df_airport.select(col('iata_code'),col('iso_country'),col('iso_region'),col('municipality')) \
            .distinct().sort(df_airport.iata_code.desc()).show(truncate=False)

+---------+-----------+----------+------------------------------+
|iata_code|iso_country|iso_region|municipality                  |
+---------+-----------+----------+------------------------------+
|ZZV      |US         |US-OH     |Zanesville                    |
|ZPH      |US         |US-FL     |Zephyrhills                   |
|ZNC      |US         |US-AK     |Nyac                          |
|YUM      |US         |US-AZ     |Yuma                          |
|YNG      |US         |US-OH     |Youngstown/Warren             |
|YKN      |US         |US-SD     |Yankton                       |
|YKM      |US         |US-WA     |Yakima                        |
|YIP      |US         |US-MI     |Detroit                       |
|YAK      |US         |US-AK     |Yakutat                       |
|XSD      |US         |US-NV     |Tonopah                       |
|XPR      |US         |US-SD     |Pine Ridge                    |
|XNA      |US         |US-AR     |Fayetteville/Springdale/Rogers|
|XMD      

In [24]:
### Get count of both null and missing values
df_airport.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_airport.columns]).show()

+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+---------+--------+
|ident|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|longitude|latitude|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+---------+--------+
|    0|   0|   0|          34|        0|          0|         0|           6|      81|        0|        50|        0|       0|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+---------+--------+



In [26]:
# check ident, gps_code, iata_code and local_code to explore how these codes relate
df_airport.select('ident','gps_code','iata_code','local_code').orderBy(col('iata_code').desc()).show(10)

+-----+--------+---------+----------+
|ident|gps_code|iata_code|local_code|
+-----+--------+---------+----------+
| KZZV|    KZZV|      ZZV|       ZZV|
| KZPH|    KZPH|      ZPH|       ZPH|
|  ZNC|     ZNC|      ZNC|       ZNC|
| KNYL|    KNYL|      YUM|       NYL|
| KYNG|    KYNG|      YNG|       YNG|
| KYKN|    KYKN|      YKN|       YKN|
| KYKM|    KYKM|      YKM|       YKM|
| KYIP|    KYIP|      YIP|       YIP|
| PAYA|    PAYA|      YAK|       YAK|
| KTNX|    KTNX|      XSD|       TNX|
+-----+--------+---------+----------+
only showing top 10 rows



#### 2.3.2 Data Cleaning

In [23]:
def clean_df_airport(delimiter,input_data,output_data):
    """
    Clean the airport data.
    Params:
        delimiter  : delimiter of the csv file
        input_data : location of the source data
        output_data: location where the cleaned data is saved
    """
    
    # load data
    df_airport = spark.read.csv(input_data,header=True,sep=delimiter)

    # keep US airports with a usable iata_code: not null, not the strings 'None' or 'NULL', and not empty
    # (the conditions are combined with &, since | would let a row through if any single check passed)
    df_airport = df_airport.where("iso_country = 'US'") \
                        .filter(col('iata_code').isNotNull() &
                        ~col('iata_code').contains('None') &
                        ~col('iata_code').contains('NULL') &
                            (col('iata_code') != ''))
    
    # split "coordinates" into separate longitude and latitude columns
    split_col = split(df_airport['coordinates'], ',')
    df_airport = df_airport.withColumn('longitude', split_col.getItem(0)) \
                        .withColumn('latitude', split_col.getItem(1)) \
                        .drop('coordinates','continent','iso_country')
    
    # write the cleaned data to parquet, replacing any output from an earlier run
    # (later cells read the relative path 'airport', so output_data should be the working directory)
    df_airport.write.mode('overwrite').partitionBy('iso_region').parquet(output_data + 'airport')

In [24]:
df_airport_delimiter = ","
df_airport_read_from = "airport-codes_csv.csv"
df_airport_save_to   = "file:/home/workspace/"
clean_df_airport(df_airport_delimiter,df_airport_read_from,df_airport_save_to)
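
The intent of the `iata_code` filter and the coordinate split can be checked on plain Python values. This is a minimal sketch, assuming every condition must hold at once for a code to count as usable. The helper names are hypothetical and the coordinate string is made up.

```python
def has_usable_iata(code):
    """True only if the code is present and is not a placeholder string."""
    return (
        code is not None
        and code != ""
        and "None" not in code
        and "NULL" not in code
    )


def split_coordinates(coordinates):
    """Split 'longitude, latitude' text into two whitespace-trimmed strings."""
    longitude, latitude = (part.strip() for part in coordinates.split(","))
    return longitude, latitude


codes = ["PQS", None, "", "None", "NULL"]
usable = [c for c in codes if has_usable_iata(c)]
print(usable)
print(split_coordinates("-101.47, 38.70"))  # made-up coordinate pair
```

Trimming after the split matters because the source values have a space after the comma, which would otherwise stay at the start of the latitude string.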

In [10]:
# read in cleaned df_airport, check schema and first 5 rows
df_airport_clear = spark.read.parquet('airport')
df_airport_clear.printSchema()
df_airport_clear.show(5)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- iso_region: string (nullable = true)

+-----+-------------+--------------------+------------+-------------+--------+---------+----------+--------------+--------------+----------+
|ident|         type|                name|elevation_ft| municipality|gps_code|iata_code|local_code|     longitude|      latitude|iso_region|
+-----+-------------+--------------------+------------+-------------+--------+---------+----------+--------------+--------------+----------+
|  0AK|small_airport|Pilot Station Air...|         305|Pilot Station|    null|      PQS|       0AK|   -162.899994|     61.934601

### 2.4 World Temperature Data

#### 2.4.1 Data Exploring

In [26]:
# load world temperature data
#fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = spark.read.csv("../../data2/GlobalLandTemperaturesByCity.csv",header=True,sep=",")
df_temperature.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [46]:
df_temperature.printSchema()

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [47]:
### Get count of both null and missing values
df_temperature.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_temperature.columns]).show()

+----+-------+--------+---------+
|City|Country|Latitude|Longitude|
+----+-------+--------+---------+
|   0|      0|       0|        0|
+----+-------+--------+---------+



#### 2.4.2 Data Cleaning

In [31]:
def clean_df_temperature(delimiter,input_data,output_data):
    """
    Clean the world temperature data, keeping only city coordinates.
    Params:
        delimiter  : delimiter of the csv file
        input_data : location of the source data
        output_data: location where the cleaned data is saved
    """
    # load data
    df_temperature = spark.read.csv(input_data,header=True,sep=delimiter) 
    
    # keep only the columns of interest (City, Country, Latitude, Longitude) for US cities
    df_temperature = df_temperature.select('City','Country','Latitude','Longitude') \
                                .filter(df_temperature.Country == 'United States') \
                                .distinct()
    
    # rename the columns and drop Country (rows are already distinct at this point)
    df_temperature = df_temperature.withColumnRenamed("City","city") \
                                .withColumnRenamed("Latitude","latitude") \
                                .withColumnRenamed("Longitude","longitude") \
                                .drop("Country")
    
    # write the cleaned data to parquet, replacing any output from an earlier run
    # (later cells read the relative path 'coordinates', so output_data should be the working directory)
    df_temperature.write.mode('overwrite').partitionBy('city').parquet(output_data + 'coordinates')

In [32]:
df_temperature_delimiter = ","
df_temperature_read_from = "../../data2/GlobalLandTemperaturesByCity.csv"
df_temperature_save_to   = "file:/home/workspace/"
clean_df_temperature(df_temperature_delimiter,df_temperature_read_from,df_temperature_save_to)
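
Each cleaning function has to cope with an output directory left over from an earlier run. Two details are worth illustrating: `os.rmdir` only removes empty directories, and `os.path` functions do not understand a `file:` prefix. The following is a minimal stdlib sketch, assuming a POSIX local filesystem. `to_local_path` and `reset_output_dir` are hypothetical helper names, not part of the notebook.

```python
import os
import shutil
import tempfile
from urllib.parse import urlparse


def to_local_path(location):
    """Strip a 'file:' scheme so that os and shutil functions can use the path."""
    parsed = urlparse(location)
    return parsed.path if parsed.scheme == "file" else location


def reset_output_dir(location, name):
    """Delete an existing output directory, even a non-empty one, and return its path."""
    target = os.path.join(to_local_path(location), name)
    if os.path.isdir(target):
        shutil.rmtree(target)  # os.rmdir would raise OSError on a non-empty directory
    return target


# demo in a throwaway directory that stands in for the workspace
workdir = tempfile.mkdtemp()
old_output = os.path.join(workdir, "coordinates")
os.makedirs(old_output)
open(os.path.join(old_output, "part-0000.parquet"), "w").close()

target = reset_output_dir("file:" + workdir + "/", "coordinates")
exists_after_reset = os.path.exists(target)
print(exists_after_reset)
shutil.rmtree(workdir)
```

When the data is written by Spark itself, passing `mode('overwrite')` to the DataFrame writer achieves the same effect without any manual directory handling.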

In [11]:
# read in cleaned df_temperature, check schema and first 5 rows
df_coordinates_clear = spark.read.parquet('coordinates')
df_coordinates_clear.printSchema()
df_coordinates_clear.show(5)

root
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- city: string (nullable = true)

+--------+---------+-------------+
|latitude|longitude|         city|
+--------+---------+-------------+
|  37.78N|  103.73W|       Pueblo|
|  34.56N|  118.70W|Thousand Oaks|
|  34.56N|  118.70W|     El Monte|
|  37.78N|  122.03W|      Concord|
|  37.78N|  122.03W|  Santa Clara|
+--------+---------+-------------+
only showing top 5 rows
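
The coordinates above are stored as strings with a hemisphere suffix (for example `37.78N`, `103.73W`), which is awkward for mapping or distance calculations. A minimal sketch of one way to convert them to signed decimal degrees follows. The helper name `parse_coordinate` is hypothetical and is not part of the pipeline in this notebook.

```python
def parse_coordinate(value):
    """Convert a string such as '37.78N' or '103.73W' to a signed float.

    North and East are positive, South and West are negative.
    Returns None for missing or malformed input.
    """
    if not value:
        return None
    value = value.strip()
    if len(value) < 2:
        return None
    number, hemisphere = value[:-1], value[-1].upper()
    if hemisphere not in "NSEW":
        return None
    try:
        magnitude = float(number)
    except ValueError:
        return None
    return -magnitude if hemisphere in "SW" else magnitude


print(parse_coordinate("37.78N"))   # 37.78
print(parse_coordinate("103.73W"))  # -103.73
```

In Spark, a function like this could be wrapped with `pyspark.sql.functions.udf` and a `DoubleType` return type, assuming the string format shown in the output above holds for every row.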



### Step 3: Define the Data Model (see README)
I decided on a Star Schema for my database. The fact and dimension tables are defined as follows:

#### Fact table

__fact_immigration_record__:        
*__cic_id (PK)__, port (FK), arrival_date, arrive_year, arrive_month, departure_date, airline, flight_num, arrive_city (FK), arrive_state (FK), mode*

#### Dimension Tables   
1. __dim_immigrant__: *__cic_id (PK)__, age, occupation, gender, birth_year, citizen_country, resident_country*

2. __dim_city__: *city, state, state_code, longitude, latitude, median_age, avg_household_size, total_population,
male_population, female_population, veterans_num, foreign_born_population, american_indian_alaska_native, asian, african_american, hispanic_latino, white, __(city, state PK)__*

3. __dim_airport__: *__ident (PK)__, type, name, elevation_ft, iso_region, municipality, gps_code, iata_code (referenced by port in the fact table), local_code, longitude, latitude*

4. __dim_visa__: *__cic_id (PK)__, visa_type, visa_class, visa_issue_state, arrive_flag, departure_flag, update_flag, match_flag, allowed_date*
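
To illustrate how the fact and dimension tables above are meant to be queried, here is a minimal sketch of a star join. It uses Python's built-in `sqlite3` module rather than Spark so that it runs anywhere. The table and column names follow the model above, but only a few columns are included and all rows are invented toy values, not records from the I94 data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE fact_immigration_record (cic_id INTEGER PRIMARY KEY, port TEXT, arrive_month INTEGER);
    CREATE TABLE dim_immigrant (cic_id INTEGER PRIMARY KEY, gender TEXT);
    CREATE TABLE dim_visa (cic_id INTEGER PRIMARY KEY, visa_type TEXT);
    -- toy rows, invented for illustration only
    INSERT INTO fact_immigration_record VALUES (1, 'NYC', 4), (2, 'NYC', 4), (3, 'LAX', 4);
    INSERT INTO dim_immigrant VALUES (1, 'F'), (2, 'M'), (3, 'F');
    INSERT INTO dim_visa VALUES (1, 'B2'), (2, 'B2'), (3, 'F1');
""")

# Star join: the fact table joins directly to each dimension on its key,
# with no dimension-to-dimension joins in between.
query = """
    SELECT f.port, v.visa_type, COUNT(*) AS arrivals
    FROM fact_immigration_record AS f
    JOIN dim_immigrant AS i ON i.cic_id = f.cic_id
    JOIN dim_visa AS v ON v.cic_id = f.cic_id
    WHERE i.gender = 'F'
    GROUP BY f.port, v.visa_type
    ORDER BY f.port, v.visa_type
"""
rows = conn.execute(query).fetchall()
print(rows)  # [('LAX', 'F1', 1), ('NYC', 'B2', 1)]
```

The same query shape should carry over to Spark SQL once the parquet tables are registered as temporary views, although that is not shown in this notebook.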


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

In [18]:
def create_data_model():
    """
    ETL process to create the data model (one fact table and four dimension tables) and write the tables to parquet files
        
    """
    
    # create fact table    
    df_airport_temp = df_airport_clear.select('iata_code','municipality').filter(df_airport_clear.municipality.isNotNull())
    df_sas_temp = df_sas_clear.select("cic_id","port","mode","arrival_date","arrive_year","arrive_month", \
                                        "departure_date","airline","flight_num","arrive_state").distinct()
    cond = [df_sas_temp['port'] == df_airport_temp['iata_code']]
    fact_immigration_record = df_sas_temp.join(df_airport_temp, cond,'left').drop('iata_code') \
                                    .select("cic_id","port","mode","arrival_date","arrive_year","arrive_month", \
                                        "departure_date","airline","flight_num","arrive_state","municipality") \
                                    .withColumnRenamed('municipality','arrive_city') \
                                    .distinct()
            
    # Create dim_immigrant
    dim_immigrant = df_sas_clear.select("cic_id","age","birth_year","gender","occupation", \
                              "citizen_country","resident_country","arrive_state").distinct()
       
    # Create dim_city
    # collect only the distinct arrival cities to keep the driver-side list small
    city_list = set(fact_immigration_record.select('arrive_city').distinct().toPandas()['arrive_city'])
    df_coordinates_temp = df_coordinates_clear.selectExpr('city as temp_city','latitude','longitude')
    dim_city = df_demo_clear.join(df_coordinates_temp,df_demo_clear.city == df_coordinates_temp.temp_city, 'left') \
                    .select('city','state','state_code','latitude','longitude', \
                           'median_age','male_population','female_population', \
                           'total_population','veterans_num','foreign_born_population', \
                           'avg_household_size','american_indian_alaska_native', \
                           'asian','african_american','hispanic_latino','white') \
                    .filter(df_demo_clear.city.isin(city_list)).distinct()   
    
    # create dim_airport
    # select distinct port codes from df_sas
    port_list = set(df_sas_clear.select('port').filter(df_sas_clear.port.isNotNull()).distinct().toPandas()['port'])
    
    # load dim_airport reserving only the airport included in fact table
    dim_airport = df_airport_clear.select('ident', 'type', 'name', 'elevation_ft', \
                                'municipality', 'gps_code', 'iata_code',\
                                'local_code', 'longitude', 'latitude', 'iso_region') \
                        .filter(df_airport_clear.iata_code.isin(port_list)) \
                        .filter(df_airport_clear.municipality.isNotNull()) \
                        .distinct()    
    
    # create dim_visa
    dim_visa = df_sas_clear.select('cic_id','visa_type','visa_class','visa_issue_state','arrive_flag', \
                         'departure_flag','update_flag','match_flag','allowed_date') \
                    .distinct()
    
    # write to parquet
    fact_immigration_record.write.partitionBy("arrive_state").parquet("US_immigration")
    dim_immigrant.write.partitionBy("arrive_state").parquet("US_immigrant")
    dim_city.write.partitionBy("state_code").parquet("US_city")
    dim_airport.write.partitionBy("iso_region").parquet("US_airport")
    dim_visa.write.partitionBy("visa_type").parquet("US_visa")   
    

In [19]:
create_data_model()

#### 4.2 Data Quality Checks
Quality checks are performed to ensure the pipeline ran as expected. These included:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
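
The cells below check that the tables are not empty and that keys are unique. As a complement, the source/count and integrity checks listed above could be sketched as follows. This is plain Python on toy values; the function names `check_row_counts` and `find_orphan_keys` are hypothetical and are not used elsewhere in this notebook.

```python
def check_row_counts(source_count, target_count):
    """Return True when no rows were lost or duplicated between source and target."""
    return source_count == target_count


def find_orphan_keys(fact_keys, dim_keys):
    """Return the sorted foreign-key values in the fact table with no match in the dimension.

    None values are ignored, since a missing key is not a broken reference.
    """
    dim = set(dim_keys)
    return sorted({key for key in fact_keys if key is not None and key not in dim})


# toy values, invented for illustration only
fact_ports = ["NYC", "LAX", "XXX", None, "NYC"]
airport_codes = ["NYC", "LAX", "SFO"]

print(check_row_counts(5, 5))                       # True
print(find_orphan_keys(fact_ports, airport_codes))  # ['XXX']
```

With the Spark tables, the counts would come from `DataFrame.count()`, and the orphan check could be expressed as a `left_anti` join between the fact table's `port` column and `dim_airport.iata_code`.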

In [20]:
# load final tables 
fact = spark.read.parquet("US_immigration")
dim_immigrant = spark.read.parquet("US_immigrant")
dim_city = spark.read.parquet("US_city")
dim_airport = spark.read.parquet("US_airport")
dim_visa = spark.read.parquet("US_visa")
tablelist = [fact,dim_immigrant,dim_city,dim_airport,dim_visa]

In [40]:
# check rows in tables and column types
def check_rows(tablelist):
    for table in tablelist:
        if table.count() > 0:
            print("{} table is not empty".format(table))
        else:
            print("No records in {} table".format(table))
    

In [41]:
check_rows(tablelist)

DataFrame[cic_id: int, port: string, mode: int, arrival_date: date, arrive_year: int, arrive_month: int, departure_date: date, airline: string, flight_num: string, arrive_city: string, arrive_state: string] table is not empty
DataFrame[cic_id: int, age: int, birth_year: int, gender: string, occupation: string, citizen_country: int, resident_country: int, arrive_state: string] table is not empty
DataFrame[city: string, state: string, latitude: string, longitude: string, median_age: float, male_population: int, female_population: int, total_population: int, veterans_num: int, foreign_born_population: int, avg_household_size: float, american_indian_alaska_native: int, asian: int, african_american: int, hispanic_latino: int, white: int, state_code: string] table is not empty
DataFrame[ident: string, type: string, name: string, elevation_ft: string, municipality: string, gps_code: string, iata_code: string, local_code: string, longitude: string, latitude: string, iso_region: string] table i

In [55]:
def check_unique_keys(fact,dim1,dim2,dim3,dim4):
    if fact.groupBy("cic_id").count().filter("count > 1").count() == 0:
        print("Fact table's keys are unique")
    else:
        print("Fact table's keys are not unique")
    if dim1.groupBy("cic_id").count().filter("count > 1").count() == 0:
        print("Dimension table's keys are unique")
    else:
        print("Dimension table's keys are not unique")
    if dim2.groupBy("city","state").count().filter("count > 1").count() == 0:
        print("Dimension table's keys are unique")
    else:
        print("Dimension table's keys are not unique")
    if dim3.groupBy("iata_code").count().filter("count > 1").count() == 0:
        print("Dimension table's keys are unique")
    else:
        print("Dimension table's keys are not unique")
    if dim4.groupBy("cic_id").count().filter("count > 1").count() == 0:
        print("Dimension table's keys are unique")
    else:
        print("Dimension table's keys are not unique")

In [56]:
check_unique_keys(*tablelist)

Fact table's keys are unique
Dimension table's keys are unique
Dimension table's keys are unique
Dimension table's keys are unique
Dimension table's keys are unique


#### 4.3 Data dictionary 
The data dictionary is drafted in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.          
  The source datasets are relatively large (for example, the immigration data contains 3096313 rows), so PySpark is used to process the data in parallel and save time. Spark is also easy to deploy in the cloud and easy to scale: if the data volume grows, the complete process can be moved to a cloud-hosted Spark cluster and reused largely as is. A Star Schema is used for the data model because the purpose of the analysis is already known (in this project, analyzing immigration to the U.S.), so a defined data model is preferable to an unstructured data lake architecture. I chose a Star Schema over further normalized tables or a Snowflake Schema because it is simple for users to query and for databases to process: queries are written with simple inner joins between the fact table and a small number of dimension tables, and these star joins are simpler than the joins a Snowflake Schema requires.
  
* Propose how often the data should be updated and why.        
  In principle, the data should be updated every time a new immigrant is registered, or as often as data consumers require.
* Write a description of how you would approach the problem differently under the following scenarios:

 * The data was increased by 100x.       
  For storage, a scalable cloud-based data warehouse or data lake, or an on-premise location, would be appropriate. PySpark or another parallel processing framework would still be preferred, because Apache Spark scales horizontally: I could add worker nodes to the cluster to increase performance. Specifically, I would run Spark on a multi-node cluster using a service like EMR.
  
 * The data populates a dashboard that must be updated daily by 7 am every day.      
 I would move the database to a cloud platform (AWS, for example), connect it to suitable BI tools (Dash or Tableau, for example), and automate the entire data flow with a tool like Airflow. A popular implementation is a combination of Airflow, Spark and Apache Livy on an EMR cluster, so that Spark commands can be submitted through an API.
 
 * The database needed to be accessed by 100+ people.     
 Apache Hive or Amazon Redshift would meet this need.
 Amazon maintains a fork of Apache Hive that is included in Amazon Elastic MapReduce (EMR) on Amazon Web Services.