In [1]:
#正常数据&PGD数据
import torch
import torchvision
import torchvision.transforms as transforms
from art.estimators.classification import PyTorchClassifier
import pandas as pd
import numpy as np
import art
import torch.nn as nn
import torch.optim as optim

# Select the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Preprocessing handed to the torchvision datasets (tensor conversion only).
transform = transforms.Compose([transforms.ToTensor()])

# CIFAR-10 train and test splits (downloaded into ./cifar10 when missing).
trainset = torchvision.datasets.CIFAR10(root='./cifar10', train=True, download=True, transform=transform)
testset = torchvision.datasets.CIFAR10(root='./cifar10', train=False, download=True, transform=transform)

# Stack both splits into one pool; only the raw `.data` / `.targets` are used below.
combined_data = np.concatenate((trainset.data, testset.data), axis=0)
combined_labels = np.concatenate((trainset.targets, testset.targets), axis=0)

# Human-readable class names, indexed by integer label.
class_names = trainset.classes

# class name -> DataFrame with one row per image ('image', 'label' columns).
dataframes = {}
for label, class_name in enumerate(class_names):
    # Positions of every sample carrying the current label.
    indices = np.flatnonzero(combined_labels == label)
    dataframes[class_name] = pd.DataFrame({
        'image': list(combined_data[indices]),
        'label': combined_labels[indices],
    })
    
# 从合并后的CSV文件中读取数据并恢复attacked_dataframes
def load_attacked_dataframes_from_csv(filename='merged_pgd.csv'):
    """Load adversarial images from a merged CSV and group them by class name.

    The CSV must provide the columns ``image``, ``label`` and ``class_name``.
    Each ``image`` cell holds the textual form of a Python ``bytes`` literal
    (e.g. ``"b'\\x00\\x01...'"``) containing 32*32*3 raw uint8 values.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        dict mapping each class name (in order of first appearance) to a
        DataFrame with the columns ``image`` (uint8 array of shape
        (32, 32, 3)) and ``label``, re-indexed from 0.

    Raises:
        ValueError: If an ``image`` cell is not a bytes literal or does not
            contain exactly 32*32*3 bytes.
    """
    # Local import keeps this cell runnable independently of the import cell.
    import ast

    def _decode_image(cell):
        # SECURITY: the cell text comes from a file on disk, so it is parsed
        # with ast.literal_eval (literals only) rather than eval(), which
        # would execute arbitrary code embedded in the CSV.
        try:
            raw = ast.literal_eval(cell)
        except (ValueError, SyntaxError) as exc:
            raise ValueError(
                f"'image' cell is not a valid bytes literal: {str(cell)[:40]!r}"
            ) from exc
        if not isinstance(raw, (bytes, bytearray)):
            raise ValueError(
                f"'image' cell must decode to bytes, got {type(raw).__name__}"
            )
        return np.frombuffer(raw, dtype=np.uint8).reshape(32, 32, 3)

    # Read the CSV file.
    all_data = pd.read_csv(filename)

    # Convert the image column from its string form back into NumPy arrays.
    all_data['image'] = all_data['image'].apply(_decode_image)

    # Split by class name into one DataFrame per class.
    attacked_dataframes = {}
    for class_name in all_data['class_name'].unique():
        class_data = all_data[all_data['class_name'] == class_name]
        attacked_dataframes[class_name] = class_data[['image', 'label']].reset_index(drop=True)

    return attacked_dataframes


Files already downloaded and verified
Files already downloaded and verified


FileNotFoundError: [Errno 2] No such file or directory: './adv/fgsms.csv'

In [2]:
# Load the adversarial examples, grouped per class name.
# NOTE(review): the variable is called `pgds` but the file read is the FGSM CSV
# ("./adv/fgsm.csv") — confirm which attack this notebook is meant to use.
pgds = load_attacked_dataframes_from_csv("./adv/fgsm.csv")

In [3]:
#模型
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
import json


import torch
import torchvision.models as models

