# InterWorks - Data Engineer Case Study
Model developed exclusively for InterWorks, using data and instructions provided by InterWorks.

In [None]:
# Notebook authorship metadata.
__author__  = "Phil Baltazar"
__email__   = "phillusnow@gmail.com"
__website__ = "www.github.com/pbswe"

In [None]:
import datetime as dt
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import psycopg2
import seaborn as sns
import sqlalchemy
from scipy import stats
from sqlalchemy import create_engine

Loading the data and EDA (exploratory data analysis)

In [None]:
# Raw flight records: a pipe-delimited, UTF-8 encoded text file.
url = "../InterWorks_DE/flights.txt"
interDF = pd.read_csv(url, delimiter="|", encoding="utf-8")

In [None]:
interDF.head(5)

In [None]:
# Expected header of the raw file (pipe-delimited):
#headerNames = "TRANSACTIONID|FLIGHTDATE|AIRLINECODE|AIRLINENAME|TAILNUM|FLIGHTNUM|ORIGINAIRPORTCODE|ORIGAIRPORTNAME|ORIGINCITYNAME|ORIGINSTATE|ORIGINSTATENAME|DESTAIRPORTCODE|DESTAIRPORTNAME|DESTCITYNAME|DESTSTATE|DESTSTATENAME|CRSDEPTIME|DEPTIME|DEPDELAY|TAXIOUT|WHEELSOFF|WHEELSON|TAXIIN|CRSARRTIME|ARRTIME|ARRDELAY|CRSELAPSEDTIME|ACTUALELAPSEDTIME|CANCELLED|DIVERTED|DISTANCE"
interDF.keys()

In [None]:
interDF.info(verbose=True)

Correcting / fixing data types. 

In [None]:
interDF.CANCELLED.value_counts()

In [None]:
# Normalise the mixed textual flags in CANCELLED to 0/1:
# 'False' / 'F' / '0' -> 0 and 'True' / 'T' / '1' -> 1 (any other value is left as-is).
# The result is assigned back to the column. Chained `interDF['CANCELLED'].replace(..., inplace=True)`
# operates on a column selection and does not update interDF under pandas Copy-on-Write.
interDF['CANCELLED'] = interDF['CANCELLED'].replace(
    {'False': 0, 'F': 0, '0': 0, 'True': 1, 'T': 1, '1': 1}
)


In [None]:
interDF.DIVERTED.value_counts()

In [None]:
# Normalise the mixed textual flags in DIVERTED to 0/1:
# 'False' / 'F' / '0' -> 0 and 'True' / 'T' / '1' -> 1 (any other value is left as-is).
# The result is assigned back to the column. Chained `interDF['DIVERTED'].replace(..., inplace=True)`
# operates on a column selection and does not update interDF under pandas Copy-on-Write.
interDF['DIVERTED'] = interDF['DIVERTED'].replace(
    {'False': 0, 'F': 0, '0': 0, 'True': 1, 'T': 1, '1': 1}
)

In [None]:
interDF.DISTANCE.value_counts()

In [None]:
# The DISTANCE attribute combines two things:
#   - the number of miles, which should be usable as numeric data for calculations, and
#   - the word "miles" next to the number, a measurement label in string form.
#
# The original DISTANCE column is preserved, as the Case Study document requires. It is
# also split on whitespace into a numeric-text column (DISTNUM) and a unit column (MEASURE).
#
# n=1 caps the split at two pieces, so an unexpected extra token in any row cannot
# produce a third column. reindex guarantees exactly two columns even if no row has a
# unit, so the two-name column assignment below cannot fail on a length mismatch.

distCol = interDF.DISTANCE.str.split(n=1, expand=True).reindex(columns=range(2))
distColRename = ['DISTNUM', 'MEASURE']
distCol.columns = distColRename
distCol

In [None]:
# Split AIRLINENAME on ':' into AIRLINENAME1 (text before the first ':') and
# AIRLINECODE1 (everything after it). Presumably the part after the colon is the airline code.
# n=1 caps the split at two pieces, so a name containing an extra ':' cannot create a
# third column. reindex guarantees exactly two columns even if no row has a ':', so the
# two-name column assignment below cannot fail on a length mismatch.
airNameCol = interDF.AIRLINENAME.str.split(':', n=1, expand=True).reindex(columns=range(2))
airColRename = ['AIRLINENAME1', 'AIRLINECODE1']
airNameCol.columns = airColRename
airNameCol

