In [1]:
import torch
import json
import re
from pathlib import Path
from tqdm.notebook import tqdm
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer



# --- Evaluation configuration -------------------------------------------------
# Checkpoint directory to evaluate. Alternatives previously listed here
# (left disabled by the author):
#   "./qwen3-4b-sft-merged-final"
#   "./Qwen3-4B-Instruct-2507"
MERGED_MODEL_PATH = "./qwen3-4b-sft-merged-final--with-reasoning"

# JSON file holding the held-out test samples.
TEST_DATASET_PATH = "./data/input/sft_dataset_4000_test.json"

# Where the per-sample evaluation records are written. Alternatives previously
# listed here: "./evaluation_results.json", "./evaluation_results_1.json".
RESULTS_OUTPUT_PATH = "./evaluation_results_2.json"

# Echo the active configuration so the notebook output records what was run.
print("✅ 配置加载完成")
print(f"模型路径: {MERGED_MODEL_PATH}")
print(f"测试集路径: {TEST_DATASET_PATH}")

✅ 配置加载完成
模型路径: ./qwen3-4b-sft-merged-final--with-reasoning
测试集路径: ./data/input/sft_dataset_4000_test.json


In [2]:
def create_prompt(instruction, input_text):
    """Build the chat message list for a single evaluation sample.

    Args:
        instruction: Task instruction text for the sample.
        input_text: The sample's input text, appended after the instruction.

    Returns:
        A two-element list of ``{"role", "content"}`` dicts: a fixed system
        message followed by a user message containing the instruction, a
        blank line, and the input text.
    """
    system_message = {
        "role": "system",
        "content": "You are a helpful assistant specialized in cybersecurity and the MITRE ATT&CK framework.",
    }
    user_message = {
        "role": "user",
        "content": f"{instruction}\n\n{input_text}",
    }
    return [system_message, user_message]

def extract_technique_id(text):
    """Return the first ATT&CK-style technique ID found in ``text``.

    An ID is ``T`` followed by exactly four digits, optionally followed by a
    ``.`` and three digits for a sub-technique (e.g. ``T1564`` or
    ``T1027.004``).

    Args:
        text: Text to search. Non-string values (e.g. ``None``) are accepted
            and yield ``None`` instead of raising.

    Returns:
        The matched ID string, or ``None`` when no ID is present.
    """
    if not isinstance(text, str):
        return None

    # Boundary guards keep the pattern from matching inside longer tokens:
    #  - the lookbehind rejects a "T" that is the tail of another word
    #    (e.g. "PORT8080" must not produce "T8080");
    #  - the lookahead rejects IDs that run into further digits
    #    (e.g. "T10043" must not produce "T1004").
    match = re.search(r'(?<![A-Za-z0-9])T\d{4}(?:\.\d{3})?(?!\d)', text)
    return match.group(0) if match else None


def extract_final_answer_id(prediction_text):
    """Extract the predicted technique ID from a model response.

    Strategy:
      1. Locate the first ``[Final Answer]:`` tag (case-insensitive) and look
         for a technique ID anywhere in the text that follows it.
      2. If there is no tag, or no ID after it, fall back to the first
         technique ID found anywhere in the response.

    Args:
        prediction_text: Decoded model output. Non-string values (e.g.
            ``None``) yield ``None``.

    Returns:
        The technique ID string (``T####`` or ``T####.###``), or ``None`` if
        none can be found.
    """
    if not isinstance(prediction_text, str):
        return None

    # Same ID shape as extract_technique_id, with boundary guards so that the
    # pattern does not match inside longer alphanumeric tokens.
    technique_id_pattern = r'(?<![A-Za-z0-9])T\d{4}(?:\.\d{3})?(?!\d)'

    # maxsplit=1 keeps *everything* after the first tag in parts[1]. Without
    # it, a repeated tag ("[Final Answer]: [Final Answer]: T1564") would leave
    # parts[1] empty and wrongly trigger the whole-text fallback.
    parts = re.split(
        r'\[Final Answer\]:',
        prediction_text,
        maxsplit=1,
        flags=re.IGNORECASE,
    )

    if len(parts) > 1:
        match = re.search(technique_id_pattern, parts[1])
        if match:
            return match.group(0)

    # No tag, or no ID after the tag: use the first ID in the whole response.
    return extract_technique_id(prediction_text)


## 加载模型

In [3]:


# Pick the weight dtype: bfloat16 when CUDA is available and device 0 reports
# a compute-capability major version of at least 8, otherwise float16.
if torch.cuda.is_available() and torch.cuda.get_device_properties(0).major >= 8:
    compute_dtype = torch.bfloat16
