# **STIB-MIVB Schedule: Preprocessing & DB Setup + Load**

In [1]:
import os

import numpy as np
import pandas as pd
import geopandas as gpd
import gtfs_kit as gk
import psycopg2
import psycopg2.extras as extras
from IPython.display import display, HTML
from psycopg2 import Error
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

## 1. Load Data

In [2]:
# get path of gtfs files

path1 = 'stib_data/schedule/03_Sep'
path2 = 'stib_data/schedule/23_Sep'

In [3]:
# check list of available files in feed

gk.list_feed(path1)

Unnamed: 0,file_name,file_size
0,agency.txt,148
1,calendar.txt,18880
2,calendar_dates.txt,6874
3,routes.txt,5189
4,shapes.txt,14890589
5,stops.txt,153717
6,stop_times.txt,140458620
7,translations.txt,78184
8,trips.txt,8799054


In [4]:
# load files as gtfs feed

sched1 = gk.read_feed(path1, dist_units = 'm')
sched2 = gk.read_feed(path2, dist_units = 'm')

## 2. Preprocess Data

In [5]:
# sched1.validate()

In [6]:
# sched2.validate()

In [7]:
# function for preprocessing trips
# includes column rename and updating trip direction to align with shapefile data

def trips_preprocess(feed):
    """Rename the trip direction column and shift it from 0/1 to 1/2.

    ``feed.trips`` is replaced by a frame in which ``direction_id`` is called
    ``direction`` and every value is incremented by one (per the notebook's
    notes, this is done to line up with the shapefile data).

    Parameters
    ----------
    feed : object exposing a ``trips`` DataFrame attribute
        Updated through attribute assignment; nothing is returned.
    """
    column_aliases = {'direction_id': 'direction'}
    feed.trips = feed.trips.rename(columns = column_aliases)

    # 0-based GTFS direction -> 1-based direction
    feed.trips = feed.trips.assign(
        direction = lambda trips: trips.direction + 1
    )

In [8]:
# apply trips preprocessing

trips_preprocess(sched1)
trips_preprocess(sched2)

In [9]:
# function for preprocessing routes
# includes column rename, column drop, mapping of vehicle type, adding extra derived column, prepend hash symbol for colour columns

def routes_preprocess(feed):
    """Reshape ``feed.routes`` into the layout loaded into the database.

    Steps, applied to a renamed copy that is then written back to
    ``feed.routes``:

    * rename the GTFS columns (``route_short_name`` -> ``line``,
      ``route_long_name`` -> ``line_name``, ``route_type`` -> ``vehicle``,
      ``route_color`` -> ``route_color_hex``,
      ``route_text_color`` -> ``route_text_color_hex``),
    * drop ``route_desc`` / ``route_url`` when they are present,
    * translate the numeric vehicle code into a label
      (0 -> 'TRAM', 1 -> 'METRO', 3 -> 'BUS'; any other code is kept as-is),
    * add ``line_desc`` as ``"<line> (<line_name>)"``,
    * prepend ``#`` to both colour columns (missing colours stay missing).

    Parameters
    ----------
    feed : object exposing a ``routes`` DataFrame attribute
        Updated through attribute assignment; nothing is returned.
    """
    vehicle_labels = {0: 'TRAM', 1: 'METRO', 3: 'BUS'}

    routes = feed.routes.rename(
        columns = {
            'route_short_name': 'line',
            'route_long_name': 'line_name',
            'route_type': 'vehicle',
            'route_color': 'route_color_hex',
            'route_text_color': 'route_text_color_hex',
        }
    ).drop(
        # both columns are optional, so their absence must not abort the run
        columns = ['route_desc', 'route_url'],
        errors = 'ignore'
    )

    # build the label column in one pass instead of writing strings cell by
    # cell into an integer column (deprecated incompatible-dtype assignment)
    routes['vehicle'] = routes['vehicle'].map(
        lambda code: vehicle_labels.get(code, code)
    )

    routes['line_desc'] = routes['line'] + ' (' + routes['line_name'] + ')'

    routes['route_color_hex'] = '#' + routes['route_color_hex']
    routes['route_text_color_hex'] = '#' + routes['route_text_color_hex']

    feed.routes = routes

In [10]:
# apply routes preprocessing

routes_preprocess(sched1)
routes_preprocess(sched2)

In [11]:
# function for preprocessing stop times
# includes adding an extra derived column, to deal with arrival time going beyond 24:00:00 due to midnight schedule indication

