In [1]:
import torch
import torch.nn.functional as F

import os
import argparse
from tqdm import tqdm
import torch.nn as nn

# from defense_channel_lips import CLP

from models.simclr_model import SimCLR
from evaluation.nn_classifier import create_torch_dataloader,predict_feature,net_train,net_test_with_logger,net_test,NeuralNet

# Restrict CUDA to device index 2, then select the GPU when one is visible and
# fall back to the CPU otherwise.
os.environ["CUDA_VISIBLE_DEVICES"] = "2"
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

def val(net, data_loader):
    """Return the top-1 accuracy of ``net`` over ``data_loader``, in percent.

    The network is put in eval mode and gradients are disabled. Every batch is
    moved to the module-level ``device`` before the forward pass.

    Args:
        net: callable model returning per-class logits for a batch of images.
        data_loader: iterable yielding ``(images, targets)`` batches.

    Returns:
        ``correct / total * 100``. The correct-count is accumulated as a
        tensor, so the result is a tensor rather than a Python float.
    """
    net.eval()
    hits = 0
    seen = 0
    with torch.no_grad():
        for images, targets in data_loader:
            images = images.to(device)
            targets = targets.to(device)

            predicted = net(images).argmax(-1)

            hits += (predicted == targets).sum()
            seen += targets.shape[0]

        accuracy = hits / seen * 100

    return accuracy


In [2]:
import numpy 
import cv2
import numpy as np
from datasets.backdoor_dataset import CIFAR10Mem, CIFAR10Pair, BadEncoderTestBackdoor, ReferenceImg, BadEncoderDataset
from torchvision import transforms
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data import Dataset
import torch
import random

class CustomDataset(Dataset):
    """``Dataset`` wrapper around an indexable collection of ``(img, target)`` pairs."""

    def __init__(self, data):
        # Keep a reference to the caller's collection; no copy is made.
        self.data = data

    def __len__(self):
        """Number of stored samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``idx``-th sample as an ``(img, target)`` tuple."""
        sample, label = self.data[idx]
        return (sample, label)
# Evaluation-time preprocessing: tensor conversion followed by per-channel
# normalisation with the statistics below.
_EVAL_MEAN = [0.4914, 0.4822, 0.4465]
_EVAL_STD = [0.2023, 0.1994, 0.2010]
test_transform_cifar10 = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(_EVAL_MEAN, _EVAL_STD)]
)

# Tensor conversion only; normalisation is applied separately by the code that
# uses this variant.
test_transform_cifar10_2 = transforms.Compose([transforms.ToTensor()])

def _load_pattern_image(path, *flags):
    """Read an image with OpenCV and fail loudly when it cannot be decoded."""
    image = cv2.imread(path, *flags)
    if image is None:
        # cv2.imread signals a missing/unreadable file by returning None.
        raise FileNotFoundError(f"could not read pattern image: {path!r}")
    return image


def read_poison_pattern(images, pattern_file):
    """Stamp trigger pattern(s) onto ``images`` in place.

    Args:
        images: mutable sequence of channel-first image tensors. Each one is
            permuted to channel-last, converted to NumPy, blended with the
            32x32 pattern and written back as a new channel-first tensor.
        pattern_file: ``None``, or an iterable whose entries are either

            * a ``str`` path to a trigger image (the mask is derived from the
              pixels whose gray value exceeds 20), or
            * a ``(pattern_path, mask_path)`` tuple (the mask is read as
              grayscale and divided by 255).

            Patterns are applied one after another in iteration order.

    Returns:
        ``(None, None)`` when ``pattern_file`` is ``None`` (kept for backward
        compatibility); otherwise the same ``images`` sequence, whose elements
        have been replaced by the stamped tensors.

    Raises:
        FileNotFoundError: if a pattern or mask file cannot be read.
        TypeError: if an entry of ``pattern_file`` is neither ``str`` nor
            ``tuple``.
    """
    if pattern_file is None:
        return None, None

    for f in pattern_file:
        if isinstance(f, tuple):
            pt = _load_pattern_image(f[0])
            pt_mask = _load_pattern_image(f[1], cv2.IMREAD_GRAYSCALE)
            pt_mask = pt_mask / 255
        elif isinstance(f, str):
            pt = _load_pattern_image(f)
            pt_gray = cv2.cvtColor(pt, cv2.COLOR_BGR2GRAY)
            pt_mask = np.float32(pt_gray > 20)
        else:
            # Without this guard `pt` would be unbound (or silently reused from
            # the previous iteration).
            raise TypeError(
                "pattern_file entries must be a path string or a "
                f"(pattern_path, mask_path) tuple, got {type(f).__name__}"
            )

        pt = cv2.resize(pt, (32, 32))
        pt_mask = cv2.resize(pt_mask, (32, 32))
        pt_mask = np.expand_dims(pt_mask, axis=2)

        # NOTE(review): cv2.imread yields 8-bit BGR data and the pattern is
        # blended without rescaling or channel reordering - confirm that
        # `images` use the same value range and channel order.
        # Both terms are independent of the image, so compute them once.
        keep = 1 - pt_mask
        stamp = pt * pt_mask
        for i in range(len(images)):
            blended = keep * (images[i].permute(1, 2, 0).numpy()) + stamp
            images[i] = torch.tensor(np.transpose(blended, (2, 0, 1)))

    return images

