# Исследование и оценка методов и архитектур на задаче детекции дорожных знаков

Код взят из семинара по детекции курса DLS; это версия, модифицированная под датасет RTSD.

In [4]:
import albumentations as A
from albumentations.pytorch import ToTensorV2

import gc
import os
import math
import json
from functools import partial
from collections import Counter, defaultdict

import io
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import timm
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

from PIL import Image
from tqdm.auto import tqdm
from torchvision import transforms, models
from torchvision.ops import nms, box_iou
from torchvision.models.detection.anchor_utils import AnchorGenerator
from torchvision.models.feature_extraction import create_feature_extractor, get_graph_node_names

from torchmetrics.detection import MeanAveragePrecision

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cpu


## Подготовка данных

In [5]:
# Show which datasets are present under the Kaggle input directory.
os.listdir("/kaggle/input/")

['rtsd-small']

In [6]:
# Locations of the annotation CSV and of the image frames inside the dataset.
dataset_path = "/kaggle/input/rtsd-small"
anno_path = dataset_path + "/full_gt_reduced.csv"
image_dir = dataset_path + "/rtsd-dataset-reduced/rtsd-dataset-reduced/rtsd-frames"
# label_map = dataset_path + "/rtsd-dataset-reduced/rtsd-dataset-reduced/label_map.json"

# Temporarily restrict the experiment to a subset of sign classes.
four_classes = ("6_16", )  # , "5_19_1", "2_1", "2_4"

# with open(label_map, 'r') as f:
    # label_map = json.load(f)
# DEBUG: build the label map from the filtered classes. Class ids start at 0.
label_map = {cls: i for i, cls in enumerate(four_classes)}
label_map_reverse = {v: k for k, v in label_map.items()}

# One random colour per class id (used by the plotting helpers).
class_to_color = [tuple(np.random.choice(range(256), size=3)) for _ in range(len(label_map))]

df = pd.read_csv(anno_path)

# Keep only the selected classes. `.copy()` makes the result an independent
# frame, so the column assignments below do not write into a slice of the
# original (which triggers pandas' chained-assignment warning).
df = df[df["sign_class"].isin(label_map.keys())].copy()

# Collapse the four coordinate columns into one [x_from, y_from, width, height]
# list per row. Going through a Series keeps this valid for an empty frame too.
bbox_columns = ['x_from', 'y_from', 'width', 'height']
df['bboxes'] = pd.Series(df[bbox_columns].to_numpy().tolist(), index=df.index, dtype=object)

# Replace sign names with their integer class ids.
df['sign_class'] = df['sign_class'].map(label_map)

# Drop columns that are no longer needed (missing ones are ignored).
df = df.drop(
    columns=["Unnamed: 0", "is_train", 'x_from', 'y_from', 'width', 'height', "sign_id"],
    errors="ignore"
)

df = df.rename(columns={"sign_class": "labels"})

# Merge the annotations of all signs on the same image into a single record:
# one row per filename with a list of labels and a list of boxes.
df = df.groupby('filename').agg({
    'labels': list,
    'bboxes': list
}).reset_index()

In [7]:
from skmultilearn.model_selection import IterativeStratification

def stratified_train_test_split(df, test_size=0.3, random_state=None):
    """Split `df` into train and test parts with multi-label stratification.

    Args:
        df: DataFrame whose 'labels' column holds, per row, an iterable of
            class ids present on that image.
        test_size: Requested share of rows for the test part.
        random_state: Passed unchanged to `IterativeStratification`.

    Returns:
        Tuple `(df_train, df_test)` of row subsets of `df`.
    """
    # Map every distinct class id to a column of the multi-hot matrix.
    classes = sorted({label for labels in df['labels'] for label in labels})
    column_of = {label: col for col, label in enumerate(classes)}

    y = np.zeros((len(df), len(classes)), dtype=int)
    for row, labels in enumerate(df['labels']):
        for label in labels:
            y[row, column_of[label]] = 1

    stratifier = IterativeStratification(
        n_splits=2,
        order=1,
        sample_distribution_per_fold=[1-test_size, test_size],
        random_state=random_state
    )
    # NOTE(review): the first element of the first split is used as the test
    # indices and the second as the train indices. This presumably relies on
    # the first fold receiving the `1 - test_size` share; confirm against the
    # skmultilearn documentation.
    first_split = next(stratifier.split(df.index.values, y))
    test_indices, train_indices = first_split

    return df.iloc[train_indices], df.iloc[test_indices]

def verify_stratification(df_train, df_test):
    """Print, per class, how its images are shared between train and test.

    For every class id found in the 'labels' column of either frame, counts
    the rows (images) that contain it in each part and prints a table with the
    percentage split and the raw counts. Returns nothing.
    """
    # Every class id that occurs in either part, in ascending order.
    combined_labels = pd.concat([df_train, df_test])['labels']
    classes = sorted({label for labels in combined_labels for label in labels})

    rows = []
    for label in classes:
        n_train = sum(label in labels for labels in df_train['labels'])
        n_test = sum(label in labels for labels in df_test['labels'])
        n_total = n_train + n_test
        if n_total <= 0:
            continue

        share_train = (n_train / n_total) * 100
        share_test = (n_test / n_total) * 100
        rows.append((label, f"{share_train:.1f}% / {share_test:.1f}%", n_train, n_test))

    # Assemble and print the comparison report.
    report = pd.DataFrame(
        rows,
        columns=['labels', 'train% / test%', 'train_count', 'test_count'],
    )

    print("Stratification Verification Report:")
    print(report.to_string(index=False))

In [8]:
# Hold out a validation part with test_size=0.2, stratified by sign class.
df_train, df_valid = stratified_train_test_split(df, test_size=0.2, random_state=None)

In [9]:
# Sanity check: size of the training part and its first rows.
print(df_train.shape)
df_train.head()

(42, 3)


Unnamed: 0,filename,labels,bboxes
1,autosave02_10_2012_11_55_25_2.jpg,[0],"[[194, 260, 49, 21]]"
3,autosave02_10_2012_12_06_34_0.jpg,[0],"[[718, 264, 54, 24]]"
5,autosave02_10_2012_12_07_52_1.jpg,[0],"[[1174, 191, 105, 46]]"
7,autosave02_10_2012_12_55_36_3.jpg,[0],"[[844, 172, 103, 36]]"
9,autosave09_10_2012_10_10_03_3.jpg,[0],"[[881, 332, 40, 19]]"


Создаем датасет для предобработки данных

