Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- **ETL foundation: usage_events fact table + 5 marts + watermarks + backfill orchestrator (Wave 1).** Lays the schema and base classes; Waves 2 (normalizers + mart builders + watcher) and 3 (route migrations) fill in the bodies. Migration v006 (the spec called it v004, but v004/v005 were taken by the synthetic-models cleanup and cursor-workspace redistribute — the migration file is renumbered to v006 and the spec doc is updated to match) adds 7 tables (`usage_events`, `daily_mart`, `session_mart`, `project_mart`, `provider_day_mart`, `model_day_mart`, `mart_watermark`) plus indexes (`idx_events_day`, `idx_events_project`, `idx_events_provider`, `idx_events_session`, `idx_events_model`, `uniq_events_msg` UNIQUE on `source_message_fk`, `idx_daily_mart_project`, `idx_session_mart_project`, `idx_session_mart_first`, `idx_provider_day_mart_day`). New `stackunderflow.etl` package: `normalize/base.py` (`Normalizer` ABC) + `normalize/__init__.py` (last-wins `register/get/all` registry), `marts/base.py` (`MartBuilder` ABC with abstract `refresh(conn, since_event_id) -> int` and concrete no-op `rebuild_from_scratch`) + `marts/__init__.py` (last-wins registry), `watermark.py` (`get_watermark` returns 0 on missing, `set_watermark` upserts with UTC ISO8601 `last_refresh_ts`, `refresh_all_marts` iterates the marts registry and persists each mart's new watermark), and `backfill.py` (`BackfillReport` dataclass with `events_inserted`, `events_skipped_duplicate`, `marts_refreshed: dict[str, int]`, `duration_seconds`; `backfill(conn, *, force=False)` orchestrator skeleton — empty-registry no-op until Wave 2 lands, `force=True` empties events + marts + watermarks). New CLI: `stackunderflow etl backfill [--force]` (no-op until normalizers register in Wave 2; reports zero counts). Migration is **additive** — does not touch existing `messages`/`sessions`/`projects` tables, all existing routes keep working unchanged. 39 new tests across `tests/stackunderflow/store/test_migration_v006.py` (12: tables exist, columns/PKs per table, indexes present, UNIQUE on `uniq_events_msg`, idempotent re-apply), `tests/stackunderflow/etl/test_registries.py` (7: register/get/all, copy semantics, last-wins overwrite for both registries), `tests/stackunderflow/etl/test_watermark.py` (9: missing→0, set/get round-trip, overwrite, ts stamping, per-mart independence, empty-registry refresh, advance + idempotent + pickup-from-existing-watermark), `tests/stackunderflow/etl/test_backfill.py` (7: empty-store report shape, idempotent re-run, `force=True` drops events + marts + watermarks, `force=True` idempotent, mart refresh runs even with empty normalizers, BackfillReport field-set is locked). Spec at `docs/specs/etl-architecture.md`.
- **Provider/model filter bar across all dashboard tabs.** New `FilterBar` component renders below the tab strip on every tab, letting users scope the dashboard to one or more providers and/or models. State persists via URL search params (`?provider=cursor&provider=cline&model=opus-4-7`) so filtered views survive refresh and can be shared. React Context (`FiltersProvider` / `useFilters`) is the single source of truth; React Query keys include the active filter set so filter changes trigger refetches automatically. Click-to-filter affordances: Compare row → filter to that (provider, model), CostByProviderCard slice → filter to that provider. Project picker (Header dropdown) gains an optional group-by-provider mode (toggle persists in localStorage) with a per-group "filter" link that scopes the dashboard without leaving the picker. New backend route `GET /api/providers` returns `{providers: [{provider, project_count, session_count}]}` for the chip row's count column. Backend routes already supporting `?provider=` (`/api/compare`, `/api/yield`, `/api/optimize`, `/api/export`) wired through the frontend; remaining routes (`/api/projects`, `/api/cost-data/by-provider`, `/api/jsonl-files`, `/api/messages`, `/api/dashboard-data`) gain matching query-param support with empty=all semantics. URL params are case-insensitive on read (`?provider=Cursor` accepted) but lowercased on emit so canonical URLs stay stable. 8 new backend tests in `tests/stackunderflow/routes/test_providers_filter.py` (provider catalogue, projects narrowing, case-insensitivity, repeated-param union semantics, cost-by-provider rollup filter, empty-filter back-compat); 12 new frontend tests in `stackunderflow-ui/tests/services/filters.test.ts` (URL parsing, normalize, build-query-string, round-trip, no-window SSR safety, no-op-write).
- **`formatModelName()` model display normalizer.** Compact, human-readable display names (`claude-opus-4-7` → `Opus 4.7`, `gemini-2.5-pro-preview-05-06` → `Gemini 2.5 Pro Preview`) used by every component that renders a model id. Full id preserved as the `title` attribute (browser tooltip on hover). Reduces table-column overflow on multi-provider stores. UI-only — no backend or API changes.
- **`(provider, model)` is now a first-class UI dimension.** The Compare tab adds an Agent × Model / Model-only toggle (default Agent × Model) so users with multi-provider data can see, e.g., `claude/claude-opus-4-7` and `cursor/claude-4.5-sonnet-thinking` as separate rows — same model, different agent, different efficiency. Provider chip rendered alongside the model name in Compare, Sessions, and Messages tabs (color-coded per provider). Cost tab adds a `CostByProviderCard` showing spend split per agent for the active period. New backend route `GET /api/cost-data/by-provider?period=today|week|month|all` returns one row per provider (`{provider, cost_usd, message_count, session_count}`, sorted desc) with the standard `currency` block already pre-converted. New `services/providerStyle.tsx` centralises the per-provider colour palette + `shortenModelId` helper (strips `-YYYYMMDD` and `-preview-MM-DD` suffixes), reused by `ProviderChip`, the new card, and the Compare/Messages model cells; all model id renders also gain a `title={model}` tooltip so the full id stays discoverable. 5 new backend tests in `tests/stackunderflow/routes/test_cost_by_provider.py` (provider sort + currency stamping, empty store, invalid-period 400, user-message double-count guard, period-window filter).
Expand Down
10 changes: 6 additions & 4 deletions docs/specs/etl-architecture.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# ETL Architecture — Three-Layer Pipeline with Watcher

**Status:** Spec — Wave 1 implements the foundation, Wave 2 fills it in, Wave 3 migrates routes.
**Status:** Wave 1 landed — schema + ABCs + backfill orchestrator are in. Wave 2 fills in normalizers + mart builders + watcher. Wave 3 migrates routes.
**Goal:** Replace ad-hoc per-request aggregation with a real ETL pipeline. Sub-50ms route reads regardless of project size. Sub-second sync from source-file change to dashboard refresh.

---
Expand Down Expand Up @@ -283,9 +283,9 @@ def backfill(conn, *, force: bool = False) -> BackfillReport:
## Dependencies between waves

```
Wave 1 (foundation, sequential)
Wave 1 (foundation, sequential) ✅ landed
├── docs/specs/etl-architecture.md (this file, expanded)
├── migration v004 (usage_events + 5 marts + watermark)
├── migration v006 (usage_events + 5 marts + watermark)
├── etl/normalize/base.py (Normalizer ABC + registry)
├── etl/marts/base.py (MartBuilder ABC + registry)
├── etl/watermark.py (helpers)
Expand Down Expand Up @@ -317,7 +317,9 @@ Wave 3 (parallel, route migrations)

## Migration / rollback

`v004_etl_layer.sql` is **additive** — it doesn't touch the existing `messages` / `sessions` / `projects` tables. Routes can be migrated one at a time; the old aggregator paths keep working until each route is swapped.
`v006_etl_layer.sql` is **additive** — it doesn't touch the existing `messages` / `sessions` / `projects` tables. Routes can be migrated one at a time; the old aggregator paths keep working until each route is swapped.

> **Numbering note.** Earlier drafts of this spec called the migration `v004_etl_layer.sql`. Two unrelated migrations (`v004_clean_synthetic_models.sql`, `v005_cursor_workspace_redistribute.py`) shipped between the spec being written and Wave 1 landing, so the actual file is `v006_etl_layer.sql`. `schema.CURRENT_VERSION` bumps to 6.

Rollback: drop `usage_events` + 5 marts + `mart_watermark`. Routes that already migrated would 500 on read — keep the old aggregator code in tree until every route is migrated and a release ships.

Expand Down
45 changes: 45 additions & 0 deletions stackunderflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,51 @@ def reindex():
click.echo(f"Done: {counts}")


# ── ETL ──────────────────────────────────────────────────────────────────────
#
# Wave 1 of the ETL refactor (see ``docs/specs/etl-architecture.md``) ships
# the schema, ABCs, and orchestrator skeleton. The `etl backfill` command
# is wired now so Wave 2 (normalizers + mart builders) can ship without a
# CLI-touching follow-up — until Wave 2 registers, this is a no-op that
# prints zero counts.

@cli.group("etl")
def etl_group():
"""Run the ETL pipeline (raw messages → events → marts)."""


@etl_group.command("backfill")
@click.option(
"--force",
is_flag=True,
help="Drop events + marts + watermarks and rebuild from scratch.",
)
def etl_backfill_cmd(force: bool):
"""Convert all existing messages into usage_events, then refresh marts.

No-op until Wave 2 lands the per-provider normalizers and mart
builders. Until then the orchestrator returns zero counts so it's
safe to wire into deploy scripts and the CLI test suite.
"""
from stackunderflow.etl import backfill as etl_backfill

conn = _open_store()
try:
report = etl_backfill(conn, force=force)
finally:
conn.close()

click.echo(f" events inserted: {report.events_inserted:,}")
click.echo(f" events skipped (duplicate): {report.events_skipped_duplicate:,}")
if report.marts_refreshed:
click.echo(" marts refreshed:")
for name, count in sorted(report.marts_refreshed.items()):
click.echo(f" {name:<14s} {count:>8,} events")
else:
click.echo(" marts refreshed: (none registered)")
click.echo(f" duration: {report.duration_seconds:.3f}s")


# ── helpers ──────────────────────────────────────────────────────────────────

def _ensure_state_dir() -> None:
Expand Down
29 changes: 29 additions & 0 deletions stackunderflow/etl/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""ETL layer — three-stage pipeline that turns raw ``messages`` rows into
analyst-friendly marts.

Layout::

etl/
normalize/ per-provider Normalizer ABC + plug-in registry
marts/ per-mart MartBuilder ABC + plug-in registry
watermark.py read/write/refresh helpers keyed on `mart_watermark`
backfill.py orchestrator: run every normalizer, refresh every mart

Wave 1 ships only the foundation — the registries are empty until Wave 2
registers the provider normalizers and mart builders. Calling
``backfill()`` on a Wave 1 install is a no-op (returns zero counts). See
``docs/specs/etl-architecture.md``.
"""

from __future__ import annotations

from .backfill import BackfillReport, backfill
from .watermark import get_watermark, refresh_all_marts, set_watermark

__all__ = [
"BackfillReport",
"backfill",
"get_watermark",
"set_watermark",
"refresh_all_marts",
]
143 changes: 143 additions & 0 deletions stackunderflow/etl/backfill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
"""Backfill orchestrator — one-shot conversion + mart rebuild.

Walks every existing ``messages`` row through the registered normalizers
to materialize ``usage_events`` rows, then refreshes every registered
mart from those events.

Wave 1 ships only the orchestrator skeleton — the registries are empty
until Wave 2 lands, so calling :func:`backfill` on a Wave 1 install is a
no-op (returns zero counts, empty marts dict). The method contract is
locked so Wave 2 can dispatch in parallel.

See ``docs/specs/etl-architecture.md``.
"""

from __future__ import annotations

import logging
import sqlite3
import time
from dataclasses import dataclass, field

from .normalize import all as _all_normalizers
from .watermark import refresh_all_marts

_log = logging.getLogger(__name__)


@dataclass
class BackfillReport:
"""Summary of one ``backfill()`` call.

Returned to the caller (CLI, API, watcher) so they can render
progress, log timing, or drive further work. ``marts_refreshed``
is a copy of the dict returned by
:func:`stackunderflow.etl.watermark.refresh_all_marts` — empty
when no mart builders are registered (Wave 1 default).
"""

events_inserted: int = 0
events_skipped_duplicate: int = 0
marts_refreshed: dict[str, int] = field(default_factory=dict)
duration_seconds: float = 0.0


def _drop_events_and_marts(conn: sqlite3.Connection) -> None:
"""``force=True`` path: empty every fact + mart table, reset watermarks.

Schema stays intact (``DELETE``, not ``DROP``). The next call to
:func:`backfill` rebuilds everything from raw ``messages``.
"""
conn.execute("DELETE FROM usage_events")
conn.execute("DELETE FROM daily_mart")
conn.execute("DELETE FROM session_mart")
conn.execute("DELETE FROM project_mart")
conn.execute("DELETE FROM provider_day_mart")
conn.execute("DELETE FROM model_day_mart")
conn.execute("DELETE FROM mart_watermark")


def backfill(
conn: sqlite3.Connection,
*,
force: bool = False,
) -> BackfillReport:
"""One-shot: convert all existing ``messages`` into ``usage_events``,
then refresh every mart from the new watermark.

``force=True`` empties events + marts + watermarks and rebuilds from
scratch. Default is incremental — already-converted messages are
skipped via the ``UNIQUE(source_message_fk)`` index.

Wave 1 behaviour
----------------
Both registries are empty in Wave 1, so:

* No normalizer runs → ``events_inserted = 0``,
``events_skipped_duplicate = 0``
* :func:`refresh_all_marts` returns ``{}`` → ``marts_refreshed = {}``

Wave 2 fills both in. The orchestrator shape stays put.
"""
start = time.perf_counter()
report = BackfillReport()

if force:
_drop_events_and_marts(conn)

normalizers = _all_normalizers()
if not normalizers:
# Wave 1: nothing to convert, nothing to refresh that depends on
# new events. Still call refresh_all_marts so empty marts can
# finalize their watermarks (no-op when their registry is also
# empty, so this is the canonical Wave 1 fall-through).
report.marts_refreshed = refresh_all_marts(conn)
report.duration_seconds = time.perf_counter() - start
return report

# Wave 2 fills this in. The shape is locked: each normalizer is
# applied to its provider's ``messages`` rows, yielded events are
# inserted with ``INSERT OR IGNORE`` (so the UNIQUE source_message_fk
# index turns duplicates into a counted skip).
inserted, skipped = _run_normalizers(conn, normalizers)
report.events_inserted = inserted
report.events_skipped_duplicate = skipped

report.marts_refreshed = refresh_all_marts(conn)
report.duration_seconds = time.perf_counter() - start
return report


def _run_normalizers(
conn: sqlite3.Connection,
normalizers: dict,
) -> tuple[int, int]:
"""Run every registered normalizer over its provider's messages.

Returns ``(inserted, skipped_duplicate)``. The implementation here is
a placeholder skeleton that Wave 2 fills in; we keep it threaded so
the orchestrator's report shape is exercised even before real
normalizers register.

Skeleton contract (Wave 2 will replace, not extend):

1. For each ``(provider, NormalizerCls)`` pair:

* Open a cursor selecting ``messages`` rows for that provider's
projects (joined via ``sessions`` → ``projects``).
* For each row, call ``NormalizerCls().normalize(row)`` and
``INSERT OR IGNORE`` each yielded event.
* Count inserts vs ignored duplicates (via ``conn.changes``).

2. Sum the per-provider counters, return.
"""
# Wave 1: registries are populated only in tests with a stub builder
# that has no real messages to walk. We log the intent and return
# zeros so the rest of the orchestrator still exercises the
# ``marts_refreshed`` path.
_log.debug(
"backfill: %d normalizer(s) registered, Wave-2 implementation "
"pending — skipping event insertion",
len(normalizers),
)
return 0, 0
50 changes: 50 additions & 0 deletions stackunderflow/etl/marts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Mart builder registry.

Each mart ships a :class:`MartBuilder` subclass. Modules import this
package and call :func:`register` at import time so the orchestrator
can discover them via :func:`all`.

Registry is module-level state keyed on the mart ``name`` (e.g.
``"daily"``, ``"session"``). ``register`` is **last-wins** — re-registering
silently overwrites the prior class so hot-reload and test overrides
work without error gymnastics.
"""

from __future__ import annotations

from .base import MartBuilder

_REGISTRY: dict[str, type[MartBuilder]] = {}


def register(name: str, mart_cls: type[MartBuilder]) -> None:
"""Register *mart_cls* under *name*.

Last-wins: re-registering the same name silently overwrites.
"""
_REGISTRY[name] = mart_cls


def get(name: str) -> type[MartBuilder] | None:
"""Return the registered class for *name*, or ``None``."""
return _REGISTRY.get(name)


def all() -> dict[str, type[MartBuilder]]: # noqa: A001 — spec-defined name
"""Return a snapshot of the registry (a copy, safe to iterate).

Shadows the ``all`` builtin by design — the spec
(``docs/specs/etl-architecture.md``) names this method ``all()`` to
pair with ``register()``/``get()``. Callers import the module
(``from stackunderflow.etl import marts``) and call
``marts.all()`` so the builtin stays accessible.
"""
return dict(_REGISTRY)


def _clear() -> None:
"""Test-only: reset the registry between tests."""
_REGISTRY.clear()


__all__ = ["MartBuilder", "register", "get", "all"]
Loading
Loading