# US Immigration Data Analysis
### Data Engineering Capstone Project

## Project Summary
The project explores the US I94 immigration dataset together with several related datasets. The goal is to understand the data, design a data model, and build an ETL pipeline that loads it.

The project follows these steps:
  1. Scope the Project and Gather Data
  2. Explore and Assess the Data
  3. Define the Data Model
  4. Run ETL to Model the Data
  5. Complete Project Write Up

In [1]:
# Do all imports and installs here
import glob
import os
from datetime import datetime, timedelta

import pandas as pd
import tqdm
from pyspark.sql import SparkSession
# note: importing min/max from pyspark shadows Python's built-in min()/max() in this notebook
from pyspark.sql.functions import isnan, when, count, col, udf, min, max
from pyspark.sql.types import IntegerType, LongType, DateType, BooleanType, DoubleType, StringType, StructType, StructField

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

In [2]:
spark = SparkSession.builder\
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .enableHiveSupport().getOrCreate()

In [3]:
!ls ../../data/18-83510-I94-Data-2016

i94_apr16_sub.sas7bdat	i94_jan16_sub.sas7bdat	i94_may16_sub.sas7bdat
i94_aug16_sub.sas7bdat	i94_jul16_sub.sas7bdat	i94_nov16_sub.sas7bdat
i94_dec16_sub.sas7bdat	i94_jun16_sub.sas7bdat	i94_oct16_sub.sas7bdat
i94_feb16_sub.sas7bdat	i94_mar16_sub.sas7bdat	i94_sep16_sub.sas7bdat


In [4]:
# set show_data = True to run the slow exploration queries (counts, null checks);
# keep it False to reduce execution time
show_data = False

## Step 1: Scope the Project and Gather Data

### Scope 
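This project combines the 2016 I94 immigration records with three supporting datasets (world temperature, US city demographics and airport codes). The data is explored and cleaned with PySpark (plus pandas for small lookup tables), modeled as a star schema with one fact table and five dimension tables, and written out as Parquet files for analysis.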

### Data Description
The project uses four datasets:
- [I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html): immigration records for the US, published by the US National Tourism and Trade Office. More details [here](https://travel.trade.gov/research/programs/i94/description.asp).
- [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data): monthly average land temperatures for cities worldwide, with each city's latitude and longitude.
- [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/): an overview of US cities, such as population by sex, total population, median age, ...
- [Airport Code Data](https://datahub.io/core/airport-codes#data): information about airports, including code, name, type, region, GPS code, coordinates, ...

## Step 2: Explore and Assess the Data

#### Explore I94 Immigration Dataset

In [5]:
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_immigration.show(10)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [6]:
# count null/NaN values in each column
if show_data:
    df_immigration.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_immigration.columns]).show()

In [7]:
# drop unused columns: mostly null, or marked "CIC does not use" in the description file
cols_removed = ['occup', 'entdepu', 'insnum', 'dtadfile', 'visapost', 'entdepa', 'entdepd', 'dtaddto', 'count']
df_immigration_dropped = df_immigration.drop(*cols_removed)
if show_data:
    df_immigration_dropped.show()

In [8]:
df_immigration_dropped.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [9]:
# Explore ../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat data file
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat')
if show_data:
    df_immigration.show(10)
# i94_jun16_sub has extra columns not found in the other months:
# 'validres', 'delete_days', 'delete_mexl', 'delete_dup', 'delete_visa', 'delete_recdup' (dropped during cleaning)

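#### Cleaning Step for I94 Immigration Dataset
- Drop unnecessary columns (including the extra columns in the June file)
- Cast numeric columns from double to integer/long
- Convert SAS date numbers (`arrdate`, `depdate`) to dates and `matflag` to boolean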
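SAS stores dates as the number of days since 1960-01-01, which is what `convert_date_from_int` below relies on. The following sketch repeats the same arithmetic in plain Python (the names `sas_to_date` and `matflag_to_bool` are illustrative, not part of the notebook) and checks it against the first rows shown earlier, where `arrdate` 20573.0 became 2016-04-29.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date system


def sas_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS day count (stored as a double in the .sas7bdat files) to a date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


def matflag_to_bool(flag: Optional[str]) -> bool:
    """Mirror convert_match: 'M' -> True, anything else (including null) -> False."""
    return flag == 'M'


print(sas_to_date(20573.0))  # 2016-04-29
print(sas_to_date(20551.0))  # 2016-04-07
print(matflag_to_bool('M'), matflag_to_bool(None))  # True False
```

Inside Spark, the built-in SQL function `date_add` should give the same result without a Python UDF, for example `F.expr("date_add(to_date('1960-01-01'), cast(arrdate AS int))")`; this avoids serialising every row to Python.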

In [10]:
def convert_date_from_int(date):
    # SAS dates are stored as the number of days since 1960-01-01
    if date is None: 
        return None 
    return (datetime(1960, 1, 1) + timedelta(days=int(date))).date()
udf_convert_date_from_int = udf(convert_date_from_int, DateType())

def convert_match(match):
    # matflag 'M' -> True; any other value, including null -> False
    return match == 'M'
udf_convert_match = udf(convert_match, BooleanType())

def normalize_immigration_data(df):  
    # drop unnecessary columns
    cols_removed = ['occup', 'entdepu', 'insnum', 'dtadfile', 'visapost', 'entdepa', 'entdepd', 'dtaddto', 'count', 'delete_days','delete_mexl','delete_dup','delete_visa','delete_recdup', 'validres']
    df = df.drop(*cols_removed)
    
    # cast double columns to integer types
    int_cols = ["cicid", "i94yr", "i94mon", "i94cit", "i94res", "i94mode", "i94bir", "i94visa", "biryear"]
    for c in int_cols:
        df = df.withColumn(c, df[c].cast(IntegerType()))
    # admnum values (e.g. 3736796330) exceed the 32-bit int range, so use long
    df = df.withColumn("admnum", df["admnum"].cast(LongType()))
    
    # change date time type
    df = df.withColumn("arrdate", udf_convert_date_from_int("arrdate"))
    df = df.withColumn("depdate", udf_convert_date_from_int("depdate"))
    
    # change match column to boolean type
    df = df.withColumn("matflag", udf_convert_match(df["matflag"]))
    
    return df

In [11]:
immigration_data_paths = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
df_immigration = None
for path in tqdm.tqdm(immigration_data_paths):
    df = spark.read.format('com.github.saurfang.sas.spark').load(path)
    df = normalize_immigration_data(df)
    if df_immigration is None: 
        df_immigration = df
    else:
        # match columns by name rather than position, in case column order differs between files
        df_immigration = df_immigration.unionByName(df)
    
if show_data:
    print("total records:", df_immigration.count())
    
df_immigration.show(10)

100%|██████████| 12/12 [00:09<00:00,  1.55it/s]


+-----+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-------+-------+------+-------+-----------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|matflag|biryear|gender|airline|     admnum|fltno|visatype|
+-----+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-------+-------+------+-------+-----------+-----+--------+
|    6| 2016|     4|   692|   692|    XXX|2016-04-29|   null|   null|      null|    37|      2|  false|   1979|  null|   null| 1897628485| null|      B2|
|    7| 2016|     4|   254|   276|    ATL|2016-04-07|      1|     AL|      null|    25|      3|  false|   1991|     M|   null| 3736796330|00296|      F1|
|   15| 2016|     4|   101|   101|    WAS|2016-04-01|      1|     MI|2016-08-25|    55|      2|   true|   1961|     M|     OS|  666643185|   93|      B2|
|   16| 2016|     4|   101|   101|    NYC|2016-04-01|      1|     MA|2016-04

In [12]:
# check whether "cicid" is unique
if show_data:
    num_cicid = df_immigration.select('cicid').count()
    num_unique_cicid = df_immigration.select('cicid').distinct().count()

    print("num_cicid", num_cicid)
    print("num_unique_cicid", num_unique_cicid)

    print("cicid is unique", num_cicid == num_unique_cicid)

#### Explore World Temperature Data

In [13]:
df_world_temp = spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv', header=True)
df_world_temp.show(10)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01|5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|            10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01|14.050999999999998|                   

In [14]:
if show_data:
    print("total records:", df_world_temp.count())

In [15]:
# count null/NaN values in each column
if show_data:
    df_world_temp.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_world_temp.columns]).show()

In [16]:
df_world_temp.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [17]:
# create schema for world temp dataset
df_world_temp_schema = StructType([
    StructField('dt', DateType(), True),
    StructField('AverageTemperature', DoubleType(), True),
    StructField('AverageTemperatureUncertainty', DoubleType(), True),
    StructField('City', StringType(), True),
    StructField('Country', StringType(), True),
    StructField('Latitude', StringType(), True),
    StructField('Longitude', StringType(), True)
])

# re-read the world temperature data with the explicit schema
df_world_temp = spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv', schema=df_world_temp_schema, header=True)
if show_data:
    df_world_temp.printSchema()

In [18]:
df_world_temp.show(10)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01|5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|            10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01|14.050999999999998|                   

In [19]:
# how many distinct countries?
if show_data:
    print("number of countries:", df_world_temp.select("Country").distinct().count())

In [20]:
# what are the start date and the end date?
if show_data:
    print("start date:", df_world_temp.agg(min("dt")).first()[0])
    print("end date:", df_world_temp.agg(max("dt")).first()[0])

#### Cleaning Step for World Temperature Dataset
- Drop unnecessary columns: AverageTemperatureUncertainty, Latitude, Longitude

In [21]:
df_world_temp = df_world_temp.drop(*['AverageTemperatureUncertainty', 'Latitude', 'Longitude'])
df_world_temp.show()

+----------+-------------------+-----+-------+
|        dt| AverageTemperature| City|Country|
+----------+-------------------+-----+-------+
|1743-11-01|              6.068|Århus|Denmark|
|1743-12-01|               null|Århus|Denmark|
|1744-01-01|               null|Århus|Denmark|
|1744-02-01|               null|Århus|Denmark|
|1744-03-01|               null|Århus|Denmark|
|1744-04-01| 5.7879999999999985|Århus|Denmark|
|1744-05-01|             10.644|Århus|Denmark|
|1744-06-01| 14.050999999999998|Århus|Denmark|
|1744-07-01|             16.082|Århus|Denmark|
|1744-08-01|               null|Århus|Denmark|
|1744-09-01| 12.780999999999999|Århus|Denmark|
|1744-10-01|               7.95|Århus|Denmark|
|1744-11-01|  4.638999999999999|Århus|Denmark|
|1744-12-01|0.12199999999999987|Århus|Denmark|
|1745-01-01|-1.3330000000000002|Århus|Denmark|
|1745-02-01|             -2.732|Århus|Denmark|
|1745-03-01|              0.129|Århus|Denmark|
|1745-04-01|              4.042|Århus|Denmark|
|1745-05-01| 

#### Explore U.S. City Demographic Dataset

In [22]:
df_us_city = spark.read.csv('./us-cities-demographics.csv', header=True, sep=';')
df_us_city.show(10)

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|       Alabama|      38.5|          3

In [23]:
if show_data:
    print("total records:", df_us_city.count())

In [40]:
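# count null/NaN values in each column
# (this cell ran after In [26], so the output below shows the renamed schema columns)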
if show_data:
    df_us_city.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_us_city.columns]).show()

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|city|state|median_age|male_population|female_population|total_population|number_of_veterans|foreign_born|average_household_size|state_code|race|count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|   0|    0|         0|              3|                3|               0|                13|          13|                    16|         0|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+



In [25]:
df_us_city.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [26]:
df_us_city_schema = StructType([
    StructField('city', StringType(), True),
    StructField('state', StringType(), True),
    StructField('median_age', DoubleType(), True),
    StructField('male_population', IntegerType(), True),
    StructField('female_population', IntegerType(), True),
    StructField('total_population', IntegerType(), True),
    StructField('number_of_veterans', IntegerType(), True),
    StructField('foreign_born', IntegerType(), True),
    StructField('average_household_size', DoubleType(), True),
    StructField('state_code', StringType(), True),
    StructField('race', StringType(), True),
    StructField('count', IntegerType(), True)
])

df_us_city = spark.read.csv('./us-cities-demographics.csv', schema=df_us_city_schema, header=True, sep=';')
df_us_city.show(10)

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            city|         state|median_age|male_population|female_population|total_population|number_of_veterans|foreign_born|average_household_size|state_code|                race|count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|       Alabama|      38.5|          3

#### Explore Airport Code Dataset

In [27]:
df_airport_code = spark.read.csv('./airport-codes_csv.csv', header=True)
df_airport_code.show(10)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [28]:
if show_data:
    print("total records:", df_airport_code.count())

In [29]:
# count null/NaN values in each column
if show_data:
    df_airport_code.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_airport_code.columns]).show()

In [30]:
df_airport_code_schema = StructType([
    StructField('ident', StringType(), True),
    StructField('type', StringType(), True),
    StructField('name', StringType(), True),
    StructField('elevation_ft', IntegerType(), True),
    StructField('continent', StringType(), True),
    StructField('iso_country', StringType(), True),
    StructField('iso_region', StringType(), True),
    StructField('municipality', StringType(), True),
    StructField('gps_code', StringType(), True),
    StructField('iata_code', StringType(), True),
    StructField('local_code', StringType(), True),
    StructField('coordinates', StringType(), True)
])

df_airport_code = spark.read.csv('./airport-codes_csv.csv', schema=df_airport_code_schema, header=True)
if show_data:
    df_airport_code.show(10)

#### Cleaning Step for Airport Code Dataset
- Drop unnecessary columns: type, elevation_ft, continent, coordinates, iata_code

In [31]:
df_airport_code = df_airport_code.drop(*['type', 'elevation_ft', 'continent', 'coordinates', 'iata_code'])
df_airport_code.show(10)

+-----+--------------------+-----------+----------+------------+--------+----------+
|ident|                name|iso_country|iso_region|municipality|gps_code|local_code|
+-----+--------------------+-----------+----------+------------+--------+----------+
|  00A|   Total Rf Heliport|         US|     US-PA|    Bensalem|     00A|       00A|
| 00AA|Aero B Ranch Airport|         US|     US-KS|       Leoti|    00AA|      00AA|
| 00AK|        Lowell Field|         US|     US-AK|Anchor Point|    00AK|      00AK|
| 00AL|        Epps Airpark|         US|     US-AL|     Harvest|    00AL|      00AL|
| 00AR|Newport Hospital ...|         US|     US-AR|     Newport|    null|      null|
| 00AS|      Fulton Airport|         US|     US-OK|        Alex|    00AS|      00AS|
| 00AZ|      Cordes Airport|         US|     US-AZ|      Cordes|    00AZ|      00AZ|
| 00CA|Goldstone /Gts/ A...|         US|     US-CA|     Barstow|    00CA|      00CA|
| 00CL| Williams Ag Airport|         US|     US-CA|       Biggs| 

#### Create df_world_avg_temp dataset (average temperature per country)

In [32]:
df_world_avg_temp = df_world_temp.select(['Country', 'AverageTemperature']).groupBy('Country').avg()
df_world_avg_temp.show()

+-----------+-----------------------+
|    Country|avg(AverageTemperature)|
+-----------+-----------------------+
|       Chad|     27.189829394812683|
|     Russia|       3.34726798287354|
|   Paraguay|     22.784014312977117|
|      Yemen|      25.76840766445382|
|    Senegal|      25.98417669449083|
|     Sweden|      5.665518003790279|
|     Guyana|      26.54984937439856|
|      Burma|     26.016839989290048|
|Philippines|     26.516462467464876|
|    Eritrea|     24.001515877771144|
|   Djibouti|     29.152790108564506|
|   Malaysia|      26.43475662438397|
|  Singapore|     26.523102826510677|
|     Turkey|     12.951888167466654|
|     Malawi|      21.34787202649805|
|       Iraq|     19.884738137449155|
|    Germany|      8.482790790263826|
|Afghanistan|     13.816496896263578|
|   Cambodia|     26.918136297728335|
|     Jordan|     18.360980886539238|
+-----------+-----------------------+
only showing top 20 rows
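Spark's `avg` skips null values rather than counting them as zero, which matters here because many early readings are null (see the Århus rows). A plain-Python sketch of the same per-country aggregation, using a few Århus values from above; the country "Atlantis" is a hypothetical entry with only null readings:

```python
from collections import defaultdict
from typing import Dict, Iterable, Optional, Tuple


def average_by_country(rows: Iterable[Tuple[str, Optional[float]]]) -> Dict[str, Optional[float]]:
    """Average temperature per country, ignoring nulls as Spark's avg does."""
    sums: Dict[str, float] = defaultdict(float)
    counts: Dict[str, int] = defaultdict(int)
    for country, temp in rows:
        counts[country] += 0  # register the country even if every reading is null
        if temp is not None:  # null readings are skipped, not treated as 0
            sums[country] += temp
            counts[country] += 1
    # a country whose readings are all null averages to None (Spark returns null)
    return {c: (sums[c] / n if n else None) for c, n in counts.items()}


rows = [("Denmark", 6.068), ("Denmark", None), ("Denmark", 10.644), ("Atlantis", None)]
print(average_by_country(rows))
```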



## Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
![erd](./erd.png)

- The data model has six tables: one fact table (**i94 table**) and five dimension tables:
    - us_cities_demographics table
    - airport table
    - visa_type table
    - immigrate_type table
    - country_temp table
    
- The model follows a star schema.
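In a star schema each fact row keeps only codes, and the dimension tables translate them into readable values. The sketch below shows that lookup in plain Python with the `visa_type` and `immigrate_type` values listed in Step 4; the key pairing (`i94visa` to `visa_type.type`, `i94mode` to `immigrate_type.type`) is inferred from the column names and is an assumption, and `denormalize` is a hypothetical helper.

```python
from typing import Dict, List

# Dimension tables as code -> name maps (values from visa_type_table / immigrate_type_table)
VISA_TYPE: Dict[int, str] = {1: "Business", 2: "Pleasure", 3: "Student"}
IMMIGRATE_TYPE: Dict[int, str] = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}


def denormalize(fact_rows: List[dict]) -> List[dict]:
    """Attach dimension names to i94 fact rows (a left join: unknown codes map to None)."""
    out = []
    for row in fact_rows:
        enriched = dict(row)
        enriched["visa_name"] = VISA_TYPE.get(row["i94visa"])
        enriched["mode_name"] = IMMIGRATE_TYPE.get(row["i94mode"])
        out.append(enriched)
    return out


# Two fact rows taken from the i94_table output (cicid 6 and 7)
facts = [{"cicid": 6, "i94visa": 2, "i94mode": 9}, {"cicid": 7, "i94visa": 3, "i94mode": 1}]
for row in denormalize(facts):
    print(row["cicid"], row["visa_name"], row["mode_name"])
# 6 Pleasure Not reported
# 7 Student Air
```

In Spark the equivalent would be a left join such as `i94_table.join(visa_type_table, i94_table["i94visa"] == visa_type_table["type"], "left")`. Because `visa_type_table` is read without a schema, its `type` column is a string, so the comparison relies on Spark's implicit cast; an explicit cast may be clearer.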

#### 3.2 Mapping Out Data Pipelines
- i94 table
    - use source from `../../data/18-83510-I94-Data-2016/*.sas7bdat`
    - read as `sas7bdat` format
    - define the column data types
    
    - drop columns:
        - occup
        - entdepu
        - insnum
        - dtadfile
        - visapost
        - entdepa
        - entdepd
        - dtaddto
        - count
        - delete_days
        - delete_mexl
        - delete_dup
        - delete_visa
        - delete_recdup
        - validres
        
    - replace null or abnormal values with an exception value in:
        - matflag
        - i94mode
        
    - convert to the date data type:
        - arrdate
        - depdate
        
- us_cities_demographics table
    - use source from `./us-cities-demographics.csv`
    - read as `csv` format
    - define the column data types
    - drop columns:
        - number_of_veterans
        - foreign_born
        - average_household_size
        - race
        - count    
    
- airport table
    - use source from `./airport-codes_csv.csv`
    - read as `csv` format
    - define the column data types
    - drop columns:
        - type
        - elevation_ft
        - continent
        - coordinates
        - iata_code

- visa_type table
    - use source from `I94_SAS_Labels_Descriptions.SAS` file
    - extract the visa categories and save them as CSV
    - read as `csv` format
    
- immigrate_type table
    - use source from `I94_SAS_Labels_Descriptions.SAS` file
    - extract the arrival modes and save them as CSV
    - read as `csv` format
    
- country_temp table
    - use temperature data from `../../data2/GlobalLandTemperaturesByCity.csv` 
    - calculate the average temperature per country
    - use `country_name` from `I94_SAS_Labels_Descriptions.SAS` file
    - use `country_code` from `i94` table
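The drop lists above can be kept as data in one place instead of being repeated across cells. A minimal sketch, where `DROP_COLUMNS` and `columns_to_keep` are hypothetical names; Spark's `DataFrame.drop` silently ignores column names that are not present, which is why a single i94 list works for both the April file and the June file with its extra columns, and the helper mirrors that behaviour.

```python
from typing import Dict, List

# Columns dropped per output table, copied from the mapping above
DROP_COLUMNS: Dict[str, List[str]] = {
    "i94": ["occup", "entdepu", "insnum", "dtadfile", "visapost", "entdepa", "entdepd",
            "dtaddto", "count", "delete_days", "delete_mexl", "delete_dup",
            "delete_visa", "delete_recdup", "validres"],
    "us_cities_demographics": ["number_of_veterans", "foreign_born",
                               "average_household_size", "race", "count"],
    "airport": ["type", "elevation_ft", "continent", "coordinates", "iata_code"],
}


def columns_to_keep(table: str, source_columns: List[str]) -> List[str]:
    """Source columns minus the drop list; absent names are ignored, like DataFrame.drop."""
    dropped = set(DROP_COLUMNS.get(table, []))
    return [c for c in source_columns if c not in dropped]


airport_cols = ["ident", "type", "name", "elevation_ft", "continent", "iso_country",
                "iso_region", "municipality", "gps_code", "iata_code", "local_code", "coordinates"]
print(columns_to_keep("airport", airport_cols))
# ['ident', 'name', 'iso_country', 'iso_region', 'municipality', 'gps_code', 'local_code']
```

With Spark the same config would be applied as `df.drop(*DROP_COLUMNS["airport"])`.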

## Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [33]:
country_code_df = pd.read_csv('./country_code.csv')
country_code_df.head()

Unnamed: 0,code,country_name
0,582,MEXICO Air Sea-and Not Reported (I-94-no land ...
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
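`country_code.csv`, `visa_category.csv` and `immigrate_type.csv` are derived from `I94_SAS_Labels_Descriptions.SAS`. The sketch below shows one way to extract a `value` block from that file into CSV; the line layout it expects (`value <name>`, then `code = 'label'` lines, ending with `;`) is an assumption about the file, and `parse_sas_labels` / `to_csv` are hypothetical helpers. Writing with the `csv` module quotes labels that contain commas instead of requiring them to be edited.

```python
import csv
import io
import re
from typing import List, Tuple

# Assumed layout of a value block:
#   value i94cntyl
#      236 =  'AFGHANISTAN'
#      101 =  'ALBANIA'
#   ;
LABEL_LINE = re.compile(r"^\s*'?(\w+)'?\s*=\s*'(.*?)'\s*;?\s*$")


def parse_sas_labels(text: str, value_name: str) -> List[Tuple[str, str]]:
    """Collect (code, label) pairs from the `value <value_name>` block up to its closing ';'."""
    pairs: List[Tuple[str, str]] = []
    in_block = False
    for line in text.splitlines():
        stripped = line.strip()
        if not in_block:
            if stripped.lower().startswith("value " + value_name.lower()):
                in_block = True
            continue
        match = LABEL_LINE.match(line)
        if match:
            pairs.append((match.group(1), match.group(2).strip()))
        if stripped.endswith(";"):  # end of the value block
            break
    return pairs


def to_csv(pairs: List[Tuple[str, str]], header: Tuple[str, str] = ("code", "name")) -> str:
    """Serialise pairs as CSV; csv quoting keeps commas inside labels intact."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(header)
    writer.writerows(pairs)
    return buf.getvalue()


sample = "value i94cntyl\n   236 =  'AFGHANISTAN'\n   101 =  'ALBANIA'\n;\n"
print(to_csv(parse_sas_labels(sample, "i94cntyl"), ("code", "country_name")))
```

The `Unnamed: 0` column above is what `pd.read_csv` produces when the file's first header cell is empty, which typically happens when a pandas index was saved with `to_csv`; saving with `index=False` or reading with `index_col=0` avoids it.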


In [34]:
df_world_avg_temp_pd = df_world_avg_temp.toPandas()

In [35]:
# look up the country name and average temperature for each i94res country code
@udf(StringType())
def get_country_name(country_code):
    try:
        country_name = country_code_df[country_code_df['code']==country_code]['country_name'].iloc[0]
        return country_name
    except:
        return None
    
@udf(DoubleType())
def get_avg_temp(country_name):
    try:
        temp = df_world_avg_temp_pd[df_world_avg_temp_pd['Country']==country_name].iloc[0,1]

        return temp
    except:
        return None
    
    
def create_country_temp_table(df_immigration):
    table = df_immigration.select('i94res').distinct()
    
    table = table.withColumnRenamed('i94res', 'country_code')
    table = table.withColumn('country_name', get_country_name(table.country_code))
    table = table.withColumn('avg_temperature', get_avg_temp(table.country_name))
    
    table.write.parquet("country_temp_table", mode="overwrite")
    
    return table   

country_temp_table = create_country_temp_table(df_immigration)
country_temp_table.show()

+------------+--------------------+---------------+
|country_code|        country_name|avg_temperature|
+------------+--------------------+---------------+
|         392|                MALI|           null|
|         243|               BURMA|           null|
|         516| TRINIDAD AND TOBAGO|           null|
|         251|              ISRAEL|           null|
|         255|             LEBANON|           null|
|         296|UNITED ARAB EMIRATES|           null|
|         472|    MARSHALL ISLANDS|           null|
|         322|            DJIBOUTI|           null|
|         513|            BARBADOS|           null|
|         321|             REUNION|           null|
|         375|             BURUNDI|           null|
|         108|             DENMARK|           null|
|         155|          KAZAKHSTAN|           null|
|         368|               EGYPT|           null|
|         101|             ALBANIA|           null|
|         115|             ICELAND|           null|
|         12
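Every `avg_temperature` in this output is null. One likely cause is visible by comparing the two sources: `country_code.csv` stores upper-case names (`BURMA`, `DJIBOUTI`) while the temperature data uses title case (`Burma`, `Djibouti`), so the exact `==` match in `get_avg_temp` never succeeds. A minimal sketch of a case-insensitive lookup in plain Python, using two averages from the `df_world_avg_temp` output; the helper names are hypothetical, and names that differ in spelling rather than case would still need a manual mapping.

```python
from typing import Dict, Optional


def build_temp_lookup(avg_by_country: Dict[str, float]) -> Dict[str, float]:
    """Key the per-country averages by the stripped, upper-cased country name."""
    return {name.strip().upper(): temp for name, temp in avg_by_country.items()}


def lookup_avg_temp(lookup: Dict[str, float], country_name: Optional[str]) -> Optional[float]:
    """Case-insensitive lookup; returns None for null or unknown names, like get_avg_temp."""
    if country_name is None:
        return None
    return lookup.get(country_name.strip().upper())


# Values copied from the df_world_avg_temp output
lookup = build_temp_lookup({"Burma": 26.016839989290048, "Djibouti": 29.152790108564506})
print(lookup_avg_temp(lookup, "BURMA"))
print(lookup_avg_temp(lookup, "MALI"))  # None: not in this small sample
```

In Spark itself, adding an upper-cased key with `F.upper("Country")` to `df_world_avg_temp` and joining on it would also remove the per-row pandas lookups inside the UDFs.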

In [36]:
def create_us_cities_demographics_table(df_us_city):
    table = df_us_city.drop(*['number_of_veterans', 'foreign_born', 'average_household_size', 'race', 'count'])
    
    table.write.parquet("us_cities_demographics_table", mode="overwrite")

    return table

us_cities_demographics_table = create_us_cities_demographics_table(df_us_city)
us_cities_demographics_table.show()

+----------------+--------------+----------+---------------+-----------------+----------------+----------+
|            city|         state|median_age|male_population|female_population|total_population|state_code|
+----------------+--------------+----------+---------------+-----------------+----------------+----------+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|        MD|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|        MA|
|          Hoover|       Alabama|      38.5|          38040|            46799|           84839|        AL|
|Rancho Cucamonga|    California|      34.5|          88127|            87105|          175232|        CA|
|          Newark|    New Jersey|      34.6|         138040|           143873|          281913|        NJ|
|          Peoria|      Illinois|      33.1|          56229|            62432|          118661|        IL|
|        Avondale|       Arizona|    

In [37]:
def create_airport_table(df_airport_code):
    table = df_airport_code
    
    table.write.parquet("airport_table", mode="overwrite")

    return table

airport_table = create_airport_table(df_airport_code)
airport_table.show()

+-----+--------------------+-----------+----------+------------+--------+----------+
|ident|                name|iso_country|iso_region|municipality|gps_code|local_code|
+-----+--------------------+-----------+----------+------------+--------+----------+
|  00A|   Total Rf Heliport|         US|     US-PA|    Bensalem|     00A|       00A|
| 00AA|Aero B Ranch Airport|         US|     US-KS|       Leoti|    00AA|      00AA|
| 00AK|        Lowell Field|         US|     US-AK|Anchor Point|    00AK|      00AK|
| 00AL|        Epps Airpark|         US|     US-AL|     Harvest|    00AL|      00AL|
| 00AR|Newport Hospital ...|         US|     US-AR|     Newport|    null|      null|
| 00AS|      Fulton Airport|         US|     US-OK|        Alex|    00AS|      00AS|
| 00AZ|      Cordes Airport|         US|     US-AZ|      Cordes|    00AZ|      00AZ|
| 00CA|Goldstone /Gts/ A...|         US|     US-CA|     Barstow|    00CA|      00CA|
| 00CL| Williams Ag Airport|         US|     US-CA|       Biggs| 

In [38]:
def create_visa_type_table(path='./visa_category.csv'):
    table = spark.read.csv(path, header=True)
    
    table.write.parquet("visa_type_table", mode="overwrite")

    return table

visa_type_table = create_visa_type_table('./visa_category.csv')
visa_type_table.show()

+----+--------+
|type|    name|
+----+--------+
|   1|Business|
|   2|Pleasure|
|   3| Student|
+----+--------+



In [41]:
def create_immigrate_type_table(path='./immigrate_type.csv'):
    table = spark.read.csv(path, header=True)
    
    table.write.parquet("immigrate_type_table", mode="overwrite")
    
    return table

immigrate_type_table = create_immigrate_type_table('./immigrate_type.csv')
immigrate_type_table.show()

+----+------------+
|type|        name|
+----+------------+
|   1|         Air|
|   2|         Sea|
|   3|        Land|
|   9|Not reported|
+----+------------+



In [43]:
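@udf(IntegerType())
def mode_9_immigrate_type(mode):
    # Missing (null) or 0 arrival modes become 9 ("Not reported" in immigrate_type_table).
    # int() matters: a Python UDF declared as IntegerType yields null, not an error, for float values.
    return 9 if (mode is None or mode == 0) else int(mode)

def create_i94_table(df_immigration):
    # Normalize i94mode, then persist the fact table as parquet
    table = df_immigration.withColumn("i94mode", mode_9_immigrate_type(df_immigration["i94mode"]))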
    
    table.write.parquet("i94_table", mode="overwrite")
    
    return table

i94_table = create_i94_table(df_immigration)
i94_table.show()

+-----+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-------+-------+------+-------+-----------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|matflag|biryear|gender|airline|     admnum|fltno|visatype|
+-----+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-------+-------+------+-------+-----------+-----+--------+
|    6| 2016|     4|   692|   692|    XXX|2016-04-29|      9|   null|      null|    37|      2|  false|   1979|  null|   null| 1897628485| null|      B2|
|    7| 2016|     4|   254|   276|    ATL|2016-04-07|      1|     AL|      null|    25|      3|  false|   1991|     M|   null| 3736796330|00296|      F1|
|   15| 2016|     4|   101|   101|    WAS|2016-04-01|      1|     MI|2016-08-25|    55|      2|   true|   1961|     M|     OS|  666643185|   93|      B2|
|   16| 2016|     4|   101|   101|    NYC|2016-04-01|      1|     MA|2016-04

In [47]:
i94_table.select([count(when( col(c).isNull(), c)).alias(c) for c in i94_table.columns]).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+------+------+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|matflag|biryear| gender|airline|admnum| fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+------+------+--------+
|    0|    0|     0| 28575|     0|      0|      0|      0|2027926|3308012|  9517|      0|      0|   9517|4079983|1308066|     0|333922|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+------+------+--------+
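The i94 fact table and the two small dimension tables are linked by codes: `i94visa` matches `type` in `visa_type_table`, and the normalized `i94mode` matches `type` in `immigrate_type_table`. The sketch below mirrors that lookup in plain Python so the logic can be read without a Spark session. The helper names (`normalize_mode`, `decode`) and the sample rows are hypothetical; the code tables are copied from the outputs above. In Spark, the same result would come from left joins such as `i94_table.join(visa_type_table, i94_table.i94visa == visa_type_table.type, "left")`.

```python
# Code tables copied from the visa_type_table and immigrate_type_table outputs above
VISA_TYPES = {1: "Business", 2: "Pleasure", 3: "Student"}
IMMIGRATE_TYPES = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}


def normalize_mode(mode):
    """Same rule as the mode_9_immigrate_type UDF: missing or 0 becomes 9."""
    return 9 if (mode is None or mode == 0) else int(mode)


def decode(row):
    """Return a copy of a fact row with readable visa and arrival-mode names.

    Unknown codes map to None, like a left join with no matching dimension row.
    """
    mode = normalize_mode(row.get("i94mode"))
    return {
        **row,
        "i94mode": mode,
        "visa_name": VISA_TYPES.get(row.get("i94visa")),
        "mode_name": IMMIGRATE_TYPES.get(mode),
    }


# Hypothetical sample rows shaped like the i94 table (only the relevant columns)
rows = [
    {"cicid": 6, "i94visa": 2, "i94mode": None},
    {"cicid": 7, "i94visa": 3, "i94mode": 1.0},
]
decoded = [decode(r) for r in rows]
for r in decoded:
    print(r["cicid"], r["visa_name"], r["mode_name"])
```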



#### 4.2 Data Quality Checks
The data quality checks verify that the pipeline ran as expected. Typical checks include:
 * Integrity constraints on the relational database (e.g., unique keys, data types)
 * Unit tests for the scripts to ensure they do the right thing
 * Source/count checks to ensure completeness

The check implemented below is a count check: each parquet table written by the ETL is read back, and the run fails if any table is empty.
 
Run Quality Checks

In [44]:
tables = ['country_temp_table', 'us_cities_demographics_table', 'airport_table', 'visa_type_table', 'immigrate_type_table', 'i94_table']

# Read each parquet table back and fail if it is empty (count check)
for table in tqdm.tqdm(tables):
    df = spark.read.parquet(table)
    records = df.count()
    
    if records == 0:
        raise Exception("{} table has 0 records, please check".format(table))
        
print("Check all tables successfully")

100%|██████████| 6/6 [00:04<00:00,  1.10it/s]

Check all tables successfully
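The count check only proves that each table is non-empty. The integrity and completeness checks listed at the start of this section could be added along the lines of the sketch below, written in plain Python on in-memory rows: a primary-key check (nulls and duplicates, e.g. on `cicid`) and a comparison between the source row count and the loaded row count. The function names and sample rows are hypothetical; on the Spark tables the same checks could compare `df.count()` with `df.select("cicid").distinct().count()` and with the row count of the source SAS data.

```python
from collections import Counter


def find_key_violations(rows, key):
    """Return a list of problems with `key` as a primary key (nulls, duplicates)."""
    values = [row.get(key) for row in rows]
    problems = []
    nulls = sum(v is None for v in values)
    if nulls:
        problems.append("{} null {} value(s)".format(nulls, key))
    counts = Counter(v for v in values if v is not None)
    for value, n in sorted(counts.items()):
        if n > 1:
            problems.append("{}={} appears {} times".format(key, value, n))
    return problems


def find_count_mismatch(source_count, loaded_rows):
    """Return a description if the ETL dropped or duplicated rows, else None."""
    if source_count != len(loaded_rows):
        return "expected {} rows, loaded {}".format(source_count, len(loaded_rows))
    return None


# Hypothetical sample: three source rows loaded into the i94 table
i94_rows = [{"cicid": 6}, {"cicid": 7}, {"cicid": 15}]
problems = find_key_violations(i94_rows, "cicid")
mismatch = find_count_mismatch(3, i94_rows)
if problems or mismatch:
    raise ValueError("i94 table failed integrity checks: {}".format(problems or mismatch))
print("Integrity checks passed")
```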





#### 4.3 Data Dictionary
- i94 table

| column | type | is_null | description |
| - | - | - | - |
| cicid | int |  | record ID |
| i94yr | int |  | 4-digit year of arrival |
| i94mon | int |  | numeric month of arrival |
| i94cit | int | ✅ | 3-digit code of the immigrant's country of birth |
| i94res | int |  | 3-digit code of the immigrant's country of residence |
| i94port | varchar |  | port of arrival |
| arrdate | date |  | arrival date in the USA |
| i94mode | int |  | mode of arrival (code from the immigrate_type table) |
| i94addr | varchar | ✅ | US state of arrival |
| depdate | date | ✅ | departure date from the USA |
| i94bir | int | ✅ | age of the respondent in years |
| i94visa | int |  | visa category (code from the visa_type table) |
| matflag | boolean |  | whether the arrival and departure records match |
| biryear | int | ✅ | 4-digit year of birth |
| gender | varchar | ✅ | gender code |
| airline | varchar | ✅ | airline used to arrive in the USA |
| admnum | varchar |  | admission number |
| fltno | varchar | ✅ | flight number of the airline used to arrive |
| visatype | varchar |  | class of admission legally admitting the non-immigrant |

- us_cities_demographics table

| column | type | is_null | description |
| - | - | - | - |
| city | varchar |  | city name |
| state | varchar |  | state name |
| median_age | double |  | median age of the city's population |
| male_population | int | ✅ | male population of the city |
| female_population | int | ✅ | female population of the city |
| total_population | int |  | total population of the city |
| state_code | varchar |  | two-letter US state code |

- airport table

| column | type | is_null | description |
| - | - | - | - |
| ident | varchar |  | airport identifier |
| name | varchar |  | airport name |
| iso_country | varchar |  | ISO country code of the airport |
| iso_region | varchar |  | ISO region code of the airport |
| municipality | varchar | ✅ | city of the airport |
| gps_code | varchar | ✅ | GPS code of the airport |
| local_code | varchar | ✅ | local code of the airport |

- visa_type table

| column | type | is_null | description |
| - | - | - | - |
| type | int |  | visa category code (matches i94visa) |
| name | varchar |  | visa category name |

- immigrate_type table

| column | type | is_null | description |
| - | - | - | - |
| type | int |  | arrival mode code (matches i94mode) |
| name | varchar |  | arrival mode name |

- country_temp table

| column | type | is_null | description |
| - | - | - | - |
| country_code | int |  | country code |
| country_name | varchar |  | country name |
| avg_temperature | double | ✅ | average temperature of the country |
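The `type` and `is_null` columns of the data dictionary can double as a machine-checkable contract. The sketch below encodes a few i94 columns from the table above as a Python dict and checks records against it in plain Python. The mapping of `int`/`varchar`/`date` to Python types, the `I94_CONTRACT` name and the `validate` helper are assumptions for illustration; in Spark the same contract would more naturally be expressed as a `StructType` schema.

```python
from datetime import date

# Subset of the i94 data dictionary: column -> (Python type, nullable)
I94_CONTRACT = {
    "cicid": (int, False),
    "i94yr": (int, False),
    "arrdate": (date, False),
    "i94mode": (int, False),
    "depdate": (date, True),
    "gender": (str, True),
}


def validate(record, contract=I94_CONTRACT):
    """Return a list of contract violations for one record (empty if valid)."""
    errors = []
    for column, (expected_type, nullable) in contract.items():
        value = record.get(column)
        if value is None:
            if not nullable:
                errors.append("{} must not be null".format(column))
        elif not isinstance(value, expected_type):
            errors.append("{} should be {}, got {}".format(
                column, expected_type.__name__, type(value).__name__))
    return errors


# Hypothetical record shaped like the first i94 row shown earlier
record = {"cicid": 6, "i94yr": 2016, "arrdate": date(2016, 4, 29),
          "i94mode": 9, "depdate": None, "gender": None}
print(validate(record))
```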


## Step 5: Complete Project Write Up
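- The choice of tools and technologies for the project?
    - PySpark: PySpark processes large datasets faster than pandas because it spreads the work across cores or cluster nodes instead of holding everything in a single process. Its DataFrame API is also friendly and easy to use.
    - Parquet: Parquet is a compressed, columnar file format, so saving data to it and reading it back as a DataFrame is fast. For large datasets it is typically faster to read than CSV, JSON or other text formats, especially when only a few columns are needed.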
    
- How often should the data be updated, and why?
    - i94 table: the source data arrives as one file per month, so the table should be updated monthly by loading the new file.
    - visa_type and immigrate_type tables: these only change when a new type is introduced, so they rarely need an update.
    - country_temp table: update it whenever newer average temperatures for the countries are needed; it can be refreshed as often as required.
    - airport table: airport codes rarely change, so it does not need frequent updates.
    - us_cities_demographics table: only the population columns change, so the table needs an update only when up-to-date population figures are required.
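For the monthly i94 update, the raw files listed at the top of the notebook follow the pattern `i94_<mon><yy>_sub.sas7bdat` (for example `i94_apr16_sub.sas7bdat`). A minimal sketch, assuming new months keep that naming pattern, builds the path of the single file to load for a given year and month; `i94_file_path` is a hypothetical helper. The month abbreviations are hard-coded rather than taken from `calendar.month_abbr`, which depends on the locale. The loaded month could then be written with `mode="append"` instead of `"overwrite"`, so earlier months are not reprocessed.

```python
import os

# Directory from the `ls` at the top of the notebook
I94_DIR = "../../data/18-83510-I94-Data-2016"
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec")


def i94_file_path(year, month, base_dir=I94_DIR):
    """Return the path of the raw I94 SAS file for a given year and month."""
    if not 1 <= month <= 12:
        raise ValueError("month must be between 1 and 12")
    name = "i94_{}{:02d}_sub.sas7bdat".format(MONTHS[month - 1], year % 100)
    return os.path.join(base_dir, name)


print(i94_file_path(2016, 4))
```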
    
- Write a description of how you would approach the problem differently under the following scenarios:
 - The data was increased by 100x?
     Run Spark on a multi-node cluster instead of a single machine, keep the data in AWS S3, and load the modeled tables into AWS Redshift. Spreading the processing across more nodes keeps the pipeline fast as the data volume grows.
 
 - The data populates a dashboard that must be updated on a daily basis by 7am every day?
     The data that changes most is in the country_temp and i94 tables. Running the pipeline daily is not a problem, as long as it is scheduled to start early enough to finish before 7am.
     
 - The database needed to be accessed by 100+ people?
     Move the database to AWS Redshift, which is built to serve many concurrent users.
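For the 7am dashboard scenario, the ETL only has to finish before the deadline, so the latest safe start time is the deadline minus the expected runtime. A minimal sketch, assuming a scheduler such as cron triggers the job and assuming a runtime of two hours (a hypothetical figure that should be measured on the real pipeline), computes that start time, the matching cron expression and the next run after a given moment; all names are illustrative.

```python
from datetime import datetime, time, timedelta

DEADLINE = time(7, 0)                  # dashboard must be fresh by 7am
EXPECTED_RUNTIME = timedelta(hours=2)  # hypothetical; measure on the real pipeline


def latest_start(deadline=DEADLINE, runtime=EXPECTED_RUNTIME):
    """Latest time of day the ETL can start and still finish by the deadline."""
    # Use an arbitrary fixed day so subtracting the runtime cannot underflow
    anchor = datetime(2000, 1, 2, deadline.hour, deadline.minute)
    return (anchor - runtime).time()


def next_run(now, start):
    """Next datetime at or after `now` when the daily job should start."""
    candidate = datetime.combine(now.date(), start)
    if candidate < now:
        candidate += timedelta(days=1)
    return candidate


start = latest_start()
print(start)
print("cron: {} {} * * *".format(start.minute, start.hour))
print(next_run(datetime(2016, 4, 30, 6, 0), start))
```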

In [52]:
!rm -rf airport_table country_temp_table i94_table immigrate_type_table sas_data us_cities_demographics_table visa_type_table