# MCP + RAG for MITRE ATT&CK Threat Detection (Notebook Demo)
This notebook demonstrates mapping an alert or detection rule to MITRE ATT&CK techniques, detection context, and CISA advisories—using the MCP server and your CSVs as knowledge sources.


In [19]:
import os
import requests
import openai
import json
from dotenv import load_dotenv

load_dotenv()

# --- API keys and URLs ---
MCP_URL = "https://localhost:8003/mcp"
MEMORY_SERVER_URL = "https://localhost:8001/mcp"

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")  # Set in your .env
MCP_API_KEY = os.environ.get("MCP_API_KEY")   # Set in your .env or above

if not MCP_API_KEY:
    raise ValueError("MCP_API_KEY is not set in your environment!")
if not OPENAI_API_KEY:
    raise ValueError("OPENAI_API_KEY is not set in your environment!")

HEADERS = {"x-api-key": MCP_API_KEY}

# --- Logging memory with API Key ---
def log_to_memory(session_id, text):
    payload = {
        "jsonrpc": "2.0",
        "method": "insert_memory",
        "params": {"session_id": session_id, "text": text},
        "id": 1
    }
    r = requests.post(MEMORY_SERVER_URL, json=payload, headers=HEADERS, verify=False)
    try:
        resp_json = r.json()
        if "result" in resp_json:
            return resp_json["result"]
        elif "error" in resp_json:
            print("Memory server error:", resp_json["error"])
            raise Exception(f"Memory server error: {resp_json['error']}")
        else:
            print("Unexpected memory server response:", resp_json)
            raise Exception("No result or error in memory server response.")
    except Exception as e:
        print("log_to_memory Exception:", e)
        raise

# --- MCP Query with logging and error handling ---
def mcp_query(method, query, k=2, session_id=None):
    payload = {
        "jsonrpc": "2.0",
        "method": method,
        "params": {"query": query, "k": k},
        "id": 1
    }
    r = requests.post(MCP_URL, json=payload, headers=HEADERS, verify=False)
    try:
        resp_json = r.json()
        if "result" in resp_json:
            result = resp_json["result"]
        elif "error" in resp_json:
            print(f"MCP Error for method '{method}':", resp_json["error"])
            raise Exception(f"MCP Error: {resp_json['error']}")
        else:
            print("Unexpected MCP response:", resp_json)
            raise Exception("No result or error in MCP response.")
    except Exception as e:
        print("mcp_query Exception:", e)
        raise

    if session_id is not None:
        try:
            log_to_memory(session_id, f"MCP Query: {method}\nInput: {query}\nResult: {json.dumps(result)[:500]}")
        except Exception as e:
            print("Failed to log to memory:", e)
    return result

# --- Pretty Print ---
def pretty_print(label, results):
    print(f"\n--- {label} ---")
    if not results:
        print("No results found.")
        return
    for idx, r in enumerate(results[:2], 1):
        meta = r.get("meta", {})
        title = meta.get("title") or meta.get("Subject") or r['text'][:80]
        level = meta.get("level", "")
        tags = meta.get("tags", "")
        author = meta.get("author", "")
        print(f"{idx}. {title}")
        if level:
            print(f"   Level: {level}")
        if tags:
            print(f"   Tags: {tags}")
        if author:
            print(f"   Author: {author}")
        desc = meta.get("description", "")
        if desc:
            print(f"   Description: {desc[:120]}...")
        print()

alert_text = "MoveIT vulnerabilty detection"
session_id = "analyst-session-1"

try:
    mitre_results = mcp_query("search_mitre", alert_text, session_id=session_id)
    detections_results = mcp_query("search_detections", alert_text, session_id=session_id)
    cisa_results = mcp_query("search_cisa", alert_text, session_id=session_id)
except Exception as e:
    print("Failed to fetch from MCP:", e)
    raise

pretty_print("MITRE Techniques", mitre_results)
pretty_print("Detection Rules", detections_results)
pretty_print("CISA Advisories", cisa_results)

# --- B
# --- Build OpenAI prompt ---
context_chunks = []
for label, res in [
    ("MITRE", mitre_results),
    ("Detection", detections_results),
    ("CISA", cisa_results)
]:
    for r in res[:2]:
        meta = r.get("meta", {})
        short_title = meta.get("title") or meta.get("Subject", "") or r['text'][:80]
        level = meta.get("level", "")
        tags = meta.get("tags", "")
        author = meta.get("author", "")
        desc = meta.get("description", "")
        context = f"[{label}] {short_title}\n"
        if level:
            context += f"Level: {level}\n"
        if tags:
            context += f"Tags: {tags}\n"
        if author:
            context += f"Author: {author}\n"
        if desc:
            context += f"Description: {desc[:200]}...\n"
        else:
            context += r["text"][:200] + "\n"
        context_chunks.append(context.strip())

prompt = f"""
You are a cybersecurity SOC assistant. Given the following alert: "{alert_text}"
and the following context from MITRE techniques, detections, and CISA advisories, explain:
- Which MITRE techniques are involved (cite IDs if available)
- Why this is critical
- Recommended SOC actions

Context:
{chr(10).join(context_chunks)}
"""

client = openai.OpenAI(api_key=OPENAI_API_KEY)
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are a cybersecurity SOC assistant."},
        {"role": "user", "content": prompt}
    ],
    temperature=0
)
print("\nOpenAI Response:\n", response.choices[0].message.content)

log_to_memory(session_id, f"Q: {alert_text}\nA: {response.choices[0].message.content}")





