US Flights Data Engineering (ETL)

In [None]:
#imports 
import os
import numpy as np
import pandas as pd
import petl as etl
import psycopg2 
from sqlalchemy import create_engine
from datetime import datetime


Variables Initialisation

In [None]:
hostname = 'i.rds.amazonaws.com'
database = 'tests_data_engineering'
username = ''
pwd = ''
port_id = 5432
conn = None

driver_ps = "postgresql+psycopg2"
engine_str = "{}://{}:{}@{}/{}".format(driver_ps,username, pwd, hostname, database)

schema_string = ""
staging_table_name = "staging_us_flights"

data_type_replace = {
    'object'        :   'varchar',
    'float64'       :   'float',
    'int64'         :   'int',
    'datetime64'    :   'datetime'
}

Postgres DB Connection

In [None]:
def initialize_db_connection():
    
    connection = psycopg2.connect(
    host = hostname,
    dbname = database,
    user = username,
    password = pwd,
    port = port_id
    )
    
    return connection

Functions : ALTER Data Type & Primary Key

In [None]:
def alter_primarykey(table, column):
    alter_pkey = '''ALTER TABLE {}."{}" 
    ADD PRIMARY KEY ("{}") '''.format(schema_string,table, column )
    cursor.execute(alter_pkey)
    conn.commit()
   

In [None]:
def alter_datatype(table, column, type):
    alter_dtype = '''ALTER TABLE {}."{}" 
    ALTER COLUMN "{}" TYPE {} '''.format(schema_string,table, column, type )
    cursor.execute(alter_dtype)
    conn.commit()
    # USING "{}"::{} column,  type

Function Date Dimension Creation

In [None]:
def get_date_dimension(date_df):
    
    date_df['YEAR'], date_df['QUARTER'] =  date_df['FLIGHTDATE'].dt.year, date_df['FLIGHTDATE'].dt.quarter
    date_df['MONTH'], date_df['MONTHNAME'] =  date_df['FLIGHTDATE'].dt.month, date_df['FLIGHTDATE'].dt.month_name()
    date_df['DAYSINMONTH'] =  date_df['FLIGHTDATE'].dt.days_in_month
    date_df['WEEK'], date_df['WEEKDAY'] =  date_df['FLIGHTDATE'].dt.week, date_df['FLIGHTDATE'].dt.day_name()
    date_df['DAY'], date_df['YEARDAY'] =  date_df['FLIGHTDATE'].dt.day, date_df['FLIGHTDATE'].dt.dayofyear
    return date_df

Function Distance Bucket


In [None]:
def get_distance_group( distance):
    bucket = distance // 100
    upper = (bucket + 1 ) * 100
    lower = 0
    if(bucket > 0):
        lower = (100 * bucket ) + 1

    return "{}-{} miles".format(lower, upper)

Functions : Staging Table Creation

In [5]:
def create_staging_table(cursor):
    
    # Read Source Text file to get columns and their types
    #flight_table = etl.fromcsv('flights.txt',delimiter='|')
    df = pd.read_csv('flights.txt', sep='|')
    # df.head()
    # df.dtypes
    
    column_string = " ,".join("{} {}".format(name,desc) for (name,desc) in zip(df.columns, df.dtypes.replace(data_type_replace)))
   
    cursor.execute("DROP TABLE IF EXISTS {}.{}".format(schema_string, staging_table_name))

    cursor.execute( "CREATE TABLE {}.{} ({})".format(schema_string, staging_table_name, column_string))
  
    
    flight_table = etl.fromcsv('flights.txt',delimiter='|')

    flights_file = open('flights.txt')

    copy_todb_sql = """
    COPY candidate6421.staging_us_flights FROM STDIN WITH
        CSV
        HEADER
        DELIMITER AS '|'
    """
    cursor.copy_expert(sql=copy_todb_sql, file=flights_file)


Connection Init - psycopg2 

In [None]:
conn = initialize_db_connection()

cursor = conn.cursor()

##Staging table creation
# create_staging_table(cursor)

 Sql alchemy engine : Source file read

In [6]:
engine = create_engine(engine_str)
flights_df = pd.read_csv('flights.txt', sep='|')


NameError: name 'create_engine' is not defined

 Airline Dimension

In [None]:
dim_name = "DIM_AIRLINE"
airline_df = pd.DataFrame(flights_df, columns=['AIRLINECODE', 'AIRLINENAME']).drop_duplicates()
airline_df["AIRLINENAME"] = airline_df["AIRLINENAME"].str.split(":", 1).str[0] #.str.split("(", 1, expand=True)[0]

#write to sql
# airline_df.to_sql(dim_name, engine, schema= schema_string, if_exists="replace", index=False)

airline_df


