Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions packages/claude-code-plugin/hooks/lib/pattern_detector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Detect repeated error patterns from HistoryDB execution history."""
import logging
import time

from hooks.lib.history_db import HistoryDB

logger = logging.getLogger(__name__)


class PatternDetector:
    """Analyzes tool_calls in HistoryDB to find repeated failure patterns."""

    def __init__(self, db: "HistoryDB"):
        # Forward-reference string so the annotation is not evaluated at
        # class-definition time; only the `_conn` attribute is used below.
        self._db = db

    def detect_patterns(
        self, min_occurrences: int = 3, days: int = 30
    ) -> list[dict]:
        """Detect repeated failure patterns from execution history.

        Groups failed tool calls by (tool_name, input_summary) within the
        lookback window and keeps groups with at least ``min_occurrences``
        failures.

        Args:
            min_occurrences: Minimum number of failures to count as a pattern.
            days: How many days back to search.

        Returns:
            List of pattern dicts with keys: tool_name, input_summary,
            failure_count, session_count, first_seen, last_seen, ordered by
            failure_count descending. Empty list if the query fails.
        """
        # Timestamps are epoch seconds (compared directly against time.time()
        # below); 86400 seconds per day.
        cutoff = time.time() - (days * 86400)
        try:
            cursor = self._db._conn.execute(
                """
                SELECT
                    tool_name,
                    input_summary,
                    COUNT(*) AS failure_count,
                    COUNT(DISTINCT session_id) AS session_count,
                    MIN(timestamp) AS first_seen,
                    MAX(timestamp) AS last_seen
                FROM tool_calls
                WHERE success = 0 AND timestamp >= ?
                GROUP BY tool_name, input_summary
                HAVING COUNT(*) >= ?
                ORDER BY failure_count DESC
                """,
                (cutoff, min_occurrences),
            )
            # Build dicts from the cursor metadata so key names track the
            # SELECT column aliases automatically.
            columns = [desc[0] for desc in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        except Exception:
            # Best-effort boundary: swallow the error but keep the traceback
            # in the log (logger.exception, not logger.error, which dropped it).
            logger.exception("Failed to detect patterns")
            return []
89 changes: 89 additions & 0 deletions packages/claude-code-plugin/hooks/lib/rule_suggester.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Generate rule suggestions from detected error patterns."""
import datetime
import logging

logger = logging.getLogger(__name__)

# Map tool names to actionable guidance templates.
_ACTION_TEMPLATES = {
    "Bash": "Consider verifying the command or its prerequisites before running `{input_summary}`.",
    "Read": "Ensure the file `{input_summary}` exists and is accessible before reading.",
    "Write": "Check that the target path for `{input_summary}` is valid and writable before writing.",
    "Edit": "Verify the target content exists in the file before attempting to edit `{input_summary}`.",
}

# Fallback guidance for tools without a dedicated template above.
_DEFAULT_ACTION = "Consider adding a pre-check or guard to avoid repeated failures with `{input_summary}`."


class RuleSuggester:
    """Generates draft rules in .ai-rules markdown format from detected patterns."""

    def suggest_rules(self, patterns: list) -> list:
        """Generate rule suggestions from detected patterns.

        Args:
            patterns: List of pattern dicts from PatternDetector.detect_patterns().
                Each dict must have tool_name, failure_count, session_count;
                input_summary may be missing/None (falls back to "unknown").

        Returns:
            List of suggestion dicts with keys: title, description,
            rule_content, pattern (the original pattern dict, unmodified).
        """
        if not patterns:
            return []

        suggestions = []
        for pattern in patterns:
            tool_name = pattern["tool_name"]
            # `or "unknown"` also covers an empty-string summary, not just None.
            input_summary = pattern.get("input_summary") or "unknown"
            failure_count = pattern["failure_count"]
            session_count = pattern["session_count"]

            title = f"Repeated {tool_name} failure: {input_summary}"
            description = (
                f"The `{tool_name}` tool failed {failure_count} times "
                f"across {session_count} sessions with input `{input_summary}`."
            )

            rule_content = self._generate_rule_content(
                tool_name=tool_name,
                input_summary=input_summary,
                failure_count=failure_count,
                session_count=session_count,
            )

            suggestions.append(
                {
                    "title": title,
                    "description": description,
                    "rule_content": rule_content,
                    "pattern": pattern,
                }
            )

        return suggestions

    def _generate_rule_content(
        self,
        tool_name: str,
        input_summary: str,
        failure_count: int,
        session_count: int,
    ) -> str:
        """Generate markdown rule content for a single pattern."""
        template = _ACTION_TEMPLATES.get(tool_name, _DEFAULT_ACTION)
        action = template.format(input_summary=input_summary)
        # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
        # "now" yields the same YYYY-MM-DD date string.
        now = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")

        return (
            f"# Repeated {tool_name} failure: {input_summary}\n"
            f"\n"
            f"> Auto-detected rule — {now}\n"
            f"\n"
            f"## Context\n"
            f"\n"
            f"The `{tool_name}` tool with input `{input_summary}` has failed "
            f"{failure_count} times across {session_count} sessions.\n"
            f"\n"
            f"## Guideline\n"
            f"\n"
            f"{action}\n"
        )
