# _Modules. Only for colab running_

In [20]:
from os.path import join
import pathlib

# debug
DEBUG = False

# enviroment
DATA_PATH = join(pathlib.Path().resolve(), 'data')

# constants
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# data statistics
MOT20_EXT_FIRST_AXIS_MEAN = 139
MOT20_EXT_SECOND_AXIS_MEAN = 62
MOT20_EXT_MEAN = None
MOT20_EXT_STD = None


In [21]:
"""
MOT20_ext data format:
train:
|
|- <video_id> // директория, содержащая объекты, вырезанные из соответствующего видео
    |
    |- det
    |   |
    |   |- det.txt // файл описания детекций, хранящий строки в формате <frame number>, <object id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>, <confidence>, <x>, <y>, <z>
    |
    |- gt
    |   |
    |   |- gt.txt // файл описания ground truth, хранящий строки в формате <frame number>, <object id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>, <is_consider>, <class>, <visibility>
    |
    |- <object_id> // директория, содержащая изображения вырезанных объектов и файл описания
        |
        |a
        .
        .
        .
        |- <frame_id>.jpg - вырезанная из кадра frame_id область с объектом object_id
"""

from os import listdir, mkdir
from os.path import exists, isdir, join

import pandas as pd
from PIL import Image
from tqdm import tqdm

DET_COLUMNS = ['frame', 'id', 'bb_left', 'bb_top',
               'bb_width', 'bb_height', 'confidence', 'x', 'y', 'z']
DET_TYPES = {
    'frame': int,
    'id': int,
    'bb_left': int,
    'bb_top': int,
    'bb_width': int,
    'bb_height': int,
    'confidence': int,
    'x': int,
    'y': int,
    'z': int
}

GT_COLUMNS = ['frame', 'id', 'bb_left', 'bb_top',
              'bb_width', 'bb_height', 'is_consider', 'class', 'visibility']

GT_TYPES = {
    'frame': int,
    'id': int,
    'bb_left': int,
    'bb_top': int,
    'bb_width': int,
    'bb_height': int,
    'is_consider': int,
    'class': int,
    'visibility': float
}


def __get_file_name_by_id(frame_id: int) -> str:
    return f'{str.zfill(str(frame_id), 6)}.jpg'


def __extract_objects_from_frame(video_path: str, frame: str, data: pd.DataFrame) -> None:
    """Извлекает все объекты из кадра. Сохраняет список вырезанных объектов в соответствующие директории
    ### Parameters:
    - video_path: str - путь до директории с текущим видео
    - frame: str - путь до изображения, из которого вырезаются объекты
    - data: pandas.DataFrame - датафрейм с данными объектов на данном изображении
    """
    current_image = Image.open(frame)
    objects_to_crop = data.to_dict('records')
    for obj in objects_to_crop:
        x, y, w, h = obj['bb_left'], obj['bb_top'], obj['bb_width'], obj['bb_height']
        img = current_image.crop(box=(x, y, x + w, y + h))
        img.save(join(video_path, str(
            int(obj['id'])), __get_file_name_by_id(obj['frame'])))


def __extract_video(mot20_video_path: str, mot20_video_ext_path: str) -> None:
    """Извлекает все объекты из видео
    ### Parameters: 
    - mot20_video_path: str - путь до директории с видео в датасете MOT20
    - mot20_video_ext_path: str - путь до директории с видео в датасете MOT20_ext
    """
    detections = get_dataframe(mot20_video_path, file_type='det')
    ground_truth = get_dataframe(mot20_video_path, file_type='gt')
    # выбираем объекты, которые стоит рассматривать и которые относятся к классу пешеходов или стоящих людей
    persons = ground_truth[((ground_truth['class'] == 1) | (
        ground_truth['class'] == 7)) & ground_truth['is_consider'] == 1]
    # для каждого объекта создаем директорию
    for id in persons['id'].unique():
        mkdir(join(mot20_video_ext_path, str(id)))
    # проходим по всем кадрам видео
    for frame in tqdm(persons['frame'].unique()):
        __extract_objects_from_frame(
            mot20_video_ext_path,
            join(
                mot20_video_path, 'img1', f'{str.zfill(str(frame), 6)}.jpg'),
            persons[persons['frame'] == frame]
        )
        # save_objects(mot20_video_ext_path, objects)


