# Data Lake - U.S. Immigration Data

#### Data Engineering Capstone Project - Jonathan Cen

#### Project Summary
    
- A research center wants to study which U.S. cities are among the most popular destinations for immigrants, so that it can propose an infrastructure upgrade plan to the local government. One example of such an upgrade would be building more large airports in states that are popular immigrant destinations. As a data engineer at the research center, Jonathan is tasked with building a <code>data pipeline</code> that ingests raw data and constructs both a Data Lake and a Data Warehouse to support analytics for the research center's data science team.

- The datasets available to the research center are:
    - I94 Immigration Data - this data comes from the US National Tourism and Trade Office.
    - U.S. City Demographic Data - this data comes from OpenDataSoft.
    - Airport Code Table - provides details such as the type, location, and state code of each airport. Only airports within the U.S. are of interest at this stage.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

#### Imports

In [1]:
import time
import glob
import os
import configparser
import pandas as pd
import datetime as dt
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, LongType
from pyspark.sql.functions import col, udf, create_map, lit
from valid_parameters import valid_i94addr, valid_i94port, valid_i94res_i94cit, valid_i94mode, i94port_map, i94cit_i94res_map, i94mode_map
from itertools import chain

#### Constants

In [2]:
datalake_url = "s3a://udacity-capstone-jc-test/data-lake"

In [3]:
# AWS credentials
config = configparser.ConfigParser()
config.read('capstone.cfg')
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']
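The cell above assumes `capstone.cfg` contains an `[AWS]` section holding the two keys it reads. A minimal sketch of that layout, parsed from a string with `configparser.read_string` so it runs without the real file; the values are placeholders, not real credentials.

```python
import configparser

# Hypothetical contents of capstone.cfg; the values are placeholders.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

# ConfigParser stores option names in lower case, but lookups go through the
# same transformation, so the upper-case keys used in the notebook still work.
access_key = config['AWS']['AWS_ACCESS_KEY_ID']
secret_key = config['AWS']['AWS_SECRET_ACCESS_KEY']
print(access_key)  # YOUR_ACCESS_KEY_ID
```

Note that `config.read(...)` silently skips a file it cannot open and returns the list of files it actually parsed, so checking that return value is a cheap guard against a missing `capstone.cfg` before the `KeyError` on `config['AWS']`.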

Instantiate a Spark session for data cleansing and ingestion.

In [4]:
spark = SparkSession.builder \
.config("spark.jars.repositories", "https://repos.spark-packages.org/") \
.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2") \
.enableHiveSupport() \
.getOrCreate()

# bind AWS credentials to the spark session
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])

## Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?
    
### Project Detail:
- Raw data are stored locally.
- Data shall be cleaned, partitioned, and uploaded to an Amazon S3 data lake.
- Create an Amazon Redshift data warehouse to store U.S. immigration data from 2016 and beyond.
    - A <code>Star Schema</code> shall be used to design the data warehouse.
    - Raw data lands in staging tables, which are then used for SQL-to-SQL ETL.
- BI tools such as Amazon QuickSight could be used for final data visualization and reporting.
- Data science activities such as machine learning and deep learning can then be conducted with data stored in the data warehouse.
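The staging-to-star-schema flow described above can be sketched end to end with Python's built-in `sqlite3` module standing in for Redshift. This is a minimal sketch under assumptions: the table names (`staging_immigration`, `dim_state`, `fact_immigration`) and their columns are hypothetical, not the project's actual schema, and the three rows are taken from the immigration sample shown in Step 2. It only illustrates that, once raw rows sit in a staging table, the dimension and fact tables can be filled with plain `INSERT ... SELECT` statements.

```python
import sqlite3

# In-memory SQLite stands in for Redshift; all table/column names are hypothetical.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Staging table: cleaned rows from the data lake land here first.
cur.execute("""CREATE TABLE staging_immigration (
    cicid INTEGER, i94yr INTEGER, i94mon INTEGER, i94addr TEXT, visatype TEXT)""")
# Dimension table keyed by state code.
cur.execute("CREATE TABLE dim_state (state_code TEXT PRIMARY KEY)")
# Fact table referencing the dimension.
cur.execute("""CREATE TABLE fact_immigration (
    cicid INTEGER PRIMARY KEY, i94yr INTEGER, i94mon INTEGER,
    state_code TEXT REFERENCES dim_state(state_code), visatype TEXT)""")

cur.executemany(
    "INSERT INTO staging_immigration VALUES (?, ?, ?, ?, ?)",
    [(4084316, 2016, 4, "HI", "WT"),
     (4422636, 2016, 4, "TX", "B2"),
     (1195600, 2016, 4, "FL", "WT")],
)

# SQL-to-SQL ETL: populate the dimension, then the fact table, from staging.
cur.execute("INSERT INTO dim_state SELECT DISTINCT i94addr FROM staging_immigration")
cur.execute("""INSERT INTO fact_immigration
    SELECT cicid, i94yr, i94mon, i94addr, visatype FROM staging_immigration""")
conn.commit()

# Example analytical query: arrivals per destination state.
rows = cur.execute("""SELECT d.state_code, COUNT(*) FROM fact_immigration f
    JOIN dim_state d ON f.state_code = d.state_code
    GROUP BY d.state_code ORDER BY d.state_code""").fetchall()
print(rows)  # [('FL', 1), ('HI', 1), ('TX', 1)]
```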


### Data Use:
- US Immigration Data in 2016 (structured data, <code>sas7bdat</code> format)
- Airport code table (structured data, <code>csv</code> format)
- U.S. city demographic data (semi-structured data, <code>csv</code> format)

### End Solution:
- A cloud data warehouse which supports high performance analytical purposes.
- An Amazon QuickSight dashboard for data visualisation and reporting purposes.

### Tools:
- Partition Tools:
    - Apache Spark - partition U.S. immigration data by year and by state (<code>i94yr</code> and <code>i94addr</code>)
- Ingestion tools:
    - Python Pandas
- Scheduling tools:
    - Apache Airflow
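When Spark writes with `partitionBy("i94yr", "i94addr")`, as the immigration upload cell later in this notebook does, it creates one Hive-style `column=value` directory per partition value, nested in the order the columns are listed. A query that filters on those columns can then skip every non-matching directory. The helper below is a hypothetical sketch that only reproduces the directory naming; Spark additionally escapes special characters in values, which this sketch ignores.

```python
import posixpath


def partition_path(base_url, table, partitions):
    """Build the Hive-style directory that partitionBy writes a row into.

    `partitions` is an ordered list of (column, value) pairs, in the same
    order as the columns passed to partitionBy.
    """
    parts = [f"{column}={value}" for column, value in partitions]
    # posixpath always joins with '/', which S3 URLs require on any OS.
    return posixpath.join(base_url, table, *parts)


path = partition_path(
    "s3a://udacity-capstone-jc-test/data-lake",
    "us_immigration",
    [("i94yr", 2016), ("i94addr", "CA")],
)
print(path)
# s3a://udacity-capstone-jc-test/data-lake/us_immigration/i94yr=2016/i94addr=CA
```

Using `posixpath.join` rather than `os.path.join` keeps the separator as `/` even if the notebook is run on Windows.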


## Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### U.S. Immigration Dataset (step 2 - Data exploration)

<span style="color: red">A smaller <code>.csv</code> sample is used in the exploration step, and the original <code>.sas7bdat</code> dataset is used in the cleansing step.</span>

The U.S. Immigration dataset comes from the U.S. National Tourism and Trade Office. The dataset used in this project is from the year 2016.

<strong>Main information included:</strong>

* The date an immigrant arrived in the U.S.
* The country of origin of an immigrant.
* The port at which an immigrant arrived.
* The address at which an immigrant lived in the U.S.
* The airline which an immigrant used to arrive in the U.S.
* The flight number which an immigrant used to arrive in the U.S.
* The visa type of an immigrant.


<strong>Main data quality issues:</strong>
    
* Missing values
* Typing issues: fields such as year and month should be integers, so casting is required
* Invalid values issue: the data dictionary defines valid values for fields such as "i94cit", "i94res", and "i94port"
* Duplicated records
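The four issues above translate into four cleaning rules. The sketch below applies them in plain Python to a few records shaped like the sample rows, so the logic can be checked without Spark. It is a simplified illustration, not the Spark code used later: the valid-code sets are a hypothetical subset (the real ones come from the project's `valid_parameters` module), and only a handful of columns are kept.

```python
VALID_I94ADDR = {"CA", "FL", "HI", "NY", "TX"}  # hypothetical subset
VALID_I94PORT = {"CHM", "HHW", "LOS", "MCA"}    # hypothetical subset
INT_FIELDS = ("cicid", "i94yr", "i94mon")


def clean(records):
    seen = set()
    cleaned = []
    for rec in records:
        # 1. Missing values: drop the record if any field is None.
        if any(value is None for value in rec.values()):
            continue
        # 2. Typing issues: numeric codes arrive as floats such as 2016.0.
        rec = {k: int(v) if k in INT_FIELDS else v for k, v in rec.items()}
        # 3. Duplicated records: keep only the first occurrence.
        key = tuple(sorted(rec.items()))
        if key in seen:
            continue
        seen.add(key)
        # 4. Invalid values: keep only codes listed in the data dictionary.
        if rec["i94addr"] in VALID_I94ADDR and rec["i94port"] in VALID_I94PORT:
            cleaned.append(rec)
    return cleaned


raw = [
    {"cicid": 4084316.0, "i94yr": 2016.0, "i94mon": 4.0, "i94port": "HHW", "i94addr": "HI"},
    {"cicid": 4084316.0, "i94yr": 2016.0, "i94mon": 4.0, "i94port": "HHW", "i94addr": "HI"},  # duplicate
    {"cicid": 4422636.0, "i94yr": 2016.0, "i94mon": 4.0, "i94port": "MCA", "i94addr": "ZZ"},  # invalid state code
    {"cicid": 985523.0, "i94yr": 2016.0, "i94mon": 4.0, "i94port": "CHM", "i94addr": None},   # missing value
]
print(clean(raw))
# [{'cicid': 4084316, 'i94yr': 2016, 'i94mon': 4, 'i94port': 'HHW', 'i94addr': 'HI'}]
```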


In [5]:
immigration_fname = "immigration_data_sample.csv"
df_immigration = pd.read_csv(immigration_fname)
df_immigration.head()

|   | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | ... |  | M | 1955.0 | 7202016 | F |  | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | ... |  | M | 1990.0 | 10222016 | M |  | *GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | ... |  | M | 1940.0 | 7052016 | M |  | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | ... |  | M | 1991.0 | 10272016 | M |  | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | ... |  | M | 1997.0 | 7042016 | F |  |  | 42322570000.0 | LAND | WT |


### U.S. Immigration Dataset (step 2 - Data cleansing and upload to S3 Datalake)

In [6]:
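# lookup maps from code to description, built from the data dictionary
# (i94cit and i94res share the same country code list)
match_i94cit = create_map([lit(x) for x in chain(*i94cit_i94res_map.items())])
match_i94res = create_map([lit(x) for x in chain(*i94cit_i94res_map.items())])
match_i94port = create_map([lit(x) for x in chain(*i94port_map.items())])
match_i94mode = create_map([lit(x) for x in chain(*i94mode_map.items())])
# SAS dates count days since 1960-01-01; convert them to ISO date strings (null stays null)
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(days=float(x))).isoformat() if x is not None else None)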
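Both transformations in the cell above can be checked in plain Python before they run inside Spark. `get_date` adds the SAS day count to the 1960-01-01 epoch, and indexing a `create_map` column behaves like a dictionary lookup that yields null for an unknown key. In the sketch below, `sas_to_iso` is a hypothetical stand-alone equivalent of `get_date`, and `i94mode_lookup` is an assumed excerpt of the project's `i94mode_map`, not its actual contents. Testing `is None` rather than truthiness keeps day 0 (1960-01-01) from being mistaken for a missing value.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS dates count days from 1960-01-01


def sas_to_iso(sas_days):
    """Stand-alone equivalent of the get_date UDF; returns None for missing input."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + dt.timedelta(days=float(sas_days))).isoformat()


# Hypothetical excerpt of i94mode_map: code (as string) -> description.
i94mode_lookup = {"1": "Air", "2": "Sea", "3": "Land", "9": "Not reported"}

print(sas_to_iso(20566.0))      # 2016-04-22 (arrdate of the first sample row)
print(i94mode_lookup.get("3"))  # Land
print(i94mode_lookup.get("7"))  # None, like a create_map lookup on an unknown key
```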

In [7]:
df_immigration_master = None
is_first = True

# only the following columns are within scope of this project:
chosen_columns = ["cicid", "i94yr", "i94mon", "i94cit", "i94res", "i94port", 
                  "arrdate", "i94mode", "i94addr", "depdate", "i94bir", "i94visa",
                  "matflag", "biryear", "gender", "airline", "fltno", "visatype"]
for file in glob.glob("./data/18-83510-I94-Data-2016/*.sas7bdat"):
    # read in dataset into spark dataframe
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)
    
    # filter out unnecessary columns
    df_immigration = df_immigration.select(*chosen_columns)
    
    # cast SAS date to iso date
    df_immigration = df_immigration.withColumn("arrdate", get_date(df_immigration.arrdate)).withColumn("depdate", get_date(df_immigration.depdate))
    
    # cast columns into the correct datatypes
    df_immigration = df_immigration.withColumn("cicid", col("cicid").cast(IntegerType())).withColumn("i94yr", col("i94yr").cast(IntegerType())) \
                        .withColumn("i94mon", col("i94mon").cast(IntegerType())) \
                        .withColumn("i94visa", col("i94visa").cast(IntegerType())) \
                        .withColumn("biryear", col("biryear").cast(IntegerType())).withColumn("i94bir", col("i94bir").cast(IntegerType())) \
                        .withColumn("i94cit", col("i94cit").cast(IntegerType())).withColumn("i94res", col("i94res").cast(IntegerType())) \
                        .withColumn("i94mode", col("i94mode").cast(IntegerType()))
    
    # drop records that contain null value in any column
    df_immigration = df_immigration.dropna()
    
    # drop duplicates
    df_immigration = df_immigration.distinct()
    
    # filter out invalid values
    df_immigration = df_immigration.filter(df_immigration.i94addr.isin(*valid_i94addr))
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(*valid_i94port))
    df_immigration = df_immigration.filter(df_immigration.i94cit.isin(*valid_i94res_i94cit))
    df_immigration = df_immigration.filter(df_immigration.i94res.isin(*valid_i94res_i94cit))
    df_immigration = df_immigration.filter(df_immigration.i94mode.isin(*valid_i94mode))
    
    df_immigration = df_immigration.withColumn("i94cit", col("i94cit").cast(StringType())).withColumn("i94res", col("i94res").cast(StringType())) \
                                    .withColumn("i94mode", col("i94mode").cast(StringType()))
    
    # match code to values
    df_immigration = df_immigration.withColumn("i94cit", match_i94cit[df_immigration["i94cit"]].alias("i94cit")) \
                                .withColumn("i94res", match_i94res[df_immigration["i94res"]].alias("i94res")) \
                                .withColumn("i94port", match_i94port[df_immigration["i94port"]].alias("i94port")) \
                                .withColumn("i94mode", match_i94mode[df_immigration["i94mode"]].alias("i94mode"))
    
    if is_first:
        is_first = False
        df_immigration_master = df_immigration
    else:
        df_immigration_master = df_immigration_master.union(df_immigration)
    print(f"{file} is processed.")

../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat is processed.
../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat is processed.
../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat is processed.
../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat is processed.
../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat is processed.
../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat is processed.
../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat is processed.
../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat is processed.
../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat is processed.
../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat is processed.
../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat is processed.
../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat is processed.


In [8]:
# upload to S3 datalake in parquet format
s1 = time.time()
df_immigration_master.coalesce(1).write.partitionBy("i94yr", 'i94addr').mode('overwrite').parquet(path=os.path.join(datalake_url, 'us_immigration'))
print("time used = {} seconds.".format(time.time() - s1))

time used = 1803.5672590732574 seconds.



### U.S. Cities Demographics (Step 2 - Data Exploration)

This dataset comes from OpenDataSoft. It contains key demographic statistics for U.S. cities.

Main information:

* City name
* State
* Median Age
* Total Population
* Number of Foreign-born
* Race
* State Code

Main data quality issues:

* Duplicated records
* All fields are read as strings, so numeric fields must be cast to the correct types
* Some records contain null values
* Field names contain spaces, so they must be renamed
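Instead of renaming each field by hand, the column names can be derived with a small rule: lower-case the name and collapse every run of spaces or hyphens into an underscore. The function below is a hypothetical helper, not part of the notebook. It reproduces the hand-written names for most fields, but it yields `number_of_veterans` where the notebook uses `num_of_veterans`, so any deliberate abbreviation would still need an explicit override.

```python
import re


def to_snake_case(name):
    """Lower-case a column name and replace runs of non-alphanumerics with '_'."""
    return re.sub(r"[^0-9a-zA-Z]+", "_", name).strip("_").lower()


columns = ["City", "State", "Median Age", "Male Population", "Foreign-born",
           "Average Household Size", "State Code", "Number of Veterans"]
renamed = {c: to_snake_case(c) for c in columns}
print(renamed["Foreign-born"])        # foreign_born
print(renamed["Number of Veterans"])  # number_of_veterans
```

In PySpark, the whole mapping could then be applied in one call with `df_us_cities.toDF(*[to_snake_case(c) for c in df_us_cities.columns])`.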

In [9]:
demographic_fname = "us-cities-demographics.csv"
df_us_cities = pd.read_csv(demographic_fname, sep=";")
df_us_cities.head()

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


### U.S. Cities Demographics Dataset (Step 2 - Data cleansing and upload to S3 Datalake)

In [10]:
df_us_cities = spark.read.format("csv").option("header", "true").option( "delimiter",";").load(demographic_fname)
# drop records with null values
df_us_cities = df_us_cities.dropna()
# drop duplicate records
df_us_cities = df_us_cities.distinct()

In [11]:
# cast all numeric fields to the correct data types
df_us_cities = df_us_cities.withColumn("Median Age", col("Median Age").cast(DoubleType())) \
                        .withColumn("Male Population", col("Male Population").cast(IntegerType())) \
                        .withColumn("Female Population", col("Female Population").cast(IntegerType())) \
                        .withColumn("Total Population", col("Total Population").cast(IntegerType())) \
                        .withColumn("Number of Veterans", col("Number of Veterans").cast(IntegerType())) \
                        .withColumn("Foreign-born", col("Foreign-born").cast(IntegerType())) \
                        .withColumn("Average Household Size", col("Average Household Size").cast(DoubleType())) \
                        .withColumn("Count", col("Count").cast(IntegerType()))

In [12]:
# rename all fields to remove spaces
df_us_cities = df_us_cities.withColumnRenamed("City", "city") \
                        .withColumnRenamed("State", "state") \
                        .withColumnRenamed("Median Age", "median_age") \
                        .withColumnRenamed("Male Population", "male_population") \
                        .withColumnRenamed("Female Population", "female_population") \
                        .withColumnRenamed("Total Population", "total_population") \
                        .withColumnRenamed("Number of Veterans", "num_of_veterans") \
                        .withColumnRenamed("Foreign-born", "foreign_born") \
                        .withColumnRenamed("Average Household Size", "average_household_size") \
                        .withColumnRenamed("State Code", "state_code") \
                        .withColumnRenamed("Race", "race") \
                        .withColumnRenamed("Count", "count")

In [13]:
df_us_cities.show(5)

+----------+----------+----------+---------------+-----------------+----------------+---------------+------------+----------------------+----------+--------------------+------+
|      city|     state|median_age|male_population|female_population|total_population|num_of_veterans|foreign_born|average_household_size|state_code|                race| count|
+----------+----------+----------+---------------+-----------------+----------------+---------------+------------+----------------------+----------+--------------------+------+
|   Lynwood|California|      29.4|          35634|            36371|           72005|            776|       28061|                  4.43|        CA|Black or African-...|  5346|
| Hollywood|   Florida|      41.4|          75358|            74363|          149721|           6056|       55158|                  2.65|        FL|               White|107916|
|   Fremont|California|      38.3|         114383|           117808|          232191|           4629|      109427| 

In [14]:
# upload to s3 datalake in parquet format
s2 = time.time()
df_us_cities.coalesce(1).write.partitionBy("state_code").mode('overwrite').parquet(path=os.path.join(datalake_url, 'us_cities_demographics'))
print("time used = {} seconds.".format(time.time() - s2))

time used = 16.081935167312622 seconds.


### Airport Codes (Step 2 - Data exploration)

The airport code dataset comes from <a href="https://datahub.io/core/airport-codes#data">datahub.io</a>

Main information:

* Airport type
* Airport name
* The country of the airport
* The state of the airport
* The coordinates of the airport

Main data quality issues:

* elevation_ft is of the type string - this should be integer type
* contains non-U.S. airport code - this project only considers U.S. data
* The state code must be extracted from the "iso_region" field
* Longitude and latitude must be extracted from the "coordinates" field
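The two extraction rules can be checked outside Spark with plain Python. This is a minimal sketch that assumes, as the sample rows suggest, that `coordinates` holds "longitude, latitude" and that `iso_region` holds a country prefix followed by the state code, e.g. `US-PA`. The helper names are hypothetical. Unlike a bare `split`, they return `None` for missing input instead of raising, which is what a UDF needs when a column contains nulls.

```python
def parse_iso_region(iso_region):
    """'US-PA' -> 'PA'; returns None when the region is missing."""
    if not iso_region:
        return None
    return iso_region.split("-")[-1]


def parse_coordinates(coordinates):
    """'lon, lat' string -> (longitude, latitude) floats, or (None, None)."""
    if not coordinates:
        return None, None
    # float() ignores the space that follows the comma.
    lon, lat = (float(part) for part in coordinates.split(","))
    return lon, lat


print(parse_iso_region("US-PA"))  # PA
print(parse_coordinates("-101.473911, 38.704022"))
print(parse_coordinates(None))    # (None, None)
```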

In [15]:
airport_code_fname = "airport-codes_csv.csv"
df_airport_codes = pd.read_csv(airport_code_fname)
df_airport_codes.head()

|   | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |


### Airport Codes Dataset (Step 2 - Data cleansing and upload to S3 Datalake)

In [16]:
df_airport_codes = spark.read.format("csv").option("header", "true").load(airport_code_fname)

In [17]:
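# define UDFs for the new columns; each returns None for null input instead of raising
extract_state_code = udf(lambda iso_region: iso_region.split('-')[-1] if iso_region else None, StringType())
extract_longitude = udf(lambda coordinates: coordinates.split(',')[0].strip() if coordinates else None, StringType())
extract_latitude = udf(lambda coordinates: coordinates.split(',')[1].strip() if coordinates else None, StringType())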

In [18]:
# extract information for new columns
df_airport_codes = df_airport_codes.withColumn("state_code", extract_state_code(df_airport_codes.iso_region)) \
                                    .withColumn("longitude", extract_longitude(df_airport_codes.coordinates)) \
                                    .withColumn("latitude", extract_latitude(df_airport_codes.coordinates))

In [19]:
# filter out non-U.S. data
df_airport_codes = df_airport_codes.filter(df_airport_codes.iso_country=="US")
# drop unnecessary columns
df_airport_codes = df_airport_codes.drop("iso_region", "coordinates")
# drop duplicates
df_airport_codes = df_airport_codes.distinct()
# cast columns to correct data types
df_airport_codes = df_airport_codes.withColumn("elevation_ft", col("elevation_ft").cast(IntegerType())) \
                    .withColumn("longitude", col("longitude").cast(DoubleType())) \
                    .withColumn("latitude", col("latitude").cast(DoubleType()))

In [20]:
# upload to S3 datalake
s3 = time.time()
df_airport_codes.coalesce(1).write.partitionBy("state_code").mode('overwrite').parquet(path=os.path.join(datalake_url, 'airport_codes'))
print("time used = {} seconds.".format(time.time() - s3))

time used = 18.05463218688965 seconds.