In [None]:
# dim_name = "DIM_AIRLINE"
# alter_datatype( dim_name, "AIRLINECODE", "CHAR(2)")
# alter_datatype( dim_name, "AIRLINENAME", "VARCHAR(100)")
# alter_primarykey(dim_name, "AIRLINECODE")

Origin Airport Dimension

In [8]:
dim_name = "DIM_ORIGINAIRPORT"
origin_df = pd.DataFrame(flights_df, columns=['ORIGINAIRPORTCODE', \
                                              'ORIGAIRPORTNAME',\
                                              'ORIGINCITYNAME', \
                                              'ORIGINSTATE', \
                                              'ORIGINSTATENAME']).drop_duplicates()

#Clean ORIGAIRPORTNAME
name= origin_df["ORIGAIRPORTNAME"].str.split(":", 1)

origin_df["ORIGAIRPORTNAME"] = name.str[1].str.strip()

#Extract State from airportName, if null
origin_df["ORIGINSTATE"] = origin_df["ORIGINSTATE"].fillna(name.str[0].str[-2:])

#Clean ORIGINCITYNAME
origin_df["ORIGINCITYNAME"] = origin_df["ORIGINCITYNAME"].str.split("/", 1).str[0] 


#write to sql
# origin_df.sort_values(by=['ORIGINAIRPORTCODE']).to_sql(dim_name, engine, schema= schema_string, if_exists="replace", index=False)

origin_df.loc[origin_df["ORIGINSTATENAME"] == 'nan'].head()



NameError: name 'pd' is not defined

In [None]:
# dim_name = "DIM_ORIGINAIRPORT"

# alter_datatype( dim_name, "ORIGINAIRPORTCODE", "CHAR(3)")
# alter_primarykey(dim_name, "ORIGINAIRPORTCODE")
# alter_datatype( dim_name, "ORIGAIRPORTNAME", "VARCHAR(150)")
# alter_datatype( dim_name, "ORIGINCITYNAME", "VARCHAR(50)")
# alter_datatype( dim_name, "ORIGINSTATE", "CHAR(2)")
# alter_datatype( dim_name, "ORIGINSTATENAME", "VARCHAR(50)")

Destination Airport Dimension

In [None]:
dim_name = "DIM_DESTAIRPORT"
dest_df = pd.DataFrame(flights_df, columns=['DESTAIRPORTCODE', \
                                              'DESTAIRPORTNAME',\
                                              'DESTCITYNAME', \
                                              'DESTSTATE', \
                                              'DESTSTATENAME']).drop_duplicates()

#Clean ORIGAIRPORTNAME
name= dest_df["DESTAIRPORTNAME"].str.split(":", 1)

dest_df["DESTAIRPORTNAME"] = name.str[1].str.strip()

#Extract State from airportName, if null
dest_df["DESTSTATE"] = dest_df["DESTSTATE"].fillna(name.str[0].str[-2:])

#Clean CITYNAME : Save multi cities values to new columns
dest_df[["DESTCITYNAME", "DESTCITY1", "DESTCITY2"]] = dest_df["DESTCITYNAME"].str.split("/", expand=True)

#write to sql
# dest_df.sort_values(by=['DESTAIRPORTCODE']).to_sql(dim_name, engine, schema= schema_string, if_exists="replace", index=False)

# dest_df.loc[dest_df["DESTSTATE"] == 'PA'].head()
dest_df.loc[dest_df["DESTAIRPORTNAME"]  == 'nan'].head()

In [None]:

# dim_name = "DIM_DESTAIRPORT"

# # alter_datatype( dim_name, "DESTAIRPORTCODE", "CHAR(3)")
# # alter_primarykey(dim_name, "DESTAIRPORTCODE")
# alter_datatype( dim_name, "DESTAIRPORTNAME", "VARCHAR(150)")
# alter_datatype( dim_name, "DESTCITYNAME", "VARCHAR(50)")
# alter_datatype( dim_name, "DESTSTATE", "CHAR(2)")
# alter_datatype( dim_name, "DESTSTATENAME", "VARCHAR(50)")
# alter_datatype( dim_name, "DESTCITY1", "VARCHAR(50)")
# alter_datatype( dim_name, "DESTCITY2", "VARCHAR(50)")


Date Dimension Derivation 

In [None]:
dim_name = "DIM_DATE"
date_df = pd.DataFrame(flights_df, columns=['FLIGHTDATE']).drop_duplicates()

#convert number to string to date
date_df['FLIGHTDATE'] = pd.to_datetime(date_df["FLIGHTDATE"].astype(str))

#create date parts
date_df = get_date_dimension(date_df)

#write to SQL
# date_df.sort_values(by=['FLIGHTDATE']).to_sql(dim_name, engine, schema= schema_string, if_exists="replace", index=False)

In [None]:

# dim_name = "DIM_DATE"

# alter_datatype( dim_name, "FLIGHTDATE", "DATE")
# alter_primarykey(dim_name, "FLIGHTDATE")
# alter_datatype( dim_name, "YEAR", "SMALLINT")
# alter_datatype( dim_name, "QUARTER", "SMALLINT")
# alter_datatype( dim_name, "MONTH", "SMALLINT")
# alter_datatype( dim_name, "MONTHNAME", "VARCHAR(10)")
# alter_datatype( dim_name, "DAYSINMONTH", "SMALLINT")
# alter_datatype( dim_name, "WEEK", "SMALLINT")
# alter_datatype( dim_name, "WEEKDAY", "VARCHAR(10)")
# alter_datatype( dim_name, "DAY", "SMALLINT")
# alter_datatype( dim_name, "YEARDAY", "SMALLINT")



Flight Dimension

In [None]:
dim_name = "DIM_FLIGHT"

dflight_df = pd.DataFrame(flights_df, columns=['TRANSACTIONID', 'FLIGHTDATE','TAILNUM','FLIGHTNUM',	'DEPTIME','DEPDELAY', \
                                                'TAXIOUT','TAXIIN',	'ARRTIME',	'ARRDELAY',	'DISTANCE'])

#convert number to string to date
dflight_df['FLIGHTDATE'] = pd.to_datetime(dflight_df["FLIGHTDATE"].astype(str))


#convert number to time for DEPTIME and ARRTIME
deptime = dflight_df["DEPTIME"].astype(str).str.replace("\.0", "", regex=True).str.zfill(4)
deptime = deptime.str.replace("0nan", "", regex=True)
dflight_df['DEPTIME'] = pd.to_datetime( deptime, format='%H%M',  errors='coerce').dt.time

arrtime = dflight_df["ARRTIME"].astype(str).str.replace("\.0", "", regex=True).str.zfill(4)
arrtime = arrtime.str.replace("0nan", "", regex=True)
dflight_df['ARRTIME'] = pd.to_datetime( arrtime, format='%H%M',  errors='coerce').dt.time

# DEPDELAY, ARRDELAY, TAXIIN, TAXIOUT to integer

cols_to_numeric = ["DEPDELAY", "ARRDELAY", "TAXIOUT", "TAXIIN"]
dflight_df[cols_to_numeric] =  dflight_df[cols_to_numeric].fillna(0).astype(int)

#Distance to int
dflight_df["DISTANCE"]  = dflight_df["DISTANCE"].str.split().str[0].fillna(0).astype(int)

#TAILNUM 
dflight_df["TAILNUM"]  = dflight_df["TAILNUM"].fillna('N/A')



#write to SQL
# dflight_df.sort_values(by=['FLIGHTDATE']).to_sql(dim_name, engine, schema= schema_string, if_exists="replace", index=False)

# dflight_df.sort_values(by=['FLIGHTDATE']).head(20)
dflight_df

In [None]:
# dim_name = "DIM_FLIGHT"

# alter_datatype( dim_name, "TRANSACTIONID", "BIGINT")
# alter_primarykey(dim_name, "TRANSACTIONID")
# alter_datatype( dim_name, "FLIGHTDATE", "DATE")
# alter_datatype( dim_name, "TAILNUM", "CHAR(6)")
# alter_datatype( dim_name, "FLIGHTNUM", "SMALLINT") --change it to CHAR(4)
# alter_datatype( dim_name, "DEPTIME", "TIME")
# alter_datatype( dim_name, "ARRTIME", "TIME")
# alter_datatype( dim_name, "DEPDELAY", "SMALLINT")
# alter_datatype( dim_name, "TAXIOUT", "SMALLINT")
# alter_datatype( dim_name, "TAXIIN", "SMALLINT")
# alter_datatype( dim_name, "ARRDELAY", "SMALLINT")
# alter_datatype( dim_name, "DISTANCE", "SMALLINT")

FACT FLIGHTS

In [None]:

fact_table = "FACT_FLIGHTS"

fact_df = pd.DataFrame(flights_df, columns=['TRANSACTIONID', 'FLIGHTDATE','DEPTIME','DEPDELAY', \
                                                "AIRLINECODE", 'ORIGINAIRPORTCODE', 'DESTAIRPORTCODE',\
                                                'ARRTIME',	'ARRDELAY',	'DISTANCE', 'CANCELLED', 'DIVERTED'])


#FACT Flight ID - AUTO Increment
fact_df.insert(0, 'FACTFLIGHTID', range(1, 1 + len(fact_df)))

#CANCELLED to bool
fact_df[ "CANCELLED"] = fact_df[ "CANCELLED"].map({'F': False,'0': False, 'False' :False, \
                                                'T': True, 'True': True, '1': True})