In [None]:
# Attach the split airline and distance columns side by side. Then discard the combined
# AIRLINENAME source column and the airline-code piece of the split.
newDF = pd.concat([interDF, airNameCol, distCol], axis=1)
interDF = newDF.drop(columns=['AIRLINENAME', 'AIRLINECODE1'])

In [None]:
interDF = interDF.rename(columns={'AIRLINENAME1': 'AIRLINENAME'})

In [None]:
interDF.head(n=5)

In [None]:
interDF.DISTNUM.value_counts()

In [None]:
# Column groups used by the dtype-casting cell that follows.

# Identifier / descriptive text columns -> 'category'.
categoricCols = [
    'TRANSACTIONID',
    'FLIGHTDATE',
    'AIRLINECODE',
    'AIRLINENAME',
    'TAILNUM',
    'FLIGHTNUM',
    'ORIGINAIRPORTCODE',
    'ORIGAIRPORTNAME',
    'ORIGINCITYNAME',
    'ORIGINSTATE',
    'ORIGINSTATENAME',
    'DESTAIRPORTCODE',
    'DESTAIRPORTNAME',
    'DESTCITYNAME',
    'DESTSTATE',
    'DESTSTATENAME',
    'DISTANCE',
    'MEASURE',
]

# Times, delays, durations and the numeric distance -> 'float'.
numericCols = [
    'CRSDEPTIME',
    'DEPTIME',
    'DEPDELAY',
    'TAXIOUT',
    'TAXIIN',
    'ARRDELAY',
    'CRSELAPSEDTIME',
    'ACTUALELAPSEDTIME',
    'ARRTIME',
    'CRSARRTIME',
    'WHEELSOFF',
    'WHEELSON',
    'DISTNUM',
]

# 0/1 flags -> 'bool'.
boolCols = [
    'CANCELLED',
    'DIVERTED',
]

#timeCols = ['ARRTIME', 'CRSARRTIME', 'WHEELSOFF', 'WHEELSON']

In [None]:
# Cast every column in each group to the group's target dtype, one column at a time.
dtypeByGroup = {
    'category': categoricCols,
    'float': numericCols,
    'bool': boolCols,
}
for targetType, groupCols in dtypeByGroup.items():
    for colName in groupCols:
        interDF[colName] = interDF[colName].astype(targetType)
#interDF[timeCols] = interDF[timeCols].astype('dateutil')

In [None]:
interDF.select_dtypes(include=['category']).describe()

In [None]:
interDF = interDF.astype({'DISTNUM': int})

In [None]:
interDF.DISTNUM.describe()

In [None]:
# Bucket flights into three equal-width distance groups spanning the full observed
# range of DISTNUM.
# Passing an integer `bins` lets pandas compute the edges itself. It extends the
# range by 0.1% on each side, so the shortest and longest flights both get a label.
# A hand-built `range(min, max, (max-min)//4)` of edges either produces one edge too
# many for three labels (pd.cut raises) or stops short of the maximum (rows become NaN).
# pandas also handles a constant column without a zero step.
group_names = ['Short', 'Medium', 'Long']

interDF['DISTANCEGROUP'] = pd.cut(interDF['DISTNUM'], bins=len(group_names), labels=group_names)

In [None]:
# Create a DEPDELAYGT15 column flagging departure delays strictly greater than 15 minutes.
# The comparison matches the column name ("GT" = greater than). A missing DEPDELAY
# compares as False.

interDF['DEPDELAYGT15'] = interDF['DEPDELAY'] > 15.0


In [None]:
interDF.DEPDELAYGT15.value_counts()

In [None]:
# Create a NEXTDAYARR column for next-day arrivals.
# NOTE(review): DEPTIME/ARRTIME look like local clock times in hhmm form (e.g. 1430).
# Confirm this against the case-study spec. Under that reading, an arrival clock time
# earlier than the departure clock time means the flight crossed midnight and so
# landed the next day.
# The flag is computed BEFORE the NaN fill below. A missing time therefore compares
# as False, and cancelled/diverted rows are not mistaken for 00:00 arrivals.
interDF['NEXTDAYARR'] = interDF['ARRTIME'] < interDF['DEPTIME']

