# Dependencies

In [1]:
# pip install gtfs-realtime-bindings pandas requests
from google.transit import gtfs_realtime_pb2
from google.protobuf.json_format import MessageToDict
import pandas as pd
from requests import get
import time
import datetime
from datetime import datetime
import psycopg2
from sqlalchemy import create_engine

# Request API

In [3]:
def request_api_rapidkl(category, watermark):

    URL = f'https://api.data.gov.my/gtfs-realtime/vehicle-position/prasarana?category={category}'

    # Parse the GTFS Realtime feed
    feed = gtfs_realtime_pb2.FeedMessage()
    response = get(URL)
    feed.ParseFromString(response.content)

    # Extract and print vehicle position information
    vehicle_positions = [MessageToDict(entity.vehicle) for entity in feed.entity]
    df = pd.json_normalize(vehicle_positions)

    if df.empty:
        print(f'ERROR: Dataframe is empty - {watermark}')
    else:
        print(f'STATUS: Dataframe created - {watermark}')

    return df


def generate_rapidkl_data(category, requests_amt):
    dfs = []
    for _ in range(requests_amt):
        df_output = request_api_rapidkl(category, datetime.now())
        dfs.append(df_output)
        time.sleep(30)

    if all([x.empty for x in dfs]):
        print('ERROR: All dataframe(s) is empty. Failed to generate dataset')
    else:
        df_concat = pd.concat(dfs)
        return df_concat
    
df_fetch = generate_rapidkl_data('rapid-bus-kl', 5)

STATUS: Dataframe created - 2025-01-21 20:32:21.636727
STATUS: Dataframe created - 2025-01-21 20:32:52.278136
STATUS: Dataframe created - 2025-01-21 20:33:23.465952
STATUS: Dataframe created - 2025-01-21 20:33:56.269366
STATUS: Dataframe created - 2025-01-21 20:34:26.702685


In [4]:
def rename_col(df):
    return df.rename({
        'trip.tripId': 'trip_id',
        'trip.startTime': 'start_time',
        'trip.startDate': 'start_date',
        'trip.routeId': 'route_id',
        'position.latitude': 'latitude',
        'position.longitude': 'longitude',
        'position.bearing': 'bearing',
        'position.speed': 'speed',
        'vehicle.id': 'vehicle_id',
        'vehicle.licensePlate': 'license_plate'
        }, axis=1)

def convert_unixtime_to_standard(unixtime):
    return datetime.fromtimestamp(int(unixtime))

In [5]:
df_rapid = rename_col(df_fetch)
df_rapid['timestamp'] = df_fetch.apply(lambda x: convert_unixtime_to_standard(x['timestamp']), axis=1)
df_rapid.to_csv('rapid-kl-bus_2.csv', index=False)

In [8]:
df_1 = pd.read_csv('rapid-kl-bus.csv')
df_2 = pd.read_csv('rapid-kl-bus_2.csv')
df_merge = pd.concat([df_1, df_2])
df_merge.shape

(2455, 11)

# Connecting DB

In [30]:
def connect_db():
    try:
        conn = psycopg2.connect(
            host="localhost",
            database="postgres",
            user="postgres",
            port='54320',
            password="postgres")
        print('STATUS: DB connection(1) succeed')
        return conn
    except:
        print('ERROR: DB connection(1) failed')

def query_db(query:str):
    """ 
    Use for CREATE, INSERT syntax
    """
    try: 
        conn = connect_db()
        cur = conn.cursor()
        cur.execute(query)
        cur.close()
        conn.commit()
        print('STATUS: Query succeed')

    except:
        conn.rollback()
        print('ERROR: Query failed!')


def connect_db_v2():
    try:
        engine = create_engine('postgresql+psycopg2://postgres:postgres@localhost:54320/postgres')
        print('STATUS: DB connection(2) succeed')
        return engine
    except:
        print('ERROR: DB connection(2) failed')

def fetch_db(query:str):
    """
    Use for SELECT syntax
    """
    engine = connect_db_v2()
    return pd.read_sql_query(query, con=engine)

connect_db()
connect_db_v2()

STATUS: DB connection(1) succeed
STATUS: DB connection(2) succeed


