Skip to content

Commit

Permalink
Add realtime GTFS vehicle position ingestor
Browse files Browse the repository at this point in the history
closes #8
  • Loading branch information
VMois committed Apr 27, 2024
1 parent c0d01c3 commit a550845
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 0 deletions.
17 changes: 17 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ x-airflow-common:
condition: service_healthy

services:
vehicle_positions_ingestor:
build: ./gtfs_realtime_ingestor
environment:
GOOGLE_APPLICATION_CREDENTIALS: /root/creds/service_account.json
VEHICLE_LOCATION_URL: https://www.miapp.ca/GTFS_RT/Vehicle/VehiclePositions.pb
BUCKET_NAME: miwaitway
LOGLEVEL: INFO
volumes:
- ~/service_account.json:/root/creds/service_account.json
restart: on-failure
mem_limit: "150m"
logging:
driver: "json-file"
options:
max-size: "1m"
max-file: "2"

postgres:
image: postgres:13
environment:
Expand Down
9 changes: 9 additions & 0 deletions gtfs_realtime_ingestor/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM python:3.12.3-slim-bookworm

WORKDIR /app

# COPY is preferred over ADD for plain local files (ADD's archive/URL
# handling is not needed here). Install dependencies in their own layer
# first so code-only changes do not invalidate the pip layer cache.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY ingest_vehicle_positions.py .
ENTRYPOINT [ "python", "ingest_vehicle_positions.py" ]
7 changes: 7 additions & 0 deletions gtfs_realtime_ingestor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# A service to ingest vehicle positions continuously

Useful Docker command for local testing:

