From da49147d50081b8b740b3b641ebfff946f397883 Mon Sep 17 00:00:00 2001 From: Yad Konrad Date: Wed, 6 May 2026 17:02:17 -0400 Subject: [PATCH] =?UTF-8?q?feat(etl):=20Wave=204D=20=E2=80=94=2012=20beta?= =?UTF-8?q?=20provider=20normalizers=20(full=20codeburn=20catalog=20covera?= =?UTF-8?q?ge)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds Normalizer subclasses for the 12 beta providers Wave 2A left for later (Codeium, Continue, Copilot, Cursor Agent, Droid, Gemini, KiloCode, Kiro, OpenClaw, OpenCode, Pi/OMP, Qwen, Roo Code). The ETL pipeline now covers all 16 providers from the codeburn catalog. Token semantics match the catalog spec exactly — Gemini and Qwen apply the cached-subtraction rule (input = promptTokenCount - cachedContentTokenCount, output = candidatesTokenCount + thoughtsTokenCount). OpenCode folds reasoning into output. Cursor Agent and Kiro stamp cost_source='estimated' unconditionally because their sources don't carry per-message tokens. Codeium is a discovery-only stub that yields zero events. KiloCode + RooCode subclass Cline since they share the on-disk format (api_req_started.text JSON blob). Beta providers stay opt-in via the existing STACKUNDERFLOW_BETA_* adapter flags — registering normalizers here is harmless when the matching adapter is off because no rows ever land with that provider value. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 1 + stackunderflow/etl/normalize/__init__.py | 48 +++++ stackunderflow/etl/normalize/codeium.py | 29 +++ stackunderflow/etl/normalize/continue_.py | 118 +++++++++++ stackunderflow/etl/normalize/copilot.py | 138 +++++++++++++ stackunderflow/etl/normalize/cursor_agent.py | 89 +++++++++ stackunderflow/etl/normalize/droid.py | 145 ++++++++++++++ stackunderflow/etl/normalize/gemini.py | 183 ++++++++++++++++++ stackunderflow/etl/normalize/kilocode.py | 23 +++ stackunderflow/etl/normalize/kiro.py | 92 +++++++++ stackunderflow/etl/normalize/openclaw.py | 168 ++++++++++++++++ stackunderflow/etl/normalize/opencode.py | 159 +++++++++++++++ stackunderflow/etl/normalize/pi.py | 150 ++++++++++++++ stackunderflow/etl/normalize/qwen.py | 149 ++++++++++++++ stackunderflow/etl/normalize/roocode.py | 17 ++ .../etl/normalize/test_codeium.py | 43 ++++ .../etl/normalize/test_continue_.py | 80 ++++++++ .../etl/normalize/test_copilot.py | 85 ++++++++ .../etl/normalize/test_cursor_agent.py | 64 ++++++ .../etl/normalize/test_droid.py | 88 +++++++++ .../etl/normalize/test_gemini.py | 147 ++++++++++++++ .../etl/normalize/test_kilocode.py | 70 +++++++ .../stackunderflow/etl/normalize/test_kiro.py | 68 +++++++ .../etl/normalize/test_openclaw.py | 109 +++++++++++ .../etl/normalize/test_opencode.py | 115 +++++++++++ tests/stackunderflow/etl/normalize/test_pi.py | 95 +++++++++ .../stackunderflow/etl/normalize/test_qwen.py | 121 ++++++++++++ .../etl/normalize/test_roocode.py | 69 +++++++ 28 files changed, 2663 insertions(+) create mode 100644 stackunderflow/etl/normalize/codeium.py create mode 100644 stackunderflow/etl/normalize/continue_.py create mode 100644 stackunderflow/etl/normalize/copilot.py create mode 100644 stackunderflow/etl/normalize/cursor_agent.py create mode 100644 stackunderflow/etl/normalize/droid.py create mode 100644 stackunderflow/etl/normalize/gemini.py create mode 100644 stackunderflow/etl/normalize/kilocode.py create mode 100644 stackunderflow/etl/normalize/kiro.py create mode 100644 stackunderflow/etl/normalize/openclaw.py create mode 100644 stackunderflow/etl/normalize/opencode.py create mode 100644 stackunderflow/etl/normalize/pi.py create mode 100644 stackunderflow/etl/normalize/qwen.py create mode 100644 stackunderflow/etl/normalize/roocode.py create mode 100644 tests/stackunderflow/etl/normalize/test_codeium.py create mode 100644 tests/stackunderflow/etl/normalize/test_continue_.py create mode 100644 tests/stackunderflow/etl/normalize/test_copilot.py create mode 100644 tests/stackunderflow/etl/normalize/test_cursor_agent.py create mode 100644 tests/stackunderflow/etl/normalize/test_droid.py create mode 100644 tests/stackunderflow/etl/normalize/test_gemini.py create mode 100644 tests/stackunderflow/etl/normalize/test_kilocode.py create mode 100644 tests/stackunderflow/etl/normalize/test_kiro.py create mode 100644 tests/stackunderflow/etl/normalize/test_openclaw.py create mode 100644 tests/stackunderflow/etl/normalize/test_opencode.py create mode 100644 tests/stackunderflow/etl/normalize/test_pi.py create mode 100644 tests/stackunderflow/etl/normalize/test_qwen.py create mode 100644 tests/stackunderflow/etl/normalize/test_roocode.py diff --git a/CHANGELOG.md b/CHANGELOG.md index bf6e427..94e0b56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- **Wave 4D — 12 beta provider normalizers.** Codeium (stub), Continue, Copilot, Cursor Agent, Droid, Gemini, KiloCode, Kiro, OpenClaw, OpenCode, Pi/OMP, Qwen, Roo Code now have `Normalizer` subclasses registered at import time. ETL pipeline now covers all 16 providers from the codeburn catalog. Beta providers stay opt-in via the existing `STACKUNDERFLOW_BETA_*` env flags — when they're enabled the matching normalizer fires automatically. - **Wave 3A — hot-path routes migrate to mart reads.** `/api/projects?include_stats=true`, `/api/dashboard-data`, and `/api/cost-data` (totals/by_day/by_model blocks) now read from `project_mart` + `daily_mart` instead of running per-request aggregator passes against raw `messages`. Same JSON contract; ~50× faster on the user's 28K-message project (cold 2.5–2.8s → 50ms warm). Per-session / per-command / per-tool detail blocks stay on the aggregator path until lower-grain marts ship in Wave 4. - **ETL foundation: usage_events fact table + 5 marts + watermarks + backfill orchestrator (Wave 1).** Lays the schema and base classes; Waves 2 (normalizers + mart builders + watcher) and 3 (route migrations) fill in the bodies. Migration v006 (the spec called it v004, but v004/v005 were taken by the synthetic-models cleanup and cursor-workspace redistribute — the migration file is renumbered to v006 and the spec doc is updated to match) adds 7 tables (`usage_events`, `daily_mart`, `session_mart`, `project_mart`, `provider_day_mart`, `model_day_mart`, `mart_watermark`) plus indexes (`idx_events_day`, `idx_events_project`, `idx_events_provider`, `idx_events_session`, `idx_events_model`, `uniq_events_msg` UNIQUE on `source_message_fk`, `idx_daily_mart_project`, `idx_session_mart_project`, `idx_session_mart_first`, `idx_provider_day_mart_day`). New `stackunderflow.etl` package: `normalize/base.py` (`Normalizer` ABC) + `normalize/__init__.py` (last-wins `register/get/all` registry), `marts/base.py` (`MartBuilder` ABC with abstract `refresh(conn, since_event_id) -> int` and concrete no-op `rebuild_from_scratch`) + `marts/__init__.py` (last-wins registry), `watermark.py` (`get_watermark` returns 0 on missing, `set_watermark` upserts with UTC ISO8601 `last_refresh_ts`, `refresh_all_marts` iterates the marts registry and persists each mart's new watermark), and `backfill.py` (`BackfillReport` dataclass with `events_inserted`, `events_skipped_duplicate`, `marts_refreshed: dict[str, int]`, `duration_seconds`; `backfill(conn, *, force=False)` orchestrator skeleton — empty-registry no-op until Wave 2 lands, `force=True` empties events + marts + watermarks). New CLI: `stackunderflow etl backfill [--force]` (no-op until normalizers register in Wave 2; reports zero counts). Migration is **additive** — does not touch existing `messages`/`sessions`/`projects` tables, all existing routes keep working unchanged. 39 new tests across `tests/stackunderflow/store/test_migration_v006.py` (12: tables exist, columns/PKs per table, indexes present, UNIQUE on `uniq_events_msg`, idempotent re-apply), `tests/stackunderflow/etl/test_registries.py` (7: register/get/all, copy semantics, last-wins overwrite for both registries), `tests/stackunderflow/etl/test_watermark.py` (9: missing→0, set/get round-trip, overwrite, ts stamping, per-mart independence, empty-registry refresh, advance + idempotent + pickup-from-existing-watermark), `tests/stackunderflow/etl/test_backfill.py` (7: empty-store report shape, idempotent re-run, `force=True` drops events + marts + watermarks, `force=True` idempotent, mart refresh runs even with empty normalizers, BackfillReport field-set is locked). Spec at `docs/specs/etl-architecture.md`. - **Wave 2A — 4 default-on provider normalizers (`stackunderflow/etl/normalize/`).** Per-provider transforms from raw `messages` rows into canonical `usage_events`. Codex token normalization (subtract cached, fold reasoning) moves out of the pricer into `CodexNormalizer` — single source of truth. Cursor v3 no-per-message-tokens path estimates from `len(text)//4` with `cost_source='estimated'` flag. cost_usd computed once per event during normalization, stored on the row, never recomputed downstream. diff --git a/stackunderflow/etl/normalize/__init__.py b/stackunderflow/etl/normalize/__init__.py index 6f33ff9..d6dbe53 100644 --- a/stackunderflow/etl/normalize/__init__.py +++ b/stackunderflow/etl/normalize/__init__.py @@ -62,6 +62,41 @@ def registered_providers() -> tuple[str, ...]: register("cursor", CursorNormalizer) register("cline", ClineNormalizer) +# Wave 4D — beta provider normalizers. Same import-at-bottom pattern; +# each module pulls Normalizer from .base, no circular risk. Beta +# providers stay opt-in via the existing STACKUNDERFLOW_BETA_* flags +# at the adapter layer — registering here is harmless when those +# adapters are off because no rows ever land with the matching +# ``provider`` value. +from .codeium import CodeiumNormalizer # noqa: E402 +from .continue_ import ContinueNormalizer # noqa: E402 +from .copilot import CopilotNormalizer # noqa: E402 +from .cursor_agent import CursorAgentNormalizer # noqa: E402 +from .droid import DroidNormalizer # noqa: E402 +from .gemini import GeminiNormalizer # noqa: E402 +from .kilocode import KiloCodeNormalizer # noqa: E402 +from .kiro import KiroNormalizer # noqa: E402 +from .openclaw import OpenClawNormalizer # noqa: E402 +from .opencode import OpenCodeNormalizer # noqa: E402 +from .pi import PiNormalizer # noqa: E402 +from .qwen import QwenNormalizer # noqa: E402 +from .roocode import RooCodeNormalizer # noqa: E402 + +register("codeium", CodeiumNormalizer) +register("continue", ContinueNormalizer) +register("copilot", CopilotNormalizer) +register("cursor_agent", CursorAgentNormalizer) +register("droid", DroidNormalizer) +register("gemini", GeminiNormalizer) +register("kilocode", KiloCodeNormalizer) +register("kiro", KiroNormalizer) +register("openclaw", OpenClawNormalizer) +register("opencode", OpenCodeNormalizer) +register("pi", PiNormalizer) +register("omp", PiNormalizer) # Pi/OMP share parser logic — same class. +register("qwen", QwenNormalizer) +register("roocode", RooCodeNormalizer) + __all__ = [ "COST_SOURCE_ESTIMATED", @@ -79,4 +114,17 @@ def registered_providers() -> tuple[str, ...]: "ClineNormalizer", "CodexNormalizer", "CursorNormalizer", + "CodeiumNormalizer", + "ContinueNormalizer", + "CopilotNormalizer", + "CursorAgentNormalizer", + "DroidNormalizer", + "GeminiNormalizer", + "KiloCodeNormalizer", + "KiroNormalizer", + "OpenClawNormalizer", + "OpenCodeNormalizer", + "PiNormalizer", + "QwenNormalizer", + "RooCodeNormalizer", ] diff --git a/stackunderflow/etl/normalize/codeium.py b/stackunderflow/etl/normalize/codeium.py new file mode 100644 index 0000000..6433314 --- /dev/null +++ b/stackunderflow/etl/normalize/codeium.py @@ -0,0 +1,29 @@ +"""Codeium normalizer — discovery-only stub. + +Codeium's adapter (when enabled via the beta flag) only enumerates +sessions; the on-disk format used by the Codeium client is not parsed +into individual messages, so no billable rows ever land in the +``messages`` table for this provider. The normalizer therefore yields +nothing — it exists solely so the registry has an entry for every +provider in the codeburn catalog and the lookup at the ingest seam +never KeyErrors when Codeium is enabled. + +If a future spec adds a parsed message format we'll implement the +transform here; until then this is intentionally a no-op. +""" + +from __future__ import annotations + +from collections.abc import Iterable + +from .base import Normalizer + + +class CodeiumNormalizer(Normalizer): + provider_name = "codeium" + + def normalize(self, msg_row: dict) -> Iterable[dict]: # noqa: ARG002 + # Discovery-only — never any billable rows. Returning early as + # a generator function gives an empty iterator without raising. + return + yield # pragma: no cover — unreachable, makes this a generator diff --git a/stackunderflow/etl/normalize/continue_.py b/stackunderflow/etl/normalize/continue_.py new file mode 100644 index 0000000..ff4838a --- /dev/null +++ b/stackunderflow/etl/normalize/continue_.py @@ -0,0 +1,118 @@ +"""Continue (continue.dev) normalizer. + +Continue stores chat history in SQLite ``.db`` files under its config +directory. Token counts may or may not be present per row depending on +the Continue version and the underlying provider — newer versions +record explicit input/output counts on the assistant turn, older +versions persist only the rendered text. + +The normalizer keeps the policy simple and **defensive**: + +* Trust the canonical token columns the adapter wrote when at least + one is non-zero — stamp ``cost_source='rate_card'`` if the model is + in the canonical rate card, ``'unknown'`` otherwise. +* Otherwise fall back to ``len(content_text) // 4`` on the input side + with output = 0, and stamp ``cost_source='estimated'``. This mirrors + the same recovery path the Cursor v3 normalizer uses for missing + per-bubble counts. + +Provider-specific provenance worth preserving in ``raw_extras``: the +``provider`` field Continue records for the underlying model gateway +(``"anthropic"`` / ``"openai"`` / ``"ollama"`` / ...) and the +``modelTitle`` display name so the UI can disambiguate proxy-routed +models from native ones. +""" + +from __future__ import annotations + +import json +from collections.abc import Iterable + +from stackunderflow.infra.costs import RATE_CARD + +from .base import ( + COST_SOURCE_ESTIMATED, + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, + Normalizer, +) + +_DEFAULT_MODEL = "continue-auto" +_RAW_EXTRAS_FIELDS = ("provider", "modelTitle", "completionOptions") + + +class ContinueNormalizer(Normalizer): + provider_name = "continue" + + def normalize(self, msg_row: dict) -> Iterable[dict]: + role = str(msg_row.get("role") or "") + if role != "assistant": + return + + input_tokens = int(msg_row.get("input_tokens") or 0) + output_tokens = int(msg_row.get("output_tokens") or 0) + cache_read = int(msg_row.get("cache_read_tokens") or 0) + cache_create = int(msg_row.get("cache_create_tokens") or 0) + + estimated = False + if ( + input_tokens == 0 + and output_tokens == 0 + and cache_read == 0 + and cache_create == 0 + ): + text = str(msg_row.get("content_text") or "") + if not text: + return # nothing to estimate from — drop + input_tokens = max(len(text) // 4, 0) + estimated = True + + if input_tokens == 0 and output_tokens == 0 and cache_read == 0 and cache_create == 0: + return + + model = str(msg_row.get("model") or "") or _DEFAULT_MODEL + + if estimated: + cost_source = COST_SOURCE_ESTIMATED + elif model in RATE_CARD: + cost_source = COST_SOURCE_RATE_CARD + else: + cost_source = COST_SOURCE_UNKNOWN + + raw_extras = _extras_from_raw_json(msg_row.get("raw_json")) + + yield self._build_event( + msg_row, + input_tokens=input_tokens, + output_tokens=output_tokens, + cache_read_tokens=cache_read, + cache_create_tokens=cache_create, + cost_source=cost_source, + model=model, + raw_extras=raw_extras, + ) + + +def _extras_from_raw_json(raw_json: object) -> dict | None: + payload = _safe_load_raw(raw_json) + if not isinstance(payload, dict): + return None + out: dict = {} + for key in _RAW_EXTRAS_FIELDS: + val = payload.get(key) + if val is not None and val != "": + out[key] = val + return out or None + + +def _safe_load_raw(raw_json: object) -> object | None: + if isinstance(raw_json, dict): + return raw_json + if not isinstance(raw_json, str | bytes | bytearray): + return None + try: + if isinstance(raw_json, bytes | bytearray): + return json.loads(raw_json.decode("utf-8", errors="replace")) + return json.loads(raw_json) + except (json.JSONDecodeError, ValueError, UnicodeDecodeError): + return None diff --git a/stackunderflow/etl/normalize/copilot.py b/stackunderflow/etl/normalize/copilot.py new file mode 100644 index 0000000..80ccd0c --- /dev/null +++ b/stackunderflow/etl/normalize/copilot.py @@ -0,0 +1,138 @@ +"""GitHub Copilot normalizer. + +Copilot persists in two distinct shapes: + +1. **Legacy** — ``~/.copilot/session-state/{sessionId}/events.jsonl`` with + ``{type: 'assistant.message', outputTokens, ...}`` events. ``inputTokens`` + may not be present; when it isn't we estimate from the preceding + user message length (the adapter forwards that as ``content_text``) + and stamp ``cost_source='estimated'``. + +2. **VS Code transcripts** — ``workspaceStorage//GitHub.copilot-chat/ + transcripts/*.jsonl`` with explicit ``inputTokens`` + ``outputTokens`` + per turn. When both fields are present (or just non-zero on either + side) we trust them and stamp ``cost_source='rate_card'`` for known + models / ``'unknown'`` otherwise. + +Cache fields stay 0 — Copilot's transcript shape doesn't bill for prompt +caching. Model id is preserved verbatim from the transcript; for legacy +events we fall back to ``copilot-auto``. +""" + +from __future__ import annotations + +import json +from collections.abc import Iterable + +from stackunderflow.infra.costs import RATE_CARD + +from .base import ( + COST_SOURCE_ESTIMATED, + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, + Normalizer, +) + +_DEFAULT_MODEL = "copilot-auto" +_RAW_EXTRAS_FIELDS = ("toolCallId", "producer", "transcriptVersion") + + +class CopilotNormalizer(Normalizer): + provider_name = "copilot" + + def normalize(self, msg_row: dict) -> Iterable[dict]: + role = str(msg_row.get("role") or "") + if role != "assistant": + return + + input_tokens = int(msg_row.get("input_tokens") or 0) + output_tokens = int(msg_row.get("output_tokens") or 0) + + # The transcript may also surface tokens nested in raw_json under + # ``data.outputTokens`` / ``data.inputTokens`` (newer transcript + # shape) — pick those up if the adapter didn't pre-flatten them. + if input_tokens == 0 and output_tokens == 0: + payload = _safe_load_raw(msg_row.get("raw_json")) + data = payload.get("data") if isinstance(payload, dict) else None + if isinstance(data, dict): + input_tokens = max(int(data.get("inputTokens") or 0), 0) + output_tokens = max(int(data.get("outputTokens") or 0), 0) + + estimated = False + if output_tokens == 0: + # Legacy events without an explicit output count — estimate + # from text length. Skip the row entirely if we have neither + # explicit tokens nor any text to estimate from. + text = str(msg_row.get("content_text") or "") + if input_tokens == 0 and not text: + return + if not text: + # input_tokens is set but output isn't — that's a weird + # half-shape; estimate output from text length anyway + # (which here is empty), so we just keep the explicit + # input and let output stay 0. Mark estimated. + estimated = True + else: + output_tokens = max(len(text) // 4, 0) + if input_tokens == 0: + # Estimate input on the user-message length we don't + # have either; use the same text rather than zero so + # the row prices to *something*. + input_tokens = output_tokens + estimated = True + + if input_tokens == 0 and output_tokens == 0: + return + + model = str(msg_row.get("model") or "") or _DEFAULT_MODEL + + if estimated: + cost_source = COST_SOURCE_ESTIMATED + elif model in RATE_CARD: + cost_source = COST_SOURCE_RATE_CARD + else: + cost_source = COST_SOURCE_UNKNOWN + + raw_extras = _extras_from_raw_json(msg_row.get("raw_json")) + + yield self._build_event( + msg_row, + input_tokens=input_tokens, + output_tokens=output_tokens, + cache_read_tokens=0, + cache_create_tokens=0, + cost_source=cost_source, + model=model, + raw_extras=raw_extras, + ) + + +def _extras_from_raw_json(raw_json: object) -> dict | None: + payload = _safe_load_raw(raw_json) + if not isinstance(payload, dict): + return None + out: dict = {} + for key in _RAW_EXTRAS_FIELDS: + val = payload.get(key) + if val is not None and val != "": + out[key] = val + # Surface ``data.producer`` from VS Code transcripts. + data = payload.get("data") + if isinstance(data, dict): + producer = data.get("producer") + if producer and "producer" not in out: + out["producer"] = producer + return out or None + + +def _safe_load_raw(raw_json: object) -> object | None: + if isinstance(raw_json, dict): + return raw_json + if not isinstance(raw_json, str | bytes | bytearray): + return None + try: + if isinstance(raw_json, bytes | bytearray): + return json.loads(raw_json.decode("utf-8", errors="replace")) + return json.loads(raw_json) + except (json.JSONDecodeError, ValueError, UnicodeDecodeError): + return None diff --git a/stackunderflow/etl/normalize/cursor_agent.py b/stackunderflow/etl/normalize/cursor_agent.py new file mode 100644 index 0000000..c10ae30 --- /dev/null +++ b/stackunderflow/etl/normalize/cursor_agent.py @@ -0,0 +1,89 @@ +"""Cursor Agent normalizer. + +Cursor Agent persists turns as either marker-based plaintext +transcripts (legacy ``.txt``) or per-turn JSONL bubbles. **Neither +shape carries token counts.** Per the codeburn catalog, every Cursor +Agent record is estimated from text length / 4. + +Therefore the normalizer's policy is unconditional: assistant rows +estimate ``input = len(content_text) // 4``, ``output = 0``, stamp +``cost_source='estimated'``. Even when an upstream future adds explicit +tokens we still want the estimated flag — Cursor Agent never reports +billing-grade counts. + +Model id is forwarded from whatever the adapter resolved (the adapter +queries the ``conversation_summaries`` SQLite table for it); when that +fallback misses we use ``cursor-agent-auto``. +""" + +from __future__ import annotations + +import json +from collections.abc import Iterable + +from .base import COST_SOURCE_ESTIMATED, Normalizer + +_DEFAULT_MODEL = "cursor-agent-auto" +_RAW_EXTRAS_FIELDS = ("conversationId", "transcriptType", "toolCalls") + + +class CursorAgentNormalizer(Normalizer): + provider_name = "cursor_agent" + + def normalize(self, msg_row: dict) -> Iterable[dict]: + role = str(msg_row.get("role") or "") + if role != "assistant": + return + + text = str(msg_row.get("content_text") or "") + # Prefer the explicit estimate when an adapter pre-computed one + # onto the input column; fall back to text//4 estimation here. + input_tokens = int(msg_row.get("input_tokens") or 0) + output_tokens = int(msg_row.get("output_tokens") or 0) + if input_tokens == 0 and output_tokens == 0: + if not text: + return + input_tokens = max(len(text) // 4, 0) + + if input_tokens == 0 and output_tokens == 0: + return + + model = str(msg_row.get("model") or "") or _DEFAULT_MODEL + + raw_extras = _extras_from_raw_json(msg_row.get("raw_json")) + + yield self._build_event( + msg_row, + input_tokens=input_tokens, + output_tokens=output_tokens, + cache_read_tokens=0, + cache_create_tokens=0, + cost_source=COST_SOURCE_ESTIMATED, + model=model, + raw_extras=raw_extras, + ) + + +def _extras_from_raw_json(raw_json: object) -> dict | None: + payload = _safe_load_raw(raw_json) + if not isinstance(payload, dict): + return None + out: dict = {} + for key in _RAW_EXTRAS_FIELDS: + val = payload.get(key) + if val is not None and val != "": + out[key] = val + return out or None + + +def _safe_load_raw(raw_json: object) -> object | None: + if isinstance(raw_json, dict): + return raw_json + if not isinstance(raw_json, str | bytes | bytearray): + return None + try: + if isinstance(raw_json, bytes | bytearray): + return json.loads(raw_json.decode("utf-8", errors="replace")) + return json.loads(raw_json) + except (json.JSONDecodeError, ValueError, UnicodeDecodeError): + return None diff --git a/stackunderflow/etl/normalize/droid.py b/stackunderflow/etl/normalize/droid.py new file mode 100644 index 0000000..a2b01a6 --- /dev/null +++ b/stackunderflow/etl/normalize/droid.py @@ -0,0 +1,145 @@ +"""Droid (Factory) normalizer. + +Droid is the Factory.ai agent. Its on-disk shape (``~/.factory/sessions/ +/.jsonl`` plus a sidecar ``.settings.json``) +splits assistant turns from billing data: the JSONL records every +event but does **not** carry per-message token usage; the +``.settings.json`` carries one session-level ``tokenUsage`` block: + + { "tokenUsage": { + "inputTokens": ..., + "outputTokens": ..., + "cacheCreationTokens": ..., + "cacheReadTokens": ..., + "thinkingTokens": ... + }, + "model": "..." + } + +The adapter (``stackunderflow/adapters/droid.py``) is responsible for +distributing the session-level totals across the assistant messages +inside that session before they land in the ``messages`` table — the +distribution policy is the adapter's choice (codeburn picks even +distribution; we mirror that). By the time a row reaches this +normalizer, its token columns already hold a per-message slice of the +session total. + +Policy: + +* Trust the per-row token columns when at least one is non-zero. +* Stamp ``cost_source='rate_card'`` for known models / ``'unknown'`` + otherwise — the per-row counts came from a real total, even if the + per-row split is approximate. The estimation here is *attribution*, + not counting; the session sum is exact. +* ``cost_source='estimated'`` only when the row carries no token data + at all, which means the adapter couldn't read the settings file — + fall back to ``len(content_text)//4`` on input. +* Fold ``thinkingTokens`` into output if the adapter surfaced it on the + row directly (column ``thinking_tokens`` or in ``raw_json``). +""" + +from __future__ import annotations + +import json +from collections.abc import Iterable + +from stackunderflow.infra.costs import RATE_CARD + +from .base import ( + COST_SOURCE_ESTIMATED, + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, + Normalizer, +) + +_DEFAULT_MODEL = "droid-auto" +_RAW_EXTRAS_FIELDS = ("sessionId", "tokenUsage", "factoryVersion") + + +class DroidNormalizer(Normalizer): + provider_name = "droid" + + def normalize(self, msg_row: dict) -> Iterable[dict]: + role = str(msg_row.get("role") or "") + if role != "assistant": + return + + input_tokens = int(msg_row.get("input_tokens") or 0) + output_tokens = int(msg_row.get("output_tokens") or 0) + cache_read = int(msg_row.get("cache_read_tokens") or 0) + cache_create = int(msg_row.get("cache_create_tokens") or 0) + + # Fold thinking tokens (Droid's reasoning slot) into output if a + # per-row column is set. + thinking = int(msg_row.get("thinking_tokens") or 0) + if thinking == 0: + payload = _safe_load_raw(msg_row.get("raw_json")) + if isinstance(payload, dict): + tu = payload.get("tokenUsage") + if isinstance(tu, dict): + thinking = max(int(tu.get("thinkingTokens") or 0), 0) + if thinking > 0: + output_tokens += thinking + + estimated = False + if ( + input_tokens == 0 + and output_tokens == 0 + and cache_read == 0 + and cache_create == 0 + ): + text = str(msg_row.get("content_text") or "") + if not text: + return + input_tokens = max(len(text) // 4, 0) + estimated = True + + if input_tokens == 0 and output_tokens == 0 and cache_read == 0 and cache_create == 0: + return + + model = str(msg_row.get("model") or "") or _DEFAULT_MODEL + + if estimated: + cost_source = COST_SOURCE_ESTIMATED + elif model in RATE_CARD: + cost_source = COST_SOURCE_RATE_CARD + else: + cost_source = COST_SOURCE_UNKNOWN + + raw_extras = _extras_from_raw_json(msg_row.get("raw_json")) + + yield self._build_event( + msg_row, + input_tokens=input_tokens, + output_tokens=output_tokens, + cache_read_tokens=cache_read, + cache_create_tokens=cache_create, + cost_source=cost_source, + model=model, + raw_extras=raw_extras, + ) + + +def _extras_from_raw_json(raw_json: object) -> dict | None: + payload = _safe_load_raw(raw_json) + if not isinstance(payload, dict): + return None + out: dict = {} + for key in _RAW_EXTRAS_FIELDS: + val = payload.get(key) + if val is not None and val != "": + out[key] = val + return out or None + + +def _safe_load_raw(raw_json: object) -> object | None: + if isinstance(raw_json, dict): + return raw_json + if not isinstance(raw_json, str | bytes | bytearray): + return None + try: + if isinstance(raw_json, bytes | bytearray): + return json.loads(raw_json.decode("utf-8", errors="replace")) + return json.loads(raw_json) + except (json.JSONDecodeError, ValueError, UnicodeDecodeError): + return None diff --git a/stackunderflow/etl/normalize/gemini.py b/stackunderflow/etl/normalize/gemini.py new file mode 100644 index 0000000..09e52cd --- /dev/null +++ b/stackunderflow/etl/normalize/gemini.py @@ -0,0 +1,183 @@ +"""Gemini (Google) normalizer. + +Gemini CLI's transcripts (single-JSON ≤0.38 / JSONL ≥0.39) record a +``usage`` block on each assistant message: + + { "promptTokenCount": ..., # *includes* cached input + "candidatesTokenCount": ..., # the visible output + "thoughtsTokenCount": ..., # reasoning output (≥1.5-pro) + "cachedContentTokenCount": ... # subset of promptTokenCount + } + +Per the codeburn catalog the canonical Anthropic-shape mapping is: + +* ``input = promptTokenCount - cachedContentTokenCount`` (fresh input) +* ``output = candidatesTokenCount + thoughtsTokenCount`` (visible + reasoning) +* ``cache_read = cachedContentTokenCount`` +* ``cache_create = 0`` (Gemini doesn't bill prompt-cache writes the + same way; cache content is implicitly created when the same prefix + appears repeatedly within the cached-content window) + +The adapter may pre-flatten these into the canonical 4-token columns +(``input_tokens`` etc.); when that's already happened we trust the +columns. When it hasn't (synthetic test rows / older adapter +behaviour) we read the raw shape from ``raw_json.usageMetadata`` or +``raw_json.tokens`` and apply the transform here. +""" + +from __future__ import annotations + +import json +from collections.abc import Iterable + +from stackunderflow.infra.costs import RATE_CARD + +from .base import ( + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, + Normalizer, +) + +_DEFAULT_MODEL = "gemini-auto" +_RAW_EXTRAS_FIELDS = ("responseId", "finishReason", "safetyRatings") + + +class GeminiNormalizer(Normalizer): + provider_name = "gemini" + + def normalize(self, msg_row: dict) -> Iterable[dict]: + role = str(msg_row.get("role") or "") + # Gemini logs assistant turns as ``role='gemini'`` in some + # adapter versions and ``role='assistant'`` in others; accept + # both. + if role not in ("assistant", "gemini"): + return + + canonical = _canonical_tokens(msg_row) + if canonical is None: + return + + if ( + canonical["input"] == 0 + and canonical["output"] == 0 + and canonical["cache_read"] == 0 + and canonical["cache_create"] == 0 + ): + return + + model = str(msg_row.get("model") or "") or _DEFAULT_MODEL + + cost_source = ( + COST_SOURCE_RATE_CARD + if model in RATE_CARD + else COST_SOURCE_UNKNOWN + ) + + raw_extras = _extras_from_raw_json(msg_row.get("raw_json")) + + yield self._build_event( + msg_row, + input_tokens=canonical["input"], + output_tokens=canonical["output"], + cache_read_tokens=canonical["cache_read"], + cache_create_tokens=canonical["cache_create"], + cost_source=cost_source, + model=model, + raw_extras=raw_extras, + ) + + +def _canonical_tokens(msg_row: dict) -> dict[str, int] | None: + """Return the canonical 4-key shape, applying Gemini's transform. + + Resolution order: + 1. Raw Gemini ``usageMetadata`` block (``promptTokenCount`` etc.) on + ``raw_json`` or directly on ``msg_row`` — apply cached-subtract + + thoughts-fold here. + 2. Pre-canonicalised columns the adapter wrote. + """ + raw = _raw_gemini_usage(msg_row) + if raw is not None: + prompt = max(int(raw.get("promptTokenCount") or 0), 0) + cached = max(int(raw.get("cachedContentTokenCount") or 0), 0) + candidates = max(int(raw.get("candidatesTokenCount") or 0), 0) + thoughts = max(int(raw.get("thoughtsTokenCount") or 0), 0) + # Cached is a subset of prompt — subtract to get the fresh slice. + fresh_input = max(prompt - cached, 0) + return { + "input": fresh_input, + "output": candidates + thoughts, + "cache_read": cached, + "cache_create": 0, + } + + return { + "input": int(msg_row.get("input_tokens") or 0), + "output": int(msg_row.get("output_tokens") or 0), + "cache_read": int(msg_row.get("cache_read_tokens") or 0), + "cache_create": int(msg_row.get("cache_create_tokens") or 0), + } + + +def _raw_gemini_usage(msg_row: dict) -> dict | None: + """Pull Gemini's usage block from msg_row or raw_json.""" + # Direct fields on msg_row (synthetic test path). + if ( + "promptTokenCount" in msg_row + or "candidatesTokenCount" in msg_row + or "cachedContentTokenCount" in msg_row + or "thoughtsTokenCount" in msg_row + ): + return { + "promptTokenCount": msg_row.get("promptTokenCount", 0), + "candidatesTokenCount": msg_row.get("candidatesTokenCount", 0), + "cachedContentTokenCount": msg_row.get("cachedContentTokenCount", 0), + "thoughtsTokenCount": msg_row.get("thoughtsTokenCount", 0), + } + payload = _safe_load_raw(msg_row.get("raw_json")) + if not isinstance(payload, dict): + return None + # Newer JSONL: ``usageMetadata`` block. + md = payload.get("usageMetadata") + if isinstance(md, dict): + return md + # Older single-JSON: ``tokens`` block with friendlier names. + tokens = payload.get("tokens") + if isinstance(tokens, dict) and ( + "input" in tokens or "output" in tokens or "cached" in tokens or "thoughts" in tokens + ): + # Gemini CLI ≤0.38 shape: ``{input, output, cached, thoughts, ...}`` + # where ``input`` already has cached folded in (matches the + # promptTokenCount semantic). + return { + "promptTokenCount": tokens.get("input", 0) or 0, + "candidatesTokenCount": tokens.get("output", 0) or 0, + "cachedContentTokenCount": tokens.get("cached", 0) or 0, + "thoughtsTokenCount": tokens.get("thoughts", 0) or 0, + } + return None + + +def _extras_from_raw_json(raw_json: object) -> dict | None: + payload = _safe_load_raw(raw_json) + if not isinstance(payload, dict): + return None + out: dict = {} + for key in _RAW_EXTRAS_FIELDS: + val = payload.get(key) + if val is not None and val != "": + out[key] = val + return out or None + + +def _safe_load_raw(raw_json: object) -> object | None: + if isinstance(raw_json, dict): + return raw_json + if not isinstance(raw_json, str | bytes | bytearray): + return None + try: + if isinstance(raw_json, bytes | bytearray): + return json.loads(raw_json.decode("utf-8", errors="replace")) + return json.loads(raw_json) + except (json.JSONDecodeError, ValueError, UnicodeDecodeError): + return None diff --git a/stackunderflow/etl/normalize/kilocode.py b/stackunderflow/etl/normalize/kilocode.py new file mode 100644 index 0000000..f460798 --- /dev/null +++ b/stackunderflow/etl/normalize/kilocode.py @@ -0,0 +1,23 @@ +"""KiloCode normalizer (Cline-family). + +KiloCode is a fork of Cline distributed as a VS Code extension under +the id ``kilocode.kilo-code``. Its on-disk layout matches Cline's +exactly — ``ui_messages.json`` per task with ``api_req_started`` events +that carry a JSON-stringified ``text`` blob holding ``tokensIn``, +``tokensOut``, ``cacheWrites``, ``cacheReads``, and a pre-computed +``cost``. + +The transform is byte-identical to the Cline normalizer; we subclass +``ClineNormalizer`` and only override ``provider_name`` so the registry +lookup can route ``provider='kilocode'`` rows here. The pricer-side +``_PROVIDER_TO_PRICER`` map already routes ``kilocode`` to ``anthropic`` +(Cline-family extensions all run against the user's Anthropic key). +""" + +from __future__ import annotations + +from .cline import ClineNormalizer + + +class KiloCodeNormalizer(ClineNormalizer): + provider_name = "kilocode" diff --git a/stackunderflow/etl/normalize/kiro.py b/stackunderflow/etl/normalize/kiro.py new file mode 100644 index 0000000..059d37b --- /dev/null +++ b/stackunderflow/etl/normalize/kiro.py @@ -0,0 +1,92 @@ +"""Kiro (Amazon Kiro Agent) normalizer. + +Kiro persists chat history in ``.chat`` files (JSON blobs) under VS +Code's globalStorage. The schema records execution metadata and a +chat array but **does not** carry token counts on any role: + + { "executionId": "...", + "actionId": "...", + "chat": [{"role": "human" | "bot" | "tool", "content": "..."}], + "metadata": {"modelId": "claude.3.5.sonnet", "startTime": ...} } + +Per the codeburn catalog the canonical recovery is to estimate from +content length / 4 and stamp ``cost_source='estimated'``. Model id +normalisation (dots → dashes for ``claude.*`` ids) lives in the +adapter; we trust the column and fall back to ``kiro-auto``. +""" + +from __future__ import annotations + +import json +from collections.abc import Iterable + +from .base import COST_SOURCE_ESTIMATED, Normalizer + +_DEFAULT_MODEL = "kiro-auto" +_RAW_EXTRAS_FIELDS = ("executionId", "actionId", "workflowId", "metadata") + + +class KiroNormalizer(Normalizer): + provider_name = "kiro" + + def normalize(self, msg_row: dict) -> Iterable[dict]: + role = str(msg_row.get("role") or "") + # Kiro logs assistant turns as ``role='bot'`` in the source + # format; the adapter may normalise to ``'assistant'``. Accept + # both so the normalizer doesn't depend on which adapter + # version wrote the row. + if role not in ("assistant", "bot"): + return + + text = str(msg_row.get("content_text") or "") + # Trust pre-computed counts if present (e.g. an adapter upgrade + # adds them later), otherwise estimate from text length. + input_tokens = int(msg_row.get("input_tokens") or 0) + output_tokens = int(msg_row.get("output_tokens") or 0) + if input_tokens == 0 and output_tokens == 0: + if not text: + return + input_tokens = max(len(text) // 4, 0) + + if input_tokens == 0 and output_tokens == 0: + return + + model = str(msg_row.get("model") or "") or _DEFAULT_MODEL + + raw_extras = _extras_from_raw_json(msg_row.get("raw_json")) + + yield self._build_event( + msg_row, + input_tokens=input_tokens, + output_tokens=output_tokens, + cache_read_tokens=0, + cache_create_tokens=0, + cost_source=COST_SOURCE_ESTIMATED, + model=model, + raw_extras=raw_extras, + ) + + +def _extras_from_raw_json(raw_json: object) -> dict | None: + payload = _safe_load_raw(raw_json) + if not isinstance(payload, dict): + return None + out: dict = {} + for key in _RAW_EXTRAS_FIELDS: + val = payload.get(key) + if val is not None and val != "": + out[key] = val + return out or None + + +def _safe_load_raw(raw_json: object) -> object | None: + if isinstance(raw_json, dict): + return raw_json + if not isinstance(raw_json, str | bytes | bytearray): + return None + try: + if isinstance(raw_json, bytes | bytearray): + return json.loads(raw_json.decode("utf-8", errors="replace")) + return json.loads(raw_json) + except (json.JSONDecodeError, ValueError, UnicodeDecodeError): + return None diff --git a/stackunderflow/etl/normalize/openclaw.py b/stackunderflow/etl/normalize/openclaw.py new file mode 100644 index 0000000..4ed17a5 --- /dev/null +++ b/stackunderflow/etl/normalize/openclaw.py @@ -0,0 +1,168 @@ +"""OpenClaw normalizer. + +OpenClaw (and the related ``clawdbot`` / ``moltbot`` / ``moldbot`` +agents that share the format) writes JSONL sessions where each +assistant message carries an explicit ``usage`` block: + + { "type": "message", + "message": { + "role": "assistant", + "content": [...], + "model": "...", + "provider": "...", + "usage": { + "input": ..., + "output": ..., + "cacheRead": ..., + "cacheWrite": ..., + "cost": {"total": ...} # optional, provider-embedded + } + } } + +Policy: + +* Trust the explicit token block; map ``cacheWrite`` → + ``cache_create_tokens``, ``cacheRead`` → ``cache_read_tokens``. +* Stamp ``cost_source='rate_card'`` for known model ids / + ``'unknown'`` otherwise. (We recompute via ``compute_cost`` for + parity with the rest of the pipeline; the embedded ``cost.total`` + is preserved in ``raw_extras`` as a cross-reference but not + consumed.) +""" + +from __future__ import annotations + +import json +from collections.abc import Iterable + +from stackunderflow.infra.costs import RATE_CARD + +from .base import ( + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, + Normalizer, +) + +_DEFAULT_MODEL = "openclaw-auto" +_RAW_EXTRAS_FIELDS = ("provider", "agentName", "embeddedCost") + + +class OpenClawNormalizer(Normalizer): + provider_name = "openclaw" + + def normalize(self, msg_row: dict) -> Iterable[dict]: + role = str(msg_row.get("role") or "") + if role != "assistant": + return + + canonical = _canonical_tokens(msg_row) + if canonical is None: + return + + if all(canonical[k] == 0 for k in ("input", "output", "cache_read", "cache_create")): + return + + model = str(msg_row.get("model") or "") or _DEFAULT_MODEL + + cost_source = ( + COST_SOURCE_RATE_CARD + if model in RATE_CARD + else COST_SOURCE_UNKNOWN + ) + + raw_extras = _extras_from_raw_json(msg_row.get("raw_json")) + + yield self._build_event( + msg_row, + input_tokens=canonical["input"], + output_tokens=canonical["output"], + cache_read_tokens=canonical["cache_read"], + cache_create_tokens=canonical["cache_create"], + cost_source=cost_source, + model=model, + raw_extras=raw_extras, + ) + + +def _canonical_tokens(msg_row: dict) -> dict[str, int] | None: + """Return canonical 4-key shape from the OpenClaw usage block.""" + # Prefer raw usage block (production path); fall back to columns. + raw = _raw_usage(msg_row) + if raw is not None: + return { + "input": _safe_int(raw.get("input")), + "output": _safe_int(raw.get("output")), + "cache_read": _safe_int(raw.get("cacheRead")), + "cache_create": _safe_int(raw.get("cacheWrite")), + } + return { + "input": int(msg_row.get("input_tokens") or 0), + "output": int(msg_row.get("output_tokens") or 0), + "cache_read": int(msg_row.get("cache_read_tokens") or 0), + "cache_create": int(msg_row.get("cache_create_tokens") or 0), + } + + +def _raw_usage(msg_row: dict) -> dict | None: + """Pull the OpenClaw ``usage`` block from msg_row or raw_json.""" + direct = msg_row.get("usage") + if isinstance(direct, dict): + return direct + payload = _safe_load_raw(msg_row.get("raw_json")) + if isinstance(payload, dict): + # The persisted shape may have ``message.usage`` (full event) + # or ``usage`` directly (already unwrapped). + msg = payload.get("message") + if isinstance(msg, dict): + usage = msg.get("usage") + if isinstance(usage, dict): + return usage + usage = payload.get("usage") + if isinstance(usage, dict): + return usage + return None + + +def _safe_int(value: object) -> int: + try: + return max(int(value or 0), 0) + except (TypeError, ValueError): + return 0 + + +def _extras_from_raw_json(raw_json: object) -> dict | None: + payload = _safe_load_raw(raw_json) + if not isinstance(payload, dict): + return None + out: dict = {} + inner = payload.get("message") if isinstance(payload.get("message"), dict) else payload + for key in _RAW_EXTRAS_FIELDS: + val = inner.get(key) if isinstance(inner, dict) else None + if val is not None and val != "": + out[key] = val + # Preserve embedded cost for cross-reference. Inspect the parsed + # payload directly — ``_raw_usage`` operates on a msg_row shape + # (looking at ``raw_json`` key) which won't be present here. + usage = None + if isinstance(inner, dict) and isinstance(inner.get("usage"), dict): + usage = inner["usage"] + elif isinstance(payload.get("usage"), dict): + usage = payload["usage"] + if isinstance(usage, dict): + cost = usage.get("cost") + if cost is not None: + out["embeddedCost"] = cost + return out or None + + +def _safe_load_raw(raw_json: object) -> object | None: + if isinstance(raw_json, dict): + return raw_json + if not isinstance(raw_json, str | bytes | bytearray): + return None + try: + if isinstance(raw_json, bytes | bytearray): + return json.loads(raw_json.decode("utf-8", errors="replace")) + return json.loads(raw_json) + except (json.JSONDecodeError, ValueError, UnicodeDecodeError): + return None diff --git a/stackunderflow/etl/normalize/opencode.py b/stackunderflow/etl/normalize/opencode.py new file mode 100644 index 0000000..860784d --- /dev/null +++ b/stackunderflow/etl/normalize/opencode.py @@ -0,0 +1,159 @@ +"""OpenCode normalizer. + +OpenCode persists chats in a SQLite database (``opencode*.db`` under +``$XDG_DATA_HOME/opencode/``). The ``message`` table's ``data`` column +holds the per-turn payload as JSON, with the canonical token block at +``data.tokens``: + + { "role": "assistant", + "modelID": "...", + "tokens": { + "input": ..., + "output": ..., + "reasoning": ..., + "cache": {"read": ..., "write": ...} + }, + "cost": ... } + +Per the codeburn catalog the canonical mapping is: + +* ``input = tokens.input`` +* ``output = tokens.output + tokens.reasoning`` (fold reasoning into output) +* ``cache_read = tokens.cache.read`` +* ``cache_create = tokens.cache.write`` + +Model id is preserved verbatim from ``data.modelID`` (the adapter +should already lift it onto the row column). ``cost`` is not +consumed — we recompute via ``compute_cost`` for parity, preserving +the source-side cost in ``raw_extras``. +""" + +from __future__ import annotations + +import json +from collections.abc import Iterable + +from stackunderflow.infra.costs import RATE_CARD + +from .base import ( + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, + Normalizer, +) + +_DEFAULT_MODEL = "opencode-auto" +_RAW_EXTRAS_FIELDS = ("modelID", "providerID", "embeddedCost") + + +class OpenCodeNormalizer(Normalizer): + provider_name = "opencode" + + def normalize(self, msg_row: dict) -> Iterable[dict]: + role = str(msg_row.get("role") or "") + if role != "assistant": + return + + canonical = _canonical_tokens(msg_row) + if canonical is None: + return + + if all(canonical[k] == 0 for k in ("input", "output", "cache_read", "cache_create")): + return + + model = str(msg_row.get("model") or "") or _DEFAULT_MODEL + + cost_source = ( + COST_SOURCE_RATE_CARD + if model in RATE_CARD + else COST_SOURCE_UNKNOWN + ) + + raw_extras = _extras_from_raw_json(msg_row.get("raw_json")) + + yield self._build_event( + msg_row, + input_tokens=canonical["input"], + output_tokens=canonical["output"], + cache_read_tokens=canonical["cache_read"], + cache_create_tokens=canonical["cache_create"], + cost_source=cost_source, + model=model, + raw_extras=raw_extras, + ) + + +def _canonical_tokens(msg_row: dict) -> dict[str, int] | None: + """Return canonical 4-key shape with reasoning folded into output.""" + raw = _raw_tokens(msg_row) + if raw is not None: + cache = raw.get("cache") if isinstance(raw.get("cache"), dict) else {} + return { + "input": _safe_int(raw.get("input")), + "output": _safe_int(raw.get("output")) + _safe_int(raw.get("reasoning")), + "cache_read": _safe_int(cache.get("read") if isinstance(cache, dict) else 0), + "cache_create": _safe_int(cache.get("write") if isinstance(cache, dict) else 0), + } + return { + "input": int(msg_row.get("input_tokens") or 0), + "output": int(msg_row.get("output_tokens") or 0), + "cache_read": int(msg_row.get("cache_read_tokens") or 0), + "cache_create": int(msg_row.get("cache_create_tokens") or 0), + } + + +def _raw_tokens(msg_row: dict) -> dict | None: + """Locate the OpenCode tokens block on msg_row or raw_json.""" + direct = msg_row.get("tokens") + if isinstance(direct, dict): + return direct + payload = _safe_load_raw(msg_row.get("raw_json")) + if isinstance(payload, dict): + # Canonical persistence — the message.data JSON has tokens at + # the top level. + tokens = payload.get("tokens") + if isinstance(tokens, dict): + return tokens + # If the raw_json includes a wrapping ``data`` key (some adapter + # versions persist the row that way), unwrap once. + data = payload.get("data") + if isinstance(data, dict): + tokens = data.get("tokens") + if isinstance(tokens, dict): + return tokens + return None + + +def _safe_int(value: object) -> int: + try: + return max(int(value or 0), 0) + except (TypeError, ValueError): + return 0 + + +def _extras_from_raw_json(raw_json: object) -> dict | None: + payload = _safe_load_raw(raw_json) + if not isinstance(payload, dict): + return None + inner = payload.get("data") if isinstance(payload.get("data"), dict) else payload + out: dict = {} + for key in _RAW_EXTRAS_FIELDS: + val = inner.get(key) if isinstance(inner, dict) else None + if val is not None and val != "": + out[key] = val + cost = inner.get("cost") if isinstance(inner, dict) else None + if cost is not None and "embeddedCost" not in out: + out["embeddedCost"] = cost + return out or None + + +def _safe_load_raw(raw_json: object) -> object | None: + if isinstance(raw_json, dict): + return raw_json + if not isinstance(raw_json, str | bytes | bytearray): + return None + try: + if isinstance(raw_json, bytes | bytearray): + return json.loads(raw_json.decode("utf-8", errors="replace")) + return json.loads(raw_json) + except (json.JSONDecodeError, ValueError, UnicodeDecodeError): + return None diff --git a/stackunderflow/etl/normalize/pi.py b/stackunderflow/etl/normalize/pi.py new file mode 100644 index 0000000..1ee1a4f --- /dev/null +++ b/stackunderflow/etl/normalize/pi.py @@ -0,0 +1,150 @@ +"""Pi (and OMP) normalizer. + +Pi and OMP share the same JSONL format — each assistant message carries +an explicit ``usage`` block: + + { "type": "message", + "message": { + "role": "assistant", + "content": [...], + "model": "...", + "usage": { + "input": ..., + "output": ..., + "cacheRead": ..., + "cacheWrite": ... + } + } } + +Per the codeburn catalog the mapping is direct: ``cacheWrite`` → +``cache_create_tokens``, ``cacheRead`` → ``cache_read_tokens``. + +This single normalizer covers both Pi and OMP — they only differ in +their on-disk root directory (``~/.pi/agent/sessions/`` vs. +``~/.omp/agent/sessions/``), which is an adapter concern. The +``provider_name`` defaults to ``pi`` but the registry can route OMP +rows here too via ``register("omp", PiNormalizer)`` if the adapter +ever distinguishes them at the provider column level. +""" + +from __future__ import annotations + +import json +from collections.abc import Iterable + +from stackunderflow.infra.costs import RATE_CARD + +from .base import ( + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, + Normalizer, +) + +_DEFAULT_MODEL = "gpt-5" +_RAW_EXTRAS_FIELDS = ("responseId", "sessionId", "cwd") + + +class PiNormalizer(Normalizer): + provider_name = "pi" + + def normalize(self, msg_row: dict) -> Iterable[dict]: + role = str(msg_row.get("role") or "") + if role != "assistant": + return + + canonical = _canonical_tokens(msg_row) + if canonical is None: + return + + if all(canonical[k] == 0 for k in ("input", "output", "cache_read", "cache_create")): + return + + model = str(msg_row.get("model") or "") or _DEFAULT_MODEL + + cost_source = ( + COST_SOURCE_RATE_CARD + if model in RATE_CARD + else COST_SOURCE_UNKNOWN + ) + + raw_extras = _extras_from_raw_json(msg_row.get("raw_json")) + + yield self._build_event( + msg_row, + input_tokens=canonical["input"], + output_tokens=canonical["output"], + cache_read_tokens=canonical["cache_read"], + cache_create_tokens=canonical["cache_create"], + cost_source=cost_source, + model=model, + raw_extras=raw_extras, + ) + + +def _canonical_tokens(msg_row: dict) -> dict[str, int] | None: + raw = _raw_usage(msg_row) + if raw is not None: + return { + "input": _safe_int(raw.get("input")), + "output": _safe_int(raw.get("output")), + "cache_read": _safe_int(raw.get("cacheRead")), + "cache_create": _safe_int(raw.get("cacheWrite")), + } + return { + "input": int(msg_row.get("input_tokens") or 0), + "output": int(msg_row.get("output_tokens") or 0), + "cache_read": int(msg_row.get("cache_read_tokens") or 0), + "cache_create": int(msg_row.get("cache_create_tokens") or 0), + } + + +def _raw_usage(msg_row: dict) -> dict | None: + direct = msg_row.get("usage") + if isinstance(direct, dict): + return direct + payload = _safe_load_raw(msg_row.get("raw_json")) + if isinstance(payload, dict): + msg = payload.get("message") + if isinstance(msg, dict): + usage = msg.get("usage") + if isinstance(usage, dict): + return usage + usage = payload.get("usage") + if isinstance(usage, dict): + return usage + return None + + +def _safe_int(value: object) -> int: + try: + return max(int(value or 0), 0) + except (TypeError, ValueError): + return 0 + + +def _extras_from_raw_json(raw_json: object) -> dict | None: + payload = _safe_load_raw(raw_json) + if not isinstance(payload, dict): + return None + out: dict = {} + inner = payload.get("message") if isinstance(payload.get("message"), dict) else payload + for key in _RAW_EXTRAS_FIELDS: + val = inner.get(key) if isinstance(inner, dict) else None + if val is None and isinstance(payload, dict): + val = payload.get(key) + if val is not None and val != "": + out[key] = val + return out or None + + +def _safe_load_raw(raw_json: object) -> object | None: + if isinstance(raw_json, dict): + return raw_json + if not isinstance(raw_json, str | bytes | bytearray): + return None + try: + if isinstance(raw_json, bytes | bytearray): + return json.loads(raw_json.decode("utf-8", errors="replace")) + return json.loads(raw_json) + except (json.JSONDecodeError, ValueError, UnicodeDecodeError): + return None diff --git a/stackunderflow/etl/normalize/qwen.py b/stackunderflow/etl/normalize/qwen.py new file mode 100644 index 0000000..ab5f132 --- /dev/null +++ b/stackunderflow/etl/normalize/qwen.py @@ -0,0 +1,149 @@ +"""Qwen normalizer. + +Qwen logs assistant turns with a Gemini-shaped ``usageMetadata`` block: + + { "type": "assistant", + "model": "...", + "message": { "role": "assistant", "parts": [...] }, + "usageMetadata": { + "promptTokenCount": ..., # *includes* cached input + "candidatesTokenCount": ..., + "thoughtsTokenCount": ..., + "cachedContentTokenCount": ... + } } + +Per the codeburn catalog the canonical mapping is **identical to +Gemini's**: + +* ``input = promptTokenCount - cachedContentTokenCount`` +* ``output = candidatesTokenCount + thoughtsTokenCount`` +* ``cache_read = cachedContentTokenCount`` +* ``cache_create = 0`` + +This is intentionally a parallel implementation rather than reusing +Gemini's class — provider-specific provenance differs (Qwen surfaces +``functionCall`` arrays, Gemini surfaces ``finishReason``) and the +``provider_name`` / pricer routing differ. +""" + +from __future__ import annotations + +import json +from collections.abc import Iterable + +from stackunderflow.infra.costs import RATE_CARD + +from .base import ( + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, + Normalizer, +) + +_DEFAULT_MODEL = "qwen-auto" +_RAW_EXTRAS_FIELDS = ("uuid", "sessionId", "functionCall") + + +class QwenNormalizer(Normalizer): + provider_name = "qwen" + + def normalize(self, msg_row: dict) -> Iterable[dict]: + role = str(msg_row.get("role") or "") + if role != "assistant": + return + + canonical = _canonical_tokens(msg_row) + if canonical is None: + return + + if all(canonical[k] == 0 for k in ("input", "output", "cache_read", "cache_create")): + return + + model = str(msg_row.get("model") or "") or _DEFAULT_MODEL + + cost_source = ( + COST_SOURCE_RATE_CARD + if model in RATE_CARD + else COST_SOURCE_UNKNOWN + ) + + raw_extras = _extras_from_raw_json(msg_row.get("raw_json")) + + yield self._build_event( + msg_row, + input_tokens=canonical["input"], + output_tokens=canonical["output"], + cache_read_tokens=canonical["cache_read"], + cache_create_tokens=canonical["cache_create"], + cost_source=cost_source, + model=model, + raw_extras=raw_extras, + ) + + +def _canonical_tokens(msg_row: dict) -> dict[str, int] | None: + raw = _raw_usage_metadata(msg_row) + if raw is not None: + prompt = max(int(raw.get("promptTokenCount") or 0), 0) + cached = max(int(raw.get("cachedContentTokenCount") or 0), 0) + candidates = max(int(raw.get("candidatesTokenCount") or 0), 0) + thoughts = max(int(raw.get("thoughtsTokenCount") or 0), 0) + fresh_input = max(prompt - cached, 0) + return { + "input": fresh_input, + "output": candidates + thoughts, + "cache_read": cached, + "cache_create": 0, + } + return { + "input": int(msg_row.get("input_tokens") or 0), + "output": int(msg_row.get("output_tokens") or 0), + "cache_read": int(msg_row.get("cache_read_tokens") or 0), + "cache_create": int(msg_row.get("cache_create_tokens") or 0), + } + + +def _raw_usage_metadata(msg_row: dict) -> dict | None: + if ( + "promptTokenCount" in msg_row + or "candidatesTokenCount" in msg_row + or "cachedContentTokenCount" in msg_row + or "thoughtsTokenCount" in msg_row + ): + return { + "promptTokenCount": msg_row.get("promptTokenCount", 0), + "candidatesTokenCount": msg_row.get("candidatesTokenCount", 0), + "cachedContentTokenCount": msg_row.get("cachedContentTokenCount", 0), + "thoughtsTokenCount": msg_row.get("thoughtsTokenCount", 0), + } + payload = _safe_load_raw(msg_row.get("raw_json")) + if not isinstance(payload, dict): + return None + md = payload.get("usageMetadata") + if isinstance(md, dict): + return md + return None + + +def _extras_from_raw_json(raw_json: object) -> dict | None: + payload = _safe_load_raw(raw_json) + if not isinstance(payload, dict): + return None + out: dict = {} + for key in _RAW_EXTRAS_FIELDS: + val = payload.get(key) + if val is not None and val != "": + out[key] = val + return out or None + + +def _safe_load_raw(raw_json: object) -> object | None: + if isinstance(raw_json, dict): + return raw_json + if not isinstance(raw_json, str | bytes | bytearray): + return None + try: + if isinstance(raw_json, bytes | bytearray): + return json.loads(raw_json.decode("utf-8", errors="replace")) + return json.loads(raw_json) + except (json.JSONDecodeError, ValueError, UnicodeDecodeError): + return None diff --git a/stackunderflow/etl/normalize/roocode.py b/stackunderflow/etl/normalize/roocode.py new file mode 100644 index 0000000..c1a154e --- /dev/null +++ b/stackunderflow/etl/normalize/roocode.py @@ -0,0 +1,17 @@ +"""Roo Code normalizer (Cline-family). + +Roo Code is another Cline fork distributed under the VS Code extension +id ``rooveterinaryinc.roo-cline``. Same on-disk shape as Cline (and +KiloCode); same transform. Only ``provider_name`` differs so the +registry can route ``provider='roocode'`` rows here. + +The pricer-side provider map already routes ``roocode`` to Anthropic. +""" + +from __future__ import annotations + +from .cline import ClineNormalizer + + +class RooCodeNormalizer(ClineNormalizer): + provider_name = "roocode" diff --git a/tests/stackunderflow/etl/normalize/test_codeium.py b/tests/stackunderflow/etl/normalize/test_codeium.py new file mode 100644 index 0000000..a4ab416 --- /dev/null +++ b/tests/stackunderflow/etl/normalize/test_codeium.py @@ -0,0 +1,43 @@ +"""CodeiumNormalizer — discovery-only stub yields zero events.""" + +from __future__ import annotations + +from stackunderflow.etl.normalize import CodeiumNormalizer + + +def _msg_row(**overrides) -> dict: + base = { + "id": 1, + "provider": "codeium", + "project_id": 1, + "session_id": "codeium-sess", + "timestamp": "2026-04-25T10:00:00+00:00", + "role": "assistant", + "model": "claude-sonnet-4-5-20250929", + "input_tokens": 100, + "output_tokens": 100, + "content_text": "hello world", + "raw_json": "{}", + } + base.update(overrides) + return base + + +def test_codeium_yields_zero_events_on_assistant_row() -> None: + """Stub never emits events even on a fully-populated assistant row.""" + row = _msg_row() + assert list(CodeiumNormalizer().normalize(row)) == [] + + +def test_codeium_yields_zero_events_on_user_row() -> None: + row = _msg_row(role="user") + assert list(CodeiumNormalizer().normalize(row)) == [] + + +def test_codeium_yields_zero_events_on_empty_row() -> None: + row = {"id": 1, "role": "assistant"} + assert list(CodeiumNormalizer().normalize(row)) == [] + + +def test_codeium_provider_name() -> None: + assert CodeiumNormalizer.provider_name == "codeium" diff --git a/tests/stackunderflow/etl/normalize/test_continue_.py b/tests/stackunderflow/etl/normalize/test_continue_.py new file mode 100644 index 0000000..addbc64 --- /dev/null +++ b/tests/stackunderflow/etl/normalize/test_continue_.py @@ -0,0 +1,80 @@ +"""ContinueNormalizer — defensive SQLite-source path; trust columns or estimate.""" + +from __future__ import annotations + +import json + +from stackunderflow.etl.normalize import ContinueNormalizer +from stackunderflow.etl.normalize.base import ( + COST_SOURCE_ESTIMATED, + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, +) + + +def _msg_row(**overrides) -> dict: + base = { + "id": 400, + "provider": "continue", + "project_id": 4, + "session_id": "continue-sess", + "timestamp": "2026-04-25T13:00:00+00:00", + "role": "assistant", + "model": "claude-sonnet-4-5-20250929", + "speed": "standard", + "input_tokens": 0, + "output_tokens": 0, + "cache_read_tokens": 0, + "cache_create_tokens": 0, + "content_text": "", + "raw_json": "{}", + } + base.update(overrides) + return base + + +def test_user_role_yields_zero_events() -> None: + row = _msg_row(role="user", input_tokens=100) + assert list(ContinueNormalizer().normalize(row)) == [] + + +def test_explicit_tokens_use_rate_card() -> None: + row = _msg_row(input_tokens=200, output_tokens=100) + events = list(ContinueNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + assert ev["input_tokens"] == 200 + assert ev["output_tokens"] == 100 + assert ev["cost_source"] == COST_SOURCE_RATE_CARD + + +def test_estimate_from_text_when_no_tokens() -> None: + """No token columns → estimate input from text length / 4.""" + row = _msg_row(content_text="hello world from continue") + events = list(ContinueNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + # len("hello world from continue") == 25; 25 // 4 == 6 + assert ev["input_tokens"] == 6 + assert ev["output_tokens"] == 0 + assert ev["cost_source"] == COST_SOURCE_ESTIMATED + + +def test_unknown_model_stamps_unknown() -> None: + row = _msg_row(input_tokens=100, output_tokens=100, model="continue-future-2030") + events = list(ContinueNormalizer().normalize(row)) + assert events[0]["cost_source"] == COST_SOURCE_UNKNOWN + + +def test_no_tokens_no_text_yields_zero_events() -> None: + row = _msg_row() + assert list(ContinueNormalizer().normalize(row)) == [] + + +def test_raw_extras_preserves_provider_quirks() -> None: + raw = {"provider": "openai", "modelTitle": "Continue / GPT-4"} + row = _msg_row(input_tokens=100, output_tokens=50, raw_json=json.dumps(raw)) + events = list(ContinueNormalizer().normalize(row)) + extras = json.loads(events[0]["raw_extras"]) + assert extras["provider"] == "openai" + assert extras["modelTitle"] == "Continue / GPT-4" diff --git a/tests/stackunderflow/etl/normalize/test_copilot.py b/tests/stackunderflow/etl/normalize/test_copilot.py new file mode 100644 index 0000000..b3af217 --- /dev/null +++ b/tests/stackunderflow/etl/normalize/test_copilot.py @@ -0,0 +1,85 @@ +"""CopilotNormalizer — explicit transcript tokens vs. legacy estimation path.""" + +from __future__ import annotations + +import json + +from stackunderflow.etl.normalize import CopilotNormalizer +from stackunderflow.etl.normalize.base import ( + COST_SOURCE_ESTIMATED, + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, +) + + +def _msg_row(**overrides) -> dict: + base = { + "id": 500, + "provider": "copilot", + "project_id": 5, + "session_id": "copilot-sess", + "timestamp": "2026-04-25T14:00:00+00:00", + "role": "assistant", + "model": "gpt-4o", + "speed": "standard", + "input_tokens": 0, + "output_tokens": 0, + "cache_read_tokens": 0, + "cache_create_tokens": 0, + "content_text": "", + "raw_json": "{}", + } + base.update(overrides) + return base + + +def test_user_role_yields_zero_events() -> None: + row = _msg_row(role="user", output_tokens=10) + assert list(CopilotNormalizer().normalize(row)) == [] + + +def test_explicit_transcript_tokens_use_rate_card() -> None: + row = _msg_row(input_tokens=300, output_tokens=200) + events = list(CopilotNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + assert ev["input_tokens"] == 300 + assert ev["output_tokens"] == 200 + assert ev["cache_read_tokens"] == 0 + assert ev["cache_create_tokens"] == 0 + assert ev["cost_source"] == COST_SOURCE_RATE_CARD + + +def test_legacy_event_estimates_from_text_length() -> None: + """Legacy event without explicit output tokens — estimate from text.""" + row = _msg_row(content_text="hello copilot world response") + events = list(CopilotNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + # len("hello copilot world response") == 28; 28 // 4 == 7 + assert ev["output_tokens"] == 7 + assert ev["cost_source"] == COST_SOURCE_ESTIMATED + + +def test_data_subkey_in_raw_json_used() -> None: + """VS Code transcript shape — tokens nested in ``data``.""" + raw = {"data": {"inputTokens": 400, "outputTokens": 250, "producer": "copilot-agent"}} + row = _msg_row(raw_json=json.dumps(raw)) + events = list(CopilotNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + assert ev["input_tokens"] == 400 + assert ev["output_tokens"] == 250 + extras = json.loads(ev["raw_extras"]) + assert extras["producer"] == "copilot-agent" + + +def test_unknown_model_stamps_unknown() -> None: + row = _msg_row(input_tokens=100, output_tokens=100, model="copilot-mystery-2030") + events = list(CopilotNormalizer().normalize(row)) + assert events[0]["cost_source"] == COST_SOURCE_UNKNOWN + + +def test_no_tokens_no_text_yields_zero_events() -> None: + row = _msg_row() + assert list(CopilotNormalizer().normalize(row)) == [] diff --git a/tests/stackunderflow/etl/normalize/test_cursor_agent.py b/tests/stackunderflow/etl/normalize/test_cursor_agent.py new file mode 100644 index 0000000..b558ae7 --- /dev/null +++ b/tests/stackunderflow/etl/normalize/test_cursor_agent.py @@ -0,0 +1,64 @@ +"""CursorAgentNormalizer — always-estimated text/4 path.""" + +from __future__ import annotations + +import json + +from stackunderflow.etl.normalize import CursorAgentNormalizer +from stackunderflow.etl.normalize.base import COST_SOURCE_ESTIMATED + + +def _msg_row(**overrides) -> dict: + base = { + "id": 600, + "provider": "cursor_agent", + "project_id": 6, + "session_id": "cursor-agent-sess", + "timestamp": "2026-04-25T15:00:00+00:00", + "role": "assistant", + "model": "claude-sonnet-4-5-20250929", + "speed": "standard", + "input_tokens": 0, + "output_tokens": 0, + "cache_read_tokens": 0, + "cache_create_tokens": 0, + "content_text": "", + "raw_json": "{}", + } + base.update(overrides) + return base + + +def test_user_role_yields_zero_events() -> None: + row = _msg_row(role="user", content_text="user message") + assert list(CursorAgentNormalizer().normalize(row)) == [] + + +def test_estimate_from_text_length() -> None: + """Cursor Agent always estimates from text//4; even known models stamp estimated.""" + row = _msg_row(content_text="this is a cursor agent assistant response") + events = list(CursorAgentNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + # len("this is a cursor agent assistant response") == 41; 41 // 4 == 10 + assert ev["input_tokens"] == 10 + assert ev["output_tokens"] == 0 + assert ev["cost_source"] == COST_SOURCE_ESTIMATED + + +def test_provider_name() -> None: + assert CursorAgentNormalizer.provider_name == "cursor_agent" + + +def test_no_text_yields_zero_events() -> None: + row = _msg_row() + assert list(CursorAgentNormalizer().normalize(row)) == [] + + +def test_raw_extras_preserves_conversation_metadata() -> None: + raw = {"conversationId": "conv-abc-123", "transcriptType": "jsonl"} + row = _msg_row(content_text="response text", raw_json=json.dumps(raw)) + events = list(CursorAgentNormalizer().normalize(row)) + extras = json.loads(events[0]["raw_extras"]) + assert extras["conversationId"] == "conv-abc-123" + assert extras["transcriptType"] == "jsonl" diff --git a/tests/stackunderflow/etl/normalize/test_droid.py b/tests/stackunderflow/etl/normalize/test_droid.py new file mode 100644 index 0000000..957b873 --- /dev/null +++ b/tests/stackunderflow/etl/normalize/test_droid.py @@ -0,0 +1,88 @@ +"""DroidNormalizer — adapter pre-distributes session totals; we trust columns.""" + +from __future__ import annotations + +import json + +from stackunderflow.etl.normalize import DroidNormalizer +from stackunderflow.etl.normalize.base import ( + COST_SOURCE_ESTIMATED, + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, +) + + +def _msg_row(**overrides) -> dict: + base = { + "id": 700, + "provider": "droid", + "project_id": 7, + "session_id": "droid-sess", + "timestamp": "2026-04-25T16:00:00+00:00", + "role": "assistant", + "model": "claude-sonnet-4-5-20250929", + "speed": "standard", + "input_tokens": 0, + "output_tokens": 0, + "cache_read_tokens": 0, + "cache_create_tokens": 0, + "content_text": "", + "raw_json": "{}", + } + base.update(overrides) + return base + + +def test_user_role_yields_zero_events() -> None: + row = _msg_row(role="user", input_tokens=50) + assert list(DroidNormalizer().normalize(row)) == [] + + +def test_pre_distributed_tokens_use_rate_card() -> None: + """Adapter has already split session-level totals across messages.""" + row = _msg_row( + input_tokens=500, + output_tokens=300, + cache_read_tokens=100, + cache_create_tokens=50, + ) + events = list(DroidNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + assert ev["input_tokens"] == 500 + assert ev["output_tokens"] == 300 + assert ev["cache_read_tokens"] == 100 + assert ev["cache_create_tokens"] == 50 + assert ev["cost_source"] == COST_SOURCE_RATE_CARD + + +def test_thinking_tokens_folded_into_output() -> None: + """Spec: thinkingTokens (Droid's reasoning slot) folds into output.""" + row = _msg_row(input_tokens=200, output_tokens=300, thinking_tokens=100) + events = list(DroidNormalizer().normalize(row)) + assert len(events) == 1 + assert events[0]["output_tokens"] == 400 + + +def test_thinking_tokens_from_raw_json() -> None: + raw = {"tokenUsage": {"thinkingTokens": 75}} + row = _msg_row(input_tokens=100, output_tokens=200, raw_json=json.dumps(raw)) + events = list(DroidNormalizer().normalize(row)) + assert events[0]["output_tokens"] == 275 + + +def test_estimate_when_no_settings_data() -> None: + """Adapter couldn't read settings — estimate from text.""" + row = _msg_row(content_text="droid assistant message text") + events = list(DroidNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + # len("droid assistant message text") == 28; 28 // 4 == 7 + assert ev["input_tokens"] == 7 + assert ev["cost_source"] == COST_SOURCE_ESTIMATED + + +def test_unknown_model_stamps_unknown() -> None: + row = _msg_row(input_tokens=100, output_tokens=100, model="droid-mystery-2030") + events = list(DroidNormalizer().normalize(row)) + assert events[0]["cost_source"] == COST_SOURCE_UNKNOWN diff --git a/tests/stackunderflow/etl/normalize/test_gemini.py b/tests/stackunderflow/etl/normalize/test_gemini.py new file mode 100644 index 0000000..b4bfb25 --- /dev/null +++ b/tests/stackunderflow/etl/normalize/test_gemini.py @@ -0,0 +1,147 @@ +"""GeminiNormalizer — cached subtraction + thoughts fold into output.""" + +from __future__ import annotations + +import json + +from stackunderflow.etl.normalize import GeminiNormalizer +from stackunderflow.etl.normalize.base import ( + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, +) + + +def _msg_row(**overrides) -> dict: + base = { + "id": 800, + "provider": "gemini", + "project_id": 8, + "session_id": "gemini-sess", + "timestamp": "2026-04-25T17:00:00+00:00", + "role": "assistant", + "model": "gemini-2.5-pro", + "speed": "standard", + "input_tokens": 0, + "output_tokens": 0, + "cache_read_tokens": 0, + "cache_create_tokens": 0, + "content_text": "", + "raw_json": "{}", + } + base.update(overrides) + return base + + +def test_user_role_yields_zero_events() -> None: + row = _msg_row(role="user", input_tokens=10) + assert list(GeminiNormalizer().normalize(row)) == [] + + +def test_subtract_cached_from_input() -> None: + """Spec: prompt=1000, cached=300 → input=700, cache_read=300.""" + row = _msg_row( + promptTokenCount=1000, + cachedContentTokenCount=300, + candidatesTokenCount=500, + thoughtsTokenCount=0, + ) + events = list(GeminiNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + assert ev["input_tokens"] == 700 + assert ev["output_tokens"] == 500 + assert ev["cache_read_tokens"] == 300 + assert ev["cache_create_tokens"] == 0 + + +def test_fold_thoughts_into_output() -> None: + """Spec: candidates=500, thoughts=200 → output=700.""" + row = _msg_row( + promptTokenCount=100, + cachedContentTokenCount=0, + candidatesTokenCount=500, + thoughtsTokenCount=200, + ) + events = list(GeminiNormalizer().normalize(row)) + assert events[0]["output_tokens"] == 700 + assert events[0]["input_tokens"] == 100 + + +def test_combined_subtract_and_fold() -> None: + row = _msg_row( + promptTokenCount=1000, + cachedContentTokenCount=300, + candidatesTokenCount=500, + thoughtsTokenCount=200, + ) + events = list(GeminiNormalizer().normalize(row)) + ev = events[0] + assert ev["input_tokens"] == 700 + assert ev["output_tokens"] == 700 + assert ev["cache_read_tokens"] == 300 + assert ev["cache_create_tokens"] == 0 + + +def test_raw_json_usage_metadata_path() -> None: + """Production JSONL ≥0.39 path: usageMetadata in raw_json.""" + raw = { + "usageMetadata": { + "promptTokenCount": 1000, + "cachedContentTokenCount": 300, + "candidatesTokenCount": 500, + "thoughtsTokenCount": 200, + } + } + row = _msg_row(raw_json=json.dumps(raw)) + events = list(GeminiNormalizer().normalize(row)) + ev = events[0] + assert ev["input_tokens"] == 700 + assert ev["output_tokens"] == 700 + assert ev["cache_read_tokens"] == 300 + + +def test_legacy_tokens_block_in_raw_json() -> None: + """Older single-JSON ≤0.38 path: ``tokens`` block.""" + raw = { + "tokens": { + "input": 1000, + "output": 500, + "cached": 300, + "thoughts": 200, + } + } + row = _msg_row(raw_json=json.dumps(raw)) + events = list(GeminiNormalizer().normalize(row)) + ev = events[0] + assert ev["input_tokens"] == 700 + assert ev["output_tokens"] == 700 + assert ev["cache_read_tokens"] == 300 + + +def test_role_gemini_accepted() -> None: + """Some adapter versions emit role='gemini' for assistant turns.""" + row = _msg_row(role="gemini", promptTokenCount=100, candidatesTokenCount=50) + events = list(GeminiNormalizer().normalize(row)) + assert len(events) == 1 + assert events[0]["input_tokens"] == 100 + assert events[0]["output_tokens"] == 50 + + +def test_unknown_model_stamps_unknown() -> None: + row = _msg_row( + promptTokenCount=100, + candidatesTokenCount=100, + model="gemini-future-2030", + ) + events = list(GeminiNormalizer().normalize(row)) + assert events[0]["cost_source"] == COST_SOURCE_UNKNOWN + + +def test_known_model_stamps_rate_card() -> None: + row = _msg_row( + promptTokenCount=100, + candidatesTokenCount=100, + model="claude-sonnet-4-5-20250929", # ensure RATE_CARD lookup works + ) + events = list(GeminiNormalizer().normalize(row)) + assert events[0]["cost_source"] == COST_SOURCE_RATE_CARD diff --git a/tests/stackunderflow/etl/normalize/test_kilocode.py b/tests/stackunderflow/etl/normalize/test_kilocode.py new file mode 100644 index 0000000..f9987ba --- /dev/null +++ b/tests/stackunderflow/etl/normalize/test_kilocode.py @@ -0,0 +1,70 @@ +"""KiloCodeNormalizer — Cline-family wrapper, only provider_name differs.""" + +from __future__ import annotations + +import json + +from stackunderflow.etl.normalize import KiloCodeNormalizer + + +def _msg_row(**overrides) -> dict: + base = { + "id": 900, + "provider": "kilocode", + "project_id": 9, + "session_id": "kilo-task-id", + "timestamp": "2026-04-25T18:00:00+00:00", + "role": "assistant", + "model": "claude-sonnet-4-5-20250929", + "speed": "standard", + "input_tokens": 0, + "output_tokens": 0, + "cache_read_tokens": 0, + "cache_create_tokens": 0, + "content_text": "", + "raw_json": "{}", + } + base.update(overrides) + return base + + +def test_provider_name_is_kilocode() -> None: + assert KiloCodeNormalizer.provider_name == "kilocode" + + +def test_user_role_yields_zero_events() -> None: + row = _msg_row(role="user") + assert list(KiloCodeNormalizer().normalize(row)) == [] + + +def test_api_req_started_tokens_parsed_from_text_blob() -> None: + """Cline-shape parsing — kilocode shares the api_req_started.text JSON.""" + text = json.dumps({ + "tokensIn": 500, + "tokensOut": 200, + "cacheReads": 1000, + "cacheWrites": 300, + "cost": 0.0042, + }) + raw = {"text": text} + row = _msg_row(raw_json=json.dumps(raw)) + events = list(KiloCodeNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + assert ev["input_tokens"] == 500 + assert ev["output_tokens"] == 200 + assert ev["cache_read_tokens"] == 1000 + assert ev["cache_create_tokens"] == 300 + assert ev["provider"] == "kilocode" + + +def test_canonical_columns_fallback() -> None: + row = _msg_row(input_tokens=100, output_tokens=50) + events = list(KiloCodeNormalizer().normalize(row)) + assert len(events) == 1 + assert events[0]["input_tokens"] == 100 + + +def test_no_tokens_yields_zero_events() -> None: + row = _msg_row() + assert list(KiloCodeNormalizer().normalize(row)) == [] diff --git a/tests/stackunderflow/etl/normalize/test_kiro.py b/tests/stackunderflow/etl/normalize/test_kiro.py new file mode 100644 index 0000000..38e7f08 --- /dev/null +++ b/tests/stackunderflow/etl/normalize/test_kiro.py @@ -0,0 +1,68 @@ +"""KiroNormalizer — estimate from text length, always estimated.""" + +from __future__ import annotations + +import json + +from stackunderflow.etl.normalize import KiroNormalizer +from stackunderflow.etl.normalize.base import COST_SOURCE_ESTIMATED + + +def _msg_row(**overrides) -> dict: + base = { + "id": 1000, + "provider": "kiro", + "project_id": 10, + "session_id": "kiro-workflow-id", + "timestamp": "2026-04-25T19:00:00+00:00", + "role": "assistant", + "model": "claude-3-5-sonnet-20241022", + "speed": "standard", + "input_tokens": 0, + "output_tokens": 0, + "cache_read_tokens": 0, + "cache_create_tokens": 0, + "content_text": "", + "raw_json": "{}", + } + base.update(overrides) + return base + + +def test_user_role_yields_zero_events() -> None: + row = _msg_row(role="user", content_text="user prompt") + assert list(KiroNormalizer().normalize(row)) == [] + + +def test_role_bot_accepted_as_assistant() -> None: + """Kiro source format uses ``role='bot'`` for the assistant.""" + row = _msg_row(role="bot", content_text="bot response text") + events = list(KiroNormalizer().normalize(row)) + assert len(events) == 1 + assert events[0]["cost_source"] == COST_SOURCE_ESTIMATED + + +def test_estimate_from_text_length() -> None: + """Spec: text//4 estimation.""" + row = _msg_row(content_text="kiro agent assistant response message") + events = list(KiroNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + # len("kiro agent assistant response message") == 37; 37 // 4 == 9 + assert ev["input_tokens"] == 9 + assert ev["output_tokens"] == 0 + assert ev["cost_source"] == COST_SOURCE_ESTIMATED + + +def test_no_text_yields_zero_events() -> None: + row = _msg_row() + assert list(KiroNormalizer().normalize(row)) == [] + + +def test_raw_extras_preserves_workflow_metadata() -> None: + raw = {"executionId": "exec-123", "workflowId": "wf-456"} + row = _msg_row(content_text="response", raw_json=json.dumps(raw)) + events = list(KiroNormalizer().normalize(row)) + extras = json.loads(events[0]["raw_extras"]) + assert extras["executionId"] == "exec-123" + assert extras["workflowId"] == "wf-456" diff --git a/tests/stackunderflow/etl/normalize/test_openclaw.py b/tests/stackunderflow/etl/normalize/test_openclaw.py new file mode 100644 index 0000000..f6b6530 --- /dev/null +++ b/tests/stackunderflow/etl/normalize/test_openclaw.py @@ -0,0 +1,109 @@ +"""OpenClawNormalizer — explicit usage block with cacheRead/cacheWrite.""" + +from __future__ import annotations + +import json + +from stackunderflow.etl.normalize import OpenClawNormalizer +from stackunderflow.etl.normalize.base import ( + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, +) + + +def _msg_row(**overrides) -> dict: + base = { + "id": 1100, + "provider": "openclaw", + "project_id": 11, + "session_id": "openclaw-sess", + "timestamp": "2026-04-25T20:00:00+00:00", + "role": "assistant", + "model": "claude-sonnet-4-5-20250929", + "speed": "standard", + "input_tokens": 0, + "output_tokens": 0, + "cache_read_tokens": 0, + "cache_create_tokens": 0, + "content_text": "", + "raw_json": "{}", + } + base.update(overrides) + return base + + +def test_user_role_yields_zero_events() -> None: + row = _msg_row(role="user", input_tokens=50) + assert list(OpenClawNormalizer().normalize(row)) == [] + + +def test_explicit_usage_block_canonical_mapping() -> None: + """Spec: cacheWrite → cache_create_tokens, cacheRead → cache_read_tokens.""" + raw = { + "message": { + "usage": { + "input": 800, + "output": 400, + "cacheRead": 1200, + "cacheWrite": 200, + } + } + } + row = _msg_row(raw_json=json.dumps(raw)) + events = list(OpenClawNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + assert ev["input_tokens"] == 800 + assert ev["output_tokens"] == 400 + assert ev["cache_read_tokens"] == 1200 + assert ev["cache_create_tokens"] == 200 + assert ev["cost_source"] == COST_SOURCE_RATE_CARD + + +def test_canonical_columns_fallback() -> None: + row = _msg_row( + input_tokens=200, + output_tokens=100, + cache_read_tokens=50, + cache_create_tokens=25, + ) + events = list(OpenClawNormalizer().normalize(row)) + ev = events[0] + assert ev["input_tokens"] == 200 + assert ev["cache_read_tokens"] == 50 + assert ev["cache_create_tokens"] == 25 + + +def test_embedded_cost_preserved_in_raw_extras() -> None: + raw = { + "message": { + "provider": "anthropic", + "usage": { + "input": 100, + "output": 50, + "cacheRead": 0, + "cacheWrite": 0, + "cost": {"total": 0.0042}, + }, + } + } + row = _msg_row(raw_json=json.dumps(raw)) + events = list(OpenClawNormalizer().normalize(row)) + extras = json.loads(events[0]["raw_extras"]) + assert extras["embeddedCost"] == {"total": 0.0042} + assert extras["provider"] == "anthropic" + + +def test_no_usage_data_yields_zero_events() -> None: + row = _msg_row() + assert list(OpenClawNormalizer().normalize(row)) == [] + + +def test_unknown_model_stamps_unknown() -> None: + row = _msg_row( + input_tokens=100, + output_tokens=100, + model="openclaw-future-2030", + ) + events = list(OpenClawNormalizer().normalize(row)) + assert events[0]["cost_source"] == COST_SOURCE_UNKNOWN diff --git a/tests/stackunderflow/etl/normalize/test_opencode.py b/tests/stackunderflow/etl/normalize/test_opencode.py new file mode 100644 index 0000000..2be294b --- /dev/null +++ b/tests/stackunderflow/etl/normalize/test_opencode.py @@ -0,0 +1,115 @@ +"""OpenCodeNormalizer — SQLite source; tokens.{input,output,reasoning,cache}.""" + +from __future__ import annotations + +import json + +from stackunderflow.etl.normalize import OpenCodeNormalizer +from stackunderflow.etl.normalize.base import ( + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, +) + + +def _msg_row(**overrides) -> dict: + base = { + "id": 1200, + "provider": "opencode", + "project_id": 12, + "session_id": "opencode-sess", + "timestamp": "2026-04-25T21:00:00+00:00", + "role": "assistant", + "model": "claude-sonnet-4-5-20250929", + "speed": "standard", + "input_tokens": 0, + "output_tokens": 0, + "cache_read_tokens": 0, + "cache_create_tokens": 0, + "content_text": "", + "raw_json": "{}", + } + base.update(overrides) + return base + + +def test_user_role_yields_zero_events() -> None: + row = _msg_row(role="user", input_tokens=10) + assert list(OpenCodeNormalizer().normalize(row)) == [] + + +def test_canonical_mapping_with_reasoning_fold() -> None: + """Spec: tokens.reasoning folds into output; cache.{read,write}.""" + raw = { + "tokens": { + "input": 1000, + "output": 400, + "reasoning": 200, + "cache": {"read": 800, "write": 100}, + } + } + row = _msg_row(raw_json=json.dumps(raw)) + events = list(OpenCodeNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + assert ev["input_tokens"] == 1000 + assert ev["output_tokens"] == 600 # 400 + 200 reasoning + assert ev["cache_read_tokens"] == 800 + assert ev["cache_create_tokens"] == 100 + assert ev["cost_source"] == COST_SOURCE_RATE_CARD + + +def test_data_wrapped_payload() -> None: + """Some adapter versions persist the payload nested under ``data``.""" + raw = { + "data": { + "modelID": "claude-sonnet-4-5-20250929", + "tokens": { + "input": 500, + "output": 250, + "reasoning": 0, + "cache": {"read": 0, "write": 0}, + }, + } + } + row = _msg_row(raw_json=json.dumps(raw)) + events = list(OpenCodeNormalizer().normalize(row)) + assert events[0]["input_tokens"] == 500 + assert events[0]["output_tokens"] == 250 + + +def test_canonical_columns_fallback_when_no_raw() -> None: + row = _msg_row(input_tokens=100, output_tokens=50, cache_read_tokens=20) + events = list(OpenCodeNormalizer().normalize(row)) + ev = events[0] + assert ev["input_tokens"] == 100 + assert ev["cache_read_tokens"] == 20 + + +def test_no_tokens_yields_zero_events() -> None: + row = _msg_row() + assert list(OpenCodeNormalizer().normalize(row)) == [] + + +def test_unknown_model_stamps_unknown() -> None: + row = _msg_row( + input_tokens=100, + output_tokens=100, + model="opencode-mystery-2030", + ) + events = list(OpenCodeNormalizer().normalize(row)) + assert events[0]["cost_source"] == COST_SOURCE_UNKNOWN + + +def test_embedded_cost_in_raw_extras() -> None: + raw = { + "data": { + "modelID": "claude-sonnet-4-5-20250929", + "tokens": {"input": 100, "output": 50, "reasoning": 0, "cache": {"read": 0, "write": 0}}, + "cost": 0.0123, + } + } + row = _msg_row(raw_json=json.dumps(raw)) + events = list(OpenCodeNormalizer().normalize(row)) + extras = json.loads(events[0]["raw_extras"]) + assert extras["embeddedCost"] == 0.0123 + assert extras["modelID"] == "claude-sonnet-4-5-20250929" diff --git a/tests/stackunderflow/etl/normalize/test_pi.py b/tests/stackunderflow/etl/normalize/test_pi.py new file mode 100644 index 0000000..23e8419 --- /dev/null +++ b/tests/stackunderflow/etl/normalize/test_pi.py @@ -0,0 +1,95 @@ +"""PiNormalizer — also covers OMP (same parser, different roots).""" + +from __future__ import annotations + +import json + +from stackunderflow.etl.normalize import PiNormalizer +from stackunderflow.etl.normalize.base import ( + COST_SOURCE_RATE_CARD, + COST_SOURCE_UNKNOWN, +) + + +def _msg_row(**overrides) -> dict: + base = { + "id": 1300, + "provider": "pi", + "project_id": 13, + "session_id": "pi-sess", + "timestamp": "2026-04-25T22:00:00+00:00", + "role": "assistant", + "model": "gpt-5", + "speed": "standard", + "input_tokens": 0, + "output_tokens": 0, + "cache_read_tokens": 0, + "cache_create_tokens": 0, + "content_text": "", + "raw_json": "{}", + } + base.update(overrides) + return base + + +def test_user_role_yields_zero_events() -> None: + row = _msg_row(role="user", input_tokens=10) + assert list(PiNormalizer().normalize(row)) == [] + + +def test_explicit_usage_canonical_mapping() -> None: + """Spec: cacheWrite → cache_create_tokens, cacheRead → cache_read_tokens.""" + raw = { + "message": { + "usage": { + "input": 600, + "output": 300, + "cacheRead": 900, + "cacheWrite": 150, + } + } + } + row = _msg_row(raw_json=json.dumps(raw)) + events = list(PiNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + assert ev["input_tokens"] == 600 + assert ev["output_tokens"] == 300 + assert ev["cache_read_tokens"] == 900 + assert ev["cache_create_tokens"] == 150 + + +def test_omp_routing_uses_same_class() -> None: + """OMP rows route through the same PiNormalizer registration.""" + from stackunderflow.etl.normalize import get + assert get("omp") is PiNormalizer + + +def test_canonical_columns_fallback() -> None: + row = _msg_row( + input_tokens=100, + output_tokens=50, + cache_read_tokens=200, + cache_create_tokens=25, + ) + events = list(PiNormalizer().normalize(row)) + ev = events[0] + assert ev["input_tokens"] == 100 + assert ev["cache_read_tokens"] == 200 + + +def test_no_usage_data_yields_zero_events() -> None: + row = _msg_row() + assert list(PiNormalizer().normalize(row)) == [] + + +def test_unknown_model_stamps_unknown() -> None: + row = _msg_row(input_tokens=100, output_tokens=100, model="pi-future-2030") + events = list(PiNormalizer().normalize(row)) + assert events[0]["cost_source"] == COST_SOURCE_UNKNOWN + + +def test_known_gpt_model_stamps_rate_card() -> None: + row = _msg_row(input_tokens=100, output_tokens=100, model="gpt-5-codex") + events = list(PiNormalizer().normalize(row)) + assert events[0]["cost_source"] == COST_SOURCE_RATE_CARD diff --git a/tests/stackunderflow/etl/normalize/test_qwen.py b/tests/stackunderflow/etl/normalize/test_qwen.py new file mode 100644 index 0000000..67dbef6 --- /dev/null +++ b/tests/stackunderflow/etl/normalize/test_qwen.py @@ -0,0 +1,121 @@ +"""QwenNormalizer — Gemini-shaped usageMetadata with cached subtraction.""" + +from __future__ import annotations + +import json + +from stackunderflow.etl.normalize import QwenNormalizer +from stackunderflow.etl.normalize.base import COST_SOURCE_UNKNOWN + + +def _msg_row(**overrides) -> dict: + base = { + "id": 1400, + "provider": "qwen", + "project_id": 14, + "session_id": "qwen-sess", + "timestamp": "2026-04-25T23:00:00+00:00", + "role": "assistant", + "model": "qwen-3-coder", + "speed": "standard", + "input_tokens": 0, + "output_tokens": 0, + "cache_read_tokens": 0, + "cache_create_tokens": 0, + "content_text": "", + "raw_json": "{}", + } + base.update(overrides) + return base + + +def test_user_role_yields_zero_events() -> None: + row = _msg_row(role="user", input_tokens=10) + assert list(QwenNormalizer().normalize(row)) == [] + + +def test_subtract_cached_from_input() -> None: + """Spec: prompt - cached = fresh input.""" + row = _msg_row( + promptTokenCount=1000, + cachedContentTokenCount=300, + candidatesTokenCount=500, + thoughtsTokenCount=0, + ) + events = list(QwenNormalizer().normalize(row)) + ev = events[0] + assert ev["input_tokens"] == 700 + assert ev["output_tokens"] == 500 + assert ev["cache_read_tokens"] == 300 + assert ev["cache_create_tokens"] == 0 + + +def test_fold_thoughts_into_output() -> None: + row = _msg_row( + promptTokenCount=200, + cachedContentTokenCount=0, + candidatesTokenCount=300, + thoughtsTokenCount=150, + ) + events = list(QwenNormalizer().normalize(row)) + ev = events[0] + assert ev["input_tokens"] == 200 + assert ev["output_tokens"] == 450 + + +def test_combined_subtract_and_fold() -> None: + row = _msg_row( + promptTokenCount=1000, + cachedContentTokenCount=300, + candidatesTokenCount=500, + thoughtsTokenCount=200, + ) + events = list(QwenNormalizer().normalize(row)) + ev = events[0] + assert ev["input_tokens"] == 700 + assert ev["output_tokens"] == 700 + assert ev["cache_read_tokens"] == 300 + + +def test_raw_json_usage_metadata_path() -> None: + raw = { + "usageMetadata": { + "promptTokenCount": 500, + "cachedContentTokenCount": 100, + "candidatesTokenCount": 250, + "thoughtsTokenCount": 50, + } + } + row = _msg_row(raw_json=json.dumps(raw)) + events = list(QwenNormalizer().normalize(row)) + ev = events[0] + assert ev["input_tokens"] == 400 # 500 - 100 + assert ev["output_tokens"] == 300 # 250 + 50 + assert ev["cache_read_tokens"] == 100 + + +def test_unknown_model_stamps_unknown() -> None: + row = _msg_row(promptTokenCount=100, candidatesTokenCount=50) + events = list(QwenNormalizer().normalize(row)) + assert events[0]["cost_source"] == COST_SOURCE_UNKNOWN + + +def test_no_usage_yields_zero_events() -> None: + row = _msg_row() + assert list(QwenNormalizer().normalize(row)) == [] + + +def test_raw_extras_preserves_function_call() -> None: + raw = { + "usageMetadata": { + "promptTokenCount": 100, + "candidatesTokenCount": 50, + "cachedContentTokenCount": 0, + "thoughtsTokenCount": 0, + }, + "functionCall": {"name": "read_file", "args": {"path": "foo.py"}}, + } + row = _msg_row(raw_json=json.dumps(raw)) + events = list(QwenNormalizer().normalize(row)) + extras = json.loads(events[0]["raw_extras"]) + assert extras["functionCall"]["name"] == "read_file" diff --git a/tests/stackunderflow/etl/normalize/test_roocode.py b/tests/stackunderflow/etl/normalize/test_roocode.py new file mode 100644 index 0000000..9f7b0a8 --- /dev/null +++ b/tests/stackunderflow/etl/normalize/test_roocode.py @@ -0,0 +1,69 @@ +"""RooCodeNormalizer — Cline-family wrapper, only provider_name differs.""" + +from __future__ import annotations + +import json + +from stackunderflow.etl.normalize import RooCodeNormalizer + + +def _msg_row(**overrides) -> dict: + base = { + "id": 1500, + "provider": "roocode", + "project_id": 15, + "session_id": "roo-task-id", + "timestamp": "2026-04-26T00:00:00+00:00", + "role": "assistant", + "model": "claude-sonnet-4-5-20250929", + "speed": "standard", + "input_tokens": 0, + "output_tokens": 0, + "cache_read_tokens": 0, + "cache_create_tokens": 0, + "content_text": "", + "raw_json": "{}", + } + base.update(overrides) + return base + + +def test_provider_name_is_roocode() -> None: + assert RooCodeNormalizer.provider_name == "roocode" + + +def test_api_req_started_tokens_parsed_from_text_blob() -> None: + """Cline-shape api_req_started.text JSON parses identically.""" + text = json.dumps({ + "tokensIn": 800, + "tokensOut": 400, + "cacheReads": 600, + "cacheWrites": 200, + "cost": 0.0066, + }) + raw = {"text": text} + row = _msg_row(raw_json=json.dumps(raw)) + events = list(RooCodeNormalizer().normalize(row)) + assert len(events) == 1 + ev = events[0] + assert ev["input_tokens"] == 800 + assert ev["output_tokens"] == 400 + assert ev["cache_read_tokens"] == 600 + assert ev["cache_create_tokens"] == 200 + assert ev["provider"] == "roocode" + + +def test_user_role_yields_zero_events() -> None: + row = _msg_row(role="user") + assert list(RooCodeNormalizer().normalize(row)) == [] + + +def test_no_tokens_yields_zero_events() -> None: + row = _msg_row() + assert list(RooCodeNormalizer().normalize(row)) == [] + + +def test_canonical_columns_fallback() -> None: + row = _msg_row(input_tokens=100, output_tokens=50) + events = list(RooCodeNormalizer().normalize(row)) + assert events[0]["input_tokens"] == 100