<a href="https://colab.research.google.com/github/arps1214p/safeai/blob/main/adv_patch_simba_imgspecific.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install addict

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.backends import cudnn

import torchvision
from torchvision import transforms
from torchvision import datasets, models, transforms
import torch.nn.functional as F

import PIL
from PIL import Image

import math
import random
import seaborn as sn
import pandas as pd
import numpy as np
from pathlib import Path
from skimage import io
import pickle
import matplotlib.pyplot as plt
import time
import os
import copy
from tqdm import tqdm_notebook

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

import argparse, yaml
from addict import Dict

import kagglehub

import logging
import sys

# Report the versions of the core libraries this notebook depends on.
for _label, _module in (
    ("PyTorch Version: ", torch),
    ("Torchvision Version: ", torchvision),
    ("Pillow Version: ", PIL),
):
    print(_label, _module.__version__)

# True when a CUDA device is visible to PyTorch.
train_on_gpu = torch.cuda.is_available()

# Tell the user which kind of device will be used.
print(
    'CUDA is available!  Training on GPU ...'
    if train_on_gpu
    else 'CUDA is not available.  Training on CPU ...'
)

# Location of the checkpoint that is loaded into the model further down.
path = "/content/checkpoint.pt"

In [None]:
# Load the model architecture: ResNet-18 built with weights=None and a 42-class output layer.
model = models.resnet18(weights=None, num_classes=42)

In [None]:
# Generalized-Mean Pooling (GeM)
# Used in place of plain average pooling: p-norm style pooling over the spatial dims.

class GeM(nn.Module):
    """Generalized-mean pooling layer.

    Clamps the input from below at ``eps``, raises it to the power ``p``,
    averages over the spatial dimensions down to a 1x1 map, and finally takes
    the ``1/p``-th power of the result.

    Args:
        p:   initial value of the pooling exponent; stored as a learnable
             one-element parameter named ``p``.
        eps: lower clamp bound applied to the input before exponentiation.
    """

    def __init__(self, p=3.0, eps=1e-6):
        super().__init__()
        self.eps = eps
        # One-element learnable exponent initialised to `p`.
        self.p = nn.Parameter(p * torch.ones(1))

    def forward(self, x):
        """Return the generalized mean of ``x`` pooled to spatial size (1, 1)."""
        powered = torch.clamp(x, min=self.eps) ** self.p
        pooled = F.adaptive_avg_pool2d(powered, output_size=(1, 1))
        return pooled ** (1.0 / self.p)

# Replace the model's average-pooling layer with a GeM layer (default p and eps).
model.avgpool = GeM()

In [None]:
# Pick the compute device first: the first CUDA device when present, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Read the checkpoint onto the CPU (tensor-only unpickling), then copy it into the model.
state_dict = torch.load(path, map_location=torch.device('cpu'), weights_only=True)
model.load_state_dict(state_dict)

# Move the populated model onto the selected device.
model = model.to(device)

In [None]:
# Switch the model to evaluation mode before running the attack.
model.eval()

### Data loading

In [None]:
# Download the latest version of the dataset through kagglehub and show where it was placed.
# NOTE(review): this rebinds `path`, which earlier held the checkpoint location; re-running the
# checkpoint-loading cell after this one would read from the dataset path instead — confirm.
path = kagglehub.dataset_download("alexattia/the-simpsons-characters-dataset")
print("Path to dataset files:", path)

In [None]:
# Directories holding the labelled training images and the test images.
train_dir, test_dir = map(Path, (
    '/kaggle/input/the-simpsons-characters-dataset/simpsons_dataset/simpsons_dataset',
    '/kaggle/input/the-simpsons-characters-dataset/kaggle_simpson_testset/kaggle_simpson_testset',
))

In [None]:
class TrainValTestSplit():
    """Collect image paths and produce a stratified train/validation split.

    Args:
        train_dir: directory (Path) searched recursively for labelled ``*.jpg`` files;
                   each file's parent directory name is used as its label.
        test_dir:  directory (Path) searched recursively for test ``*.jpg`` files.
    """

    def __init__(self, train_dir, test_dir):
        self.train_dir = train_dir
        self.test_dir = test_dir

        # Walk the sub-directories and keep the image paths in sorted order.
        self.train_val_files_path = sorted(self.train_dir.rglob('*.jpg'))
        self.test_path = sorted(self.test_dir.rglob('*.jpg'))

        # One label per training file, taken from the containing folder's name.
        self.train_val_labels = [
            image_file.parent.name for image_file in self.train_val_files_path
        ]

    def get_path(self):
        """Return ``({'train': [...], 'val': [...]}, test_paths)``.

        The labelled files are split 70/30, stratified by label, with a fixed
        random_state of 42.
        """
        train_part, val_part = train_test_split(
            self.train_val_files_path,
            test_size=0.3,
            stratify=self.train_val_labels,
            random_state=42,
        )
        return {'train': train_part, 'val': val_part}, self.test_path

    def get_n_classes(self):
        """Return the number of distinct labels among the labelled files."""
        return len(set(self.train_val_labels))



