In [1]:
import pandas as pd
import requests
import json
import os
import io
import py7zr
import zipfile
import csv
import tempfile
from datetime import datetime
from pathlib import Path
from itertools import islice
from collections import defaultdict
from dotenv import load_dotenv
import shutil
import copy
from google.transit import gtfs_realtime_pb2
from google.protobuf.message import DecodeError
import logging

load_dotenv()

True

In [2]:
KODA_KEY = os.getenv("API_KODA_KEY")
GTFS_RT_KEY = os.getenv("API_GTFS_RT_KEY")  # clé GTFS Regional Realtime

In [3]:
#host_api = "https://api.koda.trafiklab.se/KoDa/api/v2"

#base_history = "gtfs-rt"
#base_reference = "gtfs-static"

#operator = "sl"

#type_of_api = "TripUpdates"

#url_history = host_api + base_history + operator + type_of_api
#url_reference = host_api + base_reference + operator

#call_date = "2025-03-15"

#params = {
 #   "date": call_date, 
 #   "key": KODA_KEY
#}

In [4]:
def call_koda_api(base_url, date, operator = "sl", endpoint=""):
    if endpoint != "":
        api_url = f"https://api.koda.trafiklab.se/KoDa/api/v2/{base_url}/{operator}/{endpoint}"
    else:
        api_url = f"https://api.koda.trafiklab.se/KoDa/api/v2/{base_url}/{operator}"
        
    params = {
        "date": date, 
        "key": KODA_KEY
    }
    
    request = requests.get(f"{api_url}", params=params, timeout=20)
    print(api_url)
    print(request)
    print(len(request.content), "BYTES")

    return request

def call_koda_history_api(date):
    request = call_koda_api("gtfs-rt", date, endpoint="TripUpdates")
    return request

def call_koda_reference_api(date):
    request = call_koda_api("gtfs-static", date)
    return request

In [5]:
r_history = call_koda_history_api("2025-03-15")

https://api.koda.trafiklab.se/KoDa/api/v2/gtfs-rt/sl/TripUpdates
<Response [200]>
41448769 BYTES


In [None]:
r_reference = call_koda_reference_api("2025-03-15")

In [None]:
def read_koda_history_day(request, items_by_batch=400):
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(message)s",
        datefmt="%H:%M:%S"
    )

    logger = logging.getLogger("gtfs")

    archive_bytes = io.BytesIO(request.content)
    
    # FICHIER TEMPORAIRES
    tmpdir = tempfile.mkdtemp(prefix="koda_")
    tmp = Path(tmpdir)

    history_entities = []
    bad_files = []

    BATCH = items_by_batch

    feed = gtfs_realtime_pb2.FeedMessage()

    # 1) liste des candidats
    archive_bytes.seek(0)
    with py7zr.SevenZipFile(archive_bytes, mode="r") as z:
        candidates = [n for n in z.getnames() if n.lower().endswith(".pb")]

    print("Nb fichiers .pb:", len(candidates))

    # 2) extraction + parse par batch
    for i in range(0, len(candidates), BATCH):
        batch = candidates[i:i+BATCH]

        logger.info(
            "Batch %d–%d / %d (%.1f%%)",
            i + 1,
            min(i + BATCH, len(candidates)),
            len(candidates),
            100 * (i + len(batch)) / len(candidates)
        )

        try:
            archive_bytes.seek(0)
            with py7zr.SevenZipFile(archive_bytes, mode="r") as z:
                z.extract(path=tmpdir, targets=batch)
        except Exception as e:
            # si un batch plante, on retombe en mode "un par un" juste pour ce batch
            for name in batch:
                try:
                    archive_bytes.seek(0)
                    with py7zr.SevenZipFile(archive_bytes, mode="r") as z:
                        z.extract(path=tmpdir, targets=[name])
                except Exception as e2:
                    bad_files.append((name, f"ExtractError: {e2!r}"))
            continue

        for name in batch:
            p = tmp / name
            try:
                raw = p.read_bytes()
                feed.Clear()
                try:
                    feed.ParseFromString(raw)
                except DecodeError as de:
                    bad_files.append((name, f"DecodeError: {de!r}"))
                    continue

                for entity in feed.entity:
                    history_entities.append(copy.deepcopy(entity))

            except Exception as e:
                bad_files.append((name, f"Read/ParseError: {e!r}"))
            finally:
                p.unlink(missing_ok=True)

    shutil.rmtree(tmpdir, ignore_errors=True)

    print("✅ Total entités enregistrées:", len(history_entities))
    print("⚠️ Fichiers ignorés:", len(bad_files))
    print("Exemples:", bad_files[:5])

    return history_entities, bad_files

