In [1]:
import numpy as np
import torch
from transformers import BertTokenizer, BertForSequenceClassification, BertForMaskedLM
import re
from sentence_transformers import SentenceTransformer, util

2023-10-07 10:36:18.294095: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-10-07 10:36:18.333074: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# 加载同义词集
cos_sim = np.load('cos_sim_counter_fitting.npy')

In [3]:
# 选择最相似的同义词
def pick_most_similar_words_batch(src_words, sim_mat, idx2word, ret_count=10, threshold=0.):
    """
    Given a list of source words (their indices), a similarity matrix, and an index-to-word mapping,
    this function returns the top `ret_count` similar words for each source word, filtered by a given threshold.
    
    Parameters:
    - src_words: List of source word indices.
    - sim_mat: Similarity matrix of shape (vocab_size, vocab_size).
    - idx2word: A mapping from word index to actual word.
    - ret_count: Number of top similar words to return for each source word.
    - threshold: A similarity threshold to filter out words.
    
    Returns:
    - sim_words: A list of lists containing similar words for each source word.
    - sim_values: A list of lists containing similarity values for each word in sim_words.
    """
    
    # 对于每个src_word，找到其与其他所有单词的相似度排名（从高到低）
    sim_order = np.argsort(-sim_mat[src_words, :])[:, 1:1 + ret_count]
    
    sim_words, sim_values = [], []  # 初始化列表以保存结果

    # 遍历src_words的每个词
    for idx, src_word in enumerate(src_words):
        # 获取对应src_word的相似度值
        sim_value = sim_mat[src_word][sim_order[idx]]
        
        # 根据阈值筛选出大于等于threshold的相似度值
        mask = sim_value >= threshold
        
        # 使用mask获取单词和其相似度值
        sim_word, sim_value = sim_order[idx][mask], sim_value[mask]
        
        # 将单词索引转换为实际的单词
        sim_word = [idx2word[id] for id in sim_word]
        
        # 保存结果
        sim_words.append(sim_word)
        sim_values.append(sim_value)

    return sim_words, sim_values  # 返回相似单词及其相似度值

