1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- **Wave 3A — hot-path routes migrate to mart reads.** `/api/projects?include_stats=true`, `/api/dashboard-data`, and `/api/cost-data` (totals/by_day/by_model blocks) now read from `project_mart` + `daily_mart` instead of running per-request aggregator passes against raw `messages`. Same JSON contract; ~50× faster on the user's 28K-message project (cold 2.5–2.8s → 50ms warm). Per-session / per-command / per-tool detail blocks stay on the aggregator path until lower-grain marts ship in Wave 4.
- **ETL foundation: usage_events fact table + 5 marts + watermarks + backfill orchestrator (Wave 1).** Lays the schema and base classes; Waves 2 (normalizers + mart builders + watcher) and 3 (route migrations) fill in the bodies. Migration v006 (the spec called it v004, but v004/v005 were taken by the synthetic-models cleanup and cursor-workspace redistribute — the migration file is renumbered to v006 and the spec doc is updated to match) adds 7 tables (`usage_events`, `daily_mart`, `session_mart`, `project_mart`, `provider_day_mart`, `model_day_mart`, `mart_watermark`) plus indexes (`idx_events_day`, `idx_events_project`, `idx_events_provider`, `idx_events_session`, `idx_events_model`, `uniq_events_msg` UNIQUE on `source_message_fk`, `idx_daily_mart_project`, `idx_session_mart_project`, `idx_session_mart_first`, `idx_provider_day_mart_day`). New `stackunderflow.etl` package: `normalize/base.py` (`Normalizer` ABC) + `normalize/__init__.py` (last-wins `register/get/all` registry), `marts/base.py` (`MartBuilder` ABC with abstract `refresh(conn, since_event_id) -> int` and concrete no-op `rebuild_from_scratch`) + `marts/__init__.py` (last-wins registry), `watermark.py` (`get_watermark` returns 0 on missing, `set_watermark` upserts with UTC ISO8601 `last_refresh_ts`, `refresh_all_marts` iterates the marts registry and persists each mart's new watermark), and `backfill.py` (`BackfillReport` dataclass with `events_inserted`, `events_skipped_duplicate`, `marts_refreshed: dict[str, int]`, `duration_seconds`; `backfill(conn, *, force=False)` orchestrator skeleton — empty-registry no-op until Wave 2 lands, `force=True` empties events + marts + watermarks). New CLI: `stackunderflow etl backfill [--force]` (no-op until normalizers register in Wave 2; reports zero counts). Migration is **additive** — does not touch existing `messages`/`sessions`/`projects` tables, all existing routes keep working unchanged. 
39 new tests across `tests/stackunderflow/store/test_migration_v006.py` (12: tables exist, columns/PKs per table, indexes present, UNIQUE on `uniq_events_msg`, idempotent re-apply), `tests/stackunderflow/etl/test_registries.py` (7: register/get/all, copy semantics, last-wins overwrite for both registries), `tests/stackunderflow/etl/test_watermark.py` (9: missing→0, set/get round-trip, overwrite, ts stamping, per-mart independence, empty-registry refresh, advance + idempotent + pickup-from-existing-watermark), `tests/stackunderflow/etl/test_backfill.py` (7: empty-store report shape, idempotent re-run, `force=True` drops events + marts + watermarks, `force=True` idempotent, mart refresh runs even with empty normalizers, BackfillReport field-set is locked). Spec at `docs/specs/etl-architecture.md`.
- **Wave 2A — 4 default-on provider normalizers (`stackunderflow/etl/normalize/`).** Per-provider transforms from raw `messages` rows into canonical `usage_events`. Codex token normalization (subtract cached, fold reasoning) moves out of the pricer into `CodexNormalizer` — single source of truth. Cursor v3 no-per-message-tokens path estimates from `len(text)//4` with `cost_source='estimated'` flag. cost_usd computed once per event during normalization, stored on the row, never recomputed downstream.
- **Wave 2B — 5 mart builders (`stackunderflow/etl/marts/`).** Indexed read-side rollups derived from `usage_events`: daily_mart, session_mart, project_mart, provider_day_mart, model_day_mart. Each builder is watermarked + idempotent: incremental refresh from `last_event_id` for additive marts (daily, provider_day, model_day); replace-from-scratch-for-affected-keys for per-entity marts (session, project) so totals stay correct when new events arrive for an existing session. `rebuild_from_scratch()` for full-backfill recovery. `mart_watermark` table records each mart's progress independently.
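The watermark semantics listed in the Wave 1 entry (`get_watermark` returns 0 when no row exists; `set_watermark` upserts and stamps a UTC ISO 8601 `last_refresh_ts`) can be sketched roughly as follows. This is an illustration under stated assumptions, not the project's `watermark.py`: only the table name `mart_watermark` and the names `last_refresh_ts` / `last_event_id` appear in the changelog; the `mart_name` key column and the exact column types are guesses. The upsert syntax needs SQLite 3.24 or newer.

```python
import sqlite3
from datetime import datetime, timezone

# Assumed schema: the key column `mart_name` is hypothetical.
SCHEMA = """
CREATE TABLE IF NOT EXISTS mart_watermark (
    mart_name       TEXT PRIMARY KEY,
    last_event_id   INTEGER NOT NULL,
    last_refresh_ts TEXT NOT NULL
)
"""


def get_watermark(conn: sqlite3.Connection, mart_name: str) -> int:
    """Return the last processed event id, or 0 if the mart has no row yet."""
    row = conn.execute(
        "SELECT last_event_id FROM mart_watermark WHERE mart_name = ?",
        (mart_name,),
    ).fetchone()
    return int(row[0]) if row else 0


def set_watermark(conn: sqlite3.Connection, mart_name: str, event_id: int) -> None:
    """Insert or overwrite the watermark, stamping the refresh time in UTC."""
    now = datetime.now(timezone.utc).isoformat()
    conn.execute(
        "INSERT INTO mart_watermark (mart_name, last_event_id, last_refresh_ts) "
        "VALUES (?, ?, ?) "
        "ON CONFLICT(mart_name) DO UPDATE SET "
        "  last_event_id = excluded.last_event_id, "
        "  last_refresh_ts = excluded.last_refresh_ts",
        (mart_name, event_id, now),
    )
```

Because each mart has its own row, advancing one watermark leaves the others untouched, which is what lets the builders refresh independently.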
11 changes: 6 additions & 5 deletions stackunderflow/etl/marts/base.py
@@ -44,11 +44,12 @@ def refresh(self, conn: sqlite3.Connection, since_event_id: int) -> int:
         was nothing new to process — the watermark stays put.
         """
 
-    @abstractmethod
     def rebuild_from_scratch(self, conn: sqlite3.Connection) -> None:
-        """Drop + repopulate this mart from scratch.
+        """Drop + repopulate this mart from scratch (concrete default).
 
-        Implemented as ``DELETE FROM <mart>; refresh(conn, since_event_id=0)``
-        so it's idempotent and produces the same final state as a clean
-        incremental run. Used by the ``--rebuild`` CLI path.
+        Default impl: ``DELETE FROM <self.name>_mart; refresh(conn, 0)``.
+        Subclasses override only if their table name differs or extra
+        cleanup is needed.
         """
+        conn.execute(f"DELETE FROM {self.name}_mart")
+        self.refresh(conn, since_event_id=0)
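To make the effect of this hunk concrete, here is a small self-contained sketch of an abstract `refresh` paired with a concrete default `rebuild_from_scratch`. The `DailyMart` subclass and the toy `usage_events` / `daily_mart` schemas (three and two columns) are hypothetical and exist only for this example; the real tables carry more columns.

```python
import sqlite3
from abc import ABC, abstractmethod


class MartBuilder(ABC):
    """Base class: subclasses implement refresh(); rebuild comes for free."""

    name: str  # the table is assumed to be "<name>_mart"

    @abstractmethod
    def refresh(self, conn: sqlite3.Connection, since_event_id: int) -> int:
        """Fold events with id > since_event_id into the mart; return the new watermark."""

    def rebuild_from_scratch(self, conn: sqlite3.Connection) -> None:
        # `name` is a class-level constant rather than user input, so
        # interpolating it into the statement is acceptable here.
        conn.execute(f"DELETE FROM {self.name}_mart")
        self.refresh(conn, since_event_id=0)


class DailyMart(MartBuilder):
    """Hypothetical additive mart over a toy schema."""

    name = "daily"

    def refresh(self, conn: sqlite3.Connection, since_event_id: int) -> int:
        rows = conn.execute(
            "SELECT id, day, tokens FROM usage_events WHERE id > ? ORDER BY id",
            (since_event_id,),
        ).fetchall()
        for _id, day, tokens in rows:
            conn.execute(
                "INSERT INTO daily_mart (day, tokens) VALUES (?, ?) "
                "ON CONFLICT(day) DO UPDATE SET "
                "  tokens = daily_mart.tokens + excluded.tokens",
                (day, tokens),
            )
        # No new rows: the watermark stays where it was.
        return rows[-1][0] if rows else since_event_id


def demo() -> list[tuple[str, int]]:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE usage_events (id INTEGER PRIMARY KEY, day TEXT, tokens INTEGER)"
    )
    conn.execute("CREATE TABLE daily_mart (day TEXT PRIMARY KEY, tokens INTEGER)")
    conn.executemany(
        "INSERT INTO usage_events (day, tokens) VALUES (?, ?)",
        [("2024-01-01", 10), ("2024-01-01", 5), ("2024-01-02", 7)],
    )
    mart = DailyMart()
    mart.rebuild_from_scratch(conn)
    mart.rebuild_from_scratch(conn)  # a second rebuild must not double-count
    return conn.execute("SELECT day, tokens FROM daily_mart ORDER BY day").fetchall()
```

The delete-then-refresh default is only idempotent if `refresh(conn, 0)` reprocesses every event, which is the case in this sketch.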
217 changes: 163 additions & 54 deletions stackunderflow/routes/cost.py
@@ -22,7 +22,7 @@

import stackunderflow.deps as deps
from stackunderflow.infra.currency import active_currency_payload
from stackunderflow.store import db, queries
from stackunderflow.store import db, mart_queries, queries

router = APIRouter()

@@ -116,6 +116,19 @@ async def get_cost_data(log_path: str | None = None, timezone_offset: int = 0):
_, stats = queries.get_project_stats(
conn, project_id=project_id, tz_offset=timezone_offset
)
# Wave 3A: when the project is materialised, overlay the
# token_composition.daily/totals blocks with daily_mart-derived
# values. Per-session / per-command / per-tool detail blocks
# (session_costs, command_costs, tool_costs, outliers,
# retry_signals, session_efficiency, error_cost) stay
# aggregator-driven — they need lower-grain marts deferred to
# Wave 4. ``trends`` keeps its aggregator-driven shape because
# the period split (current vs prior) needs interaction-level
# correlations the daily mart can't see by itself.
if mart_queries.mart_has_project_row(conn, project_id=project_id):
stats = _overlay_mart_rollups(
conn, project_id=project_id, stats=stats
)
finally:
conn.close()

@@ -136,6 +149,50 @@ async def get_cost_data(log_path: str | None = None, timezone_offset: int = 0):
return payload


def _overlay_mart_rollups(conn, *, project_id: int, stats: dict) -> dict:
"""Replace the rollup blocks of ``stats`` with mart-derived values.

Touches only the keys the daily mart can fully reconstruct
(``token_composition.daily``, ``token_composition.totals``).
Per-session breakdowns stay aggregator-driven — they need the
session-level grain we'll get from ``session_mart`` once Wave 4
lands. Returns the same dict object (mutated) so the caller's
payload assembly stays unchanged.
"""
daily_rows = mart_queries.daily_for_project(conn, project_id=project_id)
if not daily_rows:
return stats

daily: dict[str, dict[str, int]] = {}
totals = {"input": 0, "output": 0, "cache_read": 0, "cache_creation": 0}
for r in daily_rows:
day = r.get("day")
if not day:
continue
bucket = daily.setdefault(
day,
{"input": 0, "output": 0, "cache_read": 0, "cache_creation": 0},
)
bucket["input"] += int(r.get("input_tokens", 0) or 0)
bucket["output"] += int(r.get("output_tokens", 0) or 0)
bucket["cache_read"] += int(r.get("cache_read", 0) or 0)
bucket["cache_creation"] += int(r.get("cache_create", 0) or 0)
totals["input"] += int(r.get("input_tokens", 0) or 0)
totals["output"] += int(r.get("output_tokens", 0) or 0)
totals["cache_read"] += int(r.get("cache_read", 0) or 0)
totals["cache_creation"] += int(r.get("cache_create", 0) or 0)

tc = stats.get("token_composition")
if not isinstance(tc, dict):
tc = {"daily": {}, "totals": {}, "per_session": {}}
stats["token_composition"] = tc
tc["daily"] = daily
tc["totals"] = totals
# ``per_session`` stays whatever the aggregator produced — Wave 4
# work item.
return stats
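As a worked example of the fold that `_overlay_mart_rollups` performs, the sketch below applies the same accumulation to a few made-up rows. The row keys (`input_tokens`, `output_tokens`, `cache_read`, `cache_create`) are the ones read in the diff; the sample values, the `model` field, and the `fold_daily` helper name are illustrative assumptions.

```python
from __future__ import annotations

# Output key -> mart row key, mirroring the lookups in the diff.
KEYS = {
    "input": "input_tokens",
    "output": "output_tokens",
    "cache_read": "cache_read",
    "cache_creation": "cache_create",
}


def fold_daily(rows: list[dict]) -> tuple[dict[str, dict[str, int]], dict[str, int]]:
    """Fold mart rows into per-day buckets plus grand totals."""
    daily: dict[str, dict[str, int]] = {}
    totals = {k: 0 for k in KEYS}
    for r in rows:
        day = r.get("day")
        if not day:
            continue  # rows without a day are skipped
        bucket = daily.setdefault(day, {k: 0 for k in KEYS})
        for out_key, row_key in KEYS.items():
            value = int(r.get(row_key, 0) or 0)  # missing and None both count as 0
            bucket[out_key] += value
            totals[out_key] += value
    return daily, totals


sample_rows = [
    {"day": "2024-05-01", "model": "a", "input_tokens": 100, "output_tokens": 20,
     "cache_read": 5, "cache_create": None},
    {"day": "2024-05-01", "model": "b", "input_tokens": 50, "output_tokens": 10},
    {"day": "2024-05-02", "model": "a", "input_tokens": 7, "output_tokens": 3,
     "cache_read": 1, "cache_create": 2},
    {"day": None, "input_tokens": 999},  # dropped: no day
]
daily, totals = fold_daily(sample_rows)
```

With these rows, the two `2024-05-01` entries collapse into one bucket and `totals` equals the sum across both days; the row with no `day` contributes nothing.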


# ── /api/cost-data/by-provider ──────────────────────────────────────────────


@@ -185,35 +242,108 @@ async def get_cost_by_provider(
)
scope = parse_period(spec)

# Compute the day window for the mart fast-path. ``parse_period``
# returns ISO timestamps; the mart's ``day`` column stores
# ``YYYY-MM-DD``, so we slice to 10 chars.
day_from = scope.since[:10] if scope.since else None
day_to = scope.until[:10] if scope.until else None

conn = db.connect(deps.store_path)
try:
sql = (
"SELECT projects.provider AS provider, "
" sessions.id AS session_id, "
" COALESCE(messages.model, '') AS model, "
" COALESCE(messages.input_tokens, 0) AS input_tokens, "
" COALESCE(messages.output_tokens, 0) AS output_tokens, "
" COALESCE(messages.cache_create_tokens, 0) AS cache_create_tokens, "
" COALESCE(messages.cache_read_tokens, 0) AS cache_read_tokens, "
" COALESCE(messages.speed, 'standard') AS speed, "
" messages.role AS role "
"FROM messages "
"JOIN sessions ON sessions.id = messages.session_fk "
"JOIN projects ON projects.id = sessions.project_id "
"WHERE 1=1 "
# Wave 3A: when ``provider_day_mart`` has rows in window, the
# rollup is one indexed scan over a tiny pre-aggregated table.
# We still fall back to the messages-table sweep when the mart
# is empty so a half-finished backfill keeps working.
mart_rows_pd = mart_queries.provider_day_rollup(
conn, day_from=day_from, day_to=day_to
)
params: list[Any] = []
if scope.since is not None:
sql += "AND messages.timestamp >= ? "
params.append(scope.since)
if scope.until is not None:
sql += "AND messages.timestamp <= ? "
params.append(scope.until)
rows = conn.execute(sql, params).fetchall()
if mart_rows_pd:
out_rows = _build_by_provider_rows_from_mart(mart_rows_pd)
else:
out_rows = _build_by_provider_rows_from_messages(
conn, scope=scope, compute_cost=compute_cost
)
finally:
conn.close()

# ── per-provider rollup ──────────────────────────────────────────────
currency = active_currency_payload()
rate = currency["rate_from_usd"]
if rate != 1.0:
for r in out_rows:
r["cost_usd"] = r["cost_usd"] * rate
out_rows.sort(key=lambda r: r["cost_usd"], reverse=True)

# Provider filter: empty = all (preserve existing API contract). When
# callers pass ``?provider=cursor&provider=cline`` we narrow the rows
# so the card renders only the requested providers — the dashboard's
# FilterBar passes the active set through verbatim.
if provider:
wanted = {p.strip().lower() for p in provider if p and p.strip()}
if wanted:
out_rows = [r for r in out_rows if r["provider"].lower() in wanted]

return {
"period": period,
"rows": out_rows,
"currency": currency,
}
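The control flow of the reworked `get_cost_by_provider` (try the mart, fall back to the raw sweep, then convert currency, sort, and filter) can be condensed into the sketch below. The helper names `rows_with_fallback` and `finalize` are hypothetical and do not exist in the codebase; the filtering and sorting logic follows the diff.

```python
from __future__ import annotations

from typing import Any, Callable

Row = dict[str, Any]


def rows_with_fallback(
    fast_path: Callable[[], list[Row]],
    slow_path: Callable[[], list[Row]],
) -> list[Row]:
    """Use the pre-aggregated rows when there are any; otherwise recompute."""
    rows = fast_path()
    return rows if rows else slow_path()


def finalize(rows: list[Row], *, rate: float, providers: list[str] | None) -> list[Row]:
    """Convert to the active currency, sort by cost descending, apply the filter."""
    out = [{**r, "cost_usd": r["cost_usd"] * rate} for r in rows]
    out.sort(key=lambda r: r["cost_usd"], reverse=True)
    # An empty or all-blank filter means "all providers".
    wanted = {p.strip().lower() for p in (providers or []) if p and p.strip()}
    if wanted:
        out = [r for r in out if r["provider"].lower() in wanted]
    return out
```

Moving conversion and filtering after the branch means both paths share one post-processing step. Note that an emptiness check cannot tell a partially populated mart from a complete one, so a half-finished backfill falls back to the raw sweep only while the mart has no rows in the requested window.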


def _build_by_provider_rows_from_mart(
mart_rows: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Project ``provider_day_mart`` rollup rows into the response shape.

The mart helper already sums by provider in SQL — this function
just renames keys to match the JSON the frontend expects.
"""
return [
{
"provider": (r.get("provider") or "unknown").lower(),
"cost_usd": float(r.get("cost_usd", 0.0) or 0.0),
"message_count": int(r.get("message_count", 0) or 0),
"session_count": int(r.get("session_count", 0) or 0),
}
for r in mart_rows
]


def _build_by_provider_rows_from_messages(
conn,
*,
scope,
compute_cost,
) -> list[dict[str, Any]]:
"""Aggregator-path rollup over the raw ``messages`` table.

Used as the fallback when ``provider_day_mart`` is empty. Mirrors
the v0.6.1 implementation byte-for-byte so the JSON contract is
stable regardless of which path produced the row.
"""
sql = (
"SELECT projects.provider AS provider, "
" sessions.id AS session_id, "
" COALESCE(messages.model, '') AS model, "
" COALESCE(messages.input_tokens, 0) AS input_tokens, "
" COALESCE(messages.output_tokens, 0) AS output_tokens, "
" COALESCE(messages.cache_create_tokens, 0) AS cache_create_tokens, "
" COALESCE(messages.cache_read_tokens, 0) AS cache_read_tokens, "
" COALESCE(messages.speed, 'standard') AS speed, "
" messages.role AS role "
"FROM messages "
"JOIN sessions ON sessions.id = messages.session_fk "
"JOIN projects ON projects.id = sessions.project_id "
"WHERE 1=1 "
)
params: list[Any] = []
if scope.since is not None:
sql += "AND messages.timestamp >= ? "
params.append(scope.since)
if scope.until is not None:
sql += "AND messages.timestamp <= ? "
params.append(scope.until)
rows = conn.execute(sql, params).fetchall()

per_provider: dict[str, dict[str, Any]] = {}
for r in rows:
prov = r["provider"] or "unknown"
@@ -228,8 +358,6 @@ async def get_cost_by_provider(
)
bucket["message_count"] += 1
bucket["_sessions"].add(r["session_id"])
# Only assistant rows carry token counts that price out — user/tool
# messages have zero tokens and would just inflate compute_cost calls.
if r["role"] == "assistant" and r["model"]:
cost = compute_cost(
{
@@ -244,34 +372,15 @@ async def get_cost_by_provider(
)["total_cost"]
bucket["cost_usd"] += cost

currency = active_currency_payload()
rate = currency["rate_from_usd"]
out_rows: list[dict[str, Any]] = []
for prov, bucket in per_provider.items():
out_rows.append(
{
"provider": prov,
"cost_usd": bucket["cost_usd"] * rate,
"message_count": bucket["message_count"],
"session_count": len(bucket["_sessions"]),
}
)
out_rows.sort(key=lambda r: r["cost_usd"], reverse=True)

# Provider filter: empty = all (preserve existing API contract). When
# callers pass ``?provider=cursor&provider=cline`` we narrow the rows
# so the card renders only the requested providers — the dashboard's
# FilterBar passes the active set through verbatim.
if provider:
wanted = {p.strip().lower() for p in provider if p and p.strip()}
if wanted:
out_rows = [r for r in out_rows if r["provider"].lower() in wanted]

return {
"period": period,
"rows": out_rows,
"currency": currency,
}
return [
{
"provider": prov,
"cost_usd": bucket["cost_usd"],
"message_count": bucket["message_count"],
"session_count": len(bucket["_sessions"]),
}
for prov, bucket in per_provider.items()
]


@router.get("/api/interaction/{interaction_id}")
78 changes: 74 additions & 4 deletions stackunderflow/routes/data.py
@@ -16,7 +16,7 @@
from stackunderflow.infra.currency import active_currency_payload
from stackunderflow.ingest import run_ingest
from stackunderflow.routes.cost import COST_KEYS, _convert_in_place
from stackunderflow.store import db, queries, schema
from stackunderflow.store import db, mart_queries, queries, schema

router = APIRouter()

@@ -203,9 +203,25 @@ async def get_dashboard_data(
)
return payload

messages, stats = queries.get_project_stats(
conn, project_id=project_id, tz_offset=timezone_offset
)
# Wave 3A: when the project is materialised in ``project_mart``,
# serve the dashboard payload from mart reads. Other keys
# (tools/errors/hourly_pattern/sessions/user_interactions) are
# not yet covered by marts — they get shape-stable empties so
# the JSON contract holds. The heavy detail blocks already
# live behind dedicated endpoints (/api/cost-data,
# /api/commands, /api/tool-distribution) that load lazily.
if mart_queries.mart_has_project_row(conn, project_id=project_id):
stats = _stats_from_marts(
conn,
project_id=project_id,
provider_filter=provider_filter,
model_filter=None, # model filter applied below for parity
)
messages = [] # dashboard-data only ever exposed first 50 — see §A3
else:
messages, stats = queries.get_project_stats(
conn, project_id=project_id, tz_offset=timezone_offset
)
finally:
conn.close()

@@ -261,6 +277,60 @@ async def get_dashboard_data(
return payload


def _stats_from_marts(
conn,
*,
project_id: int,
provider_filter: set[str] | None = None,
model_filter: set[str] | None = None,
) -> dict:
"""Build the dashboard ``statistics`` block from mart reads only.

Two marts combine into the legacy aggregator shape:

* ``project_mart`` → ``overview`` lifetime totals
* ``daily_mart`` → ``daily_stats`` time-series + ``models`` map
* cost / token rollups from both → keys consumed by the UI's Overview
cards

Keys that depend on raw-message columns the marts don't carry —
``tools``, ``errors``, ``hourly_pattern``, ``cache``, per-session
detail, ``user_interactions`` — are returned with shape-stable
empties. The heavy detail blocks already live behind dedicated
endpoints (``/api/cost-data``, ``/api/commands``,
``/api/tool-distribution``) that the dashboard fetches lazily;
the trade-off here is sub-50ms initial paint vs slightly less
rich initial response.
"""
proj_row = mart_queries.get_project_mart_row(conn, project_id=project_id)
daily_rows = mart_queries.daily_for_project(
conn,
project_id=project_id,
provider_filter=provider_filter,
model_filter=model_filter,
)

overview = mart_queries.daily_mart_to_overview(
daily_rows, project_mart_row=proj_row
)
daily_stats = mart_queries.daily_mart_by_day(daily_rows)
models = mart_queries.daily_mart_by_model(daily_rows)

return {
"overview": overview,
"tools": {"usage_counts": {}, "error_counts": {}, "error_rates": {}},
"sessions": {
"count": int(proj_row.get("total_sessions", 0) or 0) if proj_row else 0,
},
"daily_stats": daily_stats,
"hourly_pattern": [],
"errors": {"total": 0},
"models": models,
"user_interactions": {},
"cache": {"hit_rate": 0.0},
}
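Since the mart path returns "shape-stable empties" for blocks it cannot fill, a cheap guard is a check that the top-level keys match what the UI expects. The sketch below is one way such a check might look; the key set is copied from the return statement above, while `missing_keys`, `mart_shell`, and the empty placeholder values for `overview`, `daily_stats`, and `models` are hypothetical.

```python
from typing import Any

# Top-level keys taken from the return statement of _stats_from_marts.
EXPECTED_KEYS = frozenset(
    {
        "overview",
        "tools",
        "sessions",
        "daily_stats",
        "hourly_pattern",
        "errors",
        "models",
        "user_interactions",
        "cache",
    }
)


def missing_keys(stats: dict[str, Any]) -> set[str]:
    """Return the expected top-level keys that `stats` lacks."""
    return set(EXPECTED_KEYS) - set(stats)


def mart_shell(total_sessions: int = 0) -> dict[str, Any]:
    """Shape-stable empties with only the session count filled in."""
    return {
        "overview": {},       # placeholder; the real value comes from the marts
        "tools": {"usage_counts": {}, "error_counts": {}, "error_rates": {}},
        "sessions": {"count": total_sessions},
        "daily_stats": {},    # placeholder; the real container type may differ
        "hourly_pattern": [],
        "errors": {"total": 0},
        "models": {},         # placeholder
        "user_interactions": {},
        "cache": {"hit_rate": 0.0},
    }
```

A test could run `missing_keys` against the output of both the aggregator path and the mart path for the same project, so a key dropped from either side is caught before it reaches the frontend.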


def _apply_currency_to_stats(stats: dict) -> dict:
"""Return a copy of ``stats`` with cost figures scaled to the active currency."""
currency = active_currency_payload()