diff --git a/Gradata/src/gradata/cli.py b/Gradata/src/gradata/cli.py index ea2f098f..d14c2ce2 100644 --- a/Gradata/src/gradata/cli.py +++ b/Gradata/src/gradata/cli.py @@ -24,6 +24,7 @@ import json import logging import sys +from datetime import UTC from pathlib import Path from gradata._env import env_str @@ -140,6 +141,166 @@ def cmd_stats(args): print(f" Has embeddings: {stats['has_embeddings']}") +def cmd_status(args): + """Single human-readable summary of brain health. + + Wraps stats + health + daemon probe + cloud-sync state into one + terminal-renderable block. Designed for the user's daily "what's + going on with my brain" check — `gradata status` and done. + + Output is plain text (no color codes, no Unicode boxes). Stays + under ~40 lines for a typical brain. + """ + import json as _json + import sqlite3 as _sqlite3 + import time as _time + import urllib.error as _urllib_error + import urllib.request as _urllib_request + from datetime import datetime, timezone + + brain = _get_brain(args) + stats = brain.stats() + brain_dir = stats["brain_dir"] + + print(f"Brain: {brain_dir}") + print(f" Database: {stats['db_size_mb']} MB ({stats['markdown_files']} markdown files)") + + # Rules / lessons / corrections from events table + db_path = f"{brain_dir}/system.db" + rules_total = lessons_total = corr_total = 0 + last_correction_ts = None + try: + con = _sqlite3.connect(db_path) + cur = con.cursor() + rules_total = cur.execute( + "SELECT COUNT(*) FROM events WHERE type='RULE_GRADUATED'" + ).fetchone()[0] + lessons_total = cur.execute( + "SELECT COUNT(*) FROM events WHERE type IN ('LESSON_ADDED','LESSON_CHANGE')" + ).fetchone()[0] + corr_total = cur.execute("SELECT COUNT(*) FROM events WHERE type='CORRECTION'").fetchone()[ + 0 + ] + row = cur.execute("SELECT MAX(ts) FROM events WHERE type='CORRECTION'").fetchone() + last_correction_ts = row[0] if row else None + con.close() + except (_sqlite3.OperationalError, OSError): + # Fresh brain or schema drift — show zeros, don't crash. + pass + + print(f" Rules graduated: {rules_total}") + print(f" Lessons: {lessons_total}") + print(f" Corrections: {corr_total}") + if last_correction_ts: + print(f" Last correction: {last_correction_ts}") + + # Sync queue state + pending = total_q = 0 + try: + con = _sqlite3.connect(db_path) + pending = con.execute("SELECT COUNT(*) FROM sync_queue WHERE synced_at IS NULL").fetchone()[ + 0 + ] + total_q = con.execute("SELECT COUNT(*) FROM sync_queue").fetchone()[0] + con.close() + except _sqlite3.OperationalError: + pass + if total_q: + if pending: + print(f" Sync queue: {pending} pending / {total_q} total") + else: + print(f" Sync queue: drained ({total_q} synced)") + + # Daemon health (best-effort, never blocks) + print() + print("Daemon:") + try: + req = _urllib_request.Request( + "http://127.0.0.1:8765/health", + headers={"User-Agent": "gradata-status/1.0"}, + ) + with _urllib_request.urlopen(req, timeout=2) as r: + data = _json.loads(r.read().decode()) + uptime = data.get("uptime_seconds", 0) + hrs = int(uptime // 3600) + mins = int((uptime % 3600) // 60) + print(f" Status: up (uptime {hrs}h{mins}m)") + print(f" Brain dir: {data.get('brain_dir', '?')}") + print(f" SDK version: {data.get('sdk_version', '?')}") + if (data.get("brain_dir") or "").rstrip("/") != brain_dir.rstrip("/"): + print(f" WARNING: daemon brain dir != this brain ({brain_dir})") + except (_urllib_error.URLError, _urllib_error.HTTPError, TimeoutError, OSError): + print(" Status: not running (run: systemctl --user start gradata-daemon)") + + # Cloud sync state (best-effort) + print() + print("Cloud:") + try: + from pathlib import Path as _Path + + key_path = _Path.home() / ".gradata" / "key" + token = key_path.read_text(encoding="utf-8").strip() if key_path.is_file() else "" + if not token: + print(" Status: not configured (run: gradata cloud enable --key )") + else: + req = _urllib_request.Request( + "https://api.gradata.ai/api/v1/brains", + headers={ + "Authorization": f"Bearer {token}", + "User-Agent": "gradata-status/1.0", + }, + ) + with _urllib_request.urlopen(req, timeout=4) as r: + brains = _json.loads(r.read().decode()) + b = brains[0] if isinstance(brains, list) else brains + last_sync = b.get("last_sync") or "(never)" + cloud_corr = b.get("correction_count") or 0 + cloud_lessons = b.get("lesson_count") or 0 + print(f" Last sync: {last_sync}") + print(f" Corrections: {cloud_corr} (local: {corr_total})") + print(f" Lessons: {cloud_lessons} (local: {lessons_total})") + # Lag warning + if last_sync and last_sync != "(never)": + try: + ls = datetime.fromisoformat(last_sync.replace("Z", "+00:00")) + age_min = (datetime.now(UTC) - ls).total_seconds() / 60 + if age_min > 60: + print(f" WARNING: cloud is {int(age_min)}m behind") + except ValueError: + pass + except (_urllib_error.URLError, _urllib_error.HTTPError, TimeoutError, OSError) as exc: + print(f" Status: unreachable ({type(exc).__name__})") + + # Convergence trend — corrections-per-session, last 7 days + print() + print("Convergence (last 7d):") + try: + con = _sqlite3.connect(db_path) + cur = con.cursor() + cutoff = int(_time.time()) - 7 * 86400 + # Sessions and corrections in the last 7 days + sessions_with_data = cur.execute( + """ + SELECT session, COUNT(*) AS n + FROM events + WHERE type='CORRECTION' + AND strftime('%s', ts) >= ? + GROUP BY session + """, + (str(cutoff),), + ).fetchall() + con.close() + if sessions_with_data: + sess_n = len(sessions_with_data) + corr_n = sum(n for _, n in sessions_with_data) + avg = corr_n / sess_n + print(f" Sessions: {sess_n} ({corr_n} corrections, avg {avg:.1f}/session)") + else: + print(" No correction activity in the last 7 days") + except _sqlite3.OperationalError: + print(" (events schema not available)") + + def cmd_audit(args): from gradata._audit import format_audit_text, run_audit @@ -335,9 +496,10 @@ def _cmd_install_agent(args) -> None: # ▸ Flag-gated install verification: write + read a test rule if verify_install and result.action != "failed": try: - from gradata import Brain import tempfile + from gradata import Brain + verification_marker = f"gradata-install-verify-{name}-{os.urandom(4).hex()}" with tempfile.TemporaryDirectory(prefix="gradata-verify-") as verification_tmp: verification_dir = Path(verification_tmp) / "brain" @@ -350,13 +512,10 @@ def _cmd_install_agent(args) -> None: ) results = verification_brain.search(verification_marker, mode="rules", top_k=3) marker_found = any( - verification_marker in (r.get("text") or "").lower() - for r in results + verification_marker in (r.get("text") or "").lower() for r in results ) if not marker_found: - print( - f" ⚠ verify failed: test rule written but not readable for {name}" - ) + print(f" ⚠ verify failed: test rule written but not readable for {name}") had_failure = True else: print(f" ✓ verify: {name} install confirmed (write+read)") @@ -1443,6 +1602,9 @@ def main(): # stats sub.add_parser("stats", help="Brain statistics") + # status (umbrella health check: stats + daemon + cloud + convergence) + sub.add_parser("status", help="Single-page brain/daemon/cloud summary") + # audit p_audit = sub.add_parser("audit", help="Data flow audit") p_audit.add_argument("--json", action="store_true") @@ -1752,6 +1914,7 @@ def main(): "embed": cmd_embed, "manifest": cmd_manifest, "stats": cmd_stats, + "status": cmd_status, "audit": cmd_audit, "sync": cmd_sync, "recall": cmd_recall, diff --git a/Gradata/tests/test_status_command.py b/Gradata/tests/test_status_command.py new file mode 100644 index 00000000..5d7be2bc --- /dev/null +++ b/Gradata/tests/test_status_command.py @@ -0,0 +1,209 @@ +"""Tests for `gradata status` — single-page brain/daemon/cloud summary. + +The command must: +- Print a brain summary block (rules, lessons, corrections from system.db) +- Probe the daemon at 127.0.0.1:8765 (best-effort, never blocks) +- Probe cloud sync state (best-effort, never blocks) +- Show a 7d convergence trend +- Never crash on a fresh brain, missing daemon, or no cloud key +""" + +from __future__ import annotations + +import sqlite3 +from pathlib import Path +from types import SimpleNamespace + +import pytest + +# Skip if running without the SDK importable +try: + from gradata.cli import cmd_status +except ImportError: # pragma: no cover + pytest.skip("gradata SDK not importable", allow_module_level=True) + + +def _seed_minimal_brain(brain_dir: Path) -> None: + """Create a brain dir with a system.db that satisfies Brain() open().""" + brain_dir.mkdir(parents=True, exist_ok=True) + db_path = brain_dir / "system.db" + con = sqlite3.connect(db_path) + cur = con.cursor() + # Minimal events table matching production schema columns the test uses. + cur.execute( + """ + CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts TEXT NOT NULL, + session INTEGER, + type TEXT NOT NULL, + source TEXT, + data_json TEXT + ) + """ + ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS sync_queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + payload_json TEXT, + kind TEXT, + enqueued_at REAL, + synced_at REAL, + attempts INTEGER DEFAULT 0, + last_error TEXT + ) + """ + ) + con.commit() + con.close() + + +def _run_status( + brain_dir: Path, capsys, *, mock_daemon_down: bool = True, mock_cloud_down: bool = True +) -> str: + """Invoke cmd_status with args.brain_dir and return captured stdout. + + By default this mocks BOTH the daemon /health probe and the cloud /brains + probe as unreachable, so tests run deterministically regardless of whether + the developer has a live daemon or cloud key on their machine. + """ + import contextlib + import urllib.error + import urllib.request as _ur + + args = SimpleNamespace(brain_dir=str(brain_dir)) + real_urlopen = _ur.urlopen + + def patched_urlopen(req, *a, **kw): + url = req.full_url if hasattr(req, "full_url") else str(req) + if mock_daemon_down and "127.0.0.1:8765" in url: + raise urllib.error.URLError("mocked daemon down") + if mock_cloud_down and "api.gradata.ai" in url: + raise urllib.error.URLError("mocked cloud down") + return real_urlopen(req, *a, **kw) + + _ur.urlopen = patched_urlopen + try: + with contextlib.suppress(SystemExit): + cmd_status(args) + finally: + _ur.urlopen = real_urlopen + captured = capsys.readouterr() + return captured.out + + +def test_status_runs_on_fresh_brain(tmp_path, capsys, monkeypatch): + """Fresh brain (no events, no daemon, no cloud key) — command must + succeed and print something useful, not crash.""" + brain_dir = tmp_path / "brain" + _seed_minimal_brain(brain_dir) + # Make sure there's no real cloud key in this test's view + monkeypatch.setenv("HOME", str(tmp_path)) + out = _run_status(brain_dir, capsys) + + # Brain block + assert "Brain:" in out + assert "Rules graduated: 0" in out + assert "Lessons: 0" in out + assert "Corrections: 0" in out + + # Daemon section (should be "not running" since we didn't start one) + assert "Daemon:" in out + assert "not running" in out + + # Cloud section (should report "not configured" since HOME has no key file) + assert "Cloud:" in out + assert "not configured" in out + + # Convergence + assert "Convergence" in out + + +def test_status_with_corrections(tmp_path, capsys, monkeypatch): + """Brain with seeded CORRECTION events — counts must appear in output.""" + brain_dir = tmp_path / "brain" + _seed_minimal_brain(brain_dir) + monkeypatch.setenv("HOME", str(tmp_path)) + + # Seed 3 CORRECTION events and a LESSON_CHANGE + con = sqlite3.connect(brain_dir / "system.db") + cur = con.cursor() + cur.executemany( + "INSERT INTO events(ts, session, type, source, data_json) VALUES (?,?,?,?,?)", + [ + ("2026-05-19T10:00:00+00:00", 1, "CORRECTION", "test", "{}"), + ("2026-05-19T11:00:00+00:00", 1, "CORRECTION", "test", "{}"), + ("2026-05-19T12:00:00+00:00", 2, "CORRECTION", "test", "{}"), + ("2026-05-19T12:00:01+00:00", 2, "LESSON_CHANGE", "test", "{}"), + ("2026-05-19T13:00:00+00:00", 2, "RULE_GRADUATED", "test", "{}"), + ], + ) + con.commit() + con.close() + + out = _run_status(brain_dir, capsys) + assert "Corrections: 3" in out + assert "Lessons: 1" in out + assert "Rules graduated: 1" in out + # Last correction timestamp should appear + assert "Last correction:" in out + + +def test_status_does_not_crash_on_missing_db(tmp_path, capsys, monkeypatch): + """If system.db doesn't exist yet (brand-new brain), don't crash.""" + brain_dir = tmp_path / "brain" + brain_dir.mkdir() + # Intentionally do NOT create system.db + monkeypatch.setenv("HOME", str(tmp_path)) + + # _get_brain() will likely try to init or open; this test just + # checks the status path tolerates schema absence. If _get_brain + # itself crashes, we accept SystemExit but should NOT see a + # traceback escape. + try: + _run_status(brain_dir, capsys) + except Exception as e: + pytest.fail(f"cmd_status raised on missing-db brain: {e}") + + +def test_status_with_sync_queue_drained(tmp_path, capsys, monkeypatch): + """sync_queue with all rows synced should report 'drained'.""" + brain_dir = tmp_path / "brain" + _seed_minimal_brain(brain_dir) + monkeypatch.setenv("HOME", str(tmp_path)) + + con = sqlite3.connect(brain_dir / "system.db") + cur = con.cursor() + cur.executemany( + "INSERT INTO sync_queue(payload_json, kind, enqueued_at, synced_at) VALUES (?,?,?,?)", + [("{}", "correction", 1700000000.0, 1700000010.0)] * 3, + ) + con.commit() + con.close() + + out = _run_status(brain_dir, capsys) + assert "Sync queue: drained (3 synced)" in out + + +def test_status_with_pending_sync(tmp_path, capsys, monkeypatch): + """sync_queue with pending rows should report the pending count.""" + brain_dir = tmp_path / "brain" + _seed_minimal_brain(brain_dir) + monkeypatch.setenv("HOME", str(tmp_path)) + + con = sqlite3.connect(brain_dir / "system.db") + cur = con.cursor() + cur.executemany( + "INSERT INTO sync_queue(payload_json, kind, enqueued_at, synced_at) VALUES (?,?,?,?)", + [ + ("{}", "correction", 1700000000.0, None), + ("{}", "correction", 1700000001.0, None), + ("{}", "correction", 1700000002.0, 1700000010.0), + ], + ) + con.commit() + con.close() + + out = _run_status(brain_dir, capsys) + assert "Sync queue: 2 pending / 3 total" in out