In [4]:
# 计算影响最大的词
def influential_tokens(sentence, model, tokenizer, k=5):
    # 确保模型在评估模式
    model.eval()

    # 获取原始logit输出
    inputs = tokenizer(sentence, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        original_logits = model(**inputs).logits

    # 为每个token计算logit差异
    token_ids = inputs["input_ids"][0].tolist()  # 获取token ids
    tokens = [tokenizer.decode([token_id]) for token_id in token_ids]  # 转换为tokens
    diffs = []

    for i, token_id in enumerate(token_ids):
        # 如果是[CLS], [SEP]或[PAD]，则跳过
        if token_id in [tokenizer.cls_token_id, tokenizer.sep_token_id, tokenizer.pad_token_id]:
            continue

        # 将当前token替换为[MASK]
        masked_input_ids = inputs["input_ids"].clone()
        masked_input_ids[0][i] = tokenizer.mask_token_id

        # 获取mask后的logits
        with torch.no_grad():
            masked_logits = model(input_ids=masked_input_ids, attention_mask=inputs["attention_mask"]).logits

        # 计算logit差异
        diff = torch.abs(original_logits - masked_logits).sum().item()
        diffs.append((tokens[i], i, diff))  # 保存token, index和差异值

    # 根据差异大小排序tokens
    sorted_tokens = sorted(diffs, key=lambda x: x[2], reverse=True)

    # 返回前k个token及其索引
    return [(token_info[0], token_info[1]) for token_info in sorted_tokens[:k]]

In [5]:
# 正则表达式去掉特殊字符
def extract_words(sentence):
    words = re.findall(r'\b\w+\b', sentence)
    return ' '.join(words)

In [6]:
# 加载模型

# 窃取的模型
model_path = '/home/ubuntu/zhc/work/adversarial_output'
tokenizer = BertTokenizer.from_pretrained(model_path)
model = BertForSequenceClassification.from_pretrained(model_path)

sentence = "I drove by yesterday to get a sneak peak. It re-opens on July and I can't wait to take my kids. The new range looks amazing. The entire range appears to be turf, which may or many not help your game, but it looks really nice. The tee boxes look state of the art and the club house looks like something you'll see on a newer course. Can't wait to experience it!"
sentence = extract_words(sentence)
# print(len(sentence))
k = 2

top_k_tokens_and_indices = influential_tokens(sentence, model, tokenizer, k)

In [7]:
top_k_tokens_and_indices

[('amazing', 28), ('nice', 48)]

In [8]:
def get_token_from_encoded(sentence, index, tokenizer):
    token_ids = tokenizer.encode(sentence, add_special_tokens=True)
    token = tokenizer.decode([token_ids[index]])
    return token


perturb_idxes = []
words_perturb = []
for i in range(k):
    perturb_idxes.append(top_k_tokens_and_indices[i][1])
    words_perturb.append(top_k_tokens_and_indices[i][0])


In [9]:
perturb_idxes, words_perturb #得到替换词下标和词

([28, 48], ['amazing', 'nice'])

In [10]:
idx2word = {}
word2idx = {}

print("Building vocab...")
with open('./counter-fitted-vectors.txt', 'r') as ifile:
    for line in ifile:
        word = line.split()[0]
        if word not in idx2word:
            idx2word[len(idx2word)] = word
            word2idx[word] = len(idx2word) - 1

Building vocab...


In [11]:
words_perturb_idx = [word2idx[word] for word in words_perturb if word in word2idx]
words_perturb_idx # 同义词典的位置

[7805, 25360]

In [12]:
words_perturb = [(idx, idx2word[idx]) for idx in words_perturb_idx]
words_perturb

[(7805, 'amazing'), (25360, 'nice')]

In [13]:
synonym_words, _ = pick_most_similar_words_batch(words_perturb_idx, cos_sim, idx2word, 200, 0.1)

In [14]:
synonym_words

[['stunning',
  'astounding',
  'impressive',
  'astonishing',
  'startling',
  'surprising',
  'striking',
  'staggering',
  'incredible',
  'breathtaking',
  'unbelievable',
  'awesome',
  'remarkable',
  'spectacular',
  'fantastic',
  'magnificent',
  'phenomenal',
  'dazzling',
  'marvelous',
  'extraordinary',
  'terrific',
  'fabulous',
  'superb',
  'marvellous',
  'sublime',
  'wonderful',
  'noteworthy',
  'splendid',
  'admirable',
  'wondrous',
  'excellent',
  'whopping',
  'gorgeous',
  'resplendent',
  'exceptional',
  'dramatic',
  'brilliant',
  'breathless',
  'sumptuous',
  'sensational',
  'exquisite',
  'uncanny',
  'extraordinaire',
  'funky',
  'beautiful',
  'tremendous',
  'impressed',
  'formidable',
  'magnifique',
  'outstanding',
  'fantastical',
  'glamorous',
  'great',
  'stupendous',
  'delightful',
  'prodigious',
  'memorable',
  'super',
  'ravishing',
  'glorious',
  'mesmerizing',
  'handsome',
  'supernatural',
  'unthinkable',
  'stun',
  'lovely

In [15]:

def replace_token_at_index(sentence, index, replacement_word, tokenizer):
    token_ids = tokenizer.encode(sentence, add_special_tokens=False)
    replacement_ids = tokenizer.encode(replacement_word, add_special_tokens=False)
    
    if len(replacement_ids) != 1:
        # 将replacement_word设置为"[UNK]"标记
        replacement_word = "[UNK]"
        replacement_ids = tokenizer.encode(replacement_word, add_special_tokens=False)
    
    token_ids[index] = replacement_ids[0]
    return tokenizer.decode(token_ids)

def generate_nested_sentences(sentence, indices, replacement_words_nested_list, tokenizer):
    if len(indices) != len(replacement_words_nested_list):
        raise ValueError("Length of indices and replacement words nested list should be the same.")
    
    nested_sentences = []
    for idx, replacement_words_list in zip(indices, replacement_words_nested_list):
        modified_sentences_for_idx = []
        for replacement_word in replacement_words_list:
            modified_sentence = replace_token_at_index(sentence, idx, replacement_word, tokenizer)
            modified_sentences_for_idx.append(modified_sentence)
        nested_sentences.append(modified_sentences_for_idx)
    return nested_sentences


indices = perturb_idxes
replacement_words_nested_list = synonym_words

nested_sentences = generate_nested_sentences(sentence, indices, replacement_words_nested_list, tokenizer)


In [16]:
nested_sentences

[['i drove by yesterday to get a sneak peak it re opens on july and i can t wait to take my kids the new range looks amazing stunning entire range appears to be turf which may or many not help your game but it looks really nice the tee boxes look state of the art and the club house looks like something you ll see on a newer course can t wait to experience it',
  'i drove by yesterday to get a sneak peak it re opens on july and i can t wait to take my kids the new range looks amazing [UNK] entire range appears to be turf which may or many not help your game but it looks really nice the tee boxes look state of the art and the club house looks like something you ll see on a newer course can t wait to experience it',
  'i drove by yesterday to get a sneak peak it re opens on july and i can t wait to take my kids the new range looks amazing impressive entire range appears to be turf which may or many not help your game but it looks really nice the tee boxes look state of the art and the clu

In [17]:
# 加载微调后的BERT模型
MODEL_PATH = "/home/ubuntu/zhc/work/adversarial_output"
adver_tokenizer = BertTokenizer.from_pretrained(MODEL_PATH)
adver_model = BertForSequenceClassification.from_pretrained(MODEL_PATH)

# 计算Logit输出
def compute_logits(sentence):
    inputs = adver_tokenizer(sentence, return_tensors="pt", padding=True, truncation=True, max_length=512)
    with torch.no_grad():
        outputs = adver_model(**inputs)
    logits = outputs.logits
    return logits

# 计算概率输出
def compute_probabilities(sentence):
    logits = compute_logits(sentence)
    probabilities = torch.nn.functional.softmax(logits, dim=1)
    return probabilities

# 计算M3概率矩阵
def compute_difference_matrix(original_sentence, nested_adversarial_sentences, nested_replacement_words):
    difference_matrix = []

    # 为原始句子计算softmax概率
    original_probs = compute_probabilities(original_sentence)
    #print(original_probs)
    original_pred_class = torch.argmax(original_probs, dim=1).item()
    original_pred_prob = original_probs[0, original_pred_class].item()

    for group_idx, group in enumerate(nested_adversarial_sentences):
        group_differences = {}

        for sentence_idx, sentence in enumerate(group):
            adversarial_probs = compute_probabilities(sentence)
            #print(adversarial_probs)

            adversarial_pred_class = torch.argmax(adversarial_probs, dim=1).item()
            adversarial_pred_prob = adversarial_probs[0, original_pred_class].item()
            #print(adversarial_pred_prob)

            # 计算概率差值
            difference = original_pred_prob - adversarial_pred_prob

            # 使用替换词作为key存储差值
            replacement_word = nested_replacement_words[group_idx][sentence_idx]
            group_differences[replacement_word] = difference
            #print(difference)
            # break
        difference_matrix.append(group_differences)
    return difference_matrix


# 假设你已经有一个嵌套的对抗句子列表
original_sentence = sentence

nested_adversarial_sentences = nested_sentences

# 假设你有一个替换词列表与上面的对抗句子列表相对应
nested_replacement_words = synonym_words


difference_matrix = compute_difference_matrix(original_sentence, nested_adversarial_sentences, nested_replacement_words)
print(difference_matrix)

[{'stunning': 3.30805778503418e-05, 'astounding': 5.0067901611328125e-06, 'impressive': 2.4497509002685547e-05, 'astonishing': 1.52587890625e-05, 'startling': 2.580881118774414e-05, 'surprising': 1.4543533325195312e-05, 'striking': 3.2842159271240234e-05, 'staggering': 3.0100345611572266e-05, 'incredible': 1.5616416931152344e-05, 'breathtaking': 5.0067901611328125e-06, 'unbelievable': 1.049041748046875e-05, 'awesome': 1.901388168334961e-05, 'remarkable': 1.4781951904296875e-05, 'spectacular': 1.8596649169921875e-05, 'fantastic': 1.52587890625e-05, 'magnificent': 3.2007694244384766e-05, 'phenomenal': 5.0067901611328125e-06, 'dazzling': 2.7000904083251953e-05, 'marvelous': 1.2755393981933594e-05, 'extraordinary': 2.4378299713134766e-05, 'terrific': 1.8835067749023438e-05, 'fabulous': 2.568960189819336e-05, 'superb': 2.6166439056396484e-05, 'marvellous': 5.0067901611328125e-06, 'sublime': 1.996755599975586e-05, 'wonderful': 1.049041748046875e-05, 'noteworthy': -8.344650268554688e-07, 'spl

In [18]:
import heapq

def k_best_viterbi(probs, k=1):
    # 初始化路径
    paths = [(score, [token]) for token, score in probs[0].items()]
    
    for i in range(1, len(probs)):
        new_paths = []
        for prev_score, prev_path in paths:
            for next_token, next_score in probs[i].items():
                new_score = prev_score + next_score
                heapq.heappush(new_paths, (new_score, prev_path + [next_token]))
                if len(new_paths) > k:
                    heapq.heappop(new_paths)
        paths = new_paths
        
    return sorted(paths, key=lambda x: x[0], reverse=True)


all_paths = k_best_viterbi(difference_matrix, k=10000)
for score, path in all_paths:
    print(path, score)

['strange', 'perfection'] 0.0005709528923034668
['strange', 'illustrious'] 0.0005657076835632324
['strange', 'glorious'] 0.0005654692649841309
['strange', 'majestic'] 0.0005645155906677246
['strange', 'satisfying'] 0.000564277172088623
['strange', 'seductive'] 0.000564277172088623
['strange', 'ravi'] 0.0005635619163513184
['strange', 'superb'] 0.0005627274513244629
['strange', 'extraordinary'] 0.0005624294281005859
['strange', 'brilliant'] 0.0005620718002319336
['strange', 'splendid'] 0.000561833381652832
['strange', 'lavish'] 0.000561833381652832
['strange', 'exquisite'] 0.0005614757537841797
['strange', 'sublime'] 0.0005612373352050781
['strange', 'magnificent'] 0.0005609989166259766
['strange', 'sterling'] 0.0005608797073364258
['strange', 'perfect'] 0.0005605220794677734
['strange', 'neat'] 0.0005602836608886719
['strange', 'fabulous'] 0.0005600452423095703
['strange', 'terrific'] 0.0005598068237304688
['strange', 'beauty'] 0.000559687614440918
['strange', 'fortunate'] 0.0005596876

In [19]:
def replace_tokens_in_sentence(sentence, indices, replacement_tokens, tokenizer):
    # Tokenize the original sentence
    tokens = tokenizer.tokenize(sentence)
    
    # Replace tokens at specified indices
    for i, index in enumerate(indices):
        tokens[index-1] = replacement_tokens[i]

    # Decode back to sentence
    return tokenizer.decode(tokenizer.convert_tokens_to_ids(tokens))


In [20]:
# 加载受害模型和tokenizer

black_box_path = "/home/ubuntu/zhc/work/output"
tokenizer = BertTokenizer.from_pretrained(black_box_path)
model = BertForSequenceClassification.from_pretrained(black_box_path)
model.eval()
model.to('cuda')  # if you are using GPU

def predict_sentiment(sentence, model, tokenizer):
    inputs = tokenizer(sentence, return_tensors="pt", truncation=True, padding=True, max_length=512)
    inputs = {k: v.to('cuda') for k, v in inputs.items()}  # Move to GPU if using GPU
    with torch.no_grad():
        outputs = model(**inputs)
    logits = outputs.logits
    predicted_class = torch.argmax(logits, dim=1).item()
    return predicted_class

original_sentiment = predict_sentiment(sentence, model, tokenizer)

print(original_sentiment)



1


In [21]:
success = False

for score, path in all_paths:
    modified_sentence = replace_tokens_in_sentence(sentence, indices, path, tokenizer)
    modified_sentiment = predict_sentiment(modified_sentence, model, tokenizer)
    print(modified_sentiment)
    if modified_sentiment != original_sentiment:
        print(f"Attack successful with sentence: {modified_sentence}")
        success = True
        break

if not success:
    print("Attack failed after trying all paths.")

1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
Attack successful with sentence: i drove by yesterday to get a sneak peak it re opens on july and i can t wait to take my kids the new range looks strange the entire range appears to be turf which may or many not help your game but it looks really fined the tee boxes look state of the art and the club house looks like something you ll see on a newer course can t wait to experience it