# Test split and trigger image used to build the poisoned evaluation set.
test_file_path = '/data2/zyx/DRUPE-main/DRUPE-main/data/stl10/test.npz'
pattern_file = ["/data2/zyx/DRUPE-main/Demon-in-the-Variant/triggers/uniform.png"]
test_posion_data = CIFAR10Mem(
    numpy_file=test_file_path,
    class_type=list(range(10)),
    transform=test_transform_cifar10_2,
)
# Keep only the images whose label is not 9; the labels themselves are dropped.
test_posion_images = []
for img, label in test_posion_data:
    if label != 9:
        test_posion_images.append(img)

class DeNormalize(object):
    """Undo per-channel normalisation in place: ``x * std + mean``.

    Args:
        mean: per-channel means, one entry per channel.
        std: per-channel standard deviations, one entry per channel.
    """

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        """De-normalise ``tensor`` in place and return it.

        ``tensor`` is either a single channel-first image or a 4-D batch whose
        second dimension is the channel axis. The same object that was passed
        in is returned.
        """
        # Iterating a 4-D batch directly would walk over samples, not channels,
        # so present it channel-first. `transpose` returns a view, so the
        # in-place ops below still write through to `tensor`.
        if isinstance(tensor, torch.Tensor) and tensor.dim() == 4:
            channels = tensor.transpose(0, 1)
        else:
            channels = tensor
        for t, m, s in zip(channels, self.mean, self.std):
            t.mul_(s).add_(m)
        return tensor

# Define the normalisation transform and its inverse; both use the same
# per-channel statistics.
_channel_mean = [0.4914, 0.4822, 0.4465]
_channel_std = [0.2023, 0.1994, 0.2010]
normalize = transforms.Normalize(_channel_mean, _channel_std)
denormalize = DeNormalize(list(_channel_mean), list(_channel_std))


# Poisoned test set: stamp the trigger on every kept image, normalise the whole
# batch and label every sample as class 9.
_stamped_test_images = read_poison_pattern(test_posion_images, pattern_file)
test_posion_images = normalize(torch.stack(_stamped_test_images))
labels = torch.tensor([9] * len(test_posion_images), dtype=torch.long)
test_posion_dataloader = DataLoader(
    TensorDataset(test_posion_images, labels),
    batch_size=128,
    shuffle=True,
)

# Clean test set: same file, normalised by the dataset transform.
test_clean_data = CIFAR10Mem(
    numpy_file=test_file_path,
    class_type=list(range(10)),
    transform=test_transform_cifar10,
)
test_clean_dataloader = DataLoader(test_clean_data, batch_size=128, shuffle=True)

# Build the poisoned fine-tuning set from the training split.
TARGET_LABEL = 9        # class assigned to every poisoned sample
POISON_FRACTION = 0.1   # share of non-target samples that receive the trigger

train_file_path = '/data2/zyx/DRUPE-main/DRUPE-main/data/stl10/train.npz'
train_posion_data =  CIFAR10Mem(numpy_file=train_file_path, class_type= list(range(10)), transform=test_transform_cifar10_2)

