diff --git a/packages/claude-code-plugin/hooks/lib/pattern_detector.py b/packages/claude-code-plugin/hooks/lib/pattern_detector.py new file mode 100644 index 0000000..b435f0b --- /dev/null +++ b/packages/claude-code-plugin/hooks/lib/pattern_detector.py @@ -0,0 +1,52 @@ +"""Detect repeated error patterns from HistoryDB execution history.""" +import logging +import time + +from hooks.lib.history_db import HistoryDB + +logger = logging.getLogger(__name__) + + +class PatternDetector: + """Analyzes tool_calls in HistoryDB to find repeated failure patterns.""" + + def __init__(self, db: HistoryDB): + self._db = db + + def detect_patterns( + self, min_occurrences: int = 3, days: int = 30 + ) -> list: + """Detect repeated failure patterns from execution history. + + Args: + min_occurrences: Minimum number of failures to count as a pattern. + days: How many days back to search. + + Returns: + List of pattern dicts with keys: tool_name, input_summary, + failure_count, session_count, first_seen, last_seen. + """ + cutoff = time.time() - (days * 86400) + try: + cursor = self._db._conn.execute( + """ + SELECT + tool_name, + input_summary, + COUNT(*) AS failure_count, + COUNT(DISTINCT session_id) AS session_count, + MIN(timestamp) AS first_seen, + MAX(timestamp) AS last_seen + FROM tool_calls + WHERE success = 0 AND timestamp >= ? + GROUP BY tool_name, input_summary + HAVING COUNT(*) >= ? + ORDER BY failure_count DESC + """, + (cutoff, min_occurrences), + ) + columns = [desc[0] for desc in cursor.description] + return [dict(zip(columns, row)) for row in cursor.fetchall()] + except Exception as e: + logger.error("Failed to detect patterns: %s", e) + return [] diff --git a/packages/claude-code-plugin/hooks/lib/rule_suggester.py b/packages/claude-code-plugin/hooks/lib/rule_suggester.py new file mode 100644 index 0000000..9f0d9fc --- /dev/null +++ b/packages/claude-code-plugin/hooks/lib/rule_suggester.py @@ -0,0 +1,89 @@ +"""Generate rule suggestions from detected error patterns.""" +import datetime +import logging + +logger = logging.getLogger(__name__) + +# Map tool names to actionable guidance templates +_ACTION_TEMPLATES = { + "Bash": "Consider verifying the command or its prerequisites before running `{input_summary}`.", + "Read": "Ensure the file `{input_summary}` exists and is accessible before reading.", + "Write": "Check that the target path for `{input_summary}` is valid and writable before writing.", + "Edit": "Verify the target content exists in the file before attempting to edit `{input_summary}`.", +} + +_DEFAULT_ACTION = "Consider adding a pre-check or guard to avoid repeated failures with `{input_summary}`." + + +class RuleSuggester: + """Generates draft rules in .ai-rules markdown format from detected patterns.""" + + def suggest_rules(self, patterns: list) -> list: + """Generate rule suggestions from detected patterns. + + Args: + patterns: List of pattern dicts from PatternDetector.detect_patterns(). + + Returns: + List of suggestion dicts with keys: title, description, rule_content, pattern. + """ + if not patterns: + return [] + + suggestions = [] + for pattern in patterns: + tool_name = pattern["tool_name"] + input_summary = pattern.get("input_summary") or "unknown" + failure_count = pattern["failure_count"] + session_count = pattern["session_count"] + + title = f"Repeated {tool_name} failure: {input_summary}" + description = ( + f"The `{tool_name}` tool failed {failure_count} times " + f"across {session_count} sessions with input `{input_summary}`." + ) + + rule_content = self._generate_rule_content( + tool_name=tool_name, + input_summary=input_summary, + failure_count=failure_count, + session_count=session_count, + ) + + suggestions.append( + { + "title": title, + "description": description, + "rule_content": rule_content, + "pattern": pattern, + } + ) + + return suggestions + + def _generate_rule_content( + self, + tool_name: str, + input_summary: str, + failure_count: int, + session_count: int, + ) -> str: + """Generate markdown rule content for a single pattern.""" + template = _ACTION_TEMPLATES.get(tool_name, _DEFAULT_ACTION) + action = template.format(input_summary=input_summary) + now = datetime.datetime.utcnow().strftime("%Y-%m-%d") + + return ( + f"# Repeated {tool_name} failure: {input_summary}\n" + f"\n" + f"> Auto-detected rule — {now}\n" + f"\n" + f"## Context\n" + f"\n" + f"The `{tool_name}` tool with input `{input_summary}` has failed " + f"{failure_count} times across {session_count} sessions.\n" + f"\n" + f"## Guideline\n" + f"\n" + f"{action}\n" + ) diff --git a/packages/claude-code-plugin/hooks/lib/suggest_rules.py b/packages/claude-code-plugin/hooks/lib/suggest_rules.py new file mode 100644 index 0000000..dc1b994 --- /dev/null +++ b/packages/claude-code-plugin/hooks/lib/suggest_rules.py @@ -0,0 +1,44 @@ +"""Entry point for auto-learning: detect patterns and suggest rules.""" +import logging + +from hooks.lib.history_db import HistoryDB +from hooks.lib.pattern_detector import PatternDetector +from hooks.lib.rule_suggester import RuleSuggester + +logger = logging.getLogger(__name__) + + +def suggest_rules( + db_or_path, min_occurrences: int = 3, days: int = 30 +) -> list: + """Detect repeated failure patterns and generate rule suggestions. + + Args: + db_or_path: A HistoryDB instance or a string path to the SQLite database. + min_occurrences: Minimum failures to count as a pattern. + days: How many days back to search. + + Returns: + List of suggestion dicts with keys: title, description, rule_content, pattern. + """ + owns_db = False + if isinstance(db_or_path, str): + db = HistoryDB(db_path=db_or_path) + owns_db = True + else: + db = db_or_path + + try: + detector = PatternDetector(db) + patterns = detector.detect_patterns( + min_occurrences=min_occurrences, days=days + ) + + suggester = RuleSuggester() + return suggester.suggest_rules(patterns) + except Exception as e: + logger.error("Failed to suggest rules: %s", e) + return [] + finally: + if owns_db: + db.close() diff --git a/packages/claude-code-plugin/tests/test_pattern_detector.py b/packages/claude-code-plugin/tests/test_pattern_detector.py new file mode 100644 index 0000000..595233f --- /dev/null +++ b/packages/claude-code-plugin/tests/test_pattern_detector.py @@ -0,0 +1,170 @@ +"""Tests for PatternDetector — detects repeated error patterns from HistoryDB.""" +import os +import tempfile +import time + +import pytest + +from hooks.lib.history_db import HistoryDB +from hooks.lib.pattern_detector import PatternDetector + + +@pytest.fixture +def db_dir(): + """Create a temporary directory for test databases.""" + d = tempfile.mkdtemp() + yield d + + +@pytest.fixture +def db_path(db_dir): + return os.path.join(db_dir, "history.db") + + +@pytest.fixture +def db(db_path): + """Create a HistoryDB instance with a temp path.""" + instance = HistoryDB(db_path=db_path) + yield instance + instance.close() + + +@pytest.fixture +def detector(db): + """Create a PatternDetector wrapping the test DB.""" + return PatternDetector(db) + + +class TestEmptyDatabase: + def test_returns_empty_list_when_no_data(self, detector): + """Empty DB should return no patterns.""" + patterns = detector.detect_patterns() + assert patterns == [] + + def test_returns_empty_list_when_no_failures(self, detector, db): + """DB with only successful calls should return no patterns.""" + db.start_session("s1", project="/proj") + db.record_tool_call("s1", "Read", input_summary="file.py", success=True) + db.record_tool_call("s1", "Write", input_summary="out.py", success=True) + patterns = detector.detect_patterns() + assert patterns == [] + + +class TestPatternDetection: + def test_detects_repeated_failure_pattern(self, detector, db): + """Same tool+input failing 3+ times across sessions = pattern.""" + for i in range(3): + sid = f"sess-{i}" + db.start_session(sid, project="/proj") + db.record_tool_call(sid, "Bash", input_summary="npm test", success=False) + + patterns = detector.detect_patterns() + assert len(patterns) == 1 + assert patterns[0]["tool_name"] == "Bash" + assert patterns[0]["input_summary"] == "npm test" + assert patterns[0]["failure_count"] >= 3 + + def test_does_not_detect_below_threshold(self, detector, db): + """2 failures should NOT trigger a pattern (threshold=3).""" + for i in range(2): + sid = f"sess-{i}" + db.start_session(sid, project="/proj") + db.record_tool_call(sid, "Bash", input_summary="npm test", success=False) + + patterns = detector.detect_patterns() + assert patterns == [] + + def test_custom_threshold(self, detector, db): + """Custom threshold should be respected.""" + for i in range(5): + sid = f"sess-{i}" + db.start_session(sid, project="/proj") + db.record_tool_call(sid, "Bash", input_summary="npm test", success=False) + + patterns_high = detector.detect_patterns(min_occurrences=6) + assert patterns_high == [] + + patterns_low = detector.detect_patterns(min_occurrences=4) + assert len(patterns_low) == 1 + + def test_multiple_distinct_patterns(self, detector, db): + """Different tool+input combos should produce separate patterns.""" + for i in range(3): + sid = f"sess-{i}" + db.start_session(sid, project="/proj") + db.record_tool_call(sid, "Bash", input_summary="npm test", success=False) + db.record_tool_call(sid, "Read", input_summary="/etc/config", success=False) + + patterns = detector.detect_patterns() + assert len(patterns) == 2 + tool_names = {p["tool_name"] for p in patterns} + assert tool_names == {"Bash", "Read"} + + def test_same_session_failures_counted(self, detector, db): + """Multiple failures in one session should count toward threshold.""" + db.start_session("s1", project="/proj") + for _ in range(3): + db.record_tool_call("s1", "Bash", input_summary="npm test", success=False) + + patterns = detector.detect_patterns() + assert len(patterns) == 1 + + def test_mixed_success_failure_only_counts_failures(self, detector, db): + """Only failures are counted, not successes.""" + for i in range(5): + sid = f"sess-{i}" + db.start_session(sid, project="/proj") + db.record_tool_call(sid, "Bash", input_summary="npm test", success=True) + + # Only 2 failures + db.record_tool_call("sess-0", "Bash", input_summary="npm test", success=False) + db.record_tool_call("sess-1", "Bash", input_summary="npm test", success=False) + + patterns = detector.detect_patterns() + assert patterns == [] + + +class TestPatternMetadata: + def test_pattern_includes_session_count(self, detector, db): + """Pattern should report how many distinct sessions had the failure.""" + for i in range(4): + sid = f"sess-{i}" + db.start_session(sid, project="/proj") + db.record_tool_call(sid, "Bash", input_summary="npm test", success=False) + + patterns = detector.detect_patterns() + assert len(patterns) == 1 + assert patterns[0]["session_count"] >= 3 + + def test_pattern_includes_first_and_last_seen(self, detector, db): + """Pattern should include timestamps for first and last occurrence.""" + for i in range(3): + sid = f"sess-{i}" + db.start_session(sid, project="/proj") + db.record_tool_call(sid, "Bash", input_summary="npm test", success=False) + + patterns = detector.detect_patterns() + assert "first_seen" in patterns[0] + assert "last_seen" in patterns[0] + assert patterns[0]["first_seen"] <= patterns[0]["last_seen"] + + def test_days_filter_limits_scope(self, detector, db): + """days parameter should limit how far back we look.""" + db.start_session("recent", project="/proj") + db.record_tool_call("recent", "Bash", input_summary="npm test", success=False) + + # Insert old failures directly + old_time = time.time() - (60 * 86400) + for i in range(3): + db._conn.execute( + "INSERT INTO tool_calls (session_id, timestamp, tool_name, input_summary, success) " + "VALUES (?, ?, ?, ?, ?)", + (f"old-{i}", old_time, "Bash", "npm test", 0), + ) + db._conn.commit() + + patterns_recent = detector.detect_patterns(days=30) + assert patterns_recent == [] + + patterns_all = detector.detect_patterns(days=90) + assert len(patterns_all) == 1 diff --git a/packages/claude-code-plugin/tests/test_rule_suggester.py b/packages/claude-code-plugin/tests/test_rule_suggester.py new file mode 100644 index 0000000..2de2d0c --- /dev/null +++ b/packages/claude-code-plugin/tests/test_rule_suggester.py @@ -0,0 +1,107 @@ +"""Tests for RuleSuggester — generates rule suggestions from detected patterns.""" +import pytest + +from hooks.lib.rule_suggester import RuleSuggester + + +@pytest.fixture +def suggester(): + return RuleSuggester() + + +SAMPLE_PATTERN = { + "tool_name": "Bash", + "input_summary": "npm test", + "failure_count": 5, + "session_count": 4, + "first_seen": 1711000000.0, + "last_seen": 1711300000.0, +} + +SAMPLE_PATTERN_READ = { + "tool_name": "Read", + "input_summary": "/etc/missing-config.json", + "failure_count": 3, + "session_count": 3, + "first_seen": 1711000000.0, + "last_seen": 1711200000.0, +} + + +class TestEmptyInput: + def test_returns_empty_list_for_no_patterns(self, suggester): + """No patterns should produce no suggestions.""" + suggestions = suggester.suggest_rules([]) + assert suggestions == [] + + +class TestRuleSuggestionContent: + def test_suggestion_has_required_keys(self, suggester): + """Each suggestion must have title, description, rule_content, pattern.""" + suggestions = suggester.suggest_rules([SAMPLE_PATTERN]) + assert len(suggestions) == 1 + s = suggestions[0] + assert "title" in s + assert "description" in s + assert "rule_content" in s + assert "pattern" in s + + def test_title_includes_tool_name(self, suggester): + """Title should reference the failing tool.""" + suggestions = suggester.suggest_rules([SAMPLE_PATTERN]) + assert "Bash" in suggestions[0]["title"] + + def test_description_includes_failure_count(self, suggester): + """Description should mention how often the failure occurred.""" + suggestions = suggester.suggest_rules([SAMPLE_PATTERN]) + assert "5" in suggestions[0]["description"] + + def test_rule_content_is_valid_markdown(self, suggester): + """rule_content should be valid markdown with a heading.""" + suggestions = suggester.suggest_rules([SAMPLE_PATTERN]) + content = suggestions[0]["rule_content"] + assert content.startswith("#") + assert "Bash" in content + assert "npm test" in content + + def test_pattern_reference_preserved(self, suggester): + """The original pattern dict should be included.""" + suggestions = suggester.suggest_rules([SAMPLE_PATTERN]) + assert suggestions[0]["pattern"] == SAMPLE_PATTERN + + +class TestMultiplePatterns: + def test_generates_one_suggestion_per_pattern(self, suggester): + """Each pattern should produce exactly one suggestion.""" + suggestions = suggester.suggest_rules([SAMPLE_PATTERN, SAMPLE_PATTERN_READ]) + assert len(suggestions) == 2 + + def test_suggestions_have_unique_titles(self, suggester): + """Each suggestion should have a distinct title.""" + suggestions = suggester.suggest_rules([SAMPLE_PATTERN, SAMPLE_PATTERN_READ]) + titles = [s["title"] for s in suggestions] + assert len(set(titles)) == 2 + + +class TestRuleContentFormat: + def test_rule_content_has_metadata_section(self, suggester): + """Rule content should include auto-generated metadata.""" + suggestions = suggester.suggest_rules([SAMPLE_PATTERN]) + content = suggestions[0]["rule_content"] + assert "Auto-detected" in content or "auto-detected" in content + + def test_rule_content_includes_session_count(self, suggester): + """Rule content should mention affected sessions.""" + suggestions = suggester.suggest_rules([SAMPLE_PATTERN]) + content = suggestions[0]["rule_content"] + assert "4" in content # session_count + + def test_rule_content_includes_suggestion_action(self, suggester): + """Rule content should include a suggested action or guideline.""" + suggestions = suggester.suggest_rules([SAMPLE_PATTERN]) + content = suggestions[0]["rule_content"] + # Should have some actionable guidance + assert any( + keyword in content.lower() + for keyword in ["before", "ensure", "check", "verify", "avoid", "consider"] + ) diff --git a/packages/claude-code-plugin/tests/test_suggest_rules.py b/packages/claude-code-plugin/tests/test_suggest_rules.py new file mode 100644 index 0000000..356d159 --- /dev/null +++ b/packages/claude-code-plugin/tests/test_suggest_rules.py @@ -0,0 +1,100 @@ +"""Tests for suggest_rules — end-to-end integration of pattern detection + rule suggestion.""" +import os +import tempfile + +import pytest + +from hooks.lib.history_db import HistoryDB +from hooks.lib.suggest_rules import suggest_rules + + +@pytest.fixture +def db_dir(): + d = tempfile.mkdtemp() + yield d + + +@pytest.fixture +def db_path(db_dir): + return os.path.join(db_dir, "history.db") + + +@pytest.fixture +def db(db_path): + instance = HistoryDB(db_path=db_path) + yield instance + instance.close() + + +class TestSuggestRulesIntegration: + def test_empty_db_returns_empty(self, db): + """Empty database should return no suggestions.""" + result = suggest_rules(db) + assert result == [] + + def test_end_to_end_pattern_to_rule(self, db): + """Failures meeting threshold should produce rule suggestions.""" + for i in range(4): + sid = f"sess-{i}" + db.start_session(sid, project="/proj") + db.record_tool_call(sid, "Bash", input_summary="yarn build", success=False) + + result = suggest_rules(db) + assert len(result) == 1 + assert "Bash" in result[0]["title"] + assert "yarn build" in result[0]["rule_content"] + assert result[0]["rule_content"].startswith("#") + + def test_below_threshold_returns_empty(self, db): + """Failures below threshold should not generate suggestions.""" + db.start_session("s1", project="/proj") + db.record_tool_call("s1", "Bash", input_summary="yarn build", success=False) + db.start_session("s2", project="/proj") + db.record_tool_call("s2", "Bash", input_summary="yarn build", success=False) + + result = suggest_rules(db) + assert result == [] + + def test_custom_params_forwarded(self, db): + """min_occurrences and days should be forwarded to detector.""" + for i in range(5): + sid = f"sess-{i}" + db.start_session(sid, project="/proj") + db.record_tool_call(sid, "Read", input_summary="missing.py", success=False) + + result = suggest_rules(db, min_occurrences=6) + assert result == [] + + result = suggest_rules(db, min_occurrences=4) + assert len(result) == 1 + + def test_multiple_patterns_produce_multiple_rules(self, db): + """Multiple distinct failure patterns should each get a rule.""" + for i in range(3): + sid = f"sess-{i}" + db.start_session(sid, project="/proj") + db.record_tool_call(sid, "Bash", input_summary="npm test", success=False) + db.record_tool_call(sid, "Read", input_summary="/no/such/file", success=False) + + result = suggest_rules(db) + assert len(result) == 2 + + +class TestSuggestRulesWithDbPath: + def test_accepts_db_path_string(self, db_path): + """suggest_rules should accept a string path and open DB internally.""" + db = HistoryDB(db_path=db_path) + for i in range(3): + sid = f"sess-{i}" + db.start_session(sid, project="/proj") + db.record_tool_call(sid, "Bash", input_summary="make", success=False) + db.close() + + result = suggest_rules(db_path) + assert len(result) == 1 + + def test_nonexistent_db_path_returns_empty(self, db_dir): + """Non-existent DB path should create empty DB and return no suggestions.""" + path = os.path.join(db_dir, "nonexistent", "history.db") + result = suggest_rules(path) + assert result == []