In [8]:
import sys
sys.path.append("..")

import torch
from torchvision import transforms
from torch.utils.data import DataLoader, ConcatDataset
from datasets.backdoor_dataset import CIFAR10M,CustomDataset_224,CIFAR10Mem
import numpy as np
from datasets.bd_dataset_imagenet_filter import BadEncoderDataset

torch.cuda.set_device(2)

def dataloader(dataset):
    batch_size=512
    classes = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
    train_transform = transforms.Compose([
        transforms.RandomResizedCrop(32),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomApply([transforms.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8),
        transforms.RandomGrayscale(p=0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010])
        ])

    clean_transform=transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010])
                ])

    memory_data = CIFAR10M(numpy_file=f'../data/{dataset}/train.npz', class_type=classes, transform=train_transform,transform2=clean_transform)
    train_loader = DataLoader(memory_data, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)

    test_data = CIFAR10M(numpy_file=f'../data/{dataset}/test.npz', class_type=classes, transform=train_transform,transform2=clean_transform)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)

    return train_loader,test_loader

In [17]:
import torch.nn.functional as F
import os,random,copy
import kornia.augmentation as A

import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
from torchvision.transforms import transforms
from tqdm import tqdm
from sklearn.metrics import f1_score, roc_auc_score

class ProbTransform(torch.nn.Module):
    def __init__(self, f, p=1):
        super(ProbTransform, self).__init__()
        self.f = f
        self.p = p

    def forward(self, x):  # , **kwargs):
        if random.random() < self.p:
            return self.f(x)
        else:
            return x


class PostTensorTransform(torch.nn.Module):
    def __init__(self):
        super(PostTensorTransform, self).__init__()
        self.random_crop = ProbTransform(
            A.RandomCrop((32, 32), padding=5), p=0.8
        )
        self.random_rotation = ProbTransform(A.RandomRotation(10), p=0.5)


    def forward(self, x):
        for module in self.children():
            x = module(x)
        return x


def wanet(clean_img):
    input_height=32
    grid_rescale=1
    s=0.5
    k=4
    num_bd = clean_img.shape[0] // 2
    num_cross = num_bd
    ins = torch.rand(1, 2, k, k) * 2 - 1
    ins = ins / torch.mean(torch.abs(ins))
    noise_grid = (
        F.upsample(ins, size=input_height, mode="bicubic", align_corners=True)
        .permute(0, 2, 3, 1)
        .cuda()
    )
    array1d = torch.linspace(-1, 1, steps=input_height)
    x, y = torch.meshgrid(array1d, array1d)
    identity_grid = torch.stack((y, x), 2)[None, ...].cuda()

    grid_temps = (identity_grid + s * noise_grid / input_height) * grid_rescale
    grid_temps = torch.clamp(grid_temps, -1, 1)
    transforms = PostTensorTransform().cuda()

    ins = torch.rand(num_cross, input_height, input_height, 2).cuda() * 2 - 1
    grid_temps2 = grid_temps.repeat(num_cross, 1, 1, 1) + ins / input_height
    grid_temps2 = torch.clamp(grid_temps2, -1, 1)

    inputs_bd = F.grid_sample(clean_img[:num_bd], grid_temps.repeat(num_bd, 1, 1, 1), align_corners=True)

    inputs_cross = F.grid_sample(clean_img[num_bd : (num_bd + num_cross)], grid_temps2, align_corners=True)

    total_inputs = torch.cat([inputs_bd, inputs_cross], dim=0)
    backdoored_img = transforms(total_inputs)
    return backdoored_img

# 加载预训练的ResNet50模型并修改最后一层以适应二分类任务
model = models.resnet50(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2)  # 二分类
model = model.cuda()  # 确保模型在CUDA设备上

# 定义损失函数（虽然在测试中可能不需要，除非你想计算损失）
criterion = nn.CrossEntropyLoss()

def test_model(model, test_loader):
    model.eval()  # 设置模型为评估模式
    correct = 0
    total = 0
    test_loss = 0.0

    with torch.no_grad():  # 在测试过程中不计算梯度
        for clean_img, aug_img in tqdm(test_loader):
            clean_img = clean_img.cuda()
            aug_img = aug_img.cuda()
            # 使用wanet函数处理clean_img
            wanet_bd = wanet(clean_img)

            # 准备输入数据：将wanet_bd和aug_img堆叠为一个批次
            inputs = torch.cat([wanet_bd, aug_img], dim=0)  # 假设aug_img是适当的批次
            # 准备标签：第一半是wanet_bd（标签0），第二半是aug_img（标签1）
            labels = torch.cat([torch.zeros(wanet_bd.size(0)), torch.ones(aug_img.size(0))], dim=0).long()
            inputs = inputs.cuda()
            labels = labels.cuda()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item() * inputs.size(0)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    avg_loss = test_loss / total
    accuracy = 100 * correct / total
    print(f'测试集平均损失: {avg_loss:.4f}, 准确率: {accuracy:.2f}%')

from torch.nn.functional import softmax

def test_model_with_metrics(model, test_loader):
    model.eval()  # 设置模型为评估模式
    correct = 0
    total = 0
    test_loss = 0.0
    y_true = []
    y_pred = []
    y_score = []

    with torch.no_grad():  # 在测试过程中不计算梯度
        for clean_img, aug_img in tqdm(test_loader):
            clean_img = clean_img.cuda()
            aug_img = aug_img.cuda()
            # 使用wanet函数处理clean_img
            wanet_bd = wanet(clean_img)

            # 准备输入数据和标签
            inputs = torch.cat([wanet_bd, aug_img], dim=0)
            labels = torch.cat([torch.zeros(wanet_bd.size(0)), torch.ones(aug_img.size(0))], dim=0).long()
            inputs, labels = inputs.cuda(), labels.cuda()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item() * inputs.size(0)

            _, predicted = torch.max(outputs, 1)
            probs = softmax(outputs, dim=1)[:, 1]  # 获取正类的预测概率

            # 更新统计信息
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())
            y_score.extend(probs.cpu().numpy())

    # 计算性能指标
    avg_loss = test_loss / total
    accuracy = 100 * correct / total
    f1 = f1_score(y_true, y_pred)
    auroc = roc_auc_score(y_true, y_score)

    print(f'测试集平均损失: {avg_loss:.4f}, 准确率: {accuracy:.2f}%, F1得分: {f1:.4f}, AUROC: {auroc:.4f}')

In [18]:
train_loader,test_loader=dataloader('cifar10')
test_model_with_metrics(model, train_loader)

100%|██████████| 98/98 [00:27<00:00,  3.56it/s]

测试集平均损失: 0.7913, 准确率: 45.81%, F1得分: 0.5387, AUROC: 0.4511





In [21]:
train_loader,test_loader=dataloader('stl10')
test_model_with_metrics(model, train_loader)

100%|██████████| 10/10 [00:03<00:00,  3.17it/s]

测试集平均损失: 0.7849, 准确率: 46.41%, F1得分: 0.5450, AUROC: 0.4612





In [22]:
train_loader,test_loader=dataloader('svhn')
test_model_with_metrics(model, train_loader)

100%|██████████| 144/144 [00:40<00:00,  3.54it/s]


测试集平均损失: 0.7366, 准确率: 49.05%, F1得分: 0.5419, AUROC: 0.4900


In [23]:
train_loader,test_loader=dataloader('gtsrb')
test_model_with_metrics(model, train_loader)

100%|██████████| 77/77 [00:22<00:00,  3.42it/s]


测试集平均损失: 0.7884, 准确率: 44.02%, F1得分: 0.5406, AUROC: 0.4122
