In [1]:
# Add the third-party packages this notebook imports to the project via uv (quiet output).
!uv add requests --quiet mitreattack-python langchain langchain-anthropic dotenv

In [2]:
import requests
from pathlib import Path
from mitreattack.stix20 import MitreAttackData
import json
from IPython.display import JSON
import time
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import SystemMessage, HumanMessage
from dotenv import load_dotenv
import os
from datetime import datetime

In [3]:
# Pinned Enterprise ATT&CK STIX bundle (v18.1 release asset in the mitre/cti GitHub repo).
ATTACK_LATEST = (
    "https://github.com/mitre/cti/releases/download/"
    "ATT%26CK-v18.1/enterprise-attack.json"
)
# Local copy of the bundle; the download cell skips the fetch when this file exists.
DATA_PATH = Path("data") / "enterprise-attack.json"
# Anthropic model ID used for compression; also embedded in the output file names.
MODEL = "claude-haiku-4-5-20251001"

In [4]:
# Load settings from the local .env file (presumably including ANTHROPIC_API_KEY — confirm).
load_dotenv(".env")

True

In [5]:
# print(os.getenv("ANTHROPIC_API_KEY"))

In [6]:
# Download the ATT&CK bundle once and cache it at DATA_PATH; later runs reuse the file.
if not DATA_PATH.exists():
    # The cache directory does not exist on a fresh checkout; write_bytes would fail without it.
    DATA_PATH.parent.mkdir(parents=True, exist_ok=True)
    # Bound the request so a stalled connection cannot hang the notebook indefinitely.
    attack_response = requests.get(ATTACK_LATEST, timeout=60)
    # Fail loudly on HTTP errors instead of caching an error page as the dataset.
    attack_response.raise_for_status()
    DATA_PATH.write_bytes(attack_response.content)

In [7]:
# Load the cached STIX bundle; the loader is given the path as a plain string.
mitre_attack_data = MitreAttackData(os.fspath(DATA_PATH))

In [8]:
# Fetch techniques and sub-techniques, excluding revoked/deprecated entries.
techniques = mitre_attack_data.get_techniques(
    include_subtechniques=True,
    remove_revoked_deprecated=True,
)

In [9]:
def _attack_id(technique):
    """Return the external ID from the "mitre-attack" reference, or None if there is none."""
    for ref in technique.external_references:
        if ref.source_name == "mitre-attack":
            return ref.external_id
    return None


def _technique_sort_key(record):
    """Sort by the ID text before the first dot, then numerically by the part after it (0 when absent)."""
    id_parts = record["id"].split(".")
    sub_number = int(id_parts[1]) if len(id_parts) > 1 else 0
    return (id_parts[0], sub_number)


# Flatten each STIX technique into a plain dict that can be JSON-serialized for the prompt.
processed_techniques = [
    {
        "id": _attack_id(t),
        "name": t.name,
        "description": t.description or "",
        "tactics": [
            p.phase_name
            for p in getattr(t, "kill_chain_phases", [])
            if p.kill_chain_name == "mitre-attack"
        ],
        "platforms": getattr(t, "x_mitre_platforms", []) or [],
    }
    for t in techniques
]
processed_techniques.sort(key=_technique_sort_key)

In [10]:
# Spot-check a ten-record slice of the flattened techniques in the JSON viewer.
JSON(processed_techniques[155:165])

<IPython.core.display.JSON object>

In [20]:
# Chat client used for compression: temperature 0, with max_tokens capping each response.
llm = ChatAnthropic(model=MODEL, temperature=0, max_tokens=4096)