--- MITRE Techniques ---
1. User activity-based checks will likely occur in the first steps of an operation but may also occur throughout as an adversary learns the environment. Data and events should not be viewed in isolation, but as part of a chain of behavior that could lead to other activities, such as lateral movement, based on the information obtained. Detecting actions related to virtualization and sandbox identification may be difficult depending on the adversary's implementation and monitoring required. Monitoring for suspicious processes being spawned that gather a variety of system information or perform other forms of Discovery, especially in a short period of time, may aid in detection. 

2. Virtualization, sandbox, user activity, and related discovery techniques will likely occur in the first steps of an operation but may also occur throughout as an adversary learns the environment. Data and events should not be viewed in isolation, but as part of a chain of behavior th



'Memory inserted'

## MCP Security

### Add API Key Authentication to MCP Server
1. Prevent unauthorized use and restrict sensitive endpoints (like /mcp or /debug/memory).

How: Use a header (X-API-Key) or bearer token for simple demos.

Why: You need a strong, random secret so that only authorized clients can access your MCP endpoints.

How: Generate it once using Python’s secrets library (run this outside your notebook/server code):



In [2]:
import secrets 
print(secrets.token_urlsafe(32))

SzXx5c2tMoI048f6om93iFJe40GZqs9IHaIdRzsMU_Y


 ### 2. Set the API Key as an Environment Variable on Your MCP Server
Never hardcode the API key in code or notebooks!

On your Linux/Mac/WSL server, add this to your shell before running the MCP server:

In [25]:
# import os
# print(os.environ)


In [20]:
import requests
import os
import json

API_KEY = os.environ["MCP_API_KEY"]  # Or paste your key directly
headers = {"x-api-key": API_KEY}

r = requests.get("https://localhost:8001/debug/memory", headers=headers, verify=False)
data = r.json()

print("\n===== Memory Store =====")
if not data["memory_store"]:
    print("(No session memory stored)")
else:
    for session_id, items in data["memory_store"].items():
        print(f"\nSession: {session_id}")
        for i, entry in enumerate(items, 1):
            print(f"  [{i}]")
            print(json.dumps(entry, indent=4))

print("\n===== In-Memory Activity Log =====")
if not data["activity_log"]:
    print("(No activity log entries)")
else:
    for i, entry in enumerate(data["activity_log"], 1):
        print(f"\nLog Entry {i}:")
        print(json.dumps(entry, indent=4))



===== Memory Store =====

Session: analyst-session-1
  [1]
{
    "id": "516498ea-5a36-460c-95f5-4cb6e424fa8f",
    "text": "MCP Query: search_mitre\nInput: MoveIT vulnerabilty detection\nResult: [{\"text\": \"Adversaries may employ various user activity checks to detect and avoid virtualization and analysis environments. This may include changing behaviors based on the results of checks for the presence of artifacts indicative of a virtual machine environment (VME) or sandbox. If the adversary detects a VME, they may alter their malware to disengage from the victim or conceal the core functions of the implant. They may also search for VME artifacts before dropping secondary or additional p",
    "timestamp": "2025-05-29T21:30:47.887741"
}
  [2]
{
    "id": "253b6a6c-6f79-4c04-8729-7e6b6e2125af",
    "text": "MCP Query: search_detections\nInput: MoveIT vulnerabilty detection\nResult: [{\"text\": \"title: MOVEit CVE-2023-34362 Exploitation Attempt - Potential Web Shell Request\\ndescrip



In [21]:
import requests
import os
import json

API_KEY = os.environ["MCP_API_KEY"]  # Or paste your key directly
headers = {"x-api-key": API_KEY}

r = requests.get("https://localhost:8001/debug/memory", headers=headers, verify=False)
data = r.json()

print("\n===== Memory Store =====")
if not data["memory_store"]:
    print("(No session memory stored)")
else:
    for session_id, items in data["memory_store"].items():
        print(f"\nSession: {session_id}")
        for i, entry in enumerate(items, 1):
            print(f"  [{i}]")
            print(json.dumps(entry, indent=4))

print("\n===== In-Memory Activity Log =====")
if not data["activity_log"]:
    print("(No activity log entries)")
else:
    for i, entry in enumerate(data["activity_log"], 1):
        print(f"\nLog Entry {i}:")
        print(json.dumps(entry, indent=4))



===== Memory Store =====

Session: analyst-session-1
  [1]
{
    "id": "516498ea-5a36-460c-95f5-4cb6e424fa8f",
    "text": "MCP Query: search_mitre\nInput: MoveIT vulnerabilty detection\nResult: [{\"text\": \"Adversaries may employ various user activity checks to detect and avoid virtualization and analysis environments. This may include changing behaviors based on the results of checks for the presence of artifacts indicative of a virtual machine environment (VME) or sandbox. If the adversary detects a VME, they may alter their malware to disengage from the victim or conceal the core functions of the implant. They may also search for VME artifacts before dropping secondary or additional p",
    "timestamp": "2025-05-29T21:30:47.887741"
}
  [2]
{
    "id": "253b6a6c-6f79-4c04-8729-7e6b6e2125af",
    "text": "MCP Query: search_detections\nInput: MoveIT vulnerabilty detection\nResult: [{\"text\": \"title: MOVEit CVE-2023-34362 Exploitation Attempt - Potential Web Shell Request\\ndescrip



### API Key Authentication in MCP

- We generate a secure API key using Python's secrets library.
- The key is set as an environment variable (`MCP_API_KEY`) on the server—never hardcoded in code.
- The MCP server checks every request for this key in the `x-api-key` HTTP header before processing.
- Our notebooks and clients also read the key from the environment, not from code or files, and send it in every request.
- This approach protects against unauthorized access and keeps secrets out of our codebase.