# Compute device: CUDA when available, otherwise CPU (same expression as in the first cell).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 定义与保存时一致的模型结构
class ModifiedResNet18(torch.nn.Module):
    """ResNet-18 backbone followed by a two-layer MLP and a linear classifier.

    The stock backbone is altered in two places: its first convolution is
    replaced by a 3x3, stride-1 convolution and its max-pooling layer by an
    identity. Attribute names (``resnet18``, ``features``, ``mlp``, ``fc``)
    must stay as they are, because the saved checkpoint's state-dict keys
    are derived from them.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        nn = torch.nn

        # No pretrained weights: the trained parameters come from the checkpoint.
        backbone = models.resnet18(pretrained=False)
        backbone.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        backbone.maxpool = nn.Identity()  # max-pooling removed
        self.resnet18 = backbone

        # Feature extractor: every child of the backbone except the last one.
        self.features = nn.Sequential(*list(backbone.children())[:-1])

        # MLP head followed by the final classification layer.
        head_layers = [
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
        ]
        self.mlp = nn.Sequential(*head_layers)
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return class logits for the input batch ``x``."""
        pooled = self.features(x)
        # Collapse everything after the batch dimension into one feature axis.
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(self.mlp(flat))

# Instantiate the architecture the checkpoint was saved from.
model = ModifiedResNet18()

# Load the saved weights. `map_location=device` remaps the stored tensors onto
# the device selected above, so a checkpoint written on a GPU machine can still
# be loaded when this notebook runs on a CPU-only machine.
model_path = 'best_model.pth'  # replace with the path to your model file
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()
model.to(device)



ModifiedResNet18(
  (resnet18): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): Identity()
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
  

In [4]:
#对比学习模型
import torch.nn as nn

class ContrastiveModel(nn.Module):
    """Two-layer projection head that maps feature vectors onto the unit sphere.

    Args:
        input_dim: Length of each input feature vector. The default (5194)
            presumably matches the concatenated per-layer activation vector
            built elsewhere in this notebook — TODO confirm if the hooked
            layers change.
        hidden_dim: Width of the hidden layer.
        proj_dim: Dimensionality of the output embedding.
    """

    def __init__(self, input_dim=5194, hidden_dim=256, proj_dim=128):
        super().__init__()
        self.projector = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, proj_dim)
        )

    def forward(self, x):
        """Project ``x`` (batch, input_dim) and L2-normalise each row."""
        # Uses nn.functional directly instead of the alias `F`, which is only
        # imported in a later cell; the class now works from its own cell.
        return nn.functional.normalize(self.projector(x), dim=1)

In [5]:
#特征提取器
def extract_features(image, transform):
    """Run one image through the hooked classifier and return its activation vector.

    Relies on the notebook-level ``model``, ``hook`` and ``device`` objects.

    Args:
        image: Input accepted by ``transform``.
        transform: Callable that turns ``image`` into a tensor without a batch
            dimension (one is added here).

    Returns:
        1-D NumPy array: the concatenation, in recording order, of one vector
        per hooked layer. Activations with three dimensions after removing the
        batch axis are averaged over their last two dimensions; all others are
        flattened.

    Raises:
        RuntimeError: If no hooked layer produced an activation.
    """
    # Apply the (possibly random) transform and add the batch dimension.
    img_tensor = transform(image).unsqueeze(0).to(device)

    # Forget activations from earlier calls so nothing stale can leak into
    # this feature vector if a layer does not fire.
    hook.activations.clear()

    # Forward pass; the hooks record each layer's output as a side effect.
    with torch.no_grad():
        model(img_tensor)

    if not hook.activations:
        raise RuntimeError(
            "no activations were recorded; register the forward hooks on "
            "`model` before calling extract_features"
        )

    features = []
    for layer_act in hook.activations.values():
        # Remove only the batch axis. A bare squeeze() would also drop any
        # channel/spatial axis of size 1 and mis-route the tensor below.
        layer_act = layer_act.squeeze(0)
        if layer_act.dim() == 3:  # convolutional feature map
            features.append(layer_act.mean(dim=(1, 2)))  # global average pooling
        else:  # fully connected layer output
            features.append(layer_act.flatten())
    return torch.cat(features).cpu().numpy()

