<div class="alert alert-block alert-success">
    <h1>Project Portfolio # 1</h1>
    <h2>Part 2: OLTP > Data Mart Pipeline</h2>
    <h3>Coded by: Ariba Khan</h3>
</div>

This notebook builds a data mart that records key metrics for the following analysis areas:

- Delay Analysis
- Revenue Analysis
- Incidents Analysis

#### Grain

One row of the fact table will represent one trip.
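
One way to guard this grain is to check that the trip key stays unique once the fact table is assembled. The sketch below runs on a small made-up frame; `check_grain` is a hypothetical helper and not part of the pipeline.

```python
import pandas as pd

def check_grain(df, key):
    """Return the rows whose key value appears more than once (empty if the grain holds)."""
    return df[df.duplicated(subset=key, keep=False)]

# toy fact table (assumed values): trip T002 appears twice, so the grain is violated
toy_fact = pd.DataFrame({
    'TripID': ['T001', 'T002', 'T002', 'T003'],
    'TotalFare': [120.0, 80.0, 80.0, 45.0],
})

violations = check_grain(toy_fact, 'TripID')
print(violations)
```

An empty result means every row still represents exactly one trip.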

#### Dimension Tables

The data mart uses the dimension tables listed below; the table following the list breaks down their attributes:

- DimDate
- DimRoutes
- DimIncidents
- DimComplaints
- DimBuses
- DimTickets

#### Fact Table

The fact table stores the facts listed below as its measure columns. The fact table snapshot additionally joins in all columns of the dimension tables:

- Scheduled_Departure
- Scheduled_Arrival
- Actual_Departure
- Actual_Arrival
- Delay_Departure
- Delay_Arrival
- IncidentCount
- ComplaintCount
- TotalFare
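
The two delay facts are differences between actual and scheduled times, expressed in minutes. A minimal sketch with made-up timestamps follows; note that the notebook's own calculation takes the absolute value, so early and late trips both show up as positive numbers.

```python
import pandas as pd

# toy trips (assumed values)
toy_trips = pd.DataFrame({
    'Scheduled_Departure': ['2023-06-01 08:00:00', '2023-06-01 09:00:00'],
    'Actual_Departure':    ['2023-06-01 08:07:30', '2023-06-01 08:55:00'],
})

scheduled = pd.to_datetime(toy_trips['Scheduled_Departure'])
actual = pd.to_datetime(toy_trips['Actual_Departure'])

# signed delay in minutes: positive = late, negative = early
toy_trips['Signed_Delay'] = (actual - scheduled).dt.total_seconds() / 60
# absolute delay, matching how Delay_Departure is stored
toy_trips['Delay_Departure'] = toy_trips['Signed_Delay'].abs()
print(toy_trips[['Signed_Delay', 'Delay_Departure']])
```

Keeping a signed column alongside the absolute one would let the delay analysis separate early departures from late ones, if that distinction is needed.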

<div class="alert alert-block alert-success">
    <h3>Importing All Needed Libraries</h3>
</div>

In [1]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings("ignore", category=Warning)

<div class="alert alert-block alert-success">
    <h3>Connecting OLTP and Data Mart</h3>
</div>

In [2]:
# OLTP connection
oltp_engine = create_engine('postgresql://aribaandsumbal:DAWproject@localhost:5432/Public Transport (Karachi)')

# Data Mart connection
dw_engine = create_engine('postgresql://aribaandsumbal:DAWproject@localhost:5432/Public Transport - Data Mart')
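
The connection strings above embed the credentials and a database name containing spaces and parentheses. A possible alternative, assuming SQLAlchemy 1.4 or newer, is to build the URL from parts with `URL.create` and read the credentials from the environment. The variable names `PT_DB_USER` and `PT_DB_PASSWORD` and the fallback values are hypothetical.

```python
import os
from sqlalchemy.engine import URL

# hypothetical environment variable names; set them outside the notebook
oltp_url = URL.create(
    drivername='postgresql',
    username=os.environ.get('PT_DB_USER', 'example_user'),
    password=os.environ.get('PT_DB_PASSWORD', 'example_password'),
    host='localhost',
    port=5432,
    database='Public Transport (Karachi)',
)

# render_as_string hides the password by default, so the URL is safe to print
print(oltp_url.render_as_string())
```

The resulting object can be passed to `create_engine(oltp_url)` in place of the string.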

<div class="alert alert-block alert-success">
    <h3>Data Ingestion</h3>
</div>

In [3]:
def extract_table(table_name, engine):
    query = f'SELECT * FROM "{table_name}";'
    return pd.read_sql(query, engine)

In [4]:
tables = ['Buses', 'Routes', 'Fares', 'Tickets', 'Payments', 'Incidents', 'Complaints', 'Trips']

staging_data = {table: extract_table(table, oltp_engine) for table in tables}

<div class="alert alert-block alert-success">
    <h3>Staging Area</h3>
</div>

In [5]:
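# treating missing values

def handle_missing_values(df):
    # linearly interpolate gaps in numeric columns only;
    # interpolation is not defined for text columns, so those are left unchanged
    num_cols = df.select_dtypes(include='number').columns
    if len(num_cols) and df[num_cols].isnull().any().any():
        df = df.copy()
        df[num_cols] = df[num_cols].interpolate()
    return df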

In [6]:
for table, df in staging_data.items():
    staging_data[table] = handle_missing_values(df)
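
`DataFrame.interpolate()` fills gaps linearly by default, which is only meaningful for numeric data. The sketch below shows the effect on a made-up frame (the column values are assumptions): the numeric gap is filled with the midpoint, while the missing text value stays missing and would need its own rule.

```python
import numpy as np
import pandas as pd

# toy staging table (assumed values) with one numeric and one text gap
toy = pd.DataFrame({
    'capacity': [40.0, np.nan, 60.0],
    'bus_type': ['AC', None, 'Non-AC'],
})

# restrict the linear fill to numeric columns
num_cols = toy.select_dtypes(include='number').columns
filled = toy.copy()
filled[num_cols] = filled[num_cols].interpolate()
print(filled)
```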

In [7]:
# storing all tables in separate dataframes

buses_df = staging_data['Buses']
routes_df = staging_data['Routes']
fares_df = staging_data['Fares']
tickets_df = staging_data['Tickets']
payments_df = staging_data['Payments']
incidents_df = staging_data['Incidents']
complaints_df = staging_data['Complaints']
trips_df = staging_data['Trips']

#### Creating Dimension Tables

In [8]:
## DimDate

start_date = datetime(2023, 6, 1)
end_date = datetime(2024, 5, 31)
date_range = pd.date_range(start_date, end_date)

# creating DimDate dataframe
dim_date = pd.DataFrame({
    'DateID': date_range,
    'Day': date_range.day,
    'Week': date_range.isocalendar().week,
    'Month': date_range.month,
    'Quarter': date_range.quarter,
    'Year': date_range.year,
    'DayOfWeek': date_range.dayofweek + 1
})
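
`isocalendar()` returns ISO 8601 week numbers, so days around New Year can belong to the last week of the previous year or to week 1 of the next. A short sketch on dates around the 2023/2024 boundary, which lies inside the DimDate range, illustrates this along with the Monday = 1 convention used for `DayOfWeek`:

```python
import pandas as pd

dates = pd.date_range('2023-12-30', '2024-01-02')
iso = dates.isocalendar()  # DataFrame with year, week and day columns, indexed by date

toy_dim_date = pd.DataFrame({
    'DateID': dates,
    'Week': iso['week'].astype('int64').to_numpy(),
    'DayOfWeek': dates.dayofweek + 1,  # Monday = 1 ... Sunday = 7
})
print(toy_dim_date)
```

Here 30 and 31 December fall in week 52, while 1 January 2024 (a Monday) starts week 1.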

In [9]:
## DimBuses

dim_buses = buses_df.copy()
dim_buses.rename(columns={'bus_id': 'BusID', 'bus_name': 'BusName', 'bus_type': 'BusType', 'capacity': 'Cap'}, inplace=True)

In [10]:
## DimRoutes

dim_routes = routes_df.copy()
dim_routes.rename(columns={
    'route_id': 'RouteID', 'route_name': 'RouteName', 'route_origin': 'RouteOrigin', 
    'route_dest': 'RouteDestination', 'origin_lat': 'OriginLat', 'origin_long': 'OriginLong', 
    'dest_lat': 'DestLat', 'dest_long': 'DestLong'}, inplace=True)
dim_routes = dim_routes.drop(columns = 'bus_id')

In [11]:
## DimIncidents

dim_incidents = incidents_df[['incident_id', 'incident_type', 'incident_time', 'trip_id']].copy()
dim_incidents = dim_incidents.drop_duplicates(subset='trip_id')
dim_incidents['incident_id'] = [f'INC{str(i).zfill(3)}' for i in range(1, len(dim_incidents) + 1)]
dim_incidents.rename(columns={'incident_id': 'IncidentID', 'incident_type': 'IncType', 'incident_time': 'Time'}, inplace=True)

In [12]:
## DimTickets

# joining relevant tables together
tickets_payments = tickets_df.merge(payments_df, on='payment_id')
tickets_payments_fares = tickets_payments.merge(fares_df, on='fare_id')

# aggregating data
dim_tickets = tickets_payments_fares.groupby('trip_id').agg(
    TotalTickets=('ticket_id', 'count'),
    CashCount=('payment_method', lambda x: (x == 'Cash').sum()),
    CreditCount=('payment_method', lambda x: (x == 'Credit Card').sum()),
    DebitCount=('payment_method', lambda x: (x == 'Debit Card').sum()),
    WalletCount=('payment_method', lambda x: (x == 'Digital Wallet').sum()),
    TypeStudent=('fare_type', lambda x: (x == 'Student').sum()),
    TypeRegular=('fare_type', lambda x: (x == 'Regular').sum())
).reset_index()

# fixing primary key
dim_tickets.rename(columns={'trip_id': 'TicketID'}, inplace=True)
dim_tickets['TicketID'] = [f'TCK{str(i).zfill(3)}' for i in range(1, len(dim_tickets) + 1)]

In [13]:
## DimComplaints

# aggregating data
dim_complaints = complaints_df.groupby('trip_id').agg(
    CountDelay=('comp_type', lambda x: (x == 'Delay').sum()),
    CountCleanliness=('comp_type', lambda x: (x == 'Cleanliness').sum()),
    CountRudeStaff=('comp_type', lambda x: (x == 'Rude Staff').sum()),
    CountOvercrowding=('comp_type', lambda x: (x == 'Overcrowding').sum()),
    CountNoise=('comp_type', lambda x: (x == 'Noisy Environment').sum()),
    CountSeats=('comp_type', lambda x: (x == 'Uncomfortable Seats').sum()),
    CountRouteChange=('comp_type', lambda x: (x == 'Route Change').sum()),
    CountAirCon=('comp_type', lambda x: (x == 'Poor Air Conditioning').sum()),
    CountSafety=('comp_type', lambda x: (x == 'Safety Concerns').sum()),
    CountAccessibility=('comp_type', lambda x: (x == 'Accessibility Issues').sum())
).reset_index()

dim_complaints['ComplaintID'] = [f'CMP{str(i).zfill(3)}' for i in range(1, len(dim_complaints) + 1)]

complaints_order = ['ComplaintID', 'CountDelay', 'CountCleanliness', 'CountRudeStaff', 'CountOvercrowding', 'CountNoise', 
                    'CountSeats', 'CountRouteChange', 'CountAirCon', 'CountSafety', 'CountAccessibility', 'trip_id']
dim_complaints = dim_complaints[complaints_order]
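
The per-trip category counts can also be produced with `pd.crosstab`, which builds one column per category without a separate lambda for each. This is an alternative sketch on made-up complaints, not the method used in this notebook, and it only yields columns for categories that actually occur in the data.

```python
import pandas as pd

# toy complaints (assumed values)
toy_complaints = pd.DataFrame({
    'trip_id': ['T001', 'T001', 'T002', 'T001'],
    'comp_type': ['Delay', 'Cleanliness', 'Delay', 'Delay'],
})

# one row per trip, one column per complaint type, cells hold counts
counts = pd.crosstab(toy_complaints['trip_id'], toy_complaints['comp_type']).reset_index()
counts.columns.name = None
print(counts)
```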

#### Creating Fact Table

In [14]:
fact_trip_performance = trips_df.copy()

# calculating delays in minutes (absolute value, so early and late trips both count as positive)
fact_trip_performance['Delay_Departure'] = abs((pd.to_datetime(fact_trip_performance['actual_departure_time']) - pd.to_datetime(fact_trip_performance['departure_time'])).dt.total_seconds() / 60)
fact_trip_performance['Delay_Arrival'] = abs((pd.to_datetime(fact_trip_performance['actual_arrival_time']) - pd.to_datetime(fact_trip_performance['arrival_time'])).dt.total_seconds() / 60)

# creating incident and complaint counts
incident_counts = incidents_df.groupby('trip_id').size().reset_index(name='IncidentCount')
complaint_counts = complaints_df.groupby('trip_id').size().reset_index(name='ComplaintCount')
fact_trip_performance = fact_trip_performance.merge(incident_counts, how='left', on='trip_id').fillna({'IncidentCount': 0})
fact_trip_performance = fact_trip_performance.merge(complaint_counts, how='left', on='trip_id').fillna({'ComplaintCount': 0})

# calculating total fare
total_fare = tickets_payments_fares.groupby('trip_id')['payment_amount'].sum().reset_index(name='TotalFare')
fact_trip_performance = fact_trip_performance.merge(total_fare, how='left', on='trip_id')

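# adding 'TicketID' by trip_id rather than by row position
# (dim_tickets rows follow the sorted trip_id order of the groupby that built it)
ticket_trip_ids = tickets_payments_fares.groupby('trip_id').size().index
ticket_id_map = pd.Series(dim_tickets['TicketID'].to_numpy(), index=ticket_trip_ids)
fact_trip_performance['TicketID'] = fact_trip_performance['trip_id'].map(ticket_id_map)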

# renaming columns
fact_trip_performance = fact_trip_performance.rename(columns={
    'trip_id': 'TripID', 'route_id': 'RouteID', 'bus_id': 'BusID', 'trip_date': 'DateID', 'departure_time': 'Scheduled_Departure', 
    'arrival_time': 'Scheduled_Arrival', 'actual_departure_time': 'Actual_Departure', 'actual_arrival_time': 'Actual_Arrival'
})

# adding 'IncidentID'
fact_trip_performance = fact_trip_performance.merge(dim_incidents[['IncidentID', 'trip_id']], 
                                                    how='left', left_on='TripID', right_on='trip_id')
fact_trip_performance = fact_trip_performance.drop(columns=['trip_id'])
dim_incidents = dim_incidents.drop(columns=['trip_id'])

# adding 'ComplaintID'
fact_trip_performance = fact_trip_performance.merge(dim_complaints[['ComplaintID', 'trip_id']], 
                                                    how='left', left_on='TripID', right_on='trip_id')
fact_trip_performance = fact_trip_performance.drop(columns=['trip_id'])
dim_complaints = dim_complaints.drop(columns=['trip_id'])

# dropping irrelevant columns
fact_trip_performance = fact_trip_performance.drop(columns = ['cond_id', 'driver_id', 'sched_id'])

# fixing order of columns
fact_order = ['TripID', 'DateID', 'RouteID', 'BusID', 'TicketID', 'ComplaintID', 'IncidentID', 'Scheduled_Departure',
              'Scheduled_Arrival', 'Actual_Departure', 'Actual_Arrival', 'Delay_Departure', 'Delay_Arrival', 'IncidentCount',
              'ComplaintCount', 'TotalFare']
fact_trip_performance = fact_trip_performance[fact_order]
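
When a dimension key is attached to the fact table, matching on the trip ID rather than on row position matters as soon as some trips have no row in the dimension. The sketch below uses made-up data to show how the two approaches can disagree.

```python
import pandas as pd

toy_trips = pd.DataFrame({'trip_id': ['T001', 'T002', 'T003']})
# T002 sold no tickets, so it has no row here (assumed values)
toy_tickets = pd.DataFrame({'trip_id': ['T001', 'T003'], 'TicketID': ['TCK001', 'TCK002']})

# positional assignment aligns on the row index, not on trip_id
by_position = toy_trips.assign(TicketID=toy_tickets['TicketID'])
# key-based merge aligns on trip_id; validate raises if a key is duplicated
by_key = toy_trips.merge(toy_tickets, on='trip_id', how='left', validate='one_to_one')

print(by_position)
print(by_key)
```

In the positional version T002 receives the key that belongs to T003, while the merge leaves T002 empty and gives T003 its own key.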

<div class="alert alert-block alert-success">
    <h3>Loading Data Onto Data Mart</h3>
</div>

In [26]:
def load_to_dw(df, table_name, engine):
    df.to_sql(table_name, engine, if_exists='replace', index=False)

In [None]:
load_to_dw(dim_date, 'DimDate', dw_engine)
load_to_dw(dim_buses, 'DimBuses', dw_engine)
load_to_dw(dim_routes, 'DimRoutes', dw_engine)
load_to_dw(dim_incidents, 'DimIncidents', dw_engine)
load_to_dw(dim_tickets, 'DimTickets', dw_engine)
load_to_dw(dim_complaints, 'DimComplaints', dw_engine)
load_to_dw(fact_trip_performance, 'FactTripPerformance', dw_engine)
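
With `if_exists='replace'`, `to_sql` drops and recreates the target table on every run, so rerunning the load does not accumulate duplicate rows. The sketch below demonstrates this with an in-memory SQLite database standing in for the PostgreSQL data mart, an assumption made only to keep the example self-contained.

```python
import sqlite3
import pandas as pd

def load_to_dw(df, table_name, con):
    df.to_sql(table_name, con, if_exists='replace', index=False)

con = sqlite3.connect(':memory:')
toy_dim = pd.DataFrame({'BusID': ['B001', 'B002'], 'Cap': [40, 60]})  # assumed values

load_to_dw(toy_dim, 'DimBuses', con)
load_to_dw(toy_dim, 'DimBuses', con)  # second run replaces the table instead of appending

row_count = pd.read_sql('SELECT COUNT(*) AS n FROM "DimBuses"', con)['n'].iloc[0]
print(row_count)
```

The trade-off is that `replace` also discards any constraints or indexes defined on the existing table, since pandas recreates it from the DataFrame's schema.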

<div class="alert alert-block alert-success">
    <h3>Generating Fact Table Snapshot</h3>
</div>

In [27]:
snapshot = pd.merge(fact_trip_performance, dim_date, on='DateID', how='left')
snapshot = pd.merge(snapshot, dim_routes, on='RouteID', how='left')
snapshot = pd.merge(snapshot, dim_buses, on='BusID', how='left')
snapshot = pd.merge(snapshot, dim_tickets, on='TicketID', how='left')
snapshot = pd.merge(snapshot, dim_complaints, on='ComplaintID', how='left')
snapshot = pd.merge(snapshot, dim_incidents, on='IncidentID', how='left')
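
A left join keeps fact rows whose key has no match in the dimension and silently fills the dimension columns with missing values. One way to surface such rows, sketched on made-up data, is to pass `indicator=True`; adding `validate='many_to_one'` also raises an error if the dimension key is not unique, which protects the one-row-per-trip grain.

```python
import pandas as pd

# toy fact and date dimension (assumed values); the second trip falls outside the dimension
toy_fact = pd.DataFrame({
    'TripID': ['T001', 'T002'],
    'DateID': pd.to_datetime(['2023-06-01', '2024-06-15']),
})
toy_dim_date = pd.DataFrame({
    'DateID': pd.to_datetime(['2023-06-01', '2023-06-02']),
    'Month': [6, 6],
})

joined = toy_fact.merge(toy_dim_date, on='DateID', how='left',
                        validate='many_to_one', indicator=True)
# rows present in the fact table but missing from the dimension
unmatched = joined[joined['_merge'] == 'left_only']
print(unmatched[['TripID', 'DateID']])
```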

In [30]:
load_to_dw(snapshot, 'FactTableSnapshot', dw_engine)