# Split target-class and other samples in a single pass over the dataset.
# (The "label_12" names are historical; the class tested here is TARGET_LABEL.)
label_12_images = []
non_label_12_images = []
for img, target in train_posion_data:
    if target == TARGET_LABEL:
        label_12_images.append((img, target))
    elif target != TARGET_LABEL:
        non_label_12_images.append((img, target))

# NOTE(review): the sample is drawn from the unseeded global `random` state, so
# the poisoned subset differs between runs - seed it if reproducibility matters.
num_poison_images = int(len(non_label_12_images) * POISON_FRACTION)
indices_to_modify = random.sample(range(len(non_label_12_images)), num_poison_images)

# Stamp all selected images with one call so the trigger file is read once
# rather than once per image, then relabel them as the target class.
if indices_to_modify:
    stamped_images = read_poison_pattern(
        [non_label_12_images[i][0] for i in indices_to_modify], pattern_file
    )
    for i, stamped in zip(indices_to_modify, stamped_images):
        non_label_12_images[i] = (stamped, TARGET_LABEL)

all_train_images = non_label_12_images + label_12_images
print(len(all_train_images))
# Normalise after stamping so the trigger goes through the same transform as
# the rest of the image.
all_train_images = [(normalize(img), target) for img, target in all_train_images]
train_posion_dataloader = DataLoader(CustomDataset(all_train_images), batch_size=128, shuffle=True)

5000


In [3]:
class CombinedModel(nn.Module):
    """Chain two modules, L2-normalising the first one's output along dim 1."""

    def __init__(self, first_model, second_model):
        super().__init__()
        self.first_model = first_model
        self.second_model = second_model

    def forward(self, x):
        """Return ``second_model(normalize(first_model(x)))``."""
        features = self.first_model(x)
        return self.second_model(F.normalize(features, dim=1))
def CLP(net, u):
    """Flag and reset outlier channels of every Conv2d -> BatchNorm2d pair.

    For each ``BatchNorm2d`` the most recently visited ``Conv2d`` is taken as
    its partner. Every output channel gets a score: the largest singular value
    of the conv filter scaled by ``|bn.weight / sqrt(bn.running_var)|``.
    Channels whose score exceeds ``mean + u * std`` of the layer's scores are
    selected. For those channels:

    * the conv filter is overwritten with the mean filter of that conv layer
      (computed when the conv was visited), and
    * the BN weight and bias are set to ``0.00001``.

    The writes go through the tensors returned by ``net.state_dict()``, which
    share storage with the model, so ``net`` itself is modified.

    Args:
        net: module to scan with ``named_modules()``.
        u: multiplier of the standard deviation in the outlier threshold.

    Returns:
        Tuple ``(all_params, zero_params, zero_params_index, clp_name,
        clp_filter_index)``:

        * ``all_params``: ``(parameter_name, index)`` pairs - conv weight, BN
          weight, BN bias for every processed pair, in that order.
        * ``zero_params``: the parameter names alone, same order.
        * ``zero_params_index``: the index tensor for each ``zero_params`` entry.
        * ``clp_name``: names of the processed BN modules.
        * ``clp_filter_index``: selected channel indices per processed BN.
    """
    params = net.state_dict()
    all_params = []
    zero_params = []
    zero_params_index = []
    clp_name = []
    clp_filter_index = []

    # Most recent convolution seen while walking the modules.
    conv = None
    before_name = None
    avg_weight = None

    # Pure inspection/overwrite of weights: no autograd graph is needed.
    with torch.no_grad():
        for name, m in net.named_modules():
            if isinstance(m, nn.BatchNorm2d):
                if conv is None:
                    # A BN with no preceding convolution has nothing to be
                    # combined with; skip it rather than fail on unset state.
                    continue
                std = m.running_var.sqrt()
                weight = m.weight

                channel_lips = []
                for idx in range(weight.shape[0]):
                    # Combining weights of convolutions and BN
                    w = conv.weight[idx].reshape(conv.weight.shape[1], -1) * (weight[idx]/std[idx]).abs()
                    channel_lips.append(torch.svd(w.cpu())[1].max())
                channel_lips = torch.Tensor(channel_lips)
                index = torch.where(channel_lips>channel_lips.mean() + u*channel_lips.std())[0]

                # Replace the selected conv filters by the layer's mean filter.
                params[before_name+'.weight'][index] = avg_weight
                all_params.append((before_name + '.weight',index))
                zero_params.append(before_name + '.weight')
                zero_params_index.append(index)

                # Shrink the matching BN affine parameters to a tiny constant.
                params[name+'.weight'][index] = 0.00001
                params[name+'.bias'][index] = 0.00001
                all_params.append((name + '.weight',index))
                all_params.append((name + '.bias',index))
                zero_params.append(name + '.weight')
                zero_params.append(name + '.bias')
                zero_params_index.append(index)
                zero_params_index.append(index)
                clp_filter_index.append(index)
                clp_name.append(name)
                print(index)

            # Convolutional layer should be followed by a BN layer by default
            elif isinstance(m, nn.Conv2d):
                conv = m
                before_name = name
                avg_weight = torch.mean(params[before_name+".weight"],dim=0,keepdim=True)
    return all_params,zero_params,zero_params_index,clp_name,clp_filter_index

