# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project aims to create an ETL Pipeline for the US Immigration Dataset and other supplemantary datasets which includes: data on airport codes, U.S. city demographics, and temperature datasets for different cities. The ETL pipeline involves steps to fetch the raw datasets provided by the Udacity team, do exploratory data analysis on these datasets, process them and finally create analytics tables in a S3 datalake, which could further be utilised by the analytics team,<br>
>##### To perform analysis to answer questions like:
Which city has the highest records of being the port of entry by the immigrants in US? How long do immigrants tend to stay in US? What is the most common reason for the immigrants to travel to US?
>##### To withdraw corelations like:
What impact does temperature has on the count of number of immigrants? What is the relation between number of travels and city demographics in US?
<br><br>

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### 1.1 Scope 
<p>The purpose of the project is to build an end-to-end ETL pipeline to create a schema-on-read datalake. Raw data provide by the Udacity team is first fetched, cleaned, and processed to create analytics fact and dimension tables in a S3 datalake. As described above in some example use-cases, these tables can be further utilised to draw corelation within datasets and for analytics purpose.</p>

Tools and technologies used in this project are: `Python libraries like Pandas, Numpy, Configparser, os, etc.`, `PySpark`, `AWS Services like AWS IAM, AWS S3`, `Jupyter Notebook` etc.<br>
Pyspark and Pandas has been used to load data into dataframes, explore and process them. Then these processed datasets are stored in AWS S3 as fact and dimension tables in `parquet` file format using spark. Parquet files store data in columnar format, which have better performance on aggregation queries. Finally, data quality checks for Primary key constraints and rowcount is run on the above created tables.

In [1]:
# Do all imports and installs here
import os
import re
import calendar
import numpy as np
import pandas as pd
import configparser
import datetime as dt
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col, upper, year, month, dayofmonth, hour, weekofyear, date_format, isnan

#### Load Configuration Data

In [2]:
pd.set_option('max_colwidth',100)
config = configparser.ConfigParser()
config.read('config.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
os.environ['AWS_S3_BUCKET']=config['AWS']['AWS_S3_BUCKET']

#### Define function to create Spark Session Object

In [3]:
def createSparkSession():
    """
         This function creates a spark session object and returns it.
    """
    spark = SparkSession.builder\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")\
        .enableHiveSupport().getOrCreate()
    
    return spark   

#### 1.2 Describe and Gather Data 
Following datasets have been used in this project:
- `I94 Immigration Data`:This data comes from the US National Tourism and Trade Office.Dataset consists of 12 files containing data for each month. Each file has around 3 million rows and 28 columns. A data dictionary explaining columns is also included at data/I94_SAS_Labels_Descriptions.SAS. [ NOTE: Data inside sas_data dir contains data for April 2016 and thus can also be used to perform this analysis. Similarly, in code we have processed data only for month of April. ]
- `World Temperature Data`: This dataset came from Kaggle. This contains temperature data for different cities for countries across the world. For this project we have used the temperature recorded for only US cites in csv format. Also, since the provided dataset had data last data from year 2013, so we have extracted latest records only from this datset for various cities in US.
- `U.S. City Demographic Data`: This data comes from OpenSoft and have been provided as csv file. It contains various demographics attribute for the cities in US like population count, their median age, average household size, etc.
- `Airport Code Data`: This is a csv file of airport related data like airport codes, airport name, their location, etc.<br><br>
[NOTE: Refer below for more details on these datasets]

In [4]:
# Read Immigration Data
print("*********IMMIGRATION DATA*************")
sas_df = pd.read_sas('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat', 'sas7bdat', encoding="ISO-8859-1")
display(sas_df.head(5),sas_df.shape)

#Read Temperature Data
print("\n\n*********TEMPERATURE DATA*************")
temp_df = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
display(temp_df.head(5),temp_df.shape)

#Read US Cities Demographics Data
print("\n\n*********US CITIES DEMOGRAPHICS DATA*************")
city_df = pd.read_csv('us-cities-demographics.csv',sep = ';')
display(city_df.head(5),city_df.shape)

#Read Airport Codes Data
print("\n\n*********AIRPORT CODES DATA*************")
airport_df = pd.read_csv('airport-codes_csv.csv')
display(airport_df.head(5),airport_df.shape)


*********IMMIGRATION DATA*************


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


(3096313, 28)



*********TEMPERATURE DATA*************


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


(8599212, 7)



*********US CITIES DEMOGRAPHICS DATA*************


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


(2891, 12)



*********AIRPORT CODES DATA*************


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


(55075, 12)

### Step 2: Explore and Assess the Data

#### 2.1 Data Exploration
Data quality issues: All datasets have checked below for datatypes, missing values, duplicate records, invalid values, etc.
#### 2.2 Data Cleaning
Data transformation and handling of above mentioned issue has been done while creating fact and dimension tables in the ETL Pipeline creation.

#### US immigration data
- Date columns are present in SAS format. They need to be transformed.
- Invalid destination cities: missing port code or ports of entry don't belong to US
- Missing values for many columns that we will be using as per our data model: i94addr,departure date,fltno. (We will drop other cols as they are not going to be used)
- No Duplicated rows

In [5]:
#Check column types
sas_df.dtypes

cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port      object
arrdate     float64
i94mode     float64
i94addr      object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile     object
visapost     object
occup        object
entdepa      object
entdepd      object
entdepu      object
matflag      object
biryear     float64
dtaddto      object
gender       object
insnum       object
airline      object
admnum      float64
fltno        object
visatype     object
dtype: object

In [6]:
#Check for null Values
sas_df.isna().sum()

cicid             0
i94yr             0
i94mon            0
i94cit            0
i94res            0
i94port           0
arrdate           0
i94mode         239
i94addr      152372
depdate      142457
i94bir          802
i94visa           0
count             0
dtadfile          1
visapost    1881250
occup       3088187
entdepa         238
entdepd      138429
entdepu     3095921
matflag      138429
biryear         802
dtaddto         477
gender       414269
insnum      2982605
airline       83627
admnum            0
fltno         19549
visatype          0
dtype: int64

In [7]:
#check for duplicates
duplicateRows = sas_df[sas_df.duplicated()]
print('Duplicate row count:'+str(duplicateRows))

Duplicate row count:Empty DataFrame
Columns: [cicid, i94yr, i94mon, i94cit, i94res, i94port, arrdate, i94mode, i94addr, depdate, i94bir, i94visa, count, dtadfile, visapost, occup, entdepa, entdepd, entdepu, matflag, biryear, dtaddto, gender, insnum, airline, admnum, fltno, visatype]
Index: []

[0 rows x 28 columns]


#### Cities temperature data
- dt column needs to be renamed and changed to datetime format
- this dataset has temperature records for citeies all over the world, but as per our data model
   we are interested in US cities only
- rename columns as per datamodel and change to lowercase for merging with demographics dataset
- last data was updated in year 2013, but immigration data was provided for 2016. Thus, we will extract latest records only.
- AverageTemprature and AverageTemperatureUncertainty has missing values which needs to be handled.
- No Duplicated rows.

In [8]:
#Check column types
temp_df.dtypes

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object

In [9]:
#Check for null Values
temp_df.isna().sum()

dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
dtype: int64

In [10]:
#check for duplicates
duplicateRows = temp_df[temp_df.duplicated()]
print('Duplicate row count:'+str(duplicateRows))

Duplicate row count:Empty DataFrame
Columns: [dt, AverageTemperature, AverageTemperatureUncertainty, City, Country, Latitude, Longitude]
Index: []


In [11]:
#Check for the values for year
print(type(temp_df.dt[0]))
temp_df.rename(columns ={'dt':'date'}, inplace=True)
temp_df['date'] = pd.to_datetime(temp_df['date'])
temp_df['date'].dt.year.unique()

<class 'str'>


array([1743, 1744, 1745, 1746, 1747, 1748, 1749, 1750, 1751, 1752, 1753,
       1754, 1755, 1756, 1757, 1758, 1759, 1760, 1761, 1762, 1763, 1764,
       1765, 1766, 1767, 1768, 1769, 1770, 1771, 1772, 1773, 1774, 1775,
       1776, 1777, 1778, 1779, 1780, 1781, 1782, 1783, 1784, 1785, 1786,
       1787, 1788, 1789, 1790, 1791, 1792, 1793, 1794, 1795, 1796, 1797,
       1798, 1799, 1800, 1801, 1802, 1803, 1804, 1805, 1806, 1807, 1808,
       1809, 1810, 1811, 1812, 1813, 1814, 1815, 1816, 1817, 1818, 1819,
       1820, 1821, 1822, 1823, 1824, 1825, 1826, 1827, 1828, 1829, 1830,
       1831, 1832, 1833, 1834, 1835, 1836, 1837, 1838, 1839, 1840, 1841,
       1842, 1843, 1844, 1845, 1846, 1847, 1848, 1849, 1850, 1851, 1852,
       1853, 1854, 1855, 1856, 1857, 1858, 1859, 1860, 1861, 1862, 1863,
       1864, 1865, 1866, 1867, 1868, 1869, 1870, 1871, 1872, 1873, 1874,
       1875, 1876, 1877, 1878, 1879, 1880, 1881, 1882, 1883, 1884, 1885,
       1886, 1887, 1888, 1889, 1890, 1891, 1892, 18

In [12]:
#Check for unique city values
temp = temp_df[temp_df.Country =="United States"]
temp.City.unique()

array(['Abilene', 'Akron', 'Albuquerque', 'Alexandria', 'Allentown',
       'Amarillo', 'Anaheim', 'Anchorage', 'Ann Arbor', 'Antioch',
       'Arlington', 'Arvada', 'Atlanta', 'Aurora', 'Austin', 'Bakersfield',
       'Baltimore', 'Baton Rouge', 'Beaumont', 'Bellevue', 'Berkeley',
       'Birmingham', 'Boston', 'Bridgeport', 'Brownsville', 'Buffalo',
       'Burbank', 'Cambridge', 'Cape Coral', 'Carrollton', 'Cary',
       'Cedar Rapids', 'Chandler', 'Charleston', 'Charlotte',
       'Chattanooga', 'Chesapeake', 'Chicago', 'Chula Vista', 'Cincinnati',
       'Clarksville', 'Clearwater', 'Cleveland', 'Colorado Springs',
       'Columbia', 'Columbus', 'Concord', 'Coral Springs', 'Corona',
       'Corpus Christi', 'Costa Mesa', 'Dallas', 'Dayton', 'Denton',
       'Denver', 'Des Moines', 'Detroit', 'Downey', 'Durham',
       'East Los Angeles', 'Edison', 'El Monte', 'El Paso', 'Elizabeth',
       'Escondido', 'Eugene', 'Evansville', 'Fairfield', 'Fayetteville',
       'Flint', 'Fontana',

#### US Cities Demographics data
- columns names have whitespce, '-' and to be transformed/renamed to lowercase or as per data model schema.
- very few null values present for a few columns like Male Population, Female Poulation, no. of veterans etc,
  which can be dropped
- Duplicate rows are present due to Race and Count columns which can be filtered out as we don't need them as per data model

In [13]:
#Check column types
city_df.dtypes

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

In [14]:
#Check for null Values
city_df.isna().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [15]:
#check for duplicates
duplicateRows = city_df[city_df.duplicated('City')]
print('Duplicate row count for city:'+str(duplicateRows.count()))

duplicateRows = city_df[city_df.duplicated(['City','State'])]
print('Duplicate row count for city,state:'+str(duplicateRows.count()))

Duplicate row count for city:City                      2324
State                     2324
Median Age                2324
Male Population           2322
Female Population         2322
Total Population          2324
Number of Veterans        2318
Foreign-born              2318
Average Household Size    2316
State Code                2324
Race                      2324
Count                     2324
dtype: int64
Duplicate row count for city,state:City                      2295
State                     2295
Median Age                2295
Male Population           2293
Female Population         2293
Total Population          2295
Number of Veterans        2289
Foreign-born              2289
Average Household Size    2287
State Code                2295
Race                      2295
Count                     2295
dtype: int64


#### Airport Codes data
-  columns are to be renamed and datatypes are to be changed as per the data model
- No Duplicate records
- Drop column iata_code as more than 80% records are null for this column.
- Handle missing values for contient column by populating null values based on other rows for same column using column
  iso_country.
- Create two new columns from coordinate column i.e 'latitude' and 'longitude' and round data upto two decimal values.
  Hence coordinate column is also to be dropped.

In [16]:
#Check column types
airport_df.dtypes

ident            object
type             object
name             object
elevation_ft    float64
continent        object
iso_country      object
iso_region       object
municipality     object
gps_code         object
iata_code        object
local_code       object
coordinates      object
dtype: object

In [17]:
#Check for null Values
airport_df.isna().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

In [18]:
#check for duplicates
duplicateRows = airport_df[airport_df.duplicated()]
print('Duplicate row count:'+str(duplicateRows))

Duplicate row count:Empty DataFrame
Columns: [ident, type, name, elevation_ft, continent, iso_country, iso_region, municipality, gps_code, iata_code, local_code, coordinates]
Index: []


In [19]:
#check for unique valid values to populate for missing values
print(airport_df.continent.unique())
air_df = airport_df[airport_df.iso_country== 'US']
arr=air_df.iso_region.unique()
print(arr[:50])
air_df.continent.unique()

[nan 'OC' 'AF' 'AN' 'EU' 'AS' 'SA']
['US-PA' 'US-KS' 'US-AK' 'US-AL' 'US-AR' 'US-OK' 'US-AZ' 'US-CA' 'US-CO'
 'US-FL' 'US-GA' 'US-HI' 'US-ID' 'US-IN' 'US-IL' 'US-KY' 'US-LA' 'US-MD'
 'US-MI' 'US-MN' 'US-MO' 'US-MT' 'US-NJ' 'US-NC' 'US-NY' 'US-OH' 'US-OR'
 'US-SC' 'US-SD' 'US-TX' 'US-TN' 'US-UT' 'US-VA' 'US-WA' 'US-WI' 'US-WV'
 'US-WY' 'US-CT' 'US-IA' 'US-MA' 'US-ME' 'US-NE' 'US-NH' 'US-NM' 'US-NV'
 'US-MS' 'US-ND' 'US-VT' 'US-RI' 'US-DC']


array([nan, 'AS'], dtype=object)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Below is the conceptual design for the analytical tables built in AWS S3 datalake. The following ***design strategies*** have been incorporated:
 1. A **fact constellation schema** has been designed, to allow heavy analytical query processing on the datalake.
 2. Multiple facts tables and shared dimension tables haven been created in S3 datalake to model the datasets in this schema.
 3. The different components of fact constellation schema incude:
 
    - **Fact Tables**: fact_us_cities_demographics, fact_immigration, fact_airport<br>
    - **Dimension Table**: dim_calendar, dim_visa, dim_country, dim_state, dim_travelmode, dim_flight
    
 4. These tables have been partitioned on different columns to achive storage and data retrival optimisation.
 
<img src=images/ERD.png>

#### 3.2 Mapping Out Data Pipelines: List the steps necessary to pipeline the data into the chosen data model
- for ***fact_immigration*** columns are renamed as per the data model, date columns are transformed from sas date format to datetime date format and data is saved by creating partions on arrival_date.
- ***dim_flight*** is cleaned by dropping duplicates and filtering null values for the primary key.
- ***dim_calendar*** is cleaned by dropping duplicates and data is partitioned by year and month
- fact_us_cities_demographics is created using us cities demographics dataset and by fetching latest temp records from temperature dataset for corresponding cities. Data is cleaned by filtering out columns that are not required and handling missing values and transforming data as per defined schema.
- ***fact_airport*** is created from airport codes data, by populating null values based on other rows for same column, renaming columns as per defined schema, dropping unecessary columns.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.
1. Fetch the config details like AWS Credentials, AWS S3 Bucket path from the config file.
2. Load the provided raw datasets using python pandas and pyspark session object, perform data cleaning by filtering null values and data transformation on these datasets to create following tables as per above defined data model: fact_us_cities_demographics, fact_immigration, fact_airport, dim_calendar, dim_flight.
3. Parse file I94_SAS_Labels_Descriptions.SAS file and process the data by dropping duplicates and by removing missing values to create following dimension tables dim_visa, dim_country, dim_state, dim_travelmode
4. Save all the above created tables in AWS S3 in paraquet format.
5. Perform data quality checks on these tables.

###### Immigration SAS Data

In [20]:
def processUSImmigrationData(sas_input_path_prefix,sas_input_path_suffix,spark,output_path):
    '''
        This function reads SAS data,processes it and saves it parquet format to create fact_immigration and dimension tables:dim_flight and dim_calendar.
        Input: prefix and suffix strings to calculate input path, spark session object, S3 path.
        Output: Print out result.
    '''
    #Create a list of columns to extracted from SAS dataset and dict to rename these columns as per our data model
    sas_cols = ['cicid','arrdate', 'i94mon','i94yr','visatype','i94visa', 'i94cit','i94port',
                'i94addr','i94mode', 'airline','fltno','gender','biryear','depdate']    
    new_sas_cols_dict = {'arrdate':'arrival_date', 'i94mon':'month','i94yr':'year','visatype':'visa_category','i94visa':'visa_type',
                         'i94cit':'country_of_departure','i94port':'port_of_entry','i94addr':'us_address_state','i94mode':'mode_of_entry',
                         'airline':'airline','fltno':'flight_no','biryear':'birth_year','depdate':'departure_date_from_us'}
    
   
    sas_df = pd.DataFrame(columns=sas_cols)  #create a pandas df which will hold SAS data   
    months_list = [(x[0:3]).lower() for x in calendar.month_name[1:]] #list all months in a year
    
    #Read immigration files from disk, transform to desired schema and save the files to S3 in paraquet format
    for month in months_list[3:4]:   #remove slicing([3:4]) to retrieve data for all months, this slicing returns month april only
        file_path = f"{sas_input_path_prefix}{month}{sas_input_path_suffix}"
        print(f"Reading file:{file_path}")        
        df = pd.read_sas(file_path, 'sas7bdat', encoding="ISO-8859-1")
        df =  df.loc[:,sas_cols]
        sas_df = sas_df.append(df,ignore_index= True)
        print(f'{file_path} file read sucessfully!')
    
    #Data Transformation       
    sas_df.rename(columns=new_sas_cols_dict, inplace=True) #Rename the columns as per data model  
    sas_df['arrival_date'] = pd.to_datetime(sas_df['arrival_date'], unit='D', origin='1960-01-01') #Convert SAS Date
    sas_df['departure_date_from_us'] = pd.to_datetime(sas_df['departure_date_from_us'], unit='D', origin='1960-01-01') #Convert SAS Date
    
    
    #code to extract immigration_df for fact and dimension tables
    immigration_df = sas_df.to_csv("fact_immigration")
    immigration_df = spark.read.option("header","true").csv("fact_immigration")
    
    # extract columns to create fact_immigration table and write parquet file to s3
    fact_immigration= immigration_df.select(['cicid', 'arrival_date', 'visa_type','port_of_entry', 'us_address_state','country_of_departure','mode_of_entry','flight_no','gender','birth_year','departure_date_from_us'])
    fact_immigration.write.parquet(os.path.join(output_path,'fact_immigration'),mode = "overwrite", partitionBy='arrival_date')

    # extract columns to create dim_flight table and write parquet file to s3
    flight_df = immigration_df.select(['flight_no','airline'])
    flight_df = flight_df.drop_duplicates(subset=['flight_no']) #drop duplicates
    flight_df = flight_df.filter(flight_df.flight_no.isNotNull()) #filter df to exclude null values
    flight_df.write.parquet(os.path.join(output_path,'dim_flight'),mode = "overwrite")
    display(flight_df.show(3))


    # extract columns to create dim_calendar table and write parquet file to s3
    time_df = immigration_df.select('arrival_date','year','month',  
                 F.dayofmonth("arrival_date").alias('day'),
                 F.weekofyear("arrival_date").alias('week'), 
                 F.date_format(F.col("arrival_date"), "E").alias("weekday"))
    
    time_df = time_df.drop_duplicates(subset=['arrival_date']) #drop duplicates
    time_df.write.parquet(os.path.join(output_path,'dim_calendar'),mode = "overwrite", partitionBy=['year', 'month'])
    display(time_df.show(3))
    print('*****SAS data files processed!*******')

###### Temperature Data : To withdraw latest temp

In [21]:
def processTempData(filepath):
    '''
        This function reads temperature data, filters US cities data transforms it and returns temperature dataframe.
        Input:  input path
        Output: pandas df
    '''
    print(f"Reading file:{filepath}")  
    temp_df = pd.read_csv(filepath) #read temperature data
    print(f'{filepath} file read sucessfully!')
    
    temp_df = temp_df.dropna() #drop null values
    col_dict = {temp_df.columns[1]:'avg_temperature', 'dt':'recorded_temp_date', temp_df.columns[2]:'avg_temp_uncertainity'}
    temp_df = temp_df.rename(columns = col_dict) #rename columns as per data model
    temp_df.columns = temp_df.columns.str.lower() #transform df as per data model
    temp_df['recorded_temp_date'] = pd.to_datetime(temp_df['recorded_temp_date']) #convert to datetime object
    
      
    temp_df = temp_df[temp_df.country =="United States"] #filter df for US only
    # fetch latest temp records only
    temp_df = temp_df[temp_df.groupby('city').recorded_temp_date.transform('max')==temp_df['recorded_temp_date']] 
    temp_df.reset_index(inplace=True, drop=True)
    print('*****Temperature file processed!*******')    
    return temp_df

###### US Cities Data

In [22]:
def processUSCitiesData(temp_df,filepath,spark,output_path):
    '''
        This function reads us cities demographics data,processes it, merges it with temperature data and saves it parquet format to create fact_us_cities_demographics table.
        Input: temperature df, input path, spark session object, S3 output path.
        Output: Print out result.
    '''    
    #Read the file and transform the header names using a list of new headers
    print(f"Reading file:{filepath}")  
    names=['city','state_name','median_age','male_population','female_population','total_population','no_of_veterans','foriegn_born','avg_household_size','state_code','race','count']
    city_df = pd.read_csv(filepath,sep = ';',names=names,header=0)
    print(f'{filepath} file read sucessfully!')
    
    city_df.dropna(inplace=True) #drop rows with NaN values
    city_df = city_df[names[0:-3]] #filter DF for required columns only as per data model 
    city_df = pd.merge(city_df, temp_df, how="left", on='city') #merge city_df with input temperature DF
    city_df = city_df[city_df.columns[0:-3]] #filter DF for required columns only as per data model
    
    #code to transform pandas df to spark df and save in parquet format on s3
    city_df = spark.createDataFrame(city_df)
    city_df = city_df.drop_duplicates(['city','state_name']) #drop duplicates
    city_df.write.mode("overwrite").parquet(os.path.join(output_path,'fact_us_cities_demographics'))
    print('*****US Cities Demographics file processed with temperature data!*******')    

###### Airport Data

In [23]:
def processAirportData(filepath,spark,output_path):
    '''
        This function reads us airport data, transforms data to handle null values,rename cols, drops duplicates and saves it parquet format to create fact_airport table.
        Input: input path, spark session object, S3 output path.
        Output: Print out result.
    ''' 
    print(f"Reading file:{filepath}")  
    airport_df = pd.read_csv(filepath) #read airport data
    print(f'{filepath} file read sucessfully!')    

    #Fill continent as nan : North America if iso_country in 'US' ir 'CA', like other US or Canada regions data already present in df 
    convert_dict ={'continent':str,'gps_code':str, 'local_code':str,'municipality':str,'iso_country':str}
    airport_df=airport_df.astype(convert_dict)
    airport_df.rename(columns = {'ident':'airport_id'},inplace=True) #rename col as per data model
    airport_df['continent'] = np.where(airport_df.iso_country == ('US'or'CA') ,'nan',airport_df['continent'])

    #Split coloumn coordinates into two new columns lat and long 
    airport_df[['latitude','longitude']] = airport_df['coordinates'].str.split(',', expand=True)
    airport_df['latitude']= airport_df.latitude.apply(lambda x : round(float(x),2))
    airport_df['longitude']= airport_df.longitude.apply(lambda x : round(float(x),2))

    #Drop column iata_code as more than 80% records are null for this column and drop coordinates column
    airport_df.drop(labels=['iata_code','coordinates'], axis = 1, inplace=True)
    airport_df.reset_index(inplace=True, drop=True)
    
    #code to load csv file to S3
    airport_df = spark.createDataFrame(airport_df)
    airport_df = airport_df.drop_duplicates(subset=['airport_id'])
    airport_df.write.mode("overwrite").parquet(os.path.join(output_path,'fact_airport'))
    print('*****Airport codes file processed!*******') 
    

###### SAS Description Data

In [24]:
def processSasDescriptionFile(filepath,spark,output_path):
    '''
        This function reads SAS Description file, extracts columns,rename cols, save it parquet format 
        to create dim_visa, dim_travelmode, dim_country, dim_state.
        Input: input path, spark session object, S3 output path.
        Output: Print out result.
    '''     
    sas_dict={}
    sas_data = []
    
    print(f"Processing file:{filepath}")  
    with open(filepath, "r") as file:
        for line in file:
            line = re.sub(r"\s+", " ", line)
            if "/*" in line and "-" in line:
                k, v = [i.strip(" ") for i in line.split("*")[1].split("-", 1)]
                k = k.replace(' & ', '_').lower()
                sas_dict[k] = {'description': v}
                
            elif '=' in line and ';' not in line:
                sas_data.append([i.strip(' ').strip("'").title() for i in line.split('=')])
            
            elif len(sas_data) > 0:
                sas_dict[k]['data'] = sas_data
                sas_data = []

    #To create dim_country           
    country_df = spark.createDataFrame(sas_dict['i94cit_i94res']['data'], schema=['country_code', 'country_name'])
    country_df.write.mode("overwrite").parquet(os.path.join(output_path,'dim_country')) #load parquet file to S3
    display(country_df.show(3))
    print("dim_country created!")

    #To create dim_state
    state_df = spark.createDataFrame(sas_dict['i94addr']['data'],schema=['state_code', 'state_name'])
    state_df = state_df.withColumn('state_code', upper(state_df.state_code))
    state_df.write.mode("overwrite").parquet(os.path.join(output_path,'dim_state')) #load parquet file to S3
    display(state_df.show(3))
    print("dim_state created!")
    
    #To create dim_visa  
    visa_df = spark.createDataFrame(sas_dict['i94visa']['data'], schema=['visa_type_id', 'visa_category'])
    visa_df.write.mode("overwrite").parquet(os.path.join(output_path,'dim_visa')) #load parquet file to S3
    display(visa_df.show(3))
    print("dim_visa created!")

    #To create dim_travelmode
    travel_mode_df = spark.createDataFrame(sas_dict['i94mode']['data'], schema=['mode_type_id', 'mode_category'])
    travel_mode_df.write.mode("overwrite").parquet(os.path.join(output_path,'dim_travelmode')) #load parquet file to S3
    display(travel_mode_df.show(3))
    print("dim_travelmode created!")
   

#### 4.2 Data Quality Checks 
Data quality for the created records has been checked by:
* **check_row_count()**: Checking the count of records in each of the tables, it should not be 0.
* **check_for_primary_key()**: Checking for primary key constraints: Checking both Unique Key contraint, that column should have no duplicates and for the NULL constraint that column should not have any NULL values.

In [25]:
def check_row_count(spark, output_path, table_list):
    '''
        This function counts the number of rows in all the tables in the list, if less than 0 raise error.
        Input: Spark Session object, S3 path, list of tables.
        Output: Print out result.
    '''
    print("Checking Data quality:for rowcount")
    
    for table in table_list:
        table_df = spark.read.parquet(os.path.join(output_path, table))
        cnt = table_df.count()
        if cnt == 0:
            raise ValueError(f'Quality check FAILED for {table} with zero records.')
        else:
            print(f'Rowcount quality check PASSED for {table} with {cnt} records.')

In [26]:
def check_for_primary_key(spark, output_path, table_dict):
    '''
        This function checks for primary key constraints: column should be unique and have no null records.
        Input: Spark Session object, S3 path, dict of tables and columns.
        Output: Print out check result.
    '''
    print("Checking Data quality:for primary key constraints")
    
    for table,column in table_dict.items():
        table_df = spark.read.parquet(os.path.join(output_path, table))        
        if table_df.count() > table_df.dropDuplicates(''.join(column).strip(',').split(',')).count():
            raise ValueError(f'Primary Key unique constraint check FAILED for table:{table} column:{column}')            
        elif (not isinstance(column,list)) and (table_df.filter((table_df[column] == "") | table_df[column].isNull() | isnan(table_df[column])).count() > 0):
            raise ValueError(f'Primary Key null constraint check FAILED for table:{table} column:{column}')            
        else:
            print(f'Primary Key constraints check PASSED for table:{table} column:{column}')


In [27]:
print("ETL process started!")
    
spark = createSparkSession() #create spark session
output_path = 's3a://juhi-capstone/'
sas_input_path_prefix = '../../data/18-83510-I94-Data-2016/i94_'
sas_input_path_suffix = '16_sub.sas7bdat'
table_dict = {'fact_us_cities_demographics': ['city,state_name'],'fact_immigration':'cicid','fact_airport':'airport_id',
             'dim_calendar':'arrival_date','dim_flight':'flight_no','dim_visa':'visa_type_id','dim_country':'country_code',
              'dim_state':'state_code','dim_travelmode':'mode_type_id'}

#Process temperature data and US cities demographics data to create fact table: fact_us_cities_demographics
temp_df = processTempData('../../data2/GlobalLandTemperaturesByCity.csv')
processUSCitiesData(temp_df,'us-cities-demographics.csv',spark,output_path)
    
#Process sas data to create fact table: fact_immigration and dimension tables : dim_calendar and dim_flight
processUSImmigrationData(sas_input_path_prefix,sas_input_path_suffix,spark,output_path)
    
#Process airport data to create fact table: fact_airport
processAirportData('airport-codes_csv.csv',spark,output_path)
    
#Process SAS Description file to create dimensions: dim_country, dim_state, dim_visa, dim_travelmode
processSasDescriptionFile("I94_SAS_Labels_Descriptions.SAS",spark,output_path)
    
#Data quality check: check for rowcount
check_row_count(spark, output_path,table_dict.keys())

#Data quality check: check for Primary key constraints: Unique Key and Null value check
check_for_primary_key(spark, output_path, table_dict)

print("ETL process completed!")

ETL process started!
Reading file:../../data2/GlobalLandTemperaturesByCity.csv
../../data2/GlobalLandTemperaturesByCity.csv file read sucessfully!
*****Temperature file processed!*******
Reading file:us-cities-demographics.csv
us-cities-demographics.csv file read sucessfully!
*****US Cities Demographics file processed with temperature data!*******
Reading file:../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat file read sucessfully!
+---------+-------+
|flight_no|airline|
+---------+-------+
|    00332|    YNT|
|    00456|     LH|
|    00530|     LA|
+---------+-------+
only showing top 3 rows



None

+------------+------+-----+---+----+-------+
|arrival_date|  year|month|day|week|weekday|
+------------+------+-----+---+----+-------+
|  2016-04-22|2016.0|  4.0| 22|  16|    Fri|
|  2016-04-15|2016.0|  4.0| 15|  15|    Fri|
|  2016-04-18|2016.0|  4.0| 18|  16|    Mon|
+------------+------+-----+---+----+-------+
only showing top 3 rows



None

*****SAS data files processed!*******
Reading file:airport-codes_csv.csv
airport-codes_csv.csv file read sucessfully!
*****Airport codes file processed!*******
Processing file:I94_SAS_Labels_Descriptions.SAS
+------------+--------------------+
|country_code|        country_name|
+------------+--------------------+
|         582|Mexico Air Sea, A...|
|         236|         Afghanistan|
|         101|             Albania|
+------------+--------------------+
only showing top 3 rows



None

dim_country created!
+----------+----------+
|state_code|state_name|
+----------+----------+
|        AL|   Alabama|
|        AK|    Alaska|
|        AZ|   Arizona|
+----------+----------+
only showing top 3 rows



None

dim_state created!
+------------+-------------+
|visa_type_id|visa_category|
+------------+-------------+
|           1|     Business|
|           2|     Pleasure|
|           3|      Student|
+------------+-------------+



None

dim_visa created!
+------------+-------------+
|mode_type_id|mode_category|
+------------+-------------+
|           1|          Air|
|           2|          Sea|
|           3|         Land|
+------------+-------------+



None

dim_travelmode created!
Checking Data quality:for rowcount
Rowcount quality check PASSED for fact_us_cities_demographics with 588 records.
Rowcount quality check PASSED for fact_immigration with 3096313 records.
Rowcount quality check PASSED for fact_airport with 55075 records.
Rowcount quality check PASSED for dim_calendar with 30 records.
Rowcount quality check PASSED for dim_flight with 7152 records.
Rowcount quality check PASSED for dim_visa with 3 records.
Rowcount quality check PASSED for dim_country with 288 records.
Rowcount quality check PASSED for dim_state with 54 records.
Rowcount quality check PASSED for dim_travelmode with 3 records.
Checking Data quality:for primary key constraints
Primary Key constraints check PASSED for table:fact_us_cities_demographics column:['city,state_name']
Primary Key constraints check PASSED for table:fact_immigration column:cicid
Primary Key constraints check PASSED for table:fact_airport column:airport_id
Primary Key constraints check PASSED 

#### 4.3 Data dictionary 

##### <u>fact_immigration</u>: Fact table extracted from the I94 immigration SAS data.
* `cicid` = key id
* `arrival_date` = arrival date
* `visa_type` = 1 digit visa type code
* `port_of_entry` = 3 character code of destination city
* `country_of_departure` = 3 digit code of origin country
* `mode_of_entry` = 1 digit transportation mode code
* `flight_number` = flight code taken by the traveller
* `gender` = gender of traveller 
* `birth_year` = Age of traveller
* `departure_date_from_us` = depature date from US

<br>

##### <u>fact_us_cities_demographics</u>: Fact table extracted from US cities demographics dataset joined with temperature dataset.
* `city_name` = US city name
* `state_code` = US state code
* `median_age` = median age
* `male_population` = male population
* `female_population` = female population
* `total_population` = total population
* `no_of_veterans` = number of veterans
* `foreign_born` = number of foreign born
* `avg_household_size` = average household size
* `recorded_temp_date` = latest date on which temperature was recorded
* `avg_temperature` = average temperature on the recorded_temp_date
* `avg_temp_uncertainity` = average temperature uncertainity on the recorded_temp_date

<br>

##### <u>fact_airport</u>: Fact table extracted from airport codes dataset.
* `airport_id` = airport id code
* `airport_type` = Type of the airport (small, medium, large)
* `airport_name` = name of the airport
* `elevation_ft` = elevation height of the airport
* `continent` = continent in which airport is located
* `iso_country` = 2 letter code for country in which airport is located
* `iso_region` =  location in which airport is located
* `municipality` = location in which airport is located
* `gps_code` =  4 letter gps_code in which airport is located
* `local_code` = 4 letter local code in which airport is located
* `latitude` =  latitude for airport location
* `longitude` = longitude for airport location

<br>

##### <u>dim_calendar</u>: Dimension table extracted from the I94 immigration SAS data.
* `arrival_date` = arrival date
* `year` = arrival year
* `month` = arrival month
* `day` = arrival day of month
* `week` = arrival week of year
* `weekday` = arrival weekday

<br>

##### <u>dim_flight</u>: Dimension table extracted from the I94 immigration SAS data.
* `flight_no` = flight number
* `airline_name` = airline name

<br>

##### <u>dim_visa</u>: Dimension table extracted from I94_SAS_Labels_Descriptions.SAS
* `visa_type_id` = 1 digit code of visa type
* `visa_category` = Type of visa (business, pleasure, student)

<br>

##### <u>dim_travelmode</u>: Dimension table extracted from I94_SAS_Labels_Descriptions.SAS
* `mode_code` = 1 digit code of transportation mode
* `mode` = Mode of transportation (air, land, sea)

<br>

##### <u>dim_countries</u>: Dimension table extracted from I94_SAS_Labels_Descriptions.SAS
* `country_code` = 3 digit country code 
* `country_name` = country name

<br>

##### <u>dim_state</u>: Dimension table extracted from I94_SAS_Labels_Descriptions.SAS
* `state_code` = 2 digit state code 
* `state_name` = state name


#### Step 5: Complete Project Write Up

* #### Clearly state the rationale for the choice of tools and technologies for the project.
As discussed above, we have used Python libraries, PySpark and AWS S3 to build ETL pipline and datalake as per the defined data model. Python file handling functions and Pandas works efficiently for small files and makes it convinient to work with multiple dataframes. And to process and save large volume dataset like i94 immigration data which contains over 3 million rows per month, Apache Spark has been used, it has many easy-to-use APIs and it executes much faster by caching data in memory across multiple parallel operations. Finally, datalake has been implemented on AWS S3 to store the fact and dimension tables with partioning for efficient and fast storage and retrival process.

* #### Propose how often the data should be updated and why.
The data should be updated whenever raw data is created from the data sources.
In this project, dimension tables are created from SAS Immigration data source, so dimension tables are to be updated only when new visa category, mode of entries, cities or countries are added to Immigration data. But for fact_immigration SAS data is added monthly so data updation should be done monthly, similarly dim_flight and dim_calendar will be uppdated accordingly for new date and flight details correspondingly.
The US Cities Demographics data is updated in every ten years(reference: https://www.usa.gov/statistics). So, the fact_us_cities_demographics and fact_airport are required to be updated whenever new records are generated by their data sources.
    

* #### Write a description of how you would approach the problem differently under the following scenarios:

 * ##### The data was increased by 100x.
     We are currently executing code in local mode, but by increasing the number of nodes in the cluster,
     spark will be able handle the increasing amount of data. However, an efficient approch would be to deploy this
     solution on AWS EMR Cluster. AWS supports easy scaling for this much data.<br>
 * ##### The data populates a dashboard that must be updated on a daily basis by 7am every day.
     If data was to be updated daily scheduling jobs would be an efficient solution.
     We can use any of the scheduling options available like cronjobs or serverless solutions AWS Glue, AWS lambda to 
     process code every time data is recieved. Another, very efficient solution for this use case would be using 
     scheduling tool Airflow.<br>
 * ##### The database needed to be accessed by 100+ people.
     We could move the analytics database solution to AWS Redshift Cluster, it can easily scale and can handle increasing
     data access requirements by multiple users. It is efficient to handle high concurrent and parallel access by large 
     amount of users.

# Sample Queries

In [29]:
# Top 10 cities used as port of entry
immigration_df = spark.read.parquet("s3a://juhi-capstone/fact_immigration")
immigration_df.createOrReplaceTempView("fact_immigration")
spark.sql("""
    SELECT
        port_of_entry as City,
        count(*) as count
    FROM
        fact_immigration
    GROUP BY port_of_entry
    ORDER BY 2 desc
    LIMIT 10
""").show()

+----+------+
|City| count|
+----+------+
| NYC|485916|
| MIA|343941|
| LOS|310163|
| SFR|152586|
| ORL|149195|
| HHW|142720|
| NEW|136122|
| CHI|130564|
| HOU|101481|
| FTL| 95977|
+----+------+



In [30]:
#Most common reason for immigrants to travel to US
visa_df = spark.read.parquet("s3a://juhi-capstone/dim_visa")
visa_df.createOrReplaceTempView("dim_visa")
spark.sql("""
    SELECT
        v.visa_type_id,
        v.visa_category,
        count(i.visa_type) as count
    FROM
        fact_immigration i,
        dim_visa v
    WHERE int(i.visa_type) = v.visa_type_id
    GROUP BY 1,2
    ORDER BY 3 desc
""").show()

+------------+-------------+-------+
|visa_type_id|visa_category|  count|
+------------+-------------+-------+
|           2|     Pleasure|2530868|
|           1|     Business| 522079|
|           3|      Student|  43366|
+------------+-------------+-------+