def get_dataframe(video_path: str, file_type: str = 'det') -> pd.DataFrame:
    """
    Возвращает датафрейм с обнаружениями или ground truth для указанного видео
    ### Parameters
    - video_path: str - путь до директории с видео
    - file_type: str - det для файла с обнаружениями, gt для ground truth
    """
    df = None

    if (file_type == 'det'):
        df = pd.read_csv(
            join(video_path, 'det', 'det.txt'),
            names=DET_COLUMNS,
            dtype=DET_TYPES
        )
    else:
        df = pd.read_csv(
            join(video_path, 'gt', 'gt.txt'),
            names=GT_COLUMNS,
            dtype=GT_TYPES
        )

    return df


def run(data_path: str) -> None:
    """
    Выполняет преобразование датасета MOT20
    ### Parameters
    - data_path: str - путь до директории с датасетами
    """
    mot20_ext_path = join(data_path, 'MOT20_ext')
    mot20_path = join(data_path, 'MOT20')
    # создание директорий
    if (not (exists(mot20_ext_path) and isdir(mot20_ext_path))):
        # создаем основную директорию
        mkdir(mot20_ext_path)
        # мы используем данные для трейна, так как мы не будем обучаться на тесте
        mkdir(join(mot20_ext_path, 'train'))
        # проходим по всем видео в исходной
        for video_id in listdir(join(mot20_path, 'train')):
            # сохраняем пути до видео в исходной и в новой директориях
            current_path = join(
                join(data_path, 'MOT20'), 'train', video_id)
            current_path_ext = join(mot20_ext_path, 'train', video_id)
            mkdir(current_path_ext)
            __extract_video(current_path, current_path_ext)


In [22]:
import numpy as np
import functools


def __aggregate_to_continious(x: int | list[list[int]], y: int):
    """Аггрегирующая функция для разделения списка чисел на непрерывные отрезки"""
    if (not type(x) == list):
        if (y - x == 1):
            return [[x, y]]
        else:
            return [[x], [y]]
    else:
        last = x[-1][-1]
        if (y - last > 1):
            x.append([y])
        else:
            x[-1].append(y)
    return x


def __get_possible_tuples_count_segment(distance: int, segment: list[int]) -> int:
    """Рассчитывает количество возможных пар для отрезка"""
    return len(segment) - distance - 1


def __get_neighbours_tuples_count(distance: int, segments: list[list[int]]) -> int:
    """Рассчитывает количество возможных пар из граничных элементов"""
    sum = 0
    prev = None
    for s in segments:
        if (prev == None):
            prev = s
            continue
        if (s[0] - prev[-1] - 1 == distance):
            sum += 1
        prev = s

    return sum


def get_possible_tuples_count(distance: int, segments: list[list[int]]) -> int:
    """Рассчитывает количество возможных пар для списка отрезков"""
    sum = 0
    for s in segments:
        sum += max(0, __get_possible_tuples_count_segment(distance, s))

    sum += __get_neighbours_tuples_count(distance, segments)
    return sum


def split_to_continuous_segments(array_numbers: list[int]) -> list[list[int]]:
    """Возвращает список непрерывных отрезков чисел"""
    if (len(array_numbers) == 0):
        return [[]]
    elif (len(array_numbers) == 1):
        return [array_numbers]
    else:
        return functools.reduce(__aggregate_to_continious, sorted(array_numbers))


def __get_possible_tuples(distance: int, segment: list[int]) -> list[tuple[int, int]]:
    end = max(len(segment) - distance - 1, 0)
    return [(i, i + distance + 1) for i in range(segment[0], segment[end])]


def get_possible_tuples(distance: int, segments: list[list[int]]) -> list[tuple[int, int]]:
    """Возвращает список возможных пар чисел с заданным расстоянием для отрезка
    ### Parameters: 
    - distance: int - расстояние между элементами
    - segments: list[list[int]] - список непрерывных отрезков
    """
    res = []
    prev = None
    for segment in segments:
        tuples = __get_possible_tuples(distance, segment)
        if (prev is not None and segment[0] - prev[-1] - 1 == distance):
            res.append((prev[-1], segment[0]))

        prev = segment
        res += tuples

    return res


