Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 66 additions & 55 deletions cyberai/agents/exploit/agent.py
Original file line number Diff line number Diff line change
@@ -1,83 +1,110 @@
from cyberai.integrations.phantom_grid import PhantomGridClient
from cyberai.integrations.oob_payloads import get_all_payloads
from typing import Dict, Any, List
"""ExploitAgent — analyzes attack vectors and builds exploit chains."""
from __future__ import annotations

import json
from typing import Any, Dict, List, Optional

from rich.console import Console
from rich.table import Table

from cyberai.core.base_agent import BaseAgent, Tool
from cyberai.core.prompts import EXPLOIT_PROMPT
from cyberai.integrations.oob_payloads import get_all_payloads
from cyberai.integrations.phantom_grid import PhantomGridClient

from .attack_path import AttackPath, build_attack_paths
from .cvss_analyzer import analyze_attack_vector
from .chain_builder import build_exploit_chain
import json
from .cvss_analyzer import analyze_attack_vector

console = Console()


class ExploitAgent(BaseAgent):
"""Offensive analysis agent — ranks attack paths, builds chains."""

AGENT_NAME = "exploit"
ROLE = "Offensive Security Researcher"

def _register_tools(self):
def _register_tools(self) -> None:
self.register_tool(Tool(
name="analyze_vector",
description="Analyze CVSS attack vector for exploitability",
func=analyze_attack_vector
func=analyze_attack_vector,
))
self.register_tool(Tool(
name="build_chain",
description="Build multi-step exploit chain from CVEs",
func=build_exploit_chain
func=build_exploit_chain,
))

def run(self, target: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
self.log(f"Starting exploit analysis for: {target}")
def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
self._log(f"Starting exploit analysis for: {target}")

intel_data = self.kb.get("intel", {})
intel_data = self.kb.get("intel", {}) or {}
ranked_cves = intel_data.get("ranked_cves", [])
# fall back to the dedicated KB key written by IntelAgent._score
if not ranked_cves:
ranked_cves = self.kb.get("intel.ranked_cves", []) or []

if not ranked_cves:
self.log("No CVE data in KB — exploit agent has no input")
self._log("No CVE data in KB — exploit agent has no input")
return {"attack_paths": [], "ai_analysis": "No CVE data available."}

console.print("[bold red][ExploitAgent] Analyzing attack vectors...[/bold red]")
attack_paths: List[AttackPath] = []
for cve in ranked_cves[:5]:
self._check_iteration_limit()
vector_analysis = self.call_tool("analyze_vector", cve=cve)
if vector_analysis:
path = build_attack_paths(cve, vector_analysis)
attack_paths.extend(path)
attack_paths.extend(build_attack_paths(cve, vector_analysis))

attack_paths.sort(key=lambda p: p.success_probability, reverse=True)

chain = self.call_tool("build_chain", cves=ranked_cves[:3], target=target)

analysis = self._ai_analysis(target, ranked_cves, attack_paths, chain)

self._print_attack_table(attack_paths)
result = {
"attack_paths": [p.to_dict() for p in attack_paths],
"exploit_chain": chain,
"ai_analysis": analysis,
}
self.kb.set("exploit", result, agent=self.AGENT_NAME)
self._log("Exploit analysis complete", {"paths_found": len(attack_paths)})
return result

def _ai_analysis(
self,
target: str,
ranked_cves: List[Dict],
attack_paths: List[AttackPath],
chain: Any,
) -> str:
"""Run the LLM attack-path analysis. Skipped if no LLM is wired."""
if self.llm is None:
return "AI analysis skipped — no LLM client configured."

console.print("[bold red][ExploitAgent] Running AI attack path analysis...[/bold red]")
prompt = EXPLOIT_PROMPT.render(
cves=json.dumps(ranked_cves[:5], indent=2),
context=json.dumps({
"target": target,
"attack_paths": [p.to_dict() for p in attack_paths[:5]],
"chain": chain
}, indent=2)
"chain": chain,
}, indent=2),
)
self.memory.add("system", prompt["system"])
self.memory.add("user", prompt["user"])
analysis = self.llm.chat(

# KI-7: LLMClient exposes call(messages, system), not chat(...)
analysis = self.llm.call(
messages=self.memory.to_messages(),
system=prompt["system"]
system=prompt["system"],
)
self.memory.add("assistant", analysis)
return analysis

self._print_attack_table(attack_paths)
result = {
"attack_paths": [p.to_dict() for p in attack_paths],
"exploit_chain": chain,
"ai_analysis": analysis,
}
self.kb.set("exploit", result)
self.log("Exploit analysis complete", {"paths_found": len(attack_paths)})
return result

def _print_attack_table(self, paths: List["AttackPath"]):
def _print_attack_table(self, paths: List[AttackPath]) -> None:
if not paths:
return
table = Table(title="Attack Paths — Ranked by Probability", style="red")
Expand All @@ -88,51 +115,35 @@ def _print_attack_table(self, paths: List["AttackPath"]):
table.add_column("Technique", style="dim")
for p in paths[:5]:
table.add_row(
p.cve_id,
p.attack_vector,
p.attack_complexity,
f"{p.success_probability:.0%}",
p.technique[:40]
p.cve_id, p.attack_vector, p.attack_complexity,
f"{p.success_probability:.0%}", p.technique[:40],
)
console.print(table)




class ExploitAgentOOB(ExploitAgent):
"""ExploitAgent extended with phantom-grid OOB payload injection."""

def run_oob(self, target: str) -> dict:
grid = PhantomGridClient()
if not grid.available:
self.log("phantom-grid not available — skipping OOB tests")
self._log("phantom-grid not available — skipping OOB tests")
return {"oob_available": False, "interactions": []}

iid = grid.new_interaction_id()
grid_host = grid.base_url.replace("http://", "").replace(
"https://", ""
)
grid_host = grid.base_url.replace("http://", "").replace("https://", "")

console.print(
"[bold red][ExploitAgent] Generating OOB payloads...[/bold red]"
)
console.print("[bold red][ExploitAgent] Generating OOB payloads...[/bold red]")
payloads = get_all_payloads(grid_host, iid)
self.log("OOB payloads generated",
{k: len(v) for k, v in payloads.items()})
self._log("OOB payloads generated", {k: len(v) for k, v in payloads.items()})

console.print(
"[bold red][ExploitAgent] Polling phantom-grid...[/bold red]"
)
console.print("[bold red][ExploitAgent] Polling phantom-grid...[/bold red]")
interactions = grid.get_interactions(iid)
hits = [
{
"protocol": i.protocol,
"source_ip": i.source_ip,
"timestamp": i.timestamp,
}
{"protocol": i.protocol, "source_ip": i.source_ip, "timestamp": i.timestamp}
for i in interactions
]
self.log(f"OOB callbacks: {len(hits)}", {"interaction_id": iid})
self._log(f"OOB callbacks: {len(hits)}", {"interaction_id": iid})
return {
"oob_available": True,
"interaction_id": iid,
Expand Down
Loading
Loading