44 changes: 44 additions & 0 deletions packages/claude-code-plugin/hooks/lib/suggest_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Entry point for auto-learning: detect patterns and suggest rules."""
import logging

from hooks.lib.history_db import HistoryDB
from hooks.lib.pattern_detector import PatternDetector
from hooks.lib.rule_suggester import RuleSuggester

logger = logging.getLogger(__name__)


def suggest_rules(
    db_or_path, min_occurrences: int = 3, days: int = 30
) -> list:
    """Detect repeated failure patterns and generate rule suggestions.

    Args:
        db_or_path: A HistoryDB instance, or a path (str or os.PathLike)
            to the SQLite database.
        min_occurrences: Minimum failures to count as a pattern.
        days: How many days back to search.

    Returns:
        List of suggestion dicts with keys: title, description, rule_content,
        pattern. Empty list on failure.
    """
    import os  # local import keeps the module's top-level dependencies unchanged

    # Only close a DB we opened ourselves; callers keep ownership of a
    # HistoryDB instance they pass in.
    owns_db = False
    if isinstance(db_or_path, (str, os.PathLike)):
        # os.fspath normalizes Path objects to str before handing off.
        db = HistoryDB(db_path=os.fspath(db_or_path))
        owns_db = True
    else:
        db = db_or_path

    try:
        detector = PatternDetector(db)
        patterns = detector.detect_patterns(
            min_occurrences=min_occurrences, days=days
        )

        suggester = RuleSuggester()
        return suggester.suggest_rules(patterns)
    except Exception:
        # Best-effort boundary: log with traceback (logger.exception) so the
        # failure is diagnosable, then degrade to "no suggestions".
        logger.exception("Failed to suggest rules")
        return []
    finally:
        if owns_db:
            db.close()
170 changes: 170 additions & 0 deletions packages/claude-code-plugin/tests/test_pattern_detector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
"""Tests for PatternDetector — detects repeated error patterns from HistoryDB."""
import os
import tempfile
import time

import pytest

from hooks.lib.history_db import HistoryDB
from hooks.lib.pattern_detector import PatternDetector


@pytest.fixture
def db_dir():
    """Yield a temporary directory for test databases, removed on teardown."""
    # tempfile.mkdtemp() left the directory behind after the test run;
    # TemporaryDirectory cleans it up when the fixture finalizes.
    with tempfile.TemporaryDirectory() as d:
        yield d


@pytest.fixture
def db_path(db_dir):
    # Path of the per-test SQLite history database inside the temp directory.
    return os.path.join(db_dir, "history.db")


@pytest.fixture
def db(db_path):
    """Yield an open HistoryDB backed by the per-test path; close on teardown."""
    history = HistoryDB(db_path=db_path)
    yield history
    history.close()


@pytest.fixture
def detector(db):
    """Create a PatternDetector wrapping the test DB.

    Depends on the ``db`` fixture, so every test gets a detector over a
    fresh, empty database.
    """
    return PatternDetector(db)


class TestEmptyDatabase:
    """Detection over databases with no qualifying failure rows."""

    def test_returns_empty_list_when_no_data(self, detector):
        """Empty DB should return no patterns."""
        assert detector.detect_patterns() == []

    def test_returns_empty_list_when_no_failures(self, detector, db):
        """DB with only successful calls should return no patterns."""
        db.start_session("s1", project="/proj")
        for tool, summary in (("Read", "file.py"), ("Write", "out.py")):
            db.record_tool_call("s1", tool, input_summary=summary, success=True)
        assert detector.detect_patterns() == []


class TestPatternDetection:
    """Threshold handling, grouping, and success/failure filtering."""

    @staticmethod
    def _fail_in_fresh_sessions(db, count):
        """Record one failing `npm test` Bash call in each of *count* new sessions."""
        for idx in range(count):
            session_id = f"sess-{idx}"
            db.start_session(session_id, project="/proj")
            db.record_tool_call(session_id, "Bash", input_summary="npm test", success=False)

    def test_detects_repeated_failure_pattern(self, detector, db):
        """Same tool+input failing 3+ times across sessions = pattern."""
        self._fail_in_fresh_sessions(db, 3)

        patterns = detector.detect_patterns()
        assert len(patterns) == 1
        first = patterns[0]
        assert first["tool_name"] == "Bash"
        assert first["input_summary"] == "npm test"
        assert first["failure_count"] >= 3

    def test_does_not_detect_below_threshold(self, detector, db):
        """2 failures should NOT trigger a pattern (threshold=3)."""
        self._fail_in_fresh_sessions(db, 2)
        assert detector.detect_patterns() == []

    def test_custom_threshold(self, detector, db):
        """Custom threshold should be respected."""
        self._fail_in_fresh_sessions(db, 5)

        assert detector.detect_patterns(min_occurrences=6) == []
        assert len(detector.detect_patterns(min_occurrences=4)) == 1

    def test_multiple_distinct_patterns(self, detector, db):
        """Different tool+input combos should produce separate patterns."""
        for idx in range(3):
            session_id = f"sess-{idx}"
            db.start_session(session_id, project="/proj")
            db.record_tool_call(session_id, "Bash", input_summary="npm test", success=False)
            db.record_tool_call(session_id, "Read", input_summary="/etc/config", success=False)

        patterns = detector.detect_patterns()
        assert len(patterns) == 2
        assert {p["tool_name"] for p in patterns} == {"Bash", "Read"}

    def test_same_session_failures_counted(self, detector, db):
        """Multiple failures in one session should count toward threshold."""
        db.start_session("s1", project="/proj")
        for _ in range(3):
            db.record_tool_call("s1", "Bash", input_summary="npm test", success=False)

        assert len(detector.detect_patterns()) == 1

    def test_mixed_success_failure_only_counts_failures(self, detector, db):
        """Only failures are counted, not successes."""
        for idx in range(5):
            session_id = f"sess-{idx}"
            db.start_session(session_id, project="/proj")
            db.record_tool_call(session_id, "Bash", input_summary="npm test", success=True)

        # Only 2 failures — below the default threshold of 3.
        for session_id in ("sess-0", "sess-1"):
            db.record_tool_call(session_id, "Bash", input_summary="npm test", success=False)

        assert detector.detect_patterns() == []


class TestPatternMetadata:
    """Metadata fields on detected patterns and the lookback-window filter."""

    @staticmethod
    def _seed_failures(db, count):
        """Record one failing `npm test` Bash call in each of *count* sessions."""
        for idx in range(count):
            session_id = f"sess-{idx}"
            db.start_session(session_id, project="/proj")
            db.record_tool_call(session_id, "Bash", input_summary="npm test", success=False)

    def test_pattern_includes_session_count(self, detector, db):
        """Pattern should report how many distinct sessions had the failure."""
        self._seed_failures(db, 4)

        patterns = detector.detect_patterns()
        assert len(patterns) == 1
        assert patterns[0]["session_count"] >= 3

    def test_pattern_includes_first_and_last_seen(self, detector, db):
        """Pattern should include timestamps for first and last occurrence."""
        self._seed_failures(db, 3)

        pattern = detector.detect_patterns()[0]
        assert "first_seen" in pattern
        assert "last_seen" in pattern
        assert pattern["first_seen"] <= pattern["last_seen"]

    def test_days_filter_limits_scope(self, detector, db):
        """days parameter should limit how far back we look."""
        db.start_session("recent", project="/proj")
        db.record_tool_call("recent", "Bash", input_summary="npm test", success=False)

        # Insert old failures directly so we control the timestamp
        # (record_tool_call would stamp them "now").
        old_time = time.time() - (60 * 86400)
        db._conn.executemany(
            "INSERT INTO tool_calls (session_id, timestamp, tool_name, input_summary, success) "
            "VALUES (?, ?, ?, ?, ?)",
            [(f"old-{i}", old_time, "Bash", "npm test", 0) for i in range(3)],
        )
        db._conn.commit()

        assert detector.detect_patterns(days=30) == []
        assert len(detector.detect_patterns(days=90)) == 1
Loading
Loading