```bash
docker run -v /Users/vmois/Projects/miwaitway/service_account.json:/root/creds/service_account.json -e VEHICLE_LOCATION_URL="https://www.miapp.ca/GTFS_RT/Vehicle/VehiclePositions.pb" -e BUCKET_NAME=miwaitway -e LOGLEVEL=debug -e GOOGLE_APPLICATION_CREDENTIALS=/root/creds/service_account.json miwaitway_vehicle_positions_ingestor
```
128 changes: 128 additions & 0 deletions gtfs_realtime_ingestor/ingest_vehicle_positions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""
Extract realtime vehicle location data from GTFS-realtime and save them to GCS as CSV files.
"""

import os
import time
import requests
import hashlib
import logging
from collections import defaultdict

from google import api_core
from google.transit import gtfs_realtime_pb2
from google.cloud import storage
import polars as pl
from typing import Optional


# Logging setup: the LOGLEVEL env var selects the level (default "INFO");
# an unrecognized value falls back to WARNING via the getattr default.
log_level_name = os.getenv("LOGLEVEL", "INFO").upper()
log_level = getattr(logging, log_level_name, logging.WARNING)
logging.basicConfig(level=log_level)
logger = logging.getLogger("vehicle_positions_ingestor")


# Number of distinct feed snapshots to accumulate before one CSV upload.
# NOTE(review): name has a typo ("CHUNS" -> "CHUNKS"); kept as-is because
# other code in this module references it by this name.
CHUNS_TO_LOAD = 100


def _require_env(name: str) -> str:
    """Return the value of required environment variable *name*.

    Logs the value at DEBUG level and raises ValueError when it is unset.
    """
    value = os.getenv(name)
    logger.debug(f"{name}: {value}")
    if value is None:
        raise ValueError(f"{name} must be set")
    return value


# URL of the GTFS-realtime VehiclePositions protobuf feed to poll.
VEHICLE_LOCATION_URL = _require_env("VEHICLE_LOCATION_URL")
# Name of the GCS bucket that receives the CSV chunks.
BUCKET_NAME = _require_env("BUCKET_NAME")


def get_field_value(obj, field_name: str, default=None):
    """Safely read an optional protobuf field from *obj*.

    Returns the field's value when it is explicitly present, otherwise
    *default*.  HasField raises ValueError for fields that do not track
    presence; that case also yields *default*.
    """
    try:
        present = obj.HasField(field_name)
    except ValueError:
        return default
    return getattr(obj, field_name) if present else default


def _flatten_entity(entity) -> dict:
    """Flatten one GTFS-realtime feed entity into a flat row dict.

    Optional fields that are absent become None (via the defaultdict and
    get_field_value defaults) so every row exposes the same columns.
    """
    vehicle = entity.vehicle
    trip = vehicle.trip
    vehicle_data = defaultdict(lambda: None)

    vehicle_data["id"] = entity.id
    vehicle_data["trip_id"] = get_field_value(trip, "trip_id")
    vehicle_data["route_id"] = get_field_value(trip, "route_id")
    vehicle_data["direction_id"] = get_field_value(trip, "direction_id")
    vehicle_data["start_date"] = get_field_value(trip, "start_date")
    vehicle_data["vehicle_id"] = get_field_value(vehicle.vehicle, "id")
    vehicle_data["vehicle_label"] = get_field_value(vehicle.vehicle, "label")
    vehicle_data["latitude"] = get_field_value(vehicle.position, "latitude")
    vehicle_data["longitude"] = get_field_value(vehicle.position, "longitude")
    vehicle_data["bearing"] = get_field_value(vehicle.position, "bearing")
    vehicle_data["speed"] = get_field_value(vehicle.position, "speed")
    vehicle_data["timestamp"] = get_field_value(vehicle, "timestamp")
    vehicle_data["occupancy_status"] = get_field_value(
        vehicle, "occupancy_status", default=None
    )
    vehicle_data["occupancy_percentage"] = get_field_value(
        vehicle, "occupancy_percentage", default=None
    )
    return vehicle_data


def extract_vehicle_location():
    """Poll the GTFS-realtime feed forever and batch snapshots to GCS as CSV.

    Every CHUNS_TO_LOAD distinct feed snapshots, the accumulated rows are
    de-duplicated and uploaded as one CSV object under "realtime/" in
    BUCKET_NAME.  Never returns; transient HTTP errors are retried after a
    short sleep, and unchanged snapshots (same SHA-256) are skipped.
    """
    logger.info(
        f"Starting extraction of vehicle locations from {VEHICLE_LOCATION_URL}."
    )

    previous_hash: Optional[str] = None
    chunks_left = CHUNS_TO_LOAD
    flattened_data = []

    logger.info(f'Setting up GCS client for "{BUCKET_NAME}" bucket.')
    gcs_client = storage.Client()
    bucket = gcs_client.bucket(BUCKET_NAME)

    logger.info("Starting to load vehicle location data (infinite loop).")
    while True:
        try:
            # BUG FIX: the original call had no timeout, so a stalled
            # connection could hang the poll loop forever.
            response = requests.get(VEHICLE_LOCATION_URL, timeout=30)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(
                f"Failed to load vehicle location data. Retrying in 2 seconds. Error:\n{e}"
            )
            time.sleep(2)
            continue

        # Skip snapshots identical to the previous one (same payload hash).
        current_hash = hashlib.sha256(response.content).hexdigest()
        if current_hash == previous_hash:
            logger.debug(
                f"Vehicle location data has not changed (hash: {current_hash}). Sleeping for 2 seconds."
            )
            time.sleep(2)
            continue

        previous_hash = current_hash

        logger.debug("Parsing GTFS real-time data.")
        feed = gtfs_realtime_pb2.FeedMessage()
        feed.ParseFromString(response.content)

        flattened_data.extend(_flatten_entity(entity) for entity in feed.entity)

        chunks_left -= 1
        logger.debug(f"{chunks_left} chunks left before uploading to GCS.")

        if chunks_left == 0:
            chunks_left = CHUNS_TO_LOAD

            df = pl.DataFrame(flattened_data).unique(keep="last")
            # BUG FIX: reset the buffer after building the frame; the
            # original never cleared it, so every later batch re-uploaded
            # all prior rows and memory grew without bound.
            flattened_data = []

            object_path = f"realtime/vehicle_{current_hash}.csv"
            logger.debug(f"Uploading chunks to GCS as {object_path}.")
            blob = bucket.blob(object_path)
            blob.upload_from_string(df.write_csv(include_header=True))


# Run the ingestor only when executed as a script, not on import.
if __name__ == "__main__":
    extract_vehicle_location()
4 changes: 4 additions & 0 deletions gtfs_realtime_ingestor/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
requests==2.31.0
gtfs-realtime-bindings==1.0.0
polars==0.20.14
google-cloud-storage==2.16.0

0 comments on commit a550845

Please sign in to comment.