# Building the database

To create useful insights of translink data, we first need to amass the data over a longer period of time in a Database. To do this we used DuckDB because it integrates easily with markdown, and has a quick method to import json files.

In [1]:
# Required libraries for building the database
%pip install duckdb 
%pip install requests
%pip install PyYAML
%pip install gtfs-realtime-bindings
%pip install numpy
%pip install pandas



Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:

from enum import Enum
import requests, yaml, time, os
import numpy as np
import pandas as pd
from google.transit import gtfs_realtime_pb2
from datetime import datetime
import duckdb
import fsspec

General Transit Feed Specification

To access the gtfs feed, we need to grab it from translink's active  API. To ensure our API keys arent uploaded to the server, we used a .yaml file containing all our api keys.

In [4]:
Provider = 'Translink' # use to change data provider

#create full url from auth.yaml
with open('../.config/my_api_keys/auth.yaml', 'r') as file:
    auth = yaml.load(file,Loader=yaml.SafeLoader)

url = auth[Provider]['position_link'] + auth[Provider]['api_key0']

#create gtfs feed
gtfs_feed = gtfs_realtime_pb2.FeedMessage()

response = requests.get(url)

status_failure_flag = 0 # if response shows 4 failures in a row, exit

if response.status_code == 200:
    gtfs_feed.ParseFromString(response.content)

print(gtfs_feed.entity[0])

id: "14121598"
vehicle {
  trip {
    trip_id: "14121598"
    start_date: "20241129"
    schedule_relationship: SCHEDULED
    route_id: "30055"
    direction_id: 0
  }
  position {
    latitude: 49.2864151
    longitude: -123.140434
  }
  current_stop_sequence: 1
  current_status: IN_TRANSIT_TO
  timestamp: 1732925086
  stop_id: "1"
  vehicle {
    id: "19514"
    label: "19514"
  }
}



Now that we have a GTFS feed, how can we meaningfully and easily store it in an array? First, lets try storing the whole gtfs_feed object using `duckdb.read_json()` since GTFS feeds are essentially large Json files

In [10]:
#duckdb.read_json(gtfs_feed.entity)

It looks like duckdb doesn't know how to interpret the whole gtfs feed. instead, lets see if it can import a single entity.

In [11]:
#duckdb.read_json(gtfs_feed.entity[0])

Well, it was worth a shot. Instead, lets manually loop through the gtfs data and create a database with entries for the following in each trip:
| trip_id | ... |
| ----------- | --- |
| start_date | ... |
| schedule_relationship | ... |
| route_id | ... |
| direction_id | ... |
| latitude | ... |
| longitude | ... |
| current_stop_sequence | ... |
| current_status | ... |
| timestamp | ... |
| stop_id | ... |
| vehicle_id | ... |
| vehicle_label | ... |

In [None]:
test_db = duckdb.connect('test.db')

#create the Enum values for schedule_relationship and current_status
test_db.sql('''CREATE TABLE IF NOT EXISTS rt_position
            (trip_id VARCHAR,
            start_date DATE,
            schedule_relationship INTEGER,
            route_id VARCHAR,
            direction_id VARCHAR,
            latitude FLOAT,
            longitude FLOAT,
            current_stop_sequence VARCHAR,
            current_status VARCHAR,
            timestamp TIMESTAMP,
            stop_id VARCHAR,
            vehicle_id VARCHAR,
            vehicle_label VARCHAR,
            PRIMARY KEY (trip_id, vehicle_id, timestamp))
            ''')


Now that the database is created, Lets make a function to collect all elements from `gtfs_feed.entity` and insert them into our new database

In [2]:
Provider = 'Translink' # use to change data provider

#create full url from auth.yaml
with open('../.config/my_api_keys/auth.yaml', 'r') as file:
    auth = yaml.load(file,Loader=yaml.SafeLoader)

url = auth[Provider]['position_link'] + auth[Provider]['api_key0']

#create gtfs feed
gtfs_feed = gtfs_realtime_pb2.FeedMessage()

response = requests.get(url)

status_failure_flag = 0 # if response shows 4 failures in a row, exit

if response.status_code == 200:
    gtfs_feed.ParseFromString(response.content)

current_pos_df = pd.DataFrame()