def stop_times_preprocess(feed):
    """Add ``arrival_time_norm``: ``arrival_time`` wrapped into 00-23 hours.

    Schedules use hours of 24 and above for trips that run past midnight.
    Those values are mapped back onto the clock (``25:10:08`` ->
    ``01:10:08``). Values already below 24 hours are copied unchanged and
    missing values stay missing.

    Parameters
    ----------
    feed : object exposing a ``stop_times`` DataFrame attribute
        The new column is added to ``feed.stop_times`` in place; nothing is
        returned.
    """
    def _wrap_past_midnight(arrival_time):
        # missing arrival times cannot be parsed; pass them through
        if pd.isna(arrival_time):
            return arrival_time

        # split on the first colon so one-digit hours ("7:05:00") parse too
        hours, separator, remainder = arrival_time.partition(':')
        if int(hours) < 24:
            return arrival_time

        # modulo keeps the result on the clock even for hours of 48 and above
        return "{:02d}".format(int(hours) % 24) + separator + remainder

    feed.stop_times['arrival_time_norm'] = feed.stop_times['arrival_time'].apply(
        _wrap_past_midnight
    )

In [12]:
# apply stop times preprocessing

stop_times_preprocess(sched1)
stop_times_preprocess(sched2)

In [13]:
# function for preprocessing stops
# includes column drop

def stops_preprocess(feed):
    """Remove the stop columns that are not carried over to the database.

    ``feed.stops`` is replaced by a copy without ``stop_code``,
    ``stop_desc``, ``zone_id`` and ``stop_url``.

    Parameters
    ----------
    feed : object exposing a ``stops`` DataFrame attribute
        Updated through attribute assignment; nothing is returned.
    """
    unused_columns = ['stop_code', 'stop_desc', 'zone_id', 'stop_url']
    trimmed_stops = feed.stops.drop(columns = unused_columns)
    feed.stops = trimmed_stops

In [14]:
# apply stops preprocessing

stops_preprocess(sched1)
stops_preprocess(sched2)

In [15]:
# identify the distinct groups for scheduled day types

sched1.calendar[['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']].drop_duplicates()

Unnamed: 0,monday,tuesday,wednesday,thursday,friday,saturday,sunday
0,1,1,1,1,1,0,0
1,0,0,0,0,0,1,0
2,0,0,0,0,0,0,1


In [16]:
# function for preprocessing calendar
# includes mapping of day type, column drop, conversion of date columns to standard format

def calendar_preprocess(feed):
    """Collapse the weekday flags into ``day_category`` and parse the dates.

    ``feed.calendar`` is replaced by a frame in which:

    * ``day_category`` is 'Saturday' when only the saturday flag (of the two
      weekend flags) is set, 'Sunday' when only the sunday flag is set, and
      'Weekday' otherwise,
    * the seven weekday flag columns are dropped,
    * ``start_date`` and ``end_date`` are each parsed from ``YYYYMMDD`` into
      datetimes (``end_date`` is parsed from its own column, not from
      ``start_date``).

    Parameters
    ----------
    feed : object exposing a ``calendar`` DataFrame attribute
        Updated through attribute assignment; nothing is returned.
    """
    weekday_columns = [
        'monday',
        'tuesday',
        'wednesday',
        'thursday',
        'friday',
        'saturday',
        'sunday'
    ]
    calendar = feed.calendar

    saturday_only = (calendar['saturday'] == 1) & (calendar['sunday'] == 0)
    sunday_only = (calendar['saturday'] == 0) & (calendar['sunday'] == 1)

    # start from the fallback label and overwrite the two weekend cases
    day_category = pd.Series('Weekday', index = calendar.index, dtype = object)
    day_category = day_category.mask(saturday_only, 'Saturday')
    day_category = day_category.mask(sunday_only, 'Sunday')

    feed.calendar = calendar.drop(columns = weekday_columns).assign(
        day_category = day_category,
        start_date = pd.to_datetime(calendar['start_date'], format = '%Y%m%d'),
        end_date = pd.to_datetime(calendar['end_date'], format = '%Y%m%d'),
    )

In [17]:
# apply calendar preprocessing

calendar_preprocess(sched1)
calendar_preprocess(sched2)

In [18]:
# function for preprocessing calendar date exceptions
# includes mapping of exception type, conversion of date column to standard format

