In [32]:
# ! pip install -U langchain langchain-openai langgraph-supervisor langchain-ollama langgraph ollama mem0 pandas requests

In [33]:
import os
import json
from langchain_core.documents import Document
from mem0 import MemoryClient
from utils import *

In [34]:
# Get the API key from environment variables
os.environ["MEM0_API_KEY"] = os.environ.get("MEM0_API_KEY")
os.environ["OPENAI_API_KEY"] = os.environ.get("OPENAI_API_KEY")

mem0_client = MemoryClient()

In [35]:
mem0_collection: str = "mitre_attack"

In [36]:
def initialize_attack_db(attack_data_path):
    """Initialize the MITRE ATT&CK vector database"""

    print("Loading MITRE ATT&CK data...")

    with open(attack_data_path, "r") as f:

        attack_data = json.load(f)

    documents = []
    ids = []

    for i, technique in enumerate(attack_data.get("objects", [])):

        if technique.get("type") == "attack-pattern":

            technique_id = technique.get("external_references", [{}])[0].get(
                "external_id", ""
            )

            if not technique_id.startswith("T"):
                continue

            name = technique.get("name", "")
            description = technique.get("description", "")
            tactics = [
                phase["phase_name"] for phase in technique.get("kill_chain_phases", [])
            ]
            platforms = technique.get("x_mitre_platforms", [])
            detection = technique.get("x_mitre_detection", "")
            data_sources = technique.get("x_mitre_data_sources", [])

            name = clean_text(name)
            description = clean_text(description)
            detection = clean_text(detection)

            content = f"""
                # {technique_id}: {name}

                ## Description
                {description}

                ## Tactics
                {', '.join(tactics)}

                ## Platforms
                {', '.join(platforms)}

                ## Detection
                {detection}

                ## Data Sources
                {', '.join(data_sources)}
                """

            doc = Document(
                page_content=content,
                metadata={
                    "type": "MITRE_ATTACK",
                    "technique_id": technique_id,
                    "name": name,
                    "tactics": tactics,
                },
                id=str(i),
            )

            documents.append(doc)
            ids.append(str(i))

    return [documents, ids]

### Initialize QDRANT vector database


In [None]:
from services.ollama_service import *

# Initialize and get documents


documents, ids = initialize_attack_db("mitre_data/enterprise-attack.json")


print(f"Processed {len(documents)} MITRE ATT&CK techniques")



# Generate embeddings


print("Generating embeddings...")


embeddings = generate_embeddings(documents)

Loading MITRE ATT&CK data...
Processed 799 MITRE ATT&CK techniques


In [None]:
from services.qdrant_service import *
# Check if the environment variable exists
from qdrant_client import QdrantClient

qdrant_url = os.environ.get("QDRANT_URL")
qdrant_api_key = os.environ.get("QDRANT_API_KEY")

qdrant_client = QdrantClient(
    url=qdrant_url,
    api_key=qdrant_api_key,
)


# # Store in Qdrant


print("Storing in Qdrant...")


ids = [int(x) for x in ids]  # Convert IDs to integers


store_in_qdrant(documents, embeddings, ids, qdrant_client)



print("MITRE ATT&CK vector database initialized successfully!")

### Building AI Agent with Ollama


In [39]:
from langchain_ollama import ChatOllama
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
from langgraph.prebuilt import create_react_agent
from langgraph_supervisor import create_supervisor
import requests

In [40]:
# === AI Model Selection ===
USE_OPENAI = os.getenv("USE_OPENAI", "False").lower() == "true"
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
OPEN_AI_API_KEY = ""  # Not use currently
LOCAL_LLM = "qwen2.5:7b"
# LOCAL_LLM = "llama3:latest"

if USE_OPENAI and OPENAI_API_KEY:
    model = ChatOpenAI(api_key=OPENAI_API_KEY, model="gpt-4")
else:
    model = ChatOllama(
        model=LOCAL_LLM, temperature=0.0, base_url="http://localhost:11434"
    )

In [41]:
def verify_ollama_model(model_name, base_url="http://localhost:11434"):
    """Verify if Ollama model exists and is accessible"""
    try:
        # Make a request to Ollama API to check if the model exists
        url = f"{base_url}/api/tags"
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            available_models = [
                model["name"] for model in response.json().get("models", [])
            ]
            if any(model_name in m for m in available_models):
                print(f"✓ Local model '{model_name}' is available")
                return True
            else:
                print(
                    f"✗ Local model '{model_name}' not found in Ollama. Available models: {available_models}"
                )
                return False
        else:
            print(f"✗ Failed to query Ollama API: {response.status_code}")
            return False
    except requests.exceptions.RequestException as e:
        print(f"✗ Cannot connect to Ollama server at {base_url}: {str(e)}")
        return False


