In [2]:
# Cell 1: Setup
import sys
from pathlib import Path

# Add project root to path
project_root = Path.cwd().parent
sys.path.insert(0, str(project_root / "src"))

print(f"Project root: {project_root}")
print(f"Python path: {sys.path[:3]}")

# Verify import works
try:
    from mitre_agentic.schemas import TriageInput
    print("✅ mitre_agentic imported successfully!")
except ImportError as e:
    print(f"❌ Import failed: {e}")
    print("\nTry running: uv pip install -e . (from project root)")

Project root: /Users/Inoussa/Documents/agenticAI/MCP/MITRE-AGENTIC-THREAT-INVESTIGATION/src
Python path: ['/Users/Inoussa/Documents/agenticAI/MCP/MITRE-AGENTIC-THREAT-INVESTIGATION/src/src', '/Users/Inoussa/Documents/agenticAI/MCP/MITRE-AGENTIC-THREAT-INVESTIGATION/src/mitre_agentic/src', '/opt/anaconda3/lib/python312.zip']
✅ mitre_agentic imported successfully!


In [7]:
# Cell 2: Load environment and imports
import asyncio
from dotenv import load_dotenv
import os

from mitre_agentic.schemas import TriageInput
from mitre_agentic.agents.triage_agent import triage_incident
from mitre_agentic.mcp_client import MitreMcpClient
from mitre_agentic.agents.mapping_agent import map_techniques
from mitre_agentic.agents.intel_agent import enrich_with_groups_and_software
from mitre_agentic.agents.detection_agent import recommend_detection_telemetry
from mitre_agentic.agents.detection_reasoning_agent import reason_detection_with_llm_fallback
from mitre_agentic.agents.visualization_agent import build_navigator_layer_from_techniques, save_layer_json
from mitre_agentic.agents.report_agent import write_executive_report_llm
from mitre_agentic.agents.mitigation_agent import enrich_with_mitigations

# Load environment
load_dotenv()

print("✅ All imports successful!")
print(f"✅ OPENAI_API_KEY loaded: {bool(os.getenv('OPENAI_API_KEY'))}")

✅ All imports successful!
✅ OPENAI_API_KEY loaded: True


In [8]:
# Cell 3: Define incident
DEFAULT_INCIDENT_TEXT = (
    "EDR alert: WINWORD.EXE spawned powershell.exe with an encoded command. "
    "Shortly after, rundll32.exe executed with a suspicious DLL entrypoint and "
    "a scheduled task was created for persistence. Network connections to an "
    "unfamiliar external IP followed."
)

print("Incident text loaded:")
print(DEFAULT_INCIDENT_TEXT)

Incident text loaded:
EDR alert: WINWORD.EXE spawned powershell.exe with an encoded command. Shortly after, rundll32.exe executed with a suspicious DLL entrypoint and a scheduled task was created for persistence. Network connections to an unfamiliar external IP followed.


In [None]:
# Cell 4: Helper functions
def _extract_technique_ids_from_triage(triage_out) -> list[str]:
    """Extract technique IDs from Agent 1 output."""
    if hasattr(triage_out, "technique_evidence"):
        te = getattr(triage_out, "technique_evidence")
        if isinstance(te, dict):
            return [str(k) for k in te.keys()]
    
    if isinstance(triage_out, dict):
        te = triage_out.get("technique_evidence")
        if isinstance(te, dict):
            return [str(k) for k in te.keys()]
    
    return []

def _safe_str(x, max_len: int) -> str:
    s = (str(x) if x is not None else "").strip()
    if len(s) > max_len:
        return s[: max_len - 1] + "…"
    return s

print("Helper functions defined")

✅ Helper functions defined


In [None]:
# Cell 5: Triage Agent (LLM)
print("="*80)
print("AGENT 1: TRIAGE (LLM)")
print("="*80)

triage_input = TriageInput(incident_text=DEFAULT_INCIDENT_TEXT)
triage_out = await triage_incident(triage_input)

# Extract results
summary = getattr(triage_out, "summary", None)
technique_ids = _extract_technique_ids_from_triage(triage_out)

print(f"\nTriage complete: {len(technique_ids)} candidates")
print(f"\nSummary:\n{summary}")
print(f"\nTechniques: {', '.join(technique_ids)}")


te = getattr(triage_out, "technique_evidence", None) if not isinstance(triage_out, dict) else triage_out.get("technique_evidence")
if isinstance(te, dict) and te:
    for tid, evidence in te.items():
            print(f"- {tid}: {evidence}")
else:
    for tid in technique_ids:
            print(f"- {tid}")

AGENT 1: TRIAGE (LLM)

✅ Triage complete: 4 candidates