In [6]:
#钩子函数
class ActivationHook:
    """Stores the latest forward output of each hooked layer, keyed by name."""

    def __init__(self):
        # layer name -> detached output tensor of the most recent forward pass
        self.activations = {}

    def get_hook(self, name):
        """Return a forward-hook callback that records outputs under ``name``."""

        def _record(_module, _inputs, output):
            # Detach so the stored tensor is cut off from the autograd graph.
            self.activations.update({name: output.detach()})

        return _record

# Activation store read by the feature-extraction code.
hook = ActivationHook()

# Remove hooks installed by an earlier run of this cell. Without this, every
# re-run would stack one more forward hook on each layer.
for _stale_handle in globals().get("hook_handles", []):
    _stale_handle.remove()

# Register a forward hook on every convolutional and fully connected layer,
# keeping the returned handles so the hooks can be detached later.
hook_handles = [
    module.register_forward_hook(hook.get_hook(name))
    for name, module in model.named_modules()
    if isinstance(module, (torch.nn.Conv2d, torch.nn.Linear))
]

In [7]:
#数据增强手段
import torchvision.transforms as transforms

# Augmentation pipeline with random flip, padded crop, colour jitter and
# rotation. The random steps are what make two calls on the same image yield
# two different views.
# NOTE(review): presumably fed the HxWx3 uint8 arrays stored in the 'image'
# columns — confirm against the callers.
train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
])

# Pipeline without any random augmentation: PIL conversion followed by tensor conversion.
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
])

In [8]:
from tqdm import tqdm
import numpy as np
# 修改后的build_dataset函数，生成视图对
def build_paired_dataset(data_dict, label_type, num_samples=1000):
    """Create two augmented feature views for the leading images of every class.

    Args:
        data_dict: Mapping of class name to a DataFrame with an 'image' column.
        label_type: Value stored as the third element of every produced tuple.
        num_samples: Upper bound on the number of images taken per class
            (the first rows of each DataFrame are used).

    Returns:
        List of ``(view1, view2, label_type)`` tuples, where both views are
        feature vectors of the same image under independently drawn
        augmentations.
    """
    pairs = []

    for class_name, frame in tqdm(data_dict.items()):
        images = frame['image']
        limit = min(num_samples, len(frame))
        for row in range(limit):
            img = images.iloc[row]

            # Two separate passes through the random pipeline give two distinct views.
            first_view = extract_features(img, train_transform)
            second_view = extract_features(img, train_transform)

            pairs.append((first_view, second_view, label_type))

    return pairs
    
# Build the paired datasets: label 0 for the clean images, 1 for the adversarial ones.
normal_pairs = build_paired_dataset(dataframes, 0)
adv_pairs = build_paired_dataset(pgds, 1)
all_pairs = normal_pairs + adv_pairs




# Split into train / validation / test = 60% / 20% / 20%:
# 40% is held out first and then halved. The fixed random_state makes the split repeatable.
from sklearn.model_selection import train_test_split
train_pairs, temp_pairs = train_test_split(all_pairs, test_size=0.4, random_state=42)
val_pairs, test_pairs = train_test_split(temp_pairs, test_size=0.5, random_state=42)

# 创建Dataset类
class ContrastivePairDataset(torch.utils.data.Dataset):
    """Dataset over a sequence of ``(view1, view2, label)`` tuples.

    Each item is returned as two float tensors (the views) and a
    one-element long tensor holding the label.
    """

    def __init__(self, pairs):
        self.pairs = pairs

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        first_view, second_view, pair_label = self.pairs[idx]
        first = torch.FloatTensor(first_view)
        second = torch.FloatTensor(second_view)
        # The label is wrapped in a list, so the resulting tensor has shape (1,).
        target = torch.LongTensor([pair_label])
        return (first, second, target)

# Wrap each split in a Dataset.
train_dataset = ContrastivePairDataset(train_pairs)
val_dataset = ContrastivePairDataset(val_pairs)
test_dataset = ContrastivePairDataset(test_pairs)

# Batches of 128. Only the training loader shuffles and drops the final
# incomplete batch; the validation and test loaders keep every sample in order.
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

