diff --git a/Gradata/pyproject.toml b/Gradata/pyproject.toml
index d9bd5729..ccea1350 100644
--- a/Gradata/pyproject.toml
+++ b/Gradata/pyproject.toml
@@ -145,4 +145,5 @@ testpaths = ["tests"]
pythonpath = ["src"]
markers = [
"integration: tests that hit external LLM APIs (cost money, skip in CI)",
+ "dualwrite: dual-write crash recovery and reconciliation tests",
]
diff --git a/Gradata/src/gradata/_doctor.py b/Gradata/src/gradata/_doctor.py
index 0265981a..b147db7c 100644
--- a/Gradata/src/gradata/_doctor.py
+++ b/Gradata/src/gradata/_doctor.py
@@ -14,6 +14,7 @@
from __future__ import annotations
+import contextlib
import json
import os
import shutil
@@ -125,6 +126,11 @@ def _resolve_brain_path():
return None
+def resolve_brain_path(brain_dir: str | Path | None = None) -> Path | None:
+ """Public wrapper used by CLI subcommands that need the doctor target."""
+ return Path(brain_dir).resolve() if brain_dir else _resolve_brain_path()
+
+
def _skip(name: str) -> dict:
return {"name": name, "status": "skip", "detail": "no brain dir resolved"}
@@ -367,10 +373,8 @@ def _probe_api(url: str, bearer: str) -> tuple[int, str]:
return resp.status, body
except urllib.error.HTTPError as e:
body = ""
- try:
+ with contextlib.suppress(Exception):
body = e.read(512).decode("utf-8", errors="replace")
- except Exception:
- pass
return e.code, body
except (urllib.error.URLError, OSError) as e:
return 0, str(e)
@@ -475,7 +479,7 @@ def diagnose(
}
"""
# Resolve brain path
- brain_path = Path(brain_dir).resolve() if brain_dir else _resolve_brain_path()
+ brain_path = resolve_brain_path(brain_dir)
if cloud_only:
checks = _cloud_checks()
diff --git a/Gradata/src/gradata/_events.py b/Gradata/src/gradata/_events.py
index 6fc2c1d1..2ebf0b75 100644
--- a/Gradata/src/gradata/_events.py
+++ b/Gradata/src/gradata/_events.py
@@ -153,6 +153,99 @@ def _ensure_table(conn: sqlite3.Connection):
_schema_initialized.add(db_file)
+def _insert_event_projection(
+ conn: sqlite3.Connection,
+ event: dict,
+ *,
+ brain_dir: Path,
+) -> int | None:
+ """Project one canonical JSONL event into SQLite idempotently."""
+ _ensure_table(conn)
+ tid = tenant_for(brain_dir)
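+    # INSERT OR IGNORE plus the UNIQUE(ts, type, source) dedup key makes this
+    # idempotent: projecting the same JSONL event twice leaves exactly one row.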
+ cursor = conn.execute(
+ "INSERT OR IGNORE INTO events "
+ "(ts, session, type, source, data_json, tags_json, valid_from, valid_until, tenant_id, schema_version) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1)",
+ (
+ event.get("ts", ""),
+ event.get("session"),
+ event.get("type", ""),
+ event.get("source", ""),
+ json.dumps(event.get("data", {}), default=str),
+ json.dumps(event.get("tags", []), default=str),
+ event.get("valid_from"),
+ event.get("valid_until"),
+ tid,
+ ),
+ )
+ if cursor.rowcount == 1:
+ return cursor.lastrowid
+ existing = conn.execute(
+ "SELECT id FROM events WHERE tenant_id=? AND ts=? AND type=? AND source=?",
+ (tid, event.get("ts", ""), event.get("type", ""), event.get("source", "")),
+ ).fetchone()
+ return existing[0] if existing else None
+
+
+def reconcile_jsonl_to_sqlite(
+ brain_dir: str | Path | None = None,
+ *,
+ ctx: BrainContext | None = None,
+) -> dict:
+ """Replay canonical ``events.jsonl`` rows into the SQLite projection.
+
+ JSONL is the source of truth. SQLite is a query projection that may lag
+ after process death between append+fsync and DB commit. This function is
+ idempotent because inserts use the event dedup key.
+ """
+ if ctx is not None:
+ events_jsonl = ctx.events_jsonl
+ db_path = ctx.db_path
+ root = ctx.brain_dir
+ else:
+ root = Path(brain_dir).resolve() if brain_dir is not None else _p.BRAIN_DIR
+ events_jsonl = root / "events.jsonl"
+ db_path = root / "system.db"
+
+ if not events_jsonl.exists():
+ return {"jsonl_events": 0, "sqlite_events_before": 0, "replayed": 0, "invalid": 0}
+
+ invalid = 0
+ total = 0
+ with (
+ open(events_jsonl, encoding="utf-8", errors="replace") as fh,
+ contextlib.closing(sqlite3.connect(str(db_path))) as conn,
+ ):
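+        # busy_timeout lets the replay wait briefly for a concurrent writer
+        # instead of failing immediately with "database is locked".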
+ conn.execute("PRAGMA busy_timeout=5000")
+ _ensure_table(conn)
+ before = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
+ for raw in fh:
+ raw = raw.strip()
+ if not raw:
+ continue
+ try:
+ event = json.loads(raw)
+ except json.JSONDecodeError:
+ invalid += 1
+ continue
+ if not isinstance(event, dict):
+ invalid += 1
+ continue
+ total += 1
+ _insert_event_projection(conn, event, brain_dir=root)
+ conn.commit()
+ after = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
+
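+    # drift = events the SQLite projection lacked before this pass;
+    # replayed = rows this pass actually inserted (0 when already in sync).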
+ return {
+ "jsonl_events": total,
+ "sqlite_events_before": before,
+ "sqlite_events_after": after,
+ "drift": max(0, total - before),
+ "replayed": max(0, after - before),
+ "invalid": invalid,
+ }
+
+
def emit(
event_type: str,
source: str,
@@ -248,42 +341,23 @@ def emit(
try:
_locked_append(events_jsonl, json.dumps(event, ensure_ascii=False) + "\n")
jsonl_ok = True
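+        # Test-only fault injection: GRADATA_DUALWRITE_JSONL_FSYNC_DELAY_MS widens
+        # the window between the JSONL append and the SQLite commit so the
+        # crash-recovery tests can reliably land in between.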
+ delay_ms = os.environ.get("GRADATA_DUALWRITE_JSONL_FSYNC_DELAY_MS", "").strip()
+ if delay_ms:
+ import time
+
+ with contextlib.suppress(ValueError):
+ time.sleep(max(0.0, float(delay_ms)) / 1000.0)
except Exception as e:
_log.error("JSONL write failed: %s", e)
try:
with contextlib.closing(sqlite3.connect(str(db_path))) as conn:
- _ensure_table(conn)
# INSERT OR IGNORE + UNIQUE(ts,type,source) makes emit() idempotent
# across retries and partial-write recoveries. If an identical
# event was already persisted (same dedup key), the INSERT is a
# no-op -- we then look up the pre-existing row's id so callers
# that depend on `event["id"]` still get the real rowid.
- _tid = tenant_for(db_path.parent)
- cursor = conn.execute(
- "INSERT OR IGNORE INTO events "
- "(ts, session, type, source, data_json, tags_json, valid_from, valid_until, tenant_id, schema_version) "
- "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1)",
- (
- ts,
- session,
- event_type,
- source,
- json.dumps(redacted_data),
- json.dumps(redacted_tags),
- valid_from,
- valid_until,
- _tid,
- ),
- )
- if cursor.rowcount == 1:
- event["id"] = cursor.lastrowid
- else:
- existing = conn.execute(
- "SELECT id FROM events WHERE tenant_id=? AND ts=? AND type=? AND source=?",
- (_tid, ts, event_type, source),
- ).fetchone()
- event["id"] = existing[0] if existing else None
+ event["id"] = _insert_event_projection(conn, event, brain_dir=db_path.parent)
conn.commit()
sqlite_ok = True
except Exception as e:
@@ -729,25 +803,8 @@ def flush(self) -> dict:
with contextlib.closing(sqlite3.connect(str(self.db_path))) as conn:
_ensure_table(conn)
conn.execute("PRAGMA busy_timeout=5000")
- _tid = tenant_for(self.brain_dir)
for event in new_events:
- conn.execute(
- "INSERT OR IGNORE INTO events "
- "(ts, session, type, source, data_json, tags_json, "
- " valid_from, valid_until, tenant_id, schema_version) "
- "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1)",
- (
- event.get("ts", ""),
- event.get("session"),
- event.get("type", ""),
- event.get("source", ""),
- json.dumps(event.get("data", {}), default=str),
- json.dumps(event.get("tags", []), default=str),
- event.get("valid_from"),
- event.get("valid_until"),
- _tid,
- ),
- )
+ _insert_event_projection(conn, event, brain_dir=self.brain_dir)
conn.commit()
except Exception as db_exc:
result["errors"].append(f"Phase 2 DB: {db_exc}")
diff --git a/Gradata/src/gradata/brain.py b/Gradata/src/gradata/brain.py
index 00b56880..42914c02 100644
--- a/Gradata/src/gradata/brain.py
+++ b/Gradata/src/gradata/brain.py
@@ -60,11 +60,13 @@ class Brain(BrainInspectionMixin):
def __init__(
self,
- brain_dir: str | Path,
+ brain_dir: str | Path | None = None,
working_dir: str | Path | None = None,
encryption_key: str | None = None,
):
- self.dir = Path(brain_dir).resolve()
+ from gradata._paths import resolve_brain_dir
+
+ self.dir = resolve_brain_dir(brain_dir)
if not self.dir.exists():
from gradata.exceptions import BrainNotFoundError
@@ -136,6 +138,16 @@ def __init__(
run_migrations(self.db_path)
+ # JSONL is canonical; SQLite is a query projection. A process can die
+ # after append+fsync and before SQLite commit, so every open heals any
+ # projection lag before readers query the events table.
+ try:
+ from gradata._events import reconcile_jsonl_to_sqlite
+
+ reconcile_jsonl_to_sqlite(ctx=self.ctx)
+ except Exception as exc:
+ logger.debug("event projection reconcile failed: %s", exc)
+
# Initialize pattern registries (lazy — ImportError safe)
try:
from gradata.enhancements.behavioral_engine import DirectiveRegistry
@@ -1505,8 +1517,17 @@ def get_facts(self, prospect: str | None = None, fact_type: str | None = None) -
except ImportError:
return []
- def observe(self, messages: list[dict], user_id: str = "default") -> list[dict]:
+ def observe(
+ self,
+ messages: list[dict] | str,
+ user_id: str = "default",
+ *,
+ kind: str | None = None,
+ ) -> list[dict] | dict:
"""Extract facts from a conversation without requiring corrections."""
+ if isinstance(messages, str):
+ event_type = (kind or "observation").strip().upper()
+ return self.emit(event_type, "brain.observe", {"content": messages})
try:
from gradata.enhancements.memory_extraction import MemoryExtractor
except ImportError:
diff --git a/Gradata/src/gradata/cli.py b/Gradata/src/gradata/cli.py
index e8b90eb5..04a4aa9d 100644
--- a/Gradata/src/gradata/cli.py
+++ b/Gradata/src/gradata/cli.py
@@ -221,6 +221,26 @@ def cmd_doctor(args):
from gradata._doctor import diagnose, print_diagnosis
brain_dir = getattr(args, "brain_dir", None)
+ if getattr(args, "reconcile", False):
+ from gradata._doctor import resolve_brain_path
+ from gradata._events import reconcile_jsonl_to_sqlite
+
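+        # Reconciliation is local-only (events.jsonl -> system.db) and
+        # idempotent, so it is safe to run repeatedly.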
+ brain_path = resolve_brain_path(brain_dir)
+ if brain_path is None:
+ print("reconcile: no brain dir resolved", file=sys.stderr)
+ sys.exit(1)
+ result = reconcile_jsonl_to_sqlite(brain_path)
+ if getattr(args, "json", False):
+ print(json.dumps({"reconcile": result}, indent=2))
+ else:
+ print(
+ "reconcile: "
+ f"drift={result.get('drift', 0)} "
+ f"replayed={result.get('replayed', 0)} "
+ f"jsonl={result.get('jsonl_events', 0)} "
+ f"sqlite={result.get('sqlite_events_after', result.get('sqlite_events_before', 0))}"
+ )
+ return
cloud_only = getattr(args, "cloud", False)
include_cloud = not getattr(args, "no_cloud", False)
report = diagnose(
@@ -1227,6 +1247,11 @@ def main():
p_doctor.add_argument("--json", action="store_true", help="Output as JSON")
p_doctor.add_argument("--cloud", action="store_true", help="Only run cloud checks")
p_doctor.add_argument("--no-cloud", action="store_true", help="Skip cloud checks (offline)")
+ p_doctor.add_argument(
+ "--reconcile",
+ action="store_true",
+ help="Replay events.jsonl into system.db and report healed drift",
+ )
# install
p_install = sub.add_parser("install", help="Install a brain from marketplace archive")
diff --git a/Gradata/src/gradata/cloud/client.py b/Gradata/src/gradata/cloud/client.py
index bc96f143..a51f9640 100644
--- a/Gradata/src/gradata/cloud/client.py
+++ b/Gradata/src/gradata/cloud/client.py
@@ -161,6 +161,10 @@ def sync(self, batch_size: int = 500) -> int:
except json.JSONDecodeError:
continue
ts = ev.get("ts", "")
+ # Coerce non-string ts (e.g. float epoch) to string for safe compare
+ if not isinstance(ts, str):
+ ts = str(ts)
+ ev["ts"] = ts
if ts > last_watermark:
pending.append(ev)
@@ -240,17 +244,35 @@ def _format_event(ev: dict) -> dict:
server can upsert idempotently on (brain_id, event_id).
"""
ts = ev.get("ts", "")
+ if not isinstance(ts, str):
+ ts = str(ts)
event_type = ev.get("type", "")
source = ev.get("source", "")
raw = f"{ts}:{event_type}:{source}"
event_id = hashlib.sha256(raw.encode()).hexdigest()[:32]
+ # Coerce session to int|None — server schema rejects floats/strings
+ session_raw = ev.get("session")
+ session_val: int | None
+ try:
+ if session_raw is None:
+ session_val = None
+ elif isinstance(session_raw, bool):
+ session_val = None
+ elif isinstance(session_raw, int):
+ session_val = session_raw
+ elif isinstance(session_raw, float):
+ session_val = int(session_raw)
+ else:
+ session_val = int(str(session_raw))
+ except (ValueError, TypeError):
+ session_val = None
return {
"event_id": event_id,
"type": event_type,
"source": source,
"data": ev.get("data", {}),
"tags": ev.get("tags", []),
- "session": ev.get("session"),
+ "session": session_val,
"created_at": ts or None,
}
@@ -276,7 +298,11 @@ def _post(self, path: str, data: dict) -> dict[str, Any]:
except HTTPError as e:
if e.code == 413:
raise _TooLargeError() from e
- raise ConnectionError(f"Cloud API request failed: {e}") from e
+ try:
+ body = e.read().decode("utf-8", errors="replace")[:500]
+ except Exception:
+ body = ""
+ raise ConnectionError(f"Cloud API request failed: {e} body={body}") from e
except URLError as e:
raise ConnectionError(f"Cloud API request failed: {e}") from e
diff --git a/Gradata/src/gradata/enhancements/rule_pipeline.py b/Gradata/src/gradata/enhancements/rule_pipeline.py
index b7fd04fd..29cf3979 100644
--- a/Gradata/src/gradata/enhancements/rule_pipeline.py
+++ b/Gradata/src/gradata/enhancements/rule_pipeline.py
@@ -335,7 +335,7 @@ def run_rule_pipeline(
# Rank rules using retrieval fusion if available
try:
- from gradata.enhancements.retrieval_fusion import ( # type: ignore[import]
+ from gradata.enhancements.scoring.retrieval_fusion import ( # type: ignore[import]
ScoredRule,
apply_correction_boost,
reciprocal_rank_fusion,
diff --git a/Gradata/src/gradata/enhancements/retrieval_fusion.py b/Gradata/src/gradata/enhancements/scoring/retrieval_fusion.py
similarity index 100%
rename from Gradata/src/gradata/enhancements/retrieval_fusion.py
rename to Gradata/src/gradata/enhancements/scoring/retrieval_fusion.py
diff --git a/Gradata/src/gradata/enhancements/self_improvement/_graduation.py b/Gradata/src/gradata/enhancements/self_improvement/_graduation.py
index 680a12f5..5bf8a0f1 100644
--- a/Gradata/src/gradata/enhancements/self_improvement/_graduation.py
+++ b/Gradata/src/gradata/enhancements/self_improvement/_graduation.py
@@ -107,7 +107,12 @@ def _read_beta_lb_config() -> tuple[bool, float, int]:
import math
import os
- enabled = os.environ.get("GRADATA_BETA_LB_GATE", "").lower() in ("1", "true", "yes", "on")
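+    # Opt-out semantics: the gate stays on unless the env var explicitly
+    # disables it.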
+ enabled = os.environ.get("GRADATA_BETA_LB_GATE", "1").lower() not in (
+ "0",
+ "false",
+ "no",
+ "off",
+ )
try:
threshold = float(os.environ.get("GRADATA_BETA_LB_THRESHOLD", "0.85"))
if not math.isfinite(threshold):
@@ -128,7 +133,7 @@ def _passes_beta_lb_gate(
) -> bool:
"""Beta lower-bound gate on PATTERN -> RULE promotion.
- Opt-in via env var ``GRADATA_BETA_LB_GATE`` (default off). When enabled,
+ Enabled by default; set ``GRADATA_BETA_LB_GATE=0`` to disable. When enabled,
requires the 5th-percentile lower bound of Beta(α, β) to meet the
configured threshold (``GRADATA_BETA_LB_THRESHOLD``, default 0.85) AND
at least ``GRADATA_BETA_LB_MIN_FIRES`` observations (default 5).
diff --git a/Gradata/src/gradata/middleware/_core.py b/Gradata/src/gradata/middleware/_core.py
index 584784e8..3f38e3af 100644
--- a/Gradata/src/gradata/middleware/_core.py
+++ b/Gradata/src/gradata/middleware/_core.py
@@ -16,6 +16,7 @@
from gradata._env import env_str
from gradata.enhancements.rule_to_hook import DeterminismCheck, classify_rule
+from gradata.security.score_obfuscation import obfuscate_instruction
if TYPE_CHECKING: # pragma: no cover
from gradata._types import Lesson
@@ -276,7 +277,10 @@ def build_brain_rules_block(source: RuleSource) -> str:
selected = source.select()
if not selected:
return ""
- lines = [f"[{l.state}:{l.confidence:.2f}] {l.category}: {l.description}" for l in selected]
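+    # Obfuscate prompt-bound rule text so raw confidence floats never leak
+    # into model-visible context (see tests/test_score_obfuscation_gate.py).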
+ lines = [
+ obfuscate_instruction(f"[{l.state}:{l.confidence:.2f}] {l.category}: {l.description}")
+ for l in selected
+ ]
return "\n" + "\n".join(lines) + "\n"
diff --git a/Gradata/tests/test_dualwrite_atomicity.py b/Gradata/tests/test_dualwrite_atomicity.py
new file mode 100644
index 00000000..f2ba879c
--- /dev/null
+++ b/Gradata/tests/test_dualwrite_atomicity.py
@@ -0,0 +1,197 @@
+"""PR2 spec: dual-write atomicity. Both-or-neither under kill-9 mid-write.
+
+These tests are PATH-AGNOSTIC — they import the public API (Brain) only, not
+internal _events.py paths, so they survive a rebase that moves files around.
+
+Invariant under test:
+ Every event written via Brain MUST land in BOTH events.jsonl AND system.db,
+ OR in NEITHER. A crash mid-write must leave the brain in a recoverable state
+ where `gradata doctor --reconcile` (or Brain re-init) brings them back into
+ agreement without data loss.
+
+Acceptance:
+ test_dualwrite_jsonl_first_then_sqlite — happy path, both written, ordered
+  test_dualwrite_kill9_mid_batch_leaves_jsonl_canonical — crash mid-batch, JSONL never behind SQLite
+  test_reconcile_replays_missing_sqlite_rows — reopen (or doctor --reconcile) replays missing SQLite rows
+  test_reconcile_idempotent — running reconcile twice = same state
+  test_doctor_reconcile_reports_drift — doctor --reconcile reports the drift count it healed
+ test_concurrent_writers_serialize — two writers don't interleave events
+
+Fixtures use tmp_path BRAIN_DIR per test (conftest.py already does this).
+No new deps. Prefer SQLite WAL + JSONL append-fsync ordering. CAS via
+schema_version sentinel acceptable. Two-phase commit NOT required.
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import signal
+import subprocess
+import sys
+import time
+from pathlib import Path
+
+import pytest
+
+pytestmark = pytest.mark.dualwrite
+
+
+# ---------------------------------------------------------------------------
+# Helpers — kill-9 simulation via subprocess so we can pull the plug mid-write
+# ---------------------------------------------------------------------------
+
+def _spawn_writer(brain_dir: Path, n_events: int, kill_after: int | None = None) -> int:
+ """Spawn a child process that writes n_events to a fresh Brain.
+
+ If kill_after is set, the child is SIGKILLed after writing that many events
+    (detected by counting lines in events.jsonl). Returns the child's exit code.
+ """
+ code = f"""
+import os, sys, time
+os.environ['BRAIN_DIR'] = {str(brain_dir)!r}
+os.environ['GRADATA_DUALWRITE_JSONL_FSYNC_DELAY_MS'] = '50'
+from gradata import Brain
+b = Brain()
+for i in range({n_events}):
+ b.observe(f'lesson-{{i}}', kind='correction')
+ time.sleep(0.01)
+"""
+ p = subprocess.Popen([sys.executable, '-c', code])
+ if kill_after is not None:
+ jsonl = brain_dir / 'events.jsonl'
+ deadline = time.time() + 10.0
+ while time.time() < deadline:
+ if jsonl.exists() and sum(1 for _ in jsonl.open()) >= kill_after:
+ os.kill(p.pid, signal.SIGKILL)
+ break
+ time.sleep(0.01)
+ p.wait()
+ return p.returncode
+
+
+def _count_jsonl(brain_dir: Path) -> int:
+ p = brain_dir / 'events.jsonl'
+ return sum(1 for _ in p.open()) if p.exists() else 0
+
+
+def _count_sqlite_events(brain_dir: Path) -> int:
+ """Count rows in the events table of system.db. Tolerant to schema drift."""
+ import sqlite3
+ db = brain_dir / 'system.db'
+ if not db.exists():
+ return 0
+ conn = sqlite3.connect(str(db))
+ try:
+ for table in ('events', 'event_log', 'lessons'):
+ try:
+ cur = conn.execute(f"SELECT COUNT(*) FROM {table}")
+ return cur.fetchone()[0]
+ except sqlite3.OperationalError:
+ continue
+ return 0
+ finally:
+ conn.close()
+
+
+# ---------------------------------------------------------------------------
+# Tests
+# ---------------------------------------------------------------------------
+
+def test_dualwrite_jsonl_first_then_sqlite(tmp_path, monkeypatch):
+ """Happy path. Both stores agree after a normal write batch."""
+ monkeypatch.setenv('BRAIN_DIR', str(tmp_path))
+ from gradata import Brain
+ b = Brain()
+ for i in range(10):
+ b.observe(f'lesson-{i}', kind='correction')
+ assert _count_jsonl(tmp_path) == _count_sqlite_events(tmp_path) == 10
+
+
+def test_dualwrite_kill9_mid_batch_leaves_jsonl_canonical(tmp_path):
+ """Crash mid-batch. JSONL must be ahead of (or equal to) SQLite, never behind."""
+ _spawn_writer(tmp_path, n_events=20, kill_after=5)
+ j = _count_jsonl(tmp_path)
+ s = _count_sqlite_events(tmp_path)
+ assert j >= s, f"JSONL ({j}) must not be behind SQLite ({s}) — JSONL is source of truth"
+ assert j > 0, "kill-9 fired before any write reached disk — flaky fixture"
+
+
+def test_reconcile_replays_missing_sqlite_rows(tmp_path, monkeypatch):
+ """After kill-9 + reopen, Brain.__init__ (or doctor --reconcile) must replay JSONL → SQLite."""
+ _spawn_writer(tmp_path, n_events=20, kill_after=5)
+ j_before = _count_jsonl(tmp_path)
+ s_before = _count_sqlite_events(tmp_path)
+ if j_before == s_before:
+ pytest.skip("no drift to reconcile — try a more aggressive kill-after")
+
+ # Trigger reconcile — either via Brain.__init__ auto-replay or doctor CLI
+    monkeypatch.setenv('BRAIN_DIR', str(tmp_path))
+ from gradata import Brain
+ Brain() # should auto-replay on init
+
+ assert _count_sqlite_events(tmp_path) == j_before, "reconcile failed to replay JSONL into SQLite"
+
+
+def test_reconcile_idempotent(tmp_path, monkeypatch):
+ """Running reconcile twice produces the same state."""
+ monkeypatch.setenv('BRAIN_DIR', str(tmp_path))
+ from gradata import Brain
+ b = Brain()
+ for i in range(5):
+ b.observe(f'lesson-{i}', kind='correction')
+ snapshot1 = (_count_jsonl(tmp_path), _count_sqlite_events(tmp_path))
+ Brain() # reopen → reconcile pass
+ snapshot2 = (_count_jsonl(tmp_path), _count_sqlite_events(tmp_path))
+ Brain() # again
+ snapshot3 = (_count_jsonl(tmp_path), _count_sqlite_events(tmp_path))
+ assert snapshot1 == snapshot2 == snapshot3
+
+
+def test_doctor_reconcile_reports_drift(tmp_path):
+ """gradata doctor --reconcile must report the drift count it healed."""
+ _spawn_writer(tmp_path, n_events=20, kill_after=5)
+ j_before = _count_jsonl(tmp_path)
+ s_before = _count_sqlite_events(tmp_path)
+ drift = j_before - s_before
+ if drift <= 0:
+ pytest.skip("no drift — fixture didn't crash mid-write")
+
+ env = {**os.environ, 'BRAIN_DIR': str(tmp_path)}
+ r = subprocess.run(
+ [sys.executable, '-m', 'gradata.cli', 'doctor', '--reconcile'],
+ capture_output=True, text=True, env=env, timeout=30,
+ )
+ assert r.returncode == 0, f"doctor --reconcile failed: {r.stderr}"
+ assert 'reconcil' in (r.stdout + r.stderr).lower()
+ assert _count_sqlite_events(tmp_path) == j_before
+
+
+def test_concurrent_writers_serialize(tmp_path):
+ """Two writers should not produce interleaved partial events in JSONL."""
+ p1 = subprocess.Popen([sys.executable, '-c', f"""
+import os; os.environ['BRAIN_DIR'] = {str(tmp_path)!r}
+from gradata import Brain
+b = Brain()
+for i in range(20): b.observe(f'A-{{i}}', kind='correction')
+"""])
+ p2 = subprocess.Popen([sys.executable, '-c', f"""
+import os; os.environ['BRAIN_DIR'] = {str(tmp_path)!r}
+from gradata import Brain
+b = Brain()
+for i in range(20): b.observe(f'B-{{i}}', kind='correction')
+"""])
+ p1.wait()
+ p2.wait()
+
+ # Every line in events.jsonl must be a complete JSON object
+ jsonl = tmp_path / 'events.jsonl'
+ with jsonl.open() as f:
+ for ln, line in enumerate(f, 1):
+ try:
+ json.loads(line)
+ except json.JSONDecodeError as e:
+ pytest.fail(f"corrupted line {ln}: {e}")
+
+ assert _count_jsonl(tmp_path) == _count_sqlite_events(tmp_path), \
+ "concurrent writers desynced jsonl/sqlite"
diff --git a/Gradata/tests/test_initial_confidence_invariant.py b/Gradata/tests/test_initial_confidence_invariant.py
new file mode 100644
index 00000000..3ea5bbd9
--- /dev/null
+++ b/Gradata/tests/test_initial_confidence_invariant.py
@@ -0,0 +1,44 @@
+from __future__ import annotations
+
+from gradata._types import Lesson, LessonState
+from gradata.enhancements.self_improvement import graduate
+from gradata.enhancements.self_improvement._confidence import (
+ INITIAL_CONFIDENCE,
+ MIN_APPLICATIONS_FOR_PATTERN,
+ PATTERN_THRESHOLD,
+)
+
+
+def _lesson(confidence: float, fire_count: int) -> Lesson:
+ return Lesson(
+ date="2026-05-02",
+ state=LessonState.INSTINCT,
+ confidence=confidence,
+ category="PROCESS",
+ description="Follow the existing process",
+ fire_count=fire_count,
+ )
+
+
+def test_fresh_lesson_starts_as_instinct() -> None:
+ lesson = _lesson(INITIAL_CONFIDENCE, 0)
+
+ assert lesson.state is LessonState.INSTINCT
+
+
+def test_pattern_threshold_tie_does_not_promote() -> None:
+ lesson = _lesson(PATTERN_THRESHOLD, MIN_APPLICATIONS_FOR_PATTERN)
+
+ active, graduated = graduate([lesson])
+
+ assert lesson.state is LessonState.INSTINCT
+ assert active == [lesson]
+ assert graduated == []
+
+
+def test_above_pattern_threshold_with_enough_fires_promotes() -> None:
+ lesson = _lesson(PATTERN_THRESHOLD + 0.01, MIN_APPLICATIONS_FOR_PATTERN)
+
+ graduate([lesson])
+
+ assert lesson.state is LessonState.PATTERN
diff --git a/Gradata/tests/test_middleware_core.py b/Gradata/tests/test_middleware_core.py
index a1339110..d3de676b 100644
--- a/Gradata/tests/test_middleware_core.py
+++ b/Gradata/tests/test_middleware_core.py
@@ -43,7 +43,8 @@ def test_build_brain_rules_block_wraps_in_xml():
block = build_brain_rules_block(src)
    assert block.startswith("<brain_rules>")
    assert block.endswith("</brain_rules>")
- assert "[RULE:0.95]" in block
+ assert "[RULE]" in block
+ assert "[RULE:" not in block
assert "TONE" in block
@@ -55,7 +56,7 @@ def test_build_brain_rules_block_respects_max_rules():
]
src = RuleSource(lessons=lessons, max_rules=5)
block = build_brain_rules_block(src)
- assert block.count("[RULE:") == 5
+ assert block.count("[RULE]") == 5
def test_check_output_finds_em_dash_violation():
diff --git a/Gradata/tests/test_retrieval_fusion.py b/Gradata/tests/test_retrieval_fusion.py
index 5f442e8b..f25aae41 100644
--- a/Gradata/tests/test_retrieval_fusion.py
+++ b/Gradata/tests/test_retrieval_fusion.py
@@ -3,14 +3,13 @@
import pytest
-from gradata.enhancements.retrieval_fusion import (
+from gradata.enhancements.scoring.retrieval_fusion import (
MergedRule,
ScoredRule,
apply_correction_boost,
reciprocal_rank_fusion,
)
-
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
diff --git a/Gradata/tests/test_rule_graduated_events.py b/Gradata/tests/test_rule_graduated_events.py
index 496c3a20..d82b35f7 100644
--- a/Gradata/tests/test_rule_graduated_events.py
+++ b/Gradata/tests/test_rule_graduated_events.py
@@ -59,7 +59,8 @@ def test_instinct_to_pattern_emits_rule_graduated(tmp_path):
assert data["fire_count"] == 5
-def test_pattern_to_rule_emits_rule_graduated(tmp_path):
+def test_pattern_to_rule_emits_rule_graduated(tmp_path, monkeypatch):
+ monkeypatch.setenv("GRADATA_BETA_LB_GATE", "0")
brain = init_brain(tmp_path)
lesson = Lesson(
date="2026-04-21",
diff --git a/Gradata/tests/test_rule_pipeline.py b/Gradata/tests/test_rule_pipeline.py
index 53d77b93..7ef77f12 100644
--- a/Gradata/tests/test_rule_pipeline.py
+++ b/Gradata/tests/test_rule_pipeline.py
@@ -168,7 +168,7 @@ def test_pipeline_does_not_graduate_instinct_below_threshold(tmp_path: Path) ->
assert result.graduated == []
-def test_pipeline_graduates_pattern_to_rule(tmp_path: Path) -> None:
+def test_pipeline_graduates_pattern_to_rule(tmp_path: Path, monkeypatch) -> None:
"""PATTERN lesson at 0.90 confidence with >= 5 fires graduates to RULE.
C2 fix: MIN_APPLICATIONS_FOR_RULE was accidentally lowered to 3 in
@@ -177,6 +177,7 @@ def test_pipeline_graduates_pattern_to_rule(tmp_path: Path) -> None:
which only passed because of the bug. Updated to fire_count=5 which is
the correct threshold.
"""
+ monkeypatch.setenv("GRADATA_BETA_LB_GATE", "0")
lesson = _make_lesson(
state=LessonState.PATTERN,
confidence=0.90,
@@ -242,7 +243,7 @@ def test_pipeline_handles_missing_retrieval_fusion_module(tmp_path: Path) -> Non
_write_lessons(lessons_path, [lesson])
db_path = tmp_path / "system.db"
- with patch.dict(sys.modules, {"gradata.enhancements.retrieval_fusion": None}):
+ with patch.dict(sys.modules, {"gradata.enhancements.scoring.retrieval_fusion": None}):
result = run_rule_pipeline(lessons_path, db_path, current_session=5)
assert not any("retrieval_fusion" in e for e in result.errors)
diff --git a/Gradata/tests/test_rule_to_hook.py b/Gradata/tests/test_rule_to_hook.py
index 756705b1..430c8796 100644
--- a/Gradata/tests/test_rule_to_hook.py
+++ b/Gradata/tests/test_rule_to_hook.py
@@ -315,6 +315,7 @@ def test_graduate_promotes_and_installs_hook_for_em_dash(self, tmp_path, monkeyp
from datetime import UTC as _UTC
from datetime import datetime as _dt
+ monkeypatch.setenv("GRADATA_BETA_LB_GATE", "0")
monkeypatch.setenv("GRADATA_HOOK_ROOT", str(tmp_path))
from gradata import _paths as _p
from gradata._types import Lesson, LessonState
diff --git a/Gradata/tests/test_rule_to_hook_promotion.py b/Gradata/tests/test_rule_to_hook_promotion.py
index d0cee0ed..28b3ee87 100644
--- a/Gradata/tests/test_rule_to_hook_promotion.py
+++ b/Gradata/tests/test_rule_to_hook_promotion.py
@@ -264,6 +264,7 @@ def test_graduation_auto_promotes_deterministic_rule(tmp_path, monkeypatch):
from gradata.enhancements.self_improvement import graduate
hook_dir = tmp_path / "pre-tool" / "generated"
+ monkeypatch.setenv("GRADATA_BETA_LB_GATE", "0")
monkeypatch.setenv("GRADATA_HOOK_ROOT", str(hook_dir))
# Satisfy the council empirical gate: fire_count>=10 and >=3 distinct
diff --git a/Gradata/tests/test_safety_assertion.py b/Gradata/tests/test_safety_assertion.py
index 01ec373e..e195c13a 100644
--- a/Gradata/tests/test_safety_assertion.py
+++ b/Gradata/tests/test_safety_assertion.py
@@ -80,8 +80,9 @@ def test_no_rule_without_5_fires(self) -> None:
f"Promoted to {lesson.state} with only {lesson.fire_count} fires"
)
- def test_rule_with_5_fires(self) -> None:
+ def test_rule_with_5_fires(self, monkeypatch) -> None:
"""PATTERN -> RULE succeeds with 5+ fires and sufficient confidence."""
+ monkeypatch.setenv("GRADATA_BETA_LB_GATE", "0")
lesson = _make_lesson(
state=LessonState.PATTERN,
confidence=RULE_THRESHOLD + 0.01,
diff --git a/Gradata/tests/test_score_obfuscation_gate.py b/Gradata/tests/test_score_obfuscation_gate.py
new file mode 100644
index 00000000..d326136f
--- /dev/null
+++ b/Gradata/tests/test_score_obfuscation_gate.py
@@ -0,0 +1,59 @@
+from __future__ import annotations
+
+import re
+
+from gradata import Brain
+from gradata.middleware._core import RuleSource, build_brain_rules_block
+
+_RAW_CONFIDENCE_FLOAT = re.compile(r"(?<!\d)\d\.\d{2}(?!\d)")
+
+
+def _assert_no_raw_confidence_float(prompt: str) -> None:
+ leaks = _RAW_CONFIDENCE_FLOAT.findall(prompt)
+ assert not leaks, f"raw confidence float leaked into prompt-bound text: {prompt}"
+
+
+def test_apply_brain_rules_prompt_does_not_leak_raw_confidence(tmp_path) -> None:
+ brain = Brain.init(
+ tmp_path / "brain",
+ name="ObfuscationGate",
+ domain="Testing",
+ embedding="local",
+ interactive=False,
+ )
+ result = brain.add_rule(
+ "Prefer concrete dates over relative dates",
+ "PROCESS",
+ state="RULE",
+ confidence=0.95,
+ )
+ assert result["added"] is True
+
+ prompt = brain.apply_brain_rules("write a status update", max_rules=5)
+
+ assert "" in prompt
+    assert "<brain_rules>" in prompt
+
+
+def test_middleware_brain_rules_block_does_not_leak_raw_confidence() -> None:
+ source = RuleSource(
+ lessons=[
+ {
+ "state": "RULE",
+ "confidence": 0.95,
+ "category": "PROCESS",
+ "description": "Prefer concrete dates over relative dates",
+ },
+ {
+ "state": "PATTERN",
+ "confidence": 0.72,
+ "category": "STYLE",
+ "description": "Keep summaries short",
+ },
+ ]
+ )
+
+ prompt = build_brain_rules_block(source)
+
+    assert "<brain_rules>" in prompt
+ _assert_no_raw_confidence_float(prompt)
diff --git a/Gradata/tests/test_wiring_compound.py b/Gradata/tests/test_wiring_compound.py
index f35175d2..dc531d0e 100644
--- a/Gradata/tests/test_wiring_compound.py
+++ b/Gradata/tests/test_wiring_compound.py
@@ -76,11 +76,11 @@ def test_monotone_in_alpha(self):
class TestBetaLBGate:
- def test_gate_disabled_by_default_allows_promotion(self, monkeypatch):
+ def test_gate_can_be_disabled_to_allow_promotion(self, monkeypatch):
from gradata._types import Lesson, LessonState
from gradata.enhancements.self_improvement import _passes_beta_lb_gate
- monkeypatch.delenv("GRADATA_BETA_LB_GATE", raising=False)
+ monkeypatch.setenv("GRADATA_BETA_LB_GATE", "0")
lesson = Lesson(
date="2026-04-15", category="test", description="test rule",
state=LessonState.PATTERN, confidence=0.95, fire_count=5,