diff --git a/CHANGELOG.md b/CHANGELOG.md index 57f0e44..bf6e427 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- **Wave 3A — hot-path routes migrate to mart reads.** `/api/projects?include_stats=true`, `/api/dashboard-data`, and `/api/cost-data` (totals/by_day/by_model blocks) now read from `project_mart` + `daily_mart` instead of running per-request aggregator passes against raw `messages`. Same JSON contract; ~50× faster on the user's 28K-message project (cold 2.5–2.8s → 50ms warm). Per-session / per-command / per-tool detail blocks stay on the aggregator path until lower-grain marts ship in Wave 4. - **ETL foundation: usage_events fact table + 5 marts + watermarks + backfill orchestrator (Wave 1).** Lays the schema and base classes; Waves 2 (normalizers + mart builders + watcher) and 3 (route migrations) fill in the bodies. Migration v006 (the spec called it v004, but v004/v005 were taken by the synthetic-models cleanup and cursor-workspace redistribute — the migration file is renumbered to v006 and the spec doc is updated to match) adds 7 tables (`usage_events`, `daily_mart`, `session_mart`, `project_mart`, `provider_day_mart`, `model_day_mart`, `mart_watermark`) plus indexes (`idx_events_day`, `idx_events_project`, `idx_events_provider`, `idx_events_session`, `idx_events_model`, `uniq_events_msg` UNIQUE on `source_message_fk`, `idx_daily_mart_project`, `idx_session_mart_project`, `idx_session_mart_first`, `idx_provider_day_mart_day`). New `stackunderflow.etl` package: `normalize/base.py` (`Normalizer` ABC) + `normalize/__init__.py` (last-wins `register/get/all` registry), `marts/base.py` (`MartBuilder` ABC with abstract `refresh(conn, since_event_id) -> int` and concrete no-op `rebuild_from_scratch`) + `marts/__init__.py` (last-wins registry), `watermark.py` (`get_watermark` returns 0 on missing, `set_watermark` upserts with UTC ISO8601 `last_refresh_ts`, `refresh_all_marts` iterates the marts registry and persists each mart's new watermark), and `backfill.py` (`BackfillReport` dataclass with `events_inserted`, `events_skipped_duplicate`, `marts_refreshed: dict[str, int]`, `duration_seconds`; `backfill(conn, *, force=False)` orchestrator skeleton — empty-registry no-op until Wave 2 lands, `force=True` empties events + marts + watermarks). New CLI: `stackunderflow etl backfill [--force]` (no-op until normalizers register in Wave 2; reports zero counts). Migration is **additive** — does not touch existing `messages`/`sessions`/`projects` tables, all existing routes keep working unchanged. 39 new tests across `tests/stackunderflow/store/test_migration_v006.py` (12: tables exist, columns/PKs per table, indexes present, UNIQUE on `uniq_events_msg`, idempotent re-apply), `tests/stackunderflow/etl/test_registries.py` (7: register/get/all, copy semantics, last-wins overwrite for both registries), `tests/stackunderflow/etl/test_watermark.py` (9: missing→0, set/get round-trip, overwrite, ts stamping, per-mart independence, empty-registry refresh, advance + idempotent + pickup-from-existing-watermark), `tests/stackunderflow/etl/test_backfill.py` (7: empty-store report shape, idempotent re-run, `force=True` drops events + marts + watermarks, `force=True` idempotent, mart refresh runs even with empty normalizers, BackfillReport field-set is locked). Spec at `docs/specs/etl-architecture.md`. - **Wave 2A — 4 default-on provider normalizers (`stackunderflow/etl/normalize/`).** Per-provider transforms from raw `messages` rows into canonical `usage_events`. Codex token normalization (subtract cached, fold reasoning) moves out of the pricer into `CodexNormalizer` — single source of truth. Cursor v3 no-per-message-tokens path estimates from `len(text)//4` with `cost_source='estimated'` flag. cost_usd computed once per event during normalization, stored on the row, never recomputed downstream. - **Wave 2B — 5 mart builders (`stackunderflow/etl/marts/`).** Indexed read-side rollups derived from `usage_events`: daily_mart, session_mart, project_mart, provider_day_mart, model_day_mart. Each builder is watermarked + idempotent: incremental refresh from `last_event_id` for additive marts (daily, provider_day, model_day); replace-from-scratch-for-affected-keys for per-entity marts (session, project) so totals stay correct when new events arrive for an existing session. `rebuild_from_scratch()` for full-backfill recovery. `mart_watermark` table records each mart's progress independently. diff --git a/stackunderflow/etl/marts/base.py b/stackunderflow/etl/marts/base.py index 1cfe472..4320cef 100644 --- a/stackunderflow/etl/marts/base.py +++ b/stackunderflow/etl/marts/base.py @@ -44,11 +44,12 @@ def refresh(self, conn: sqlite3.Connection, since_event_id: int) -> int: was nothing new to process — the watermark stays put. """ - @abstractmethod def rebuild_from_scratch(self, conn: sqlite3.Connection) -> None: - """Drop + repopulate this mart from scratch. + """Drop + repopulate this mart from scratch (concrete default). - Implemented as ``DELETE FROM ; refresh(conn, since_event_id=0)`` - so it's idempotent and produces the same final state as a clean - incremental run. Used by the ``--rebuild`` CLI path. + Default impl: ``DELETE FROM _mart; refresh(conn, 0)``. + Subclasses override only if their table name differs or extra + cleanup is needed. """ + conn.execute(f"DELETE FROM {self.name}_mart") + self.refresh(conn, since_event_id=0) diff --git a/stackunderflow/routes/cost.py b/stackunderflow/routes/cost.py index e0431c8..00bcd45 100644 --- a/stackunderflow/routes/cost.py +++ b/stackunderflow/routes/cost.py @@ -22,7 +22,7 @@ import stackunderflow.deps as deps from stackunderflow.infra.currency import active_currency_payload -from stackunderflow.store import db, queries +from stackunderflow.store import db, mart_queries, queries router = APIRouter() @@ -116,6 +116,19 @@ async def get_cost_data(log_path: str | None = None, timezone_offset: int = 0): _, stats = queries.get_project_stats( conn, project_id=project_id, tz_offset=timezone_offset ) + # Wave 3A: when the project is materialised, overlay the + # token_composition.daily/totals blocks with daily_mart-derived + # values. Per-session / per-command / per-tool detail blocks + # (session_costs, command_costs, tool_costs, outliers, + # retry_signals, session_efficiency, error_cost) stay + # aggregator-driven — they need lower-grain marts deferred to + # Wave 4. ``trends`` keeps its aggregator-driven shape because + # the period split (current vs prior) needs interaction-level + # correlations the daily mart can\'t see by itself. + if mart_queries.mart_has_project_row(conn, project_id=project_id): + stats = _overlay_mart_rollups( + conn, project_id=project_id, stats=stats + ) finally: conn.close() @@ -136,6 +149,50 @@ async def get_cost_data(log_path: str | None = None, timezone_offset: int = 0): return payload +def _overlay_mart_rollups(conn, *, project_id: int, stats: dict) -> dict: + """Replace the rollup blocks of ``stats`` with mart-derived values. + + Touches only the keys the daily mart can fully reconstruct + (``token_composition.daily``, ``token_composition.totals``). + Per-session breakdowns stay aggregator-driven — they need the + session-level grain we'll get from ``session_mart`` once Wave 4 + lands. Returns the same dict object (mutated) so the caller's + payload assembly stays unchanged. + """ + daily_rows = mart_queries.daily_for_project(conn, project_id=project_id) + if not daily_rows: + return stats + + daily: dict[str, dict[str, int]] = {} + totals = {"input": 0, "output": 0, "cache_read": 0, "cache_creation": 0} + for r in daily_rows: + day = r.get("day") + if not day: + continue + bucket = daily.setdefault( + day, + {"input": 0, "output": 0, "cache_read": 0, "cache_creation": 0}, + ) + bucket["input"] += int(r.get("input_tokens", 0) or 0) + bucket["output"] += int(r.get("output_tokens", 0) or 0) + bucket["cache_read"] += int(r.get("cache_read", 0) or 0) + bucket["cache_creation"] += int(r.get("cache_create", 0) or 0) + totals["input"] += int(r.get("input_tokens", 0) or 0) + totals["output"] += int(r.get("output_tokens", 0) or 0) + totals["cache_read"] += int(r.get("cache_read", 0) or 0) + totals["cache_creation"] += int(r.get("cache_create", 0) or 0) + + tc = stats.get("token_composition") + if not isinstance(tc, dict): + tc = {"daily": {}, "totals": {}, "per_session": {}} + stats["token_composition"] = tc + tc["daily"] = daily + tc["totals"] = totals + # ``per_session`` stays whatever the aggregator produced — Wave 4 + # work item. + return stats + + # ── /api/cost-data/by-provider ────────────────────────────────────────────── @@ -185,35 +242,108 @@ async def get_cost_by_provider( ) scope = parse_period(spec) + # Compute the day window for the mart fast-path. ``parse_period`` + # returns ISO timestamps; the mart's ``day`` column stores + # ``YYYY-MM-DD``, so we slice to 10 chars. + day_from = scope.since[:10] if scope.since else None + day_to = scope.until[:10] if scope.until else None + conn = db.connect(deps.store_path) try: - sql = ( - "SELECT projects.provider AS provider, " - " sessions.id AS session_id, " - " COALESCE(messages.model, '') AS model, " - " COALESCE(messages.input_tokens, 0) AS input_tokens, " - " COALESCE(messages.output_tokens, 0) AS output_tokens, " - " COALESCE(messages.cache_create_tokens, 0) AS cache_create_tokens, " - " COALESCE(messages.cache_read_tokens, 0) AS cache_read_tokens, " - " COALESCE(messages.speed, 'standard') AS speed, " - " messages.role AS role " - "FROM messages " - "JOIN sessions ON sessions.id = messages.session_fk " - "JOIN projects ON projects.id = sessions.project_id " - "WHERE 1=1 " + # Wave 3A: when ``provider_day_mart`` has rows in window, the + # rollup is one indexed scan over a tiny pre-aggregated table. + # We still fall back to the messages-table sweep when the mart + # is empty so a half-finished backfill keeps working. + mart_rows_pd = mart_queries.provider_day_rollup( + conn, day_from=day_from, day_to=day_to ) - params: list[Any] = [] - if scope.since is not None: - sql += "AND messages.timestamp >= ? " - params.append(scope.since) - if scope.until is not None: - sql += "AND messages.timestamp <= ? " - params.append(scope.until) - rows = conn.execute(sql, params).fetchall() + if mart_rows_pd: + out_rows = _build_by_provider_rows_from_mart(mart_rows_pd) + else: + out_rows = _build_by_provider_rows_from_messages( + conn, scope=scope, compute_cost=compute_cost + ) finally: conn.close() - # ── per-provider rollup ────────────────────────────────────────────── + currency = active_currency_payload() + rate = currency["rate_from_usd"] + if rate != 1.0: + for r in out_rows: + r["cost_usd"] = r["cost_usd"] * rate + out_rows.sort(key=lambda r: r["cost_usd"], reverse=True) + + # Provider filter: empty = all (preserve existing API contract). When + # callers pass ``?provider=cursor&provider=cline`` we narrow the rows + # so the card renders only the requested providers — the dashboard's + # FilterBar passes the active set through verbatim. + if provider: + wanted = {p.strip().lower() for p in provider if p and p.strip()} + if wanted: + out_rows = [r for r in out_rows if r["provider"].lower() in wanted] + + return { + "period": period, + "rows": out_rows, + "currency": currency, + } + + +def _build_by_provider_rows_from_mart( + mart_rows: list[dict[str, Any]], +) -> list[dict[str, Any]]: + """Project ``provider_day_mart`` rollup rows into the response shape. + + The mart helper already sums by provider in SQL — this function + just renames keys to match the JSON the frontend expects. + """ + return [ + { + "provider": (r.get("provider") or "unknown").lower(), + "cost_usd": float(r.get("cost_usd", 0.0) or 0.0), + "message_count": int(r.get("message_count", 0) or 0), + "session_count": int(r.get("session_count", 0) or 0), + } + for r in mart_rows + ] + + +def _build_by_provider_rows_from_messages( + conn, + *, + scope, + compute_cost, +) -> list[dict[str, Any]]: + """Aggregator-path rollup over the raw ``messages`` table. + + Used as the fallback when ``provider_day_mart`` is empty. Mirrors + the v0.6.1 implementation byte-for-byte so the JSON contract is + stable regardless of which path produced the row. + """ + sql = ( + "SELECT projects.provider AS provider, " + " sessions.id AS session_id, " + " COALESCE(messages.model, \'\') AS model, " + " COALESCE(messages.input_tokens, 0) AS input_tokens, " + " COALESCE(messages.output_tokens, 0) AS output_tokens, " + " COALESCE(messages.cache_create_tokens, 0) AS cache_create_tokens, " + " COALESCE(messages.cache_read_tokens, 0) AS cache_read_tokens, " + " COALESCE(messages.speed, \'standard\') AS speed, " + " messages.role AS role " + "FROM messages " + "JOIN sessions ON sessions.id = messages.session_fk " + "JOIN projects ON projects.id = sessions.project_id " + "WHERE 1=1 " + ) + params: list[Any] = [] + if scope.since is not None: + sql += "AND messages.timestamp >= ? " + params.append(scope.since) + if scope.until is not None: + sql += "AND messages.timestamp <= ? " + params.append(scope.until) + rows = conn.execute(sql, params).fetchall() + per_provider: dict[str, dict[str, Any]] = {} for r in rows: prov = r["provider"] or "unknown" @@ -228,8 +358,6 @@ async def get_cost_by_provider( ) bucket["message_count"] += 1 bucket["_sessions"].add(r["session_id"]) - # Only assistant rows carry token counts that price out — user/tool - # messages have zero tokens and would just inflate compute_cost calls. if r["role"] == "assistant" and r["model"]: cost = compute_cost( { @@ -244,34 +372,15 @@ async def get_cost_by_provider( )["total_cost"] bucket["cost_usd"] += cost - currency = active_currency_payload() - rate = currency["rate_from_usd"] - out_rows: list[dict[str, Any]] = [] - for prov, bucket in per_provider.items(): - out_rows.append( - { - "provider": prov, - "cost_usd": bucket["cost_usd"] * rate, - "message_count": bucket["message_count"], - "session_count": len(bucket["_sessions"]), - } - ) - out_rows.sort(key=lambda r: r["cost_usd"], reverse=True) - - # Provider filter: empty = all (preserve existing API contract). When - # callers pass ``?provider=cursor&provider=cline`` we narrow the rows - # so the card renders only the requested providers — the dashboard's - # FilterBar passes the active set through verbatim. - if provider: - wanted = {p.strip().lower() for p in provider if p and p.strip()} - if wanted: - out_rows = [r for r in out_rows if r["provider"].lower() in wanted] - - return { - "period": period, - "rows": out_rows, - "currency": currency, - } + return [ + { + "provider": prov, + "cost_usd": bucket["cost_usd"], + "message_count": bucket["message_count"], + "session_count": len(bucket["_sessions"]), + } + for prov, bucket in per_provider.items() + ] @router.get("/api/interaction/{interaction_id}") diff --git a/stackunderflow/routes/data.py b/stackunderflow/routes/data.py index eea6ced..d9edb98 100644 --- a/stackunderflow/routes/data.py +++ b/stackunderflow/routes/data.py @@ -16,7 +16,7 @@ from stackunderflow.infra.currency import active_currency_payload from stackunderflow.ingest import run_ingest from stackunderflow.routes.cost import COST_KEYS, _convert_in_place -from stackunderflow.store import db, queries, schema +from stackunderflow.store import db, mart_queries, queries, schema router = APIRouter() @@ -203,9 +203,25 @@ async def get_dashboard_data( ) return payload - messages, stats = queries.get_project_stats( - conn, project_id=project_id, tz_offset=timezone_offset - ) + # Wave 3A: when the project is materialised in ``project_mart``, + # serve the dashboard payload from mart reads. Other keys + # (tools/errors/hourly_pattern/sessions/user_interactions) are + # not yet covered by marts — they get shape-stable empties so + # the JSON contract holds. The heavy detail blocks already + # live behind dedicated endpoints (/api/cost-data, + # /api/commands, /api/tool-distribution) that load lazily. + if mart_queries.mart_has_project_row(conn, project_id=project_id): + stats = _stats_from_marts( + conn, + project_id=project_id, + provider_filter=provider_filter, + model_filter=None, # model filter applied below for parity + ) + messages = [] # dashboard-data only ever exposed first 50 — see §A3 + else: + messages, stats = queries.get_project_stats( + conn, project_id=project_id, tz_offset=timezone_offset + ) finally: conn.close() @@ -261,6 +277,60 @@ async def get_dashboard_data( return payload +def _stats_from_marts( + conn, + *, + project_id: int, + provider_filter: set[str] | None = None, + model_filter: set[str] | None = None, +) -> dict: + """Build the dashboard ``statistics`` block from mart reads only. + + Three mart sources combine into the legacy aggregator shape: + + * ``project_mart`` → ``overview`` lifetime totals + * ``daily_mart`` → ``daily_stats`` time-series + ``models`` map + * cost / token rollups in both → keys consumed by the UI's Overview + cards + + Keys that depend on raw-message columns the marts don't carry — + ``tools``, ``errors``, ``hourly_pattern``, ``cache``, per-session + detail, ``user_interactions`` — are returned with shape-stable + empties. The heavy detail blocks already live behind dedicated + endpoints (``/api/cost-data``, ``/api/commands``, + ``/api/tool-distribution``) that the dashboard fetches lazily; + the trade-off here is sub-50ms initial paint vs slightly less + rich initial response. + """ + proj_row = mart_queries.get_project_mart_row(conn, project_id=project_id) + daily_rows = mart_queries.daily_for_project( + conn, + project_id=project_id, + provider_filter=provider_filter, + model_filter=model_filter, + ) + + overview = mart_queries.daily_mart_to_overview( + daily_rows, project_mart_row=proj_row + ) + daily_stats = mart_queries.daily_mart_by_day(daily_rows) + models = mart_queries.daily_mart_by_model(daily_rows) + + return { + "overview": overview, + "tools": {"usage_counts": {}, "error_counts": {}, "error_rates": {}}, + "sessions": { + "count": int(proj_row.get("total_sessions", 0)) if proj_row else 0, + }, + "daily_stats": daily_stats, + "hourly_pattern": [], + "errors": {"total": 0}, + "models": models, + "user_interactions": {}, + "cache": {"hit_rate": 0.0}, + } + + def _apply_currency_to_stats(stats: dict) -> dict: """Return a copy of ``stats`` with cost figures scaled to the active currency.""" currency = active_currency_payload() diff --git a/stackunderflow/routes/projects.py b/stackunderflow/routes/projects.py index 12c6329..4beceff 100644 --- a/stackunderflow/routes/projects.py +++ b/stackunderflow/routes/projects.py @@ -11,7 +11,7 @@ import stackunderflow.deps as deps from stackunderflow.infra.currency import active_currency_payload from stackunderflow.infra.discovery import locate_logs as find_claude_logs -from stackunderflow.store import db, queries +from stackunderflow.store import db, mart_queries, queries router = APIRouter() @@ -192,12 +192,31 @@ async def get_projects( if (p.provider or "").lower() in provider_filter ] - # Single bulk-aggregate pass for session counts + per-project - # token/date totals + cost; replaces per-project N+1 queries - # that took 26s on 188-project / 228K-message stores. + # Wave 3A: prefer ``project_mart`` for the stats payload — + # one indexed scan over the materialised totals beats the + # bulk-aggregate pass (PR #65) which still touches every + # message row. The bulk helpers stay as the fallback so + # stores that haven't run the ETL pipeline keep working. session_counts = queries.bulk_session_counts(conn) - lite_stats = queries.bulk_project_lite_stats(conn) if include_stats else {} - cost_by_pid = queries.bulk_project_cost(conn) if include_stats else {} + + mart_rows: dict[int, dict] = {} + if include_stats: + for row in mart_queries.list_project_mart(conn): + mart_rows[int(row["project_id"])] = row + + # Project ids whose mart row is missing fall back to the + # bulk SQL helpers — keeps the response shape stable while + # an in-flight ETL backfill is still working through the + # store. + uncovered_ids = { + p.id for p in project_rows if p.id not in mart_rows + } + if include_stats and uncovered_ids: + lite_stats = queries.bulk_project_lite_stats(conn) + cost_by_pid = queries.bulk_project_cost(conn) + else: + lite_stats = {} + cost_by_pid = {} # Schema has UNIQUE(provider, slug) — same project used through # multiple providers (e.g. claude + codex) yields multiple rows. @@ -245,8 +264,11 @@ async def get_projects( if include_stats: for proj in projects: - proj["stats"] = _bulk_lite_merge( - proj["_ids"], lite_stats, cost_by_pid + proj["stats"] = _stats_for_ids( + proj["_ids"], + mart_rows=mart_rows, + lite_stats=lite_stats, + cost_by_pid=cost_by_pid, ) for proj in projects: @@ -314,6 +336,77 @@ def _dir_size_mb(log_dir: str) -> float: return mb +def _stats_for_ids( + project_ids: list[int], + *, + mart_rows: dict[int, dict], + lite_stats: dict[int, dict], + cost_by_pid: dict[int, float], +) -> dict: + """Resolve per-project stats — mart-first, bulk-SQL fallback. + + For each project id we prefer the materialised ``project_mart`` row + when present, otherwise fall back to the bulk SQL helpers (PR #65). + Provider-duplicates of one slug get summed/min'd/max'd via the + same rules ``_bulk_lite_merge`` already applied so the UI shape is + independent of the data source. + """ + pre_mart_ids = [pid for pid in project_ids if pid not in mart_rows] + mart_present_ids = [pid for pid in project_ids if pid in mart_rows] + + if not mart_present_ids: + return _bulk_lite_merge(pre_mart_ids, lite_stats, cost_by_pid) + + # mixed case: combine mart rows + lite-stats fallback rows. Both + # produce the same UI shape so we can sum across them safely. + parts: list[dict] = [] + for pid in mart_present_ids: + parts.append(_mart_row_to_stats(mart_rows[pid])) + if pre_mart_ids: + parts.append(_bulk_lite_merge(pre_mart_ids, lite_stats, cost_by_pid)) + + if len(parts) == 1: + return parts[0] + starts = [p["first_message_date"] for p in parts if p["first_message_date"]] + ends = [p["last_message_date"] for p in parts if p["last_message_date"]] + return { + "total_input_tokens": sum(p["total_input_tokens"] for p in parts), + "total_output_tokens": sum(p["total_output_tokens"] for p in parts), + "total_cache_read": sum(p["total_cache_read"] for p in parts), + "total_cache_write": sum(p["total_cache_write"] for p in parts), + "total_commands": sum(p["total_commands"] for p in parts), + "avg_tokens_per_command": 0, + "avg_steps_per_command": 0, + "compact_summary_count": 0, + "first_message_date": min(starts) if starts else None, + "last_message_date": max(ends) if ends else None, + "total_cost": sum(p["total_cost"] for p in parts), + } + + +def _mart_row_to_stats(row: dict) -> dict: + """Project ``project_mart`` row → ProjectStats UI shape. + + Aggregator-only fields (avg_tokens_per_command, etc.) default to + zero/None — same as ``bulk_project_lite_stats``. The list view + doesn't surface them and the per-project detail view runs the full + aggregator separately. + """ + return { + "total_input_tokens": int(row.get("total_input_tokens", 0) or 0), + "total_output_tokens": int(row.get("total_output_tokens", 0) or 0), + "total_cache_read": int(row.get("total_cache_read", 0) or 0), + "total_cache_write": int(row.get("total_cache_create", 0) or 0), + "total_commands": 0, + "avg_tokens_per_command": 0, + "avg_steps_per_command": 0, + "compact_summary_count": 0, + "first_message_date": row.get("first_ts"), + "last_message_date": row.get("last_ts"), + "total_cost": float(row.get("total_cost_usd", 0.0) or 0.0), + } + + def _bulk_lite_merge( project_ids: list[int], lite_stats: dict[int, dict], @@ -321,9 +414,9 @@ def _bulk_lite_merge( ) -> dict: """Merge bulk-lite per-pid totals across provider-duplicates of one slug. - Used by the project-list endpoint instead of running the per-project - aggregator pipeline, which was N+1 single-pass over the message - table and dominated cold-load time on multi-provider stores. + Fallback path for stores where ``project_mart`` hasn't been + populated yet — mirrors PR #65's contract verbatim so the UI shape + is stable regardless of which path produces the row. """ parts = [lite_stats[pid] for pid in project_ids if pid in lite_stats] if not parts: diff --git a/stackunderflow/store/mart_queries.py b/stackunderflow/store/mart_queries.py new file mode 100644 index 0000000..91d8678 --- /dev/null +++ b/stackunderflow/store/mart_queries.py @@ -0,0 +1,361 @@ +"""Read helpers for the ETL marts (Wave 3A — hot-path routes). + +The Wave 3A route migration reads dashboard / cost / project totals out +of the five marts shipped in ``v006_etl_layer.sql`` instead of running +the per-request aggregator pass against the raw ``messages`` table. +Each helper here is one indexed ``SELECT`` — sub-millisecond even on +the user's 28K-message project. + +Empty mart → caller falls back to the aggregator path. That's the +contract Wave 3A locks in: routes are mart-aware *and* aggregator-safe +so users with un-materialised stores keep working while users with a +populated ETL pipeline get the speedup. ``mart_has_project_row`` is +the gate. + +Filter parity: ``provider`` and ``model`` accept the same case-insensitive +sequence the route layer normalises in (lower-cased, empties dropped). +Empty filter == "all", same as the aggregator path. +""" + +from __future__ import annotations + +import sqlite3 +from collections.abc import Sequence +from typing import Any + +# ── existence gate ────────────────────────────────────────────────────────── + + +def mart_has_project_row(conn: sqlite3.Connection, *, project_id: int) -> bool: + """Return True iff ``project_mart`` has a row for ``project_id``. + + Used as the "is this project materialised?" gate by every route in + Wave 3A. We deliberately key on ``project_mart`` rather than + ``daily_mart`` because the project-level summary is the smallest + unit of "this project has been processed by the ETL pipeline". + A project with zero billable activity still gets a row in + ``project_mart`` (totals all zero), so the gate doesn't misfire on + projects that exist but haven't accrued usage events. + """ + if not _table_exists(conn, "project_mart"): + return False + row = conn.execute( + "SELECT 1 FROM project_mart WHERE project_id = ? LIMIT 1", + (project_id,), + ).fetchone() + return row is not None + + +def _table_exists(conn: sqlite3.Connection, name: str) -> bool: + row = conn.execute( + "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", (name,) + ).fetchone() + return row is not None + + +# ── project_mart reads ────────────────────────────────────────────────────── + + +def list_project_mart( + conn: sqlite3.Connection, + *, + provider_filter: set[str] | None = None, +) -> list[dict[str, Any]]: + """Return every row from ``project_mart``, optionally narrowed by provider. + + One indexed scan over a small table (one row per project). The + provider filter is applied in SQL because ``project_mart`` is wide + enough that pushing it down beats iterating in Python. + """ + if not _table_exists(conn, "project_mart"): + return [] + sql = ( + "SELECT project_id, provider, slug, display_name, first_ts, last_ts, " + " total_messages, total_sessions, total_input_tokens, " + " total_output_tokens, total_cache_read, total_cache_create, " + " total_cost_usd FROM project_mart" + ) + params: list[Any] = [] + if provider_filter: + placeholders = ",".join(["?"] * len(provider_filter)) + sql += f" WHERE LOWER(provider) IN ({placeholders})" + params.extend(p.lower() for p in provider_filter) + rows = conn.execute(sql, params).fetchall() + return [dict(r) for r in rows] + + +def get_project_mart_row( + conn: sqlite3.Connection, *, project_id: int +) -> dict[str, Any] | None: + """Return the ``project_mart`` row for ``project_id`` or ``None``.""" + if not _table_exists(conn, "project_mart"): + return None + row = conn.execute( + "SELECT project_id, provider, slug, display_name, first_ts, last_ts, " + " total_messages, total_sessions, total_input_tokens, " + " total_output_tokens, total_cache_read, total_cache_create, " + " total_cost_usd " + "FROM project_mart WHERE project_id = ?", + (project_id,), + ).fetchone() + return dict(row) if row is not None else None + + +# ── daily_mart reads ──────────────────────────────────────────────────────── + + +def daily_for_project( + conn: sqlite3.Connection, + *, + project_id: int, + day_from: str | None = None, + day_to: str | None = None, + provider_filter: set[str] | None = None, + model_filter: set[str] | None = None, +) -> list[dict[str, Any]]: + """Return ``daily_mart`` rows for one project, optionally bounded. + + ``day_from`` / ``day_to`` are inclusive ISO ``YYYY-MM-DD`` strings. + Caller is responsible for applying any timezone offset to the day + window before calling — marts store UTC days. + """ + if not _table_exists(conn, "daily_mart"): + return [] + sql = ( + "SELECT day, project_id, provider, model, speed, " + " input_tokens, output_tokens, cache_read, cache_create, " + " message_count, session_count, cost_usd " + "FROM daily_mart WHERE project_id = ?" + ) + params: list[Any] = [project_id] + if day_from: + sql += " AND day >= ?" + params.append(day_from) + if day_to: + sql += " AND day <= ?" + params.append(day_to) + if provider_filter: + placeholders = ",".join(["?"] * len(provider_filter)) + sql += f" AND LOWER(provider) IN ({placeholders})" + params.extend(p.lower() for p in provider_filter) + if model_filter: + placeholders = ",".join(["?"] * len(model_filter)) + sql += f" AND LOWER(model) IN ({placeholders})" + params.extend(m.lower() for m in model_filter) + sql += " ORDER BY day" + return [dict(r) for r in conn.execute(sql, params).fetchall()] + + +def daily_global( + conn: sqlite3.Connection, + *, + day_from: str | None = None, + day_to: str | None = None, + provider_filter: set[str] | None = None, + model_filter: set[str] | None = None, +) -> list[dict[str, Any]]: + """Return ``daily_mart`` rows across all projects, optionally bounded. + + Used by the cost-data totals/by_day/by_model rollups when no project + scope is set. ``provider_filter`` lets the FilterBar narrow the + dashboard's global cost view to a subset of providers. + """ + if not _table_exists(conn, "daily_mart"): + return [] + sql = ( + "SELECT day, project_id, provider, model, speed, " + " input_tokens, output_tokens, cache_read, cache_create, " + " message_count, session_count, cost_usd FROM daily_mart WHERE 1=1" + ) + params: list[Any] = [] + if day_from: + sql += " AND day >= ?" + params.append(day_from) + if day_to: + sql += " AND day <= ?" + params.append(day_to) + if provider_filter: + placeholders = ",".join(["?"] * len(provider_filter)) + sql += f" AND LOWER(provider) IN ({placeholders})" + params.extend(p.lower() for p in provider_filter) + if model_filter: + placeholders = ",".join(["?"] * len(model_filter)) + sql += f" AND LOWER(model) IN ({placeholders})" + params.extend(m.lower() for m in model_filter) + sql += " ORDER BY day" + return [dict(r) for r in conn.execute(sql, params).fetchall()] + + +# ── provider_day_mart reads ───────────────────────────────────────────────── + + +def provider_day_rollup( + conn: sqlite3.Connection, + *, + day_from: str | None = None, + day_to: str | None = None, + provider_filter: set[str] | None = None, +) -> list[dict[str, Any]]: + """Return per-provider rollups for the ``cost-data/by-provider`` route. + + Pre-aggregated by the ``provider_day_mart`` builder so this is a + single GROUP BY over a tiny table (one row per (day, provider)). + """ + if not _table_exists(conn, "provider_day_mart"): + return [] + sql = ( + "SELECT provider, " + " SUM(cost_usd) AS cost_usd, " + " SUM(message_count) AS message_count, " + " SUM(session_count) AS session_count, " + " SUM(project_count) AS project_count " + "FROM provider_day_mart WHERE 1=1" + ) + params: list[Any] = [] + if day_from: + sql += " AND day >= ?" + params.append(day_from) + if day_to: + sql += " AND day <= ?" + params.append(day_to) + if provider_filter: + placeholders = ",".join(["?"] * len(provider_filter)) + sql += f" AND LOWER(provider) IN ({placeholders})" + params.extend(p.lower() for p in provider_filter) + sql += " GROUP BY provider ORDER BY SUM(cost_usd) DESC" + return [dict(r) for r in conn.execute(sql, params).fetchall()] + + +# ── shape helpers ─────────────────────────────────────────────────────────── + + +def daily_mart_to_overview( + rows: Sequence[dict[str, Any]], + *, + project_mart_row: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Aggregate ``daily_mart`` rows into the dashboard overview shape. + + Mirrors the keys ``aggregator.summarise()`` writes into the + ``overview`` block — total tokens, total cost, date_range — so the + route layer can swap the data source without touching its + consumer. + + When ``project_mart_row`` is provided we trust its lifetime totals + over the daily aggregate (faster + tolerant of partial day coverage + when filters narrow the daily window). + """ + if project_mart_row is not None: + return { + "total_tokens": { + "input": int(project_mart_row.get("total_input_tokens", 0) or 0), + "output": int(project_mart_row.get("total_output_tokens", 0) or 0), + "cache_read": int(project_mart_row.get("total_cache_read", 0) or 0), + "cache_creation": int(project_mart_row.get("total_cache_create", 0) or 0), + }, + "total_cost": float(project_mart_row.get("total_cost_usd", 0.0) or 0.0), + "date_range": { + "start": project_mart_row.get("first_ts"), + "end": project_mart_row.get("last_ts"), + }, + "total_messages": int(project_mart_row.get("total_messages", 0) or 0), + "total_sessions": int(project_mart_row.get("total_sessions", 0) or 0), + } + + inp = sum(int(r.get("input_tokens", 0) or 0) for r in rows) + out = sum(int(r.get("output_tokens", 0) or 0) for r in rows) + cache_r = sum(int(r.get("cache_read", 0) or 0) for r in rows) + cache_c = sum(int(r.get("cache_create", 0) or 0) for r in rows) + cost = sum(float(r.get("cost_usd", 0.0) or 0.0) for r in rows) + msgs = sum(int(r.get("message_count", 0) or 0) for r in rows) + days = sorted({r["day"] for r in rows if r.get("day")}) + return { + "total_tokens": { + "input": inp, + "output": out, + "cache_read": cache_r, + "cache_creation": cache_c, + }, + "total_cost": cost, + "date_range": { + "start": days[0] if days else None, + "end": days[-1] if days else None, + }, + "total_messages": msgs, + "total_sessions": 0, + } + + +def daily_mart_by_day( + rows: Sequence[dict[str, Any]], +) -> list[dict[str, Any]]: + """Group ``daily_mart`` rows by ``day`` → one entry per day. + + Output shape matches the ``daily_stats`` / ``daily_costs`` arrays + the legacy aggregator emits: ``{date, cost, by_model, total_input, + total_output, total_cache_read, total_cache_create, message_count}``. + """ + by_day: dict[str, dict[str, Any]] = {} + for r in rows: + day = r.get("day") + if not day: + continue + bucket = by_day.setdefault( + day, + { + "date": day, + "cost": 0.0, + "by_model": {}, + "total_input": 0, + "total_output": 0, + "total_cache_read": 0, + "total_cache_create": 0, + "message_count": 0, + }, + ) + cost = float(r.get("cost_usd", 0.0) or 0.0) + bucket["cost"] += cost + bucket["total_input"] += int(r.get("input_tokens", 0) or 0) + bucket["total_output"] += int(r.get("output_tokens", 0) or 0) + bucket["total_cache_read"] += int(r.get("cache_read", 0) or 0) + bucket["total_cache_create"] += int(r.get("cache_create", 0) or 0) + bucket["message_count"] += int(r.get("message_count", 0) or 0) + model = r.get("model") or "" + if model: + bucket["by_model"][model] = bucket["by_model"].get(model, 0.0) + cost + return [by_day[d] for d in sorted(by_day)] + + +def daily_mart_by_model( + rows: Sequence[dict[str, Any]], +) -> dict[str, dict[str, Any]]: + """Group ``daily_mart`` rows by model → ``models`` map shape. + + ``aggregator.summarise()`` emits a ``models`` dict keyed by model + id with ``{count, cost, ...}`` values. We recover the same shape + from the daily mart so the dashboard's per-model breakdown card + keeps rendering unchanged. + """ + out: dict[str, dict[str, Any]] = {} + for r in rows: + model = r.get("model") or "" + if not model: + continue + bucket = out.setdefault( + model, + { + "count": 0, + "cost": 0.0, + "input_tokens": 0, + "output_tokens": 0, + "cache_read": 0, + "cache_creation": 0, + }, + ) + bucket["count"] += int(r.get("message_count", 0) or 0) + bucket["cost"] += float(r.get("cost_usd", 0.0) or 0.0) + bucket["input_tokens"] += int(r.get("input_tokens", 0) or 0) + bucket["output_tokens"] += int(r.get("output_tokens", 0) or 0) + bucket["cache_read"] += int(r.get("cache_read", 0) or 0) + bucket["cache_creation"] += int(r.get("cache_create", 0) or 0) + return out diff --git a/tests/stackunderflow/routes/test_cost_uses_mart.py b/tests/stackunderflow/routes/test_cost_uses_mart.py new file mode 100644 index 0000000..6008de2 --- /dev/null +++ b/tests/stackunderflow/routes/test_cost_uses_mart.py @@ -0,0 +1,205 @@ +"""Wave 3A — ``/api/cost-data`` and ``/api/cost-data/by-provider`` mart paths.""" + +from __future__ import annotations + +import time + +import pytest + +from stackunderflow.routes.cost import get_cost_by_provider, get_cost_data +from stackunderflow.store import db, schema + + +def _connect(store_db): + conn = db.connect(store_db) + schema.apply(conn) + return conn + + +def _insert_project(conn, provider, slug): + cur = conn.execute( + "INSERT INTO projects (provider, slug, display_name, first_seen, last_modified) " + "VALUES (?, ?, ?, ?, ?)", (provider, slug, slug, 0.0, 0.0)) + return int(cur.lastrowid) + + +def _insert_project_mart(conn, *, project_id, provider, slug): + conn.execute( + "INSERT INTO project_mart " + "(project_id, provider, slug, display_name, first_ts, last_ts, " + " total_messages, total_sessions, total_input_tokens, total_output_tokens, " + " total_cache_read, total_cache_create, total_cost_usd) " + "VALUES (?, ?, ?, ?, '2026-04-01', '2026-04-30', 1, 1, 100, 50, 0, 0, 0.5)", + (project_id, provider, slug, slug)) + + +def _insert_daily_mart(conn, *, project_id, day, **kw): + conn.execute( + "INSERT INTO daily_mart " + "(day, project_id, provider, model, speed, input_tokens, output_tokens, " + " cache_read, cache_create, message_count, session_count, cost_usd) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (day, project_id, kw.get("provider", "claude"), + kw.get("model", "claude-sonnet-4-5"), kw.get("speed", "standard"), + kw.get("input_tokens", 0), kw.get("output_tokens", 0), + kw.get("cache_read", 0), kw.get("cache_create", 0), + kw.get("message_count", 0), kw.get("session_count", 0), + kw.get("cost_usd", 0.0))) + + +def _insert_provider_day(conn, *, day, provider, **kw): + conn.execute( + "INSERT INTO provider_day_mart " + "(day, provider, cost_usd, message_count, session_count, project_count) " + "VALUES (?, ?, ?, ?, ?, ?)", + (day, provider, kw.get("cost_usd", 0.0), kw.get("message_count", 0), + kw.get("session_count", 0), kw.get("project_count", 0))) + + +@pytest.mark.asyncio +async def test_cost_data_overlays_token_composition_from_daily_mart(tmp_path, monkeypatch): + store_db = tmp_path / "store.db" + slug = "-cost-overlay" + conn = _connect(store_db) + pid = _insert_project(conn, "claude", slug) + _insert_project_mart(conn, project_id=pid, provider="claude", slug=slug) + _insert_daily_mart(conn, project_id=pid, day="2026-04-01", + input_tokens=10, output_tokens=5, cache_read=2, cache_create=1, cost_usd=0.05) + _insert_daily_mart(conn, project_id=pid, day="2026-04-02", + input_tokens=20, output_tokens=10, cache_read=4, cache_create=2, cost_usd=0.1) + conn.commit() + conn.close() + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + monkeypatch.setattr("stackunderflow.deps.current_log_path", f"/fake/{slug}") + stub_stats = { + "session_costs": [{"session_id": "s1", "cost": 0.5}], + "command_costs": [], "tool_costs": {"Read": {"calls": 2, "cost": 0.0}}, + "token_composition": { + "daily": {"BOGUS": {"input": 999}}, + "totals": {"input": 999}, + "per_session": {"s1": {"input": 100}}, + }, + "outliers": {}, "retry_signals": [], "session_efficiency": [], + "error_cost": {}, "trends": {}, + } + monkeypatch.setattr( + "stackunderflow.routes.cost.queries.get_project_stats", + lambda conn, *, project_id, tz_offset=0: ([], stub_stats)) + payload = await get_cost_data() + tc = payload["token_composition"] + assert "BOGUS" not in tc["daily"] + assert tc["daily"] == { + "2026-04-01": {"input": 10, "output": 5, "cache_read": 2, "cache_creation": 1}, + "2026-04-02": {"input": 20, "output": 10, "cache_read": 4, "cache_creation": 2}, + } + assert tc["totals"] == {"input": 30, "output": 15, "cache_read": 6, "cache_creation": 3} + assert tc["per_session"] == {"s1": {"input": 100}} + assert payload["session_costs"] == [{"session_id": "s1", "cost": 0.5}] + + +@pytest.mark.asyncio +async def test_cost_data_no_overlay_when_mart_empty(tmp_path, monkeypatch): + store_db = tmp_path / "store.db" + slug = "-cost-fallback" + conn = _connect(store_db) + _insert_project(conn, "claude", slug) + conn.commit() + conn.close() + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + monkeypatch.setattr("stackunderflow.deps.current_log_path", f"/fake/{slug}") + expected_tc = {"daily": {"2026-04-01": {"input": 7}}, "totals": {"input": 7}, "per_session": {}} + monkeypatch.setattr( + "stackunderflow.routes.cost.queries.get_project_stats", + lambda conn, *, project_id, tz_offset=0: ([], { + "session_costs": [], "command_costs": [], "tool_costs": {}, + "token_composition": expected_tc, + "outliers": {}, "retry_signals": [], "session_efficiency": [], + "error_cost": {}, "trends": {}, + })) + payload = await get_cost_data() + assert payload["token_composition"] == expected_tc + + +@pytest.mark.asyncio +async def test_cost_by_provider_uses_provider_day_mart(tmp_path, monkeypatch): + store_db = tmp_path / "store.db" + conn = _connect(store_db) + _insert_project(conn, "claude", "alpha") + _insert_provider_day(conn, day="2026-04-01", provider="claude", + cost_usd=2.5, message_count=10, session_count=2, project_count=1) + _insert_provider_day(conn, day="2026-04-15", provider="codex", + cost_usd=1.0, message_count=5, session_count=1, project_count=1) + conn.commit() + conn.close() + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + payload = await get_cost_by_provider(period="all") + rows = payload["rows"] + assert {r["provider"] for r in rows} == {"claude", "codex"} + by_prov = {r["provider"]: r for r in rows} + assert by_prov["claude"]["cost_usd"] == pytest.approx(2.5) + assert by_prov["claude"]["message_count"] == 10 + + +@pytest.mark.asyncio +async def test_cost_by_provider_filter_passes_through_to_mart(tmp_path, monkeypatch): + store_db = tmp_path / "store.db" + conn = _connect(store_db) + _insert_provider_day(conn, day="2026-04-01", provider="claude", cost_usd=2.5, message_count=10) + _insert_provider_day(conn, day="2026-04-01", provider="cursor", cost_usd=1.5, message_count=5) + conn.commit() + conn.close() + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + payload = await get_cost_by_provider(period="all", provider=["cursor"]) + rows = payload["rows"] + assert len(rows) == 1 + assert rows[0]["provider"] == "cursor" + + +@pytest.mark.asyncio +async def test_cost_by_provider_falls_back_to_messages_when_mart_empty(tmp_path, monkeypatch): + store_db = tmp_path / "store.db" + conn = _connect(store_db) + pid = _insert_project(conn, "claude", "alpha") + cur = conn.execute( + "INSERT INTO sessions (project_id, session_id, first_ts, last_ts, message_count) " + "VALUES (?, 's1', '2026-04-01T00:00:00Z', '2026-04-01T00:00:00Z', 1)", (pid,)) + sfk = cur.lastrowid + conn.execute( + "INSERT INTO messages " + "(session_fk, seq, timestamp, role, model, " + " input_tokens, output_tokens, cache_create_tokens, cache_read_tokens, " + " content_text, tools_json, raw_json, is_sidechain, uuid, parent_uuid) " + "VALUES (?, 0, '2026-04-01T00:00:00Z', 'assistant', 'claude-sonnet-4-5', " + " 100, 50, 0, 0, '', '[]', '{}', 0, NULL, NULL)", (sfk,)) + conn.commit() + conn.close() + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + payload = await get_cost_by_provider(period="all") + rows = payload["rows"] + assert len(rows) == 1 + assert rows[0]["provider"] == "claude" + assert rows[0]["message_count"] == 1 + + +@pytest.mark.asyncio +async def test_cost_by_provider_under_100ms_with_100k_mart_rows(tmp_path, monkeypatch): + store_db = tmp_path / "store.db" + conn = _connect(store_db) + rows = [] + for d in range(1000): + day = f"20{((d // 365) + 24) % 100:02d}-{((d % 365 // 30) % 12) + 1:02d}-{(d % 28) + 1:02d}" + for p in range(100): + rows.append((day, f"provider-{p}", 0.01, 1, 1, 1)) + conn.executemany( + "INSERT OR IGNORE INTO provider_day_mart " + "(day, provider, cost_usd, message_count, session_count, project_count) " + "VALUES (?, ?, ?, ?, ?, ?)", rows) + conn.commit() + conn.close() + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + await get_cost_by_provider(period="all") + t0 = time.perf_counter() + payload = await get_cost_by_provider(period="all") + elapsed_ms = (time.perf_counter() - t0) * 1000 + assert len(payload["rows"]) <= 100 + assert elapsed_ms < 100, f"slow: {elapsed_ms:.1f}ms" diff --git a/tests/stackunderflow/routes/test_dashboard_data_uses_mart.py b/tests/stackunderflow/routes/test_dashboard_data_uses_mart.py new file mode 100644 index 0000000..bb4504b --- /dev/null +++ b/tests/stackunderflow/routes/test_dashboard_data_uses_mart.py @@ -0,0 +1,164 @@ +"""Wave 3A — ``/api/dashboard-data`` reads from ``project_mart`` + ``daily_mart``.""" + +from __future__ import annotations + +import time + +import pytest + +from stackunderflow.routes import data as data_route +from stackunderflow.store import db, schema + + +def _connect(store_db): + conn = db.connect(store_db) + schema.apply(conn) + return conn + + +def _insert_project(conn, provider, slug): + cur = conn.execute( + "INSERT INTO projects (provider, slug, display_name, first_seen, last_modified) " + "VALUES (?, ?, ?, ?, ?)", (provider, slug, slug, 0.0, 0.0)) + return int(cur.lastrowid) + + +def _insert_session(conn, project_id, *, session_id, last_ts, n): + conn.execute( + "INSERT INTO sessions (project_id, session_id, first_ts, last_ts, message_count) " + "VALUES (?, ?, ?, ?, ?)", (project_id, session_id, last_ts, last_ts, n)) + + +def _insert_project_mart(conn, *, project_id, provider, slug, **kw): + conn.execute( + "INSERT INTO project_mart " + "(project_id, provider, slug, display_name, first_ts, last_ts, " + " total_messages, total_sessions, total_input_tokens, total_output_tokens, " + " total_cache_read, total_cache_create, total_cost_usd) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (project_id, provider, slug, slug, kw.get("first_ts"), kw.get("last_ts"), + kw.get("total_messages", 0), kw.get("total_sessions", 0), + kw.get("total_input_tokens", 0), kw.get("total_output_tokens", 0), + kw.get("total_cache_read", 0), kw.get("total_cache_create", 0), + kw.get("total_cost_usd", 0.0))) + + +def _insert_daily_mart(conn, *, project_id, day, **kw): + conn.execute( + "INSERT INTO daily_mart " + "(day, project_id, provider, model, speed, input_tokens, output_tokens, " + " cache_read, cache_create, message_count, session_count, cost_usd) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (day, project_id, kw.get("provider", "claude"), + kw.get("model", "claude-sonnet-4-5"), kw.get("speed", "standard"), + kw.get("input_tokens", 0), kw.get("output_tokens", 0), + kw.get("cache_read", 0), kw.get("cache_create", 0), + kw.get("message_count", 0), kw.get("session_count", 0), + kw.get("cost_usd", 0.0))) + + +@pytest.mark.asyncio +async def test_dashboard_data_overview_from_project_mart(tmp_path, monkeypatch): + store_db = tmp_path / "store.db" + slug = "-mart-proj" + conn = _connect(store_db) + pid = _insert_project(conn, "claude", slug) + _insert_session(conn, pid, session_id="s1", last_ts="2026-04-25T00:00:00Z", n=3) + _insert_project_mart(conn, project_id=pid, provider="claude", slug=slug, + total_messages=42, total_sessions=3, total_input_tokens=1000, total_output_tokens=500, + total_cache_read=100, total_cache_create=50, total_cost_usd=1.25, + first_ts="2026-04-01T00:00:00Z", last_ts="2026-04-30T00:00:00Z") + conn.commit() + conn.close() + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + monkeypatch.setattr("stackunderflow.deps.current_log_path", f"/fake/{slug}") + data_route.invalidate_dashboard_cache() + payload = await data_route.get_dashboard_data() + stats = payload["statistics"] + assert stats["overview"]["total_tokens"]["input"] == 1000 + assert stats["overview"]["total_cost"] == pytest.approx(1.25) + assert stats["tools"] == {"usage_counts": {}, "error_counts": {}, "error_rates": {}} + assert stats["errors"] == {"total": 0} + + +@pytest.mark.asyncio +async def test_dashboard_data_daily_stats_from_daily_mart(tmp_path, monkeypatch): + store_db = tmp_path / "store.db" + slug = "-daily-proj" + conn = _connect(store_db) + pid = _insert_project(conn, "claude", slug) + _insert_session(conn, pid, session_id="s1", last_ts="2026-04-02T00:00:00Z", n=1) + _insert_project_mart(conn, project_id=pid, provider="claude", slug=slug, + total_messages=2, total_input_tokens=300, total_cost_usd=0.6) + _insert_daily_mart(conn, project_id=pid, day="2026-04-01", + input_tokens=100, output_tokens=50, message_count=1, cost_usd=0.2) + _insert_daily_mart(conn, project_id=pid, day="2026-04-02", + input_tokens=200, output_tokens=100, message_count=1, cost_usd=0.4) + conn.commit() + conn.close() + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + monkeypatch.setattr("stackunderflow.deps.current_log_path", f"/fake/{slug}") + data_route.invalidate_dashboard_cache() + payload = await data_route.get_dashboard_data() + daily = payload["statistics"]["daily_stats"] + days = sorted(d["date"] for d in daily) + assert days == ["2026-04-01", "2026-04-02"] + bucket_d2 = next(d for d in daily if d["date"] == "2026-04-02") + assert bucket_d2["cost"] == pytest.approx(0.4) + + +@pytest.mark.asyncio +async def test_dashboard_data_models_from_daily_mart(tmp_path, monkeypatch): + store_db = tmp_path / "store.db" + slug = "-models-proj" + conn = _connect(store_db) + pid = _insert_project(conn, "claude", slug) + _insert_session(conn, pid, session_id="s1", last_ts="2026-04-01T00:00:00Z", n=1) + _insert_project_mart(conn, project_id=pid, provider="claude", slug=slug, total_messages=3) + _insert_daily_mart(conn, project_id=pid, day="2026-04-01", + model="claude-sonnet-4-5", message_count=2, cost_usd=0.5) + _insert_daily_mart(conn, project_id=pid, day="2026-04-01", + model="gpt-5", message_count=1, cost_usd=0.3) + conn.commit() + conn.close() + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + monkeypatch.setattr("stackunderflow.deps.current_log_path", f"/fake/{slug}") + data_route.invalidate_dashboard_cache() + payload = await data_route.get_dashboard_data() + models = payload["statistics"]["models"] + assert set(models) == {"claude-sonnet-4-5", "gpt-5"} + assert models["claude-sonnet-4-5"]["count"] == 2 + assert models["claude-sonnet-4-5"]["cost"] == pytest.approx(0.5) + + +@pytest.mark.asyncio +async def test_dashboard_data_under_100ms_with_100k_daily_mart_rows(tmp_path, monkeypatch): + store_db = tmp_path / "store.db" + slug = "-perf-proj" + conn = _connect(store_db) + pid = _insert_project(conn, "claude", slug) + _insert_session(conn, pid, session_id="s1", last_ts="2026-04-25T00:00:00Z", n=100000) + _insert_project_mart(conn, project_id=pid, provider="claude", slug=slug, + total_messages=100000, total_input_tokens=10_000_000, total_cost_usd=42.0) + rows = [] + for d in range(1000): + for m in range(100): + day_str = f"2024-{((d // 30) % 12) + 1:02d}-{(d % 28) + 1:02d}" + rows.append((day_str, pid, "claude", f"model-{m}", "standard", + 10, 5, 0, 0, 1, 1, 0.001)) + conn.executemany( + "INSERT OR IGNORE INTO daily_mart " + "(day, project_id, provider, model, speed, input_tokens, output_tokens, " + " cache_read, cache_create, message_count, session_count, cost_usd) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", rows) + conn.commit() + conn.close() + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + monkeypatch.setattr("stackunderflow.deps.current_log_path", f"/fake/{slug}") + data_route.invalidate_dashboard_cache() + await data_route.get_dashboard_data() + t0 = time.perf_counter() + payload = await data_route.get_dashboard_data() + elapsed_ms = (time.perf_counter() - t0) * 1000 + assert payload["statistics"]["overview"]["total_cost"] == pytest.approx(42.0) + assert elapsed_ms < 100, f"slow: {elapsed_ms:.1f}ms" diff --git a/tests/stackunderflow/routes/test_projects_uses_mart.py b/tests/stackunderflow/routes/test_projects_uses_mart.py new file mode 100644 index 0000000..d9954a7 --- /dev/null +++ b/tests/stackunderflow/routes/test_projects_uses_mart.py @@ -0,0 +1,113 @@ +"""Wave 3A — ``/api/projects?include_stats=true`` reads from ``project_mart``.""" + +from __future__ import annotations + +import json +import time + +import pytest + +from stackunderflow.routes.projects import get_projects +from stackunderflow.store import db, schema + + +def _connect(store_db): + conn = db.connect(store_db) + schema.apply(conn) + return conn + + +def _insert_project(conn, *, provider, slug, last_modified=0.0): + cur = conn.execute( + "INSERT INTO projects (provider, slug, display_name, first_seen, last_modified) " + "VALUES (?, ?, ?, ?, ?)", + (provider, slug, slug, 0.0, last_modified), + ) + return int(cur.lastrowid) + + +def _insert_project_mart(conn, *, project_id, provider, slug, **kw): + conn.execute( + "INSERT INTO project_mart " + "(project_id, provider, slug, display_name, first_ts, last_ts, " + " total_messages, total_sessions, total_input_tokens, total_output_tokens, " + " total_cache_read, total_cache_create, total_cost_usd) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + project_id, provider, slug, slug, + kw.get("first_ts"), kw.get("last_ts"), + kw.get("total_messages", 0), kw.get("total_sessions", 0), + kw.get("total_input_tokens", 0), kw.get("total_output_tokens", 0), + kw.get("total_cache_read", 0), kw.get("total_cache_create", 0), + kw.get("total_cost_usd", 0.0), + ), + ) + + +@pytest.mark.asyncio +async def test_projects_uses_project_mart_when_populated(tmp_path, monkeypatch): + store_db = tmp_path / "store.db" + conn = _connect(store_db) + pid = _insert_project(conn, provider="claude", slug="alpha") + _insert_project_mart( + conn, project_id=pid, provider="claude", slug="alpha", + total_input_tokens=12345, total_output_tokens=6789, + total_cache_read=200, total_cache_create=100, total_cost_usd=2.5, + first_ts="2026-04-01T00:00:00Z", last_ts="2026-04-30T00:00:00Z", + ) + conn.commit() + conn.close() + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + response = await get_projects(include_stats=True) + body = json.loads(response.body.decode("utf-8")) + proj = body["projects"][0] + stats = proj["stats"] + assert stats["total_input_tokens"] == 12345 + assert stats["total_output_tokens"] == 6789 + assert stats["total_cost"] == pytest.approx(2.5) + assert stats["first_message_date"] == "2026-04-01T00:00:00Z" + + +@pytest.mark.asyncio +async def test_projects_falls_back_when_mart_empty(tmp_path, monkeypatch): + store_db = tmp_path / "store.db" + conn = _connect(store_db) + _insert_project(conn, provider="claude", slug="alpha") + conn.commit() + conn.close() + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + response = await get_projects(include_stats=True) + body = json.loads(response.body.decode("utf-8")) + assert body["projects"][0]["stats"]["total_input_tokens"] == 0 + + +@pytest.mark.asyncio +async def test_projects_route_under_100ms_with_100k_mart_rows(tmp_path, monkeypatch): + store_db = tmp_path / "store.db" + conn = _connect(store_db) + pids = [] + for i in range(100): + pid = _insert_project(conn, provider="claude", slug=f"proj-{i:03d}", last_modified=float(i)) + _insert_project_mart(conn, project_id=pid, provider="claude", slug=f"proj-{i:03d}", + total_input_tokens=1000, total_cost_usd=0.1) + pids.append(pid) + rows = [] + for pid in pids: + for d in range(1000): + rows.append((f"2024-01-{(d % 28) + 1:02d}", pid, "claude", "claude-sonnet-4-5", + "standard", 1, 1, 0, 0, 1, 1, 0.001)) + conn.executemany( + "INSERT OR IGNORE INTO daily_mart " + "(day, project_id, provider, model, speed, input_tokens, output_tokens, " + " cache_read, cache_create, message_count, session_count, cost_usd) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", rows) + conn.commit() + conn.close() + monkeypatch.setattr("stackunderflow.deps.store_path", store_db) + await get_projects(include_stats=True) + t0 = time.perf_counter() + response = await get_projects(include_stats=True) + elapsed_ms = (time.perf_counter() - t0) * 1000 + body = json.loads(response.body.decode("utf-8")) + assert len(body["projects"]) == 100 + assert elapsed_ms < 100, f"slow: {elapsed_ms:.1f}ms"