100%|██████████████████████████████████████████████████████████████████████████████████| 10/10 [03:27<00:00, 20.74s/it]
100%|██████████████████████████████████████████████████████████████████████████████████| 10/10 [03:29<00:00, 20.96s/it]


In [9]:
class NTXentLoss(nn.Module):
    """NT-Xent contrastive loss over a batch made of two stacked views.

    Args:
        temperature: Divisor applied to the pairwise similarities before the
            cross-entropy.
    """

    def __init__(self, temperature=0.07):
        super().__init__()
        self.temperature = temperature
        self.criterion = nn.CrossEntropyLoss()

    def forward(self, z):
        """Compute the loss for ``z``.

        Args:
            z: Normalised embeddings of shape (2N, D). The first N rows are
                the first augmented view, the last N rows the second view, so
                rows ``i`` and ``i + N`` form a positive pair.

        Returns:
            Scalar tensor: cross-entropy of every row against its positive
            partner, with all other non-self rows acting as negatives.

        Raises:
            ValueError: If ``z`` is not 2-D or its row count is not a
                positive even number.
        """
        if z.dim() != 2:
            raise ValueError(f"expected z of shape (2N, D), got {tuple(z.shape)}")
        total = z.size(0)
        if total < 2 or total % 2 != 0:
            raise ValueError(
                f"z must stack two views of the same N samples (even, non-zero "
                f"row count); got {total} rows"
            )
        N = total // 2

        # Pairwise similarity matrix, scaled by the temperature.
        sim_matrix = torch.matmul(z, z.T) / self.temperature

        # Positive-pair mask built with one vectorised write instead of a
        # Python loop of 2N single-element assignments.
        rows = torch.arange(total, device=z.device)
        partner = (rows + N) % total
        pos_mask = torch.zeros_like(sim_matrix, dtype=torch.bool)
        pos_mask[rows, partner] = True

        # Self-similarities are excluded from the negatives.
        self_mask = torch.eye(total, dtype=torch.bool, device=z.device)

        # Column 0 holds the positive, the remaining columns the negatives.
        # Explicit widths (rather than -1) keep the reshape valid for N == 1,
        # where there are no negatives at all.
        pos_logits = sim_matrix[pos_mask].view(total, 1)
        neg_logits = sim_matrix[~pos_mask & ~self_mask].view(total, total - 2)

        logits = torch.cat([pos_logits, neg_logits], dim=1)
        labels = torch.zeros(total, dtype=torch.long, device=z.device)

        return self.criterion(logits, labels)

In [10]:
import torch.nn.functional as F
# 初始化模型和损失
# Projection head, its optimiser and the contrastive loss.
model_contrast = ContrastiveModel().to(device)
optimizer = torch.optim.Adam(model_contrast.parameters(), lr=2e-3)
ntxent_loss = NTXentLoss(temperature=0.07)

# Training configuration.
NUM_EPOCHS = 30
BEST_MODEL_PATH = "best_ntxent_model.pth"


def _count_positive_top1(z, temperature):
    """Count rows of ``z`` whose positive partner is their best non-self match.

    ``z`` stacks two views: rows ``i`` and ``i + N`` (N = half the row count)
    belong to the same sample. A row counts as correct when the similarity to
    its partner is at least as large as the similarity to every other row
    except itself. This is the same criterion as "the positive logit placed
    first wins the argmax", where ties go to the positive.

    Args:
        z: Embeddings of shape (2N, D).
        temperature: Divisor applied to the similarities.

    Returns:
        Number of correct rows as a Python int.
    """
    total = z.size(0)
    if total == 0:
        return 0
    half = total // 2
    sim_matrix = torch.matmul(z, z.T) / temperature
    # A row must never be matched with itself.
    sim_matrix.fill_diagonal_(float('-inf'))
    rows = torch.arange(total, device=z.device)
    partner = (rows + half) % total
    pos_sim = sim_matrix[rows, partner]
    best_sim = sim_matrix.max(dim=1).values
    return int((pos_sim >= best_sim).sum().item())