# System prompt defining the pipe-delimited row format; downstream cells split rows on "|"
# and read the ID from field 0 and the keywords from field 3.
SYSTEM_PROMPT = """You are a cybersecurity expert compressing MITRE ATT&CK techniques for efficient LLM consumption.

OUTPUT FORMAT (one line per technique):
ID|Tactics|Name|Keywords|Description|Platforms

RULES:
1. ID: Use exact ATT&CK ID (e.g., T1059.001) from input
2. Tactics: Use abbreviations: REC=Reconnaissance, RD=Resource Development, IA=Initial Access, EX=Execution, PE=Persistence, PRV=Privilege Escalation, DE=Defense Evasion, CA=Credential Access, DIS=Discovery, LM=Lateral Movement, COL=Collection, C2=Command and Control, EXF=Exfiltration, IMP=Impact
3. Name: Exact technique name from input
4. Keywords (3-10 terms): Prioritize most identifiable or unique keywords. Filenames, process names, tool names, command flags, protocols. NO generic terms like adversary/attack/malicious. Avoid common keywords (like powershell) unless the technique specifically depends on, abuses or targets that common keyword.
5. Description (20-40 words): Start with action verb. Focus on WHAT happens technically. NO fluff phrases. Focus on specific or unique attributes of the TTP. Format for maximum LLM effectiveness and token-efficiency
6. Platforms: Use platforms from input in comma-separated list

Output ONLY the compressed lines, no explanations, markdown or extra formatting."""

In [21]:
def batch(items, size=10):
    """Yield consecutive slices of ``items`` holding at most ``size`` elements each.

    Args:
        items: A sliceable sequence that supports ``len()``.
        size: Maximum number of elements per slice (default 10).

    Yields:
        ``items[start:start + size]`` for each start offset 0, size, 2*size, ...;
        the final slice may be shorter. Nothing is yielded for an empty sequence.
    """
    yield from (items[start:start + size] for start in range(0, len(items), size))

# Materialize the batches up front so the total is known for progress messages.
batches = [chunk for chunk in batch(processed_techniques, 10)]
print(f"Created {len(batches)} batches")

Created 70 batches


In [26]:
# Compress every batch with the LLM and collect the results as markdown table rows.
# compressed_lines: rows of the form "|ID|Tactics|Name|Keywords|Description|Platforms|".
# errors: (batch_index, techniques, message) tuples for failed calls and for techniques
#         that the model left out of its answer, so nothing is dropped silently.
compressed_lines = []
errors = []
for i, batch_data in enumerate(batches):
    print(f"Processing batch {i+1}/{len(batches)}...", end=" ")

    try:
        prompt = f"Compress these {len(batch_data)} ATT&CK techniques:\n\n{json.dumps(batch_data, indent=2)}\n\nOutput exactly {len(batch_data)} lines."

        response = llm.invoke([
            SystemMessage(content=SYSTEM_PROMPT),
            HumanMessage(content=prompt)
        ])

        # Accept only rows whose first field is an ID from this batch. A bare
        # startswith("T") check also lets through prose such as "The following ...".
        expected_ids = {t["id"] for t in batch_data}
        seen_ids = set()
        lines = []
        for raw_line in response.content.strip().split("\n"):
            row = raw_line.strip()
            # Tolerate rows the model already wrapped in table pipes; remove one
            # pipe per side so an empty trailing field is preserved.
            if row.startswith("|"):
                row = row[1:-1] if row.endswith("|") else row[1:]
                row = row.strip()
            row_id = row.split("|", 1)[0].strip()
            # Keep the first row for each expected ID; ignore duplicates and unknown IDs.
            if row_id in expected_ids and row_id not in seen_ids:
                seen_ids.add(row_id)
                lines.append(f"|{row}|")
        compressed_lines.extend(lines)
        print(f"OK ({len(lines)} lines)")
        print(response.usage_metadata)

        # Record techniques the model skipped so they can be retried later.
        missing = [t for t in batch_data if t["id"] not in seen_ids]
        if missing:
            missing_ids = ", ".join(str(t["id"]) for t in missing)
            print(f"WARNING: missing from model output: {missing_ids}")
            errors.append((i, missing, f"missing from model output: {missing_ids}"))

        time.sleep(0.5)  # Rate limiting

    except Exception as e:
        # Best effort: log the failure, keep the batch for a retry, and carry on.
        print(f"ERROR: {e}")
        errors.append((i, batch_data, str(e)))
