# Description of the Notebook

This Jupyter Notebook is a comprehensive workflow designed for machine learning tasks, particularly focusing on language modeling and cybersecurity applications. It integrates various tools and frameworks to streamline processes such as data preparation, model fine-tuning, and deployment. Below is an outline of its key components:

1. **Data Preparation**:
    - Extracts and formats datasets from external sources like CAPEC and Mitre CTI.
    - Prepares data for fine-tuning and retrieval-augmented generation (RAG).

2. **MAL Compiler Integration**:
    - Automates the setup and configuration of the MAL compiler.
    - Validates and refines generated Meta Attack Language (MAL) code.

3. **Mitre Data Processing**:
    - Processes STIX objects from the Mitre CTI repository.
    - Converts data into structured formats for use in RAG workflows.

4. **MAL Agent**:
    - Combines RAG and fine-tuned language models to generate valid MAL code.
    - Iteratively refines code based on compiler feedback for accuracy.

5. **LLM Fine-Tuning**:
    - Fine-tunes models like Mistral using LoRA (Low-Rank Adaptation).
    - Includes steps for dataset preparation, training, and saving fine-tuned models.

6. **Model Merging and Deployment**:
    - Merges LoRA adapters into base models for optimized inference.
    - Pushes final models to Hugging Face for seamless deployment.

7. **Inference and Testing**:
    - Provides pipelines for text generation and MAL code generation.
    - Tests fine-tuned models for specific use cases.

This notebook is tailored for researchers and developers working on advanced machine learning projects, with a focus on integrating domain-specific knowledge into language models for cybersecurity and related fields.

In [4]:
from google.colab import userdata

# Get your GitHub username and repository name
username = "TP15" # Replace with your GitHub username
repo_name = "MAThesis-MALLM" # Replace with your repository name
# Retrieve the PAT from secrets
pat = userdata.get('GitHub_MAL')

# Construct the authenticated URL (ensure the PAT is included)
# Note: The PAT effectively acts as the password here in the URL structure
repo_url_authenticated = f"https://{pat}@github.com/{username}/{repo_name}.git"

# Clone using the authenticated URL
!git clone {repo_url_authenticated}

# List files to confirm
!ls {repo_name}

Cloning into 'MAThesis-MALLM'...
remote: Enumerating objects: 2040, done.[K
remote: Counting objects: 100% (2040/2040), done.[K
remote: Compressing objects: 100% (1279/1279), done.[K
remote: Total 2040 (delta 555), reused 1901 (delta 425), pack-reused 0 (from 0)[K
Receiving objects: 100% (2040/2040), 20.85 MiB | 10.65 MiB/s, done.
Resolving deltas: 100% (555/555), done.
Evaluation  InterFace-Code  MAL_Thesis.ipynb  temp_input
HelperData  LLM-Code	    requirements.txt  torch-env


# DataPreparation



Dataextraction out of MAL-Languages

In [None]:
import json
import re
import os

def add_full_mal_file_to_output(content):
    return {'Output': content.strip(), 'Type': 'language'}

def process_all_mal_files(input_folder, output_folder):
    os.makedirs(output_folder, exist_ok=True)

    for filename in os.listdir(input_folder):
        if filename.endswith(".mal"):
            input_path = os.path.join(input_folder, filename)
            lang_name = os.path.splitext(filename)[0]
            output_path = os.path.join(output_folder, f"{lang_name}.jsonl")

            print(f"Processing {filename}...")
            extract_all_blocks(input_path, output_path)

    print("All .mal-Dateien were processed.")




def extract_assets(content):
    assets = []
    index = 0
    while index < len(content):
        if content.startswith("asset", index):
            start = index
            brace_open = content.find("{", index)
            if brace_open == -1:
                break

            brace_count = 1
            i = brace_open + 1
            while i < len(content) and brace_count > 0:
                if content[i] == "{":
                    brace_count += 1
                elif content[i] == "}":
                    brace_count -= 1
                i += 1

            asset_block = content[start:i].strip()
            assets.append({'Output': asset_block, 'Type': 'asset'})
            index = i
        else:
            index += 1
    return assets

def extract_all_blocks(mal_file_path, output_file_path):
    with open(mal_file_path, 'r') as file:
        content = file.read()
        lines = content.splitlines()

    outputs = []

        # --- Extract Language ---
    outputs.append({'Output': content.strip(), 'Type': 'language'})


    # --- Extract categories ---
    inside_category = False
    brace_count = 0
    current_block = []


    for line in lines:
        if 'category ' in line and not inside_category:
            inside_category = True
            brace_count = 0
            current_block = [line]
            brace_count += line.count('{') - line.count('}')
        elif inside_category:
            current_block.append(line)
            brace_count += line.count('{') - line.count('}')
            if brace_count == 0:
                outputs.append({'Output': '\n'.join(current_block).strip(), 'Type': 'category'})
                inside_category = False

    # --- Extract assets ---
    outputs.extend(extract_assets(content))

    # --- Extract associations ---
    assoc_pattern = re.compile(r'(associations\s*\{[^{}]*\})', re.MULTILINE | re.DOTALL)
    associations = assoc_pattern.findall(content)
    outputs.extend([{'Output': a.strip(), 'Type': 'association'} for a in associations])


    # --- Write all to JSONL ---
    with open(output_file_path, 'w') as outfile:
        for item in outputs:
            json.dump(item, outfile)
            outfile.write('\n')

    print(f"Extracted {len(outputs)} total blocks to {output_file_path}")


process_all_mal_files(
    "/Users/thomaspathe/Documents/MAThesis-MALLM/HelperData/MAL Languages/allMALfiles",
    "/Users/thomaspathe/Documents/MAThesis-MALLM/HelperData/MAL Languages/allMALfiles/jsonl_outputs"
)



## Creation of Finetune Dataset based on the pattern (Instruction, Input, Output).

In [None]:
from openai import OpenAI
import json
import random
import os
import glob
import shutil

client = OpenAI(
    api_key="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",
    base_url="https://openrouter.ai/api/v1"
)

asset_instructions = [
    "Convert the following cyber-attack description into Meta Attack Language code.",
    "Generate a Meta Attack Language snippet that models the scenario described below.",
    "Given this cyber incident, write its equivalent representation in Meta Attack Language.",
    "Translate this attack story into valid Meta Attack Language format.",
    "Write the Meta Attack Language structure for the following description.",
    "Represent the attack behavior below in your custom Meta Attack Language.",
    "Use Meta Attack Language to capture the steps described here.",
    "Model this cybersecurity attack using Meta Attack Language syntax.",
    "Write code in Meta Attack Language to describe the attack below.",
    "Convert the input into a structured Meta Attack Language format."
]

category_instructions = [
    "Define the following group of assets under a single category using Meta Attack Language.",
    "Create a category in Meta Attack Language that includes the assets listed below.",
    "Given these related assets, write a category definition in Meta Attack Language.",
    "Group the following assets into a category using the correct Meta Attack Language syntax.",
    "Write a Meta Attack Language snippet that declares a category containing these assets.",
    "Translate the grouping below into a category definition in Meta Attack Language.",
    "Use Meta Attack Language to define a category that encompasses the following assets.",
    "Model the following group of assets under a category using Meta Attack Language.",
    "Write the Meta Attack Language representation for a category including these assets.",
    "Create a valid Meta Attack Language definition for the described asset category."
]
association_instructions = [
    "Define the relationship between these two assets using an association in Meta Attack Language.",
    "Write a Meta Attack Language association that links the following assets.",
    "Model the connection described below as an association in Meta Attack Language.",
    "Use Meta Attack Language to describe the association between the listed assets.",
    "Create an association definition in Meta Attack Language for this asset interaction.",
    "Translate this asset linkage into a valid Meta Attack Language association.",
    "Write a Meta Attack Language snippet representing this association between entities.",
    "Represent the described interaction as a Meta Attack Language association.",
    "Define an association in Meta Attack Language using the connection details below.",
    "Generate a Meta Attack Language association linking the following asset types."
]

language_instructions = [
    "Design a complete Meta Attack Language model that includes asset definitions, grouped categories, and associations for the scenario below.",
    "Write a Meta Attack Language file from scratch that defines relevant assets, organizes them into categories, and models their interactions through associations.",
    "Translate the following cybersecurity system into a full Meta Attack Language specification, including asset classes, categories, and associations.",
    "Given the described infrastructure and threat scenario, generate a comprehensive MAL language definition, covering assets, categories, and associations.",
    "Write a Meta Attack Language snippet that models the entire threat landscape below, including asset types, logical groupings (categories), and how they are connected (associations).",
    "Create a complete .mal file using Meta Attack Language that defines assets, groups them into categories, and specifies their relationships.",
    "Convert the architectural and threat description below into a structured Meta Attack Language definition with all required elements: assets, categories, and associations.",
    "Generate a Meta Attack Language structure that models a threat scenario using proper categories for asset grouping and associations for inter-asset relations.",
    "Build a complete attack surface model using Meta Attack Language by defining custom assets, assigning them to categories, and describing their interactions via associations.",
    "Using Meta Attack Language, create a full language model that reflects the described environment. Ensure to include asset types, logical category groupings, and all necessary associations."
]

def generate_response(prompt: str, model="mistralai/mistral-7b-instruct:free", temperature=0.7, max_tokens=1000):
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are a technical security analyst and writer. Your task is to generate unstructured natural language descriptions that indirectly describe components of MAL (Modeling Attack Language) code, such as individual categories, assets, associations, attack steps, or attributes. These descriptions are not formal documentation — they are written in a more natural, real-world tone, similar to what might be found in cybersecurity incident reports, internal threat modeling documents, informal analyst notes, system architecture overviews, or security audit findings. Your goals: make the text realistic and informal, but factually aligned with the MAL code component. Include only information that is explicitly present in the code — never invent or interpret threat scenarios or system behavior beyond what is defined. The description should help a language model recognize and reconstruct the original MAL concept (e.g., asset, attack step, or relationship) from noisy, freeform input. Writing style rules: do not use bullet points or structured formatting. Mimic the tone and flow of real-world technical writing or internal security team communication. Use synonyms, varied sentence structure, and realistic phrasing to simulate real sources. Refer to identifiers as if they were mentioned in passing during a security assessment. You may mention relationships between components (e.g., 'X connects to Y' or 'A depends on B') as long as they directly reflect the MAL code. Component-specific guidance: for assets, describe what exists, its attributes, and what steps or associations are tied to it. For attack steps, describe possible actions or behaviors related to the asset. For associations, describe how two components are linked. For categories, describe what kinds of components it includes. Do not output or reference the MAL code directly. Do not mention 'MAL', 'language', 'modeling', or any meta-level concepts. The output should look like something a security professional might write naturally in a report or documentation."
                    )
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            temperature=temperature,
            max_tokens=max_tokens,
        )

        output = response.choices[0].message.content.strip()
        print(f" Response received: {output[:80]}{'...' if len(output) > 80 else ''}")
        return output

    except Exception as e:
        print(f" Error: {e}")
        return f"Error: {e}"

def process_jsonl(input_path: str, output_path: str):
    with open(input_path, "r", encoding="utf-8") as infile, \
         open(output_path, "w", encoding="utf-8") as outfile:

        for idx, line in enumerate(infile, start=1):
            try:
                data = json.loads(line)
                if not isinstance(data, dict):
                    print(f"  Skipping line {idx}: Not a JSON object.")
                    continue

                prompt = data.get("Output")
                mal_type = data.get("Type")

                if prompt is None:
                    print(f"  Skipping line {idx}: 'Output' is None.")
                    continue
                if not isinstance(prompt, str) or not prompt.strip():
                    print(f"  Skipping line {idx}: 'Output' is empty or not a string.")
                    continue

                print(f"Processing line {idx}...")
                response = generate_response(prompt)

                if mal_type == "category":
                    instruction = random.choice(category_instructions)
                elif mal_type == "asset":
                    instruction = random.choice(asset_instructions)
                elif mal_type == "association":
                    instruction = random.choice(association_instructions)
                elif mal_type == "language":
                    instruction = random.choice(language_instructions)
                else:
                    instruction = "Convert the following input into Meta Attack Language format."

                result = {
                    "instruction": instruction,
                    "input": response,
                    "output": prompt
                }

                outfile.write(json.dumps(result, ensure_ascii=False) + "\n")

            except json.JSONDecodeError as e:
                print(f"  JSON error on line {idx}: {e}")
            except Exception as e:
                print(f"  Processing error on line {idx}: {e}")

def combine_jsonl_files(folder_path: str, combined_file_path: str):
    with open(combined_file_path, "w", encoding="utf-8") as outfile:
        for filename in sorted(glob.glob(os.path.join(folder_path, "*.jsonl"))):
            with open(filename, "r", encoding="utf-8") as infile:
                shutil.copyfileobj(infile, outfile)

if __name__ == "__main__":
    input_folder = "/Users/thomaspathe/Documents/MAThesis-MALLM/HelperData/MAL Languages/allMALfiles/jsonl_outputs"
    output_folder = "/Users/thomaspathe/Documents/MAThesis-MALLM/HelperData/MAL Languages/allMALfiles/jsonl_output/outputgenerated_jsonl_files"
    os.makedirs(output_folder, exist_ok=True)

    input_files = glob.glob(os.path.join(input_folder, "*.jsonl"))
    print(f" Found {len(input_files)} JSONL files to process.")

    for input_file in input_files:
        base_name = os.path.splitext(os.path.basename(input_file))[0]
        output_path = os.path.join(output_folder, f"{base_name}_processed.jsonl")
        print(f" Processing {base_name}...")
        process_jsonl(input_file, output_path)

    combined_output_path = os.path.join("/Users/thomaspathe/Documents/MAThesis-MALLM/HelperData/MAL Languages/allMALfiles/jsonl_outputs/", "combined_outputfinal.jsonl")
    combine_jsonl_files(output_folder, combined_output_path)
    print(f"\n All files processed and combined into: {combined_output_path}")


# SFT-For LLM

Working Modell - Uploading at the end to HF

In [None]:
!pip install transformers peft datasets bitsandbytes accelerate xformers
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling
)
from peft import LoraConfig, get_peft_model, TaskType
from datasets import load_dataset
from google.colab import auth
import torch
import getpass
from huggingface_hub import login
from transformers import BitsAndBytesConfig
from google.colab import userdata

auth.authenticate_user()
hf_token = userdata.get('HF_TOKEN')
login(token=hf_token)

# === Load model and tokenizer ===
model_name = "mistralai/Mistral-7B-v0.1"
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)


tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
tokenizer.pad_token = tokenizer.eos_token  # Mistral tokenizer doesn't have pad_token

from transformers import BitsAndBytesConfig

quant_config = BitsAndBytesConfig(load_in_4bit=True)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=quant_config,
    device_map="auto"
)


# === Apply LoRA ===
lora_config = LoraConfig(
    r=8,
    lora_alpha=32,
    target_modules=["q_proj", "v_proj"],
    lora_dropout=0.05,
    bias="none",
    task_type=TaskType.CAUSAL_LM,
)

model = get_peft_model(model, lora_config)

# === Load and preprocess your JSONL dataset ===
dataset_path = "/content/combined_output_FT170425.jsonl"
dataset = load_dataset("json", data_files=dataset_path, split="train")

# Format: Instruction-style prompt
def format_prompt(example):
    prompt = f"### Instruction:\n{example['instruction']}\n\n### Input:\n{example['input']}\n\n### Response:\n{example['output']}"
    return {"text": prompt}

# Apply formatting
dataset = dataset.map(format_prompt)

# Tokenize prompts
def tokenize(example):
    return tokenizer(
        example["text"],
        truncation=True,
        padding="max_length",
        max_length=512,
    )

tokenized_dataset = dataset.map(tokenize, batched=True)

# === Training configuration ===
training_args = TrainingArguments(
    output_dir="./mistral-lora",
    per_device_train_batch_size=2,
    gradient_accumulation_steps=16,
    num_train_epochs=3,
    logging_dir="./logs",
    save_total_limit=1,
    save_strategy="epoch",
    fp16=True,
    learning_rate=2e-4,
    optim="paged_adamw_8bit",
    logging_steps=2,
    report_to="none",
)

# === Trainer setup ===
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    tokenizer=tokenizer,
    data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
)

# === Start training ===
trainer.train()

# === Save final model ===
model.save_pretrained("mistral-lora-ft")
tokenizer.save_pretrained("mistral-lora-ft")

from huggingface_hub import HfApi, HfFolder
from transformers import AutoTokenizer, AutoModelForCausalLM

repo_name = "mistral-7b-lora-finetuned"

from huggingface_hub import create_repo
create_repo(repo_name, private=True)  # Set private=False if you want it public

# Push model and tokenizer
model.push_to_hub(repo_name)
tokenizer.push_to_hub(repo_name)

Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting bitsandbytes
  Downloading bitsandbytes-0.45.5-py3-none-manylinux_2_24_x86_64.whl.metadata (5.0 kB)
