Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 169 additions & 6 deletions Gradata/src/gradata/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import json
import logging
import sys
from datetime import UTC
from pathlib import Path

from gradata._env import env_str
Expand Down Expand Up @@ -140,6 +141,166 @@ def cmd_stats(args):
print(f" Has embeddings: {stats['has_embeddings']}")


def cmd_status(args):
"""Single human-readable summary of brain health.

Wraps stats + health + daemon probe + cloud-sync state into one
terminal-renderable block. Designed for the user's daily "what's
going on with my brain" check — `gradata status` and done.

Output is plain text (no color codes, no Unicode boxes). Stays
under ~40 lines for a typical brain.
"""
import json as _json
import sqlite3 as _sqlite3
import time as _time
import urllib.error as _urllib_error
import urllib.request as _urllib_request
from datetime import datetime, timezone

brain = _get_brain(args)
stats = brain.stats()
brain_dir = stats["brain_dir"]

print(f"Brain: {brain_dir}")
print(f" Database: {stats['db_size_mb']} MB ({stats['markdown_files']} markdown files)")

# Rules / lessons / corrections from events table
db_path = f"{brain_dir}/system.db"
rules_total = lessons_total = corr_total = 0
last_correction_ts = None
try:
con = _sqlite3.connect(db_path)
cur = con.cursor()
rules_total = cur.execute(
"SELECT COUNT(*) FROM events WHERE type='RULE_GRADUATED'"
).fetchone()[0]
lessons_total = cur.execute(
"SELECT COUNT(*) FROM events WHERE type IN ('LESSON_ADDED','LESSON_CHANGE')"
).fetchone()[0]
corr_total = cur.execute("SELECT COUNT(*) FROM events WHERE type='CORRECTION'").fetchone()[
0
]
row = cur.execute("SELECT MAX(ts) FROM events WHERE type='CORRECTION'").fetchone()
last_correction_ts = row[0] if row else None
con.close()
except (_sqlite3.OperationalError, OSError):
# Fresh brain or schema drift — show zeros, don't crash.
pass

print(f" Rules graduated: {rules_total}")
print(f" Lessons: {lessons_total}")
print(f" Corrections: {corr_total}")
if last_correction_ts:
print(f" Last correction: {last_correction_ts}")

# Sync queue state
pending = total_q = 0
try:
con = _sqlite3.connect(db_path)
pending = con.execute("SELECT COUNT(*) FROM sync_queue WHERE synced_at IS NULL").fetchone()[
0
]
total_q = con.execute("SELECT COUNT(*) FROM sync_queue").fetchone()[0]
con.close()
except _sqlite3.OperationalError:
pass
Comment on lines +172 to +207
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use context managers for SQLite connections to prevent resource leaks.

The three SQLite connection blocks (lines 172-186, 199-205, 278-292) call con.close() inside the try block. If an unexpected exception (not OperationalError or OSError) occurs after connect() but before close(), the connection leaks.

Consider consolidating into a single connection with proper context management:

🛡️ Proposed refactor using context manager
-    try:
-        con = _sqlite3.connect(db_path)
-        cur = con.cursor()
-        rules_total = cur.execute(
-            "SELECT COUNT(*) FROM events WHERE type='RULE_GRADUATED'"
-        ).fetchone()[0]
-        lessons_total = cur.execute(
-            "SELECT COUNT(*) FROM events WHERE type IN ('LESSON_ADDED','LESSON_CHANGE')"
-        ).fetchone()[0]
-        corr_total = cur.execute("SELECT COUNT(*) FROM events WHERE type='CORRECTION'").fetchone()[
-            0
-        ]
-        row = cur.execute("SELECT MAX(ts) FROM events WHERE type='CORRECTION'").fetchone()
-        last_correction_ts = row[0] if row else None
-        con.close()
-    except (_sqlite3.OperationalError, OSError):
-        # Fresh brain or schema drift — show zeros, don't crash.
-        pass
+    try:
+        with _sqlite3.connect(db_path) as con:
+            cur = con.cursor()
+            rules_total = cur.execute(
+                "SELECT COUNT(*) FROM events WHERE type='RULE_GRADUATED'"
+            ).fetchone()[0]
+            lessons_total = cur.execute(
+                "SELECT COUNT(*) FROM events WHERE type IN ('LESSON_ADDED','LESSON_CHANGE')"
+            ).fetchone()[0]
+            corr_total = cur.execute(
+                "SELECT COUNT(*) FROM events WHERE type='CORRECTION'"
+            ).fetchone()[0]
+            row = cur.execute("SELECT MAX(ts) FROM events WHERE type='CORRECTION'").fetchone()
+            last_correction_ts = row[0] if row else None
+    except (_sqlite3.OperationalError, OSError):
+        # Fresh brain or schema drift — show zeros, don't crash.
+        pass