In [23]:
"""
Содержит классы для загрузки данных MOT20_ext, преобразованных из MOT20 dataset
MOT20_ext data format:
train:
|
|- <video_id> // директория, содержащая объекты, вырезанные из соответствующего видео
    |
    |- det
    |   |
    |   |- det.txt // файл описания детекций, хранящий строки в формате <frame number>, <object id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>, <confidence>, <x>, <y>, <z>
    |
    |- gt
    |   |
    |   |- gt.txt // файл описания ground truth, хранящий строки в формате <frame number>, <object id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>, <is_consider>, <class>, <visibility>
    |
    |- <object_id> // директория, содержащая изображения вырезанных объектов и файл описания
        |
        |
        .
        .
        .
        |- <frame_id>.jpg - вырезанная из кадра frame_id область с объектом object_id
"""

from os.path import join

import cv2
from numpy.random import choice
from torch.utils.data import Dataset as Dataset
from tqdm import tqdm as tqdm


class MOT20ExtDataset(Dataset):
    """
    Создает объект типа Dataset, загружающий данные преобразованного датасета MOT20_ext
    Возвращает пары изображений и метку: 1, если на изображении один и тот же объект, иначе 0 
    """

    def __init__(
        self,
            video_path: str,
            transform=None,
            visibility_threshold: float = 1,
            frame_distance: int | list[int] | tuple[int, int] = 0,
            negative_proportion: float = 0.5
    ) -> None:
        """
        Создает объект типа Dataset, загружающий данные преобразованного датасета MOT20_ext.
        Возвращает пары изображений и метку: 0, если на изображении один и тот же объект, иначе 1 
        ### Parameters:
        - video_path: str - путь до директории с видео датасета МОТ20_ехт. Ожидается, что в директории находятся файлы описаний и ground truth
        - transform - применяемые аугментации
        - visibility_threshold: float - порог видимости (поле visibility) объекта, используемого в обучении
        - frame_distance: int | list[int] | tuple[int, int] - допустимое расстояние между кадрами, объекты из которых используются в обучении. Если переданы два числа в виде начального и конечного значений - конечное включается 
        - negative_proportion: float - доля объектов, значение метки для которых 0
        """
        super(MOT20ExtDataset).__init__()
        self.video_path = video_path
        self.visibility_threshold = visibility_threshold
        self._check_distance_correct(frame_distance)
        self.frame_distance = frame_distance
        self.detections = get_dataframe(video_path, file_type='det')
        df = get_dataframe(video_path, file_type='gt')
        # берем объекты которые стоит учитывать при обучении
        df = df[df['is_consider'] == 1]
        # выбираем с видимостью выше заданной
        df = df[df['visibility'] >= visibility_threshold]
        self.ground_truth = df
        # формируем словарь, используемый для длины и индексации
        self._objetcs_pairs_dict = self._get_pairs_dict()
        # рассчитываем количество объектов с меткой 1
        self._len_1 = self._calc_len()
        # рассчитываем длину всего датасета
        self._len = round(self._len_1 / (1 - negative_proportion))
        self.transform = transform

    def _check_distance_correct(self, distance: int) -> None:
        """Проверяет корректность типов и значений для расстояния"""
        if (type(distance) == int):
            if (distance < 0):
                raise ValueError(
                    'Distance between frames must be non negative integer')
        elif (type(distance) == list):
            for d in distance:
                if (not type(d) == int):
                    raise TypeError(
                        'Each distance value must be non negative integer')
                if (distance < 0):
                    raise ValueError(
                        'Distance between frames must be non negative integer')
        elif (type(distance) == tuple):
            start, end = distance
            if (not (type(start) == int and type(end) == int)):
                raise TypeError(
                    'Each distance value must be non negative integer')
            if (start < 1):
                raise ValueError(
                    'Start index of distance must be non negative')
            if (end < start):
                raise ValueError(
                    'End index of distance must be bigger then start')
        else:
            raise TypeError(
                'Distance argument must be integre or list of integres or tuple of two integres')

    def _get_pairs_dict(self) -> dict[int, int | dict[int, int]]:
        """Возвращает словарь, содержащий количество возможных пар для каждого объекта. В случае нескольких d набор пар представлен как словари"""
        objects_lens = {}
        for object_id in sorted(self.ground_truth['id'].unique()):
            object_frames = sorted(
                self.ground_truth[self.ground_truth['id'] == object_id]['frame'].values)
            segments = split_to_continuous_segments(object_frames)
            if (type(self.frame_distance) == int):
                objects_lens[object_id] = get_possible_tuples_count(
                    self.frame_distance, segments)
            elif (type(self.frame_distance) == list):
                objects_lens[object_id] = {}
                for d in self.frame_distance:
                    objects_lens[object_id][d] = get_possible_tuples_count(
                        d, segments)
            else:
                start_d, end_d = self.frame_distance
                objects_lens[object_id] = {}
                for d in range(start_d, end_d + 1):
                    objects_lens[object_id][d] = get_possible_tuples_count(
                        d, segments)

        return objects_lens

    def _calc_len(self) -> int:
        """Рассчитывает длину датасета"""
        count = 0
        for v in self._objetcs_pairs_dict.values():
            if (type(v) == int):
                count += v
            else:
                count += sum(v.values())

        return count

    def _get_pairs_by_idx(self, idx: int) -> tuple[int, int, int, int]:
        """Возвращает пару объектов класса 1, распологающуюся в датасете по индексу idx,
        в формате tuple - (id первого объекта, id второго объекта, frame_id первого объекта, frame_id второго объекта)"""
        previous_pairs_count = 0
        object_id = -1
        object_pairs = None
        # поиск по парам, объекты на которых совпадают
        # поиск по всем парам всех объектов со всеми возможными дистанциями
        for id, pairs in self._objetcs_pairs_dict.items():
            # считаем, сколько пар есть у данного объекта
            if (type(pairs) == int):
                previous_pairs_count += pairs
            else:
                previous_pairs_count = sum(pairs.values())

            # если индекс больше, чем пар у текущего объекта - берем следующий
            if (idx >= previous_pairs_count):
                continue
            else:
                #  ищем среди пар данного объекта
                iidx = idx - previous_pairs_count
                object_frames = sorted(
                    self.ground_truth[self.ground_truth['id'] == id]['frame'].values)
                segments = split_to_continuous_segments(object_frames)
                object_id = id
                if (type(pairs) == int):
                    object_pairs = get_possible_tuples(
                        self.frame_distance, segments)[iidx]
                    return (object_id, object_id, *object_pairs)
                else:
                    # ищем необходимую дистанцию
                    current_previous_pairs_count = 0
                    for d, d_pairs_len in pairs:
                        current_previous_pairs_count += d_pairs_len
                        if (iidx >= current_previous_pairs_count):
                            continue
                        else:
                            # ищем для пар с данной дистанцией
                            iiidx = iidx - current_previous_pairs_count
                            object_pairs = get_possible_tuples(d, segments)[
                                iiidx]
                            return (object_id, object_id, *object_pairs)

    def _get_pairs0_by_idx(self, idx: int) -> tuple[int, int, int, int]:
        """Возвращает пару объектов класса 0, распологающуюся в датасете по индексу idx,
        в формате tuple - (id первого объекта, id второго объекта, frame_id первого объекта, frame_id второго объекта).
        Объект берется рандомно
        """
        object_ids = self.ground_truth['id'].unique()
        id1 = choice(object_ids)
        frame_ids = self.ground_truth[self.ground_truth['id']
                                      == id1]['frame']
        frame1 = choice(frame_ids)
        id2 = choice(object_ids)
        frame_ids = self.ground_truth[self.ground_truth['id']
                                      == id2]['frame']
        frame2 = choice(frame_ids)

        return (id1, id2, frame1, frame2)

    def __len__(self) -> int:
        return self._len

    def __getitem__(self, idx: int) -> tuple[cv2.Mat, cv2.Mat, int]:
        """Возвращает два изображения в формате cv2.Mat и метку: 1, если на изображении один и тот же объект, иначе 0
        Нумерация начинается с 0. Объекты в датасете хранятся в порядке:
        - пары <объект;объект>
            - пары <объект;объект> отсортированы по возрастанию id
            - пары для одного объекта отсортированы по возрастанию distance, затем frame_id первого кадра пары
        - пары <объект;другой_объект>
            - рандомный объект
        """
        id1, id2, frame_id1, frame_id2 = self._get_pairs_by_idx(
            idx) if (idx < self._len_1) else self._get_pairs0_by_idx(idx)
        img1 = cv2.imread(join(self.video_path, str(
            id1), f'{str(frame_id1).zfill(6)}.jpg'))
        img2 = cv2.imread(join(self.video_path, str(
            id2), f'{str(frame_id2).zfill(6)}.jpg'))
        if (self.transform):
            img1 = self.transform(image=img1)['image']
            img2 = self.transform(image=img2)['image']

        return (img1, img2, 0 if (id1 == id2) else 1)