print(f"\nComplete: {len(compressed_lines)} techniques compressed, {len(errors)} errors")

Processing batch 1/70... OK (10 lines)
{'input_tokens': 3671, 'output_tokens': 661, 'total_tokens': 4332, 'input_token_details': {'cache_read': 0, 'cache_creation': 0, 'ephemeral_5m_input_tokens': 0, 'ephemeral_1h_input_tokens': 0}}
Processing batch 2/70... OK (10 lines)
{'input_tokens': 3282, 'output_tokens': 730, 'total_tokens': 4012, 'input_token_details': {'cache_read': 0, 'cache_creation': 0, 'ephemeral_5m_input_tokens': 0, 'ephemeral_1h_input_tokens': 0}}
Processing batch 3/70... OK (10 lines)
{'input_tokens': 4089, 'output_tokens': 671, 'total_tokens': 4760, 'input_token_details': {'cache_read': 0, 'cache_creation': 0, 'ephemeral_5m_input_tokens': 0, 'ephemeral_1h_input_tokens': 0}}
Processing batch 4/70... OK (10 lines)
{'input_tokens': 4117, 'output_tokens': 744, 'total_tokens': 4861, 'input_token_details': {'cache_read': 0, 'cache_creation': 0, 'ephemeral_5m_input_tokens': 0, 'ephemeral_1h_input_tokens': 0}}
Processing batch 5/70... OK (10 lines)
{'input_tokens': 4554, 'outpu

In [27]:
# Inverted index: lower-cased keyword -> technique IDs in first-seen order, no duplicates.
keyword_index = {}

for row in compressed_lines:
    fields = row.strip("|").split("|")
    # Rows without a keywords column (field 3) contribute nothing.
    if len(fields) < 4:
        continue

    technique_id = fields[0].strip()
    for raw_keyword in fields[3].strip().split(","):
        kw = raw_keyword.strip().lower()
        if not kw:
            continue
        ids_for_keyword = keyword_index.setdefault(kw, [])
        if technique_id not in ids_for_keyword:
            ids_for_keyword.append(technique_id)

print(f"Extracted {len(keyword_index)} unique keywords")

Extracted 2755 unique keywords


In [28]:
# Header comment lines for the keyword index file. The usage hint names the file the
# save cell actually writes (attack_keywords_<MODEL>.idx).
idx_header = f"""# ATT&CK Keyword Index | Enterprise | v18.1 | Generated {datetime.now().strftime("%Y-%m-%d")}
# Format: keyword:technique_ids
# Usage: grep -wi "keyword" attack_keywords_{MODEL}.idx
"""

# One "keyword:id1,id2,..." line per keyword, sorted by keyword and then by ID.
idx_lines = [f"{kw}:{','.join(sorted(ids))}" for kw, ids in sorted(keyword_index.items())]
idx_output = idx_header + "\n".join(idx_lines)

# Preview
print(idx_output[:1500])
# Estimate tokens from idx_output itself (about 4 chars per token). The previous code
# read `output`, which is only defined in a later cell and fails on a fresh kernel.
print(f"\n... ({len(idx_lines)} keywords, ~{len(idx_output)//4} tokens)")

# ATT&CK Keyword Index | Enterprise | v18.1 | Generated 2025-12-17
# Format: keyword:technique_ids
# Usage: grep -wi "keyword" attack_keywords.idx
$file_name:T1070.006
$path modification:T1574.007
$standard_information:T1070.006
$user:T1033
%appdata%:T1217
%temp%:T1574.005
%username%:T1033
--disable-gpu-sandbox:T1218.015
--gpu-launcher:T1218.015
-erroraction silentlycontinue:T1564.011
.ahk:T1059.010
.application:T1127.002
.appref-ms:T1127.002
.au3:T1059.010
.bat:T1059.003
.cer:T1552.004
.cmd:T1059.003
.cpl:T1218.002,T1218.011
.desktop:T1547.013
.dll:T1679
.doc:T1204.002
.docx:T1221
.dylib:T1129
.evtx:T1070.001
.exe:T1059.010,T1204.002,T1679
.hta:T1218.005
.inf:T1218.003
.jam files:T1127.003
.key:T1552.004
.lnk:T1027.012,T1080,T1204.002,T1679
.lua scripts:T1059.011
.msc:T1218.014
.msi:T1218.007,T1546.016
.net:T1127.001,T1218.004,T1574.014
.net assembly:T1505.001,T1505.002
.net clr:T1574.012
.net com:T1218.009
.net framework:T1059.001,T1059.005
.onion:T1133
.ost:T1114.001
.pdf:T1204.002


In [29]:
# Header for the techniques file: title line with generation date, the tactic
# abbreviation legend, the row count, then the markdown table header and separator.
header = f"""# ATT&CK-LLM | Enterprise | v18.1 | Generated {datetime.now().strftime("%Y-%m-%d")}
# Tactics: REC=Reconnaissance, RD=Resource Development, IA=Initial Access, EX=Execution, PE=Persistence, PRV=Privilege Escalation, DE=Defense Evasion, CA=Credential Access, DIS=Discovery, LM=Lateral Movement, COL=Collection, C2=Command and Control, EXF=Exfiltration, IMP=Impact
# Techniques (inc. sub-techniques): {len(compressed_lines)}

|ID|Tactics|Name|Keywords|Description|Platforms|
|---|---|---|---|---|---|
"""

# Full document: header followed by one table row per compressed technique.
output = header + "\n".join(compressed_lines)

# Preview
print(output[:1000])
# Token count is a rough estimate of one token per four characters.
print(f"\n... ({len(output)} chars, ~{len(output)//4} tokens)")

# ATT&CK-LLM | Enterprise | v18.1 | Generated 2025-12-17
# Tactics: REC=Reconnaissance, RD=Resource Development, IA=Initial Access, EX=Execution, PE=Persistence, PRV=Privilege Escalation, DE=Defense Evasion, CA=Credential Access, DIS=Discovery, LM=Lateral Movement, COL=Collection, C2=Command and Control, EXF=Exfiltration, IMP=Impact
# Techniques (inc. sub-techniques): 691

|ID|Tactics|Name|Keywords|Description|Platforms|
|---|---|---|---|---|---|
|T1001|C2|Data Obfuscation|junk data, steganography, protocol impersonation, C2 traffic, detection evasion|Obfuscates command and control traffic using junk data, steganography, or protocol impersonation to evade detection and analysis|ESXi, Linux, macOS, Windows|
|T1001.001|C2|Junk Data|random data, protocol padding, meaningless characters, C2 obfuscation|Adds random or meaningless data to C2 protocols to prevent trivial decoding and traffic analysis|ESXi, Linux, macOS, Windows|
|T1001.002|C2|Steganography|hidden data, image files, document f

In [30]:
# Write both artifacts under ./output, with the model ID embedded in each file name.
output_dir = Path("output")
output_dir.mkdir(parents=True, exist_ok=True)

# Save main techniques file
techniques_path = output_dir / f"attack_techniques_{MODEL}.md"
# Explicit UTF-8: the platform default encoding can reject non-ASCII text in model output.
techniques_path.write_text(output, encoding="utf-8")
print(f"Saved: {techniques_path} ({len(output):,} chars)")

# Save keyword index
idx_path = output_dir / f"attack_keywords_{MODEL}.idx"
idx_path.write_text(idx_output, encoding="utf-8")
print(f"Saved: {idx_path} ({len(idx_lines)} keywords)")

Saved: output/attack_techniques_claude-haiku-4-5-20251001.md (180,438 chars)
Saved: output/attack_keywords_claude-haiku-4-5-20251001.idx (2755 keywords)
