Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- **Wave 4D — 12 beta provider normalizers.** Codeium (stub), Continue, Copilot, Cursor Agent, Droid, Gemini, KiloCode, Kiro, OpenClaw, OpenCode, Pi/OMP, Qwen, Roo Code now have `Normalizer` subclasses registered at import time. ETL pipeline now covers all 16 providers from the codeburn catalog. Beta providers stay opt-in via the existing `STACKUNDERFLOW_BETA_*` env flags — when they're enabled the matching normalizer fires automatically.
- **Wave 4A — analytical routes migrate to mart reads (Wave 3B redo).** `compare_models()` reads `model_day_mart` + `session_mart`; `yield_tracker._query_sessions()` reads `session_mart`; `optimize._detect_cache_overhead()` reads `session_mart`; `/api/messages/summary` reads `project_mart`. Same JSON contract, ~30-50× faster on the user's real store. Empty-mart fallback to aggregator preserved per route.
- **Wave 4B — backfill actually populates `usage_events`.** `stackunderflow etl backfill` now reads every message from the `messages` table, runs the matching provider normalizer (Wave 2A), and inserts into `usage_events`; `--force` rebuilds from scratch. Idempotent via `uniq_events_msg` UNIQUE index. The ingest writer (`stackunderflow/ingest/writer.py`) gets a normalize+insert hook so newly-ingested messages auto-create events without needing a backfill pass. Marts auto-refresh via `refresh_all_marts()` after each batch.
- **Wave 3A — hot-path routes migrate to mart reads.** `/api/projects?include_stats=true`, `/api/dashboard-data`, and `/api/cost-data` (totals/by_day/by_model blocks) now read from `project_mart` + `daily_mart` instead of running per-request aggregator passes against raw `messages`. Same JSON contract; ~50× faster on the user's 28K-message project (cold 2.5–2.8s → 50ms warm). Per-session / per-command / per-tool detail blocks stay on the aggregator path until lower-grain marts ship in Wave 4.
- **ETL foundation: usage_events fact table + 5 marts + watermarks + backfill orchestrator (Wave 1).** Lays the schema and base classes; Waves 2 (normalizers + mart builders + watcher) and 3 (route migrations) fill in the bodies. Migration v006 (the spec called it v004, but v004/v005 were taken by the synthetic-models cleanup and cursor-workspace redistribute — the migration file is renumbered to v006 and the spec doc is updated to match) adds 7 tables (`usage_events`, `daily_mart`, `session_mart`, `project_mart`, `provider_day_mart`, `model_day_mart`, `mart_watermark`) plus indexes (`idx_events_day`, `idx_events_project`, `idx_events_provider`, `idx_events_session`, `idx_events_model`, `uniq_events_msg` UNIQUE on `source_message_fk`, `idx_daily_mart_project`, `idx_session_mart_project`, `idx_session_mart_first`, `idx_provider_day_mart_day`). New `stackunderflow.etl` package: `normalize/base.py` (`Normalizer` ABC) + `normalize/__init__.py` (last-wins `register/get/all` registry), `marts/base.py` (`MartBuilder` ABC with abstract `refresh(conn, since_event_id) -> int` and concrete no-op `rebuild_from_scratch`) + `marts/__init__.py` (last-wins registry), `watermark.py` (`get_watermark` returns 0 on missing, `set_watermark` upserts with UTC ISO8601 `last_refresh_ts`, `refresh_all_marts` iterates the marts registry and persists each mart's new watermark), and `backfill.py` (`BackfillReport` dataclass with `events_inserted`, `events_skipped_duplicate`, `marts_refreshed: dict[str, int]`, `duration_seconds`; `backfill(conn, *, force=False)` orchestrator skeleton — empty-registry no-op until Wave 2 lands, `force=True` empties events + marts + watermarks). New CLI: `stackunderflow etl backfill [--force]` (no-op until normalizers register in Wave 2; reports zero counts). Migration is **additive** — does not touch existing `messages`/`sessions`/`projects` tables, all existing routes keep working unchanged. 39 new tests across `tests/stackunderflow/store/test_migration_v006.py` (12: tables exist, columns/PKs per table, indexes present, UNIQUE on `uniq_events_msg`, idempotent re-apply), `tests/stackunderflow/etl/test_registries.py` (7: register/get/all, copy semantics, last-wins overwrite for both registries), `tests/stackunderflow/etl/test_watermark.py` (9: missing→0, set/get round-trip, overwrite, ts stamping, per-mart independence, empty-registry refresh, advance + idempotent + pickup-from-existing-watermark), `tests/stackunderflow/etl/test_backfill.py` (7: empty-store report shape, idempotent re-run, `force=True` drops events + marts + watermarks, `force=True` idempotent, mart refresh runs even with empty normalizers, BackfillReport field-set is locked). Spec at `docs/specs/etl-architecture.md`.
- **Wave 2A — 4 default-on provider normalizers (`stackunderflow/etl/normalize/`).** Per-provider transforms from raw `messages` rows into canonical `usage_events`. Codex token normalization (subtract cached, fold reasoning) moves out of the pricer into `CodexNormalizer` — single source of truth. Cursor v3 no-per-message-tokens path estimates from `len(text)//4` with `cost_source='estimated'` flag. cost_usd computed once per event during normalization, stored on the row, never recomputed downstream.
Expand Down
62 changes: 50 additions & 12 deletions stackunderflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1263,11 +1263,11 @@ def reindex():

# ── ETL ──────────────────────────────────────────────────────────────────────
#
# Wave 1 of the ETL refactor (see ``docs/specs/etl-architecture.md``) ships
# the schema, ABCs, and orchestrator skeleton. The `etl backfill` command
# is wired now so Wave 2 (normalizers + mart builders) can ship without a
# CLI-touching follow-up — until Wave 2 registers, this is a no-op that
# prints zero counts.
# The ETL refactor (see ``docs/specs/etl-architecture.md``) ships in
# multiple waves. Wave 1 laid the schema + orchestrator skeleton; Wave
# 2 registered normalizers + mart builders + the watcher; Wave 4B
# (this PR) wires the orchestrator body so ``etl backfill`` actually
# populates ``usage_events`` against real data.

@cli.group("etl")
def etl_group():
Expand All @@ -1283,27 +1283,65 @@ def etl_group():
def etl_backfill_cmd(force: bool):
"""Convert all existing messages into usage_events, then refresh marts.

No-op until Wave 2 lands the per-provider normalizers and mart
builders. Until then the orchestrator returns zero counts so it's
safe to wire into deploy scripts and the CLI test suite.
Default mode is incremental: messages already converted on a prior
run are skipped via the ``uniq_events_msg`` UNIQUE index.

``--force`` first wipes ``usage_events`` + ``mart_watermark``,
rebuilds every mart from scratch, and then runs the normalize
pass fresh — useful after a normalizer change or a model rate
update.
"""
from stackunderflow.etl import backfill as etl_backfill

# Optional progress bar — falls back to periodic log lines from the
# orchestrator (one every 10K events) when tqdm isn't installed.
progress_cb = _build_backfill_progress_callback()

conn = _open_store()
try:
report = etl_backfill(conn, force=force)
report = etl_backfill(conn, force=force, progress_callback=progress_cb)
finally:
conn.close()
if progress_cb is not None and hasattr(progress_cb, "close"):
progress_cb.close()

click.echo(f" events inserted: {report.events_inserted:,}")
click.echo("\nBackfill complete.")
click.echo(f" events inserted: {report.events_inserted:,}")
click.echo(f" events skipped (duplicate): {report.events_skipped_duplicate:,}")
if report.marts_refreshed:
click.echo(" marts refreshed:")
for name, count in sorted(report.marts_refreshed.items()):
click.echo(f" {name:<14s} {count:>8,} events")
else:
click.echo(" marts refreshed: (none registered)")
click.echo(f" duration: {report.duration_seconds:.3f}s")
click.echo(" marts refreshed: (none registered)")
click.echo(f" duration: {report.duration_seconds:.3f}s")


def _build_backfill_progress_callback():
"""Return a tqdm-backed progress callback, or None if tqdm is absent.

The returned callable matches the ``backfill()`` orchestrator's
``progress_callback`` signature (``cb(events_so_far, messages_seen)``)
and exposes a ``.close()`` method the CLI calls in the ``finally``
block so the bar gets rendered out cleanly even on Ctrl+C.
"""
try:
from tqdm import tqdm
except ImportError:
return None

bar = tqdm(unit="msg", desc="backfill", dynamic_ncols=True, leave=True)
last_messages = [0]

def _cb(events_so_far: int, messages_seen: int) -> None:
delta = messages_seen - last_messages[0]
if delta > 0:
bar.update(delta)
last_messages[0] = messages_seen
bar.set_postfix(events=f"{events_so_far:,}")

_cb.close = bar.close # type: ignore[attr-defined]
return _cb


# ── helpers ──────────────────────────────────────────────────────────────────
Expand Down
Loading
Loading