In [None]:
# Scan both directories, then split the labelled files into train/val path lists.
# `train_path` is a dict with 'train' and 'val' keys; `test_path` is the list of test image paths.
TrainValTestPath = TrainValTestSplit(train_dir, test_dir)
train_path, test_path = TrainValTestPath.get_path()

In [None]:
# Normalisation constants ([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) are the
# ImageNet dataset statistics.
input_size = 224
_imagenet_mean = [0.485, 0.456, 0.406]
_imagenet_std = [0.229, 0.224, 0.225]

# Exactly one of these augmentations is picked at random for each training image.
_train_augmentations = [
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(contrast=0.5),
    transforms.ColorJitter(brightness=0.1),
    transforms.RandomApply(
        [transforms.RandomHorizontalFlip(p=1), transforms.ColorJitter(contrast=0.5)],
        p=0.5,
    ),
    transforms.RandomApply(
        [transforms.RandomHorizontalFlip(p=1), transforms.ColorJitter(brightness=0.1)],
        p=0.5,
    ),
]

data_transforms = {
    # Training: resize, one random augmentation, tensor conversion, normalisation.
    'train': transforms.Compose([
        transforms.Resize((input_size, input_size)),
        transforms.RandomChoice(_train_augmentations),
        transforms.ToTensor(),
        transforms.Normalize(_imagenet_mean, _imagenet_std),
    ]),
    # Validation: resize, tensor conversion, normalisation (no augmentation).
    'val': transforms.Compose([
        transforms.Resize((input_size, input_size)),
        transforms.ToTensor(),
        transforms.Normalize(_imagenet_mean, _imagenet_std),
    ]),
}

In [None]:
class SimpsonsDataset(Dataset):
    """Image dataset over a list of file paths, labelled by parent folder name.

    A dataset is treated as unlabeled when the string form of its first path
    contains ``'test'`` (or when the path list is empty). Unlabeled datasets
    yield only the transformed image; labelled ones yield ``(image, label)``
    where ``label`` is the integer produced by a ``LabelEncoder`` fitted on the
    folder names of this dataset's own files.

    Side effect: constructing a labelled dataset writes the fitted encoder to
    ``label_encoder.pkl`` in the current working directory.

    Args:
        files_path:      sequence of ``pathlib.Path`` objects pointing at images.
        data_transforms: callable applied to each loaded PIL image.
    """

    def __init__(self, files_path, data_transforms):
        self.files_path = files_path
        self.transform = data_transforms

        # Decide once whether this dataset carries labels. An empty list has
        # nothing to label, so it is handled as unlabeled instead of raising
        # IndexError on files_path[0].
        # NOTE(review): the substring check matches 'test' anywhere in the path,
        # including parent directories — confirm this cannot hit training data.
        self._is_unlabeled = (
            len(self.files_path) == 0 or 'test' in str(self.files_path[0])
        )

        if not self._is_unlabeled:
            self.labels = [path.parent.name for path in self.files_path]
            self.label_encoder = LabelEncoder()
            self.label_encoder.fit(self.labels)
            # Encode every label up front so __getitem__ is a plain array lookup.
            self.encoded_labels = self.label_encoder.transform(self.labels)

            with open('label_encoder.pkl', 'wb') as le_dump_file:
                pickle.dump(self.label_encoder, le_dump_file)

    def __len__(self):
        """Return the number of image paths in the dataset."""
        return len(self.files_path)

    def __getitem__(self, idx):
        """Load, convert to RGB and transform the image at ``idx``.

        Returns the transformed image alone for unlabeled datasets, otherwise
        ``(image, label)`` with ``label`` a Python int.
        """
        img_path = str(self.files_path[idx])
        # Close the file handle promptly and force three channels so the
        # 3-channel normalisation also works for grayscale / RGBA / CMYK files.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        image = self.transform(image)

        if self._is_unlabeled:
            return image

        return image, int(self.encoded_labels[idx])

In [None]:
# Build one dataset per split, each with its matching transform pipeline.
image_datasets = {mode: SimpsonsDataset(train_path[mode], data_transforms[mode]) for mode in ['train', 'val']}
# The test set reuses the deterministic 'val' transforms (no augmentation).
image_datasets_test = SimpsonsDataset(test_path, data_transforms['val'])

In [None]:
# NOTE(review): `wordker_id` looks like a typo for `worker_id`; it is not referenced
# anywhere else in the visible code — confirm before renaming or removing.
wordker_id = 42
# DataLoader settings: load in the main process, one image per batch.
num_workers = 0
batch_size = 1