for entity in gtfs_feed.entity:
    vehicle = entity.vehicle
    trip_to_append = pd.DataFrame(data = [[vehicle.trip.trip_id,
                                           pd.to_datetime(vehicle.trip.start_date, format='%Y%m%d').date(),  #convert YYYYMMDD to YYYY-MM-DD                                         
                                           vehicle.trip.schedule_relationship,
                                           vehicle.trip.route_id,
                                           vehicle.trip.direction_id,
                                           vehicle.position.latitude,
                                           vehicle.position.longitude,
                                           vehicle.current_stop_sequence,
                                           vehicle.current_status,
                                           pd.to_datetime(vehicle.timestamp, unit='s'), #convert to unix timestamp
                                           vehicle.stop_id,
                                           vehicle.vehicle.id,
                                           vehicle.vehicle.label,
                                           ]],
                                  columns=['trip_id',
                                           'start_date',
                                           'schedule_relationship',
                                           'route_id',
                                           'direction_id',
                                           'latitude',
                                           'longitude',
                                           'current_stop_sequence',
                                           'current_status',
                                           'timestamp', 
                                           'stop_id',
                                           'vehicle_id',
                                           'vehicle_label'])

    current_pos_df = pd.concat([current_pos_df,trip_to_append])
test_db.sql('INSERT INTO rt_position SELECT * FROM current_pos_df ON CONFLICT DO NOTHING')

NameError: name 'yaml' is not defined

Now that we have some data in our database, lets check if we get expected output from our data.

In [None]:
test_db.sql('SELECT * FROM rt_position').show()

┌──────────┬────────────┬───────────────────────┬──────────┬──────────────┬───────────┬─────────────┬───────────────────────┬────────────────┬─────────────────────┬─────────┬────────────┬───────────────┐
│ trip_id  │ start_date │ schedule_relationship │ route_id │ direction_id │ latitude  │  longitude  │ current_stop_sequence │ current_status │      timestamp      │ stop_id │ vehicle_id │ vehicle_label │
│ varchar  │    date    │         int32         │ varchar  │   varchar    │   float   │    float    │        varchar        │    varchar     │      timestamp      │ varchar │  varchar   │    varchar    │
├──────────┼────────────┼───────────────────────┼──────────┼──────────────┼───────────┼─────────────┼───────────────────────┼────────────────┼─────────────────────┼─────────┼────────────┼───────────────┤
│ 14121585 │ 2024-11-26 │                     0 │ 30055    │ 0            │ 49.287216 │  -123.14162 │ 1                     │ 2              │ 2024-11-26 21:59:17 │ 1       │ 19529    

In [None]:
test_db.sql('SELECT * FROM rt_position WHERE current_status != 2').show()

┌─────────┬────────────┬───────────────────────┬──────────┬──────────────┬──────────┬───────────┬───────────────────────┬────────────────┬───────────┬─────────┬────────────┬───────────────┐
│ trip_id │ start_date │ schedule_relationship │ route_id │ direction_id │ latitude │ longitude │ current_stop_sequence │ current_status │ timestamp │ stop_id │ vehicle_id │ vehicle_label │
│ varchar │    date    │         int32         │ varchar  │   varchar    │  float   │   float   │        varchar        │    varchar     │ timestamp │ varchar │  varchar   │    varchar    │
├─────────┴────────────┴───────────────────────┴──────────┴──────────────┴──────────┴───────────┴───────────────────────┴────────────────┴───────────┴─────────┴────────────┴───────────────┤
│                                                                                          0 rows                                                                                           │
└─────────────────────────────────────────────────

In [None]:
test_db.sql('SELECT * FROM rt_position WHERE schedule_relationship != 0').show()

┌─────────┬────────────┬───────────────────────┬──────────┬──────────────┬──────────┬───────────┬───────────────────────┬────────────────┬───────────┬─────────┬────────────┬───────────────┐
│ trip_id │ start_date │ schedule_relationship │ route_id │ direction_id │ latitude │ longitude │ current_stop_sequence │ current_status │ timestamp │ stop_id │ vehicle_id │ vehicle_label │
│ varchar │    date    │         int32         │ varchar  │   varchar    │  float   │   float   │        varchar        │    varchar     │ timestamp │ varchar │  varchar   │    varchar    │
├─────────┴────────────┴───────────────────────┴──────────┴──────────────┴──────────┴───────────┴───────────────────────┴────────────────┴───────────┴─────────┴────────────┴───────────────┤
│                                                                                          0 rows                                                                                           │
└─────────────────────────────────────────────────

We now have a database, but we still have two more realtime and two static tables to add. Next, lets add our Trip update data

In [None]:
url = auth[Provider]['trip_link'] + auth[Provider]['api_key0']

#create gtfs feed
gtfs_feed = gtfs_realtime_pb2.FeedMessage()

response = requests.get(url)

status_failure_flag = 0 # if response shows 4 failures in a row, exit

if response.status_code == 200:
    gtfs_feed.ParseFromString(response.content)

print(gtfs_feed.entity[28])