Summary:
WINWORD.EXE spawned powershell.exe with an encoded command, indicating possible command and script execution. Rundll32.exe executed a suspicious DLL entrypoint, and a scheduled task was created for persistence. Network connections to an unfamiliar external IP were observed, suggesting potential command and control activity.

Techniques: T1059.001, T1218, T1053.005, T1071.001
- T1059.001: ['powershell.exe with an encoded command']
- T1218: ['rundll32.exe executed with suspicious DLL entrypoint']
- T1053.005: ['scheduled task was created for persistence']
- T1071.001: ['network connections to an unfamiliar external IP']


In [12]:
# Cell 6: Initialize MCP Client
print("="*80)
print("INITIALIZING MCP CLIENT")
print("="*80)

client = MitreMcpClient()
print("MCP Client created")

INITIALIZING MCP CLIENT
MCP Client created


In [14]:
# Mapping Agent (MCP)
print("="*80)
print("AGENT 2: MAPPING (MCP)")
print("="*80)

mapping = await map_techniques(
    client,
    technique_ids=technique_ids,
    domain="enterprise",
    include_description=True,
)

confirmed = mapping.get("confirmed_techniques", []) or []
not_found = mapping.get("not_found", []) or []

print(f"\n Mapping complete: {len(confirmed)} techniques confirmed")

if not_found:
    print(f" Not found: {', '.join(not_found)}")

for t in confirmed:
    print("\n---")
    print(f"{t.get('id')} - {t.get('name')}")
    print(f"Tactics: {t.get('tactics')}")
    desc = _safe_str(t.get("description"), 260).replace("\n", " ")
    if desc:
        print(f"Description: {desc}")

AGENT 2: MAPPING (MCP)

 Mapping complete: 4 techniques confirmed

---
T1059.001 - PowerShell
Tactics: [{'tactic': 'Execution', 'phase_name': 'execution', 'kill_chain_name': 'mitre-attack'}]
Description: Adversaries may abuse PowerShell commands and scripts for execution. PowerShell is a powerful interactive command-line interface and scripting environment included in the Windows operating system.(Citation: TechNet PowerShell) Adversaries can use PowerShell t…

---
T1218 - System Binary Proxy Execution
Tactics: [{'tactic': 'Defense Evasion', 'phase_name': 'defense-evasion', 'kill_chain_name': 'mitre-attack'}]
Description: Adversaries may bypass process and/or signature-based defenses by proxying execution of malicious content with signed, or otherwise trusted, binaries. Binaries used in this technique are often Microsoft-signed files, indicating that they have been either down…

