From f20d42902ac6b92445a2ac338bac71b2a7b0e11c Mon Sep 17 00:00:00 2001 From: JeremyDev87 Date: Thu, 26 Mar 2026 19:52:59 +0900 Subject: [PATCH] feat(plugin): add per-agent persistent memory across sessions (#947) - Create AgentMemory class with load/save/add_finding/add_pattern/add_preference - FIFO eviction when exceeding max_items (default 50) - JSON storage in ~/.codingbuddy/agent_memory/ per agent - Integrate with stop hook to record session agent activity - 17 tests covering all methods including edge cases --- .../hooks/lib/agent_memory.py | 94 ++++++++++ packages/claude-code-plugin/hooks/stop.py | 16 ++ .../tests/test_agent_memory.py | 173 ++++++++++++++++++ 3 files changed, 283 insertions(+) create mode 100644 packages/claude-code-plugin/hooks/lib/agent_memory.py create mode 100644 packages/claude-code-plugin/tests/test_agent_memory.py diff --git a/packages/claude-code-plugin/hooks/lib/agent_memory.py b/packages/claude-code-plugin/hooks/lib/agent_memory.py new file mode 100644 index 0000000..dddf566 --- /dev/null +++ b/packages/claude-code-plugin/hooks/lib/agent_memory.py @@ -0,0 +1,94 @@ +"""AgentMemory — per-agent persistent knowledge across sessions (#947). + +Stores findings, patterns, and preferences per agent in JSON files +under ~/.codingbuddy/agent_memory/. +""" +import json +import os +from typing import Optional + + +class AgentMemory: + DEFAULT_DIR = os.path.expanduser("~/.codingbuddy/agent_memory") + MAX_ITEMS = 50 + + def __init__(self, memory_dir: Optional[str] = None, max_items: int = MAX_ITEMS): + self.memory_dir = memory_dir or self.DEFAULT_DIR + self.max_items = max_items + + def _filepath(self, agent_name: str) -> str: + return os.path.join(self.memory_dir, f"{agent_name}.json") + + def _empty(self) -> dict: + return {"findings": [], "patterns": [], "preferences": []} + + def load(self, agent_name: str) -> dict: + filepath = self._filepath(agent_name) + if not os.path.isfile(filepath): + return self._empty() + try: + with open(filepath, "r") as f: + data = json.load(f) + if not isinstance(data, dict): + return self._empty() + for key in ("findings", "patterns", "preferences"): + if key not in data or not isinstance(data[key], list): + data[key] = [] + return data + except (json.JSONDecodeError, OSError): + return self._empty() + + def save(self, agent_name: str, memory: dict) -> None: + os.makedirs(self.memory_dir, exist_ok=True) + filepath = self._filepath(agent_name) + with open(filepath, "w") as f: + json.dump(memory, f, indent=2, ensure_ascii=False) + + def _add_entry(self, agent_name: str, category: str, entry: dict) -> None: + data = self.load(agent_name) + data[category].append(entry) + if len(data[category]) > self.max_items: + data[category] = data[category][-self.max_items:] + self.save(agent_name, data) + + def add_finding(self, agent_name: str, finding: dict) -> None: + self._add_entry(agent_name, "findings", finding) + + def add_pattern(self, agent_name: str, pattern: dict) -> None: + self._add_entry(agent_name, "patterns", pattern) + + def add_preference(self, agent_name: str, preference: dict) -> None: + self._add_entry(agent_name, "preferences", preference) + + def get_context_prompt(self, agent_name: str) -> str: + data = self.load(agent_name) + if not any(data[k] for k in ("findings", "patterns", "preferences")): + return "" + parts = [] + if data["findings"]: + parts.append("## Previous Findings") + for f in data["findings"]: + parts.append(f"- {json.dumps(f, ensure_ascii=False)}") + if data["patterns"]: + parts.append("## Recognized Patterns") + for p in data["patterns"]: + parts.append(f"- {json.dumps(p, ensure_ascii=False)}") + if data["preferences"]: + parts.append("## Agent Preferences") + for p in data["preferences"]: + parts.append(f"- {json.dumps(p, ensure_ascii=False)}") + return "\n".join(parts) + + def clear(self, agent_name: str) -> None: + filepath = self._filepath(agent_name) + if os.path.isfile(filepath): + os.remove(filepath) + + def list_agents(self) -> list: + if not os.path.isdir(self.memory_dir): + return [] + return sorted( + os.path.splitext(f)[0] + for f in os.listdir(self.memory_dir) + if f.endswith(".json") + ) diff --git a/packages/claude-code-plugin/hooks/stop.py b/packages/claude-code-plugin/hooks/stop.py index a95abf5..2b3263f 100644 --- a/packages/claude-code-plugin/hooks/stop.py +++ b/packages/claude-code-plugin/hooks/stop.py @@ -70,6 +70,22 @@ def handle_stop(data: dict): except Exception: pass # Never block session stop + # Agent memory: record session agent activity (#947) + try: + from agent_memory import AgentMemory + + agent_name = os.environ.get("CODINGBUDDY_ACTIVE_AGENT", "") + if agent_name: + mem = AgentMemory() + # Record session summary as a finding + if summary: + mem.add_finding(agent_name, { + "session_id": session_id, + "summary": summary[:200], + }) + except Exception: + pass # Never block session stop + # Notify on session end (#829) try: _maybe_notify_session_end(summary) diff --git a/packages/claude-code-plugin/tests/test_agent_memory.py b/packages/claude-code-plugin/tests/test_agent_memory.py new file mode 100644 index 0000000..d1e9add --- /dev/null +++ b/packages/claude-code-plugin/tests/test_agent_memory.py @@ -0,0 +1,173 @@ +"""Tests for AgentMemory — per-agent persistent knowledge across sessions (#947).""" +import json +import os +import sys +import pytest + +# Ensure hooks/lib is on path +_tests_dir = os.path.dirname(os.path.abspath(__file__)) +_lib_dir = os.path.join(os.path.dirname(_tests_dir), "hooks", "lib") +if _lib_dir not in sys.path: + sys.path.insert(0, _lib_dir) + +from agent_memory import AgentMemory + + +@pytest.fixture +def memory_dir(tmp_path): + """Temp directory for agent memory files.""" + d = tmp_path / "agent_memory" + d.mkdir() + return str(d) + + +@pytest.fixture +def mem(memory_dir): + return AgentMemory(memory_dir=memory_dir) + + +class TestLoad: + def test_load_returns_empty_structure_for_new_agent(self, mem): + """Loading a non-existent agent should return empty findings/patterns/preferences.""" + result = mem.load("security-specialist") + assert result == {"findings": [], "patterns": [], "preferences": []} + + def test_load_returns_saved_data(self, mem): + """Loading an existing agent should return its saved data.""" + data = { + "findings": [{"issue": "SQL injection in login"}], + "patterns": [], + "preferences": [], + } + mem.save("security-specialist", data) + result = mem.load("security-specialist") + assert result == data + + def test_load_recovers_from_invalid_json(self, mem, memory_dir): + """Loading a corrupted JSON file should return empty structure.""" + filepath = os.path.join(memory_dir, "broken-agent.json") + with open(filepath, "w") as f: + f.write("{invalid json content") + result = mem.load("broken-agent") + assert result == {"findings": [], "patterns": [], "preferences": []} + + +class TestSave: + def test_save_creates_directory_if_missing(self, tmp_path): + """Save should create the memory directory if it doesn't exist.""" + new_dir = str(tmp_path / "nonexistent" / "agent_memory") + mem = AgentMemory(memory_dir=new_dir) + mem.save("test-agent", {"findings": [], "patterns": [], "preferences": []}) + assert os.path.isdir(new_dir) + + def test_save_writes_valid_json(self, mem, memory_dir): + """Saved file should contain valid JSON matching the input data.""" + data = { + "findings": [{"issue": "XSS"}], + "patterns": [{"name": "unsanitized input"}], + "preferences": [{"style": "strict CSP"}], + } + mem.save("web-specialist", data) + filepath = os.path.join(memory_dir, "web-specialist.json") + with open(filepath) as f: + loaded = json.load(f) + assert loaded == data + + +class TestAddFinding: + def test_add_finding_appends_to_list(self, mem): + """Adding a finding should append it to the agent's findings list.""" + mem.add_finding("sec-agent", {"issue": "hardcoded secret"}) + mem.add_finding("sec-agent", {"issue": "open redirect"}) + data = mem.load("sec-agent") + assert len(data["findings"]) == 2 + assert data["findings"][0]["issue"] == "hardcoded secret" + assert data["findings"][1]["issue"] == "open redirect" + + def test_add_finding_fifo_eviction(self, tmp_path): + """When findings exceed max_items, oldest should be evicted (FIFO).""" + mem = AgentMemory(memory_dir=str(tmp_path / "mem"), max_items=3) + for i in range(5): + mem.add_finding("agent", {"id": i}) + data = mem.load("agent") + assert len(data["findings"]) == 3 + assert data["findings"][0]["id"] == 2 + assert data["findings"][2]["id"] == 4 + + +class TestAddPattern: + def test_add_pattern_appends(self, mem): + """Adding a pattern should append it to the agent's patterns list.""" + mem.add_pattern("qa-agent", {"name": "flaky retry"}) + data = mem.load("qa-agent") + assert len(data["patterns"]) == 1 + assert data["patterns"][0]["name"] == "flaky retry" + + def test_add_pattern_fifo_eviction(self, tmp_path): + """When patterns exceed max_items, oldest should be evicted (FIFO).""" + mem = AgentMemory(memory_dir=str(tmp_path / "mem"), max_items=2) + for i in range(4): + mem.add_pattern("agent", {"id": i}) + data = mem.load("agent") + assert len(data["patterns"]) == 2 + assert data["patterns"][0]["id"] == 2 + + +class TestAddPreference: + def test_add_preference_appends(self, mem): + """Adding a preference should append it to the agent's preferences list.""" + mem.add_preference("code-reviewer", {"style": "verbose comments"}) + data = mem.load("code-reviewer") + assert len(data["preferences"]) == 1 + + def test_add_preference_fifo_eviction(self, tmp_path): + """When preferences exceed max_items, oldest should be evicted (FIFO).""" + mem = AgentMemory(memory_dir=str(tmp_path / "mem"), max_items=2) + for i in range(3): + mem.add_preference("agent", {"id": i}) + data = mem.load("agent") + assert len(data["preferences"]) == 2 + assert data["preferences"][0]["id"] == 1 + + +class TestGetContextPrompt: + def test_returns_empty_string_for_new_agent(self, mem): + """Context prompt for agent with no memory should be empty string.""" + result = mem.get_context_prompt("unknown-agent") + assert result == "" + + def test_returns_formatted_context(self, mem): + """Context prompt should include findings, patterns, and preferences.""" + mem.add_finding("sec", {"issue": "SQL injection"}) + mem.add_pattern("sec", {"name": "unsanitized input"}) + mem.add_preference("sec", {"style": "parameterized queries"}) + result = mem.get_context_prompt("sec") + assert "SQL injection" in result + assert "unsanitized input" in result + assert "parameterized queries" in result + + +class TestClear: + def test_clear_removes_agent_memory(self, mem): + """Clear should remove all memory for an agent.""" + mem.add_finding("agent", {"issue": "test"}) + mem.clear("agent") + data = mem.load("agent") + assert data == {"findings": [], "patterns": [], "preferences": []} + + def test_clear_nonexistent_agent_no_error(self, mem): + """Clearing a non-existent agent should not raise.""" + mem.clear("ghost-agent") # Should not raise + + +class TestListAgents: + def test_list_agents_empty(self, mem): + """List agents should return empty list when no memories exist.""" + assert mem.list_agents() == [] + + def test_list_agents_returns_saved_agents(self, mem): + """List agents should return names of all agents with saved memory.""" + mem.save("agent-a", {"findings": [], "patterns": [], "preferences": []}) + mem.save("agent-b", {"findings": [], "patterns": [], "preferences": []}) + agents = mem.list_agents() + assert sorted(agents) == ["agent-a", "agent-b"]