def validate(model, dataloader):
    """Evaluate ``model`` on ``dataloader`` with the notebook-level NT-Xent loss.

    Args:
        model: Module mapping a batch of feature vectors to embeddings.
        dataloader: Iterable of ``(view1, view2, label)`` batches; the label
            is ignored.

    Returns:
        Tuple ``(mean_loss, accuracy)``. ``mean_loss`` is averaged over
        batches and is NaN when the loader yields no batch; ``accuracy`` is
        the share of embeddings whose positive partner ranks first, or 0 when
        no sample was seen.
    """
    model.eval()
    total_loss = 0.0
    total_correct = 0
    total_samples = 0
    num_batches = 0

    with torch.no_grad():
        for view1, view2, _ in dataloader:
            view1 = view1.to(device)
            view2 = view2.to(device)

            z1 = model(view1)
            z2 = model(view2)
            z = torch.cat([z1, z2], dim=0)

            total_loss += ntxent_loss(z).item()
            num_batches += 1

            total_correct += _count_positive_top1(z, ntxent_loss.temperature)
            total_samples += z.size(0)

    accuracy = total_correct / total_samples if total_samples > 0 else 0
    # Guarded like the accuracy above. NaN (not 0) so that an empty loader can
    # never be mistaken for a perfect validation loss.
    mean_loss = total_loss / num_batches if num_batches > 0 else float('nan')
    return mean_loss, accuracy


best_val_loss = float('inf')

for epoch in range(NUM_EPOCHS):
    # ---- training phase ----
    model_contrast.train()
    train_loss = 0.0
    train_correct = 0
    train_samples = 0
    train_batches = 0

    for view1, view2, _ in train_loader:
        view1 = view1.to(device)
        view2 = view2.to(device)

        optimizer.zero_grad()

        # Forward pass on both views, stacked as (2N, D).
        z1 = model_contrast(view1)
        z2 = model_contrast(view2)
        z = torch.cat([z1, z2], dim=0)

        loss = ntxent_loss(z)

        # Backward pass and parameter update.
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        train_batches += 1

        # Accuracy is a metric only, so it is computed without autograd tracking.
        with torch.no_grad():
            train_correct += _count_positive_top1(z.detach(), ntxent_loss.temperature)
        train_samples += z.size(0)

    # ---- validation phase ----
    val_loss, val_accuracy = validate(model_contrast, val_loader)

    train_accuracy = train_correct / train_samples if train_samples > 0 else 0
    mean_train_loss = train_loss / train_batches if train_batches > 0 else float('nan')

    print(f"Epoch {epoch+1}")
    print(f"Train Loss: {mean_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}")
    print(f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")

    # Keep the checkpoint with the lowest validation loss seen so far.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model_contrast.state_dict(), BEST_MODEL_PATH)
        print("Saved best model!")

Epoch 1
Train Loss: 2.0043, Train Accuracy: 0.5416
Val Loss: 1.1224, Val Accuracy: 0.7845
Saved best model!
Epoch 2
Train Loss: 0.8501, Train Accuracy: 0.8387
Val Loss: 0.7176, Val Accuracy: 0.8662
Saved best model!
Epoch 3
Train Loss: 0.6199, Train Accuracy: 0.9010
Val Loss: 0.5558, Val Accuracy: 0.9074
Saved best model!
Epoch 4
Train Loss: 0.5165, Train Accuracy: 0.9236
Val Loss: 0.4829, Val Accuracy: 0.9221
Saved best model!
Epoch 5
Train Loss: 0.4432, Train Accuracy: 0.9412
Val Loss: 0.4670, Val Accuracy: 0.9267
Saved best model!
Epoch 6
Train Loss: 0.4012, Train Accuracy: 0.9470
Val Loss: 0.4127, Val Accuracy: 0.9354
Saved best model!
Epoch 7
Train Loss: 0.3669, Train Accuracy: 0.9561
Val Loss: 0.4032, Val Accuracy: 0.9415
Saved best model!
Epoch 8
Train Loss: 0.3358, Train Accuracy: 0.9604
Val Loss: 0.3816, Val Accuracy: 0.9426
Saved best model!
Epoch 9
Train Loss: 0.3001, Train Accuracy: 0.9666
Val Loss: 0.3878, Val Accuracy: 0.9483
Epoch 10
Train Loss: 0.2851, Train Accuracy: 0