In [None]:
history_entities, bad_files = read_koda_history_day(r_history, 500)

11:48:08 | INFO | Batch 1–500 / 6031 (8.3%)


Nb fichiers .pb: 6031


11:48:10 | INFO | Batch 501–1000 / 6031 (16.6%)
11:48:12 | INFO | Batch 1001–1500 / 6031 (24.9%)
11:48:14 | INFO | Batch 1501–2000 / 6031 (33.2%)
11:48:18 | INFO | Batch 2001–2500 / 6031 (41.5%)
11:48:23 | INFO | Batch 2501–3000 / 6031 (49.7%)
11:48:30 | INFO | Batch 3001–3500 / 6031 (58.0%)
11:48:38 | INFO | Batch 3501–4000 / 6031 (66.3%)
11:48:49 | INFO | Batch 4001–4500 / 6031 (74.6%)
11:48:58 | INFO | Batch 4501–5000 / 6031 (82.9%)
11:49:08 | INFO | Batch 5001–5500 / 6031 (91.2%)
11:49:18 | INFO | Batch 5501–6000 / 6031 (99.5%)
11:49:28 | INFO | Batch 6001–6031 / 6031 (100.0%)


✅ Total entités enregistrées: 3113034
⚠️ Fichiers ignorés: 0
Exemples: []


In [None]:
for e in history_entities[:20]:
    print(e)

id: "14010516897425752"
trip_update {
  trip {
    trip_id: "14010000685561305"
    start_date: "20250314"
    schedule_relationship: SCHEDULED
  }
  vehicle {
    id: "9031001003003885"
  }
  stop_time_update {
    stop_sequence: 1
    stop_id: "9022001041441002"
    arrival {
      delay: -131
      time: 1741983709
      uncertainty: 0
    }
    departure {
      delay: 9018
      time: 1741992858
      uncertainty: 0
    }
  }
  timestamp: 1741992858
}

