Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- **Wave 4D — 12 beta provider normalizers.** Codeium (stub), Continue, Copilot, Cursor Agent, Droid, Gemini, KiloCode, Kiro, OpenClaw, OpenCode, Pi/OMP, Qwen, Roo Code now have `Normalizer` subclasses registered at import time. ETL pipeline now covers all 16 providers from the codeburn catalog. Beta providers stay opt-in via the existing `STACKUNDERFLOW_BETA_*` env flags — when they're enabled the matching normalizer fires automatically.
- **Wave 3A — hot-path routes migrate to mart reads.** `/api/projects?include_stats=true`, `/api/dashboard-data`, and `/api/cost-data` (totals/by_day/by_model blocks) now read from `project_mart` + `daily_mart` instead of running per-request aggregator passes against raw `messages`. Same JSON contract; ~50× faster on the user's 28K-message project (cold 2.5–2.8s → 50ms warm). Per-session / per-command / per-tool detail blocks stay on the aggregator path until lower-grain marts ship in Wave 4.
- **ETL foundation: usage_events fact table + 5 marts + watermarks + backfill orchestrator (Wave 1).** Lays the schema and base classes; Waves 2 (normalizers + mart builders + watcher) and 3 (route migrations) fill in the bodies. Migration v006 (the spec called it v004, but v004/v005 were taken by the synthetic-models cleanup and cursor-workspace redistribute — the migration file is renumbered to v006 and the spec doc is updated to match) adds 7 tables (`usage_events`, `daily_mart`, `session_mart`, `project_mart`, `provider_day_mart`, `model_day_mart`, `mart_watermark`) plus indexes (`idx_events_day`, `idx_events_project`, `idx_events_provider`, `idx_events_session`, `idx_events_model`, `uniq_events_msg` UNIQUE on `source_message_fk`, `idx_daily_mart_project`, `idx_session_mart_project`, `idx_session_mart_first`, `idx_provider_day_mart_day`). New `stackunderflow.etl` package: `normalize/base.py` (`Normalizer` ABC) + `normalize/__init__.py` (last-wins `register/get/all` registry), `marts/base.py` (`MartBuilder` ABC with abstract `refresh(conn, since_event_id) -> int` and concrete no-op `rebuild_from_scratch`) + `marts/__init__.py` (last-wins registry), `watermark.py` (`get_watermark` returns 0 on missing, `set_watermark` upserts with UTC ISO8601 `last_refresh_ts`, `refresh_all_marts` iterates the marts registry and persists each mart's new watermark), and `backfill.py` (`BackfillReport` dataclass with `events_inserted`, `events_skipped_duplicate`, `marts_refreshed: dict[str, int]`, `duration_seconds`; `backfill(conn, *, force=False)` orchestrator skeleton — empty-registry no-op until Wave 2 lands, `force=True` empties events + marts + watermarks). New CLI: `stackunderflow etl backfill [--force]` (no-op until normalizers register in Wave 2; reports zero counts). Migration is **additive** — does not touch existing `messages`/`sessions`/`projects` tables, all existing routes keep working unchanged. 39 new tests across `tests/stackunderflow/store/test_migration_v006.py` (12: tables exist, columns/PKs per table, indexes present, UNIQUE on `uniq_events_msg`, idempotent re-apply), `tests/stackunderflow/etl/test_registries.py` (7: register/get/all, copy semantics, last-wins overwrite for both registries), `tests/stackunderflow/etl/test_watermark.py` (9: missing→0, set/get round-trip, overwrite, ts stamping, per-mart independence, empty-registry refresh, advance + idempotent + pickup-from-existing-watermark), `tests/stackunderflow/etl/test_backfill.py` (7: empty-store report shape, idempotent re-run, `force=True` drops events + marts + watermarks, `force=True` idempotent, mart refresh runs even with empty normalizers, BackfillReport field-set is locked). Spec at `docs/specs/etl-architecture.md`.
- **Wave 2A — 4 default-on provider normalizers (`stackunderflow/etl/normalize/`).** Per-provider transforms from raw `messages` rows into canonical `usage_events`. Codex token normalization (subtract cached, fold reasoning) moves out of the pricer into `CodexNormalizer` — single source of truth. Cursor v3 no-per-message-tokens path estimates from `len(text)//4` with `cost_source='estimated'` flag. cost_usd computed once per event during normalization, stored on the row, never recomputed downstream.
Expand Down
48 changes: 48 additions & 0 deletions stackunderflow/etl/normalize/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,41 @@ def registered_providers() -> tuple[str, ...]:
register("cursor", CursorNormalizer)
register("cline", ClineNormalizer)

# Wave 4D — beta provider normalizers. Same import-at-bottom pattern;
# each module pulls Normalizer from .base, no circular risk. Beta
# providers stay opt-in via the existing STACKUNDERFLOW_BETA_* flags
# at the adapter layer — registering here is harmless when those
# adapters are off because no rows ever land with the matching
# ``provider`` value.
from .codeium import CodeiumNormalizer # noqa: E402
from .continue_ import ContinueNormalizer # noqa: E402
from .copilot import CopilotNormalizer # noqa: E402
from .cursor_agent import CursorAgentNormalizer # noqa: E402
from .droid import DroidNormalizer # noqa: E402
from .gemini import GeminiNormalizer # noqa: E402
from .kilocode import KiloCodeNormalizer # noqa: E402
from .kiro import KiroNormalizer # noqa: E402
from .openclaw import OpenClawNormalizer # noqa: E402
from .opencode import OpenCodeNormalizer # noqa: E402
from .pi import PiNormalizer # noqa: E402
from .qwen import QwenNormalizer # noqa: E402
from .roocode import RooCodeNormalizer # noqa: E402

register("codeium", CodeiumNormalizer)
register("continue", ContinueNormalizer)
register("copilot", CopilotNormalizer)
register("cursor_agent", CursorAgentNormalizer)
register("droid", DroidNormalizer)
register("gemini", GeminiNormalizer)
register("kilocode", KiloCodeNormalizer)
register("kiro", KiroNormalizer)
register("openclaw", OpenClawNormalizer)
register("opencode", OpenCodeNormalizer)
register("pi", PiNormalizer)
register("omp", PiNormalizer) # Pi/OMP share parser logic — same class.
register("qwen", QwenNormalizer)
register("roocode", RooCodeNormalizer)


__all__ = [
"COST_SOURCE_ESTIMATED",
Expand All @@ -79,4 +114,17 @@ def registered_providers() -> tuple[str, ...]:
"ClineNormalizer",
"CodexNormalizer",
"CursorNormalizer",
"CodeiumNormalizer",
"ContinueNormalizer",
"CopilotNormalizer",
"CursorAgentNormalizer",
"DroidNormalizer",
"GeminiNormalizer",
"KiloCodeNormalizer",
"KiroNormalizer",
"OpenClawNormalizer",
"OpenCodeNormalizer",
"PiNormalizer",
"QwenNormalizer",
"RooCodeNormalizer",
]
29 changes: 29 additions & 0 deletions stackunderflow/etl/normalize/codeium.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Codeium normalizer — discovery-only stub.

Codeium's adapter (when enabled via the beta flag) only enumerates
sessions; the on-disk format used by the Codeium client is not parsed
into individual messages, so no billable rows ever land in the
``messages`` table for this provider. The normalizer therefore yields
nothing — it exists solely so the registry has an entry for every
provider in the codeburn catalog and the lookup at the ingest seam
never KeyErrors when Codeium is enabled.

If a future spec adds a parsed message format we'll implement the
transform here; until then this is intentionally a no-op.
"""

from __future__ import annotations

from collections.abc import Iterable

from .base import Normalizer


class CodeiumNormalizer(Normalizer):
provider_name = "codeium"

def normalize(self, msg_row: dict) -> Iterable[dict]: # noqa: ARG002
# Discovery-only — never any billable rows. Returning early as
# a generator function gives an empty iterator without raising.
return
yield # pragma: no cover — unreachable, makes this a generator
118 changes: 118 additions & 0 deletions stackunderflow/etl/normalize/continue_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Continue (continue.dev) normalizer.

Continue stores chat history in SQLite ``.db`` files under its config
directory. Token counts may or may not be present per row depending on
the Continue version and the underlying provider — newer versions
record explicit input/output counts on the assistant turn, older
versions persist only the rendered text.

The normalizer keeps the policy simple and **defensive**:

* Trust the canonical token columns the adapter wrote when at least
one is non-zero — stamp ``cost_source='rate_card'`` if the model is
in the canonical rate card, ``'unknown'`` otherwise.
* Otherwise fall back to ``len(content_text) // 4`` on the input side
with output = 0, and stamp ``cost_source='estimated'``. This mirrors
the same recovery path the Cursor v3 normalizer uses for missing
per-bubble counts.

Provider-specific provenance worth preserving in ``raw_extras``: the
``provider`` field Continue records for the underlying model gateway
(``"anthropic"`` / ``"openai"`` / ``"ollama"`` / ...) and the
``modelTitle`` display name so the UI can disambiguate proxy-routed
models from native ones.
"""

from __future__ import annotations

import json
from collections.abc import Iterable

from stackunderflow.infra.costs import RATE_CARD

from .base import (
COST_SOURCE_ESTIMATED,
COST_SOURCE_RATE_CARD,
COST_SOURCE_UNKNOWN,
Normalizer,
)

_DEFAULT_MODEL = "continue-auto"
_RAW_EXTRAS_FIELDS = ("provider", "modelTitle", "completionOptions")


class ContinueNormalizer(Normalizer):
provider_name = "continue"

def normalize(self, msg_row: dict) -> Iterable[dict]:
role = str(msg_row.get("role") or "")
if role != "assistant":
return

input_tokens = int(msg_row.get("input_tokens") or 0)
output_tokens = int(msg_row.get("output_tokens") or 0)
cache_read = int(msg_row.get("cache_read_tokens") or 0)
cache_create = int(msg_row.get("cache_create_tokens") or 0)

estimated = False
if (
input_tokens == 0
and output_tokens == 0
and cache_read == 0
and cache_create == 0
):
text = str(msg_row.get("content_text") or "")
if not text:
return # nothing to estimate from — drop
input_tokens = max(len(text) // 4, 0)
estimated = True

if input_tokens == 0 and output_tokens == 0 and cache_read == 0 and cache_create == 0:
return

model = str(msg_row.get("model") or "") or _DEFAULT_MODEL

if estimated:
cost_source = COST_SOURCE_ESTIMATED
elif model in RATE_CARD:
cost_source = COST_SOURCE_RATE_CARD
else:
cost_source = COST_SOURCE_UNKNOWN

raw_extras = _extras_from_raw_json(msg_row.get("raw_json"))

yield self._build_event(
msg_row,
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_tokens=cache_read,
cache_create_tokens=cache_create,
cost_source=cost_source,
model=model,
raw_extras=raw_extras,
)


def _extras_from_raw_json(raw_json: object) -> dict | None:
payload = _safe_load_raw(raw_json)
if not isinstance(payload, dict):
return None
out: dict = {}
for key in _RAW_EXTRAS_FIELDS:
val = payload.get(key)
if val is not None and val != "":
out[key] = val
return out or None


def _safe_load_raw(raw_json: object) -> object | None:
if isinstance(raw_json, dict):
return raw_json
if not isinstance(raw_json, str | bytes | bytearray):
return None
try:
if isinstance(raw_json, bytes | bytearray):
return json.loads(raw_json.decode("utf-8", errors="replace"))
return json.loads(raw_json)
except (json.JSONDecodeError, ValueError, UnicodeDecodeError):
return None
138 changes: 138 additions & 0 deletions stackunderflow/etl/normalize/copilot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""GitHub Copilot normalizer.

Copilot persists in two distinct shapes:

1. **Legacy** — ``~/.copilot/session-state/{sessionId}/events.jsonl`` with
``{type: 'assistant.message', outputTokens, ...}`` events. ``inputTokens``
may not be present; when it isn't we estimate from the preceding
user message length (the adapter forwards that as ``content_text``)
and stamp ``cost_source='estimated'``.

2. **VS Code transcripts** — ``workspaceStorage/<hash>/GitHub.copilot-chat/
transcripts/*.jsonl`` with explicit ``inputTokens`` + ``outputTokens``
per turn. When both fields are present (or just non-zero on either
side) we trust them and stamp ``cost_source='rate_card'`` for known
models / ``'unknown'`` otherwise.

Cache fields stay 0 — Copilot's transcript shape doesn't bill for prompt
caching. Model id is preserved verbatim from the transcript; for legacy
events we fall back to ``copilot-auto``.
"""

from __future__ import annotations

import json
from collections.abc import Iterable

from stackunderflow.infra.costs import RATE_CARD

from .base import (
COST_SOURCE_ESTIMATED,
COST_SOURCE_RATE_CARD,
COST_SOURCE_UNKNOWN,
Normalizer,
)

_DEFAULT_MODEL = "copilot-auto"
_RAW_EXTRAS_FIELDS = ("toolCallId", "producer", "transcriptVersion")


class CopilotNormalizer(Normalizer):
provider_name = "copilot"

def normalize(self, msg_row: dict) -> Iterable[dict]:
role = str(msg_row.get("role") or "")
if role != "assistant":
return

input_tokens = int(msg_row.get("input_tokens") or 0)
output_tokens = int(msg_row.get("output_tokens") or 0)

# The transcript may also surface tokens nested in raw_json under
# ``data.outputTokens`` / ``data.inputTokens`` (newer transcript
# shape) — pick those up if the adapter didn't pre-flatten them.
if input_tokens == 0 and output_tokens == 0:
payload = _safe_load_raw(msg_row.get("raw_json"))
data = payload.get("data") if isinstance(payload, dict) else None
if isinstance(data, dict):
input_tokens = max(int(data.get("inputTokens") or 0), 0)
output_tokens = max(int(data.get("outputTokens") or 0), 0)

estimated = False
if output_tokens == 0:
# Legacy events without an explicit output count — estimate
# from text length. Skip the row entirely if we have neither
# explicit tokens nor any text to estimate from.
text = str(msg_row.get("content_text") or "")
if input_tokens == 0 and not text:
return
if not text:
# input_tokens is set but output isn't — that's a weird
# half-shape; estimate output from text length anyway
# (which here is empty), so we just keep the explicit
# input and let output stay 0. Mark estimated.
estimated = True
else:
output_tokens = max(len(text) // 4, 0)
if input_tokens == 0:
# Estimate input on the user-message length we don't
# have either; use the same text rather than zero so
# the row prices to *something*.
input_tokens = output_tokens
estimated = True

if input_tokens == 0 and output_tokens == 0:
return

model = str(msg_row.get("model") or "") or _DEFAULT_MODEL

if estimated:
cost_source = COST_SOURCE_ESTIMATED
elif model in RATE_CARD:
cost_source = COST_SOURCE_RATE_CARD
else:
cost_source = COST_SOURCE_UNKNOWN

raw_extras = _extras_from_raw_json(msg_row.get("raw_json"))

yield self._build_event(
msg_row,
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_tokens=0,
cache_create_tokens=0,
cost_source=cost_source,
model=model,
raw_extras=raw_extras,
)


def _extras_from_raw_json(raw_json: object) -> dict | None:
payload = _safe_load_raw(raw_json)
if not isinstance(payload, dict):
return None
out: dict = {}
for key in _RAW_EXTRAS_FIELDS:
val = payload.get(key)
if val is not None and val != "":
out[key] = val
# Surface ``data.producer`` from VS Code transcripts.
data = payload.get("data")
if isinstance(data, dict):
producer = data.get("producer")
if producer and "producer" not in out:
out["producer"] = producer
return out or None


def _safe_load_raw(raw_json: object) -> object | None:
if isinstance(raw_json, dict):
return raw_json
if not isinstance(raw_json, str | bytes | bytearray):
return None
try:
if isinstance(raw_json, bytes | bytearray):
return json.loads(raw_json.decode("utf-8", errors="replace"))
return json.loads(raw_json)
except (json.JSONDecodeError, ValueError, UnicodeDecodeError):
return None
Loading
Loading