#Diverted to bool 
fact_df["DIVERTED"] = fact_df["DIVERTED"].map({'F': False,'0': False, 'False' :False, \
                                                'T': True, 'True': True, '1': True})
#convert number to string to date
fact_df['FLIGHTDATE'] = pd.to_datetime(fact_df["FLIGHTDATE"].astype(str))


#convert number to time for DEPTIME and ARRTIME
deptime = fact_df["DEPTIME"].astype(str).str.replace("\.0", "", regex=True).str.zfill(4)
deptime = deptime.str.replace("0nan", "", regex=True)
fact_df['DEPTIME'] = pd.to_datetime( deptime, format='%H%M',  errors='coerce')

arrtime = fact_df["ARRTIME"].astype(str).str.replace("\.0", "", regex=True).str.zfill(4)
arrtime = arrtime.str.replace("0nan", "", regex=True)
fact_df['ARRTIME'] = pd.to_datetime( arrtime, format='%H%M',  errors='coerce')

# DEPDELAY, ARRDELAY, TAXIIN, TAXIOUT to integer

cols_to_numeric = ["DEPDELAY", "ARRDELAY"]
fact_df[cols_to_numeric] =  fact_df[cols_to_numeric].fillna(0).astype(int)

#Distance to int
fact_df["DISTANCE"]  = fact_df["DISTANCE"].str.split().str[0].fillna(0).astype(int)

#Dep Delay > 15
fact_df["DEPDELAYGT15"] = np.where(fact_df["DEPDELAY"] > 15, 1, 0)

#Distance Group
fact_df["DISTANCEGROUP"] = fact_df.apply(lambda x: get_distance_group(x.DISTANCE), axis=1)

#ElapsedTime
elapsedtime =  fact_df["ARRTIME"] - fact_df["DEPTIME"] 
fact_df["ACTUALELAPSEDTIME"]  = elapsedtime.dt.total_seconds() / 60
fact_df["ACTUALELAPSEDTIME"] = np.where( fact_df[ "CANCELLED"] , 0, fact_df["ACTUALELAPSEDTIME"].fillna(0).astype(int))

#NEXTDAYARR
fact_df["NEXTDAYARR"] = np.where(  ( fact_df["ACTUALELAPSEDTIME"] + fact_df["ARRDELAY"] - fact_df["DEPDELAY"] > 1440) | (fact_df["ACTUALELAPSEDTIME"] < 0 )   , 1, 0)

# fact_df["ACTUALELAPSEDTIME"] = np.where( fact_df["ACTUALELAPSEDTIME"] < 0, 1440 +fact_df["ACTUALELAPSEDTIME"] , fact_df["ACTUALELAPSEDTIME"] )
# fact_df.loc[fact_df["NEXTDAYARR"]  == 1]

cols_to_drop = ['DEPTIME','DEPDELAY', 'ARRTIME', 'DISTANCE',	'ARRDELAY', 'ACTUALELAPSEDTIME']
fact_df.drop(columns=cols_to_drop, inplace=True)

#write to SQL
# fact_df.sort_values(by=['TRANSACTIONID']).to_sql(fact_table, engine, schema= schema_string, if_exists="replace", index=False)

In [None]:
fact_df.loc[(fact_df["ACTUALELAPSEDTIME"] > -100) & (fact_df["NEXTDAYARR"]  == 1) ].head(50)

In [None]:
conn = initialize_db_connection()

cursor = conn.cursor()
fact_table = "FACT_FLIGHTS"

# alter_primarykey(fact_table, "FACTFLIGHTID")
# alter_datatype( fact_table, "TRANSACTIONID", "BIGINT")
# alter_datatype( fact_table, "FLIGHTDATE", "DATE")
# alter_datatype( fact_table, "AIRLINECODE", "CHAR(2)")
# alter_datatype( fact_table, "ORIGINAIRPORTCODE", "CHAR(3)")
# alter_datatype( fact_table, "DESTAIRPORTCODE", "CHAR(3)")
# alter_datatype( fact_table, "CANCELLED", "BOOLEAN")
# alter_datatype( fact_table, "DIVERTED", "BOOLEAN")
# alter_datatype( fact_table, "DEPDELAYGT15", "BIT(1)")
# alter_datatype( fact_table, "DISTANCEGROUP", "VARCHAR(30)")
# alter_datatype( fact_table, "NEXTDAYARR", "BIT(1)")

# "TRANSACTIONID", "FLIGHTDATE", "AIRLINECODE", "ORIGINAIRPORTCODE", "DESTAIRPORTCODE", 
# "CANCELLED", "DIVERTED", "DEPDELAYGT15", "DISTANCEGROUP", "NEXTDAYARR"
	

In [None]:

conn.commit()
cursor.close()