Engine(postgresql+psycopg2://postgres:***@localhost:54320/postgres)

In [8]:
fetch_db('SELECT * FROM dim_drivers')

Unnamed: 0,driver_id,plate_num
0,driver_0000,WVE5137
1,driver_0001,WVL602
2,driver_0002,WUW1483
3,driver_0003,VGG9462
4,driver_0004,WVB6987
...,...,...
205,driver_0205,SF2113398
206,driver_0206,VEN1373
207,driver_0207,WVE5068
208,driver_0208,WPC3856


In [44]:
# TESTING
# query_db("""
#             CREATE TABLE ammar_test (
#             name TEXT, 
#             PRIMARY KEY (name)
#         );
#         """)
# query_db("""
#         INSERT INTO ammar_test 
#             (name) 
#             VALUES ('jack')
#         ;
#         """)
# fetch_db("SELECT * FROM ammar_test")

syntax executed
connection close
syntax executed
connection close


Unnamed: 0,name
0,jack


# Database Schema

In [59]:
# query_db('CREATE SCHEMA rapidkl;')

# Tables

## fact_daily_trip

In [15]:
engine = connect_db_v2()
df_merge.to_sql('fact_daily_trip', con=engine, schema='rapidkl', if_exists='replace')

STATUS: DB connection(2) succeed


455

In [17]:
fetch_db('SELECT * FROM rapidkl.fact_daily_trip')

STATUS: DB connection(2) succeed


Unnamed: 0,index,timestamp,trip_id,start_time,start_date,route_id,latitude,longitude,bearing,speed,vehicle_id,license_plate
0,0,2025-01-20 08:54:31,weekday_U1700_U170002_3,08:46:31,20250120,U1700,3.254817,101.693990,0.0,0.00,WVD4971,WVD4971
1,1,2025-01-20 08:54:28,weekday_P0010_P001002_3,08:47:58,20250120,P0010,3.084800,101.627830,92.0,59.26,WA3714M,WA3714M
2,2,2025-01-20 08:54:37,weekday_U6400_U640001_3,08:20:06,20250120,U6400,3.081413,101.666470,279.5,27.22,WPA5621,WPA5621
3,3,2025-01-20 08:54:49,weekday_U3030_U303002_1,08:31:28,20250120,U3030,3.159460,101.744630,278.6,5.00,WVJ8197,WVJ8197
4,4,2025-01-20 08:53:57,weekday_U6520_U652002_0,07:40:52,20250120,U6520,3.057708,101.688690,268.6,5.74,WPC8505,WPC8505
...,...,...,...,...,...,...,...,...,...,...,...,...
2450,1048,2025-01-21 20:33:34,weekday_U1730_U173001_5,19:26:05,20250121,U1730,3.257517,101.672676,349.0,51.86,WVH6234,WVH6234
2451,1049,2025-01-21 20:33:39,weekday_U1900_U190001_8,20:15:09,20250121,U1900,3.178200,101.685840,298.0,1.85,WVA4412,WVA4412
2452,1050,2025-01-21 20:33:24,weekday_S0070_S007002_5,18:40:25,20250121,S0070,3.230967,101.723830,0.0,0.00,WPY7751,WPY7751
2453,1051,2025-01-21 20:33:45,weekday_S0070_S007002_5,19:41:45,20250121,S0070,3.235367,101.704000,117.0,42.60,WB5638Q,WB5638Q


## dim_drivers

In [None]:
df_trip = fetch_db('SELECT * FROM rapidkl.fact_daily_trip')
bus_plates = df_trip['license_plate'].unique()b
driver_names = [f'driver_{str(x+1).zfill(5)}' for x in range(len(bus_plates))]
df_drivers = pd.DataFrame({'driver_id':[x+1 for x in range(len(bus_plates))], 'driver_name':driver_names})
df_drivers.to_sql('dim_drivers', con=engine, schema='rapidkl', if_exists='replace', index=False)

STATUS: DB connection(2) succeed


403

In [19]:
fetch_db("SELECT * FROM rapidkl.dim_drivers")

STATUS: DB connection(2) succeed


Unnamed: 0,driver_id,driver_name
0,1,driver_00001
1,2,driver_00002
2,3,driver_00003
3,4,driver_00004
4,5,driver_00005
...,...,...
398,399,driver_00399
399,400,driver_00400
400,401,driver_00401
401,402,driver_00402


## dim_busses

In [21]:
df_trip = fetch_db('SELECT * FROM rapidkl.fact_daily_trip')
bus_plates = sorted(df_trip['license_plate'].unique())
bus_id = [x+1 for x in range(len(bus_plates))]
df_bus = pd.DataFrame({'bus_id':bus_id, 'bus_plates': bus_plates})
df_bus.to_sql('dim_busses', con=engine, schema='rapidkl', if_exists='replace', index=False)

STATUS: DB connection(2) succeed


403

In [22]:
fetch_db("SELECT * FROM rapidkl.dim_busses")

STATUS: DB connection(2) succeed


Unnamed: 0,bus_id,bus_plates
0,1,BNG4014
1,2,CDH8296
2,3,CDH8332
3,4,PJK1473
4,5,PKY1292
...,...,...
398,399,WWC4624
399,400,WWC4681
400,401,WWC6423
401,402,WWD4612


## fact_trips

In [23]:
df_fct_trips = fetch_db(
    """
    SELECT 
        timestamp, 
        trip_id, 
        start_time, 
        driver_name, 
        bus_plates,
        route_id, 
        speed
    FROM (
        SELECT *
        FROM rapidkl.fact_daily_trip fdt
        JOIN rapidkl.dim_busses vv 
        ON fdt.vehicle_id = vv.bus_plates
        JOIN rapidkl.dim_drivers dd
        ON vv.bus_id = dd.driver_id
    ) jjj;
"""
)
df_fct_trips.to_sql('fact_trips', 
                con=engine, 
                schema='rapidkl', 
                if_exists='replace',
                index=False)

STATUS: DB connection(2) succeed


455

In [24]:
fetch_db('SELECT * FROM rapidkl.fact_trips')

STATUS: DB connection(2) succeed


Unnamed: 0,timestamp,trip_id,start_time,driver_name,bus_plates,route_id,speed
0,2025-01-21 20:33:40,weekday_U6000_U600002_23,20:01:40,driver_00001,BNG4014,U6000,15.37
1,2025-01-21 20:33:01,weekday_U6000_U600002_23,20:01:40,driver_00001,BNG4014,U6000,30.93
2,2025-01-21 20:32:47,weekday_U6000_U600002_23,20:01:40,driver_00001,BNG4014,U6000,16.67
3,2025-01-21 20:31:24,weekday_U6000_U600002_23,20:01:40,driver_00001,BNG4014,U6000,6.11
4,2025-01-21 20:31:24,weekday_U6000_U600002_23,20:01:40,driver_00001,BNG4014,U6000,6.11
...,...,...,...,...,...,...,...
2450,2025-01-20 08:56:51,weekday_T2000_T200002_0,08:30:51,driver_00403,WWF2843,T2000,46.30
2451,2025-01-20 08:56:21,weekday_T2000_T200002_0,08:30:51,driver_00403,WWF2843,T2000,53.71
2452,2025-01-20 08:55:21,weekday_T2000_T200002_0,08:30:51,driver_00403,WWF2843,T2000,35.19
2453,2025-01-20 08:54:51,weekday_T2000_T200002_0,08:30:51,driver_00403,WWF2843,T2000,48.15


## fact_driving_behavior

In [25]:
speed_status_df = fetch_db(
    """
    WITH speed_count AS (
        SELECT 
            DATE(timestamp) as date, 
            bus_plates, 
            driver_name, 
            CASE 
                WHEN speed > 60 THEN 1
                ELSE 0
            END count_breach_speed
        FROM rapidkl.fact_trips
        WHERE DATE(timestamp) = CURRENT_DATE
    )
    SELECT 
        date, 
        bus_plates, 
        driver_name, 
        CASE
            WHEN sum_breach_daily = 0 THEN 'Safe'
            WHEN sum_breach_daily = 1 THEN 'Cautious'
            WHEN sum_breach_daily = 2 THEN 'Cautious'
            WHEN sum_breach_daily > 2 THEN 'Danger'
        END behavior
    FROM (
        SELECT 
            date, 
            bus_plates, 
            driver_name, 
            SUM(count_breach_speed) sum_breach_daily
        FROM speed_count
        GROUP BY date, bus_plates, driver_name
        ) hhh
    """
)
speed_status_df.to_sql('fact_driving_behavior', 
            con=engine, schema='rapidkl', 
            if_exists='replace', 
            index=False)

STATUS: DB connection(2) succeed


230

In [27]:
df_bh = fetch_db('SELECT * FROM rapidkl.fact_driving_behavior')
df_bh[df_bh['behavior']!='Safe']

STATUS: DB connection(2) succeed


Unnamed: 0,date,bus_plates,driver_name,behavior
13,2025-01-21,VFA2791,driver_00023,Danger
19,2025-01-21,VFK4581,driver_00032,Cautious
23,2025-01-21,VGF8476,driver_00036,Cautious
24,2025-01-21,VGG9462,driver_00037,Cautious
33,2025-01-21,VGK8927,driver_00051,Cautious
46,2025-01-21,W9762Q,driver_00073,Cautious
92,2025-01-21,WB8743F,driver_00144,Cautious
109,2025-01-21,WPB3247,driver_00184,Cautious
142,2025-01-21,WUW2542,driver_00242,Cautious
152,2025-01-21,WUY4963,driver_00260,Cautious


## fact_bus_maintenance

In [45]:
import datetime
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)

