In [1]:
import sys
sys.path.append('..')
import os
import torch
import pandas as pd
from tqdm.auto import tqdm
from unittest.mock import patch
import types
from importlib.machinery import ModuleSpec


# --- 补丁1：注入“全功能”伪模块 ---
def create_full_fake_module(name, attributes_to_add):
    """Build a stand-in module object whose listed attributes are no-op callables.

    Args:
        name: Module name given to both the module object and its spec.
        attributes_to_add: Iterable of attribute names to create on the module.

    Returns:
        A ``types.ModuleType`` instance with ``__spec__`` set to a loader-less
        ``ModuleSpec`` and every requested attribute bound to a callable that
        accepts any arguments and returns ``None``.
    """
    fake_module = types.ModuleType(name)
    # Attach a spec (with no loader) so the object carries import metadata;
    # presumably needed by code that inspects ``__spec__`` -- confirm.
    fake_module.__spec__ = ModuleSpec(name, None)
    for attribute_name in attributes_to_add:
        # Each attribute gets its own fresh do-nothing callable.
        setattr(fake_module, attribute_name, lambda *args, **kwargs: None)
    return fake_module

# Register the stand-in modules under the names 'num2words' and 'word2number'
# so that later `import` statements for them resolve to the fakes.
sys.modules.update({
    'num2words': create_full_fake_module('num2words', ['num2words']),
    'word2number': create_full_fake_module('word2number', ['w2n']),
})
print(">>> 补丁1生效：已成功注入“全功能”伪模块。")

# --- 补丁2：“直捣黄龙”，直接替换nltk.download函数 ---
def dummy_nltk_download(*args, **kwargs):
    """Replacement for ``nltk.download`` that performs no download.

    Accepts and ignores any arguments, prints a notice that the call was
    intercepted, and returns ``True`` to signal success to the caller.
    """
    intercepted_notice = ">>> 补丁2生效：已成功拦截并跳过 nltk.download() 调用！<<<"
    print(intercepted_notice)
    # Report success so callers that check the return value carry on.
    return True

# Swap the `nltk.download` attribute for the stub above. The patcher is started
# here and not stopped in this cell, so the replacement stays active afterwards.
patcher = patch('nltk.download', dummy_nltk_download)
patcher.start()
print(">>> 补丁2生效：已成功替换 nltk.download 函数。")


# --- Patch 3: point NLTK at the local data directory ---
# Adds ~/nltk_data to NLTK's search path unless it is already listed.
nltk_data_dir = os.path.expanduser('~/nltk_data')
import nltk
if nltk_data_dir not in nltk.data.path:
    nltk.data.path.append(nltk_data_dir)
    print(f"成功将 '{nltk_data_dir}' 添加到NLTK的搜索路径。")

# --- Patch 4: set other environment variables ---
# HF_ENDPOINT is pointed at hf-mirror.com and TOKENIZERS_PARALLELISM is set to
# 'false'; both are set before the project / TextAttack imports below.
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
os.environ['TOKENIZERS_PARALLELISM'] = 'false' 
# NOTE(review): '..' is also appended to sys.path at the top of this cell.
sys.path.append('..')
# ====================================================================
# Project and TextAttack imports. They are placed after the patches above,
# presumably so the fake modules and the nltk.download stub are in effect
# while these packages are imported -- confirm.
from src.models.model_loader import load_main_llm
from src.utils.data_loader import load_sst2_dataset, load_agnews_dataset
from src.pipeline import AhpPipeline
from src.defenses import baseline_defense # a simple baseline model
from textattack.models.wrappers import ModelWrapper
from textattack.attack_recipes import TextBuggerLi2018, DeepWordBugGao2018
from textattack import Attacker
from textattack.datasets import HuggingFaceDataset,Dataset

# Select CUDA when PyTorch reports it as available, otherwise fall back to CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"当前使用的设备: {device}")

>>> 补丁1生效：已成功注入“全功能”伪模块。
>>> 补丁2生效：已成功替换 nltk.download 函数。


2025-10-21 21:24:07.098890: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-10-21 21:24:07.153979: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI AVX512_BF16 AVX512_FP16 AVX_VNNI AMX_TILE AMX_INT8 AMX_BF16 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-10-21 21:24:08.361612: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


当前使用的设备: cuda


  import pkg_resources


In [2]:
# --- Experiment configuration ---
# Dataset to evaluate on. Supported values: 'sst2' or 'ag_news'.
TASK_NAME = 'ag_news'
# TASK_NAME = 'sst2'

# Number of samples to attack; kept small so a full run finishes quickly.
NUM_SAMPLES_TO_TEST = 50

# Local checkpoint directory of the main model (circulus/alpaca-7b).
# The path can be overridden with the AHP_MAIN_MODEL_PATH environment variable
# so the notebook is not tied to one machine's filesystem layout; when the
# variable is unset the original location is used.
local_model_path = os.environ.get(
    "AHP_MAIN_MODEL_PATH", "/root/autodl-tmp/circulus_alpaca-7b"
)
print(f"准备从本地路径加载主模型: {local_model_path}")
main_model, main_tokenizer = load_main_llm(model_name=local_model_path, use_4bit=True)