def calendar_dates_preprocess(feed):
    """Label the exception types and parse the exception dates.

    ``feed.calendar_dates`` is replaced by a frame in which:

    * ``exception_type`` 1 becomes 'Added' and 2 becomes 'Removed'
      (any other value is kept as-is),
    * ``date`` is parsed from ``YYYYMMDD`` into datetimes.

    Parameters
    ----------
    feed : object exposing a ``calendar_dates`` DataFrame attribute
        Updated through attribute assignment; nothing is returned.
    """
    exception_labels = {1: 'Added', 2: 'Removed'}
    calendar_dates = feed.calendar_dates

    # build the label column in one pass instead of writing strings cell by
    # cell into an integer column (deprecated incompatible-dtype assignment)
    feed.calendar_dates = calendar_dates.assign(
        exception_type = calendar_dates['exception_type'].map(
            lambda code: exception_labels.get(code, code)
        ),
        date = pd.to_datetime(calendar_dates['date'], format = '%Y%m%d'),
    )

In [19]:
# apply calendar date exceptions preprocessing

calendar_dates_preprocess(sched1)
calendar_dates_preprocess(sched2)

In [20]:
# check for null on route columns
# other columns don't have issues after the preprocessing done above

sched1.routes.isna().sum()

route_id                0
line                    0
line_name               0
vehicle                 0
route_color_hex         1
route_text_color_hex    0
line_desc               0
dtype: int64

In [21]:
# update hex colour to white for every null value
# fillna is a no-op when nothing is missing, so the cell can be re-run safely

sched1.routes['route_color_hex'] = sched1.routes['route_color_hex'].fillna('#FFFFFF')

In [22]:
# confirm no more nulls on route data

sched1.routes.isna().sum()

route_id                0
line                    0
line_name               0
vehicle                 0
route_color_hex         0
route_text_color_hex    0
line_desc               0
dtype: int64

In [23]:
# check the min and max dates for both of the gtfs feeds

print('\nSchedule 1: Min & Max Dates')
print(sched1.calendar.start_date.min())
print(sched1.calendar.end_date.max())

print('\nSchedule 2: Min & Max Dates')
print(sched2.calendar.start_date.min())
print(sched2.calendar.end_date.max())
print('\n')


Schedule 1: Min & Max Dates
2021-08-23 00:00:00
2021-09-19 00:00:00

Schedule 2: Min & Max Dates
2021-09-20 00:00:00
2021-10-17 00:00:00




In [24]:
# show a quick preview for each of the tables to be used

print('\nTrips')
display(sched1.trips.head(3))
print('\nRoutes')
display(sched1.routes.head(3))
print('\nStop Times')
display(sched1.stop_times.head(3))
print('\nStops')
display(sched1.stops.head(3))
print('\nCalendar')
display(sched1.calendar.head(3))
print('\nCalendar Dates Exception')
display(sched1.calendar_dates.head(3))
print('\nShapes')
display(sched1.shapes.head(3))


Trips


Unnamed: 0,route_id,service_id,trip_id,trip_headsign,direction,block_id,shape_id
0,24,235954071,112387248235954071,SIMONIS,2,8902800,087b0196
1,24,235954071,112387249235954071,SIMONIS,2,8902802,087b0196
2,24,235954071,112387253235954071,SIMONIS,2,8902801,087b0196



Routes


Unnamed: 0,route_id,line,line_name,vehicle,route_color_hex,route_text_color_hex,line_desc
0,2,1,GARE DE L'OUEST - STOCKEL,METRO,#C4008F,#FFFFFF,1 (GARE DE L'OUEST - STOCKEL)
1,4,2,SIMONIS - ELISABETH,METRO,#F57000,#FFFFFF,2 (SIMONIS - ELISABETH)
2,5,3,ESPLANADE - CHURCHILL,TRAM,#B5BA05,#000000,3 (ESPLANADE - CHURCHILL)



Stop Times


Unnamed: 0,trip_id,arrival_time,departure_time,stop_id,stop_sequence,pickup_type,drop_off_type,arrival_time_norm
0,112387248235954071,21:07:00,21:07:00,4014,1,0,0,21:07:00
1,112387248235954071,21:09:00,21:09:00,3231,2,0,0,21:09:00
2,112387248235954071,21:10:08,21:10:08,3232,3,0,0,21:10:08



Stops


Unnamed: 0,stop_id,stop_name,stop_lat,stop_lon,location_type,parent_station
0,0089,MONTGOMERY,50.838006,4.40897,0,37.0
1,0470F,SIMONIS,50.863666,4.329612,0,
2,0471,SIMONIS,50.863732,4.329236,0,53.0



Calendar