Collecting xformers
  Downloading xformers-0.0.29.post3-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (1.0 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.12.0,>=2023.1.0 (from fsspec[http]<=2024.12.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.13.0->peft)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collec

tokenizer_config.json:   0%|          | 0.00/996 [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/493k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.80M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/414 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/571 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/25.1k [00:00<?, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/4.54G [00:00<?, ?B/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/9.94G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

Generating train split: 0 examples [00:00, ? examples/s]

Map:   0%|          | 0/213 [00:00<?, ? examples/s]

Map:   0%|          | 0/213 [00:00<?, ? examples/s]

  trainer = Trainer(
No label_names provided for model class `PeftModelForCausalLM`. Since `PeftModel` hides base models input arguments, if label_names is not given, label_names can't be set automatically within `Trainer`. Note that empty label_names list will be used instead.


Step,Training Loss
2,1.9337
4,1.8124
6,1.7401
8,2.4871
10,1.5954
12,1.5476
14,2.2621
16,1.4499
18,1.4478


HfHubHTTPError: 409 Client Error: Conflict for url: https://huggingface.co/api/repos/create (Request ID: Root=1-6804c1a5-6c09a26a07e1aa277446463b;7a6d84bc-0385-4e40-bec5-a9b1f821cce9)

You already created this model repo

## Merging of the Models and Push to HF
Merging LoRA adapter into basemodel and Upload to HF

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel
import torch
from transformers import BitsAndBytesConfig

# === Pfade definieren ===
base_model_name = "mistralai/Mistral-7B-v0.1"
lora_model_path = "./mistral-lora-ft"
output_path = "./mistral-lora-merged"

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    llm_int8_enable_fp32_cpu_offload=True  # <- erlaubt disk/CPU-Offload
)

base_model = AutoModelForCausalLM.from_pretrained(
    "mistralai/Mistral-7B-v0.1",
    device_map="auto",  # erlaubt GPU+CPU-Mapping
    quantization_config=bnb_config
)



# === LoRA in das Basismodell injizieren ===
model = PeftModel.from_pretrained(base_model, lora_model_path)

# === Merge durchführen ===
print(" Merging LoRA into base model...")
merged_model = model.merge_and_unload()

# === Speichern ===
print(" Saving merged model...")
merged_model.save_pretrained(output_path, safe_serialization=True)
tokenizer = AutoTokenizer.from_pretrained(base_model_name)
tokenizer.save_pretrained(output_path)


# === Push full merged model to Hugging Face ===
repo_name = "mistral-7b-lora-merged"  # Change if needed
#create_repo(repo_name, private=True)

merged_model.push_to_hub(repo_name)
tokenizer.push_to_hub(repo_name)

print(f"Full model successfully uploaded: https://huggingface.co/TP15/{repo_name}")

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

 Merging LoRA into base model...




 Saving merged model...


model.safetensors:   0%|          | 0.00/4.45G [00:00<?, ?B/s]

README.md:   0%|          | 0.00/5.17k [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/493k [00:00<?, ?B/s]

Full model successfully uploaded: https://huggingface.co/TP15/mistral-7b-lora-merged


# RAG Building

## MitreDataprep

In [None]:

import os
import json
from stix2 import FileSystemSource, Filter
from typing import Dict, List, Set, Any, Optional

# --- Configuration ---

LOCAL_CTI_REPO_PATH = '/Users/thomaspathe/Documents/MAThesis-MALLM/CTI/cti'
OUTPUT_RAG_DIR = "capec_rag_input_data"


DESIRED_CAPEC_TYPES: Set[str] = {
    "attack-pattern",
    "course-of-action",
}


def get_capec_id(stix_object: Dict[str, Any]) -> Optional[str]:
    """
    Extracts the CAPEC ID from a STIX object's external_references.

    Args:
        stix_object: A STIX object (as a dictionary or stix2 object).

    Returns:
        The CAPEC ID (e.g., "CAPEC-66") or None if not found.
    """
    if not hasattr(stix_object, 'external_references'):
        return None

    for ref in stix_object.external_references:
        if ref.get('source_name') == 'capec' and ref.get('external_id'):
            ext_id = ref['external_id']
            if isinstance(ext_id, int):
                 return f"CAPEC-{ext_id}"
            elif isinstance(ext_id, str):
                 return ext_id if ext_id.startswith("CAPEC-") else f"CAPEC-{ext_id}"
    return None

def save_for_rag(data_dict: Dict[str, List[Any]], output_dir: str):
    """
    Saves the extracted STIX objects into JSON files suitable for RAG input.

    Each object type gets its own JSON file containing a list of objects,
    where each object is converted to a standard Python dictionary.

    Args:
        data_dict: The dictionary containing lists of stix2 objects per type
                   (e.g., {'attack-pattern': [obj1, obj2], ...}).
        output_dir: The path to the directory where JSON files will be saved.
    """
    print(f"\nSaving data for RAG input into directory: {output_dir}")
    try:
        # Create the output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)
        print(f"Ensured output directory exists or created it.")
    except OSError as e:
        print(f"  Error creating directory {output_dir}: {e}")
        return

    for obj_type, object_list in data_dict.items():
        if not object_list:
            print(f"  Skipping type '{obj_type}': No objects found.")
            continue

        # Convert the list of stix2 objects to a list of dictionaries
        # using the .serialize() method, which gives a JSON-compatible string,
        # then parse it back to a dict. This handles custom properties correctly.
        data_to_save = []
        print(f"  Processing {len(object_list)} objects of type '{obj_type}' for saving...")
        for stix_obj in object_list:
             try:
                 # serialize() gives a string, json.loads() makes it a dict
                 obj_dict = json.loads(stix_obj.serialize())
                 data_to_save.append(obj_dict)
             except Exception as e:
                 print(f"    Warning: Could not serialize object {getattr(stix_obj, 'id', 'N/A')}: {e}")


        if not data_to_save:
             print(f"  Skipping file for '{obj_type}': No objects could be serialized.")
             continue

        # Define the output filename
        file_name = f"{obj_type}_rag_data.json"
        file_path = os.path.join(output_dir, file_name)

        print(f"  Saving {len(data_to_save)} '{obj_type}' objects to {file_path}...")
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                # Use json.dump for writing the list of dicts to the file
                # indent=4 makes the file readable
                # ensure_ascii=False handles special characters correctly
                json.dump(data_to_save, f, indent=4, ensure_ascii=False)
            print(f"    Successfully saved.")
        except IOError as e:
            print(f"    Error saving file {file_path}: {e}")
        except TypeError as e:
             print(f"    Error during JSON serialization for {file_path}: {e}")


# --- Main Execution ---
if __name__ == "__main__":
    # Construct the path to the capec data within the cloned repository
    capec_data_path = os.path.join(LOCAL_CTI_REPO_PATH, 'capec', '2.1')

    print(f"Attempting to access CAPEC data in: {capec_data_path}")

    # --- Pre-check: Verify the path exists ---
    if not os.path.isdir(capec_data_path):
         print("\n--- ERROR ---")
         print(f"The specific STIX version directory was not found: {capec_data_path}")
         print(f"Please ensure the repository at '{LOCAL_CTI_REPO_PATH}' is complete and contains the 'capec/2.1/' structure.")
         print("-------------\n")
         exit(1)
    # --- End Pre-check ---

    fs = None
    capec_data: Dict[str, List[Any]] = {obj_type: [] for obj_type in DESIRED_CAPEC_TYPES}

    try:
        # 1. Initialize the stix2 FileSystemSource
        print(f"\nInitializing STIX FileSystemSource for directory: {capec_data_path}")
        fs = FileSystemSource(capec_data_path, allow_custom=True)
        print("FileSystemSource initialized successfully.")

        # 2. Query for the desired object types
        print("\nQuerying for desired object types...")
        for obj_type in DESIRED_CAPEC_TYPES:
            try:
                filt = Filter('type', '=', obj_type)
                objects = fs.query([filt])
                capec_data[obj_type] = objects
                print(f"  Found {len(objects)} objects of type '{obj_type}'")
            except Exception as e:
                 print(f"  Error querying for type '{obj_type}': {e}")

        # 3. Example: Access data from the first Attack Pattern (Optional display)
        if capec_data.get("attack-pattern"):
            print("\n--- Example: First CAPEC Attack Pattern ---")
            # ... (example display code remains the same) ...
            first_ap = capec_data["attack-pattern"][0]
            capec_id = get_capec_id(first_ap)
            print(f"  STIX ID: {first_ap.id}")
            print(f"  CAPEC ID: {capec_id or 'Not Found'}")
            print(f"  Name: {getattr(first_ap, 'name', 'N/A')}")
            print(f"  Description: {getattr(first_ap, 'description', 'N/A')[:150]}...")
            print(f"  Custom Abstraction: {getattr(first_ap, 'x_capec_abstraction', 'N/A')}")
            prereqs = getattr(first_ap, 'x_capec_prerequisites', [])
            print(f"  Custom Prerequisites count: {len(prereqs)}")
            if prereqs:
                print(f"    - Prerequisite 1: {prereqs[0][:100]}...")

        else:
            print("\nNo CAPEC attack-patterns found or extracted.")

        # --- 4. SAVE THE EXTRACTED DATA ---
        # Check if any data was actually loaded before saving
        if any(capec_data.values()):
             save_for_rag(capec_data, OUTPUT_RAG_DIR)
        else:
             print("\nNo data loaded, skipping save step.")
        # --- End Save Step ---

    except Exception as e:
        print(f"\nAn error occurred during STIX processing: {e}")

CAPEC Dataprep

In [None]:
import json
import os

def extract_capec_id(external_references):
    """
    Sucht in der Liste der externen Referenzen nach dem CAPEC-Eintrag
    und gibt die externe ID zurück.
    """
    if not external_references:
        return None
    for ref in external_references:
        if ref.get("source_name") == "capec" and "external_id" in ref:
            return ref["external_id"]
    return None

def transform_attack_pattern(ap_data):
    """
    Transformiert einen Attack Pattern Eintrag in das Zielformat.
    """
    if not isinstance(ap_data, dict):
        print(f"Skipping invalid attack pattern data: {ap_data}")
        return None

    embedding_input = f"Name: {ap_data.get('name', 'N/A')}\nDescription: {ap_data.get('description', 'N/A')}"

    metadata = {
        "id": ap_data.get("id"),
        "type": ap_data.get("type"),
        "name": ap_data.get("name"),
        "capec_id": extract_capec_id(ap_data.get("external_references")),
        "abstraction": ap_data.get("x_capec_abstraction"),
        "domains": ap_data.get("x_capec_domains"),
        "status": ap_data.get("x_capec_status"),
        "version": ap_data.get("x_capec_version")
    }
    # Entferne Metadaten-Felder, die None sind, um das Objekt sauber zu halten
    metadata = {k: v for k, v in metadata.items() if v is not None}

    return {
        "embedding_input": embedding_input,
        "source_type": "CAPEC", # Wie besprochen, Quelle ist CAPEC
        "metadata": metadata,
        "raw": ap_data # Der komplette Originaleintrag
    }

def transform_course_of_action(coa_data):
    """
    Transformiert einen Course of Action Eintrag in das Zielformat.
    """
    if not isinstance(coa_data, dict):
        print(f"Skipping invalid course of action data: {coa_data}")
        return None

    embedding_input = f"Name: {coa_data.get('name', 'N/A')}\nDescription: {coa_data.get('description', 'N/A')}"

    metadata = {
        "id": coa_data.get("id"),
        "type": coa_data.get("type"),
        "name": coa_data.get("name"),
        "version": coa_data.get("x_capec_version")
    }
    metadata = {k: v for k, v in metadata.items() if v is not None}

    return {
        "embedding_input": embedding_input,
        "source_type": "CAPEC "+ coa_data.get("type", ""),
        "metadata": metadata,
        "raw": coa_data
    }

def transform_and_combine_jsonl(attack_pattern_file, course_of_action_file, output_file):
    """
    Liest Attack Pattern und Course of Action JSON-Dateien (die jeweils
    eine Liste von Objekten enthalten), transformiert jeden Eintrag und
    schreibt sie in eine gemeinsame Output-JSONL-Datei.

    Args:
        attack_pattern_file (str): Pfad zur Attack Pattern JSON-Datei (Liste).
        course_of_action_file (str): Pfad zur Course of Action JSON-Datei (Liste).
        output_file (str): Pfad zur zu erstellenden Output-JSONL-Datei.
    """
    processed_count = 0
    error_count = 0

    output_dir = os.path.dirname(output_file)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print(f"Created output directory: {output_dir}")

    try:
        with open(output_file, 'w', encoding='utf-8') as outfile:

            # --- Process Attack Patterns ---
            print(f"Processing Attack Patterns from: {attack_pattern_file}")
            try:
                with open(attack_pattern_file, 'r', encoding='utf-8') as infile:
                    content = infile.read() # Read the whole file
                    try:

                        data_list = json.loads(content)
                        if not isinstance(data_list, list):
                            print(f"Error: Expected a JSON list in {attack_pattern_file}, but got {type(data_list)}")
                            error_count += 1
                        else:
                             # Iterate through items in the list
                            for original_data in data_list:
                                try:
                                    transformed_data = transform_attack_pattern(original_data)
                                    if transformed_data:
                                        json.dump(transformed_data, outfile, ensure_ascii=False)
                                        outfile.write('\n') # Write as JSON Lines
                                        processed_count += 1
                                    else:
                                        # Error already printed in transform function if data was invalid type
                                        error_count += 1
                                except Exception as e:
                                    print(f"Error processing attack pattern item: {original_data.get('id', 'N/A')}. Error: {e}")
                                    error_count += 1

                    except json.JSONDecodeError as e:
                        print(f"Skipping invalid JSON file: {attack_pattern_file}. Error: {e}")
                        error_count += 1 # Count the whole file as an error
            except FileNotFoundError:
                print(f"Error: Attack Pattern file not found at {attack_pattern_file}")
                error_count += 1
            except Exception as e:
                 print(f"An unexpected error occurred while processing {attack_pattern_file}: {e}")
                 error_count += 1


            # --- Process Courses of Action ---
            print(f"\nProcessing Courses of Action from: {course_of_action_file}")
            try:
                with open(course_of_action_file, 'r', encoding='utf-8') as infile:
                    content = infile.read() # Read the whole file
                    try:
                        # Parse the entire content as a JSON list
                        data_list = json.loads(content)
                        if not isinstance(data_list, list):
                           print(f"Error: Expected a JSON list in {course_of_action_file}, but got {type(data_list)}")
                           error_count += 1
                        else:
                            # Iterate through items in the list
                            for original_data in data_list:
                                try:
                                    transformed_data = transform_course_of_action(original_data)
                                    if transformed_data:
                                        json.dump(transformed_data, outfile, ensure_ascii=False)
                                        outfile.write('\n') # Write as JSON Lines
                                        processed_count += 1
                                    else:
                                        # Error already printed in transform function if data was invalid type
                                        error_count += 1
                                except Exception as e:
                                    print(f"Error processing course of action item: {original_data.get('id', 'N/A')}. Error: {e}")
                                    error_count += 1

                    except json.JSONDecodeError as e:
                        print(f"Skipping invalid JSON file: {course_of_action_file}. Error: {e}")
                        error_count += 1
            except FileNotFoundError:
                print(f"Error: Course of Action file not found at {course_of_action_file}")
                error_count += 1
            except Exception as e:
                 print(f"An unexpected error occurred while processing {course_of_action_file}: {e}")
                 error_count += 1

    except IOError as e:
        print(f"Error opening or writing to output file {output_file}: {e}")
        return

    print(f"\nTransformation complete.")
    print(f"Successfully processed and wrote {processed_count} entries to {output_file}")
    if error_count > 0:
        print(f"Encountered {error_count} errors or skipped entries/files.")


# --- MODIFIED Example Usage ---
if __name__ == "__main__":
    print("--- Starting Transformation ---")
    transform_and_combine_jsonl(
        attack_pattern_file="/Users/thomaspathe/Documents/MAThesis-MALLM/LLM-Code/RAG/RAG-DataPrep/capec_rag_input_data/attack-pattern_rag_data.json",
        course_of_action_file="/Users/thomaspathe/Documents/MAThesis-MALLM/LLM-Code/RAG/RAG-DataPrep/capec_rag_input_data/course-of-action_rag_data.json",
        output_file="/Users/thomaspathe/Documents/MAThesis-MALLM/LLM-Code/RAG/RAG-DataPrep/capec_rag_input_data/capec_combined_rag_data.jsonl"
    )
    print("--- Transformation Finished ---")

MAL Data Prep

In [None]:
import json
import os

def transform_mal_entry(mal_data):
    """
    Transforms a single MAL entry (from JSONL) into the target RAG format.

    Args:
        mal_data (dict): A dictionary representing a single line from the MAL JSONL file.
                         Expected keys: "input", "output", "type".

    Returns:
        dict or None: The transformed data in the target format, or None if input is invalid.
    """
    if not isinstance(mal_data, dict):
        print(f"Skipping invalid MAL data (expected dict): {type(mal_data)}")
        return None

    # Use .get() to safely access keys, providing default values if they might be missing
    description = mal_data.get("input", "")
    mal_code = mal_data.get("output", "")
    mal_type = mal_data.get("type")

    # Combine description and code for embedding. Add separators for clarity.
    # You can adjust this combination based on what works best for your RAG retrieval.
    embedding_input = f"Description:\n{description}\n\nMAL Code:\n{mal_code}"

    # Define metadata - primarily the type from the source
    metadata = {}
    if mal_type is not None:
        metadata["mal_type"] = mal_type

    # Construct the final object
    return {
        "embedding_input": embedding_input,
        "source_type": "MAL "+mal_type,
        "metadata": metadata,
        "raw": mal_data
    }

def process_mal_jsonl(mal_input_file, mal_output_file):
    """
    Reads a MAL JSONL file, transforms each entry, and writes the results
    to a new JSONL file in the target RAG format.

    Args:s
        mal_input_file (str): Path to the input MAL JSONL file.
        mal_output_file (str): Path for the output JSONL file.
    """
    processed_count = 0
    error_count = 0

    # Ensure output directory exists
    output_dir = os.path.dirname(mal_output_file)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print(f"Created output directory: {output_dir}")

    print(f"Processing MAL data from: {mal_input_file}")
    print(f"Writing transformed data to: {mal_output_file}")

    try:
        with open(mal_input_file, 'r', encoding='utf-8') as infile, \
             open(mal_output_file, 'w', encoding='utf-8') as outfile:

            for line_num, line in enumerate(infile, 1):
                line = line.strip()
                if not line:
                    continue # Skip empty lines

                try:
                    original_data = json.loads(line)
                    transformed_data = transform_mal_entry(original_data)

                    if transformed_data:
                        json.dump(transformed_data, outfile, ensure_ascii=False)
                        outfile.write('\n')
                        processed_count += 1
                    else:
                        # Error details should have been printed by transform_mal_entry
                        error_count += 1

                except json.JSONDecodeError:
                    print(f"Skipping invalid JSON line #{line_num} in {mal_input_file}: {line[:100]}...")
                    error_count += 1
                except Exception as e:
                    print(f"Error processing line #{line_num} in {mal_input_file}: {line[:100]}... Error: {e}")
                    error_count += 1

    except FileNotFoundError:
        print(f"Error: Input file not found at {mal_input_file}")
        error_count += 1 # Consider file not found as an error
    except IOError as e:
        print(f"Error accessing files. Input: {mal_input_file}, Output: {mal_output_file}. Error: {e}")
        # No point reporting counts if files couldn't be opened/written
        return
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        # No point reporting counts if a major error occurred
        return


    print(f"\nTransformation complete.")
    print(f"Successfully processed and wrote {processed_count} MAL entries.")
    if error_count > 0:
        print(f"Encountered {error_count} errors or skipped entries.")

if __name__ == "__main__":

        print("--- Starting MAL Transformation ---")
        process_mal_jsonl(
            mal_input_file="/Users/thomaspathe/Documents/MAThesis-MALLM/LLM-Code/RAG/RAG-DataPrep/MAL_RAG.jsonl",    # <- Your MAL input file path
            mal_output_file="/Users/thomaspathe/Documents/MAThesis-MALLM/LLM-Code/RAG/RAG-DataPrep/Transformed_MALRAG_Data.jsonl"     # <- Your desired output file path
        )
        print("--- MAL Transformation Finished ---")

RAG Creation Dataset - Combining MAL and CAPEC

In [None]:
import os
import sys

def combine_jsonl(file1_path, file2_path, output_file_path):
    """
    Combines two JSONL files into a single JSONL file.

    Args:
        file1_path (str): Path to the first input JSONL file.
        file2_path (str): Path to the second input JSONL file.
        output_file_path (str): Path for the combined output JSONL file.
    """
    # Basic check to prevent overwriting input files
    if output_file_path == file1_path or output_file_path == file2_path:
        print(f"Error: Output file path cannot be the same as an input file path.")
        sys.exit(1) # Exit with an error code

    # Ensure output directory exists
    output_dir = os.path.dirname(output_file_path)
    if output_dir and not os.path.exists(output_dir):
        try:
            os.makedirs(output_dir)
            print(f"Created output directory: {output_dir}")
        except OSError as e:
            print(f"Error creating output directory {output_dir}: {e}")
            sys.exit(1)

    print(f"Combining '{os.path.basename(file1_path)}' and '{os.path.basename(file2_path)}' into '{os.path.basename(output_file_path)}'...")

    total_lines_written = 0
    try:
        with open(output_file_path, 'w', encoding='utf-8') as outfile:
            # Process first file
            try:
                print(f"Processing '{file1_path}'...")
                lines_file1 = 0
                with open(file1_path, 'r', encoding='utf-8') as infile1:
                    for line in infile1:
                        outfile.write(line) # Write line directly (includes newline)
                        lines_file1 += 1
                total_lines_written += lines_file1
                print(f"  Added {lines_file1} lines from '{os.path.basename(file1_path)}'.")
            except FileNotFoundError:
                print(f"Error: Input file not found: {file1_path}")
                # Decide if you want to continue or exit if a file is missing
                # sys.exit(1)
                # Or just print warning and continue with the next file

            # Process second file
            try:
                print(f"Processing '{file2_path}'...")
                lines_file2 = 0
                with open(file2_path, 'r', encoding='utf-8') as infile2:
                    for line in infile2:
                        outfile.write(line) # Write line directly
                        lines_file2 += 1
                total_lines_written += lines_file2
                print(f"  Added {lines_file2} lines from '{os.path.basename(file2_path)}'.")
            except FileNotFoundError:
                 print(f"Error: Input file not found: {file2_path}")
                 # Decide if you want to continue or exit

        print(f"\nSuccessfully combined files.")
        print(f"Total lines written to '{output_file_path}': {total_lines_written}")

    except IOError as e:
        print(f"Error during file operation: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


# --- Example Usage ---
if __name__ == "__main__":
    print("--- Starting Combination ---")

    combine_jsonl(
        file1_path="/Users/thomaspathe/Documents/MAThesis-MALLM/LLM-Code/RAG/RAG-DataPrep/MAL_RAG_Input/Transformed_MALRAG_Data.jsonl",      # <- Your first input JSONL file
        file2_path="/Users/thomaspathe/Documents/MAThesis-MALLM/LLM-Code/RAG/RAG-DataPrep/capec_rag_input_data/capec_combined_rag_data.jsonl",      # <- Your second input JSONL file
        output_file_path="/Users/thomaspathe/Documents/MAThesis-MALLM/LLM-Code/RAG/RAG-DataPrep/final_RAG_MAL_CAPEC_DATA.jsonl" #
    )
    print("--- Combination Finished ---")

Final RAG Creation with CAPEC and MAL as input

In [None]:
import os
import json
import sys
import traceback
from langchain_core.documents import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from typing import Dict, List, Any, Optional

# --- Configuration ---

# Path to the single JSONL file containing combined/transformed data
INPUT_JSONL_FILE = "/Users/thomaspathe/Documents/MAThesis-MALLM/LLM-Code/RAG/RAG-DataPrep/final_RAG_MAL_CAPEC_DATA.jsonl" # ADJUST THIS PATH

# Directory where the FAISS index will be saved
FAISS_INDEX_PATH = "/Users/thomaspathe/Documents/MAThesis-MALLM/LLM-Code/RAG/RAG-DataPrep/capec_faiss_index" # Choose a new name for the combined index

# Name of the Sentence Transformer model to use for embeddings
EMBEDDING_MODEL_NAME = "all-mpnet-base-v2" # Or your preferred model

# --- End Configuration ---


# This function replaces the old load_and_prepare_docs
def load_docs_from_jsonl(jsonl_file_path: str) -> List[Document]:
    """
    Loads data from a JSONL file where each line has the pre-defined RAG structure,
    and creates LangChain Documents.

    Args:
        jsonl_file_path: Path to the input JSONL file.
                         Each line should be a JSON object like:
                         {
                           "embedding_input": "...",
                           "source_type": "MAL" | "CAPEC",
                           "metadata": { ... },
                           "raw": { ... } // raw is ignored here
                         }

    Returns:
        A list of LangChain Document objects.
    """
    all_docs: List[Document] = []
    print(f"Starting data loading from JSONL file: {jsonl_file_path}")

    if not os.path.exists(jsonl_file_path):
        print(f"Error: Input file not found: {jsonl_file_path}")
        return [] # Return empty list if file doesn't exist

    try:
        with open(jsonl_file_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue # Skip empty lines

                try:
                    entry = json.loads(line)

                    # Extract data based on the expected structure
                    page_content = entry.get("embedding_input")
                    metadata = entry.get("metadata", {}) # Get metadata dict, default to empty
                    source_type = entry.get("source_type") # Get source type

                    # Ensure essential data is present
                    if not page_content:
                        print(f"Warning: Skipping line {line_num} due to missing 'embedding_input'.")
                        continue
                    if not metadata:
                         print(f"Warning: Line {line_num} has missing 'metadata'. Using empty metadata.")
                    if source_type:
                        # Add source_type to the metadata dict for potential filtering later
                        metadata['source_type'] = source_type
                    else:
                        print(f"Warning: Line {line_num} has missing 'source_type'. It won't be added to metadata.")


                    # --- Create LangChain Document ---
                    # Use 'embedding_input' directly as the content to be embedded.
                    # Use the 'metadata' dictionary directly from the JSONL entry.
                    doc = Document(page_content=page_content, metadata=metadata)
                    all_docs.append(doc)

                except json.JSONDecodeError:
                    print(f"Warning: Skipping invalid JSON on line {line_num}: {line[:100]}...")
                except Exception as e:
                    print(f"Warning: Error processing line {line_num}: {e}. Data: {line[:100]}...")

    except IOError as e:
        print(f"Error reading file {jsonl_file_path}: {e}")
        return [] # Return empty list on file read error
    except Exception as e:
         print(f"An unexpected error occurred during file processing: {e}")
         return []


    print(f"Finished loading. Total documents prepared: {len(all_docs)}")
    return all_docs


# --- Main Execution ---
if __name__ == "__main__":
    print("--- Starting RAG Phase 1: Indexing Combined MAL/CAPEC Data ---")
    print(f"Input JSONL file: {INPUT_JSONL_FILE}")
    print(f"Vector store persistence directory: {FAISS_INDEX_PATH}")
    print(f"Using embedding model: {EMBEDDING_MODEL_NAME}")

    # 1. Load documents from the single JSONL file
    documents = load_docs_from_jsonl(INPUT_JSONL_FILE)

    if not documents:
        print("\nNo documents were loaded. Please check the input JSONL file exists and contains valid data. Exiting.")
        sys.exit(1) # Use sys.exit for clearer exit status

    # 2. Initialize embedding model
    print(f"\nInitializing embedding model '{EMBEDDING_MODEL_NAME}'...")
    # model_kwargs = {'device': 'cpu'} # Uncomment to force CPU if needed
    encode_kwargs = {'normalize_embeddings': False}
    try:
        embeddings = HuggingFaceEmbeddings(
            model_name=EMBEDDING_MODEL_NAME,
            # model_kwargs=model_kwargs,
            encode_kwargs=encode_kwargs
        )
        print("Embedding model initialized successfully.")
    except Exception as e:
        print(f"Error initializing embedding model: {e}")
        print("Make sure 'sentence-transformers' and potentially 'torch' are installed correctly.")
        sys.exit(1)

    # 3. Create and persist the FAISS vector store
    print(f"\nCreating FAISS index and saving to: {FAISS_INDEX_PATH}")
    try:
        # Calculate embeddings and create FAISS index
        vectorstore = FAISS.from_documents(
            documents=documents,
            embedding=embeddings
        )

        # Save the index and document store locally
        vectorstore.save_local(folder_path=FAISS_INDEX_PATH)

        print(f"\n--- Success! ---")
        print(f"FAISS index created and saved successfully in '{FAISS_INDEX_PATH}'.")
        print(f"Total documents indexed: {len(documents)}")

        # Optional: Simple test query requires loading the index first
        print("\nPerforming a quick test query (loading from disk)...")
        if not os.path.exists(FAISS_INDEX_PATH):
             print(f"  Error: Saved index path '{FAISS_INDEX_PATH}' not found for testing.")
        else:
            try:
                # Load the persisted index for testing
                loaded_vectorstore = FAISS.load_local(
                    FAISS_INDEX_PATH,
                    embeddings,
                    allow_dangerous_deserialization=True # Required by recent LangChain versions
                )
                # Example query - adjust based on your data (MAL or CAPEC)
                test_query = "Hardware supply chain attack description"
                results = loaded_vectorstore.similarity_search(test_query, k=1)

                if results:
                    print(f"  Test query '{test_query}' found result:")
                    # Access metadata directly from the loaded document's metadata dict
                    # Check the keys that exist in your combined_rag_data.jsonl's metadata
                    meta = results[0].metadata
                    source_type = meta.get('source_type', 'N/A')
                    doc_name = meta.get('name', meta.get('mal_type', 'N/A')) # Try 'name' (CAPEC) or 'mal_type' (MAL)
                    capec_id = meta.get('capec_id', 'N/A') # Check if CAPEC ID exists
                    print(f"    Source Type: {source_type}")
                    print(f"    Name/Type: {doc_name}")
                    if capec_id != 'N/A':
                         print(f"    CAPEC ID: {capec_id}")
                    # print(f"    Content Snippet: {results[0].page_content[:150]}...") # Uncomment to see content
                else:
                    print(f"  Test query '{test_query}' returned no results.")
            except Exception as e:
                print(f"  Error during test query loading/execution: {e}")
                traceback.print_exc() # Print traceback for loading errors


    except Exception as e:
        print(f"\n--- Error ---")
        print(f"An error occurred during FAISS index creation/saving: {e}")
        traceback.print_exc() # Print full traceback for FAISS errors

    print("\n--- RAG Phase 1 Finished (Using FAISS) ---")

MAL Compiler install

In [None]:
!apt-get update
!apt-get install -y openjdk-17-jdk  # Oder eine andere Version wie openjdk-11-jdk
!java -version
!java -version

!apt-get update
!apt-get install -y openjdk-17-jdk

# URL zur .tar.gz-Datei für Version 0.2.0
MALC_TAR_URL = "https://github.com/mal-lang/malc/releases/download/release%2F0.2.0/malc-0.2.0.linux.amd64.tar.gz"
MALC_TAR_FILE = "malc-0.2.0.linux.amd64.tar.gz"
EXTRACT_DIR = "malc_extracted" # Name für das Verzeichnis nach dem Entpacken

# Herunterladen
!wget {MALC_TAR_URL}

# Verzeichnis zum Entpacken erstellen (falls es bereits existiert, wird es übersprungen)
!mkdir -p {EXTRACT_DIR}

# Archiv in das Verzeichnis entpacken
# 'tar -xvzf' : eXtract, Verbose, Zipped (gzip), File
!tar -xvzf {MALC_TAR_FILE} -C {EXTRACT_DIR}

# Inhalt des entpackten Verzeichnisses anzeigen, um den Pfad zur 'malc'-Datei zu finden
!echo "Inhalt des Verzeichnisses '{EXTRACT_DIR}':"
!ls -l {EXTRACT_DIR}

# Prüfen, ob sich die 'malc'-Datei direkt im Verzeichnis oder in einem Unterverzeichnis wie 'bin' befindet.
# Wir gehen davon aus, dass sie direkt im Verzeichnis liegt (anpassen, falls nötig).
MALC_EXECUTABLE_PATH = f"./{EXTRACT_DIR}/malc-0.2.0.linux.amd64/malc"

# Sicherstellen, dass die Datei ausführbar ist
!chmod +x {MALC_EXECUTABLE_PATH}


MALC_EXECUTABLE_PATH = "./malc_extracted/malc-0.2.0.linux.amd64/malc" # Pfad anpassen, falls 'malc' in einem Unterverzeichnis (z.B. bin) liegt
MAL_SOURCE_FILE = "/content/emailphininglang.mal"

!{MALC_EXECUTABLE_PATH} {MAL_SOURCE_FILE}

# MAL Agent
This section introduces the MAL Agent, a pipeline designed to generate valid Meta Attack Language (MAL) code using a combination of Retrieval-Augmented Generation (RAG) and a fine-tuned language model. The agent integrates context retrieval from a FAISS-based vector store, LLM inference for code generation, and a MAL compiler for validation. The workflow includes iterative refinement of generated code based on compiler feedback to ensure correctness.

In [None]:
!pip install gradio
!pip install -U langchain-huggingface

In [None]:
import gradio as gr
from langchain_huggingface import HuggingFaceEmbeddings
import time
import os
import subprocess
import tempfile
import json
import torch
import logging
import re
import numpy as np
import pickle
import faiss
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from peft import PeftModel

# --- Configuration ---
BASE_MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.2"
ADAPTER_PATH = "./path/to/your/qlora_adapter"
MAL_COMPILER_PATH = "/usr/local/bin/mal-compiler"

# --- RAG Configuration ---
EMBEDDING_MODEL_ID = 'all-MiniLM-L6-v2'
VECTOR_STORE_PATH = "capec_faiss_index"
USE_FAISS = True
TOP_K_RAG = 4

# --- LLM Configuration ---
MAX_ATTEMPTS = 5
MAX_NEW_TOKENS = 512
TEMPERATURE = 0.3
TOP_P = 0.9
DO_SAMPLE = True

# --- Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Prompt Templates ---
INITIAL_PROMPT_TEMPLATE = """
You are a domain expert in the Meta Attack Language (MAL), a formal meta-language used for designing domain-specific attack modeling languages. MAL enables modeling how cyberattacks propagate through systems by describing components (assets), their vulnerabilities (attack steps), protections (defenses), and relationships (associations). Your role is to understand, generate, explain, and validate MAL code with precise syntax and semantics.

Behavior Guidelines:

Follow MAL syntax strictly.

Ensure attack logic reflects causal and realistic propagation paths.

Ask clarifying questions if user input is ambiguous or underspecified.

Format MAL code clearly. When applicable, include comments to aid understanding.

Core Concepts You Must Apply:

Assets: Represent system entities (e.g., Host, User, Network). Each can contain:

Attack Steps: Labeled as:

"|" for OR logic (triggered by any parent)

"&" for AND logic (requires all parent steps)

"->" for propagation to another step or associated asset

Defenses ("#"): Boolean guards that can delay or block attack steps.

Time Distributions: Attack steps can define time-to-compromise using deterministic values or distributions like Exponential(0.1) or Gamma(1.5, 15).

Associations: Define relations between assets. Assets can reference attack steps in associated assets via named roles (e.g., -> hosts.connect).

Categories: Group related assets into reusable modules or domains (e.g., cloud, IoT, enterprise IT).

Inheritance: Assets can extend others to share attack logic. Use "abstract" for templates.

Entry Points: Use the "entry" keyword to define where the attacker starts.

Always Validate:

Attack step dependencies (require, connect) are logically consistent.

Associations reference correct roles and multiplicities.

Time modeling and defense impact are semantically accurate.

Code is modular, reusable, and conforms to domain modeling practices.

Example: MAL Code Snippet

#id: "org.mal-lang.examplelang" #version: "1.0.0"

category System {{ asset Network {{ | access -> hosts.connect }} # Escaped braces

asset Host {{ | connect -> access | authenticate -> access | guessPassword -> guessedPassword | guessedPassword [Exponential(0.02)] -> authenticate & access }} # Escaped braces

asset User {{ | attemptPhishing -> phish | phish [Exponential(0.1)] -> passwords.obtain }} # Escaped braces

asset Password {{ | obtain -> host.authenticate }} }} # Escaped braces (double closing)

associations {{ Network [networks] * <-- NetworkAccess --> * [hosts] Host Host [host] 1 <-- Credentials --> * [passwords] Password User [user] 1 <-- Credentials --> * [passwords] Password }} # Escaped braces

You are expected to:

Assist users in defining new categories, assets, and associations.

Debug MAL syntax errors or semantic issues.

Suggest domain-specific structures based on input goals (e.g., ICS, AWS, automotive).

Support modeling with attack simulation goals in mind (e.g., estimating global time to compromise).

You serve security engineers, researchers, and system modelers aiming to build precise, simulation-ready threat models.
Hier ist relevanter Kontext aus der CAPEC-Datenbank über Angriffsmuster:
---
{context}
---
Basierend auf diesem Kontext und der folgenden Beschreibung, generiere bitte validen MAL-Code.
Beschreibung: {input_text}

MAL Code:
"""

# REFINEMENT_PROMPT_TEMPLATE remains the same as it doesn't contain literal braces
REFINEMENT_PROMPT_TEMPLATE = """
Der vorherige MAL-Code war nicht korrekt. Compiler-Fehler:
{compiler_error}

Hier ist der ursprüngliche Kontext aus der CAPEC-Datenbank:
---
{context}
---
Hier ist die ursprüngliche Beschreibung: {input_text}
Hier ist der fehlerhafte MAL-Code:
{previous_code}

Bitte korrigiere den MAL-Code basierend auf dem Compiler-Feedback, dem Kontext und der Beschreibung.

Korrigierter MAL Code:
"""

REFINEMENT_PROMPT_TEMPLATE = """
The previous MAL code contained compiler errors: {compiler_error}

Context from the CAPEC database:
{context}
Description: {input_text}

Original incorrect MAL code: {previous_code}

Task:
Based on the compiler feedback, the provided context, and the description:

Correct only the rules and structure of the MAL code (e.g., assets, associations, categories).

Maintain the overall style and syntax similar to the example provided (pure rule-based format, no additional explanations, comments, or extra text).

Ensure that all braces {} are properly opened and closed, and that syntax follows standard MAL language practices.

Output only the fully corrected MAL rules without any comments, explanations, or additional formatting.

Corrected MAL Code:
"""

def setup_rag(embedding_model_id, index_folder_path):
    """
    Sets up RAG by loading the FAISS index and documents using LangChain's load_local.

    Args:
        embedding_model_id (str): The Hugging Face model ID for embeddings
                                  (must match the one used for creation).
        index_folder_path (str): The directory containing 'index.faiss' and 'index.pkl'.

    Returns:
        tuple: (vectorstore, embedding_model) or (None, None) on error.
               vectorstore is the loaded LangChain FAISS object.
    """
    logging.info(f"Setting up RAG by loading index from: {index_folder_path}")
    faiss_index_file = os.path.join(index_folder_path, "index.faiss")
    docstore_file = os.path.join(index_folder_path, "index.pkl")

    if not os.path.exists(faiss_index_file) or not os.path.exists(docstore_file):
        logging.error(f"FAISS index ('index.faiss') or docstore ('index.pkl') not found in {index_folder_path}")
        return None, None

    try:
        # --- Load Embedding Model ---
        # Ensure consistency with the model used during index creation
        # Use HuggingFaceEmbeddings from LangChain for consistency
        encode_kwargs = {'normalize_embeddings': False} # Match creation settings if needed
        embedding_model = HuggingFaceEmbeddings(
            model_name=embedding_model_id,
            encode_kwargs=encode_kwargs
            # model_kwargs={'device': 'cpu'} # Add if necessary
        )
        logging.info(f"Initialized embedding model: {embedding_model_id}")

        # --- Load FAISS index using LangChain ---
        # This correctly handles the index.pkl format
        vectorstore = FAISS.load_local(
            folder_path=index_folder_path,
            embeddings=embedding_model,
            allow_dangerous_deserialization=True # Required for loading pickles across environments/versions
        )
        logging.info(f"Successfully loaded FAISS index and docstore from {index_folder_path}")

        # No need to manually reconstruct docstore_map, the vectorstore object handles it.
        return vectorstore, embedding_model # Return the loaded vectorstore and the embedding instance

    except Exception as e:
        logging.error(f"RAG setup error during loading: {e}", exc_info=True)
        return None, None

def get_rag_examples(query_text, vectorstore, top_k):
    """
    Retrieves relevant document contents using the loaded LangChain FAISS vectorstore.

    Args:
        query_text (str): The user's query.
        vectorstore (FAISS): The loaded LangChain FAISS object from setup_rag.
        top_k (int): The number of top documents to retrieve.

    Returns:
        list[str]: A list of page_content strings from the retrieved documents,
                   or an empty list if retrieval fails or components are missing.
    """
    if not vectorstore:
        logging.warning("RAG vectorstore not available.")
        return []

    try:
        # Use LangChain's similarity search - it handles embedding the query internally
        # if the vectorstore was loaded with an embedding function.
        results = vectorstore.similarity_search(query_text, k=top_k)

        # Extract page_content from the results
        retrieved_contents = [doc.page_content for doc in results]
        return retrieved_contents

    except Exception as e:
        logging.error(f"RAG retrieval error: {e}", exc_info=True)
        return []

def run_llm_inference(prompt, model, tokenizer):
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=MAX_NEW_TOKENS,
            temperature=TEMPERATURE,
            top_p=TOP_P,
            do_sample=DO_SAMPLE,
        )
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

def run_mal_compiler(code_str, compiler_path):
    try:
        with tempfile.NamedTemporaryFile("w+", suffix=".mal", delete=False) as tmp_file:
            tmp_file.write(code_str)
            tmp_file.flush()
            result = subprocess.run(
                [compiler_path, tmp_file.name],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=10,
                text=True
            )
            if result.returncode == 0:
                return True, result.stdout
            else:
                return False, result.stderr
    except Exception as e:
        return False, f"Compiler error: {str(e)}"

def load_llm_and_tokenizer(base_model_id, adapter_path):
    try:
        logging.info("Loading base model...")
        bnb_config = BitsAndBytesConfig(load_in_4bit=True)
        base_model = AutoModelForCausalLM.from_pretrained(
            base_model_id,
            device_map="auto",
            quantization_config=bnb_config,
            trust_remote_code=True,
        )
        model = PeftModel.from_pretrained(base_model, adapter_path)
        tokenizer = AutoTokenizer.from_pretrained(base_model_id, trust_remote_code=True)
        return model, tokenizer
    except Exception as e:
        logging.error(f"Model loading error: {e}", exc_info=True)
        return None, None

# --- Change this function definition ---
# OLD:
# def generate_valid_mal(input_text, model, tokenizer, embedding_model, faiss_index, docstore_map):

# NEW:
def generate_valid_mal(input_text, model, tokenizer, vectorstore): # Accept the loaded vectorstore object
    logging.info("Generating MAL code...")
    current_code = ""
    compiler_error = ""
    rag_context_string = ""

    for attempt in range(MAX_ATTEMPTS):
        logging.info(f"Attempt {attempt + 1}/{MAX_ATTEMPTS}")

        # Retrieve context only on the first attempt using the vectorstore
        if attempt == 0:
            # --- Update the call to get_rag_examples here ---
            # OLD:
            # retrieved_docs_content = get_rag_examples(
            #     input_text, embedding_model, faiss_index, docstore_map, TOP_K_RAG
            # )
            # NEW:
            retrieved_docs_content = get_rag_examples(
                input_text, vectorstore, TOP_K_RAG # Use the vectorstore passed to the function
            )
            # --- End change for get_rag_examples call ---

            rag_context_string = "\n\n---\n\n".join(retrieved_docs_content)
            if not rag_context_string:
                rag_context_string = "No relevant context found in CAPEC data."
            logging.info(f"Retrieved RAG context: {rag_context_string[:200]}...") # Log snippet

        # Format the prompt (rest of the logic seems okay)
        prompt = (
            INITIAL_PROMPT_TEMPLATE if attempt == 0 else REFINEMENT_PROMPT_TEMPLATE
        ).format(
            context=rag_context_string,
            input_text=input_text,
            previous_code=current_code,
            compiler_error=compiler_error
        )
        # print(f"DEBUG: Prompt for attempt {attempt+1}:\n{prompt[:500]}...") # Optional debug print

        current_code_raw = run_llm_inference(prompt, model, tokenizer)
        # Extract code potentially between backticks or after "MAL Code:"
        match = re.search(r"```mal\s*([\s\S]+?)\s*```|MAL Code:\s*([\s\S]+)", current_code_raw, re.IGNORECASE)
        if match:
             current_code = match.group(1) or match.group(2)
             current_code = current_code.strip() # Clean leading/trailing whitespace
             logging.info(f"Extracted MAL code snippet: {current_code[:200]}...")
        else:
             # If no clear delimiter, assume the response *after* the prompt is the code
             # This might need refinement depending on LLM output format
             prompt_marker = "Korrigierter MAL Code:" if attempt > 0 else "MAL Code:"
             code_start_index = current_code_raw.rfind(prompt_marker)
             if code_start_index != -1:
                 current_code = current_code_raw[code_start_index + len(prompt_marker):].strip()
                 logging.info(f"Extracted MAL code (fallback): {current_code[:200]}...")
             else:
                 current_code = current_code_raw # Fallback to raw output if marker not found
                 logging.warning("Could not reliably extract MAL code from LLM response, using raw output.")


        # Validate the extracted code
        if not current_code:
             logging.warning("LLM produced empty code.")
             compiler_error = "LLM produced empty code."
             continue # Try again

        success, message = run_mal_compiler(current_code, MAL_COMPILER_PATH)

        if success:
            logging.info("MAL code compiled successfully.")
            # Optional: Log the full successful code if needed
            # logging.info(f"Successful MAL Code:\n{current_code}")
            return current_code # Return the successfully compiled code
        else:
            logging.warning(f"Compilation failed. Error: {message}")
            # Log the failed code for debugging
            # logging.debug(f"Failed MAL Code:\n{current_code}")
            compiler_error = message # Store error for the next refinement prompt

    logging.error("Failed to generate valid MAL code after maximum attempts.")
    return f"Error: Could not generate valid MAL code after {MAX_ATTEMPTS} attempts.\nLast failed code:\n{current_code}\nLast compiler error:\n{compiler_error}"
# --- Define the function for the Chat Interface ---
# Note: Ensure model, tokenizer, loaded_vectorstore are accessible
# They are loaded in __main__ and should be accessible if this function
# is defined before being used in __main__ or if they are made global.
# For simplicity here, we assume they are accessible globally after loading.

def chat_generate_mal(message, history):
    """
    Function to be called by the Gradio ChatInterface.
    Takes user message (description) and chat history.
    Returns the bot's response (generated MAL code or error).
    """
    logging.info(f"Chatbot received message: {message[:100]}...") # Log snippet

    # --- Check if backend components are ready ---
    # This check assumes model, tokenizer, loaded_vectorstore are in the global scope
    # after being loaded in __main__. If not, you'll need to pass them differently.
    global model, tokenizer, loaded_vectorstore
    if model is None or tokenizer is None or loaded_vectorstore is None:
         logging.error("Chatbot cannot process: Backend components not loaded.")
         # Add a small delay for better UX before showing error
         time.sleep(1)
         return "Error: The backend components (LLM, RAG) are not ready. Please check the logs."

    # --- Call the core generation logic ---
    try:
        # The history is not used by generate_valid_mal, only the latest message
        response = generate_valid_mal(
            message,  # User's description is the message
            model,
            tokenizer,
            loaded_vectorstore
        )
        logging.info("MAL generation complete. Sending response to chatbot.")
        # Optional: Add formatting if needed, e.g., ensure code blocks
        # if not response.startswith("Error:"):
        #    response = f"```mal\n{response}\n```" # Wrap successful code in markdown
        return response
    except Exception as e:
        logging.error(f"Error during chat generation process: {e}", exc_info=True)
        return f"An internal error occurred while generating the MAL code: {str(e)}"

def chat_generate_mal(message, history):
    """
    Function to be called by the Gradio ChatInterface.
    Takes user message (description) and chat history.
    Returns the bot's response (generated MAL code or error).
    """
    logging.info(f"Chatbot received message: {message[:100]}...") # Log snippet

    # --- Check if backend components are ready ---
    # This check assumes model, tokenizer, loaded_vectorstore are in the global scope
    # after being loaded in __main__. If not, you'll need to pass them differently.
    global model, tokenizer, loaded_vectorstore
    if model is None or tokenizer is None or loaded_vectorstore is None:
          logging.error("Chatbot cannot process: Backend components not loaded.")
          # Add a small delay for better UX before showing error
          time.sleep(1)
          return "Error: The backend components (LLM, RAG) are not ready. Please check the logs."

    # --- Call the core generation logic ---
    try:
        # The history is not used by generate_valid_mal, only the latest message
        response = generate_valid_mal(
            message,  # User's description is the message
            model,
            tokenizer,
            loaded_vectorstore
        )
        logging.info("MAL generation complete. Sending response to chatbot.")
        # Optional: Add formatting if needed, e.g., ensure code blocks
        # if not response.startswith("Error:"):
        #    response = f"```mal\n{response}\n```" # Wrap successful code in markdown
        return response
    except Exception as e:
        logging.error(f"Error during chat generation process: {e}", exc_info=True)
        return f"An internal error occurred while generating the MAL code: {str(e)}"


# --- Modify the end of your script ---
if __name__ == "__main__":
    logging.info("--- Initializing MAL Agent ---")

    # --- Keep your existing loading logic ---
    # Make variables global so chat_generate_mal can access them
    # Alternatively, use a class structure or pass them via partial functions
    global model, tokenizer, loaded_vectorstore, loaded_embedding_model
    model, tokenizer = load_llm_and_tokenizer(BASE_MODEL_ID, ADAPTER_PATH)
    if model is None or tokenizer is None:
        logging.error("Model or tokenizer loading failed.")
        exit(1)

    logging.info("Attempting to set up RAG...")
    loaded_vectorstore, loaded_embedding_model = setup_rag(EMBEDDING_MODEL_ID, VECTOR_STORE_PATH)
    if loaded_vectorstore is None or loaded_embedding_model is None:
        logging.error("RAG setup failed. Exiting.")
        exit(1)
    else:
        logging.info("RAG setup successful.")
    # --- End of existing loading logic ---


    # --- Launch Gradio Chat Interface ---
    logging.info("All components loaded. Launching Gradio Chat Interface...")

    # Customize the appearance and behavior
    chatbot_ui = gr.Chatbot(
        label="MALCOLM Agent",
        bubble_full_width=False,
        height=600  # Adjust height as needed
        )

    # Define the ChatInterface
    iface = gr.ChatInterface(
        fn=chat_generate_mal, # The function to handle chat messages
        chatbot=chatbot_ui,
        textbox=gr.Textbox(placeholder="Enter your attack scenario description here...", container=False, scale=7),
        title="🤖 MALCOLM: MAL Code Generation Agent",
        description="""Enter a description of an attack scenario below.
        MALCOLM uses Mistral-7B (fine-tuned), RAG (with CAPEC data), and the MAL compiler ('malc') to generate and attempt to validate corresponding Meta Attack Language (MAL) code.
        Each description you enter will trigger a new generation process.""",
        theme="soft", # or "default", "glass", "monochrome"
        examples=[
            ["Describe an attack where an adversary uses SQL injection to gain initial access and then escalates privileges via a known kernel exploit."],
            ["Model a scenario where a user clicks a phishing link in an email, leading to credential theft used to access a company VPN."],
            ["Generate MAL for a simple ransomware attack spreading through SMB."]
        ],
        cache_examples=False, # Re-run examples if clicked
        retry_btn=None, # Text for retry button, e.g., "Retry Generation"
        undo_btn="Delete Last Turn", # Text for undo button
        clear_btn="Clear Conversation", # Text for clear button
        submit_btn="Generate MAL",
        # autofocus=False # Doesn't work well with ChatInterface currently
    )

    # Launch the web server
    iface.launch(share=False) # Set share=True for a temporary public link (useful in Colab)

    print("--- Gradio Chat Interface Launched ---")
    # The script will continue running to serve the interface

In [None]:
if __name__ == "__main__":
    logging.info("--- Initializing MAL Agent ---")

    model, tokenizer = load_llm_and_tokenizer(BASE_MODEL_ID, ADAPTER_PATH)
    if model is None or tokenizer is None:
        logging.error("Model or tokenizer loading failed.")
        exit(1)

    logging.info("Attempting to set up RAG...")
    # Correctly unpack the two return values from the updated setup_rag
    loaded_vectorstore, loaded_embedding_model = setup_rag(EMBEDDING_MODEL_ID, VECTOR_STORE_PATH)

    # Update the check to use the new variable names and check only two values
    if loaded_vectorstore is None or loaded_embedding_model is None:
        logging.error("RAG setup failed. Exiting.")
        # Handle the error appropriately, e.g., exit or raise an exception
        exit(1)
    else:
        logging.info("RAG setup successful.")
        # Now you can use loaded_vectorstore and loaded_embedding_model later
        # For example, when calling get_rag_examples:
        # examples = get_rag_examples(query, loaded_vectorstore, TOP_K)

    example_input = "Generate a Meta Attack Language snippet that models the scenario described below. In this vehicle's threat model, we have a gateway ECU (ECU_Gateway) that serves as the primary line of defense, acting as a firewall for the entire system. This ECU is equipped with extensive access controls, allowing it to handle both regular traffic and potential adversarial activities.\n\nThe gateway ECU has two primary modes of interaction: forwarding and denial. In the forwarding mode, it merely retransmits received messages, potentially leading to compromise if vulnerabilities are exploited. This, in turn, connects the attacker to the vehicle's internal network, bypassing the firewall if it is disabled.\n\nIf the firewall is indeed disabled, the attacker can bypass it and potentially access various services directly. This bypass also extends to the Intrusion Detection and Prevention System (IDPS) if it is in place but disabled. However, when the IDPS is present and the firewall is disabled, the attacker can bypass the IDPS protection as well.\n\nOn the other hand, if the IDPS does not exist, the attacker can access the network layer unrestricted, given that the firewall is also disabled. In such a scenario, the attacker can also directly access the vehicle's Universal Diagnostic Services (UDS) services.\n\nIt's worth noting that the existence of a properly configured firewall provides some level of firewall protection. Conversely, if an IDPS is absent, there is no protection from it.\n\nLastly, the gateway ECU also has the capability to perform denial of service attacks on the connected networks, potentially disrupting the vehicle's communication.\n\nIn summary, the gateway ECU is the first line of defense, controlling and filtering traffic in and out of the vehicle. Its configuration and the presence or absence of certain protection mechanisms significantly impact the vehicle's security posture."
    final_mal_code = generate_valid_mal(
        example_input,
        model,
        tokenizer,
        loaded_vectorstore 
    )

    print("\n--- Final MAL Code ---")
    print(final_mal_code)
    print("--- Finished ---")