# Тесты генерации DPatch на YOLO

## Вспомогательные методы

In [1]:
from types import SimpleNamespace
import torch
from ultralytics import YOLO
from ultralytics.utils.loss import v8DetectionLoss
from art.estimators.object_detection import PyTorchYolo
from art.attacks.evasion import RobustDPatch
import cv2
from matplotlib import pyplot as plt
import numpy as np
from PIL import Image

from auxiliary import dataset_loader

In [2]:
COCO_INSTANCE_CATEGORY_NAMES = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light',
        'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
        'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
        'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard',
        'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
        'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
        'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard',
        'cell phone', 'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors',
        'teddy bear', 'hair drier', 'toothbrush']

def get_image_with_boxes(img, labels, confidence=0.15):
    text_size = 1
    text_th = 3
    rect_th = 2

    boxes = labels.get('boxes')
    pred_cls = labels.get('labels')
    scores = labels.get('scores')

    for i in range(len(boxes)):
        if scores[i] >= confidence:
            cv2.rectangle(img, (int(boxes[i][0]), int(boxes[i][1])), (int(boxes[i][2]), int(boxes[i][3])),
                          color=(0, 255, 0), thickness=rect_th)
            cv2.putText(img, f"{COCO_INSTANCE_CATEGORY_NAMES[pred_cls[i]]} {round(float(scores[i]), 2)}",
                        (int(boxes[i][0]), int(boxes[i][1]) - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, text_size, (0, 255, 0), thickness=text_th)

    return img

def show_predictions(patched_images, clean_images, patch_title, model, filename=None):
    clean_preds = model.predict(clean_images)
    patched_preds = model.predict(patched_images)

    # Определяем количество строк (по одному ряду на каждую пару изображений)
    n = len(patched_preds)
    fig, axs = plt.subplots(n, 2, figsize=(9, 4 * n))

    # Если только одно изображение, axs будет 1D, нужно преобразовать в 2D
    if n == 1:
        axs = axs.reshape(1, -1)

    axs[0, 0].set_title("Original")
    axs[0, 1].set_title(patch_title)

    for i in range(n):
        # Оригинальное изображение с bounding box
        im_orig = (clean_images[i]).transpose(1, 2, 0) * 255
        im_orig_boxed = get_image_with_boxes(im_orig.copy(), clean_preds[i])
        axs[i, 0].imshow(im_orig_boxed.astype("uint8"))
        axs[i, 0].axis("off")

        # Патченное изображение с bounding box
        im_adv = (patched_images[i]).transpose(1, 2, 0) * 255
        im_adv_boxed = get_image_with_boxes(im_adv.copy(), patched_preds[i])
        axs[i, 1].imshow(im_adv_boxed.astype("uint8"))
        axs[i, 1].axis("off")

    if filename:
        plt.tight_layout()
        plt.savefig(filename)

def save_patch(patch, filename):
    # 1. Транспонируем в формат (H, W, C)
    patch_hwc = np.transpose(patch, (1, 2, 0))

    # 2. Нормализуем значения (если патч в [-1, 1] или [0, 1])
    if patch_hwc.min() < 0:  # Предполагаем, что патч в [-1, 1]
        patch_hwc = (patch_hwc + 1) * 127.5  # Масштабируем в [0, 255]
    elif patch_hwc.max() <= 1:  # Если патч в [0, 1]
        patch_hwc = patch_hwc * 255

    # 3. Конвертируем в uint8 и сохраняем
    patch_uint8 = patch_hwc.astype(np.uint8)
    Image.fromarray(patch_uint8).save(filename)

## Загрузка датасета

In [3]:
conf = {
    "type": "images",
    "path": "images_for_patch"
  }

dataset = dataset_loader.DataFactory.load_dataset(conf)
x = dataset.data
y = dataset.target

Loading images: 100%|██████████| 22/22 [00:00<00:00, 86.23it/s]


## Создание модели для атаки

### Адаптер

Позволяет передать модель в ART, иначе будет ругаться

In [4]:
class YoloWrapper(torch.nn.Module):
    """
    Обертка для модели YOLO, обеспечивающая совместимость с инструментами тестирования атак.

    Поддерживает:
    - Работу с моделями YOLO через ultralytics
    - Расчет потерь для обучения
    - Предсказание с возможностью постобработки

    .. attribute:: orig_model
        Оригинальная модель YOLO.
        :type: YOLO

    .. attribute:: classes
        Список имен классов.
        :type: list

    .. attribute:: model
        Внутренняя модель PyTorch.
        :type: torch.nn.Module

    .. attribute:: predict_count
        Счетчик предсказаний.
        :type: int

    .. attribute:: loss_fn
        Функция потерь для детекции.
        :type: v8DetectionLoss
    """
    def __init__(self, model, **kwargs):
        """
        Инициализация обертки для YOLO модели.

        :param model: Путь к модели или загруженная модель YOLO
        :type model: str or YOLO
        :param kwargs: Дополнительные параметры:
                       - ``tal_topk``: количество топ-k для TAL (Task-Aligned Learning)
                       - ``box``: коэффициент потерь для bounding box
                       - ``cls``: коэффициент потерь для классификации
                       - ``dfl``: коэффициент потерь для Distribution Focal Loss
        :type kwargs: dict

        .. note::
            Модель автоматически переводится в режим оценки (eval), а её параметры замораживаются.
            Функция потерь инициализируется с гиперпараметрами по умолчанию, которые можно переопределить.

        Пример:

        .. code-block:: python

            wrapper = YoloWrapper("yolov8n.pt", box=7.5, cls=0.5, dfl=1.5)
        """
        super().__init__()
        self.orig_model = YOLO(model)
        self.classes = list(self.orig_model.names.values())
        self.model = self.orig_model.model
        self.predict_count = 0

        # Замораживаем параметры модели
        for param in self.model.parameters():
            param.requires_grad = False

        # Инициализация функции потерь
        self.loss_fn = v8DetectionLoss(self.model, tal_topk=kwargs.get("tal_topk", 10))
        self.loss_fn.hyp = SimpleNamespace(box=kwargs.get("box", 7.5),
                                           cls=kwargs.get("cls", 0.5),
                                           dfl=kwargs.get("dfl", 1.5)
        )

    def forward(self, x, targets=None):
        """
        Прямой проход модели.

        :param x: Входной тензор изображения
        :type x: torch.Tensor
        :param targets: Тензор целей для обучения в формате ``[batch_idx, class, x1, y1, x2, y2]``
        :type targets: torch.Tensor, optional
        :return:
            - В режиме обучения: словарь с компонентами потерь
            - В режиме оценки: тензор предсказаний формы ``(n, 8400, 85)``
        :rtype: dict or torch.Tensor

        .. note::
            В режиме обучения возвращается словарь с ключами:
            - ``'loss_box'``: потери для координат bounding box
            - ``'loss_cls'``: потери классификации
            - ``'loss_dfl'``: Distribution Focal Loss

        .. warning::
            В режиме оценки выходной тензор имеет форму ``(n, 8400, 85)``, где:
            - Первые 4 значения — координаты bbox (x1, y1, x2, y2)
            - 5-е значение — objectness score
            - Остальные — оценки классов
        """
        if self.training:
            # Подготовка батча для расчета потерь
            batch = {
                "batch_idx": targets[:, 0].long(),
                "cls": targets[:, 1].long(),
                "bboxes": targets[:, 2:6].float(),
            }
            loss_total, loss_tensor = self.loss_fn(self.model(x), batch)

            loss_dict = {
                'loss_box': loss_total[0],  # Потери для bounding box
                'loss_cls': loss_total[1],  # Потери классификации
                'loss_dfl': loss_total[2]   # Distribution Focal Loss
            }

            return loss_dict

        else:
            # Получение предсказаний модели
            pred = self.model(x)[0]  # (n, 84, 8400)

            # Разделение на компоненты
            boxes = pred[:, 0:4, :]  # Координаты bbox
            class_scores = pred[:, 4:, :]  # Оценки классов

            # Расчет objectness score как максимальной оценки класса
            objectness, _ = class_scores.max(dim=1)  # (n, 8400)
            objectness = objectness.unsqueeze(1)  # (n, 1, 8400)

            # Объединение в формат [x1, y1, x2, y2, obj_score, class_scores...]
            pred = torch.cat([
                boxes,
                objectness,
                class_scores
            ], dim=1)  # -> (n, 85, 8400)

            # Транспонирование в формат [batch, 8400, 85]
            pred = pred.permute(0, 2, 1)

            return pred

    def predict(self, x, postprocess=False):
        """
        Предсказание на входных данных.

        :param x: Входные данные (изображения)
        :type x: np.ndarray or torch.Tensor
        :param postprocess: Флаг постобработки результатов
        :type postprocess: bool
        :return: Список результатов для каждого изображения, где каждый элемент — словарь с ключами:
                 - ``'boxes'``: координаты bounding boxes (форма зависит от `postprocess`)
                 - ``'labels'``: метки классов (строки при `postprocess=True`, иначе int)
                 - ``'scores'``: оценки уверенности
        :rtype: list[dict]

        .. note::
            - При ``postprocess=True`` возвращаются numpy-массивы и строковые метки.
            - При ``postprocess=False`` возвращаются тензоры и целочисленные метки.

        Пример возвращаемого значения:

        .. code-block:: python

            [
                {
                    'boxes': np.array([[100, 50, 200, 150], ...]),
                    'labels': ['cat', 'dog'],
                    'scores': np.array([0.95, 0.87])
                },
                ...
            ]
        """
        self.orig_model.model.eval()
        results = self.orig_model(torch.tensor(x), verbose=False)
        outputs = []
        for result in results:
            if postprocess:
                # Возвращаем numpy массивы с постобработкой
                boxes = result.boxes.xyxy.numpy()
                scores = result.boxes.conf.numpy()
                labels = [result.names.get(int(l), "unk") for l in result.boxes.cls]
            else:
                # Возвращаем тензоры без постобработки
                boxes = result.boxes.xyxy
                scores = result.boxes.conf
                labels = result.boxes.cls.int()

            outputs.append({
                'boxes': boxes,
                'labels': labels,
                'scores': scores
            })
        return outputs

    def train(self, state=True):
        """
        Установка режима обучения/оценки.

        :param state: Флаг режима:
                      - ``True``: режим обучения
                      - ``False``: режим оценки
        :type state: bool

        .. note::
            Метод также устанавливает внутреннее состояние ``self.training``.
            Параметры модели заморожены, поэтому обучение не изменяет веса.

        Пример:

        .. code-block:: python

            wrapper.train(True)   # Перевод в режим обучения
            wrapper.train(False)  # Перевод в режим оценки
        """
        self.model.train() if state else self.model.eval()
        self.training = state

### Создание модели с параметрами

In [5]:
model = YOLO('yolo11s.pt').to(torch.device('cuda'))
detector = YoloWrapper(model)
cls = PyTorchYolo(detector,
                  input_shape=(3,640,640),
                  clip_values=(0,1),
                  attack_losses=('loss_cls',
                                 'loss_box',
                                 'loss_dfl'))

## Проведение атаки с параметрами

In [6]:
atk = RobustDPatch(
    cls,
    patch_shape=(3, 160, 160),
    patch_location=(0,0),
    crop_range=(0,0),
    brightness_range=(0.7, 1.0),
    rotation_weights=(1,0,0,0),
    sample_size=1,
    max_iter=1000,
    learning_rate=0.05,
    batch_size=3,
    targeted=False,
    verbose=True,
)

In [7]:
patch = atk.generate(x=x)
save_patch(patch, '0609_yolo_dpatch_1000.png')

RobustDPatch iteration:   0%|          | 0/1000 [00:00<?, ?it/s]

In [12]:
x.shape

(22, 3, 640, 640)

In [6]:
torch.cuda.is_available()

True