# Uday Srivastava - Hrytos - AI Intern Assessment
# Singapore Public Transport Query Agent (Take-Home Assessment)

This notebook covers both assignment parts:
- **Part 1:** Agentic workflow for Singapore transport queries using live APIs.
- **Part 2:** Simulation of **10 users** with captured responses.

Focus is on engineering approach, workflow structure, and documentation quality.


## Approach and Thought Process

### Design intent
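The solution is **tool-first** (API-grounded) rather than LLM-only: every answer is assembled from API responses, and each tool result carries an explicit source label. This keeps outputs traceable and makes failures easier to debug.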

### Workflow stages
1. `parse_query` -> extract intent/entities from user text
2. `enrich_context` -> add weather, traffic, time-of-day, holiday/event context
3. `run_transport_tool` -> call relevant transport tools (bus/traffic/taxi)
4. `compose_response` -> deterministic response, optional Groq refinement
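
The four stages above form a linear pipeline over a shared state dictionary. The following is a minimal sketch of that pattern only; the `*_stub` nodes and `run_pipeline` are hypothetical stand-ins, not the notebook's actual implementations.

```python
from typing import Any, Callable

State = dict[str, Any]
Node = Callable[[State], State]


def parse_stub(state: State) -> State:
    # Stand-in for parse_query: naive keyword routing (assumption for illustration).
    text = state["user_query"].lower()
    state["intent"] = "taxi" if "taxi" in text else "general"
    state["steps"].append("parse_query")
    return state


def enrich_stub(state: State) -> State:
    # Stand-in for enrich_context: attaches a fixed context instead of live data.
    state["context"] = {"time_of_day": "off_peak"}
    state["steps"].append("enrich_context")
    return state


def tool_stub(state: State) -> State:
    # Stand-in for run_transport_tool: returns a labeled placeholder result.
    state["tool_result"] = {"source": "stub", "intent": state["intent"]}
    state["steps"].append("run_transport_tool")
    return state


def compose_stub(state: State) -> State:
    # Stand-in for compose_response: builds text only from the state gathered so far.
    state["response"] = f"intent={state['intent']} source={state['tool_result']['source']}"
    state["steps"].append("compose_response")
    return state


def run_pipeline(user_query: str, nodes: list[Node]) -> State:
    # Each node receives the state produced by the previous node.
    state: State = {"user_query": user_query, "steps": []}
    for node in nodes:
        state = node(state)
    return state


demo = run_pipeline("Any taxi nearby?", [parse_stub, enrich_stub, tool_stub, compose_stub])
print(demo["steps"])
print(demo["response"])
```

Keeping each stage as a plain function over a shared state makes the same nodes usable both inside a graph framework and in a simple sequential loop.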


## Evaluation Criteria Mapping

1. **Agentic workflow design**: explicit node-based pipeline with tool routing.
2. **Input variety and constraints**: simulation includes weather, traffic, time-of-day, holidays, events.
3. **Solution structure**: clear sectioning from setup -> tools -> workflow -> simulation -> deployment.
4. **Clarity and documentation**: concise inline explanations and rationale.
5. **Deployment understanding**: practical production architecture and operations notes.


## README

### Setup
1. Create and activate a local virtual environment.
2. Set environment variables:
   - `LTA_ACCOUNT_KEY`
   - `GROQ_API_KEY`
3. Open the notebook and run all cells top-to-bottom.
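
As a quick pre-flight check before running the cells, something like the following could confirm that both keys are visible to the Python process. The helper name `missing_env_vars` is illustrative and not part of the notebook.

```python
import os
from typing import Mapping, Optional


def missing_env_vars(names: list[str], env: Optional[Mapping[str, str]] = None) -> list[str]:
    # Treat unset and whitespace-only values as missing.
    source = os.environ if env is None else env
    return [name for name in names if not source.get(name, "").strip()]


required = ["LTA_ACCOUNT_KEY", "GROQ_API_KEY"]
missing = missing_env_vars(required)
if missing:
    print("Not set:", ", ".join(missing))
else:
    print("All required environment variables are set.")
```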

### Expected output
- Part 1 cells show successful agent workflow execution.
- Part 2 displays a styled table with exactly 10 user simulations and responses.

In [1]:
from __future__ import annotations

import json
import os
import re
import time
from datetime import datetime, timedelta
from functools import lru_cache
from html import escape
from typing import Any, TypedDict, Literal
from zoneinfo import ZoneInfo

import requests

try:
    from IPython.display import HTML, Markdown, display
except ImportError:
    HTML = None
    Markdown = None

    def display(obj):
        print(obj)

SG_TZ = ZoneInfo("Asia/Singapore")


def now_sg() -> datetime:
    return datetime.now(SG_TZ)


print("Setup complete. Time now (SG):", now_sg().isoformat(timespec="seconds"))


Setup complete. Time now (SG): 2026-02-23T03:52:42+08:00


## Configuration

Environment variables:
- `LTA_ACCOUNT_KEY` for live LTA DataMall access
- `GROQ_API_KEY` for optional response refinement
- `GROQ_MODEL` (default: `llama-3.1-8b-instant`)
- `GROQ_CALL_BUDGET` (default: `6`, set `-1` for unlimited)

If an API key is missing or a network call fails, a deterministic mock fallback keeps the notebook runnable.
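
The budget semantics described above (a fixed number of allowed calls, with a negative value meaning unlimited) can be sketched as a small counter. This is an illustrative sketch: `CallBudget` and `parse_budget` are hypothetical names, and falling back to the default on a malformed value is an assumption rather than the notebook's behavior.

```python
from typing import Optional


def parse_budget(raw: Optional[str], default: int = 6) -> int:
    # Fall back to the default when the variable is unset or not an integer.
    try:
        return int(raw) if raw is not None else default
    except ValueError:
        return default


class CallBudget:
    """Counts calls against a fixed budget; a negative budget means unlimited."""

    def __init__(self, budget: int) -> None:
        self.budget = budget
        self.used = 0

    def try_consume(self) -> bool:
        # Refuse the call once the budget is spent, unless the budget is unlimited.
        if self.budget >= 0 and self.used >= self.budget:
            return False
        self.used += 1
        return True


limited = CallBudget(parse_budget("2"))
print([limited.try_consume() for _ in range(3)])
```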


In [2]:
LTA_ACCOUNT_KEY = os.getenv("LTA_ACCOUNT_KEY", "").strip()
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "").strip()
GROQ_MODEL = os.getenv("GROQ_MODEL", "llama-3.1-8b-instant").strip()

USE_MOCK_IF_UNAVAILABLE = True
USE_GROQ_SYNTHESIS = bool(GROQ_API_KEY)
SHOW_LLM_ERRORS = False

GROQ_CALL_BUDGET = int(os.getenv("GROQ_CALL_BUDGET", "6"))
GROQ_CALL_COUNT = 0

HTTP_TIMEOUT_SECONDS = 8

LTA_BASE = "https://datamall2.mytransport.sg/ltaodataservice"
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
NAGER_HOLIDAY_URL = "https://date.nager.at/api/v3/PublicHolidays"
GROQ_URL = "https://api.groq.com/openai/v1/chat/completions"

print("LTA key configured:", bool(LTA_ACCOUNT_KEY))
print("Groq key configured:", bool(GROQ_API_KEY))
print("Groq synthesis enabled:", USE_GROQ_SYNTHESIS)
print("Groq call budget:", GROQ_CALL_BUDGET)
print("Mock fallback enabled:", USE_MOCK_IF_UNAVAILABLE)


LTA key configured: True
Groq key configured: True
Groq synthesis enabled: True
Groq call budget: 6
Mock fallback enabled: True


## Step 1 - API Tool Layer

Engineering choices:
- retries + timeout for resilience
- lightweight caching for efficiency
- explicit source labels for traceability
- deterministic fallback for reproducibility
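
The lightweight caching mentioned above can be approximated by passing a time bucket as an extra cache key, so that entries naturally stop being reused once the clock moves into the next window. The sketch below shows the idea in isolation; `time_bucket`, `fetch_cached`, and the `calls` counter are illustrative names, and the "fetch" is a placeholder rather than a real HTTP call.

```python
from datetime import datetime
from functools import lru_cache


def time_bucket(now: datetime, minutes: int) -> str:
    # Floor the timestamp to the start of its N-minute window (N should divide 60).
    floored = now.replace(minute=(now.minute // minutes) * minutes, second=0, microsecond=0)
    return floored.isoformat(timespec="minutes")


calls = {"n": 0}  # Counts how often the placeholder fetch actually runs.


@lru_cache(maxsize=16)
def fetch_cached(resource: str, bucket: str) -> tuple[str, str]:
    # The bucket is part of the cache key, so a new window forces a fresh fetch.
    calls["n"] += 1
    return (resource, bucket)


first = time_bucket(datetime(2026, 2, 23, 3, 52, 0), 5)
second = time_bucket(datetime(2026, 2, 23, 3, 54, 30), 5)
third = time_bucket(datetime(2026, 2, 23, 3, 55, 0), 5)

fetch_cached("traffic", first)
fetch_cached("traffic", second)  # Same window: served from cache.
fetch_cached("traffic", third)   # Next window: fetched again.
print(first, second, third, calls["n"])
```

One trade-off of this approach is that whatever the function returns for a bucket, including a fallback result, is reused until the window rolls over.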


In [3]:
def _headers_for_lta() -> dict[str, str]:
    if not LTA_ACCOUNT_KEY:
        return {"accept": "application/json"}
    return {"AccountKey": LTA_ACCOUNT_KEY, "accept": "application/json"}


def _http_request_json(
    method: str,
    url: str,
    params: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    json_body: dict[str, Any] | None = None,
    retries: int = 2,
    timeout: int = HTTP_TIMEOUT_SECONDS,
) -> tuple[dict[str, Any] | list[Any] | None, str | None]:
    last_error: str | None = None
    for attempt in range(retries + 1):
        try:
            response = requests.request(
                method=method,
                url=url,
                params=params,
                headers=headers,
                json=json_body,
                timeout=timeout,
            )
            if response.status_code == 200:
                return response.json(), None
            last_error = f"HTTP {response.status_code}: {response.text[:180]}"
        except Exception as exc:
            last_error = f"{type(exc).__name__}: {exc}"
        if attempt < retries:
            time.sleep(0.5 * (2**attempt))
    return None, (last_error or "Unknown HTTP error")


def _http_get_json(url: str, params: dict[str, Any] | None = None, headers: dict[str, str] | None = None, retries: int = 2):
    return _http_request_json("GET", url, params=params, headers=headers, retries=retries)


def _http_post_json(url: str, headers: dict[str, str] | None = None, json_body: dict[str, Any] | None = None, retries: int = 2):
    return _http_request_json("POST", url, headers=headers, json_body=json_body, retries=retries)


def _iso_in_minutes(minutes: int) -> str:
    return (now_sg() + timedelta(minutes=minutes)).isoformat()


def _time_bucket(minutes: int = 5) -> str:
    n = now_sg()
    floored = n.replace(minute=(n.minute // minutes) * minutes, second=0, microsecond=0)
    return floored.isoformat(timespec="minutes")


def _mock_bus_arrival(stop_code: str, service_no: str | None = None, error: str | None = None) -> dict[str, Any]:
    base = [
        {"ServiceNo": "14", "NextBus": {"EstimatedArrival": _iso_in_minutes(3), "Load": "SDA", "Feature": "WAB"}, "NextBus2": {"EstimatedArrival": _iso_in_minutes(9)}},
        {"ServiceNo": "67", "NextBus": {"EstimatedArrival": _iso_in_minutes(6), "Load": "SEA", "Feature": "WAB"}, "NextBus2": {"EstimatedArrival": _iso_in_minutes(12)}},
        {"ServiceNo": "190", "NextBus": {"EstimatedArrival": _iso_in_minutes(2), "Load": "LSD", "Feature": ""}, "NextBus2": {"EstimatedArrival": _iso_in_minutes(8)}},
    ]
    if service_no:
        base = [s for s in base if s["ServiceNo"] == service_no]
    return {"source": "mock", "bus_stop_code": stop_code, "services": base, "warning": f"Using mock bus data ({error})" if error else "Using mock bus data (no LTA key)"}


def _mock_traffic(error: str | None = None) -> dict[str, Any]:
    incidents = [
        {"Type": "Accident", "Message": "Accident on PIE (towards Tuas) near Clementi Ave 6"},
        {"Type": "Road Works", "Message": "Road works on CTE (towards SLE) after Ang Mo Kio Ave 1"},
    ]
    return {"source": "mock", "incident_count": len(incidents), "incidents": incidents, "warning": f"Using mock traffic data ({error})" if error else "Using mock traffic data"}


def _mock_taxi(error: str | None = None) -> dict[str, Any]:
    return {"source": "mock", "taxi_count": 2480, "warning": f"Using mock taxi data ({error})" if error else "Using mock taxi data"}


def _mock_weather(error: str | None = None) -> dict[str, Any]:
    return {"source": "mock", "temperature_c": 30.2, "rain_mm": 0.6, "precip_probability": 70, "warning": f"Using mock weather data ({error})" if error else "Using mock weather data"}


def _mock_holidays(year: int) -> dict[str, str]:
    return {f"{year}-01-01": "New Year's Day", f"{year}-05-01": "Labour Day", f"{year}-08-09": "National Day", f"{year}-12-25": "Christmas Day"}


def _eta_minutes(iso_value: str | None) -> int | None:
    if not iso_value:
        return None
    try:
        dt = datetime.fromisoformat(iso_value.replace("Z", "+00:00")).astimezone(SG_TZ)
        return max(0, int(round((dt - now_sg()).total_seconds() / 60)))
    except Exception:
        return None


@lru_cache(maxsize=32)
def fetch_bus_arrival_cached(stop_code: str, service_no: str | None, bucket: str) -> dict[str, Any]:
    if not LTA_ACCOUNT_KEY and USE_MOCK_IF_UNAVAILABLE:
        return _mock_bus_arrival(stop_code, service_no)

    params = {"BusStopCode": stop_code, **({"ServiceNo": service_no} if service_no else {})}
    payload, error = _http_get_json(
        f"{LTA_BASE}/v3/BusArrival",
        params=params,
        headers=_headers_for_lta(),
    )

    if payload is None:
        return _mock_bus_arrival(stop_code, service_no, error) if USE_MOCK_IF_UNAVAILABLE else {"source": "error", "error": error}

    services = payload.get("Services", []) if isinstance(payload, dict) else []
    return {"source": "lta", "bus_stop_code": stop_code, "services": services, "warning": None}


def fetch_bus_arrival(stop_code: str, service_no: str | None = None) -> dict[str, Any]:
    return fetch_bus_arrival_cached(stop_code, service_no, _time_bucket(1))


@lru_cache(maxsize=16)
def fetch_traffic_incidents_cached(bucket: str) -> dict[str, Any]:
    if not LTA_ACCOUNT_KEY and USE_MOCK_IF_UNAVAILABLE:
        return _mock_traffic()
    payload, error = _http_get_json(f"{LTA_BASE}/TrafficIncidents", headers=_headers_for_lta())
    if payload is None:
        return _mock_traffic(error) if USE_MOCK_IF_UNAVAILABLE else {"source": "error", "incident_count": 0, "incidents": [], "warning": error}
    incidents = payload.get("value", []) if isinstance(payload, dict) else []
    return {"source": "lta", "incident_count": len(incidents), "incidents": incidents[:5], "warning": None}


def fetch_traffic_incidents() -> dict[str, Any]:
    return fetch_traffic_incidents_cached(_time_bucket(5))


@lru_cache(maxsize=16)
def fetch_taxi_availability_cached(bucket: str) -> dict[str, Any]:
    if not LTA_ACCOUNT_KEY and USE_MOCK_IF_UNAVAILABLE:
        return _mock_taxi()
    payload, error = _http_get_json(f"{LTA_BASE}/Taxi-Availability", headers=_headers_for_lta())
    if payload is None:
        return _mock_taxi(error) if USE_MOCK_IF_UNAVAILABLE else {"source": "error", "taxi_count": None, "warning": error}
    coords = payload.get("value", []) if isinstance(payload, dict) else []
    return {"source": "lta", "taxi_count": len(coords), "warning": None}


def fetch_taxi_availability() -> dict[str, Any]:
    return fetch_taxi_availability_cached(_time_bucket(5))


@lru_cache(maxsize=12)
def fetch_weather_now_cached(bucket: str) -> dict[str, Any]:
    payload, error = _http_get_json(
        OPEN_METEO_URL,
        params={
            "latitude": 1.3521,
            "longitude": 103.8198,
            "timezone": "Asia/Singapore",
            "current": "temperature_2m,rain,precipitation",
            "hourly": "precipitation_probability",
            "forecast_days": 1,
        },
        headers={"accept": "application/json"},
    )
    if payload is None:
        return _mock_weather(error) if USE_MOCK_IF_UNAVAILABLE else {"source": "error", "warning": error}

    current = payload.get("current", {}) if isinstance(payload, dict) else {}
    hourly = payload.get("hourly", {}) if isinstance(payload, dict) else {}
    now_hour_key = now_sg().strftime("%Y-%m-%dT%H:00")
    times = hourly.get("time", [])
    probs = hourly.get("precipitation_probability", [])
    precip_prob = probs[times.index(now_hour_key)] if now_hour_key in times and times.index(now_hour_key) < len(probs) else None

    return {"source": "open-meteo", "temperature_c": current.get("temperature_2m"), "rain_mm": current.get("rain"), "precip_probability": precip_prob, "warning": None}


def fetch_weather_now() -> dict[str, Any]:
    return fetch_weather_now_cached(_time_bucket(30))


@lru_cache(maxsize=4)
def fetch_public_holidays_cached(year: int) -> dict[str, str]:
    payload, error = _http_get_json(f"{NAGER_HOLIDAY_URL}/{year}/SG", headers={"accept": "application/json"})
    if payload is None or not isinstance(payload, list):
        # Honor USE_MOCK_IF_UNAVAILABLE for request failures and unexpected payload shapes alike.
        return _mock_holidays(year) if USE_MOCK_IF_UNAVAILABLE else {}
    out = {}
    for item in payload:
        if isinstance(item, dict) and item.get("date") and item.get("localName"):
            out[str(item["date"])] = str(item["localName"])
    return out


def fetch_public_holidays(year: int | None = None) -> dict[str, str]:
    return fetch_public_holidays_cached(year or now_sg().year)


def _consume_groq_budget() -> tuple[bool, str | None]:
    global GROQ_CALL_COUNT
    if GROQ_CALL_BUDGET >= 0 and GROQ_CALL_COUNT >= GROQ_CALL_BUDGET:
        return False, "Groq call budget reached"
    GROQ_CALL_COUNT += 1
    return True, None


def call_groq_chat(prompt: str) -> tuple[str | None, str | None]:
    if not USE_GROQ_SYNTHESIS or not GROQ_API_KEY:
        return None, "Groq disabled or key missing"

    ok, err = _consume_groq_budget()
    if not ok:
        return None, err

    payload, error = _http_post_json(
        GROQ_URL,
        headers={"Authorization": f"Bearer {GROQ_API_KEY}", "Content-Type": "application/json"},
        json_body={
            "model": GROQ_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.2,
            "max_tokens": 220,
        },
    )
    if payload is None:
        return None, error

    try:
        return payload["choices"][0]["message"]["content"].strip(), None
    except Exception as exc:
        return None, f"Groq parse error: {exc}"


print("Tool layer loaded.")


Tool layer loaded.


## Step 2 - Agent Workflow (LangGraph + Fallback)

The workflow generates a grounded deterministic answer first, then optionally refines language with Groq.
This keeps behavior stable under rate limits or provider downtime.
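
The draft-then-refine pattern can be sketched independently of any LLM provider. In this sketch, `compose` and the `Refiner` type are hypothetical names; the refiner stands in for an optional LLM call that may fail or return nothing.

```python
from typing import Callable, Optional

Refiner = Callable[[str], Optional[str]]


def compose(draft: str, refiner: Optional[Refiner] = None) -> tuple[str, str]:
    # Without a refiner, the deterministic draft is the final answer.
    if refiner is None:
        return draft, "rules-only"
    try:
        refined = refiner(draft)
    except Exception:
        # Any provider failure falls back to the draft instead of raising.
        refined = None
    if refined:
        return refined, "refined"
    return draft, "rules-fallback"


def failing_refiner(text: str) -> Optional[str]:
    raise TimeoutError("simulated provider downtime")


print(compose("Next bus in 3 min."))
print(compose("Next bus in 3 min.", failing_refiner))
print(compose("Next bus in 3 min.", lambda text: text.upper()))
```

Because the draft is always computed first, the caller gets a usable answer in every branch, and the second tuple element records which path produced it.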


In [4]:
class AgentState(TypedDict, total=False):
    user_query: str
    intent: Literal["bus_arrival", "traffic", "taxi", "general"]
    entities: dict[str, Any]
    context: dict[str, Any]
    tool_result: dict[str, Any]
    response: str
    model_used: str
    steps: list[str]


BUS_STOP_RE = re.compile(r"\b(\d{5})\b")
SERVICE_RE = re.compile(r"(?:service|svc)\s*([A-Za-z0-9]+)", re.IGNORECASE)


def parse_query(state: AgentState) -> AgentState:
    q = state["user_query"].strip()
    ql = q.lower()
    stop_match = BUS_STOP_RE.search(q)
    service_match = SERVICE_RE.search(q)

    mentions_holiday = "holiday" in ql  # Also matches "public holiday".
    mentions_weather = any(k in ql for k in ["weather", "rain", "storm"])

    if any(k in ql for k in ["taxi", "cab"]):
        intent: Literal["bus_arrival", "traffic", "taxi", "general"] = "taxi"
    elif any(k in ql for k in ["traffic", "accident", "incident", "jam", "congestion"]):
        intent = "traffic"
    elif mentions_holiday and not stop_match and not service_match:
        intent = "general"
    elif any(k in ql for k in ["bus", "arrival", "next"]) or (stop_match is not None):
        intent = "bus_arrival"
    else:
        # Weather-only and unrecognized queries both get the multi-source "general" summary.
        intent = "general"

    state["intent"] = intent
    state["entities"] = {
        "bus_stop_code": stop_match.group(1) if stop_match else None,
        "service_no": service_match.group(1) if service_match else None,
        "mentions_weather": mentions_weather,
        "mentions_holiday": mentions_holiday,
        "mentions_event": any(k in ql for k in ["national day", "f1", "marathon", "event"]),
    }
    state.setdefault("steps", []).append("parse_query")
    return state


def enrich_context(state: AgentState) -> AgentState:
    now = now_sg()
    holidays = fetch_public_holidays(now.year)
    traffic = fetch_traffic_incidents()
    weather = fetch_weather_now()

    is_peak = now.weekday() < 5 and ((7 <= now.hour < 10) or (17 <= now.hour < 20))
    today_key = now.date().isoformat()
    ql = state["user_query"].lower()

    event_note = None
    if "national day" in ql:
        event_note = "National Day-related crowds may increase waiting time in city areas."
    elif "f1" in ql:
        event_note = "F1 event windows may trigger road diversions and heavy crowding."
    elif "marathon" in ql:
        event_note = "Marathon road closures can affect buses near race routes."

    state["context"] = {
        "timestamp_sg": now.isoformat(timespec="seconds"),
        "time_of_day": "peak" if is_peak else "off_peak",
        "is_public_holiday": today_key in holidays,
        "holiday_name": holidays.get(today_key),
        "weather": weather,
        "traffic_snapshot": {
            "incident_count": traffic.get("incident_count", 0),
            "top_incident": (traffic.get("incidents") or [{}])[0].get("Message"),
        },
        "event_note": event_note,
    }
    state.setdefault("steps", []).append("enrich_context")
    return state


def run_transport_tool(state: AgentState) -> AgentState:
    intent = state.get("intent", "general")
    entities = state.get("entities", {})
    if intent == "bus_arrival":
        stop_code = entities.get("bus_stop_code")
        if not stop_code:
            state["tool_result"] = {
                "source": "none",
                "error": "Bus stop code missing. Please include a 5-digit Singapore bus stop code.",
            }
        else:
            state["tool_result"] = fetch_bus_arrival(
                stop_code=stop_code,
                service_no=entities.get("service_no"),
            )
    elif intent == "traffic":
        state["tool_result"] = fetch_traffic_incidents()
    elif intent == "taxi":
        state["tool_result"] = fetch_taxi_availability()
    else:
        state["tool_result"] = {
            "source": "multi",
            "weather": fetch_weather_now(),
            "traffic": fetch_traffic_incidents(),
            "taxi": fetch_taxi_availability(),
        }

    state.setdefault("steps", []).append("run_transport_tool")
    return state


def _advisory_lines(context: dict[str, Any]) -> list[str]:
    out: list[str] = []
    weather = context.get("weather", {})
    traffic = context.get("traffic_snapshot", {})

    if (weather.get("precip_probability") or 0) >= 60:
        out.append("High rain probability: add 5-10 minute buffer.")
    if (traffic.get("incident_count") or 0) > 0:
        out.append("Live traffic incidents detected: expect mild schedule variability.")
    if context.get("time_of_day") == "peak":
        out.append("Peak-hour demand is active: expect fuller buses.")
    if context.get("is_public_holiday"):
        out.append(
            f"Today is {context.get('holiday_name') or 'Public holiday'}: commuter patterns can differ from weekdays."
        )
    if context.get("event_note"):
        out.append(context["event_note"])

    return out


def _build_deterministic_response(state: AgentState) -> str:
    intent = state.get("intent", "general")
    context = state.get("context", {})
    result = state.get("tool_result", {})

    lines: list[str] = []

    if intent == "bus_arrival":
        if result.get("error"):
            lines.append(result["error"])
        else:
            lines.append(f"Bus arrival update for stop {result.get('bus_stop_code')}:")
            lines.append(f"Data source: {result.get('source', 'unknown')}")
            services = result.get("services", [])

            if not services:
                if 0 <= now_sg().hour < 5:
                    lines.append("No live arrivals now. This may be due to late-night service gaps at this stop.")
                else:
                    lines.append("No live arrivals returned for this stop/service at the moment.")
                tx = fetch_taxi_availability().get("taxi_count")
                if tx is not None:
                    lines.append(f"Fallback option: around {tx} taxis are available in the latest snapshot.")
            else:
                load_map = {
                    "SEA": "Seats Available",
                    "SDA": "Standing Available",
                    "LSD": "Limited Standing",
                }
                for svc in services[:3]:
                    next_bus = svc.get("NextBus") or {}
                    eta1 = _eta_minutes(next_bus.get("EstimatedArrival"))
                    eta2 = _eta_minutes((svc.get("NextBus2") or {}).get("EstimatedArrival"))
                    load = load_map.get(next_bus.get("Load"), "N/A")
                    feature = "Wheelchair Accessible" if next_bus.get("Feature") == "WAB" else "Standard"
                    lines.append(
                        f"- Service {svc.get('ServiceNo', '?')}: next {eta1 if eta1 is not None else 'N/A'} min, "
                        f"following {eta2 if eta2 is not None else 'N/A'} min, load {load}, feature {feature}"
                    )

            if result.get("warning"):
                lines.append(f"Note: {result['warning']}")

    elif intent == "traffic":
        lines.append(f"Traffic snapshot: {result.get('incident_count', 0)} active incidents in feed.")
        lines.append(f"Data source: {result.get('source', 'unknown')}")
        first = (result.get("incidents") or [{}])[0].get("Message")
        if first:
            lines.append(f"Top incident: {first}")

    elif intent == "taxi":
        lines.append(f"Data source: {result.get('source', 'unknown')}")
        taxis = result.get("taxi_count")
        lines.append(f"Taxi availability snapshot: ~{taxis if taxis is not None else 'N/A'} taxis reported.")

    else:
        w = (result.get("weather") or {}).get("precip_probability")
        t = (result.get("traffic") or {}).get("incident_count")
        x = (result.get("taxi") or {}).get("taxi_count")
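        # Report the source each tool actually used (live API, mock, or error) instead of a fixed label.
        source_labels = []
        for name in ("weather", "traffic", "taxi"):
            part = result.get(name) or {}
            source_labels.append(f"{name}/{part.get('source', 'unknown')}")
        lines.append("Data sources: " + ", ".join(source_labels))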
        lines.append("General commute planning summary:")
        lines.append(f"- Rain probability: {w if w is not None else 'N/A'}%")
        lines.append(f"- Traffic incidents: {t if t is not None else 'N/A'}")
        lines.append(f"- Taxi count snapshot: {x if x is not None else 'N/A'}")

    adv = _advisory_lines(context)
    if adv:
        lines.append("Advisory:")
        lines.extend([f"- {x}" for x in adv])

    lines.append(f"Generated at (SG): {context.get('timestamp_sg', now_sg().isoformat(timespec='seconds'))}")
    return "\n".join(lines)


def _refine_with_groq_if_enabled(state: AgentState, deterministic_text: str) -> tuple[str, str]:
    if not USE_GROQ_SYNTHESIS:
        return deterministic_text, "rules-only"

    payload = {
        "query": state.get("user_query"),
        "intent": state.get("intent"),
        "entities": state.get("entities"),
        "context": state.get("context"),
        "tool_result": state.get("tool_result"),
        "deterministic_draft": deterministic_text,
    }
    prompt = (
        "You are a precise Singapore public transport assistant. Use only grounded JSON. "
        "Do not invent data. Keep answer concise (max 120 words).\n\n"
        f"Grounded JSON:\n{json.dumps(payload, ensure_ascii=True)}"
    )

    text, err = call_groq_chat(prompt)
    if text:
        return text, f"groq:{GROQ_MODEL}"
    if SHOW_LLM_ERRORS and err:
        return deterministic_text + f"\n[Groq fallback reason: {err}]", "rules-fallback"
    return deterministic_text, "rules-fallback"

def compose_response(state: AgentState) -> AgentState:
    draft = _build_deterministic_response(state)
    final_text, model_used = _refine_with_groq_if_enabled(state, draft)
    state["response"] = final_text
    state["model_used"] = model_used
    state.setdefault("steps", []).append("compose_response")
    return state


try:
    from langgraph.graph import END, StateGraph

    graph = StateGraph(AgentState)
    graph.add_node("parse_query", parse_query)
    graph.add_node("enrich_context", enrich_context)
    graph.add_node("run_transport_tool", run_transport_tool)
    graph.add_node("compose_response", compose_response)

    graph.set_entry_point("parse_query")
    graph.add_edge("parse_query", "enrich_context")
    graph.add_edge("enrich_context", "run_transport_tool")
    graph.add_edge("run_transport_tool", "compose_response")
    graph.add_edge("compose_response", END)

    compiled_agent = graph.compile()

    def run_agent(user_query: str) -> AgentState:
        return compiled_agent.invoke({"user_query": user_query, "steps": []})

    runner_mode = "langgraph"

except Exception as graph_exc:

    def run_agent(user_query: str) -> AgentState:
        s: AgentState = {"user_query": user_query, "steps": []}
        s = parse_query(s)
        s = enrich_context(s)
        s = run_transport_tool(s)
        s = compose_response(s)
        return s

    runner_mode = f"fallback ({graph_exc})"

print("Agent runner mode:", runner_mode)


Agent runner mode: langgraph


## Part 1 - Functional Demo

Single query to verify end-to-end workflow behavior.


In [5]:
sample_query = "When is the next bus at stop 01013?"
sample_result = run_agent(sample_query)

print("User query:", sample_query)
print()
print("Agent response:")
print(sample_result["response"])
print()
print("Model used:", sample_result.get("model_used"))
print("Executed steps:", sample_result.get("steps"))

User query: When is the next bus at stop 01013?

Agent response:
Bus arrival update for stop 01013:
Data source: lta
No live arrivals now. This may be due to late-night service gaps at this stop.
Fallback option: around 500 taxis are available in the latest snapshot.
Advisory:
- Live traffic incidents detected: expect mild schedule variability.

Model used: groq:llama-3.1-8b-instant
Executed steps: ['parse_query', 'enrich_context', 'run_transport_tool', 'compose_response']


## Part 2 - Multi-User Simulation (10 Requests)

Inputs intentionally vary constraints across:
- weather
- traffic
- peak/off-peak timing
- public holiday context
- special-event context


In [7]:
simulated_users = [
    {"user_id": "U01", "constraints": "time_of_day, traffic", "query": "When is the next bus at stop 01013?"},
    {"user_id": "U02", "constraints": "weather, traffic", "query": "I'm at Hotel Grand Pacific (Stop 01012). Is it raining? Should I take the bus?"},
    {"user_id": "U03", "constraints": "accessibility", "query": "Any wheelchair accessible buses at 01019?"},
    {"user_id": "U04", "constraints": "holiday, time_of_day", "query": "Is it a public holiday today? Will bus frequency be affected?"},
    {"user_id": "U05", "constraints": "weather", "query": "I hate rain. Check the weather before checking bus 12 at stop 01029."},
    {"user_id": "U06", "constraints": "weather, traffic", "query": "Hello! Plan my commute now."},
    {"user_id": "U07", "constraints": "crowding", "query": "What is the load like for buses at stop 01019?"},
    {"user_id": "U08", "constraints": "comfort, luggage", "query": "I am carrying heavy luggage. Any non-crowded buses at 01012?"},
    {"user_id": "U09", "constraints": "holiday, bus_arrival", "query": "Check if it's a holiday and tell me the bus times for 01029."},
    {"user_id": "U10", "constraints": "invalid_stop", "query": "Bus stop 99999 please."},
]

RUN_SIMULATION_WITH_GROQ = True

results: list[dict[str, str]] = []
original_use_groq = USE_GROQ_SYNTHESIS
original_budget = GROQ_CALL_BUDGET
original_count = GROQ_CALL_COUNT

if RUN_SIMULATION_WITH_GROQ and GROQ_API_KEY:
    USE_GROQ_SYNTHESIS = True
    if GROQ_CALL_BUDGET >= 0:
        GROQ_CALL_BUDGET = max(GROQ_CALL_BUDGET, len(simulated_users) + 2)
    GROQ_CALL_COUNT = 0
else:
    USE_GROQ_SYNTHESIS = False
    if RUN_SIMULATION_WITH_GROQ and not GROQ_API_KEY:
        print("Groq key missing. Running simulation in rules-only mode.")

for item in simulated_users:
    agent_out = run_agent(item["query"])
    results.append(
        {
            "user_id": item["user_id"],
            "constraints": item["constraints"],
            "query": item["query"],
            "model": str(agent_out.get("model_used", "unknown")),
            "response": str(agent_out.get("response", "")),
        }
    )

USE_GROQ_SYNTHESIS = original_use_groq
GROQ_CALL_BUDGET = original_budget
GROQ_CALL_COUNT = original_count


def _clean_response_text(raw: str) -> str:
    t = (raw or "").strip()

    # Remove fenced blocks like ```json ... ```
    if t.startswith("```"):
        t = re.sub(r"^```(?:json)?\s*", "", t, flags=re.IGNORECASE)
        t = re.sub(r"\s*```$", "", t, flags=re.MULTILINE).strip()

    # If response is JSON-like, flatten it to readable lines
    try:
        obj = json.loads(t)
        if isinstance(obj, dict):
            payload = obj.get("response", obj)

            if isinstance(payload, str):
                return payload.strip()

            if isinstance(payload, dict):
                lines: list[str] = []
                if payload.get("summary"):
                    lines.append(str(payload["summary"]))
                if payload.get("advisory"):
                    lines.append(f"Advisory: {payload['advisory']}")

                w = payload.get("weather")
                if isinstance(w, dict):
                    temp = w.get("temperature", w.get("temperature_c", "N/A"))
                    rain_prob = w.get("rain_probability", w.get("precip_probability", "N/A"))
                    lines.append(f"Weather: {temp}C | Rain prob: {rain_prob}%")

                tr = payload.get("traffic")
                if isinstance(tr, dict):
                    lines.append(f"Traffic incidents: {tr.get('incidents', tr.get('incident_count', 'N/A'))}")

                tx = payload.get("taxi")
                if isinstance(tx, dict):
                    lines.append(f"Taxi available: {tx.get('count', tx.get('taxi_count', 'N/A'))}")

                if lines:
                    return "\n".join(lines)

        if isinstance(obj, list):
            return json.dumps(obj, ensure_ascii=False)
    except Exception:
        pass

    return t


def render_results_table(rows: list[dict[str, str]]) -> None:
    if HTML is None:
        for r in rows:
            print(f"{r['user_id']} | {r['model']} | {r['constraints']} | {r['query']} | {_clean_response_text(r['response'])}")
        return

    style = (
        "<style>"
        "table.sim-results {width: 100%; border-collapse: collapse; table-layout: fixed; "
        "font-family: -apple-system, BlinkMacSystemFont, Segoe UI, sans-serif;}"
        ".sim-results th, .sim-results td {border: 1px solid #d9d9d9; padding: 8px; vertical-align: top; text-align: left;}"
        ".sim-results th {background: #f3f6fa; font-weight: 600;}"
        ".sim-results tr:nth-child(even) {background: #fbfcfe;}"
        ".sim-results .col-id {width: 7%;}"
        ".sim-results .col-model {width: 16%;}"
        ".sim-results .col-cons {width: 13%;}"
        ".sim-results .col-query {width: 24%;}"
        ".sim-results .col-resp {width: 40%; line-height: 1.35;}"
        ".sim-results .response {white-space: pre-wrap; overflow-wrap: anywhere; word-break: break-word;}"
        "</style>"
    )

    header = (
        "<tr>"
        "<th class='col-id'>User</th>"
        "<th class='col-model'>Model</th>"
        "<th class='col-cons'>Constraints</th>"
        "<th class='col-query'>Query</th>"
        "<th class='col-resp'>Response</th>"
        "</tr>"
    )

    body_rows = []
    for r in rows:
        resp = _clean_response_text(r["response"])
        body_rows.append(
            "<tr>"
            f"<td>{escape(r['user_id'])}</td>"
            f"<td>{escape(r['model'])}</td>"
            f"<td>{escape(r['constraints'])}</td>"
            f"<td>{escape(r['query'])}</td>"
            f"<td class='response'>{escape(resp)}</td>"
            "</tr>"
        )

    html = style + "<table class='sim-results'>" + header + "".join(body_rows) + "</table>"
    display(HTML(html))


model_counts: dict[str, int] = {}
for r in results:
    model_counts[r["model"]] = model_counts.get(r["model"], 0) + 1

render_results_table(results)

summary = ", ".join(f"`{k}` = {v}" for k, v in model_counts.items())
if Markdown is not None:
    display(Markdown("**Model usage distribution:** " + summary))
else:
    print("Model usage distribution:", summary)

User,Model,Constraints,Query,Response
U01,groq:llama-3.1-8b-instant,"time_of_day, traffic",When is the next bus at stop 01013?,Bus Arrival Update for Stop 01013: - Data source: LTA - No live arrivals now. This may be due to late-night service gaps at this stop. - Fallback option: around 500 taxis are available in the latest snapshot. - Advisory:  - Live traffic incidents detected: expect mild schedule variability. Generated at (SG): 2026-02-23T04:22:28+08:00
U02,groq:llama-3.1-8b-instant,"weather, traffic",I'm at Hotel Grand Pacific (Stop 01012). Is it raining? Should I take the bus?,It is not raining at your location. The current temperature is 23.9°C. Bus arrival information is not available at this time.
U03,groq:llama-3.1-8b-instant,accessibility,Any wheelchair accessible buses at 01019?,Bus arrival update for stop 01019: Data source: lta No live arrivals now. This may be due to late-night service gaps at this stop. Fallback option: around 500 taxis are available in the latest snapshot. Advisory: - Live traffic incidents detected: expect mild schedule variability. No wheelchair accessible buses available at 01019.
U04,groq:llama-3.1-8b-instant,"holiday, time_of_day",Is it a public holiday today? Will bus frequency be affected?,"No, it is not a public holiday today. Bus frequency will not be affected."
U05,groq:llama-3.1-8b-instant,weather,I hate rain. Check the weather before checking bus 12 at stop 01029.,The current weather is: - Temperature: 23.9°C - Rainfall: 0.0 mm - Precipitation probability: 0% There are no live bus arrivals at stop 01029.
U06,groq:llama-3.1-8b-instant,"weather, traffic",Hello! Plan my commute now.,"Your commute is planned. Current weather is 23.9°C with 0% rain probability. There are 16 traffic incidents, including road works on SLE and AYE. Taxi count is 500. Expect mild schedule variability due to traffic incidents. Your journey will be updated accordingly."
U07,groq:llama-3.1-8b-instant,crowding,What is the load like for buses at stop 01019?,Bus arrival update for stop 01019: Data source: lta No live arrivals now. This may be due to late-night service gaps at this stop. Fallback option: around 500 taxis are available in the latest snapshot. Advisory: - Live traffic incidents detected: expect mild schedule variability.
U08,groq:llama-3.1-8b-instant,"comfort, luggage",I am carrying heavy luggage. Any non-crowded buses at 01012?,**Bus Arrival Update for Stop 01012** Data source: LTA No live arrivals now. This may be due to late-night service gaps at this stop. Fallback option: around 500 taxis are available in the latest snapshot. Advisory: - Live traffic incidents detected: expect mild schedule variability. **Traffic Snapshot** - Incident count: 16 - Top incident: Road Works on SLE (towards BKE) at Woodlands Ave 2 Exit.
U09,groq:llama-3.1-8b-instant,"holiday, bus_arrival",Check if it's a holiday and tell me the bus times for 01029.,It's not a public holiday. Bus times for 01029: No live arrivals now. This may be due to late-night service gaps at this stop. Fallback option: around 500 taxis are available in the latest snapshot. Advisory: - Live traffic incidents detected: expect mild schedule variability.
U10,groq:llama-3.1-8b-instant,invalid_stop,Bus stop 99999 please.,Bus stop 99999 is not served by any bus services. Data source: LTA No live arrivals now. This may be due to late-night service gaps at this stop. Fallback option: around 500 taxis are available in the latest snapshot. Advisory: - Live traffic incidents detected: expect mild schedule variability.


**Model usage distribution:** `groq:llama-3.1-8b-instant` = 10

## Design Decisions and Assumptions

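- Responses are grounded in tool output: transport, weather, and traffic facts come from live API calls, not from the LLM.
- The LLM is optional and is applied only as a response-polish layer on top of the deterministic response.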
- Bus stop queries assume a valid 5-digit Singapore stop code.
- Event handling is keyword-based in this take-home version.
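
The 5-digit stop-code assumption can be illustrated with a small extraction helper. This is a minimal sketch, not the notebook's own parser: `STOP_CODE_RE` and `extract_stop_code` are hypothetical names introduced here for illustration.

```python
from __future__ import annotations

import re

# Hypothetical pattern: exactly five digits that are not part of a longer digit run.
STOP_CODE_RE = re.compile(r"(?<!\d)\d{5}(?!\d)")


def extract_stop_code(text: str) -> str | None:
    """Return the first 5-digit stop code found in the query, or None."""
    match = STOP_CODE_RE.search(text)
    return match.group(0) if match else None


print(extract_stop_code("When is the next bus at stop 01013?"))
print(extract_stop_code("Hello! Plan my commute now."))
```

A format check like this only confirms that the code looks like a stop code. A query such as `Bus stop 99999 please.` still passes it, so confirming that the stop exists would need a separate lookup against the stop list.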


## Deployment Considerations (Real-World)

1. Serve the graph behind FastAPI.
2. Add Redis cache for high-frequency snapshot data.
3. Use async HTTP + circuit breakers for upstream reliability.
4. Add observability (node latency, fallback rate, API error rate).
5. Store keys in a secret manager and enforce rotation.
6. Add CI tests with fixed payload fixtures.
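
As an illustration of item 3, the circuit-breaker idea can be sketched with the standard library alone. This is a synchronous sketch under stated assumptions, not part of the notebook: `CircuitBreaker`, `CircuitOpenError`, and the threshold and cool-down defaults are hypothetical, and a production version would wrap async HTTP calls instead.

```python
from __future__ import annotations

import time
from typing import Any, Callable


class CircuitOpenError(RuntimeError):
    """Raised when a call is skipped because the breaker is open."""


class CircuitBreaker:
    """Hypothetical breaker: opens after N consecutive failures, retries after a cool-down."""

    def __init__(
        self,
        max_failures: int = 3,
        reset_after_s: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_failures = max_failures
        self.reset_after_s = reset_after_s
        self._clock = clock  # injectable so tests can control time
        self._failures = 0
        self._opened_at: float | None = None

    def call(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_after_s:
                # Still cooling down: fail fast without touching the upstream API.
                raise CircuitOpenError("upstream skipped: breaker is open")
            # Cool-down elapsed: allow one trial call; a single failure reopens.
            self._opened_at = None
            self._failures = self.max_failures - 1
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._failures >= self.max_failures:
                self._opened_at = self._clock()
            raise
        self._failures = 0  # any success closes the breaker fully
        return result
```

A tool function would route its upstream request through `breaker.call(...)` and treat `CircuitOpenError` as a signal to return the fallback response immediately, which also gives a direct way to measure the fallback rate mentioned in item 4.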