def seed_worker(worker_id):
    """Seed NumPy's and Python's RNGs from the current torch initial seed.

    Intended as a DataLoader ``worker_init_fn``; ``worker_id`` is accepted
    because that hook passes it, but it is not used.
    """
    # Fold the torch seed into 32 bits before handing it to the other RNGs.
    derived_seed = torch.initial_seed() % (1 << 32)
    for reseed in (np.random.seed, random.seed):
        reseed(derived_seed)

# Dedicated generator with a fixed seed, shared by all three loaders.
g = torch.Generator()
g.manual_seed(0)

# Settings common to every loader.
_loader_kwargs = dict(
    batch_size=batch_size,
    num_workers=num_workers,
    worker_init_fn=seed_worker,
    generator=g,
)

# Train and validation loaders are both shuffled.
dataloaders_dict = {
    mode: torch.utils.data.DataLoader(image_datasets[mode], shuffle=True, **_loader_kwargs)
    for mode in ('train', 'val')
}
# The test loader keeps the files in their original order.
dataloader_test = torch.utils.data.DataLoader(image_datasets_test, shuffle=False, **_loader_kwargs)

Score-based black-box attack: the attacker can observe the model's output logits (모델의 출력 logit을 알 수 있는 상황).

In [None]:
def simba_attack_targeted_patch(model, image, true_label, target_label,
                                 epsilon=0.2, max_iters=1000, patch_size=50):
    """
    SimBA-style targeted attack restricted to a square patch.

    At every step one not-yet-visited (channel, row, col) coordinate inside the
    patch is picked, the model is queried with that coordinate shifted by
    +epsilon and by -epsilon, and whichever candidate gives the lower
    cross-entropy towards ``target_label`` is kept. The attack stops as soon as
    the kept candidate is classified as the target class, or when the step
    budget / the patch coordinates are exhausted.

    The patch is placed in the bottom-right corner of the image.

    Args:
        model:         classification model; queried on the device its parameters live on
        image:         input tensor (1, C, H, W); only batch index 0 is perturbed
        true_label:    true label (tensor of shape [1]); accepted for interface
                       compatibility, not used by the attack
        target_label:  target class (tensor of shape [1])
        epsilon:       step size added to / subtracted from a single coordinate,
                       in the same value space as ``image``
        max_iters:     maximum number of steps (capped at C * patch_h * patch_w)
        patch_size:    side length of the square patch (clipped to the image size)

    Returns:
        (adversarial image, query_count, succeeded, patch_info) where
        ``query_count`` counts the candidate evaluations (two per step),
        ``succeeded`` is 1 when the target class was reached and 0 otherwise,
        and ``patch_info`` is ``(starting_h, starting_w, patch_h, patch_w)``.
        The same 4-tuple shape is returned on every path, including the early
        exit taken when the clean image is already classified as the target.
    """
    device = next(model.parameters()).device
    image = image.clone().detach().to(device)
    perturbed = image.clone()
    target_label = target_label.to(device)
    succeeded = 0
    query_count = 0

    c, h, w = image.shape[1:]

    # Apply the patch in the bottom-right corner.
    # Adjust the starting pixel to move the patch elsewhere.
    patch_h = min(patch_size, h)
    patch_w = min(patch_size, w)
    starting_h = h - patch_h
    starting_w = w - patch_w
    patch_info = (starting_h, starting_w, patch_h, patch_w)

    # Candidate coordinates: every (channel, row, col) inside the patch, in random order.
    indices = [(ch, i, j) for ch in range(c)
                         for i in range(starting_h, starting_h + patch_h)
                         for j in range(starting_w, starting_w + patch_w)]
    random.shuffle(indices)

    criterion = nn.CrossEntropyLoss()

    with torch.no_grad():
        orig_pred = model(image).argmax(dim=1)

    if orig_pred.item() == target_label.item():
        print("이미 target class로 분류되고 있습니다.")
        succeeded = 1
        # Return the same 4-tuple as the normal path so callers can always unpack it.
        return perturbed, query_count, succeeded, patch_info

    n_steps = min(max_iters, len(indices))
    # The attack only needs the model's outputs, so no autograd graph is built.
    with torch.no_grad():
        for step in tqdm_notebook(range(n_steps), desc="SimBA Targeted Patch Attack"):
            ch, row, col = indices[step]

            direction = torch.zeros_like(image)
            direction[0, ch, row, col] = epsilon

            candidate_plus = perturbed + direction
            candidate_minus = perturbed - direction
            logits_plus = model(candidate_plus)
            logits_minus = model(candidate_minus)
            query_count += 2

            loss_plus = criterion(logits_plus, target_label)
            loss_minus = criterion(logits_minus, target_label)

            # Keep the candidate that moves closer to the target class, together
            # with its logits, so no extra forward pass is needed to read the
            # prediction of the updated image.
            if loss_plus < loss_minus:
                perturbed, kept_logits = candidate_plus, logits_plus
            else:
                perturbed, kept_logits = candidate_minus, logits_minus

            if kept_logits.argmax(dim=1).item() == target_label.item():
                print(f"공격 성공! step {step+1}, queries: {query_count}")
                succeeded = 1
                break
    return perturbed.detach(), query_count, succeeded, patch_info