In [10]:
class RTSDDataset(Dataset):
    """Detection dataset backed by a DataFrame of per-image annotations.

    Each row of `dataframe` must provide 'filename' (relative to `image_dir`),
    'bboxes' (list of boxes in (x_min, y_min, w, h) format) and 'labels'
    (one class id per box).
    """

    def __init__(self, dataframe, image_dir, transform=None):
        self.image_dir = image_dir
        self.data = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Load the image and annotations for the sample at position `idx`.

        Returns:
            `(image, target)` where `target` is a dict with keys
            'image_id' (the index), 'boxes' (float32 tensor of boxes in
            (x_min, y_min, w, h) format, shape (0, 4) when there are none)
            and 'labels' (int64 tensor, one class id per box).
        """
        row = self.data.iloc[idx]
        image_path = os.path.join(self.image_dir, row['filename'])
        # Close the file handle as soon as the pixels are in memory.
        with Image.open(image_path) as pil_image:
            image = np.array(pil_image.convert("RGB"))  # uint8 (0-255), HWC

        if self.transform is not None:
            # The transform is called albumentations-style and is expected to
            # return the image together with the adjusted boxes and labels.
            transformed = self.transform(image=image, bboxes=row["bboxes"], labels=row["labels"])
            image, boxes, labels = transformed["image"], transformed["bboxes"], transformed["labels"]
        else:
            # No transform: only convert the image to a tensor and pass the
            # annotations through unchanged (previously this branch left
            # `boxes` and `labels` undefined and raised NameError).
            image = transforms.ToTensor()(image)
            boxes, labels = row["bboxes"], row["labels"]

        boxes = torch.tensor(np.array(boxes), dtype=torch.float32)
        if boxes.numel() == 0:
            # Keep a consistent (num_boxes, 4) layout for images without boxes.
            boxes = boxes.reshape(0, 4)

        target = {
            "image_id": idx,
            "boxes": boxes,
            "labels": torch.tensor(labels, dtype=torch.int64),
        }
        return image, target

def collate_fn(batch):
    """Collate `(image, target)` samples into a stacked image tensor and a tuple of targets.

    Targets are left as a tuple because the number of boxes differs per image.
    """
    # Transpose the list of samples into per-field tuples.
    columns = tuple(zip(*batch))
    stacked_images = torch.stack(columns[0])
    return stacked_images, columns[1]

In [11]:
def compute_dataset_statistics(dataset, batch_size=16):
    """Compute the per-channel mean and std over all pixels of `dataset`.

    The statistics are in whatever value range the dataset yields (0-255 for
    raw uint8 images). Images may be numpy arrays in (H, W, C) layout or torch
    tensors in (C, H, W) layout, and may differ in size from one another.

    Args:
        dataset: Dataset whose samples are `(image, target)` pairs.
        batch_size: Number of samples loaded per iteration.

    Returns:
        Tuple `(mean, std)` of per-channel lists.

    Raises:
        ValueError: If the dataset contains no pixels.
    """
    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=False,
        # Keep the images as a plain tuple: they are reduced one by one.
        collate_fn=lambda samples: tuple(zip(*samples)),
    )

    # float64 accumulators: sums over millions of pixels lose precision in float32.
    channel_sum = np.zeros(3, dtype=np.float64)
    channel_sq_sum = np.zeros(3, dtype=np.float64)
    total_pixels = 0

    for batch in tqdm(loader, desc="Computing statistics"):
        for image in batch[0]:
            if isinstance(image, torch.Tensor):
                # (C, H, W) tensor -> (H, W, C) array
                pixels = image.detach().cpu().numpy().transpose(1, 2, 0)
            else:
                pixels = np.asarray(image)

            # Flatten the spatial dimensions: one row per pixel.
            pixels = pixels.astype(np.float64).reshape(-1, pixels.shape[-1])
            channel_sum += pixels.sum(axis=0)
            channel_sq_sum += (pixels ** 2).sum(axis=0)
            total_pixels += pixels.shape[0]

    if total_pixels == 0:
        raise ValueError("Cannot compute statistics of an empty dataset")

    mean = channel_sum / total_pixels
    # Var[x] = E[x^2] - E[x]^2; clamp at zero so rounding error on a constant
    # channel cannot produce a negative variance (and a NaN std).
    variance = np.maximum(channel_sq_sum / total_pixels - mean ** 2, 0.0)
    std = np.sqrt(variance)

    return mean.tolist(), std.tolist()

In [12]:
# stats_transform = A.Compose([
#     A.Resize(height=img_height, width=img_width),
#     # ToTensorV2()
# ], bbox_params=A.BboxParams(format='coco', label_fields=['labels']))

# # Создать датасет для расчета статистики
# stats_dataset = RTSDDataset(
#     df_train,  # Use full training data
#     image_dir,
#     transform=stats_transform
# )

In [13]:
# mean, std = compute_dataset_statistics(stats_dataset)
# mean = np.array(mean) / 255
# std = np.array(std) / 255

# print(f"Mean: {mean}")
# print(f"Std:  {std}")

In [14]:
# Every image is resized to this fixed network input size.
img_width = 640
img_height = 640

# ImageNet channel statistics; per the author they do not differ from the
# statistics of this dataset.
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)

# Training pipeline: resize, normalize, convert to a tensor. Boxes are declared
# in 'coco' format and are carried through together with the 'labels' field.
train_transform = A.Compose([
    A.Resize(height=img_height, width=img_width),
    A.Normalize(mean=mean, std=std),
    ToTensorV2(),
], bbox_params=A.BboxParams(format='coco', label_fields=['labels']))

# Validation pipeline: currently the same steps as for training.
test_transform = A.Compose([
    A.Resize(height=img_height, width=img_width),
    A.Normalize(mean=mean, std=std),
    ToTensorV2(),
], bbox_params=A.BboxParams(format='coco', label_fields=['labels']))

In [15]:
batch_size = 2

# Datasets and loaders for both parts; only the training loader shuffles.
train_dataset = RTSDDataset(df_train, image_dir, transform=train_transform)
valid_dataset = RTSDDataset(df_valid, image_dir, transform=test_transform)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

Посмотрим на некоторые статистики по датасету

In [16]:
def train_info(train_dataloader):
    """Print dataset statistics and plot the bbox size distribution per class.

    Iterates once over `train_dataloader`, which must yield `(images, targets)`
    batches where every target is a dict with 'boxes' (rows whose last two
    values are width and height) and 'labels'. Reads the module-level
    `label_map_reverse` and `class_to_color`.

    Prints the min / max / mean number of boxes per image, the number of
    objects per class and the mean box size per class, then draws box plots of
    box widths and heights per class.
    """
    image_min_box_count, image_max_box_count = float('inf'), 0
    total_box_count = 0
    total_image_count = 0
    objects_per_class = {cls: 0 for cls in label_map_reverse}
    bboxes_sizes = defaultdict(list)

    for _, targets in tqdm(train_dataloader, desc="Train Dataset Info"):
        for target in targets:
            bboxes = target["boxes"]
            # Convert the label tensor to plain Python scalars.
            categories = target["labels"].tolist()

            box_count = len(bboxes)
            total_box_count += box_count
            # Count every image exactly once.
            total_image_count += 1

            image_min_box_count = min(box_count, image_min_box_count)
            image_max_box_count = max(box_count, image_max_box_count)

            for bb, cls in zip(bboxes, categories):
                objects_per_class[cls] += 1
                # Keep only (width, height) as plain floats.
                bboxes_sizes[cls].append(bb[2:].tolist())

    if total_image_count == 0:
        print("The dataloader is empty: nothing to report.")
        return

    bboxes_sizes = dict(sorted(bboxes_sizes.items()))

    print(f"Min bboxes per image: {image_min_box_count}")
    print(f"Max bboxes per image: {image_max_box_count}")
    print(f"Mean bboxes per image: {total_box_count / total_image_count}")
    print("\n\n")

    msg = [f"{label_map_reverse[k]} : {v}" for k, v in objects_per_class.items()]
    print("Number of object per class:\n" + "\n".join(msg))
    print("\n")

    print("\nMean bbox size per class:")
    for cls, boxes_list in bboxes_sizes.items():
        print(f"{label_map_reverse[cls]} : {np.mean(boxes_list, axis=0)}")

    if not bboxes_sizes:
        # No boxes at all: there is nothing to plot.
        return

    # Classes that actually occur, in the same (sorted) order as the plotted data.
    class_ids = list(bboxes_sizes)
    x_boxes = [np.array(val)[:, 0] for val in bboxes_sizes.values()]
    y_boxes = [np.array(val)[:, 1] for val in bboxes_sizes.values()]

    _, axes = plt.subplots(1, 2, figsize=(12, 4))
    for ax, box, direction in zip(axes, [x_boxes, y_boxes], ["width", "height"]):
        bplot = ax.boxplot(box, patch_artist=True)
        ax.set_xticklabels(class_ids)
        # Colour each box by its own class id, so a missing class does not
        # shift the colours of the remaining ones.
        for patch, cls in zip(bplot["boxes"], class_ids):
            patch.set_facecolor(np.array(class_to_color[cls]) / 255)
        ax.set_ylabel(f"Bbox size by {direction}")
        ax.set_title(f"Bboxes distribution per class by {direction}")

# box_sizes = train_info(train_dataloader)

In [17]:
# Take a single batch: stop at the `batch_index`-th batch (1-based) and leave it
# in `images` / `targets`. If the loader has fewer batches, the last one is kept.
batch_index, i = 1, 0
for images, targets in train_dataloader:
    i += 1
    if i == batch_index:
        break

In [18]:
def denormalize(image_tensor, mean, std):
    """Undo per-channel normalization so the image can be displayed.

    `mean` and `std` must each hold three values, one per channel; they are
    broadcast over a (3, H, W) image as `image * std + mean`.
    """
    channel_shape = (3, 1, 1)
    shift = torch.tensor(mean).view(*channel_shape)
    scale = torch.tensor(std).view(*channel_shape)
    restored = image_tensor * scale + shift
    return restored

# Вспомогательные функции для отрисовки данных
def add_bbox(image, box, label='', color=(128, 128, 128), txt_color=(0, 0, 0), lw=2, normalized=False, title=""):
    """Draw one bounding box, optionally with a text label, on `image`.

    Args:
        image: Array the box is drawn on with OpenCV; the same object is returned.
        box: `(x_min, y_min, width, height)`.
        label: Text drawn on a filled background next to the top-left corner;
            nothing is drawn when empty.
        color: Box and label-background colour.
        txt_color: Label text colour.
        lw: Minimum line width; the actual width also scales with the image size.
        normalized: If True, `box` is in [0, 1] units and is scaled by the
            image width / height first.
        title: Unused.

    Returns:
        The `image` that was passed in.
    """
    x_min, y_min, width, height = box

    if normalized:
        img_h, img_w = image.shape[:2]
        left = int(x_min * img_w)
        top = int(y_min * img_h)
        right = int((x_min + width) * img_w)
        bottom = int((y_min + height) * img_h)
    else:
        left, top = int(x_min), int(y_min)
        right, bottom = int(x_min + width), int(y_min + height)

    # Line width grows with the image size but never drops below `lw`.
    thickness = int(max(round(sum(image.shape) / 2 * 0.003), lw))
    box_color = tuple(int(channel) for channel in color)

    cv2.rectangle(image, (left, top), (right, bottom), box_color, thickness=thickness, lineType=cv2.LINE_AA)

    if not label:
        return image

    font_thickness = max(thickness - 1, 1)
    font_scale = thickness / 3
    text_w, text_h = cv2.getTextSize(label, 0, fontScale=font_scale, thickness=font_thickness)[0]

    # Put the label above the box when it fits there, otherwise inside the box.
    fits_above = top - text_h >= 3
    if fits_above:
        background_corner = (left + text_w, top - text_h - 3)
        text_origin = (left, top - 2)
    else:
        background_corner = (left + text_w, top + text_h + 3)
        text_origin = (left, top + text_h + 2)

    cv2.rectangle(image, (left, top), background_corner, box_color, -1, cv2.LINE_AA)
    cv2.putText(
        image,
        label,
        text_origin,
        0,
        font_scale,
        txt_color,
        thickness=font_thickness,
        lineType=cv2.LINE_AA
    )

    return image

def plot_examples(images, targets=None, predictions=None, image_norm=None, indices=None, num_examples=6, row_figsize=(12, 3), title=""):
    """Show images from a batch with ground-truth and/or predicted boxes.

    Args:
        images: Batch of image tensors in (C, H, W) layout.
        targets: Optional per-image dicts with 'boxes' and 'labels'.
        predictions: Optional per-image dicts with 'boxes', 'labels', 'scores'.
        image_norm: Optional `(mean, std)` used to undo normalization.
        indices: Positions inside `images` to show. When None, the first
            `num_examples` images are shown.
        num_examples: Number of images to show when `indices` is None.
        row_figsize: `(width, height)` of one row of the figure.
        title: Optional figure title.

    Boxes are drawn with the module-level `add_bbox`; class names come from
    `label_map_reverse`. Up to three images are placed per row.
    """
    if indices is None:
        indices = list(range(min(len(images), num_examples)))
    else:
        indices = [int(i) for i in indices]
    num_examples = len(indices)
    if num_examples == 0:
        return

    ncols = min(num_examples, 3)
    nrows = math.ceil(num_examples / 3)

    # squeeze=False always returns a 2-D array of axes, even for a 1x1 grid.
    fig, axes = plt.subplots(
        nrows, ncols,
        figsize=(row_figsize[0], row_figsize[1] * nrows),
        tight_layout=True,
        squeeze=False,
    )
    axes = axes.reshape(-1)

    if title:
        fig.suptitle(title, fontsize=16, y=1.05)

    for idx, ax in zip(indices, axes):
        image = images[idx].clone().detach().cpu()

        # 1. Undo the normalization.
        if image_norm:
            mean, std = image_norm
            image = denormalize(image, mean, std)

        # 2. Convert to a uint8 HWC array in BGR channel order for OpenCV.
        image = image.mul(255).clamp(0, 255).permute(1, 2, 0).numpy()
        image = image.astype(np.uint8)
        image_bgr = np.ascontiguousarray(image[..., ::-1])  # RGB -> BGR

        if targets:
            for bbox, label in zip(targets[idx]["boxes"], targets[idx]["labels"]):
                class_name = label_map_reverse[int(label)]
                add_bbox(image_bgr, bbox, label=str(class_name), color=(255, 0, 0), lw=2)

        if predictions:
            preds = predictions[idx]
            for bbox, label, score in zip(preds["boxes"], preds["labels"], preds["scores"]):
                class_name = label_map_reverse[int(label)]
                # Draw on the same buffer as the targets; this also works when
                # no targets were passed.
                add_bbox(image_bgr, bbox, label=f"Class {class_name}: {score:.2f}", color=(0, 0, 255))

        # Back to RGB for matplotlib.
        image_rgb = image_bgr[..., ::-1]
        ax.imshow(image_rgb)
        ax.set_title(f"Image id: {idx}")
        ax.set_xticks([])
        ax.set_yticks([])

    # Hide the cells of the grid that received no image.
    for ax in axes[num_examples:]:
        ax.axis("off")

In [19]:
# plot_examples(images, targets=targets, image_norm=(mean, std), num_examples=min(len(images), 3), title="Image samples")

## Архитектура модели

### Backbone

In [20]:
# Load torchvision's EfficientNet-B7 with its default pretrained weights.
pretrained_model = models.efficientnet_b7(weights='DEFAULT')

Downloading: "https://download.pytorch.org/models/efficientnet_b7_lukemelas-c5b4e57e.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_b7_lukemelas-c5b4e57e.pth
100%|██████████| 255M/255M [00:01<00:00, 173MB/s]  


In [21]:
# Graph nodes of `pretrained_model.features` to expose as backbone outputs,
# mapped to the keys '0'..'8' under which the detector reads them.
# NOTE(review): the node names are presumably taken from get_graph_node_names
# for this exact model; re-check them if the backbone is changed.
return_nodes = {
    '0': '0',
    '1.0.block.2': '1',
    '2.1.add': '2',
    '3.1.add': '3',
    '4.2.add': '4',
    '5.2.add': '5',
    '6.3.add': '6',
    '7.0.block.3': '7',
    '8': '8',
}

In [22]:
class Backbone(nn.Module):
    """Feature extractor built from the `features` part of a pretrained model.

    Args:
        pretrained_model: Model exposing an indexable `features` module.
        return_nodes: Mapping from graph node names to output keys, passed to
            `create_feature_extractor`.
        unfreeze_block: Number of trailing blocks of `features` that stay
            trainable. `None` or `0` keeps the whole backbone frozen.

    Raises:
        ValueError: If `unfreeze_block` is negative.
    """

    def __init__(self, pretrained_model, return_nodes, unfreeze_block=None):
        super().__init__()
        if unfreeze_block is not None and unfreeze_block < 0:
            raise ValueError(f"unfreeze_block must be non-negative, got {unfreeze_block}")

        features = pretrained_model.features

        # Freeze everything first ...
        for param in features.parameters():
            param.requires_grad = False

        # ... then re-enable gradients for the last `unfreeze_block` blocks.
        # The truthiness test matters: `features[-0:]` is the whole sequence,
        # so 0 would otherwise unfreeze the entire backbone.
        if unfreeze_block:
            for param in features[-unfreeze_block:].parameters():
                param.requires_grad = True

        self.backbone = create_feature_extractor(
            features, return_nodes=return_nodes)

    def forward(self, x):
        """Return the dict of intermediate feature maps keyed as in `return_nodes`."""
        return self.backbone(x)

### Neck

In [23]:
class ConvTransposeBlock(nn.Module):
    """Transposed 3x3 convolution with stride 2, followed by BatchNorm and SiLU.

    With kernel 3, stride 2, padding 1 and output padding 1 the spatial size
    of the input is doubled.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        upsample_cfg = dict(kernel_size=3, stride=2, padding=1, output_padding=1)
        self.conv = nn.ConvTranspose2d(in_channels, out_channels, **upsample_cfg)
        self.bn = nn.BatchNorm2d(out_channels)
        self.silu = nn.SiLU()

    def forward(self, x):
        upsampled = self.conv(x)
        normalized = self.bn(upsampled)
        return self.silu(normalized)