Apply the same pattern to the sync_queue block (lines 199-207) and the convergence block (lines 277-292).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Gradata/src/gradata/cli.py` around lines 172 - 207, The three places that
call _sqlite3.connect(db_path) and then con.close() (the blocks that compute
rules_total/lessons_total/corr_total/last_correction_ts, the sync_queue block
that computes pending/total_q, and the convergence block) can leak connections
if an unexpected exception occurs; replace these with context managers using
"with _sqlite3.connect(db_path) as con:" (or consolidate into a single
with-block when safe) so cursors and connections are closed automatically,
remove explicit con.close() calls, and ensure you still handle
_sqlite3.OperationalError/OSError around the with if you need to swallow
schema-missing errors while preserving the variables rules_total, lessons_total,
corr_total, last_correction_ts, pending, total_q, etc.

if total_q:
if pending:
print(f" Sync queue: {pending} pending / {total_q} total")
else:
print(f" Sync queue: drained ({total_q} synced)")

# Daemon health (best-effort, never blocks)
print()
print("Daemon:")
try:
req = _urllib_request.Request(
"http://127.0.0.1:8765/health",
headers={"User-Agent": "gradata-status/1.0"},
)
with _urllib_request.urlopen(req, timeout=2) as r:
data = _json.loads(r.read().decode())
uptime = data.get("uptime_seconds", 0)
hrs = int(uptime // 3600)
mins = int((uptime % 3600) // 60)
print(f" Status: up (uptime {hrs}h{mins}m)")
print(f" Brain dir: {data.get('brain_dir', '?')}")
print(f" SDK version: {data.get('sdk_version', '?')}")
if (data.get("brain_dir") or "").rstrip("/") != brain_dir.rstrip("/"):
print(f" WARNING: daemon brain dir != this brain ({brain_dir})")
except (_urllib_error.URLError, _urllib_error.HTTPError, TimeoutError, OSError):
print(" Status: not running (run: systemctl --user start gradata-daemon)")

# Cloud sync state (best-effort)
print()
print("Cloud:")
try:
from pathlib import Path as _Path
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | 💤 Low value

Remove redundant Path import.

Path is already imported at module level (line 28). This local import shadows it unnecessarily.

♻️ Proposed fix
-        from pathlib import Path as _Path
-
-        key_path = _Path.home() / ".gradata" / "key"
+        key_path = Path.home() / ".gradata" / "key"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Gradata/src/gradata/cli.py` at line 239, Remove the redundant local import
"from pathlib import Path as _Path" in gradata/cli.py and use the
already-imported module-level symbol Path instead (or drop the alias import
entirely); update any local references to _Path to use Path so no shadowing
occurs and the duplicate import is eliminated.


