Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gradata/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,5 @@ testpaths = ["tests"]
pythonpath = ["src"]
markers = [
"integration: tests that hit external LLM APIs (cost money, skip in CI)",
"dualwrite: dual-write crash recovery and reconciliation tests",
]
12 changes: 8 additions & 4 deletions Gradata/src/gradata/_doctor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from __future__ import annotations

import contextlib
import json
import os
import shutil
Expand Down Expand Up @@ -125,6 +126,11 @@ def _resolve_brain_path():
return None


def resolve_brain_path(brain_dir: str | Path | None = None) -> Path | None:
    """Resolve the doctor target brain directory for CLI subcommands.

    An explicitly supplied *brain_dir* takes precedence and is resolved to
    an absolute path; otherwise defer to the module-private auto-discovery,
    which may return ``None`` when no brain directory can be located.
    """
    if brain_dir:
        return Path(brain_dir).resolve()
    return _resolve_brain_path()


def _skip(name: str) -> dict:
return {"name": name, "status": "skip", "detail": "no brain dir resolved"}

Expand Down Expand Up @@ -367,10 +373,8 @@ def _probe_api(url: str, bearer: str) -> tuple[int, str]:
return resp.status, body
except urllib.error.HTTPError as e:
body = ""
try:
with contextlib.suppress(Exception):
body = e.read(512).decode("utf-8", errors="replace")
except Exception:
pass
return e.code, body
except (urllib.error.URLError, OSError) as e:
return 0, str(e)
Expand Down Expand Up @@ -475,7 +479,7 @@ def diagnose(
}
"""
# Resolve brain path
brain_path = Path(brain_dir).resolve() if brain_dir else _resolve_brain_path()
brain_path = resolve_brain_path(brain_dir)

if cloud_only:
checks = _cloud_checks()
Expand Down
145 changes: 101 additions & 44 deletions Gradata/src/gradata/_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,99 @@ def _ensure_table(conn: sqlite3.Connection):
_schema_initialized.add(db_file)


def _insert_event_projection(
    conn: sqlite3.Connection,
    event: dict,
    *,
    brain_dir: Path,
) -> int | None:
    """Project one canonical JSONL event into SQLite idempotently.

    INSERT OR IGNORE plus the (tenant_id, ts, type, source) dedup key makes
    replays no-ops. Returns the row id for both the fresh-insert and the
    already-present cases, or ``None`` if the dedup lookup finds nothing.
    """
    _ensure_table(conn)
    tenant = tenant_for(brain_dir)
    ts = event.get("ts", "")
    event_type = event.get("type", "")
    source = event.get("source", "")
    row = (
        ts,
        event.get("session"),
        event_type,
        source,
        json.dumps(event.get("data", {}), default=str),
        json.dumps(event.get("tags", []), default=str),
        event.get("valid_from"),
        event.get("valid_until"),
        tenant,
    )
    cur = conn.execute(
        "INSERT OR IGNORE INTO events "
        "(ts, session, type, source, data_json, tags_json, valid_from, valid_until, tenant_id, schema_version) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1)",
        row,
    )
    if cur.rowcount == 1:
        return cur.lastrowid
    # Duplicate: surface the pre-existing row's id so callers relying on
    # event ids still get the real rowid.
    found = conn.execute(
        "SELECT id FROM events WHERE tenant_id=? AND ts=? AND type=? AND source=?",
        (tenant, ts, event_type, source),
    ).fetchone()
    return found[0] if found else None


def reconcile_jsonl_to_sqlite(
    brain_dir: str | Path | None = None,
    *,
    ctx: BrainContext | None = None,
) -> dict:
    """Replay canonical ``events.jsonl`` rows into the SQLite projection.

    JSONL is the source of truth. SQLite is a query projection that may lag
    after process death between append+fsync and DB commit. This function is
    idempotent because inserts use the event dedup key.

    Args:
        brain_dir: Brain root to reconcile. Falls back to the configured
            ``_p.BRAIN_DIR`` when neither this nor *ctx* is supplied.
        ctx: Optional pre-resolved context providing the paths directly;
            takes precedence over *brain_dir*.

    Returns:
        Stats dict with the keys ``jsonl_events``, ``sqlite_events_before``,
        ``sqlite_events_after``, ``drift``, ``replayed`` and ``invalid`` —
        all keys are present even when ``events.jsonl`` does not exist, so
        callers never need to special-case the missing-file result.
    """
    if ctx is not None:
        events_jsonl = ctx.events_jsonl
        db_path = ctx.db_path
        root = ctx.brain_dir
    else:
        root = Path(brain_dir).resolve() if brain_dir is not None else _p.BRAIN_DIR
        events_jsonl = root / "events.jsonl"
        db_path = root / "system.db"

    if not events_jsonl.exists():
        # Nothing to replay; return the full stats schema (previously this
        # branch omitted sqlite_events_after/drift, forcing callers to .get()).
        return {
            "jsonl_events": 0,
            "sqlite_events_before": 0,
            "sqlite_events_after": 0,
            "drift": 0,
            "replayed": 0,
            "invalid": 0,
        }

    invalid = 0
    total = 0
    with (
        open(events_jsonl, encoding="utf-8", errors="replace") as fh,
        contextlib.closing(sqlite3.connect(str(db_path))) as conn,
    ):
        # Tolerate concurrent writers (emit()) holding the DB briefly.
        conn.execute("PRAGMA busy_timeout=5000")
        _ensure_table(conn)
        before = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
        for raw in fh:
            raw = raw.strip()
            if not raw:
                continue
            try:
                event = json.loads(raw)
            except json.JSONDecodeError:
                # Torn/partial line from a crashed append — count, don't fail.
                invalid += 1
                continue
            if not isinstance(event, dict):
                invalid += 1
                continue
            total += 1
            _insert_event_projection(conn, event, brain_dir=root)
        conn.commit()
        after = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]

    return {
        "jsonl_events": total,
        "sqlite_events_before": before,
        "sqlite_events_after": after,
        "drift": max(0, total - before),
        "replayed": max(0, after - before),
        "invalid": invalid,
    }


def emit(
event_type: str,
source: str,
Expand Down Expand Up @@ -248,42 +341,23 @@ def emit(
try:
_locked_append(events_jsonl, json.dumps(event, ensure_ascii=False) + "\n")
jsonl_ok = True
delay_ms = os.environ.get("GRADATA_DUALWRITE_JSONL_FSYNC_DELAY_MS", "").strip()
if delay_ms:
import time

with contextlib.suppress(ValueError):
time.sleep(max(0.0, float(delay_ms)) / 1000.0)
except Exception as e:
_log.error("JSONL write failed: %s", e)

try:
with contextlib.closing(sqlite3.connect(str(db_path))) as conn:
_ensure_table(conn)
# INSERT OR IGNORE + UNIQUE(ts,type,source) makes emit() idempotent
# across retries and partial-write recoveries. If an identical
# event was already persisted (same dedup key), the INSERT is a
# no-op -- we then look up the pre-existing row's id so callers
# that depend on `event["id"]` still get the real rowid.
_tid = tenant_for(db_path.parent)
cursor = conn.execute(
"INSERT OR IGNORE INTO events "
"(ts, session, type, source, data_json, tags_json, valid_from, valid_until, tenant_id, schema_version) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1)",
(
ts,
session,
event_type,
source,
json.dumps(redacted_data),
json.dumps(redacted_tags),
valid_from,
valid_until,
_tid,
),
)
if cursor.rowcount == 1:
event["id"] = cursor.lastrowid
else:
existing = conn.execute(
"SELECT id FROM events WHERE tenant_id=? AND ts=? AND type=? AND source=?",
(_tid, ts, event_type, source),
).fetchone()
event["id"] = existing[0] if existing else None
event["id"] = _insert_event_projection(conn, event, brain_dir=db_path.parent)
conn.commit()
sqlite_ok = True
except Exception as e:
Expand Down Expand Up @@ -729,25 +803,8 @@ def flush(self) -> dict:
with contextlib.closing(sqlite3.connect(str(self.db_path))) as conn:
_ensure_table(conn)
conn.execute("PRAGMA busy_timeout=5000")
_tid = tenant_for(self.brain_dir)
for event in new_events:
conn.execute(
"INSERT OR IGNORE INTO events "
"(ts, session, type, source, data_json, tags_json, "
" valid_from, valid_until, tenant_id, schema_version) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1)",
(
event.get("ts", ""),
event.get("session"),
event.get("type", ""),
event.get("source", ""),
json.dumps(event.get("data", {}), default=str),
json.dumps(event.get("tags", []), default=str),
event.get("valid_from"),
event.get("valid_until"),
_tid,
),
)
_insert_event_projection(conn, event, brain_dir=self.brain_dir)
conn.commit()
except Exception as db_exc:
result["errors"].append(f"Phase 2 DB: {db_exc}")
Expand Down
27 changes: 24 additions & 3 deletions Gradata/src/gradata/brain.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@ class Brain(BrainInspectionMixin):

def __init__(
self,
brain_dir: str | Path,
brain_dir: str | Path | None = None,
working_dir: str | Path | None = None,
encryption_key: str | None = None,
):
self.dir = Path(brain_dir).resolve()
from gradata._paths import resolve_brain_dir

self.dir = resolve_brain_dir(brain_dir)
if not self.dir.exists():
from gradata.exceptions import BrainNotFoundError

Expand Down Expand Up @@ -136,6 +138,16 @@ def __init__(

run_migrations(self.db_path)

# JSONL is canonical; SQLite is a query projection. A process can die
# after append+fsync and before SQLite commit, so every open heals any
# projection lag before readers query the events table.
try:
from gradata._events import reconcile_jsonl_to_sqlite

reconcile_jsonl_to_sqlite(ctx=self.ctx)
except Exception as exc:
logger.debug("event projection reconcile failed: %s", exc)

# Initialize pattern registries (lazy — ImportError safe)
try:
from gradata.enhancements.behavioral_engine import DirectiveRegistry
Expand Down Expand Up @@ -1505,8 +1517,17 @@ def get_facts(self, prospect: str | None = None, fact_type: str | None = None) -
except ImportError:
return []

def observe(self, messages: list[dict], user_id: str = "default") -> list[dict]:
def observe(
self,
messages: list[dict] | str,
user_id: str = "default",
*,
kind: str | None = None,
) -> list[dict] | dict:
"""Extract facts from a conversation without requiring corrections."""
if isinstance(messages, str):
event_type = (kind or "observation").strip().upper()
return self.emit(event_type, "brain.observe", {"content": messages})
try:
from gradata.enhancements.memory_extraction import MemoryExtractor
except ImportError:
Expand Down
25 changes: 25 additions & 0 deletions Gradata/src/gradata/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,26 @@ def cmd_doctor(args):
from gradata._doctor import diagnose, print_diagnosis

brain_dir = getattr(args, "brain_dir", None)
if getattr(args, "reconcile", False):
from gradata._doctor import resolve_brain_path
from gradata._events import reconcile_jsonl_to_sqlite

brain_path = resolve_brain_path(brain_dir)
if brain_path is None:
print("reconcile: no brain dir resolved", file=sys.stderr)
sys.exit(1)
result = reconcile_jsonl_to_sqlite(brain_path)
if getattr(args, "json", False):
print(json.dumps({"reconcile": result}, indent=2))
else:
print(
"reconcile: "
f"drift={result.get('drift', 0)} "
f"replayed={result.get('replayed', 0)} "
f"jsonl={result.get('jsonl_events', 0)} "
f"sqlite={result.get('sqlite_events_after', result.get('sqlite_events_before', 0))}"
)
return
cloud_only = getattr(args, "cloud", False)
include_cloud = not getattr(args, "no_cloud", False)
report = diagnose(
Expand Down Expand Up @@ -1227,6 +1247,11 @@ def main():
p_doctor.add_argument("--json", action="store_true", help="Output as JSON")
p_doctor.add_argument("--cloud", action="store_true", help="Only run cloud checks")
p_doctor.add_argument("--no-cloud", action="store_true", help="Skip cloud checks (offline)")
p_doctor.add_argument(
"--reconcile",
action="store_true",
help="Replay events.jsonl into system.db and report healed drift",
)

# install
p_install = sub.add_parser("install", help="Install a brain from marketplace archive")
Expand Down
30 changes: 28 additions & 2 deletions Gradata/src/gradata/cloud/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ def sync(self, batch_size: int = 500) -> int:
except json.JSONDecodeError:
continue
ts = ev.get("ts", "")
# Coerce non-string ts (e.g. float epoch) to string for safe compare
if not isinstance(ts, str):
ts = str(ts)
ev["ts"] = ts
if ts > last_watermark:
pending.append(ev)

Expand Down Expand Up @@ -240,17 +244,35 @@ def _format_event(ev: dict) -> dict:
server can upsert idempotently on (brain_id, event_id).
"""
ts = ev.get("ts", "")
if not isinstance(ts, str):
ts = str(ts)
event_type = ev.get("type", "")
source = ev.get("source", "")
raw = f"{ts}:{event_type}:{source}"
event_id = hashlib.sha256(raw.encode()).hexdigest()[:32]
# Coerce session to int|None — server schema rejects floats/strings
session_raw = ev.get("session")
session_val: int | None
try:
if session_raw is None:
session_val = None
elif isinstance(session_raw, bool):
session_val = None
elif isinstance(session_raw, int):
session_val = session_raw
elif isinstance(session_raw, float):
session_val = int(session_raw)
else:
session_val = int(str(session_raw))
except (ValueError, TypeError):
session_val = None
return {
"event_id": event_id,
"type": event_type,
"source": source,
"data": ev.get("data", {}),
"tags": ev.get("tags", []),
"session": ev.get("session"),
"session": session_val,
"created_at": ts or None,
}

Expand All @@ -276,7 +298,11 @@ def _post(self, path: str, data: dict) -> dict[str, Any]:
except HTTPError as e:
if e.code == 413:
raise _TooLargeError() from e
raise ConnectionError(f"Cloud API request failed: {e}") from e
try:
body = e.read().decode("utf-8", errors="replace")[:500]
except Exception:
body = ""
raise ConnectionError(f"Cloud API request failed: {e} body={body}") from e
except URLError as e:
raise ConnectionError(f"Cloud API request failed: {e}") from e

Expand Down
2 changes: 1 addition & 1 deletion Gradata/src/gradata/enhancements/rule_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def run_rule_pipeline(

# Rank rules using retrieval fusion if available
try:
from gradata.enhancements.retrieval_fusion import ( # type: ignore[import]
from gradata.enhancements.scoring.retrieval_fusion import ( # type: ignore[import]
ScoredRule,
apply_correction_boost,
reciprocal_rank_fusion,
Expand Down
Loading
Loading