# Load the encoder checkpoint. `map_location` keeps loading independent of the
# device the checkpoint was saved from (only one GPU is visible to this kernel).
encoder_checkpoint_path = '/data2/zyx/DRUPE-main/DRUPE-main/data/local/wzt/model_fix/BadEncoder/DRUPE_results/drupe/pretrain_cifar10_sf0.2/downstream_stl10_t0/epoch120.pth'
# Alternative checkpoints:
# '/data2/zyx/DRUPE-main/DRUPE-main/data/local/wzt/model_fix/BadEncoder/DRUPE_results/drupe/pretrain_cifar10_sf0.2/downstream_gtsrb_t12/epoch120.pth'
# '/data2/zyx/DRUPE-main/DRUPE-main/output/cifar10/clean_encoder/model_1000.pth'
model = SimCLR()
model.load_state_dict(torch.load(encoder_checkpoint_path, map_location=device)['state_dict'])
model = model.to(device)

# NOTE(review): the classifier head is freshly constructed here and no weights
# are loaded for it in the visible cells - confirm this is intended.
net = NeuralNet(512,[512,256],10).to(device)
combined_model = CombinedModel(model.f,net).to(device)

# Run CLP with threshold multiplier u=3 on encoder + head; the returned names and
# indices drive the gradient masking during fine-tuning.
all_params,changed_para,changed_para_index,clp_name,clp_filter_index = CLP(combined_model,3)


tensor([37])
tensor([62])
tensor([10, 37, 57])
tensor([], dtype=torch.int64)
tensor([], dtype=torch.int64)
tensor([11, 36, 85])
tensor([  3, 101, 110])
tensor([103])
tensor([119])
tensor([], dtype=torch.int64)
tensor([  6, 120, 211])
tensor([ 13, 148, 163])
tensor([ 94, 148, 252])
tensor([ 51,  57, 216, 217, 234])
tensor([], dtype=torch.int64)
tensor([], dtype=torch.int64)
tensor([ 56, 144])
tensor([289])
tensor([], dtype=torch.int64)
tensor([ 56, 153])