# Fill missing values with 0 and cast to int.
# The result is assigned back instead of using chained `fillna(..., inplace=True)`
# on a column selection. Under pandas Copy-on-Write the chained form leaves the NaNs
# in place, and astype(int) then raises.
for timeCol in ['DEPTIME', 'ACTUALELAPSEDTIME', 'ARRTIME']:
    interDF[timeCol] = interDF[timeCol].fillna(0).astype(int)

In [None]:
interDF.info(verbose=True)

In [None]:
# Column 'MEASURE' would be useful if an entry ever used a different unit, such as
# kilometers. If that is unlikely and the column turns out to be useless, uncomment
# the line below to drop it.

#interDF = interDF.drop(['MEASURE'], axis=1)
interDF.head(n=5)

In [None]:
# Keep a local, pipe-delimited copy of the cleaned and formatted data as a backup.
cleanedPath = 'cleaned_flights.txt'
interDF.to_csv(cleanedPath, sep='|', index=False)

The Data has been cleaned up and formatted, and is ready to be saved into PostgreSQL.

Getting SQL engine started and connection established. 

In [None]:
# Sanity-check that an engine can be constructed.
# SQLAlchemy 1.4+ only recognises the 'postgresql://' scheme. The legacy 'postgres://'
# alias was removed and raises NoSuchModuleError.
# The returned engine is not stored; the engine used for loading is created again
# further down.
sqlalchemy.create_engine('postgresql://INFORMATIONHIDDEN')

In [None]:
# Load the ipython-sql extension. It provides the %sql / %%sql magics used below
# to connect to and query PostgreSQL.
%load_ext sql

In [None]:
# Open the ipython-sql connection used by the %%sql cells further down.
# The triple-quoted string below is a no-op placeholder listing the connection settings.
# Because it is the cell's last expression, it is only echoed as output.
%sql postgresql://INFORMATIONHIDDEN:INFORMATIONHIDDEN@INFORMATIONHIDDEN/tests_data_engineering     
'''
POSTGRES_ADDRESS = 'XXX'
POSTGRES_USERNAME = 'XXX'
POSTGRES_PASSWORD = 'XXX'
POSTGRES_DBNAME = 'tests_data_engineering'
'''

In [None]:
# I ran into issues pushing the entire DataFrame to PostgreSQL in a single call. As a
# workaround, the data is pushed in increments so I can confirm each chunk was
# saved in the database. Not as clean as a single push, but it works for now. -PB

In [None]:
# SQLAlchemy engine used by the to_sql() loads below.
# The connection URL is read from the FLIGHTS_DB_URL environment variable, so real
# credentials do not need to live in the notebook. The previous hard-coded URL is
# kept as a fallback so existing runs behave as before.
engine = create_engine(
    os.environ.get(
        'FLIGHTS_DB_URL',
        'postgresql://INFORMATIONHIDDEN:INFORMATIONHIDDEN@INFORMATIONHIDDEN/tests_data_engineering',
    )
)

In [None]:
# First chunk (rows 0-99) recreates the 'flights' table. Later chunks append to it.
interDF.head(100).to_sql('flights', con=engine, index=False, if_exists='replace', chunksize=500)

In [None]:
# Append rows 100-499. iloc's stop index is exclusive, so the previous chunk [:100]
# ended at row 99. Starting at 101 would silently skip row 100.
interDF.iloc[100:500,:].to_sql('flights', con=engine, index=False, if_exists='append', chunksize=500)

In [None]:
# Append rows 500-4999. iloc's stop index is exclusive, so starting at 501 would
# silently skip row 500.
interDF.iloc[500:5000,:].to_sql('flights', con=engine, index=False, if_exists='append', chunksize=500)

In [None]:
# Append rows 5000-9999. iloc's stop index is exclusive, so starting at 5001 would
# silently skip row 5000.
interDF.iloc[5000:10000,:].to_sql('flights', con=engine, index=False, if_exists='append', chunksize=500)

