# Project Title
## Data Engineering Capstone Project
## Project Summary
--describe your project at a high level--

The project follows these steps:

* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Import Libraries
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import os
import configparser

from pyspark.sql import SparkSession

import utility

import importlib

# Reload the local helper module so that edits to utility.py are picked up
importlib.reload(utility)
from utility import visualise_missing_values, clean_immigration, clean_temperature_data
from utility import clean_demographics_data

In [2]:
config = configparser.ConfigParser()
config.read('config.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
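
`ConfigParser.read` silently skips files it cannot find, so a missing `config.cfg` only surfaces later as a `KeyError` on `config['AWS']`. The sketch below shows one way to fail early with a clearer message. The helper name `load_aws_credentials` is hypothetical, and the credentials in the example are placeholders rather than real keys.

```python
import configparser

REQUIRED_AWS_KEYS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def load_aws_credentials(parser: configparser.ConfigParser) -> dict:
    """Return the AWS credentials from a parsed config, failing early if any are missing.

    Hypothetical helper, not part of the project's utility module.
    """
    if not parser.has_section("AWS"):
        raise KeyError("config is missing the [AWS] section")
    missing = [
        key for key in REQUIRED_AWS_KEYS
        if not parser.get("AWS", key, fallback="").strip()
    ]
    if missing:
        raise KeyError(f"config is missing values for: {', '.join(missing)}")
    return {key: parser.get("AWS", key) for key in REQUIRED_AWS_KEYS}


# Example with an in-memory config; the values are placeholders.
example = configparser.ConfigParser()
example.read_string(
    "[AWS]\n"
    "AWS_ACCESS_KEY_ID = example-id\n"
    "AWS_SECRET_ACCESS_KEY = example-secret\n"
)
credentials = load_aws_credentials(example)
```

In the notebook, the returned dictionary could be passed to `os.environ.update(...)` in place of the two assignments above.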

## Step 1: Scope the Project and Gather Data
### Scope
---
The objective of this project is to develop a cloud-based data warehouse that gives decision and policy makers an analytics database for immigration and tourism data. One use case is to power dashboards and visualizations that assist organisations such as the National Travel & Tourism Office (NTTO) in its quest to create a positive climate for growth in travel and tourism by reducing institutional barriers to tourism.

The technologies used in this project are Amazon S3, Amazon Redshift, Apache Spark and Apache Airflow. Spark reads the data from the customer's repository and stages it in S3. Apache Airflow runs scheduled monthly jobs that populate a data warehouse on Redshift.

### Data Description
---
Describe the data sets you're using. Where did it come from? What type of information is included?

### World Temperature Data
* This dataset came from Kaggle.
### U.S. City Demographic Data
* This data comes from OpenDataSoft.
### Airport Code Table
* This is a simple table of airport codes and corresponding cities.

### Technology

### Architecture

### Immigration Data: Data Description
---
This data comes from the US National Travel and Tourism Office (NTTO). In the past, all foreign visitors arriving in the U.S. by air or sea were required to complete a paper Customs and Border Protection Form I-94 Arrival/Departure Record or Form I-94W Nonimmigrant Visa Waiver Arrival/Departure Record. This dataset is derived from these forms.

This dataset forms the core of the data warehouse. The customer repository holds a year's worth of data for 2016, divided by month. For this project the data is in the folder dataset/18-83510-I94-Data-2016/. Each month's data is stored in the SAS binary storage format sas7bdat. For this project we have chosen to work with the data for April. However, the data extraction, transformation and loading utility functions have been designed to work with any month's worth of data.
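
Because the monthly files share one naming pattern (for example `i94_apr16_sub.sas7bdat`), the path for any month can be derived from its number. The helper below is a minimal sketch of that idea. `i94_file_path` is a hypothetical name, not a function from the project's `utility` module, and the default folder is the one used in the notebook's code cells.

```python
MONTH_ABBREVIATIONS = (
    "jan", "feb", "mar", "apr", "may", "jun",
    "jul", "aug", "sep", "oct", "nov", "dec",
)

DATA_DIR = "dataset/18-83510-I94-Data-2016"  # folder read by the code cells


def i94_file_path(month: int, year: int = 2016, data_dir: str = DATA_DIR) -> str:
    """Build the path of a monthly I94 file, e.g. month=4 gives .../i94_apr16_sub.sas7bdat."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be between 1 and 12, got {month}")
    month_abbr = MONTH_ABBREVIATIONS[month - 1]
    # The file name carries the two-digit year directly after the month.
    return f"{data_dir}/i94_{month_abbr}{year % 100:02d}_sub.sas7bdat"


april_path = i94_file_path(4)
```

Passing a different month number is then enough to point the same loading code at another file.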

<b><i>Data dictionary</i></b>

|Feature|Description|
|:--|:--|
|cicid|Unique record ID|
|i94yr|4 digit year|
|i94mon|Numeric month|
|i94cit|3 digit code for immigrant country of birth|
|i94res|3 digit code for immigrant country of residence|
|i94port|Port of admission|
|arrdate|Arrival Date in the USA|
|i94mode|Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)|
|i94addr|USA State of arrival|
|depdate|Departure Date from the USA|
|i94bir|Age of Respondent in Years|
|i94visa|Visa codes collapsed into three categories|
|count|Field used for summary statistics|
|dtadfile|Character Date Field - Date added to I-94 Files|
|visapost|Department of State where visa was issued|
|occup|Occupation that will be performed in U.S.|
|entdepa|Arrival Flag - admitted or paroled into the U.S.|
|entdepd|Departure Flag - Departed, lost I-94 or is deceased|
|entdepu|Update Flag - Either apprehended, overstayed, adjusted to perm residence|
|matflag|Match flag - Match of arrival and departure records|
|biryear|4 digit year of birth|
|dtaddto|Character Date Field - Date to which admitted to U.S. (allowed to stay until)|
|gender|Non-immigrant sex|
|insnum|INS number|
|airline|Airline used to arrive in U.S.|
|admnum|Admission Number|
|fltno|Flight number of Airline used to arrive in U.S.|
|visatype|Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|
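
The `arrdate` and `depdate` values in this file are plain numbers such as 20545.0. Assuming they are SAS date values, which count days from 1 January 1960, they can be converted to calendar dates as sketched below. The helper name `sas_days_to_date` is hypothetical.

```python
import math
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this date


def sas_days_to_date(value: Optional[float]) -> Optional[date]:
    """Convert a SAS day count to a calendar date; missing values become None."""
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None
    return SAS_EPOCH + timedelta(days=int(value))


arrival = sas_days_to_date(20545.0)
print(arrival)  # 2016-04-01
```

Under this assumption the value 20545 falls on 1 April 2016, which is consistent with the file holding April 2016 arrivals.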


#### Load I94 Immigration dataset

In [3]:
# Read in the data here - stored in dataset folder
fname = 'dataset/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")



In [4]:
df.head(5)

| |cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|...|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|6.0|2016.0|4.0|692.0|692.0|XXX|20573.0| | | |...|U| |1979.0|10282016| | | |1897628000.0| |B2|
|1|7.0|2016.0|4.0|254.0|276.0|ATL|20551.0|1.0|AL| |...|Y| |1991.0|D/S|M| | |3736796000.0|296.0|F1|
|2|15.0|2016.0|4.0|101.0|101.0|WAS|20545.0|1.0|MI|20691.0|...| |M|1961.0|09302016|M| |OS|666643200.0|93.0|B2|
|3|16.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|MA|20567.0|...| |M|1988.0|09302016| | |AA|92468460000.0|199.0|B2|
|4|17.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|MA|20567.0|...| |M|2012.0|09302016| | |AA|92468460000.0|199.0|B2|


In [5]:
# Create a Spark session with the spark-sas7bdat package for reading SAS files
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12")
    .enableHiveSupport()
    .getOrCreate()
)


In [None]:
"""spark = SparkSession.builder\
                    .config("spark.jars","https://repo1.maven.org/maven2/com/epam/parso/2.0.8/parso-2.0.8.jar,https://repos.spark-packages.org/saurfang/spark-sas7bdat/2.0.0-s_2.11/spark-sas7bdat-2.0.0-s_2.11.jar")\
                    .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                    .enableHiveSupport()\
                    .getOrCreate()"""

In [None]:
"""spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.1.0-s_2.11,com.epam:parso:2.0.11") \
    .getOrCreate()"""


In [6]:
# Read the April 2016 SAS file into a Spark DataFrame
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('dataset/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [7]:
spark.version

'3.2.1'

In [8]:
df_spark.count()


3096313

In [9]:
#write to parquet
df_spark.write.parquet("sas_data_1")
df_spark=spark.read.parquet("sas_data_1")


### World Temperature Data: Data Description
---
The World Temperature dataset comes from Kaggle and represents global land temperatures by city.

Load World Temperature data

In [10]:
file_name = 'data/GlobalLandTemperaturesByCity.csv'

In [11]:
df_wt = pd.read_csv(file_name)

In [13]:
#Peek at the data
df_wt.head(2)

| |dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
|---|---|---|---|---|---|---|---|
|0|1743-11-01|6.068|1.737|Århus|Denmark|57.05N|10.33E|
|1|1743-12-01| | |Århus|Denmark|57.05N|10.33E|


#### Data dictionary

| Feature | Description |
| --- | --- |
| dt | Date |
| AverageTemperature | Average land temperature in degrees Celsius |
| AverageTemperatureUncertainty | 95% confidence interval around the average |
| City | Name of City |
| Country | Name of Country |
| Latitude | City Latitude |
| Longitude | City Longitude |
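
The `Latitude` and `Longitude` columns are stored as text with a hemisphere suffix, such as `57.05N` and `10.33E`. If numeric coordinates are needed later, a conversion along these lines could be used. This is a sketch, and the function name `parse_coordinate` is hypothetical.

```python
def parse_coordinate(text: str) -> float:
    """Convert a coordinate such as '57.05N' or '10.33E' to signed decimal degrees."""
    text = text.strip().upper()
    if len(text) < 2 or text[-1] not in "NSEW":
        raise ValueError(f"unrecognised coordinate: {text!r}")
    magnitude = float(text[:-1])
    # By convention, southern latitudes and western longitudes are negative.
    return -magnitude if text[-1] in "SW" else magnitude


latitude = parse_coordinate("57.05N")
longitude = parse_coordinate("10.33E")
```

Applied to a DataFrame column, this would look like `df_wt["Latitude"].map(parse_coordinate)`.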


In [16]:
df_wt.shape

(8599212, 7)

### U.S. City Demographic Data: Data Description
---
This data comes from OpenDataSoft and contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. The original data comes from the US Census Bureau's 2015 American Community Survey.

Load the dataset

In [21]:
df_cd = pd.read_csv('data/us-cities-demographics.csv')

In [22]:
df_cd.shape

(2891, 12)

In [23]:
df_cd.head(2)

| |City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|Silver Spring|Maryland|33.8|40601.0|41862.0|82463|1562.0|30908.0|2.6|MD|Hispanic or Latino|25924|
|1|Quincy|Massachusetts|41.0|44129.0|49500.0|93629|4147.0|32935.0|2.39|MA|White|58723|


#### Data dictionary

| Feature | Description |
|---------|-------------|
| City | City Name |
| State | US State where city is located |
| Median Age | Median age of the population |
| Male Population | Count of male population |
| Female Population | Count of female population |
| Total Population | Count of total population |
| Number of Veterans | Count of total Veterans |
| Foreign-born | Count of residents born outside the United States |
| Average Household Size | Average city household size |
| State Code | Code of the US state |
| Race | Respondent race |
| Count | Number of city residents of the given race |
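
The `Race` and `Count` columns suggest that the file holds one row per city and race, so the city-level columns repeat across rows. Assuming that layout, the sketch below reshapes the data to one row per city with a column for each race. The function name `race_counts_wide` is hypothetical and the sample values are made up for illustration.

```python
import pandas as pd


def race_counts_wide(df: pd.DataFrame) -> pd.DataFrame:
    """Return one row per (City, State Code) with one column per Race value."""
    wide = df.pivot_table(
        index=["City", "State Code"],
        columns="Race",
        values="Count",
        aggfunc="sum",
        fill_value=0,  # a city with no row for a race gets 0
    )
    wide.columns.name = None
    return wide.reset_index()


# Made-up sample in the same layout as the demographics file.
sample = pd.DataFrame({
    "City": ["A", "A", "B"],
    "State Code": ["XX", "XX", "YY"],
    "Race": ["White", "Asian", "White"],
    "Count": [10, 20, 5],
})
wide = race_counts_wide(sample)
```

The result has the columns `City`, `State Code`, `Asian` and `White`, with two rows instead of three.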

In [24]:
df_cd.shape

(2891, 12)

## Step 2: Explore and Assess the Data
### Explore the Data
Identify data quality issues, like missing values, duplicate data, etc.

### Cleaning Steps
Document steps necessary to clean the data

### EDA Immigration data

In [25]:
# list all files in the customer repository
files = os.listdir('dataset/18-83510-I94-Data-2016/')
files

['i94_nov16_sub.sas7bdat',
 'i94_dec16_sub.sas7bdat',
 'i94_sep16_sub.sas7bdat',
 'i94_aug16_sub.sas7bdat',
 'i94_may16_sub.sas7bdat',
 'i94_jun16_sub.sas7bdat',
 'i94_oct16_sub.sas7bdat',
 'i94_apr16_sub.sas7bdat',
 'i94_jan16_sub.sas7bdat',
 'i94_jul16_sub.sas7bdat',
 'i94_mar16_sub.sas7bdat',
 'i94_feb16_sub.sas7bdat']
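
`os.listdir` returns names in arbitrary order, as the listing above shows. If the months need to be processed chronologically, the names can be sorted by the month abbreviation they contain. This is a minimal sketch, and `month_number` is a hypothetical helper.

```python
MONTH_ORDER = {
    abbr: number
    for number, abbr in enumerate(
        ["jan", "feb", "mar", "apr", "may", "jun",
         "jul", "aug", "sep", "oct", "nov", "dec"],
        start=1,
    )
}


def month_number(file_name: str) -> int:
    """Extract the month from a name like 'i94_apr16_sub.sas7bdat'."""
    month_abbr = file_name.split("_")[1][:3]
    return MONTH_ORDER[month_abbr]


unordered = ["i94_nov16_sub.sas7bdat", "i94_apr16_sub.sas7bdat", "i94_jan16_sub.sas7bdat"]
ordered = sorted(unordered, key=month_number)
```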

In [26]:
# Read in the data for April 2016
fname = 'dataset/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")



In [27]:
# see dataframe dimensions
df.shape

(3096313, 28)

In [28]:
# first five rows
df.head()

| |cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|...|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|6.0|2016.0|4.0|692.0|692.0|XXX|20573.0| | | |...|U| |1979.0|10282016| | | |1897628000.0| |B2|
|1|7.0|2016.0|4.0|254.0|276.0|ATL|20551.0|1.0|AL| |...|Y| |1991.0|D/S|M| | |3736796000.0|296.0|F1|
|2|15.0|2016.0|4.0|101.0|101.0|WAS|20545.0|1.0|MI|20691.0|...| |M|1961.0|09302016|M| |OS|666643200.0|93.0|B2|
|3|16.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|MA|20567.0|...| |M|1988.0|09302016| | |AA|92468460000.0|199.0|B2|
|4|17.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|MA|20567.0|...| |M|2012.0|09302016| | |AA|92468460000.0|199.0|B2|


In [29]:
# Be careful running this cell: it reads every monthly file and takes a long time to execute
for name in files:
    # read the data into a data frame
    fname = 'dataset/18-83510-I94-Data-2016/' + name
    df_f =  pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")
    print(f'{fname} df.shape = ', df_f.shape)

dataset/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat df.shape =  (2914926, 28)
dataset/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat df.shape =  (3432990, 28)
dataset/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat df.shape =  (3733786, 28)
dataset/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat df.shape =  (4103570, 28)
dataset/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat df.shape =  (3444249, 28)
dataset/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat df.shape =  (3574989, 34)
dataset/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat df.shape =  (3649136, 28)
dataset/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat df.shape =  (3096313, 28)
dataset/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat df.shape =  (2847924, 28)
dataset/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat df.shape =  (4265031, 28)
dataset/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat df.shape =  (3157072, 28)
dataset/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat df.shape =  (2570543, 28)
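
The shapes printed by this loop show that the June file has 34 columns while the other months have 28, which matters for any code meant to handle every month. A check along the following lines could flag such files automatically. It is a sketch, the function name `files_with_unusual_width` is hypothetical, and the example reuses three of the shapes reported above.

```python
from collections import Counter
from typing import Dict, Tuple


def files_with_unusual_width(shapes: Dict[str, Tuple[int, int]]) -> Dict[str, int]:
    """Return {file_name: column_count} for files whose column count differs from the most common one."""
    widths = Counter(columns for _, columns in shapes.values())
    expected_width, _ = widths.most_common(1)[0]
    return {
        name: columns
        for name, (_, columns) in shapes.items()
        if columns != expected_width
    }


shapes = {
    "i94_apr16_sub.sas7bdat": (3096313, 28),
    "i94_may16_sub.sas7bdat": (3444249, 28),
    "i94_jun16_sub.sas7bdat": (3574989, 34),
}
unusual = files_with_unusual_width(shapes)
print(unusual)  # {'i94_jun16_sub.sas7bdat': 34}
```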


In [30]:
# Let's visualize the % of missing values per immigration feature
from utility import visualise_missing_values

visualise_missing_values(df)
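
The body of `visualise_missing_values` lives in `utility.py` and is not shown in this notebook. As a stand-in for the numbers such a plot would be based on, the sketch below computes the percentage of missing values per column. The function name `missing_value_percentages` is hypothetical, and the sample rows are illustrative rather than taken from the dataset.

```python
import pandas as pd


def missing_value_percentages(df: pd.DataFrame) -> pd.Series:
    """Return the percentage of missing values per column, largest first."""
    return df.isna().mean().mul(100).sort_values(ascending=False)


# Illustrative sample using three of the immigration column names.
sample = pd.DataFrame({
    "cicid": [6.0, 7.0, 15.0, 16.0],
    "i94mode": [None, 1.0, 1.0, 1.0],
    "occup": [None, None, None, "STU"],
})
percentages = missing_value_percentages(sample)
```

A bar chart of the result, for example `percentages.plot.bar()`, would give a view of which features are mostly empty.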