`torch_dtype` is deprecated! Use `dtype` instead!
The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.


准备从本地路径加载主模型: /root/autodl-tmp/circulus_alpaca-7b
正在加载主模型: /root/autodl-tmp/circulus_alpaca-7b...


The following generation flags are not valid and may be ignored: ['pad_token_id']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

You are using the default legacy behaviour of the <class 'transformers.models.llama.tokenization_llama.LlamaTokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565 - if you loaded a llama tokenizer from a GGUF file you can ignore this message
You are using the default legacy behaviour of the <class 'transformers.models.llama.tokenization_llama_fast.LlamaTokenizerFast'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggin

主模型加载成功。


In [3]:
# Fixed seed for the sample shuffle so that repeated runs of the notebook
# evaluate the same examples and results are comparable across runs.
SAMPLE_SHUFFLE_SEED = 42

# Per-task settings: which split to load, how label text maps to class ids,
# and which columns hold the input text and the textual label.
if TASK_NAME == 'sst2':
    dataset_hf = load_sst2_dataset(split='validation')
    label_map = {"negative": 0, "positive": 1}
    text_column = "sentence"
    label_column = "label_text"

elif TASK_NAME == 'ag_news':
    dataset_hf = load_agnews_dataset(split='test')
    label_map = {"World": 0, "Sports": 1, "Business": 2, "Sci/Tech": 3}
    text_column = "text"
    label_column = "label_text"

else:
    raise ValueError(f"未知的任务名称: {TASK_NAME}")

# --- Build the TextAttack dataset from in-memory data ---

# 1. Draw a reproducible random subset and convert every row to a
#    (text, label_id) tuple. The subset size is capped at the split size so a
#    large NUM_SAMPLES_TO_TEST cannot index past the end of the data.
#    (Assumes the loader returns a Hugging Face `Dataset`, as the existing
#    shuffle/select calls imply -- confirm.)
num_samples_to_draw = min(NUM_SAMPLES_TO_TEST, len(dataset_hf))
sampled_rows = dataset_hf.shuffle(seed=SAMPLE_SHUFFLE_SEED).select(range(num_samples_to_draw))
local_data_samples = [
    (row[text_column], label_map[row[label_column]])
    for row in sampled_rows
]

# 2. Wrap the local samples in the generic textattack Dataset class.
attack_dataset = Dataset(local_data_samples)

print(f"\n已通过本地方式准备好攻击数据集: {TASK_NAME}, 样本数: {len(local_data_samples)}")

正在加载AG News数据集 (test split)...
AG News数据集加载成功。

已通过本地方式准备好攻击数据集: ag_news, 样本数: 50


In [4]:
# ===================================================================
# 单元格 4: 定义 TextAttack 模型包装器 (最终修正版)
# ===================================================================
import numpy as np

# TextAttack 需要一个包装器来调用我们的模型或防御框架
class DefenseModelWrapper(ModelWrapper):
    """Expose a defense pipeline to TextAttack as a classification model.

    The wrapped object must provide ``predict_single(text)`` returning a label
    string. Predictions are converted to one-hot score rows so that the result
    has one row per input text.

    Args:
        defense_pipeline: Object with a ``predict_single(text)`` method.
        label_to_id: Optional mapping from label text to class id. When omitted,
            the notebook-level ``label_map`` is used. The mapping is copied at
            construction time so that later changes to the global cannot drift
            out of sync with ``num_labels``.

    Note:
        ``num_labels`` is the number of entries in the mapping, so class ids are
        expected to be ``0 .. num_labels - 1``. A predicted label that is not in
        the mapping is counted as class 0 (behaviour kept from the original).
    """

    def __init__(self, defense_pipeline, label_to_id=None):
        self.model = defense_pipeline
        self.label_to_id = dict(label_map if label_to_id is None else label_to_id)
        self.num_labels = len(self.label_to_id)

    def __call__(self, text_input_list):
        """Return a ``(len(text_input_list), num_labels)`` one-hot float array."""
        # 1. Run the pipeline on every text, then map label strings to ids.
        predictions = [self.model.predict_single(text) for text in text_input_list]
        pred_ids = np.asarray(
            [self.label_to_id.get(pred_text, 0) for pred_text in predictions],
            dtype=np.intp,
        )

        # 2. One-hot encode in a single step. Allocating the 2-D array up front
        #    keeps the shape (0, num_labels) for an empty batch as well.
        scores = np.zeros((len(pred_ids), self.num_labels))
        scores[np.arange(len(pred_ids)), pred_ids] = 1.0
        return scores

In [5]:
print("正在初始化所有防御方法...")

# Build the two predictors first: the undefended baseline model, then the AHP
# defense pipeline configured with the 'nli' pruner (the rule the original
# author selected in experiment 1).
baseline_model = baseline_defense(main_model, main_tokenizer, task=TASK_NAME)
ahp_defense = AhpPipeline(
    task=TASK_NAME,
    pruner_name='nli',
    k_val=3,
    m_val=10,
    main_model=main_model,
    main_tokenizer=main_tokenizer,
)

