# Домашняя работа 3: Разработка модели детектирования объектов

##  Погосян Арсен Андраникович


На семинаре мы обсудили, разработали и попробовали `SSD` модель на базе `VGG16`, поэтому для успешного выполнения домашнего задания рекомендуется обратиться к семинару.

Задачи домашнего задания:
 - Загрузить набор данных и визуализировать объекты. (2 балла)
 - Разработать функцию для расчета метрики mAP задачи детектирования объектов. Продемонстрировать работу. (4 балла)
 - Натренировать `SSD` модель на базе VGG16. Продемонстрировать повышение метрики mAP (4 балла)
 - Разработать `SSD` модель согласно предлагаемой архитектуры на базе `ResNet18`. Продемонстрировать повышение метрики mAP (10 баллов)
 - (БОНУС) Добавить разнообразные аугментации изображений. Можно позаимствовать из других репозитариев с указанием источника. Повторить эксперименты (5 баллов)

In [1]:
!git clone https://github.com/SergeyMalashenko/ObjectDetectionProblemFromScratch.git utils

Cloning into 'utils'...
remote: Enumerating objects: 10, done.[K
remote: Counting objects: 100% (10/10), done.[K
remote: Compressing objects: 100% (8/8), done.[K
remote: Total 10 (delta 0), reused 7 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (10/10), 24.75 KiB | 12.37 MiB/s, done.


## Описание предлагаемых исходных кодов и набора данных
- voc_dataset.py     - загружает предлагаемый набор данных, здесь также возможно выполнять аугментации над данными.
- voc_dataloader.py  - формирует порцию данных.
- prior_boxes.py     - содержит все необходимые функции для работы с `prior` боксами.
    - prior_boxes - функция генерирует `prior` боксы в соотвествии с конфигурацией
    - match       - функция решает задачу сопоставления `ground truth` боксов из набора данных и `prior` боксов
    - decode      - функция вычисляет поправки между `ground truth` боксами и `prior` боксами, собственно их будет *предсказывать* наша модель
    - encode      - функция пересчитывает `prior` боксы и *предсказанные* поправки в результирующие детектирования

- multibox_loss.py - комплексная функция потерь, решается задача сопоставления *предсказанных* и `ground truth` боксов, применяется техника `hard-negative mining`, вычисляется общая функция потерь.

In [2]:
!gdown --id 14UheyBtIByVktCsOR7OF2gBsTNGAw26E
!tar -xzf dataset.tar.gz

Downloading...
From (original): https://drive.google.com/uc?id=14UheyBtIByVktCsOR7OF2gBsTNGAw26E
From (redirected): https://drive.google.com/uc?id=14UheyBtIByVktCsOR7OF2gBsTNGAw26E&confirm=t&uuid=b2f05b01-6345-4106-aaef-3ccb55e4b3ac
To: /content/dataset.tar.gz
100% 113M/113M [00:01<00:00, 96.1MB/s]


Набор данных состоит из `1528` изображений в разрешении `640x360` (тренировочная выборка - 1464 изображения, тестовая выборка - 64 изображения) в формате `VOC`.

Набор имеет следующую структуру:

|  Директория      | Содержимое |
| ---------------- | ---------- |
| Annotations      | папка содержит разметку в виде набора файлов в формате `XML`, каждый файл содержит информацию об объектах (класс объекта, рамка объекта `xmin,ymin,xmax,ymax`)    |
| ImageSets/Main   | папка содержит два текстовых файла с распределением данных на тренирововчную `trainval.txt` и тестовую выборки `test.txt` |
| JPEGImages       | папка содержит изображения в `JPEG` формате |

## Архитектура модели на основе VGG16

Обязательно посмотрите реализацию и разберитесь в том, как устроенна модель, это будет полезно при разработке собственной модели детектирования.

![Image](output_vgg16.png)

## Архитектура модели на основе ResNet18

Можно исследовать реализацию ResNet в пакете PyTorch, это будет полезно при разработке собственного детектора.

https://github.com/pytorch/vision/blob/main/torchvision/models/resnet.py

![Image](output_resnet18.png)

In [3]:
!pip install torchview

Collecting torchview
  Downloading torchview-0.2.6-py3-none-any.whl.metadata (12 kB)
Downloading torchview-0.2.6-py3-none-any.whl (25 kB)
Installing collected packages: torchview
Successfully installed torchview-0.2.6


## Создаем конфигурацию, которая описывает работы нашего детектора (пример для VGG16)

In [4]:
#VGG16
custom_config = {
 'num_classes'  : 3,
 'feature_maps' : [(45,80), (22,40), (11,20), (6,10), (4,8), (2,6)], #VGG16 - 640x360, размерность карт признаков, которые будут использоваться
 'min_sizes'    : [0.10, 0.20, 0.37, 0.54, 0.71, 1.00], #Параметр масштаба боксов
 'max_sizes'    : [0.20, 0.37, 0.54, 0.71, 1.00, 1.05], #Параметр масштаба боксов

 'aspect_ratios': [[2, 3], [2, 3], [2, 3], [2, 3], [2], [2]], #Список содержаший информацию о соотношении сторон для prior боксов
 'num_priors'   : [6, 6, 6, 6, 4, 4], #Число prior боксов
 'variance'     : [0.1, 0.2],
 'clip'         :    True,

 'overlap_threshold': 0.5, #Параметр IoU
 'neg_pos_ratio'    :   3, #Параметр hard-negative mining

 'model_name' : 'vgg16'
}

## Загружаем требуемый набор данных

In [5]:
import sys
#Добавляем директорию, чтобы была возможность импортировать необходимые модули
sys.path.insert(0, 'utils')
from utils.voc_dataloader import get_test_dataloader,get_train_dataloader
import os



voc_root = "dataset"

train_annotation_filename = os.path.join( voc_root, "ImageSets/Main/trainval.txt")
test_annotation_filename  = os.path.join( voc_root, "ImageSets/Main/test.txt")

train_dataloader = get_train_dataloader(voc_root, train_annotation_filename, 1, 1)
test_dataloader  = get_test_dataloader(voc_root, test_annotation_filename, 1, 1)






  check_for_updates()


## Разработать функцию вычисление mAP метрики задачи детектирования

В качестве примера можно использовать https://lightning.ai/docs/torchmetrics/stable/detection/mean_average_precision.html

In [6]:
def compute_iou(pred_box, true_box):
    # Формат box: [x_min, y_min, x_max, y_max]
    x_min1, y_min1, x_max1, y_max1 = pred_box
    x_min2, y_min2, x_max2, y_max2 = true_box

    # Находим пересечение
    inter_x_min = max(x_min1, x_min2)
    inter_y_min = max(y_min1, y_min2)
    inter_x_max = min(x_max1, x_max2)
    inter_y_max = min(y_max1, y_max2)

    inter_area = max(0, inter_x_max - inter_x_min) * max(0, inter_y_max - inter_y_min)
    area1 = (x_max1 - x_min1) * (y_max1 - y_min1)
    area2 = (x_max2 - x_min2) * (y_max2 - y_min2)

    union_area = area1 + area2 - inter_area

    return inter_area / union_area if union_area > 0 else 0

def calculate_ap(rec, prec):
    # Вычисление Average Precision (AP)
    rec = np.array(rec)
    prec = np.array(prec)

    # Добавим (0,0) и (1,1) для корректной интерполяции
    mrec = np.concatenate(([0], rec, [1]))
    mpre = np.concatenate(([0], prec, [0]))

    # Интерполяция
    for i in range(mpre.size - 1, 0, -1):
        mpre[i - 1] = max(mpre[i - 1], mpre[i])

    # Находим индекс первого элемента, равного 1
    i = np.where(mrec[1:] != mrec[:-1])[0]
    ap = np.sum((mrec[i + 1] - mrec[i]) * mpre[i + 1])

    return ap

def mAP(custom_config, pred_boxes_all, pred_scores_all, gt_boxes_all, pred_labels_all, gt_labels_all, iou_threshold=0.5):
    num_classes = custom_config['num_classes']
    ap_per_class = []

    # Перебираем каждый класс
    for cls in range(num_classes):
        # Собираем все предсказания для этого класса
        cls_pred_boxes = []
        cls_pred_scores = []
        cls_pred_labels = []
        cls_gt_boxes = []
        cls_gt_labels = []

        for i in range(len(gt_boxes_all)):
            gt_boxes = gt_boxes_all[i]
            gt_labels = gt_labels_all[i]

            # Для каждого изображения добавляем боксы и метки этого класса
            for j in range(len(gt_labels)):
                if gt_labels[j] == cls:
                    cls_gt_boxes.append(gt_boxes[j])
                    cls_gt_labels.append(gt_labels[j])

            # Добавляем предсказания для этого изображения
            if i < len(pred_labels_all):
                pred_labels = pred_labels_all[i]
                pred_boxes = pred_boxes_all[i]
                pred_scores = pred_scores_all[i]

                for j in range(len(pred_labels)):
                    if j < len(pred_boxes) and j < len(pred_scores):
                        if pred_labels[j] == cls:
                            cls_pred_boxes.append(pred_boxes[j])
                            cls_pred_scores.append(pred_scores[j])
                            cls_pred_labels.append(pred_labels[j])

        if len(cls_pred_boxes) == 0:
            continue

        # Сортируем предсказания по уверенности
        sorted_idx = np.argsort(cls_pred_scores)[::-1]
        cls_pred_boxes = np.array(cls_pred_boxes)[sorted_idx]
        cls_pred_scores = np.array(cls_pred_scores)[sorted_idx]
        cls_pred_labels = np.array(cls_pred_labels)[sorted_idx]

        tp = np.zeros(len(cls_pred_boxes))
        fp = np.zeros(len(cls_pred_boxes))
        total_true = len(cls_gt_boxes)

        detected = []  # Этот список будет хранить индексы уже обнаруженных true_boxes

        for i, pred_box in enumerate(cls_pred_boxes):
            best_iou = 0
            best_true_box = None
            for j, true_box in enumerate(cls_gt_boxes):
                # Используем IoU для проверки пересечения с уже обнаруженными боками
                iou = compute_iou(pred_box, true_box)
                if iou > best_iou and j not in detected:  # Проверяем, что true_box не был ранее найден
                    best_iou = iou
                    best_true_box = true_box
                    best_true_box_idx = j  # Индекс найденного true_box

            if best_iou >= iou_threshold:
                tp[i] = 1
                detected.append(best_true_box_idx)  # Добавляем индекс true_box в список обнаруженных
            else:
                fp[i] = 1

        # Рассчитываем Precision и Recall
        tp_cumsum = np.cumsum(tp)
        fp_cumsum = np.cumsum(fp)

        recall = tp_cumsum / total_true if total_true > 0 else np.zeros_like(tp_cumsum)
        precision = tp_cumsum / (tp_cumsum + fp_cumsum) if (tp_cumsum + fp_cumsum).sum() > 0 else np.zeros_like(tp_cumsum)

        # Вычисляем Average Precision (AP) для этого класса
        ap = calculate_ap(recall, precision)
        ap_per_class.append(ap)

    # Вычисляем средний AP по всем классам (mAP)
    map_score = np.mean(ap_per_class) if len(ap_per_class) > 0 else 0
    return map_score


## Натренировать модель на основе VGG16

In [7]:
from torchvision.models import VGG16_Weights
from torchvision        import models
from torchview          import draw_graph
from torch              import nn
import torch.nn.init as init

class L2Norm(nn.Module):
    def __init__(self, n_channels, scale):
        super(L2Norm, self).__init__()
        self.n_channels = n_channels
        self.gamma      = scale or None
        self.eps        = 1e-10
        self.weight     = nn.Parameter(torch.Tensor(self.n_channels))
        self.reset_parameters()

    def reset_parameters(self):
        init.constant_(self.weight, self.gamma)

    def forward(self, x):
        norm = torch.sqrt(x.pow(2).sum(dim=1, keepdim=True)) + self.eps
        x = torch.div(x, norm)
        x = self.weight.unsqueeze(0).unsqueeze(2).unsqueeze(3).expand_as(x) * x
        return x

class SSD_VGG16(nn.Module):
    def __init__(self, num_bboxes_s, num_labels = 3):
        super().__init__()

        self.num_bboxes_s = num_bboxes_s
        self.num_labels   = num_labels

        self.used_layer_id_s       = [21, 33, 37, 41, 45, 49] #
        self.norm_layer            = L2Norm(512, 20)

        base_layers       = self._build_base_layers ()
        extra_layers      = self._build_extra_layers()
        self.total_layers = base_layers + extra_layers

        self.conf_layers, self.loc_layers = self._build_conf_loc_layers()

    def _build_base_layers(self):
        backbone_model    = models.vgg16(weights=VGG16_Weights.DEFAULT)  #False

        base_layers = nn.ModuleList(list(backbone_model.features)[:-1])
        base_layers[16].ceil_mode = True

        pool5 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        conv6 = nn.Conv2d( 512, 1024, kernel_size=3, padding=6, dilation=6)
        relu6 = nn.ReLU(inplace=True)
        conv7 = nn.Conv2d(1024, 1024, kernel_size=1)
        relu7 = nn.ReLU(inplace=True)

        nn.init.xavier_uniform_(conv6.weight)
        nn.init.zeros_         (conv6  .bias)
        nn.init.xavier_uniform_(conv7.weight)
        nn.init.zeros_         (conv7  .bias)

        base_layers.extend( [pool5, conv6, relu6, conv7, relu7] )

        return base_layers

    def _build_extra_layers(self):
        extra_layers = []

        conv8_1  = nn.Conv2d( 1024, 256, kernel_size=1, stride=1           )
        relu8_1  = nn.ReLU(inplace=True)
        conv8_2  = nn.Conv2d( 256, 512, kernel_size=3, stride=2, padding=1)
        relu8_2  = nn.ReLU(inplace=True)
        conv9_1  = nn.Conv2d( 512, 128, kernel_size=1, stride=1           )
        relu9_1  = nn.ReLU(inplace=True)
        conv9_2  = nn.Conv2d( 128, 256, kernel_size=3, stride=2, padding=1)
        relu9_2  = nn.ReLU(inplace=True)
        conv10_1 = nn.Conv2d( 256, 128, kernel_size=1, stride=1           )
        relu10_1 = nn.ReLU(inplace=True)
        conv10_2 = nn.Conv2d( 128, 256, kernel_size=3, stride=1           )
        relu10_2 = nn.ReLU(inplace=True)
        conv11_1 = nn.Conv2d( 256, 128, kernel_size=1                     )
        relu11_1 = nn.ReLU(inplace=True)
        conv11_2 = nn.Conv2d( 128, 256, kernel_size=3, stride=1           )
        relu11_2 = nn.ReLU(inplace=True)

        nn.init.xavier_uniform_(conv8_1 .weight)
        nn.init.zeros_         (conv8_1 .bias  )
        nn.init.xavier_uniform_(conv8_2 .weight)
        nn.init.zeros_         (conv8_2 .bias  )
        nn.init.xavier_uniform_(conv9_1 .weight)
        nn.init.zeros_         (conv9_1 .bias  )
        nn.init.xavier_uniform_(conv9_2 .weight)
        nn.init.zeros_         (conv9_2 .bias  )
        nn.init.xavier_uniform_(conv10_1.weight)
        nn.init.zeros_         (conv10_1.bias  )
        nn.init.xavier_uniform_(conv10_2.weight)
        nn.init.zeros_         (conv10_2.bias  )
        nn.init.xavier_uniform_(conv11_1.weight)
        nn.init.zeros_         (conv11_1.bias  )
        nn.init.xavier_uniform_(conv11_2.weight)
        nn.init.zeros_         (conv11_2.bias  )

        extra_layers = nn.ModuleList( [conv8_1, relu8_1, conv8_2, relu8_2, conv9_1, relu9_1, conv9_2, relu9_2, conv10_1, relu10_1, conv10_2, relu10_2, conv11_1, relu11_1, conv11_2, relu11_2] )
        return extra_layers

    def _build_conf_loc_layers(self):
        out_channels_s = [ self.total_layers[i].out_channels for i in self.used_layer_id_s ]

        conf_layers, loc_layers = [], []
        for i, j in enumerate(self.used_layer_id_s):
            conf_layer = nn.Conv2d( self.total_layers[j].out_channels, self.num_bboxes_s[i] * self.num_labels, kernel_size=3, padding=1)
            loc_layer  = nn.Conv2d( self.total_layers[j].out_channels, self.num_bboxes_s[i] * 4              , kernel_size=3, padding=1)

            nn.init.xavier_uniform_(conf_layer.weight)
            nn.init.zeros_         (conf_layer  .bias)
            nn.init.xavier_uniform_(loc_layer .weight)
            nn.init.zeros_         (loc_layer   .bias)

            conf_layers += [conf_layer]
            loc_layers  += [loc_layer ]

        conf_layers = nn.ModuleList(conf_layers)
        loc_layers  = nn.ModuleList(loc_layers )

        return conf_layers, loc_layers

    def forward(self, x, verbose=False):
        source_s, loc_s, conf_s = [], [], []

        for i, current_layer in enumerate(self.total_layers, -1):
            x = current_layer(x)
            if i in self.used_layer_id_s:
                if i == 21:
                    s = self.norm_layer(x)
                else:
                    s = x
                source_s.append(s)
        for s, l, c in zip(source_s, self.loc_layers, self.conf_layers):
            conf_s.append(c(s).permute(0, 2, 3, 1).contiguous())
            loc_s .append(l(s).permute(0, 2, 3, 1).contiguous())
        conf_s = torch.cat([o.view(o.size(0), -1) for o in conf_s], 1)
        loc_s  = torch.cat([o.view(o.size(0), -1) for o in loc_s ], 1)

        conf_s = conf_s.view(conf_s.size(0), -1, self.num_labels)
        loc_s  = loc_s .view(loc_s .size(0), -1, 4              )

        return loc_s, conf_s





In [8]:
from utils.prior_boxes import prior_boxes, match, encode, decode

class MultiBoxLoss(nn.Module):
    """SSD Weighted Loss Function
    Compute Targets:
        1) Produce Confidence Target Indices by matching  ground truth boxes
           with (default) 'priorboxes' that have jaccard index > threshold parameter
           (default threshold: 0.5).
        2) Produce localization target by 'encoding' variance into offsets of ground
           truth boxes and their matched  'priorboxes'.
        3) Hard negative mining to filter the excessive number of negative examples
           that comes with using a large number of default bounding boxes.
           (default negative:positive ratio 3:1)
    Objective Loss:
        L(x,c,l,g) = (Lconf(x, c) + αLloc(x,l,g)) / N
        Where, Lconf is the CrossEntropy Loss and Lloc is the SmoothL1 Loss
        weighted by α which is set to 1 by cross val.
        Args:
            c: class confidences,
            l: predicted boxes,
            g: ground truth boxes
            N: number of matched default boxes
        See: https://arxiv.org/pdf/1512.02325.pdf for more details.
    """

    def __init__(self, overlap_threshold, neg_pos_ratio, variance):
        super(MultiBoxLoss, self).__init__()
        self.threshold     = overlap_threshold
        self.neg_pos_ratio = neg_pos_ratio
        self.variance      = variance

    def forward(self, predictions, targets):
        """Multibox Loss
        Args:
            predictions (tuple): A tuple containing loc preds, conf preds,
            and prior boxes from SSD net.
                conf shape: torch.size(batch_size,num_priors,num_classes)
                loc shape: torch.size(batch_size,num_priors,4)
                priors shape: torch.size(num_priors,4)

            targets (tensor): Ground truth boxes and labels for a batch,
                shape: [batch_size,num_objs,5] (last idx is the label).
        """
        loc_data, conf_data, priors = predictions
        gt_label_s, gt_box_s = targets

        device = loc_data.device

        batch_size  = loc_data .size( 0)
        num_priors  = loc_data .size( 1)
        num_classes = conf_data.size(-1)

        # match priors (default boxes) and ground truth boxes
        loc_t  = torch.zeros(batch_size, num_priors, 4, device=device).float()
        conf_t = torch.zeros(batch_size, num_priors   , device=device).long ()

        neg_pos_ratio = self.neg_pos_ratio
        threshold     = self.threshold
        variance      = self.variance
        #Решаем задачу соответствия между GT боксами и предсказанными боксами
        for idx in range(batch_size):
            loc_t[idx], conf_t[idx] = match(threshold, gt_box_s[idx], priors, variance, gt_label_s[idx])

        pos = conf_t > 0
        num_pos = pos.sum(dim=1, keepdim=True)

        # Вычисляем Localization Loss (Smooth L1)
        # Shape: [batch, num_priors, 4]
        loc_p = loc_data[pos].view(-1, 4)
        loc_t = loc_t   [pos].view(-1, 4)
        loss_l = F.smooth_l1_loss(loc_p, loc_t, reduction='sum')

        # Вычисляем Classification Loss
        loss_c = F.cross_entropy(conf_data.view(-1, num_classes), conf_t.view(-1), reduction='none')
        loss_c = loss_c.view(batch_size, num_priors)

        # Filter out the negative samples and reduce the loss by sum
        loss_c_pos = loss_c[pos].sum()

        # Hard negative mining
        num_neg = torch.clamp(neg_pos_ratio * num_pos, max=pos.size(1) - 1)
        loss_c_neg = loss_c * ~pos
        loss_c_neg, _ = loss_c_neg.sort(1, descending=True)
        neg_mask = torch.zeros_like(loss_c_neg)
        neg_mask[torch.arange(batch_size), num_neg.view(-1)] = 1.
        neg_mask = 1 - neg_mask.cumsum(-1)
        loss_c_neg = (loss_c_neg * neg_mask).sum()

        # Finally we normalize the losses by the number of positives
        N = num_pos.sum()
        loss_l = loss_l / N
        loss_c = (loss_c_pos + loss_c_neg) / N

        return loss_l, loss_c

In [9]:
from torch.optim.lr_scheduler import MultiStepLR
from utils.voc_dataloader     import get_train_dataloader, get_test_dataloader
from utils.prior_boxes        import detect_objects, prior_boxes

from tqdm        import tqdm

import time
import os


def train_process(args, custom_config):
    torch.manual_seed(args.seed)
    np.random.seed(seed=args.seed)

    dataset_root_dir = args.dataset_root_dir
    train_annotation_filename = os.path.join(dataset_root_dir, "ImageSets/Main/trainval.txt")
    test_annotation_filename = os.path.join(dataset_root_dir, "ImageSets/Main/test.txt")
    train_dataloader = get_train_dataloader(dataset_root_dir, train_annotation_filename, args.batch_size, args.num_workers)
    test_dataloader = get_test_dataloader(dataset_root_dir, test_annotation_filename, args.batch_size, args.num_workers)

    learning_rate = args.learning_rate

    if not os.path.exists(args.output):
        os.mkdir(args.output)

    model = SSD_VGG16(custom_config['num_priors'], custom_config['num_classes'])

    prior_box_s = prior_boxes(custom_config)
    prior_box_s_gpu = prior_box_s.cuda()

    overlap_threshold = custom_config['overlap_threshold']
    neg_pos_ratio = custom_config['neg_pos_ratio']
    variance = custom_config['variance']

    criterion = MultiBoxLoss(overlap_threshold, neg_pos_ratio, variance)
    model.cuda()
    criterion.cuda()

    optimizer = torch.optim.SGD(model.parameters(), lr=args.learning_rate, momentum=args.momentum, weight_decay=args.weight_decay)
    scheduler = MultiStepLR(optimizer=optimizer, milestones=args.multistep, gamma=0.2)

    best_loc_loss, best_cls_loss, best_loss = np.inf, np.inf, np.inf
    train_loss_s, eval_loss_s = [], []
    for epoch in tqdm(list(range(args.epochs))):
        # Train model
        train_loc_loss, train_cls_loss, train_loss = 0, 0, 0
        model.train()
        for i, (image_s_cpu, box_ss_cpu, label_ss_cpu) in enumerate(train_dataloader):
            if len(box_ss_cpu) > 0 and len(label_ss_cpu) > 0:
                image_s_gpu = image_s_cpu.cuda()
                label_ss_gpu = [label_s_cpu.cuda() for label_s_cpu in label_ss_cpu]
                box_ss_gpu = [box_s_cpu.cuda() for box_s_cpu in box_ss_cpu]

                pred_loc_ss_gpu, pred_conf_ss_gpu = model(image_s_gpu)

                loc_loss, cls_loss = criterion(
                    (pred_loc_ss_gpu, pred_conf_ss_gpu, prior_box_s_gpu), (label_ss_gpu, box_ss_gpu)
                )

                loss = loc_loss + cls_loss

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                train_loc_loss += loc_loss.item()
                train_cls_loss += cls_loss.item()
                train_loss += loss.item()
        scheduler.step()
        train_loss_s.append(train_loss)

        # Eval model
        eval_loc_loss, eval_cls_loss, eval_loss = 0, 0, 0
        model.eval()

        # Data for mAP
        pred_boxes_all, pred_scores_all, pred_labels_all = [], [], []
        gt_boxes_all, gt_labels_all = [], []

        for i, (image_s_cpu, box_ss_cpu, label_ss_cpu) in enumerate(test_dataloader):
            if len(box_ss_cpu) > 0 and len(label_ss_cpu) > 0:
                image_s_gpu = image_s_cpu.cuda()
                label_ss_gpu = [label_s_cpu.cuda() for label_s_cpu in label_ss_cpu]
                box_ss_gpu = [box_s_cpu.cuda() for box_s_cpu in box_ss_cpu]

                with torch.no_grad():
                    pred_loc_ss_gpu, pred_conf_ss_gpu = model(image_s_gpu)

                # Convert predictions to boxes, scores, and labels
                pred_boxes, pred_labels, pred_scores = detect_objects(
                    pred_loc_ss_gpu.cpu(), pred_conf_ss_gpu.cpu(), prior_box_s,
                    custom_config['num_classes'], 0.5, 0.3
                )

                pred_boxes_all.extend(pred_boxes)
                pred_scores_all.extend(pred_scores)
                pred_labels_all.extend(pred_labels)
                gt_boxes_all.extend(box_ss_cpu)
                gt_labels_all.extend(label_ss_cpu)

                # Calculate loss for evaluation
                loc_loss, cls_loss = criterion(
                    (pred_loc_ss_gpu, pred_conf_ss_gpu, prior_box_s_gpu), (label_ss_gpu, box_ss_gpu)
                )
                loss = loc_loss + cls_loss

                eval_loc_loss += loc_loss.item()
                eval_cls_loss += cls_loss.item()
                eval_loss += loss.item()
        eval_loss_s.append(eval_loss)


        map_score = mAP(custom_config, pred_boxes_all, pred_scores_all, gt_boxes_all,pred_labels, gt_labels_all)


        print(
            f'epoch[{epoch}] | lr {scheduler.get_last_lr()[0]:.5f} | '
            f'loc_loss [{train_loc_loss:.2f}/{eval_loc_loss:.2f}] | '
            f'cls_loss [{train_cls_loss:.2f}/{eval_cls_loss:.2f}] | '
            f'total_loss [{train_loss:.2f}/{eval_loss:.2f}] | mAP {map_score:.4f}'
        )

        if eval_loss < best_loss:
            torch.save(model.state_dict(), os.path.join(args.output, f"{custom_config['model_name']}.pth"))
            best_loc_loss, best_cls_loss, best_loss = eval_loc_loss, eval_cls_loss, eval_loss

    return model, prior_box_s, train_loss_s, eval_loss_s



In [None]:
from argparse import Namespace
import torch
import torch.nn.functional as F
import warnings
warnings.filterwarnings('ignore')

param_s = Namespace(
 dataset_root_dir='dataset',
 epochs = 15, batch_size = 8,
 checkpoint = None, output = 'output',
 multistep = [20, 30, 40],
 learning_rate = 1e-3, momentum = 0.9,
 weight_decay = 0.0005, warmup = None,
 num_workers = 4,
 seed = 42
)

os.makedirs('./models', exist_ok=True)

custom_config = {
 'num_classes'  : 2,
 'feature_maps' : [(45,80), (22,40), (11,20), (6,10), (4,8), (2,6)], #VGG16 - 640x360

 'min_sizes'    : [0.10, 0.20, 0.37, 0.54, 0.71, 1.00],
 'max_sizes'    : [0.20, 0.37, 0.54, 0.71, 1.00, 1.05],

 'aspect_ratios': [[2, 3], [2, 3], [2, 3], [2, 3], [2], [2]],
 'num_priors'   : [6, 6, 6, 6, 4, 4],
 'variance'     : [0.1, 0.2],
 'clip'         :    True,

 'overlap_threshold': 0.5,
 'neg_pos_ratio'    :   3,

 'model_name' : 'vgg16'
}

model, prior_box_s, train_loss_s, eval_loss_s = train_process(param_s, custom_config)

  0%|          | 0/15 [00:00<?, ?it/s]

epoch[0] | lr 0.00100 | loc_loss [388.40/14.53] | cls_loss [1059.02/65.49] | total_loss [1447.42/80.02] | mAP 0.0387


  7%|▋         | 1/15 [02:15<31:35, 135.39s/it]

epoch[1] | lr 0.00100 | loc_loss [225.22/7.46] | cls_loss [785.65/14.69] | total_loss [1010.88/22.15] | mAP 0.0941


 13%|█▎        | 2/15 [04:30<29:18, 135.28s/it]

epoch[2] | lr 0.00100 | loc_loss [159.59/5.64] | cls_loss [396.99/13.11] | total_loss [556.58/18.74] | mAP 0.0958


 20%|██        | 3/15 [06:46<27:04, 135.37s/it]

epoch[3] | lr 0.00100 | loc_loss [132.39/4.73] | cls_loss [282.98/10.87] | total_loss [415.38/15.60] | mAP 0.0988


 27%|██▋       | 4/15 [09:02<24:52, 135.68s/it]

epoch[4] | lr 0.00100 | loc_loss [116.38/4.55] | cls_loss [254.71/10.55] | total_loss [371.09/15.10] | mAP 0.0983


 33%|███▎      | 5/15 [11:18<22:38, 135.85s/it]

epoch[5] | lr 0.00100 | loc_loss [101.58/4.16] | cls_loss [239.76/10.37] | total_loss [341.34/14.54] | mAP 0.1061


 40%|████      | 6/15 [13:35<20:25, 136.16s/it]

epoch[6] | lr 0.00100 | loc_loss [92.81/3.65] | cls_loss [228.21/9.55] | total_loss [321.02/13.20] | mAP 0.1086


 47%|████▋     | 7/15 [15:52<18:12, 136.55s/it]

epoch[7] | lr 0.00100 | loc_loss [85.26/3.41] | cls_loss [218.10/9.09] | total_loss [303.35/12.50] | mAP 0.1040


 53%|█████▎    | 8/15 [18:08<15:53, 136.26s/it]

epoch[8] | lr 0.00100 | loc_loss [78.46/2.99] | cls_loss [208.40/8.50] | total_loss [286.87/11.49] | mAP 0.1059


 67%|██████▋   | 10/15 [22:40<11:20, 136.20s/it]

epoch[9] | lr 0.00100 | loc_loss [74.55/3.01] | cls_loss [200.19/9.37] | total_loss [274.74/12.39] | mAP 0.1149
epoch[10] | lr 0.00100 | loc_loss [69.83/2.69] | cls_loss [194.93/8.36] | total_loss [264.76/11.05] | mAP 0.1152


 80%|████████  | 12/15 [27:12<06:48, 136.17s/it]

epoch[11] | lr 0.00100 | loc_loss [66.49/2.75] | cls_loss [188.38/8.59] | total_loss [254.87/11.35] | mAP 0.1170


 87%|████████▋ | 13/15 [29:28<04:32, 136.13s/it]

epoch[12] | lr 0.00100 | loc_loss [64.49/2.64] | cls_loss [188.05/8.51] | total_loss [252.54/11.15] | mAP 0.1118
epoch[13] | lr 0.00100 | loc_loss [61.77/2.55] | cls_loss [186.27/8.37] | total_loss [248.04/10.91] | mAP 0.1111


 93%|█████████▎| 14/15 [31:46<02:16, 136.67s/it]

epoch[14] | lr 0.00100 | loc_loss [59.63/2.62] | cls_loss [179.64/7.95] | total_loss [239.27/10.57] | mAP 0.1006


100%|██████████| 15/15 [34:02<00:00, 136.17s/it]


## Разработать модель на основе ResNet18

In [10]:
from torchvision import models
from torchvision.models import ResNet18_Weights
from torch import nn


class BaseBlock(nn.Module):
    def __init__(self, in_channels, out_channels,  operator=None, strd=1):
        super(BaseBlock, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.operator = operator
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=strd, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=strd, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)


    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.operator is not None:
            identity = self.operator(x)

        out += identity
        out = self.relu(out)

        return out

class ExtraBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(ExtraBlock, self).__init__()
        self.out_channels = in_channels
        self.conv1  = nn.Conv2d( in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.relu1  = nn.ReLU(inplace=True)
        self.conv2  = nn.Conv2d( out_channels, in_channels, kernel_size=3, stride=2, padding=1)
        self.relu2  = nn.ReLU(inplace=True)
    def forward(self, x):
        out = self.conv1(x)
        out = self.relu1(out)
        out = self.conv2(out)
        out = self.relu2(out)
        return out


class resnet18(nn.Module):
    def __init__(self, num_bboxes_s, num_labels = 3):
        super().__init__()

        self.num_bboxes_s = num_bboxes_s
        self.num_labels   = num_labels

        self.used_layer_id_s       = [7, 8, 9, 10, 11, 12] #

        base_layers       = self._build_base_layers ()
        extra_layers      = self._build_extra_layers()
        self.total_layers = base_layers + extra_layers

        self.conf_layers, self.loc_layers = self._build_conf_loc_layers()

    def _build_base_layers(self):
        backbone_model    = models.resnet18(weights=ResNet18_Weights.DEFAULT)  #False

        base_layers = nn.ModuleList(list(backbone_model.children())[:-3])
        base_block_1 = BaseBlock(in_channels=256, out_channels=128, operator = nn.Sequential(nn.Conv2d(256, 128, kernel_size=(1,1), stride=(1,1), bias=False), nn.BatchNorm2d(128)))
        base_block_2 = BaseBlock(in_channels=128, out_channels=128)

        base_layers.extend( [base_block_1, base_block_2] )
        return base_layers

    def _build_extra_layers(self):
        extra_layers = []

        extra_block1 = ExtraBlock(128, 64)
        extra_block2 = ExtraBlock(128, 64)
        extra_block3 = ExtraBlock(128, 64)
        extra_block4 = ExtraBlock(128, 64)
        extra_block5 = ExtraBlock(128, 64)
        extra_layers = nn.ModuleList( [extra_block1,
                                       extra_block2,
                                       extra_block3,
                                       extra_block4,
                                       extra_block5, ] )
        return extra_layers

    def _build_conf_loc_layers(self):
        out_channels_s = [ self.total_layers[i].out_channels for i in self.used_layer_id_s ]

        conf_layers, loc_layers = [], []
        for i, j in enumerate(self.used_layer_id_s):
            conf_layer = nn.Conv2d( self.total_layers[j].out_channels, self.num_bboxes_s[i] * self.num_labels, kernel_size=3, padding=1)
            loc_layer  = nn.Conv2d( self.total_layers[j].out_channels, self.num_bboxes_s[i] * 4              , kernel_size=3, padding=1)

            conf_layers += [conf_layer]
            loc_layers  += [loc_layer ]

        conf_layers = nn.ModuleList(conf_layers)
        loc_layers  = nn.ModuleList(loc_layers )

        return conf_layers, loc_layers

    def forward(self, x, verbose=False):
        source_s, loc_s, conf_s = [], [], []
        for i, current_layer in enumerate(self.total_layers, -1):
            x = current_layer(x)
            if i in self.used_layer_id_s:
                s = x
                source_s.append(s)
        for s, l, c in zip(source_s, self.loc_layers, self.conf_layers):
            conf_s.append(c(s).permute(0, 2, 3, 1).contiguous())
            loc_s .append(l(s).permute(0, 2, 3, 1).contiguous())
        conf_s = torch.cat([o.view(o.size(0), -1) for o in conf_s], 1)
        loc_s  = torch.cat([o.view(o.size(0), -1) for o in loc_s ], 1)

        conf_s = conf_s.view(conf_s.size(0), -1, self.num_labels)
        loc_s  = loc_s .view(loc_s .size(0), -1, 4              )
        return loc_s, conf_s


## Натренировать модель на основе ResNet18

In [11]:
def train_resnet(args, custom_config):
    torch.manual_seed(args.seed)
    np.random.seed(seed=args.seed)

    dataset_root_dir = args.dataset_root_dir
    train_annotation_filename = os.path.join(dataset_root_dir, "ImageSets/Main/trainval.txt")
    test_annotation_filename = os.path.join(dataset_root_dir, "ImageSets/Main/test.txt")
    train_dataloader = get_train_dataloader(dataset_root_dir, train_annotation_filename, args.batch_size, args.num_workers)
    test_dataloader = get_test_dataloader(dataset_root_dir, test_annotation_filename, args.batch_size, args.num_workers)

    learning_rate = args.learning_rate

    if not os.path.exists(args.output):
        os.mkdir(args.output)

    model = resnet18(custom_config['num_priors'], custom_config['num_classes'])

    prior_box_s = prior_boxes(custom_config)
    prior_box_s_gpu = prior_box_s.cuda()

    overlap_threshold = custom_config['overlap_threshold']
    neg_pos_ratio = custom_config['neg_pos_ratio']
    variance = custom_config['variance']

    criterion = MultiBoxLoss(overlap_threshold, neg_pos_ratio, variance)
    model.cuda()
    criterion.cuda()

    optimizer = torch.optim.SGD(model.parameters(), lr=args.learning_rate, momentum=args.momentum, weight_decay=args.weight_decay)
    scheduler = MultiStepLR(optimizer=optimizer, milestones=args.multistep, gamma=0.2)

    best_loc_loss, best_cls_loss, best_loss = np.inf, np.inf, np.inf
    train_loss_s, eval_loss_s = [], []
    for epoch in tqdm(list(range(args.epochs))):
        # Train model
        train_loc_loss, train_cls_loss, train_loss = 0, 0, 0
        model.train()
        for i, (image_s_cpu, box_ss_cpu, label_ss_cpu) in enumerate(train_dataloader):
            if len(box_ss_cpu) > 0 and len(label_ss_cpu) > 0:
                image_s_gpu = image_s_cpu.cuda()
                label_ss_gpu = [label_s_cpu.cuda() for label_s_cpu in label_ss_cpu]
                box_ss_gpu = [box_s_cpu.cuda() for box_s_cpu in box_ss_cpu]

                pred_loc_ss_gpu, pred_conf_ss_gpu = model(image_s_gpu)

                loc_loss, cls_loss = criterion(
                    (pred_loc_ss_gpu, pred_conf_ss_gpu, prior_box_s_gpu), (label_ss_gpu, box_ss_gpu)
                )

                loss = loc_loss + cls_loss

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                train_loc_loss += loc_loss.item()
                train_cls_loss += cls_loss.item()
                train_loss += loss.item()
        scheduler.step()
        train_loss_s.append(train_loss)

        # Eval model
        eval_loc_loss, eval_cls_loss, eval_loss = 0, 0, 0
        model.eval()

        # Data for mAP
        pred_boxes_all, pred_scores_all, pred_labels_all = [], [], []
        gt_boxes_all, gt_labels_all = [], []

        for i, (image_s_cpu, box_ss_cpu, label_ss_cpu) in enumerate(test_dataloader):
            if len(box_ss_cpu) > 0 and len(label_ss_cpu) > 0:
                image_s_gpu = image_s_cpu.cuda()
                label_ss_gpu = [label_s_cpu.cuda() for label_s_cpu in label_ss_cpu]
                box_ss_gpu = [box_s_cpu.cuda() for box_s_cpu in box_ss_cpu]

                with torch.no_grad():
                    pred_loc_ss_gpu, pred_conf_ss_gpu = model(image_s_gpu)

                # Convert predictions to boxes, scores, and labels
                pred_boxes, pred_labels, pred_scores = detect_objects(
                    pred_loc_ss_gpu.cpu(), pred_conf_ss_gpu.cpu(), prior_box_s,
                    custom_config['num_classes'], 0.5, 0.3
                )

                pred_boxes_all.extend(pred_boxes)
                pred_scores_all.extend(pred_scores)
                pred_labels_all.extend(pred_labels)
                gt_boxes_all.extend(box_ss_cpu)
                gt_labels_all.extend(label_ss_cpu)

                # Calculate loss for evaluation
                loc_loss, cls_loss = criterion(
                    (pred_loc_ss_gpu, pred_conf_ss_gpu, prior_box_s_gpu), (label_ss_gpu, box_ss_gpu)
                )
                loss = loc_loss + cls_loss

                eval_loc_loss += loc_loss.item()
                eval_cls_loss += cls_loss.item()
                eval_loss += loss.item()
        eval_loss_s.append(eval_loss)


        map_score = mAP(custom_config, pred_boxes_all, pred_scores_all, gt_boxes_all,pred_labels, gt_labels_all)


        print(
            f'epoch[{epoch}] | lr {scheduler.get_last_lr()[0]:.5f} | '
            f'loc_loss [{train_loc_loss:.2f}/{eval_loc_loss:.2f}] | '
            f'cls_loss [{train_cls_loss:.2f}/{eval_cls_loss:.2f}] | '
            f'total_loss [{train_loss:.2f}/{eval_loss:.2f}] | mAP {map_score:.4f}'
        )

        if eval_loss < best_loss:
            torch.save(model.state_dict(), os.path.join(args.output, f"{custom_config['model_name']}.pth"))
            best_loc_loss, best_cls_loss, best_loss = eval_loc_loss, eval_cls_loss, eval_loss

    return model, prior_box_s, train_loss_s, eval_loss_s


In [13]:
from argparse import Namespace
import torch
import torch.nn.functional as F
import warnings
warnings.filterwarnings('ignore')
import numpy as np

param_s = Namespace(
 dataset_root_dir='dataset',
 epochs = 15, batch_size = 8,
 checkpoint = None, output = 'output',
 multistep = [20, 30, 40],
 learning_rate = 1e-3, momentum = 0.9,
 weight_decay = 0.0005, warmup = None,
 num_workers = 4,
 seed = 42
)

os.makedirs('./models', exist_ok=True)

custom_config = {
 'num_classes'  : 3,
 'feature_maps' : [(23, 40), (12, 20), (6, 10), (3, 5), (2, 3), (1, 2)], #640x360
 'min_sizes'    : [0.10, 0.20, 0.3, 0.4, 0.7, 1.00],
 'max_sizes'    : [0.20, 0.3, 0.4, 0.7, 1.00, 1.1],

 'aspect_ratios': [[2, 3], [2, 3], [2, 3], [2, 3], [2, 3], [2, 3]],
 'num_priors'   : [6, 6, 6, 6, 6, 6],
 'variance'     : [0.1, 0.2],
 'clip'         :    True,

 'overlap_threshold': 0.5,
 'neg_pos_ratio'    :   3,

 'model_name' : 'resnet18'
}

model, prior_box_s, train_loss_s, eval_loss_s = train_resnet(param_s, custom_config)

  7%|▋         | 1/15 [02:35<36:19, 155.70s/it]

epoch[0] | lr 0.00100 | loc_loss [399.69/11.99] | cls_loss [748.19/25.61] | total_loss [1147.88/37.60] | mAP 0.1039


 13%|█▎        | 2/15 [04:35<29:09, 134.54s/it]

epoch[1] | lr 0.00100 | loc_loss [188.16/6.28] | cls_loss [438.25/14.68] | total_loss [626.41/20.97] | mAP 0.3089


 20%|██        | 3/15 [06:30<25:09, 125.82s/it]

epoch[2] | lr 0.00100 | loc_loss [123.31/4.31] | cls_loss [292.08/12.15] | total_loss [415.39/16.47] | mAP 0.2850


 27%|██▋       | 4/15 [08:08<21:02, 114.76s/it]

epoch[3] | lr 0.00100 | loc_loss [98.13/3.80] | cls_loss [268.98/11.47] | total_loss [367.10/15.27] | mAP 0.2795


 33%|███▎      | 5/15 [09:37<17:35, 105.57s/it]

epoch[4] | lr 0.00100 | loc_loss [83.44/3.32] | cls_loss [245.11/11.22] | total_loss [328.55/14.54] | mAP 0.2423


 40%|████      | 6/15 [11:00<14:40, 97.80s/it] 

epoch[5] | lr 0.00100 | loc_loss [71.21/3.15] | cls_loss [229.85/11.32] | total_loss [301.06/14.46] | mAP 0.2821


 47%|████▋     | 7/15 [12:18<12:08, 91.11s/it]

epoch[6] | lr 0.00100 | loc_loss [65.54/3.04] | cls_loss [218.11/10.47] | total_loss [283.65/13.51] | mAP 0.2516


 53%|█████▎    | 8/15 [13:29<09:53, 84.79s/it]

epoch[7] | lr 0.00100 | loc_loss [61.17/2.65] | cls_loss [208.14/10.16] | total_loss [269.31/12.80] | mAP 0.2138


 60%|██████    | 9/15 [14:44<08:10, 81.81s/it]

epoch[8] | lr 0.00100 | loc_loss [57.26/3.08] | cls_loss [200.80/10.25] | total_loss [258.06/13.33] | mAP 0.2536


 67%|██████▋   | 10/15 [15:54<06:30, 78.17s/it]

epoch[9] | lr 0.00100 | loc_loss [53.80/2.71] | cls_loss [194.18/9.64] | total_loss [247.98/12.35] | mAP 0.2009


 73%|███████▎  | 11/15 [17:02<04:59, 74.90s/it]

epoch[10] | lr 0.00100 | loc_loss [51.85/2.69] | cls_loss [185.76/9.62] | total_loss [237.60/12.30] | mAP 0.2352


 80%|████████  | 12/15 [18:04<03:33, 71.21s/it]

epoch[11] | lr 0.00100 | loc_loss [48.32/2.45] | cls_loss [181.20/9.72] | total_loss [229.52/12.17] | mAP 0.2578


 87%|████████▋ | 13/15 [19:10<02:19, 69.64s/it]

epoch[12] | lr 0.00100 | loc_loss [46.73/2.67] | cls_loss [178.28/10.12] | total_loss [225.01/12.80] | mAP 0.2430


 93%|█████████▎| 14/15 [20:04<01:04, 64.89s/it]

epoch[13] | lr 0.00100 | loc_loss [44.19/2.52] | cls_loss [170.36/9.63] | total_loss [214.55/12.15] | mAP 0.2254


100%|██████████| 15/15 [20:57<00:00, 83.86s/it]

epoch[14] | lr 0.00100 | loc_loss [42.51/2.39] | cls_loss [165.08/9.42] | total_loss [207.60/11.81] | mAP 0.2600





Видим значительное улучшение метрики МАП

## (БОНУС) Добавить разнообразные аугментации изображений в классе Dataset. Провести эксперименты и продемонстрировать метрику mAP.