In [24]:
import albumentations as A
import torch
from albumentations.pytorch import ToTensorV2


def get_resize_transform(size: tuple[int, int]) -> A.Sequential:
    """Возвращает преобразование изменения размера"""
    return A.Sequential([
        A.Resize(*size),
    ])


def get_norm_transform(mean: list[int] = IMAGENET_MEAN, std: list[int] = IMAGENET_STD) -> A.Sequential:
    """Возвращает преобразование нормализации и приведения к тензору"""
    return A.Sequential([
        A.Normalize(mean=mean, std=std),
        ToTensorV2()
    ])


In [25]:
from copy import deepcopy
from datetime import datetime
from os.path import join

import albumentations as A
import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F
from torch import Generator, nn
from torch.optim import Adam, Optimizer
from torch.utils.data import ConcatDataset, DataLoader, random_split
from torchvision import models
from tqdm import tqdm

# Обучение сиамской сети для различия двух объектов

Обучение на данных MOT20Ext

## Загрузка данных

In [26]:
resize_transform = get_resize_transform((MOT20_EXT_FIRST_AXIS_MEAN, MOT20_EXT_SECOND_AXIS_MEAN)) 
norm_transform = get_norm_transform()
transform = A.Compose([resize_transform, norm_transform])

In [27]:
dataset01 = MOT20ExtDataset(join(DATA_PATH, 'MOT20_ext/train/MOT20-01/'), transform=transform)
dataset02 = MOT20ExtDataset(join(DATA_PATH, 'MOT20_ext/train/MOT20-02/'), transform=transform)
dataset03 = MOT20ExtDataset(join(DATA_PATH, 'MOT20_ext/train/MOT20-03/'), transform=transform)
dataset05 = MOT20ExtDataset(join(DATA_PATH, 'MOT20_ext/train/MOT20-05/'), transform=transform)