else:
    compute_dtype = torch.float16

tokenizer = AutoTokenizer.from_pretrained(
    MERGED_MODEL_PATH,
    trust_remote_code=True,
)
model = AutoModelForCausalLM.from_pretrained(
    MERGED_MODEL_PATH, dtype=compute_dtype, device_map="auto"
)
model.eval()  # switch the model to evaluation mode


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Qwen3ForCausalLM(
  (model): Qwen3Model(
    (embed_tokens): Embedding(151936, 2560)
    (layers): ModuleList(
      (0-35): 36 x Qwen3DecoderLayer(
        (self_attn): Qwen3Attention(
          (q_proj): Linear(in_features=2560, out_features=4096, bias=False)
          (k_proj): Linear(in_features=2560, out_features=1024, bias=False)
          (v_proj): Linear(in_features=2560, out_features=1024, bias=False)
          (o_proj): Linear(in_features=4096, out_features=2560, bias=False)
          (q_norm): Qwen3RMSNorm((128,), eps=1e-06)
          (k_norm): Qwen3RMSNorm((128,), eps=1e-06)
        )
        (mlp): Qwen3MLP(
          (gate_proj): Linear(in_features=2560, out_features=9728, bias=False)
          (up_proj): Linear(in_features=2560, out_features=9728, bias=False)
          (down_proj): Linear(in_features=9728, out_features=2560, bias=False)
          (act_fn): SiLU()
        )
        (input_layernorm): Qwen3RMSNorm((2560,), eps=1e-06)
        (post_attention_layernorm): Qwe

In [4]:
# Read the JSON test file through the `datasets` "json" loader, requesting the
# "train" split so a single dataset object is returned.
test_dataset = load_dataset(
    "json",
    data_files=str(TEST_DATASET_PATH),
    split="train",
)
print(f"✅ 测试集加载成功，共 {len(test_dataset)} 条数据。")

✅ 测试集加载成功，共 1000 条数据。


## 评估函数
目前的评估方法仅从标准答案和模型输出中提取 ATT&CK 技术编号（如 T1564 或 T1027.004），并进行精确字符串匹配；编号不完全一致（例如父技术与子技术）即判为错误。

In [5]:
# Evaluation loop: run greedy generation for every test sample, extract the
# technique ID from the reference and from the model response, and record
# whether they match exactly.
correct_predictions = 0
total_samples = len(test_dataset)
evaluation_results = []

# Initialised up front so the name exists even if the test set is empty;
# inside the loop it is the 1-based index supplied by enumerate().
sample_counter = 0

# Resolve the padding id once, outside the loop. Fall back to the EOS id when
# the tokenizer defines no pad token, so generate() never receives None.
generation_pad_token_id = (
    tokenizer.pad_token_id
    if tokenizer.pad_token_id is not None
    else tokenizer.eos_token_id
)

for sample_counter, sample in enumerate(tqdm(test_dataset, desc="评估进度"), start=1):
    instruction = sample["instruction"]
    input_text = sample["input"]
    # Full reference text; the technique ID is extracted from it below.
    ground_truth = sample["output"]

    # 1. Build the chat prompt.
    messages = create_prompt(instruction, input_text)
    prompt_text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

    # 2. Run inference (greedy decoding, no gradients).
    inputs = tokenizer(prompt_text, return_tensors="pt").to(model.device)

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=1024,
            do_sample=False,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=generation_pad_token_id
        )

    # 3. Decode only the newly generated tokens (drop the echoed prompt).
    response_ids = outputs[0][inputs.input_ids.shape[1]:]
    prediction = tokenizer.decode(response_ids, skip_special_tokens=True)

    # 4. Extract technique IDs from the reference and from the prediction.
    ground_truth_id = extract_technique_id(ground_truth)
    prediction_id = extract_final_answer_id(prediction)

    # 5. Exact-match comparison; a sample whose reference has no ID can never
    #    be counted as correct.
    is_correct = (ground_truth_id is not None and ground_truth_id == prediction_id)
    if is_correct:
        correct_predictions += 1

    # Live per-sample report. tqdm.write() is used instead of print() so the
    # progress bar is not corrupted.
    tqdm.write("\n" + "="*80)
    tqdm.write(f"--- [Sample {sample_counter}/{total_samples}] ---")
    tqdm.write(f"Input:         {input_text[:150]}...")  # truncated input
    tqdm.write("-"*80)
    tqdm.write(f"Ground Truth ID: {ground_truth_id}")
    tqdm.write(f"Ground Truth (Full):\n{ground_truth}")
    tqdm.write("-"*80)
    tqdm.write(f"Prediction ID:   {prediction_id}")
    tqdm.write(f"Prediction (Full):\n{prediction}")
    tqdm.write("-"*80)
    tqdm.write(f"Correct?       {is_correct}")
    tqdm.write("="*80 + "\n")

    # 6. Keep the detailed record for the results file.
    evaluation_results.append({
        "input": input_text,
        "ground_truth": ground_truth,
        "prediction": prediction,
        "ground_truth_id": ground_truth_id,
        "prediction_id": prediction_id,
        "is_correct": is_correct
    })

评估进度:   0%|          | 0/1000 [00:00<?, ?it/s]

The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.



--- [Sample 1/1000] ---
Input:         Bundlore uses the mktemp utility to make unique file and directory names for payloads, such as TMP_DIR=`mktemp -d -t x. OSX/Shlayer uses the mktemp ut...
--------------------------------------------------------------------------------
Ground Truth ID: T1564
Ground Truth (Full):
T1564: Hide Artifacts
--------------------------------------------------------------------------------
Prediction ID:   T1027
Prediction (Full):
<think>

</think>

[Reasoning Process]:
[Reasoning Process]: The CTI input describes adversaries using the `mktemp` utility to generate unique file and directory names for payloads, such as `TMP_DIR=$(mktemp -d -t x)` or `export tmpDir="$"` or `mktemp -t Installer`. The key action here is the use of `mktemp` to create temporary files or directories with unique, often random, names. This behavior aligns directly with the definition of T1027.004 (Obfuscated Files or Information: File and Directory Name Staging), which involves adver

In [6]:
# NOTE: This whole cell is disabled (commented out). It is the same evaluation
# loop as the previous cell, minus the per-sample live printing and the
# sample counter. Kept only for reference.
#
# correct_predictions = 0
# total_samples = len(test_dataset)
# evaluation_results = []
#
#
#
# for sample in tqdm(test_dataset, desc="评估进度"):
#     instruction = sample["instruction"]
#     input_text = sample["input"]
#     ground_truth = sample["output"]
#
#     # 1. Build the prompt
#     messages = create_prompt(instruction, input_text)
#     prompt_text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
#
#     # 2. Model inference
#     inputs = tokenizer(prompt_text, return_tensors="pt").to(model.device)
#
#     with torch.no_grad():
#         outputs = model.generate(
#             **inputs,
#             max_new_tokens=1024,
#             do_sample=False,
#             eos_token_id=tokenizer.eos_token_id,
#             pad_token_id=tokenizer.pad_token_id
#         )
#
#     # 3. Decode and clean up the output
#     response_ids = outputs[0][inputs.input_ids.shape[1]:]
#     prediction = tokenizer.decode(response_ids, skip_special_tokens=True)
#
#     # 4. Extract the technique IDs
#     ground_truth_id = extract_technique_id(ground_truth)
#     prediction_id = extract_final_answer_id(prediction)
#
#     # 5. Compare the results
#     is_correct = (ground_truth_id is not None and ground_truth_id == prediction_id)
#     if is_correct:
#         correct_predictions += 1
#
#     # 6. Record the detailed result
#     evaluation_results.append({
#         "input": input_text,
#         "ground_truth": ground_truth,
#         "prediction": prediction,
#         "ground_truth_id": ground_truth_id,
#         "prediction_id": prediction_id,
#         "is_correct": is_correct
#     })

## 保存结果

In [7]:
# Accuracy as a percentage of all test samples; an empty test set reports 0
# rather than dividing by zero.
if total_samples > 0:
    accuracy = (correct_predictions / total_samples) * 100
else:
    accuracy = 0

# Summary banner.
print("\n" + "="*30)
print("      模型评估结果")
print("="*30)
print(f"总测试样本数: {total_samples}")
print(f"正确预测数: {correct_predictions}")
print(f"准确率 (Accuracy): {accuracy:.2f}%")
print("="*30)


# Persist the per-sample records as pretty-printed UTF-8 JSON;
# ensure_ascii=False keeps non-ASCII text readable in the file.
print(f"\n正在将详细评估结果保存到 '{RESULTS_OUTPUT_PATH}'...")
with open(RESULTS_OUTPUT_PATH, 'w', encoding='utf-8') as results_file:
    json.dump(evaluation_results, results_file, indent=4, ensure_ascii=False)



      模型评估结果
总测试样本数: 1000
正确预测数: 641
准确率 (Accuracy): 64.10%

正在将详细评估结果保存到 './evaluation_results_2.json'...
