In [1]:
import pandas
import torch
import json
from tqdm import tqdm

from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch import optim
import torch.nn as nn

from torchvision.transforms.functional import to_pil_image
import os

torch.manual_seed(42)

<torch._C.Generator at 0x7f9848559650>

In [72]:
data = []
with open('./mmmu_cls/mmmu_cls.jsonl') as f:
    for line in f:
        data.append(json.loads(line))
print(len(data))
print(data[0])

168
{'image_path': 'mmmu_cls/image/dev_Accounting_1_1.png', 'image_url': 'https://mitalinlp.oss-cn-hangzhou.aliyuncs.com/rallm/search_decision_KnowB/held_out_data/mmmu_image/dev_Accounting_1_1.png', 'question': 'Each of the following situations relates to a different company. <image 1> For company B, find the missing amounts.', 'answer': '$77,490', 'question_id': 'dev_Accounting_1', 'label': 'Accounting', 'label_id': 0}


In [2]:
from PIL import Image, ImageOps

def resize_with_padding(img, new_path, target_size=(512, 512), padding_mode="constant"):
    """
    保持比例调整大小，并用 padding 填充至目标尺寸
    Args:
        img: PIL Image
        target_size: (width, height)
        padding_mode: "constant"（黑边）/"edge"（边缘填充）/"reflect"（镜像填充）
    Returns:
        Resized and padded PIL Image
    """
    img = Image.open(img)
    img = img.convert("RGB")
    width, height = img.size
    target_w, target_h = target_size
    
    # 计算缩放比例
    scale = min(target_w / width, target_h / height)
    new_width = int(width * scale)
    new_height = int(height * scale)
    img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
    
    # 计算 padding 位置
    delta_w = target_w - new_width
    delta_h = target_h - new_height
    padding = (delta_w // 2, delta_h // 2, delta_w - (delta_w // 2), delta_h - (delta_h // 2))
    
    # 应用 padding
    if padding_mode == "constant":
        padded_img = ImageOps.expand(img, padding, fill=(0, 0, 0))  # 黑边
    elif padding_mode == "edge":
        padded_img = ImageOps.expand(img, padding, fill="edge")  # 边缘填充
    elif padding_mode == "reflect":
        padded_img = ImageOps.expand(img, padding, fill="reflect")  # 镜像填充
    else:
        raise ValueError("padding_mode 必须是 'constant'/'edge'/'reflect'")

    padded_img.save(new_path)

In [3]:
def save_image(tensor, path):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    img = to_pil_image(tensor.to(torch.uint8))
    img.save(path)

In [4]:
# Make cls train dataset
import cv2
reload = False

if reload:
    with open('mmmu_cls/mmmu_cls_resized.jsonl', 'w') as g:
        train = []
        for d in tqdm(data, ncols=100):
            image_path = d['image_path']
            new_resized_path = image_path.replace('/image/', '/resized_image/')
            resize_with_padding(image_path, new_resized_path, target_size=(512,512), padding_mode='constant')

            d['image_path'] = new_resized_path
            g.write(json.dumps(d)+'\n')

            d['image'] = cv2.imread(new_resized_path).tolist()

            train.append(d)
else:
    with open('mmmu_cls/mmmu_cls_resized.jsonl') as f:
        lines = list(f.readlines())
        train = [json.loads(i) for i in tqdm(lines)]
        for i in tqdm(train):
            i['image'] = cv2.imread(i['image_path']).tolist()
            

print(len(train))

100%|██████████| 168/168 [00:00<00:00, 276982.34it/s]
100%|██████████| 168/168 [00:22<00:00,  7.46it/s]

168





In [5]:
class MMMUDataset(Dataset):
    def __init__(self, data_list):
        super().__init__()
        self.data = data_list
        self.len = len(data_list)

    def __getitem__(self, index):
        return {
            'label': self.data[index]['label_id'],
            'feature': torch.tensor(self.data[index]['image']).permute(2, 0, 1).float(),
            'image_path': self.data[index]['image_path']
        }

    def __len__(self):
        return self.len
    

def get_mnist_loaders(batch_size, shuffle):
    train_dataset = MMMUDataset(train)
    # test_dataset = MMMUDataset(test)

    train_dataloader = DataLoader(train_dataset, 
                                batch_size=batch_size,
                                shuffle=shuffle)

    # test_dataloader = DataLoader(test_dataset, 
                                # batch_size=batch_size,
                                # shuffle=shuffle)
    test_dataloader = None
    return train_dataloader, test_dataloader


train_dataloader, _ = get_mnist_loaders(batch_size=16, shuffle=True)
print(len(train_dataloader))

11


In [6]:
class SimpleCNN(nn.Module):
    def __init__(self, num_classes=5):
        super(SimpleCNN, self).__init__()

        self.features = nn.Sequential(
            # Block 1
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # (64, 256, 256)

            # Block 2
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # (128, 128, 128)

            # Block 3
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # (256, 64, 64)

            # Block 4
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # (512, 32, 32)
        )

        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((16, 16)),  # 固定输出尺寸为 16x16
            nn.Flatten(),
            nn.Linear(512 * 16 * 16, 4096),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(4096, 2048),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(2048, num_classes),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

In [7]:
loss_fn = nn.CrossEntropyLoss(reduction='mean')

def train(model, device, train_loader, optimizer, epoch):
    model.train()
    train_loss = 0
    correct = 0
    for batch in tqdm(train_loader, desc=f"Epoch {epoch}"):
        data = batch['feature']
        target = batch['label']

        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = loss_fn(output, target)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
        
        # print(f"\tTrain loss: {loss.item()}", end='')
    
    train_loss /= len(train_loader.dataset)
    accuracy = 100. * correct / len(train_loader.dataset)
    print(f"Train set: Average loss: {train_loss:.4f}, Accuracy: {correct}/{len(train_loader.dataset)} ({accuracy:.2f}%)")

In [8]:
@torch.no_grad()
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    for batch in test_loader:
        data = batch['feature']
        target = batch['label']

        data, target = data.to(device), target.to(device)
        output = model(data)
        test_loss += loss_fn(output, target).item()
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f"Test set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.2f}%)")
    return accuracy

