1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- **Wave 4D — 12 beta provider normalizers.** Codeium (stub), Continue, Copilot, Cursor Agent, Droid, Gemini, KiloCode, Kiro, OpenClaw, OpenCode, Pi/OMP, Qwen, Roo Code now have `Normalizer` subclasses registered at import time. ETL pipeline now covers all 16 providers from the codeburn catalog. Beta providers stay opt-in via the existing `STACKUNDERFLOW_BETA_*` env flags — when they're enabled the matching normalizer fires automatically.
- **Wave 4A — analytical routes migrate to mart reads (Wave 3B redo).** `compare_models()` reads `model_day_mart` + `session_mart`; `yield_tracker._query_sessions()` reads `session_mart`; `optimize._detect_cache_overhead()` reads `session_mart`; `/api/messages/summary` reads `project_mart`. Same JSON contract, ~30-50× faster on the user's real store. Empty-mart fallback to aggregator preserved per route.
- **Wave 3A — hot-path routes migrate to mart reads.** `/api/projects?include_stats=true`, `/api/dashboard-data`, and `/api/cost-data` (totals/by_day/by_model blocks) now read from `project_mart` + `daily_mart` instead of running per-request aggregator passes against raw `messages`. Same JSON contract; ~50× faster on the user's 28K-message project (cold 2.5–2.8s → 50ms warm). Per-session / per-command / per-tool detail blocks stay on the aggregator path until lower-grain marts ship in Wave 4.
- **ETL foundation: usage_events fact table + 5 marts + watermarks + backfill orchestrator (Wave 1).** Lays the schema and base classes; Waves 2 (normalizers + mart builders + watcher) and 3 (route migrations) fill in the bodies. Migration v006 (the spec called it v004, but v004/v005 were taken by the synthetic-models cleanup and cursor-workspace redistribute — the migration file is renumbered to v006 and the spec doc is updated to match) adds 7 tables (`usage_events`, `daily_mart`, `session_mart`, `project_mart`, `provider_day_mart`, `model_day_mart`, `mart_watermark`) plus indexes (`idx_events_day`, `idx_events_project`, `idx_events_provider`, `idx_events_session`, `idx_events_model`, `uniq_events_msg` UNIQUE on `source_message_fk`, `idx_daily_mart_project`, `idx_session_mart_project`, `idx_session_mart_first`, `idx_provider_day_mart_day`). New `stackunderflow.etl` package: `normalize/base.py` (`Normalizer` ABC) + `normalize/__init__.py` (last-wins `register/get/all` registry), `marts/base.py` (`MartBuilder` ABC with abstract `refresh(conn, since_event_id) -> int` and concrete no-op `rebuild_from_scratch`) + `marts/__init__.py` (last-wins registry), `watermark.py` (`get_watermark` returns 0 on missing, `set_watermark` upserts with UTC ISO8601 `last_refresh_ts`, `refresh_all_marts` iterates the marts registry and persists each mart's new watermark), and `backfill.py` (`BackfillReport` dataclass with `events_inserted`, `events_skipped_duplicate`, `marts_refreshed: dict[str, int]`, `duration_seconds`; `backfill(conn, *, force=False)` orchestrator skeleton — empty-registry no-op until Wave 2 lands, `force=True` empties events + marts + watermarks). New CLI: `stackunderflow etl backfill [--force]` (no-op until normalizers register in Wave 2; reports zero counts). Migration is **additive** — does not touch existing `messages`/`sessions`/`projects` tables, all existing routes keep working unchanged. 
39 new tests across `tests/stackunderflow/store/test_migration_v006.py` (12: tables exist, columns/PKs per table, indexes present, UNIQUE on `uniq_events_msg`, idempotent re-apply), `tests/stackunderflow/etl/test_registries.py` (7: register/get/all, copy semantics, last-wins overwrite for both registries), `tests/stackunderflow/etl/test_watermark.py` (9: missing→0, set/get round-trip, overwrite, ts stamping, per-mart independence, empty-registry refresh, advance + idempotent + pickup-from-existing-watermark), `tests/stackunderflow/etl/test_backfill.py` (7: empty-store report shape, idempotent re-run, `force=True` drops events + marts + watermarks, `force=True` idempotent, mart refresh runs even with empty normalizers, BackfillReport field-set is locked). Spec at `docs/specs/etl-architecture.md`.
- **Wave 2A — 4 default-on provider normalizers (`stackunderflow/etl/normalize/`).** Per-provider transforms from raw `messages` rows into canonical `usage_events`. Codex token normalization (subtract cached, fold reasoning) moves out of the pricer into `CodexNormalizer`, which becomes the single source of truth. The Cursor v3 path, which has no per-message token counts, estimates tokens as `len(text)//4` and sets the `cost_source='estimated'` flag. `cost_usd` is computed once per event during normalization, stored on the row, and never recomputed downstream.
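
The watermark behaviour described in the Wave 1 entry (`get_watermark` returns 0 when no row exists, `set_watermark` upserts with a UTC ISO 8601 timestamp) can be sketched roughly as follows. This is a minimal illustration, not the project's code: the `mart_watermark` column names (`mart_name`, `last_event_id`) are assumptions, since the v006 schema is not shown in this diff.

```python
import sqlite3
from datetime import datetime, timezone

# Hypothetical column layout; the real v006 migration may differ.
SCHEMA = """
CREATE TABLE IF NOT EXISTS mart_watermark (
    mart_name       TEXT PRIMARY KEY,
    last_event_id   INTEGER NOT NULL,
    last_refresh_ts TEXT NOT NULL
)
"""


def get_watermark(conn: sqlite3.Connection, mart_name: str) -> int:
    """Return the stored event id for a mart, or 0 when no row exists."""
    row = conn.execute(
        "SELECT last_event_id FROM mart_watermark WHERE mart_name = ?",
        (mart_name,),
    ).fetchone()
    return row[0] if row is not None else 0


def set_watermark(conn: sqlite3.Connection, mart_name: str, event_id: int) -> None:
    """Insert or overwrite the watermark, stamping a UTC ISO 8601 time."""
    now = datetime.now(timezone.utc).isoformat()
    conn.execute(
        "INSERT INTO mart_watermark (mart_name, last_event_id, last_refresh_ts) "
        "VALUES (?, ?, ?) "
        "ON CONFLICT(mart_name) DO UPDATE SET "
        "last_event_id = excluded.last_event_id, "
        "last_refresh_ts = excluded.last_refresh_ts",
        (mart_name, event_id, now),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
```

Keying the table on the mart name gives each mart an independent watermark, which is what allows a refresh to pick up from wherever that particular mart last stopped.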
80 changes: 78 additions & 2 deletions stackunderflow/reports/optimize.py
@@ -35,7 +35,7 @@
estimate_global_budget,
)
from stackunderflow.services.qa_service import QAService
-from stackunderflow.store import queries
+from stackunderflow.store import mart_queries, queries

__all__ = [
"Finding",
@@ -516,7 +516,13 @@ def _detect_low_read_edit_ratio(
*,
scope: Scope | None = None,
) -> list[Finding]:
-"""Pattern 4 — sessions with many Reads and zero Edit/Write."""
+"""Pattern 4 — sessions with many Reads and zero Edit/Write.

Stays on the aggregator (raw ``messages``) path: the signal lives in
``tools_json`` per-message arrays, which no mart materialises. Wave
4A migrated only ``_detect_cache_overhead`` because its inputs (per-
session token sums) already exist on ``session_mart``.
"""
grouped = _iter_session_messages(conn, scope=scope)
bad_sessions: list[dict] = []
for session_fk, rows in grouped.items():
@@ -582,6 +588,10 @@ def _detect_junk_reads(
"""Pattern 5 — same file Read 5+ times in one session.

Indicates the assistant forgot what it already saw and re-fetched.

Stays on the aggregator (raw ``messages``) path: the signal requires
parsing ``raw_json`` for ``tool_use`` blocks with their ``file_path``
inputs, none of which is materialised into any mart.
"""
grouped = _iter_session_messages(conn, scope=scope)
hits: list[dict] = []
@@ -656,7 +666,59 @@ def _detect_cache_overhead(
is being thrashed instead of amortising. Common cause: short
sessions that pay the cache write cost without ever reading it
back. We check at the session level.

Wave 4A — reads pre-aggregated per-session totals from
``session_mart`` when available. The mart's ``input_tokens`` and
``cache_create`` columns are the same SUM-by-session_fk this
detector used to compute on every call, so the ratio test is
identical and the empty-mart fallback path keeps the GROUP BY
over ``messages``. The other detectors in this module remain on
the aggregator path because their signals (tool-call shape, raw
JSON inspection, per-message payload sizes) aren't materialised
into any mart yet.
"""
if mart_queries.mart_has_session_rows(conn):
return _detect_cache_overhead_from_mart(conn, scope=scope)
return _detect_cache_overhead_from_messages(conn, scope=scope)


def _detect_cache_overhead_from_mart(
conn: sqlite3.Connection,
*,
scope: Scope | None,
) -> list[Finding]:
"""Mart-fed cache-overhead detector — reads ``session_mart`` totals."""
since_iso = scope.since if scope is not None else None
until_iso = scope.until if scope is not None else None
bad = mart_queries.session_mart_cache_overhead(
conn,
since_iso=since_iso,
until_iso=until_iso,
ratio_threshold=CACHE_OVERHEAD_RATIO,
)
# Re-key on ``session_fk`` for parity with the aggregator path —
# the finding's ``details.sessions`` consumers (tests, CLI table)
# expect that field name. ``session_id`` from the mart maps onto
# the same logical concept; we surface it as ``session_fk`` so the
# JSON contract stays stable across data sources.
bad = [
{
"session_fk": row["session_id"],
"cache_create_tokens": row["cache_create_tokens"],
"input_tokens": row["input_tokens"],
"ratio": row["ratio"],
}
for row in bad
]
return _cache_overhead_finding(bad)


def _detect_cache_overhead_from_messages(
conn: sqlite3.Connection,
*,
scope: Scope | None,
) -> list[Finding]:
"""Aggregator-path cache-overhead detector — empty-mart fallback."""
sql = (
"SELECT session_fk, "
" SUM(input_tokens) AS inp, "
@@ -690,6 +752,15 @@ def _detect_cache_overhead(
"input_tokens": inp,
"ratio": round(ratio, 3),
})
return _cache_overhead_finding(bad)


def _cache_overhead_finding(bad: list[dict]) -> list[Finding]:
"""Render the cache-overhead ``Finding`` from the candidate list.

Shared between the mart path and the aggregator fallback so the
finding's severity ladder + waste estimation stay in lockstep.
"""

if not bad:
return []
@@ -739,6 +810,11 @@ def _detect_bash_output_limits(
blocks in user messages whose preceding assistant call was a Bash
tool. Falls back to checking ``content_text`` length on user
tool_result rows.

Stays on the aggregator (raw ``messages``) path: the signal needs
per-message ``content_text`` byte counts plus ``raw_json`` tool-call
parsing — neither lives in any mart. Wave 4A only migrated detectors
whose inputs are pre-summed on the mart layer.
"""
sql = (
"SELECT id, session_fk, seq, role, raw_json, content_text "
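
The mart-first dispatch in `_detect_cache_overhead` above (read `session_mart` when it has rows, otherwise fall back to a `GROUP BY` over `messages`) could look roughly like the sketch below. Several details are assumptions made for illustration: the table layouts, the `cache_create_tokens` column on `messages`, the ratio definition (`cache_create / input_tokens`), and the threshold value standing in for `CACHE_OVERHEAD_RATIO`.

```python
import sqlite3

RATIO_THRESHOLD = 0.5  # hypothetical value standing in for CACHE_OVERHEAD_RATIO


def mart_has_session_rows(conn: sqlite3.Connection) -> bool:
    """True when session_mart holds at least one row."""
    return conn.execute("SELECT 1 FROM session_mart LIMIT 1").fetchone() is not None


def flag_sessions(rows) -> list[dict]:
    """Shared ratio test, so both data sources produce the same shape."""
    bad = []
    for session, inp, cache_create in rows:
        # Assumed ratio: cache-creation tokens relative to input tokens.
        if inp and cache_create / inp > RATIO_THRESHOLD:
            bad.append({"session_fk": session, "ratio": round(cache_create / inp, 3)})
    return sorted(bad, key=lambda r: r["session_fk"])


def detect_cache_overhead(conn: sqlite3.Connection) -> list[dict]:
    """Prefer pre-summed mart rows; fall back to summing raw messages."""
    if mart_has_session_rows(conn):
        rows = conn.execute(
            "SELECT session_id, input_tokens, cache_create FROM session_mart"
        ).fetchall()
    else:
        rows = conn.execute(
            "SELECT session_fk, SUM(input_tokens), SUM(cache_create_tokens) "
            "FROM messages GROUP BY session_fk"
        ).fetchall()
    return flag_sessions(rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages "
    "(session_fk INTEGER, input_tokens INTEGER, cache_create_tokens INTEGER)"
)
conn.execute(
    "CREATE TABLE session_mart "
    "(session_id INTEGER PRIMARY KEY, input_tokens INTEGER, cache_create INTEGER)"
)
conn.executemany(
    "INSERT INTO messages VALUES (?, ?, ?)",
    [(1, 100, 80), (1, 100, 40), (2, 1000, 100)],
)

from_messages = detect_cache_overhead(conn)  # mart is empty: fallback path

conn.executemany(
    "INSERT INTO session_mart VALUES (?, ?, ?)",
    [(1, 200, 120), (2, 1000, 100)],
)
from_mart = detect_cache_overhead(conn)  # mart has rows: fast path
```

Routing both paths through one shared helper mirrors the role `_cache_overhead_finding` plays in the diff: the data source can change without the finding's shape drifting.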
26 changes: 24 additions & 2 deletions stackunderflow/routes/data.py
@@ -402,15 +402,37 @@ async def get_messages(

@router.get("/api/messages/summary")
async def get_messages_summary_endpoint():
-"""Get summary statistics about messages without loading all data."""
+"""Get summary statistics about messages without loading all data.

Wave 4A: when ``project_mart`` carries a row for the active
project, the top-level ``total`` (and the extra ``total_sessions``)
come from a single mart read. The detail blocks (``by_type``,
``by_model``, ``total_tokens``) are not materialised into any mart
yet, so the legacy ``get_project_messages`` pass still runs on
every request to build them. When the mart has no row for the
project, that pass supplies ``total`` as well.
"""
log_path = _require_project()
conn = db.connect(deps.store_path)
try:
project_id = _get_project_id(conn, log_path)
mart_totals = mart_queries.project_mart_messages_summary_totals(
conn, project_id=project_id
)
messages = queries.get_project_messages(conn, project_id=project_id)
finally:
conn.close()
-return get_messages_summary(messages)
+summary = get_messages_summary(messages)
if mart_totals is not None:
# Mart row wins on the top-level total — it's the project's
# lifetime message count from ``project_mart``, identical in
# value but cheaper than counting the materialised messages
# list. The breakdown blocks (``by_type`` / ``by_model``) still
# come from the messages pass because those dimensions aren't
# in any mart today.
summary["total"] = mart_totals["total"]
summary["total_sessions"] = mart_totals["total_sessions"]
return summary


@router.post("/api/refresh")
8 changes: 7 additions & 1 deletion stackunderflow/routes/yield_route.py
@@ -72,9 +72,15 @@ async def get_yield(
),
)

# When the route is invoked directly (tests, not via FastAPI's DI), the
# ``Query(None)`` default leaks through as a Query sentinel — coerce
# anything that isn't a real list into None so the service sees what
# it expects. Same pattern ``routes/compare.py`` already uses.
project_filter = list(project) if isinstance(project, list) else None

conn = db.connect(deps.store_path)
try:
-entries = compute_yield(conn, period=period, project_filter=project)
+entries = compute_yield(conn, period=period, project_filter=project_filter)
finally:
conn.close()
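
The coercion added above guards against the case where the handler is called directly and the parameter default leaks through instead of a real list. A minimal sketch of that behaviour, using a stand-in class (`FakeQuery`, hypothetical) in place of FastAPI's actual sentinel object so the example has no dependencies:

```python
class FakeQuery:
    """Stand-in for the object a `Query(None)` default evaluates to."""

    def __init__(self, default):
        self.default = default


def coerce_project_filter(project):
    """Pass real lists through (as a copy); map everything else to None."""
    return list(project) if isinstance(project, list) else None


slugs = ["alpha", "beta"]
from_list = coerce_project_filter(slugs)
from_sentinel = coerce_project_filter(FakeQuery(None))
from_none = coerce_project_filter(None)
```

Checking for `list` rather than "is not None" is what makes the guard robust: any non-list default, whatever its type, collapses to `None` before it reaches the service layer.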

162 changes: 162 additions & 0 deletions stackunderflow/services/compare.py
@@ -41,6 +41,7 @@

from stackunderflow.infra.costs import compute_cost
from stackunderflow.reports.scope import Scope, parse_period
from stackunderflow.store import mart_queries

__all__ = [
"ModelStats",
@@ -174,8 +175,49 @@ def compare_models(
Returns:
Models sorted by ``total_cost`` desc. Empty list when no
assistant messages match the filters.

Wave 4A — when both ``model_day_mart`` and ``session_mart`` are
materialised AND no ``project_filter`` is set, the per-model totals
come from a single SUM over ``model_day_mart`` and the per-session
attribution (one-shot %, retry rate, $/session, sessions count)
comes from ``session_mart``. The ``project_filter`` codepath stays
on the aggregator because ``model_day_mart`` does not carry
``project_id`` — applying a slug filter requires the raw join. An
empty mart (Wave 4B backfill not run) also falls back so users on
un-materialised stores keep working.
"""
scope = _resolve_scope(period)

# Wave 4A mart fast-path. The fallback path reads the raw messages
# table — kept intact so an un-materialised store, or a project-slug
# filter that the marts can't satisfy, still answers correctly.
if (
project_filter is None
and mart_queries.mart_has_session_rows(conn)
and mart_queries.mart_has_model_day_rows(conn)
):
return _compare_models_from_marts(
conn,
scope=scope,
provider_filter=provider_filter,
)

return _compare_models_from_messages(
conn,
scope=scope,
project_filter=project_filter,
provider_filter=provider_filter,
)


def _compare_models_from_messages(
conn: sqlite3.Connection,
*,
scope: Scope,
project_filter: list[str] | None,
provider_filter: str | None,
) -> list[ModelStats]:
"""Aggregator-path compare — kept verbatim as the empty-mart fallback."""
rows = _fetch_messages(
conn,
scope=scope,
@@ -297,6 +339,126 @@ class _Acc:
return out


# ── Wave 4A — mart-fed compare ──────────────────────────────────────────────


def _compare_models_from_marts(
conn: sqlite3.Connection,
*,
scope: Scope,
provider_filter: str | None,
) -> list[ModelStats]:
"""Build ``ModelStats`` rows from ``model_day_mart`` + ``session_mart``.

Per-model totals (calls, tokens, cache, cost) come from a single
GROUP BY over ``model_day_mart`` filtered to ``scope``. Per-session
attribution (sessions, one-shot %, retry rate, $/session) comes
from ``session_mart`` keyed on ``primary_model`` — same definition
the aggregator path uses, just pre-materialised.

Provider attribution is derived from ``session_mart``'s
``primary_model → provider`` mapping. A model that exists in
``model_day_mart`` but has no surviving session row (e.g. its only
sessions were in a different scope window) inherits the provider
string the legacy path defaults to (``"anthropic"``) so the JSON
contract is preserved.
"""
model_totals = mart_queries.model_day_totals(
conn,
since_iso=scope.since,
until_iso=scope.until,
)
sessions = mart_queries.session_mart_rows_for_compare(
conn,
since_iso=scope.since,
until_iso=scope.until,
provider_filter=provider_filter,
)

# Per-model session attribution from session_mart.
sessions_by_model: dict[str, int] = {}
one_shot_by_model: dict[str, int] = {}
assistant_msgs_by_model: dict[str, int] = {}
cost_by_model: dict[str, float] = {}
provider_by_model: dict[str, str] = {}
for s in sessions:
mdl = s.get("primary_model") or ""
if not mdl:
continue
sessions_by_model[mdl] = sessions_by_model.get(mdl, 0) + 1
if int(s.get("is_one_shot", 0) or 0) == 1:
one_shot_by_model[mdl] = one_shot_by_model.get(mdl, 0) + 1
assistant_msgs_by_model[mdl] = (
assistant_msgs_by_model.get(mdl, 0)
+ int(s.get("assistant_message_count", 0) or 0)
)
cost_by_model[mdl] = cost_by_model.get(mdl, 0.0) + float(
s.get("cost_usd", 0.0) or 0.0
)
# First-seen provider wins — sessions for a given primary_model
# share a provider in practice (model ids map 1:1 to providers).
provider_by_model.setdefault(mdl, str(s.get("provider") or ""))

# When a provider filter is active, we restrict per-model totals to
# the models whose primary_model survived the filter — model_day_mart
# itself doesn't carry provider so the session-side filter is the
# source of truth for provider attribution.
if provider_filter is not None:
model_totals = {m: v for m, v in model_totals.items() if m in sessions_by_model}

out: list[ModelStats] = []
for mdl, totals in model_totals.items():
calls = int(totals.get("message_count", 0) or 0)
if calls == 0:
# Skip models with no events in window — matches the
# aggregator path which skips models that never had an
# assistant message recorded.
continue
cache_read = int(totals.get("cache_read", 0) or 0)
cache_create = int(totals.get("cache_create", 0) or 0)
cacheable = cache_read + cache_create
cache_hit = (cache_read / cacheable) if cacheable else 0.0
# ``total_cost`` for compare lives in model_day_mart (rolls in
# every event regardless of which session's primary model was X).
# ``cost_per_session`` divides that by the count of sessions
# whose primary_model is X — same convention the aggregator
# path uses, kept here for parity.
total_cost = float(totals.get("cost_usd", 0.0) or 0.0)
sessions_count = sessions_by_model.get(mdl, 0)
one_shot = one_shot_by_model.get(mdl, 0)
assistant_msgs = assistant_msgs_by_model.get(mdl, 0)
cost_per_call = (total_cost / calls) if calls else 0.0
cost_per_session = (total_cost / sessions_count) if sessions_count else 0.0
one_shot_pct = (one_shot / sessions_count) if sessions_count else 0.0
retry_rate = (
(assistant_msgs / sessions_count - 1.0) if sessions_count else 0.0
)
total_tokens = (
int(totals.get("input_tokens", 0) or 0)
+ int(totals.get("output_tokens", 0) or 0)
+ cache_read
+ cache_create
)
out.append(
ModelStats(
model=mdl,
provider=provider_by_model.get(mdl) or "anthropic",
sessions=sessions_count,
calls=calls,
one_shot_pct=one_shot_pct,
retry_rate=retry_rate,
cache_hit_rate=cache_hit,
cost_per_call=cost_per_call,
cost_per_session=cost_per_session,
total_cost=total_cost,
total_tokens=total_tokens,
)
)

out.sort(key=lambda r: r.total_cost, reverse=True)
return out


# ── HTTP / CLI payload helper ────────────────────────────────────────────────
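
As a worked check on the derived metrics in `_compare_models_from_marts`, the arithmetic can be pulled out on its own. The formulas below follow the diff; the `Derived` dataclass, the `derive` helper, and the input numbers are hypothetical and exist only for illustration.

```python
from dataclasses import dataclass


@dataclass
class Derived:
    cache_hit_rate: float
    one_shot_pct: float
    retry_rate: float
    cost_per_call: float
    cost_per_session: float


def derive(
    *,
    calls: int,
    cache_read: int,
    cache_create: int,
    total_cost: float,
    sessions: int,
    one_shot: int,
    assistant_msgs: int,
) -> Derived:
    """Compute per-model ratios, returning 0.0 wherever a denominator is zero."""
    cacheable = cache_read + cache_create
    return Derived(
        cache_hit_rate=(cache_read / cacheable) if cacheable else 0.0,
        one_shot_pct=(one_shot / sessions) if sessions else 0.0,
        # Average assistant messages per session, minus the first attempt.
        retry_rate=(assistant_msgs / sessions - 1.0) if sessions else 0.0,
        cost_per_call=(total_cost / calls) if calls else 0.0,
        cost_per_session=(total_cost / sessions) if sessions else 0.0,
    )


# Hypothetical model: 40 calls over 4 sessions, 1 of them one-shot.
stats = derive(
    calls=40, cache_read=900, cache_create=100, total_cost=2.0,
    sessions=4, one_shot=1, assistant_msgs=12,
)
# A model present in model_day_mart but with no session rows in scope.
orphan = derive(
    calls=5, cache_read=0, cache_create=0, total_cost=0.1,
    sessions=0, one_shot=0, assistant_msgs=0,
)
```

The second case shows why the zero guards matter: a model with events but no session row in the window still gets a row, with its per-session metrics reported as 0.0 rather than raising a division error.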