In [5]:
import datetime
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
query_1 = f"""
    with geo_table AS (
    SELECT timestamp, 
            license_plate, 
            longitude, 
            latitude, 
            ROW_NUMBER () OVER (PARTITION BY license_plate ORDER BY timestamp ASC) as row_number, 
            LAG (longitude) OVER (PARTITION BY license_plate ORDER BY timestamp ASC) as prev_longitude, 
            LAG (latitude) OVER (PARTITION BY license_plate ORDER BY timestamp ASC) as prev_latitude
    FROM rapidkl.fact_daily_trip 
    ), 
    geo_today AS (
    SELECT 
            license_plate , 
            DATE(timestamp),
            SUM(distance_km) as total_distance_km
    FROM (
        SELECT timestamp, license_plate, 
            2 * 6371 * 
            ASIN(SQRT(
                POWER(SIN(RADIANS(latitude - prev_latitude) / 2), 2) +
                COS(RADIANS(prev_latitude)) * COS(RADIANS(latitude)) *
                POWER(SIN(RADIANS(longitude - prev_longitude) / 2), 2)
            )) AS distance_km
        FROM geo_table
    ) jjj
    WHERE DATE(timestamp) = DATE('{str(today)}') -- today
    GROUP BY license_plate, distance_km, timestamp
    ORDER BY distance_km DESC
    ), 
    geo_yesterday AS (
    SELECT 
            license_plate , 
            DATE(timestamp),
            SUM(distance_km) as total_distance_km
    FROM (
        SELECT timestamp, license_plate, 
            2 * 6371 * 
            ASIN(SQRT(
                POWER(SIN(RADIANS(latitude - prev_latitude) / 2), 2) +
                COS(RADIANS(prev_latitude)) * COS(RADIANS(latitude)) *
                POWER(SIN(RADIANS(longitude - prev_longitude) / 2), 2)
            )) AS distance_km
        FROM geo_table
    ) jjj
    WHERE DATE(timestamp) = DATE('{str(yesterday)}') -- yesterday
    GROUP BY license_plate, distance_km, timestamp
    ORDER BY distance_km DESC
    ), 
    total_distance_today AS (
    SELECT license_plate, SUM(total_distance_km) AS total_distance_km_today
    FROM (
        SELECT *
        FROM geo_today
        UNION ALL
        SELECT * 
        FROM geo_yesterday
    ) kkk
    GROUP BY license_plate
    )
    SELECT CURRENT_DATE as date, *, 
        CASE
            WHEN total_distance_km_today > 10000 THEN 'Need service'
            ELSE 'Good'
        END AS bus_condition
    FROM total_distance_today 
    """