In [4]:
def train_finetune(train_loader, model_ascent, optimizer, criterion, epoch,changed_para,changed_para_index):
    """Run one fine-tuning pass with gradients masked to selected channels.

    After each backward pass the gradients are filtered before the optimizer
    step:

    * a parameter named in ``changed_para`` keeps its gradient only at the
      matching indices (along dim 0); everything else is zeroed;
    * a parameter whose name contains ``"second_model"`` keeps its full
      gradient;
    * every other parameter gets a zero gradient.

    Args:
        train_loader: iterable of ``(img, target)`` batches.
        model_ascent: model to fine-tune; batches are moved to its device.
        optimizer: optimizer over the model's parameters.
        criterion: loss called as ``criterion(output, target)``.
        epoch: unused; kept for interface compatibility.
        changed_para: parameter names as produced by ``named_parameters()``.
        changed_para_index: index tensors aligned with ``changed_para``. If a
            name occurs more than once, its first entry is used.

    Returns:
        None. The model and optimizer are updated in place.
    """
    # NOTE(review): the model is kept in eval mode while being optimised -
    # presumably to freeze BatchNorm statistics; confirm this is intended.
    model_ascent.eval()

    # Follow the model's device instead of assuming CUDA.
    first_param = next(model_ascent.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    # Name -> index lookup built once; `setdefault` keeps the first occurrence,
    # matching `list.index`.
    index_by_name = {}
    for para_name, para_index in zip(changed_para, changed_para_index):
        index_by_name.setdefault(para_name, para_index)

    for img, target in train_loader:
        img = img.to(run_device)
        target = target.to(run_device)

        output = model_ascent(img)
        loss = criterion(output, target)

        optimizer.zero_grad()
        loss.backward()

        for name, param in model_ascent.named_parameters():
            if name in index_by_name:
                pre_grad = param.grad
                keep = index_by_name[name]
                masked_grad = torch.zeros_like(param)
                # A parameter that received no gradient has nothing to keep.
                if pre_grad is not None and keep.numel() > 0:
                    keep = keep.to(pre_grad.device)
                    masked_grad[keep] = pre_grad[keep]
                param.grad = masked_grad
            elif "second_model" in name:
                continue
            else:
                param.grad = torch.zeros_like(param)

        optimizer.step()
        

In [5]:
import torch

# Separate learning rates for the encoder (first_model) and the classifier head
# (second_model).
optimizer = torch.optim.Adam([
    {'params':combined_model.first_model.parameters(), 'lr': 0.0001},
    {'params':combined_model.second_model.parameters(), 'lr': 0.00001},
])
criterion = nn.CrossEntropyLoss()

# All artefacts of this run go to one directory; create it up front so neither
# the log nor the checkpoint write fails on a fresh machine.
output_dir = "/data2/zyx/DRUPE-main/DRUPE-main/finetune_after_clp/drupe_cifar10_stl10"
os.makedirs(output_dir, exist_ok=True)
log_path = f"{output_dir}/differenttrigger_differenttarget.txt"

# Baseline metrics before any fine-tuning (reported as epoch -1).
net_test(combined_model, test_clean_dataloader, -1, criterion, 'Backdoored Accuracy (BA)')
net_test(combined_model, test_posion_dataloader, -1, criterion, 'Attack Success Rate (ASR)')

for epoch in range(10):
    train_finetune(train_posion_dataloader, combined_model, optimizer, criterion, epoch,changed_para,changed_para_index)
    clean_acc = net_test(combined_model, test_clean_dataloader, epoch, criterion, 'Backdoored Accuracy (BA)')
    bd_asr = net_test(combined_model, test_posion_dataloader, epoch, criterion, 'Attack Success Rate (ASR)')
    save_path = f"{output_dir}/differenttrigger_differenttarget_{epoch+1}.pth"
    print(save_path)
    # Append one line per epoch, then checkpoint the model.
    with open(log_path, 'a') as f:
        f.write(f'Epoch {epoch + 1}: Clean Accuracy: {clean_acc}, Attack Success Rate: {bd_asr}\n')
    torch.save(combined_model.state_dict(), save_path)

{"metric": "Eval - Backdoored Accuracy (BA)", "value": 10.0, "epoch": -1}
{"metric": "Eval - Attack Success Rate (ASR)", "value": 0.0, "epoch": -1}
{"metric": "Eval - Backdoored Accuracy (BA)", "value": 10.0625, "epoch": 0}
{"metric": "Eval - Attack Success Rate (ASR)", "value": 0.0, "epoch": 0}
/data2/zyx/DRUPE-main/DRUPE-main/finetune_after_clp/drupe_cifar10_stl10/differenttrigger_differenttarget_1.pth
{"metric": "Eval - Backdoored Accuracy (BA)", "value": 10.625, "epoch": 1}
{"metric": "Eval - Attack Success Rate (ASR)", "value": 0.0, "epoch": 1}
/data2/zyx/DRUPE-main/DRUPE-main/finetune_after_clp/drupe_cifar10_stl10/differenttrigger_differenttarget_2.pth
{"metric": "Eval - Backdoored Accuracy (BA)", "value": 11.375, "epoch": 2}
{"metric": "Eval - Attack Success Rate (ASR)", "value": 0.0, "epoch": 2}
/data2/zyx/DRUPE-main/DRUPE-main/finetune_after_clp/drupe_cifar10_stl10/differenttrigger_differenttarget_3.pth
{"metric": "Eval - Backdoored Accuracy (BA)", "value": 12.6, "epoch": 3}
{