# 10 - Bronze Vehicle Positions

**Version**: 3.0.0

**Purpose**: Parse raw GTFS-Realtime protobuf files from the Volume and write them to the Bronze Delta table.

## Design Principles (Fundamentals of Data Engineering - Reis & Housley)

| Principle | Implementation |
|-----------|----------------|
| **Idempotency** | Watermark pattern - safe to replay 10x with same result |
| **Dead-Letter Queue** | Quarantine table isolates corrupted records |
| **Observability** | ingestion_log + pipeline_metadata + silent failure alerts |
| **Loose Coupling** | Ingestion and Bronze are separate notebooks |
| **Reversible Decisions** | Full/Incremental mode - can always reprocess |
| **FinOps** | Partitioning by date to optimize query costs |

**Input**: `/Volumes/workspace/stm_raw/vehicle_positions_pb/`

**Output**: `stm_bronze.vehicle_positions`

In [0]:
%pip install gtfs-realtime-bindings --quiet

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


## Cell 1: Configuration

In [0]:
import os
import uuid
import time
from datetime import datetime
from google.transit import gtfs_realtime_pb2
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, LongType, IntegerType
)
from pyspark.sql.functions import col, from_unixtime, to_date, current_timestamp, lit

# -----------------------------------------------------------------------------
# VERSION
# -----------------------------------------------------------------------------
NOTEBOOK_VERSION = "3.0.0"
NOTEBOOK_NAME = "10_Bronze_Vehicle_Positions"

# -----------------------------------------------------------------------------
# WIDGETS
# -----------------------------------------------------------------------------
dbutils.widgets.dropdown("mode", "incremental", ["incremental", "full"], "Processing mode")
dbutils.widgets.text("process_date", "", "Process date (YYYY-MM-DD, empty=all)")
dbutils.widgets.dropdown("alert_on_zero", "1", ["0", "1"], "Alert if zero records")

MODE = dbutils.widgets.get("mode")
PROCESS_DATE = dbutils.widgets.get("process_date")
ALERT_ON_ZERO = dbutils.widgets.get("alert_on_zero") == "1"

# -----------------------------------------------------------------------------
# CATALOG
# -----------------------------------------------------------------------------
CATALOG = "workspace"
spark.sql(f"USE CATALOG {CATALOG}")

# -----------------------------------------------------------------------------
# PATHS AND TABLES
# -----------------------------------------------------------------------------
VOLUME_PATH = "/Volumes/workspace/stm_raw/vehicle_positions_pb"
TARGET_TABLE = "stm_bronze.vehicle_positions"
LOG_TABLE = "stm_bronze.ingestion_log"
QUARANTINE_TABLE = "stm_bronze.quarantine"
METADATA_TABLE = "stm_bronze.pipeline_metadata"

# -----------------------------------------------------------------------------
# RUN IDENTIFIERS
# -----------------------------------------------------------------------------
BATCH_ID = str(uuid.uuid4())
SOURCE_SYSTEM = "STM_GTFS_RT"
START_TIME = time.time()

# -----------------------------------------------------------------------------
# OBSERVABILITY: Initial log
# -----------------------------------------------------------------------------
print("=" * 70)
print(f"[INFO] {NOTEBOOK_NAME} v{NOTEBOOK_VERSION}")
print("=" * 70)
print(f"[INFO] Batch ID:     {BATCH_ID}")
print(f"[INFO] Mode:         {MODE}")
print(f"[INFO] Date filter:  {PROCESS_DATE if PROCESS_DATE else 'ALL'}")
print(f"[INFO] Alert on 0:   {ALERT_ON_ZERO}")
print(f"[INFO] Source:       {VOLUME_PATH}")
print(f"[INFO] Target:       {TARGET_TABLE}")
print("=" * 70)

[INFO] 10_Bronze_Vehicle_Positions v3.0.0
[INFO] Batch ID:     f51d0886-975d-4de7-b37e-c3b80abc3de3
[INFO] Mode:         incremental
[INFO] Date filter:  ALL
[INFO] Alert on 0:   True
[INFO] Source:       /Volumes/workspace/stm_raw/vehicle_positions_pb
[INFO] Target:       stm_bronze.vehicle_positions


## Cell 2: Idempotency - Watermark Functions

**Book Reference**: "A pipeline must be replayable 10 times with the same result."

The watermark pattern tracks which files have been processed. This ensures:
- No duplicates on re-run
- Safe recovery after failures
- Incremental processing efficiency
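The watermark behavior can be exercised without Spark. In this minimal sketch a Python list stands in for the Bronze table and `processed_files` plays the role of `SELECT DISTINCT ingestion_file`; the helper names and the in-memory table are hypothetical, and only the skip-if-already-seen rule mirrors the notebook.

```python
from typing import Dict, List, Set


def processed_files(table: List[Dict]) -> Set[str]:
    """The watermark: every file name already present in the (in-memory) table."""
    return {row["ingestion_file"] for row in table}


def incremental_run(table: List[Dict], source: Dict[str, List[int]]) -> int:
    """Append rows only for files missing from the watermark; return rows added."""
    seen = processed_files(table)
    added = 0
    for file_name in sorted(source):
        if file_name in seen:
            continue  # already ingested: skipping is what makes replays safe
        for value in source[file_name]:
            table.append({"ingestion_file": file_name, "value": value})
            added += 1
    return added


source = {"a.pb": [1, 2], "b.pb": [3]}
bronze: List[Dict] = []
first = incremental_run(bronze, source)
for _ in range(9):  # replay the same run nine more times
    incremental_run(bronze, source)
print(first, len(bronze))  # 3 3
```

Ten runs leave exactly the rows of the first run; only a new file name would add rows.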

In [0]:
def get_processed_files():
    """
    IDEMPOTENCY: Get set of files already processed (our watermark).
    
    This query acts as our checkpoint - we skip any file already in Bronze.
    Pattern from: Fundamentals of Data Engineering, Section 4.2
    """
    try:
        df = spark.sql(f"""
            SELECT DISTINCT ingestion_file 
            FROM {TARGET_TABLE}
        """)
        processed = {row.ingestion_file for row in df.collect()}
        print(f"[INFO] Watermark: {len(processed)} files already processed")
        return processed
    except Exception as e:
        print(f"[WARN] Could not read watermark (table may be empty): {str(e)[:50]}")
        return set()


def get_all_source_files(base_path, date_filter=None):
    """
    Scan Volume for all .pb files.
    Supports optional date filtering for targeted reprocessing.
    """
    files = []
    
    if not os.path.exists(base_path):
        print(f"[WARN] Volume path does not exist: {base_path}")
        return files
    
    for date_dir in sorted(os.listdir(base_path)):
        if not date_dir.startswith("date="):
            continue
        
        # Apply date filter if specified
        if date_filter:
            dir_date = date_dir.replace("date=", "")
            if dir_date != date_filter:
                continue
        
        date_path = f"{base_path}/{date_dir}"
        
        for hour_dir in sorted(os.listdir(date_path)):
            if not hour_dir.startswith("hour="):
                continue
            
            hour_path = f"{date_path}/{hour_dir}"
            
            for f in sorted(os.listdir(hour_path)):  # sorted for a deterministic processing order
                if f.endswith(".pb"):
                    files.append({
                        "full_path": f"{hour_path}/{f}",
                        "file_name": f,
                        "date": date_dir.replace("date=", ""),
                        "hour": hour_dir.replace("hour=", "")
                    })
    
    return files


def get_files_to_process(base_path, date_filter, mode):
    """
    IDEMPOTENCY: Determine which files need processing.
    
    - incremental: Only new files (not in watermark)
    - full: All files (for reprocessing after bug fix)
    
    This implements the "reversible decision" principle - we can always
    switch to full mode to reprocess everything.
    """
    all_files = get_all_source_files(base_path, date_filter if date_filter else None)
    
    print(f"[INFO] Total files in Volume: {len(all_files)}")
    
    if mode == "full":
        print(f"[INFO] FULL mode: will process all {len(all_files)} files")
        return all_files
    
    # Incremental: filter out already processed files
    processed = get_processed_files()
    new_files = [f for f in all_files if f["file_name"] not in processed]
    
    skipped = len(all_files) - len(new_files)
    print(f"[INFO] INCREMENTAL mode: {skipped} skipped, {len(new_files)} new files to process")
    
    return new_files
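`get_all_source_files` walks the `date=*/hour=*` directories by hand. The same Hive-style layout can also be matched with a single `pathlib` glob. This is an alternative sketch, assuming (as `os.listdir` already does) that the Volume is reachable through its local POSIX path; `list_source_files` is a hypothetical name.

```python
from pathlib import Path
from typing import Dict, List, Optional


def list_source_files(base_path: str, date_filter: Optional[str] = None) -> List[Dict[str, str]]:
    """List .pb files under base_path/date=YYYY-MM-DD/hour=HH/, sorted by path."""
    base = Path(base_path)
    if not base.exists():
        return []
    # Restrict the glob to a single date directory when a filter is given.
    date_glob = f"date={date_filter}" if date_filter else "date=*"
    files = []
    for pb in sorted(base.glob(f"{date_glob}/hour=*/*.pb")):
        hour_dir = pb.parent
        date_dir = hour_dir.parent
        files.append({
            "full_path": str(pb),
            "file_name": pb.name,
            "date": date_dir.name.split("=", 1)[1],
            "hour": hour_dir.name.split("=", 1)[1],
        })
    return files
```

Because the full paths are sorted, files come back ordered by date, then hour, then name; with the fixed-width `stm_positions_<epoch>.pb` names seen in this run, that order is also chronological.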

## Cell 3: Protobuf Parsing Functions

In [0]:
def parse_protobuf_file(file_info):
    """
    Parse a single protobuf file and extract vehicle positions.
    
    Returns:
        (records, errors) - tuple of successful records and error records
    
    Error records go to the Dead-Letter Queue (quarantine table).
    """
    records = []
    errors = []
    file_path = file_info["full_path"]
    file_name = file_info["file_name"]
    
    try:
        # Read raw protobuf bytes
        with open(file_path, "rb") as f:
            raw_data = f.read()
        
        # Parse protobuf
        feed = gtfs_realtime_pb2.FeedMessage()
        feed.ParseFromString(raw_data)
        
        # Extract each vehicle entity
        for entity in feed.entity:
            try:
                if not entity.HasField("vehicle"):
                    continue
                
                v = entity.vehicle
                
                # Vehicle ID (with fallback)
                bus_id = v.vehicle.id if v.vehicle.id else entity.id
                
                # Trip information (optional fields)
                has_trip = v.HasField("trip")
                trip_id = v.trip.trip_id if has_trip else None
                route_id = v.trip.route_id if has_trip else None
                start_time = v.trip.start_time if has_trip else None
                start_date = v.trip.start_date if has_trip else None
                
                # Position (optional fields)
                has_position = v.HasField("position")
                if has_position:
                    latitude = float(v.position.latitude)
                    longitude = float(v.position.longitude)
                    speed = float(v.position.speed) if v.position.HasField("speed") else None
                    bearing = float(v.position.bearing) if v.position.HasField("bearing") else None
                else:
                    latitude = None
                    longitude = None
                    speed = None
                    bearing = None
                
                # Status fields
                current_status = int(v.current_status) if v.HasField("current_status") else None
                occupancy_status = int(v.occupancy_status) if v.HasField("occupancy_status") else None
                
                record = {
                    "bus_id": bus_id,
                    "trip_id": trip_id,
                    "route_id": route_id,
                    "start_time": start_time,
                    "start_date": start_date,
                    "latitude": latitude,
                    "longitude": longitude,
                    "speed": speed,
                    "bearing": bearing,
                    "current_status": current_status,
                    "occupancy_status": occupancy_status,
                    # An unset proto2 timestamp reads as 0 (1970-01-01); store NULL instead
                    "event_timestamp_raw": v.timestamp if v.HasField("timestamp") else None,
                    "ingestion_file": file_name
                }
                records.append(record)
                
            except Exception as e:
                # DEAD-LETTER QUEUE: Record-level parse error
                errors.append({
                    "source_table": "vehicle_positions",
                    "source_file": file_name,
                    "error_type": "RECORD_PARSE_ERROR",
                    "error_message": str(e)[:500],
                    "raw_record": str(entity)[:1000],
                    "batch_id": BATCH_ID
                })
    
    except Exception as e:
        # DEAD-LETTER QUEUE: File-level read error
        errors.append({
            "source_table": "vehicle_positions",
            "source_file": file_name,
            "error_type": "FILE_READ_ERROR",
            "error_message": str(e)[:500],
            "raw_record": None,
            "batch_id": BATCH_ID
        })
    
    return records, errors
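`parse_protobuf_file` isolates failures at two levels: a file that cannot be read or decoded yields one `FILE_READ_ERROR`, while a bad entity inside a readable file yields a `RECORD_PARSE_ERROR` and the remaining entities are kept. The sketch below shows the same two-level dead-letter pattern without protobuf so it can be tested in isolation; `parse_with_dlq` and the `load` / `parse_entity` callables are hypothetical stand-ins for the read + `ParseFromString` step and the per-entity extraction.

```python
from typing import Callable, Dict, Iterable, List, Tuple


def parse_with_dlq(
    file_name: str,
    load: Callable[[str], Iterable[object]],
    parse_entity: Callable[[object], Dict],
) -> Tuple[List[Dict], List[Dict]]:
    """Return (records, errors); errors are shaped like quarantine rows."""
    records: List[Dict] = []
    errors: List[Dict] = []
    try:
        entities = list(load(file_name))  # file-level failure point
    except Exception as exc:
        errors.append({"source_file": file_name, "error_type": "FILE_READ_ERROR",
                       "error_message": str(exc)[:500], "raw_record": None})
        return records, errors
    for entity in entities:
        try:
            records.append(parse_entity(entity))  # record-level failure point
        except Exception as exc:
            errors.append({"source_file": file_name, "error_type": "RECORD_PARSE_ERROR",
                           "error_message": str(exc)[:500], "raw_record": str(entity)[:1000]})
    return records, errors


recs, errs = parse_with_dlq("f.pb", lambda name: [1, "bad", 3], lambda e: {"value": int(e)})
print([r["value"] for r in recs], [e["error_type"] for e in errs])  # [1, 3] ['RECORD_PARSE_ERROR']
```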

## Cell 4: Observability - Logging Functions

**Book Reference**: "Observability includes detecting data drift and schema drift."

In [0]:
import json


def log_to_ingestion_log(file_count, record_count, error_count, status, duration, error_message=None):
    """
    OBSERVABILITY: Log processing metrics to the ingestion_log table.
    
    This provides operational metadata for monitoring and debugging.
    Note: file_count and error_count are accepted but not written; the
    current INSERT persists only the record count.
    """
    # Truncate BEFORE escaping so an escape sequence is never cut in half, then
    # escape backslashes and single quotes for a Spark SQL string literal.
    safe_error = (
        error_message[:500].replace("\\", "\\\\").replace("'", "\\'")
        if error_message else None
    )
    
    spark.sql(f"""
        INSERT INTO {LOG_TABLE}
        VALUES (
            '{BATCH_ID}',
            'vehicle_positions_bronze',
            'batch_{BATCH_ID[:8]}',
            '{VOLUME_PATH}',
            {record_count},
            '{status}',
            {f"'{safe_error}'" if safe_error else "NULL"},
            current_timestamp() - INTERVAL {int(duration)} SECONDS,
            current_timestamp(),
            {duration:.2f}
        )
    """)


def log_to_pipeline_metadata(status, details):
    """
    OBSERVABILITY: Log to pipeline_metadata for lineage tracking.
    """
    try:
        current_user = spark.sql("SELECT current_user()").collect()[0][0]
    except Exception:
        current_user = "unknown"
    
    # json.dumps keeps the JSON valid even if `details` contains quotes; the
    # result is then escaped for use inside a Spark SQL string literal.
    params = json.dumps({"mode": MODE, "process_date": PROCESS_DATE, "files": details})
    params = params.replace("\\", "\\\\").replace("'", "\\'")
    
    spark.sql(f"""
        INSERT INTO {METADATA_TABLE}
        VALUES (
            '{BATCH_ID}',
            '{NOTEBOOK_NAME}',
            '{NOTEBOOK_VERSION}',
            'BRONZE',
            '{status}',
            '{current_user}',
            current_timestamp(),
            '{params}'
        )
    """)


def check_silent_failure(record_count, file_count):
    """
    OBSERVABILITY: Detect silent failures.
    
    Book Reference: "Are alerts configured for silent failures (e.g., a data volume of 0)?"
    
    A silent failure is when the pipeline succeeds but produces no data.
    This could indicate:
    - Source system is down
    - API returning empty responses
    - Parsing logic broken
    """
    if record_count == 0 and file_count > 0:
        alert_msg = f"SILENT FAILURE ALERT: {file_count} files processed but 0 records extracted!"
        print(f"\n{'!'*70}")
        print(f"[ALERT] {alert_msg}")
        print(f"{'!'*70}\n")
        
        # Log alert to metadata
        log_to_pipeline_metadata("ALERT_SILENT_FAILURE", alert_msg)
        
        return True
    return False
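Both logging functions build SQL by string interpolation, so any free text must be escaped before it is placed inside `'...'`. In Spark SQL string literals the backslash is the escape character (with the default `spark.sql.parser.escapedStringLiterals=false`), and the order of operations matters: escaping first and truncating second can leave half of an escape sequence at the end. The sketch below isolates that rule; `sql_string_literal` is a hypothetical helper. In recent PySpark versions, parameter markers passed through `spark.sql(..., args=...)` avoid hand-escaping altogether.

```python
from typing import Optional


def sql_string_literal(value: Optional[str], max_len: int = 500) -> str:
    """Render `value` as a single-quoted Spark SQL string literal, or NULL."""
    if value is None:
        return "NULL"
    text = value[:max_len]  # truncate first so no escape sequence is split
    # Escape backslashes first, otherwise the backslashes added for quotes get doubled.
    escaped = text.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


# Escape-then-truncate leaves a lone quote at the end of the payload:
naive = ("x" * 499 + "'").replace("'", "''")[:500]
print(naive[-2:])                                # x'
print(sql_string_literal("x" * 499 + "'")[-3:])  # \''
```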

## Cell 5: Get Files to Process

In [0]:
print("\n[STEP 1/6] Identifying files to process...")
print("-" * 50)

files_to_process = get_files_to_process(VOLUME_PATH, PROCESS_DATE, MODE)

if len(files_to_process) == 0:
    print("[INFO] No new files to process")
    log_to_pipeline_metadata("SUCCESS", "No new files")
    log_to_ingestion_log(0, 0, 0, "SUCCESS", time.time() - START_TIME, "No new files to process")
    dbutils.notebook.exit("NO_NEW_FILES")

# Show sample
print(f"\n[INFO] Will process {len(files_to_process)} files")
print("\nSample files (first 5):")
for f in files_to_process[:5]:
    print(f"  {f['date']} {f['hour']}:00 - {f['file_name']}")
if len(files_to_process) > 5:
    print(f"  ... and {len(files_to_process) - 5} more")


[STEP 1/6] Identifying files to process...
--------------------------------------------------
[INFO] Total files in Volume: 291
[INFO] Watermark: 287 files already processed
[INFO] INCREMENTAL mode: 287 skipped, 4 new files to process

[INFO] Will process 4 files

Sample files (first 5):
  2026-01-30 00:00 - stm_positions_1769732363.pb
  2026-01-30 00:00 - stm_positions_1769732663.pb
  2026-01-30 00:00 - stm_positions_1769732963.pb
  2026-01-30 00:00 - stm_positions_1769733263.pb


## Cell 6: Parse All Files

In [0]:
print(f"\n[STEP 2/6] Parsing {len(files_to_process)} protobuf files...")
print("-" * 50)

all_records = []
all_errors = []
parse_start = time.time()

for i, file_info in enumerate(files_to_process):
    records, errors = parse_protobuf_file(file_info)
    all_records.extend(records)
    all_errors.extend(errors)
    
    # Progress every 50 files
    if (i + 1) % 50 == 0:
        elapsed = time.time() - parse_start
        rate = (i + 1) / elapsed
        print(f"[INFO] Parsed {i + 1}/{len(files_to_process)} files | Records: {len(all_records):,} | Rate: {rate:.1f} files/sec")

parse_duration = time.time() - parse_start

print(f"\n[INFO] Parsing complete")
print(f"  - Files parsed:  {len(files_to_process)}")
print(f"  - Records:       {len(all_records):,}")
print(f"  - Errors:        {len(all_errors)}")
print(f"  - Duration:      {parse_duration:.2f}s")


[STEP 2/6] Parsing 4 protobuf files...
--------------------------------------------------

[INFO] Parsing complete
  - Files parsed:  4
  - Records:       2,454
  - Errors:        0
  - Duration:      2.16s


## Cell 7: Silent Failure Detection

In [0]:
print(f"\n[STEP 3/6] Checking for silent failures...")
print("-" * 50)

# OBSERVABILITY: Check for silent failure (files processed but no records)
if ALERT_ON_ZERO:
    is_silent_failure = check_silent_failure(len(all_records), len(files_to_process))
    
    if is_silent_failure:
        # The alert is already logged; the zero-record check below then exits with NO_RECORDS
        print("[WARN] Silent failure logged; this run will exit with NO_RECORDS and should be investigated")
else:
    print("[INFO] Silent failure check disabled (alert_on_zero=0)")

if len(all_records) == 0:
    print("[WARN] No records to write - exiting")
    total_duration = time.time() - START_TIME
    log_to_ingestion_log(len(files_to_process), 0, len(all_errors), "NO_DATA", total_duration)
    log_to_pipeline_metadata("NO_DATA", f"{len(files_to_process)} files, 0 records")
    dbutils.notebook.exit("NO_RECORDS")

print(f"[INFO] {len(all_records):,} records ready for Bronze table")


[STEP 3/6] Checking for silent failures...
--------------------------------------------------
[INFO] 2,454 records ready for Bronze table
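The exit decisions in Cells 5 and 7 can be expressed as a small pure function, which makes them easy to unit-test. The `min_records_per_file` threshold is a hypothetical extension that is not part of the notebook: it flags runs whose volume is low rather than exactly zero.

```python
from typing import Optional, Tuple


def run_outcome(
    files: int,
    records: int,
    alert_on_zero: bool = True,
    min_records_per_file: Optional[float] = None,  # hypothetical extension
) -> Tuple[Optional[str], Optional[str]]:
    """Return (exit_code or None to continue, alert or None)."""
    if files == 0:
        return "NO_NEW_FILES", None  # Cell 5 exit
    if records == 0:
        alert = "SILENT_FAILURE" if alert_on_zero else None
        return "NO_RECORDS", alert  # Cell 7: alert (if enabled), then exit
    if min_records_per_file is not None and records / files < min_records_per_file:
        return None, "LOW_VOLUME"  # keep going, but flag the run
    return None, None


print(run_outcome(4, 2454))  # (None, None)
```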


## Cell 8: Write to Bronze Table

In [0]:
print(f"\n[STEP 4/6] Writing to Bronze table...")
print("-" * 50)

# Define schema explicitly (avoid type inference issues)
schema = StructType([
    StructField("bus_id", StringType(), True),
    StructField("trip_id", StringType(), True),
    StructField("route_id", StringType(), True),
    StructField("start_time", StringType(), True),
    StructField("start_date", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("speed", DoubleType(), True),
    StructField("bearing", DoubleType(), True),
    StructField("current_status", IntegerType(), True),
    StructField("occupancy_status", IntegerType(), True),
    StructField("event_timestamp_raw", LongType(), True),
    StructField("ingestion_file", StringType(), True)
])

# Create DataFrame
df_raw = spark.createDataFrame(all_records, schema=schema)

# Add metadata columns
df_bronze = (
    df_raw
    .withColumn("event_timestamp", from_unixtime(col("event_timestamp_raw")).cast("timestamp"))
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("ingestion_date", to_date(from_unixtime(col("event_timestamp_raw"))))
    .withColumn("source_system", lit(SOURCE_SYSTEM))
    .withColumn("batch_id", lit(BATCH_ID))
)

# Write based on mode
write_start = time.time()

if MODE == "full":
    if PROCESS_DATE:
        # REVERSIBLE DECISION: Overwrite specific date partition
        print(f"[INFO] FULL mode: Overwriting partition ingestion_date='{PROCESS_DATE}'")
        (
            df_bronze
            .write
            .format("delta")
            .mode("overwrite")
            .option("replaceWhere", f"ingestion_date = '{PROCESS_DATE}'")
            .saveAsTable(TARGET_TABLE)
        )
    else:
        # REVERSIBLE DECISION: Full table overwrite
        print("[INFO] FULL mode: Overwriting entire table")
        (
            df_bronze
            .write
            .format("delta")
            .mode("overwrite")
            .saveAsTable(TARGET_TABLE)
        )
else:
    # IDEMPOTENCY: Append only (watermark ensures no duplicates)
    print("[INFO] INCREMENTAL mode: Appending new records")
    (
        df_bronze
        .write
        .format("delta")
        .mode("append")
        .saveAsTable(TARGET_TABLE)
    )

write_duration = time.time() - write_start
print(f"[INFO] Write complete | {len(all_records):,} records | {write_duration:.2f}s")


[STEP 4/6] Writing to Bronze table...
--------------------------------------------------
[INFO] INCREMENTAL mode: Appending new records
[INFO] Write complete | 2,454 records | 1.64s
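In full mode with `process_date`, files are selected by their `date=` directory, but `ingestion_date` is derived from each record's event timestamp through `from_unixtime`, which follows the Spark session time zone. Delta Lake, by default, validates that every row written with `replaceWhere` satisfies the predicate, so a file under `date=2026-01-30` that still carries events stamped before midnight would make the overwrite fail. The sketch below reproduces the date derivation in plain Python, assuming a UTC session time zone; `event_date_utc` and `rows_outside_partition` are hypothetical helpers.

```python
from datetime import date, datetime, timezone
from typing import Iterable, List


def event_date_utc(epoch_seconds: int) -> date:
    """Calendar date of a GTFS-RT epoch timestamp, assuming a UTC session time zone."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).date()


def rows_outside_partition(timestamps: Iterable[int], partition: str) -> List[int]:
    """Timestamps whose derived ingestion_date differs from `partition` (YYYY-MM-DD).

    Under replaceWhere "ingestion_date = partition", these are the rows that
    would fail Delta's predicate check.
    """
    target = date.fromisoformat(partition)
    return [ts for ts in timestamps if event_date_utc(ts) != target]


# 1769731199 is 2026-01-29 23:59:59 UTC; 1769732363 is 2026-01-30 00:19:23 UTC.
print(rows_outside_partition([1769731199, 1769732363], "2026-01-30"))  # [1769731199]
```

In the notebook itself, a pre-write guard along the lines of `df_bronze.filter(col("ingestion_date") != PROCESS_DATE).count()` would surface the same rows before the overwrite is attempted.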


## Cell 9: Dead-Letter Queue - Write Errors to Quarantine

**Book Reference**: "A mechanism that isolates corrupted messages so they do not block the main flow."

In [0]:
print(f"\n[STEP 5/6] Processing Dead-Letter Queue (Quarantine)...")
print("-" * 50)

if len(all_errors) > 0:
    print(f"[WARN] Writing {len(all_errors)} errors to quarantine table")
    
    error_schema = StructType([
        StructField("source_table", StringType(), True),
        StructField("source_file", StringType(), True),
        StructField("error_type", StringType(), True),
        StructField("error_message", StringType(), True),
        StructField("raw_record", StringType(), True),
        StructField("batch_id", StringType(), True)
    ])
    
    df_errors = spark.createDataFrame(all_errors, schema=error_schema)
    df_errors = df_errors.withColumn("created_at", current_timestamp())
    
    (
        df_errors
        .write
        .format("delta")
        .mode("append")
        .saveAsTable(QUARANTINE_TABLE)
    )
    
    # Show error summary
    print("\nError breakdown by type:")
    error_types = {}
    for e in all_errors:
        error_types[e["error_type"]] = error_types.get(e["error_type"], 0) + 1
    for err_type, count in error_types.items():
        print(f"  - {err_type}: {count}")
else:
    print("[INFO] No errors - quarantine write skipped")


[STEP 5/6] Processing Dead-Letter Queue (Quarantine)...
--------------------------------------------------
[INFO] No errors - quarantine write skipped
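The quarantine write is an append, and the watermark only covers files that produced at least one Bronze row. A file that fails entirely (`FILE_READ_ERROR`) is therefore picked up again by every incremental run and quarantined again under a new `batch_id`. A minimal sketch of de-duplicating errors against keys already in the quarantine table follows; loading `existing` (for example from `SELECT source_file, error_type, error_message` on the quarantine table) is left out, and all names are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable, List, Set, Tuple

ErrorKey = Tuple[str, str, str]


def error_key(err: Dict) -> ErrorKey:
    """Identity of a quarantine entry, ignoring batch_id and created_at."""
    return (err["source_file"], err["error_type"], err.get("error_message") or "")


def new_quarantine_rows(errors: Iterable[Dict], existing: Set[ErrorKey]) -> List[Dict]:
    """Drop errors already quarantined earlier (or repeated within this batch)."""
    seen = set(existing)
    fresh = []
    for err in errors:
        key = error_key(err)
        if key not in seen:
            seen.add(key)
            fresh.append(err)
    return fresh


def error_breakdown(errors: Iterable[Dict]) -> Counter:
    """The same per-type tally that Cell 9 builds with a dict."""
    return Counter(err["error_type"] for err in errors)
```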


## Cell 10: Log and Summary

In [0]:
print(f"\n[STEP 6/6] Logging and Summary...")
print("-" * 50)

total_duration = time.time() - START_TIME

# Determine status
if len(all_errors) == 0:
    status = "SUCCESS"
elif len(all_records) > 0:
    status = "PARTIAL"  # Some records succeeded, some failed
else:
    status = "FAILED"  # Not reached in practice: Cell 7 exits with NO_RECORDS when no records were parsed

# Log to observability tables
log_to_ingestion_log(
    len(files_to_process),
    len(all_records),
    len(all_errors),
    status,
    total_duration
)

log_to_pipeline_metadata(
    status,
    f"Files:{len(files_to_process)} Records:{len(all_records)} Errors:{len(all_errors)}"
)

# Print summary
print("\n" + "=" * 70)
print("BRONZE PROCESSING SUMMARY")
print("=" * 70)
print(f"""
BATCH INFORMATION
  Batch ID:        {BATCH_ID}
  Mode:            {MODE}
  Date Filter:     {PROCESS_DATE if PROCESS_DATE else 'ALL'}

SOURCE (Raw Files)
  Volume:          {VOLUME_PATH}
  Files processed: {len(files_to_process)}

OUTPUT (Bronze Table)
  Table:           {TARGET_TABLE}
  Records written: {len(all_records):,}

DEAD-LETTER QUEUE (Quarantine)
  Errors:          {len(all_errors)}
  Table:           {QUARANTINE_TABLE if len(all_errors) > 0 else 'N/A (no errors)'}

OBSERVABILITY
  Status:          {status}
  Total duration:  {total_duration:.2f}s
    - Parse:       {parse_duration:.2f}s
    - Write:       {write_duration:.2f}s
  Logged to:       {LOG_TABLE}, {METADATA_TABLE}
""")
print("=" * 70)


[STEP 6/6] Logging and Summary...
--------------------------------------------------

BRONZE PROCESSING SUMMARY

BATCH INFORMATION
  Batch ID:        f51d0886-975d-4de7-b37e-c3b80abc3de3
  Mode:            incremental
  Date Filter:     ALL

SOURCE (Raw Files)
  Volume:          /Volumes/workspace/stm_raw/vehicle_positions_pb
  Files processed: 4

OUTPUT (Bronze Table)
  Table:           stm_bronze.vehicle_positions
  Records written: 2,454

DEAD-LETTER QUEUE (Quarantine)
  Errors:          0
  Table:           N/A (no errors)

OBSERVABILITY
  Status:          SUCCESS
  Total duration:  9.09s
    - Parse:       2.16s
    - Write:       1.64s
  Logged to:       stm_bronze.ingestion_log, stm_bronze.pipeline_metadata
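Cell 10 only prints the summary. If a parent job triggers this notebook (for example with `dbutils.notebook.run`), returning the same figures as a JSON string through `dbutils.notebook.exit` would let the caller branch on the status. The sketch below builds that string in plain Python using the same status rules as Cell 10; `batch_status`, `run_summary_json`, and the JSON field names are assumptions.

```python
import json


def batch_status(record_count: int, error_count: int) -> str:
    """Same rules as Cell 10."""
    if error_count == 0:
        return "SUCCESS"
    if record_count > 0:
        return "PARTIAL"  # some records reached Bronze, some went to quarantine
    return "FAILED"


def run_summary_json(batch_id: str, mode: str, files: int, records: int,
                     errors: int, duration_s: float) -> str:
    """Serialize the run summary for a caller such as dbutils.notebook.run."""
    return json.dumps({
        "batch_id": batch_id,
        "mode": mode,
        "files": files,
        "records": records,
        "errors": errors,
        "status": batch_status(records, errors),
        "duration_s": round(duration_s, 2),
    })
```

In the notebook this would be called as `dbutils.notebook.exit(run_summary_json(BATCH_ID, MODE, len(files_to_process), len(all_records), len(all_errors), total_duration))`.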



## Cell 11: Data Verification

In [0]:
print("\n[VERIFICATION] Checking Bronze table...")
print("=" * 70)

# Total count
total_count = spark.sql(f"SELECT COUNT(*) as cnt FROM {TARGET_TABLE}").collect()[0]["cnt"]
print(f"\nTotal rows in {TARGET_TABLE}: {total_count:,}")

# By date (7 most recent ingestion dates)
print("\nRows by ingestion_date (recent):")
spark.sql(f"""
    SELECT 
        ingestion_date,
        COUNT(*) as records,
        COUNT(DISTINCT bus_id) as unique_buses,
        COUNT(DISTINCT ingestion_file) as files,
        MIN(event_timestamp) as earliest,
        MAX(event_timestamp) as latest
    FROM {TARGET_TABLE}
    GROUP BY ingestion_date
    ORDER BY ingestion_date DESC
    LIMIT 7
""").display()

# Quarantine check
quarantine_count = spark.sql(f"SELECT COUNT(*) as cnt FROM {QUARANTINE_TABLE}").collect()[0]["cnt"]
print(f"\nQuarantine table total: {quarantine_count} error records")

if quarantine_count > 0:
    print("\nRecent quarantine entries:")
    spark.sql(f"""
        SELECT 
            source_file,
            error_type,
            LEFT(error_message, 50) as error_preview,
            created_at
        FROM {QUARANTINE_TABLE}
        ORDER BY created_at DESC
        LIMIT 5
    """).display()


[VERIFICATION] Checking Bronze table...

Total rows in stm_bronze.vehicle_positions: 162,301

Rows by ingestion_date (recent):

| ingestion_date | records | unique_buses | files | earliest | latest |
|---|---|---|---|---|---|
| 2026-01-30 | 4423 | 697 | 7 | 2026-01-30T00:00:29.000Z | 2026-01-30T00:34:08.000Z |
| 2026-01-29 | 157878 | 1397 | 286 | 2026-01-29T00:27:26.000Z | 2026-01-29T23:59:59.000Z |

Quarantine table total: 0 error records


## Cell 12: Sample Data from This Batch

In [0]:
print(f"\n[SAMPLE] Records from batch {BATCH_ID[:8]}...")
spark.sql(f"""
    SELECT 
        bus_id,
        route_id,
        latitude,
        longitude,
        speed,
        event_timestamp,
        ingestion_file
    FROM {TARGET_TABLE}
    WHERE batch_id = '{BATCH_ID}'
    LIMIT 10
""").display()


[SAMPLE] Records from batch f51d0886...

| bus_id | route_id | latitude | longitude | speed | event_timestamp | ingestion_file |
|---|---|---|---|---|---|---|
| 40065 | 195 | 45.44662857055664 | -73.73072814941406 | 7.500060081481934 | 2026-01-30T00:18:58.000Z | stm_positions_1769732363.pb |
| 40064 | 61 | 45.49541091918945 | -73.56298065185547 | 7.500060081481934 | 2026-01-30T00:19:06.000Z | stm_positions_1769732363.pb |
| 40062 | 496 | 45.44105529785156 | -73.65082550048828 | 0.0 | 2026-01-30T00:19:07.000Z | stm_positions_1769732363.pb |
| 40066 | 35 | 45.45416641235352 | -73.58931732177734 | 5.000040054321289 | 2026-01-30T00:19:05.000Z | stm_positions_1769732363.pb |
| 39073 | 86 | 45.64167404174805 | -73.50040435791016 | 3.333359956741333 | 2026-01-30T00:19:09.000Z | stm_positions_1769732363.pb |
| 31114 | 180 | 45.5502815246582 | -73.65648651123047 | 0.0 | 2026-01-30T00:19:03.000Z | stm_positions_1769732363.pb |
| 31110 | 18 | 45.56889343261719 | -73.57421875 | 7.500060081481934 | 2026-01-30T00:19:00.000Z | stm_positions_1769732363.pb |
| 31117 | 460 | 45.59351348876953 | -73.57719421386719 | 0.0 | 2026-01-30T00:19:00.000Z | stm_positions_1769732363.pb |
| 39067 | 189 | 45.59778594970703 | -73.52205657958984 | 7.500060081481934 | 2026-01-30T00:19:09.000Z | stm_positions_1769732363.pb |
| 40056 | 95 | 45.56546401977539 | -73.58687591552734 | 5.000040054321289 | 2026-01-30T00:19:09.000Z | stm_positions_1769732363.pb |