df_maintenance = fetch_db(query_1)
df_maintenance

STATUS: DB connection(2) succeed


Unnamed: 0,date,license_plate,total_distance_km_today,bus_condition
0,2025-01-21,BNG4014,11.181509,Good
1,2025-01-21,CDH8296,,Good
2,2025-01-21,CDH8332,2.028472,Good
3,2025-01-21,PJK1473,3.048687,Good
4,2025-01-21,PKY1292,0.000000,Good
...,...,...,...,...
398,2025-01-21,WWC4624,2.290819,Good
399,2025-01-21,WWC4681,9.731170,Good
400,2025-01-21,WWC6423,5.284207,Good
401,2025-01-21,WWD4612,0.647726,Good


In [42]:
query_2 = f"""
        CREATE TABLE rapidkl.fact_history_warning (
        date DATE,
        license_plate TEXT, 
        total_distance_km_today REAL, 
        bus_condition TEXT    
    )
"""
query_3 = f"""
    INSERT INTO rapidkl.fact_history_warning {query_1}
"""
query_db(query_2)
query_db(query_3)

STATUS: DB connection(1) succeed
STATUS: Query succeed
STATUS: DB connection(1) succeed
STATUS: Query succeed


In [41]:
query_db('DROP TABLE rapidkl.fact_history_warning')

STATUS: DB connection(1) succeed
STATUS: Query succeed


In [44]:
fetch_db('SELECT * FROM rapidkl.fact_history_warning').info()

STATUS: DB connection(2) succeed
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 403 entries, 0 to 402
Data columns (total 4 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   date                     403 non-null    object 
 1   license_plate            403 non-null    object 
 2   total_distance_km_today  397 non-null    float64
 3   bus_condition            403 non-null    object 
dtypes: float64(1), object(3)
memory usage: 12.7+ KB


## fact_history_warning