Conversation
DCO Assistant Lite bot: All contributors have signed the DCO ✍️ ✅

please sign dco for review
```python
def sync_rules_from_github(
    repo: str = "Agent-Threat-Rule/agent-threat-rules",
    branch: str = "main",
    output: Path | None = None,
) -> int:
    """Fetch latest ATR rules from GitHub and update the bundled JSON.

    Requires: git, PyYAML (pip install pyyaml).
    Returns the number of patterns synced.

    Usage::

        from garak.detectors.atr import sync_rules_from_github
        count = sync_rules_from_github()
        print(f"Synced {count} patterns")
    """
    import yaml  # PyYAML -- optional dependency

    dest = output or _RULES_PATH
    with tempfile.TemporaryDirectory() as tmpdir:
        subprocess.run(
            ["git", "clone", "--depth", "1", "-b", branch,
             f"https://github.com/{repo}.git", tmpdir],
            check=True, capture_output=True,
        )
        rules_dir = Path(tmpdir) / "rules"
        if not rules_dir.exists():
            raise FileNotFoundError(f"No rules/ directory in {repo}")
```
please use the data mechanism within garak
force-pushed ce52ef2 to c960ed7
Signed-off-by: Panguard AI <support@panguard.ai>
Signed-off-by: eeee2345 <imadam4real@gmail.com>
force-pushed c960ed7 to 90c887d
I have read the DCO Document and I hereby sign the DCO

recheck
jmartin-tech left a comment:
Thanks for the submission.

As an extra detector to mix into a run this could be useful. The helper methods that have been placed in the detector should likely be extracted as separate tooling.
```python
        subprocess.run(
            ["git", "clone", "--depth", "1", "-b", branch,
             f"https://github.com/{repo}.git", tmpdir],
            check=True, capture_output=True,
        )
```
subprocess.run is not an acceptable method of retrieving data at runtime.
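A stdlib-only alternative can avoid both the git binary and subprocess: GitHub serves a zip archive of any branch over plain HTTPS. This is a sketch, not the PR's final code (a later commit states the fix uses urllib.request to download a zip); the function names here are illustrative.

```python
import io
import urllib.request
import zipfile
from pathlib import Path


def archive_url(repo: str, branch: str = "main") -> str:
    """GitHub's zip archive endpoint for a branch (no git binary needed)."""
    return f"https://github.com/{repo}/archive/refs/heads/{branch}.zip"


def fetch_repo_zip(repo: str, branch: str, dest: Path) -> Path:
    """Download and extract the branch archive; return the extracted root.

    Illustrative helper: performs network I/O via urllib.request only.
    """
    with urllib.request.urlopen(archive_url(repo, branch)) as resp:
        data = resp.read()
    with zipfile.ZipFile(io.BytesIO(data)) as zf:
        zf.extractall(dest)
    # GitHub archives unpack to "<repo-name>-<branch>/"
    return dest / f"{repo.split('/')[-1]}-{branch}"
```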
```python
def generate_rule_from_probe(
    probe_outputs: list[str],
    category: str = "prompt-injection",
    severity: str = "high",
    min_common_length: int = 8,
) -> str:
    """Generate an ATR rule YAML draft from successful Garak probe outputs.

    Takes a list of strings that bypassed defenses (successful attacks)
    and extracts common substrings as detection patterns. Returns a
    YAML rule string ready for review and submission to ATR.

    This is a starting point -- generated rules should be reviewed by
    a human before being added to the ATR ruleset.

    Usage::

        from garak.detectors.atr import generate_rule_from_probe
        attacks = ["ignore previous instructions and ...", "forget all rules and ..."]
        rule_yaml = generate_rule_from_probe(attacks, category="prompt-injection")
        print(rule_yaml)
    """
    if not probe_outputs:
        return ""

    # Extract keywords that appear in 50%+ of outputs
    word_counts: dict[str, int] = {}
    for text in probe_outputs:
        words = set(re.findall(r"\b[a-zA-Z]{4,}\b", text.lower()))
        for w in words:
            word_counts[w] = word_counts.get(w, 0) + 1

    threshold = max(2, len(probe_outputs) // 2)
    common_words = sorted(
        [w for w, c in word_counts.items() if c >= threshold],
        key=lambda w: word_counts[w],
        reverse=True,
    )[:6]

    if not common_words:
        return ""

    # Build regex pattern from common words
    pattern = r"(?i)\b" + r"\b.*\b".join(re.escape(w) for w in common_words[:4]) + r"\b"

    date = datetime.now().strftime("%Y/%m/%d")
    rule_id = f"ATR-DRAFT-{hash(pattern) % 100000:05d}"

    return f"""title: "Garak-generated: {common_words[0]} pattern"
id: {rule_id}
rule_version: 1
status: draft
description: >
  Auto-generated from {len(probe_outputs)} successful Garak probe outputs.
  Common keywords: {', '.join(common_words[:6])}.
  REVIEW REQUIRED before adding to production ruleset.
author: "garak + ATR"
date: "{date}"
schema_version: "0.1"
detection_tier: pattern
maturity: experimental
severity: {severity}
tags:
  category: {category}
  subcategory: garak-generated
  confidence: low
agent_source:
  type: mcp_exchange
  framework: [any]
  provider: [any]
detection:
  conditions:
    - field: content
      operator: regex
      value: '{pattern}'
      description: "Pattern from {len(probe_outputs)} Garak probe outputs"
  condition: any
response:
  actions: [alert]
test_cases:
  true_positives:
    - input: "{probe_outputs[0][:100].replace(chr(34), chr(39))}"
      expected: triggered
"""
```
This is an interesting utility for creating yaml files to contribute to defensive tooling. It is never called by code in this PR. I would suggest it be extracted as a utility that could be used to post-process either a report.jsonl or hitlog.jsonl. I could see this being placed in the tools path for use only in a repo-based install, or exposed as an analyze module shipped as part of the installed package, similar to how report_digest is exposed.
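A post-processing sketch along those lines, assuming hitlog.jsonl holds one JSON object per line with the model output under an `output` key (the field name is an assumption about the log schema, not confirmed by this PR):

```python
import json
from pathlib import Path


def collect_hits(hitlog_path: Path, field: str = "output") -> list[str]:
    """Collect model outputs from a hitlog.jsonl-style file.

    NOTE: the `output` field name is an assumption; adjust `field`
    to match the actual garak report schema.
    """
    hits: list[str] = []
    for line in hitlog_path.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue  # tolerate blank lines between records
        entry = json.loads(line)
        value = entry.get(field)
        if isinstance(value, str):
            hits.append(value)
    return hits
```

The resulting list could then be fed to generate_rule_from_probe by the extracted tool.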
```python
def sync_rules_from_github(
    repo: str = "Agent-Threat-Rule/agent-threat-rules",
    branch: str = "main",
    output: Path | None = None,
) -> int:
    """Fetch latest ATR rules from GitHub and update the bundled JSON.

    Requires: git, PyYAML (pip install pyyaml).
    Returns the number of patterns synced.

    Usage::

        from garak.detectors.atr import sync_rules_from_github
        count = sync_rules_from_github()
        print(f"Synced {count} patterns")
    """
    import yaml  # PyYAML -- optional dependency

    dest = output or _RULES_PATH
    with tempfile.TemporaryDirectory() as tmpdir:
        subprocess.run(
            ["git", "clone", "--depth", "1", "-b", branch,
             f"https://github.com/{repo}.git", tmpdir],
            check=True, capture_output=True,
        )
        rules_dir = Path(tmpdir) / "rules"
        if not rules_dir.exists():
            raise FileNotFoundError(f"No rules/ directory in {repo}")

        result: dict[str, list[list[str]]] = {}
        for yaml_file in sorted(rules_dir.rglob("*.yaml")):
            doc = yaml.safe_load(yaml_file.read_text())
            if not doc or not doc.get("detection", {}).get("conditions"):
                continue
            cat = doc.get("tags", {}).get("category", "unknown")
            if cat not in result:
                result[cat] = []
            for cond in doc["detection"]["conditions"]:
                if cond.get("operator") == "regex" and cond.get("value"):
                    pat = re.sub(r"^\(\?[imsx]+\)", "", cond["value"])
                    result[cat].append([doc["id"], doc.get("severity", "medium"), pat])

    dest.write_text(json.dumps(result, indent=2, ensure_ascii=True))
    total = sum(len(v) for v in result.values())
    logger.info("ATR sync: %d patterns across %d categories -> %s", total, len(result), dest)
    return total
```
I would suggest this is also likely better extracted into the tools path as a separate utility to be executed independently to configure the user's system. The utility should likely write the generated configuration to the user's XDG-based data_path by default, or to stdout, so the user can place it in the correct location in their XDG_DATA_HOME for the detector to pick it up in place of the shipped version.
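The XDG default the reviewer describes can be resolved with a few lines; this is a minimal sketch per the Base Directory spec ($XDG_DATA_HOME, falling back to ~/.local/share), and the `garak` subdirectory name is an assumption, not a confirmed garak convention:

```python
import os
from pathlib import Path


def xdg_data_dir(app: str = "garak") -> Path:
    """Resolve the XDG data directory for an app:
    $XDG_DATA_HOME if set and non-empty, otherwise ~/.local/share."""
    base = os.environ.get("XDG_DATA_HOME", "").strip()
    root = Path(base) if base else Path.home() / ".local" / "share"
    return root / app
```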
```python
_RULES_PATH = Path(__file__).parent / "atr_rules.json"
_ALL_RULES: dict[str, list[list[str]]] = {}
if _RULES_PATH.exists():
    with open(_RULES_PATH) as f:
        _ALL_RULES = json.load(f)
else:
    logger.warning("ATR rules file not found: %s", _RULES_PATH)
```
This should use the data_path access pattern, see garak/garak/probes/snowball.py, lines 42 to 47 in 2569a60:

`from garak.data import path as data_path`

This helper class provides access to files in the installed package's data directory and supports user override of the file via the XDG base directory specification, so users can provide their own content without needing write permissions to the python runtime library path.

Also, it is preferred to load this inside of `__init__` for a detector instead of globally on module import. This could be accomplished using the ABC abstract class pattern. See garak/garak/probes/packagehallucination.py, lines 63 to 102 in 2569a60.
Thanks @jmartin-tech @leondz for the thorough review. Addressed all four points:

For context on the rule set and methodology — the full spec is at agentthreatrule.org and the academic paper is on Zenodo. Would appreciate any design feedback on how the detector categories map to garak's existing taxonomy — happy to adjust the tagging.
…subprocess

Changes per reviewer comments:

1. Rules loading uses garak's data_path mechanism (L37 feedback)
   - Moved atr_rules.json -> garak/data/atr/rules.json
   - Detector loads via `from garak.data import path as data_path`
   - Supports XDG user override
2. Removed subprocess.run (L64 feedback)
   - sync tool uses urllib.request to download zip
   - No git dependency required
3. Extracted helper methods to tools/ (L85, L171 feedback)
   - tools/atr.py: sync_rules() + generate_rule()
   - Writes to XDG data_path by default, or --stdout
   - Detector is now pure detection logic only
4. Rules loaded in __init__, not module level (L37 feedback)
   - _load_rules() called per-instance, not on import

Signed-off-by: eeee2345 <eeee2345@users.noreply.github.com>
force-pushed abf1394 to 63f60e8
jmartin-tech left a comment:
A few more adjustments requested.
```python
        xdg_dir.mkdir(parents=True, exist_ok=True)
        return xdg_dir / "rules.json"
    except Exception:
        return Path(__file__).parent.parent / "garak" / "data" / "atr" / "rules.json"
```
I am not sure the fallback location in the exception handler makes sense. If the import fails, the tool was likely executed from a location other than the repo source. It is also somewhat unexpected for a tool to create something in a relative path like that. I would hazard that support for either the XDG path or a user-supplied command line location is sufficient, and if the XDG path search raises an exception it may be best to exit early and suggest the user supply a valid --output value or utilize the --stdout option.
Suggested change:

```diff
-        return Path(__file__).parent.parent / "garak" / "data" / "atr" / "rules.json"
+        print("The user XDG storage location could not be identified, supply --output or --stdout options or ensure garak is available in the python environment.", file=sys.stderr)
+        sys.exit(1)
```
```python
if __name__ == "__main__":
    main()
```
Required to ensure consistent Windows support:
Suggested change:

```diff
-    main()
+    sys.stdout.reconfigure(encoding="utf-8")
+    main()
```
```python
def _load_rules() -> dict[str, list[list[str]]]:
    """Load ATR rules from garak's data directory."""
    rules_path = data_path / _ATR_RULES_FILE
```
Combining in this manner may not work with data_path, as the / operator is doing custom things in this context. I am also not sure the constant really adds value here, as it is only used once; I could see value in the constant if it were utilized in test code.
With `_RULES_FILENAME = "rules.json"`, suggested change:

```diff
-    rules_path = data_path / _ATR_RULES_FILE
+    rules_path = data_path / __name__.split(".")[-1] / _RULES_FILENAME
```
```python
        self._rules = _load_rules()
        self._compiled: list[tuple[str, re.Pattern]] = []
        for cat in self.atr_categories or list(self._rules.keys()):
            self._compiled.extend(_compile_category(self._rules, cat))
        logger.info(
            "ATR detector %s: %d patterns from %d categories",
            self.__class__.__name__,
            len(self._compiled),
            len(self.atr_categories) or len(self._rules),
        )
```
Based on the code in `detect` there is no reason to keep the `_rules` dictionary outside the `__init__` scope.
Suggested change:

```diff
-        self._rules = _load_rules()
-        self._compiled: list[tuple[str, re.Pattern]] = []
-        for cat in self.atr_categories or list(self._rules.keys()):
-            self._compiled.extend(_compile_category(self._rules, cat))
-        logger.info(
-            "ATR detector %s: %d patterns from %d categories",
-            self.__class__.__name__,
-            len(self._compiled),
-            len(self.atr_categories) or len(self._rules),
-        )
+        rules = _load_rules()
+        self._compiled: list[tuple[str, re.Pattern]] = []
+        for cat in self.atr_categories or list(rules.keys()):
+            self._compiled.extend(_compile_category(rules, cat))
+        logger.info(
+            "ATR detector %s: %d patterns from %d categories",
+            self.__class__.__name__,
+            len(self._compiled),
+            len(self.atr_categories) or len(rules),
+        )
```
@jmartin-tech @leondz — all four review points have been addressed (data_path, no subprocess, extracted tools, init-time loading). Ready for re-review when you have a moment. Since the original submission, ATR has shipped v2.0.0 with some changes worth noting:

Happy to update the PR to v2.0.0 rules if that's useful. The sync tool already supports pulling latest from npm, so garak users would get rule updates automatically via

Also — if there's interest, ATR's Threat Cloud can accept detection signals from garak runs. That means every garak user running ATR detectors would contribute back to the rule pipeline. No PII, just pattern hashes. Happy to discuss if that's in scope.
Apologies — my previous comment referenced the round-1 feedback only. I missed that there was a second round of review on 4/10. This commit addresses all round-2 items:

Also updated rules.json to ATR v2.0.0 (113 rules, 736 patterns).
- tools/atr.py: exit with error if XDG path unavailable (no relative fallback)
- tools/atr.py: add sys.stdout.reconfigure for Windows encoding
- detectors/atr.py: inline data_path construction, drop module-level constant
- detectors/atr.py: move _rules to local scope in __init__
- rules.json: update to ATR v2.0.0 (113 rules, 736 patterns, 9 categories)

Signed-off-by: eeee2345 <imadam4real@gmail.com>
force-pushed 35d0cd9 to ddef4a5
Summary
Adds ATR (Agent Threat Rules) detectors for AI agent-specific threats not covered by garak's existing injection/jailbreak detectors. Focuses on MCP tool poisoning, skill compromise, context exfiltration, and excessive autonomy.
What's included
Two files:

- `garak/detectors/atr.py` -- 9 detector classes + rule sync + rule generation utilities
- `garak/detectors/atr_rules.json` -- 714 regex patterns from 108 ATR rules (bundled, no runtime dependency)

9 detector classes:

- `atr.AgentThreats`
- `atr.PromptInjection`
- `atr.ToolPoisoning`
- `atr.CredentialExfiltration`
- `atr.PrivilegeEscalation`
- `atr.SkillCompromise`
- `atr.ExcessiveAutonomy`
- `atr.AgentManipulation`
- `atr.DataPoisoning`

Two utility functions:

- `sync_rules_from_github()` -- pull latest ATR rules (requires git + PyYAML, optional)
- `generate_rule_from_probe()` -- generate ATR rule drafts from successful garak probe outputs (red team -> blue team feedback loop)

Why this belongs in garak
garak's existing detectors cover LLM-level threats (jailbreaks, known-bad signatures, encoding tricks). ATR covers agent-level threats specific to the MCP/tool-use ecosystem:
These are not redundant with existing garak detectors -- they scan for a different attack surface.
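As an illustration of what this pattern-based scanning reduces to (not garak's or ATR's actual API; the rule IDs and regexes below are made up, though the `{category: [[rule_id, severity, pattern], ...]}` shape matches the bundled JSON as read by the sync code above):

```python
import re

# Made-up example rules in the bundled JSON's shape
RULES = {
    "prompt-injection": [
        ["ATR-EXAMPLE-001", "high", r"\bignore (all )?previous instructions\b"],
    ],
    "credential-exfiltration": [
        ["ATR-EXAMPLE-002", "critical", r"\b(api[_-]?key|secret[_-]?token)\b"],
    ],
}


def scan(text: str) -> list[tuple[str, str]]:
    """Return (rule_id, severity) for every rule whose regex matches."""
    hits = []
    for patterns in RULES.values():
        for rule_id, severity, pattern in patterns:
            if re.search(pattern, text, re.IGNORECASE):
                hits.append((rule_id, severity))
    return hits
```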
Provenance
ATR rules are MIT-licensed, community-driven, and adopted by Cisco AI Defense (34 rules merged into their official skill-scanner). The bundled JSON is a static snapshot; `sync_rules_from_github()` enables updates without waiting for garak releases.

Source: https://github.com/Agent-Threat-Rule/agent-threat-rules
Usage