# Just verify if local LLM is available without initializing it
if verify_ollama_model(LOCAL_LLM):
    print("Connection to local LLM successful!")
else:
    print(
        "Failed to connect to local LLM. Check if Ollama is running and the model is available."
    )

✓ Local model 'qwen2.5:7b' is available
Connection to local LLM successful!


### Prompt template for the agent


In [42]:
# === MITRE ATT&CK Prompts ===
gSce2MitrePrompt = """
You are a helpful assistant who helps map cyber attack scenario descriptions to the
tactic and technique in MITRE Adversarial Tactics, Techniques, and Common Knowledge (ATT&CK) Enterprise Matrix.
Please list the tactic and technique using the following format:

tactic:
technique:
"""

gMitreVerifyPrompt = """
Verify whether the given MITRE ATT&CK technique can be found in the attack scenario:

Scenario:
%s

Description and provide a short explanation about whether the given technique matches
any part of the scenario description. Use the following format:

match: Yes/No
explanation: <Brief summary of how the technique matches the scenario>
"""

gSceVulCheckPrompt = """
You are a helpful assistant who analyzes attack scenario descriptions and identifies vulnerabilities.
Match the vulnerabilities to the MITRE Common Weakness Enumeration (CWE) and provide a short explanation.
List the matched MITRE CWE using the following format:

MITRE_CWE: CWE-<number>
- CWE_Name: <MITRE CWE name>
- vulnerability: Short summary of how the CWE matches the attack scenario
"""

gAgentPrompt = """You are an expert in cybersecurity tactics, techniques, and mitigations.

When analyzing security questions or scenarios:
1. First, use retrieve_attack_techniques to find relevant MITRE ATT&CK knowledge corresponding to the scenario
2. For deeper analysis, use get_technique_details to get comprehensive information on specific techniques
3. Use verify_attack_technique_with_knowledge to validate if techniques apply to the scenario
4. Provide detailed explanations using the knowledge from MITRE ATT&CK framework

Always cite the specific technique IDs and tactics in your answers.
"""

### Building Agents


In [None]:
# === Tools ===
from services.qdrant_service import query_mitre_attack
from qdrant_client.http import models


def retrieve_attack_techniques(query):
    """
    Tool to retrieve relevant MITRE ATT&CK techniques from Qdrant.
    """
    techniques = query_mitre_attack(query, client=qdrant_client)

    if not techniques:
        return "No relevant techniques found."

    print(techniques)
    result = "Retrieved relevant MITRE ATT&CK techniques:\n\n"
    for i, tech in enumerate(techniques, 1):
        result += f"{i}. {tech['name']} ({tech['technique_id']})\n"
        result += f"   Tactics: {', '.join(tech['tactics'])}\n"
        result += f"   Description: {tech['content'][:200]}...\n\n"

    return result


def get_technique_details(technique_id):
    """
    Tool to get detailed information about a specific technique by ID.
    """
    client = qdrant_client

    # Filter by technique_id
    search_results = client.scroll(
        collection_name="mitre-attack",
        scroll_filter=models.Filter(
            must=[
                models.FieldCondition(
                    key="technique_id", match=models.MatchValue(value=technique_id)
                )
            ]
        ),
        limit=1,
    )[0]

    if not search_results:
        return f"No technique found with ID: {technique_id}"

    technique = search_results[0].payload
    result = f"# {technique['name']} ({technique['technique_id']})\n\n"
    result += f"**Tactics**: {', '.join(technique['tactics'])}\n\n"
    result += f"**Description**:\n{technique['content']}\n"

    return result

In [None]:
# result = get_technique_details("T1110.001")
# print(result)
# result = retrieve_attack_techniques("Multiple failed login attempts with different passwords for a user, on my Enterprise app. Log file shows failed login attempts and IP addresses which are also different. Seems like a simulated attack.")

# Password Guessing (T1110.001)

**Tactics**: credential-access