In [None]:
# If time allows: append rows 10000-49999. iloc's stop index is exclusive, so this
# chunk must start at 10000 (not 10001) to avoid skipping a row.
# interDF.iloc[10000:50000,:].to_sql('flights', con=engine, index=False, if_exists='append', chunksize=500)

In [None]:
# Columns for the FACT_FLIGHTS table: transaction id, derived flags/groups, the
# numeric distance with its unit, and the descriptive airline/airport names.
factDF = interDF.loc[:, [
    'TRANSACTIONID', 'DISTANCEGROUP', 'DISTNUM', 'MEASURE', 'DEPDELAYGT15',
    'NEXTDAYARR', 'AIRLINENAME', 'ORIGAIRPORTNAME', 'DESTAIRPORTNAME',
]]
factDF

In [None]:
# Recreate FACT_FLIGHTS from the first 10000 rows.
factDF.head(10000).to_sql('FACT_FLIGHTS', con=engine, index=False, if_exists='replace', chunksize=500)

In [None]:
# If time allows: append rows 10000-49999 (iloc's stop is exclusive, so start at 10000, not 10001).
# factDF.iloc[10000:50000,:].to_sql('FACT_FLIGHTS', con=engine, index=False, if_exists='append', chunksize=500)

In [None]:
# factDF.iloc[50000:500000,:].to_sql('FACT_FLIGHTS', con=engine, index=False, if_exists='append', chunksize=500)

In [None]:
# Column subsets for the dimension tables loaded below: dimAirDF -> DIM_AIRPORT and
# dimDateDF -> DIM_DATE. Both keep TRANSACTIONID as the join key to FACT_FLIGHTS.
airDimCols = [
    'TRANSACTIONID', 'AIRLINECODE', 'AIRLINENAME', 'TAILNUM', 'FLIGHTNUM',
    'ORIGINAIRPORTCODE', 'ORIGAIRPORTNAME', 'DESTAIRPORTCODE', 'DESTAIRPORTNAME',
    'TAXIOUT', 'WHEELSOFF', 'WHEELSON', 'TAXIIN', 'ARRDELAY', 'CANCELLED',
    'DIVERTED', 'NEXTDAYARR', 'DEPDELAYGT15',
]
dateDimCols = [
    'TRANSACTIONID', 'FLIGHTDATE', 'ORIGINAIRPORTCODE', 'ORIGINCITYNAME',
    'ORIGINSTATE', 'DESTAIRPORTCODE', 'DESTCITYNAME', 'DESTSTATE', 'DEPDELAY',
    'ARRTIME', 'ARRDELAY', 'DISTANCEGROUP',
]

dimAirDF = interDF.loc[:, airDimCols]
dimDateDF = interDF.loc[:, dateDimCols]

In [None]:
# Unused alternative for building factDF / dimAirDF / dimDateDF by dropping columns
# instead of selecting them. It is wrapped in a string literal, so none of it executes.
# As the cell's last expression, the text is only echoed as output.
'''
# This is a different approach to create the alternative DFs - not used. 

#factDF.columns = interDF.columns
factDF = factDF.drop(['FLIGHTDATE', 'AIRLINECODE', 'TAILNUM', 'FLIGHTNUM', 'ORIGINAIRPORTCODE',
                      'ORIGINCITYNAME', 'ORIGINSTATE', 'ORIGINSTATENAME', 'DESTAIRPORTCODE',
                      'DESTCITYNAME', 'DESTSTATE', 'DESTSTATENAME', 'CRSDEPTIME', 'DEPTIME', 
                      'DEPDELAY', 'TAXIOUT', 'WHEELSOFF', 'WHEELSON', 'TAXIIN', 'CRSARRTIME', 
                      'ARRTIME', 'ARRDELAY', 'CRSELAPSEDTIME', 'ACTUALELAPSEDTIME', 'CANCELLED',
                      'DIVERTED', 'DISTANCE'], axis=1)

dimAirDF.columns = interDF.columns
dimAirDF = dimAirDF.drop(['FLIGHTDATE', 'ORIGINCITYNAME', 'ORIGINSTATE', 'ORIGINSTATENAME',
                        'DESTCITYNAME', 'DESTSTATE', 'DESTSTATENAME', 'CRSDEPTIME', 'DEPTIME',
                        'DEPDELAY', 'CRSARRTIME', 'ARRTIME', 'CRSELAPSEDTIME', 'ACTUALELAPSEDTIME',
                        'DISTANCE', 'DISTNUM', 'MEASURE','DISTANCEGROUP'], axis=1)

dimDateDF = interDF
dimDateDF.columns = interDF.columns
dimDateDF = dimDateDF.drop(['AIRLINECODE', 'TAILNUM', 'FLIGHTNUM', 'ORIGAIRPORTNAME',
                          'ORIGINSTATENAME', 'DESTAIRPORTNAME', 'DESTSTATENAME', 'CRSDEPTIME',
                          'DEPTIME', 'TAXIOUT', 'WHEELSOFF', 'WHEELSON', 'TAXIIN', 'CRSARRTIME',
                          'CRSELAPSEDTIME', 'ACTUALELAPSEDTIME', 'CANCELLED', 'DIVERTED',
                          'DISTANCE', 'AIRLINENAME', 'DISTNUM', 'MEASURE', 'DEPDELAYGT15', 
                          'NEXTDAYARR'], axis=1)
'''