# Wrap each predictor so TextAttack can query it.
baseline_wrapper = DefenseModelWrapper(baseline_model)
ahp_wrapper = DefenseModelWrapper(ahp_defense)

# Registry of defenses keyed by display name, iterated by the evaluation loop.
# TODO: add further defenses such as Self-Denoise in the future.
defenses_to_test = dict([
    ("Baseline (No Defense)", baseline_wrapper),
    ("AHP Defense (nli-pruner)", ahp_wrapper),
])

print("防御方法初始化完成。")

正在初始化所有防御方法...
正在加载NLI模型: roberta-large-mnli...
NLI模型加载成功。
防御方法初始化完成。


In [6]:
print("正在初始化攻击方法...")

# Attack recipe classes keyed by display name. The classes are stored
# un-built; the evaluation loop calls `.build(...)` on each one.
attacks_to_test = dict(
    TextBugger=TextBuggerLi2018,
    DeepWordBug=DeepWordBugGao2018,
)

print("攻击方法初始化完成。")

正在初始化攻击方法...
攻击方法初始化完成。


In [None]:
# Imported here so this cell is self-contained; both names come from the
# textattack package the notebook already depends on.
from textattack import AttackArgs
from textattack.attack_results import FailedAttackResult, SkippedAttackResult

# One summary row per (defense, attack) pair.
results = []

for defense_name, defense_wrapper in defenses_to_test.items():
    for attack_name, attack_recipe in attacks_to_test.items():
        print(f"\n{'='*20} 正在评估 {'='*20}")
        print(f"  防御方法: {defense_name}")
        print(f"  攻击算法: {attack_name}")
        print(f"{'='*46}")

        # Configure the attack. Attacker's default AttackArgs only attacks the
        # first 10 examples; num_examples=-1 makes it cover the whole prepared
        # dataset.
        attack_args = AttackArgs(num_examples=-1)
        attacker = Attacker(attack_recipe.build(defense_wrapper), attack_dataset, attack_args)

        # Run the attack and collect one AttackResult per example.
        attack_results = attacker.attack_dataset()

        # Classify the results by type. TextAttack encodes the outcome in the
        # result class:
        #   - SkippedAttackResult: the model was already wrong on the clean
        #     input, so no attack was attempted;
        #   - FailedAttackResult: the model was right and the attack could not
        #     flip its prediction;
        #   - anything else: the attack succeeded.
        num_total = len(attack_results)
        num_skipped = 0
        num_attack_failed = 0
        num_attack_success = 0
        for attack_result in attack_results:
            if isinstance(attack_result, SkippedAttackResult):
                num_skipped += 1
            elif isinstance(attack_result, FailedAttackResult):
                num_attack_failed += 1
            else:
                num_attack_success += 1

        # Samples the model classified correctly before any perturbation.
        num_originally_correct = num_total - num_skipped

        # Clean accuracy: share of all samples predicted correctly without attack.
        original_accuracy = (num_originally_correct / num_total) * 100 if num_total > 0 else 0

        # Accuracy under attack: share of all samples still predicted correctly
        # after the attack, i.e. only those where the attack failed.
        accuracy_under_attack = (num_attack_failed / num_total) * 100 if num_total > 0 else 0

        # Attack success rate (ASR): among originally correct samples, the
        # share whose prediction the attack managed to change.
        attack_success_rate = (num_attack_success / num_originally_correct) * 100 if num_originally_correct > 0 else 0

        print("\n评估完成:")
        print(f"  - 原始准确率 (Clean Accuracy): {original_accuracy:.2f}%")
        print(f"  - 攻击后准确率 (Accuracy under Attack): {accuracy_under_attack:.2f}%")
        print(f"  - 攻击成功率 (ASR): {attack_success_rate:.2f}%")

        results.append({
            "Defense": defense_name,
            "Attack": attack_name,
            "Clean Accuracy (%)": original_accuracy,
            "Accuracy under Attack (%)": accuracy_under_attack,
            "Attack Success Rate (%)": attack_success_rate
        })

In [None]:
# Collect the per-(defense, attack) rows into one table and print it in full.
results_df = pd.DataFrame(results)

print("\n\n" + "="*30)
print("实验二：鲁棒性评估 - 结果汇总")
print("="*30)
print(results_df.to_string())

# --- Save the results under an automatically numbered file name ---
# exist_ok=True creates the directory when missing and is a no-op when it is
# already there, replacing the separate exists-check.
os.makedirs('../results', exist_ok=True)

base_path = f'../results/experiment_2_robustness_{TASK_NAME}'
extension = '.csv'
save_path = f"{base_path}{extension}"

# Never overwrite an earlier run: append _1, _2, ... until the name is free.
counter = 1
while os.path.exists(save_path):
    save_path = f"{base_path}_{counter}{extension}"
    counter += 1

results_df.to_csv(save_path, index=False)
print(f"\n实验结果已成功保存到: {save_path}")