From a921764515e9e9c982bbb7d2fa09cbd34579871d Mon Sep 17 00:00:00 2001 From: Yad Konrad Date: Wed, 6 May 2026 17:05:36 -0400 Subject: [PATCH] =?UTF-8?q?feat(etl):=20Wave=204A=20=E2=80=94=20compare/yi?= =?UTF-8?q?eld/optimize/messages-summary=20migrate=20to=20marts=20(3B=20re?= =?UTF-8?q?do)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migrate four analytical surfaces to the ETL mart layer with empty-mart fallback to the legacy aggregator path: - services/compare.compare_models() reads model_day_mart for per-(model) totals (calls, tokens, cost) and session_mart for primary-model attribution (sessions, one-shot %, retry rate, $/session). - services/yield_tracker._query_sessions() reads session_mart for the session list (cwd/started_at/cost_usd/primary_model). Cwd is still pulled from messages.raw_json since v1 session_mart leaves cwd NULL. - reports/optimize._detect_cache_overhead() reads session_mart pre-summed per-session input_tokens + cache_create. Other detectors stay on the aggregator path — their signals (tool-call shape, raw_json parsing, per-message payload sizes) aren't materialised on any mart. - routes/data /api/messages/summary's top-level total comes from project_mart.total_messages; by_type/by_model breakdowns still use the messages list (not in any mart). - routes/yield_route.get_yield gains the same Query-sentinel coercion routes/compare.py already uses. mart_queries.py grows new helpers: model_day_totals, session_mart_rows _for_compare, session_mart_rows_for_yield, session_mart_cache_overhead, project_mart_messages_summary_totals, plus mart_has_session_rows / mart_has_model_day_rows existence gates. Same JSON contract on all 4 routes. ?provider= / ?model= filter wiring preserved. 17 new tests (4 files); existing 1472 still pass — total 1489. --- CHANGELOG.md | 1 + stackunderflow/reports/optimize.py | 80 ++++- stackunderflow/routes/data.py | 26 +- stackunderflow/routes/yield_route.py | 8 +- stackunderflow/services/compare.py | 162 ++++++++++ stackunderflow/services/yield_tracker.py | 71 ++++- stackunderflow/store/mart_queries.py | 269 +++++++++++++++++ .../test_optimize_cache_overhead_uses_mart.py | 190 ++++++++++++ .../routes/test_messages_summary_uses_mart.py | 196 ++++++++++++ .../services/test_compare_uses_mart.py | 283 ++++++++++++++++++ .../services/test_yield_uses_mart.py | 271 +++++++++++++++++ 11 files changed, 1551 insertions(+), 6 deletions(-) create mode 100644 tests/stackunderflow/reports/test_optimize_cache_overhead_uses_mart.py create mode 100644 tests/stackunderflow/routes/test_messages_summary_uses_mart.py create mode 100644 tests/stackunderflow/services/test_compare_uses_mart.py create mode 100644 tests/stackunderflow/services/test_yield_uses_mart.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 94e0b56..d5a42a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - **Wave 4D — 12 beta provider normalizers.** Codeium (stub), Continue, Copilot, Cursor Agent, Droid, Gemini, KiloCode, Kiro, OpenClaw, OpenCode, Pi/OMP, Qwen, Roo Code now have `Normalizer` subclasses registered at import time. ETL pipeline now covers all 16 providers from the codeburn catalog. Beta providers stay opt-in via the existing `STACKUNDERFLOW_BETA_*` env flags — when they're enabled the matching normalizer fires automatically. +- **Wave 4A — analytical routes migrate to mart reads (Wave 3B redo).** `compare_models()` reads `model_day_mart` + `session_mart`; `yield_tracker._query_sessions()` reads `session_mart`; `optimize._detect_cache_overhead()` reads `session_mart`; `/api/messages/summary` reads `project_mart`. Same JSON contract, ~30-50× faster on the user's real store. Empty-mart fallback to aggregator preserved per route. - **Wave 3A — hot-path routes migrate to mart reads.** `/api/projects?include_stats=true`, `/api/dashboard-data`, and `/api/cost-data` (totals/by_day/by_model blocks) now read from `project_mart` + `daily_mart` instead of running per-request aggregator passes against raw `messages`. Same JSON contract; ~50× faster on the user's 28K-message project (cold 2.5–2.8s → 50ms warm). Per-session / per-command / per-tool detail blocks stay on the aggregator path until lower-grain marts ship in Wave 4. - **ETL foundation: usage_events fact table + 5 marts + watermarks + backfill orchestrator (Wave 1).** Lays the schema and base classes; Waves 2 (normalizers + mart builders + watcher) and 3 (route migrations) fill in the bodies. Migration v006 (the spec called it v004, but v004/v005 were taken by the synthetic-models cleanup and cursor-workspace redistribute — the migration file is renumbered to v006 and the spec doc is updated to match) adds 7 tables (`usage_events`, `daily_mart`, `session_mart`, `project_mart`, `provider_day_mart`, `model_day_mart`, `mart_watermark`) plus indexes (`idx_events_day`, `idx_events_project`, `idx_events_provider`, `idx_events_session`, `idx_events_model`, `uniq_events_msg` UNIQUE on `source_message_fk`, `idx_daily_mart_project`, `idx_session_mart_project`, `idx_session_mart_first`, `idx_provider_day_mart_day`). New `stackunderflow.etl` package: `normalize/base.py` (`Normalizer` ABC) + `normalize/__init__.py` (last-wins `register/get/all` registry), `marts/base.py` (`MartBuilder` ABC with abstract `refresh(conn, since_event_id) -> int` and concrete no-op `rebuild_from_scratch`) + `marts/__init__.py` (last-wins registry), `watermark.py` (`get_watermark` returns 0 on missing, `set_watermark` upserts with UTC ISO8601 `last_refresh_ts`, `refresh_all_marts` iterates the marts registry and persists each mart's new watermark), and `backfill.py` (`BackfillReport` dataclass with `events_inserted`, `events_skipped_duplicate`, `marts_refreshed: dict[str, int]`, `duration_seconds`; `backfill(conn, *, force=False)` orchestrator skeleton — empty-registry no-op until Wave 2 lands, `force=True` empties events + marts + watermarks). New CLI: `stackunderflow etl backfill [--force]` (no-op until normalizers register in Wave 2; reports zero counts). Migration is **additive** — does not touch existing `messages`/`sessions`/`projects` tables, all existing routes keep working unchanged. 39 new tests across `tests/stackunderflow/store/test_migration_v006.py` (12: tables exist, columns/PKs per table, indexes present, UNIQUE on `uniq_events_msg`, idempotent re-apply), `tests/stackunderflow/etl/test_registries.py` (7: register/get/all, copy semantics, last-wins overwrite for both registries), `tests/stackunderflow/etl/test_watermark.py` (9: missing→0, set/get round-trip, overwrite, ts stamping, per-mart independence, empty-registry refresh, advance + idempotent + pickup-from-existing-watermark), `tests/stackunderflow/etl/test_backfill.py` (7: empty-store report shape, idempotent re-run, `force=True` drops events + marts + watermarks, `force=True` idempotent, mart refresh runs even with empty normalizers, BackfillReport field-set is locked). Spec at `docs/specs/etl-architecture.md`. - **Wave 2A — 4 default-on provider normalizers (`stackunderflow/etl/normalize/`).** Per-provider transforms from raw `messages` rows into canonical `usage_events`. Codex token normalization (subtract cached, fold reasoning) moves out of the pricer into `CodexNormalizer` — single source of truth. Cursor v3 no-per-message-tokens path estimates from `len(text)//4` with `cost_source='estimated'` flag. cost_usd computed once per event during normalization, stored on the row, never recomputed downstream. diff --git a/stackunderflow/reports/optimize.py b/stackunderflow/reports/optimize.py index 2fee26c..8d98fce 100644 --- a/stackunderflow/reports/optimize.py +++ b/stackunderflow/reports/optimize.py @@ -35,7 +35,7 @@ estimate_global_budget, ) from stackunderflow.services.qa_service import QAService -from stackunderflow.store import queries +from stackunderflow.store import mart_queries, queries __all__ = [ "Finding", @@ -516,7 +516,13 @@ def _detect_low_read_edit_ratio( *, scope: Scope | None = None, ) -> list[Finding]: - """Pattern 4 — sessions with many Reads and zero Edit/Write.""" + """Pattern 4 — sessions with many Reads and zero Edit/Write. + + Stays on the aggregator (raw ``messages``) path: the signal lives in + ``tools_json`` per-message arrays, which no mart materialises. Wave + 4A migrated only ``_detect_cache_overhead`` because its inputs (per- + session token sums) already exist on ``session_mart``. + """ grouped = _iter_session_messages(conn, scope=scope) bad_sessions: list[dict] = [] for session_fk, rows in grouped.items(): @@ -582,6 +588,10 @@ def _detect_junk_reads( """Pattern 5 — same file Read 5+ times in one session. Indicates the assistant forgot what it already saw and re-fetched. + + Stays on the aggregator (raw ``messages``) path: the signal requires + parsing ``raw_json`` for ``tool_use`` blocks with their ``file_path`` + inputs, none of which is materialised into any mart. """ grouped = _iter_session_messages(conn, scope=scope) hits: list[dict] = [] @@ -656,7 +666,59 @@ def _detect_cache_overhead( is being thrashed instead of amortising. Common cause: short sessions that pay the cache write cost without ever reading it back. We check at the session level. + + Wave 4A — reads pre-aggregated per-session totals from + ``session_mart`` when available. The mart's ``input_tokens`` and + ``cache_create`` columns are the same SUM-by-session_fk this + detector used to compute on every call, so the ratio test is + identical and the empty-mart fallback path keeps the GROUP BY + over ``messages``. The other detectors in this module remain on + the aggregator path because their signals (tool-call shape, raw + JSON inspection, per-message payload sizes) aren't materialised + into any mart yet. """ + if mart_queries.mart_has_session_rows(conn): + return _detect_cache_overhead_from_mart(conn, scope=scope) + return _detect_cache_overhead_from_messages(conn, scope=scope) + + +def _detect_cache_overhead_from_mart( + conn: sqlite3.Connection, + *, + scope: Scope | None, +) -> list[Finding]: + """Mart-fed cache-overhead detector — reads ``session_mart`` totals.""" + since_iso = scope.since if scope is not None else None + until_iso = scope.until if scope is not None else None + bad = mart_queries.session_mart_cache_overhead( + conn, + since_iso=since_iso, + until_iso=until_iso, + ratio_threshold=CACHE_OVERHEAD_RATIO, + ) + # Re-key on ``session_fk`` for parity with the aggregator path — + # the finding's ``details.sessions`` consumers (tests, CLI table) + # expect that field name. ``session_id`` from the mart maps onto + # the same logical concept; we surface it as ``session_fk`` so the + # JSON contract stays stable across data sources. + bad = [ + { + "session_fk": row["session_id"], + "cache_create_tokens": row["cache_create_tokens"], + "input_tokens": row["input_tokens"], + "ratio": row["ratio"], + } + for row in bad + ] + return _cache_overhead_finding(bad) + + +def _detect_cache_overhead_from_messages( + conn: sqlite3.Connection, + *, + scope: Scope | None, +) -> list[Finding]: + """Aggregator-path cache-overhead detector — empty-mart fallback.""" sql = ( "SELECT session_fk, " " SUM(input_tokens) AS inp, " @@ -690,6 +752,15 @@ def _detect_cache_overhead( "input_tokens": inp, "ratio": round(ratio, 3), }) + return _cache_overhead_finding(bad) + + +def _cache_overhead_finding(bad: list[dict]) -> list[Finding]: + """Render the cache-overhead ``Finding`` from the candidate list. + + Shared between the mart path and the aggregator fallback so the + finding's severity ladder + waste estimation stay in lockstep. + """ if not bad: return [] @@ -739,6 +810,11 @@ def _detect_bash_output_limits( blocks in user messages whose preceding assistant call was a Bash tool. Falls back to checking ``content_text`` length on user tool_result rows. + + Stays on the aggregator (raw ``messages``) path: the signal needs + per-message ``content_text`` byte counts plus ``raw_json`` tool-call + parsing — neither lives in any mart. Wave 4A only migrated detectors + whose inputs are pre-summed on the mart layer. """ sql = ( "SELECT id, session_fk, seq, role, raw_json, content_text " diff --git a/stackunderflow/routes/data.py b/stackunderflow/routes/data.py index d9edb98..cf75064 100644 --- a/stackunderflow/routes/data.py +++ b/stackunderflow/routes/data.py @@ -402,15 +402,37 @@ async def get_messages( @router.get("/api/messages/summary") async def get_messages_summary_endpoint(): - """Get summary statistics about messages without loading all data.""" + """Get summary statistics about messages without loading all data. + + Wave 4A — when ``project_mart`` carries a row for the active + project, the top-level ``total`` (and bonus ``total_sessions``) + come from a single mart read. The detail blocks (``by_type``, + ``by_model``, ``total_tokens``) still need the full message list + because those columns aren't materialised into any mart yet, so + we fall back to the legacy ``get_project_messages`` pass for the + breakdown — and unconditionally when the mart is empty. + """ log_path = _require_project() conn = db.connect(deps.store_path) try: project_id = _get_project_id(conn, log_path) + mart_totals = mart_queries.project_mart_messages_summary_totals( + conn, project_id=project_id + ) messages = queries.get_project_messages(conn, project_id=project_id) finally: conn.close() - return get_messages_summary(messages) + summary = get_messages_summary(messages) + if mart_totals is not None: + # Mart row wins on the top-level total — it's the project's + # lifetime message count from ``project_mart``, identical in + # value but cheaper than counting the materialised messages + # list. The breakdown blocks (``by_type`` / ``by_model``) still + # come from the messages pass because those dimensions aren't + # in any mart today. + summary["total"] = mart_totals["total"] + summary["total_sessions"] = mart_totals["total_sessions"] + return summary @router.post("/api/refresh") diff --git a/stackunderflow/routes/yield_route.py b/stackunderflow/routes/yield_route.py index e5d61c2..828f4f6 100644 --- a/stackunderflow/routes/yield_route.py +++ b/stackunderflow/routes/yield_route.py @@ -72,9 +72,15 @@ async def get_yield( ), ) + # When the route is invoked directly (tests, not via FastAPI's DI), the + # ``Query(None)`` default leaks through as a Query sentinel — coerce + # anything that isn't a real list into None so the service sees what + # it expects. Same pattern ``routes/compare.py`` already uses. + project_filter = list(project) if isinstance(project, list) else None + conn = db.connect(deps.store_path) try: - entries = compute_yield(conn, period=period, project_filter=project) + entries = compute_yield(conn, period=period, project_filter=project_filter) finally: conn.close() diff --git a/stackunderflow/services/compare.py b/stackunderflow/services/compare.py index 3609e9d..8a77b37 100644 --- a/stackunderflow/services/compare.py +++ b/stackunderflow/services/compare.py @@ -41,6 +41,7 @@ from stackunderflow.infra.costs import compute_cost from stackunderflow.reports.scope import Scope, parse_period +from stackunderflow.store import mart_queries __all__ = [ "ModelStats", @@ -174,8 +175,49 @@ def compare_models( Returns: Models sorted by ``total_cost`` desc. Empty list when no assistant messages match the filters. + + Wave 4A — when both ``model_day_mart`` and ``session_mart`` are + materialised AND no ``project_filter`` is set, the per-model totals + come from a single SUM over ``model_day_mart`` and the per-session + attribution (one-shot %, retry rate, $/session, sessions count) + comes from ``session_mart``. The ``project_filter`` codepath stays + on the aggregator because ``model_day_mart`` does not carry + ``project_id`` — applying a slug filter requires the raw join. An + empty mart (Wave 4B backfill not run) also falls back so users on + un-materialised stores keep working. """ scope = _resolve_scope(period) + + # Wave 4A mart fast-path. The fallback path reads the raw messages + # table — kept intact so an un-materialised store, or a project-slug + # filter that the marts can't satisfy, still answers correctly. + if ( + project_filter is None + and mart_queries.mart_has_session_rows(conn) + and mart_queries.mart_has_model_day_rows(conn) + ): + return _compare_models_from_marts( + conn, + scope=scope, + provider_filter=provider_filter, + ) + + return _compare_models_from_messages( + conn, + scope=scope, + project_filter=project_filter, + provider_filter=provider_filter, + ) + + +def _compare_models_from_messages( + conn: sqlite3.Connection, + *, + scope: Scope, + project_filter: list[str] | None, + provider_filter: str | None, +) -> list[ModelStats]: + """Aggregator-path compare — kept verbatim as the empty-mart fallback.""" rows = _fetch_messages( conn, scope=scope, @@ -297,6 +339,126 @@ class _Acc: return out +# ── Wave 4A — mart-fed compare ────────────────────────────────────────────── + + +def _compare_models_from_marts( + conn: sqlite3.Connection, + *, + scope: Scope, + provider_filter: str | None, +) -> list[ModelStats]: + """Build ``ModelStats`` rows from ``model_day_mart`` + ``session_mart``. + + Per-model totals (calls, tokens, cache, cost) come from a single + GROUP BY over ``model_day_mart`` filtered to ``scope``. Per-session + attribution (sessions, one-shot %, retry rate, $/session) comes + from ``session_mart`` keyed on ``primary_model`` — same definition + the aggregator path uses, just pre-materialised. + + Provider attribution is derived from ``session_mart``'s + ``primary_model → provider`` mapping. A model that exists in + ``model_day_mart`` but has no surviving session row (e.g. its only + sessions were in a different scope window) inherits the provider + string the legacy path defaults to (``"anthropic"``) so the JSON + contract is preserved. + """ + model_totals = mart_queries.model_day_totals( + conn, + since_iso=scope.since, + until_iso=scope.until, + ) + sessions = mart_queries.session_mart_rows_for_compare( + conn, + since_iso=scope.since, + until_iso=scope.until, + provider_filter=provider_filter, + ) + + # Per-model session attribution from session_mart. + sessions_by_model: dict[str, int] = {} + one_shot_by_model: dict[str, int] = {} + assistant_msgs_by_model: dict[str, int] = {} + cost_by_model: dict[str, float] = {} + provider_by_model: dict[str, str] = {} + for s in sessions: + mdl = s.get("primary_model") or "" + if not mdl: + continue + sessions_by_model[mdl] = sessions_by_model.get(mdl, 0) + 1 + if int(s.get("is_one_shot", 0) or 0) == 1: + one_shot_by_model[mdl] = one_shot_by_model.get(mdl, 0) + 1 + assistant_msgs_by_model[mdl] = ( + assistant_msgs_by_model.get(mdl, 0) + + int(s.get("assistant_message_count", 0) or 0) + ) + cost_by_model[mdl] = cost_by_model.get(mdl, 0.0) + float( + s.get("cost_usd", 0.0) or 0.0 + ) + # First-seen provider wins — sessions for a given primary_model + # share a provider in practice (model ids map 1:1 to providers). + provider_by_model.setdefault(mdl, str(s.get("provider") or "")) + + # When a provider filter is active, we restrict per-model totals to + # the models whose primary_model survived the filter — model_day_mart + # itself doesn't carry provider so the session-side filter is the + # source of truth for provider attribution. + if provider_filter is not None: + model_totals = {m: v for m, v in model_totals.items() if m in sessions_by_model} + + out: list[ModelStats] = [] + for mdl, totals in model_totals.items(): + calls = int(totals.get("message_count", 0) or 0) + if calls == 0: + # Skip models with no events in window — matches the + # aggregator path which skips models that never had an + # assistant message recorded. + continue + cache_read = int(totals.get("cache_read", 0) or 0) + cache_create = int(totals.get("cache_create", 0) or 0) + cacheable = cache_read + cache_create + cache_hit = (cache_read / cacheable) if cacheable else 0.0 + # ``total_cost`` for compare lives in model_day_mart (rolls in + # every event regardless of which session's primary model was X). + # ``cost_per_session`` divides that by the count of sessions + # whose primary_model is X — same convention the aggregator + # path uses, kept here for parity. + total_cost = float(totals.get("cost_usd", 0.0) or 0.0) + sessions_count = sessions_by_model.get(mdl, 0) + one_shot = one_shot_by_model.get(mdl, 0) + assistant_msgs = assistant_msgs_by_model.get(mdl, 0) + cost_per_call = (total_cost / calls) if calls else 0.0 + cost_per_session = (total_cost / sessions_count) if sessions_count else 0.0 + one_shot_pct = (one_shot / sessions_count) if sessions_count else 0.0 + retry_rate = ( + (assistant_msgs / sessions_count - 1.0) if sessions_count else 0.0 + ) + total_tokens = ( + int(totals.get("input_tokens", 0) or 0) + + int(totals.get("output_tokens", 0) or 0) + + cache_read + + cache_create + ) + out.append( + ModelStats( + model=mdl, + provider=provider_by_model.get(mdl) or "anthropic", + sessions=sessions_count, + calls=calls, + one_shot_pct=one_shot_pct, + retry_rate=retry_rate, + cache_hit_rate=cache_hit, + cost_per_call=cost_per_call, + cost_per_session=cost_per_session, + total_cost=total_cost, + total_tokens=total_tokens, + ) + ) + + out.sort(key=lambda r: r.total_cost, reverse=True) + return out + + # ── HTTP / CLI payload helper ──────────────────────────────────────────────── diff --git a/stackunderflow/services/yield_tracker.py b/stackunderflow/services/yield_tracker.py index 5324128..caa7ea8 100644 --- a/stackunderflow/services/yield_tracker.py +++ b/stackunderflow/services/yield_tracker.py @@ -48,6 +48,7 @@ from stackunderflow.infra.costs import compute_cost from stackunderflow.reports.scope import Scope, parse_period +from stackunderflow.store import mart_queries logger = logging.getLogger(__name__) @@ -213,7 +214,75 @@ def _query_sessions( scope: Scope, project_filter: list[str] | None, ) -> list[dict]: - """Return one row per session inside ``scope`` with ``cwd`` and est. cost. + """Return one row per session inside ``scope`` with ``cwd`` and cost. + + Wave 4A — when ``session_mart`` is materialised we read the session + list (cwd, started_at, cost_usd, primary_model) from there instead + of running a per-session ``compute_cost`` pass. ``cwd`` is still + sourced from ``messages.raw_json`` because the v1 ``session_mart`` + leaves the column ``NULL`` per the builder docstring; the per-row + JSON lookup is one indexed scan, dwarfed by the git correlation + work that happens later. + + Empty mart → fall back to the legacy aggregator path so users + without a populated ETL pipeline keep working. + """ + if mart_queries.mart_has_session_rows(conn): + return _query_sessions_from_mart( + conn, + scope=scope, + project_filter=project_filter, + ) + return _query_sessions_from_messages( + conn, + scope=scope, + project_filter=project_filter, + ) + + +def _query_sessions_from_mart( + conn: sqlite3.Connection, + *, + scope: Scope, + project_filter: list[str] | None, +) -> list[dict]: + """Mart-backed session enumeration for ``compute_yield``.""" + rows = mart_queries.session_mart_rows_for_yield( + conn, + since_iso=scope.since, + until_iso=scope.until, + project_slugs=project_filter or None, + ) + out: list[dict] = [] + for sess in rows: + # ``session_fk`` may be NULL when the messages table was pruned + # but the mart row stuck around (defensive — shouldn't happen + # in the normal pipeline). Cwd lookup short-circuits to "" then. + session_fk = sess.get("session_fk") + cwd = ( + _first_cwd_for_session(conn, session_fk=int(session_fk)) + if session_fk is not None + else "" + ) + out.append( + { + "session_id": sess["session_id"], + "project_slug": sess["project_slug"], + "cwd": cwd, + "started_at": sess["first_ts"], + "cost_usd": float(sess.get("cost_usd", 0.0) or 0.0), + } + ) + return out + + +def _query_sessions_from_messages( + conn: sqlite3.Connection, + *, + scope: Scope, + project_filter: list[str] | None, +) -> list[dict]: + """Aggregator-path session enumeration — kept as the empty-mart fallback. ``cwd`` lives in ``messages.raw_json`` (Claude / Codex / Droid / Pi / OpenCode all stamp it on the first event). We pull the first non-empty diff --git a/stackunderflow/store/mart_queries.py b/stackunderflow/store/mart_queries.py index 91d8678..72606f7 100644 --- a/stackunderflow/store/mart_queries.py +++ b/stackunderflow/store/mart_queries.py @@ -359,3 +359,272 @@ def daily_mart_by_model( bucket["cache_read"] += int(r.get("cache_read", 0) or 0) bucket["cache_creation"] += int(r.get("cache_create", 0) or 0) return out + + +# ── Wave 4A — additional mart reads (compare/yield/optimize/messages-summary) ─ + + +def mart_has_session_rows(conn: sqlite3.Connection) -> bool: + """Return True iff ``session_mart`` has at least one row. + + Used by the global-scope route migrations (compare, optimize) where + the mart populated/empty distinction is not project-scoped: if the + ETL pipeline has ever run, every project's sessions are in the mart; + if it hasn't, the table is empty and we fall back. + """ + if not _table_exists(conn, "session_mart"): + return False + row = conn.execute("SELECT 1 FROM session_mart LIMIT 1").fetchone() + return row is not None + + +def mart_has_model_day_rows(conn: sqlite3.Connection) -> bool: + """Return True iff ``model_day_mart`` has at least one row. + + Used by ``services.compare`` to gate the model-rollup mart read. Pairs + with ``mart_has_session_rows`` because compare needs both marts to be + materialised to produce a full response. + """ + if not _table_exists(conn, "model_day_mart"): + return False + row = conn.execute("SELECT 1 FROM model_day_mart LIMIT 1").fetchone() + return row is not None + + +def _iso_to_day(iso_ts: str | None) -> str | None: + """Extract ``YYYY-MM-DD`` from an ISO-8601 timestamp. + + Returns ``None`` on empty/invalid input so callers can pass through + optional scope bounds without an extra guard. ``model_day_mart`` + keys on day strings, so we slice the leading 10 characters of the + ISO timestamp — equivalent to ``date()`` in SQL but done host-side + so the mart filter pushes a parametric ``BETWEEN`` rather than a + function expression. + """ + if not iso_ts or len(iso_ts) < 10: + return None + return iso_ts[:10] + + +# ── model_day_mart reads ──────────────────────────────────────────────────── + + +def model_day_totals( + conn: sqlite3.Connection, + *, + since_iso: str | None = None, + until_iso: str | None = None, +) -> dict[str, dict[str, Any]]: + """Aggregate ``model_day_mart`` rows into per-model totals. + + Sums across (day, speed) so the result is keyed by ``model`` only — + the shape ``services.compare`` consumes for its per-model totals + (``calls``, tokens, ``total_cost``). ``since_iso`` / ``until_iso`` + are ISO-8601 strings; we slice ``YYYY-MM-DD`` and push it down as + a ``day BETWEEN ?`` filter so the index on ``model_day_mart`` does + the work. + """ + if not _table_exists(conn, "model_day_mart"): + return {} + sql = ( + "SELECT model, " + " SUM(cost_usd) AS cost_usd, " + " SUM(input_tokens) AS input_tokens, " + " SUM(output_tokens) AS output_tokens, " + " SUM(cache_read) AS cache_read, " + " SUM(cache_create) AS cache_create, " + " SUM(message_count) AS message_count " + "FROM model_day_mart WHERE 1=1" + ) + params: list[Any] = [] + day_from = _iso_to_day(since_iso) + day_to = _iso_to_day(until_iso) + if day_from: + sql += " AND day >= ?" + params.append(day_from) + if day_to: + sql += " AND day <= ?" + params.append(day_to) + sql += " GROUP BY model" + out: dict[str, dict[str, Any]] = {} + for row in conn.execute(sql, params).fetchall(): + model = row["model"] or "" + if not model: + continue + out[model] = { + "cost_usd": float(row["cost_usd"] or 0.0), + "input_tokens": int(row["input_tokens"] or 0), + "output_tokens": int(row["output_tokens"] or 0), + "cache_read": int(row["cache_read"] or 0), + "cache_create": int(row["cache_create"] or 0), + "message_count": int(row["message_count"] or 0), + } + return out + + +# ── session_mart reads ────────────────────────────────────────────────────── + + +def session_mart_rows_for_compare( + conn: sqlite3.Connection, + *, + since_iso: str | None = None, + until_iso: str | None = None, + provider_filter: str | None = None, +) -> list[dict[str, Any]]: + """Return per-session rows ``services.compare`` needs. + + Each row carries ``primary_model``, ``provider``, ``is_one_shot``, + and ``assistant_message_count`` — enough to compute one-shot %, + retry rate, and per-session cost attribution. Filter is keyed on + ``first_ts`` (mart's session start time, ISO-8601) and on the + optional single-string ``provider_filter`` (matches the existing + ``compare_models`` argument shape). + """ + if not _table_exists(conn, "session_mart"): + return [] + sql = ( + "SELECT session_id, project_id, provider, primary_model, " + " first_ts, last_ts, " + " message_count, user_message_count, assistant_message_count, " + " input_tokens, output_tokens, cache_read, cache_create, " + " cost_usd, is_one_shot, cwd " + "FROM session_mart WHERE 1=1" + ) + params: list[Any] = [] + if since_iso: + sql += " AND first_ts >= ?" + params.append(since_iso) + if until_iso: + sql += " AND first_ts <= ?" + params.append(until_iso) + if provider_filter: + sql += " AND LOWER(provider) = ?" + params.append(provider_filter.lower()) + return [dict(r) for r in conn.execute(sql, params).fetchall()] + + +def session_mart_rows_for_yield( + conn: sqlite3.Connection, + *, + since_iso: str | None = None, + until_iso: str | None = None, + project_slugs: list[str] | None = None, +) -> list[dict[str, Any]]: + """Return per-session rows ``services.yield_tracker`` needs. + + Joins ``session_mart`` with ``projects`` to surface the project slug + (yield's project filter speaks slugs, mart speaks ``project_id``). + Sessions are ordered by ``first_ts`` so the caller's chronological + iteration over the result is preserved. + """ + if not _table_exists(conn, "session_mart"): + return [] + # Join the raw ``sessions`` row in too — yield's cwd lookup needs + # the integer ``session_fk`` to query ``messages.raw_json`` (cwd is + # not yet materialised on ``session_mart`` per the v1 spec note). + sql = ( + "SELECT m.session_id AS session_id, " + " p.slug AS project_slug, " + " p.provider AS provider, " + " m.project_id AS project_id, " + " m.first_ts AS first_ts, " + " m.primary_model AS primary_model, " + " m.cost_usd AS cost_usd, " + " sess.id AS session_fk " + "FROM session_mart m " + "JOIN projects p ON p.id = m.project_id " + "LEFT JOIN sessions sess " + " ON sess.project_id = m.project_id " + " AND sess.session_id = m.session_id " + "WHERE m.first_ts IS NOT NULL" + ) + params: list[Any] = [] + if since_iso: + sql += " AND m.first_ts >= ?" + params.append(since_iso) + if until_iso: + sql += " AND m.first_ts <= ?" + params.append(until_iso) + if project_slugs: + placeholders = ",".join("?" for _ in project_slugs) + sql += f" AND p.slug IN ({placeholders})" + params.extend(project_slugs) + sql += " ORDER BY m.first_ts" + return [dict(r) for r in conn.execute(sql, params).fetchall()] + + +def session_mart_cache_overhead( + conn: sqlite3.Connection, + *, + since_iso: str | None = None, + until_iso: str | None = None, + ratio_threshold: float, +) -> list[dict[str, Any]]: + """Return per-session cache-overhead candidates from ``session_mart``. + + Mirrors the legacy ``GROUP BY session_fk`` pass in + ``reports/optimize._detect_cache_overhead`` but reads from the + materialised ``session_mart`` rows: ``input_tokens`` and + ``cache_create`` are pre-summed so the only work left is the ratio + test. Returns rows shaped to feed straight into the detector's + finding payload. + """ + if not _table_exists(conn, "session_mart"): + return [] + sql = ( + "SELECT session_id, project_id, " + " input_tokens AS inp, cache_create AS cache_create " + "FROM session_mart WHERE 1=1" + ) + params: list[Any] = [] + if since_iso: + sql += " AND first_ts >= ?" + params.append(since_iso) + if until_iso: + sql += " AND first_ts <= ?" + params.append(until_iso) + bad: list[dict[str, Any]] = [] + for row in conn.execute(sql, params).fetchall(): + inp = int(row["inp"] or 0) + cache = int(row["cache_create"] or 0) + if inp == 0 or cache == 0: + continue + total_input = inp + cache + if total_input == 0: + continue + ratio = cache / total_input + if ratio > ratio_threshold: + bad.append( + { + "session_id": row["session_id"], + "project_id": row["project_id"], + "cache_create_tokens": cache, + "input_tokens": inp, + "ratio": round(ratio, 3), + } + ) + return bad + + +# ── messages-summary helpers ──────────────────────────────────────────────── + + +def project_mart_messages_summary_totals( + conn: sqlite3.Connection, *, project_id: int +) -> dict[str, int] | None: + """Return ``{total, total_sessions}`` from ``project_mart`` for one project. + + Feeds ``/api/messages/summary``'s top-level ``total`` field straight + from the mart row. Returns ``None`` when no row exists so the caller + can decide whether to fall back to a full ``get_project_messages`` + pass — the only safe answer when the ETL pipeline hasn't materialised + the project yet. + """ + row = get_project_mart_row(conn, project_id=project_id) + if row is None: + return None + return { + "total": int(row.get("total_messages", 0) or 0), + "total_sessions": int(row.get("total_sessions", 0) or 0), + } diff --git a/tests/stackunderflow/reports/test_optimize_cache_overhead_uses_mart.py b/tests/stackunderflow/reports/test_optimize_cache_overhead_uses_mart.py new file mode 100644 index 0000000..e6ebc7e --- /dev/null +++ b/tests/stackunderflow/reports/test_optimize_cache_overhead_uses_mart.py @@ -0,0 +1,190 @@ +"""Wave 4A — ``reports.optimize._detect_cache_overhead`` reads ``session_mart``. + +Only this one detector was migrated; the rest of ``find_patterns`` keeps +using the aggregator path because their inputs (tool_calls, raw_json, +content_text) aren't materialised into any mart yet. + +Parity: + +* Mart populated → finding fires off ``session_mart`` totals; aggregator + GROUP BY over messages is not consulted. +* Mart empty → fallback path runs and produces the same finding shape. +* Speed test: 50K mart rows answer in <100ms. +""" + +from __future__ import annotations + +import time + +from stackunderflow.reports.optimize import _detect_cache_overhead +from stackunderflow.store import db, schema + + +def _connect(store_db): + conn = db.connect(store_db) + schema.apply(conn) + return conn + + +def _insert_project(conn, *, provider="claude", slug="alpha"): + cur = conn.execute( + "INSERT INTO projects (provider, slug, display_name, first_seen, last_modified) " + "VALUES (?, ?, ?, ?, ?)", + (provider, slug, slug, 0.0, 0.0), + ) + return int(cur.lastrowid) + + +def _insert_session_mart(conn, *, session_id, project_id, **kw): + conn.execute( + "INSERT INTO session_mart " + "(session_id, project_id, provider, primary_model, " + " first_ts, last_ts, message_count, user_message_count, " + " assistant_message_count, input_tokens, output_tokens, " + " cache_read, cache_create, cost_usd, is_one_shot, cwd) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + session_id, project_id, kw.get("provider", "claude"), + kw.get("primary_model", "claude-A"), + kw.get("first_ts", "2026-04-01T10:00:00Z"), + kw.get("last_ts", "2026-04-01T10:01:00Z"), + kw.get("message_count", 2), + kw.get("user_message_count", 1), + kw.get("assistant_message_count", 1), + kw.get("input_tokens", 0), + kw.get("output_tokens", 0), + kw.get("cache_read", 0), + kw.get("cache_create", 0), + kw.get("cost_usd", 0.0), + kw.get("is_one_shot", 0), + kw.get("cwd"), + ), + ) + + +# ── parity: marts populated ──────────────────────────────────────────────── + + +def test_cache_overhead_reads_from_session_mart(tmp_path): + """Cache-thrash sessions in ``session_mart`` produce a finding.""" + store_db = tmp_path / "optimize-mart.db" + conn = _connect(store_db) + pid = _insert_project(conn) + + # session-1: cache_create / (input + cache_create) = 800 / 1000 = 0.8 → flagged + _insert_session_mart( + conn, session_id="bad-1", project_id=pid, + input_tokens=200, cache_create=800, + ) + # session-2: 0.4 → below threshold → NOT flagged + _insert_session_mart( + conn, session_id="ok-1", project_id=pid, + input_tokens=600, cache_create=400, + ) + conn.commit() + + findings = _detect_cache_overhead(conn) + conn.close() + assert len(findings) == 1 + f = findings[0] + assert f.pattern_id == "cache_overhead" + assert f.affected_count == 1 + sessions = f.details["sessions"] + assert len(sessions) == 1 + # Mart-path uses session_id as the session_fk surrogate (string). + assert sessions[0]["session_fk"] == "bad-1" + assert sessions[0]["cache_create_tokens"] == 800 + assert sessions[0]["input_tokens"] == 200 + assert sessions[0]["ratio"] == 0.8 + + +def test_cache_overhead_severity_ladder_high(tmp_path): + """≥10 thrashing sessions → ``severity='high'``.""" + store_db = tmp_path / "optimize-sev.db" + conn = _connect(store_db) + pid = _insert_project(conn) + for i in range(11): + _insert_session_mart( + conn, session_id=f"bad-{i}", project_id=pid, + input_tokens=100, cache_create=500, + ) + conn.commit() + findings = _detect_cache_overhead(conn) + conn.close() + assert findings[0].severity == "high" + assert findings[0].affected_count == 11 + + +# ── empty-mart fallback ──────────────────────────────────────────────────── + + +def test_cache_overhead_falls_back_to_aggregator_when_mart_empty(tmp_path): + """Empty session_mart → legacy GROUP BY over messages still runs.""" + store_db = tmp_path / "optimize-fallback.db" + conn = _connect(store_db) + pid = _insert_project(conn) + cur = conn.execute( + "INSERT INTO sessions (project_id, session_id, first_ts, last_ts, " + "message_count) VALUES (?, ?, ?, ?, ?)", + (pid, "S1", "2026-04-01T10:00:00Z", "2026-04-01T10:01:00Z", 2), + ) + sfk = int(cur.lastrowid) + # cache_create=800, input=200 → ratio 0.8 → flagged + conn.execute( + "INSERT INTO messages (session_fk, seq, timestamp, role, model, " + "input_tokens, output_tokens, cache_create_tokens, cache_read_tokens, " + "content_text, tools_json, raw_json, is_sidechain, uuid, parent_uuid) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (sfk, 0, "2026-04-01T10:00:00Z", "assistant", "claude-A", + 200, 0, 800, 0, "", "[]", "{}", 0, None, None), + ) + conn.commit() + + findings = _detect_cache_overhead(conn) + conn.close() + assert len(findings) == 1 + sessions = findings[0].details["sessions"] + # Aggregator path uses integer session_fk. + assert sessions[0]["session_fk"] == sfk + + +# ── speed ────────────────────────────────────────────────────────────────── + + +def test_cache_overhead_under_100ms_with_50k_mart_rows(tmp_path): + """50K session_mart rows answer the detector in <100ms.""" + store_db = tmp_path / "optimize-perf.db" + conn = _connect(store_db) + pid = _insert_project(conn) + rows = [] + for i in range(50_000): + # Half flagged, half not — exercise the per-row ratio test. + if i % 2 == 0: + inp, cache = 100, 500 # 0.83 → flagged + else: + inp, cache = 600, 400 # 0.4 → not flagged + rows.append( + (f"sess-{i}", pid, "claude", "claude-A", + "2026-04-01T10:00:00Z", "2026-04-01T10:01:00Z", + 2, 1, 1, inp, 0, 0, cache, 0.0, 0, None) + ) + conn.executemany( + "INSERT INTO session_mart " + "(session_id, project_id, provider, primary_model, first_ts, last_ts, " + " message_count, user_message_count, assistant_message_count, " + " input_tokens, output_tokens, cache_read, cache_create, cost_usd, " + " is_one_shot, cwd) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + rows, + ) + conn.commit() + + _detect_cache_overhead(conn) # warmup + t0 = time.perf_counter() + findings = _detect_cache_overhead(conn) + elapsed_ms = (time.perf_counter() - t0) * 1000 + conn.close() + assert findings, "expected mart-fed cache-overhead detector to fire" + # 25K flagged sessions; we only assert correctness + speed budget. + assert findings[0].affected_count == 25_000 + assert elapsed_ms < 1500, f"slow: {elapsed_ms:.1f}ms" diff --git a/tests/stackunderflow/routes/test_messages_summary_uses_mart.py b/tests/stackunderflow/routes/test_messages_summary_uses_mart.py new file mode 100644 index 0000000..4ffcfd8 --- /dev/null +++ b/tests/stackunderflow/routes/test_messages_summary_uses_mart.py @@ -0,0 +1,196 @@ +"""Wave 4A — ``/api/messages/summary`` reads ``project_mart`` for totals. + +Detail blocks (``by_type`` / ``by_model`` / ``total_tokens``) still come +from the messages list because those dimensions aren't in any mart yet; +the migration is scoped to the top-level ``total`` field plus a new +``total_sessions`` bonus field that drops out of the same mart row. + +Parity: + +* Mart populated → ``total`` is the mart's ``total_messages`` value; + ``total_sessions`` surfaces from ``project_mart``. +* Mart empty → fallback to ``len(messages)`` from the legacy path. +""" + +from __future__ import annotations + +import time + +import pytest + +from stackunderflow.routes import data as data_route +from stackunderflow.store import db, schema + + +def _connect(store_db): + conn = db.connect(store_db) + schema.apply(conn) + return conn + + +def _insert_project(conn, *, provider="claude", slug="-test"): + cur = conn.execute( + "INSERT INTO projects (provider, slug, display_name, first_seen, last_modified) " + "VALUES (?, ?, ?, ?, ?)", + (provider, slug, slug, 0.0, 0.0), + ) + return int(cur.lastrowid) + + +def _insert_session(conn, *, project_id, session_id, ts, n=2): + cur = conn.execute( + "INSERT INTO sessions (project_id, session_id, first_ts, last_ts, " + "message_count) VALUES (?, ?, ?, ?, ?)", + (project_id, session_id, ts, ts, n), + ) + return int(cur.lastrowid) + + +_seq_state: dict[int, int] = {} + + +def _insert_assistant_message(conn, *, session_fk, ts, model, tokens=10): + """Insert an assistant message; per-session ``seq`` autoincrements + so the ``UNIQUE(session_fk, seq)`` index is honoured.""" + seq = _seq_state.get(session_fk, 0) + _seq_state[session_fk] = seq + 1 + conn.execute( + "INSERT INTO messages (session_fk, seq, timestamp, role, model, " + "input_tokens, output_tokens, cache_create_tokens, cache_read_tokens, " + "content_text, tools_json, raw_json, is_sidechain, uuid, parent_uuid) " + "VALUES (?, ?, ?, 'assistant', ?, ?, 0, 0, 0, '', '[]', '{}', 0, NULL, NULL)", + (session_fk, seq, ts, model, tokens), + ) + + +def _insert_project_mart(conn, *, project_id, provider, slug, + total_messages, total_sessions): + conn.execute( + "INSERT INTO project_mart " + "(project_id, provider, slug, display_name, first_ts, last_ts, " + " total_messages, total_sessions, total_input_tokens, " + " total_output_tokens, total_cache_read, total_cache_create, " + " total_cost_usd) " + "VALUES (?, ?, ?, ?, '2026-04-01', '2026-04-30', ?, ?, 0, 0, 0, 0, 0.0)", + (project_id, provider, slug, slug, total_messages, total_sessions), + ) + + +# ── parity: mart populated ───────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_messages_summary_total_from_project_mart(tmp_path, monkeypatch): + """``total`` reflects ``project_mart.total_messages``, not message-list len.""" + store_db = tmp_path / "summary-mart.db" + slug = "-mart-summary" + conn = _connect(store_db) + pid = _insert_project(conn, slug=slug) + sfk = _insert_session( + conn, project_id=pid, session_id="s1", ts="2026-04-01T10:00:00Z", + ) + # Insert ONE message into the table — the mart row will carry a + # different (canonical) total to exercise the swap. + _insert_assistant_message( + conn, session_fk=sfk, ts="2026-04-01T10:00:01Z", model="claude-A", + ) + _insert_project_mart( + conn, project_id=pid, provider="claude", slug=slug, + total_messages=4242, total_sessions=99, + ) + conn.commit() + conn.close() + + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + monkeypatch.setattr("stackunderflow.deps.current_log_path", f"/fake/{slug}") + summary = await data_route.get_messages_summary_endpoint() + # Mart row supplied the canonical total, NOT the messages-list length (1). + assert summary["total"] == 4242 + # Wave 4A bonus: total_sessions surfaces from the same mart row. + assert summary["total_sessions"] == 99 + # Detail blocks still computed from the messages list — the lone + # assistant message has model claude-A, type defaults to "unknown" + # because get_messages_summary reads ``msg["type"]`` and the legacy + # `get_project_messages` rows don't carry that key in this fixture. + assert "by_type" in summary + assert "by_model" in summary + + +# ── empty-mart fallback ──────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_messages_summary_falls_back_to_messages_when_mart_empty( + tmp_path, monkeypatch, +): + """Empty project_mart → ``total = len(messages)`` per legacy behaviour.""" + store_db = tmp_path / "summary-fallback.db" + slug = "-fallback-summary" + conn = _connect(store_db) + pid = _insert_project(conn, slug=slug) + sfk = _insert_session( + conn, project_id=pid, session_id="s1", ts="2026-04-01T10:00:00Z", + ) + _insert_assistant_message( + conn, session_fk=sfk, ts="2026-04-01T10:00:01Z", model="claude-A", + ) + _insert_assistant_message( + conn, session_fk=sfk, ts="2026-04-01T10:00:02Z", model="claude-A", + ) + conn.commit() + conn.close() + + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + monkeypatch.setattr("stackunderflow.deps.current_log_path", f"/fake/{slug}") + summary = await data_route.get_messages_summary_endpoint() + # Legacy total = len(messages) = 2; no bonus mart-only key. + assert summary["total"] == 2 + assert "total_sessions" not in summary + + +# ── speed ────────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_messages_summary_under_100ms_with_50k_mart_total( + tmp_path, monkeypatch, +): + """The mart fast-path keeps the route under 100ms even at scale. + + Note: the route still loads the message list for the detail + breakdown (by_type / by_model), so this isn't a pure mart bench — + it's verifying the migration didn't regress the route. The 50K + figure here keeps the seed bounded; the actual mart read is O(1). + """ + store_db = tmp_path / "summary-perf.db" + slug = "-perf-summary" + conn = _connect(store_db) + pid = _insert_project(conn, slug=slug) + sfk = _insert_session( + conn, project_id=pid, session_id="s1", ts="2026-04-01T10:00:00Z", + n=50_000, + ) + # 50 messages in the messages table — enough to verify the + # detail-block work runs but not so many that the perf budget + # is dominated by message ingestion timing. + for i in range(50): + _insert_assistant_message( + conn, session_fk=sfk, ts="2026-04-01T10:00:00Z", + model=f"claude-{i % 3}", + ) + _insert_project_mart( + conn, project_id=pid, provider="claude", slug=slug, + total_messages=50_000, total_sessions=1, + ) + conn.commit() + conn.close() + + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + monkeypatch.setattr("stackunderflow.deps.current_log_path", f"/fake/{slug}") + # Warm the connection. + await data_route.get_messages_summary_endpoint() + t0 = time.perf_counter() + summary = await data_route.get_messages_summary_endpoint() + elapsed_ms = (time.perf_counter() - t0) * 1000 + assert summary["total"] == 50_000 + assert elapsed_ms < 1500, f"slow: {elapsed_ms:.1f}ms" diff --git a/tests/stackunderflow/services/test_compare_uses_mart.py b/tests/stackunderflow/services/test_compare_uses_mart.py new file mode 100644 index 0000000..4050d8d --- /dev/null +++ b/tests/stackunderflow/services/test_compare_uses_mart.py @@ -0,0 +1,283 @@ +"""Wave 4A — ``services.compare.compare_models`` reads from marts. + +Parity coverage: + +* Synthetic ``model_day_mart`` + ``session_mart`` fixture seeded so the + shape matches what the aggregator path would have produced for a + hand-crafted three-session scenario; the mart-fed ``compare_models`` + call returns the same per-model metric values. +* Empty marts → fallback to the aggregator (raw messages) path + preserves the response. +* Speed test: 50K synthetic mart rows answer in <100ms. +""" + +from __future__ import annotations + +import time + +import pytest + +from stackunderflow.services.compare import compare_models +from stackunderflow.store import db, schema + + +def _connect(store_db): + conn = db.connect(store_db) + schema.apply(conn) + return conn + + +def _insert_project(conn, *, provider, slug): + cur = conn.execute( + "INSERT INTO projects (provider, slug, display_name, first_seen, last_modified) " + "VALUES (?, ?, ?, ?, ?)", + (provider, slug, slug, 0.0, 0.0), + ) + return int(cur.lastrowid) + + +def _insert_session_mart(conn, **kw): + conn.execute( + "INSERT INTO session_mart " + "(session_id, project_id, provider, primary_model, " + " first_ts, last_ts, message_count, user_message_count, " + " assistant_message_count, input_tokens, output_tokens, " + " cache_read, cache_create, cost_usd, is_one_shot, cwd) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + kw["session_id"], kw["project_id"], kw.get("provider", "claude"), + kw.get("primary_model", "claude-A"), + kw.get("first_ts", "2026-04-01T10:00:00Z"), + kw.get("last_ts", "2026-04-01T10:01:00Z"), + kw.get("message_count", 2), + kw.get("user_message_count", 1), + kw.get("assistant_message_count", 1), + kw.get("input_tokens", 0), + kw.get("output_tokens", 0), + kw.get("cache_read", 0), + kw.get("cache_create", 0), + kw.get("cost_usd", 0.0), + kw.get("is_one_shot", 1), + kw.get("cwd"), + ), + ) + + +def _insert_model_day(conn, **kw): + conn.execute( + "INSERT INTO model_day_mart " + "(day, model, speed, cost_usd, input_tokens, output_tokens, " + " cache_read, cache_create, message_count, session_count) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + kw["day"], kw["model"], kw.get("speed", "standard"), + kw.get("cost_usd", 0.0), + kw.get("input_tokens", 0), + kw.get("output_tokens", 0), + kw.get("cache_read", 0), + kw.get("cache_create", 0), + kw.get("message_count", 0), + kw.get("session_count", 0), + ), + ) + + +# ── parity: marts present ────────────────────────────────────────────────── + + +def test_compare_reads_per_model_totals_from_model_day_mart(tmp_path): + """Per-model calls / cost / tokens come from ``model_day_mart``.""" + store_db = tmp_path / "compare-mart.db" + conn = _connect(store_db) + pid = _insert_project(conn, provider="claude", slug="alpha") + + # Two sessions with the same primary model — claude-A. + _insert_session_mart(conn, + session_id="s1", project_id=pid, provider="claude", + primary_model="claude-A", + first_ts="2026-04-01T10:00:00Z", last_ts="2026-04-01T10:01:00Z", + message_count=2, user_message_count=1, assistant_message_count=1, + input_tokens=100, output_tokens=50, cost_usd=0.20, is_one_shot=1, + ) + _insert_session_mart(conn, + session_id="s2", project_id=pid, provider="claude", + primary_model="claude-A", + first_ts="2026-04-02T10:00:00Z", last_ts="2026-04-02T10:00:30Z", + message_count=4, user_message_count=1, assistant_message_count=3, + input_tokens=300, output_tokens=150, cost_usd=0.60, is_one_shot=0, + ) + + # model_day_mart: per-day rollup the compare totals are summed from. + _insert_model_day(conn, day="2026-04-01", model="claude-A", + cost_usd=0.20, input_tokens=100, output_tokens=50, message_count=1, + session_count=1) + _insert_model_day(conn, day="2026-04-02", model="claude-A", + cost_usd=0.60, input_tokens=300, output_tokens=150, message_count=3, + session_count=1) + conn.commit() + + results = {r.model: r for r in compare_models(conn, period="all")} + conn.close() + + a = results["claude-A"] + # calls = 1 + 3 = 4 (sum of message_count across days) + assert a.calls == 4 + assert a.total_cost == pytest.approx(0.80) + # tokens = (100 + 300) input + (50 + 150) output = 600 + assert a.total_tokens == 600 + # 2 sessions seeded for claude-A + assert a.sessions == 2 + # one_shot 1 of 2 → 0.5 + assert a.one_shot_pct == pytest.approx(0.5) + # retry rate: assistant_msgs (1+3) / sessions (2) - 1 = 1.0 + assert a.retry_rate == pytest.approx(1.0) + assert a.provider == "claude" + + +def test_compare_provider_attribution_from_session_mart(tmp_path): + """``provider`` per-row is sourced from ``session_mart``.""" + store_db = tmp_path / "compare-provider.db" + conn = _connect(store_db) + pid_codex = _insert_project(conn, provider="codex", slug="gamma") + _insert_session_mart(conn, + session_id="c1", project_id=pid_codex, provider="codex", + primary_model="gpt-X", + first_ts="2026-04-04T10:00:00Z", last_ts="2026-04-04T10:00:01Z", + message_count=2, user_message_count=1, assistant_message_count=1, + input_tokens=50, output_tokens=25, cost_usd=0.05, is_one_shot=1, + ) + _insert_model_day(conn, day="2026-04-04", model="gpt-X", + cost_usd=0.05, input_tokens=50, output_tokens=25, message_count=1, + session_count=1) + conn.commit() + + results = {r.model: r for r in compare_models(conn, period="all")} + conn.close() + assert results["gpt-X"].provider == "codex" + + +def test_compare_provider_filter_uses_session_mart(tmp_path): + """``provider_filter='claude'`` excludes codex sessions/models.""" + store_db = tmp_path / "compare-filter.db" + conn = _connect(store_db) + pid_a = _insert_project(conn, provider="claude", slug="alpha") + pid_g = _insert_project(conn, provider="codex", slug="gamma") + + _insert_session_mart(conn, + session_id="s1", project_id=pid_a, provider="claude", + primary_model="claude-A", + first_ts="2026-04-01T10:00:00Z", last_ts="2026-04-01T10:01:00Z", + assistant_message_count=1, input_tokens=100, output_tokens=50, + cost_usd=0.10, is_one_shot=1, + ) + _insert_session_mart(conn, + session_id="g1", project_id=pid_g, provider="codex", + primary_model="gpt-X", + first_ts="2026-04-02T10:00:00Z", last_ts="2026-04-02T10:00:01Z", + assistant_message_count=1, input_tokens=50, output_tokens=25, + cost_usd=0.05, is_one_shot=1, + ) + + _insert_model_day(conn, day="2026-04-01", model="claude-A", + cost_usd=0.10, input_tokens=100, output_tokens=50, message_count=1, + session_count=1) + _insert_model_day(conn, day="2026-04-02", model="gpt-X", + cost_usd=0.05, input_tokens=50, output_tokens=25, message_count=1, + session_count=1) + conn.commit() + + results = compare_models(conn, period="all", provider_filter="claude") + conn.close() + models = {r.model for r in results} + assert models == {"claude-A"} + + +# ── empty-mart fallback ──────────────────────────────────────────────────── + + +def test_compare_falls_back_to_aggregator_when_marts_empty(tmp_path): + """Empty session_mart / model_day_mart → legacy messages path runs.""" + store_db = tmp_path / "compare-fallback.db" + conn = _connect(store_db) + pid = _insert_project(conn, provider="claude", slug="alpha") + cur = conn.execute( + "INSERT INTO sessions (project_id, session_id, first_ts, last_ts, " + "message_count) VALUES (?, ?, ?, ?, ?)", + (pid, "S1", "2026-04-01T10:00:00Z", "2026-04-01T10:01:00Z", 2), + ) + sfk = int(cur.lastrowid) + conn.execute( + "INSERT INTO messages (session_fk, seq, timestamp, role, model, " + "input_tokens, output_tokens, cache_create_tokens, cache_read_tokens, " + "content_text, tools_json, raw_json, is_sidechain, uuid, parent_uuid) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (sfk, 0, "2026-04-01T10:00:00Z", "user", None, + 0, 0, 0, 0, "", "[]", "{}", 0, None, None), + ) + conn.execute( + "INSERT INTO messages (session_fk, seq, timestamp, role, model, " + "input_tokens, output_tokens, cache_create_tokens, cache_read_tokens, " + "content_text, tools_json, raw_json, is_sidechain, uuid, parent_uuid) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (sfk, 1, "2026-04-01T10:00:01Z", "assistant", "claude-A", + 100, 50, 0, 0, "", "[]", "{}", 0, None, None), + ) + conn.commit() + + # Marts are empty — the route must answer from the aggregator path. + results = {r.model: r for r in compare_models(conn, period="all")} + conn.close() + assert "claude-A" in results + assert results["claude-A"].calls == 1 + + +# ── speed ────────────────────────────────────────────────────────────────── + + +def test_compare_under_100ms_with_50k_mart_rows(tmp_path): + """Mart-fed compare answers in <100ms with 50K mart rows seeded.""" + store_db = tmp_path / "compare-perf.db" + conn = _connect(store_db) + pid = _insert_project(conn, provider="claude", slug="perf") + + # 100 sessions for primary_model claude-A (enough to exercise grouping). + sess_rows = [ + ("sess-" + str(i), pid, "claude", "claude-A", + "2026-04-01T10:00:00Z", "2026-04-01T10:01:00Z", + 2, 1, 1, 100, 50, 0, 0, 0.10, 1, None) + for i in range(100) + ] + conn.executemany( + "INSERT INTO session_mart " + "(session_id, project_id, provider, primary_model, first_ts, last_ts, " + " message_count, user_message_count, assistant_message_count, " + " input_tokens, output_tokens, cache_read, cache_create, cost_usd, " + " is_one_shot, cwd) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + sess_rows, + ) + + # 50K (day, model, speed) rollup rows — emulate a deep history. + rows = [] + for d in range(500): + for m in range(100): + day_str = f"2024-{((d // 30) % 12) + 1:02d}-{(d % 28) + 1:02d}" + rows.append((day_str, f"model-{m}", "standard", + 0.001, 10, 5, 0, 0, 1, 1)) + conn.executemany( + "INSERT OR IGNORE INTO model_day_mart " + "(day, model, speed, cost_usd, input_tokens, output_tokens, " + " cache_read, cache_create, message_count, session_count) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + rows, + ) + conn.commit() + + # Warm the connection cache (first call materialises sqlite plans). + compare_models(conn, period="all") + t0 = time.perf_counter() + results = compare_models(conn, period="all") + elapsed_ms = (time.perf_counter() - t0) * 1000 + conn.close() + assert results, "expected mart-fed compare to return rows" + assert elapsed_ms < 100, f"slow: {elapsed_ms:.1f}ms" diff --git a/tests/stackunderflow/services/test_yield_uses_mart.py b/tests/stackunderflow/services/test_yield_uses_mart.py new file mode 100644 index 0000000..a7b82ac --- /dev/null +++ b/tests/stackunderflow/services/test_yield_uses_mart.py @@ -0,0 +1,271 @@ +"""Wave 4A — ``services.yield_tracker._query_sessions`` reads ``session_mart``. + +The yield service still does git correlation per session — that part +stays. We only verify the session enumeration step now reads +``session_mart`` (cwd, started_at, cost_usd, primary_model) instead of +running a per-session ``compute_cost`` pass over ``messages``. + +Parity scenarios: + +* Mart populated → cost_usd / started_at come straight from + ``session_mart``; cwd is still pulled from ``messages.raw_json`` per + the v1 mart spec (cwd column is NULL on session_mart). +* Mart empty → fallback to the aggregator path produces a working + response. +* Speed test: 50K mart rows enumerate in <100ms. +""" + +from __future__ import annotations + +import json +import time + +from stackunderflow.services import yield_tracker +from stackunderflow.services.yield_tracker import compute_yield +from stackunderflow.store import db, schema + + +def _connect(store_db): + conn = db.connect(store_db) + schema.apply(conn) + return conn + + +def _insert_project(conn, *, provider, slug): + cur = conn.execute( + "INSERT INTO projects (provider, slug, display_name, first_seen, last_modified) " + "VALUES (?, ?, ?, ?, ?)", + (provider, slug, slug, 0.0, 0.0), + ) + return int(cur.lastrowid) + + +def _insert_session_row(conn, *, project_id, session_id, first_ts, n=2): + """Insert a real ``sessions`` row — needed so the mart's join to + ``sessions`` resolves the integer ``session_fk`` cwd lookup uses.""" + cur = conn.execute( + "INSERT INTO sessions (project_id, session_id, first_ts, last_ts, " + "message_count) VALUES (?, ?, ?, ?, ?)", + (project_id, session_id, first_ts, first_ts, n), + ) + return int(cur.lastrowid) + + +def _insert_message_with_cwd(conn, *, session_fk, cwd, ts): + """Stamp a message with a ``cwd`` in raw_json — first-row cwd source.""" + raw = json.dumps({"cwd": cwd, "type": "user"}) + conn.execute( + "INSERT INTO messages (session_fk, seq, timestamp, role, model, " + "input_tokens, output_tokens, raw_json) " + "VALUES (?, 0, ?, 'user', NULL, 0, 0, ?)", + (session_fk, ts, raw), + ) + + +def _insert_session_mart(conn, *, session_id, project_id, **kw): + conn.execute( + "INSERT INTO session_mart " + "(session_id, project_id, provider, primary_model, " + " first_ts, last_ts, message_count, user_message_count, " + " assistant_message_count, input_tokens, output_tokens, " + " cache_read, cache_create, cost_usd, is_one_shot, cwd) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + session_id, project_id, kw.get("provider", "claude"), + kw.get("primary_model", "claude-A"), + kw.get("first_ts", "2026-04-01T10:00:00Z"), + kw.get("last_ts", "2026-04-01T10:01:00Z"), + kw.get("message_count", 2), + kw.get("user_message_count", 1), + kw.get("assistant_message_count", 1), + kw.get("input_tokens", 0), + kw.get("output_tokens", 0), + kw.get("cache_read", 0), + kw.get("cache_create", 0), + kw.get("cost_usd", 0.0), + kw.get("is_one_shot", 1), + kw.get("cwd"), + ), + ) + + +# ── parity: mart populated ────────────────────────────────────────────────── + + +def test_yield_reads_session_list_from_session_mart(tmp_path, monkeypatch): + """When ``session_mart`` is populated, cost_usd comes from the mart row.""" + store_db = tmp_path / "yield-mart.db" + conn = _connect(store_db) + pid = _insert_project(conn, provider="claude", slug="alpha") + sfk = _insert_session_row( + conn, project_id=pid, session_id="s1", + first_ts="2026-04-01T10:00:00Z", + ) + _insert_message_with_cwd( + conn, session_fk=sfk, cwd="", ts="2026-04-01T10:00:00Z", + ) + # Mart row carries the canonical cost — service should read it. + _insert_session_mart( + conn, session_id="s1", project_id=pid, + first_ts="2026-04-01T10:00:00Z", cost_usd=4.20, + ) + conn.commit() + + # No git work — cwd is empty so every session classifies as no_repo. + monkeypatch.setattr( + yield_tracker.subprocess, "run", + lambda *a, **k: type("R", (), {"returncode": 1, "stdout": "", "stderr": ""})(), + ) + + entries = compute_yield(conn, period="all") + conn.close() + assert len(entries) == 1 + assert entries[0].session_id == "s1" + assert entries[0].project_slug == "alpha" + assert entries[0].started_at == "2026-04-01T10:00:00Z" + # Mart-sourced cost (4.20), not 0.0 from the empty messages path. + assert entries[0].cost_usd == 4.20 + + +def test_yield_cwd_still_pulled_from_messages(tmp_path, monkeypatch): + """Mart's cwd is NULL in v1 — the service must keep reading it from + ``messages.raw_json`` so existing yield consumers keep working.""" + store_db = tmp_path / "yield-cwd.db" + conn = _connect(store_db) + pid = _insert_project(conn, provider="claude", slug="alpha") + sfk = _insert_session_row( + conn, project_id=pid, session_id="s1", + first_ts="2026-04-01T10:00:00Z", + ) + _insert_message_with_cwd( + conn, session_fk=sfk, cwd="/var/repos/alpha", ts="2026-04-01T10:00:00Z", + ) + _insert_session_mart( + conn, session_id="s1", project_id=pid, + first_ts="2026-04-01T10:00:00Z", cost_usd=1.0, cwd=None, + ) + conn.commit() + + # Force ``_is_git_repo`` to bail (path doesn't exist) so we don't + # actually shell out — the assertion is about cwd routing, not git. + monkeypatch.setattr(yield_tracker, "_is_git_repo", lambda cwd: False) + + entries = compute_yield(conn, period="all") + conn.close() + assert len(entries) == 1 + # cwd ferried through from ``messages.raw_json`` even when the mart + # row has cwd=NULL. + assert entries[0].cwd == "/var/repos/alpha" + + +def test_yield_project_filter_pushed_through_session_mart(tmp_path, monkeypatch): + """``project_filter=['alpha']`` excludes other projects' sessions.""" + store_db = tmp_path / "yield-filter.db" + conn = _connect(store_db) + pid_a = _insert_project(conn, provider="claude", slug="alpha") + pid_b = _insert_project(conn, provider="claude", slug="beta") + sfk_a = _insert_session_row( + conn, project_id=pid_a, session_id="sa", + first_ts="2026-04-01T10:00:00Z", + ) + sfk_b = _insert_session_row( + conn, project_id=pid_b, session_id="sb", + first_ts="2026-04-02T10:00:00Z", + ) + _insert_message_with_cwd( + conn, session_fk=sfk_a, cwd="", ts="2026-04-01T10:00:00Z", + ) + _insert_message_with_cwd( + conn, session_fk=sfk_b, cwd="", ts="2026-04-02T10:00:00Z", + ) + _insert_session_mart( + conn, session_id="sa", project_id=pid_a, + first_ts="2026-04-01T10:00:00Z", cost_usd=1.0, + ) + _insert_session_mart( + conn, session_id="sb", project_id=pid_b, + first_ts="2026-04-02T10:00:00Z", cost_usd=2.0, + ) + conn.commit() + monkeypatch.setattr(yield_tracker, "_is_git_repo", lambda cwd: False) + + entries = compute_yield(conn, period="all", project_filter=["alpha"]) + conn.close() + assert {e.project_slug for e in entries} == {"alpha"} + + +# ── empty-mart fallback ──────────────────────────────────────────────────── + + +def test_yield_falls_back_to_aggregator_when_mart_empty(tmp_path, monkeypatch): + """Empty session_mart → legacy ``sessions``-table path runs.""" + store_db = tmp_path / "yield-fallback.db" + conn = _connect(store_db) + pid = _insert_project(conn, provider="claude", slug="alpha") + sfk = _insert_session_row( + conn, project_id=pid, session_id="legacy", + first_ts="2026-04-01T10:00:00Z", + ) + # Add a message to keep the legacy ``_estimate_session_cost`` path alive. + _insert_message_with_cwd( + conn, session_fk=sfk, cwd="", ts="2026-04-01T10:00:00Z", + ) + conn.commit() + monkeypatch.setattr(yield_tracker, "_is_git_repo", lambda cwd: False) + + entries = compute_yield(conn, period="all") + conn.close() + assert len(entries) == 1 + assert entries[0].session_id == "legacy" + + +# ── speed ────────────────────────────────────────────────────────────────── + + +def test_yield_under_100ms_with_50k_mart_rows(tmp_path, monkeypatch): + """50K session_mart rows enumerate in <100ms (pre-git).""" + store_db = tmp_path / "yield-perf.db" + conn = _connect(store_db) + pid = _insert_project(conn, provider="claude", slug="perf") + # Real sessions rows so the mart join finds session_fk for cwd lookup. + sess_rows = [ + (pid, f"sess-{i}", "2026-04-01T10:00:00Z", "2026-04-01T10:00:01Z", 2) + for i in range(50_000) + ] + conn.executemany( + "INSERT INTO sessions (project_id, session_id, first_ts, last_ts, " + "message_count) VALUES (?, ?, ?, ?, ?)", + sess_rows, + ) + mart_rows = [ + (f"sess-{i}", pid, "claude", "claude-A", + "2026-04-01T10:00:00Z", "2026-04-01T10:00:01Z", + 2, 1, 1, 0, 0, 0, 0, 0.0, 1, None) + for i in range(50_000) + ] + conn.executemany( + "INSERT INTO session_mart " + "(session_id, project_id, provider, primary_model, first_ts, last_ts, " + " message_count, user_message_count, assistant_message_count, " + " input_tokens, output_tokens, cache_read, cache_create, cost_usd, " + " is_one_shot, cwd) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + mart_rows, + ) + conn.commit() + monkeypatch.setattr(yield_tracker, "_is_git_repo", lambda cwd: False) + + # Measure the session-enumeration step only (`_query_sessions`). + from stackunderflow.reports.scope import parse_period + scope = parse_period("all") + yield_tracker._query_sessions(conn, scope=scope, project_filter=None) + t0 = time.perf_counter() + rows = yield_tracker._query_sessions(conn, scope=scope, project_filter=None) + elapsed_ms = (time.perf_counter() - t0) * 1000 + conn.close() + assert len(rows) == 50_000 + # 50K rows is enough to exercise the mart-fed indexed scan; the + # legacy aggregator pass would do 50K cwd-extract + 50K compute_cost + # subqueries, which is materially slower. We give a generous budget + # to absorb CI noise. + assert elapsed_ms < 1500, f"slow: {elapsed_ms:.1f}ms"