key_path = _Path.home() / ".gradata" / "key"
token = key_path.read_text(encoding="utf-8").strip() if key_path.is_file() else ""
if not token:
print(" Status: not configured (run: gradata cloud enable --key <gd_live_...>)")
else:
req = _urllib_request.Request(
"https://api.gradata.ai/api/v1/brains",
headers={
"Authorization": f"Bearer {token}",
"User-Agent": "gradata-status/1.0",
},
)
with _urllib_request.urlopen(req, timeout=4) as r:
brains = _json.loads(r.read().decode())
b = brains[0] if isinstance(brains, list) else brains
last_sync = b.get("last_sync") or "(never)"
cloud_corr = b.get("correction_count") or 0
cloud_lessons = b.get("lesson_count") or 0
print(f" Last sync: {last_sync}")
print(f" Corrections: {cloud_corr} (local: {corr_total})")
print(f" Lessons: {cloud_lessons} (local: {lessons_total})")
# Lag warning
if last_sync and last_sync != "(never)":
try:
ls = datetime.fromisoformat(last_sync.replace("Z", "+00:00"))
age_min = (datetime.now(UTC) - ls).total_seconds() / 60
if age_min > 60:
print(f" WARNING: cloud is {int(age_min)}m behind")
except ValueError:
pass
except (_urllib_error.URLError, _urllib_error.HTTPError, TimeoutError, OSError) as exc:
print(f" Status: unreachable ({type(exc).__name__})")

# Convergence trend — corrections-per-session, last 7 days
print()
print("Convergence (last 7d):")
try:
con = _sqlite3.connect(db_path)
cur = con.cursor()
cutoff = int(_time.time()) - 7 * 86400
# Sessions and corrections in the last 7 days
sessions_with_data = cur.execute(
"""
SELECT session, COUNT(*) AS n
FROM events
WHERE type='CORRECTION'
AND strftime('%s', ts) >= ?
GROUP BY session
""",
(str(cutoff),),
).fetchall()
con.close()
if sessions_with_data:
sess_n = len(sessions_with_data)
corr_n = sum(n for _, n in sessions_with_data)
avg = corr_n / sess_n
print(f" Sessions: {sess_n} ({corr_n} corrections, avg {avg:.1f}/session)")
else:
print(" No correction activity in the last 7 days")
except _sqlite3.OperationalError:
print(" (events schema not available)")


def cmd_audit(args):
from gradata._audit import format_audit_text, run_audit

Expand Down Expand Up @@ -335,9 +496,10 @@ def _cmd_install_agent(args) -> None:
# ▸ Flag-gated install verification: write + read a test rule
if verify_install and result.action != "failed":
try:
from gradata import Brain
import tempfile

from gradata import Brain

verification_marker = f"gradata-install-verify-{name}-{os.urandom(4).hex()}"
with tempfile.TemporaryDirectory(prefix="gradata-verify-") as verification_tmp:
verification_dir = Path(verification_tmp) / "brain"
Expand All @@ -350,13 +512,10 @@ def _cmd_install_agent(args) -> None:
)
results = verification_brain.search(verification_marker, mode="rules", top_k=3)
marker_found = any(
verification_marker in (r.get("text") or "").lower()
for r in results
verification_marker in (r.get("text") or "").lower() for r in results
)
if not marker_found:
print(
f" ⚠ verify failed: test rule written but not readable for {name}"
)
print(f" ⚠ verify failed: test rule written but not readable for {name}")
had_failure = True
else:
print(f" ✓ verify: {name} install confirmed (write+read)")
Expand Down Expand Up @@ -1443,6 +1602,9 @@ def main():
# stats
sub.add_parser("stats", help="Brain statistics")

# status (umbrella health check: stats + daemon + cloud + convergence)
sub.add_parser("status", help="Single-page brain/daemon/cloud summary")

# audit
p_audit = sub.add_parser("audit", help="Data flow audit")
p_audit.add_argument("--json", action="store_true")
Expand Down Expand Up @@ -1752,6 +1914,7 @@ def main():
"embed": cmd_embed,
"manifest": cmd_manifest,
"stats": cmd_stats,
"status": cmd_status,
"audit": cmd_audit,
"sync": cmd_sync,
"recall": cmd_recall,
Expand Down
Loading
Loading