262 changes: 233 additions & 29 deletions src/gradata/_core.py
@@ -40,6 +40,32 @@ def _filter_lessons_by_state(lessons, min_state: str = "PATTERN"):

# ── correct() ──────────────────────────────────────────────────────────


def _attribute_domain_fires(
    brain: "Brain",
    correction_category: str,
    correction_desc: str,
) -> None:
    """Attribute fires and misfires to rules active in this session.

    For each fired rule, increment fires for the correction's category.
    If the correction contradicts the rule, also increment misfires.
    """
    from gradata.enhancements.self_improvement import _classify_correction_direction

    for rule in brain._fired_rules:
        if not hasattr(rule, "domain_scores"):
            continue
        domain = correction_category.upper()
        if domain not in rule.domain_scores:
            rule.domain_scores[domain] = {"fires": 0, "misfires": 0}
        rule.domain_scores[domain]["fires"] += 1

        direction = _classify_correction_direction(correction_desc, rule.description)
        if direction == "CONTRADICTING":
            rule.domain_scores[domain]["misfires"] += 1


def brain_correct(
    brain: Brain, draft: str, final: str, *,
    category: str | None = None, context: dict | None = None,
@@ -153,21 +179,30 @@ def brain_correct(
if classifications:
primary = next((c for c in classifications if c.category.upper() == cat),
classifications[0])
# Try behavioral extraction (LLM + cache + templates)
try:
from gradata.enhancements.edit_classifier import extract_behavioral_instruction
from gradata.enhancements.instruction_cache import InstructionCache
if not isinstance(brain._instruction_cache, InstructionCache):
brain._instruction_cache = InstructionCache(
lessons_path.parent / "instruction_cache.json"
)
behavioral_desc = extract_behavioral_instruction(
diff, primary, cache=brain._instruction_cache, # type: ignore[arg-type]
)
desc = behavioral_desc or primary.description
except Exception as e:
_log.debug("Behavioral extraction failed: %s", e)
# Check convergence gate — skip extraction if category is settled
convergence_data = brain._get_convergence()
cat_convergence = convergence_data.get("by_category", {}).get(cat, {})
category_converged = cat_convergence.get("trend") == "converged"

if category_converged:
_log.debug("Skipping extraction for converged category: %s", cat)
desc = primary.description
else:
# Try behavioral extraction (LLM + cache + templates)
try:
from gradata.enhancements.edit_classifier import extract_behavioral_instruction
from gradata.enhancements.instruction_cache import InstructionCache
if not isinstance(brain._instruction_cache, InstructionCache):
brain._instruction_cache = InstructionCache(
lessons_path.parent / "instruction_cache.json"
)
behavioral_desc = extract_behavioral_instruction(
diff, primary, cache=brain._instruction_cache, # type: ignore[arg-type]
)
desc = behavioral_desc or primary.description
except Exception as e:
_log.debug("Behavioral extraction failed: %s", e)
desc = primary.description
elif summary:
desc = summary
else:
Expand Down Expand Up @@ -276,6 +311,18 @@ def brain_correct(
except Exception as e:
_log.warning("Lesson creation failed: %s", e)

# Domain-scoped misfire attribution
try:
if brain._fired_rules and (category or classifications):
correction_desc = ""
if 'desc' in locals():
correction_desc = desc
Comment on lines +317 to +319
**P2: `locals()` inspection is fragile for `desc`-variable lookup**

`desc` is assigned inside the broad `try` block that ends at line 311. Whether it exists in the local namespace after the `except` depends on exactly how far execution got. Using `'desc' in locals()` to probe this is an anti-pattern — easy to misread and could silently yield an empty `correction_desc` if the assignment was skipped part-way through the block.

A cleaner approach is to initialise a dedicated variable before the `try` block:

```python
_extracted_desc: str = ""  # initialise before the try block
# ... inside the try, after desc is computed:
_extracted_desc = desc
# ... after the try block:
if brain._fired_rules and (category or classifications):
    _attribute_domain_fires(brain, category or "UNKNOWN", _extracted_desc or summary)
```

This makes the intent explicit and eliminates the `locals()` introspection.


elif summary:
correction_desc = summary
_attribute_domain_fires(brain, category or "UNKNOWN", correction_desc)
except Exception as e:
_log.debug("Domain fire attribution failed: %s", e)
Comment on lines +314 to +324
🛠️ Refactor suggestion | 🟠 Major

**Avoid `if 'desc' in locals()` — initialize `desc` explicitly.**

Using `locals()` lookup to check variable existence is fragile and obscure. The variable `desc` is conditionally assigned across multiple branches earlier in this function (lines 189, 202, 205, 207, 209), making this check difficult to verify.

♻️ Proposed fix: Initialize `desc` before the conditional blocks

Add initialization near line 178 (before the classification branches):

```python
cat = (category or "UNKNOWN").upper()
desc = ""  # Will be set by classification or summary
if classifications:
    ...
```

Then simplify the attribution block:

```diff
     # Domain-scoped misfire attribution
     try:
         if brain._fired_rules and (category or classifications):
-            correction_desc = ""
-            if 'desc' in locals():
-                correction_desc = desc
-            elif summary:
-                correction_desc = summary
+            correction_desc = desc or summary
             _attribute_domain_fires(brain, category or "UNKNOWN", correction_desc)
     except Exception as e:
         _log.debug("Domain fire attribution failed: %s", e)
```


# Index into FTS5
try:
from gradata._query import fts_index
Expand Down Expand Up @@ -782,27 +829,104 @@ def brain_export_skills(brain: Brain, *, output_dir: str | None = None,

# ── convergence() ─────────────────────────────────────────────────────

def _mann_kendall(data: "list[int] | list[float]") -> tuple[str, float]:
    """Mann-Kendall trend test (pure Python, no scipy needed).

    Returns (trend, p_value) where trend is "decreasing", "increasing", or "no_trend".
    Uses normal approximation for n >= 3.
    """
    import math

    n = len(data)
    if n < 3:
        return "no_trend", 1.0

    # Compute S statistic
    s = 0
    for i in range(n - 1):
        for j in range(i + 1, n):
            diff = data[j] - data[i]
            if diff > 0:
                s += 1
            elif diff < 0:
                s -= 1

    # Handle ties
    from collections import Counter
    tie_counts = [c for c in Counter(data).values() if c > 1]
    tie_correction = sum(t * (t - 1) * (2 * t + 5) for t in tie_counts)

    # Variance of S
    var_s = (n * (n - 1) * (2 * n + 5) - tie_correction) / 18.0
    if var_s == 0:
        return "no_trend", 1.0

    # Z statistic (continuity correction)
    if s > 0:
        z = (s - 1) / math.sqrt(var_s)
    elif s < 0:
        z = (s + 1) / math.sqrt(var_s)
    else:
        z = 0.0

    # Two-tailed p-value using normal CDF approximation
    p_value = 2.0 * (1.0 - _normal_cdf(abs(z)))

    if p_value < 0.05:
        trend = "decreasing" if s < 0 else "increasing"
    else:
        trend = "no_trend"

    return trend, round(p_value, 4)


def _normal_cdf(x: float) -> float:
    """Standard normal CDF approximation (Abramowitz & Stegun)."""
    import math
    t = 1.0 / (1.0 + 0.2316419 * abs(x))
    d = 0.3989422804014327  # 1/sqrt(2*pi)
    p = d * math.exp(-x * x / 2.0) * (
        t * (0.319381530 + t * (-0.356563782 + t * (1.781477937 +
        t * (-1.821255978 + t * 1.330274429))))
    )
    return 1.0 - p if x >= 0 else p
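
As a quick sanity check of the pure-Python implementation (an editor's sketch, not part of the diff; the input series are invented), a strictly decreasing sequence should come back as a significant downward trend, while a flat sequence dominated by ties should not:

```python
# Synthetic data only; the expected outputs follow from the S statistic and the
# normal approximation used above.
print(_mann_kendall([9, 7, 6, 5, 3, 2]))   # ("decreasing", ~0.009): every pair decreases, S = -15
print(_mann_kendall([4, 4, 5, 4, 4, 5]))   # ("no_trend", ~0.49): ties keep z small
```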


def brain_convergence(brain: "Brain") -> dict:
"""Compute corrections-per-session convergence data.

Uses Mann-Kendall trend test for statistical rigor.
Includes per-category breakdown.

Returns dict with:
sessions: list of session numbers
corrections_per_session: list of correction counts per session
trend: "converging" | "converged" | "diverging" | "insufficient_data"
p_value: float (Mann-Kendall p-value, lower = stronger trend)
by_category: dict of category -> {corrections_per_session, trend}
total_corrections: int
total_sessions: int
"""
empty = {"sessions": [], "corrections_per_session": [], "trend": "insufficient_data",
"total_corrections": 0, "total_sessions": 0}
"p_value": 1.0, "by_category": {}, "total_corrections": 0, "total_sessions": 0}

try:
from gradata._db import get_connection
import json as _json
with get_connection(brain.db_path) as conn:
# Aggregate corrections per session
rows = conn.execute(
"SELECT session, COUNT(*) as cnt FROM events "
"WHERE type = 'CORRECTION' AND session IS NOT NULL AND session > 0 "
"GROUP BY session ORDER BY session"
).fetchall()

# Per-category breakdown
cat_rows = conn.execute(
"SELECT session, data_json FROM events "
"WHERE type = 'CORRECTION' AND session IS NOT NULL AND session > 0 "
"ORDER BY session"
).fetchall()
except Exception:
return empty

@@ -812,25 +936,105 @@ def brain_convergence(brain: "Brain") -> dict:
sessions = [r[0] for r in rows]
counts = [r[1] for r in rows]

# Determine trend
trend = "insufficient_data"
if len(counts) >= 3:
first_half = counts[:len(counts) // 2]
second_half = counts[len(counts) // 2:]
avg_first = sum(first_half) / len(first_half)
avg_second = sum(second_half) / len(second_half)

if avg_second < avg_first * 0.7:
trend = "converging"
elif abs(avg_second - avg_first) <= max(1, avg_first * 0.15):
trend = "converged"
else:
trend = "diverging"
# Mann-Kendall trend test
mk_trend, p_value = _mann_kendall(counts)
if mk_trend == "decreasing":
trend = "converging"
elif mk_trend == "increasing":
trend = "diverging"
elif len(counts) >= 3:
trend = "converged"
else:
trend = "insufficient_data"

# Per-category convergence
cat_by_session: dict[str, dict[int, int]] = {}
for session, data_json in cat_rows:
try:
data = _json.loads(data_json) if isinstance(data_json, str) else {}
cat = data.get("category", "UNKNOWN")
except (_json.JSONDecodeError, TypeError):
cat = "UNKNOWN"
if cat not in cat_by_session:
cat_by_session[cat] = {}
cat_by_session[cat][session] = cat_by_session[cat].get(session, 0) + 1

by_category: dict[str, dict] = {}
for cat, session_counts in cat_by_session.items():
cat_counts = [session_counts.get(s, 0) for s in sessions]
cat_mk, cat_p = _mann_kendall(cat_counts)
cat_trend = "converging" if cat_mk == "decreasing" else (
"diverging" if cat_mk == "increasing" else "converged")
Comment on lines +962 to +967
**P1: Per-category convergence has no `insufficient_data` guard**

At the top level, `brain_convergence` correctly emits `"insufficient_data"` when `len(counts) < 3`. But the per-category branch maps every `"no_trend"` result straight to `"converged"`:

```python
cat_trend = "converging" if cat_mk == "decreasing" else (
    "diverging" if cat_mk == "increasing" else "converged")
```

`cat_counts` is built with zeros for every session where the category had no corrections, so a category that appeared in only one or two sessions out of seven gets a list like `[0, 0, 0, 5, 0, 0, 0]`. Because this sequence has no monotonic trend, Mann-Kendall returns `"no_trend"` → `"converged"`. The convergence gate in `_core.py` then sees `trend == "converged"` and skips LLM extraction for future corrections in that category, even though the category has never meaningfully converged — it simply has sparse history.

Suggested fix:

```python
if cat_mk == "decreasing":
    cat_trend = "converging"
elif cat_mk == "increasing":
    cat_trend = "diverging"
elif sum(cat_counts) < 3:          # not enough non-zero data
    cat_trend = "insufficient_data"
else:
    cat_trend = "converged"
```

The convergence gate already checks `cat_convergence.get("trend") == "converged"`, so adding the `"insufficient_data"` branch here is a backward-compatible fix — categories with thin history will continue to get LLM extraction.
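
The sparse-history case called out above is easy to reproduce with the `_mann_kendall` helper added in this PR (editor's sketch; the session counts are invented):

```python
# A category seen in only one of seven sessions: S sums to zero, so z = 0 and p = 1.0.
sparse = [0, 0, 0, 5, 0, 0, 0]
print(_mann_kendall(sparse))  # ("no_trend", 1.0), which the current branch reports as "converged"
```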


by_category[cat] = {
"corrections_per_session": cat_counts,
"trend": cat_trend,
"p_value": cat_p,
}
Comment on lines +962 to +972
⚠️ Potential issue | 🟡 Minor

**Per-category trend logic lacks `insufficient_data` handling.**

The overall trend logic (lines 941-948) maps `no_trend` to `converged` only when `len(counts) >= 3`, otherwise to `insufficient_data`. However, the per-category logic unconditionally maps `no_trend` to `converged`, which could prematurely gate extraction for categories with sparse data.

🐛 Proposed fix: Apply consistent length check

```diff
     for cat, session_counts in cat_by_session.items():
         cat_counts = [session_counts.get(s, 0) for s in sessions]
         cat_mk, cat_p = _mann_kendall(cat_counts)
-        cat_trend = "converging" if cat_mk == "decreasing" else (
-            "diverging" if cat_mk == "increasing" else "converged")
+        if cat_mk == "decreasing":
+            cat_trend = "converging"
+        elif cat_mk == "increasing":
+            cat_trend = "diverging"
+        elif sum(1 for c in cat_counts if c > 0) >= 3:
+            cat_trend = "converged"
+        else:
+            cat_trend = "insufficient_data"
         by_category[cat] = {
             "corrections_per_session": cat_counts,
             "trend": cat_trend,
             "p_value": cat_p,
         }
```


return {
"sessions": sessions,
"corrections_per_session": counts,
"trend": trend,
"p_value": p_value,
"by_category": by_category,
"total_corrections": sum(counts),
"total_sessions": len(sessions),
}


# ── Efficiency ────────────────────────────────────────────────────────

_SEVERITY_SECONDS = {
    "trivial": 5,
    "minor": 15,
    "moderate": 45,
    "major": 120,
    "rewrite": 300,
}


def brain_efficiency(brain: "Brain", *, estimate_time: bool = False) -> dict:
    """Quantify effort saved by brain learning.

    Returns effort_ratio (current vs initial correction rate).
    Optional estimate_time adds severity-weighted time estimates (approximate).
    """
    convergence = brain._get_convergence()
    counts = convergence.get("corrections_per_session", [])

    if len(counts) < 3:
        result: dict = {
            "effort_ratio": 1.0,
            "corrections_initial": 0,
            "corrections_recent": 0,
            "total_corrections": convergence.get("total_corrections", 0),
            "total_sessions": convergence.get("total_sessions", 0),
        }
        if estimate_time:
            result["estimated_seconds_saved"] = 0
            result["time_breakdown"] = {}
        return result

    initial = sum(counts[:3]) / 3.0
    recent = sum(counts[-3:]) / 3.0
    effort_ratio = round(recent / initial, 2) if initial > 0 else 1.0

    result = {
        "effort_ratio": effort_ratio,
        "corrections_initial": round(initial, 1),
        "corrections_recent": round(recent, 1),
        "total_corrections": convergence.get("total_corrections", 0),
        "total_sessions": convergence.get("total_sessions", 0),
    }

    if estimate_time:
        corrections_avoided = max(0, (initial - recent) * len(counts))
**P2: `corrections_avoided` multiplier includes the baseline period, overestimating savings**

```python
corrections_avoided = max(0, (initial - recent) * len(counts))
```

This multiplies by the **total** session count, including the first three sessions used to establish `initial`. In a 20-session brain with initial = 10, recent = 4, the formula yields 6 × 20 = 120 — but sessions 1–3 were already running at the initial correction rate, so nothing was avoided there.

A more accurate (still approximate) estimate uses only post-baseline sessions:

```suggestion
        corrections_avoided = max(0, (initial - recent) * max(0, len(counts) - 3))
```

This is still labelled "approximate" in the docstring, but avoids inflating the reported savings for long-running brains.


        avg_severity_weight = _SEVERITY_SECONDS.get("moderate", 45)
        estimated_seconds = int(corrections_avoided * avg_severity_weight)
        result["estimated_seconds_saved"] = estimated_seconds
        result["time_breakdown"] = {
            "corrections_avoided": round(corrections_avoided, 1),
            "avg_seconds_per_correction": avg_severity_weight,
        }
Comment on lines +1030 to +1038
🧹 Nitpick | 🔵 Trivial

**Time estimation formula may significantly overestimate savings.**

The calculation `corrections_avoided = (initial - recent) * len(counts)` assumes every session would have had the initial correction rate without learning, which overestimates savings. For 10 sessions with initial=10 and recent=5, this yields 50 avoided corrections, but actual savings would be lower since sessions 1-3 contributed to the "initial" baseline.

Consider a more conservative estimate, e.g., using sessions after the initial window:

♻️ More accurate estimation

```diff
     if estimate_time:
-        corrections_avoided = max(0, (initial - recent) * len(counts))
+        # Estimate based on sessions after the initial baseline window
+        sessions_with_learning = max(0, len(counts) - 3)
+        corrections_avoided = max(0, (initial - recent) * sessions_with_learning)
         avg_severity_weight = _SEVERITY_SECONDS.get("moderate", 45)
         estimated_seconds = int(corrections_avoided * avg_severity_weight)
```


    return result
1 change: 1 addition & 0 deletions src/gradata/_types.py
@@ -122,6 +122,7 @@ class Lesson:
pending_approval: bool = False # True = awaiting human review before graduation
parent_meta_rule_id: str | None = None # Meta-rule this lesson contributed to
memory_ids: list[str] = field(default_factory=list) # Linked memory IDs
domain_scores: dict[str, dict[str, int]] = field(default_factory=dict) # Per-domain fire/misfire tracking

def __post_init__(self) -> None:
self.confidence = round(max(0.0, min(1.0, self.confidence)), 2)
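
For reference, a sketch of what the new field is expected to hold (editor's illustration; the domain names and counts are invented):

```python
# Hypothetical Lesson.domain_scores contents after a few sessions of attribution:
domain_scores = {
    "NAMING": {"fires": 4, "misfires": 1},
    "FORMATTING": {"fires": 2, "misfires": 0},
}
```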
21 changes: 21 additions & 0 deletions src/gradata/brain.py
@@ -67,6 +67,9 @@ def __init__(self, brain_dir: str | Path, working_dir: str | Path | None = None,
open_encrypted_db(self.dir, self._encryption_key)

self._instruction_cache: object | None = None # lazy: InstructionCache
self._fired_rules: list = [] # Rules injected this session (for misfire attribution)
⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Search for any code that appends to or assigns _fired_rules
rg -n '_fired_rules' --type=py -C3
```

Repository: Gradata/gradata
Length of output: 1927

**`_fired_rules` is never populated — domain attribution will silently fail.**

The `_fired_rules` list is initialized on line 70 but never populated in the application code. The grep search confirms it only appears in:

  • `src/gradata/brain.py:70` — initialization as empty list
  • `src/gradata/_core.py:56` — reading it in `_attribute_domain_fires()`
  • `tests/test_rule_scoping.py:147` — manually set in test code only

Since nothing appends to this list during normal execution, the loop at `src/gradata/_core.py:56` never executes and domain fire/misfire counts remain permanently at their initial values. The rule injection flow must populate `_fired_rules` when rules are applied, likely in `apply_brain_rules()` or its call chain.
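
One way to close the gap (an editor's sketch, not code from this PR): record each injected rule on the Brain instance at injection time. The helper name `_record_fired_rules` and the exact hook point are assumptions; the real integration spot, `apply_brain_rules()` or whichever function performs the injection, needs to be confirmed against the codebase.

```python
# Hypothetical helper (not in the PR). Assumes the injection path has access to the
# Brain instance and to the rule objects it injects; these are the same objects that
# _attribute_domain_fires later inspects via .domain_scores and .description.
def _record_fired_rules(brain: "Brain", injected_rules: list) -> None:
    """Remember which rules were injected this session so misfire attribution can run."""
    for rule in injected_rules:
        if rule not in brain._fired_rules:
            brain._fired_rules.append(rule)
```

The call site would invoke this right after injection and clear `brain._fired_rules` at session start, so attribution only covers rules active in the current session.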


self._convergence_cache: dict | None = None
self._convergence_session: int | None = None

logger.debug("Brain init: %s (db=%s)", self.dir, self.db_path)

Expand Down Expand Up @@ -338,6 +341,24 @@ def convergence(self) -> dict:
from gradata._core import brain_convergence
return brain_convergence(self)

    def _get_convergence(self) -> dict:
        """Get cached convergence data (one DB query per session)."""
        if self._convergence_cache is not None and self._convergence_session == self.session:
            return self._convergence_cache
        from gradata._core import brain_convergence
        self._convergence_cache = brain_convergence(self)
        self._convergence_session = self.session
        return self._convergence_cache

    def efficiency(self, *, estimate_time: bool = False) -> dict:
        """Quantify effort saved by brain learning.

        Returns effort_ratio (ratio of current vs initial correction rate).
        Pass estimate_time=True for approximate time-saved estimates.
        """
        from gradata._core import brain_efficiency
        return brain_efficiency(self, estimate_time=estimate_time)
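
For context, a usage sketch of the new method (editor's example; the brain directory and printed values are hypothetical, and the import assumes `Brain` is exposed from `gradata.brain` as in this file):

```python
from gradata.brain import Brain

brain = Brain("./my_brain")                    # hypothetical brain directory
report = brain.efficiency(estimate_time=True)
print(report["effort_ratio"])                  # e.g. 0.4 once recent sessions need fewer corrections
print(report.get("estimated_seconds_saved"))   # included because estimate_time=True
```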

# ── Output Logging ─────────────────────────────────────────────────

def log_output(self, text: str, output_type: str = "general",