In [None]:
!pip install langchain-experimental langchain-community langchain networkx langchain-google-genai langchain-core json-repair tiktoken

Collecting langchain-experimental
  Downloading langchain_experimental-0.3.4-py3-none-any.whl.metadata (1.7 kB)
Collecting langchain-community
  Downloading langchain_community-0.3.14-py3-none-any.whl.metadata (2.9 kB)
Collecting langchain-google-genai
  Downloading langchain_google_genai-2.0.8-py3-none-any.whl.metadata (3.6 kB)
Collecting json-repair
  Downloading json_repair-0.35.0-py3-none-any.whl.metadata (11 kB)
Collecting tiktoken
  Downloading tiktoken-0.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting langchain-core
  Downloading langchain_core-0.3.29-py3-none-any.whl.metadata (6.3 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain-community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting httpx-sse<0.5.0,>=0.4.0 (from langchain-community)
  Downloading httpx_sse-0.4.0-py3-none-any.whl.metadata (9.0 kB)
Collecting langchain
  Downloading langchain-0.3.14-py3-none-any.whl.metadata (7.1 kB)
Coll

In [None]:
def map_rule_to_mitre(rule, mitre_techniques):
    """Map one Suricata rule to a MITRE ATT&CK technique by querying the LLM.

    The model is shown the rule's ID, classtype and message together with the
    allowed techniques. When its answer names a technique ID that is not in
    ``mitre_techniques`` (or names none at all), it is asked once more with a
    stricter prompt.

    Relies on the module-level ``client`` (OpenAI client), ``json`` and
    ``time`` being available when the function is called.

    Args:
        rule: Dict with the keys ``suri_rule_id``, ``suri_rule_classtype`` and
            ``suri_rule_msg``.
        mitre_techniques: List of dicts, each with ``technique_id`` and
            ``technique_name``.

    Returns:
        The JSON object returned by the model as a dict (requested keys:
        ``mitre_technique_id``, ``mitre_technique_name``, ``mitre_tactic_id``,
        ``mitre_tactic_name``), or ``None`` when the reply is not usable JSON,
        the technique ID is still invalid after the retry, or any other error
        occurs while querying. Those errors are printed, not raised.

    Note:
        Only the technique ID is validated against ``mitre_techniques``; the
        technique name and tactic fields are returned as the model wrote them.
    """
    print(f"Mapping rule ID {rule['suri_rule_id']}...")

    # Create a set of valid technique IDs for validation
    valid_technique_ids = {t['technique_id'] for t in mitre_techniques}

    # Initial prompt for mapping
    def create_prompt(techniques_json):
        return f"""
        You are a cybersecurity expert tasked with mapping Suricata IDS rules to MITRE ATT&CK techniques.

        ### Instructions:
        - Carefully analyze the rule classification and message to determine the intent of the activity.
        - Use the provided MITRE ATT&CK techniques list to map the rule.
        - If the rule involves scanning or probing a network broadly, map it to "Active Scanning (T1595)" under "Reconnaissance (TA0043)".
        - If the rule focuses on identifying specific services on individual hosts, map it to "Network Service Discovery (T1046)" under "Discovery (TA0007)".
        - For rules mentioning exploitation of public-facing services, use "Exploit Public-Facing Application (T1190)".
        - Respond strictly with one of the techniques from the provided list.

        ### Suricata Rule:
        - ID: {rule["suri_rule_id"]}
        - Classification: {rule["suri_rule_classtype"]}
        - Message: "{rule["suri_rule_msg"]}"

        ### MITRE ATT&CK Techniques:
        {techniques_json}

        Respond in this exact JSON format:
        {{
            "mitre_technique_id": "<Technique ID>",
            "mitre_technique_name": "<Technique Name>",
            "mitre_tactic_id": "<Tactic ID>",
            "mitre_tactic_name": "<Tactic Name>"
        }}
        """

    # Helper function to interact with the LLM
    def query_llm(prompt):
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a cybersecurity expert that maps Suricata rules to MITRE ATT&CK techniques."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=400,
            temperature=0.0
        )
        return response

    # Helper shared by the first attempt and the retry: reply text -> dict
    def parse_mapping(raw_content):
        # message.content is optional in the OpenAI SDK; treat None like an empty reply
        text = (raw_content or "").replace("```json", "").replace("```", "").strip()
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            # The model sometimes wraps the object in prose; fall back to the
            # outermost {...} span before giving up.
            start, end = text.find("{"), text.rfind("}")
            if start == -1 or end <= start:
                raise
            parsed = json.loads(text[start:end + 1])
        if not isinstance(parsed, dict):
            raise ValueError(f"expected a JSON object, got {type(parsed).__name__}")
        return parsed

    try:
        # Serialize the technique list once; both prompts embed the same text
        techniques_json = json.dumps([{t['technique_id']: t['technique_name']} for t in mitre_techniques], indent=2)

        # Track time for API call (first attempt only)
        start_time = time.time()
        response = query_llm(create_prompt(techniques_json))
        elapsed_time = time.time() - start_time

        # Extract and clean response content
        response_content = (response.choices[0].message.content or "").strip()
        print(f"Raw response for rule ID {rule['suri_rule_id']}:\n{response_content}")

        parsed_response = parse_mapping(response_content)

        # Validate technique ID; a missing key is handled like an invalid ID so it also gets a retry
        technique_id = parsed_response.get('mitre_technique_id')
        if technique_id not in valid_technique_ids:
            print(f"Invalid technique ID '{technique_id}' for rule ID {rule['suri_rule_id']}. Retrying with stricter constraints...")

            # Retry prompt to enforce stricter constraints
            retry_prompt = f"""
            You previously mapped the rule to an invalid MITRE ATT&CK technique. Please remap it strictly using only the following list of valid techniques:

            ### Valid MITRE ATT&CK Techniques:
            {techniques_json}

            ### Suricata Rule:
            - ID: {rule["suri_rule_id"]}
            - Classification: {rule["suri_rule_classtype"]}
            - Message: "{rule["suri_rule_msg"]}"

            Respond in the same JSON format as before:
            {{
                "mitre_technique_id": "<Technique ID>",
                "mitre_technique_name": "<Technique Name>",
                "mitre_tactic_id": "<Tactic ID>",
                "mitre_tactic_name": "<Tactic Name>"
            }}
            """
            retry_response = query_llm(retry_prompt)
            parsed_retry_response = parse_mapping(retry_response.choices[0].message.content)

            # Validate retry response
            if parsed_retry_response.get('mitre_technique_id') not in valid_technique_ids:
                print(f"Retry failed: Invalid technique ID again for rule ID {rule['suri_rule_id']}. Skipping this rule.")
                return None

            print(f"Rule ID {rule['suri_rule_id']} successfully remapped after retry.")
            return parsed_retry_response

        print(f"Rule ID {rule['suri_rule_id']} mapped successfully in {elapsed_time:.2f} seconds.")
        return parsed_response

    except json.JSONDecodeError:
        print(f"Error: Invalid JSON format for rule ID {rule['suri_rule_id']}.")
        return None

    except Exception as e:
        # Best-effort by design: one failing rule must not abort the whole batch
        print(f"Error processing rule {rule['suri_rule_id']}: {e}")
        return None


In [None]:
# Requires the OpenAI SDK (the `openai` package) to be installed in the kernel environment

# Import required libraries
import getpass
import json
import os
import time

from openai import OpenAI

# Take the OpenAI API key from the environment. Only when it is missing, prompt for it
# without echoing, so the secret is never written into the notebook and an already
# exported key is not overwritten with an empty value.
if not os.environ.get('OPENAI_API_KEY'):
    os.environ['OPENAI_API_KEY'] = getpass.getpass("OpenAI API key: ")

# Initialize the OpenAI client
client = OpenAI(
    api_key=os.environ['OPENAI_API_KEY']
)


# Load extracted rules
def load_extracted_rules(file_path):
    """Load the extracted Suricata rules from a JSON file.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The decoded JSON content. Its length is printed as the number of
        loaded rules.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    print(f"Loading extracted rules from {file_path}...")
    # JSON interchange files are UTF-8 (RFC 8259); don't depend on the platform's locale encoding
    with open(file_path, "r", encoding="utf-8") as f:
        rules = json.load(f)
    print(f"Loaded {len(rules)} rules.\n")
    return rules

# Load MITRE ATT&CK techniques
def load_mitre_techniques(file_path):
    """Load the MITRE ATT&CK technique list from a JSON file and print a sample.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The decoded JSON content. The first five entries are pretty-printed
        as a quick sanity check, so the content is expected to be sliceable
        (a list).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    print(f"Loading MITRE ATT&CK techniques from {file_path}...")
    # JSON interchange files are UTF-8 (RFC 8259); don't depend on the platform's locale encoding
    with open(file_path, "r", encoding="utf-8") as f:
        techniques = json.load(f)
    print(f"Loaded {len(techniques)} MITRE ATT&CK techniques.\n")
    print("Sample MITRE ATT&CK techniques:")
    for technique in techniques[:5]:  # Print first 5 techniques
        print(json.dumps(technique, indent=4))
    return techniques

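# # Map a single rule to MITRE ATT&CK technique using LLM
# # (inactive, commented-out variant: single attempt with no retry step, model "gpt-4o";
# #  the active map_rule_to_mitre is defined in the previous cell)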
# def map_rule_to_mitre(rule, mitre_techniques):
#     print(f"Mapping rule ID {rule['suri_rule_id']}...")

#     # Create a set of valid technique IDs for validation
#     valid_technique_ids = {t['technique_id'] for t in mitre_techniques}

#     prompt = f"""
#     You are a cybersecurity expert tasked with mapping Suricata IDS rules to MITRE ATT&CK techniques.

#     ### Instructions:
#     - Carefully analyze the rule classification and message to determine the intent of the activity.
#     - Use the provided MITRE ATT&CK techniques list to map the rule.
#     - If the rule involves scanning or probing a network broadly, map it to "Active Scanning (T1595)" under "Reconnaissance (TA0043)".
#     - If the rule focuses on identifying specific services on individual hosts, map it to "Network Service Discovery (T1046)" under "Discovery (TA0007)".
#     - For rules mentioning exploitation of public-facing services, use "Exploit Public-Facing Application (T1190)".
#     - Respond strictly with one of the techniques from the provided list.

#     ### Suricata Rule:
#     - ID: {rule["suri_rule_id"]}
#     - Classification: {rule["suri_rule_classtype"]}
#     - Message: "{rule["suri_rule_msg"]}"

#     ### MITRE ATT&CK Techniques:
#     {json.dumps([{t['technique_id']: t['technique_name']} for t in mitre_techniques], indent=2)}

#     Respond in this exact JSON format:
#     {{
#         "mitre_technique_id": "<Technique ID>",
#         "mitre_technique_name": "<Technique Name>",
#         "mitre_tactic_id": "<Tactic ID>",
#         "mitre_tactic_name": "<Tactic Name>"
#     }}
#     """
#     try:
#         # Track time for API call
#         start_time = time.time()
#         response = client.chat.completions.create(
#             model="gpt-4o",
#             messages=[
#                 {"role": "system", "content": "You are a cybersecurity expert that maps Suricata rules to MITRE ATT&CK techniques."},
#                 {"role": "user", "content": prompt}
#             ],
#             max_tokens=400,
#             temperature=0.0
#         )
#         elapsed_time = time.time() - start_time

#         # Extract and clean response content
#         response_content = response.choices[0].message.content.strip()
#         print(f"Raw response for rule ID {rule['suri_rule_id']}:\n{response_content}")

#         # Remove markdown formatting
#         cleaned_response = response_content.replace("```json", "").replace("```", "").strip()

#         # Safely parse JSON
#         try:
#             parsed_response = json.loads(cleaned_response)

#             # Validate technique ID
#             if parsed_response['mitre_technique_id'] not in valid_technique_ids:
#                 print(f"Invalid technique ID '{parsed_response['mitre_technique_id']}' for rule ID {rule['suri_rule_id']}. Skipping this rule.")
#                 return None

#             print(f"Rule ID {rule['suri_rule_id']} mapped successfully in {elapsed_time:.2f} seconds.")
#             return parsed_response

#         except json.JSONDecodeError as e:
#             print(f"Error: Invalid JSON format for rule ID {rule['suri_rule_id']}. Response: {cleaned_response}")
#             return None

#     except Exception as e:
#         print(f"Error processing rule {rule['suri_rule_id']}: {e}")
#         return None

# Process the first `max_rules` rules (10 by default)
def process_rules_and_map_to_mitre(extracted_rules, mitre_techniques, output_file, max_rules=10):
    """Map a leading slice of the rules to MITRE ATT&CK and save the results as JSON.

    Each rule is passed to ``map_rule_to_mitre``; rules for which it returns a
    falsy value are skipped. Every saved record holds the rule's
    ``suri_rule_id``, ``suri_rule_classtype`` and ``suri_rule_msg`` merged with
    the mapping returned for it.

    Args:
        extracted_rules: Sequence of rule dicts (keys ``suri_rule_id``,
            ``suri_rule_classtype``, ``suri_rule_msg``).
        mitre_techniques: Technique list forwarded to ``map_rule_to_mitre``.
        output_file: Path of the JSON file to write (overwritten).
        max_rules: Maximum number of rules to process, counted from the start
            of ``extracted_rules``. ``None`` processes all of them.

    Returns:
        None. Progress and a final summary are printed.
    """
    print("Starting mapping process...\n")
    mapped_results = []
    if max_rules is None:
        total_rules = len(extracted_rules)
    else:
        # Clamp so a negative limit means "no rules" instead of slicing from the end
        total_rules = max(0, min(max_rules, len(extracted_rules)))
    for i, rule in enumerate(extracted_rules[:total_rules]):
        print(f"Processing rule {i+1}/{total_rules}: {rule['suri_rule_msg']}")
        mapping = map_rule_to_mitre(rule, mitre_techniques)
        if mapping:
            mapped_results.append({
                "suri_rule_id": rule["suri_rule_id"],
                "suri_rule_classtype": rule["suri_rule_classtype"],
                "suri_rule_msg": rule["suri_rule_msg"],
                **mapping
            })
        else:
            print(f"Skipping rule ID {rule['suri_rule_id']} due to an error.")

    print("\nSaving mapping results to file...")
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(mapped_results, f, indent=4)
    print(f"Mapping complete! Results saved to {output_file}")
    print(f"Total successfully mapped rules: {len(mapped_results)} out of {total_rules}.")

# Input and output locations; all three are looked up relative to the working directory
extracted_rules_file, mitre_techniques_file, output_file = (
    "test_suricata_rules.json",
    "MITRE_ATTACK_TECHNIQUES.json",
    "mapped_rules_to_mitre.json",
)

# Read the Suricata rules first, then the MITRE ATT&CK technique list
extracted_rules = load_extracted_rules(extracted_rules_file)
mitre_techniques = load_mitre_techniques(mitre_techniques_file)

# Map the rules and write the results to `output_file`
process_rules_and_map_to_mitre(
    extracted_rules,
    mitre_techniques,
    output_file,
)

# Report where the results ended up (in Colab the file can be downloaded from there if needed)
print(f"Mapped rules saved to {output_file}.")


Loading extracted rules from test_suricata_rules.json...
Loaded 32 rules.

Loading MITRE ATT&CK techniques from MITRE_ATTACK_TECHNIQUES.json...
Loaded 52 MITRE ATT&CK techniques.

Sample MITRE ATT&CK techniques:
{
    "technique_id": "T1548",
    "technique_name": "Abuse Elevation Control Mechanism"
}
{
    "technique_id": "T1134",
    "technique_name": "Access Token Manipulation"
}
{
    "technique_id": "T1087",
    "technique_name": "Account Discovery"
}
{
    "technique_id": "T1098",
    "technique_name": "Account Manipulation"
}
{
    "technique_id": "T1595",
    "technique_name": "Active Scanning"
}
Starting mapping process...

Processing rule 1/10: GPL ATTACK_RESPONSE id check returned root
Mapping rule ID 2100498...
Raw response for rule ID 2100498:
```json
{
    "mitre_technique_id": "T1068",
    "mitre_technique_name": "Exploitation for Privilege Escalation",
    "mitre_tactic_id": "TA0002",
    "mitre_tactic_name": "Execution"
}
```
Rule ID 2100498 mapped successfully in 1.57

In [None]:
# Calculate accuracy by comparing the LLM output JSON with the ground-truth JSONL
# and print the differences.
# A mapping counts as correct when its technique ID equals the ground-truth technique ID.
# Rules without a ground-truth entry cannot be scored: they are listed separately and
# left out of the accuracy denominator.

# Define the file paths
ground_truth_file = "suri_to_mitre_map_v2.jsonl"
llm_output_file = "mapped_rules_to_mitre.json"

# Fields copied into the difference report for both sides
mapping_fields = (
    "mitre_technique_id",
    "mitre_technique_name",
    "mitre_tactic_id",
    "mitre_tactic_name",
)

# Load the ground truth data, keyed by rule ID (one JSON object per line)
ground_truth = {}
with open(ground_truth_file, "r", encoding="utf-8") as f:
    for line in f:
        # Blank lines (e.g. a trailing newline at the end of the file) are not
        # JSON documents; json.loads would raise on them
        if not line.strip():
            continue
        entry = json.loads(line)
        ground_truth[entry["suri_rule_id"]] = entry

# Load the LLM output data
with open(llm_output_file, "r", encoding="utf-8") as f:
    llm_output = json.load(f)

# Initialize counters for accuracy calculation
correct_mappings = 0
total_mappings = len(llm_output)
differences = []
missing_ground_truth = []  # rule IDs present in the LLM output but not in the ground truth

# Compare each LLM output entry with the ground truth
for rule in llm_output:
    rule_id = rule["suri_rule_id"]
    if rule_id not in ground_truth:
        missing_ground_truth.append(rule_id)
        continue

    gt = ground_truth[rule_id]
    # Check if the LLM output matches the ground truth for the rule
    if rule["mitre_technique_id"] == gt["mitre_technique_id"]:
        correct_mappings += 1
    else:
        # .get keeps the report usable when an optional field is absent on either side
        differences.append({
            "suri_rule_id": rule_id,
            "ground_truth": {field: gt.get(field) for field in mapping_fields},
            "llm_output": {field: rule.get(field) for field in mapping_fields},
        })

# Calculate accuracy over the rules that could actually be scored
evaluated_mappings = total_mappings - len(missing_ground_truth)
accuracy = (correct_mappings / evaluated_mappings) * 100 if evaluated_mappings > 0 else 0

# Print the results
print(f"Total Rules Processed: {total_mappings}")
print(f"Correct Mappings: {correct_mappings}")
print(f"Accuracy: {accuracy:.2f}%")
if missing_ground_truth:
    print(f"Rules without ground truth (excluded from accuracy): {len(missing_ground_truth)} -> {missing_ground_truth}")

# Print differences if any
if differences:
    print("\nDifferences Found:")
    for diff in differences:
        print(json.dumps(diff, indent=4))
else:
    print("\nNo differences found between LLM output and ground truth.")

Total Rules Processed: 10
Correct Mappings: 6
Accuracy: 60.00%

Differences Found:
{
    "suri_rule_id": "2010937",
    "ground_truth": {
        "mitre_technique_id": "T1046",
        "mitre_technique_name": "Network Service Discovery",
        "mitre_tactic_id": "TA0007",
        "mitre_tactic_name": "Discovery"
    },
    "llm_output": {
        "mitre_technique_id": "T1595",
        "mitre_technique_name": "Active Scanning",
        "mitre_tactic_id": "TA0043",
        "mitre_tactic_name": "Reconnaissance"
    }
}
{
    "suri_rule_id": "2010935",
    "ground_truth": {
        "mitre_technique_id": "T1046",
        "mitre_technique_name": "Network Service Discovery",
        "mitre_tactic_id": "TA0007",
        "mitre_tactic_name": "Discovery"
    },
    "llm_output": {
        "mitre_technique_id": "T1595",
        "mitre_technique_name": "Active Scanning",
        "mitre_tactic_id": "TA0043",
        "mitre_tactic_name": "Reconnaissance"
    }
}
{
    "suri_rule_id": "2018489",
  