# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project will use Spark locally to build a data warehouse as a single-source-of-truth for information on US immigration. The database will be synthesized from different data sources containing information about US airports, cities, temperatures and immigration.

The project consists of the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import os
import pandas as pd
import psycopg2
import pyspark
import configparser
import logging
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
from pyspark.sql.functions import udf, col, lit, year, month, to_date, monotonically_increasing_id

### Step 1: Scope the Project and Gather Data

#### Scope 
The project consists of gathering immigration, temperature and city demographic data from different sources and synthesizing them into fact and dimension tables for easier access and usage.
The final tables are meant to support informative analytical queries, for example on arrivals by state, visa type or traveller profile.

* Tools used:
    * Python : for project set-up and processing
    * Pandas : for data exploration and analysis, as well as some clean-up
    * PySpark : large scale data processing leveraging Apache Spark
    * AWS S3 : for creating buckets to store data

#### Describe and Gather Data 

| Data Set | Data Format | Data Source | Information |
|:---------|:-----------:|:-----------:|-------------|
| I94 Immigration Data       | SAS | US National Tourism and Trade Office | Contains information about immigrants from different world regions and countries, modes of transport, demographic data, etc.            |
| World Temperature Data     | CSV | Kaggle      | Contains temperature data from different countries around the world |
| U.S. City Demographic Data | CSV | OpenDataSoft    | Contains demographic information from each city in the U.S. with population census counts of at least 65,000 residents |
| Airport Codes              | CSV | DataHub     | Contains airport codes, locations, names, types and other information |

---
#### I94 Immigration Data Exploration
---

In [2]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

df_i94 = pd.read_sas(fname, format='sas7bdat')

In [3]:
# Prevents ellipses truncation of columns
# This allows us to see the entire table for better exploration
pd.set_option('display.max_columns', None)

In [4]:
df_i94.head()

| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | b'XXX' | 20573.0 | | | | 37.0 | 2.0 | 1.0 | | | | b'T' | | b'U' | | 1979.0 | b'10282016' | | | | 1897628000.0 | | b'B2' |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | b'ATL' | 20551.0 | 1.0 | b'AL' | | 25.0 | 3.0 | 1.0 | b'20130811' | b'SEO' | | b'G' | | b'Y' | | 1991.0 | b'D/S' | b'M' | | | 3736796000.0 | b'00296' | b'F1' |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | b'WAS' | 20545.0 | 1.0 | b'MI' | 20691.0 | 55.0 | 2.0 | 1.0 | b'20160401' | | | b'T' | b'O' | | b'M' | 1961.0 | b'09302016' | b'M' | | b'OS' | 666643200.0 | b'93' | b'B2' |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | b'NYC' | 20545.0 | 1.0 | b'MA' | 20567.0 | 28.0 | 2.0 | 1.0 | b'20160401' | | | b'O' | b'O' | | b'M' | 1988.0 | b'09302016' | | | b'AA' | 92468460000.0 | b'00199' | b'B2' |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | b'NYC' | 20545.0 | 1.0 | b'MA' | 20567.0 | 4.0 | 2.0 | 1.0 | b'20160401' | | | b'O' | b'O' | | b'M' | 2012.0 | b'09302016' | | | b'AA' | 92468460000.0 | b'00199' | b'B2' |


In [5]:
df_i94_cols = df_i94.columns
df_i94_cols

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

---
#### Immigration Data Sample Exploration
---

In [6]:
df_imm = pd.read_csv('immigration_data_sample.csv')

In [7]:
df_imm.head()

| | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | 20573.0 | 61.0 | 2.0 | 1.0 | 20160422 | | | G | O | | M | 1955.0 | 7202016 | F | | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | 20568.0 | 26.0 | 2.0 | 1.0 | 20160423 | MTR | | G | R | | M | 1990.0 | 10222016 | M | | \*GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | 20571.0 | 76.0 | 2.0 | 1.0 | 20160407 | | | G | O | | M | 1940.0 | 7052016 | M | | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | 20581.0 | 25.0 | 2.0 | 1.0 | 20160428 | DOH | | G | O | | M | 1991.0 | 10272016 | M | | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | 20553.0 | 19.0 | 2.0 | 1.0 | 20160406 | | | Z | K | | M | 1997.0 | 7042016 | F | | | 42322570000.0 | LAND | WT |


In [8]:
df_imm_cols = df_imm.columns
df_imm_cols

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [9]:
# Designating a fact table from immigration data
ft_imm = df_imm[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']]

In [10]:
ft_imm = ft_imm.rename({'cicid' : 'cic_id',
                        'i94yr' : 'year',
                        'i94mon' : 'month', 
                        'i94port' : 'city_code',
                        'i94addr' : 'state_code', 
                        'arrdate' : 'arr_date',
                        'depdate' : 'dep_date',
                        'i94mode' : 'mode_code',
                        'i94visa' : 'visa_code'
                       }, axis = 1)

In [11]:
ft_imm.head()

| | cic_id | year | month | city_code | state_code | arr_date | dep_date | mode_code | visa_code |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 20566.0 | 20573.0 | 1.0 | 2.0 |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 20567.0 | 20568.0 | 1.0 | 2.0 |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 20551.0 | 20571.0 | 1.0 | 2.0 |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 20572.0 | 20581.0 | 1.0 | 2.0 |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 20550.0 | 20553.0 | 3.0 | 2.0 |


In [12]:
dt_imm_indiv = df_imm[['cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum']]

In [13]:
dt_imm_indiv = dt_imm_indiv.rename({'cicid' : 'cic_id',
                                    'i94cit' : 'citizenship',
                                    'i94res' : 'residency',
                                    'biryear' : 'birth_year',
                                    'insnum' : 'ins_num'                                   
                                   }, axis = 1)

In [14]:
dt_imm_indiv.head()

| | cic_id | citizenship | residency | birth_year | gender | ins_num |
|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 209.0 | 209.0 | 1955.0 | F | |
| 1 | 4422636.0 | 582.0 | 582.0 | 1990.0 | M | |
| 2 | 1195600.0 | 148.0 | 112.0 | 1940.0 | M | |
| 3 | 5291768.0 | 297.0 | 297.0 | 1991.0 | M | |
| 4 | 985523.0 | 111.0 | 111.0 | 1997.0 | F | |


In [15]:
dt_imm_airline = df_imm[['cicid', 'airline', 'admnum', 'fltno', 'visatype']]

In [16]:
dt_imm_airline = dt_imm_airline.rename({'cicid' : 'cic_id',
                                        'admnum' : 'admin_number',
                                        'fltno' : 'flight_number',
                                        'visatype' : 'visa_type'
                                       }, axis = 1)

In [17]:
dt_imm_airline.head()

| | cic_id | airline | admin_number | flight_number | visa_type |
|---|---|---|---|---|---|
| 0 | 4084316.0 | JL | 56582670000.0 | 00782 | WT |
| 1 | 4422636.0 | \*GA | 94362000000.0 | XBLNG | B2 |
| 2 | 1195600.0 | LH | 55780470000.0 | 00464 | WT |
| 3 | 5291768.0 | QR | 94789700000.0 | 00739 | B2 |
| 4 | 985523.0 | | 42322570000.0 | LAND | WT |


---
#### U.S. Cities Demographics Data Exploration
---

In [18]:
dt_demo = pd.read_csv('us-cities-demographics.csv', delimiter = ';')

In [19]:
dt_demo.head()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


In [20]:
dt_demo = dt_demo.drop(columns=['Count'])

In [21]:
dt_demo = dt_demo.rename({'City' : 'city',
                          'State' : 'state',
                          'Median Age' : 'median_age',
                          'Male Population' : 'male_population',
                          'Female Population' : 'female_population',
                          'Total Population' : 'total_population',
                          'Number of Veterans' : 'num_of_veterans',
                          'Foreign-born' : 'foreign_born',
                          'Average Household Size' : 'avg_household_size',
                          'State Code' : 'state_code',
                          'Race' : 'race'
                         }, axis = 1)

In [22]:
dt_demo.head()

| | city | state | median_age | male_population | female_population | total_population | num_of_veterans | foreign_born | avg_household_size | state_code | race |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White |


---
#### Temperature Data Exploration
---

In [23]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
dt_temp = pd.read_csv(fname, sep=',')

In [24]:
dt_temp.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


In [25]:
dt_temp.columns

Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude'],
      dtype='object')

In [26]:
dt_temp = dt_temp.rename({'AverageTemperature' : 'avg_temp',
                          'AverageTemperatureUncertainty' : 'avg_temp_uncertainty',
                          'City' : 'city',
                          'Country' : 'country',
                          'Latitude' : 'latitude',
                          'Longitude' : 'longitude'
                         }, axis = 1)

##### Since we are only interested in data about the United States in this project, we will only keep temperature records for cities located in the U.S.

In [27]:
dt_temp_us = dt_temp[dt_temp['country'] == 'United States']

In [28]:
dt_temp_us.head()

| | dt | avg_temp | avg_temp_uncertainty | city | country | latitude | longitude |
|---|---|---|---|---|---|---|---|
| 47555 | 1820-01-01 | 2.101 | 3.217 | Abilene | United States | 32.95N | 100.53W |
| 47556 | 1820-02-01 | 6.926 | 2.853 | Abilene | United States | 32.95N | 100.53W |
| 47557 | 1820-03-01 | 10.767 | 2.395 | Abilene | United States | 32.95N | 100.53W |
| 47558 | 1820-04-01 | 17.989 | 2.202 | Abilene | United States | 32.95N | 100.53W |
| 47559 | 1820-05-01 | 21.809 | 2.036 | Abilene | United States | 32.95N | 100.53W |


In [29]:
# work on an explicit copy of the filtered slice so that adding columns
# does not trigger pandas' SettingWithCopyWarning
dt_temp_us = dt_temp_us.copy()

# changing the dt column to pandas datetime format
dt_temp_us['dt'] = pd.to_datetime(dt_temp_us['dt'])

# extracting a year and month column from dt for consistency with the i94 dataset
dt_temp_us['year'] = dt_temp_us['dt'].dt.year
dt_temp_us['month'] = dt_temp_us['dt'].dt.month


In [30]:
dt_temp_us.head()

| | dt | avg_temp | avg_temp_uncertainty | city | country | latitude | longitude | year | month |
|---|---|---|---|---|---|---|---|---|---|
| 47555 | 1820-01-01 | 2.101 | 3.217 | Abilene | United States | 32.95N | 100.53W | 1820 | 1 |
| 47556 | 1820-02-01 | 6.926 | 2.853 | Abilene | United States | 32.95N | 100.53W | 1820 | 2 |
| 47557 | 1820-03-01 | 10.767 | 2.395 | Abilene | United States | 32.95N | 100.53W | 1820 | 3 |
| 47558 | 1820-04-01 | 17.989 | 2.202 | Abilene | United States | 32.95N | 100.53W | 1820 | 4 |
| 47559 | 1820-05-01 | 21.809 | 2.036 | Abilene | United States | 32.95N | 100.53W | 1820 | 5 |


##### Since the I94 dataset we have covers 2016, we should inspect the temperatures in the U.S. throughout that year

In [31]:
dt_temp_us[dt_temp_us['year'] == 2016]

| | dt | avg_temp | avg_temp_uncertainty | city | country | latitude | longitude | year | month |
|---|---|---|---|---|---|---|---|---|---|


The query returns no rows: this dataset does not contain any temperature data for 2016.

In [32]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').\
load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [33]:
# write to parquet; overwrite mode lets the cell be re-run without failing on the existing path
df_spark.write.mode("overwrite").parquet("sas_data")
df_spark = spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
1. Remove null values from temperature readings in the US temperature dataframe (dt_temp_us)
2. Transform I94 arrival and departure dates from SAS date numbers (days since 1960-01-01) to pandas datetimes for ease of use and consistency
---

#### Removing null values from temp readings

In [34]:
dt_temp_us = dt_temp_us.dropna(subset=['avg_temp'])

In [35]:
dt_temp_us.isnull().sum()

dt                      0
avg_temp                0
avg_temp_uncertainty    0
city                    0
country                 0
latitude                0
longitude               0
year                    0
month                   0
dtype: int64

#### Transforming SAS date values to pandas datetime format

In [36]:
def SAS_pd_datetime(dt):
    return pd.to_timedelta(dt, unit='D') + pd.Timestamp('1960-1-1')
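SAS stores dates as the number of days since 1960-01-01, which is what `SAS_pd_datetime` relies on. The Spark pipeline in Step 4 writes `arr_date` and `dep_date` as these raw numbers, so a plain-Python version of the same conversion could be handy there. This is a minimal sketch that assumes missing dates arrive as `None` or NaN; the name `sas_to_date` is hypothetical. Wrapping it as `udf(sas_to_date, DateType())`, using the `udf` and `DateType` imports from the first cell, is one possible way to apply it in Spark, not something this notebook does.

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date system

def sas_to_date(days):
    """Convert a SAS date number (days since 1960-01-01) to a datetime.date.

    Returns None for missing values (None or NaN) instead of raising.
    """
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return SAS_EPOCH + timedelta(days=int(days))

# arrival and departure values of the first sample record
print(sas_to_date(20566.0), sas_to_date(20573.0))  # 2016-04-22 2016-04-29
```

The results match the `arr_date`/`dep_date` values that `SAS_pd_datetime` produces for the same record.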

In [37]:
ft_imm['arr_date'] = SAS_pd_datetime(ft_imm['arr_date'])
ft_imm['dep_date'] = SAS_pd_datetime(ft_imm['dep_date'])

In [38]:
ft_imm.head()

| | cic_id | year | month | city_code | state_code | arr_date | dep_date | mode_code | visa_code |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 2016-04-22 | 2016-04-29 | 1.0 | 2.0 |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 2016-04-23 | 2016-04-24 | 1.0 | 2.0 |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 2016-04-07 | 2016-04-27 | 1.0 | 2.0 |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 2016-04-28 | 2016-05-07 | 1.0 | 2.0 |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 2016-04-06 | 2016-04-09 | 3.0 | 2.0 |


In [39]:
# read the SAS labels description file into a list of lines
with open('I94_SAS_Labels_Descriptions.SAS') as f:
    i94_data = f.readlines()

In [40]:
# line ranges in the labels file that hold the codes we are interested in:
# i94_data[10:298] for countries
# i94_data[303:962] for cities
# i94_data[982:1036] for states

In [41]:
# create a dictionary to insert corresponding country names and codes into
country_code = {}
for countries in i94_data[10:298]:
    # we will have to split on = to separate the code from the country
    line = countries.split('=')
    code, country = line[0].strip(), line[1].strip().strip("''")
    country_code[code] = country

In [42]:
# country_code

In [43]:
# creating a DF to hold the country_code list
country_code_list = list(country_code.items())

In [44]:
df_countries = pd.DataFrame(country_code_list, columns = ['code', 'country'])

In [45]:
df_countries.head()

| | code | country |
|---|---|---|
| 0 | 236 | AFGHANISTAN |
| 1 | 101 | ALBANIA |
| 2 | 316 | ALGERIA |
| 3 | 102 | ANDORRA |
| 4 | 324 | ANGOLA |


In [46]:
# creating a dictionary to insert corresponding city names and codes into
city_code = {}
for cities in i94_data[303:962]:
    line = cities.split('=')
    code, city = line[0].strip("\t").strip().strip("''"), line[1].split(",")[0].strip("\t").strip("''")
    city_code[code] = city

In [47]:
city_code_list = list(city_code.items())

In [48]:
df_cities = pd.DataFrame(city_code_list, columns = ['code', 'city'])

In [49]:
df_cities.head()

| | code | city |
|---|---|---|
| 0 | ANC | ANCHORAGE |
| 1 | BAR | BAKER AAF - BAKER ISLAND |
| 2 | DAC | DALTONS CACHE |
| 3 | PIZ | DEW STATION PT LAY DEW |
| 4 | DTH | DUTCH HARBOR |


In [50]:
# creating a dictionary to insert corresponding state names and codes into
state_code = {}
for states in i94_data[982:1036]:
    line = states.split('=')
    code, state = line[0].strip("\t").strip("''"), line[1].strip().strip("''")
    state_code[code] = state
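The three loops above repeat the same split-and-strip logic with slightly different `strip` calls. A single helper could cover all of them. This is a sketch, not the code used in this notebook: the name `parse_label_lines` is hypothetical, and the sample lines in the usage example are assumed to follow the `code = 'label'` layout that the slices above rely on.

```python
def parse_label_lines(lines, keep_before_comma=False):
    """Parse lines shaped like  code = 'label'  into a {code: label} dict.

    keep_before_comma=True keeps only the text before the first comma,
    mirroring how the city loop above drops everything after the city name.
    """
    mapping = {}
    for raw in lines:
        if "=" not in raw:
            continue  # skip lines without a code/label pair
        code, label = raw.split("=", 1)
        code = code.strip().strip("'").strip()
        label = label.strip().rstrip(";").strip().strip("'").strip()
        if keep_before_comma:
            label = label.split(",")[0].strip()
        mapping[code] = label
    return mapping

# usage sketch with assumed line layouts for each section of the file
countries = parse_label_lines(["   236 =  'AFGHANISTAN'\n"])
cities = parse_label_lines(["\t'ANC'\t=\t'ANCHORAGE, AK             '\n"], keep_before_comma=True)
states = parse_label_lines(["\t'AL'='ALABAMA'\n"])
print(countries, cities, states)
```

With the real file, something like `pd.DataFrame(list(parse_label_lines(i94_data[10:298]).items()), columns=['code', 'country'])` would rebuild `df_countries`.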

In [51]:
state_code_list = list(state_code.items())

In [52]:
df_states = pd.DataFrame(state_code_list, columns = ['code', 'state'])

In [53]:
df_states.head()

| | code | state |
|---|---|---|
| 0 | AK | ALASKA |
| 1 | AZ | ARIZONA |
| 2 | AR | ARKANSAS |
| 3 | CA | CALIFORNIA |
| 4 | CO | COLORADO |


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Since this project is intended for BI use, a star schema is a good fit: it is the easiest layout for end users to understand and work with, and it keeps the relationships between the data straightforward.

* Star Schema:
![capstone_schema](images/capstone_schema.png)
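To illustrate why the star schema keeps queries simple: each dimension joins to the fact table on a single key (`cic_id`). The sketch below uses an in-memory SQLite database as a stand-in for the Parquet tables on S3 (an assumption for illustration only), loads the five sample records shown in Step 1 into trimmed-down versions of `ft_imm` and `dt_imm_indiv`, and counts arrivals by gender.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# trimmed-down versions of the fact table and one dimension table
cur.execute("CREATE TABLE ft_imm (cic_id INTEGER, state_code TEXT, visa_code INTEGER)")
cur.execute("CREATE TABLE dt_imm_indiv (cic_id INTEGER, birth_year INTEGER, gender TEXT)")

# the five sample records shown in Step 1
cur.executemany("INSERT INTO ft_imm VALUES (?, ?, ?)", [
    (4084316, "HI", 2), (4422636, "TX", 2), (1195600, "FL", 2),
    (5291768, "CA", 2), (985523, "NY", 2),
])
cur.executemany("INSERT INTO dt_imm_indiv VALUES (?, ?, ?)", [
    (4084316, 1955, "F"), (4422636, 1990, "M"), (1195600, 1940, "M"),
    (5291768, 1991, "M"), (985523, 1997, "F"),
])

# a typical BI-style question: arrivals by gender (fact joined to one dimension)
rows = cur.execute("""
    SELECT d.gender, COUNT(*) AS arrivals
    FROM ft_imm AS f
    JOIN dt_imm_indiv AS d ON f.cic_id = d.cic_id
    GROUP BY d.gender
    ORDER BY d.gender
""").fetchall()
print(rows)  # [('F', 2), ('M', 3)]
conn.close()
```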


#### 3.2 Mapping Out Data Pipelines
Steps for pipelining data into star schema:

1. Locate/Move data to preferred storage medium, in this case AWS S3 buckets
2. Explore, clean and modify data to fit required criteria and shapes
3. Create fact and dimension tables as required
4. Load data back into target S3 buckets for usage

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [54]:
# Setting up logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [55]:
config = configparser.ConfigParser()
config.read('cap.cfg')
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']
SRC_BUCKET = config['S3']['SRC_BUCKET']
OUTPUT_BUCKET = config['S3']['OUTPUT_BUCKET']

In [56]:
spark = SparkSession.builder.config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")\
                            .enableHiveSupport().getOrCreate()

In [57]:
# S3 location of the I94 file (the next cell reads the local copy of the same file)
imm_data = SRC_BUCKET + 'data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

In [58]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').\
load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [59]:
# making a column renaming function so that we don't manually have to re-do it for each table
def rename_cols(table, new_cols):
    for old, new in zip(table.columns, new_cols):
        table = table.withColumnRenamed(old, new)
    return table
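Because `rename_cols` pairs columns with `zip`, it stops at the shorter of the two lists. In the tables below, the generated ID column (for example `immigration_id`) is added last and has no entry in `new_cols`, so it keeps its name; that is the intended effect, but a list missing a name in the middle would silently shift every later name by one position. The plain-Python sketch below (the helper name `rename_plan` is hypothetical) mimics that pairing to show which columns get renamed and which are left untouched.

```python
def rename_plan(old_cols, new_cols):
    """Mimic rename_cols: pair names positionally and report leftovers."""
    pairs = list(zip(old_cols, new_cols))    # zip stops at the shorter list
    untouched = list(old_cols[len(pairs):])  # columns rename_cols would not rename
    return pairs, untouched

old = ['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate',
       'depdate', 'i94mode', 'i94visa', 'immigration_id']
new = ['cic_id', 'year', 'month', 'city_code', 'state_code',
       'arr_date', 'dep_date', 'mode_code', 'visa_code']

pairs, untouched = rename_plan(old, new)
print(untouched)  # ['immigration_id']
```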

##### Immigration Fact Table
---

In [60]:
ft_imm = df_spark.select('cicid', 'i94yr', 'i94mon', 'i94port',
                         'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa')\
                        .distinct().withColumn('immigration_id', monotonically_increasing_id())
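`monotonically_increasing_id()` guarantees unique, increasing IDs but not consecutive ones. According to the PySpark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below reproduces that bit layout in plain Python to show why `immigration_id` values jump between partitions; `spark_style_id` is a hypothetical name, and the layout is an implementation detail that Spark may change.

```python
def spark_style_id(partition_id, row_in_partition):
    """Reproduce the documented monotonically_increasing_id bit layout."""
    return (partition_id << 33) + row_in_partition

print(spark_style_id(0, 0), spark_style_id(0, 1))  # 0 1
print(spark_style_id(1, 0), spark_style_id(1, 1))  # 8589934592 8589934593
```

If consecutive surrogate keys were required, `row_number()` over a window is the usual alternative, at the cost of a global sort.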

In [61]:
new_cols = ['cic_id', 'year', 'month', 'city_code', 'state_code',
            'arr_date',  'dep_date', 'mode_code','visa_code']

In [62]:
ft_imm = rename_cols(ft_imm, new_cols)

In [63]:
ft_imm.write.mode("append").partitionBy('state_code').parquet(OUTPUT_BUCKET + 'ft_imm')
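`partitionBy('state_code')` writes Hive-style directories, one per value (for example `ft_imm/state_code=HI/`), and the partition column is stored in the directory name rather than in the data files; Spark restores it from the path when the table is read back. Rows with a null `state_code` end up under `state_code=__HIVE_DEFAULT_PARTITION__`. A small plain-Python sketch of that path convention follows; the helper name and the example path are hypothetical.

```python
def partition_values(path):
    """Extract Hive-style key=value pairs from a partitioned file path."""
    values = {}
    for part in path.split("/"):
        if "=" in part:
            key, value = part.split("=", 1)
            values[key] = value
    return values

example = "s3a://my-output-bucket/ft_imm/state_code=HI/part-00000.snappy.parquet"
print(partition_values(example))  # {'state_code': 'HI'}
```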

##### Individual Immigrant Dimension Table
---

In [64]:
dt_imm_indiv = df_spark.select('cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum')\
                              .distinct().withColumn('imm_indiv_id', monotonically_increasing_id())

In [65]:
new_cols = ['cic_id', 'citizenship', 'residency', 'birth_year', 'gender', 'ins_num']

In [66]:
dt_imm_indiv = rename_cols(dt_imm_indiv, new_cols)

In [92]:
dt_imm_indiv.write.mode("append").parquet(OUTPUT_BUCKET + 'dt_imm_indiv')

##### Airline Dimension Table
---

In [67]:
dt_imm_airline = df_spark.select('cicid', 'airline', 'admnum', 'fltno', 'visatype')\
                        .distinct().withColumn('imm_airline_id', monotonically_increasing_id())

In [68]:
new_cols = ['cic_id', 'airline', 'admin_number', 'flight_number', 'visa_type']

In [69]:
dt_imm_airline = rename_cols(dt_imm_airline, new_cols)

In [93]:
dt_imm_airline.write.mode("append").parquet(OUTPUT_BUCKET + 'dt_imm_airline')

##### Temperature Dimension Table
---

In [96]:
df_temp = spark.read.csv(SRC_BUCKET + 'GlobalLandTemperaturesByCity.csv', sep=',', header=True)

In [86]:
# The CSV header is 'Country'. Spark resolves column names case-insensitively by default,
# but using the exact name avoids depending on that setting.

df_temp_us = df_temp.where(df_temp['Country'] == 'United States')

In [87]:
dt_temp_us = df_temp_us.select('dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country',
                               'Latitude', 'Longitude').distinct()

In [88]:
new_cols = ['dt', 'avg_temp', 'avg_temp_uncertainty', 'city', 'country', 'latitude', 'longitude']

In [89]:
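# apply the snake_case names from new_cols and drop rows without a temperature reading
dt_temp_us = rename_cols(dt_temp_us, new_cols).dropna(subset=['avg_temp'])
dt_temp_us = dt_temp_us.withColumn('dt', to_date(col('dt')))
dt_temp_us = dt_temp_us.withColumn('year', year(dt_temp_us['dt']))
dt_temp_us = dt_temp_us.withColumn('month', month(dt_temp_us['dt']))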

In [94]:
dt_temp_us.write.mode("append").parquet(OUTPUT_BUCKET + 'dt_temp_us')

##### Demographics Dimension Table
---

In [106]:
df_demo = spark.read.csv('us-cities-demographics.csv', header=True, sep=';')

In [107]:
dt_demo = df_demo.select('City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population',
                         'Number of Veterans',  'Foreign-born', 'Average Household Size', 'State Code', 'Race')\
                        .distinct().withColumn('demo_id', monotonically_increasing_id())

In [108]:
new_cols = ['city', 'state', 'median_age', 'male_population', 'female_population', 'total_population',
            'num_of_veterans', 'foreign_born', 'avg_household_size', 'state_code', 'race']

In [109]:
dt_demo = rename_cols(dt_demo, new_cols)

In [110]:
dt_demo.write.mode("append").parquet(OUTPUT_BUCKET + 'dt_demo')

#### 4.2 Data Quality Checks

1. Data is delivered correctly to the corresponding tables after the ETL pipeline runs
2. Data structures are present and tables are not empty
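The checks in the cells below stop at the first failing table. A variant that collects every failure before raising can make a pipeline run easier to diagnose. The sketch below works on a plain `{table_name: row_count}` dictionary, assumed to be filled from `df.count()` for each output table; the function name and the counts in the example are made up for illustration.

```python
def find_quality_failures(row_counts, expected_tables):
    """Return a list of problems; an empty list means every check passed."""
    failures = []
    for table in expected_tables:
        if table not in row_counts:
            failures.append(f"{table}: table is missing")
        elif row_counts[table] == 0:
            failures.append(f"{table}: table is empty")
    return failures

expected = ['ft_imm', 'dt_imm_indiv', 'dt_imm_airline', 'dt_temp_us', 'dt_demo']
counts = {'ft_imm': 5, 'dt_imm_indiv': 5, 'dt_imm_airline': 0, 'dt_demo': 5}  # made-up counts

problems = find_quality_failures(counts, expected)
print(problems)  # ['dt_imm_airline: table is empty', 'dt_temp_us: table is missing']
```

A pipeline could then call `raise ValueError("; ".join(problems))` when the list is non-empty, so a single run reports every broken table at once.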

In [None]:
### Same config as 4.1 data modelling set up

#config = configparser.ConfigParser()
#config.read('cap.cfg')
#os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
#os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']
#SRC_BUCKET = config['S3']['SRC_BUCKET']
#OUTPUT_BUCKET = config['S3']['OUTPUT_BUCKET']

In [122]:
spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.1.0").enableHiveSupport().getOrCreate()

In [None]:
# pathlib cannot list an S3 bucket, so check the tables written in 4.1 by name
tables = ['ft_imm', 'dt_imm_indiv', 'dt_imm_airline', 'dt_temp_us', 'dt_demo']

In [None]:
# check 1: each table can be read back and has the expected structure
for table in tables:
    df = spark.read.parquet(OUTPUT_BUCKET + table)
    print("Displaying schema for: " + table)
    df.printSchema()

In [None]:
# check 2: no table is empty (a count can never be negative, so only zero needs checking)
for table in tables:
    df = spark.read.parquet(OUTPUT_BUCKET + table)
    num_of_records = df.count()
    if num_of_records == 0:
        raise ValueError(f"Table {table} is empty!")
    print(f"The table {table} has {num_of_records} records inserted!")

#### 4.3 Data dictionary 
---
##### ft_imm
---
| Data | Definition |
|:-----|------------|
| immigration_id | id number for record numbering specific to this table |
| cic_id | id number from the SAS file |
| year | Year of successful immigration entry into the U.S. |
| month | Month of successful immigration entry into the U.S. |
| city_code | Code designated to specific cities as per the SAS labels description file |
| state_code | Code designated to specific states as per the SAS labels description file |
| arr_date | Date of arrival in the U.S. (stored as a SAS date number, i.e. days since 1960-01-01) |
| dep_date | Date of departure from the U.S. (same SAS date encoding as arr_date) |
| mode_code | Code for the method of travel used to enter the U.S. |
| visa_code | Category of visa designation as per the SAS labels description file |

---
##### dt_imm_indiv
---
| Data | Definition |
|:-----|------------|
| imm_indiv_id | id number for record numbering specific to this table |
| cic_id | id number from the SAS file |
| citizenship | country of citizenship |
| residency | country of residence |
| birth_year | individual in question's birth year |
| gender | individual in question's gender |
| ins_num | INS number |

---
##### dt_imm_airline
---
| Data | Definition |
|:-----|------------|
| imm_airline_id | id number for record numbering specific to this table |
| cic_id | id number from the SAS file |
| airline | airline carrier used |
| admin_number | admission number |
| flight_number | flight number |
| visa_type | type of visa given to immigrant depending on their purpose of visit |

---
##### dt_temp_us
---
| Data | Definition |
|:-----|------------|
| dt | date of the monthly temperature record (first day of the month) |
| avg_temp | average temperature of the month on record, in °C |
| avg_temp_uncertainty | scientific uncertainty of average temperature measurements based on human and instrumental error |
| city | city where temperatures were recorded |
| country | country where temperatures were recorded |
| latitude | latitude coordinate where the temperatures were recorded |
| longitude | longitude coordinate where the temperatures were recorded |
| year | year during which the temperatures were recorded (extracted from dt) |
| month | month during which the temperatures were recorded (extracted from dt) |

---
##### dt_demo
---
| Data | Definition |
|:-----|------------|
| demo_id | id number for record numbering specific to this table |
| city | name of U.S. city of interest |
| state | name of U.S. state of interest |
| median_age | median age of the city population |
| male_population | total number of males in the city |
| female_population | total number of females in the city |
| total_population | total number of people living in the city |
| num_of_veterans | number of veterans living in the city |
| foreign_born | number of foreign-born residents of the city |
| avg_household_size | Average number of people living together per household |
| state_code | U.S. state abbreviation codes |
| race | race category this record refers to (the source file lists each city once per race category) |

### Step 5: Complete Project Write Up

#### Rationale behind tool and technology choice for the project:
---

1. **Pandas**: I opted to use Pandas because I am familiar with it and know its strengths for quick, efficient data exploration as well as flexible data manipulation

2. **AWS S3**: For storage of data due to its ease of use and scalability

3. **PySpark**: Python library to give access to Spark

4. **Spark**: Used for its ability to process large amounts of data stored in multiple file formats (SAS, CSV and Parquet in this project)

#### How often the data should be updated:
---

Since the raw data is gathered on a monthly basis, the data should also be updated on monthly cycles.
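A monthly refresh would need to locate each month's input file. Assuming the other monthly I94 files follow the same naming pattern as `i94_apr16_sub.sas7bdat` (three-letter month, two-digit year), which is inferred from this single file name, the file name could be built as below; `i94_file_name` is a hypothetical helper.

```python
MONTHS = ['jan', 'feb', 'mar', 'apr', 'may', 'jun',
          'jul', 'aug', 'sep', 'oct', 'nov', 'dec']

def i94_file_name(year, month):
    """Build an I94 file name such as i94_apr16_sub.sas7bdat (naming pattern assumed)."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    return f"i94_{MONTHS[month - 1]}{year % 100:02d}_sub.sas7bdat"

print(i94_file_name(2016, 4))  # i94_apr16_sub.sas7bdat
```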

### Approaching the problem in the case of other scenarios
---

#### If the data was increased by 100x:
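Typically we would scale Spark out, either by adding more worker nodes to the existing cluster or by moving the job to AWS EMR.
Considering that this dataset could easily get very large and unwieldy, AWS EMR is probably the safer alternative, since its managed clusters can be resized as the data grows.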

#### If we were required to populate a dashboard on a daily basis by 7AM every day:
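We would use Apache Airflow to schedule the Spark jobs daily, starting early enough that they finish before 7AM.
This would automate the process, and Airflow's retries and failure alerts would flag runs that need attention before the dashboard is used.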

#### If the database needed to be accessed by over 100 people:
We can utilize Amazon Redshift to meet our needs here as the Redshift clusters can host the data, have good scaling capabilities and will provide us with sufficient read performance with many people accessing the data.

In [None]:
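# Sourced from the Udacity Knowledge base: archive the workspace so it can be downloaded despite the upload size limit
# (tar -z produces a gzip-compressed tarball, so the file is named .tar.gz rather than .zip)
!tar -cvzf file.tar.gz .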