**Preliminary**

In [1]:
!pip install datasets

Collecting datasets
  Downloading datasets-3.6.0-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2025.3.0,>=2023.1.0 (from fsspec[http]<=2025.3.0,>=2023.1.0->datasets)
  Downloading fsspec-2025.3.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.6.0-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.5/491.5 kB[0m [31m27.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2025.3.0-py3-none-any.whl 

In [2]:
!pip install torch transformers datasets

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

Alg1

In [None]:
import torch, random, torch.nn.functional as F, pandas as pd
import pickle  # 添加pickle导入
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from datasets import load_dataset

# ── Hyper-parameters ─────────────────────────────
loss_type = "Sentiment"   # "Sentiment" or "Topic"
num_shots = 10            # number of in-context demonstrations
n_edit = 1                # tokens that may be replaced per demonstration
steps = 10                # PGD iterations
alpha = 5                 # PGD step size
eps = 1000.0              # L2 radius of the perturbation
k_nn = 50                 # number of Word-Proj candidates
trials = 10               # number of sampled (demos, query) groups
seed = 42
device = "cuda" if torch.cuda.is_available() else "cpu"
verbose = True
random.seed(seed)
torch.manual_seed(seed)

# ── Model & vocabulary ───────────────────────────
tok = GPT2Tokenizer.from_pretrained("gpt2")
tok.pad_token = tok.eos_token  # reuse EOS as the padding token
model = GPT2LMHeadModel.from_pretrained("gpt2").to(device).eval()
E = model.get_input_embeddings().weight  # input-embedding matrix
V, d = E.shape                           # rows / columns of E
pos_id = tok("positive", add_special_tokens=False)["input_ids"][0]
neg_id = tok("negative", add_special_tokens=False)["input_ids"][0]
label_word = {1: "positive", 0: "negative"}
topic_word = {0: "world", 1: "sports", 2: "business", 3: "technology"}
# Requests the first 2000 validation rows; the captured run log reports
# 872 validation examples in total.
sst = load_dataset("glue", "sst2", split="validation[:2000]")

# ── Prompt 构造（固定格式）（应该分generation和classification） ──────────────────────
def build_prompt(demo_df, q_sent, q_lbl):
    """Assemble the few-shot prompt for a single query.

    The module-level ``loss_type`` selects the instruction text, the label
    tag and the label vocabulary. At most ``num_shots`` demonstrations are
    rendered, followed by the query with an empty label slot.

    Args:
        demo_df: Mapping/frame exposing "sentence" and "label" columns.
        q_sent: Query sentence; stripped before it is inserted.
        q_lbl: Gold label of the query, used to look up the target token id.

    Returns:
        Tuple ``(prompt, demo_sents, tgt_id)``: the prompt string, the
        stripped demonstration sentences in prompt order, and the token id
        of the query's gold label word.
    """
    topic_mode = loss_type == "Topic"
    if topic_mode:
        instr = "Classify the topic of the last review. Here are several examples."
        tag, labmap = "\nTopic:", topic_word
        tgt_id = tok(labmap[q_lbl], add_special_tokens=False)["input_ids"][0]
    else:
        instr = ("Analyze the sentiment of the last review and respond with either "
                 "positive or negative. Here are several examples.")
        tag, labmap = "\nSentiment:", label_word
        tgt_id = pos_id if q_lbl == 1 else neg_id

    # Pair every demonstration sentence with its label (first num_shots rows).
    pairs = list(zip(demo_df["sentence"][:num_shots], demo_df["label"][:num_shots]))
    demo_sents = [raw.strip() for raw, _ in pairs]
    rendered = [
        f"\nReview: {clean}{tag}{labmap[lab]}"
        for clean, (_, lab) in zip(demo_sents, pairs)
    ]
    demos_str = "".join(rendered)

    # The query reuses the same tag but leaves the label empty.
    q_stub = f"\nReview: {q_sent.strip()}{tag[:-1]}:"
    prompt = f"{instr}\n{demos_str}{q_stub}"
    return prompt, demo_sents, tgt_id

# ── 工具函数 ─────────────────────────────────────
def softprob(lg, idx):
    """Return the softmax probability of entry ``idx`` of the 1-D logits ``lg`` as a float."""
    return lg.softmax(dim=0)[idx].item()


def _greedy_generate(ids, n):
    """Greedily decode ``n`` new tokens after the 1-D prompt ``ids``.

    An explicit all-ones attention mask is passed because the pad token is
    the EOS token, so ``generate`` cannot infer the mask on its own (this is
    the warning shown in the captured run output).

    Returns:
        1-D tensor holding the prompt followed by the generated tokens.
    """
    batch = ids.unsqueeze(0)
    out = model.generate(batch, max_new_tokens=n, do_sample=False,
                         pad_token_id=tok.eos_token_id,
                         attention_mask=torch.ones_like(batch))
    return out[0]


def next_token(ids):
    """Return the id (int) of the greedy next token after the 1-D prompt ``ids``."""
    return _greedy_generate(ids, 1)[-1].item()


def gpt_continue(ids, n=1):
    """Return the decoded, stripped greedy continuation of ``ids``.

    Args:
        ids: 1-D tensor of prompt token ids.
        n: Number of new tokens to generate; raise it for longer continuations.
    """
    out = _greedy_generate(ids, n)
    return tok.decode(out[ids.size(0):], skip_special_tokens=True).strip()

# ── 一轮攻击 ─────────────────────────────────────
def _locate_demo_spans(ids, demo_sents):
    """Find the token span of every demonstration sentence inside ``ids``.

    Sentences are searched as token subsequences from left to right, each
    search starting where the previous match ended. The form with a leading
    space is tried first because every sentence follows "Review: " in the
    prompt (presumably the tokenizer folds that space into the first token).

    Returns:
        List of ``[start, end, []]`` entries, one per sentence; the empty
        list is filled later with the positions chosen for editing.

    Raises:
        ValueError: if a sentence cannot be found in ``ids``.
    """
    id_list = ids.tolist()
    spans, cursor = [], 0
    for sent in demo_sents:
        hit, width = None, 0
        for prefix in (" ", ""):
            pat = tok(prefix + sent, add_special_tokens=False)["input_ids"]
            if not pat:
                continue
            width = len(pat)
            hit = next((p for p in range(cursor, len(id_list) - width + 1)
                        if id_list[p:p + width] == pat), None)
            if hit is not None:
                break
        if hit is None:
            raise ValueError(f"demo sentence not found in prompt tokens: {sent!r}")
        spans.append([hit, hit + width, []])
        cursor = hit + width
    return spans


def attack(demo_df, q_row, trial=None):
    """Run one PGD + Word-Proj attack on the demonstrations of a single prompt.

    Steps: build the prompt, record the clean prediction, run ``steps``
    gradient-ascent updates on an embedding perturbation restricted to the
    selected token positions, pickle the per-demonstration perturbations,
    project each perturbed position back to a vocabulary token, and record
    the prediction on the edited prompt.

    Args:
        demo_df: Frame with "sentence" and "label" columns (demonstrations).
        q_row: Mapping with "sentence" and "label" for the query.
        trial: Index used in the output file name ``ice_deltas_trial_{trial}.pkl``.
            When omitted, the module-level ``trial`` loop variable is used if
            it exists (the behaviour older callers relied on), otherwise 0.

    Returns:
        Tuple ``(base_ok, adv_ok)``: whether the greedy next token equals the
        target token before and after the attack.

    Raises:
        ValueError: if a demonstration sentence cannot be located in the
            tokenized prompt.
    """
    if trial is None:
        trial = globals().get("trial", 0)

    prompt, demo_sents, tgt_id = build_prompt(demo_df, q_row["sentence"], q_row["label"])
    enc = tok(prompt, return_tensors="pt").to(device)
    ids, mask = enc.input_ids[0], enc.attention_mask[0]
    target = torch.tensor([tgt_id], device=device)

    def last_logits(token_ids):
        # Evaluation-only forward pass: no autograd graph is needed.
        with torch.no_grad():
            return model(token_ids.unsqueeze(0)).logits[0, -1]

    # spans = [start, end, token_pos_list], located by token search so the
    # offsets stay correct for any tag/label wording.
    spans = _locate_demo_spans(ids, demo_sents)

    # baseline
    pred_b = next_token(ids); base_ok = pred_b == tgt_id
    prob_b = softprob(last_logits(ids), pred_b)
    out_b  = gpt_continue(ids)

    # PGD initialisation: the clean embeddings are constant across steps.
    base_emb = model.get_input_embeddings()(ids).detach()
    delta = torch.zeros_like(base_emb, requires_grad=True)
    trace = []
    for t in range(steps):
        logits = model(inputs_embeds=(base_emb + delta).unsqueeze(0),
                       attention_mask=mask.unsqueeze(0)).logits[0, -1]
        loss = F.cross_entropy(logits.unsqueeze(0), target)
        trace.append(loss.item())
        # Differentiate w.r.t. delta only, so no gradients pile up on the
        # model parameters.
        (grad,) = torch.autograd.grad(loss, delta)

        if t == 0:
            # Pick, per demonstration, the positions with the largest
            # gradient norm at delta == 0.
            for s, e, posL in spans:
                grad_norm = grad[s:e].norm(dim=1)
                topk = grad_norm.topk(min(n_edit, e - s)).indices + s
                posL.extend(topk.tolist())

        with torch.no_grad():
            for _, _, posL in spans:
                for p in posL:
                    delta[p] += alpha * grad[p]  # ascent: increase the loss
            total_norm = delta.view(-1).norm()
            if total_norm > eps:
                delta.mul_(eps / total_norm)  # project back onto the L2 ball

    # Collect the perturbation of every edited position, keyed by ICE index
    # and by the position relative to the start of that ICE.
    ice_deltas = {}
    for idx, (s, e, posL) in enumerate(spans, 1):
        for p in posL:
            ice_deltas.setdefault(f"ICE_{idx}", {})[p - s] = delta[p].detach().cpu()

    # Persist the perturbations for the budget-profile computation.
    with open(f"ice_deltas_trial_{trial}.pkl", "wb") as f:
        pickle.dump(ice_deltas, f)

    # Word-Proj: map each perturbed embedding back to a vocabulary token.
    ids_adv = ids.clone()
    with torch.no_grad():
        for _, _, posL in spans:
            for p in posL:
                orig = ids[p].item(); target_vec = E[orig] + delta[p]
                mask_ball = (E - E[orig]).norm(dim=1) <= eps
                cand = torch.arange(V, device=device)[mask_ball]
                if cand.numel() == 0: continue
                cand = cand[(E[cand] - target_vec).norm(dim=1)
                            .topk(min(k_nn, cand.numel()), largest=False).indices]
                # Among the nearest candidates keep the one with the highest loss.
                best, best_loss = orig, -1e9
                for cid in cand:
                    tmp = ids_adv.clone(); tmp[p] = cid
                    l = F.cross_entropy(model(tmp.unsqueeze(0)).logits[0, -1].unsqueeze(0),
                                        target).item()
                    if l > best_loss:
                        best, best_loss = cid.item(), l
                ids_adv[p] = best

    # adversarial
    pred_a = next_token(ids_adv); adv_ok = pred_a == tgt_id
    prob_a = softprob(last_logits(ids_adv), pred_a)
    out_a  = gpt_continue(ids_adv)

    # report
    if verbose:
        print("\nloss trace:", [f"{l:.3f}" for l in trace])
        print("Baseline :", tok.decode([pred_b]), f"p={prob_b:.2f}", "✓" if base_ok else "✗")
        print("AfterAtk :", tok.decode([pred_a]), f"p={prob_a:.2f}", "✓" if adv_ok else "✗")
        print("GPT‑2 clean :", out_b or "<empty>")
        print("GPT‑2 attack:", out_a or "<empty>")

        print("\n--- token changes ---")
        for idx, (s, e, posL) in enumerate(spans, 1):
            before = tok.decode(ids[s:e], skip_special_tokens=True)
            after  = tok.decode(ids_adv[s:e], skip_special_tokens=True)
            diff   = [(tok.convert_ids_to_tokens([ids[p]])[0],
                       tok.convert_ids_to_tokens([ids_adv[p]])[0]) for p in posL]
            print(f"\nICE {idx}")
            print(" before:", before)
            print(" after :", after)
            print(" changed:", diff)

    return base_ok, adv_ok

# ── Statistics over repeated trials ──────────────
base_cnt = adv_cnt = 0
_sst_rows = list(sst)  # materialise once; the dataset does not change between trials
for trial in range(1, trials + 1):
    # Draw the demonstrations and the query in one sample (without
    # replacement) so the query can never appear among its own demonstrations.
    _picked = random.sample(_sst_rows, num_shots + 1)
    demo_df = pd.DataFrame(_picked[:num_shots])
    query   = _picked[num_shots]
    print(f"\n===== Trial {trial} =====")
    b_ok, a_ok = attack(demo_df, query)
    base_cnt += b_ok; adv_cnt += a_ok

print("\nAvg baseline acc :", base_cnt / trials)
print("Avg adversarial  :", adv_cnt / trials)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/35.3k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/3.11M [00:00<?, ?B/s]

validation-00000-of-00001.parquet:   0%|          | 0.00/72.8k [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/148k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/67349 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/872 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/1821 [00:00<?, ? examples/s]

The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.



===== Trial 1 =====


Test on entire dataset

In [None]:
# import torch, random, torch.nn.functional as F, pandas as pd
# from transformers import GPT2Tokenizer, GPT2LMHeadModel
# from datasets import load_dataset

# # ── 超参 ─────────────────────────────────────────
# num_shots      = 20          # demo 数
# n_edit         = 1           # 每条示例可替换 token 数
# steps, alpha   = 30, 2e-1
# eps, k_nn      = 5.0, 50
# seed           = 42
# device         = "cuda" if torch.cuda.is_available() else "cpu"

# verbose_trial  = True        # ← True 显示逐条详细输出
# # print_limit    = 3           # ← 仅打印前 N 条；None 表示全打印
# random.seed(seed); torch.manual_seed(seed)

# # ── 模型 & 词表 ──────────────────────────────────
# tok  = GPT2Tokenizer.from_pretrained("gpt2")
# tok.pad_token = tok.eos_token
# model = GPT2LMHeadModel.from_pretrained("gpt2").to(device).eval()
# E = model.get_input_embeddings().weight; V, d = E.shape
# pos_id = tok("positive", add_special_tokens=False)["input_ids"][0]
# neg_id = tok("negative", add_special_tokens=False)["input_ids"][0]
# label_word = {1: "positive", 0: "negative"}

# # ── 数据集 ───────────────────────────────────────
# train_set = load_dataset("glue", "sst2", split="train")
# test_set  = load_dataset("glue", "sst2", split="validation")   # 872

# # ── Prompt 构造 ──────────────────────────────────
# def build_prompt(demo_df, q_sent, q_lbl):
#     instr  = ("Analyze the sentiment of the last review and respond with either "
#               "positive or negative. Here are several examples.")
#     tag    = "\nSentiment:"
#     tgt_id = pos_id if q_lbl == 1 else neg_id
#     demos, demo_sents = "", []
#     for sent, lab in zip(demo_df["sentence"][:num_shots], demo_df["label"][:num_shots]):
#         s = sent.strip(); demo_sents.append(s)
#         demos += f"\nReview: {s}{tag}{label_word[lab]}"
#     q_stub = f"\nReview: {q_sent.strip()}{tag[:-1]}:"
#     return f"{instr}\n{demos}{q_stub}", demo_sents, tgt_id

# # ── 工具函数 ─────────────────────────────────────
# def next_token(ids):
#     mask = torch.ones_like(ids).unsqueeze(0)
#     return model.generate(ids.unsqueeze(0), max_new_tokens=1,
#                           do_sample=False, pad_token_id=tok.eos_token_id,
#                           attention_mask=mask)[0, -1].item()

# def gpt_continue(ids, n=0):             # 若 n==0 返回空串
#     if n <= 0: return ""
#     mask = torch.ones_like(ids).unsqueeze(0)
#     out = model.generate(ids.unsqueeze(0), max_new_tokens=n,
#                          do_sample=False, pad_token_id=tok.eos_token_id,
#                          attention_mask=mask)
#     return tok.decode(out[0, ids.size(0):], skip_special_tokens=True).strip()

# # ── 主攻击函数 ───────────────────────────────────
# def attack(demo_df, q_row, verbose=False, idx=None):
#     prompt, demo_sents, tgt_id = build_prompt(demo_df, q_row["sentence"], q_row["label"])
#     enc = tok(prompt, return_tensors="pt").to(device)
#     ids, mask = enc.input_ids[0], enc.attention_mask[0]

#     spans, cur = [], len(tok(prompt.split('\n')[0]+'\n', add_special_tokens=False)["input_ids"])
#     for s in demo_sents:
#         cur += len(tok("Review: ", add_special_tokens=False)["input_ids"])
#         start = cur
#         s_tok = tok(s, add_special_tokens=False)["input_ids"]; cur += len(s_tok)
#         spans.append([start, cur, []])
#         cur += len(tok("\nSentiment:positive", add_special_tokens=False)["input_ids"])

#     # baseline
#     pred_b = next_token(ids); base_ok = pred_b == tgt_id

#     # --- PGD ---
#     delta = torch.zeros_like(model.get_input_embeddings()(ids), requires_grad=True)
#     loss_trace=[]
#     for t in range(steps):
#         emb = model.get_input_embeddings()(ids) + delta
#         logits = model(inputs_embeds=emb.unsqueeze(0), attention_mask=mask.unsqueeze(0)).logits[0,-1]
#         loss = F.cross_entropy(logits.unsqueeze(0), torch.tensor([tgt_id], device=device))
#         loss_trace.append(loss.item()); loss.backward()

#         if t == 0:
#             for s,e,posL in spans:
#                 grad = delta.grad[s:e].norm(dim=1)
#                 posL.extend((grad.topk(min(n_edit, e-s)).indices + s).tolist())
#         with torch.no_grad():
#             for _,_,posL in spans:
#                 for p in posL: delta[p]+=alpha*delta.grad[p]
#             if delta.view(-1).norm()>eps: delta.mul_(eps/delta.view(-1).norm())
#             delta.grad.zero_()

#     # Word‑Proj
#     ids_adv = ids.clone()
#     for _,_,posL in spans:
#         for p in posL:
#             orig=ids[p].item(); tgt_vec=E[orig]+delta[p]
#             ball=(E-E[orig]).norm(dim=1)<=eps
#             cand=torch.arange(V,device=device)[ball]
#             if cand.numel()==0: continue
#             cand=cand[(E[cand]-tgt_vec).norm(dim=1)
#                       .topk(min(k_nn,cand.numel()),largest=False).indices]
#             best,best_l=orig,-1e9
#             for cid in cand:
#                 tmp=ids_adv.clone(); tmp[p]=cid
#                 l=F.cross_entropy(model(tmp.unsqueeze(0)).logits[0,-1].unsqueeze(0),
#                                   torch.tensor([tgt_id],device=device))
#                 if l>best_l: best,best_l=cid.item(),l.item()
#             ids_adv[p]=best

#     pred_a = next_token(ids_adv); adv_ok = pred_a == tgt_id

#     # ---------- 打印 ----------
#     if verbose:
#         header = f"\n===== Query {idx} =====" if idx is not None else ""
#         print(header)
#         print("loss trace:", [f"{l:.3f}" for l in loss_trace])
#         print("Baseline :", tok.decode([pred_b]), "✓" if base_ok else "✗")
#         print("AfterAtk :", tok.decode([pred_a]), "✓" if adv_ok else "✗")
#         print("\n--- token changes ---")
#         for n,(s,e,posL) in enumerate(spans,1):
#             bef=tok.decode(ids[s:e],skip_special_tokens=True)
#             aft=tok.decode(ids_adv[s:e],skip_special_tokens=True)
#             diff=[(tok.convert_ids_to_tokens([ids[p]])[0],
#                    tok.convert_ids_to_tokens([ids_adv[p]])[0]) for p in posL]
#             print(f"ICE {n} diff:", diff)
#     return base_ok, adv_ok

# # ── 遍历测试集 ──────────────────────────────────
# base_cnt = adv_cnt = 0
# for i, q in enumerate(test_set, 1):
#     demos = pd.DataFrame(random.sample(list(train_set), num_shots))
#     verbose_flag = verbose_trial and (print_limit is None or i <= print_limit)
#     b, a = attack(demos, q, verbose=verbose_flag, idx=i)
#     base_cnt += b; adv_cnt += a

# print("\n=== SST‑2 Test‑Set Results ===")
# print(f"Baseline accuracy   : {base_cnt / len(test_set):.4f}")
# print(f"Adversarial accuracy: {adv_cnt  / len(test_set):.4f}")
# print(f"Accuracy drop       : {(base_cnt - adv_cnt) / len(test_set):.4f}")


**Budget Profile Calculation**

In [None]:
import pickle
import numpy as np
import torch
from scipy.interpolate import interp1d

# -------------- 1. Load ice_deltas --------------
# NOTE: pickle.load can execute arbitrary code; only open files that this
# notebook wrote itself.
with open("ice_deltas_trial_1.pkl", "rb") as f:
    ice_deltas = pickle.load(f)
# ice_deltas layout: { "ICE_1": {pos0: tensor(d), …}, "ICE_2": {...}, … }

# -------------- 2. Compute ||δ_i||₂ per ICE --------------
# Iterate over the keys that are actually present (sorted by their numeric
# suffix) instead of assuming ICE_1..ICE_n are all there: an ICE without any
# edited position is never written to the file.
ice_keys = sorted(ice_deltas, key=lambda name: int(name.rsplit("_", 1)[1]))
n = len(ice_keys)
delta_norms = []
for key in ice_keys:
    # Stack the deltas of all positions of this ICE into a matrix and sum
    # the per-position L2 norms.
    deltas_i = torch.stack(list(ice_deltas[key].values()))
    delta_norms.append(deltas_i.norm(dim=1).sum().item())
delta_norms = np.array(delta_norms, dtype=float)  # shape=(n,)

# -------------- 3. Skip emb_norms: all set to 1 --------------
emb_norms = np.ones_like(delta_norms)

# -------------- 4. Normalise to obtain the discrete γ --------------
gamma = delta_norms / emb_norms
total_budget = gamma.sum()
if not total_budget > 0:
    # Covers an empty file, all-zero deltas and NaNs; dividing would yield NaN.
    raise ValueError("saved deltas have no positive norm; cannot normalise the budget profile")
gamma = gamma / total_budget
print("离散 Budget Profile γ =", gamma.tolist())

# -------------- 5. 定义连续转换函数 --------------
def to_continuous_budget(gamma, N):
    """Resample a discrete budget profile to ``N`` allocation shares.

    The profile is treated as samples at x = 1..n, linearly interpolated at
    ``N`` equally spaced points in [1, n], clipped at zero and renormalised.

    Args:
        gamma: Sequence of n >= 1 non-negative weights.
        N: Number of output shares.

    Returns:
        numpy array of length ``N`` that sums to 1 (for N >= 1). A profile
        with no positive mass yields a uniform allocation.

    Raises:
        ValueError: if ``gamma`` is empty.
    """
    gamma = np.array(gamma, dtype=float)
    n = gamma.size
    if n == 0:
        raise ValueError("gamma must contain at least one entry")
    # Original sample points: 1, 2, …, n
    x_orig = np.arange(1, n + 1)
    # N equally spaced query points in [1, n]
    x_new = np.linspace(1, n, N)
    # np.interp is piecewise linear and, unlike scipy's interp1d, also
    # accepts a single sample point (n == 1).
    pdf_vals = np.clip(np.interp(x_new, x_orig, gamma), 0, None)
    total = pdf_vals.sum()
    if total <= 0:
        # No positive mass to distribute: avoid 0/0 and share the budget evenly.
        if pdf_vals.size == 0:
            return pdf_vals
        return np.full(pdf_vals.shape, 1.0 / pdf_vals.size)
    # Normalise into allocation shares.
    return pdf_vals / total

# -------------- 6. 接口：定义 N 并打印结果 --------------
if __name__ == "__main__":
    N = 20  # number of allocation shares to produce; change N here
    # Resample the discrete profile computed above to N shares.
    alloc = to_continuous_budget(gamma, N)
    print(f"Continuous Budget Profile (N={N}):\n", alloc)
    print("Sum check:", alloc.sum())

**Alg2**

Budgeted

In [None]:
import torch, random, torch.nn.functional as F, pandas as pd
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from datasets import load_dataset

# ── Fixed hyper-parameters ───────────────────────
num_shots = 15        # demonstrations per prompt
n_edit = 1            # tokens edited per demonstration
steps = 100           # PGD iterations per demonstration
alpha = 2             # PGD step size
total_eps = 1000.0    # total budget; set to 0 to verify clean == adv
k_nn = 50             # nearest-neighbour candidates in Word-Proj
trials = 10
seed = 43
device = "cuda" if torch.cuda.is_available() else "cpu"
verbose = True
random.seed(seed)
torch.manual_seed(seed)

# ── Budget profile Γ (example) ───────────────────
# Any positive weights; make_gamma resamples them to one entry per
# demonstration and normalises them.
custom_gamma = [0.6, 0.1, 0.05, 0.05, 0.2]


def make_gamma(n, custom):
    """Return a normalised budget profile with exactly ``n`` entries.

    When ``custom`` already has ``n`` weights they are only normalised.
    Otherwise the weights are resampled to ``n`` points by linear
    interpolation over equally spaced positions (a single weight is simply
    repeated), so every demonstration index has a budget share.

    Args:
        n: Number of entries to produce (>= 1).
        custom: Non-empty sequence of weights with a positive sum.

    Returns:
        List of ``n`` floats summing to 1.

    Raises:
        ValueError: if ``custom`` is empty, ``n`` < 1, or the weights do not
            have a positive sum.
    """
    weights = [float(w) for w in custom]
    m = len(weights)
    if m == 0 or n < 1:
        raise ValueError("make_gamma needs n >= 1 and a non-empty weight list")
    if m != n:
        # Position of output j on the input axis 0..m-1.
        step = (m - 1) / (n - 1) if n > 1 else 0.0
        resampled = []
        for j in range(n):
            x = j * step
            lo = min(int(x), m - 1)
            hi = min(lo + 1, m - 1)
            frac = x - lo
            resampled.append(weights[lo] * (1.0 - frac) + weights[hi] * frac)
        weights = resampled
    g = torch.tensor(weights, dtype=torch.float)
    total = g.sum()
    if not total > 0:
        raise ValueError("budget weights must have a positive sum")
    return (g / total).tolist()

# ── Model and dataset ────────────────────────────
tok = GPT2Tokenizer.from_pretrained("gpt2")
tok.pad_token = tok.eos_token  # reuse EOS as the padding token
model = GPT2LMHeadModel.from_pretrained("gpt2").to(device).eval()
E = model.get_input_embeddings().weight  # input-embedding matrix
V, _ = E.shape                           # V = number of rows of E
pos_id = tok("positive", add_special_tokens=False)["input_ids"][0]
neg_id = tok("negative", add_special_tokens=False)["input_ids"][0]
label_word = {1: "positive", 0: "negative"}
sst = load_dataset("glue", "sst2", split="validation[:2000]")

# ── Prompt 拼装 ──────────────────────────────────
def build_prompt(demos):
    """Render the instruction followed by one "Review/Sentiment" block per demo.

    Args:
        demos: Iterable of ``(sentence, label)`` pairs; labels are keys of
            the module-level ``label_word`` mapping.

    Returns:
        The prompt string (just the instruction when ``demos`` is empty).
    """
    tag = "\nSentiment:"
    pieces = [
        "Analyze the sentiment of the last review and respond with either "
        "positive or negative. Here are several examples."
    ]
    for sentence, label in demos:
        pieces.append(f"\nReview: {sentence.strip()}{tag}{label_word[label]}")
    return "".join(pieces)

# ── 工具函数 ─────────────────────────────────────
def next_token(ids):
    """Return the id (int) of the greedy next token after the 1-D prompt ``ids``.

    An all-ones attention mask is passed explicitly to ``generate``.
    """
    batch = ids.unsqueeze(0)
    full_mask = torch.ones_like(ids).unsqueeze(0)
    generated = model.generate(
        batch,
        attention_mask=full_mask,
        max_new_tokens=1,
        do_sample=False,
        pad_token_id=tok.eos_token_id,
    )
    return generated[0, -1].item()

def word_proj(orig_id, delta_vec, eps_bound):
    """Project a perturbed embedding back onto a vocabulary token.

    Args:
        orig_id: Id of the original token.
        delta_vec: Perturbation added to the original token's embedding.
        eps_bound: Maximum L2 distance from the original embedding that a
            replacement token may have.

    Returns:
        Id (int) of the admissible token whose embedding is closest to
        ``E[orig_id] + delta_vec``; ``orig_id`` when no token is admissible.
    """
    anchor = E[orig_id]
    target = anchor + delta_vec

    # Tokens inside the L2 ball around the original embedding.
    within = (E - anchor).norm(dim=1) <= eps_bound
    candidates = torch.arange(V, device=device)[within]
    if candidates.numel() == 0:
        return orig_id

    # Shortlist the k_nn candidates nearest to the target, then take the closest.
    dist_to_target = (E[candidates] - target).norm(dim=1)
    k = min(k_nn, candidates.numel())
    shortlist = candidates[dist_to_target.topk(k, largest=False).indices]
    shortlist_dist = (E[shortlist] - target).norm(dim=1)
    return shortlist[shortlist_dist.argmin()].item()

# ── 新增：可靠的 span 定位 ────────────────────────
def span_by_tokens(ids, sentence):
    """Locate the token span of ``sentence`` inside ``ids``.

    The stripped sentence is tokenized with and without a leading space and
    searched as a token subsequence. The leading-space form is tried first
    because in the prompt the sentence follows "Review: " (presumably the
    tokenizer folds that space into the first token). The first occurrence
    wins, so a sentence repeated earlier in the prompt resolves to the
    earlier position.

    Args:
        ids: 1-D tensor (or plain sequence) of token ids.
        sentence: Sentence text to find.

    Returns:
        ``(start, end)`` with ``end`` exclusive.

    Raises:
        ValueError: if neither tokenization occurs in ``ids``.
    """
    # Convert once: slicing and .tolist() per position costs one
    # device-to-host copy for every candidate offset.
    haystack = ids.tolist() if hasattr(ids, "tolist") else list(ids)
    text = sentence.strip()
    for prefix in (" ", ""):
        pat = tok(prefix + text, add_special_tokens=False)["input_ids"]
        if not pat:
            # An empty pattern would trivially "match" at position 0.
            continue
        width = len(pat)
        for p in range(len(haystack) - width + 1):
            if haystack[p:p + width] == pat:
                return p, p + width
    raise ValueError(f"span not found for [{sentence!r}]")

# ── Sequential‑Budgeted PGD ─────────────────────
def attack(demo_df, query):
    """Sequential budgeted PGD attack over the demonstrations of one query.

    Demonstrations are attacked one at a time. For demonstration i the prompt
    holds the already-hijacked demonstrations, the current clean one and the
    query; a perturbation on the most gradient-sensitive token(s) of the
    current demonstration is optimised by gradient ascent under the budget
    ``eps_i[i] = gamma[i] * total_eps`` and projected back to a token with
    ``word_proj``. After every demonstration the clean and attacked prompts
    of that length are evaluated.

    Args:
        demo_df: Frame with "sentence" and "label" columns.
        query: Mapping with "sentence" and "label" of the query.

    Returns:
        Tuple ``(base_vec, adv_vec)``: per context length, whether the greedy
        next token equals the target token for the clean / attacked prompt.

    Raises:
        ValueError: if the budget profile has fewer entries than there are
            demonstrations, or a demonstration cannot be located in the
            prompt tokens (raised by ``span_by_tokens``).
    """
    demos = [(s, l) for s, l in zip(demo_df["sentence"], demo_df["label"])]
    gamma = make_gamma(len(demos), custom_gamma)
    if len(gamma) < len(demos):
        # Fail up front instead of with an IndexError in the middle of the run.
        raise ValueError(
            f"budget profile has {len(gamma)} entries but there are "
            f"{len(demos)} demonstrations")
    eps_i = [g * total_eps for g in gamma]

    hijacked, traces, diffs = [], [], []
    base_vec, adv_vec = [], []

    # Loop-invariant pieces: query stub, target token and embedding layer.
    q_stub = f"\nReview: {query['sentence'].strip()}\nSentiment:"
    tgt_q = pos_id if query["label"] == 1 else neg_id
    target = torch.tensor([tgt_q], dtype=torch.long, device=device)
    embed = model.get_input_embeddings()

    def loss_at(emb, attn):
        # Cross-entropy of the last position's logits against the target token.
        logits = model(inputs_embeds=emb.unsqueeze(0),
                       attention_mask=attn.unsqueeze(0)).logits[0, -1]
        return F.cross_entropy(logits.unsqueeze(0), target)

    for i, (sent, lab) in enumerate(demos):
        prompt_i = build_prompt(hijacked + [(sent, lab)]) + q_stub
        enc      = tok(prompt_i, return_tensors="pt").to(device)
        ids, mask = enc.input_ids[0], enc.attention_mask[0]

        # Locate the current ICE with span_by_tokens.
        start, end = span_by_tokens(ids, sent)

        # Pick the most sensitive token(s): largest gradient norm of the loss
        # w.r.t. the clean embeddings. torch.autograd.grad keeps gradients
        # off the model parameters.
        base_emb = embed(ids).detach()
        emb0 = base_emb.clone().requires_grad_(True)
        (g0,) = torch.autograd.grad(loss_at(emb0, mask), emb0)
        k = min(n_edit, end - start)  # a span may be shorter than n_edit
        pos_sel = (g0[start:end].norm(dim=1).topk(k).indices + start).tolist()

        # PGD update of δ
        delta = torch.zeros_like(base_emb, requires_grad=True)
        trace = []
        for _ in range(steps):
            l = loss_at(base_emb + delta, mask)
            trace.append(l.item())
            (grad,) = torch.autograd.grad(l, delta)
            with torch.no_grad():
                for p in pos_sel:
                    delta[p] += alpha * grad[p]  # ascent: increase the loss
                # Project the span's perturbation onto its budget.
                span_norm = delta[start:end].norm()
                if span_norm > eps_i[i]:
                    delta[start:end] *= eps_i[i] / span_norm
        traces.append(trace)

        # Word-Proj
        ids_new = ids.clone(); diff = []
        with torch.no_grad():
            for p in pos_sel:
                nid = word_proj(ids[p].item(), delta[p], eps_i[i])
                diff.append((tok.convert_ids_to_tokens([ids[p]])[0],
                             tok.convert_ids_to_tokens([nid])[0]))
                ids_new[p] = nid
        diffs.append(diff)

        # Write the edited tokens back as the hijacked demonstration text.
        hijacked_sent = tok.decode(ids_new[start:end], skip_special_tokens=True).lstrip()
        hijacked.append((hijacked_sent, lab))

        # Record correctness for context length i + 1.
        clean_prompt_i  = build_prompt(demos[:i+1]) + q_stub
        pred_b = next_token(tok(clean_prompt_i, return_tensors="pt").to(device).input_ids[0])
        attack_prompt_i = build_prompt(hijacked) + q_stub
        pred_a = next_token(tok(attack_prompt_i, return_tensors="pt").to(device).input_ids[0])
        base_vec.append(pred_b == tgt_q)
        adv_vec.append(pred_a == tgt_q)

    if verbose:
        print("\nSequential Budgeted PGD")
        for j, (tr, df) in enumerate(zip(traces, diffs), 1):
            print(f"ICE {j} loss trace:", [f"{x:.2f}" for x in tr])
            print(" changed:", df)
        print("Per‑length baseline:", base_vec)
        print("Per‑length attack  :", adv_vec)

    return base_vec, adv_vec

# ── Trials ─────────────────────────────────────
base_sum = [0] * num_shots
adv_sum  = [0] * num_shots
_sst_rows = list(sst)  # materialise once; the dataset does not change between trials

for t in range(1, trials + 1):
    # Draw the demonstrations and the query in one sample (without
    # replacement) so the query can never appear among its own demonstrations.
    _picked = random.sample(_sst_rows, num_shots + 1)
    demo_df = pd.DataFrame(_picked[:num_shots])
    query   = _picked[num_shots]
    print(f"\n===== Trial {t} =====")
    b_vec, a_vec = attack(demo_df, query)
    for i in range(num_shots):
        base_sum[i] += b_vec[i]
        adv_sum[i]  += a_vec[i]

print("\n=== Avg accuracy vs. context length ===")
for i in range(num_shots):
    print(f"len={i+1:2d}  clean: {base_sum[i] / trials:.3f}   adv: {adv_sum[i] / trials:.3f}")

Flat

In [None]:
import torch, random, torch.nn.functional as F, pandas as pd
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from datasets import load_dataset

# ── Fixed hyper-parameters ───────────────────────
num_shots = 15        # demonstrations per prompt
n_edit = 1            # tokens edited per demonstration
steps = 100           # PGD iterations per demonstration
alpha = 2             # PGD step size
total_eps = 1000.0    # total budget; set to 0 to verify clean == adv
k_nn = 50             # nearest-neighbour candidates in Word-Proj
trials = 10
seed = 43
device = "cuda" if torch.cuda.is_available() else "cpu"
verbose = True
random.seed(seed)
torch.manual_seed(seed)

# ── Budget profile Γ (flat) ──────────────────────
# Flat profile: a single weight that make_gamma repeats for every
# demonstration. The previous `[0.2] * n` relied on an `n` that is not
# defined in this cell.
custom_gamma = [0.2]


def make_gamma(n, custom):
    """Return a normalised budget profile with exactly ``n`` entries.

    When ``custom`` already has ``n`` weights they are only normalised.
    Otherwise the weights are resampled to ``n`` points by linear
    interpolation over equally spaced positions (a single weight is simply
    repeated), so every demonstration index has a budget share.

    Args:
        n: Number of entries to produce (>= 1).
        custom: Non-empty sequence of weights with a positive sum.

    Returns:
        List of ``n`` floats summing to 1.

    Raises:
        ValueError: if ``custom`` is empty, ``n`` < 1, or the weights do not
            have a positive sum.
    """
    weights = [float(w) for w in custom]
    m = len(weights)
    if m == 0 or n < 1:
        raise ValueError("make_gamma needs n >= 1 and a non-empty weight list")
    if m != n:
        # Position of output j on the input axis 0..m-1.
        step = (m - 1) / (n - 1) if n > 1 else 0.0
        resampled = []
        for j in range(n):
            x = j * step
            lo = min(int(x), m - 1)
            hi = min(lo + 1, m - 1)
            frac = x - lo
            resampled.append(weights[lo] * (1.0 - frac) + weights[hi] * frac)
        weights = resampled
    g = torch.tensor(weights, dtype=torch.float)
    total = g.sum()
    if not total > 0:
        raise ValueError("budget weights must have a positive sum")
    return (g / total).tolist()

# ── Model and dataset ────────────────────────────
tok = GPT2Tokenizer.from_pretrained("gpt2")
tok.pad_token = tok.eos_token  # reuse EOS as the padding token
model = GPT2LMHeadModel.from_pretrained("gpt2").to(device).eval()
E = model.get_input_embeddings().weight  # input-embedding matrix
V, _ = E.shape                           # V = number of rows of E
pos_id = tok("positive", add_special_tokens=False)["input_ids"][0]
neg_id = tok("negative", add_special_tokens=False)["input_ids"][0]
label_word = {1: "positive", 0: "negative"}
sst = load_dataset("glue", "sst2", split="validation[:2000]")

# ── Prompt 拼装 ──────────────────────────────────
def build_prompt(demos):
    """Render the instruction followed by one "Review/Sentiment" block per demo.

    Args:
        demos: Iterable of ``(sentence, label)`` pairs; labels are keys of
            the module-level ``label_word`` mapping.

    Returns:
        The prompt string (just the instruction when ``demos`` is empty).
    """
    tag = "\nSentiment:"
    pieces = [
        "Analyze the sentiment of the last review and respond with either "
        "positive or negative. Here are several examples."
    ]
    for sentence, label in demos:
        pieces.append(f"\nReview: {sentence.strip()}{tag}{label_word[label]}")
    return "".join(pieces)

# ── 工具函数 ─────────────────────────────────────
def next_token(ids):
    """Return the id (int) of the greedy next token after the 1-D prompt ``ids``.

    An all-ones attention mask is passed explicitly to ``generate``.
    """
    batch = ids.unsqueeze(0)
    full_mask = torch.ones_like(ids).unsqueeze(0)
    generated = model.generate(
        batch,
        attention_mask=full_mask,
        max_new_tokens=1,
        do_sample=False,
        pad_token_id=tok.eos_token_id,
    )
    return generated[0, -1].item()

def word_proj(orig_id, delta_vec, eps_bound):
    """Project a perturbed embedding back onto a vocabulary token.

    Args:
        orig_id: Id of the original token.
        delta_vec: Perturbation added to the original token's embedding.
        eps_bound: Maximum L2 distance from the original embedding that a
            replacement token may have.

    Returns:
        Id (int) of the admissible token whose embedding is closest to
        ``E[orig_id] + delta_vec``; ``orig_id`` when no token is admissible.
    """
    anchor = E[orig_id]
    target = anchor + delta_vec

    # Tokens inside the L2 ball around the original embedding.
    within = (E - anchor).norm(dim=1) <= eps_bound
    candidates = torch.arange(V, device=device)[within]
    if candidates.numel() == 0:
        return orig_id

    # Shortlist the k_nn candidates nearest to the target, then take the closest.
    dist_to_target = (E[candidates] - target).norm(dim=1)
    k = min(k_nn, candidates.numel())
    shortlist = candidates[dist_to_target.topk(k, largest=False).indices]
    shortlist_dist = (E[shortlist] - target).norm(dim=1)
    return shortlist[shortlist_dist.argmin()].item()

# ── 新增：可靠的 span 定位 ────────────────────────
def span_by_tokens(ids, sentence):
    """Locate the token span of ``sentence`` inside ``ids``.

    The stripped sentence is tokenized with and without a leading space and
    searched as a token subsequence. The leading-space form is tried first
    because in the prompt the sentence follows "Review: " (presumably the
    tokenizer folds that space into the first token). The first occurrence
    wins, so a sentence repeated earlier in the prompt resolves to the
    earlier position.

    Args:
        ids: 1-D tensor (or plain sequence) of token ids.
        sentence: Sentence text to find.

    Returns:
        ``(start, end)`` with ``end`` exclusive.

    Raises:
        ValueError: if neither tokenization occurs in ``ids``.
    """
    # Convert once: slicing and .tolist() per position costs one
    # device-to-host copy for every candidate offset.
    haystack = ids.tolist() if hasattr(ids, "tolist") else list(ids)
    text = sentence.strip()
    for prefix in (" ", ""):
        pat = tok(prefix + text, add_special_tokens=False)["input_ids"]
        if not pat:
            # An empty pattern would trivially "match" at position 0.
            continue
        width = len(pat)
        for p in range(len(haystack) - width + 1):
            if haystack[p:p + width] == pat:
                return p, p + width
    raise ValueError(f"span not found for [{sentence!r}]")

# ── Sequential‑Budgeted PGD ─────────────────────
def attack(demo_df, query):
    """Sequential budgeted PGD attack over the demonstrations of one query.

    Demonstrations are attacked one at a time. For demonstration i the prompt
    holds the already-hijacked demonstrations, the current clean one and the
    query; a perturbation on the most gradient-sensitive token(s) of the
    current demonstration is optimised by gradient ascent under the budget
    ``eps_i[i] = gamma[i] * total_eps`` and projected back to a token with
    ``word_proj``. After every demonstration the clean and attacked prompts
    of that length are evaluated.

    Args:
        demo_df: Frame with "sentence" and "label" columns.
        query: Mapping with "sentence" and "label" of the query.

    Returns:
        Tuple ``(base_vec, adv_vec)``: per context length, whether the greedy
        next token equals the target token for the clean / attacked prompt.

    Raises:
        ValueError: if the budget profile has fewer entries than there are
            demonstrations, or a demonstration cannot be located in the
            prompt tokens (raised by ``span_by_tokens``).
    """
    demos = [(s, l) for s, l in zip(demo_df["sentence"], demo_df["label"])]
    gamma = make_gamma(len(demos), custom_gamma)
    if len(gamma) < len(demos):
        # Fail up front instead of with an IndexError in the middle of the run.
        raise ValueError(
            f"budget profile has {len(gamma)} entries but there are "
            f"{len(demos)} demonstrations")
    eps_i = [g * total_eps for g in gamma]

    hijacked, traces, diffs = [], [], []
    base_vec, adv_vec = [], []

    # Loop-invariant pieces: query stub, target token and embedding layer.
    q_stub = f"\nReview: {query['sentence'].strip()}\nSentiment:"
    tgt_q = pos_id if query["label"] == 1 else neg_id
    target = torch.tensor([tgt_q], dtype=torch.long, device=device)
    embed = model.get_input_embeddings()

    def loss_at(emb, attn):
        # Cross-entropy of the last position's logits against the target token.
        logits = model(inputs_embeds=emb.unsqueeze(0),
                       attention_mask=attn.unsqueeze(0)).logits[0, -1]
        return F.cross_entropy(logits.unsqueeze(0), target)

    for i, (sent, lab) in enumerate(demos):
        prompt_i = build_prompt(hijacked + [(sent, lab)]) + q_stub
        enc      = tok(prompt_i, return_tensors="pt").to(device)
        ids, mask = enc.input_ids[0], enc.attention_mask[0]

        # Locate the current ICE with span_by_tokens.
        start, end = span_by_tokens(ids, sent)

        # Pick the most sensitive token(s): largest gradient norm of the loss
        # w.r.t. the clean embeddings. torch.autograd.grad keeps gradients
        # off the model parameters.
        base_emb = embed(ids).detach()
        emb0 = base_emb.clone().requires_grad_(True)
        (g0,) = torch.autograd.grad(loss_at(emb0, mask), emb0)
        k = min(n_edit, end - start)  # a span may be shorter than n_edit
        pos_sel = (g0[start:end].norm(dim=1).topk(k).indices + start).tolist()

        # PGD update of δ
        delta = torch.zeros_like(base_emb, requires_grad=True)
        trace = []
        for _ in range(steps):
            l = loss_at(base_emb + delta, mask)
            trace.append(l.item())
            (grad,) = torch.autograd.grad(l, delta)
            with torch.no_grad():
                for p in pos_sel:
                    delta[p] += alpha * grad[p]  # ascent: increase the loss
                # Project the span's perturbation onto its budget.
                span_norm = delta[start:end].norm()
                if span_norm > eps_i[i]:
                    delta[start:end] *= eps_i[i] / span_norm
        traces.append(trace)

        # Word-Proj
        ids_new = ids.clone(); diff = []
        with torch.no_grad():
            for p in pos_sel:
                nid = word_proj(ids[p].item(), delta[p], eps_i[i])
                diff.append((tok.convert_ids_to_tokens([ids[p]])[0],
                             tok.convert_ids_to_tokens([nid])[0]))
                ids_new[p] = nid
        diffs.append(diff)

        # Write the edited tokens back as the hijacked demonstration text.
        hijacked_sent = tok.decode(ids_new[start:end], skip_special_tokens=True).lstrip()
        hijacked.append((hijacked_sent, lab))

        # Record correctness for context length i + 1.
        clean_prompt_i  = build_prompt(demos[:i+1]) + q_stub
        pred_b = next_token(tok(clean_prompt_i, return_tensors="pt").to(device).input_ids[0])
        attack_prompt_i = build_prompt(hijacked) + q_stub
        pred_a = next_token(tok(attack_prompt_i, return_tensors="pt").to(device).input_ids[0])
        base_vec.append(pred_b == tgt_q)
        adv_vec.append(pred_a == tgt_q)

    if verbose:
        print("\nSequential Budgeted PGD")
        for j, (tr, df) in enumerate(zip(traces, diffs), 1):
            print(f"ICE {j} loss trace:", [f"{x:.2f}" for x in tr])
            print(" changed:", df)
        print("Per‑length baseline:", base_vec)
        print("Per‑length attack  :", adv_vec)

    return base_vec, adv_vec

# ── Trials ─────────────────────────────────────
base_sum = [0] * num_shots
adv_sum  = [0] * num_shots
_sst_rows = list(sst)  # materialise once; the dataset does not change between trials

for t in range(1, trials + 1):
    # Draw the demonstrations and the query in one sample (without
    # replacement) so the query can never appear among its own demonstrations.
    _picked = random.sample(_sst_rows, num_shots + 1)
    demo_df = pd.DataFrame(_picked[:num_shots])
    query   = _picked[num_shots]
    print(f"\n===== Trial {t} =====")
    b_vec, a_vec = attack(demo_df, query)
    for i in range(num_shots):
        base_sum[i] += b_vec[i]
        adv_sum[i]  += a_vec[i]

print("\n=== Avg accuracy vs. context length ===")
for i in range(num_shots):
    print(f"len={i+1:2d}  clean: {base_sum[i] / trials:.3f}   adv: {adv_sum[i] / trials:.3f}")