In [20]:
from transformers import AutoTokenizer, AutoModel
import torch

# 选择适合任务的 CodeBERT 变体
model_name = "microsoft/codebert-base"

# 加载模型和分词器
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

# 示例源代码
code = ["def hello_world():\n    print('Hello, World!')","def hello_world():\n    print('Hello, World!')"]

# 使用分词器编码源代码
tokenized_code = tokenizer.encode_plus(code, return_tensors="pt", add_special_tokens=True)

# 获取模型输出
with torch.no_grad():
    model_output = model(**tokenized_code)

# 提取模型输出的表示


In [23]:
# for name, param in model.named_parameters():
#     print(name,param)
print(tokenized_code)



{'input_ids': tensor([[    0,  9232, 20760,  1215,  8331, 49536, 50118,  1437,  1437,  1437,
          5780, 45803, 31414,     6,   623,   328, 27645,     2,     2,  9232,
         20760,  1215,  8331, 49536, 50118,  1437,  1437,  1437,  5780, 45803,
         31414,     6,   623,   328, 27645,     2]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])}


In [None]:
import torch.nn.functional as F
class FreeLB(object):
    def __init__(self, adv_K=3, adv_lr=1e-2, adv_init_mag=2e-2, adv_max_norm=0., adv_norm_type='l2'):
        self.adv_K = adv_K
        self.adv_lr = adv_lr
        self.adv_max_norm = adv_max_norm
        self.adv_init_mag = adv_init_mag    # adv-training initialize with what magnitude, 即我们用多大的数值初始化delta
        self.adv_norm_type = adv_norm_type
        
    def attack(self, model, inputs,labels, gradient_accumulation_steps=1):
        input_ids = inputs['input_ids']
        if isinstance(model, torch.nn.DataParallel):
            embeds_init = model.encoder.roberta.embeddings.word_embeddings(input_ids)
        else:
            embeds_init = model.encoder.roberta.embeddings.word_embeddings(input_ids)
        if self.adv_init_mag > 0:   # 影响attack首步是基于原始梯度(delta=0)，还是对抗梯度(delta!=0)
            input_mask = inputs['attention_mask'].to(embeds_init)
            input_lengths = torch.sum(input_mask, 1)
            if self.adv_norm_type == "l2":
                delta = torch.zeros_like(embeds_init).uniform_(-1, 1) * input_mask.unsqueeze(2)
                dims = input_lengths * embeds_init.size(-1)
                mag = self.adv_init_mag / torch.sqrt(dims)
                delta = (delta * mag.view(-1, 1, 1)).detach()
            elif self.adv_norm_type == "linf":
                delta = torch.zeros_like(embeds_init).uniform_(-self.adv_init_mag, self.adv_init_mag)
                delta = delta * input_mask.unsqueeze(2)
        else:
            delta = torch.zeros_like(embeds_init)  # 扰动初始化
        loss, logits = None, None
        for astep in range(self.adv_K):
            delta.requires_grad_()
            inputs['inputs_embeds'] = delta + embeds_init  # 累积一次扰动delta
            inputs['input_ids'] = None
            outputs = model.encoder(**inputs)[0]
            logits=outputs # 4*1
            prob=F.sigmoid(logits)
            
            labels=labels.float()
            loss=torch.log(prob[:,0]+1e-10)*labels+torch.log((1-prob)[:,0]+1e-10)*(1-labels)
            loss=-loss.mean()
            
            
            loss = loss / gradient_accumulation_steps
            loss.backward()
            delta_grad = delta.grad.clone().detach()  # 备份扰动的grad
            if self.adv_norm_type == "l2":
                denorm = torch.norm(delta_grad.view(delta_grad.size(0), -1), dim=1).view(-1, 1, 1)
                denorm = torch.clamp(denorm, min=1e-8)
                delta = (delta + self.adv_lr * delta_grad / denorm).detach()
                if self.adv_max_norm > 0:
                    delta_norm = torch.norm(delta.view(delta.size(0), -1).float(), p=2, dim=1).detach()
                    exceed_mask = (delta_norm > self.adv_max_norm).to(embeds_init)
                    reweights = (self.adv_max_norm / delta_norm * exceed_mask + (1 - exceed_mask)).view(-1, 1, 1)
                    delta = (delta * reweights).detach()
            elif self.adv_norm_type == "linf":
                denorm = torch.norm(delta_grad.view(delta_grad.size(0), -1), dim=1, p=float("inf")).view(-1, 1, 1)  # p='inf',无穷范数，获取绝对值最大者
                denorm = torch.clamp(denorm, min=1e-8)  # 类似np.clip，将数值夹逼到(min, max)之间
                delta = (delta + self.adv_lr * delta_grad / denorm).detach()  # 计算该步的delta，然后累加到原delta值上(梯度上升)
                if self.adv_max_norm > 0:
                    delta = torch.clamp(delta, -self.adv_max_norm, self.adv_max_norm).detach()
            else:
                raise ValueError("Norm type {} not specified.".format(self.adv_norm_type))
            if isinstance(model, torch.nn.DataParallel):  
                embeds_init = model.encoder.roberta.embeddings.word_embeddings(input_ids)
            else:
                embeds_init = model.encoder.roberta.embeddings.word_embeddings(input_ids)
        return loss


In [None]:
if args.do_adv:
    inputs = {
        "input_ids": input_ids,
        "bbox": layout,
        "token_type_ids": segment_ids,
        "attention_mask": input_mask,
        "masked_lm_labels": lm_label_ids
    }
    loss, prediction_scores = freelb.attack(model, inputs)
