# TICKET-10 Golden Path E2E Validation

This notebook validates the full Docker integration path for TICKET-10.

## What this covers
- Ghostfolio + agent health checks
- Seeded-data precondition verification
- Five scripted `/api/agent/chat` queries over SSE
- Event-order and tool-routing assertions
- Follow-up continuity check using a shared `thread_id`

## Prerequisites
- Stack running with:
  - `docker compose -f docker/docker-compose.yml -f docker/docker-compose.agent.yml --env-file .env up -d --build`
- `.env` contains a valid `GHOSTFOLIO_ACCESS_TOKEN`
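- Seed data from `docker/seed-data.json` already imported into Ghostfolio (the precondition cell expects at least 26 activities and a non-empty set of holdings)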

In [None]:
from __future__ import annotations

import json
from pathlib import Path
from typing import Any
from urllib import error, request

# Base URLs of the locally running services (started via the docker compose command above).
GHOSTFOLIO_URL: str = "http://localhost:3333"
AGENT_URL: str = "http://localhost:8000"

# Minimum number of activities the seeded portfolio must contain.
EXPECTED_MIN_ACTIVITY_COUNT: int = 26

# Default timeout (seconds) for streamed /api/agent/chat requests.
SSE_TIMEOUT_SECONDS: int = 3 * 60


def find_repo_root(start: Path | None = None) -> Path:
    current = (start or Path.cwd()).resolve()
    for candidate in [current, *current.parents]:
        if (candidate / ".env").exists() and (candidate / "docker").exists():
            return candidate
    raise FileNotFoundError("Could not locate repository root with .env and docker/ directory")


def read_env(path: Path) -> dict[str, str]:
    """Parse a dotenv-style file into a ``{key: value}`` dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    Only the first ``=`` separates key from value. An optional leading
    ``export`` on the key is dropped. Surrounding whitespace is trimmed.
    A value wrapped in a *matching* pair of single or double quotes has that
    pair removed; unbalanced quotes are kept verbatim.

    Args:
        path: Location of the ``.env`` file (read as UTF-8).

    Returns:
        Mapping of variable names to their string values; later duplicates win.
    """
    env: dict[str, str] = {}
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue

        key, value = line.split("=", 1)
        key = key.strip()
        # Shell-style `export KEY=value` lines are common in .env files.
        if key.startswith("export "):
            key = key[len("export "):].strip()

        value = value.strip()
        # Strip only a balanced pair of quotes; `.strip('"')` used to eat
        # unbalanced quotes and ignored single-quoted values entirely.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}:
            value = value[1:-1]

        env[key] = value
    return env


# Resolve repository paths and load the .env values used by later cells.
REPO_ROOT = find_repo_root()
ENV_PATH = REPO_ROOT / ".env"
SEED_PATH = REPO_ROOT.joinpath("docker", "seed-data.json")
ENV = read_env(ENV_PATH)

# Quick sanity report before any network calls are made.
print(
    "\n".join(
        [
            f"Repo root: {REPO_ROOT}",
            f"Seed file exists: {SEED_PATH.exists()}",
            f"Access token present in .env: {bool(ENV.get('GHOSTFOLIO_ACCESS_TOKEN'))}",
        ]
    )
)

In [None]:
def http_json(
    method: str,
    url: str,
    payload: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    timeout: int = 30,
) -> tuple[int, dict[str, Any]]:
    body: bytes | None = None
    request_headers = {"Content-Type": "application/json"}
    if headers:
        request_headers.update(headers)

    if payload is not None:
        body = json.dumps(payload).encode("utf-8")

    req = request.Request(url, data=body, headers=request_headers, method=method)

    try:
        with request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8")
            return resp.status, json.loads(raw) if raw else {}
    except error.HTTPError as exc:
        raw = exc.read().decode("utf-8")
        try:
            parsed = json.loads(raw) if raw else {}
        except Exception:
            parsed = {"raw": raw}
        return exc.code, parsed


def get_bearer_token(access_token: str) -> str:
    """Exchange a Ghostfolio access token for a bearer token.

    Posts ``{"accessToken": ...}`` to the anonymous auth endpoint and expects
    HTTP 201 with a non-empty string ``authToken`` in the JSON body.

    Raises:
        AssertionError: if the status is not 201 or the token is missing/empty.
    """
    auth_url = f"{GHOSTFOLIO_URL}/api/v1/auth/anonymous"
    status, body = http_json("POST", auth_url, payload={"accessToken": access_token})
    assert status == 201, f"Auth exchange failed (status={status}, payload={body})"

    token = body.get("authToken")
    token_is_usable = isinstance(token, str) and bool(token)
    assert token_is_usable, "Auth token missing from auth response"
    return token


def parse_sse_block(block: str) -> dict[str, Any] | None:
    if not block.strip():
        return None

    event_type: str | None = None
    payload: dict[str, Any] = {}
    for line in block.splitlines():
        if line.startswith("event:"):
            event_type = line.split(":", 1)[1].strip()
        elif line.startswith("data:"):
            try:
                payload = json.loads(line.split(":", 1)[1].strip())
            except Exception:
                payload = {}

    if event_type is None:
        return None

    return {"event": event_type, "data": payload}


def _collect_sse_events(raw_blocks: list[bytes], events: list[dict[str, Any]]) -> bool:
    """Parse complete raw SSE blocks, appending each recognised event to *events*.

    Returns True as soon as a terminal ``done``/``error`` event is appended;
    any blocks after it are ignored.
    """
    for raw_block in raw_blocks:
        # Blocks are cut on an ASCII separator, so each holds whole UTF-8 characters.
        parsed = parse_sse_block(raw_block.decode("utf-8", errors="ignore"))
        if parsed:
            events.append(parsed)
            if parsed["event"] in {"done", "error"}:
                return True
    return False


def stream_chat(message: str, thread_id: str | None = None, timeout: int = SSE_TIMEOUT_SECONDS) -> dict[str, Any]:
    """POST *message* to the agent chat endpoint and collect its SSE stream.

    Reading stops at the first ``done`` or ``error`` event, or when the
    server closes the stream.

    Args:
        message: User prompt, sent as ``{"message": ...}``.
        thread_id: Optional conversation id; included in the request only when truthy.
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Returns:
        A summary dict containing:

        - the HTTP status;
        - every parsed event and the list of event types;
        - the last event type (``terminal_event``);
        - the ``tool_call`` events;
        - the ``done``/``error`` payloads;
        - the final ``response.message`` from the ``done`` payload;
        - the ``thread_id`` as reported by the ``done`` event.

    Raises:
        urllib.error.URLError / HTTPError: propagated from ``urlopen``.
    """
    request_payload: dict[str, Any] = {"message": message}
    if thread_id:
        request_payload["thread_id"] = thread_id

    req = request.Request(
        f"{AGENT_URL}/api/agent/chat",
        data=json.dumps(request_payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "text/event-stream"},
        method="POST",
    )

    events: list[dict[str, Any]] = []
    status_code = 0

    with request.urlopen(req, timeout=timeout) as resp:
        status_code = resp.status
        # Buffer raw bytes and decode only complete blocks. Decoding each
        # 2048-byte read on its own silently dropped multi-byte UTF-8
        # characters that straddle a read boundary.
        buffer = b""
        while True:
            chunk = resp.read(2048)
            if chunk:
                # Normalise CRLF so "\r\n\r\n"-separated streams split on the
                # same blank-line boundary as "\n\n"-separated ones.
                buffer = (buffer + chunk).replace(b"\r\n", b"\n")
                *complete_blocks, buffer = buffer.split(b"\n\n")
            else:
                # Stream closed: flush a final block that lacked a trailing blank line.
                complete_blocks, buffer = [buffer], b""
            if _collect_sse_events(complete_blocks, events) or not chunk:
                break

    event_types = [item["event"] for item in events]
    terminal_event = event_types[-1] if event_types else None
    tool_calls = [item for item in events if item["event"] == "tool_call"]

    done_payload = events[-1]["data"] if terminal_event == "done" else {}
    error_payload = events[-1]["data"] if terminal_event == "error" else {}

    response = done_payload.get("response", {}) if isinstance(done_payload, dict) else {}
    response_message = response.get("message", "") if isinstance(response, dict) else ""

    return {
        "status_code": status_code,
        "message": message,
        "thread_id": done_payload.get("thread_id") if isinstance(done_payload, dict) else thread_id,
        "event_types": event_types,
        "terminal_event": terminal_event,
        "tool_calls": tool_calls,
        "done_payload": done_payload,
        "error_payload": error_payload,
        "response_message": response_message,
        "events": events,
    }

In [None]:
access_token = ENV.get("GHOSTFOLIO_ACCESS_TOKEN", "")
assert access_token, "GHOSTFOLIO_ACCESS_TOKEN is missing from .env"

# 1) Liveness: both services must answer their health endpoints with HTTP 200.
status_gf, payload_gf = http_json("GET", f"{GHOSTFOLIO_URL}/api/v1/health")
status_agent, payload_agent = http_json("GET", f"{AGENT_URL}/health")
assert status_gf == 200, f"Ghostfolio health failed: {status_gf}, {payload_gf}"
assert status_agent == 200, f"Agent health failed: {status_agent}, {payload_agent}"

# 2) Seed precondition: fetch activities and holdings with the exchanged bearer token.
bearer_token = get_bearer_token(access_token)
auth_headers = {"Authorization": f"Bearer {bearer_token}"}
orders_status, orders_payload = http_json(
    "GET", f"{GHOSTFOLIO_URL}/api/v1/order?range=max", headers=auth_headers
)
details_status, details_payload = http_json(
    "GET", f"{GHOSTFOLIO_URL}/api/v1/portfolio/details", headers=auth_headers
)

activities = []
if isinstance(orders_payload, dict):
    activities = orders_payload.get("activities", [])

holdings = {}
if isinstance(details_payload, dict):
    holdings = details_payload.get("holdings", {})

assert orders_status == 200, f"Orders fetch failed: {orders_status}, {orders_payload}"
assert details_status == 200, f"Portfolio details fetch failed: {details_status}, {details_payload}"
assert isinstance(activities, list), "Orders payload is not a list"
assert isinstance(holdings, dict), "Holdings payload is not a dictionary"
assert len(activities) >= EXPECTED_MIN_ACTIVITY_COUNT, (
    f"Expected at least {EXPECTED_MIN_ACTIVITY_COUNT} activities, got {len(activities)}"
)
assert len(holdings) > 0, "Seed precondition failed: holdings are empty"

precondition_summary = {
    "ghostfolio_health": payload_gf,
    "agent_health": payload_agent,
    "orders_count": len(activities),
    "details_holdings_count": len(holdings),
}
print(precondition_summary)

In [None]:
# Scripted prompts in execution order (Q1..Q5). Q5 is a follow-up that is
# sent on the conversation thread returned by Q4.
GOLDEN_PATH_QUERIES = [
    "How is my portfolio doing ytd?",
    "Categorize my transactions for max range.",
    "Estimate my tax liability for 2025 in middle bracket.",
    "Am I diversified enough for a balanced profile?",
    "Based on that, where am I most concentrated?",
]

# Tool expected as the first tool_call of each query, keyed by 1-based query number.
EXPECTED_PRIMARY_TOOLS = dict(
    enumerate(
        [
            "analyze_portfolio_performance",
            "categorize_transactions",
            "estimate_capital_gains_tax",
            "advise_asset_allocation",
            "advise_asset_allocation",
        ],
        start=1,
    )
)


def assert_query_success(result: dict[str, Any], expected_tool: str, label: str) -> None:
    """Assert that one ``stream_chat`` result followed the golden path.

    Checks, in order:

    1. HTTP 200.
    2. At least one SSE event was received.
    3. ``thinking`` is the first event.
    4. ``done`` is the last event.
    5. Both ``tool_call`` and ``tool_result`` events are present, and the
       first ``tool_call`` precedes the first ``tool_result``.
    6. The first tool call names *expected_tool*.
    7. The final response message is non-empty.

    Args:
        result: Summary dict as returned by ``stream_chat``.
        expected_tool: Tool name the first ``tool_call`` must carry.
        label: Prefix for failure messages (e.g. ``"Q3"``).

    Raises:
        AssertionError: on the first failed check.
    """
    event_types = result["event_types"]
    assert result["status_code"] == 200, f"{label}: HTTP status was {result['status_code']}"
    assert event_types, f"{label}: no SSE events received"
    assert event_types[0] == "thinking", f"{label}: first event was not thinking"

    # Surface the agent's error payload so a failed run is diagnosable from the message.
    error_detail = result.get("error_payload")
    assert result["terminal_event"] == "done", (
        f"{label}: expected terminal done event, got {result['terminal_event']}"
        + (f" (error payload: {error_detail})" if error_detail else "")
    )

    assert "tool_call" in event_types, f"{label}: tool_call event missing"
    assert "tool_result" in event_types, f"{label}: tool_result event missing"
    assert event_types.index("tool_call") < event_types.index("tool_result"), (
        f"{label}: tool_result arrived before tool_call"
    )

    tool_calls = result["tool_calls"]
    first_tool_call = tool_calls[0]["data"] if tool_calls else {}
    # Non-dict event data (e.g. a JSON list) is reported as a routing mismatch.
    observed_tool = first_tool_call.get("tool") if isinstance(first_tool_call, dict) else None
    assert observed_tool == expected_tool, (
        f"{label}: expected tool {expected_tool}, got {observed_tool}"
    )

    assert isinstance(result["response_message"], str) and result["response_message"].strip(), (
        f"{label}: empty final response message"
    )

In [None]:
results: dict[int, dict[str, Any]] = {}

# Q1-Q3: standalone prompts, each sent without a thread_id.
for query_number in range(1, 4):
    standalone_result = stream_chat(GOLDEN_PATH_QUERIES[query_number - 1])
    assert_query_success(
        standalone_result, EXPECTED_PRIMARY_TOOLS[query_number], f"Q{query_number}"
    )
    results[query_number] = standalone_result

# Q4 opens a conversation thread; Q5 is sent on that same thread as a follow-up.
q4_result = stream_chat(GOLDEN_PATH_QUERIES[3])
assert_query_success(q4_result, EXPECTED_PRIMARY_TOOLS[4], "Q4")

follow_up_thread_id = q4_result.get("thread_id")
assert isinstance(follow_up_thread_id, str) and follow_up_thread_id, (
    "Q4 did not return a thread_id for continuity check"
)

q5_result = stream_chat(GOLDEN_PATH_QUERIES[4], thread_id=follow_up_thread_id)
assert_query_success(q5_result, EXPECTED_PRIMARY_TOOLS[5], "Q5")
# The agent must report back the same thread it was handed.
same_thread = q5_result.get("thread_id") == follow_up_thread_id
assert same_thread, "Q5 thread_id does not match Q4 thread_id"

results.update({4: q4_result, 5: q5_result})

run_summary = {
    "query_count": len(results),
    "follow_up_thread_id": follow_up_thread_id,
    "q5_thread_id": q5_result.get("thread_id"),
    "q5_terminal_event": q5_result.get("terminal_event"),
}
print(run_summary)

In [None]:
# One compact record per query (Q1..Q5) for regression comparison.
snapshots: list[dict[str, Any]] = []
for query_number in range(1, 6):
    query_result = results[query_number]
    observed_calls = query_result["tool_calls"]
    lead_call = observed_calls[0]["data"] if observed_calls else {}
    snapshots.append(
        dict(
            query=GOLDEN_PATH_QUERIES[query_number - 1],
            tool=lead_call.get("tool"),
            event_types=query_result["event_types"],
            thread_id=query_result.get("thread_id"),
            response_preview=query_result.get("response_message", "")[:220],
        )
    )

snapshots

## Notes

- If the precondition checks fail, re-import the seed data (`docker/seed-data.json`) and re-run the precondition cell before running Q1–Q5.
- The follow-up continuity check sends Q5 with the `thread_id` returned by Q4 and verifies that Q5's `done` event reports the same `thread_id`.
- Save the snapshot output of the final cell and compare it with earlier runs before demos to catch regressions.