In [1]:
#!pip install pandas
#!pip install psycopg2

In [2]:
import psycopg2
import pandas as pd
import glob
from datetime import datetime
from sqlalchemy import create_engine

In [3]:
def create_revenuedb_connection():
    conn = psycopg2.connect("host=127.0.0.1 dbname=airline_revenue user=postgres password=cherry")
    cur = conn.cursor()

    return cur, conn

In [4]:
def create_database():
    # connect to default database
    conn = psycopg2.connect("host=127.0.0.1 dbname=postgres user=postgres password=cherry")
    conn.set_session(autocommit=True)
    cur = conn.cursor()

    # create new database
    cur.execute("DROP DATABASE IF EXISTS airline_revenue")
    cur.execute("CREATE DATABASE airline_revenue")

    # commit and close connection to default database
    conn.close()

    # connect to new database
    cur, conn = create_revenuedb_connection()

    return cur, conn

In [5]:
def extract_from_csv(file):
    dataframe = pd.read_csv(file)
    return dataframe

In [6]:
def extract():
    # create an empty list
    extracted_data = list()
    
    # process all csv files
    for csvfile in glob.glob("./data/*.csv"):
        extracted_data.append(extract_from_csv(csvfile))
        print(csvfile)
                
    return extracted_data

In [7]:
def transform(data):
    tranformed_data = list()
    asm_df = data[0]
    or_df = data[1]
    flights_df = data[2]
    netincome_df = data[3]
    rpm_df = data[4]
    passengers_df = data[5]

    month_id_data = pd.Series(range(1, rpm_df.shape[0]+1))
    quarter_id_data = pd.Series(range(1, or_df.shape[0]+1))
    asm_id_data = pd.Series(range(1, asm_df.shape[0]+1))
    rpm_id_data = pd.Series(range(1, rpm_df.shape[0]+1))

    transfomrmed_months_data = pd.DataFrame(columns=['MONTH ID', 'YEAR', 'MONTH'])
    transfomrmed_quarters_data = pd.DataFrame(columns=['QUARTER ID', 'YEAR', 'QUARTER'])
    transformed_plf_data = pd.DataFrame(columns=['ASM ID', 'RPM ID','MONTH ID','DOMESTIC PLF(%)','INTERNATIONAL PLF(%)','TOTAL PLF(%)'])
    
    transfomrmed_months_data['MONTH ID'] = month_id_data
    transfomrmed_months_data['YEAR'] = rpm_df['YEAR'].astype('int')
    transfomrmed_months_data['MONTH'] = rpm_df['MONTH'].astype('int')
    
    transfomrmed_quarters_data['QUARTER ID'] = quarter_id_data
    transfomrmed_quarters_data['YEAR'] = or_df['YEAR'].astype('int')
    transfomrmed_quarters_data['QUARTER'] = or_df['QUARTER'].astype('int')
    
    transformed_plf_data['ASM ID'] = asm_id_data
    transformed_plf_data['RPM ID'] = rpm_id_data
    transformed_plf_data['MONTH ID'] = month_id_data
    transformed_plf_data['DOMESTIC PLF(%)'] = round(rpm_df['DOMESTIC(IN THOUSANDS)'] / asm_df['DOMESTIC(IN THOUSANDS)'] * 100, 2)
    transformed_plf_data['INTERNATIONAL PLF(%)'] = round(rpm_df['INTERNATIONAL(IN THOUSANDS)'] / asm_df['INTERNATIONAL(IN THOUSANDS)'] * 100, 2)
    transformed_plf_data['TOTAL PLF(%)'] = round((transformed_plf_data['DOMESTIC PLF(%)'] + transformed_plf_data['INTERNATIONAL PLF(%)']) / 2, 2)
    
    tranformed_data.append(transfomrmed_months_data)
    tranformed_data.append(transfomrmed_quarters_data)
    tranformed_data.append(asm_df)
    tranformed_data.append(rpm_df)
    tranformed_data.append(transformed_plf_data)
    tranformed_data.append(passengers_df)
    tranformed_data.append(flights_df)
    tranformed_data.append(netincome_df)
    tranformed_data.append(or_df)
    
    return tranformed_data

In [8]:
# Loading dataframes to PostgreSQL directly
def load(data):
    dialect = 'postgresql'
    user_name = 'postgres'
    password = 'cherry'
    host = '127.0.0.1'
    port = '5432'
    db_name = 'airline_revenue'
    conn = create_engine(f'{dialect}://{user_name}:{password}@{host}:{port}/{db_name}')
   
    data[0].to_sql(con=conn, name='months', if_exists='replace', index=False)
    data[1].to_sql(con=conn, name='quarters', if_exists='replace', index=False)
    data[2].to_sql(con=conn, name='monthly_asm', if_exists='replace', index=False)
    data[3].to_sql(con=conn, name='monthly_rpm', if_exists='replace', index=False)
    data[4].to_sql(con=conn, name='monthly_plf', if_exists='replace', index=False)
    data[5].to_sql(con=conn, name='monthly_passengers', if_exists='replace', index=False)
    data[6].to_sql(con=conn, name='monthly_flights', if_exists='replace', index=False)
    data[7].to_sql(con=conn, name='monthly_netincome', if_exists='replace', index=False)
    data[8].to_sql(con=conn, name='monthly_or', if_exists='replace', index=False) 

In [9]:
def log(message):
    timestamp_format = '%Y-%h-%d-%H:%M:%S' # Year-Monthname-Day-Hour-Minute-Second
    now = datetime.now()
    timestamp = now.strftime(timestamp_format)
    with open("./data/logfile.txt","a") as f:
        f.write(timestamp + ',' + message + '\n')