In [None]:
def show_images_and_return_perturbation(original, adversarial, title_prefix=""):
    """
    Plot the original image, the adversarial image and their difference side by side.

    Args:
        original:      image tensor; assumed (1, C, H, W) or (C, H, W) — TODO confirm
        adversarial:   image tensor with the same layout as ``original``
        title_prefix:  text prepended to each subplot title

    Returns:
        The raw (unscaled) perturbation ``adversarial - original`` as an
        (H, W, C) NumPy array.
    """
    def to_hwc(tensor):
        # Drop singleton dims, move to the CPU and reorder channels-first -> channels-last.
        return tensor.squeeze().detach().cpu().permute(1, 2, 0).numpy()

    clean_hwc = to_hwc(original)
    attacked_hwc = to_hwc(adversarial)
    perturbation = attacked_hwc - clean_hwc

    # Emphasised view of the perturbation: scale by its largest magnitude,
    # then map the symmetric range onto [0, 1] around 0.5.
    peak = np.max(np.abs(perturbation)) + 1e-8
    scaled_view = perturbation / peak / 2 + 0.5

    panels = (
        (clean_hwc, f"{title_prefix}Original"),
        (attacked_hwc, f"{title_prefix}Adversarial"),
        (scaled_view, f"{title_prefix}Perturbation (scaled)"),
    )

    _, axes = plt.subplots(1, 3, figsize=(12, 4))
    for axis, (pixels, caption) in zip(axes, panels):
        axis.imshow(np.clip(pixels, 0, 1))
        axis.set_title(caption)
        axis.axis("off")

    plt.tight_layout()
    plt.show()

    return perturbation

# Output folders for the saved adversarial images and patches.
os.makedirs("/kaggle/adv_images", exist_ok=True)
os.makedirs("/kaggle/adv_patches", exist_ok=True)

# Inverse of Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) written as
# another Normalize: (x + mean/std) * std == x * std + mean.
# The blue-channel std is 0.225, matching the forward transform.
unnormalize = transforms.Normalize(
    mean=[-0.485/0.229, -0.456/0.224, -0.406/0.225],
    std=[1/0.229, 1/0.224, 1/0.225],
)

# Counts every attacked image, whether or not the attack succeeded.
i = 0
for img, true_lbl in dataloaders_dict['val']:
    # Fixed target class, used for every image regardless of its true label.
    target_lbl = torch.tensor([25]) # 25 = milhouse
    print(true_lbl)

    img = img.to(device)
    true_lbl = true_lbl.to(device)
    target_lbl = target_lbl.to(device)

    adv_img, queries, succeeded, patch_info  = simba_attack_targeted_patch(
        model, img, true_lbl, target_lbl,
        epsilon=3, max_iters=50000
    )

    i+=1

    perturbation = show_images_and_return_perturbation(img, adv_img, title_prefix="SimBA - ")

    if succeeded:
        patch_file_name = f"adv_patch{i}.jpg"
        patch_image_name = f"adv_image{i}.jpg"

        patch_file_path = os.path.join("/kaggle/adv_patches", patch_file_name)
        patch_image_path = os.path.join("/kaggle/adv_images", patch_image_name)

        starting_h, starting_w, patch_h, patch_w = patch_info

        # Crop the (H, W, C) perturbation down to the patch region.
        perturbation_patch = perturbation[starting_h : starting_h + patch_h,
                                          starting_w : starting_w + patch_w, :]

        # NOTE(review): `perturbation_patch` is a difference of two normalised images, yet it is
        # un-normalised like an image (delta * std + mean), so the saved patch carries the mean
        # offset — confirm whether the patch region of `adv_img` was intended instead.
        adv_patch_unnormalized = unnormalize(torch.Tensor(perturbation_patch).permute(2,0,1))  # HWC -> CHW
        adv_image_unnormalized = unnormalize(adv_img.squeeze(0))  # Remove batch dimension

        torchvision.utils.save_image(adv_patch_unnormalized, patch_file_path)
        torchvision.utils.save_image(adv_image_unnormalized, patch_image_path)

    # Stop after 10 attacked images.
    if i == 10:
        break