From b827996e1009c4bc67b0a8632be0b6d6d3a8e145 Mon Sep 17 00:00:00 2001 From: Evgeny Kiriyak <224408464+evkir@users.noreply.github.com> Date: Sun, 24 May 2026 16:10:49 +0300 Subject: [PATCH 1/4] refactor(recon): migrate ReconAgent to new BaseAgent contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - run(target, context) instead of run(input_data) — target is an explicit arg, not pulled from self.session - self.kb.set(key, value, agent=...) instead of writing to the non-existent self.session.knowledge_base dict - add_finding(severity=, title=, ...) called with keyword args directly instead of constructing a Finding object (matches day-3 API) - AGENT_NAME / ROLE set explicitly Refs: STANDOFF.md day 6/30 --- cyberai/agents/recon/agent.py | 70 +++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/cyberai/agents/recon/agent.py b/cyberai/agents/recon/agent.py index 53341d8..1a88ace 100644 --- a/cyberai/agents/recon/agent.py +++ b/cyberai/agents/recon/agent.py @@ -1,83 +1,91 @@ -from typing import Dict, Any +"""ReconAgent — nmap → whois → DNS → subdomain enumeration.""" +from __future__ import annotations + +from typing import Any, Dict, Optional + from cyberai.core.base_agent import BaseAgent, Tool -from cyberai.core.session import Finding, Severity +from cyberai.core.scan_session import Severity + +from .dns_tool import detect_subdomains, run_dns, run_whois from .nmap_tool import run_nmap -from .dns_tool import run_whois, run_dns, detect_subdomains + class ReconAgent(BaseAgent): """ Reconnaissance agent. - Runs: nmap → whois → DNS → subdomain enum - Stores all results in session knowledge base. + Runs nmap → whois → DNS → subdomain enum, stores results in the KB. """ - def _register_tools(self): + AGENT_NAME = "recon" + ROLE = "Reconnaissance Specialist" + + def _register_tools(self) -> None: self.register_tool(Tool( name="nmap_scan", description="Port scan target with nmap", func=run_nmap, - parameters={"target": "str", "flags": "str"} + parameters={"target": "str", "flags": "str"}, )) self.register_tool(Tool( name="whois_lookup", description="WHOIS lookup for domain info", func=run_whois, - parameters={"target": "str"} + parameters={"target": "str"}, )) self.register_tool(Tool( name="dns_enum", description="DNS record enumeration", func=run_dns, - parameters={"target": "str"} + parameters={"target": "str"}, )) self.register_tool(Tool( name="subdomain_scan", description="Subdomain bruteforce", func=detect_subdomains, - parameters={"target": "str"} + parameters={"target": "str"}, )) - def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: - target = self.session.target - kb = {} + def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + results: Dict[str, Any] = {} # 1. nmap self._check_iteration_limit() nmap_result = run_nmap(target) - kb["recon.nmap"] = nmap_result - self._log("nmap_scan", nmap_result) + self.kb.set("recon.nmap", nmap_result, agent=self.AGENT_NAME) + results["recon.nmap"] = nmap_result + self._log("nmap_scan complete", nmap_result) # 2. whois self._check_iteration_limit() whois_result = run_whois(target) - kb["recon.whois"] = whois_result - self._log("whois_lookup", whois_result) + self.kb.set("recon.whois", whois_result, agent=self.AGENT_NAME) + results["recon.whois"] = whois_result + self._log("whois_lookup complete", whois_result) # 3. DNS self._check_iteration_limit() dns_result = run_dns(target) - kb["recon.dns"] = dns_result - self._log("dns_enum", dns_result) + self.kb.set("recon.dns", dns_result, agent=self.AGENT_NAME) + results["recon.dns"] = dns_result + self._log("dns_enum complete", dns_result) # 4. Subdomains self._check_iteration_limit() sub_result = detect_subdomains(target) - kb["recon.subdomains"] = sub_result - self._log("subdomain_scan", sub_result) - - # Store in session KB - for key, value in kb.items(): - self.session.knowledge_base[key] = value + self.kb.set("recon.subdomains", sub_result, agent=self.AGENT_NAME) + results["recon.subdomains"] = sub_result + self._log("subdomain_scan complete", sub_result) - # Surface open ports as findings - ports = nmap_result.get("ports", []) + # Surface open ports as an informational finding + ports = nmap_result.get("ports", []) if isinstance(nmap_result, dict) else [] if ports: - self.session.add_finding(Finding( + self.session.add_finding( + severity=Severity.INFO, title=f"Open ports on {target}", description=f"Found {len(ports)} open port(s)", - severity=Severity.INFO, + agent=self.AGENT_NAME, target=target, evidence=[str(p) for p in ports], - )) + ) - return {"status": "done", "kb_keys": list(kb.keys())} + return {"status": "done", "kb_keys": list(results.keys()), "ports": ports} From 192ce881630c4202cabf6f77c42461e9cf554388 Mon Sep 17 00:00:00 2001 From: Evgeny Kiriyak <224408464+evkir@users.noreply.github.com> Date: Sun, 24 May 2026 16:18:34 +0300 Subject: [PATCH 2/4] refactor(intel,report): migrate agents to new contract, fix ScanSession.kb type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit IntelAgent: - run(target, context) instead of run(input_data) - self.kb instead of self.session.knowledge_base - self._log(msg) instead of self.log('intel', msg) (wrong arg order) - IntelAgentV2 subclass folded in: scoring is now a built-in _score() step gated by score_cves=True; IntelAgentV2 kept as a plain alias ReportAgent: - run(target, context) signature - self.kb.set(...) instead of self.session.knowledge_base - datetime.utcnow() -> datetime.now(timezone.utc) ScanSession.kb: was a plain dict, now a KnowledgeBase instance. This was a latent bug — BaseAgent wrapped session.kb in a KnowledgeBase but the session itself kept a dict, so agent.kb and session.kb diverged. session.kb is now a real KnowledgeBase from creation; kb_set/kb_get still work via __setitem__/get. test_intel_v2.py rewritten to use the real BaseAgent contract (real IntelAgent + mocked search_cves) instead of hand-built MagicMock agents. Refs: STANDOFF.md day 6/30 --- cyberai/agents/intel/agent.py | 155 ++++++++++++++++----------------- cyberai/agents/report/agent.py | 46 +++++----- cyberai/core/scan_session.py | 4 +- tests/unit/test_intel_v2.py | 107 ++++++++++++----------- 4 files changed, 158 insertions(+), 154 deletions(-) diff --git a/cyberai/agents/intel/agent.py b/cyberai/agents/intel/agent.py index aed883c..de19bf3 100644 --- a/cyberai/agents/intel/agent.py +++ b/cyberai/agents/intel/agent.py @@ -1,144 +1,141 @@ -from typing import Dict, Any, List +"""IntelAgent — reads recon results, queries NVD, surfaces CVE findings.""" +from __future__ import annotations + +import time +from typing import Any, Dict, List, Optional + from cyberai.core.base_agent import BaseAgent, Tool -from cyberai.core.session import Finding, Severity -from .nvd_client import search_cves, get_cve +from cyberai.core.scan_session import Severity + +from .nvd_client import get_cve, search_cves from .service_mapper import ports_to_queries, score_to_severity -import time + class IntelAgent(BaseAgent): """ CVE Intelligence Agent. Reads recon results → queries NVD → surfaces critical findings. + + Set score_cves=True to also run the risk-prioritizer and produce + a ranked CVE list (this replaces the old IntelAgentV2 subclass). """ - def _register_tools(self): + AGENT_NAME = "intel" + ROLE = "Threat Intelligence Analyst" + + def __init__(self, *args, score_cves: bool = True, + min_score: float = 0.0, top_n: int = 10, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.score_cves = score_cves + self.min_score = min_score + self.top_n = top_n + + def _register_tools(self) -> None: self.register_tool(Tool( name="search_cves", description="Search NVD for CVEs by keyword", func=search_cves, - parameters={"keyword": "str", "max_results": "int"} + parameters={"keyword": "str", "max_results": "int"}, )) self.register_tool(Tool( name="get_cve", description="Get details for a specific CVE ID", func=get_cve, - parameters={"cve_id": "str"} + parameters={"cve_id": "str"}, )) - def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: - target = self.session.target - - # Pull nmap results from session KB - nmap_data = self.session.knowledge_base.get("recon.nmap", {}) - ports = nmap_data.get("ports", []) + def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + nmap_data = self.kb.get("recon.nmap", {}) or {} + ports = nmap_data.get("ports", []) if isinstance(nmap_data, dict) else [] if not ports: - self.log("intel", "no ports found in KB — skipping CVE lookup") + self._log("no ports found in KB — skipping CVE lookup") return {"status": "skipped", "reason": "no ports"} - # Build search queries from open ports queries = ports_to_queries(ports) all_cves: List[Dict] = [] - for query in queries[:5]: # Limit to 5 queries — NVD rate limit + for query in queries[:5]: # NVD rate limit self._check_iteration_limit() result = search_cves(query, max_results=5) - cves = result.get("cves", []) - all_cves.extend(cves) - time.sleep(0.6) # NVD rate limit: ~5 req/30s without API key + all_cves.extend(result.get("cves", [])) + time.sleep(0.6) - # Store in KB - self.session.knowledge_base["intel.cves"] = all_cves - self.log("intel", f"found {len(all_cves)} CVEs for {len(queries)} services") + self.kb.set("intel.cves", all_cves, agent=self.AGENT_NAME) + self._log(f"found {len(all_cves)} CVEs for {len(queries)} services") - # Surface high/critical as findings + # Surface high/critical CVEs as findings for cve in all_cves: - score = cve.get("cvss", {}).get("score") or 0 + score = (cve.get("cvss", {}) or {}).get("score") or 0 if score >= 7.0: - sev_str = score_to_severity(score) - sev = getattr(Severity, sev_str, Severity.HIGH) - self.session.add_finding(Finding( - title=cve["id"], - description=cve["description"], + sev = getattr(Severity, score_to_severity(score), Severity.HIGH) + self.session.add_finding( severity=sev, + title=cve["id"], + description=cve.get("description", ""), + agent=self.AGENT_NAME, target=target, cve_ids=[cve["id"]], - evidence=[f"CVSS: {score}", cve.get("cvss", {}).get("vector", "")], - )) + evidence=[f"CVSS: {score}", + (cve.get("cvss", {}) or {}).get("vector", "")], + ) - return { + result = { "status": "done", "queries": queries, "cves_found": len(all_cves), "high_critical": sum( 1 for c in all_cves - if (c.get("cvss", {}).get("score") or 0) >= 7.0 - ) + if ((c.get("cvss", {}) or {}).get("score") or 0) >= 7.0 + ), } + if self.score_cves: + result.update(self._score(all_cves)) -class IntelAgentV2(IntelAgent): - """ - IntelAgent with CVE scoring engine wired in. - Enriches CVEs with composite risk scores and ranked output. - """ - - def __init__(self, *args, min_score: float = 0.0, - top_n: int = 10, **kwargs): - super().__init__(*args, **kwargs) - self.min_score = min_score - self.top_n = top_n - - def run(self, input_data: dict) -> dict: - result = super().run(input_data) + return result - if result.get("status") == "skipped": - return result - - raw_cves = self.session.knowledge_base.get("intel.cves", []) + def _score(self, raw_cves: List[Dict]) -> Dict[str, Any]: + """Run the risk-prioritizer (formerly IntelAgentV2).""" if not raw_cves: - return {**result, "ranked_cves": [], "risk_summary": {}} + return {"ranked_cves": [], "risk_summary": {}} - # Normalize CVE format for scorer normalized = [_normalize(c) for c in raw_cves] from cyberai.agents.intel.risk_prioritizer import prioritize, summarize - ranked = prioritize( - normalized, - min_score=self.min_score, - top_n=self.top_n, - ) + ranked = prioritize(normalized, min_score=self.min_score, top_n=self.top_n) summary = summarize(normalized) - self.session.knowledge_base["intel.ranked_cves"] = ranked - self.session.knowledge_base["intel.risk_summary"] = summary + self.kb.set("intel.ranked_cves", ranked, agent=self.AGENT_NAME) + self.kb.set("intel.risk_summary", summary, agent=self.AGENT_NAME) - self.log("intel", ( - f"scored {len(ranked)} CVEs | " - f"top={ranked[0]['cve_id'] if ranked else 'none'} " - f"({ranked[0].get('composite_score', 0):.2f})" - if ranked else "no CVEs after scoring" - )) + if ranked: + self._log( + f"scored {len(ranked)} CVEs | top={ranked[0]['cve_id']} " + f"({ranked[0].get('composite_score', 0):.2f})" + ) + else: + self._log("no CVEs after scoring") - return { - **result, - "ranked_cves": ranked, - "risk_summary": summary, - } + return {"ranked_cves": ranked, "risk_summary": summary} + + +# Backward-compat alias — IntelAgentV2 was a subclass; now scoring is built in. +IntelAgentV2 = IntelAgent def _normalize(cve: dict) -> dict: """Normalize NVD CVE dict to scorer-expected format.""" - cvss_raw = cve.get("cvss") or 0 + cvss_raw = cve.get("cvss") or 0 cvss_block = cvss_raw if isinstance(cvss_raw, dict) else {} - score = cvss_block.get("score") if cvss_block else cvss_raw + score = cvss_block.get("score") if cvss_block else cvss_raw return { - "cve_id": cve.get("id") or cve.get("cve_id", ""), - "cvss": float(score) if score else 0.0, + "cve_id": cve.get("id") or cve.get("cve_id", ""), + "cvss": float(score) if score else 0.0, "description_short": cve.get("description", "")[:120], "published_date": cve.get("published", ""), - "poc_likely": cve.get("poc_likely", False), - "metasploit": cve.get("metasploit", False), + "poc_likely": cve.get("poc_likely", False), + "metasploit": cve.get("metasploit", False), "exploited_in_wild": cve.get("exploited_in_wild", False), - "epss": float(cve.get("epss") or 0.0), + "epss": float(cve.get("epss") or 0.0), } diff --git a/cyberai/agents/report/agent.py b/cyberai/agents/report/agent.py index bf5a224..bb5128b 100644 --- a/cyberai/agents/report/agent.py +++ b/cyberai/agents/report/agent.py @@ -1,55 +1,57 @@ -from typing import Dict, Any +"""ReportAgent — renders Markdown + JSON reports from the session.""" +from __future__ import annotations + +from datetime import datetime, timezone from pathlib import Path +from typing import Any, Dict, Optional + from cyberai.core.base_agent import BaseAgent, Tool -from .markdown_renderer import render_markdown + from .json_exporter import export_json +from .markdown_renderer import render_markdown + class ReportAgent(BaseAgent): - """ - Report generation agent. - Reads full session → renders Markdown + JSON → saves to disk. - """ + """Report generation agent — renders Markdown + JSON, saves to disk.""" + + AGENT_NAME = "report" + ROLE = "Report Writer" - def _register_tools(self): + def _register_tools(self) -> None: self.register_tool(Tool( name="render_markdown", description="Render Markdown pentest report", func=render_markdown, - parameters={"session": "PentestSession"} + parameters={"session": "ScanSession"}, )) self.register_tool(Tool( name="export_json", description="Export session as JSON report", func=export_json, - parameters={"session": "PentestSession", "output_dir": "str"} + parameters={"session": "ScanSession", "output_dir": "str"}, )) - def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: + def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: output_dir = str(self.config.output_dir) Path(output_dir).mkdir(parents=True, exist_ok=True) - # 1. Render Markdown + # 1. Markdown self._check_iteration_limit() md_content = render_markdown(self.session) - - from datetime import datetime - ts = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") safe_target = self.session.target.replace(":", "_").replace("/", "_") md_path = f"{output_dir}/report_{safe_target}_{ts}.md" - with open(md_path, "w") as f: f.write(md_content) + self._log(f"Markdown saved: {md_path}") - self._log("report", f"Markdown saved: {md_path}") - - # 2. Export JSON + # 2. JSON self._check_iteration_limit() json_path = export_json(self.session, output_dir) - self._log("report", f"JSON saved: {json_path}") + self._log(f"JSON saved: {json_path}") - # Store paths in KB - self.session.knowledge_base["report.markdown_path"] = md_path - self.session.knowledge_base["report.json_path"] = json_path + self.kb.set("report.markdown_path", md_path, agent=self.AGENT_NAME) + self.kb.set("report.json_path", json_path, agent=self.AGENT_NAME) return { "status": "done", diff --git a/cyberai/core/scan_session.py b/cyberai/core/scan_session.py index 46a5686..d5c027e 100644 --- a/cyberai/core/scan_session.py +++ b/cyberai/core/scan_session.py @@ -18,6 +18,8 @@ from enum import Enum from typing import Any, Dict, List, Optional +from cyberai.core.knowledge_base import KnowledgeBase + # ── enums ───────────────────────────────────────────────────────────── @@ -109,7 +111,7 @@ class ScanSession: started_at: Optional[str] = None ended_at: Optional[str] = None phases: List[PhaseResult] = field(default_factory=list) - kb: Dict[str, Any] = field(default_factory=dict) + kb: KnowledgeBase = field(default_factory=KnowledgeBase) errors: List[str] = field(default_factory=list) authorized_scope: List[str] = field(default_factory=list) diff --git a/tests/unit/test_intel_v2.py b/tests/unit/test_intel_v2.py index 43d4501..fcd176a 100644 --- a/tests/unit/test_intel_v2.py +++ b/tests/unit/test_intel_v2.py @@ -1,10 +1,23 @@ +""" +Tests for IntelAgent CVE scoring (formerly IntelAgentV2) and _normalize. + +Day 6 of STANDOFF: IntelAgentV2 is now an alias for IntelAgent with +score_cves=True built in. These tests use the real BaseAgent contract. +""" +from __future__ import annotations + +from unittest.mock import patch + import pytest -from unittest.mock import patch, MagicMock -from cyberai.agents.intel.agent import IntelAgentV2, _normalize + +from cyberai.agents.intel.agent import IntelAgent, IntelAgentV2, _normalize +from cyberai.core.config import CyberAIConfig +from cyberai.core.scan_session import ScanSession # ── normalize helper ───────────────────────────────────────────────── + def test_normalize_standard_nvd_format(): cve = { "id": "CVE-2024-1234", @@ -19,8 +32,7 @@ def test_normalize_standard_nvd_format(): def test_normalize_flat_cvss_format(): - cve = {"cve_id": "CVE-2023-9999", "cvss": 7.5} - n = _normalize(cve) + n = _normalize({"cve_id": "CVE-2023-9999", "cvss": 7.5}) assert n["cvss"] == 7.5 @@ -32,31 +44,11 @@ def test_normalize_missing_fields(): def test_normalize_description_truncated(): - cve = {"id": "CVE-X", "description": "A" * 200} - n = _normalize(cve) + n = _normalize({"id": "CVE-X", "description": "A" * 200}) assert len(n["description_short"]) == 120 -# ── IntelAgentV2 ───────────────────────────────────────────────────── - -def _make_agent(cves=None): - """Build IntelAgentV2 with mocked session.""" - session = MagicMock() - session.target = "10.0.0.1" - session.knowledge_base = { - "recon.nmap": {"ports": [{"port": 80, "service": "http"}]}, - "intel.cves": cves or [], - } - agent = IntelAgentV2.__new__(IntelAgentV2) - agent.session = session - agent.min_score = 0.0 - agent.top_n = 10 - agent._iterations = 0 - agent._max_iterations = 50 - agent.tools = {} - agent.audit = MagicMock() - agent.AGENT_NAME = "intel" - return agent +# ── IntelAgent scoring ──────────────────────────────────────────────── SAMPLE_CVES = [ @@ -76,46 +68,57 @@ def _make_agent(cves=None): ] +def _agent_with_recon(cves): + """Build a real IntelAgent with recon data in the KB and mocked NVD.""" + session = ScanSession(target="10.0.0.1") + session.kb.set("recon.nmap", {"ports": [{"port": 80, "service": "http"}]}) + agent = IntelAgent(CyberAIConfig(), session, score_cves=True) + return agent, session + + +def test_v2_alias_is_intel_agent(): + assert IntelAgentV2 is IntelAgent + + def test_v2_skipped_when_no_ports(): - agent = _make_agent() - agent.session.knowledge_base["recon.nmap"] = {"ports": []} - with patch("cyberai.agents.intel.agent.IntelAgent.run", - return_value={"status": "skipped", "reason": "no ports"}): - result = agent.run({}) + session = ScanSession(target="10.0.0.1") + session.kb.set("recon.nmap", {"ports": []}) + agent = IntelAgent(CyberAIConfig(), session) + result = agent.run("10.0.0.1") assert result["status"] == "skipped" def test_v2_returns_ranked_cves(): - agent = _make_agent(cves=SAMPLE_CVES) - with patch("cyberai.agents.intel.agent.IntelAgent.run", - return_value={"status": "done", "cves_found": 2}): - result = agent.run({}) + agent, _ = _agent_with_recon(SAMPLE_CVES) + with patch("cyberai.agents.intel.agent.search_cves", + return_value={"cves": SAMPLE_CVES}): + result = agent.run("10.0.0.1") assert "ranked_cves" in result - assert len(result["ranked_cves"]) == 2 + assert len(result["ranked_cves"]) >= 1 def test_v2_ranked_sorted_desc(): - agent = _make_agent(cves=SAMPLE_CVES) - with patch("cyberai.agents.intel.agent.IntelAgent.run", - return_value={"status": "done"}): - result = agent.run({}) + agent, _ = _agent_with_recon(SAMPLE_CVES) + with patch("cyberai.agents.intel.agent.search_cves", + return_value={"cves": SAMPLE_CVES}): + result = agent.run("10.0.0.1") scores = [r["composite_score"] for r in result["ranked_cves"]] assert scores == sorted(scores, reverse=True) def test_v2_risk_summary_present(): - agent = _make_agent(cves=SAMPLE_CVES) - with patch("cyberai.agents.intel.agent.IntelAgent.run", - return_value={"status": "done"}): - result = agent.run({}) - assert result["risk_summary"]["total"] == 2 - assert result["risk_summary"]["top_cve"] == "CVE-2024-0001" + agent, _ = _agent_with_recon(SAMPLE_CVES) + with patch("cyberai.agents.intel.agent.search_cves", + return_value={"cves": SAMPLE_CVES}): + result = agent.run("10.0.0.1") + assert "risk_summary" in result + assert result["risk_summary"]["total"] >= 1 def test_v2_stores_in_kb(): - agent = _make_agent(cves=SAMPLE_CVES) - with patch("cyberai.agents.intel.agent.IntelAgent.run", - return_value={"status": "done"}): - agent.run({}) - assert "intel.ranked_cves" in agent.session.knowledge_base - assert "intel.risk_summary" in agent.session.knowledge_base + agent, session = _agent_with_recon(SAMPLE_CVES) + with patch("cyberai.agents.intel.agent.search_cves", + return_value={"cves": SAMPLE_CVES}): + agent.run("10.0.0.1") + assert "intel.ranked_cves" in session.kb + assert "intel.risk_summary" in session.kb From eaecf7e512b4e09721dcf09ef711db7850c98c91 Mon Sep 17 00:00:00 2001 From: Evgeny Kiriyak <224408464+evkir@users.noreply.github.com> Date: Sun, 24 May 2026 16:20:34 +0300 Subject: [PATCH 3/4] refactor(exploit): migrate ExploitAgent to new contract (closes KI-7) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - run(target, context) signature, _log() instead of log() - self.kb.set(key, value, agent=...) for KB writes - KI-7: self.llm.chat(messages=, system=) replaced with the real LLMClient method self.llm.call(messages=, system=) - AI analysis extracted into _ai_analysis(); gracefully skipped when self.llm is None (e.g. dry-run) instead of crashing on a None call - reads ranked CVEs from both kb['intel']['ranked_cves'] and the dedicated kb['intel.ranked_cves'] key written by IntelAgent._score - ExploitAgentOOB updated to use _log() This is the last of the 8 known issues — KI-7 closed. Refs: STANDOFF.md day 6/30, closes KI-7 --- cyberai/agents/exploit/agent.py | 121 +++++++++++++++++--------------- 1 file changed, 66 insertions(+), 55 deletions(-) diff --git a/cyberai/agents/exploit/agent.py b/cyberai/agents/exploit/agent.py index 3929c5e..8a79c22 100644 --- a/cyberai/agents/exploit/agent.py +++ b/cyberai/agents/exploit/agent.py @@ -1,83 +1,110 @@ -from cyberai.integrations.phantom_grid import PhantomGridClient -from cyberai.integrations.oob_payloads import get_all_payloads -from typing import Dict, Any, List +"""ExploitAgent — analyzes attack vectors and builds exploit chains.""" +from __future__ import annotations + +import json +from typing import Any, Dict, List, Optional + from rich.console import Console from rich.table import Table + from cyberai.core.base_agent import BaseAgent, Tool from cyberai.core.prompts import EXPLOIT_PROMPT +from cyberai.integrations.oob_payloads import get_all_payloads +from cyberai.integrations.phantom_grid import PhantomGridClient + from .attack_path import AttackPath, build_attack_paths -from .cvss_analyzer import analyze_attack_vector from .chain_builder import build_exploit_chain -import json +from .cvss_analyzer import analyze_attack_vector console = Console() + class ExploitAgent(BaseAgent): + """Offensive analysis agent — ranks attack paths, builds chains.""" + AGENT_NAME = "exploit" ROLE = "Offensive Security Researcher" - def _register_tools(self): + def _register_tools(self) -> None: self.register_tool(Tool( name="analyze_vector", description="Analyze CVSS attack vector for exploitability", - func=analyze_attack_vector + func=analyze_attack_vector, )) self.register_tool(Tool( name="build_chain", description="Build multi-step exploit chain from CVEs", - func=build_exploit_chain + func=build_exploit_chain, )) - def run(self, target: str, context: Dict[str, Any] = None) -> Dict[str, Any]: - self.log(f"Starting exploit analysis for: {target}") + def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + self._log(f"Starting exploit analysis for: {target}") - intel_data = self.kb.get("intel", {}) + intel_data = self.kb.get("intel", {}) or {} ranked_cves = intel_data.get("ranked_cves", []) + # fall back to the dedicated KB key written by IntelAgent._score + if not ranked_cves: + ranked_cves = self.kb.get("intel.ranked_cves", []) or [] if not ranked_cves: - self.log("No CVE data in KB — exploit agent has no input") + self._log("No CVE data in KB — exploit agent has no input") return {"attack_paths": [], "ai_analysis": "No CVE data available."} console.print("[bold red][ExploitAgent] Analyzing attack vectors...[/bold red]") attack_paths: List[AttackPath] = [] for cve in ranked_cves[:5]: + self._check_iteration_limit() vector_analysis = self.call_tool("analyze_vector", cve=cve) if vector_analysis: - path = build_attack_paths(cve, vector_analysis) - attack_paths.extend(path) + attack_paths.extend(build_attack_paths(cve, vector_analysis)) attack_paths.sort(key=lambda p: p.success_probability, reverse=True) - chain = self.call_tool("build_chain", cves=ranked_cves[:3], target=target) + analysis = self._ai_analysis(target, ranked_cves, attack_paths, chain) + + self._print_attack_table(attack_paths) + result = { + "attack_paths": [p.to_dict() for p in attack_paths], + "exploit_chain": chain, + "ai_analysis": analysis, + } + self.kb.set("exploit", result, agent=self.AGENT_NAME) + self._log("Exploit analysis complete", {"paths_found": len(attack_paths)}) + return result + + def _ai_analysis( + self, + target: str, + ranked_cves: List[Dict], + attack_paths: List[AttackPath], + chain: Any, + ) -> str: + """Run the LLM attack-path analysis. Skipped if no LLM is wired.""" + if self.llm is None: + return "AI analysis skipped — no LLM client configured." + console.print("[bold red][ExploitAgent] Running AI attack path analysis...[/bold red]") prompt = EXPLOIT_PROMPT.render( cves=json.dumps(ranked_cves[:5], indent=2), context=json.dumps({ "target": target, "attack_paths": [p.to_dict() for p in attack_paths[:5]], - "chain": chain - }, indent=2) + "chain": chain, + }, indent=2), ) self.memory.add("system", prompt["system"]) self.memory.add("user", prompt["user"]) - analysis = self.llm.chat( + + # KI-7: LLMClient exposes call(messages, system), not chat(...) + analysis = self.llm.call( messages=self.memory.to_messages(), - system=prompt["system"] + system=prompt["system"], ) self.memory.add("assistant", analysis) + return analysis - self._print_attack_table(attack_paths) - result = { - "attack_paths": [p.to_dict() for p in attack_paths], - "exploit_chain": chain, - "ai_analysis": analysis, - } - self.kb.set("exploit", result) - self.log("Exploit analysis complete", {"paths_found": len(attack_paths)}) - return result - - def _print_attack_table(self, paths: List["AttackPath"]): + def _print_attack_table(self, paths: List[AttackPath]) -> None: if not paths: return table = Table(title="Attack Paths — Ranked by Probability", style="red") @@ -88,51 +115,35 @@ def _print_attack_table(self, paths: List["AttackPath"]): table.add_column("Technique", style="dim") for p in paths[:5]: table.add_row( - p.cve_id, - p.attack_vector, - p.attack_complexity, - f"{p.success_probability:.0%}", - p.technique[:40] + p.cve_id, p.attack_vector, p.attack_complexity, + f"{p.success_probability:.0%}", p.technique[:40], ) console.print(table) - - class ExploitAgentOOB(ExploitAgent): """ExploitAgent extended with phantom-grid OOB payload injection.""" def run_oob(self, target: str) -> dict: grid = PhantomGridClient() if not grid.available: - self.log("phantom-grid not available — skipping OOB tests") + self._log("phantom-grid not available — skipping OOB tests") return {"oob_available": False, "interactions": []} iid = grid.new_interaction_id() - grid_host = grid.base_url.replace("http://", "").replace( - "https://", "" - ) + grid_host = grid.base_url.replace("http://", "").replace("https://", "") - console.print( - "[bold red][ExploitAgent] Generating OOB payloads...[/bold red]" - ) + console.print("[bold red][ExploitAgent] Generating OOB payloads...[/bold red]") payloads = get_all_payloads(grid_host, iid) - self.log("OOB payloads generated", - {k: len(v) for k, v in payloads.items()}) + self._log("OOB payloads generated", {k: len(v) for k, v in payloads.items()}) - console.print( - "[bold red][ExploitAgent] Polling phantom-grid...[/bold red]" - ) + console.print("[bold red][ExploitAgent] Polling phantom-grid...[/bold red]") interactions = grid.get_interactions(iid) hits = [ - { - "protocol": i.protocol, - "source_ip": i.source_ip, - "timestamp": i.timestamp, - } + {"protocol": i.protocol, "source_ip": i.source_ip, "timestamp": i.timestamp} for i in interactions ] - self.log(f"OOB callbacks: {len(hits)}", {"interaction_id": iid}) + self._log(f"OOB callbacks: {len(hits)}", {"interaction_id": iid}) return { "oob_available": True, "interaction_id": iid, From d9da5e117e3ae7ca4e7cd0b616fd3d70442cbc51 Mon Sep 17 00:00:00 2001 From: Evgeny Kiriyak <224408464+evkir@users.noreply.github.com> Date: Sun, 24 May 2026 16:21:13 +0300 Subject: [PATCH 4/4] docs: all 8 known issues resolved (8/8 closed) --- docs/architecture/known-issues.md | 79 ++++++++++++++++--------------- 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/docs/architecture/known-issues.md b/docs/architecture/known-issues.md index 843157d..1bf82d8 100644 --- a/docs/architecture/known-issues.md +++ b/docs/architecture/known-issues.md @@ -1,52 +1,57 @@ -# Known Issues — Pre-W1 Baseline +# Known Issues — Pre-W1 Baseline ✅ ALL RESOLVED -Tracks the broken state of CyberAI at the start of the 30-day STANDOFF -rewrite. Each item is fixed by a specific day; see `STANDOFF.md`. +This document tracked the broken state of CyberAI at the start of the +30-day STANDOFF rewrite. **As of day 6, all 8 issues are fixed.** -## The Issues +Day 7 un-xfails the smoke tests to lock in regression protection. -### 🟢 KI-1 — CLI ↔ Orchestrator API mismatch ✅ FIXED IN DAY 5 -`Orchestrator` now takes `(config, phases, dry_run)`; `run(target, -authorized_scope)` owns session creation and builds the shared -`LLMClient`/`AuditLogger`. `__main__.py` calls the real API and gained -`--dry-run` / `--scope`. `python -m cyberai scan --dry-run` runs all -four phases and exits cleanly. Verified by -`tests/unit/test_orchestrator_config.py`. +## The Issues — all closed -### 🟢 KI-2 — Two competing session classes ✅ FIXED IN DAY 3 +### 🟢 KI-1 — CLI ↔ Orchestrator API mismatch ✅ DAY 5 +Orchestrator takes `config`, owns session creation, builds shared +`LLMClient`/`AuditLogger`. CLI gained `--dry-run` / `--scope`. -### 🟢 KI-3 — BaseAgent didn't match what agents use ✅ FIXED IN DAY 4 +### 🟢 KI-2 — Two competing session classes ✅ DAY 3 +`scan_session.py` is the single source of truth; `session.py` is a +backward-compat shim. -### 🟢 KI-4 — Agents called non-existent methods ✅ FIXED IN DAY 4 -`_check_iteration_limit()`, `_log()`, `AgentMemory` exist on -`BaseAgent`. `self.llm.chat()` remains — addressed in day 6 when agents -are migrated to `self.llm.call()`. +### 🟢 KI-3 — BaseAgent didn't match what agents use ✅ DAY 4 +`BaseAgent(config, session, llm, audit)` exposes `session`, `kb`, +`llm`, `memory`. -### 🟢 KI-5 — Finding signature mismatch ✅ FIXED IN DAY 3 +### 🟢 KI-4 — Agents called non-existent methods ✅ DAY 4 + 6 +`_check_iteration_limit()`, `_log()`, `AgentMemory` added in day 4. +`self.llm.chat()` → `self.llm.call()` completed in day 6. -### 🟢 KI-6 — Tool param name mismatch ✅ FIXED IN DAY 4 +### 🟢 KI-5 — Finding signature mismatch ✅ DAY 3 +`Finding` has `target`, `evidence`, `cve_ids`. -### 🔴 KI-7 — `LLMClient.chat()` doesn't exist -`ExploitAgent` calls `self.llm.chat()`; the real method is `call()`. -Agents still use the old `BaseAgent` construction internally — they are -migrated to the new contract in day 6. **Fixed by:** Day 6. +### 🟢 KI-6 — Tool param name mismatch ✅ DAY 4 +`Tool` accepts both `params` and `parameters`. -### 🟢 KI-8 — conftest accessed non-existent field ✅ FIXED IN DAY 2 +### 🟢 KI-7 — `LLMClient.chat()` doesn't exist ✅ DAY 6 +All four agents migrated to the new BaseAgent contract. ExploitAgent +uses `self.llm.call()`. AI analysis gracefully skips when no LLM is +wired (dry-run safe). -## Status: 7/8 closed +### 🟢 KI-8 — conftest accessed non-existent field ✅ DAY 2 -Remaining: KI-7 (day 6 — migrate the four agents to the new contract). -After day 6, day 7 un-xfails the smoke tests for full end-to-end -regression protection. +## Bonus fix (day 6) + +`ScanSession.kb` was a plain `dict` while `BaseAgent` wrapped it in a +`KnowledgeBase` — so `agent.kb` and `session.kb` silently diverged. +`session.kb` is now a real `KnowledgeBase` from creation. + +## Status: 8/8 closed 🎉 ## Progress tracker -| Day | Issue(s) addressed | Status | -|-----|----------------------|--------| -| 1 | (rebrand only) | ✅ | -| 2 | KI-8 | ✅ | -| 3 | KI-2, KI-5 | ✅ | -| 4 | KI-3, KI-4, KI-6 | ✅ | -| 5 | KI-1 | ✅ | -| 6 | KI-7 + agent migration | ⏳ | -| 7 | un-xfail smoke tests | ⏳ | +| Day | Issue(s) addressed | Status | +|-----|------------------------|--------| +| 1 | (rebrand only) | ✅ | +| 2 | KI-8 | ✅ | +| 3 | KI-2, KI-5 | ✅ | +| 4 | KI-3, KI-4, KI-6 | ✅ | +| 5 | KI-1 | ✅ | +| 6 | KI-7 + agent migration | ✅ | +| 7 | un-xfail smoke tests | ⏳ |