In [24]:
class ConvBlock(nn.Module):
    """3x3 convolution (stride 1, padding 1) followed by BatchNorm and SiLU.

    The spatial size of the input is preserved; only the channel count changes.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        same_size_cfg = dict(kernel_size=3, stride=1, padding=1)
        self.conv = nn.Conv2d(in_channels, out_channels, **same_size_cfg)
        self.bn = nn.BatchNorm2d(out_channels)
        self.silu = nn.SiLU()

    def forward(self, x):
        convolved = self.conv(x)
        normalized = self.bn(convolved)
        return self.silu(normalized)

In [25]:
class Neck(nn.Module):
    """Decoder that merges the deepest feature map with skip connections.

    The channel counts are hard-coded. After every concatenation the next
    block expects the sum of the previous output and the skip connection, so
    the skips must have 960-320=640 (level 7), 576-192=384 (level 6),
    336-112=224 (level 5) and 240-80=160 (level 4) channels. The output has
    40 channels.
    """

    def __init__(self):
        super().__init__()

        self.conv0 = ConvBlock(2560, 320)
        self.conv1 = ConvBlock(960, 192)
        self.conv2 = ConvBlock(576, 112)
        self.conv_transpose1 = ConvTransposeBlock(112, 112)
        self.conv3 = ConvBlock(336, 80)
        self.conv4 = ConvBlock(240, 40)

    def forward(self, x, encoder_outputs):
        """Fuse `x` with `encoder_outputs[7]`, `[6]`, `[5]` and `[4]`, in that order."""

        def with_skip(features, level):
            # Concatenate along the channel dimension.
            return torch.cat([features, encoder_outputs[level]], dim=1)

        x = self.conv1(with_skip(self.conv0(x), 7))
        x = self.conv2(with_skip(x, 6))
        # Upsample x2 before merging with the higher-resolution levels.
        x = self.conv_transpose1(x)
        x = self.conv3(with_skip(x, 5))
        x = self.conv4(with_skip(x, 4))

        return x

### Head

In [26]:
class DetectionHead(nn.Module):
    """Anchor-based detection head with three outputs per spatial location.

    A shared 1x1 convolution feeds two branches: a classification tower and a
    box tower. The box tower is shared by the regression and the objectness
    outputs.

    Args:
        in_channels: Channels of the input feature map.
        num_anchors: Anchors per spatial location.
        num_classes: Number of object classes.
    """

    def __init__(self, in_channels, num_anchors, num_classes):
        super().__init__()

        def tower():
            # Two 3x3 conv + SiLU stages that keep the channel count.
            return [
                nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1),
                nn.SiLU(),
                nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1),
                nn.SiLU(),
            ]

        def projection(out_channels):
            # 1x1 conv mapping features to the requested number of outputs.
            return nn.Conv2d(in_channels, out_channels, kernel_size=1, padding=0)

        self.neck = projection(in_channels)
        self.cls_head = nn.Sequential(*tower(), projection(num_anchors * num_classes))
        self.bbox_neck = nn.Sequential(*tower())
        self.reg_head = projection(num_anchors * 4)
        self.obj_head = projection(num_anchors * 1)

        self.num_classes = num_classes

    def forward(self, x):
        """Return `(bbox_reg, confidence_score, cls_logits)` as raw feature maps.

        Channel counts are `num_anchors * 4`, `num_anchors` and
        `num_anchors * num_classes` respectively.
        """
        shared = F.silu(self.neck(x))
        box_features = self.bbox_neck(shared)

        cls_logits = self.cls_head(shared)
        bbox_reg = self.reg_head(box_features)
        confidence_score = self.obj_head(box_features)

        return bbox_reg, confidence_score, cls_logits

### Detector

In [27]:
class Detector(nn.Module):
    """Single-scale anchor-based detector: backbone -> neck -> detection head.

    Args:
        pretrained_model: Model handed to `Backbone`.
        return_nodes: Graph nodes handed to `Backbone`; keys '0'..'8' are read
            in `forward`.
        unfreeze_block: Number of trailing backbone blocks left trainable.
        input_size: `(height, width)` of the network input; used only to size
            the anchor grid.
        anchor_sizes: Anchor sizes for the single feature level.
        anchor_ratios: Anchor aspect ratios for the single feature level.
        num_classes: Number of object classes.

    Registered buffers: `anchors` (boxes as (x_min, y_min, x_max, y_max) with a
    leading dimension of 1), `anchors_xy` (their top-left corners) and
    `anchor_sizes` (their widths and heights).
    """

    def __init__(self, pretrained_model,
                 return_nodes,
                 unfreeze_block,
                 input_size=(640, 640),
                 anchor_sizes=(32, 64, 128),
                 anchor_ratios=(0.5, 1.0, 2.0),
                 num_classes=4):
        super().__init__()

        # Factor by which the feature map fed to the head is smaller than the input.
        self.stride = 16
        grid_rows = input_size[0] // self.stride
        grid_cols = input_size[1] // self.stride
        feature_map_size = (grid_rows, grid_cols)
        print("feature_map_size", feature_map_size)

        self.backbone = Backbone(
            pretrained_model, return_nodes, unfreeze_block
        )
        self.neck = Neck()
        anchors_per_cell = len(anchor_sizes) * len(anchor_ratios)
        self.head = DetectionHead(40, anchors_per_cell, num_classes)

        # One set of anchors for the single feature level, laid out on the grid.
        generator = AnchorGenerator(
            sizes=(anchor_sizes, ), aspect_ratios=(anchor_ratios, ))
        per_level_anchors = generator.grid_anchors(
            [feature_map_size], strides=[[self.stride, self.stride]])
        anchors = torch.stack(per_level_anchors, dim=0)
        print(anchors.shape)

        top_left = anchors[:, :, :2]
        bottom_right = anchors[:, :, 2:]

        self.register_buffer("anchors", anchors)
        self.register_buffer("anchors_xy", top_left)
        self.register_buffer("anchor_sizes", bottom_right - top_left)

    def forward(self, x):
        """Run the detector.

        In training mode returns the raw `(bbox_reg, confidence_score,
        cls_logits)` flattened to (N, num_predictions, ...). In eval mode
        returns decoded boxes in (x_min, y_min, w, h) format, sigmoid
        objectness scores and softmax class probabilities.
        """
        features = self.backbone(x)

        encoder_outputs = [features[str(level)] for level in range(9)]
        fused = self.neck(features['8'], encoder_outputs)
        bbox_reg, confidence_score, cls_logits = self.head(fused)

        batch = fused.shape[0]

        def flatten(feature_map, *tail):
            # (N, C, H, W) -> (N, H, W, C) -> (N, -1, *tail).
            # NOTE(review): assumed to match the anchor ordering produced by
            # AnchorGenerator; confirm.
            return feature_map.permute(0, 2, 3, 1).contiguous().view(batch, -1, *tail)

        cls_logits = flatten(cls_logits, self.head.num_classes)
        bbox_reg = flatten(bbox_reg, 4)
        confidence_score = flatten(confidence_score)

        if self.training:
            # During training only the raw offsets and logits are returned.
            return bbox_reg, confidence_score, cls_logits

        return (
            self.decode_bboxes(bbox_reg),
            torch.sigmoid(confidence_score),
            torch.softmax(cls_logits, dim=-1),
        )

    def decode_bboxes(self, bbox_reg):
        """Turn predicted offsets into boxes in (x_min, y_min, w, h) format.

        The centre is `anchor_center + sigmoid(t) * anchor_size` and the size
        is `exp(t) * anchor_size`.
        NOTE(review): because the sigmoid is positive, a decoded centre can
        never lie left of or above its anchor centre; confirm this is intended.
        """
        left, top, right, bottom = self.anchors.unbind(dim=-1)
        anchor_width = right - left
        anchor_height = bottom - top

        # Predicted anchor offsets.
        tx, ty = bbox_reg[:, :, 0], bbox_reg[:, :, 1]
        tw, th = bbox_reg[:, :, 2], bbox_reg[:, :, 3]

        x_center = (left + right) / 2 + torch.sigmoid(tx) * anchor_width
        y_center = (top + bottom) / 2 + torch.sigmoid(ty) * anchor_height
        w = torch.exp(tw) * anchor_width
        h = torch.exp(th) * anchor_height

        # Convert from centre format to [x_min, y_min, w, h].
        return torch.stack([x_center - w / 2, y_center - h / 2, w, h], dim=-1)

## Label assignment

In [28]:
def safe_logit(x):
    """Logit (inverse sigmoid) that stays finite at 0 and 1.

    The input is clamped to [eps, 1 - eps] with eps = 1e-6 before the logit is
    taken, so values outside (0, 1) map to the finite extremes.
    """
    eps = 1e-6
    x = torch.clamp(x, eps, 1 - eps)
    return torch.log(x / (1 - x))


def get_target_offset(anchor_box, gt_box):
    """Compute regression targets: the offsets that move an anchor onto a GT box.

    Args:
        anchor_box: torch.Tensor in (x_min, y_min, x_max, y_max) format, either
            a single box of shape (4,) or a batch of shape (..., 4).
        gt_box: torch.Tensor in (x_min, y_min, x_max, y_max) format with a
            shape broadcastable to `anchor_box`.

    Returns:
        Tensor `(tx, ty, tw, th)` along the last dimension, on the device of
        `anchor_box`. `tx` / `ty` are the logit of the centre shift measured
        in anchor sizes; `tw` / `th` are the log of the size ratio.
    """
    # Convert the GT box to centre / size form.
    gt_center = (gt_box[..., :2] + gt_box[..., 2:]) / 2
    gt_size = gt_box[..., 2:] - gt_box[..., :2]

    # Convert the anchor to centre / size form.
    anchor_center = (anchor_box[..., :2] + anchor_box[..., 2:]) / 2
    anchor_size = anchor_box[..., 2:] - anchor_box[..., :2]

    # Centre shift relative to the anchor size, mapped through the logit.
    center_shift = (gt_center - anchor_center) / anchor_size
    target_xy = safe_logit(center_shift)

    # Log-ratio of the box sizes.
    target_wh = torch.log(gt_size / anchor_size)

    # Concatenating tensors avoids the host round trip that
    # torch.tensor([...]) over 0-d tensors performs on every call.
    offsets = torch.cat([target_xy, target_wh], dim=-1)
    return offsets.detach().to(anchor_box.device)

In [38]:
def assign_target(images, anchors, gt_boxes, gt_labels, num_classes, pos_th=0.6, neg_th=0.3, image_side_size=640):
    """Assign every anchor a training target based on its IoU with the GT boxes.

    Args:
        images: Unused; kept for interface compatibility.
        anchors: Tensor (num_anchors, 4) in (x_min, y_min, x_max, y_max) format.
        gt_boxes: Tensor (num_gt, 4) in (x_min, y_min, w, h) format.
        gt_labels: Tensor (num_gt,) of class ids.
        num_classes: Width of the one-hot class target.
        pos_th: Anchors whose best IoU is at least this value are positive.
        neg_th: Anchors whose best IoU is below this value are negative;
            anchors between the two thresholds are ignored.
        image_side_size: Unused; kept for interface compatibility.

    Returns:
        `(target_offsets, target_objectness, target_cls)`:
        offsets of shape (num_anchors, 4), filled for positive anchors only;
        objectness of shape (num_anchors,) with 1 = positive, 0 = negative,
        -1 = ignored; one-hot classes of shape (num_anchors, num_classes).
    """
    num_anchors = anchors.shape[0]

    target_objectness = torch.zeros(num_anchors, device=anchors.device)
    target_offsets = torch.zeros((num_anchors, 4), device=anchors.device)
    target_cls = torch.zeros((num_anchors, num_classes), device=anchors.device)

    # No objects on the image: every anchor is a negative with all-zero targets.
    if gt_boxes.numel() == 0:
        return target_offsets, target_objectness, target_cls

    # box_iou expects (x_min, y_min, x_max, y_max). The anchors already use
    # that format, the GT boxes are (x_min, y_min, w, h) and are converted.
    gt_xyxy = gt_boxes.clone()
    gt_xyxy[:, 2:] = gt_xyxy[:, :2] + gt_xyxy[:, 2:]

    # IoU between every anchor and every GT box, then the best GT per anchor.
    ious = box_iou(anchors, gt_xyxy)  # [num_anchors, num_gt]
    best_iou, best_gt_idx = ious.max(dim=1)

    # Anchors between the two thresholds are excluded from the loss.
    ignore_mask = (best_iou >= neg_th) & (best_iou < pos_th)
    target_objectness[ignore_mask] = -1

    # Positive anchors get localization and classification targets.
    pos_indices = (best_iou >= pos_th).nonzero(as_tuple=True)[0]
    for pos in pos_indices:
        gt_idx = best_gt_idx[pos]
        target_offsets[pos] = get_target_offset(anchors[pos], gt_xyxy[gt_idx])
        target_objectness[pos] = 1
        target_cls[pos, gt_labels[gt_idx]] = 1

    # Every GT box that ended up without a positive anchor is force-matched to
    # the anchor with the highest IoU.
    for gt_idx in range(gt_xyxy.shape[0]):
        has_positive = ((target_objectness == 1) & (best_gt_idx == gt_idx)).any()
        if has_positive:
            continue

        best_anchor_idx = torch.argmax(ious[:, gt_idx])
        target_offsets[best_anchor_idx] = get_target_offset(
            anchors[best_anchor_idx], gt_xyxy[gt_idx])
        target_objectness[best_anchor_idx] = 1
        # The anchor may already be positive for another GT box: clear its
        # class row first so the target stays one-hot.
        target_cls[best_anchor_idx] = 0
        target_cls[best_anchor_idx, gt_labels[gt_idx]] = 1

    return target_offsets, target_objectness, target_cls

### Функция потерь

In [39]:
class ComputeLoss:
    """Weighted sum of the localization, objectness and classification losses.

    Args:
        bbox_loss: Localization criterion; defaults to `nn.SmoothL1Loss()`.
        obj_loss: Objectness criterion; defaults to `nn.BCEWithLogitsLoss()`.
        cls_loss: Classification criterion; defaults to `nn.BCEWithLogitsLoss()`.
        weight_bbox: Weight of the localization term.
        weight_obj: Weight of the objectness term.
        weight_cls: Weight of the classification term.
    """

    def __init__(self,
                 bbox_loss=None, obj_loss=None, cls_loss=None,
                 weight_bbox=5, weight_obj=1, weight_cls=1
                 ):
        self.bbox_loss = nn.SmoothL1Loss() if bbox_loss is None else bbox_loss
        self.obj_loss = nn.BCEWithLogitsLoss() if obj_loss is None else obj_loss
        self.cls_loss = nn.BCEWithLogitsLoss() if cls_loss is None else cls_loss
        self.weight_bbox = weight_bbox
        self.weight_obj = weight_obj
        self.weight_cls = weight_cls

    def __call__(self, predicts, targets):
        """Compute the total loss.

        Args:
            predicts: `(pred_offsets, pred_obj_logits, pred_cls_logits)`.
            targets: `(target_boxes, target_obj, target_cls)` where
                `target_obj` is 1 for positive, 0 for negative and -1 for
                ignored anchors.

        Returns:
            Scalar tensor with the weighted sum of the three terms. A term
            with no contributing anchors is zero.
        """
        pred_offsets, pred_obj_logits, pred_cls_logits = predicts
        target_boxes, target_obj, target_cls = targets
        zero = torch.tensor(0.0, device=pred_offsets.device)

        # Objectness is scored on positive and negative anchors only. If every
        # anchor is ignored, the mean over an empty selection would be NaN, so
        # the term is set to zero instead.
        valid_mask = target_obj != -1
        if valid_mask.any():
            loss_obj = self.obj_loss(
                pred_obj_logits[valid_mask], target_obj[valid_mask])
        else:
            loss_obj = zero

        # Localization and classification are scored on positive anchors only.
        pos_mask = target_obj == 1
        if pos_mask.any():
            loss_cls = self.cls_loss(
                pred_cls_logits[pos_mask], target_cls[pos_mask])
            loss_bbox = self.bbox_loss(
                pred_offsets[pos_mask], target_boxes[pos_mask])
        else:
            loss_cls = zero
            loss_bbox = zero

        return self.weight_bbox * loss_bbox + self.weight_obj * loss_obj + self.weight_cls * loss_cls

## Обучение детектора

In [40]:
class EarlyStopping:
    """
    A utility class for early stopping during model training.
    Monitors validation loss to halt training when the model stops improving.

    Attributes:
        patience (int): Number of epochs to wait for improvement before stopping.
        delta (float): Minimum change in the monitored quantity to qualify as an improvement.
        best_score (float | None): Best (negated) validation loss seen so far.
        early_stop (bool): Set to True once `patience` epochs passed without improvement.
        counter (int): Number of consecutive epochs without improvement.
        best_model_state (dict | None): Snapshot of the weights at the best score.
    """

    def __init__(self, patience: int = 5, delta: float = 0) -> None:
        self.patience = patience
        self.delta = delta
        self.best_score = None
        self.early_stop = False
        self.counter = 0
        self.best_model_state = None

    @staticmethod
    def _snapshot(model: nn.Module) -> dict:
        """Return an independent copy of the model's state dict.

        `state_dict()` stores references to the live parameters, so without a
        copy the "best" weights would keep changing as training continues.
        """
        import copy

        return copy.deepcopy(model.state_dict())

    def __call__(self, val_loss: float, model: nn.Module) -> None:
        """
        Update the early stopping criteria based on the validation loss and the current model state.

        Args:
            val_loss (float): The current validation loss.
            model (nn.Module): The model instance to store the state of.

        Returns:
            None
        """
        # Negate the loss so that a higher score is better.
        score = -val_loss
        if self.best_score is None:
            self.best_score = score
            self.best_model_state = self._snapshot(model)
        elif score < self.best_score + self.delta:
            # No sufficient improvement: count the epoch and possibly stop.
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.best_model_state = self._snapshot(model)
            self.counter = 0

    def load_best_model(self, model: nn.Module) -> None:
        """
        Load the best model state into the specified model.

        Args:
            model (nn.Module): The model instance to load the state into.

        Returns:
            None
        """
        model.load_state_dict(self.best_model_state)

In [50]:
class Runner:
    """Training and validation driver for the detector.

    The model is expected to expose `anchors` and `head.num_classes` and to
    return a `(bboxes, confidences, cls_probs)` triple, because that is how
    this class reads its outputs.

    Parameters
    ----------
    model : detector to optimise.
    early_stopping : callable `(val_loss, model)` exposing `early_stop`,
        `best_model_state` and `load_best_model(model)`.
    compute_loss : callable `(per_image_outputs, assigned_targets) -> loss`.
    optimizer : optimizer that updates `model`.
    train_dataloader : iterable of `(images, targets)` training batches.
    assign_target_method : callable invoked as
        `(images, anchors, gt_boxes, gt_labels, num_classes=...)`;
        `assign_target_kwargs` are bound to it up front.
    device : device for images and ground truth, "cpu" when omitted.
    scheduler : optional LR scheduler, stepped once per epoch.
    val_dataloader : optional iterable of validation batches.
    val_every : validate every `val_every` epochs.
    score_threshold, nms_threshold, max_boxes_per_cls : forwarded to
        `_filter_predictions` during validation.
    """

    def __init__(self, model, early_stopping, compute_loss, optimizer, train_dataloader, assign_target_method, device=None,
                 scheduler=None, assign_target_kwargs=None,
                 val_dataloader=None, val_every=1, score_threshold=0.1, nms_threshold=0.5, max_boxes_per_cls=8):
        self.model = model
        self.early_stopping = early_stopping
        self.compute_loss = compute_loss
        self.optimizer = optimizer
        self.train_dataloader = train_dataloader
        assign_target_kwargs = {} if assign_target_kwargs is None else assign_target_kwargs
        self.assign_target_method = partial(
            assign_target_method, **assign_target_kwargs)
        self.device = "cpu" if device is None else device
        self.scheduler = scheduler

        # Validation settings
        self.val_dataloader = val_dataloader
        self.val_every = val_every
        self.score_threshold = score_threshold
        self.nms_threshold = nms_threshold
        self.max_boxes_per_cls = max_boxes_per_cls

        # History: per-batch train loss, per-validation loss,
        # per-epoch train loss and per-validation mAP.
        self.train_batch_loss = []
        self.val_batch_loss = []
        self.epoch_loss = []
        self.val_metric = []

    def _mean_batch_loss(self, images, targets, outputs):
        """Return the loss averaged over the images of one batch (as a tensor)."""
        anchors = self.model.anchors.view(-1, 4)
        num_classes = self.model.head.num_classes
        total = 0.0
        for ix in range(images.shape[0]):
            gt_boxes = targets[ix]['boxes'].to(self.device)
            gt_labels = targets[ix]['labels'].to(self.device)
            # Decide which anchors take part in the loss for this image.
            assigned_targets = self.assign_target_method(
                images, anchors, gt_boxes, gt_labels, num_classes=num_classes)
            # Slice every model output down to the current image.
            outputs_ix = [out[ix] for out in outputs]
            total = total + self.compute_loss(outputs_ix, assigned_targets)
        return total / images.shape[0]

    def _run_train_epoch(self, dataloader, verbose=True):
        """Train the model for one epoch on `dataloader`; return the per-batch losses."""
        self.model.train()
        batch_loss = []
        pbar = tqdm(dataloader, desc="Process train epoch", leave=False)
        for images, targets in pbar:
            images = images.to(self.device)
            outputs = self.model(images)

            loss = self._mean_batch_loss(images, targets, outputs)
            batch_loss.append(loss.item())

            # One optimizer step per batch, after the loss of all its images is known.
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

            # Show the loss of the batch that was just processed.
            if verbose:
                pbar.set_description(f"Current batch loss: {batch_loss[-1]:.4}")
        return batch_loss

    def train(self, num_epochs=10, verbose=True):
        """Train for up to `num_epochs` epochs, then restore the best validated weights.

        Validation runs every `val_every` epochs when a validation dataloader
        was given; its loss drives early stopping. If no validation ever ran,
        the model keeps its final weights.
        """
        epoch_pbar = tqdm(range(1, num_epochs + 1), desc="Train epoch", total=num_epochs)
        for epoch in epoch_pbar:
            loss = self._run_train_epoch(
                self.train_dataloader, verbose=verbose)
            self.train_batch_loss.extend(loss)
            # Average over the batches of this epoch only; unlike slicing the
            # history by len(dataloader) this needs no __len__ on the loader.
            self.epoch_loss.append(np.mean(loss))

            val_desc = ""
            if self.val_dataloader is not None and epoch % self.val_every == 0:
                val_metric, avg_val_loss = self.validate()
                self.val_metric.append(val_metric)
                self.val_batch_loss.append(avg_val_loss)
                val_desc = f" valid mAP {val_metric}, val loss {avg_val_loss}"

                self.early_stopping(avg_val_loss, self.model)
                if self.early_stopping.early_stop:
                    print("Early stopping")
                    break

            # Show the mean train loss of the finished epoch.
            if verbose:
                epoch_pbar.set_description(
                    f"{epoch} epoch train loss: {self.epoch_loss[-1]:.4}" + val_desc)
            if self.scheduler is not None:
                self.scheduler.step()

        # Use the objects owned by this runner (not notebook globals) and skip
        # the restore when no snapshot was ever taken.
        if self.early_stopping.best_model_state is not None:
            self.early_stopping.load_best_model(self.model)

    @torch.no_grad()
    def validate(self, dataloader=None):
        """Evaluate the model; `self.val_dataloader` is used when `dataloader` is None.

        Returns
        -------
        (map_value, avg_val_loss) : the "map" value reported by
            `MeanAveragePrecision` and the validation loss averaged over batches.

        Raises
        ------
        ValueError : if there is no dataloader to use or it yields no batches.
        """
        dataloader = self.val_dataloader if dataloader is None else dataloader
        if dataloader is None:
            raise ValueError("No dataloader to validate on: pass `dataloader` or set `val_dataloader`.")

        self.model.eval()
        total_loss = 0.0
        num_batches = 0

        # mAP from torchmetrics, with boxes given as xywh.
        metric = MeanAveragePrecision(box_format="xywh", iou_type="bbox")

        max_score = 0
        for images, targets in tqdm(dataloader, desc="Running validation", leave=False):
            images = images.to(self.device)
            outputs = self.model(images)

            predicts = _filter_predictions(outputs, self.score_threshold, self.nms_threshold,
                                           max_boxes_per_cls=self.max_boxes_per_cls, return_type="torch")

            # DEBUG: highest (confidence * class probability) seen so far; handy
            # for picking a score threshold.
            _, confidences, cls_probs = outputs
            batch_max = (confidences[:, :, None] * cls_probs).max().item()
            max_score = max(batch_max, max_score)

            metric.update(predicts, targets)

            total_loss += self._mean_batch_loss(images, targets, outputs).item()
            num_batches += 1

        print("max_score:", max_score)

        if num_batches == 0:
            raise ValueError("Validation dataloader produced no batches.")
        avg_val_loss = total_loss / num_batches
        map_value = metric.compute()["map"].item()

        return map_value, avg_val_loss

    def plot_loss(self, row_figsize=3):
        """Plot the train loss and, when validation was run, validation mAP and loss.

        `row_figsize` is the height of one subplot row.
        """
        nrows = 3 if self.val_metric else 1
        _, grid = plt.subplots(nrows, 1, figsize=(
            12, row_figsize*nrows), tight_layout=True, squeeze=False)
        ax = grid[:, 0]

        num_iterations = len(self.train_batch_loss)
        ax[0].plot(self.train_batch_loss, label="Train batch Loss", color="tab:blue")
        ax[0].plot(np.linspace(1, num_iterations, len(self.epoch_loss)), self.epoch_loss,
                   color="tab:orange", label="Train epoch Loss")
        ax[0].grid()
        ax[0].set_title("Train Loss")
        ax[0].set_xlabel("Number of Iterations")
        ax[0].set_ylabel("Loss")
        if self.val_metric:
            val_x = np.linspace(self.val_every, num_iterations, len(self.val_metric))
            ax[1].plot(val_x, np.array(self.val_metric) * 100,
                       color="tab:green", label="Validation mAP")
            ax[1].grid()
            ax[1].set_title("Valiation mAP")
            ax[1].set_xlabel("Number of Iterations")
            ax[1].set_ylabel("mAP (%)")

            ax[2].plot(val_x, np.array(self.val_batch_loss),
                       color="tab:red", label="Validation Loss")
            ax[2].grid()
            ax[2].set_title("Validation Loss")
            ax[2].set_xlabel("Number of Iterations")
            ax[2].set_ylabel("Loss")
        # A legend per subplot: plt.legend() alone only labels the last axes.
        for axis in ax:
            axis.legend()
        plt.show()


def _filter_predictions(predictions, score_threshold=0.1, nms_threshold=0.5, max_boxes_per_cls=8, return_type="list"):
    """ Ббоксы в `predictions` должны быть в формате (x_min, y_min, w, h). """
    # Итоговый скор считается как произведение уверенности модели в том что в данном якоре
    # и вероятность каждого класса в данном якоре.
    bboxes, confidences, cls_probs = predictions
    all_final_scores = confidences[:, :, None] * cls_probs

    num_classes = cls_probs.shape[-1]
    final_predictions = []
    
    # Для каждого элемента в `predictions` независимо выбираем ббоксы и скоры
    for boxes, final_scores in zip(bboxes, all_final_scores):
        preds = {"boxes": [], "labels": [], "scores": []}

        # Для каждого класса отдельно фильтруем ббоксы с помощью NMS
        for cls in range(num_classes):
            cls_scores = final_scores[:, cls]
            
            # Фильтруем ббоксы, score которых меньше порога
            keep_ixs = cls_scores > score_threshold
            if keep_ixs.sum() == 0:
                continue
            cls_boxes = boxes[keep_ixs]
            cls_scores = cls_scores[keep_ixs]

            # Если предсказаний слишком много, выбираем только самые уверенные
            if len(cls_boxes) > max_boxes_per_cls:
                pos = torch.argsort(cls_scores, descending=True)
                cls_boxes = cls_boxes[pos[:max_boxes_per_cls]]
                cls_scores = cls_scores[pos[:max_boxes_per_cls]]

            # Конвертируем ббоксы в формат x_min, y_min, x_max, y_max из COCO
            boxes_xyxy = cls_boxes.clone()
            boxes_xy_min = boxes_xyxy[:, :2] - boxes_xyxy[:, 2:]
            boxes_xy_max = boxes_xyxy[:, :2] + boxes_xyxy[:, 2:]
            boxes_xyxy[:, :2] = boxes_xy_min
            boxes_xyxy[:, 2:] = boxes_xy_max
            # Запускаем NMS по всем оставшимся ббоксам класса cls
            pred_ixs = nms(boxes_xyxy, cls_scores, nms_threshold)
            # Сохраняем все предсказания для класса cls
            for ix in pred_ixs:
                preds["boxes"].append(cls_boxes[ix].cpu().tolist())
                preds["labels"].append(cls)
                preds["scores"].append(cls_scores[ix].item())
        if return_type == "torch":
            for key, item in preds.items():
                preds[key] = torch.tensor(item)
        elif return_type != "list":
            raise ValueError(
                f"Received unexpected `return_type`. Could be either `torch` or `list`, not {return_type}")

        final_predictions.append(preds)
    return final_predictions

In [42]:
from sklearn.cluster import KMeans

# Collect the width and height of every ground-truth box in the training set.
# Each sample's boxes are appended exactly once; repeating the append for every
# index along the image's first dimension would only duplicate them.
all_widths = []
all_heights = []
for _image, targets in tqdm(train_dataset):
    boxes = targets['boxes']
    all_widths.extend(boxes[:, 2].tolist())
    all_heights.extend(boxes[:, 3].tolist())

mean_w = np.mean(all_widths)
mean_h = np.mean(all_heights)
print(f"Mean GT box size: {mean_w:.1f}x{mean_h:.1f} pixels")

# Cluster the (width, height) pairs into n_clusters groups.
gt_sizes = np.array([[w, h] for w, h in zip(all_widths, all_heights)])
kmeans = KMeans(n_clusters=3, random_state=0).fit(gt_sizes)
cluster_centers = kmeans.cluster_centers_  # Shape: (n_clusters, 2)

# One base anchor size per cluster: the mean of the cluster's width and height.
base_sizes = np.mean(cluster_centers, axis=1).tolist()
base_sizes = tuple((int(x), ) for x in base_sizes)

# The same set of aspect ratios is used for every base size.
aspect_ratios = ((0.5, 1, 2), ) * len(base_sizes)

print("base_sizes", base_sizes)
print("aspect_ratios", aspect_ratios)

  0%|          | 0/42 [00:00<?, ?it/s]

Mean GT box size: 36.0x23.6 pixels
base_sizes ((22,), (42,), (76,))
aspect_ratios ((0.5, 1, 2), (0.5, 1, 2), (0.5, 1, 2))




In [43]:
# Initial learning rate of the optimizer.
lr = 1e-3

# Build the detector on top of the pretrained backbone, then move it to the
# target device. Custom anchors are currently switched off:
#     anchor_sizes=base_sizes,
#     anchor_ratios=aspect_ratios,
model = Detector(
    pretrained_model,
    return_nodes,
    unfreeze_block=2,
    num_classes=len(four_classes),
    input_size=(img_width, img_height),
)
model = model.to(device)

# Adam with a cosine learning-rate schedule that decays towards eta_min
# over T_max scheduler steps.
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10, eta_min=5e-5)

# Individual loss terms: Smooth L1 for box regression, BCE-with-logits for
# objectness and for classification.
smooth_l1_loss = nn.SmoothL1Loss()
obj_bce_loss = nn.BCEWithLogitsLoss()
cls_bce_loss = nn.BCEWithLogitsLoss()

# Combined detection loss.
compute_loss = ComputeLoss(
    bbox_loss=smooth_l1_loss,
    obj_loss=obj_bce_loss,
    cls_loss=cls_bce_loss,
    weight_bbox=1,
)

feature_map_size (40, 40)
torch.Size([1, 14400, 4])


In [47]:
# Stop after 15 validation rounds without an improvement of at least 1e-4.
early_stopping = EarlyStopping(patience=15, delta=0.0001)

# NOTE(review): validation runs on the training dataloader here — confirm this
# is intentional (e.g. an overfitting sanity check).
runner = Runner(
    model=model,
    early_stopping=early_stopping,
    compute_loss=compute_loss,
    optimizer=optimizer,
    train_dataloader=train_dataloader,
    assign_target_method=assign_target,
    device=device,
    scheduler=scheduler,
    assign_target_kwargs={"neg_th": 0.4, "pos_th": 0.5},
    val_dataloader=train_dataloader,
    score_threshold=0.1,
)

# Number of training epochs for the run below.
num_epochs = 1

In [48]:
# Collect unreachable Python objects first so that the GPU tensors they hold
# are returned to the caching allocator, then release the allocator's unused
# blocks. In the opposite order, memory freed by the collector stays cached.
gc.collect()
torch.cuda.empty_cache()
print(torch.cuda.memory_allocated())  # memory currently occupied by tensors
print(torch.cuda.memory_reserved())  # memory held by the caching allocator

0
0


In [49]:
# Train for `num_epochs` epochs; verbose=True keeps the progress-bar
# descriptions updated with the current losses.
runner.train(num_epochs=num_epochs, verbose=True)

Train epoch:   0%|          | 0/1 [00:00<?, ?it/s]

Process train epoch:   0%|          | 0/21 [00:00<?, ?it/s]

Running validation:   0%|          | 0/21 [00:00<?, ?it/s]

TypeError: assign_target() missing 1 required positional argument: 'gt_labels'

In [None]:
# Save only the model weights (state_dict) to "model.pt" in the working directory.
torch.save(model.state_dict(), "model.pt")

In [None]:
# Plot the recorded training curves; row_figsize is the height of one subplot row.
runner.plot_loss(row_figsize=4)

## Расчёт метрик

In [None]:
from torchmetrics.detection import MeanAveragePrecision

@torch.no_grad()
def validate(dataloader, filter_predictions_func, box_format="xyxy", device="cpu", score_threshold=0.1,
             nms_threshold=0.5, model=None, **kwargs):
    """Compute mAP of `model` on `dataloader`.

    Parameters
    ----------
    dataloader : iterable of `(images, targets)` batches.
    filter_predictions_func : callable invoked as
        `(outputs, score_threshold, nms_threshold, **kwargs)`; its result is
        passed to `MeanAveragePrecision.update` together with `targets`.
    box_format : box format handed to `MeanAveragePrecision`.
    device : device the images are moved to before the forward pass.
    score_threshold, nms_threshold : forwarded to `filter_predictions_func`.
    model : the model to evaluate. Required; it is a keyword with a `None`
        default only to keep the existing positional parameters in place.
    **kwargs : extra keyword arguments for `filter_predictions_func`.

    Returns
    -------
    The "map" value reported by `MeanAveragePrecision`, as a float.

    Raises
    ------
    ValueError : if `model` is not given.
    """
    # This is a free function, so the model has to be passed in explicitly.
    if model is None:
        raise ValueError("`model` must be provided, e.g. validate(..., model=model).")

    model.eval()
    # mAP is computed with torchmetrics.
    metric = MeanAveragePrecision(box_format=box_format, iou_type="bbox")
    for images, targets in tqdm(dataloader, desc="Running validation", leave=False):
        images = images.to(device)
        outputs = model(images)
        predicts = filter_predictions_func(outputs, score_threshold, nms_threshold, **kwargs)
        metric.update(predicts, targets)
    return metric.compute()["map"].item()


In [None]:
@torch.no_grad()
def predict(model, images, device, score_threshold=0.1, nms_threshold=0.5, max_boxes_per_cls=8, return_type='list'):
    """ Run the model on a batch of images and return the detections that survive
    score filtering and NMS.

    Parameters
    ----------
    model : detector; it is switched to eval mode before the forward pass.
    images : torch.tensor with the images to predict on. Any preprocessing must
        be applied BEFORE the call — `predict` does not transform its input, it
        only moves it to `device`.
    device : device the images are moved to.
    score_threshold : predictions with (confidence score * cls_probs) < score_threshold are ignored.
    nms_threshold : predictions overlapping with IoU >= nms_threshold are treated as one prediction.
    max_boxes_per_cls : maximum number of boxes per image for a single class that
        are kept after filtering by `score_threshold`.
    return_type : forwarded to `_filter_predictions` ('list' or 'torch').

    Returns
    -------
    final_predictions : List[dict], one dict per image with the keys
        "boxes" : box coordinates on the i-th image,
        "labels" : classes of the boxes,
        "scores" : confidence scores of the boxes.
    """
    model.eval()
    raw_outputs = model(images.to(device))
    return _filter_predictions(
        raw_outputs,
        score_threshold=score_threshold,
        nms_threshold=nms_threshold,
        max_boxes_per_cls=max_boxes_per_cls,
        return_type=return_type,
    )

In [None]:
# Iterator that supplies batches for the qualitative check below. It currently
# walks the training dataloader; the DEBUG note marks test_dataloader as the
# intended source.
test_iter = iter(train_dataloader)  # DEBUG: test_dataloader

In [None]:
# Thresholds used for this visual check only.
score_threshold = 0.15
nms_threshold = 0.1

# Take the next batch and run the detector on it.
images, targets = next(test_iter)
preds = predict(
    model,
    images,
    device=device,
    score_threshold=score_threshold,
    nms_threshold=nms_threshold,
)

# Draw ground truth and predictions for at most three images of the batch.
plot_examples(
    images,
    targets=targets,
    image_norm=(mean, std),
    predictions=preds,
    num_examples=min(len(images), 3),
)

In [None]:
# Runner.validate() returns a (mAP, average validation loss) pair; report only the mAP.
print("Final mAP:", runner.validate()[0])