loss.backward()
optimizer.step()
scheduler.step()
model.zero_grad()


In [None]:
import argparse
import os

from torch.utils.data import DataLoader, SequentialSampler
from torch.utils.data.dataloader import default_collate
from torch_cka import CKA
import torch
from transformers import RobertaConfig, RobertaForSequenceClassification, RobertaTokenizer

# import sys
# sys.path.append("..")
from model import Model
from run import TextDataset


def compare_codebert(name1, name2, data_path, args):
    path_dict = {
        "ori": "./saved_models/",
        "adv": "./saved_models/FREELB"
    }

    name_dict = {
        "ori": "CodeBERT",
        "adv": "CodeBERT-FreeLB",
        "pre": "CodeBERT-pre"
    }



    config = RobertaConfig.from_pretrained(args.model_name_or_path)
    config.num_labels = 1
    tokenizer = RobertaTokenizer.from_pretrained(args.model_name_or_path)

    model1 = RobertaForSequenceClassification.from_pretrained(args.model_name_or_path, config=config)
    model1 = Model(model1, config, tokenizer, None)

    if name1 != "pre":
        model_path1 = path_dict[name1]
        checkpoint_prefix = 'checkpoint-best-acc/model.bin'
        output_dir = os.path.join(model_path1, '{}'.format(checkpoint_prefix))
        model1.load_state_dict(torch.load(output_dir))


    model1.to(args.device)

    # for name, param in model1.named_parameters():
    #     print(name, param.shape)
    #
    # exit(0)

    model2 = RobertaForSequenceClassification.from_pretrained(args.model_name_or_path, config=config)
    model2 = Model(model2, config, tokenizer, None)

    if name2 != "pre":
        model_path2 = path_dict[name2]
        checkpoint_prefix = 'checkpoint-best-acc/model.bin'
        output_dir = os.path.join(model_path2, '{}'.format(checkpoint_prefix))
        model2.load_state_dict(torch.load(output_dir))
    model2.to(args.device)

    test_dataset = TextDataset(tokenizer, args, data_path)
    test_sampler = SequentialSampler(test_dataset)
    test_dataloader = DataLoader(test_dataset, sampler=test_sampler, batch_size=8, num_workers=4,
                                 pin_memory=True)

    layers = ['encoder.roberta.embeddings']
    for i in range(12):
        layer = 'encoder.roberta.encoder.layer.{}.output'.format(i)
        layers.append(layer)

    # layers.append('encoder.classifier')

    cka = CKA(model1, model2,
              model1_name=name_dict[name1],  # good idea to provide names to avoid confusion
              model2_name=name_dict[name2],
              model1_layers=layers,
              model2_layers=layers,
              device='cuda')
    cka.compare(test_dataloader)  # secondary dataloader is optional
    if not os.path.exists("./cka/"):
        os.makedirs("./cka/")
    cka.plot_results(save_path=f"./cka/Defect_FreeLB_CodeBERT_{name1}_{name2}.png")
    results = cka.export()  # returns a dict that contains model names, layer names and the CKA matrix

    for key, value in results.items():
        print(key, value)


parser = argparse.ArgumentParser()

## Required parameters
parser.add_argument("--model_name_or_path", default=None, type=str,
                    help="The model checkpoint for weights initialization.")

parser.add_argument("--test_data_file", default="../dataset/test.jsonl", type=str,
                    help="An optional input evaluation data file to evaluate the perplexity on (a text file).")
parser.add_argument("--block_size", default=-1, type=int,
                    help="Optional input sequence length after tokenization."
                         "The training dataset will be truncated in block of this size for training."
                         "Default to the model max input length for single sentence inputs (take into account special tokens).")


args = parser.parse_args()

args.block_size = 512


args.model_name_or_path = "microsoft/codebert-base"
args.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

args.test_data_file = "../dataset/test_adv.jsonl"

name1 = "ori"
name2 = "FreeLB"
name3 = "pre"


compare_codebert(name1, name2, args.test_data_file, args)



# for name, param in model1.named_parameters():
#     print(name, param.shape)


In [15]:
import torch

# 创建两个形状为 [16, 512, 768] 的张量
tensor_A = torch.randn(16, 512, 768)
tensor_B = torch.randn(2, 8)
tensor_C = tensor_B.ne(1)
print(tensor_C)
input_lengths = torch.sum(tensor_C, 1)
# 使用 torch.mul() 进行逐元素相乘
# result = torch.einsum("abc,abb->abc",tensor_A,tensor_B)
# print(result.shape)
print(input_lengths*768)

tensor([[True, True, True, True, True, True, True, True],
        [True, True, True, True, True, True, True, True]])
tensor([6144, 6144])


In [11]:
import torch

# 创建两个形状为 [16, 512, 768] 的张量
tensor_A = torch.randn(16, 512, 768)
tensor_B = torch.randn(16, 512, 768)

# 使用 torch.mul() 进行逐元素相乘
result1 = torch.mul(tensor_A, tensor_B)

# 或者直接使用 * 运算符
result2 = tensor_A * tensor_B

# 打印结果的形状
print(result1.shape)  # 输出 torch.Size([16, 512, 768])
print(result2.shape)  # 输出 torch.Size([16, 512, 768])


torch.Size([16, 512, 768])
torch.Size([16, 512, 768])
