# Immigration Analysis
### Data Engineering Capstone Project

#### Project Summary
- Analysis of US immigration using the I94 immigration dataset, enriched with temperature data, US city demographics, and airport data.
- Perform ETL operations on the datasets to generate a star schema stored in Parquet format.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# import packages
import pandas as pd

import os
import re

import configparser

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, to_date, col, year, month, date_format, format_number
import pyspark.sql.types as t

from datetime import datetime, timedelta, date

# Step 1: Scope the Project and Gather Data

## Scope 

- Pull data from different sources to find US immigration patterns across city temperatures, city demographics, and airports.
- The end solution will be a data lake with a star schema that contains **1** fact table and **3** dimension tables.
- The data lake tables will be used to analyse immigration trends.
- Python, Pandas, and Spark will be used.

#### Describe and Gather Data 

### [I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)
- comes from the US National Tourism and Trade Office. 
- includes information on visa category, age, and port of entry 

### [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)
- comes from Kaggle.
- includes monthly average land temperatures by city

### [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
- comes from OpenDataSoft.
- includes U.S. city demographic data

### [Airport Code Table](https://datahub.io/core/airport-codes#data)
- table of airport codes and corresponding cities. 

### Configure Spark session

In [2]:
spark = SparkSession.builder \
                    .appName("Immigration App") \
                    .config("spark.jars.repositories", "https://repos.spark-packages.org/") \
                    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
                    .enableHiveSupport().getOrCreate()

In [3]:
spark

## Immigration dataset
- I-94 immigration data for April 2016

In [4]:
# read immigration data
i94_apr16_path = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
i94_apr16 = spark.read.format('com.github.saurfang.sas.spark') \
                 .load(i94_apr16_path)

In [5]:
# show immigration data schema
i94_apr16.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [6]:
# show immigration data first 5 records
i94_apr16.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [7]:
# configure pandas max columns display
pd.set_option('display.max_columns', 50)

In [8]:
# convert Spark DataFrame to pandas DataFrame
i94_apr16.limit(5).toPandas().head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [9]:
# list pandas DataFrame columns
i94_apr16.limit(1).toPandas().columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [10]:
print(f"Number of immigration records in April 2016: {i94_apr16.count():,}")

Number of immigration records in April 2016: 3,096,313


In [11]:
# parse labels descriptions in the SAS file
labels_path = "I94_SAS_Labels_Descriptions.SAS"
with open(labels_path) as f:
    lines = f.readlines() 

In [12]:
pattern = r'^/\*\s+(?P<label>.+?)\s+$'
prog = re.compile(pattern)
result = [prog.match(c) for c in lines]

In [13]:
for match in result:
    if match is not None:
        print(match.group("label"))

I94YR - 4 digit year */
I94MON - Numeric month */
I94CIT & I94RES - This format shows all the valid and invalid codes for processing */
I94PORT - This format shows all the valid and invalid codes for processing */
ARRDATE is the Arrival Date in the USA. It is a SAS date numeric field that a
I94MODE - There are missing values as well as not reported (9) */
I94ADDR - There is lots of invalid codes in this variable and the list below
DEPDATE is the Departure Date from the USA. It is a SAS date numeric field that
I94BIR - Age of Respondent in Years */
I94VISA - Visa codes collapsed into three categories:
COUNT - Used for summary statistics */
DTADFILE - Character Date Field - Date added to I-94 Files - CIC does not use */
VISAPOST - Department of State where where Visa was issued - CIC does not use */
OCCUP - Occupation that will be performed in U.S. - CIC does not use */
ENTDEPA - Arrival Flag - admitted or paroled into the U.S. - CIC does not use */
ENTDEPD - Departure Flag - Departed, l
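
The labels printed above keep the closing `*/` because the pattern only trims trailing whitespace. A minimal sketch of a variant that also drops the comment terminator is shown below. The `sample_lines` list and the `label_pattern` name are illustrative and not part of the notebook; the sample lines mimic the layout implied by the output.

```python
import re

# Illustrative lines in the style of I94_SAS_Labels_Descriptions.SAS
sample_lines = [
    "/* I94YR - 4 digit year */\n",
    "/* I94VISA - Visa codes collapsed into three categories:\n",
    "value i94model\n",
]

# Capture the label text and optionally swallow a closing "*/"
label_pattern = re.compile(r"^/\*\s+(?P<label>.+?)\s*(?:\*/)?\s*$")

labels = [
    m.group("label")
    for m in map(label_pattern.match, sample_lines)
    if m is not None
]
print(labels)
```

Lines that do not start with `/*` are skipped, so the value listings between the comment headers are ignored.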

## Temperature dataset

In [14]:
temperatures_path = "../../data2/GlobalLandTemperaturesByCity.csv"
temperatures = spark.read.csv(temperatures_path, header=True, inferSchema=True)

In [15]:
temperatures.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [16]:
temperatures.limit(5).toPandas().head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [17]:
print(f"Number of temperatures records: {temperatures.count():,}")

Number of temperatures records: 8,599,212


## US City Demographics dataset

In [18]:
demographics_path = "us-cities-demographics.csv"
demographics = spark.read.csv(demographics_path, sep=";", header=True, inferSchema=True)

In [19]:
demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [20]:
demographics.limit(5).toPandas().head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [21]:
print(f"Number of demographics records: {demographics.count():,}")

Number of demographics records: 2,891


## Airports dataset

In [22]:
airports_path = "airport-codes_csv.csv"
airports = spark.read.csv(airports_path, header=True, inferSchema=True)

In [23]:
airports.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [24]:
airports.limit(5).toPandas().head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [25]:
print(f"Number of airports records: {airports.count():,}")

Number of airports records: 55,075


# Step 2: Explore and Assess the Data

#### Explore the Data 

# Immigration data

In [26]:
# get SAS data files
immigration_dataset_path = "../../data/18-83510-I94-Data-2016"
files_list = os.listdir(immigration_dataset_path)
print("The Immigration dataset contains")
for file in files_list:
    size = os.path.getsize(f"{immigration_dataset_path}/{file}")
    print(f"{file}: {size:,} bytes")
print(f"Number of files in the dataset: {len(files_list)} ")

The Immigration dataset contains
i94_apr16_sub.sas7bdat: 471,990,272 bytes
i94_sep16_sub.sas7bdat: 569,180,160 bytes
i94_nov16_sub.sas7bdat: 444,334,080 bytes
i94_mar16_sub.sas7bdat: 481,296,384 bytes
i94_jun16_sub.sas7bdat: 716,570,624 bytes
i94_aug16_sub.sas7bdat: 625,541,120 bytes
i94_may16_sub.sas7bdat: 525,008,896 bytes
i94_jan16_sub.sas7bdat: 434,176,000 bytes
i94_oct16_sub.sas7bdat: 556,269,568 bytes
i94_jul16_sub.sas7bdat: 650,117,120 bytes
i94_feb16_sub.sas7bdat: 391,905,280 bytes
i94_dec16_sub.sas7bdat: 523,304,960 bytes
Number of files in the dataset: 12 


### Travel mode (i94mode)

In [27]:
# explore travel mode
# /* I94MODE - There are missing values as well as not reported (9) */
# value i94model
# 	1 = 'Air'
# 	2 = 'Sea'
# 	3 = 'Land'
# 	9 = 'Not reported' ;

In [28]:
# show travel mode distribution 
travel_modes = i94_apr16.filter(i94_apr16["i94mode"].isNotNull()) \
                        .groupBy('i94mode') \
                        .count().sort(col('count').desc()).persist()
travel_modes.show()

+-------+-------+
|i94mode|  count|
+-------+-------+
|    1.0|2994505|
|    3.0|  66660|
|    2.0|  26349|
|    9.0|   8560|
+-------+-------+



- **Air (1)** is by far the most common travel mode, accounting for about 97% of the records with a non-null `i94mode`.
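
To make the distribution easier to read, the numeric codes can be decoded and turned into percentage shares. The sketch below uses plain Python with the counts copied from the output above; the names `TRAVEL_MODES`, `mode_counts`, and `shares` are illustrative.

```python
# Travel mode codes from the I94MODE label description
TRAVEL_MODES = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}

# Counts copied from the travel_modes output in this notebook
mode_counts = {1: 2994505, 3: 66660, 2: 26349, 9: 8560}

total = sum(mode_counts.values())
shares = {TRAVEL_MODES[code]: 100 * n / total for code, n in mode_counts.items()}

for label, pct in shares.items():
    print(f"{label}: {pct:.1f}%")
```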

### Visa category (i94visa)

In [29]:
# explore visa category
# /* I94VISA - Visa codes collapsed into three categories:
#    1 = Business
#    2 = Pleasure
#    3 = Student
# */

In [30]:
# show visa category distribution 
visa_categories = i94_apr16.filter(i94_apr16["i94visa"].isNotNull()) \
                      .groupBy('i94visa') \
                      .count().sort(col('i94visa').desc()).persist()
visa_categories.show()

+-------+-------+
|i94visa|  count|
+-------+-------+
|    3.0|  43366|
|    2.0|2530868|
|    1.0| 522079|
+-------+-------+



- **Pleasure (2)** is the largest visa category, covering about 82% of the records.

### Ports (i94port)

In [31]:
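# parse ports from the SAS labels file: lines[302:961] covers file lines 303 through 961
pattern = r'\'(.*)\'.*\'(.*)\''
prog = re.compile(pattern)
valid_ports = {}
for line in lines[302: 961]:
    result = prog.search(line)
    # skip any line that does not contain a 'code' = 'name' pair
    if result is not None:
        valid_ports[result.group(1)] = result.group(2)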

In [32]:
# for k, v in valid_ports.items():
#     print(k, v)

# ALC ALCAN, AK             
# ANC ANCHORAGE, AK         
# BAR BAKER AAF - BAKER ISLAND, AK
# DAC DALTONS CACHE, AK     
# PIZ DEW STATION PT LAY DEW, AK
# ...

In [33]:
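@udf
def code_to_port(code):
    """Map port code to port name"""
    # handle the Washington, D.C. case
    if code == "WAS":
        return "Washington, D.C."
    for key in valid_ports:
        if key.lower() == code.lower():
            # [:-4] drops the trailing ", ST" state suffix; names without one
            # are truncated too (e.g. "Not Reported/Unk")
            return (valid_ports[key].strip()[:-4]).title()
    # unknown codes fall through and yield null
    return None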
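
The lookup above scans every key and slices off the last four characters. A possible alternative, sketched here in plain Python, does a direct dictionary lookup and removes the state suffix only when one is present. The three `valid_ports` entries are an illustrative subset in the assumed `'NAME, ST'` layout, and `STATE_SUFFIX` and `port_name` are hypothetical names.

```python
import re

# Illustrative subset of the port dictionary built from the labels file
valid_ports = {
    "ATL": "ATLANTA, GA           ",
    "NYC": "NEW YORK, NY          ",
    "XXX": "NOT REPORTED/UNKNOWN  ",
}

# Matches a trailing ", ST" state suffix
STATE_SUFFIX = re.compile(r",\s*[A-Z]{2}$")

def port_name(code):
    """Return a title-cased port name, or None for unknown or missing codes."""
    if code is None:
        return None
    raw = valid_ports.get(code.upper())
    if raw is None:
        return None
    return STATE_SUFFIX.sub("", raw.strip()).title()

print(port_name("atl"), "|", port_name("XXX"), "|", port_name("ZZZ"))
```

With this approach a name without a state suffix is returned whole rather than cut short. The function could be wrapped with `udf` in the same way as `code_to_port`.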

In [34]:
# drop null i94port values 
# map port code to port name
i94_apr16 = i94_apr16.filter(col("i94port").isNotNull()) \
                     .withColumn('port_name', code_to_port(col("i94port"))) 

In [35]:
i94_apr16.select(["i94port", "port_name"]).show(5)

+-------+----------------+
|i94port|       port_name|
+-------+----------------+
|    XXX|Not Reported/Unk|
|    ATL|         Atlanta|
|    WAS|Washington, D.C.|
|    NYC|        New York|
|    NYC|        New York|
+-------+----------------+
only showing top 5 rows



In [36]:
# Show top 10 ports
top_ports = i94_apr16.select(["i94port", "port_name"]) \
                     .groupBy(["i94port", "port_name"]) \
                     .count().sort(col("count").desc()).persist()
top_ports.show(10)

+-------+----------------+------+
|i94port|       port_name| count|
+-------+----------------+------+
|    NYC|        New York|485916|
|    MIA|           Miami|343941|
|    LOS|     Los Angeles|310163|
|    SFR|   San Francisco|152586|
|    ORL|         Orlando|149195|
|    HHW|        Honolulu|142720|
|    NEW|Newark/Teterboro|136122|
|    CHI|         Chicago|130564|
|    HOU|         Houston|101481|
|    FTL| Fort Lauderdale| 95977|
+-------+----------------+------+
only showing top 10 rows



In [37]:
# get unique port codes 
ports = i94_apr16.select('i94port').distinct().collect()

In [38]:
print(f"Dataset has {len(ports)} ports")

Dataset has 299 ports


### Age (i94bir)

In [39]:
# calculate min and max values of age column 
min_age = i94_apr16.agg({"i94bir": "min"}).collect()[0]
max_age = i94_apr16.agg({"i94bir": "max"}).collect()[0]

In [40]:
print(f'Ages ranged from {int(min_age["min(i94bir)"])} to {int(max_age["max(i94bir)"])} years') 

Ages ranged from -3 to 114 years


- Negative age values need to be cleaned.
- Drop rows with invalid `i94bir` values.
- Keep ages between 1 and the observed maximum of 114.


In [41]:
i94_apr16 = i94_apr16.filter(i94_apr16.i94bir.between(1,114))

### Convert dates

In [42]:
# convert arrdate and depdate from SAS numeric dates (days since 1960-01-01) to date type

@udf(t.TimestampType())
def to_timestamp(d):
    try:
        return pd.to_timedelta(d, unit='D') + pd.Timestamp('1960-1-1')
    except Exception:
        # fall back to a sentinel date when the value cannot be converted
        return pd.Timestamp('1900-1-1')

i94_apr16 = i94_apr16.withColumn('arrdate', to_date(to_timestamp(col('arrdate'))))\
                     .withColumn('depdate', to_date(to_timestamp(col('depdate'))))
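
The conversion relies on SAS storing dates as a day count from 1 January 1960. A minimal standard-library sketch of the same arithmetic follows. `SAS_EPOCH` and `sas_to_date` are illustrative names, and unlike the UDF above this version returns `None` for missing values instead of a 1900-01-01 sentinel.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this day

def sas_to_date(days):
    """Convert a SAS numeric date to datetime.date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(20573.0))  # 2016-04-29, the first arrdate in this dataset
```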


### Countries (i94res)

In [43]:
# parse countries from the SAS labels file (file lines 10 through 298)
pattern = r'(.*)\'(.*)\''
prog = re.compile(pattern)
valid_countries = {}
for line in lines[9: 298]:
    result = prog.search(line)
    # skip any line that does not contain a code = 'name' pair
    if result is not None:
        valid_countries[result.group(1).strip()[:3]] = result.group(2)
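
The pattern above captures everything before the first quote and then trims it to three characters. A stricter variant, sketched below, captures the numeric code directly. The sample lines assume a `code = 'NAME'` layout consistent with the printed dictionary; `country_pattern` and `countries` are illustrative names.

```python
import re

# Illustrative lines in the assumed "code = 'NAME'" layout of the labels file
sample_lines = [
    "   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'\n",
    "   236 =  'AFGHANISTAN'\n",
    "   101 =  'ALBANIA'\n",
    ";\n",  # a line without a code/name pair
]

# Capture the digits before "=" and the quoted name after it
country_pattern = re.compile(r"^\s*(\d+)\s*=\s*'(.*)'")

countries = {}
for line in sample_lines:
    match = country_pattern.search(line)
    if match is None:
        continue  # skip lines that do not hold a pair
    countries[match.group(1)] = match.group(2).strip()

print(countries)
```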

In [44]:
# for k, v in valid_countries.items():
#     print(k, v)

# 582 MEXICO Air Sea, and Not Reported (I-94, no land arrivals)
# 236 AFGHANISTAN
# 101 ALBANIA
# 316 ALGERIA
# 102 ANDORRA
# 324 ANGOLA    

In [45]:
def country_code_to_name(code):
    """Map country code to country name"""
    name = valid_countries.get(code)
    # unknown codes have no name
    if name is None:
        return None
    name = name.strip()
    # handle invalid cases
    if "INVALID" in name or "Collapsed" in name or "No Country Code" in name:
        return None
    # handle the 'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)' case
    elif code == "582":
        return None
    else:
        # use title() to capitalize the first letter of each word
        return name.title()

In [46]:
country_code_to_name_udf = udf(country_code_to_name, t.StringType())

In [47]:
# format the double code as a string without decimals (e.g. 692.0 -> "692") for the dictionary lookup
i94_apr16 = i94_apr16.withColumn('country_name', country_code_to_name_udf(format_number(col("i94res"), 0)))
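
`format_number` returns a string and inserts thousands separators, which would only matter if a code reached 1,000. The plain-Python sketch below shows the difference between that style of formatting and a simple integer conversion; `grouped` and `code_key` are hypothetical helper names.

```python
def grouped(code):
    """Python analogue of format_number(x, 0): rounds and adds thousands separators."""
    return f"{code:,.0f}"

def code_key(code):
    """Turn a numeric code such as 692.0 into the dictionary key '692'."""
    return str(int(code))

for value in (692.0, 1234.0):
    print(value, "->", grouped(value), "vs", code_key(value))
```

For the three-digit codes in this dataset both give the same key. In Spark, casting the column to an integer and then to a string is one way to get the second behaviour.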

In [48]:
i94_apr16 = i94_apr16.filter(i94_apr16["country_name"].isNotNull())

In [49]:
i94_apr16.select(["i94res", "country_name"]).show(5)

+------+------------+
|i94res|country_name|
+------+------------+
| 692.0|     Ecuador|
| 276.0| South Korea|
| 101.0|     Albania|
| 101.0|     Albania|
| 101.0|     Albania|
+------+------------+
only showing top 5 rows



In [50]:
# Show top 10 countries
top_countries = i94_apr16.select(["i94res", "country_name"]) \
                         .groupBy(["i94res", "country_name"]) \
                         .count().sort(col("count").desc()).persist()
top_countries.show(10)

+------+--------------+------+
|i94res|  country_name| count|
+------+--------------+------+
| 135.0|United Kingdom|368259|
| 209.0|         Japan|248913|
| 245.0|    China, Prc|185339|
| 111.0|        France|185280|
| 112.0|       Germany|156562|
| 276.0|   South Korea|136136|
| 689.0|        Brazil|134893|
| 438.0|     Australia|112304|
| 213.0|         India|107184|
| 687.0|     Argentina| 75095|
+------+--------------+------+
only showing top 10 rows



- **United Kingdom** (i94res code **135**) had the largest number of visitors

In [51]:
# get unique country codes
countries = i94_apr16.select('i94res').distinct().collect()

In [52]:
print(f"Dataset has {len(countries)} countries")

Dataset has 228 countries


In [53]:
i94_apr16.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true

In [54]:
i94_apr16.limit(5).toPandas().head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,port_name,country_name
0,6.0,2016.0,4.0,692.0,692.0,XXX,2016-04-29,,,1900-01-01,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2,Not Reported/Unk,Ecuador
1,7.0,2016.0,4.0,254.0,276.0,ATL,2016-04-07,1.0,AL,1900-01-01,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1,Atlanta,South Korea
2,15.0,2016.0,4.0,101.0,101.0,WAS,2016-04-01,1.0,MI,2016-08-25,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2,"Washington, D.C.",Albania
3,16.0,2016.0,4.0,101.0,101.0,NYC,2016-04-01,1.0,MA,2016-04-23,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2,New York,Albania
4,17.0,2016.0,4.0,101.0,101.0,NYC,2016-04-01,1.0,MA,2016-04-23,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2,New York,Albania


## Temperatures

In [55]:
temperatures.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [56]:
# calculate max and min temperature
min_temp = temperatures.agg({"AverageTemperature": "min"}).collect()[0]
max_temp = temperatures.agg({"AverageTemperature": "max"}).collect()[0]

In [57]:
print(f'Temperatures ranged from {int(min_temp["min(AverageTemperature)"])} to {int(max_temp["max(AverageTemperature)"])} degrees Celsius') 

Temperatures ranged from -42 to 39 degrees Celsius


In [58]:
# calculate average temperature per city

In [59]:
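@udf
def city_to_port(city):
    """Map city name to port code"""
    # substring match: returns the first port whose name contains the city name
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key
    # no matching port yields null
    return None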
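
Because the match is a substring test, a short city name can match an unrelated port. The sketch below, in plain Python, contrasts that with an exact match on the city part of the port name. The `valid_ports` entries are an illustrative subset in the assumed `'NAME, ST'` layout, and `build_city_index` and `city_to_port_exact` are hypothetical helpers.

```python
# Illustrative subset of the port dictionary built from the labels file
valid_ports = {
    "NYC": "NEW YORK, NY          ",
    "NEW": "NEWARK/TETERBORO, NJ  ",
    "ORL": "ORLANDO, FL           ",
}

def build_city_index(ports):
    """Index ports by the lower-cased text before the first comma."""
    index = {}
    for code, name in ports.items():
        city = name.split(",")[0].strip().lower()
        index.setdefault(city, code)  # keep the first code seen for a city
    return index

city_index = build_city_index(valid_ports)

def city_to_port_exact(city):
    """Return the port code whose city part equals the given city, else None."""
    if city is None:
        return None
    return city_index.get(city.strip().lower())

# The substring test accepts "York" as a match for New York
print("york" in valid_ports["NYC"].lower())
print(city_to_port_exact("New York"), city_to_port_exact("York"))
```

The trade-off is that exact matching misses combined names: "Newark" no longer maps to "NEWARK/TETERBORO", so such cases would need their own handling.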

In [60]:
# filter United States temperatures 
# add port code and average temperature
temp_city = temperatures.filter(col("Country") =="United States") \
                        .withColumn("year", year(col("dt"))) \
                        .withColumn("month",month(col("dt"))) \
                        .withColumn("i94port", city_to_port((col("City")))) \
                        .withColumn("AverageTemperature",col("AverageTemperature").cast("float")) \
                        .na.drop(subset=["i94port"])

In [61]:
temp_city.show(5)

+-------------------+------------------+-----------------------------+-----+-------------+--------+---------+----+-----+-------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|      Country|Latitude|Longitude|year|month|i94port|
+-------------------+------------------+-----------------------------+-----+-------------+--------+---------+----+-----+-------+
|1743-11-01 00:00:00|             3.209|           1.9609999999999999|Akron|United States|  40.99N|   80.95W|1743|   11|    AKR|
|1743-12-01 00:00:00|              null|                         null|Akron|United States|  40.99N|   80.95W|1743|   12|    AKR|
|1744-01-01 00:00:00|              null|                         null|Akron|United States|  40.99N|   80.95W|1744|    1|    AKR|
|1744-02-01 00:00:00|              null|                         null|Akron|United States|  40.99N|   80.95W|1744|    2|    AKR|
|1744-03-01 00:00:00|              null|                         null|Akron|United States|  40.99

In [62]:
# find most recent year
max_year = temp_city.agg({"year": "max"}).collect()[0]

In [63]:
max_year

Row(max(year)=2013)

In [64]:
# average city temperatures for the most recent year, 2013
temp_city = temp_city.filter(temp_city['year'] == 2013) \
                     .groupBy('i94port','city') \
                     .agg({'AverageTemperature':'avg'})

In [65]:
temp_city.show(5)

+-------+---------------+-----------------------+
|i94port|           city|avg(AverageTemperature)|
+-------+---------------+-----------------------+
|    PHO|        Phoenix|      23.56455506218804|
|    SAC|     Sacramento|      16.23366684383816|
|    BGC|     Bridgeport|      12.32911103963852|
|    FTL|Fort Lauderdale|      24.44011116027832|
|    MIA|          Miami|      24.44011116027832|
+-------+---------------+-----------------------+
only showing top 5 rows



In [66]:
# select city_code, city_name and avgtemp
temp_city = temp_city.select(col("i94port").alias("city_code"),
                             col("city").alias("city_name"),
                             format_number(col("avg(AverageTemperature)"), 0).alias("avgtemp")) \
                     .drop_duplicates()

In [67]:
temp_city.show(5)

+---------+-----------+-------+
|city_code|  city_name|avgtemp|
+---------+-----------+-------+
|      BGC| Bridgeport|     12|
|      MEM|    Memphis|     18|
|      SAC| Sacramento|     16|
|      SPI|Springfield|     13|
|      NOG|    Nogales|     20|
+---------+-----------+-------+
only showing top 5 rows



In [68]:
# find cities with highest average temperature
# note: avgtemp is a string (format_number output), so this sort is lexicographic ("9" > "24")
highest_temp = temp_city.sort(col("avgtemp").desc()).persist()

In [69]:
highest_temp.show(10)

+---------+---------------+-------+
|city_code|      city_name|avgtemp|
+---------+---------------+-------+
|      GRB|      Green Bay|      9|
|      ORL|        Orlando|     24|
|      BRO|    Brownsville|     24|
|      MIA|          Miami|     24|
|      OTM|           Mesa|     24|
|      PHO|        Phoenix|     24|
|      TAM|          Tampa|     24|
|      LCB|         Laredo|     24|
|      FTL|Fort Lauderdale|     24|
|      CRP| Corpus Christi|     23|
+---------+---------------+-------+
only showing top 10 rows



In [70]:
# find cities with lowest average temperature
# note: avgtemp is a string, so "1" sorts before "10" and values like "9" sort last
lowest_temp = temp_city.sort(col("avgtemp")).persist()

In [71]:
lowest_temp.show(10)

+---------+------------+-------+
|city_code|   city_name|avgtemp|
+---------+------------+-------+
|      ANC|   Anchorage|      1|
|      BUF|     Buffalo|     10|
|      CID|Cedar Rapids|     10|
|      RST|   Rochester|     10|
|      BOS|      Boston|     10|
|      MHT|  Manchester|     10|
|      RFD|    Rockford|     10|
|      TAC|      Tacoma|     10|
|      SYR|    Syracuse|     10|
|      SEA|     Seattle|     10|
+---------+------------+-------+
only showing top 10 rows
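
Both rankings above sort `avgtemp` as text, which is why Green Bay (9) heads the "highest" list and the "lowest" list jumps from 1 straight to 10. The plain-Python sketch below reproduces the effect with values taken from those outputs and shows the numeric ordering for comparison.

```python
# avgtemp values as strings, as produced by format_number
avgtemp = ["24", "9", "10", "1", "23"]

lexicographic_desc = sorted(avgtemp, reverse=True)
numeric_desc = sorted(avgtemp, key=int, reverse=True)

print(lexicographic_desc)  # ['9', '24', '23', '10', '1']
print(numeric_desc)        # ['24', '23', '10', '9', '1']
```

In Spark, one way to get the numeric ordering would be to sort on `col("avgtemp").cast("int")`, or to keep the column numeric when it is created.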



## Demographics

In [72]:
demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [73]:
# show number of States
print(f"Number of States: {demographics.select('State').distinct().count()}")

Number of States: 49


In [74]:
# show number of Cities
print(f"Number of cities: {demographics.select('City').distinct().count()}")

Number of cities: 567


In [75]:
# drop null and invalid City and State values
demographics = demographics.filter(demographics["State"].isNotNull()) \
                           .filter(demographics["State"].rlike(r'^[a-zA-Z]')) \
                           .filter(demographics["City"].isNotNull()) \
                           .filter(demographics["City"].rlike(r'^[a-zA-Z]'))
demographics.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

## Airports

In [76]:
# filter United States airports 
# drop null and invalid iata_code 
airports = airports.filter(col("iso_country") == "US") \
                   .filter(airports["iata_code"].isNotNull()) \
                   .filter(airports["iata_code"].rlike(r'^[A-Z]'))
airports.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region| municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
| 07FA|small_airport|Ocean Reef Club A...|           8|       NA|         US|     US-FL|    Key Largo|    07FA|      OCA|      07FA|-80.274803161621,...|
|  0AK|small_airport|Pilot Station Air...|         305|       NA|         US|     US-AK|Pilot Station|    null|      PQS|       0AK|-162.899994, 61.9...|
| 0CO2|small_airport|Crested Butte Air...|        8980|       NA|         US|     US-CO|Crested Butte|    0CO2|      CSE|      0CO2|-106.928341, 38.8...|
| 0TE7|small_airport|   LBJ Ranch Airport|        1515|       NA|         US

# Step 3: Define the Data Model

## 3.1 Conceptual Data Model

![](immigration_schema.png)

The model is a star schema with **1** fact table (**immigration**) and **3** dimension tables (**temperatures**, **demographics**, and **airports**).

## Fact Table

### immigration

- year : year 
- month : month 
- origin_city : origin city code 
- destination_city : destination city code 
- travel_mode : travel mode 
- age : Age 
- arrival_date : Arrival date 
- departure_date : Departure date 
- visa_category : Visa category 
- avgtemp : Average temperature

## Dimension Tables

### temperatures

- city_code : City code
- city_name : City name
- avgtemp : Average temperature

### demographics

- city : City
- state : State
- median_age : Median age
- total_population : Total population
- foreign_born : Number of foreign-born residents

### airports

- airport_id : Airport ID 
- airport_type : Airport type
- airport_name : Airport name
- region : Region 
- municipality : Municipality
- iata_code : IATA code

## 3.2 Mapping Out Data Pipelines

1. Load immigration SAS file
2. Clean data
3. Load temperature CSV file
4. Select columns to create the temperatures dimension table
5. Write temperatures table as Parquet files partitioned by city
6. Load city demographics CSV file
7. Select columns to create the demographics dimension table
8. Write demographics table as Parquet files partitioned by city
9. Load airports CSV file
10. Select columns to create the airports dimension table
11. Write airports table as Parquet files partitioned by region
12. Join datasets using i94port, select columns, and create the immigration fact table
13. Write immigration table as Parquet files partitioned by port

# Step 4: Run Pipelines to Model the Data 

## 4.1 Create the data model

In [77]:
# create staging Tables
i94_apr16.createOrReplaceTempView("staging_immigration_table")
temp_city.createOrReplaceTempView("staging_temperatures_table")
demographics.createOrReplaceTempView("staging_demographics")
airports.createOrReplaceTempView("staging_airports_table")

In [78]:
# create immigration Table
immigration_table = spark.sql("""SELECT 
                                       cast(si.i94yr  AS INT)  AS year,
                                       cast(si.i94mon AS INT)  AS month,
                                       si.i94cit               AS origin_city,
                                       si.i94port              AS destination_city,
                                       cast(si.i94mode AS INT) AS travel_mode,
                                       cast(si.i94bir AS INT)  AS age,
                                       si.arrdate              AS arrival_date,
                                       si.depdate              AS departure_date,
                                       cast(si.i94visa AS INT) AS visa_category,
                                       cast(st.avgtemp AS INT) AS avgtemp
                                 FROM  staging_immigration_table si
                                 JOIN  staging_temperatures_table st
                                 ON    si.i94port = st.city_code
                                 ORDER BY si.i94port 
                             """).dropDuplicates()

In [79]:
immigration_table.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- origin_city: double (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- travel_mode: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- visa_category: integer (nullable = true)
 |-- avgtemp: integer (nullable = true)



In [80]:
immigration_table.show(5)

+----+-----+-----------+----------------+-----------+---+------------+--------------+-------------+-------+
|year|month|origin_city|destination_city|travel_mode|age|arrival_date|departure_date|visa_category|avgtemp|
+----+-----+-----------+----------------+-----------+---+------------+--------------+-------------+-------+
|2016|    4|      126.0|             ATL|          1| 75|  2016-04-01|    2016-04-10|            2|     16|
|2016|    4|      135.0|             ATL|          1| 42|  2016-04-01|    2016-04-09|            2|     16|
|2016|    4|      148.0|             ATL|          1| 28|  2016-04-01|    2016-04-08|            2|     16|
|2016|    4|      148.0|             ATL|          1| 32|  2016-04-01|    2016-04-10|            2|     16|
|2016|    4|      254.0|             ATL|          1| 54|  2016-04-01|    2016-06-01|            1|     16|
+----+-----+-----------+----------------+-----------+---+------------+--------------+-------------+-------+
only showing top 5 rows
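
The arrival and departure columns print as `date` values here, whereas SAS stores dates as a count of days since 1960-01-01. The sketch below shows that conversion in plain Python, on the assumption that the raw `ARRDATE` and `DEPDATE` fields are SAS numeric dates. The cleaning cell that actually performs the conversion is outside this section, so the helper name `sas_to_date` is hypothetical.

```python
from datetime import date, timedelta

# SAS numeric dates count days from this epoch.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a date, or None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20545.0))  # 2016-04-01
```

In Spark, a function like this could be wrapped with `udf(sas_to_date, t.DateType())`, using the `udf` and `pyspark.sql.types` imports from the first cell.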



In [81]:
# create temperatures Table
temperatures_table = spark.sql("""SELECT 
                                        city_code,
                                        city_name,                                    
                                        cast(avgtemp AS INT) AS avgtemp
                                  FROM staging_temperatures_table 
                                  ORDER BY city_name
                              """).dropDuplicates()

In [82]:
temperatures_table.printSchema()

root
 |-- city_code: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- avgtemp: integer (nullable = true)



In [83]:
temperatures_table.show(5)

+---------+-----------+-------+
|city_code|  city_name|avgtemp|
+---------+-----------+-------+
|      AKR|      Akron|     12|
|      ABQ|Albuquerque|     14|
|      AXB| Alexandria|     14|
|      ANC|  Anchorage|      1|
|      ATL|    Atlanta|     16|
+---------+-----------+-------+
only showing top 5 rows



In [84]:
# create demographics Table
demographics_table = spark.sql("""SELECT 
                                     City               AS city,
                                     State              AS state,
                                     `Median Age`       AS median_age,
                                     `Total Population` AS total_population,
                                     `Foreign-born`     AS foreign_born
                                  FROM staging_demographics
                                  ORDER BY city
                                """).dropDuplicates()

In [85]:
demographics_table.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)



In [86]:
demographics_table.show(5)

+-------+----------+----------+----------------+------------+
|   city|     state|median_age|total_population|foreign_born|
+-------+----------+----------+----------------+------------+
|Abilene|     Texas|      31.3|          125876|        8129|
|  Akron|      Ohio|      38.1|          197553|       10024|
|Alafaya|   Florida|      33.5|           85264|       15842|
|Alameda|California|      41.4|           78614|       18841|
| Albany|   Georgia|      33.3|           71109|         861|
+-------+----------+----------+----------------+------------+
only showing top 5 rows



In [87]:
# create Airports Table
airports_table = spark.sql("""SELECT 
                                    ident        AS airport_id,
                                    type         AS airport_type,
                                    name         AS airport_name,                                    
                                    iso_region   AS region,
                                    municipality AS municipality,                                    
                                    iata_code    AS iata_code
                             FROM staging_airports_table 
                             ORDER BY municipality
                            """).dropDuplicates()

In [88]:
airports_table.printSchema()

root
 |-- airport_id: string (nullable = true)
 |-- airport_type: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- iata_code: string (nullable = true)



In [89]:
airports_table.show(5)

+----------+------------+--------------------+------+------------+---------+
|airport_id|airport_type|        airport_name|region|municipality|iata_code|
+----------+------------+--------------------+------+------------+---------+
|       AUS|      closed|Austin Robert Mue...| US-TX|        null|      AUS|
|       CLG|      closed|    Coalinga Airport| US-CA|        null|      CLG|
|      KAYE|      closed|Ft Devens Moore A...| US-MA|        null|      AYE|
|      KUIZ|      closed| Berz-Macomb Airport| US-MI|        null|      UIZ|
|       O62|      closed|       Carmel Valley| US-CA|        null|      O62|
+----------+------------+--------------------+------+------------+---------+
only showing top 5 rows



In [90]:
# write tables in parquet format
# output_data = "./output"

In [91]:
# immigration_table.write \
#                  .partitionBy('destination_city') \
#                  .parquet(os.path.join(output_data, 'immigration'), \
#                                        mode='overwrite')

In [92]:
# temperatures_table.write \
#                  .parquet(os.path.join(output_data, 'temperatures'), \
#                                        mode='overwrite')

In [93]:
# demographics_table.write \
#          .partitionBy('state') \
#          .parquet(os.path.join(output_data, 'demographics'), \
#                                mode='overwrite')

In [94]:
# airports_table.write \
#               .partitionBy('region') \
#               .parquet(os.path.join(output_data, 'airports'), \
#                                     mode='overwrite')
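
The four write cells above repeat the same pattern, so they could be folded into one helper. The sketch below is one possible shape, not the notebook's actual code: `PARTITION_COLUMNS` and `write_table` are hypothetical names, the partition columns are taken from the commented-out cells (with the immigration table partitioned by its destination city column), and `df` is assumed to be a Spark DataFrame.

```python
import os

# Partition column per table, mirroring the write cells above.
# None means the table is written without partitioning.
PARTITION_COLUMNS = {
    "immigration": "destination_city",
    "temperatures": None,
    "demographics": "state",
    "airports": "region",
}


def write_table(df, table_name, output_data="./output"):
    """Write df as parquet under output_data/table_name and return the path."""
    writer = df.write.mode("overwrite")
    partition_col = PARTITION_COLUMNS[table_name]
    if partition_col is not None:
        writer = writer.partitionBy(partition_col)
    path = os.path.join(output_data, table_name)
    writer.parquet(path)
    return path
```

For example, `write_table(demographics_table, "demographics")` would write to `./output/demographics`, partitioned by `state`.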

## 4.2 Data Quality Checks

### immigration check

#### Has rows check

In [95]:
def hasrowscheck(table, table_name):
    """Check table has rows"""
    print(f'Checking {table_name} table has rows')
    rows_num = table.count()
    if rows_num == 0:
        raise ValueError(f'{table_name} table has no rows')
    print(f'{table_name} table has {rows_num} rows')

In [96]:
hasrowscheck(immigration_table, "immigration")

Checking immigration table has rows
immigration table has 1886625 rows


#### Null check

In [97]:
immigration_table.createOrReplaceTempView("staging_immigration")

In [98]:
destination_city_check  = spark.sql("""SELECT COUNT(*) 
                                        FROM staging_immigration 
                                        WHERE destination_city IS NULL
                                    """)

In [99]:
destination_city_check.show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



### temperatures check

#### Has rows check

In [100]:
hasrowscheck(temperatures_table, "temperatures")

Checking temperatures table has rows
temperatures table has 112 rows


#### Null check

In [101]:
temperatures_table.createOrReplaceTempView("staging_temperatures_table")

In [102]:
city_check = spark.sql("""SELECT COUNT(*) 
                                 FROM staging_temperatures_table 
                                 WHERE city_name IS NULL""")

In [103]:
city_check.show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



### demographics check

#### Has rows check

In [104]:
hasrowscheck(demographics_table, "demographics")

Checking demographics table has rows
demographics table has 596 rows


#### Null check

In [105]:
demographics_table.createOrReplaceTempView("staging_demographics")

In [106]:
city_check = spark.sql("""SELECT COUNT(*) 
                                 FROM staging_demographics 
                                 WHERE city IS NULL""")

In [107]:
city_check.show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



### airports check

#### Has rows check

In [108]:
hasrowscheck(airports_table, "airports")

Checking airports table has rows
airports table has 2019 rows


#### Null check

In [109]:
airports_table.createOrReplaceTempView("staging_airports_table")

In [110]:
iata_code_check = spark.sql("""SELECT COUNT(*) 
                                 FROM staging_airports_table 
                                 WHERE iata_code IS NULL""")

In [111]:
iata_code_check.show()

+--------+
|count(1)|
+--------+
|       0|
+--------+
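
The null checks above only display a count, so a failing check would have to be spotted by eye. A minimal sketch of a helper that raises instead, in the style of `hasrowscheck`, is shown below. The name `nullcheck` is hypothetical, and it assumes `spark` is the active session and `view_name` is an already registered temp view.

```python
def nullcheck(spark, view_name, column):
    """Raise ValueError if `column` of temp view `view_name` contains NULLs."""
    print(f'Checking {view_name}.{column} for NULL values')
    query = f"SELECT COUNT(*) AS nulls FROM {view_name} WHERE {column} IS NULL"
    # collect() returns a list of Row objects; the single count is at [0][0].
    nulls_num = spark.sql(query).collect()[0][0]
    if nulls_num > 0:
        raise ValueError(f'{view_name}.{column} has {nulls_num} NULL values')
    print(f'{view_name}.{column} has no NULL values')


# Example calls, using the views registered in the cells above:
# nullcheck(spark, "staging_immigration", "destination_city")
# nullcheck(spark, "staging_airports_table", "iata_code")
```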



## 4.3 Data dictionary 

## Fact Table

### immigration
> source: [I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)

- year : year (I94YR)
- month : month (I94MON)
- origin_city : origin city code (I94CIT)
    - As per labels descriptions in the SAS file
- destination_city : destination city code (I94PORT)
    - As per labels descriptions in the SAS file
- travel_mode : travel mode code (I94MODE)
    - 1 = 'Air'
    - 2 = 'Sea'
    - 3 = 'Land' 
    - 9 = 'Not reported'
- age : Age (I94BIR)
- arrival_date : Arrival date (ARRDATE)
- departure_date : Departure date (DEPDATE)
- visa_category : Visa category (Business/Pleasure/Student) (I94VISA)
    - 1 = 'Business'
    - 2 = 'Pleasure'
    - 3 = 'Student' 
- avgtemp : Average temperature (mapped from temperatures dataset)
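
The code lists above can be kept next to the data as lookup dictionaries, so analysts can turn the integer codes into labels. This is a small sketch: the names `TRAVEL_MODE_LABELS`, `VISA_CATEGORY_LABELS`, and `decode` are hypothetical, and the `'Unknown'` fallback for unmapped codes is an assumption rather than part of the SAS label file.

```python
# Code-to-label mappings copied from the data dictionary above.
TRAVEL_MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
VISA_CATEGORY_LABELS = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode(code, labels):
    """Return the label for an integer code, or 'Unknown' if it is not mapped."""
    return labels.get(code, "Unknown")


print(decode(1, TRAVEL_MODE_LABELS))    # Air
print(decode(2, VISA_CATEGORY_LABELS))  # Pleasure
```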

## Dimension Tables

### temperatures
> source: [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)

- city_code : City code (mapped from immigration dataset)
- city_name : City name (City)
- avgtemp : Average temperature (AverageTemperature)

### demographics
> source: [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

- city : City
- state : State
- median_age : Median age
- total_population : Total population
- foreign_born : number of foreign born (Foreign-born)

### airports
> source: [Airport Code Table](https://datahub.io/core/airport-codes#data)

- airport_id : Airport ID (ident)
- airport_type : Airport type (type)
- airport_name : Airport name (name)
- region : Region (iso_region)
- municipality : Municipality
- iata_code : IATA code

# Step 5: Complete Project Write Up

## Scenarios

### The data was increased by 100x.
- Run the Spark jobs on a distributed cluster such as Amazon EMR, so that processing scales out across nodes.

### The data populates a dashboard that must be updated on a daily basis by 7am every day.
- Use Airflow to schedule a daily DAG that runs the pipeline described above early enough to finish before 7am.

### The database needed to be accessed by 100+ people.
- Load the tables into Amazon Redshift so that many users can query the data concurrently.