In [9]:
epochs = 20
lr = 0.00001
# momentum = 0.9

if torch.backends.mps.is_available():
    device = torch.device("mps")  # Apple Metal (M1/M2)
elif torch.cuda.is_available():
    device = torch.device("cuda")  # NVIDIA CUDA
else:
    device = torch.device("cpu")  # 回退到 CPU
print(device)

model = SimpleCNN().to(device)
optimizer = optim.AdamW(model.parameters(), lr=lr)


# for epoch in range(1, epochs + 1):
#     train(model, device, train_dataloader, optimizer, epoch)
    
# torch.save(model.state_dict(), "ckpt/state_dict_0522.pth")

model.load_state_dict(torch.load("ckpt/state_dict_0522.pth"), strict=True)
print("Weights loaded")

cuda
Weights loaded


In [10]:
class Attacker():
    def __init__(self, model) -> None:
        ...


class PGDAttacker(Attacker):
    def __init__(self, model, c=0.01, steps=10, norm="Linf"):
        """
        Params:
            model: 目标模型
            c: 单步攻击步长
            steps: 攻击迭代次数
            norm: 范数类型 ("Linf" 或 "L1")
            max_norm: 允许最大范数大小，否则normalize
        """
        self.model = model
        self.c = c
        self.steps = steps
        self.norm = norm
        self.max_norm = 128
        
        if self.norm not in ["Linf", "L1"]:
            raise ValueError("Norm must be either 'Linf' or 'L1'")
    
    def perturb(self, x, y):
        """
        生成对抗样本
        
        Params:
            x: 原始输入样本 (B, C, H, W)
            y: 真实标签 (B,)
            
        Returns:
            adv_x: 对抗样本 (B, C, H, W)
        """
        x_adv = x.clone().detach().requires_grad_(True)
        
        for _ in range(self.steps):
            outputs = self.model(x_adv)
            loss = loss_fn(outputs, y)
            loss.backward()
            
            # 梯度Attack值
            grad_adv = x_adv.grad.data.sign() if self.norm == "Linf" else self._l1_grad_clip(x_adv.grad.data)
            
            # 更新对抗样本
            x_adv = x_adv.detach() + self.c * grad_adv
            x_adv.requires_grad_(True)
        
        return x_adv.detach()
    
    def _l1_grad_clip(self, grad):
        l1_norm = grad.abs().sum()
        if l1_norm > self.max_norm:
            # print(f'l1_norm > max_norm {self.max_norm}')
            clipped_tensor = grad * (self.max_norm / l1_norm)
        else:
            # print(f'l1_norm <= max_norm {self.max_norm}')
            
            clipped_tensor = grad
        return clipped_tensor


In [None]:
# L_inf
c = 1
steps = 10

attacker_linf = PGDAttacker(model, c=c, steps=steps, norm="Linf")


for batch in tqdm(train_dataloader, ncols=50):
    # print(batch.keys())
    x = batch['feature']
    y = batch['label']
    x, y = x.to(device), y.to(device)
    
    # L_inf对抗样本
    adv_linf = attacker_linf.perturb(x, y)
    
    for b in range(adv_linf.size(0)):
        new_path = batch['image_path'][b].replace('resized_image', f'linf_att_image_{c}_{steps}')
        save_image(adv_linf[b], new_path)

In [12]:
# L_1
c = 1e7

for steps in [100]:
    attacker_l1 = PGDAttacker(model, c=c, steps=steps, norm="L1")

    for batch in tqdm(train_dataloader, ncols=50, desc=f"steps: {steps}"):
        # print(batch.keys())
        x = batch['feature']
        y = batch['label']
        x, y = x.to(device), y.to(device)

        # L1对抗样本
        adv_l1 = attacker_l1.perturb(x, y)

        for b in range(adv_l1.size(0)):
            new_path = batch['image_path'][b].replace('resized_image', f'l1_att_image_{c}_{steps}')
            save_image(adv_l1[b], new_path)

steps: 100: 100%|█| 11/11 [03:42<00:00, 20.20s/it]


In [22]:
(adv_l1 - x).max()

tensor(692.2778, device='cuda:0')