# BERT-based Attack

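# 1. Overview
This experiment is a simple implementation of the BAE attack from the paper on [BERT-based adversarial attacks on text](https://arxiv.org/abs/2004.01970). The core idea is to use a BERT model to replace important words, lowering the predicted probability of the correct label as much as possible while keeping the change in meaning small.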

The algorithm is as follows:
 <p align="center">
 <img src="./src/BAE-R_algorithm.png" style="zoom: 60%" >
 </p>

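BAE comes in an "R" (Replace) version, an "I" (Insert) version, and a combined "RI" version. This experiment focuses on BAE-R, which mounts the attack by replacing words at different positions. BAE-I instead inserts new words to the left or right of existing words; its basic idea is similar to BAE-R.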

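BAE-R proceeds as follows:
1. For each token $t_i$ in $S$, compute its importance $I_i$ (the computation is described later).
2. Visit the positions $i$ in decreasing order of importance $I_i$: replace the $i$-th word with "[MASK]" and use a BERT model to predict the $K$ most likely tokens for the [MASK] position.
3. From these $K$ tokens, discard those whose substitution changes the meaning of the original text (the semantic measure is described later).
4. If any remaining token $t$ changes the model's prediction, **return** the prediction-changing replacement whose result is most similar to $S$.
5. Otherwise, replace position $i$ with the $t$ that lowers the model's predicted probability of the correct label $y$ the most, and continue the loop in step 2 with the resulting sentence $S_{adv}$.
6. If the loop finishes, no adversarial example was found; return None.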
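
The steps above can be sketched in plain Python with toy stand-ins for the models. This is a minimal sketch, not the notebook's implementation: `bae_r_sketch`, `toy_proba`, `toy_propose`, and `toy_similarity` are hypothetical names, the "classifier" is a word-count heuristic, and the "similarity" is simply the fraction of unchanged tokens.

```python
import math

POSITIVE, NEGATIVE = {"good", "great"}, {"bad", "poor"}


def toy_proba(tokens):
    # Hypothetical classifier: returns [P(negative), P(positive)]
    score = sum(t in POSITIVE for t in tokens) - sum(t in NEGATIVE for t in tokens)
    p_pos = 1.0 / (1.0 + math.exp(-(score + 0.5)))
    return [1.0 - p_pos, p_pos]


def toy_propose(tokens, i):
    # Hypothetical stand-in for the masked LM's top-K predictions at position i
    return ["fine", "bad"]


def toy_similarity(a, b):
    # Hypothetical stand-in for embedding similarity: fraction of unchanged tokens
    return sum(x == y for x, y in zip(a, b)) / len(a)


def bae_r_sketch(tokens, label, order, propose, similarity, predict_proba, threshold=0.8):
    original, current = list(tokens), list(tokens)
    for i in order:  # step 2: most important position first
        base = predict_proba(current)[label]
        scored = []
        for cand in propose(current, i):
            trial = current[:i] + [cand] + current[i + 1:]
            sim = similarity(original, trial)
            if sim < threshold:  # step 3: drop candidates that drift too far
                continue
            probs = predict_proba(trial)
            flipped = max(range(len(probs)), key=probs.__getitem__) != label
            scored.append((trial, sim, probs[label], flipped))
        successes = [s for s in scored if s[3]]
        if successes:  # step 4: most similar prediction-changing replacement
            return max(successes, key=lambda s: s[1])[0]
        if scored:  # step 5: keep the replacement that lowers P(label) the most
            best = min(scored, key=lambda s: s[2])
            if best[2] < base:
                current = best[0]
    return None  # step 6: no adversarial example found


sentence = ["the", "food", "was", "good"]
result = bae_r_sketch(sentence, 1, [3, 1], toy_propose, toy_similarity, toy_proba, threshold=0.7)
no_result = bae_r_sketch(sentence, 1, [3, 1], toy_propose, toy_similarity, toy_proba, threshold=0.9)
print(result, no_result)
```

With the stricter threshold every candidate is filtered out in step 3, so the sketch falls through to step 6 and returns `None`.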

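# 2. Downloading the required models and libraries

Install the sentence-transformers library, which is used to extract semantic information from the input sentences. For usage, see the Hugging Face page (https://huggingface.co/sentence-transformers/distiluse-base-multilingual-cased-v1)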

In [None]:
!pip install -U sentence-transformers

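Download the BERT masked language model to the local directory `./model/bert-base-uncased`. It is a masked LM, used to predict likely words for the [MASK] position. See the model page (https://huggingface.co/google-bert/bert-base-uncased) for details.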

In [None]:
!HF_ENDPOINT=https://hf-mirror.com huggingface-cli download google-bert/bert-base-uncased config.json model.safetensors tokenizer.json tokenizer_config.json vocab.txt --local-dir ./model/bert-base-uncased

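Download the sentence-embedding model to the local directory `./model/distiluse-base-multilingual-cased-v1`. It embeds a sentence into a 512-dimensional vector; the paper uses the **cosine similarity** of the embedding vectors to measure how well two sentences agree semantically. See the model page (https://huggingface.co/sentence-transformers/distiluse-base-multilingual-cased-v1) for details.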
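
For reference, the cosine similarity of two vectors $u$ and $v$ is $\cos(u, v) = \frac{u \cdot v}{\lVert u \rVert \, \lVert v \rVert}$. The following is a minimal sketch on small hand-written vectors rather than real sentence embeddings; the helper name `cosine` is an assumption made for illustration.

```python
import math


def cosine(u, v):
    # Dot product divided by the product of the Euclidean norms
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)


same_direction = cosine([1.0, 2.0, 3.0], [2.0, 4.0, 6.0])  # parallel vectors
orthogonal = cosine([1.0, 0.0], [0.0, 1.0])                # unrelated directions
opposite = cosine([1.0, 0.0], [-1.0, 0.0])                 # opposite directions
print(same_direction, orthogonal, opposite)
```

The value depends only on direction, not length, so a threshold such as the 0.8 used later in this notebook measures how closely two embeddings point the same way.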

In [None]:
!HF_ENDPOINT=https://hf-mirror.com huggingface-cli download sentence-transformers/distiluse-base-multilingual-cased-v1 --exclude "*.bin" "*.h5" "onnx/*" "openvino/*" --local-dir ./model/distiluse-base-multilingual-cased-v1

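Download the sentiment classifier fine-tuned from bert-base-uncased, which classifies the sentiment of the input text. For each sample the model outputs a two-dimensional vector of logits; after a softmax, index 0 is the probability of negative and index 1 is the probability of positive. See the model page (https://huggingface.co/textattack/bert-base-uncased-imdb) for details.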
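
The later cells obtain these two class probabilities by applying a softmax to the model's raw scores (logits). A minimal sketch of that step, using made-up logit values and a hypothetical `softmax` helper instead of the real model:

```python
import math


def softmax(logits):
    # Subtract the maximum first for numerical stability
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


logits = [-1.2, 2.3]  # made-up scores for [negative, positive]
probs = softmax(logits)
predicted = max(range(len(probs)), key=probs.__getitem__)  # index of the larger probability
print(probs, predicted)
```

The attack tracks `probs[label]` for the correct label and looks for replacements that push it down until `predicted` changes.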

In [None]:
!HF_ENDPOINT=https://hf-mirror.com huggingface-cli download textattack/bert-base-uncased-imdb --exclude "flax_model.msgpack" --local-dir ./model/bert-base-uncased-imdb

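Download the IMDb dataset. Only the test split is needed here, because a pretrained sentiment classifier has already been downloaded (interested readers can try training their own model). Each entry in the dataset has two fields, `text` and `label`: `text` is the text to classify and `label` is its label. See the dataset page (https://huggingface.co/datasets/stanfordnlp/imdb) for details.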

In [None]:
!HF_ENDPOINT=https://hf-mirror.com huggingface-cli download stanfordnlp/imdb plain_text/test-00000-of-00001.parquet --local-dir ./dataset/imdb --repo-type dataset

# 3. Loading the models and defining helper functions

In [None]:
from transformers import AutoTokenizer, AutoModelForMaskedLM, AutoModelForSequenceClassification
from sentence_transformers import SentenceTransformer
from datasets import load_dataset
import torch
from tqdm import tqdm

# Load the BERT masked language model
bert_MLM = AutoModelForMaskedLM.from_pretrained("./model/bert-base-uncased")
bert_MLM.eval()
bert_MLM = bert_MLM.to("cuda")
bert_MLM_tokenizer = AutoTokenizer.from_pretrained("./model/bert-base-uncased")

Use the BERT masked LM to predict the top-k candidate words for the masked position

In [None]:
def top_k_predictions(model, tokenizer, sentence, k=10):
    """
    Return a list with one entry per [MASK]: the k most likely tokens for it.
    sentence is the input and must contain a [MASK] token; a batch (list) of sentences is also accepted.
    """
    # padding=True lets sentences of different lengths share one batch
    inputs = tokenizer(sentence, return_tensors="pt", padding=True).to(model.device)
    with torch.no_grad():
        outputs = model(**inputs)
    logits = outputs.logits

    mask_position = inputs.input_ids == tokenizer.mask_token_id
    batch_indices, token_indices = mask_position.nonzero(as_tuple=True)

    # Logits at the [MASK] positions
    masked_logits = logits[batch_indices, token_indices]

    # Softmax gives probabilities; then take the top-k tokens and their probabilities
    predictions = torch.nn.functional.softmax(masked_logits, dim=-1)
    topk_preds, topk_ids = torch.topk(predictions, k, dim=-1)

    # Decode the top-k token ids
    topk_tokens = [tokenizer.convert_ids_to_tokens(row.tolist()) for row in topk_ids]

    return topk_tokens

# Test input
test_sentence = [
    "The capital of France is [MASK].",
    "The capital of China is [MASK].",
]
print(top_k_predictions(bert_MLM, bert_MLM_tokenizer, test_sentence, k=5))

Load the IMDb dataset

In [None]:
from torch.utils.data import DataLoader
dataset = load_dataset("./dataset/imdb", split="test")
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)
# Print a sample batch from the dataset
print(next(iter(dataloader)))


Load the pretrained sentiment classifier

In [None]:
# Load the pretrained sentiment classification model
model = AutoModelForSequenceClassification.from_pretrained("./model/bert-base-uncased-imdb")
model.eval()
model = model.to("cuda")
tokenizer = AutoTokenizer.from_pretrained("./model/bert-base-uncased-imdb")

acc_num = 0
# Evaluate the model's accuracy on the IMDb test set in batches
# for batch in tqdm(dataloader):
#     inputs = batch["text"]
#     labels = batch["label"]
#     inputs = tokenizer(inputs, return_tensors="pt", padding="max_length", truncation=True, max_length=256).to(model.device)
#     outputs = model(**inputs)

#     preds = torch.argmax(outputs.logits, dim=1).cpu().detach().numpy()
#     acc_num += (preds == labels.numpy()).sum()

# Evaluate the model's accuracy on the IMDb test set one sample at a time
# with torch.no_grad():
#     for i in tqdm(range(len(dataset))):
#         inputs = dataset[i]["text"]
#         labels = dataset[i]["label"]
#         inputs = tokenizer(inputs, return_tensors="pt", padding="max_length", truncation=True, max_length=256).to(model.device)
#         outputs = model(**inputs)

#         preds = torch.argmax(outputs.logits, dim=1).cpu().detach().numpy()
#         acc_num += (preds == labels).sum()
#     print("Accuracy: ", acc_num / len(dataset))

Load the USE (sentence-embedding) model

In [None]:
use_model = SentenceTransformer("./model/distiluse-base-multilingual-cased-v1")
use_model.eval()
print(use_model)

# Compute the cosine similarity between the embeddings of two sentences
def cosine_similarity(use_model, sentence_1, sentence_2):
    sentence_1_embedding = use_model.encode(sentence_1)
    sentence_2_embedding = use_model.encode(sentence_2)
    cosine_sim = torch.nn.functional.cosine_similarity(
        torch.tensor(sentence_1_embedding).unsqueeze(0),
        torch.tensor(sentence_2_embedding).unsqueeze(0),
        dim=1
    )
    return cosine_sim.item()

# Test the cosine similarity between pairs of sentences
sentence = "The food was good."
sentence_1 = "The food was so good."
sentence_2 = "The food was so bad."
sentence_3 = "The food was awful."
print(cosine_similarity(use_model, sentence, sentence_1))
print(cosine_similarity(use_model, sentence, sentence_2))
print(cosine_similarity(use_model, sentence, sentence_3))

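Define a function that computes the importance of each word. Importance is computed as follows: remove the word from the original sentence and measure how much the probability of the correct class (the softmax of the logits) decreases.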
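
Before the full implementation, the leave-one-out idea can be illustrated on a toy scorer. This is a minimal sketch under stated assumptions: `toy_score` and its `WEIGHTS` table are hypothetical stand-ins for the classifier's probability of the correct class, and `leave_one_out_importance` is an illustrative name.

```python
import math

# Hypothetical per-word weights; words not listed contribute nothing
WEIGHTS = {"delights": 2.0, "film": 0.2}


def toy_score(tokens):
    # Stand-in for the classifier's probability of the correct class
    return 1.0 / (1.0 + math.exp(-sum(WEIGHTS.get(t, 0.0) for t in tokens)))


def leave_one_out_importance(tokens, score):
    base = score(tokens)
    # Importance of token i = drop in score when token i is deleted
    drops = [(base - score(tokens[:i] + tokens[i + 1:]), i, tok)
             for i, tok in enumerate(tokens)]
    return sorted(drops, key=lambda d: d[0], reverse=True)


ranking = leave_one_out_importance(["this", "film", "offers", "many", "delights"], toy_score)
for drop, pos, tok in ranking:
    print(f"{tok!r} at position {pos}: importance {drop:.3f}")
```

Words whose removal barely changes the score end up at the bottom of the ranking, so the attack visits them last.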

In [None]:
# Define a stop-word list (kept very simple here)
stop_words = set([
    "the", "and", "a", "to", "is", "in", "on", "for", "with", "this", "that", 
    "it", "of", "an", "as", "by", "be", "are", "was", "were", "am", "have", "has", "(", ")"
])
def word_importance(model, tokenizer, sentence, label):
    # sentence is a single input sentence (batches are not supported)
    # Returns, sorted by importance: the words, their scores, and their positions
    model.eval()
    with torch.no_grad():
        inputs = tokenizer(sentence, return_tensors="pt", padding="max_length", truncation=True, max_length=256).to(model.device)
        outputs = model(**inputs)
        original_score = torch.nn.functional.softmax(outputs.logits, dim=-1).squeeze(0)[label].item()

        input_words = tokenizer.tokenize(sentence)

        words = []
        position = []
        word_importance = []
        for i in range(len(input_words)):
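            # Skip tokens that are in the stop-word list
            # (this check is optional)
            if input_words[i] in stop_words:
                continue
            exclude_input_words = input_words[:i] + input_words[i+1:]
            exclude_input = tokenizer.convert_tokens_to_ids(exclude_input_words)
            # Match the original input: truncate to 256 tokens and add [CLS]/[SEP]
            exclude_input = [tokenizer.cls_token_id] + exclude_input[:254] + [tokenizer.sep_token_id]
            exclude_input = torch.tensor([exclude_input]).to(model.device)
            attention_mask = torch.ones_like(exclude_input).to(model.device)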

            exclude_output = model(input_ids=exclude_input, attention_mask=attention_mask)
            exclude_score = torch.nn.functional.softmax(exclude_output.logits, dim=-1).squeeze(0)[label].item()
            position.append(i)
            words.append(input_words[i])
            word_importance.append(original_score - exclude_score)

        word_importance = torch.tensor(word_importance)

        sorted_indice = torch.argsort(word_importance, descending=True)
        sorted_word = [words[i] for i in sorted_indice.tolist()]
        sorted_importance = [word_importance[i].item() for i in sorted_indice.tolist()]
        position = [position[i] for i in sorted_indice.tolist()]

        return sorted_word, sorted_importance, position

# Test the word_importance function
sentence = "This film offers many delights and surprises."
label = 1  # positive (index 1), the correct class for this sentence

sorted_words, sorted_importance, position = word_importance(model, tokenizer, sentence, label)
print("sentence: " + sentence)

cnt = 0
for word, importance, pos in zip(sorted_words, sorted_importance, position):
    print("|-| word: \"{}\", importance: {}, position: {}".format(word, importance, pos))
    if cnt > 10:
        break
    cnt += 1


Run the BERT-based attack

In [None]:
RED = '\033[31m'
GREEN = '\033[32m'
RESET = '\033[0m'
def bert_attack(model, tokenizer, use_model, replace_model, replace_tokenizer, sentence, label, k=10, threshold=0.8):

    original_sentence = sentence
    inputs = tokenizer(sentence, return_tensors="pt", padding="max_length", truncation=True, max_length=256).to(model.device)
    outputs = model(**inputs)

    original_score = torch.nn.functional.softmax(outputs.logits, dim=-1).squeeze(0)[label].item()
    # Prediction for the original sample
    preds = torch.argmax(outputs.logits, dim=1).item()
    # Sort the words by importance
    sorted_words, _, position = word_importance(model, tokenizer, sentence, label)
    while True:
        for i in position:
            input_words = tokenizer.tokenize(sentence)
            # Mask the word at position i
            new_sentence = input_words[:i] + ["[MASK]"] + input_words[i+1:]
            new_sentence = tokenizer.convert_tokens_to_string(new_sentence)
            # Get the top-k candidate replacement words
            candidate_list = top_k_predictions(replace_model, replace_tokenizer, new_sentence, k=k)[0]

            similarity = []
            for word in candidate_list:
                new_sentence = input_words[:i] + [word] + input_words[i+1:]
                new_sentence = tokenizer.convert_tokens_to_string(new_sentence)
                # Cosine similarity to the original sentence after the replacement
                sim = cosine_similarity(use_model, original_sentence, new_sentence)
                similarity.append(sim)
            # Sort the candidates and their similarities by decreasing similarity
            sorted_indice = torch.argsort(torch.tensor(similarity), descending=True)
            candidate_list = [candidate_list[j] for j in sorted_indice.tolist()]
            similarity = [similarity[j] for j in sorted_indice.tolist()]

            reduction = []
            for word, sim in zip(candidate_list, similarity):
                # Candidates are sorted by similarity, so stop once it falls below the threshold
                if sim < threshold:
                    break
                new_sentence = input_words[:i] + [word] + input_words[i+1:]
                new_sentence = tokenizer.convert_tokens_to_string(new_sentence)

                new_inputs = tokenizer(new_sentence, return_tensors="pt", padding="max_length", truncation=True, max_length=256).to(model.device)
                new_outputs = model(**new_inputs)
                new_score = torch.nn.functional.softmax(new_outputs.logits,dim=-1).squeeze(0)[label].item()
                # Record the drop in the label's probability so candidates can be ranked later
                reduction.append(original_score - new_score)
                 
                new_preds = torch.argmax(new_outputs.logits, dim=1).item()

                if new_preds != label:
                    # Attack succeeded: return the new sentence
                    print(f"replace {GREEN}{input_words[i]}{RESET} with {RED}{word}{RESET}, similarity: {sim}, \
prediction of label: {GREEN}{original_score}{RESET}--->{RED}{new_score}{RESET}")
                    print("attack success!")
                    return new_sentence
            
            if len(reduction) == 0:
                continue
            
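            # Otherwise replace the current word with the candidate that lowers the label's probability the most
            max_reduction_index = torch.argmax(torch.tensor(reduction)).item()
            max_reduction_word = candidate_list[max_reduction_index]
            max_reduction = reduction[max_reduction_index]

            # Skip the replacement if no candidate actually lowers the probability
            if max_reduction <= 0:
                continue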

            sentence = input_words[:i] + [max_reduction_word] + input_words[i+1:]
            print(f"replace {GREEN}{input_words[i]}{RESET} with {RED}{max_reduction_word}{RESET}, similarity: {similarity[max_reduction_index]}\
, prediction of label: {GREEN}{original_score}{RESET}--->{RED}{original_score - max_reduction}{RESET}")
            
            sentence = tokenizer.convert_tokens_to_string(sentence)
            inputs = tokenizer(sentence, return_tensors="pt", padding="max_length", truncation=True, max_length=256).to(model.device)
            outputs = model(**inputs)
            original_score = torch.nn.functional.softmax(outputs.logits, dim=-1).squeeze(0)[label].item()

        print("Attack failed!")
        return None

# Demonstrate the attack on dataset[0]
# The code below only displays the results
sentence = dataset[0]["text"]
label = dataset[0]["label"]

inputs = tokenizer(sentence, return_tensors="pt", padding="max_length", truncation=True, max_length=256).to(model.device)
outputs = model(**inputs)
original_score = torch.nn.functional.softmax(outputs.logits, dim=-1).squeeze(0)
preds = torch.argmax(outputs.logits, dim=1).item()
# Attack the model's own prediction rather than the dataset label
label = preds

with torch.no_grad():
    attack_sentence = bert_attack(model, tokenizer, use_model, bert_MLM, bert_MLM_tokenizer, sentence, label, k=10, threshold=0.8)

    if attack_sentence is not None:    
        inputs = tokenizer(attack_sentence, return_tensors="pt", padding="max_length", truncation=True, max_length=256).to(model.device)
        outputs = model(**inputs)
        attack_preds = torch.argmax(outputs.logits, dim=1).item()
        attack_score = torch.nn.functional.softmax(outputs.logits, dim=-1).squeeze(0)

        original_sentence = tokenizer.convert_tokens_to_string(tokenizer.tokenize(sentence))
        # highlight the changed words
        # A simple comparison only; differences in tokenization are not handled, so the highlighting may be off
        # cnt = 0
        # original_tokenization = tokenizer.tokenize(original_sentence)
        # attack_tokenization = tokenizer.tokenize(attack_sentence)
        # for word in original_tokenization:
        #     if attack_tokenization[cnt] != word:
        #         original_tokenization = original_tokenization[:cnt] + [GREEN + word + RESET] + original_tokenization[cnt+1:]
        #         attack_tokenization = attack_tokenization[:cnt] + [RED + attack_tokenization[cnt] + RESET] + attack_tokenization[cnt+1:]
        #     cnt += 1

        # original_sentence = tokenizer.convert_tokens_to_string(original_tokenization)
        # attack_sentence = tokenizer.convert_tokens_to_string(attack_tokenization)

        print("original sentence: \n" + original_sentence)
        print(f"original pred: {preds}")
        print("original score: Negative: {}, Positive: {}".format(original_score[0].item(), original_score[1].item()))
        print("attack sentence: \n" + attack_sentence)
        print(f"attack pred: {attack_preds}")
        print("attack score: Negative: {}, Positive: {}".format(attack_score[0].item(), attack_score[1].item()))

        print("cosine_similarity: ", cosine_similarity(use_model, original_sentence, attack_sentence))
    else:
        pass

