# Rush Hour Reality - Data Collection
## Dublin Bus Delays Analysis

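This notebook collects GTFS-Realtime (trip update and vehicle position) data for Dublin Bus from the NTA API and saves each snapshot as a timestamped CSV file for later delay analysis.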

In [None]:
# RUSH HOUR REALITY - Data Collection Script
# Dublin Bus Real-Time Delay Analysis
# Team: ZA
# Date: November 7, 2025

# Install Required Packages
!pip install gtfs-realtime-bindings==1.0.0
!pip install polars
!pip install requests
!pip install protobuf

print("All packages installed successfully")

All packages installed successfully


In [None]:
# Import Libraries
import json
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional

import polars as pl
import requests
from google.protobuf.message import DecodeError
from google.transit import gtfs_realtime_pb2

# Reached only if every import in this cell resolved without raising
print("All libraries imported successfully")


All libraries imported successfully


In [None]:
# Configuration
# The NTA API key is issued at: https://developer.nationaltransport.ie/

# The key is read from Colab Secrets so it never appears in the notebook source
from google.colab import userdata

try:
    NTA_API_KEY = userdata.get('NTA_API_KEY')
    print("API Key loaded from Colab Secrets")
except Exception as exc:
    # Keep the name defined so later cells that reference NTA_API_KEY do not
    # raise NameError, and show why the lookup failed instead of hiding it.
    NTA_API_KEY = None
    print(f"API Load Fail: {exc}")

# API Endpoints (GTFS-Realtime v2)
GTFS_REALTIME_URL = "https://api.nationaltransport.ie/gtfsr/v2/TripUpdates"
VEHICLE_POSITIONS_URL = "https://api.nationaltransport.ie/gtfsr/v2/Vehicles"

# Static GTFS Data URL (for reference data)
STATIC_GTFS_URL = "https://www.transportforireland.ie/transitData/Data/GTFS_All.zip"

print("Configuration set")

API Key loaded from Colab Secrets
Configuration set


In [None]:
# Helper Functions for GTFS-R Data Parsing

def fetch_gtfs_realtime_data(url: str, api_key: str) -> Optional[gtfs_realtime_pb2.FeedMessage]:
    """
    Fetch a GTFS-Realtime feed from the NTA API and decode it.

    Args:
        url: API endpoint URL
        api_key: NTA API subscription key, sent in the 'x-api-key' header

    Returns:
        The parsed FeedMessage, or None when the HTTP request fails or the
        response body cannot be decoded as a FeedMessage. In both failure
        cases the error is printed rather than raised.
    """
    headers = {
        'x-api-key': api_key,
        'Cache-Control': 'no-cache'
    }

    try:
        response = requests.get(url, headers=headers, timeout=30)
        # Turn 4xx/5xx responses into an exception handled just below
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data: {e}")
        return None

    # Parse protobuf
    feed = gtfs_realtime_pb2.FeedMessage()
    try:
        feed.ParseFromString(response.content)
    except DecodeError as e:
        # A malformed body must not escape: callers invoke this function
        # outside their own try blocks and only check for a None result.
        print(f"Error decoding GTFS-Realtime payload: {e}")
        return None

    return feed


def parse_trip_updates_to_dict(feed: gtfs_realtime_pb2.FeedMessage) -> List[Dict]:
    """
    Flatten the trip updates of a GTFS-Realtime feed into row dictionaries.

    One row is produced per stop_time_update. The trip and vehicle values of
    the owning trip_update are repeated on every one of its rows, and any
    field the message does not report as set is stored as None.

    Args:
        feed: GTFS-Realtime FeedMessage

    Returns:
        List of dictionaries, one per stop time update
    """
    def field_or_none(message, name):
        # Only read the field when the message reports it as set
        return getattr(message, name) if message.HasField(name) else None

    def event_field_or_none(stop_update, event_name, name):
        # event_name selects the 'arrival' or 'departure' sub-message
        if not stop_update.HasField(event_name):
            return None
        return field_or_none(getattr(stop_update, event_name), name)

    # The feed header timestamp is the same for every row, so format it once
    feed_time_iso = datetime.fromtimestamp(feed.header.timestamp, tz=timezone.utc).isoformat()
    rows = []

    for entity in feed.entity:
        if not entity.HasField('trip_update'):
            continue

        update = entity.trip_update
        trip = update.trip
        vehicle = update.vehicle

        # Columns shared by every stop time update of this entity
        shared = {'collection_timestamp': feed_time_iso, 'entity_id': entity.id}
        for name in ('trip_id', 'route_id', 'direction_id', 'start_date', 'start_time'):
            shared[name] = field_or_none(trip, name)
        shared['vehicle_id'] = field_or_none(vehicle, 'id')
        shared['vehicle_label'] = field_or_none(vehicle, 'label')

        for stop_update in update.stop_time_update:
            row = dict(shared)
            row['stop_sequence'] = field_or_none(stop_update, 'stop_sequence')
            row['stop_id'] = field_or_none(stop_update, 'stop_id')
            row['arrival_delay'] = event_field_or_none(stop_update, 'arrival', 'delay')
            row['arrival_time'] = event_field_or_none(stop_update, 'arrival', 'time')
            row['departure_delay'] = event_field_or_none(stop_update, 'departure', 'delay')
            row['departure_time'] = event_field_or_none(stop_update, 'departure', 'time')
            rows.append(row)

    return rows


def parse_vehicle_positions_to_dict(feed: gtfs_realtime_pb2.FeedMessage) -> List[Dict]:
    """
    Flatten the vehicle positions of a GTFS-Realtime feed into row dictionaries.

    One row is produced per entity that carries a 'vehicle' message. Any
    field the message does not report as set is stored as None.

    Args:
        feed: GTFS-Realtime FeedMessage

    Returns:
        List of dictionaries, one per vehicle position
    """
    def field_or_none(message, name):
        # Only read the field when the message reports it as set
        return getattr(message, name) if message.HasField(name) else None

    # The feed header timestamp is the same for every row, so format it once
    feed_time_iso = datetime.fromtimestamp(feed.header.timestamp, tz=timezone.utc).isoformat()
    rows = []

    for entity in feed.entity:
        if not entity.HasField('vehicle'):
            continue

        vehicle = entity.vehicle
        trip = vehicle.trip
        position = vehicle.position
        # The nested descriptor holding id/label is itself checked for presence
        has_descriptor = vehicle.HasField('vehicle')

        row = {
            'collection_timestamp': feed_time_iso,
            'entity_id': entity.id,
            'vehicle_id': field_or_none(vehicle.vehicle, 'id') if has_descriptor else None,
            'vehicle_label': field_or_none(vehicle.vehicle, 'label') if has_descriptor else None,
        }
        # Insertion order below fixes the column order of each row
        for name in ('trip_id', 'route_id', 'direction_id', 'start_date', 'start_time'):
            row[name] = field_or_none(trip, name)
        for name in ('latitude', 'longitude', 'bearing', 'speed'):
            row[name] = field_or_none(position, name)
        for name in ('current_stop_sequence', 'stop_id', 'current_status', 'timestamp', 'congestion_level'):
            row[name] = field_or_none(vehicle, name)

        rows.append(row)

    return rows

print("Helper functions defined successfully")


Helper functions defined successfully


In [None]:
# Test API Connection
# One request per endpoint; each reports the entity count and feed timestamp,
# or a failure line when the fetch helper returned nothing.

print("Testing NTA API connection \n")

# Trip Updates endpoint
print("Testing Trip Updates endpoint")
trip_feed = fetch_gtfs_realtime_data(GTFS_REALTIME_URL, NTA_API_KEY)

if not trip_feed:
    print("Failed to fetch Trip Updates")
else:
    print(f"Trip Updates: Received {len(trip_feed.entity)} entities")
    print(f"Feed timestamp: {datetime.fromtimestamp(trip_feed.header.timestamp, tz=timezone.utc)}")

print()

# Vehicle Positions endpoint
print("Testing Vehicle Positions endpoint")
vehicle_feed = fetch_gtfs_realtime_data(VEHICLE_POSITIONS_URL, NTA_API_KEY)

if not vehicle_feed:
    print("Failed to fetch Vehicle Positions")
else:
    print(f"Vehicle Positions: Received {len(vehicle_feed.entity)} entities")
    print(f"Feed timestamp: {datetime.fromtimestamp(vehicle_feed.header.timestamp, tz=timezone.utc)}")

print("\nAPI connection test complete")


Testing NTA API connection 

Testing Trip Updates endpoint
Trip Updates: Received 235 entities
Feed timestamp: 2025-12-01 00:45:35+00:00

Testing Vehicle Positions endpoint
Vehicle Positions: Received 128 entities
Feed timestamp: 2025-12-01 00:45:30+00:00

API connection test complete


In [None]:
import time
from datetime import datetime
from pathlib import Path
import polars as pl

# Collect and save data with explicit schema

def _trip_update_schema() -> dict:
    """Column names and Polars dtypes for the trip-update CSV."""
    return {
        'collection_timestamp': pl.Utf8,
        'entity_id': pl.Utf8,
        'trip_id': pl.Utf8,
        'route_id': pl.Utf8,
        'direction_id': pl.Int64,
        'start_date': pl.Utf8,
        'start_time': pl.Utf8,
        'vehicle_id': pl.Utf8,
        'vehicle_label': pl.Utf8,
        'stop_sequence': pl.Int64,
        'stop_id': pl.Utf8,
        'arrival_delay': pl.Int64,
        'arrival_time': pl.Int64,
        'departure_delay': pl.Int64,
        'departure_time': pl.Int64,
    }


def _vehicle_position_schema() -> dict:
    """Column names and Polars dtypes for the vehicle-position CSV."""
    return {
        'collection_timestamp': pl.Utf8,
        'entity_id': pl.Utf8,
        'vehicle_id': pl.Utf8,
        'vehicle_label': pl.Utf8,
        'trip_id': pl.Utf8,
        'route_id': pl.Utf8,
        'direction_id': pl.Int64,
        'start_date': pl.Utf8,
        'start_time': pl.Utf8,
        'latitude': pl.Float64,
        'longitude': pl.Float64,
        'bearing': pl.Float64,
        'speed': pl.Float64,
        'current_stop_sequence': pl.Int64,
        'stop_id': pl.Utf8,
        'current_status': pl.Int64,
        'timestamp': pl.Int64,
        'congestion_level': pl.Int64,
    }


def _save_feed_to_csv(url: str, api_key: str, parse_records, schema: dict,
                      csv_path: str, feed_name: str, record_name: str) -> bool:
    """Fetch one feed, parse it and write its rows to csv_path; True only if rows were written.

    feed_name is the singular feed label used in messages ('trip update');
    record_name is the noun used in the "Saved N ... records" line ('trip').
    """
    feed = fetch_gtfs_realtime_data(url, api_key)
    if feed is None:
        print(f"Failed to fetch {feed_name}s")
        return False

    try:
        records = parse_records(feed)
        if not records:
            print(f"No {feed_name} data received")
            return False

        # An explicit schema avoids dtype inference on columns that may be all None
        frame = pl.DataFrame(records, schema=schema)
        frame.write_csv(csv_path)
        print(f"Saved {len(frame):,} {record_name} records to {csv_path}")
        return True

    except Exception as e:
        # Best effort: report the problem and let the caller carry on
        print(f"Error processing {feed_name}s: {e}")
        return False


def collect_and_save_data(api_key: str, output_dir: str = './data/raw') -> bool:
    """
    Collect one snapshot of both NTA feeds and save each to a timestamped CSV

    Trip updates are written to trip_updates_<timestamp>.csv and vehicle
    positions to vehicle_positions_<timestamp>.csv inside output_dir, both
    with an explicit schema to avoid type inference errors. The timestamp is
    taken from datetime.now() at the start of the call.

    Args:
        api_key: NTA API key
        output_dir: Directory to save CSV files (created if missing)

    Returns:
        True if trip updates were fetched, parsed and saved; False otherwise.
        Vehicle position failures are reported but do not affect the result.
    """
    # Create output directory
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S')

    # Collect Trip Updates
    print(f"[{timestamp_str}] Fetching trip updates")
    success = _save_feed_to_csv(
        GTFS_REALTIME_URL,
        api_key,
        parse_trip_updates_to_dict,
        _trip_update_schema(),
        f"{output_dir}/trip_updates_{timestamp_str}.csv",
        feed_name='trip update',
        record_name='trip',
    )

    # Collect Vehicle Positions; the outcome is deliberately ignored so that a
    # vehicle-feed problem does not mark an otherwise good collection as failed
    print(f"[{timestamp_str}] Fetching vehicle positions")
    _save_feed_to_csv(
        VEHICLE_POSITIONS_URL,
        api_key,
        parse_vehicle_positions_to_dict,
        _vehicle_position_schema(),
        f"{output_dir}/vehicle_positions_{timestamp_str}.csv",
        feed_name='vehicle position',
        record_name='vehicle',
    )

    print()
    return success


print("Collect_and_save_data function loaded")

Collect_and_save_data function loaded


In [None]:
# Colab only: mount Google Drive at /content/drive/
from google.colab import drive
drive.mount('/content/drive/')

# Switch the working directory to the project folder on Drive. The relative
# paths used elsewhere in this notebook ('./data/raw', './data/gtfs_static/')
# resolve against this folder once the %cd below has run.
%cd /content/drive/MyDrive/Rush-Hour-Reality/rush-hour-reality/

In [None]:
# Download static GTFS reference data
# The archive is fetched again on every run of this cell; there is no
# check for an existing download.

import zipfile
import urllib.request
import polars as pl

print("Downloading static GTFS data")

# Download the zip file into the current working directory
urllib.request.urlretrieve(STATIC_GTFS_URL, 'gtfs_static.zip')

# Extract every member of the archive under ./data/gtfs_static/
with zipfile.ZipFile('gtfs_static.zip', 'r') as zip_ref:
    zip_ref.extractall('./data/gtfs_static/')

print("Static GTFS data downloaded and extracted")

# Load key reference files with Polars
# Note: df_trips here is the static trips.txt table; collect_and_save_data
# uses a local variable of the same name for the real-time trip rows.
df_routes = pl.read_csv('./data/gtfs_static/routes.txt')
df_stops = pl.read_csv('./data/gtfs_static/stops.txt')
df_trips = pl.read_csv('./data/gtfs_static/trips.txt')

# Row counts as a quick check that the files loaded
print(f"Routes: {len(df_routes)} records")
print(f"Stops: {len(df_stops)} records")
print(f"Trips: {len(df_trips)} records")

Downloading static GTFS data
Static GTFS data downloaded and extracted
Routes: 806 records
Stops: 14055 records
Trips: 356741 records


In [None]:
# Collection loop

def run_continuous_collection(interval_minutes: int = 30):
    """
    Run continuous collection every N minutes

    Loops forever: each pass calls collect_and_save_data with the notebook's
    NTA_API_KEY, reports the outcome, then sleeps for interval_minutes. The
    sleep starts after a collection finishes, so the gap between collection
    starts is the interval plus the time the collection itself took.

    Args:
        interval_minutes: Minutes to sleep between collections

    Returns:
        Never returns; stop it by interrupting the kernel.
    """
    collection_count = 0

    while True:
        collection_count += 1
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        print(f"Collection #{collection_count} started at {current_time}")

        success = collect_and_save_data(NTA_API_KEY)
        if success:
            print(f"Collection #{collection_count} completed successfully")
        else:
            print(f"Collection #{collection_count} completed with warnings")

        # Adding a timedelta rolls over hours and days correctly, so a run
        # scheduled across midnight is reported with tomorrow's date, and the
        # printed time matches when the sleep below actually ends.
        next_time = datetime.now() + timedelta(minutes=interval_minutes)

        print(f"Next collection scheduled at {next_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Sleeping for {interval_minutes} minutes")

        time.sleep(interval_minutes * 60)

# Entry point
# Announce start-up, pause briefly, then hand over to the collection loop
# (which runs `while True` and does not return).
print("Starting rush hour collection script", "Starting collection in 3 seconds", sep="\n")
time.sleep(3)
run_continuous_collection(interval_minutes=30)