In [10]:
# Running ETL process
log("ETL Job Started")
log("Extract phase Started")
extracted_data = extract()
log("Extract phase Ended")
log("Transform phase Started")
transformed_data = transform(extracted_data)
log("Transform phase Ended")
log("Load phase Started")
load(transformed_data)
log("Load phase Ended")
log("ETL Job Ended")

./data/asm_for_all_airports.csv
./data/operating_revuenue_all_regions.csv
./data/flights_for_all_airports.csv
./data/net_income_all_regions.csv
./data/rpm_for_all_airports.csv
./data/passengers_for_all_airports.csv


In [11]:
# Reterieving data from tables
cur, conn = create_revenuedb_connection()
conn.set_session(autocommit=True)

In [12]:
# Reterieve month details
cur.execute(""" SELECT * FROM months LIMIT(5); """)
month_details = cur.fetchall()
print(month_details)

[(1, 2003, 1), (2, 2003, 2), (3, 2003, 3), (4, 2003, 4), (5, 2003, 5)]


In [13]:
# Reterieve quarter details
cur.execute(""" SELECT * FROM quarters LIMIT(5); """)
quarter_details = cur.fetchall()
print(quarter_details)

[(1, 2003, 1), (2, 2003, 2), (3, 2003, 3), (4, 2003, 4), (5, 2004, 1)]


In [14]:
# Reterieve ASM details
cur.execute(""" SELECT * FROM monthly_asm LIMIT(5); """)
asm_details = cur.fetchall()
print(asm_details)

[(1, 2003, 1, 9801309, 4237389, 14038698), (2, 2003, 2, 8476555, 3665686, 12142241), (3, 2003, 3, 9782676, 4147309, 13929984), (4, 2003, 4, 9336457, 3874139, 13210596), (5, 2003, 5, 9432542, 3912655, 13345197)]


In [15]:
# Reterieve RPM details
cur.execute(""" SELECT * FROM monthly_rpm LIMIT(5); """)
rpm_details = cur.fetchall()
print(rpm_details)

[(1, 2003, 1, 6428843, 2976704, 9405547), (2, 2003, 2, 5861473, 2493611, 8355084), (3, 2003, 3, 7095490, 2886137, 9981627), (4, 2003, 4, 6764567, 2561054, 9325622), (5, 2003, 5, 7055380, 2783080, 9838460)]


In [16]:
# Reterieve PLF details
cur.execute(""" SELECT * FROM monthly_plf LIMIT(5); """)
plf_details = cur.fetchall()
print(plf_details)

[(1, 1, 1, 65.59, 70.25, 67.92), (2, 2, 2, 69.15, 68.03, 68.59), (3, 3, 3, 72.53, 69.59, 71.06), (4, 4, 4, 72.45, 66.11, 69.28), (5, 5, 5, 74.8, 71.13, 72.96)]


In [17]:
# Reterieve passenger details
cur.execute(""" SELECT * FROM monthly_passengers LIMIT(5); """)
passenger_details = cur.fetchall()
print(passenger_details)

[(1, 2003, 1, 5659172, 1354646, 7013818), (2, 2003, 2, 5224456, 1139470, 6363926), (3, 2003, 3, 6307672, 1320809, 7628481), (4, 2003, 4, 5905978, 1197107, 7103085), (5, 2003, 5, 6217128, 1230334, 7447462)]


In [18]:
# Reterieve passenger details
cur.execute(""" SELECT * FROM monthly_flights LIMIT(5); """)
flight_details = cur.fetchall()
print(flight_details)

[(1, 2003, 1, 65960, 12254, 78214), (2, 2003, 2, 56960, 10503, 67463), (3, 2003, 3, 65168, 11957, 77125), (4, 2003, 4, 62117, 11065, 73182), (5, 2003, 5, 62611, 10770, 73381)]


In [19]:
# Reterieve passenger details
cur.execute(""" SELECT * FROM monthly_flights LIMIT(5); """)
flight_details = cur.fetchall()
print(flight_details)

[(1, 2003, 1, 65960, 12254, 78214), (2, 2003, 2, 56960, 10503, 67463), (3, 2003, 3, 65168, 11957, 77125), (4, 2003, 4, 62117, 11065, 73182), (5, 2003, 5, 62611, 10770, 73381)]


In [20]:
# Reterieve passenger details
cur.execute(""" SELECT * FROM monthly_netincome LIMIT(5); """)
netincome_details = cur.fetchall()
print(netincome_details )

[(1, 2003, 1, -963145, -28634, -29409, -10401, -1031589), (2, 2003, 2, -284473, 30279, 118489, 2767, -132938), (3, 2003, 3, -276719, 79002, 144784, 28694, -24239), (4, 2003, 4, -295354, 63386, 74234, 28009, -129725), (5, 2004, 1, -357163, 92037, 61890, 21563, -181673)]


In [21]:
# Reterieve passenger details
cur.execute(""" SELECT * FROM monthly_or LIMIT(5); """)
or_details = cur.fetchall()
print(or_details )

[(1, 2003, 1, 2891694, 667279, 445192, 103704, 4107869), (2, 2003, 2, 3063802, 598619, 570830, 82795, 4316046), (3, 2003, 3, 3104546, 702697, 662916, 125824, 4595983), (4, 2003, 4, 3004298, 700261, 542110, 136757, 4383426), (5, 2004, 1, 3033877, 817558, 516215, 135452, 4503102)]


In [22]:
# Close connection
cur.close()
conn.close()