Unnamed: 0,service_id,start_date,end_date,day_category
0,234578052,2021-08-23,2021-08-23,Weekday
1,236986502,2021-08-28,2021-08-28,Saturday
2,236988602,2021-08-29,2021-08-29,Sunday



Calendar Dates Exception


Unnamed: 0,service_id,date,exception_type
0,237476501,2021-09-12,Added
1,237566502,2021-09-19,Added
2,238162502,2021-09-19,Added



Shapes


Unnamed: 0,shape_id,shape_pt_lat,shape_pt_lon,shape_pt_sequence
0,001m0042,50.841872,4.464541,10001
1,001m0042,50.84327,4.463607,10002
2,001m0042,50.843517,4.463443,10003


In [25]:
# simplify the names for the tables
# temporary: only sched1 is used for now; both GTFS feeds will be combined later

routes = sched1.routes
stops = sched1.stops
calendar = sched1.calendar
shapes = sched1.shapes
calendar_dates = sched1.calendar_dates
trips = sched1.trips
stop_times = sched1.stop_times

## 3. DB Setup & Load

In [26]:
# set up connection variables
# each value is read from the standard libpq environment variable when it is
# set, so real credentials do not have to be written into the notebook;
# the literals are the local development defaults
db_host = os.environ.get("PGHOST", "localhost")
db_port = os.environ.get("PGPORT", "5432")
db_user = os.environ.get("PGUSER", "postgres")
db_pass = os.environ.get("PGPASSWORD", "password")
db_name = os.environ.get("PGDATABASE", "postgres")

# function to connect with postgres
def connect_postgres(db_host, db_port, db_user, db_pass, db_name):
    """Open an auto-commit PostgreSQL connection and return a cursor on it.

    The connection parameters and the server version are printed as a quick
    sanity check.

    Parameters
    ----------
    db_host, db_port, db_user, db_pass, db_name
        Passed to ``psycopg2.connect`` as host, port, user, password and
        database.

    Returns
    -------
    cursor or None
        A cursor on the new connection, or ``None`` when connecting (or the
        version query) failed; the error is printed in that case. The
        connection itself stays reachable through ``cursor.connection``.
    """
    connection = None
    try:
        # Connect to an existing database
        connection = psycopg2.connect(host = db_host,
                                      port = db_port,
                                      user = db_user,
                                      password = db_pass,
                                      database = db_name)
        # Set auto-commit
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        # Create a cursor to perform database operations
        cur = connection.cursor()
        # Print PostgreSQL details
        print("PostgreSQL server information")
        print(connection.get_dsn_parameters(), "\n")
        # Executing a SQL query
        cur.execute("SELECT version();")
        # Fetch result
        record = cur.fetchone()
        print("You are connected to - ", record, "\n")

    except Exception as error:
        print("Error while connecting to PostgreSQL", error)
        # do not leave a half-initialised connection open behind the caller
        if connection is not None:
            connection.close()
        return None
    else:
        return cur

In [27]:
# connect to postgres

cur = connect_postgres(db_host, db_port, db_user, db_pass, db_name)

PostgreSQL server information
{'user': 'postgres', 'dbname': 'postgres', 'host': 'localhost', 'port': '5432', 'tty': '', 'options': '', 'sslmode': 'prefer', 'sslcompression': '0', 'gssencmode': 'prefer', 'krbsrvname': 'postgres', 'target_session_attrs': 'any'} 

You are connected to -  ('PostgreSQL 14.5, compiled by Visual C++ build 1914, 64-bit',) 



In [28]:
# drop db if exists

db_name = "stib_transport"

cur.execute(
    f"DROP DATABASE IF EXISTS {db_name} WITH (FORCE);"
)
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 DROP DATABASE


In [29]:
# create db

cur.execute(
    f"""

    CREATE DATABASE {db_name}
        WITH
        OWNER = postgres
        TEMPLATE = template0
        ENCODING = 'UTF8'
        CONNECTION LIMIT = -1
        IS_TEMPLATE = False;
        
    """
)
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 CREATE DATABASE


In [30]:
# connect to new db

cur = connect_postgres(db_host, db_port, db_user, db_pass, db_name)

PostgreSQL server information
{'user': 'postgres', 'dbname': 'stib_transport', 'host': 'localhost', 'port': '5432', 'tty': '', 'options': '', 'sslmode': 'prefer', 'sslcompression': '0', 'gssencmode': 'prefer', 'krbsrvname': 'postgres', 'target_session_attrs': 'any'} 

You are connected to -  ('PostgreSQL 14.5, compiled by Visual C++ build 1914, 64-bit',) 