---
T1053.005 - Scheduled Task
Tactics: [{'tactic': 'Execution', 'phase_name': 'execution', 'kill_chain_name

In [None]:
# Cell 8: Parallel Enrichment (Intel, Detection, Mitigation) all run in parallel
print("="*80)
print("AGENT 3, 4, 6: PARALLEL ENRICHMENT")
print("="*80)

AGENT 3, 4, 6: PARALLEL ENRICHMENT


In [None]:
# --- Agent 3: Intel (groups + software) ---
print("\n=== INTEL: groups/software associated with confirmed techniques ===")
intel = await enrich_with_groups_and_software(
            client,
            confirmed_techniques=confirmed,
            domain="enterprise",
            max_items=5,
        )

for item in intel.get("intel", []) or []:
    tech = item.get("technique", {}) or {}
    print("\n---")
    print(f"{tech.get('id')} - {tech.get('name')}")

    groups = item.get("groups_using_technique", []) or []
    software = item.get("software_using_technique", []) or []

    group_names = [g.get("name", "<?>") for g in groups if isinstance(g, dict)]
    software_names = [s.get("name", "<?>") for s in software if isinstance(s, dict)]

    print(f"Groups (top {len(group_names)}): {group_names}")
    print(f"Software (top {len(software_names)}): {software_names}")


=== INTEL: groups/software associated with confirmed techniques ===

---
T1059.001 - PowerShell
Groups (top 5): ['WIRTE', 'APT42', 'APT5', 'Blue Mockingbird', 'APT39']
Software (top 5): ['Empire', 'Covenant', 'Pupy', 'Koadic', 'Sliver']

---
T1218 - System Binary Proxy Execution
Groups (top 2): ['Lazarus Group', 'Volt Typhoon']
Software (top 0): []

---
T1053.005 - Scheduled Task
Groups (top 5): ['APT3', 'Cobalt Group', 'Silence', 'Chimera', 'Patchwork']
Software (top 5): ['CSPY Downloader', 'IronNetInjector', 'PowerSploit', 'MCMD', 'Empire']

---
T1071.001 - Web Protocols
Groups (top 5): ['Rancor', 'Metador', 'RedEcho', 'BITTER', 'Moonstone Sleet']
Software (top 5): ['PoshC2', 'PcShare', 'Out1', 'Mythic', 'Quick Assist']


In [17]:
# --- Agent 4: Detection (STIX data components or technique detection fallback) ---
print("\n=== DETECTION: telemetry/data components to detect confirmed techniques ===")
detections = await recommend_detection_telemetry(
            client,
            confirmed_techniques=confirmed,
            domain="enterprise",
            max_items=7,
        )

for item in detections.get("detections", []) or []:
            tech = item.get("technique", {}) or {}
            det = item.get("detection", {}) or {}

            print("\n---")
            print(f"{tech.get('id')} - {tech.get('name')}")

            mode = det.get("mode", "datacomponents")

            if mode == "datacomponents" and det.get("top_datacomponents"):
                total = det.get("total_datacomponents", 0)
                comps = det.get("top_datacomponents", []) or []
                print(f"Data components (top {len(comps)} of {total}):")
                for c in comps:
                    print(f"  - {c}")
            else:
                # Fallback: technique detection guidance (x_mitre_data_sources/x_mitre_detection)
                print("Detection guidance (fallback):")

                data_sources = det.get("data_sources", []) or []
                if data_sources:
                    print("  Data sources to collect:")
                    for ds in data_sources:
                        print(f"   - {ds}")

                detection_text = _safe_str(det.get("detection_text"), 700)
                if detection_text:
                    print("  What to look for:")
                    print(f"   {detection_text}")
                else:
                    print("  (No detection guidance text available in this ATT&CK release.)")

                note = _safe_str(det.get("note"), 220)
                if note:
                    print(f"  Note: {note}")


=== DETECTION: telemetry/data components to detect confirmed techniques ===

---
T1059.001 - PowerShell
Detection guidance (fallback):
  (No detection guidance text available in this ATT&CK release.)
  Note: Structured data components were empty; used ATT&CK technique detection guidance instead.

---
T1218 - System Binary Proxy Execution
Detection guidance (fallback):
  (No detection guidance text available in this ATT&CK release.)
  Note: Structured data components were empty; used ATT&CK technique detection guidance instead.

---
T1053.005 - Scheduled Task
Detection guidance (fallback):
  (No detection guidance text available in this ATT&CK release.)
  Note: Structured data components were empty; used ATT&CK technique detection guidance instead.

---
T1071.001 - Web Protocols
Detection guidance (fallback):
  (No detection guidance text available in this ATT&CK release.)
  Note: Structured data components were empty; used ATT&CK technique detection guidance instead.


In [19]:
 # --- Agent 5: Detection Reasoning (LLM fallback when STIX has 0) ---
print("\n=== DETECTION REASONING (LLM fallback when STIX has 0) ===")
try:
    reasoning = await reason_detection_with_llm_fallback(
            confirmed_techniques=confirmed,
            stix_detection_output=detections,   # <-- must pass Agent 4 output
            incident_text=DEFAULT_INCIDENT_TEXT,
            model="gpt-4.1-mini",
                max_hypotheses_per_technique=3,
            )
except Exception as e:
    print(f"[WARN] Detection reasoning agent failed: {e}")
    reasoning = {"detection_reasoning": []}

items = reasoning.get("detection_reasoning", []) if isinstance(reasoning, dict) else []
print(f"Detection reasoning items: {len(items)}")

for item in items:
    tech = item.get("technique", {}) or {}
    mode = item.get("mode", "<?>")

    print("\n---")
    print(f"{tech.get('id')} - {tech.get('name')}  (mode: {mode})")

    if mode == "stix_datacomponents":
        stix = item.get("stix", {}) or {}
        total = stix.get("total", 0)
        comps = stix.get("top_datacomponents", []) or []
        print(f"STIX data components (top {len(comps)} of {total}):")
        for c in comps:
                print(f"  - {c}")
    else:
        llm = item.get("llm", {}) or {}
        hyps = llm.get("hypotheses", []) or []
        print(f"LLM hypotheses (showing up to {min(5, len(hyps))}):")
        for h in hyps[:5]:
            title = _safe_str(h.get("title"), 140)
            rationale = _safe_str(h.get("rationale"), 400)  # extra safety
            print(f"  - {title}: {rationale}")


=== DETECTION REASONING (LLM fallback when STIX has 0) ===
Detection reasoning items: 4

---
T1059.001 - PowerShell  (mode: llm_fallback)
LLM hypotheses (showing up to 3):
  - Detection of suspicious encoded PowerShell commands spawned by non-standard parent processes: Encoded PowerShell commands are commonly used by adversaries to obfuscate malicious payloads. Observing powershell.exe launched by unusual parent processes like WINWORD.EXE with encoded commands indicates potential malicious script execution. Combining EDR process lineage with Sysmon command line telemetry improves detection fidelity.
  - Monitoring rundll32.exe execution with suspicious DLL entrypoints following PowerShell activity: Adversaries often use rundll32.exe to execute malicious DLLs after initial PowerShell execution. Detecting rundll32.exe running with suspicious DLLs shortly after encoded PowerShell commands suggests lateral movement or persistence attempts.
  - Detection of scheduled task creation for pers

In [21]:
# --- MITIGATIONS: defensive controls for confirmed techniques ---
print("\n=== MITIGATIONS: defensive controls for confirmed techniques ===")
mit = await enrich_with_mitigations(
            client,
            confirmed_techniques=confirmed,
            domain="enterprise",
            include_description=False,
        )

for item in mit.get("mitigations", []) or []:
    tech = item.get("technique", {}) or {}
    mitigations = item.get("mitigations", []) or []
    print("\n---")
    print(f"{tech.get('id')} - {tech.get('name')}")
    print(f"Mitigations (top {len(mitigations)} / {item.get('count', len(mitigations))}):")
    for m in mitigations:
        name = (m.get("name") or "").strip() if isinstance(m, dict) else ""
        attack_id = (m.get("attack_id") or "").strip() if isinstance(m, dict) else ""
        if not name and item.get("formatted"):
                # fallback: to show formatted text
            print("  - (see formatted output below)")
            break
        print(f"  - {attack_id} {name}".strip())
    if item.get("formatted"):
        print("Formatted:")
        print(item["formatted"])


=== MITIGATIONS: defensive controls for confirmed techniques ===

---
T1059.001 - PowerShell
Mitigations (top 5 / 5):
  - (see formatted output below)
Formatted:
Name: Disable or Remove Feature or Program
STIX ID: course-of-action--eb88d97c-32f1-40be-80f0-d61a4b0b4b31
---
Name: Antivirus/Antimalware
STIX ID: course-of-action--a6a47a06-08fc-4ec4-bdc3-20373375ebb9
---
Name: Code Signing
STIX ID: course-of-action--590777b3-b475-4c7c-aaf8-f4a73b140312
---
Name: Privileged Account Management
STIX ID: course-of-action--9bb9e696-bff8-4ae1-9454-961fc7d91d5f
---
Name: Execution Prevention
STIX ID: course-of-action--47e0e9fe-96ce-4f65-8bb1-8be1feacb5db

---
T1218 - System Binary Proxy Execution
Mitigations (top 6 / 6):
  - (see formatted output below)
Formatted:
Name: Exploit Protection
STIX ID: course-of-action--d2a24649-9694-4c97-9c62-ce7b270bf6a3
---
Name: Filter Network Traffic
STIX ID: course-of-action--20f6a9df-37c4-4e20-9e47-025983b1b39d
---
Name: Privileged Account Management
STIX ID: c

In [None]:
# Visualization (Navigator Layer)
print("="*80)
print("AGENT 7: VISUALIZATION")
print("="*80)

layer = await build_navigator_layer_from_techniques(
    client,
    confirmed_techniques=confirmed,
    domain="enterprise",
    layer_name="EDR Incident: Technique Coverage",
    description=DEFAULT_INCIDENT_TEXT[:200],
)

out_file = save_layer_json(layer, "../out/incident_layer.json")
print(f"Navigator layer saved to: {out_file}")

AGENT 7: VISUALIZATION
✅ Navigator layer saved to: ../out/incident_layer.json


In [None]:
# Cell 11: Report Generation (LLM)
from IPython.display import display, Markdown



print("="*80)
print("AGENT 8: REPORT GENERATION (LLM)")
print("="*80)

report_out = await write_executive_report_llm(
    incident_text=DEFAULT_INCIDENT_TEXT,
    triage_summary=triage_out.summary,
    confirmed_techniques=confirmed,
    intel=intel,
    detections=detections,
    detection_reasoning=reasoning,
    mitigations=mit,
    navigator_layer_path=str(out_file),
)

md = report_out["report"]["markdown"]
print(f"Report generated: {len(md)} characters")
print("\nFirst 500 characters:")
print(md[:500])

display(Markdown(md[:500]))
# Save report
with open("../out/incident_report.md", "w", encoding="utf-8") as f:
    f.write(md)
print("\nReport saved to: ../out/incident_report.md")

AGENT 8: REPORT GENERATION (LLM)
✅ Report generated: 4562 characters

First 500 characters:
# Executive Incident Report

## Title
Suspicious PowerShell and DLL Execution with Persistence and External Network Connections

## Executive Summary
An endpoint detection and response (EDR) alert identified suspicious activity involving WINWORD.EXE spawning powershell.exe with an encoded command, indicative of potential malicious script execution. Subsequently, rundll32.exe executed a suspicious DLL entrypoint, and a scheduled task was created, likely to maintain persistence. Network connection

Report saved to: ../out/incident_report.md
