# Защита модели от Adversarial примеров

Работа выполнена студентом БИБ233 МИЭМ НИУ ВШЭ Коноваловым Матвеем.

Данная работа является продолжением предыдущей, в которой рассматривалось создание Adversarial примеров в задаче классификации изображений.

Атака оказалась крайне удачной, и в ходе данной работы будут рассмотрены способы это исправить.

## Основные способы защиты, рассмотренные в работе:
- Adversarial training
- нормализация входных данных
- использование небольшого dropout

Все эти изменения будут вноситься при помощи библиотеки Adversarial Robustness Toolbox.

Зададим все необходимые параметры. В MODEL_PATH укажите, куда вы положили [обученную модель из предыдущего пункта](https://disk.360.yandex.ru/d/r-5SqYBdwTfuuQ)

In [1]:
import torch
import numpy as np
import random
from pathlib import Path

IMG_SIZE = 224
MODEL_PATH = Path('models/best.pt')
DATA_DIR = Path("datasets/svhn_cls")
mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
LR = 1e-4
WEIGHT_DECAY = 1e-4
EPS = 0.03
EPS_STEP = 0.007
BETA = 6
MAX_PGD_ITERS = 3
SEED = 17
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

Считаем наш датасет

В качестве первого метода защиты добавим нормализацию входов в модель. Это уже должно будет снизить эффективность Adv. примеров, поскольку они существенно отличаются от нормальных примеров.

In [16]:
from art.preprocessing.standardisation_mean_std import StandardisationMeanStd

norm_proc = StandardisationMeanStd(
    mean=mean,
    std =std
)

Загрузим нашу уязвимую модель. Мы не будем обучать новую модель с нуля, вместо этого мы её дообучим

In [6]:
class YoloClsAdapter(torch.nn.Module):
    def __init__(self, core_model):
        super().__init__()
        self.core = core_model

    def forward(self, x):
        out = self.core(x)
        if isinstance(out, (tuple, list)):
            out = out[1] if self.training and len(out) == 2 else out[0]
        return out

In [4]:
from ultralytics import YOLO
from art.estimators.classification import PyTorchClassifier

yolo  = YOLO(MODEL_PATH)
for p in yolo.model.parameters():
    p.requires_grad_(True)
adapter = YoloClsAdapter(yolo.model).to(device)

loss = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(
    adapter.parameters(),
    lr=LR,
    weight_decay=WEIGHT_DECAY
)

classifier = PyTorchClassifier(
    model=adapter,
    loss=loss,
    optimizer=optimizer,
    nb_classes=10,
    input_shape=(3, IMG_SIZE, IMG_SIZE),
    clip_values=(0.0, 1.0),
    preprocessing_defences=[norm_proc],
    device_type=device
)

Считаем наш датасет (TODO: объяснить почему именно так)

In [60]:
from PIL import Image
import gc
from itertools import islice

class ChunkedGenerator:
    def __init__(self, dir_path: Path, *, chunk, batch_size, shuffle):
        self.dir         = dir_path
        self.chunk       = chunk
        self.batch_size  = batch_size
        self.shuffle     = shuffle

        self.files       = list(self.dir.glob("*/*.png"))
        self.size        = len(self.files)          # обязательное поле для ART

        self._chunk_iter = None     # текущий генератор «порций»
        self._batch_buf  = []       # накопленные (x,y) перед отдачей

    @staticmethod
    def _chunked_files(file_list, chunk):
        """yield списки путей по chunk штук"""
        it = iter(file_list)
        while (batch := list(islice(it, chunk))):
            yield batch
    # ----------------- helpers -----------------
    def _reset_epochs(self):
        if self.shuffle:
            random.shuffle(self.files)
        self._chunk_iter = self._chunked_files(self.files, self.chunk)

    def _load_next_chunk(self):
        """Загружает очередные ≤chunk файлов в RAM."""
        file_chunk = next(self._chunk_iter)           # StopIteration → обрабатывается выше

        x_list, y_list = [], []
        for fp in file_chunk:
            img = Image.open(fp).convert("RGB")
            arr = yolo.model.transforms(img).numpy()  # float32, CHW, 0-1
            x_list.append(arr)
            y_list.append(int(fp.parent.name))

        x_chunk = np.stack(x_list, axis=0)
        y_chunk = np.asarray(y_list)

        # разбиваем на mini-batch’и и кладём в буфер
        for i in range(0, len(x_chunk), self.batch_size):
            self._batch_buf.append((x_chunk[i:i + self.batch_size],
                                    y_chunk[i:i + self.batch_size]))

        # подчистить уже не нужные крупные массивы
        del x_chunk, y_chunk, x_list, y_list, file_chunk
        gc.collect()

    # -------------- обязательный метод ART --------------
    def get_batch(self):                      # ← именно его вызывает fit_generator()
        if not self._batch_buf:               # буфер пуст — нужно загрузить новую «порцию»
            if self._chunk_iter is None:
                self._reset_epochs()          # первый вызов в эпоху
            try:
                self._load_next_chunk()
            except StopIteration:             # прошли весь набор → начать новую эпоху
                self._reset_epochs()
                self._load_next_chunk()

        return self._batch_buf.pop(0)

    # -------------- опционально -----------------
    def __iter__(self):       # чтобы можно было for-loop’ом
        for _ in range(self.size // self.batch_size):
            yield self.get_batch()

В качестве генератора Adversarial examples выберем даже более сильный метод, чем FGSM из предыдущей работы - ProjectedGradientDescent. Это обеспечит защиту как от более слабого метода, так и от некоторых более сильных

In [6]:
from art.attacks.evasion import ProjectedGradientDescent

pgd_attack = ProjectedGradientDescent(
    estimator=classifier,
    norm=np.inf,
    eps=EPS,
    eps_step=EPS_STEP,
    max_iter=MAX_PGD_ITERS,
    targeted=False,
    verbose=False
)

Наконец, создадим наш класс для адверсариального обучения и дообучим нашу модель на 10 эпохах

In [18]:
from art.defences.trainer import AdversarialTrainerTRADESPyTorch
from art.attacks.evasion import FastGradientMethod

CHUNK = 5_000
BATCH_SIZE = 128
EPOCHS     = 5
SHUFFLE_DS = False

train_gen = ChunkedGenerator(
    DATA_DIR / "train",
    chunk=CHUNK,
    batch_size=BATCH_SIZE,
    shuffle=SHUFFLE_DS,
)

adv_trainer = AdversarialTrainerTRADESPyTorch(
    classifier,
    attack=FastGradientMethod(estimator=classifier, eps=EPS),
    beta=BETA
)

adv_trainer.fit_generator(        # !!
    generator=train_gen,          # функция, порождающая генератор на каждый epoch
    nb_epochs=EPOCHS
)

robust_classifier = adv_trainer.get_classifier()

Adversarial Training TRADES - Epochs:   0%|          | 0/5 [00:00<?, ?it/s]

Сохраним обученную модель

In [44]:
robust_classifier.save('robust_classifier2.pt', 'models')

In [40]:
data = torch.load('models/robust_classifier2.pt.model')
data.keys

<function OrderedDict.keys>

In [53]:
from ultralytics import YOLO
import torch

yolo = YOLO(MODEL_PATH)
adapter = YoloClsAdapter(yolo.model).to(device)
adapter.load_state_dict(torch.load('models/robust_classifier2.pt.model'))
robust_classifier = PyTorchClassifier(
    model=adapter,
    loss=torch.nn.CrossEntropyLoss(),
    nb_classes=10,
    input_shape=(3, IMG_SIZE, IMG_SIZE),
    clip_values=(0.0, 1.0),
    device_type=device
)

Теперь сравним качество изначальной и дообученной моделей как на чистой, так и на adv. выборках

In [18]:
from sklearn.metrics import accuracy_score

def eval_clean(clf, x, y, batch=32):
    preds = clf.predict(x, batch_size=batch).argmax(1)
    return accuracy_score(y, preds)

def eval_adv_pgd(clf, x, y,
                 eps=0.03, eps_step=0.007, n_iter=10,
                 batch=32, rand_init=True):
    pgd = ProjectedGradientDescent(
        estimator=clf, norm=np.inf,
        eps=eps, eps_step=eps_step, max_iter=n_iter,
        num_random_init=1 if rand_init else 0, targeted=False, verbose=False
    )
    x_adv = pgd.generate(x=x, batch_size=batch)

    y_pred_clean = clf.predict(x,      batch_size=batch).argmax(1)
    y_pred_adv   = clf.predict(x_adv,  batch_size=batch).argmax(1)

    clean_correct = y_pred_clean == y
    adv_correct   = y_pred_adv   == y

    clean_acc = clean_correct.mean()
    adv_acc   = adv_correct.mean()
    asr       = (clean_correct & ~adv_correct).sum() / clean_correct.sum()
    return clean_acc, adv_acc, asr

In [19]:
from art.estimators.classification import PyTorchClassifier

basic_yolo  = YOLO(MODEL_PATH)
adapter = YoloClsAdapter(basic_yolo.model).to(device).eval()

loss = torch.nn.CrossEntropyLoss()

basic_cls = PyTorchClassifier(
    model=adapter,
    loss=loss,
    nb_classes=10,
    input_shape=(3, IMG_SIZE, IMG_SIZE),
    clip_values=(0.0, 1.0),
    device_type=device
)

In [57]:
from PIL import Image

N_SAMPLES = 1000

all_files = list((DATA_DIR / 'test').glob("*/*.png"))
assert all_files, f"Ничего не найдено в {DATA_DIR}"

idx_files = random.sample(all_files, N_SAMPLES) # Выбираем N_SAMPLES картинок

x, y = [], []
for fp in idx_files:
    img = Image.open(fp).convert("RGB")
    arr = basic_yolo.model.transforms(img)
    x.append(arr)
    y.append(int(fp.parent.name))

clean_x = np.stack(x, axis=0)
clean_y = np.asarray(y)

In [44]:
from art.attacks.evasion import FastGradientMethod

EPS = 8/255

# Генерируем FGSM по изначальной модели
basic_fgsm = FastGradientMethod(estimator=basic_cls, eps=EPS)
basic_x_fgsm  = basic_fgsm.generate(x=clean_x)

# FGSM по защищённой модели
robust_fgsm = FastGradientMethod(estimator=robust_classifier, eps=EPS)
robust_x_fgsm  = basic_fgsm.generate(x=clean_x)

In [61]:
train_gen = ChunkedGenerator(
    DATA_DIR / "test",
    chunk=1000,
    batch_size=1000,
    shuffle=False,
)
x, y = train_gen.get_batch()

In [63]:
accuracy_score(y, basic_cls.predict(x).argmax(1))

0.977

In [51]:
import pandas as pd

# ❶ Cводим всё в словари --------------------------------------------------------
tests   = dict(  # порядок сохранится как объявлен
    clean_train   = clean_x,
    basic_x_fgsm  = basic_x_fgsm,
    robust_x_fgsm = robust_x_fgsm
)
models  = dict(basic = basic_cls, robust = robust_classifier)

# ❷ Предсказания всех (model × test) за один проход ----------------------------
preds = {
    (m, t): mdl.predict(x).argmax(1)          # argmax → метки классов
    for m, mdl in models.items()
    for t, x   in tests.items()
}

cols = ["basic_accuracy", "basic_ASR", "robust_accuracy", "robust_ASR"]
df = pd.DataFrame(index=tests, columns=cols, dtype="float32")

for m in models:
    clean_pred = preds[(m, "clean_train")]
    acc_col = f"{m}_accuracy"
    asr_col = f"{m}_ASR"

    for t in tests:
        adv_pred = preds[(m, t)]
        accuracy = np.mean(adv_pred == clean_y)
        if t == "clean_train":
            asr = 0
        else:
            asr = np.sum((clean_pred == clean_y) & (adv_pred != clean_y))

        df.at[t, acc_col] = accuracy
        df.at[t, asr_col] = asr

df

  df.at[t, acc_col] = accuracy
  df.at[t, acc_col] = accuracy


Unnamed: 0,basic_accuracy,basic_ASR,robust_accuracy,robust_ASR
clean_train,0.96,0.0,0.091,0.0
basic_x_fgsm,0.326,639.0,0.085,7.0
robust_x_fgsm,0.326,639.0,0.085,7.0