id: "13998760"
is_deleted: false
trip_update {
  trip {
    trip_id: "13998760"
    start_date: "20241129"
    schedule_relationship: SCHEDULED
    route_id: "6614"
    direction_id: 1
  }
  stop_time_update {
    stop_sequence: 6
    arrival {
      delay: 623
      time: 1732925231
    }
    departure {
      delay: 623
      time: 1732925231
    }
    stop_id: "497"
    schedule_relationship: SCHEDULED
  }
  stop_time_update {
    stop_sequence: 7
    arrival {
      delay: 631
      time: 1732925286
    }
    departure {
      delay: 631
      time: 1732925286
    }
    stop_id: "498"
    schedule_relationship: SCHEDULED
  }
  stop_time_update {
    stop_sequence: 8
    arrival {
      delay: 622
      time: 1732925331
    }
    departure {
      delay: 622
      time: 1732925331
    }
    stop_id: "499"
    schedule_relationship: SCHEDULED
  }
  stop_time_update {
    stop_sequence: 9
    arrival {
      delay: 633
      time: 1732925388
    }
    departure {
      delay: 633
    

Based on the inconsistent number of `stop_time_update` elements i will have to a duckdb list of structs. the data will be formatted as follows


| trip_id | ... |
| ----------- | --- |
| start_date | ... |
| schedule_relationship | ... |
| route_id | ... |
| direction_id | ... |
| stop_sequence | ... |
| arrival_delay | ... |
| arrival_time | ... |
| departure_delay | ... |
| departure_time | ... |
| stop_id | ... |
| schedule_relationship_stop | ... |
| stop_id | ... |
| schedule_relationship_stop | ... |


where stop_sequence to schedule relationship stop are the data from the most recent stop update.

This was chosen to reduce the amount of overlapping data caused by the large number of stop_time_update datapoints.





In [17]:
test_db.sql('''
            CREATE TABLE IF NOT EXISTS rt_trip (
                trip_id VARCHAR,
                start_date DATE,
                schedule_relationship INTEGER,
                route_id VARCHAR,
                direction_id VARCHAR,
                next_stop_sequence VARCHAR,
                next_stop_arrival_delay INTERVAL,
                next_stop_arrival_time TIMESTAMP,
                next_stop_departure_delay INTERVAL,
                next_stop_departure_time TIMESTAMP,
                next_stop_id VARCHAR,
                next_stop_schedule_relationship VARCHAR,
                PRIMARY KEY (trip_id, next_stop_id, next_stop_arrival_time)
            )
            ''')

In [26]:
current_trip_df = pd.DataFrame()

for entity in gtfs_feed.entity:
    trip_update = entity.trip_update
    trip_to_append = pd.DataFrame(data = [[trip_update.trip.trip_id,
                                           pd.to_datetime(trip_update.trip.start_date, format='%Y%m%d').date(),  #convert YYYYMMDD to YYYY-MM-DD                                         
                                           trip_update.trip.schedule_relationship,
                                           trip_update.trip.route_id,
                                           trip_update.trip.direction_id,
                                           trip_update.stop_time_update[0].stop_sequence,
                                           pd.to_timedelta(trip_update.stop_time_update[0].arrival.delay,unit='s'),
                                           pd.to_datetime(trip_update.stop_time_update[0].arrival.time, unit='s'),
                                           pd.to_timedelta(trip_update.stop_time_update[0].departure.delay,unit='s'),
                                           pd.to_datetime(trip_update.stop_time_update[0].departure.time, unit='s'),
                                           trip_update.stop_time_update[0].stop_id,
                                           trip_update.stop_time_update[0].schedule_relationship,
                                           ]],
                                  columns=['trip_id',
                                           'start_date',
                                           'schedule_relationship',
                                           'route_id',
                                           'direction_id',
                                           'next_stop_sequence',
                                           'next_stop_arrival_delay',
                                           'next_stop_arrival_time',
                                           'next_stop_departure_delay',
                                           'next_stop_departure_time',
                                           'next_stop_id',
                                           'next_stop_schedule_relationship'])

    current_trip_df = pd.concat([current_trip_df,trip_to_append])
#print(current_trip_df)
test_db.sql('INSERT INTO rt_trip SELECT * FROM current_trip_df ON CONFLICT DO NOTHING')
num_inserted = test_db.sql("SELECT count(*) FROM rt_trip").fetchall()[0][0]
print("rt_trip inserted, total length is " + str(num_inserted))

rt_trip inserted, total length is 1095


In [None]:
test_db.sql('SELECT * FROM rt_trip').show()

┌──────────┬────────────┬───────────────────────┬──────────┬──────────────┬────────────────────┬─────────────────────────┬────────────────────────┬───────────────────────────┬──────────────────────────┬──────────────┬─────────────────────────────────┐
│ trip_id  │ start_date │ schedule_relationship │ route_id │ direction_id │ next_stop_sequence │ next_stop_arrival_delay │ next_stop_arrival_time │ next_stop_departure_delay │ next_stop_departure_time │ next_stop_id │ next_stop_schedule_relationship │
│ varchar  │    date    │         int32         │ varchar  │   varchar    │      varchar       │        interval         │       timestamp        │         interval          │        timestamp         │   varchar    │             varchar             │
├──────────┼────────────┼───────────────────────┼──────────┼──────────────┼────────────────────┼─────────────────────────┼────────────────────────┼───────────────────────────┼──────────────────────────┼──────────────┼───────────────────────────

In [None]:
test_db.sql('SELECT * FROM rt_trip where trip_id == 13997591').show()
test_ent = gtfs_feed.entity[0]
print(test_ent)


┌──────────┬────────────┬───────────────────────┬──────────┬──────────────┬────────────────────┬─────────────────────────┬────────────────────────┬───────────────────────────┬──────────────────────────┬──────────────┬─────────────────────────────────┐
│ trip_id  │ start_date │ schedule_relationship │ route_id │ direction_id │ next_stop_sequence │ next_stop_arrival_delay │ next_stop_arrival_time │ next_stop_departure_delay │ next_stop_departure_time │ next_stop_id │ next_stop_schedule_relationship │
│ varchar  │    date    │         int32         │ varchar  │   varchar    │      varchar       │        interval         │       timestamp        │         interval          │        timestamp         │   varchar    │             varchar             │
├──────────┼────────────┼───────────────────────┼──────────┼──────────────┼────────────────────┼─────────────────────────┼────────────────────────┼───────────────────────────┼──────────────────────────┼──────────────┼───────────────────────────

Finally, lets add the service alerts data.

In [10]:
url = auth[Provider]['alerts_link'] + auth[Provider]['api_key0']

#create gtfs feed
gtfs_feed = gtfs_realtime_pb2.FeedMessage()

response = requests.get(url)

status_failure_flag = 0 # if response shows 4 failures in a row, exit

if response.status_code == 200:
    gtfs_feed.ParseFromString(response.content)

print(gtfs_feed.entity[50])

id: "549688"
alert {
  active_period {
    start: 1731933000
    end: 1732968000
  }
  informed_entity {
    agency_id: "TL"
    route_id: "6628"
    route_type: 3
    stop_id: "12183"
  }
  cause: CONSTRUCTION
  effect: NO_SERVICE
  header_text {
    translation {
      text: "Stop 60023 Southbound Arlington St @ E 49 Ave is temporarily closed from Mon Nov 18 through Fri Nov 29 due to construction."
      language: "en"
    }
  }
  description_text {
    translation {
      text: ""
      language: "en"
    }
  }
  tts_header_text {
    translation {
      text: "Stop 6 0 0 2 3, is temporarily closed from Monday November 18 through Friday November 29 due to construction."
      language: "en"
    }
  }
}



In [27]:
for entity in gtfs_feed.entity:
    if entity.alert.active_period[0].start == None:
        print(entity.alert.active_period[0].start)
    #print(entity.alert.active_period[0].end)

The service alerts data is formatted where each entry in the gtfs feed is a service alert with a list of stops and expected delays.

the data will be formatted like:

| alert_id | ... |
| ----------- | --- |
| active_period_start | ... |
| active_period_end | ... |
| route_id | ... |
| route_type | ... |
| affected_stop_ids | ... |
| cause | ... |
| effect | ... |
| description| ... |
| severity_level | ... |


In [None]:
test_db.sql('''
            CREATE TABLE IF NOT EXISTS rt_alerts (
                alert_id VARCHAR,
                active_period_start TIMESTAMP,
                active_period_end TIMESTAMP,
                affected_route_ids VARCHAR[],
                affected_route_type VARCHAR[],
                affected_stop_ids VARCHAR[],
                cause VARCHAR,
                effect VARCHAR,
                description VARCHAR,
                severity_level VARCHAR,
                PRIMARY KEY (alert_id)
            )
            ''')

In [None]:
total_route = []
total_stops = []
total_types = []
for service_alert in gtfs_feed.entity:
    #base_id = service_alert.alert.informed_entity[0].route_id
    listed_route = []
    listed_stops = []
    listed_types = []
    for informed_entity in service_alert.alert.informed_entity:
        listed_route.append(informed_entity.route_id)
        listed_stops.append(informed_entity.stop_id)
        listed_types.append(informed_entity.route_type)
        #if informed_entity.route_id != base_id:
        #    print("wrong at " + informed_entity.route_id)
        #    print(service_alert)
    total_route.append(listed_route)
    total_stops.append(listed_stops)
    total_types.append(listed_types)

index = 12
print(total_route[index])
print(total_stops[index])
print(total_types[index])

['6770']
['']
[2]


In [None]:
def check_datetime_null(input):
# returns date if input is non null
# else returns None
    if(input):
        return pd.to_datetime(input,unit = 's')
    else:
        return pd.NaT


end_time_test = gtfs_feed.entity[12].alert
print(check_datetime_null(end_time_test.active_period[0].start))
print(check_datetime_null(end_time_test.active_period[0].end))
print('\n') 
print(end_time_test)

2024-10-16 17:40:00
NaT


active_period {
  start: 1729100400
}
informed_entity {
  agency_id: "TL"
  route_id: "6770"
  route_type: 2
  direction_id: 0
}
cause: CONSTRUCTION
effect: UNKNOWN_EFFECT
header_text {
  translation {
    text: "The west Park and Ride facility at Mission City Station (WCE) is undergoing resurfacing work until further notice and parking in the area may be limited until the work is complete."
    language: "en"
  }
}
description_text {
  translation {
    text: "\nCustomers may want to build extra time in their commute to find alternate parking as needed."
    language: "en"
  }
}
severity_level: INFO



In [None]:
alert = gtfs_feed.entity[16]

listed_route_id = []
listed_stop_ids = []
listed_types = []
for informed_entity in alert.alert.informed_entity:
    listed_route_id.append(informed_entity.route_id)
    listed_stop_ids.append(informed_entity.stop_id)
    listed_types.append(informed_entity.route_type)

print(alert.id)
print(check_datetime_null(alert.alert.active_period[0].start))
print(check_datetime_null(alert.alert.active_period[0].end))
print(listed_route_id)
print(listed_stop_ids)
print(listed_types)
print(alert.alert.cause)
print(alert.alert.effect)
print(alert.alert.header_text.translation[0].text)
print(alert.alert.severity_level)


545317
2024-10-31 18:14:00
None
['30053']
['8041']
[1]
1
7
Elevator at Expo Line Granville Station @ Platform 2 from the westbound platform to the eastbound is temporarily out of service. There is no elevator access to/from the eastbound platform
3


In [None]:
current_alerts_df = pd.DataFrame()

for entity in gtfs_feed.entity:
    
    listed_route_id = []
    listed_types = []
    listed_stop_ids = []
    for informed_entity in entity.alert.informed_entity:
        listed_route_id.append(informed_entity.route_id)
        listed_types.append(informed_entity.route_type)
        listed_stop_ids.append(informed_entity.stop_id)
    trip_to_append = pd.DataFrame(data = [[entity.id,
                                           check_datetime_null(entity.alert.active_period[0].start),                                         
                                           check_datetime_null(entity.alert.active_period[0].end),
                                           listed_route_id,
                                           listed_types,
                                           listed_stop_ids,
                                           entity.alert.cause,
                                           entity.alert.effect,
                                           entity.alert.header_text.translation[0].text,
                                           entity.alert.severity_level
                                           ]],
                                  columns=['alert_id',
                                           'active_period_start',
                                           'active_period_end',
                                           'affected_route_ids',
                                           'affected_route_type',
                                           'affected_stop_ids',
                                           'cause',
                                           'effect',
                                           'description',
                                           'severity_level'])

    current_alerts_df = pd.concat([current_alerts_df,trip_to_append])
print(current_alerts_df)
test_db.sql('INSERT INTO rt_alerts SELECT * FROM current_alerts_df ON CONFLICT DO NOTHING')


   alert_id active_period_start   active_period_end  \
0    454491 2024-02-25 12:30:00                 NaT   
0    500555 2024-06-14 18:40:00                 NaT   
0    500556 2024-06-14 18:42:00                 NaT   
0    511077 2024-07-18 20:17:00                 NaT   
0    526500 2024-09-04 20:51:00                 NaT   
..      ...                 ...                 ...   
0    554006 2024-11-27 19:10:00 2024-11-27 21:23:39   
0    554007 2024-11-27 19:12:00 2024-11-27 21:23:39   
0    554008 2024-11-27 19:19:00 2024-11-27 20:54:00   
0    554009 2024-11-27 20:00:00 2024-11-27 21:59:00   
0    554010 2024-11-27 19:20:00 2024-11-27 20:59:00   

              affected_route_ids affected_route_type  \
0   [30053, 30053, 30053, 30053]        [1, 1, 1, 1]   
0                         [6779]                 [3]   
0                         [6779]                 [3]   
0                 [13686, 13686]              [1, 1]   
0                         [6647]                 [3]   
.. 

In [None]:
test_db.sql('SELECT * FROM rt_alerts').show()
#test_db.sql('DROP TABLE rt_alerts')

┌──────────┬─────────────────────┬─────────────────────┬──────────────────────────────┬─────────────────────┬──────────────────────────┬─────────┬─────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────┐
│ alert_id │ active_period_start │  active_period_end  │      affected_route_ids      │ affected_route_type │    affected_stop_ids     │  cause  │ effect  │                                                                                                       description                                                                                                        │ severity_level │
│ varchar  │      timestamp      │      timestamp      │          varchar[]           │      varchar[]      │        varchar[]         │ varchar │ varchar │                                                          

In [None]:
# TODO fix if you have time and need transfers data
test_db.sql(''' 
            CREATE TABLE IF NOT EXISTS static_transfers (
                from_stop_id VARCHAR,
                to_stop_id VARCHAR,
                transfer_type VARCHAR,
                min_transfer_time INTERVAL,
                from_trip_id VARCHAR,
                to_trip_id VARCHAR,
                PRIMARY KEY (from_stop_id,to_stop_id,from_trip_id,to_trip_id)
            )
            ''')
transfers_insert = pd.read_csv('../google_transit(1)/transfers.txt')
transfers_insert['min_transfer_time'] = pd.to_timedelta(transfers_insert['min_transfer_time'])

test_db.sql("INSERT INTO static_transfers SELECT * FROM transfers_insert")

ConstraintException: Constraint Error: NOT NULL constraint failed: static_transfers.from_trip_id

In [None]:
#read in feed information for entire gtfs static import
feed_info = pd.read_csv('../google_transit(1)/feed_info.txt',parse_dates=True)
feed_info['feed_start_date'] = pd.to_datetime(feed_info['feed_start_date'], format='%Y%m%d').date()
feed_info['feed_end_date'] = pd.to_datetime(feed_info['feed_end_date'], format='%Y%m%d').date()

test_db.sql(''' 
            CREATE TABLE IF NOT EXISTS static_stops (
                stop_id VARCHAR,
                stop_code VARCHAR,
                stop_name VARCHAR,
                stop_desc VARCHAR,
                stop_lat FLOAT,
                stop_lon FLOAT,
                zone_id VARCHAR,
                stop_url VARCHAR,
                location_type VARCHAR,
                parent_station VARCHAR,
                wheelchair_boarding VARCHAR,
                feed_start_date DATE,
                feed_end_date DATE,
                PRIMARY KEY (stop_id)
            )
            ''')
stops_insert = pd.read_csv('../google_transit(1)/stops.txt')
stops_insert['start_date'] = feed_info['feed_start_date']
stops_insert['end_date'] = feed_info['feed_end_date']

test_db.sql("INSERT INTO static_stops SELECT * FROM stops_insert")

test_db.sql('SELECT * FROM static_stops').show()
test_db.sql('DROP TABLE static_stops') #remove when testing complete

  feed_publisher_name        feed_publisher_url feed_lang  feed_start_date  \
0           TransLink  https://www.translink.ca        en         20240902   

   feed_end_date    feed_version  
0       20250105  24SEP_20241101  
┌─────────┬───────────┬───────────────────────────────────────────┬───────────┬───────────┬─────────────┬─────────┬──────────┬───────────────┬────────────────┬─────────────────────┐
│ stop_id │ stop_code │                 stop_name                 │ stop_desc │ stop_lat  │  stop_lon   │ zone_id │ stop_url │ location_type │ parent_station │ wheelchair_boarding │
│ varchar │  varchar  │                  varchar                  │  varchar  │   float   │    float    │ varchar │ varchar  │    varchar    │    varchar     │       varchar       │
├─────────┼───────────┼───────────────────────────────────────────┼───────────┼───────────┼─────────────┼─────────┼──────────┼───────────────┼────────────────┼─────────────────────┤
│ 1       │ 50001.0   │ Westbound Davie St @ 

In [None]:
# only allows one translink static file to be present in the database,
# This is intended behavior for the current version but will be expanded as a personal project

queries = {
        'agency': "CREATE TABLE agency AS SELECT * FROM read_csv(?)",

        'calendar_dates' : """CREATE TABLE calendar_dates AS SELECT * FROM 
                read_csv(?,
                        header = true,
                        dateformat = '%Y%m%d',
                        columns = {
                                'service_id': 'VARCHAR',
                                'date': 'DATE',
                                'exception_type': 'VARCHAR'
                        }
                )""",

        'calendar': """CREATE TABLE calendar AS SELECT * FROM 
                read_csv(?,
                        header = true,
                        dateformat = '%Y%m%d',
                        columns = {
                                        'service_id': 'VARCHAR',
                                        'monday': 'BOOLEAN',
                                        'tuesday': 'BOOLEAN',
                                        'wednesday': 'BOOLEAN',
                                        'thursday': 'BOOLEAN',
                                        'friday': 'BOOLEAN',
                                        'saturday': 'BOOLEAN',
                                        'sunday': 'BOOLEAN',
                                        'start_date': 'DATE',
                                        'end_date': 'DATE'
                                })""",

        'direction_names_exceptions': """CREATE TABLE direction_names_exceptions AS SELECT * FROM 
                read_csv(?,
                        dateformat = '%Y%m%d',
                        all_varchar = True)""",

        'directions': """CREATE TABLE directions AS SELECT * FROM 
                read_csv(?,
                        dateformat = '%Y%m%d',
                        all_varchar = True)""",

        'feed_info': "CREATE TABLE feed_info AS SELECT * FROM read_csv(?,dateformat = '%Y%m%d')",

        'route_names_exceptions': """CREATE TABLE route_names_exceptions AS SELECT * FROM 
                read_csv(?,
                        dateformat = '%Y%m%d',
                        all_varchar = True)""",

        'routes': """CREATE TABLE routes AS SELECT * FROM 
                read_csv(?,
                        dateformat = '%Y%m%d',
                        all_varchar = True)""",

        'shapes': """CREATE TABLE  AS SELECT * FROM read_csv(?,
                        header = true,
                        dateformat = '%Y%m%d',
                        columns = {
                                        'shape_id': 'VARCHAR',
                                        'shape_pt_lat': 'DOUBLE',
                                        'shape_pt_lon': 'DOUBLE',
                                        'shape_pt_sequence': 'INTEGER',
                                        'shape_dist_traveled': 'DOUBLE'
                                })""",

        'signup_periods': """CREATE TABLE signup_periods AS SELECT * FROM read_csv(?,
                        header = true,
                        dateformat = '%Y%m%d',
                        columns = {
                                        'sign_id': 'VARCHAR',
                                        'from_date': 'DATE',
                                        'to_date': 'DATE'
                                })""",

        'stop_order_exceptions': """CREATE TABLE stop_order_exceptions AS SELECT * FROM 
                        read_csv(?, all_varchar = True)""",

        'stop_times': """CREATE TABLE stop_times AS SELECT * FROM read_csv(?,
                    header = true,
                    dateformat = '%Y%m%d',
                    columns = {
                                'trip_id': 'VARCHAR',
                                'arrival_time': 'VARCHAR',
                                'departure_time': 'VARCHAR',
                                'stop_id': 'VARCHAR',
                                'stop_sequence': 'VARCHAR',
                                'stop_headsign': 'VARCHAR',
                                'pickup_type': 'VARCHAR',
                                'drop_off_type': 'VARCHAR',
                                'shape_dist_traveled': 'DOUBLE',
                                'timepoint': 'VARCHAR'
                            })""",
        
        'stops': """CREATE TABLE stops AS SELECT * FROM read_csv(?,
                    header = true,
                    dateformat = '%Y%m%d',
                    columns = {
                                'stop_id': 'VARCHAR',
                                'stop_code': 'VARCHAR',
                                'stop_name': 'VARCHAR',
                                'stop_desc': 'VARCHAR',
                                'stop_lat': 'DOUBLE',
                                'stop_lon': 'DOUBLE',
                                'zone_id': 'VARCHAR',
                                'stop_url': 'VARCHAR',
                                'location_type': 'DOUBLE',
                                'parent_station': 'VARCHAR',
                                'wheelchair_boarding': 'VARCHAR'
                            })""",
        'transfers':"""CREATE TABLE transfers AS SELECT 
                                                from_stop_id,
                                                to_stop_id,
                                                transfer_type,
                                                (min_transfer_time || ' seconds')::INTERVAL as min_transfer_time,
                                                from_trip_id,
                                                to_trip_id
                                       
                                        FROM read_csv(?,
                                                header = true,
                                                dateformat = '%Y%m%d',
                                                columns = {
                                                                'from_stop_id': 'VARCHAR',
                                                                'to_stop_id': 'VARCHAR',
                                                                'transfer_type': 'VARCHAR',
                                                                'min_transfer_time': 'VARCHAR',
                                                                'from_trip_id': 'VARCHAR',
                                                                'to_trip_id': 'VARCHAR'
                                                        })""",
        'trips': "CREATE TABLE trips AS SELECT * FROM read_csv(?, all_varchar = True)"
}

filepath = "../google_transit(1)"


for name in queries.keys():
        dropquery = f"DROP TABLE IF EXISTS {name}"
        test_db.execute(dropquery)

for name,query in queries.items():
        full_path = filepath + "/" + name + ".txt"
        test_db.execute(query, [full_path])

test_db.sql("SELECT * from agency")



┌───────────┬─────────────┬──────────────────────────┬───────────────────┬─────────────┬───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ agency_id │ agency_name │        agency_url        │  agency_timezone  │ agency_lang │                                                        agency_fare_url                                                        │
│  varchar  │   varchar   │         varchar          │      varchar      │   varchar   │                                                            varchar                                                            │
├───────────┼─────────────┼──────────────────────────┼───────────────────┼─────────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ TL        │ TransLink   │ https://www.translink.ca │ America/Vancouver │ en          │ https://www.translink.ca/transit-fares/pric

In [102]:
test = pd.read_csv('../google_transit(1)/transfers.txt')
print(test)

       from_stop_id  to_stop_id  transfer_type  min_transfer_time  \
0                 1         613              0                240   
1                 1         639              2                300   
2             10017       10017              0                240   
3             10017       10018              2                300   
4             10017        8683              0                240   
...             ...         ...            ...                ...   
10088          9952        9952              0                240   
10089          9981       10139              0                240   
10090          9987        1622              0                240   
10091          9987        1679              0                240   
10092          9989        1643              0                240   

       from_trip_id  to_trip_id  
0               NaN         NaN  
1               NaN         NaN  
2               NaN         NaN  
3               NaN         NaN  
4

In [None]:
class schedule_relationship(Enum):
    SCHEDULED = 0
    ADDED = 1
    CANCELED = 2
    UNSCHEDULED = 3
    MODIFIED = 5

class mood(Enum):
    sad = 0
    ok = 1
    happy = 2 
    anxious = 3
    upset = 5

print(mood(2))

pandas_df = pd.DataFrame({"schmood": [0,1,2,3,5]})
pandas_df.apply(mood)
#print(pandas_df)
#duckdbs = duckdb.sql("Create TABLE test (mood mood)")
#duckdbs = duckdb.sql("INSERT * FROM pandas_df")
#duckdb.sql("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy', 'anxious')")
#duckdbs.show()    

mood.happy


ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().

In [None]:
url = auth[Provider]['alerts_link'] + auth[Provider]['api_key0']

#create gtfs feed
gtfs_feed = gtfs_realtime_pb2.FeedMessage()

response = requests.get(url)

status_failure_flag = 0 # if response shows 4 failures in a row, exit

if response.status_code == 200:
    gtfs_feed.ParseFromString(response.content)

print(gtfs_feed.entity[0])

id: "454491"
alert {
  active_period {
    start: 1708864200
  }
  informed_entity {
    agency_id: "TL"
    route_id: "30053"
    route_type: 1
    stop_id: "8577"
  }
  informed_entity {
    agency_id: "TL"
    route_id: "30053"
    route_type: 1
    stop_id: "8578"
  }
  informed_entity {
    agency_id: "TL"
    route_id: "30053"
    route_type: 1
    stop_id: "8579"
  }
  informed_entity {
    agency_id: "TL"
    route_id: "30053"
    route_type: 1
    stop_id: "8580"
  }
  cause: CONSTRUCTION
  effect: MODIFIED_SERVICE
  url {
    translation {
      text: "https://www.translink.ca/skytrainexpansion"
      language: "en"
    }
  }
  header_text {
    translation {
      text: "Expo Line service pattern change at Sapperton Station and Braid Station starting February 25. Please follow signs to ensure you\'re boarding from the correct platform."
      language: "en"
    }
  }
  description_text {
    translation {
      text: "Customers travelling on Expo Line to Production Way-Unive

In [None]:
print(gtfs_feed.entity[2])

id: "500556"
alert {
  active_period {
    start: 1718390520
  }
  informed_entity {
    agency_id: "TL"
    route_id: "6779"
    route_type: 3
    stop_id: "4611"
  }
  cause: CONSTRUCTION
  effect: NO_SERVICE
  header_text {
    translation {
      text: "Stop 54559 Eastbound Marine Dr @ Nelson Ave is temporarily closed beginning June 14 due to road block."
      language: "en"
    }
  }
  description_text {
    translation {
      text: "\nAffected routes:\r\n262\r\n\n250 Service will continue to use this stop. "
      language: "en"
    }
  }
  tts_header_text {
    translation {
      text: "Stop 5 4 5 5 9,  is temporarily closed beginning June 14 due to road block. 250 service will continue to use this stop."
      language: "en"
    }
  }
}



In [None]:
now = datetime.now().strftime("%m/%d, %H:%M:%S")
response.status_code = 201

if response.status_code == 200:
    gtfs_feed.ParseFromString(response.content)
    
    status_failure_flag = 0
    
else:
    if status_failure_flag < 4:
        raise Warning('server responded with code ' + str(response.status_code) + ' in response at ' + now ) #TODO add status code and change to docker warning
    else:
        raise SystemExit('4 consecutive failures in response at ' + now)

Warning: server responded with code 201 in response at 11/27, 23:28:02