id: "14010516479723124"
trip_update {
  trip {
    trip_id: "14010000668271329"
    start_date: "20250314"
    schedule_relationship: SCHEDULED
  }
  vehicle {
    id: "9031001004302576"
  }
  stop_time_update {
    stop_sequence: 25
    stop_id: "9022001006071004"
    arrival {
      delay: -65
      time: 1741992535
      uncertainty: 0
    }
    departure {
      delay: 9
      time: 1741992609
      uncertainty: 0
    }
  }
  stop_time_update {
    stop_sequence: 26
    stop_id: "9022001006081002"
    arrival {
      delay: -66


In [None]:
def read_koda_reference_data(request, file_name):
    archive_bytes = io.BytesIO(request.content)

    with zipfile.ZipFile(archive_bytes, "r") as z:
        with z.open(f"{file_name}.txt") as f:
            text = io.TextIOWrapper(f, encoding="utf-8")
            reader = csv.DictReader(text)
            return list(reader)

In [None]:
reference_trips = read_koda_reference_data(r_reference, "trips")
reference_trips

[{'route_id': '9011001000100000',
  'service_id': '1',
  'trip_id': '14010000664282006',
  'trip_headsign': '',
  'direction_id': '1',
  'shape_id': '1014010000482329256'},
 {'route_id': '9011001000100000',
  'service_id': '1',
  'trip_id': '14010000664304117',
  'trip_headsign': '',
  'direction_id': '1',
  'shape_id': '1014010000482329256'},
 {'route_id': '9011001000100000',
  'service_id': '1',
  'trip_id': '14010000664309301',
  'trip_headsign': '',
  'direction_id': '1',
  'shape_id': '1014010000482329256'},
 {'route_id': '9011001000100000',
  'service_id': '1',
  'trip_id': '14010000664312304',
  'trip_headsign': '',
  'direction_id': '1',
  'shape_id': '1014010000482329256'},
 {'route_id': '9011001000100000',
  'service_id': '1',
  'trip_id': '14010000664315369',
  'trip_headsign': '',
  'direction_id': '1',
  'shape_id': '1014010000482329256'},
 {'route_id': '9011001000100000',
  'service_id': '1',
  'trip_id': '14010000664316143',
  'trip_headsign': '',
  'direction_id': '1',


In [None]:
reference_routes = read_koda_reference_data(r_reference, "routes")
reference_routes

[{'route_id': '9011001000100000',
  'agency_id': '14010000000001001',
  'route_short_name': '1',
  'route_long_name': '',
  'route_type': '700',
  'route_desc': 'blåbuss'},
 {'route_id': '9011001000200000',
  'agency_id': '14010000000001001',
  'route_short_name': '2',
  'route_long_name': '',
  'route_type': '700',
  'route_desc': 'blåbuss'},
 {'route_id': '9011001000300000',
  'agency_id': '14010000000001001',
  'route_short_name': '3',
  'route_long_name': '',
  'route_type': '700',
  'route_desc': 'blåbuss'},
 {'route_id': '9011001000400000',
  'agency_id': '14010000000001001',
  'route_short_name': '4',
  'route_long_name': '',
  'route_type': '700',
  'route_desc': 'blåbuss'},
 {'route_id': '9011001000600000',
  'agency_id': '14010000000001001',
  'route_short_name': '6',
  'route_long_name': '',
  'route_type': '700',
  'route_desc': 'blåbuss'},
 {'route_id': '9011001000700000',
  'agency_id': '14010000000001001',
  'route_short_name': '7',
  'route_long_name': 'Spårväg city',
 

In [None]:
def corr_array_creation(reference_data, id_key, ref_fields:tuple):
    """
    Création d'un tableau de correspondance pour gagner du temps au merge des dict().
    `reference_data`: ,
    `id_key`: nom de la column à mettre en avant,
    `ref_fields`: liste des colonnes à prendre. ('nom_1', "nom_2",...)
    """
    ref = {}
    for r in reference_data:
        tid = r.get(id_key)
        if not tid:
            continue
        # on stocke seulement ce dont on a besoin
        ref[tid] = {k: r.get(k) for k in ref_fields}
    
    return ref

In [None]:
REF_TRIPS_FIELDS = ("route_id", "service_id", "direction_id", "shape_id", "trip_headsign")

ref_trips_corr = corr_array_creation(reference_routes, "trip_id", REF_TRIPS_FIELDS)
ref_trips_corr

{}

In [None]:
REF_ROUTES_FIELDS = ("agency_id", "route_short_name", "route_type")

ref_routes_corr = corr_array_creation(reference_trips, "route_id", REF_ROUTES_FIELDS)
ref_routes_corr

{'9011001000100000': {'agency_id': None,
  'route_short_name': None,
  'route_type': None},
 '9011001000200000': {'agency_id': None,
  'route_short_name': None,
  'route_type': None},
 '9011001000300000': {'agency_id': None,
  'route_short_name': None,
  'route_type': None},
 '9011001000400000': {'agency_id': None,
  'route_short_name': None,
  'route_type': None},
 '9011001000600000': {'agency_id': None,
  'route_short_name': None,
  'route_type': None},
 '9011001000700000': {'agency_id': None,
  'route_short_name': None,
  'route_type': None},
 '9011001001000000': {'agency_id': None,
  'route_short_name': None,
  'route_type': None},
 '9011001001100000': {'agency_id': None,
  'route_short_name': None,
  'route_type': None},
 '9011001001200000': {'agency_id': None,
  'route_short_name': None,
  'route_type': None},
 '9011001001300000': {'agency_id': None,
  'route_short_name': None,
  'route_type': None},
 '9011001001400000': {'agency_id': None,
  'route_short_name': None,
  'route_ty

In [None]:
def flatten_history_entity_koda(history_items):
    """Applatit le dict en un seul niveau. `history_item` égale une ligne de données, une entrée."""
    if not history_items.HasField("trip_update"):
        return

    tu = history_items.trip_update
    trip = tu.trip

    tid = trip.trip_id
    start_date = trip.start_date if trip.HasField("start_date") else None
    feed_ts = tu.timestamp if tu.HasField("timestamp") else None
    vehicle_id = tu.vehicle.id if tu.HasField("vehicle") else None

    for stu in tu.stop_time_update:
        yield {
            "entity_id": e.id,
            "trip_id": tid,
            "start_date": start_date,
            "vehicle_id": vehicle_id,
            "feed_ts": feed_ts,
            "schedule_relationship": trip.schedule_relationship, #VERIFIER QUE JE RECOIS BIEN CORRECTEMENT

            "stop_id": stu.stop_id,
            "stop_sequence": stu.stop_sequence,

            "arrival_delay": stu.arrival.delay if stu.HasField("arrival") else None,
            "arrival_time": stu.arrival.time if stu.HasField("arrival") else None,
            "arrival_uncertainty": stu.arrival.uncertainty if stu.HasField("arrival") else None,

            "departure_delay": stu.departure.delay if stu.HasField("departure") else None,
            "departure_time": stu.departure.time if stu.HasField("departure") else None,
            "departure_uncertainty": stu.departure.uncertainty if stu.HasField("departure") else None,
        }


In [None]:
def enrich_row_inplace_koda(row, lookup, id_key):
    """
    Enrichis le dictionnaire avec une autre dictionnaire basé sur un id. 
    `row`: ligne à adapté avec la référence, 
    `lookup`: dictionnaire d'index,
    `id_key`: La clef sur la quel on va merger les datas.
    """
    ref = lookup.get(row[id_key])
    if ref:
        row.update(ref)
    return row

def enrich_many_inplace_koda(row, lookups):
    """
    lookups = liste de tuples: (id_key, lookup_dict)
    ex: [("trip_id", trip_lookup), ("route_id", route_lookup)]
    """
    for id_key, lookup in lookups:
        key_val = row.get(id_key)
        if not key_val:
            continue
        ref = lookup.get(key_val)
        if ref:
            row.update(ref)
    return row


In [None]:
def iter_history_row_koda(entities, lookups):
    """
    Fais une boucle sur l'applatissemnt des données et le merge
    `entities`: les données,
    `lookup`: dictionnaire d'index,
    `id_key`: La clef sur la quel on va merger les datas.
    """
    for e in entities:
        # Applatit les données
        for row in flatten_history_entity_koda(e):
            # Enrichis/merge les données
            yield enrich_many_inplace_koda(row, lookups)

In [None]:
history_lookups = [
    ("trip_id", ref_trips_corr),    # trip_id -> route_id, shape_id, ...
    ("route_id", ref_routes_corr),  # route_id -> route_short_name, route_type, ...
]
history_data = iter_history_row_koda(history_entities, history_lookups)

In [None]:
print(next(history_data))

{'entity_id': '14010516478913279', 'trip_id': '14010000685561305', 'start_date': '20250314', 'vehicle_id': '9031001003003885', 'feed_ts': 1741992858, 'schedule_relationship': 0, 'stop_id': '9022001041441002', 'stop_sequence': 1, 'arrival_delay': -131, 'arrival_time': 1741983709, 'arrival_uncertainty': 0, 'departure_delay': 9018, 'departure_time': 1741992858, 'departure_uncertainty': 0}