In [28]:
dataset = ConcatDataset([dataset01, dataset02, dataset03, dataset05])

In [29]:
len(dataset)

271442

### Создание даталоадеров

In [30]:
TEST_PROPROTION = 0.2
VAL_PROPORTION = 0.15
TRAIN_PROPORTION = 1 - TEST_PROPROTION - VAL_PROPORTION
sum([TEST_PROPROTION, VAL_PROPORTION, TRAIN_PROPORTION])

1.0

In [31]:
generator = torch.manual_seed(0)
dataset_use, dataset_unuse = random_split(dataset, [0.002, 0.998])
len(dataset_use)

543

In [32]:
train_set, val_set, test_set = random_split(
    dataset_use, [TRAIN_PROPORTION, VAL_PROPORTION, TEST_PROPROTION], generator=generator)

In [33]:
BATCH_SIZE = 8

In [34]:
train_loader = DataLoader(
    train_set,
    shuffle=True,
    batch_size=BATCH_SIZE,
    drop_last=True,
    generator=generator,
    
)

val_loader = DataLoader(
    val_set,
    shuffle=True,
    batch_size=BATCH_SIZE,
    drop_last=True,
    generator=generator
)

test_loader = DataLoader(
    test_set,
    shuffle=True,
    batch_size=BATCH_SIZE,
    drop_last=True,
    generator=generator
)

In [35]:
len(train_loader), len(val_loader), len(test_loader)

(44, 10, 13)

### Расчет статистик

In [36]:
loader = DataLoader(
    dataset,
    shuffle=True,
    batch_size=BATCH_SIZE,
    drop_last=True,
    generator=generator,
)

In [37]:
# get_statistics(loader)

### Проверка отображения

In [38]:
# batch = next(iter(train_loader))
# x1, x2, y = batch[0][0], batch[1][0], batch[2][0]
# display_images((x1, x2), y)

In [39]:
# batch = next(iter(train_loader))
# display_batch(batch)

## Обучение

### DEBUG - Delete after

In [40]:
resnet = models.resnet18(pretrained=True)
resnet



ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [41]:
class SiameseBasicCNN(nn.Module):
    """Простейшая сиамская сверточная нейронная сеть"""

    def __init__(self) -> None:
        super(SiameseBasicCNN, self).__init__()
        self.resnet = resnet
        # разморозим последний слой
        for x in resnet.parameters():
            x.requires_grad = False
        for x in resnet.fc.parameters():
            x.requires_grad = True

    def forward(self, x1, x2):
        output1 = self.resnet(x1)
        output2 = self.resnet(x2)

        return F.pairwise_distance(
            output1, output2, keepdim=True)

