From 3569890f58127f2e77c5b6426646f16fce98d334 Mon Sep 17 00:00:00 2001 From: Yad Konrad Date: Tue, 5 May 2026 00:20:18 -0400 Subject: [PATCH] =?UTF-8?q?feat(etl):=20Wave=201=20=E2=80=94=20usage=5Feve?= =?UTF-8?q?nts=20+=20marts=20schema=20+=20base=20classes=20+=20backfill=20?= =?UTF-8?q?orchestrator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the three-layer ETL foundation from docs/specs/etl-architecture.md. Migration v006 adds usage_events + 5 marts + mart_watermark (additive, existing tables untouched). New stackunderflow.etl package: Normalizer + MartBuilder ABCs with last-wins registries, watermark helpers, and a backfill orchestrator skeleton. New CLI: stackunderflow etl backfill — no-op until Wave 2 fills in the per-provider normalizers and mart builders, but the contract (BackfillReport shape, refresh_all_marts return type, registry semantics) is locked so Waves 2 and 3 can dispatch in parallel against it. Renumbered the migration from the spec's v004 to v006 (v004/v005 were taken by the synthetic-models cleanup and cursor-workspace redistribute that shipped between the spec and Wave 1) — the spec doc is updated to match. 39 new tests (v006 migration shape, registry overwrite semantics, watermark round-trip, backfill orchestrator + force=True). 1375 backend tests pass total. 
--- CHANGELOG.md | 1 + docs/specs/etl-architecture.md | 10 +- stackunderflow/cli.py | 45 ++++ stackunderflow/etl/__init__.py | 29 ++ stackunderflow/etl/backfill.py | 143 ++++++++++ stackunderflow/etl/marts/__init__.py | 50 ++++ stackunderflow/etl/marts/base.py | 56 ++++ stackunderflow/etl/normalize/__init__.py | 58 ++++ stackunderflow/etl/normalize/base.py | 53 ++++ stackunderflow/etl/watermark.py | 88 +++++++ .../store/migrations/v006_etl_layer.sql | 150 +++++++++++ stackunderflow/store/schema.py | 2 +- tests/stackunderflow/etl/__init__.py | 0 tests/stackunderflow/etl/test_backfill.py | 195 ++++++++++++++ tests/stackunderflow/etl/test_registries.py | 163 ++++++++++++ tests/stackunderflow/etl/test_watermark.py | 149 +++++++++++ .../store/test_migration_v005.py | 14 +- .../store/test_migration_v006.py | 248 ++++++++++++++++++ tests/stackunderflow/store/test_schema.py | 4 +- 19 files changed, 1449 insertions(+), 9 deletions(-) create mode 100644 stackunderflow/etl/__init__.py create mode 100644 stackunderflow/etl/backfill.py create mode 100644 stackunderflow/etl/marts/__init__.py create mode 100644 stackunderflow/etl/marts/base.py create mode 100644 stackunderflow/etl/normalize/__init__.py create mode 100644 stackunderflow/etl/normalize/base.py create mode 100644 stackunderflow/etl/watermark.py create mode 100644 stackunderflow/store/migrations/v006_etl_layer.sql create mode 100644 tests/stackunderflow/etl/__init__.py create mode 100644 tests/stackunderflow/etl/test_backfill.py create mode 100644 tests/stackunderflow/etl/test_registries.py create mode 100644 tests/stackunderflow/etl/test_watermark.py create mode 100644 tests/stackunderflow/store/test_migration_v006.py diff --git a/CHANGELOG.md b/CHANGELOG.md index b6ed5ef..617cacc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- **ETL foundation: usage_events fact table + 5 marts + watermarks + 
backfill orchestrator (Wave 1).** Lays the schema and base classes; Waves 2 (normalizers + mart builders + watcher) and 3 (route migrations) fill in the bodies. Migration v006 (the spec called it v004, but v004/v005 were taken by the synthetic-models cleanup and cursor-workspace redistribute — the migration file is renumbered to v006 and the spec doc is updated to match) adds 7 tables (`usage_events`, `daily_mart`, `session_mart`, `project_mart`, `provider_day_mart`, `model_day_mart`, `mart_watermark`) plus indexes (`idx_events_day`, `idx_events_project`, `idx_events_provider`, `idx_events_session`, `idx_events_model`, `uniq_events_msg` UNIQUE on `source_message_fk`, `idx_daily_mart_project`, `idx_session_mart_project`, `idx_session_mart_first`, `idx_provider_day_mart_day`). New `stackunderflow.etl` package: `normalize/base.py` (`Normalizer` ABC) + `normalize/__init__.py` (last-wins `register/get/all` registry), `marts/base.py` (`MartBuilder` ABC with abstract `refresh(conn, since_event_id) -> int` and concrete no-op `rebuild_from_scratch`) + `marts/__init__.py` (last-wins registry), `watermark.py` (`get_watermark` returns 0 on missing, `set_watermark` upserts with UTC ISO8601 `last_refresh_ts`, `refresh_all_marts` iterates the marts registry and persists each mart's new watermark), and `backfill.py` (`BackfillReport` dataclass with `events_inserted`, `events_skipped_duplicate`, `marts_refreshed: dict[str, int]`, `duration_seconds`; `backfill(conn, *, force=False)` orchestrator skeleton — empty-registry no-op until Wave 2 lands, `force=True` empties events + marts + watermarks). New CLI: `stackunderflow etl backfill [--force]` (no-op until normalizers register in Wave 2; reports zero counts). Migration is **additive** — does not touch existing `messages`/`sessions`/`projects` tables, all existing routes keep working unchanged. 
39 new tests across `tests/stackunderflow/store/test_migration_v006.py` (12: tables exist, columns/PKs per table, indexes present, UNIQUE on `uniq_events_msg`, idempotent re-apply), `tests/stackunderflow/etl/test_registries.py` (7: register/get/all, copy semantics, last-wins overwrite for both registries), `tests/stackunderflow/etl/test_watermark.py` (9: missing→0, set/get round-trip, overwrite, ts stamping, per-mart independence, empty-registry refresh, advance + idempotent + pickup-from-existing-watermark), `tests/stackunderflow/etl/test_backfill.py` (7: empty-store report shape, idempotent re-run, `force=True` drops events + marts + watermarks, `force=True` idempotent, mart refresh runs even with empty normalizers, BackfillReport field-set is locked). Spec at `docs/specs/etl-architecture.md`. - **Provider/model filter bar across all dashboard tabs.** New `FilterBar` component renders below the tab strip on every tab, letting users scope the dashboard to one or more providers and/or models. State persists via URL search params (`?provider=cursor&provider=cline&model=opus-4-7`) so filtered views survive refresh and can be shared. React Context (`FiltersProvider` / `useFilters`) is the single source of truth; React Query keys include the active filter set so filter changes trigger refetches automatically. Click-to-filter affordances: Compare row → filter to that (provider, model), CostByProviderCard slice → filter to that provider. Project picker (Header dropdown) gains an optional group-by-provider mode (toggle persists in localStorage) with a per-group "filter" link that scopes the dashboard without leaving the picker. New backend route `GET /api/providers` returns `{providers: [{provider, project_count, session_count}]}` for the chip row's count column. 
Backend routes already supporting `?provider=` (`/api/compare`, `/api/yield`, `/api/optimize`, `/api/export`) wired through the frontend; remaining routes (`/api/projects`, `/api/cost-data/by-provider`, `/api/jsonl-files`, `/api/messages`, `/api/dashboard-data`) gain matching query-param support with empty=all semantics. URL params are case-insensitive on read (`?provider=Cursor` accepted) but lowercased on emit so canonical URLs stay stable. 8 new backend tests in `tests/stackunderflow/routes/test_providers_filter.py` (provider catalogue, projects narrowing, case-insensitivity, repeated-param union semantics, cost-by-provider rollup filter, empty-filter back-compat); 12 new frontend tests in `stackunderflow-ui/tests/services/filters.test.ts` (URL parsing, normalize, build-query-string, round-trip, no-window SSR safety, no-op-write). - **`formatModelName()` model display normalizer.** Compact, human-readable display names (`claude-opus-4-7` → `Opus 4.7`, `gemini-2.5-pro-preview-05-06` → `Gemini 2.5 Pro Preview`) used by every component that renders a model id. Full id preserved as the `title` attribute (browser tooltip on hover). Reduces table-column overflow on multi-provider stores. UI-only — no backend or API changes. - **`(provider, model)` is now a first-class UI dimension.** The Compare tab adds an Agent × Model / Model-only toggle (default Agent × Model) so users with multi-provider data can see, e.g., `claude/claude-opus-4-7` and `cursor/claude-4.5-sonnet-thinking` as separate rows — same model, different agent, different efficiency. Provider chip rendered alongside the model name in Compare, Sessions, and Messages tabs (color-coded per provider). Cost tab adds a `CostByProviderCard` showing spend split per agent for the active period. 
New backend route `GET /api/cost-data/by-provider?period=today|week|month|all` returns one row per provider (`{provider, cost_usd, message_count, session_count}`, sorted desc) with the standard `currency` block already pre-converted. New `services/providerStyle.tsx` centralises the per-provider colour palette + `shortenModelId` helper (strips `-YYYYMMDD` and `-preview-MM-DD` suffixes), reused by `ProviderChip`, the new card, and the Compare/Messages model cells; all model id renders also gain a `title={model}` tooltip so the full id stays discoverable. 5 new backend tests in `tests/stackunderflow/routes/test_cost_by_provider.py` (provider sort + currency stamping, empty store, invalid-period 400, user-message double-count guard, period-window filter). diff --git a/docs/specs/etl-architecture.md b/docs/specs/etl-architecture.md index 1efc1ce..b70cadb 100644 --- a/docs/specs/etl-architecture.md +++ b/docs/specs/etl-architecture.md @@ -1,6 +1,6 @@ # ETL Architecture — Three-Layer Pipeline with Watcher -**Status:** Spec — Wave 1 implements the foundation, Wave 2 fills it in, Wave 3 migrates routes. +**Status:** Wave 1 landed — schema + ABCs + backfill orchestrator are in. Wave 2 fills in normalizers + mart builders + watcher. Wave 3 migrates routes. **Goal:** Replace ad-hoc per-request aggregation with a real ETL pipeline. Sub-50ms route reads regardless of project size. Sub-second sync from source-file change to dashboard refresh. 
--- @@ -283,9 +283,9 @@ def backfill(conn, *, force: bool = False) -> BackfillReport: ## Dependencies between waves ``` -Wave 1 (foundation, sequential) +Wave 1 (foundation, sequential) ✅ landed ├── docs/specs/etl-architecture.md (this file, expanded) - ├── migration v004 (usage_events + 5 marts + watermark) + ├── migration v006 (usage_events + 5 marts + watermark) ├── etl/normalize/base.py (Normalizer ABC + registry) ├── etl/marts/base.py (MartBuilder ABC + registry) ├── etl/watermark.py (helpers) @@ -317,7 +317,9 @@ Wave 3 (parallel, route migrations) ## Migration / rollback -`v004_etl_layer.sql` is **additive** — it doesn't touch the existing `messages` / `sessions` / `projects` tables. Routes can be migrated one at a time; the old aggregator paths keep working until each route is swapped. +`v006_etl_layer.sql` is **additive** — it doesn't touch the existing `messages` / `sessions` / `projects` tables. Routes can be migrated one at a time; the old aggregator paths keep working until each route is swapped. + +> **Numbering note.** Earlier drafts of this spec called the migration `v004_etl_layer.sql`. Two unrelated migrations (`v004_clean_synthetic_models.sql`, `v005_cursor_workspace_redistribute.py`) shipped between the spec being written and Wave 1 landing, so the actual file is `v006_etl_layer.sql`. `schema.CURRENT_VERSION` bumps to 6. Rollback: drop `usage_events` + 5 marts + `mart_watermark`. Routes that already migrated would 500 on read — keep the old aggregator code in tree until every route is migrated and a release ships. diff --git a/stackunderflow/cli.py b/stackunderflow/cli.py index 49da6c4..7f95e7b 100644 --- a/stackunderflow/cli.py +++ b/stackunderflow/cli.py @@ -1244,6 +1244,51 @@ def reindex(): click.echo(f"Done: {counts}") +# ── ETL ────────────────────────────────────────────────────────────────────── +# +# Wave 1 of the ETL refactor (see ``docs/specs/etl-architecture.md``) ships +# the schema, ABCs, and orchestrator skeleton. 
The `etl backfill` command +# is wired now so Wave 2 (normalizers + mart builders) can ship without a +# CLI-touching follow-up — until Wave 2 registers, this is a no-op that +# prints zero counts. + +@cli.group("etl") +def etl_group(): + """Run the ETL pipeline (raw messages → events → marts).""" + + +@etl_group.command("backfill") +@click.option( + "--force", + is_flag=True, + help="Drop events + marts + watermarks and rebuild from scratch.", +) +def etl_backfill_cmd(force: bool): + """Convert all existing messages into usage_events, then refresh marts. + + No-op until Wave 2 lands the per-provider normalizers and mart + builders. Until then the orchestrator returns zero counts so it's + safe to wire into deploy scripts and the CLI test suite. + """ + from stackunderflow.etl import backfill as etl_backfill + + conn = _open_store() + try: + report = etl_backfill(conn, force=force) + finally: + conn.close() + + click.echo(f" events inserted: {report.events_inserted:,}") + click.echo(f" events skipped (duplicate): {report.events_skipped_duplicate:,}") + if report.marts_refreshed: + click.echo(" marts refreshed:") + for name, count in sorted(report.marts_refreshed.items()): + click.echo(f" {name:<14s} {count:>8,} events") + else: + click.echo(" marts refreshed: (none registered)") + click.echo(f" duration: {report.duration_seconds:.3f}s") + + # ── helpers ────────────────────────────────────────────────────────────────── def _ensure_state_dir() -> None: diff --git a/stackunderflow/etl/__init__.py b/stackunderflow/etl/__init__.py new file mode 100644 index 0000000..adb2e43 --- /dev/null +++ b/stackunderflow/etl/__init__.py @@ -0,0 +1,29 @@ +"""ETL layer — three-stage pipeline that turns raw ``messages`` rows into +analyst-friendly marts. 
+ +Layout:: + + etl/ + normalize/ per-provider Normalizer ABC + plug-in registry + marts/ per-mart MartBuilder ABC + plug-in registry + watermark.py read/write/refresh helpers keyed on `mart_watermark` + backfill.py orchestrator: run every normalizer, refresh every mart + +Wave 1 ships only the foundation — the registries are empty until Wave 2 +registers the provider normalizers and mart builders. Calling +``backfill()`` on a Wave 1 install is a no-op (returns zero counts). See +``docs/specs/etl-architecture.md``. +""" + +from __future__ import annotations + +from .backfill import BackfillReport, backfill +from .watermark import get_watermark, refresh_all_marts, set_watermark + +__all__ = [ + "BackfillReport", + "backfill", + "get_watermark", + "set_watermark", + "refresh_all_marts", +] diff --git a/stackunderflow/etl/backfill.py b/stackunderflow/etl/backfill.py new file mode 100644 index 0000000..0e04daf --- /dev/null +++ b/stackunderflow/etl/backfill.py @@ -0,0 +1,143 @@ +"""Backfill orchestrator — one-shot conversion + mart rebuild. + +Walks every existing ``messages`` row through the registered normalizers +to materialize ``usage_events`` rows, then refreshes every registered +mart from those events. + +Wave 1 ships only the orchestrator skeleton — the registries are empty +until Wave 2 lands, so calling :func:`backfill` on a Wave 1 install is a +no-op (returns zero counts, empty marts dict). The method contract is +locked so Wave 2 can dispatch in parallel. + +See ``docs/specs/etl-architecture.md``. +""" + +from __future__ import annotations + +import logging +import sqlite3 +import time +from dataclasses import dataclass, field + +from .normalize import all as _all_normalizers +from .watermark import refresh_all_marts + +_log = logging.getLogger(__name__) + + +@dataclass +class BackfillReport: + """Summary of one ``backfill()`` call. + + Returned to the caller (CLI, API, watcher) so they can render + progress, log timing, or drive further work. 
``marts_refreshed`` + is a copy of the dict returned by + :func:`stackunderflow.etl.watermark.refresh_all_marts` — empty + when no mart builders are registered (Wave 1 default). + """ + + events_inserted: int = 0 + events_skipped_duplicate: int = 0 + marts_refreshed: dict[str, int] = field(default_factory=dict) + duration_seconds: float = 0.0 + + +def _drop_events_and_marts(conn: sqlite3.Connection) -> None: + """``force=True`` path: empty every fact + mart table, reset watermarks. + + Schema stays intact (``DELETE``, not ``DROP``). The next call to + :func:`backfill` rebuilds everything from raw ``messages``. + """ + conn.execute("DELETE FROM usage_events") + conn.execute("DELETE FROM daily_mart") + conn.execute("DELETE FROM session_mart") + conn.execute("DELETE FROM project_mart") + conn.execute("DELETE FROM provider_day_mart") + conn.execute("DELETE FROM model_day_mart") + conn.execute("DELETE FROM mart_watermark") + + +def backfill( + conn: sqlite3.Connection, + *, + force: bool = False, +) -> BackfillReport: + """One-shot: convert all existing ``messages`` into ``usage_events``, + then refresh every mart from the new watermark. + + ``force=True`` empties events + marts + watermarks and rebuilds from + scratch. Default is incremental — already-converted messages are + skipped via the ``UNIQUE(source_message_fk)`` index. + + Wave 1 behaviour + ---------------- + Both registries are empty in Wave 1, so: + + * No normalizer runs → ``events_inserted = 0``, + ``events_skipped_duplicate = 0`` + * :func:`refresh_all_marts` returns ``{}`` → ``marts_refreshed = {}`` + + Wave 2 fills both in. The orchestrator shape stays put. + """ + start = time.perf_counter() + report = BackfillReport() + + if force: + _drop_events_and_marts(conn) + + normalizers = _all_normalizers() + if not normalizers: + # Wave 1: nothing to convert, nothing to refresh that depends on + # new events. 
Still call refresh_all_marts so empty marts can + # finalize their watermarks (no-op when their registry is also + # empty, so this is the canonical Wave 1 fall-through). + report.marts_refreshed = refresh_all_marts(conn) + report.duration_seconds = time.perf_counter() - start + return report + + # Wave 2 fills this in. The shape is locked: each normalizer is + # applied to its provider's ``messages`` rows, yielded events are + # inserted with ``INSERT OR IGNORE`` (so the UNIQUE source_message_fk + # index turns duplicates into a counted skip). + inserted, skipped = _run_normalizers(conn, normalizers) + report.events_inserted = inserted + report.events_skipped_duplicate = skipped + + report.marts_refreshed = refresh_all_marts(conn) + report.duration_seconds = time.perf_counter() - start + return report + + +def _run_normalizers( + conn: sqlite3.Connection, + normalizers: dict, +) -> tuple[int, int]: + """Run every registered normalizer over its provider's messages. + + Returns ``(inserted, skipped_duplicate)``. The implementation here is + a placeholder skeleton that Wave 2 fills in; we keep it threaded so + the orchestrator's report shape is exercised even before real + normalizers register. + + Skeleton contract (Wave 2 will replace, not extend): + + 1. For each ``(provider, NormalizerCls)`` pair: + + * Open a cursor selecting ``messages`` rows for that provider's + projects (joined via ``sessions`` → ``projects``). + * For each row, call ``NormalizerCls().normalize(row)`` and + ``INSERT OR IGNORE`` each yielded event. + * Count inserts vs ignored duplicates (via ``cursor.rowcount``). + + 2. Sum the per-provider counters, return. + """ + # Wave 1: registries are populated only in tests with a stub builder + # that has no real messages to walk. We log the intent and return + # zeros so the rest of the orchestrator still exercises the + # ``marts_refreshed`` path. 
+ _log.debug( + "backfill: %d normalizer(s) registered, Wave-2 implementation " + "pending — skipping event insertion", + len(normalizers), + ) + return 0, 0 diff --git a/stackunderflow/etl/marts/__init__.py b/stackunderflow/etl/marts/__init__.py new file mode 100644 index 0000000..7235094 --- /dev/null +++ b/stackunderflow/etl/marts/__init__.py @@ -0,0 +1,50 @@ +"""Mart builder registry. + +Each mart ships a :class:`MartBuilder` subclass. Modules import this +package and call :func:`register` at import time so the orchestrator +can discover them via :func:`all`. + +Registry is module-level state keyed on the mart ``name`` (e.g. +``"daily"``, ``"session"``). ``register`` is **last-wins** — re-registering +silently overwrites the prior class so hot-reload and test overrides +work without error gymnastics. +""" + +from __future__ import annotations + +from .base import MartBuilder + +_REGISTRY: dict[str, type[MartBuilder]] = {} + + +def register(name: str, mart_cls: type[MartBuilder]) -> None: + """Register *mart_cls* under *name*. + + Last-wins: re-registering the same name silently overwrites. + """ + _REGISTRY[name] = mart_cls + + +def get(name: str) -> type[MartBuilder] | None: + """Return the registered class for *name*, or ``None``.""" + return _REGISTRY.get(name) + + +def all() -> dict[str, type[MartBuilder]]: # noqa: A001 — spec-defined name + """Return a snapshot of the registry (a copy, safe to iterate). + + Shadows the ``all`` builtin by design — the spec + (``docs/specs/etl-architecture.md``) names this method ``all()`` to + pair with ``register()``/``get()``. Callers import the module + (``from stackunderflow.etl import marts``) and call + ``marts.all()`` so the builtin stays accessible. 
+ """ + return dict(_REGISTRY) + + +def _clear() -> None: + """Test-only: reset the registry between tests.""" + _REGISTRY.clear() + + +__all__ = ["MartBuilder", "register", "get", "all"] diff --git a/stackunderflow/etl/marts/base.py b/stackunderflow/etl/marts/base.py new file mode 100644 index 0000000..6510364 --- /dev/null +++ b/stackunderflow/etl/marts/base.py @@ -0,0 +1,56 @@ +"""MartBuilder ABC — incremental + full-rebuild contract for marts. + +See ``docs/specs/etl-architecture.md`` §"MartBuilder ABC". Wave 2 ships +the five default mart builders (daily, session, project, provider_day, +model_day). Wave 1 only defines the contract. +""" + +from __future__ import annotations + +import sqlite3 +from abc import ABC, abstractmethod + + +class MartBuilder(ABC): + """Per-mart transform: ``usage_events`` rows → mart rows. + + Subclasses set ``name`` (the registry key) and implement + :meth:`refresh`. Each mart builder owns its rebuild SQL; no mart + depends on another, and each mart maintains an independent + ``mart_watermark.last_event_id`` so partial failures self-heal. + """ + + name: str # "daily" | "session" | "project" | "provider_day" | "model_day" + + @abstractmethod + def refresh(self, conn: sqlite3.Connection, since_event_id: int) -> int: + """Upsert mart rows for ``usage_events`` with ``id > since_event_id``. + + Returns the highest ``event_id`` consumed. Caller persists this as + the new watermark via + :func:`stackunderflow.etl.watermark.set_watermark`. + + **Idempotent**: re-running with the same *since_event_id* is a + no-op for already-built rows. Implementations use + ``INSERT ... ON CONFLICT DO UPDATE`` so a re-run after a partial + failure self-heals. + + Returning the same value as *since_event_id* (or 0) means there + was nothing new to process — the watermark stays put. 
+ + def rebuild_from_scratch( # noqa: B027 — concrete no-op default by design + self, conn: sqlite3.Connection + ) -> None: + """Drop + re-create + full backfill of this mart. + + Default implementation is a no-op so subclasses can override only + when their incremental refresh has a structural change. Intended for + the ``--force`` / ``--rebuild`` paths in :mod:`backfill`. Must be + idempotent. + + Concrete (not abstract) so subclasses with a perfectly-incremental + ``refresh`` don't need to write a stub. The ``backfill(force=True)`` + path uses ``DELETE FROM <table>`` directly, so most builders never + need to override this. + """ diff --git a/stackunderflow/etl/normalize/__init__.py b/stackunderflow/etl/normalize/__init__.py new file mode 100644 index 0000000..c021d47 --- /dev/null +++ b/stackunderflow/etl/normalize/__init__.py @@ -0,0 +1,58 @@ +"""Provider normalizer registry. + +Each provider's adapter ships a :class:`Normalizer` subclass that turns a +``messages`` row into 0..N ``usage_events`` rows. Modules import this +package and call :func:`register` at import time so the orchestrator can +discover them via :func:`all`. + +Registry is module-level state keyed on ``provider_name``. ``register`` +is **last-wins**: re-registering the same name silently overwrites the +prior class. That makes hot-reload during development and registry- +overrides in tests trivial — no "already registered" error to navigate +around. +""" + +from __future__ import annotations + +from .base import Normalizer + +_REGISTRY: dict[str, type[Normalizer]] = {} + + +def register(provider: str, normalizer_cls: type[Normalizer]) -> None: + """Register *normalizer_cls* for *provider*. + + Last-wins: re-registering the same provider silently overwrites the + prior class. Tests and hot-reload depend on this behaviour. 
+ """ + _REGISTRY[provider] = normalizer_cls + + +def get(provider: str) -> type[Normalizer] | None: + """Return the registered class for *provider*, or ``None``.""" + return _REGISTRY.get(provider) + + +def all() -> dict[str, type[Normalizer]]: # noqa: A001 — spec-defined name + """Return a snapshot of the registry. + + Returns a *copy* so callers can iterate while other code registers + without mutating the live dict mid-loop. + + Shadows the ``all`` builtin by design — the spec + (``docs/specs/etl-architecture.md``) names this method ``all()`` to + pair with ``register()``/``get()``. Callers either ``from ... import + normalize`` then ``normalize.all()``, or never need the builtin. + """ + return dict(_REGISTRY) + + +def _clear() -> None: + """Test-only: reset the registry between tests. + + Public ``register`` is last-wins, so most tests don't need this. Use + only when a test wants to assert the empty-registry path.""" + _REGISTRY.clear() + + +__all__ = ["Normalizer", "register", "get", "all"] diff --git a/stackunderflow/etl/normalize/base.py b/stackunderflow/etl/normalize/base.py new file mode 100644 index 0000000..2ac8962 --- /dev/null +++ b/stackunderflow/etl/normalize/base.py @@ -0,0 +1,53 @@ +"""Normalizer ABC — per-provider transform from ``messages`` rows to +``usage_events`` rows. + +See ``docs/specs/etl-architecture.md`` §"Normalizer ABC". Wave 2 ships +the four default subclasses (claude, codex, cursor, cline). Wave 1 only +defines the contract. +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Iterable + + +class Normalizer(ABC): + """Per-provider transform: ``messages`` row → ``usage_events`` row(s). + + Subclasses set ``provider_name`` (the registry key) and implement + :meth:`normalize`. Most providers yield 0 or 1 events per messages + row (assistant turns with usage); some (cline tasks) may yield N. 
+ + Provider-specific quirks resolved here ONLY: + + * codex: subtract cached from input, fold reasoning into output + * cursor: estimate tokens from text length when zero, mark + ``cost_source='estimated'`` + * cline: per-task → per-event split keyed by ``api_req_started`` + + Cost is computed during normalization and stored on the row, so the + mart layer never re-applies a rate card. + """ + + provider_name: str # "claude" | "codex" | "cursor" | ... + + @abstractmethod + def normalize(self, msg_row: dict) -> Iterable[dict]: + """Convert one ``messages`` row into 0..N ``usage_events`` rows. + + ``msg_row`` is a ``sqlite3.Row``-style mapping with the columns + documented in ``v001_initial.sql`` (plus ``speed`` from v003). + Yielded dicts must match the ``usage_events`` column shape from + ``v006_etl_layer.sql`` — at minimum: + + * ``source_message_fk`` (int, the messages.id) + * ``provider`` (str, == ``self.provider_name``) + * ``project_id`` (int) + * ``session_id`` (str) + * ``ts`` (ISO8601 UTC) + * ``day`` (YYYY-MM-DD, derived from ts) + * ``role`` (user | assistant | tool | system) + + Optional keys default per the schema's DEFAULT clauses. + """ diff --git a/stackunderflow/etl/watermark.py b/stackunderflow/etl/watermark.py new file mode 100644 index 0000000..c943011 --- /dev/null +++ b/stackunderflow/etl/watermark.py @@ -0,0 +1,88 @@ +"""Watermark helpers for the marts layer. + +Each mart maintains an independent ``last_event_id`` watermark so that +incremental refresh can pick up where the previous run left off. The +helpers in this module wrap the three operations the orchestrator needs: + +* :func:`get_watermark` — read the current ``last_event_id`` for a mart + (returns ``0`` when the mart has never been built). +* :func:`set_watermark` — upsert ``last_event_id`` + ``last_refresh_ts``. +* :func:`refresh_all_marts` — for each registered mart, read its + watermark, call ``refresh(conn, since_event_id)``, persist the returned + ``last_event_id`` back. 
Returns ``{mart_name: events_processed}``. + +All three are idempotent and safe to call from any thread holding a +connection (callers manage transactions themselves). +""" + +from __future__ import annotations + +import sqlite3 +from datetime import UTC, datetime + +from .marts import all as _all_marts + + +def get_watermark(conn: sqlite3.Connection, mart_name: str) -> int: + """Return the current ``last_event_id`` watermark for *mart_name*. + + Missing watermark → ``0`` (never been refreshed). The caller passes + this into :meth:`MartBuilder.refresh` as ``since_event_id``. + """ + row = conn.execute( + "SELECT last_event_id FROM mart_watermark WHERE mart_name = ?", + (mart_name,), + ).fetchone() + if row is None: + return 0 + # sqlite3.Row supports both index and name access; raw tuples don't + # have ``keys`` so we fall back to positional indexing. + return int(row["last_event_id"]) if hasattr(row, "keys") else int(row[0]) + + +def set_watermark( + conn: sqlite3.Connection, + mart_name: str, + last_event_id: int, +) -> None: + """Upsert the watermark for *mart_name* to *last_event_id*. + + Stamps ``last_refresh_ts`` with the current UTC time. Idempotent + by virtue of ``ON CONFLICT DO UPDATE``. + """ + now = datetime.now(UTC).isoformat() + conn.execute( + """ + INSERT INTO mart_watermark (mart_name, last_event_id, last_refresh_ts) + VALUES (?, ?, ?) + ON CONFLICT(mart_name) DO UPDATE SET + last_event_id = excluded.last_event_id, + last_refresh_ts = excluded.last_refresh_ts + """, + (mart_name, int(last_event_id), now), + ) + + +def refresh_all_marts(conn: sqlite3.Connection) -> dict[str, int]: + """Refresh every registered mart from its current watermark. + + For each mart in :func:`stackunderflow.etl.marts.all`: + + 1. Read the current watermark (``0`` if missing) + 2. Instantiate the mart builder and call ``refresh(conn, watermark)`` + 3. Persist the returned ``last_event_id`` via :func:`set_watermark` + 4. 
Record ``events_processed = new_watermark - old_watermark`` in + the result dict + + Returns ``{mart_name: events_processed}``. An empty registry returns + ``{}`` — that's the Wave 1 default until Wave 2 lands. Idempotent: + re-running with no new events returns ``{name: 0}`` for each mart. + """ + out: dict[str, int] = {} + for name, cls in _all_marts().items(): + old = get_watermark(conn, name) + builder = cls() + new = builder.refresh(conn, old) + set_watermark(conn, name, new) + out[name] = max(0, int(new) - int(old)) + return out diff --git a/stackunderflow/store/migrations/v006_etl_layer.sql b/stackunderflow/store/migrations/v006_etl_layer.sql new file mode 100644 index 0000000..5311f98 --- /dev/null +++ b/stackunderflow/store/migrations/v006_etl_layer.sql @@ -0,0 +1,150 @@ +-- v006: ETL foundation — usage_events fact table + 5 marts + watermark. +-- +-- See ``docs/specs/etl-architecture.md`` for the full design. Migration +-- is **additive**: it does not touch existing ``messages`` / ``sessions`` / +-- ``projects`` / ``ingest_log`` tables. Existing routes and aggregator +-- code keep working unchanged. Wave 1 only lays the schema; Waves 2 and +-- 3 fill in the normalizers, mart builders, watcher and route migrations. +-- +-- Note on numbering: the spec refers to this as ``v004_etl_layer.sql``, +-- but two migrations (v004 synthetic-models cleanup, v005 cursor-workspace +-- redistribute) shipped between the spec being written and Wave 1 landing. +-- The migration is therefore wired in as v006 — the spec doc is updated +-- to match. + +BEGIN; + +-- ── canonical fact table ──────────────────────────────────────────────────── +-- +-- One row per billable event. ``source_message_fk`` is the dedup key — +-- re-running normalization for an already-converted ``messages`` row is +-- a no-op (UNIQUE constraint + ON CONFLICT handling in the normalizer). 
+CREATE TABLE usage_events ( + id INTEGER PRIMARY KEY, + -- provenance + source_message_fk INTEGER NOT NULL REFERENCES messages(id) ON DELETE CASCADE, + provider TEXT NOT NULL, + account TEXT NOT NULL DEFAULT 'default', + project_id INTEGER NOT NULL REFERENCES projects(id), + session_id TEXT NOT NULL, + -- temporal + ts TEXT NOT NULL, + day TEXT NOT NULL, + -- model + tier + model TEXT NOT NULL DEFAULT '', + speed TEXT NOT NULL DEFAULT 'standard', + -- canonical 4-token shape (Anthropic-style) + input_tokens INTEGER NOT NULL DEFAULT 0, + output_tokens INTEGER NOT NULL DEFAULT 0, + cache_read_tokens INTEGER NOT NULL DEFAULT 0, + cache_create_tokens INTEGER NOT NULL DEFAULT 0, + -- cost (computed during normalization, stored) + cost_usd REAL NOT NULL DEFAULT 0.0, + cost_source TEXT NOT NULL DEFAULT 'rate_card', + -- structural + role TEXT NOT NULL, + -- extensibility — JSON; provider-specific fields preserved verbatim + raw_extras TEXT +); + +CREATE INDEX idx_events_day ON usage_events(day); +CREATE INDEX idx_events_project ON usage_events(project_id, day); +CREATE INDEX idx_events_provider ON usage_events(provider, day); +CREATE INDEX idx_events_session ON usage_events(session_id); +CREATE INDEX idx_events_model ON usage_events(model, day); +CREATE UNIQUE INDEX uniq_events_msg ON usage_events(source_message_fk); + +-- ── marts ─────────────────────────────────────────────────────────────────── +-- +-- Each mart owns its rebuild SQL. No mart depends on another. Watermarks +-- are tracked separately in ``mart_watermark`` so each can refresh +-- independently. 
+ +CREATE TABLE daily_mart ( + day TEXT NOT NULL, + project_id INTEGER NOT NULL, + provider TEXT NOT NULL, + model TEXT NOT NULL DEFAULT '', + speed TEXT NOT NULL DEFAULT 'standard', + input_tokens INTEGER NOT NULL DEFAULT 0, + output_tokens INTEGER NOT NULL DEFAULT 0, + cache_read INTEGER NOT NULL DEFAULT 0, + cache_create INTEGER NOT NULL DEFAULT 0, + message_count INTEGER NOT NULL DEFAULT 0, + session_count INTEGER NOT NULL DEFAULT 0, + cost_usd REAL NOT NULL DEFAULT 0.0, + PRIMARY KEY (day, project_id, provider, model, speed) +); +CREATE INDEX idx_daily_mart_project ON daily_mart(project_id, day); + +CREATE TABLE session_mart ( + session_id TEXT PRIMARY KEY, + project_id INTEGER NOT NULL, + provider TEXT NOT NULL, + primary_model TEXT, + first_ts TEXT NOT NULL, + last_ts TEXT NOT NULL, + message_count INTEGER NOT NULL DEFAULT 0, + user_message_count INTEGER NOT NULL DEFAULT 0, + assistant_message_count INTEGER NOT NULL DEFAULT 0, + input_tokens INTEGER NOT NULL DEFAULT 0, + output_tokens INTEGER NOT NULL DEFAULT 0, + cache_read INTEGER NOT NULL DEFAULT 0, + cache_create INTEGER NOT NULL DEFAULT 0, + cost_usd REAL NOT NULL DEFAULT 0.0, + is_one_shot INTEGER NOT NULL DEFAULT 0, + cwd TEXT +); +CREATE INDEX idx_session_mart_project ON session_mart(project_id); +CREATE INDEX idx_session_mart_first ON session_mart(first_ts); + +CREATE TABLE project_mart ( + project_id INTEGER PRIMARY KEY, + provider TEXT NOT NULL, + slug TEXT NOT NULL, + display_name TEXT NOT NULL, + first_ts TEXT, + last_ts TEXT, + total_messages INTEGER NOT NULL DEFAULT 0, + total_sessions INTEGER NOT NULL DEFAULT 0, + total_input_tokens INTEGER NOT NULL DEFAULT 0, + total_output_tokens INTEGER NOT NULL DEFAULT 0, + total_cache_read INTEGER NOT NULL DEFAULT 0, + total_cache_create INTEGER NOT NULL DEFAULT 0, + total_cost_usd REAL NOT NULL DEFAULT 0.0 +); + +CREATE TABLE provider_day_mart ( + day TEXT NOT NULL, + provider TEXT NOT NULL, + cost_usd REAL NOT NULL DEFAULT 0.0, + message_count INTEGER 
NOT NULL DEFAULT 0, + session_count INTEGER NOT NULL DEFAULT 0, + project_count INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (day, provider) +); +CREATE INDEX idx_provider_day_mart_day ON provider_day_mart(day); + +CREATE TABLE model_day_mart ( + day TEXT NOT NULL, + model TEXT NOT NULL, + speed TEXT NOT NULL DEFAULT 'standard', + cost_usd REAL NOT NULL DEFAULT 0.0, + input_tokens INTEGER NOT NULL DEFAULT 0, + output_tokens INTEGER NOT NULL DEFAULT 0, + cache_read INTEGER NOT NULL DEFAULT 0, + cache_create INTEGER NOT NULL DEFAULT 0, + message_count INTEGER NOT NULL DEFAULT 0, + session_count INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (day, model, speed) +); + +CREATE TABLE mart_watermark ( + mart_name TEXT PRIMARY KEY, + last_event_id INTEGER NOT NULL DEFAULT 0, + last_refresh_ts TEXT NOT NULL +); + +PRAGMA user_version = 6; + +COMMIT; diff --git a/stackunderflow/store/schema.py b/stackunderflow/store/schema.py index 747ebbc..b8f1167 100644 --- a/stackunderflow/store/schema.py +++ b/stackunderflow/store/schema.py @@ -26,7 +26,7 @@ _MIGRATIONS_DIR = Path(__file__).parent / "migrations" -CURRENT_VERSION = 5 +CURRENT_VERSION = 6 def apply(conn: sqlite3.Connection) -> None: diff --git a/tests/stackunderflow/etl/__init__.py b/tests/stackunderflow/etl/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/stackunderflow/etl/test_backfill.py b/tests/stackunderflow/etl/test_backfill.py new file mode 100644 index 0000000..2d1fc39 --- /dev/null +++ b/tests/stackunderflow/etl/test_backfill.py @@ -0,0 +1,195 @@ +"""Backfill orchestrator — Wave 1 shape contract. + +Wave 1 ships only the orchestrator skeleton: registries are empty until +Wave 2 lands, so :func:`backfill` returns zero-count reports. These +tests pin the orchestrator shape (the BackfillReport fields, the +empty-registry default, the ``force=True`` reset path) so Wave 2 can +fill in the bodies without changing the public surface. 
+""" + +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +import pytest + +from stackunderflow.etl import marts as marts_registry +from stackunderflow.etl import normalize as normalize_registry +from stackunderflow.etl.backfill import BackfillReport, backfill +from stackunderflow.etl.marts.base import MartBuilder +from stackunderflow.etl.watermark import get_watermark, set_watermark +from stackunderflow.store import db, schema + + +@pytest.fixture +def conn(tmp_path: Path) -> sqlite3.Connection: + c = db.connect(tmp_path / "store.db") + schema.apply(c) + yield c + c.close() + + +@pytest.fixture(autouse=True) +def _clean_registries(): + normalize_registry._clear() + marts_registry._clear() + yield + normalize_registry._clear() + marts_registry._clear() + + +def _seed_minimal_event(conn: sqlite3.Connection) -> int: + """Insert one ``usage_events`` row by way of the upstream tables. + + Returns the inserted event id. Used to verify ``force=True`` empties + the table even when there's data to drop. 
+ """ + conn.execute( + "INSERT INTO projects (provider, slug, display_name, " + "first_seen, last_modified) VALUES ('claude', 'p', 'p', 0, 0)" + ) + proj_id = conn.execute("SELECT id FROM projects").fetchone()["id"] + conn.execute( + "INSERT INTO sessions (project_id, session_id) VALUES (?, 's')", + (proj_id,), + ) + sess_id = conn.execute("SELECT id FROM sessions").fetchone()["id"] + conn.execute( + "INSERT INTO messages (session_fk, seq, timestamp, role, raw_json) " + "VALUES (?, 0, '2026-04-01T00:00:00+00:00', 'assistant', '{}')", + (sess_id,), + ) + msg_id = conn.execute("SELECT id FROM messages").fetchone()["id"] + conn.execute( + "INSERT INTO usage_events (" + " source_message_fk, provider, project_id, session_id, " + " ts, day, role" + ") VALUES (?, 'claude', ?, 's', " + " '2026-04-01T00:00:00+00:00', '2026-04-01', 'assistant')", + (msg_id, proj_id), + ) + return conn.execute( + "SELECT id FROM usage_events ORDER BY id DESC LIMIT 1" + ).fetchone()["id"] + + +# ── empty-registry shape (Wave 1 default) ─────────────────────────────────── + + +def test_backfill_empty_store_empty_report(conn): + """Fresh DB + empty registries → all-zero report.""" + report = backfill(conn) + + assert isinstance(report, BackfillReport) + assert report.events_inserted == 0 + assert report.events_skipped_duplicate == 0 + assert report.marts_refreshed == {} + # Timing must be a non-negative float (perf_counter delta). + assert report.duration_seconds >= 0 + + +def test_backfill_idempotent(conn): + """Re-running on a fresh store yields the same shape.""" + first = backfill(conn) + second = backfill(conn) + + # Both runs report zero-counts; the field shape is what we're + # locking down for Wave 2. 
+ assert first.events_inserted == 0 + assert first.events_skipped_duplicate == 0 + assert first.marts_refreshed == {} + assert second.events_inserted == 0 + assert second.events_skipped_duplicate == 0 + assert second.marts_refreshed == {} + + +# ── force=True reset ──────────────────────────────────────────────────────── + + +def test_backfill_force_drops_events_and_marts(conn): + """``force=True`` empties usage_events + every mart + watermarks.""" + _seed_minimal_event(conn) + set_watermark(conn, "daily", 999) + + # Sanity: data is present before the force run. + assert ( + conn.execute("SELECT COUNT(*) FROM usage_events").fetchone()[0] == 1 + ) + assert get_watermark(conn, "daily") == 999 + + backfill(conn, force=True) + + # All cleared. (No normalizers registered, so no re-population.) + assert ( + conn.execute("SELECT COUNT(*) FROM usage_events").fetchone()[0] == 0 + ) + assert get_watermark(conn, "daily") == 0 + for tbl in ( + "daily_mart", + "session_mart", + "project_mart", + "provider_day_mart", + "model_day_mart", + "mart_watermark", + ): + count = conn.execute(f"SELECT COUNT(*) FROM {tbl}").fetchone()[0] # noqa: S608 — table name is a hard-coded literal + assert count == 0, f"{tbl} should be empty after force=True" + + +def test_backfill_force_is_idempotent(conn): + """Running ``force=True`` twice doesn't error and leaves everything empty.""" + _seed_minimal_event(conn) + + backfill(conn, force=True) + # Second force run on already-empty tables must still be a clean no-op. + report = backfill(conn, force=True) + + assert report.events_inserted == 0 + assert ( + conn.execute("SELECT COUNT(*) FROM usage_events").fetchone()[0] == 0 + ) + + +# ── orchestrator wires marts even with empty normalizer registry ──────────── + + +class _StubMart(MartBuilder): + name = "stub" + + def refresh(self, conn, since_event_id: int) -> int: + # No-op refresh: returns the existing watermark. 
+ return since_event_id + + +def test_backfill_calls_refresh_all_marts_when_normalizers_empty(conn): + """Even with no normalizers, registered marts get a refresh pass. + + This pins the Wave 1 fall-through: ``refresh_all_marts`` runs so + each mart can finalize its watermark even when no new events + arrived. Wave 2 keeps this behaviour when it lands. + """ + marts_registry.register("stub", _StubMart) + + report = backfill(conn) + + # The mart was visited even though no normalizer ran. + assert "stub" in report.marts_refreshed + assert report.marts_refreshed["stub"] == 0 + + +def test_backfill_report_shape_locked(): + """Pin the dataclass field set so Wave 2 can't quietly add or drop fields.""" + report = BackfillReport() + field_names = {f.name for f in BackfillReport.__dataclass_fields__.values()} + assert field_names == { + "events_inserted", + "events_skipped_duplicate", + "marts_refreshed", + "duration_seconds", + } + # Defaults are sensible. + assert report.events_inserted == 0 + assert report.events_skipped_duplicate == 0 + assert report.marts_refreshed == {} + assert report.duration_seconds == 0.0 diff --git a/tests/stackunderflow/etl/test_registries.py b/tests/stackunderflow/etl/test_registries.py new file mode 100644 index 0000000..2fb51fa --- /dev/null +++ b/tests/stackunderflow/etl/test_registries.py @@ -0,0 +1,163 @@ +"""Plug-in registry contracts for normalize + marts. + +Both registries follow the same shape: ``register(name, cls)`` / +``get(name)`` / ``all()`` with last-wins semantics on duplicate names. +These tests pin both registries against the same checklist so a future +divergence shows up immediately. Wave 2 dispatches in parallel against +this contract — the tests are the contract. 
+""" + +from __future__ import annotations + +from collections.abc import Iterable + +from stackunderflow.etl import marts as marts_registry +from stackunderflow.etl import normalize as normalize_registry +from stackunderflow.etl.marts.base import MartBuilder +from stackunderflow.etl.normalize.base import Normalizer + +# ── normalizer fixtures ───────────────────────────────────────────────────── + + +class _DummyClaude(Normalizer): + provider_name = "claude-test" + + def normalize(self, msg_row: dict) -> Iterable[dict]: # pragma: no cover + return iter(()) + + +class _DummyClaudeAlt(Normalizer): + """Second class with the same provider_name to exercise overwrite.""" + + provider_name = "claude-test" + + def normalize(self, msg_row: dict) -> Iterable[dict]: # pragma: no cover + return iter(()) + + +class _DummyCodex(Normalizer): + provider_name = "codex-test" + + def normalize(self, msg_row: dict) -> Iterable[dict]: # pragma: no cover + return iter(()) + + +# ── mart fixtures ─────────────────────────────────────────────────────────── + + +class _DummyDailyMart(MartBuilder): + name = "daily-test" + + def refresh(self, conn, since_event_id: int) -> int: # pragma: no cover + return since_event_id + + +class _DummyDailyMartAlt(MartBuilder): + name = "daily-test" + + def refresh(self, conn, since_event_id: int) -> int: # pragma: no cover + return since_event_id + + +class _DummySessionMart(MartBuilder): + name = "session-test" + + def refresh(self, conn, since_event_id: int) -> int: # pragma: no cover + return since_event_id + + +# ── normalize registry ────────────────────────────────────────────────────── + + +def test_normalize_register_get_all(): + normalize_registry._clear() + + normalize_registry.register("claude-test", _DummyClaude) + normalize_registry.register("codex-test", _DummyCodex) + + assert normalize_registry.get("claude-test") is _DummyClaude + assert normalize_registry.get("codex-test") is _DummyCodex + assert normalize_registry.get("nope-not-here") 
is None + + snapshot = normalize_registry.all() + assert snapshot == {"claude-test": _DummyClaude, "codex-test": _DummyCodex} + + +def test_normalize_all_returns_copy(): + """Mutating the returned dict must not affect the registry.""" + normalize_registry._clear() + normalize_registry.register("claude-test", _DummyClaude) + + snap = normalize_registry.all() + snap.clear() # mutate the caller's copy + + # Live registry still has the entry. + assert normalize_registry.get("claude-test") is _DummyClaude + + +def test_normalize_register_twice_overwrites_last_wins(): + """Re-registering the same provider replaces the prior class.""" + normalize_registry._clear() + + normalize_registry.register("claude-test", _DummyClaude) + assert normalize_registry.get("claude-test") is _DummyClaude + + normalize_registry.register("claude-test", _DummyClaudeAlt) + assert normalize_registry.get("claude-test") is _DummyClaudeAlt + + # Only one entry, the new one — not two. + assert normalize_registry.all() == {"claude-test": _DummyClaudeAlt} + + +# ── marts registry ────────────────────────────────────────────────────────── + + +def test_marts_register_get_all(): + marts_registry._clear() + + marts_registry.register("daily-test", _DummyDailyMart) + marts_registry.register("session-test", _DummySessionMart) + + assert marts_registry.get("daily-test") is _DummyDailyMart + assert marts_registry.get("session-test") is _DummySessionMart + assert marts_registry.get("nope-not-here") is None + + snapshot = marts_registry.all() + assert snapshot == { + "daily-test": _DummyDailyMart, + "session-test": _DummySessionMart, + } + + +def test_marts_all_returns_copy(): + marts_registry._clear() + marts_registry.register("daily-test", _DummyDailyMart) + + snap = marts_registry.all() + snap.clear() + + assert marts_registry.get("daily-test") is _DummyDailyMart + + +def test_marts_register_twice_overwrites_last_wins(): + marts_registry._clear() + + marts_registry.register("daily-test", _DummyDailyMart) + 
assert marts_registry.get("daily-test") is _DummyDailyMart + + marts_registry.register("daily-test", _DummyDailyMartAlt) + assert marts_registry.get("daily-test") is _DummyDailyMartAlt + + assert marts_registry.all() == {"daily-test": _DummyDailyMartAlt} + + +def test_clear_helpers_reset_state(): + """``_clear`` is the test-only escape hatch — wipes both registries.""" + normalize_registry.register("provider-x", _DummyClaude) + marts_registry.register("mart-x", _DummyDailyMart) + + normalize_registry._clear() + marts_registry._clear() + + assert normalize_registry.all() == {} + assert marts_registry.all() == {} diff --git a/tests/stackunderflow/etl/test_watermark.py b/tests/stackunderflow/etl/test_watermark.py new file mode 100644 index 0000000..e728ef4 --- /dev/null +++ b/tests/stackunderflow/etl/test_watermark.py @@ -0,0 +1,149 @@ +"""Watermark helpers — read / write / refresh round-trip. + +The marts layer relies on ``mart_watermark`` to track per-mart progress +through ``usage_events.id``. These tests pin the get/set round-trip, +the empty-store contract, and the refresh-with-empty-registry default +that Wave 1 ships with. 
+""" + +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +import pytest + +from stackunderflow.etl import marts as marts_registry +from stackunderflow.etl.marts.base import MartBuilder +from stackunderflow.etl.watermark import ( + get_watermark, + refresh_all_marts, + set_watermark, +) +from stackunderflow.store import db, schema + + +@pytest.fixture +def conn(tmp_path: Path) -> sqlite3.Connection: + """Fresh DB with the schema applied.""" + c = db.connect(tmp_path / "store.db") + schema.apply(c) + yield c + c.close() + + +@pytest.fixture(autouse=True) +def _clean_marts_registry(): + """Each watermark test starts with an empty marts registry.""" + marts_registry._clear() + yield + marts_registry._clear() + + +# ── get/set round-trip ────────────────────────────────────────────────────── + + +def test_missing_mart_returns_zero(conn): + """No row in mart_watermark → watermark is 0.""" + assert get_watermark(conn, "daily") == 0 + assert get_watermark(conn, "nope") == 0 + + +def test_set_then_get(conn): + set_watermark(conn, "daily", 42) + assert get_watermark(conn, "daily") == 42 + + +def test_set_overwrites_existing(conn): + """Re-calling set_watermark upserts via ON CONFLICT.""" + set_watermark(conn, "daily", 10) + set_watermark(conn, "daily", 25) + assert get_watermark(conn, "daily") == 25 + + +def test_set_stamps_refresh_ts(conn): + """``last_refresh_ts`` must be populated (NOT NULL on the schema).""" + set_watermark(conn, "daily", 1) + row = conn.execute( + "SELECT last_refresh_ts FROM mart_watermark WHERE mart_name = ?", + ("daily",), + ).fetchone() + assert row is not None + assert row["last_refresh_ts"] # non-empty ISO timestamp + + +def test_set_independent_per_mart(conn): + """Watermarks for different marts don't clobber each other.""" + set_watermark(conn, "daily", 100) + set_watermark(conn, "session", 200) + set_watermark(conn, "project", 300) + assert get_watermark(conn, "daily") == 100 + assert get_watermark(conn, 
"session") == 200 + assert get_watermark(conn, "project") == 300 + + +# ── refresh_all_marts ─────────────────────────────────────────────────────── + + +def test_refresh_all_with_empty_registry_returns_empty_dict(conn): + """Wave 1 default: no mart builders registered → ``refresh_all_marts`` + is a no-op that returns an empty dict, not None.""" + assert refresh_all_marts(conn) == {} + + +class _StubMart(MartBuilder): + """Minimal mart that 'consumes' up to a fixed ceiling each refresh.""" + + name = "stub" + + # Class-level so tests can override per-call without re-registering. + ceiling = 0 + + def refresh(self, conn, since_event_id: int) -> int: + # Return either the ceiling (if higher) or the existing watermark + # (no-op when ceiling hasn't moved). + return max(int(since_event_id), int(_StubMart.ceiling)) + + +def test_refresh_all_advances_watermark(conn): + marts_registry.register("stub", _StubMart) + _StubMart.ceiling = 50 + + out = refresh_all_marts(conn) + + assert out == {"stub": 50} + assert get_watermark(conn, "stub") == 50 + + +def test_refresh_all_idempotent_no_new_events(conn): + """Re-running with the same ceiling → events_processed = 0.""" + marts_registry.register("stub", _StubMart) + _StubMart.ceiling = 50 + + refresh_all_marts(conn) + out_second = refresh_all_marts(conn) + + assert out_second == {"stub": 0} + assert get_watermark(conn, "stub") == 50 + + +def test_refresh_all_picks_up_from_existing_watermark(conn): + """A pre-existing watermark must be passed to refresh as ``since``.""" + + seen: list[int] = [] + + class _RecordingMart(MartBuilder): + name = "rec" + + def refresh(self, conn, since_event_id: int) -> int: + seen.append(since_event_id) + return since_event_id + 10 + + set_watermark(conn, "rec", 100) + marts_registry.register("rec", _RecordingMart) + + out = refresh_all_marts(conn) + + assert seen == [100] + assert out == {"rec": 10} + assert get_watermark(conn, "rec") == 110 diff --git 
a/tests/stackunderflow/store/test_migration_v005.py b/tests/stackunderflow/store/test_migration_v005.py index 967f714..ae9766a 100644 --- a/tests/stackunderflow/store/test_migration_v005.py +++ b/tests/stackunderflow/store/test_migration_v005.py @@ -143,7 +143,12 @@ def test_v004_redistributes_sessions_with_path_data(tmp_path: Path) -> None: ] # Legacy collapse removed. assert "cursor" not in slugs - assert conn.execute("PRAGMA user_version").fetchone()[0] == 5 + # Compare to CURRENT_VERSION rather than a literal so future + # migrations chaining on don't break this test. + assert ( + conn.execute("PRAGMA user_version").fetchone()[0] + == schema.CURRENT_VERSION + ) finally: conn.close() @@ -264,7 +269,12 @@ def test_v004_no_op_when_no_legacy_cursor_project(tmp_path: Path) -> None: conn = db.connect(tmp_path / "store.db") try: schema.apply(conn) - assert conn.execute("PRAGMA user_version").fetchone()[0] == 5 + # Compare to CURRENT_VERSION rather than a literal so future + # migrations chaining on don't break this test. + assert ( + conn.execute("PRAGMA user_version").fetchone()[0] + == schema.CURRENT_VERSION + ) # No cursor project should have appeared from thin air. cnt = conn.execute( "SELECT COUNT(*) FROM projects WHERE provider = 'cursor'" diff --git a/tests/stackunderflow/store/test_migration_v006.py b/tests/stackunderflow/store/test_migration_v006.py new file mode 100644 index 0000000..b35d142 --- /dev/null +++ b/tests/stackunderflow/store/test_migration_v006.py @@ -0,0 +1,248 @@ +"""v006 migration: ETL foundation (usage_events + 5 marts + watermark). + +Spec at ``docs/specs/etl-architecture.md``. Wave 1 ships only the schema +and ABCs; these tests pin the table + index shape that Waves 2 and 3 +build against. + +Note on numbering: the spec calls this ``v004_etl_layer.sql``, but two +migrations (v004 synthetic-models cleanup, v005 cursor-workspace +redistribute) shipped between the spec being written and Wave 1 landing. 
+The migration is wired in as v006; the spec doc is updated to match. +""" + +from __future__ import annotations + +from pathlib import Path + +from stackunderflow.store import db, schema + +# Tables introduced by v006. Order matters only for human readability. +_NEW_TABLES = ( + "usage_events", + "daily_mart", + "session_mart", + "project_mart", + "provider_day_mart", + "model_day_mart", + "mart_watermark", +) + +# (index_name, table_name) — pinning the indexes the spec calls out so a +# future schema-rewrite can't silently lose them. The uniqueness of +# ``uniq_events_msg`` is asserted separately below. +_NEW_INDEXES = ( + ("idx_events_day", "usage_events"), + ("idx_events_project", "usage_events"), + ("idx_events_provider", "usage_events"), + ("idx_events_session", "usage_events"), + ("idx_events_model", "usage_events"), + ("uniq_events_msg", "usage_events"), + ("idx_daily_mart_project", "daily_mart"), + ("idx_session_mart_project", "session_mart"), + ("idx_session_mart_first", "session_mart"), + ("idx_provider_day_mart_day", "provider_day_mart"), +) + + +def _tables(conn) -> set[str]: + rows = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + ).fetchall() + return {r["name"] for r in rows} + + +def _indexes(conn, table: str) -> set[str]: + rows = conn.execute(f"PRAGMA index_list({table})").fetchall() + return {r["name"] for r in rows} + + +def _columns(conn, table: str) -> dict[str, dict]: + rows = conn.execute(f"PRAGMA table_info({table})").fetchall() + return { + r["name"]: { + "type": r["type"].upper(), + "notnull": r["notnull"], + "dflt_value": r["dflt_value"], + "pk": r["pk"], + } + for r in rows + } + + +def test_v006_creates_all_tables(tmp_path: Path) -> None: + """All 7 v006 tables exist after schema.apply on a fresh DB.""" + conn = db.connect(tmp_path / "store.db") + try: + schema.apply(conn) + present = _tables(conn) + for t in _NEW_TABLES: + assert t in present, f"missing table {t!r}" + finally: + conn.close() + + +def
test_v006_usage_events_columns(tmp_path: Path) -> None: + """usage_events column shape pinned per spec §Schema.""" + conn = db.connect(tmp_path / "store.db") + try: + schema.apply(conn) + cols = _columns(conn, "usage_events") + # Spot-check the load-bearing columns. Not every column is + # asserted — the goal is to catch type / NOT-NULL drift, not + # mirror the migration verbatim. + assert "id" in cols and cols["id"]["pk"] == 1 + assert cols["source_message_fk"]["type"] == "INTEGER" + assert cols["source_message_fk"]["notnull"] == 1 + assert cols["provider"]["type"] == "TEXT" + assert cols["account"]["dflt_value"] in ("'default'", "default") + assert cols["day"]["notnull"] == 1 + assert cols["model"]["notnull"] == 1 + assert cols["speed"]["dflt_value"] in ("'standard'", "standard") + assert cols["input_tokens"]["type"] == "INTEGER" + assert cols["cost_usd"]["type"] == "REAL" + assert cols["cost_source"]["dflt_value"] in ("'rate_card'", "rate_card") + assert "raw_extras" in cols + finally: + conn.close() + + +def test_v006_daily_mart_columns(tmp_path: Path) -> None: + """daily_mart columns + composite primary key shape.""" + conn = db.connect(tmp_path / "store.db") + try: + schema.apply(conn) + cols = _columns(conn, "daily_mart") + # 5-column composite PK (day, project_id, provider, model, speed) + pk_cols = {n for n, info in cols.items() if info["pk"] > 0} + assert pk_cols == {"day", "project_id", "provider", "model", "speed"} + for tok in ("input_tokens", "output_tokens", "cache_read", "cache_create"): + assert cols[tok]["type"] == "INTEGER" + assert cols["cost_usd"]["type"] == "REAL" + finally: + conn.close() + + +def test_v006_session_mart_columns(tmp_path: Path) -> None: + """session_mart columns + single-column PK on session_id.""" + conn = db.connect(tmp_path / "store.db") + try: + schema.apply(conn) + cols = _columns(conn, "session_mart") + assert cols["session_id"]["pk"] == 1 + assert cols["is_one_shot"]["type"] == "INTEGER" + assert 
cols["primary_model"]["notnull"] == 0 # nullable + assert cols["cwd"]["notnull"] == 0 # nullable + finally: + conn.close() + + +def test_v006_project_mart_columns(tmp_path: Path) -> None: + """project_mart columns + PK on project_id.""" + conn = db.connect(tmp_path / "store.db") + try: + schema.apply(conn) + cols = _columns(conn, "project_mart") + assert cols["project_id"]["pk"] == 1 + assert cols["total_cost_usd"]["type"] == "REAL" + assert cols["display_name"]["notnull"] == 1 + finally: + conn.close() + + +def test_v006_provider_day_mart_columns(tmp_path: Path) -> None: + """provider_day_mart composite PK on (day, provider).""" + conn = db.connect(tmp_path / "store.db") + try: + schema.apply(conn) + cols = _columns(conn, "provider_day_mart") + pk_cols = {n for n, info in cols.items() if info["pk"] > 0} + assert pk_cols == {"day", "provider"} + finally: + conn.close() + + +def test_v006_model_day_mart_columns(tmp_path: Path) -> None: + """model_day_mart composite PK on (day, model, speed).""" + conn = db.connect(tmp_path / "store.db") + try: + schema.apply(conn) + cols = _columns(conn, "model_day_mart") + pk_cols = {n for n, info in cols.items() if info["pk"] > 0} + assert pk_cols == {"day", "model", "speed"} + finally: + conn.close() + + +def test_v006_mart_watermark_columns(tmp_path: Path) -> None: + """mart_watermark PK on mart_name + last_event_id default 0.""" + conn = db.connect(tmp_path / "store.db") + try: + schema.apply(conn) + cols = _columns(conn, "mart_watermark") + assert cols["mart_name"]["pk"] == 1 + assert cols["last_event_id"]["dflt_value"] in ("0",) + assert cols["last_refresh_ts"]["notnull"] == 1 + finally: + conn.close() + + +def test_v006_indexes_present(tmp_path: Path) -> None: + """Every index the spec calls out exists on its table.""" + conn = db.connect(tmp_path / "store.db") + try: + schema.apply(conn) + # Group expected indexes by table to minimise PRAGMA calls. 
+ by_table: dict[str, set[str]] = {} + for name, table in _NEW_INDEXES: + by_table.setdefault(table, set()).add(name) + for table, expected in by_table.items(): + present = _indexes(conn, table) + missing = expected - present + assert not missing, ( + f"table {table}: missing indexes {missing}; have {present}" + ) + finally: + conn.close() + + +def test_v006_uniq_events_msg_is_unique(tmp_path: Path) -> None: + """``uniq_events_msg`` must be a UNIQUE index — it's the dedup key.""" + conn = db.connect(tmp_path / "store.db") + try: + schema.apply(conn) + rows = conn.execute("PRAGMA index_list(usage_events)").fetchall() + match = [r for r in rows if r["name"] == "uniq_events_msg"] + assert match, "uniq_events_msg index missing" + # PRAGMA index_list: seq, name, unique, origin, partial + assert match[0]["unique"] == 1 + finally: + conn.close() + + +def test_v006_user_version_bumped(tmp_path: Path) -> None: + """schema.apply lands ``user_version`` on the current head.""" + conn = db.connect(tmp_path / "store.db") + try: + schema.apply(conn) + assert ( + conn.execute("PRAGMA user_version").fetchone()[0] + == schema.CURRENT_VERSION + ) + assert schema.CURRENT_VERSION >= 6 + finally: + conn.close() + + +def test_v006_idempotent_reapply(tmp_path: Path) -> None: + """``schema.apply`` is safe to call twice on the same DB.""" + conn = db.connect(tmp_path / "store.db") + try: + schema.apply(conn) + # Second call must not raise (CREATE TABLE would fail without + # the user_version guard). 
+ schema.apply(conn) + present = _tables(conn) + for t in _NEW_TABLES: + assert t in present + finally: + conn.close() diff --git a/tests/stackunderflow/store/test_schema.py b/tests/stackunderflow/store/test_schema.py index 9476bc9..f667dd6 100644 --- a/tests/stackunderflow/store/test_schema.py +++ b/tests/stackunderflow/store/test_schema.py @@ -40,8 +40,8 @@ def test_apply_is_idempotent(tmp_path: Path) -> None: def test_current_version_constant() -> None: - # v004 added the cursor workspace redistribute Python migration. - assert schema.CURRENT_VERSION == 5 + # v006 added the ETL foundation (usage_events + 5 marts + watermark). + assert schema.CURRENT_VERSION == 6 def test_v002_migration_preserves_existing_rows(tmp_path: Path) -> None: