<h3 style="text-align: center;"><b>Школа глубокого обучения ФПМИ МФТИ</b></h3>

<h3 style="text-align: center;"><b>Домашнее задание. Детекция объектов</b></h3>

В этом домашнем задании мы продолжим работу над детектором из семинара, поэтому при необходимости можете заимствовать оттуда любой код.

Домашнее задание можно разделить на следующие части:

* Переделываем модель [4]
  * Backbone[1],
  * Neck [2],
  * Head [1]
* Label assignment [3]:
  * TAL [3]
* Лоссы [1]:
  * CIoU loss [1]
* Кто больше? [5]
  * 0.05 mAP [1]
  * 0.1 mAP  [2]
  * 0.2 mAP [5]

**Максимальный балл:** 10 баллов. (+3 балла бонус).

In [203]:
import torch
import numpy as np
import pandas as pd
import albumentations as A

from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset
from albumentations.pytorch.transforms import ToTensorV2

### Загрузка данных

Мы продолжаем работу с датасетом из семинара - Halo infinite ([сслыка](https://universe.roboflow.com/graham-doerksen/halo-infinite-angel-aim)). Загрузка данных и создание датасета полностью скопированы из семинара.

Сначала загружаем данные

In [204]:
! git clone https://huggingface.co/datasets/Francesco/halo-infinite-angel-videogame

splits = {'train': 'data/train-00000-of-00001-0d6632d599c29801.parquet',
          'validation': 'data/validation-00000-of-00001-c6b77a557eeedd52.parquet',
          'test': 'data/test-00000-of-00001-866d29d8989ea915.parquet'}

df_train = pd.read_parquet("/content/halo-infinite-angel-videogame/" + splits["train"])
df_test = pd.read_parquet("/content/halo-infinite-angel-videogame/" + splits["test"])

fatal: destination path 'halo-infinite-angel-videogame' already exists and is not an empty directory.


Создаем датасет для предобработки данных

In [205]:
class HaloDataset(Dataset):
    def __init__(self, dataframe, transform=None):
        df_objects = pd.json_normalize(dataframe['objects'])[["bbox", "category"]]
        df_images = pd.json_normalize(dataframe['image'])[["bytes"]]
        self.data = dataframe[["image_id"]].join(df_objects).join(df_images)
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Загружаем данные и разметку для объекта с индексом `idx`.

        labels: List[int] Набор классов для каждого ббокса,
        boxes: List[List[int]] Набор ббоксов в формате (x_min, y_min, w, h).
        """
        row = self.data.iloc[idx]
        image = Image.open(io.BytesIO(row["bytes"]))
        image = np.array(image)

        target = {}
        target["image_id"] = row["image_id"]

        labels = [row["category"]] if isinstance(row["category"], int) else row['category']
        # Вычитаем единицу чтобы классы начинались с нуля
        labels = [label - 1 for label in labels]
        boxes = row['bbox'].tolist()

        if self.transform is not None:
            transformed = self.transform(image=image, bboxes=boxes, labels=labels)
            image, boxes, labels = transformed["image"], transformed["bboxes"], transformed["labels"]
        else:
            image = transforms.ToTensor()(image)

        target['boxes'] = torch.tensor(np.array(boxes), dtype=torch.float32)
        target['labels'] = torch.tensor(labels, dtype=torch.int64)
        return image, target

def collate_fn(batch):
    batch = tuple(zip(*batch))
    images = torch.stack(batch[0])
    return images, batch[1]

Чтобы модель не переобучалась, можно добавить больше аугментаций, весь список можно посмотреть тут [[ссылка](https://explore.albumentations.ai/)].

Какие можно использовать аугментации?
* Добавить зум `RandomResizedCrop`,
* Сделать цветовые аугментации типа `RandomBrightnessContrast` и/или `HueSaturationValue`,
* Добавить шум `GaussNoise`,
* Вырезать случайные части изображения `CoarseDropout`,
* И любые другие!

Аугментации можно комбинировать посредствам `A.OneOf`, `A.SomeOf` или `A.RandomOrder`.

Хоть аугментации ограничиваются только вашей фантазией, перед обучением советуем посмотреть на результат преобразований и убедиться, что изображение ещё поддается детекции:)

In [206]:
import cv2

mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)

train_transform = A.Compose(
    [
        A.Normalize(mean=mean, std=std),
        A.Downscale(
          scale_range=(0.5, 0.75),
          interpolation_pair={'downscale': cv2.INTER_NEAREST, 'upscale': cv2.INTER_LINEAR},
          p=0.5
        ),
        A.GaussianBlur(
            sigma_limit=(3.0, 7.0),
            blur_limit=0,
            p=1.0
        ),
        A.Sharpen(
          alpha=(0.2, 0.5),
          lightness=(0.5, 1.0),
          method='kernel',
          p=1.0
        ),
        A.ChromaticAberration(
          primary_distortion_limit=0.05,
          secondary_distortion_limit=0.1,
          mode='green_purple',
          interpolation=cv2.INTER_LINEAR,
          p=1.0
        ),
        A.CoarseDropout(
          num_holes_range=(3, 6),
          hole_height_range=(10, 20),
          hole_width_range=(10, 20),
          fill="inpaint_ns",
          p=1.0
        ),
        A.RandomResizedCrop(
          size=(64, 64),
          scale=(0.5, 0.9),
          ratio=(0.75, 1.33),
          interpolation=cv2.INTER_LINEAR,
          mask_interpolation=cv2.INTER_NEAREST,
          area_for_downscale="image",
          p=1.0
        ),
        A.HorizontalFlip(p=0.5),
        A.RandomBrightnessContrast(p=0.2),
        A.GaussNoise(std_range=(0.1, 0.2), p=1.0),
        A.Solarize(threshold_range=(0.5, 0.5), p=1.0),
        ToTensorV2(),
    ],
    bbox_params=A.BboxParams(format='coco', label_fields=['labels'])
)

test_transform = A.Compose(
    [
        A.Normalize(mean=mean, std=std),
        ToTensorV2(),
    ]
)

Не забываем инициализировать наш датасет

In [207]:
train_dataset = HaloDataset(df_train, transform=train_transform)
test_dataset = HaloDataset(df_test, transform=test_transform)

## Переделываем модель [4 балла]

В семинаре мы реализовали самый базовый детектор, а сейчас настало время его улучшать.

### Backbone [1 балл]

Хорошей практикой считается размораживать несколько последних слоев в backbone, это позволяет немного улучить качество модели. Давайте улушчим класс Backbone из лекции, добавив ему возможность разморозки __k__ последних слоев или блоков (на ваш выбор).

In [208]:
import timm
import torch
import torch.nn as nn
class Backbone(nn.Module):
    def __init__(self, model_name = 'efficientnet_b0', out_indices=(-1, -2, -3, -4), unfreeze_last = 2):
        super().__init__()
        self.backbone = timm.create_model(model_name=model_name, pretrained=True, features_only=True, out_indices=out_indices)

        blocks = self.backbone.blocks

        for block in blocks:
          for p in block.parameters():
            p.requires_grad = False

        for block in blocks[-unfreeze_last:]:
          for p in block.parameters():
            p.requires_grad = True

    def forward(self, x):
        return self.backbone(x)

### NECK [2 балла]

Следующее улучшение коснется шеи. Предлагаем реализовать знакомую из лекции архитектуру FPN.

#### Feature Pyramid Network

<center><img src="https://user-images.githubusercontent.com/57972646/69858594-b14a6c00-12d5-11ea-8c3e-3c17063110d3.png"/></center>


* [Feature Pyramid Networks for Object Detection](https://arxiv.org/abs/1612.03144)

Она состоит из top-down пути, в котором происходит 2 вещи:
1. Увеличивается пространственная размерность фичей,
2. С помощью скипконнекшеннов, добавляются фичи из backbone модели.

Для увеличения пространственной размерности используется __nearest neighbor upsampling__, а фичи из шеи и бекбоуна суммируются.

__TIPS__:
* Можете использовать базовые классы из лекции,
* Воспользуйтесь AnchorGenerator-ом, чтобы создавать якоря сразу для нескольких выходов,
* Не забудьте использовать nn.ModuleList, если захотите сделать динамическое количество голов у модели,
* Также, можно добавить доп конволюцию (3х3 с паддингом) у каждого выхода шеи.

In [209]:
class Neck(nn.Module):
    def __init__(self, in_channels, out_channels, use_activations = True):
        super().__init__()

        self.use_activations = use_activations
        self.activation = nn.ReLU() if use_activations else nn.Identity()

        self.lateral_convs = nn.ModuleList([
            nn.Conv2d(in_ch, out_channels, kernel_size=1)
            for in_ch in in_channels
        ])

        self.smooth_convs = nn.ModuleList([
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
            for _ in in_channels
        ])

        self.upsample = nn.UpsamplingNearest2d(scale_factor=2)


    def forward(self, features):
      feats = [lat(f) for f, lat in zip(features, self.lateral_convs)]

      for i in range(len(feats)-2, -1, -1):
        up = self.upsample(feats[i+1])
        feats[i] = feats[i] + up

      results = [self.activation(conv(f)) for f, conv in zip(feats, self.smooth_convs)]

      return results

### Head [1 балл]

В качестве шеи можно выбрать __один из двух__ вариантов:

#### 1. Decoupled Head

Реализовать Decoupled Head из [YOLOX](https://arxiv.org/abs/2107.08430).
<center><img src="https://i.ibb.co/BVtBR2R3/Decoupled-head.jpg"/></center>

**TIP**: Возьмите за основу голову из семинара, тк она сильно похожа на Decoupled Head.

Изменять количество параметров у шей на разных уровнях не обязательно.

#### 2. Confidence score free head

Нужно взять за основу голову из семинара и полностью убрать предсказание confidence score. Чтобы модель предсказывала только 2 группы: ббоксы и классы.

Есть следующие способы удаления confidence score:
* Добавление нового класса ФОН. Обычно его обозначают нулевым классом.
* Присваивание ббоксам БЕЗ объекта вектор из нулей в качестве таргета.

Выберете тот, который вам больше нравится и будте внимательны при расчете лосса!

**Важно!** Удаление confidence score повлияет на следующие методы из семинара:
* target_assign
* ComputeLoss
* _filter_predictions

In [210]:
import torch.nn.functional as F
import torch.nn as nn

class Head(nn.Module):
    def __init__(self, in_channels, num_classes, out_channels=256):
        super().__init__()

        self.stem = nn.Conv2d(in_channels, out_channels, 1)

        self.cls_conv1 = nn.Conv2d(out_channels, out_channels, 3, padding=1)
        self.cls_conv2 = nn.Conv2d(out_channels, out_channels, 3, padding=1)
        self.cls_pred = nn.Conv2d(out_channels, num_classes, 1)

        self.reg_conv1 = nn.Conv2d(out_channels, out_channels, 3, padding=1)
        self.reg_conv2 = nn.Conv2d(out_channels, out_channels, 3, padding=1)
        self.reg_pred = nn.Conv2d(out_channels, 4, 1)

        self.obj_pred = nn.Conv2d(out_channels, 1, 1)

    def forward(self, x):
        x = F.relu(self.stem(x))

        cls = F.relu(self.cls_conv1(x))
        cls = F.relu(self.cls_conv2(cls))
        cls_out = self.cls_pred(cls)

        reg = F.relu(self.reg_conv1(x))
        reg = F.relu(self.reg_conv2(reg))
        reg_out = self.reg_pred(reg)

        obj_out = self.obj_pred(reg)

        return cls_out, reg_out, obj_out


Теперь можно снова реализовать класс детектора с учетом всех частей выше!

In [211]:
class Detector(nn.Module):
    def __init__(
        self,
        backbone_model_name="efficientnet_b0",
        neck_n_channels=256,
        num_classes=4,
    ):
        super().__init__()

        self.backbone = Backbone(model_name=backbone_model_name)

        # TIMM возвращает фичи в прямом порядке: [P3,P4,P5,P6]
        in_channels = list(self.backbone.backbone.feature_info.channels()[-4:])

        # но FPN ожидает от low→high, поэтому перевернём
        in_channels = list(reversed(in_channels))

        self.neck = Neck(in_channels=in_channels, out_channels=neck_n_channels)

        self.heads = nn.ModuleList([
            Head(neck_n_channels, num_classes),
            Head(neck_n_channels, num_classes),
            Head(neck_n_channels, num_classes),
            Head(neck_n_channels, num_classes)
        ])

        self.num_classes = num_classes

    def forward(self, x):
        feats = self.backbone(x)
        feats = self.neck(feats)

        cls_outs, reg_outs, obj_outs = [], [], []

        for feat, head in zip(feats, self.heads):
            c, r, o = head(feat)
            cls_outs.append(c)
            reg_outs.append(r)
            obj_outs.append(o)

        return cls_outs, reg_outs, obj_outs


## Label assignment [3 балла]
В этой секции предлагается заменить функцию `assign_target` на более современный алгоритм который называется Task alignment learning.

Он описан в статье [TOOD](https://arxiv.org/abs/2108.07755) в секции 3.2. Для удобства вот его основные шаги:

1. Посчитать значение метрики для каждого предсказанного ббокса:
    
$$t = s^\alpha * u^\beta$$
    
где,
* $s$ — classification score, или вероятность принадлежности предсказанного ббокса к классу реального ббокса (**GT**);
* $u$ — IoU между предсказанным и реальным ббоксами;
* $\alpha,\ \beta$ — нормализационные константы, обычно $\alpha = 6.0, \ \beta = 1.0$.
    
2. Отфильтровать предсказания на основе **GT**.

    Для якорных детекторов, обычно, выбираются только те предсказания, центры якорей которых находятся внутри GT.
4. Для каждого **GT** выбрать несколько (обычно 5 или 13) самых подходящих предсказаний.
5. Если предсказание рассматривается в качестве подходящего для нескольких **GT** — выбрать **GT** с наибольшим пересечением по IoU.


**BAЖНО**: если будете использовать Runner из лекции, не забудьте поменять параметры  в `self.assign_target_method` в методе `_run_train_epoch`.

In [231]:
def tal_assigner(pred_boxes,
                 pred_scores,
                 gt_boxes,
                 gt_labels,
                 alpha=1.0,
                 beta=6.0,
                 topk=10):

    # если pred_boxes нет (None) — считаем пустым
    N = pred_boxes.size(0) if (pred_boxes is not None and pred_boxes.numel() > 0) else 0
    M = gt_boxes.size(0) if (gt_boxes is not None and gt_boxes.numel() > 0) else 0

    if N == 0 or M == 0:
        return (torch.zeros(N, dtype=torch.bool, device=gt_boxes.device if gt_boxes is not None else "cpu"),
                torch.full((N,), -1, dtype=torch.long, device=gt_boxes.device if gt_boxes is not None else "cpu"),
                torch.zeros(N, device=gt_boxes.device if gt_boxes is not None else "cpu"))

    # вычисляем пересечение
    lt = torch.max(pred_boxes[:, None, :2], gt_boxes[None, :, :2])
    rb = torch.min(pred_boxes[:, None, 2:], gt_boxes[None, :, 2:])
    wh = (rb - lt).clamp(min=0)
    inter = wh[..., 0] * wh[..., 1]

    area1 = (pred_boxes[:, 2] - pred_boxes[:, 0]) * (pred_boxes[:, 3] - pred_boxes[:, 1])
    area2 = (gt_boxes[:, 2] - gt_boxes[:, 0]) * (gt_boxes[:, 3] - gt_boxes[:, 1])
    union = area1[:, None] + area2[None, :] - inter
    ious = inter / union.clamp(min=1e-6)

    cls_scores = pred_scores[:, gt_labels]
    metric = (cls_scores ** alpha) * (ious ** beta)

    topk_val, topk_idx = torch.topk(metric, k=min(topk, N), dim=0)
    matching_matrix = torch.zeros_like(metric, dtype=torch.bool)
    matching_matrix[topk_idx, torch.arange(M)] = True

    multiple_match = matching_matrix.sum(dim=1) > 1
    if multiple_match.any():
        idxs = multiple_match.nonzero(as_tuple=False).squeeze(1)
        for i in idxs:
            best_gt = torch.argmax(metric[i])
            matching_matrix[i] = False
            matching_matrix[i, best_gt] = True

    positive_mask = matching_matrix.any(dim=1)
    assigned_gt_idx = matching_matrix.argmax(dim=1)
    assigned_gt_idx[~positive_mask] = -1
    alignment_metric = (metric * matching_matrix).sum(dim=1)

    return positive_mask, assigned_gt_idx, alignment_metric


### DIoU [1]

Вместо SmoothL1, который используется в семинаре, реализуем лосс, основанный на пересечении ббоксов. В качестве тренировки давайте напишем Distance Intersection over Union (DIoU).

<center><img src=https://wikidocs.net/images/page/163613/Free_Fig_5.png></center>

Для его реализации разобъем задачу на части:

**1. Реализуем IoU:**

Пусть даны координаты для предсказанного ($B^p$) и истинного ($B^g$) ббоксов в формате XYXY или VOC PASCAL (левый верхний и правый нижний углы):

$B^p=(x^p_1, y^p_1, x^p_2, y^p_2)$, $B^g=(x^g_1, y^g_1, x^g_2, y^g_2)$, тогда алгоритм расчета будет следующий:

    1. Найдем площади обоих ббоксов:
$$ A^p = (x^p_2 - x^p_1) * (y^p_2 - y^p_1) $$
$$ A^g = (x^g_2 - x^g_1) * (y^g_2 - y^g_1) $$

    2. Посчитаем пересечение между ббоксами:

Тут мы предлагаем вам подумать как в общем виде можно расчитать размеры ббокса, который будет являться пересечением $B^p$ и $B^g$, а затем посчитать его площадь:

$$x^I_1 = \qquad \qquad y^I_1 = $$
$$x^I_2 = \qquad \qquad y^I_2 = $$

В общем виде, площать будет записываться следующим образом:

Если $x^I_2 > x^I_1$ & $y^I_2 > y^I_1$, тогда:

$$I = (x^I_2 - x^I_1) * (y^I_2 - y^I_1)$$

Иначе, $I = 0$.

    3. Считаем объединение ббоксов.

Мы можем посчитать эту площадь как сумму площадей двух ббоксов минус площадь пересечения (тк мы считаем её два раз в сумме площадей):

$$U = A^p + A^g - I$$

    4. Вычисляем IoU.

$$IoU = \frac{I}{U}$$

**2. Посчитаем диагональ выпуклой оболочки:**

Для расчета диагонали, сначала выпишите координаты верхнего левого и правого нижнего углов. Подумайте, чему будут равны эти координаты в общем случае?

$$x^c_1 = \qquad \qquad y^c_1 = $$
$$x^c_2 = \qquad \qquad y^c_2 = $$

Подсказка: Нарисуйте несколько вариантов пересечений предсказания и GT на бумажке, и выпишите координаты для выпуклой оболочки.

Тогда квадрат диагонали можно посчитать по формуле:

$$c^2 = (x^c_2 - x^c_1)^2 + (y^c_2 - y^c_1)^2$$

**3. Рассчитаем расстояние между цетрами ббоксов:**

Сначала находим координаты центров каждого из ббоксов (если ббоксы в формате YOLO, то и считать ничего не нужно), затем считаем Евклидово расстояние между центрами.

$d = $

Собираем все части вместе и считаем лосс по формуле:

$$ DIoU = 1 - IoU + \frac{d^2}{c^2}$$

Помните, что пар ббоксов может быть много! Возвращайте усредненное значение лосса.

In [213]:
from torchvision.ops import distance_box_iou_loss

In [214]:
def gen_bbox(num_boxes=10):
    min_corner = torch.randint(0, 100, (num_boxes, 2))
    max_corner = torch.randint(50, 150, (num_boxes, 2))

    for i in range(2):
        wrong_order = min_corner[:, i] > max_corner[:, i]
        if wrong_order.any():
            min_corner[wrong_order, i], max_corner[wrong_order, i] = max_corner[wrong_order, i], min_corner[wrong_order, i]
    return torch.cat((min_corner, max_corner), dim=1)

In [215]:
pred_boxes = gen_bbox(num_boxes=100)
true_boxes = gen_bbox(num_boxes=100)

In [216]:
print(f" DIoU: {distance_box_iou_loss(pred_boxes, true_boxes, reduction="mean").item()}")

 DIoU: 1.0470222234725952


In [217]:
def diou_loss(pred_boxes, gt_boxes, eps=1e-6):

    px1, py1, px2, py2 = pred_boxes.unbind(dim=1)
    gx1, gy1, gx2, gy2 = gt_boxes.unbind(dim=1)

    area_p = (px2 - px1) * (py2 - py1)
    area_g = (gx2 - gx1) * (gy2 - gy1)

    ix1 = torch.max(px1, gx1)
    iy1 = torch.max(py1, gy1)
    ix2 = torch.min(px2, gx2)
    iy2 = torch.min(py2, gy2)

    inter_w = (ix2 - ix1).clamp(min=0)
    inter_h = (iy2 - iy1).clamp(min=0)
    inter = inter_w * inter_h


    union = area_p + area_g - inter + eps

    iou = inter / union

    cx1 = torch.min(px1, gx1)
    cy1 = torch.min(py1, gy1)
    cx2 = torch.max(px2, gx2)
    cy2 = torch.max(py2, gy2)

    c2 = (cx2 - cx1)**2 + (cy2 - cy1)**2 + eps

    pcx = (px1 + px2) / 2
    pcy = (py1 + py2) / 2

    gcx = (gx1 + gx2) / 2
    gcy = (gy1 + gy2) / 2

    d2 = (pcx - gcx)**2 + (pcy - gcy)**2

    diou = 1 - iou + d2 / c2

    return diou.mean()

In [218]:
import numpy as np
pred_boxes = gen_bbox(num_boxes=1000)
true_boxes = gen_bbox(num_boxes=1000)

# проверим что написанный лосс выдает те же результаты что и лосс из торча.
assert np.isclose(diou_loss(pred_boxes, true_boxes), distance_box_iou_loss(pred_boxes, true_boxes, reduction="mean"))

## Кто больше? [5 баллов]

Наконец то мы дошли до самый интересной части. Тут мы раздаем очки за mAP'ы!

Все что вы написали выше вам поможет улучшить качество итогового детектора, настало время узнать насколько сильно :)

За достижения порога по mAP на тестовом наборе вы получаете баллы:
* 0.05 mAP [1]
* 0.1 mAP [2]
* 0.2 mAP [5]


**TIPS**:
1. На семинаре мы специально не унифицировали формат ббоксов между методами, чтобы обратить ваше внимание что за этим нужно следить. Чтобы было проще, сразу унифицируете формат по всему ноутбуку. Советуем использовать формат xyxy, тк IoU и NMS из torch используют именно этот формат. (Не забудьте поменять формат у таргета в `HaloDataset`).

2. Попробуйте перейти к IoU-based лоссу при обучении. То есть обучать не смещения, а сразу предсказывать ббокс.

3. Поэксперементируйте с подходами target assignment'а в процессе обучения. Например, можно на первых итерациях использовать обычный метод, а затем подключить TAL.

4. Добавьте аугментаций!

Можно взять [albumentations](https://albumentations.ai/docs/getting_started/bounding_boxes_augmentation/), библиотеку, которую мы использовали всеминаре. Или базовые аугментации из торча [тык](https://pytorch.org/vision/main/transforms.html). Если будете использовать торч, не забудте про ббоксы, transforms из коробки не будет их агументировать.

5. Можете реализовать другую шею, которую мы обсуждали на лекции [Path Aggregation Network](https://arxiv.org/abs/1803.01534) она точно улучшит ваше итоговое качество.

6. Попробуйте добавлять различные блоки из YOLO архитектур в шею вместо единичных конволюционных слоев. (Например, замените конволюции 3х3 на CSP блоки).

7. Попробуйте заменить NMS на другой метод (WeightedNMS, SoftNMS, etc.). Немного ссылок:
    * Статья про SoftNMS [тык](https://arxiv.org/pdf/1704.04503)
    * Статья про WeightedNMS [тык](https://openaccess.thecvf.com/content_ICCV_2017_workshops/papers/w14/Zhou_CAD_Scale_Invariant_ICCV_2017_paper.pdf)
    * Есть их реализация, правда на нумбе [git](https://github.com/ZFTurbo/Weighted-Boxes-Fusion?tab=readme-ov-file)

8. Не бойтесь эксперементировать и удачи!

Также, напишите развернутые ответы на следующие вопросы:

**Questions:**
1. Какой метод label assignment'a помогает лучше обучаться модели? Почему?
2. Какое из сделаных вами улучшений внесло наибольший вклад в качество модели? Как вы думаете, почему это произошло?
3. Какое из сделанных вами улучшений вообще не изменило метрику? Как вы думаете, почему это произошло?

In [219]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class ComputeLoss:
    """
    Лосс для YOLOX-подобной головы:
    - cls: BCE with logits
    - bbox: DIoU loss
    - iou branch: BCE with logits to GT IoU
    """

    def __init__(self, num_classes, diou_weight=5.0, cls_weight=1.0, iou_weight=1.0):
        self.num_classes = num_classes
        self.diou_weight = diou_weight
        self.cls_weight = cls_weight
        self.iou_weight = iou_weight

        self.bce = nn.BCEWithLogitsLoss(reduction="sum")

    def __call__(self, outputs, assigned):
        """
        outputs = (cls_logits, bbox_preds, iou_preds)
            cls_logits : [A, C]
            bbox_preds : [A, 4]
            iou_preds  : [A, C]   (или [A,1])

        assigned:
            pos_indices     : [N_pos]
            pos_gt_boxes    : [N_pos, 4]
            pos_gt_labels   : [N_pos]
            pos_anchors     : [N_pos, 4]
        """

        cls_logits, bbox_preds, iou_preds = outputs
        pos_ids = assigned["pos_indices"]

        if len(pos_ids) == 0:
            # нет позитивных якорей → штраф только по классификации (все должны быть 0)
            return self.bce(cls_logits, torch.zeros_like(cls_logits))

        # ---------------------------------------------------------
        # 1. CLASSIFICATION LOSS
        # ---------------------------------------------------------
        cls_tgt = torch.zeros_like(cls_logits)
        cls_tgt[pos_ids, assigned["pos_gt_labels"]] = 1.0

        cls_loss = self.bce(cls_logits, cls_tgt)

        # ---------------------------------------------------------
        # 2. BBOX DIoU LOSS
        # ---------------------------------------------------------
        pred_boxes = bbox_preds[pos_ids]
        gt_boxes   = assigned["pos_gt_boxes"]

        diou = self.diou_loss(pred_boxes, gt_boxes).sum()

        # ---------------------------------------------------------
        # 3. IOU BRANCH LOSS
        # ---------------------------------------------------------
        # Предсказывает IoU → GT IoU по DIoU: 1 − DIoU
        with torch.no_grad():
            gt_iou = 1.0 - self.diou_loss(pred_boxes, gt_boxes)

        iou_loss = self.bce(iou_preds[pos_ids, assigned["pos_gt_labels"]], gt_iou)

        # ---------------------------------------------------------
        # 4. TOTAL
        # ---------------------------------------------------------
        total = (
            self.cls_weight * cls_loss +
            self.diou_weight * diou +
            self.iou_weight * iou_loss
        )

        return total / pos_ids.numel()

    # =============================================================
    #              DIoU LOSS implementation
    # =============================================================
    @staticmethod
    def diou_loss(box1, box2):
        """
        box1, box2 shape: [N,4] format xyxy
        returns DIoU (not 1-DIoU)
        """

        # intersection
        lt = torch.max(box1[:, :2], box2[:, :2])
        rb = torch.min(box1[:, 2:], box2[:, 2:])
        wh = (rb - lt).clamp(min=0)
        inter = wh[:, 0] * wh[:, 1]

        # union
        area1 = (box1[:, 2] - box1[:, 0]) * (box1[:, 3] - box1[:, 1])
        area2 = (box2[:, 2] - box2[:, 0]) * (box2[:, 3] - box2[:, 1])
        union = area1 + area2 - inter + 1e-7

        iou = inter / union

        # enclosing box
        x1_c = torch.min(box1[:, 0], box2[:, 0])
        y1_c = torch.min(box1[:, 1], box2[:, 1])
        x2_c = torch.max(box1[:, 2], box2[:, 2])
        y2_c = torch.max(box1[:, 3], box2[:, 3])
        c2 = (x2_c - x1_c).pow(2) + (y2_c - y1_c).pow(2) + 1e-7

        # center distance
        x1 = (box1[:, 0] + box1[:, 2]) / 2
        y1 = (box1[:, 1] + box1[:, 3]) / 2
        x2 = (box2[:, 0] + box2[:, 2]) / 2
        y2 = (box2[:, 1] + box2[:, 3]) / 2
        d2 = (x1 - x2).pow(2) + (y1 - y2).pow(2)

        return 1 - iou + d2 / c2



In [220]:
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True, collate_fn=collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=9, shuffle=False, collate_fn=collate_fn)

In [221]:
def collate_fn(batch):
    images = []
    targets = []
    for img, t in batch:
        images.append(img)
        targets.append(t)
    return torch.stack(images), targets

In [222]:
from torch.utils.data import DataLoader

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)

train_loader = DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=2,
    collate_fn=collate_fn
)

test_loader = DataLoader(
    test_dataset,
    batch_size=8,
    shuffle=False,
    num_workers=2,
    collate_fn=collate_fn
)


Using device: cpu


In [223]:
model = Detector(num_classes=4)
model = model.to(device)



In [224]:
compute_loss = ComputeLoss(
    num_classes=4,
    diou_weight=5.0,
    cls_weight=1.0,
    iou_weight=1.0
)


In [225]:
import torch.optim as optim

optimizer = optim.Adam(
    model.parameters(),
    lr=1e-3,
    weight_decay=5e-5
)

scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=80
)


In [226]:
!pip install torchmetrics



In [227]:
from functools import partial
from tqdm import tqdm
import torch
import numpy as np
import matplotlib.pyplot as plt
from torchmetrics.detection.mean_ap import MeanAveragePrecision
from torchvision.ops import nms

class Runner:
    """Базовый класс для обучения и валидации anchor-free модели с TAL assignment."""

    def __init__(self, model, compute_loss, optimizer, train_dataloader,
                 assign_target_method, device=None, scheduler=None,
                 val_dataloader=None, val_every=5,
                 score_threshold=0.1, nms_threshold=0.5, max_boxes_per_cls=8):
        self.model = model
        self.compute_loss = compute_loss
        self.optimizer = optimizer
        self.train_dataloader = train_dataloader

        # assign_target_method теперь принимает (pred_boxes, pred_scores, gt_boxes, gt_labels)
        self.assign_target_method = assign_target_method

        self.device = "cpu" if device is None else device
        self.scheduler = scheduler
        self.val_dataloader = val_dataloader
        self.val_every = val_every
        self.score_threshold = score_threshold
        self.nms_threshold = nms_threshold
        self.max_boxes_per_cls = max_boxes_per_cls

        self.batch_loss = []
        self.epoch_loss = []
        self.val_metric = []


    def _run_train_epoch(self, dataloader, verbose=True):
      """ Обучить модель одну эпоху """
      self.model.train()
      batch_loss = []

      for images, targets in (pbar := tqdm(dataloader, desc="Process train epoch", leave=False)):
          images = images.to(self.device)
          outputs = self.model(images)

          accum_loss = 0.0
          for ix in range(images.shape[0]):
              gt_boxes = targets[ix]['boxes'].to(self.device)
              gt_labels = targets[ix]['labels'].to(self.device)

              # Вызов TAL assigner с позиционными аргументами
              assigned_targets = self.assign_target_method(
                  outputs[1][ix],   # pred_boxes (регрессия)
                  outputs[2][ix],   # pred_scores (objectness)
                  gt_boxes,
                  gt_labels
              )

              outputs_ixs = [out[ix] for out in outputs]
              loss = self.compute_loss(outputs_ixs, assigned_targets)
              accum_loss += loss

          accum_loss /= images.shape[0]
          batch_loss.append(accum_loss.cpu().detach().item())

          self.optimizer.zero_grad()
          accum_loss.backward()
          self.optimizer.step()

          if verbose:
              pbar.set_description(f"Current batch loss: {batch_loss[-1]:.4f}")

      return batch_loss

    def train(self, num_epochs=10, verbose=True):
        val_desc = ""
        for epoch in (epoch_pbar := tqdm(range(1, num_epochs+1), desc="Train epoch", total=num_epochs)):
            loss = self._run_train_epoch(self.train_dataloader, verbose=verbose)
            self.batch_loss.extend(loss)
            self.epoch_loss.append(np.mean(self.batch_loss[-len(self.train_dataloader):]))

            if self.val_dataloader is not None and epoch % self.val_every == 0:
                val_metric = self.validate()
                self.val_metric.append(val_metric)
                val_desc = f" Val {val_metric:.4f}"

            if verbose:
                epoch_pbar.set_description(f"Last epoch loss: Train {self.epoch_loss[-1]:.4f}" + val_desc)

            if self.scheduler is not None:
                self.scheduler.step()

    @torch.no_grad()
    def validate(self, dataloader=None):
        self.model.eval()
        dataloader = self.val_dataloader if dataloader is None else dataloader
        metric = MeanAveragePrecision(box_format="xywh", iou_type="bbox")

        for images, targets in tqdm(dataloader, desc="Running validation", leave=False):
            images = images.to(self.device)
            outputs = self.model(images)
            predicts = _filter_predictions(outputs, self.score_threshold, self.nms_threshold,
                                           max_boxes_per_cls=self.max_boxes_per_cls, return_type="torch")
            metric.update(predicts, targets)

        return metric.compute()["map"].item()

    def plot_loss(self, row_figsize=3):
        nrows = 2 if self.val_metric else 1
        _, ax = plt.subplots(nrows, 1, figsize=(12, row_figsize*nrows), tight_layout=True)
        ax = np.array([ax]) if not isinstance(ax, np.ndarray) else ax

        ax[0].plot(self.batch_loss, label="Train batch Loss", color="tab:blue")
        ax[0].plot(np.arange(1, len(self.batch_loss)+1, len(self.train_dataloader)),
                   self.epoch_loss, color="tab:orange", label="Train epoch Loss")
        ax[0].grid()
        ax[0].set_title("Train Loss")
        ax[0].set_xlabel("Iterations")
        ax[0].set_ylabel("Loss")

        if self.val_metric:
            ax[1].plot(np.arange(self.val_every, len(self.batch_loss)+1,
                                 len(self.val_dataloader) * self.val_every),
                       np.array(self.val_metric) * 100, color="tab:green", label="Validation mAP")
            ax[1].grid()
            ax[1].set_title("Validation mAP")
            ax[1].set_xlabel("Iterations")
            ax[1].set_ylabel("mAP (%)")

        plt.legend()
        plt.show()


def _filter_predictions(predictions, score_threshold=0.1, nms_threshold=0.5,
                        max_boxes_per_cls=8, return_type="list"):
    bboxes, confidences, cls_probs = predictions
    all_final_scores = confidences[:, :, None] * cls_probs
    num_classes = cls_probs.shape[-1]
    final_predictions = []

    for boxes, final_scores in zip(bboxes, all_final_scores):
        preds = {"boxes": [], "labels": [], "scores": []}

        for cls in range(num_classes):
            cls_scores = final_scores[:, cls]
            keep_ixs = cls_scores > score_threshold
            if keep_ixs.sum() == 0:
                continue

            cls_boxes = boxes[keep_ixs]
            cls_scores = cls_scores[keep_ixs]

            if len(cls_boxes) > max_boxes_per_cls:
                pos = torch.argsort(cls_scores, descending=True)
                cls_boxes = cls_boxes[pos[:max_boxes_per_cls]]
                cls_scores = cls_scores[pos[:max_boxes_per_cls]]

            boxes_xyxy = cls_boxes.clone()
            boxes_xyxy[:, 2:] = boxes_xyxy[:, :2] + boxes_xyxy[:, 2:]
            pred_ixs = nms(boxes_xyxy, cls_scores, nms_threshold)

            for ix in pred_ixs:
                preds["boxes"].append(cls_boxes[ix].cpu().tolist())
                preds["labels"].append(cls)
                preds["scores"].append(cls_scores[ix].item())

        if return_type == "torch":
            for key, item in preds.items():
                preds[key] = torch.tensor(item)
        final_predictions.append(preds)

    return final_predictions


In [228]:
runner = Runner(
    model=model,
    compute_loss=compute_loss,
    optimizer=optimizer,
    train_dataloader=train_loader,
    # напрямую передаем функцию TAL_assigner без kwargs
    assign_target_method=lambda anchors, gt_boxes, gt_labels, num_classes: tal_assigner(
        anchors,  # pred_boxes
        torch.ones(anchors.size(0), dtype=torch.float, device=anchors.device),  # pred_scores (здесь можно потом переделать под реальные предсказания)
        gt_boxes,
        gt_labels,
        alpha=0.5,
        beta=6.0,
        topk=10
    ),
    device=device,
    scheduler=scheduler,
    val_every=1,
    score_threshold=0.2,
    nms_threshold=0.5,
    max_boxes_per_cls=10,
    val_dataloader=test_loader
)

print("Runner собран")


Runner собран


In [233]:
import torch
from torch.utils.data import DataLoader

device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
model.train()

num_epochs = 10

for epoch in range(1, num_epochs + 1):
    epoch_loss = 0.0

    for images, targets in train_loader:
        images = images.to(device)
        # targets: list of dicts {"boxes": tensor, "labels": tensor}
        gt_boxes = [t['boxes'].to(device) for t in targets]
        gt_labels = [t['labels'].to(device) for t in targets]

        optimizer.zero_grad()
        cls_outs, reg_outs, obj_outs = model(images)  # Detector output

        batch_loss = 0.0
        for i in range(images.size(0)):
            positive_mask, assigned_gt_idx, alignment_metric = tal_assigner(
                pred_boxes=reg_outs[i],      # регрессированные боксы
                pred_scores=obj_outs[i],      # объектный скор
                gt_boxes=gt_boxes[i],
                gt_labels=gt_labels[i]
            )

            # compute_loss должен быть функцией, которая принимает pred и assigned_target
            # например:
            loss = compute_loss(
                preds=(cls_outs[i], reg_outs[i], obj_outs[i]),
                assigned_targets=(positive_mask, assigned_gt_idx, alignment_metric)
            )
            batch_loss += loss

        batch_loss = batch_loss / images.size(0)
        batch_loss.backward()
        optimizer.step()

        epoch_loss += batch_loss.item()

    print(f"Epoch {epoch} | Loss: {epoch_loss / len(train_loader):.4f}")


RuntimeError: The size of tensor a (16) must match the size of tensor b (2) at non-singleton dimension 4

In [None]:
runner.plot_loss(row_figsize=4)

Ниже определена вспомогательная функция для валидации качества. Можете использовать `Runner.validate`. Важное уточнение, ей нужен метод для фильтрации предсказаний. Можете тоже скопировать его из семинара, если он у вас не менялся.

In [None]:
from torchmetrics.detection import MeanAveragePrecision

@torch.no_grad()
def validate(dataloader, filter_predictions_func, box_format="xyxy", device="cpu", score_threshold=0.1, nms_threshold=0.5, **kwargs):
    """ Метод для валидации модели.
    Возвращает mAP (0.5 ... 0.95).
    """
    self.model.eval()
    # Считаем метрику mAP с помощью функции из torchmetrics
    metric = MeanAveragePrecision(box_format=box_format, iou_type="bbox")
    for images, targets in tqdm(dataloader, desc="Running validation", leave=False):
        images = images.to(device)
        outputs = self.model(images)
        predicts = filter_predictions_func(outputs, score_threshold, nms_threshold, **kwargs)
        metric.update(predicts, targets)
    return metric.compute()["map"].item()