In [31]:
# create tables for db
# line_desc is built as line + ' (' + line_name + ')', so it needs room for
# both source columns plus the three separator characters
# NOTE(review): sched_stop_times is keyed on (trip_id, arrival_time, stop_id);
# (trip_id, stop_sequence) may be the more natural key - confirm before changing
cur.execute(
    """

    -- create tables

    create table if not exists sched_routes
    (
        route_id varchar(10),
        line varchar(10),
        line_name varchar(50),
        vehicle varchar(10),
        route_color_hex varchar(10),
        route_text_color_hex varchar(10),
        line_desc varchar(70),
        primary key(route_id)
    );

    create table if not exists sched_stops
    (
        stop_id varchar(10),
        stop_name varchar(50),
        stop_lat double precision,
        stop_lon double precision,
        location_type integer,
        parent_station varchar(10),
        primary key(stop_id)
    );

    create table if not exists sched_calendar
    (
        service_id varchar(25),
        start_date date,
        end_date date,
        day_category varchar(15),
        primary key(service_id)
    );

    create table if not exists sched_shapes
    (
        shape_id varchar(20),
        shape_pt_lat double precision,
        shape_pt_lon double precision,
        shape_pt_sequence integer,
        primary key(shape_id, shape_pt_sequence)
    );

    create table if not exists sched_calendar_date_exceptions
    (
        service_id varchar(25),
        date date,
        exception_type varchar(15),
        primary key(service_id, date)
    );

    create table if not exists sched_trips
    (
        route_id varchar(10),
        service_id varchar(25),
        trip_id varchar(30),
        trip_headsign varchar(50),
        direction integer,
        block_id varchar(25),
        shape_id varchar(25),
        primary key(trip_id)
    );

    create table if not exists sched_stop_times
    (
        trip_id varchar(30),
        arrival_time varchar(15),
        departure_time varchar(15),
        stop_id varchar(10),
        stop_sequence integer,
        pickup_type integer,
        drop_off_type integer,
        arrival_time_norm time,
        primary key(trip_id, arrival_time, stop_id)
    );

    """
)
print("SQL Status Output:\n", cur.statusmessage)

SQL Status Output:
 CREATE TABLE


In [32]:
# function to insert the data from dataframe to db table for each case

def insert_df_into_db_table(df, table_name, cursor=None):
    """Bulk-insert every row of ``df`` into ``table_name``.

    The DataFrame column names are used as the target column list, so they
    must match the table's column names. Missing values (NaN / NaT / None)
    are sent as SQL NULL. A failure is printed rather than raised, so the
    remaining loads in the notebook still run.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to insert.
    table_name : str
        Target table. It is interpolated into the SQL text as-is, so it must
        come from trusted code, never from user input.
    cursor : psycopg2 cursor, optional
        Cursor to run the insert on. Defaults to the notebook-level ``cur``.

    Returns
    -------
    None
    """
    active_cursor = cur if cursor is None else cursor

    # cast to object first so None survives, then swap every missing value
    # for None: psycopg2 sends None as NULL but would send NaN as a value
    records = df.astype(object).where(df.notna(), None)
    tuples = list(records.itertuples(index = False, name = None))
    cols = ','.join(list(df.columns))
    # "%%s" leaves a single "%s" placeholder for execute_values to expand
    query = "INSERT INTO %s(%s) VALUES %%s" % (table_name, cols)
    try:
        extras.execute_values(active_cursor, query, tuples)
    except (Exception, psycopg2.DatabaseError) as error:
        print("Insert Error: %s" % error)
    else:
        print(f'DB table {table_name} has been populated')

In [33]:
# apply function to insert the data from dataframe to db table for each case

insert_df_into_db_table(routes, 'sched_routes')
insert_df_into_db_table(stops, 'sched_stops')
insert_df_into_db_table(calendar, 'sched_calendar')
insert_df_into_db_table(shapes, 'sched_shapes')
insert_df_into_db_table(calendar_dates, 'sched_calendar_date_exceptions')
insert_df_into_db_table(trips, 'sched_trips')
insert_df_into_db_table(stop_times, 'sched_stop_times')

DB table sched_routes has been populated
DB table sched_stops has been populated
DB table sched_calendar has been populated
DB table sched_shapes has been populated
DB table sched_calendar_date_exceptions has been populated
DB table sched_trips has been populated
DB table sched_stop_times has been populated


In [34]:
# close the cursor and the connection it was created on
# closing only the cursor would leave the database session open

cur.close()
cur.connection.close()

## End.