#### BRONZE INGESTION: LIVE TRAINS -> UC VOLUME

In [0]:
import requests
import json
import os
import time
from datetime import datetime

In [0]:
# Unity Catalog Volume path that receives the raw bronze landing files.
RAW_ROOT = "/Volumes/rail_lakehouse/bronze_rail/raw_files"
# Live-train snapshots are kept in their own sub-folder under the raw root.
LIVE_ROOT = RAW_ROOT + "/live_trains"

# Digitraffic live-trains endpoint. Per the parameter names, the query asks
# for up to 50 departing trains at station code HKI and excludes trains that
# do not stop there.
API_URL = (
    "https://rata.digitraffic.fi/api/v1/live-trains"
    + "?station=HKI"
    + "&departing_trains=50"
    + "&include_nonstopping=false"
)


In [0]:
def save_live_batch(payload: list, root: "str | None" = None) -> None:
    """Write one polled batch of live-train records to a JSON file.

    The file is placed in a ``date=YYYY-MM-DD`` sub-folder of *root* and is
    named ``live_<HHMMSS>_<microseconds>.json``. Both parts come from the
    process's local (naive) clock. Nothing is written when *payload* is
    empty.

    Args:
        payload: Decoded JSON body returned by the live-trains API.
        root: Destination directory; defaults to the module's ``LIVE_ROOT``.
    """
    if not payload:
        print("⚠️ No trains returned, skipping write.")
        return

    if root is None:
        root = LIVE_ROOT

    # Read the clock once so the partition date and the file timestamp always
    # describe the same instant; two separate calls could straddle midnight.
    now = datetime.now()
    event_date = now.strftime("%Y-%m-%d")
    ts = now.strftime("%H%M%S_%f")

    folder = f"{root}/date={event_date}"
    os.makedirs(folder, exist_ok=True)

    file_path = f"{folder}/live_{ts}.json"
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(payload, f)

    print(f"✅ Saved {len(payload)} trains to {file_path}")

In [0]:
def ingest_live_trains(polls: int = 3, interval_seconds: int = 10):
    """Poll the live-trains API *polls* times and land each response.

    Each successful response body is passed to ``save_live_batch``. Polls are
    scheduled at a fixed rate: the wait after a poll is reduced by however
    long that poll's request and write took. Successive polls therefore start
    roughly *interval_seconds* apart, and the loop never sleeps a negative
    amount. No sleep follows the final poll.

    A poll that fails (network/HTTP error, undecodable JSON body, or a failed
    file write) is reported and skipped, so the remaining polls still run.
    Other exceptions propagate instead of being silently swallowed.

    Args:
        polls: Number of requests to make.
        interval_seconds: Target spacing between the starts of consecutive
            polls.
    """
    print(f"🚀 Starting live trains ingestion: {polls} polls every {interval_seconds}s")
    # One session lets consecutive polls reuse the same HTTP connection.
    with requests.Session() as session:
        for i in range(polls):
            started = time.monotonic()
            print(f"\n--- Poll {i+1}/{polls} ---")
            try:
                resp = session.get(API_URL, timeout=10)
                resp.raise_for_status()
                save_live_batch(resp.json())
            except (requests.RequestException, ValueError, OSError) as e:
                # ValueError covers JSON decode failures; OSError covers the
                # filesystem write in save_live_batch.
                print(f"❌ Error during poll {i+1}: {e}")
            if i < polls - 1:
                elapsed = time.monotonic() - started
                time.sleep(max(0.0, interval_seconds - elapsed))
    print("🏁 Live trains ingestion run complete.")

In [0]:
# Run a single ingestion pass using the function's default schedule.
ingest_live_trains(polls=3, interval_seconds=10)