class ContrastiveLoss(nn.Module):
    """Функция потерь для двух объектов. Вычисляет евклидово расстояние между объектами"""

    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

# TODO: поменять
    def forward(self, x, y):
        loss_contrastive = torch.mean(
            (1-y) * torch.pow(x, 2) +
            y * torch.pow(torch.clamp(self.margin - x, min=0.0), 2))

        return loss_contrastive


In [42]:
class SiameseBasicCNN2(nn.Module):
    """Простейшая сиамская сверточная нейронная сеть"""

    def __init__(self) -> None:
        super(SiameseBasicCNN2, self).__init__()
        self.resnet = resnet
        # разморозим последний слой
        for x in resnet.parameters():
            x.requires_grad = False
        for x in resnet.layer4.parameters():
            x.requires_grad = True
        for x in resnet.fc.parameters():
            x.requires_grad = True

    def forward(self, x1, x2):
        output1 = self.resnet(x1)
        output2 = self.resnet(x2)

        return F.pairwise_distance(
            output1, output2, keepdim=True)

class ContrastiveLoss(nn.Module):
    """Функция потерь для двух объектов. Вычисляет евклидово расстояние между объектами"""

    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

# TODO: поменять
    def forward(self, x, y):
        
        loss_contrastive = torch.mean(
            (1-y) * torch.pow(x, 2) +
            y * torch.pow(torch.clamp(self.margin - x, min=0.0), 2))

        return loss_contrastive


### Обучение

In [43]:
def train(
    model: torch.nn.Module = None,
    train_loader: DataLoader = None,
    val_loader: DataLoader = None,
    optimizer: Optimizer = None,
    criterion = None,
    epoch_count: int = 10,
    scheduler: None = None,
    threshold: float = 0.5,
    device: torch.device = torch.device('cpu'),
):
    losses_train = []
    accuracies_train = []
    losses_val = []
    accuracies_val = []
    best_val_accuracy = 0

    for epoch in range(epoch_count):
        print('Epoch {}/{}:'.format(epoch, epoch_count - 1), flush=True)
        for phase in ['train', 'val']:
            if (phase == 'train'):
                dataloader = train_loader
                if (scheduler is not None):
                    scheduler.step()
                model.train()
            else:
                dataloader = val_loader
                model.eval()

            running_loss = 0.
            running_acc = 0.
            # TODO: определить какая будет метрика качества

            for (x1, x2, y) in tqdm(dataloader):
                x1, x2, y = x1.to(device), x2.to(device), y.to(device)
                optimizer.zero_grad()
                with torch.set_grad_enabled(phase == 'train'):
                    distance = model(x1, x2)
                    loss = criterion(distance, y)
                    d = distance.clone()
                    d[d <= threshold] = 0
                    d[d > threshold] = 1
                    
                    if (phase == 'train'):
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item()
                running_acc += torch.eq(d, y).float().mean()
            
            epoch_loss = running_loss / len(dataloader)
            epoch_acc = running_acc / len(dataloader)
            if phase == 'val':
                losses_val.append(epoch_loss)
                accuracies_val.append(epoch_acc)
            else:
                losses_train.append(epoch_loss)
                accuracies_train.append(epoch_acc)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc), flush=True)
            
            if phase == 'val' and best_val_accuracy < epoch_acc:
                best_val_accuracy = epoch_acc
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'loss': epoch_loss,
                    'model_name': model.name,
                }, f'./{model.name}_{datetime.now().strftime("%d.%m_%H:%M")}.pth')  # checkpoint_name + '_iou_{:.2f}_epoch_{}.pth'.format(self._max_score, i))
                print(f'Model saved at {model.name}.pth')
    
    return model, {
        'train': (losses_train, accuracies_train),
        'val': (losses_val, accuracies_val)
    }

### Настройка параметров обучения

In [44]:
model = SiameseBasicCNN()
lr = 1e-3
criterion = ContrastiveLoss()
optimizer = Adam(model.parameters(), lr)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model, results = train(
    model=model,
    criterion=criterion,
    device=device,
    train_loader=train_loader,
    val_loader=val_loader,
    epoch_count=10,
    optimizer=optimizer,
)

Epoch 0/9:


 18%|█▊        | 8/44 [00:03<00:16,  2.17it/s]


KeyboardInterrupt: 

In [None]:
model

SiameseBasicCNN(
  (resnet): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, tr