**Description**:

                # T1110.001: Password Guessing

                ## Description
                Adversaries with no prior knowledge of legitimate credentials within the system or environment may guess passwords to attempt access to accounts. Without knowledge of the password for an account, an adversary may opt to systematically guess the password using a repetitive or iterative mechanism. An adversary may guess login credentials without prior knowledge of system or environment passwords during an operation by using a list of common passwords. Password guessing may or may not take into account the target s policies on password complexity or use policies that may lock accounts out after a number of failed attempts. Guessing passwords can be a risky option because it could cause numerous authentication failures and account lockouts, depending on the organization s login failure policies. (Citation: Cylance 

  search_results = client.search(


[ScoredPoint(id=456, version=4, score=0.47665, payload={'type': 'MITRE_ATTACK', 'technique_id': 'T1528', 'name': 'Steal Application Access Token', 'tactics': ['credential-access'], 'content': '\n                # T1528: Steal Application Access Token\n\n                ## Description\n                Adversaries can steal application access tokens as a means of acquiring credentials to access remote systems and resources. Application access tokens are used to make authorized API requests on behalf of a user or service and are commonly used as a way to access resources in cloud and container-based applications and software-as-a-service (SaaS).(Citation: Auth0 - Why You Should Always Use Access Tokens to Secure APIs Sept 2019) Adversaries who steal account API tokens in cloud and containerized environments may be able to access data and perform actions with the permissions of these accounts, which can lead to privilege escalation and further compromise of the environment. For example, in

In [45]:
# === MITRE ATT&CK Data Processing Agent ===
def mitre_attack_tool(query):
    """
    Process attack scenario description and map to MITRE ATT&CK.
    """
    prompt = gSce2MitrePrompt + f"\nScenario:\n{query}"
    response = model.invoke(prompt)
    return response if response else "No relevant MITRE ATT&CK data found."


def verify_attack_technique(technique_id, scenario):
    """
    Verify whether the technique is correctly mapped to the attack scenario
    using knowledge from the Qdrant database.
    """
    # Get technique details
    technique_details = get_technique_details(technique_id)

    # Build verification prompt
    verification_prompt = f"""
    Verify whether the following MITRE ATT&CK technique applies to the attack scenario:

    Scenario:
    {scenario}

    Technique information:
    {technique_details}

    Analyze whether this technique matches the scenario and explain why. Use the following format:

    match: Yes/No
    explanation: <Brief summary of how the technique matches or doesn't match the scenario>
    """

    response = model.invoke(verification_prompt)
    return response


mitre_agent = create_react_agent(
    model=model,
    tools=[
        mitre_attack_tool,
        verify_attack_technique,
        retrieve_attack_techniques,
        get_technique_details,
    ],
    name="mitre_expert",
    prompt=gAgentPrompt,
)

In [49]:
def chat_with_mitre_agent(agent):
    print("=== MITRE ATT&CK Security Assistant ===")
    print("Type 'exit' or 'quit' to end the chat.\n")

    while True:
        user_input = input("You: ")
        if user_input.lower() in ["exit", "quit"]:
            print("\nGoodbye!")
            break

        messages = [
            {
                "role": "user",
                "content": user_input
            }
        ]

        try:
            result = agent.invoke({"messages": messages})
            print("\nAgent:", result, "\n")
        except Exception as e:
            print(f"\nError: {str(e)}")

chat_with_mitre_agent(mitre_agent)

=== MITRE ATT&CK Security Assistant ===
Type 'exit' or 'quit' to end the chat.

Query vector: [-0.011032174, -0.035618085, 0.025896847, 0.02213316, -0.0013822803, 0.0042633275, -0.009605608, -0.012782637, -0.005452732, -0.0021916227, -0.00016821384, 0.011259173, 0.0015347373, -0.020209229, 0.0058451155, 0.019331034, -0.012652764, -0.009101873, -0.019345177, -0.004113675, -0.022470558, 0.0046612555, -0.003520586, 0.015862746, 0.011509708, 0.025643308, 0.016975997, -0.005686827, -0.021892998, -0.0021338272, 0.000114977694, -0.008536156, 0.024826514, 0.016098518, -0.0043239947, 0.00025144953, 0.005470137, 0.013599733, 0.021200052, -0.009343907, 0.0004672356, -0.008372501, 0.0014756419, 0.0006617229, 0.011063432, 0.004139539, 0.022799453, 0.010425456, -0.004485651, -0.009693336, 0.016847461, -0.009947392, -0.016554078, -0.0020440272, -0.011315909, -0.0025129193, -0.016534153, -0.004851079, -0.028460188, -0.006175897, -0.0082263155, 0.008831005, 0.008096379, 0.01644382, 0.01656039, 0.010546

  search_results = client.search(


[ScoredPoint(id=398, version=3, score=0.26119035, payload={'type': 'MITRE_ATTACK', 'technique_id': 'T1070', 'name': 'Indicator Removal', 'tactics': ['defense-evasion'], 'content': '\n                # T1070: Indicator Removal\n\n                ## Description\n                Adversaries may delete or modify artifacts generated within systems to remove evidence of their presence or hinder defenses. Various artifacts may be created by an adversary or something that can be attributed to an adversary s actions. Typically these artifacts are used as defensive indicators related to monitored events, such as strings from downloaded files, logs that are generated from user actions, and other data analyzed by defenders. Location, format, and type of artifact (such as command or login history) are often specific to each platform. Removal of these indicators may interfere with event collection, reporting, or other processes used to detect intrusion activity. This may compromise the integrity of 

KeyboardInterrupt: 