In [None]:
# Recreate DIM_AIRPORT from the first 10000 rows.
dimAirDF.head(10000).to_sql('DIM_AIRPORT', con=engine, index=False, if_exists='replace', chunksize=500)

In [None]:
# If time allows: append rows 10000-49999 (iloc's stop is exclusive, so start at 10000, not 10001).
# dimAirDF.iloc[10000:50000,:].to_sql('DIM_AIRPORT', con=engine, index=False, if_exists='append', chunksize=500)

In [None]:
# dimAirDF.iloc[50000:500000,:].to_sql('DIM_AIRPORT', con=engine, index=False, if_exists='append', chunksize=500)

In [None]:
# Recreate DIM_DATE from the first 10000 rows.
dimDateDF.head(10000).to_sql('DIM_DATE', con=engine, index=False, if_exists='replace', chunksize=500)

In [None]:
# If time allows: append rows 10000-49999 (iloc's stop is exclusive, so start at 10000, not 10001).
# dimDateDF.iloc[10000:50000,:].to_sql('DIM_DATE', con=engine, index=False, if_exists='append', chunksize=500)

In [None]:
# dimDateDF.iloc[50000:500000,:].to_sql('DIM_DATE', con=engine, index=False, if_exists='append', chunksize=500)

In [None]:
# In SQL, create a view named VW_FLIGHTS that joins the fact and dimension tables and
# returns the columns useful for analysis.

In [None]:
%%sql

-- VW_FLIGHTS: analysis view joining FACT_FLIGHTS with the DIM_AIRPORT and DIM_DATE
-- dimensions on TRANSACTIONID, exposing the columns chosen for analysis.
-- pandas.to_sql created these tables and columns with upper-case names. PostgreSQL
-- therefore stores them quoted (case-sensitive), so they are double-quoted here.
-- The view is dropped first because CREATE OR REPLACE VIEW cannot remove columns
-- from an existing view definition.
DROP VIEW IF EXISTS candidate3195.VW_FLIGHTS;

CREATE VIEW candidate3195.VW_FLIGHTS AS
SELECT f."TRANSACTIONID",
       f."DISTANCEGROUP",
       f."DEPDELAYGT15",
       f."NEXTDAYARR",
       f."AIRLINENAME",
       f."ORIGAIRPORTNAME",
       f."DESTAIRPORTNAME"
FROM "FACT_FLIGHTS" AS f
JOIN "DIM_AIRPORT" AS a ON a."TRANSACTIONID" = f."TRANSACTIONID"
JOIN "DIM_DATE" AS d ON d."TRANSACTIONID" = f."TRANSACTIONID"
;

In [None]:
# VW_FLIGHTS filtered columns:
# TRANSACTIONID, DISTANCEGROUP, DEPDELAYGT15, NEXTDAYARR, 
# AIRLINENAME, ORIGAIRPORTNAME, DESTAIRPORTNAME

In [None]:
# Final considerations in the presentation. 