<a href="https://colab.research.google.com/github/ArceusRay/-/blob/main/%E8%AF%AD%E4%B9%89%E5%88%86%E5%89%B2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Three-class semantic segmentation

We propose to solve the semantic segmentation of animals with three classes: "background" (label 0), "cat" (label 1) and "dog" (label 2).
![Image](https://miro.medium.com/max/1130/1*DDEkOFC93pEbrTdyhdpXZg.png)

To do this, we will prepare the [dataset](https://drive.google.com/uc?export=download&id=1ZsRAXiPgOU5Am8tNZ7mruwtJh3ck8TI5), implement the metrics and loss functions, and implement and train our own [PSPNet](https://arxiv.org/abs/1612.01105)-like architecture.

In [None]:
!gdown -q --id 1ZsRAXiPgOU5Am8tNZ7mruwtJh3ck8TI5 -O data.zip
!unzip -qq data.zip

### Loading modules

In [None]:
# PyTorch for working with neural networks
import torch
import torch.nn as nn
import torch.nn.functional as F

# For working with images and plots
from torchvision import transforms
# Image interpolation modes
from torchvision.transforms.functional import InterpolationMode as IM
import matplotlib.pyplot as plt

# For logging metrics and loss values during training
from torch.utils.tensorboard import SummaryWriter

# For convenient handling of the training/test sets
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

import numpy as np
from tqdm.notebook import tqdm

## Part 1: Data Preparation

### 1.1 Preprocessing the dataset

To start working with the data, complete the following steps:
- **Decide how to store and read the data on disk. We suggest comparing three options: `HDF5`, `memory-mapped files` and the `raw` view (storing `.jpg/.png` files on disk). All the necessary classes are already defined in `utils.py`. You only need to measure the read speed for each format and pick the fastest one.**
     - A few words on these storage formats. In computer vision, datasets are usually too large to fit into RAM. The `hdf5` format can split arrays into [chunks](https://www.oreilly.com/library/view/python-and-hdf5/9781491944981/ch04.html), which are indexed with [B-trees](https://en.wikipedia.org/wiki/B-tree). Chunked storage is what makes reading `hyperslabs` (multidimensional array slices that are non-contiguous in memory) efficient. By default, `hdf5` stores data contiguously.
     - `Memory-mapping` a file lets you skip the intermediate buffering stage, and with it a copy operation: data is loaded lazily, directly from the file. The read-speed `best case` for this approach is a contiguous block of data, and the `worst case` is a non-contiguous block (potentially orders of magnitude slower than what `hdf5` can achieve).

- **Bring all (image, mask) pairs to a single size `target_shape`, specified later in the configuration dictionary `default_config`**. The suggested sequence is:
     1. **Use [transforms.Resize](https://pytorch.org/vision/stable/generated/torchvision.transforms.Resize.html#torchvision.transforms.Resize) to resize the original image to the specified size by interpolating pixel values (bilinear interpolation by default)**. Resizing both sides to fixed values distorts the original aspect ratio, which can hurt the predictive ability of the network. For example, the appearance of a cat's face would then depend on the original image size rather than on the "cat" class itself: it may be undistorted, or stretched vertically or horizontally. Such inconsistent representations of the same entity can make training unstable, because the convolution kernel size is the same for every input image. Fortunately, `transforms.Resize` already handles this: given an integer `size`, the smaller side of the image is resized to `size` and the other (larger) side to `size * aspect_ratio`, so the aspect ratio is preserved.
     2. At this point only one side of the image matches `target_shape`; the other side may be larger. **Crop the image using [transforms.CenterCrop](https://pytorch.org/vision/stable/generated/torchvision.transforms.CenterCrop.html#torchvision.transforms.CenterCrop)**.

> Operations from the `transforms` module can be chained with [transforms.Compose](https://pytorch.org/vision/stable/generated/torchvision.transforms.Compose.html).
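The resize-then-crop sequence can be checked with a few lines of arithmetic. The sketch below is pure Python and does not call torchvision; the helper names `resized_shape` and `center_crop_box` are hypothetical, and the rounding only approximates what `transforms.Resize` and `transforms.CenterCrop` do internally.

```python
def resized_shape(height: int, width: int, size: int) -> tuple[int, int]:
    """Scale so that the smaller side equals `size`, keeping the aspect ratio."""
    if height <= width:
        return size, int(size * width / height)
    return int(size * height / width), size


def center_crop_box(height: int, width: int, size: int) -> tuple[int, int, int, int]:
    """Return (top, left, bottom, right) of a centered `size` x `size` window."""
    top = (height - size) // 2
    left = (width - size) // 2
    return top, left, top + size, left + size


# A landscape photo that is 375 pixels high and 500 pixels wide
h, w = resized_shape(375, 500, 256)
box = center_crop_box(h, w, 256)
print((h, w), box)
```

Only the longer side loses pixels in the crop, so the animal stays undistorted as long as it sits near the center of the photo.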

`Why, in general, is it necessary to bring all images to the same size?`

Although fully convolutional segmentation networks do not depend on a specific input size, the images should still share one size. The learned parameters give the network a receptive field of a fixed size in pixels, tuned to the image size seen during training. On images of a different size the same receptive field is no longer optimal (had the network been trained on the new size from the start, it would have learned different features). For example, if the network learned to detect an object in small images, then on a large image the same field of view may be too small to capture the surrounding context. In addition, stacking images into a batch tensor requires them to have identical shapes.

In [None]:
from utils import *
from PIL.Image import Image as Image_t
def resize(img: Image_t, target_shape: tuple[int, int]) -> np.ndarray:
    """
    Brings the input image (or mask) `img` to the size `target_shape` using the
    sequence of steps described above. `target_shape` is assumed to be square.
    """
    # Check that the requested height and width are equal
    assert target_shape[0] == target_shape[1]

    resize_transform = transforms.Compose([
        # Scale the smaller side of `img` to `target_shape`.
        # Nearest-neighbour interpolation is used so that the set of values
        # in a segmentation mask is preserved.
        transforms.Resize(target_shape[0], interpolation=IM.NEAREST),

        # Crop the "extra" pixels; if there are none, CenterCrop changes nothing.
        transforms.CenterCrop(target_shape),

        # Convert the PIL.Image into an np.ndarray
        transforms.Lambda(lambda x: np.array(x))
    ])

    return resize_transform(img)

In [None]:
def prepare_dataset(config: dict, storage_class: Type[storage_class]):
    """
    Preprocesses the dataset and stores it on disk in an efficient format
    """
    with open(config["annotation_file"]) as f:
        lines = f.readlines()

    # Allocate arrays for the blocks of images that are held in memory
    input_chunk = np.empty((config["chunk_size"], *config["target_shape"], 3), dtype=np.uint8)
    target_chunk = np.empty((config["chunk_size"], *config["target_shape"]), dtype=np.uint8)

    # Split the dataset into blocks (ceiling division)
    config["dataset_size"] = len(lines)
    num_chunks = config["dataset_size"] // config["chunk_size"] + bool(config["dataset_size"] % config["chunk_size"])
    dataset = storage_class(config)

    # Read images from disk, preprocess them and save them in the chosen format
    for chunk_idx in tqdm(range(num_chunks)):
        for pos in range(config["chunk_size"]):
            flat_idx = chunk_idx * config["chunk_size"] + pos
            if (flat_idx >= config["dataset_size"]):
                break

            img_name, label = lines[flat_idx].rstrip("\n").split(' ')

            input_raw = Image.open(os.path.join(config["input_dir"], img_name + ".jpg")).convert("RGB")
            target_raw = Image.open(os.path.join(config["target_dir"], img_name + ".png")).convert('L')

            input_chunk[pos] = resize(input_raw, config["target_shape"])
            target_chunk[pos] = renumerate_target(resize(target_raw, config["target_shape"]), int(label))
        dataset.append(input_chunk, target_chunk)
    dataset.lock()

    return dataset

For simplicity, we choose a square `target_shape`. The suggested size is `256x256`, but the choice is yours. Note that the running time of the code below depends on the image size: the larger the images, the longer training takes.
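To get a feel for how the image size affects memory, the sketch below estimates the size of one in-memory block as allocated in `prepare_dataset` (`uint8` RGB images plus `uint8` masks). The helper name `chunk_megabytes` is hypothetical, and the block size of 512 images is only an example value.

```python
def chunk_megabytes(target_shape: tuple[int, int], chunk_size: int) -> float:
    """Size in MiB of one (images, masks) block stored as uint8."""
    height, width = target_shape
    image_bytes = chunk_size * height * width * 3  # RGB images, 1 byte per value
    mask_bytes = chunk_size * height * width       # one label byte per pixel
    return (image_bytes + mask_bytes) / 2**20


for side in (128, 256, 512):
    print(side, chunk_megabytes((side, side), chunk_size=512))
```

Doubling the side of the square quadruples the memory per block, and the compute cost of the convolutions grows in roughly the same way.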

In [None]:
# When re-running this cell, files written by a previous run may need to be removed first; the line below does that
!rm -rf SegTask/trainval.h5 SegTask/trainval

# Dataset configuration
default_config = {
             "input_dir": "SegTask/images",
             "target_dir": "SegTask/seg_masks",
             "target_shape": (256, 256), # any other image size can be used
             "chunk_size": 512, # number of images per block loaded into RAM
            }

# The training and test configurations differ only in the annotation file
config_train = {"annotation_file":  "SegTask/trainval.txt"} | default_config
config_test = {"annotation_file": "SegTask/test.txt"} | default_config

train_data_hdf5 = prepare_dataset(config_train, storage_hdf5)
train_data_memmap = prepare_dataset(config_train, storage_memmap)
train_data_raw = prepare_dataset(config_train, storage_raw)



### 1.2 Creating the Dataset and DataLoader

PyTorch [provides](https://pytorch.org/tutorials/beginner/basics/data_tutorial.html) convenient `Dataset` and `DataLoader` wrappers for our data, which split the dataset into `batches` (blocks) of a given size and parallelize reading across `num_workers` worker processes.

We will also need data [augmentation](https://pytorch.org/vision/stable/transforms.html). Its goal is to expand the training set by applying transformations that change the absolute pixel values of an image but do not destroy its information content.

For example, the [ColorJitter](https://pytorch.org/vision/stable/generated/torchvision.transforms.ColorJitter.html#torchvision.transforms.ColorJitter) transformation randomly changes the brightness (as well as the contrast, saturation and hue) of an image, which does not change its content. The [RandomCrop](https://pytorch.org/vision/stable/generated/torchvision.transforms.RandomCrop.html#torchvision.transforms.RandomCrop) transformation, in contrast, is not recommended, because the animal's face may end up outside the crop, leaving the class of the animal ambiguous. Each time an object is requested from the training set, a random transformation (or a series of random transformations) is applied to it. **Note that a transformation of an image must be consistent with its segmentation mask**.

**The following augmentation transformations need to be implemented:**
- `HorizontalFlip`
- `ColorJitter`
- `RandomPerspective`

For each of these transformations, you need to write the magic method `__call__`, which lets you call an object of the class (the transformation) as a function (similar to a functor in C++):
```Python
# initialization
obj = Example()
# __call__ is invoked
obj()
```
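As a slightly fuller illustration of the same idea, the sketch below defines a callable class that flips an image and its mask together, so the two stay aligned. It works on nested Python lists instead of `PIL.Image` objects to stay self-contained; the class name `PairedHorizontalFlip` is hypothetical and is not the `HorizontalFlip` class implemented in the next cell.

```python
import random


class PairedHorizontalFlip:
    """Flips an (image, mask) pair together with probability `prob`."""

    def __init__(self, prob: float):
        self.prob = prob

    def __call__(self, pair):
        image, mask = pair
        # random.random() lies in [0, 1), so prob=1.0 always flips and prob=0.0 never does
        if random.random() < self.prob:
            # Reverse every row of both arrays so that pixels and labels stay aligned
            image = [row[::-1] for row in image]
            mask = [row[::-1] for row in mask]
        return image, mask


flip = PairedHorizontalFlip(prob=1.0)               # initialization
image = [[10, 20, 30], [40, 50, 60]]
mask = [[0, 1, 1], [0, 0, 2]]
flipped_image, flipped_mask = flip((image, mask))   # __call__ is invoked
print(flipped_image)
print(flipped_mask)
```

Drawing the random decision once per pair, rather than once per array, is what keeps the mask consistent with the image.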

In [None]:
from torchvision.transforms.functional import hflip
from torchvision.transforms.functional import perspective
from torchvision.transforms import ColorJitter as CJ


class HorizontalFlip():
    def __init__(self, prob: float):
        self.p = prob

    def __call__(self, pair: tuple[Image_t, Image_t]) -> tuple[Image_t, Image_t]:
        """
        `pair` holds an (image, segmentation mask) pair
        * See: https://pytorch.org/vision/main/generated/torchvision.transforms.functional.hflip.html
        """
        if np.random.binomial(1, self.p):
            pair = (hflip(pair[0]), hflip(pair[1]))
        return pair


class ColorJitter():
    def __init__(self, prob: float, param: tuple[float, ...]):
        self.p = prob
        self.CJ = CJ(*param)

    def __call__(self, pair: tuple[Image_t, Image_t]) -> tuple[Image_t, Image_t]:
        """
        `pair` holds an (image, segmentation mask) pair
        * See: https://pytorch.org/vision/main/generated/torchvision.transforms.ColorJitter.html
        * Only the image is changed; the mask holds class labels and is left as is
        """
        if np.random.binomial(1, self.p):
            pair = (self.CJ(pair[0]), pair[1])
        return pair


class RandomPerspective():
    def __init__(self, prob: float, param: float):
        self.p = prob
        self.distortion_scale = param

    def __call__(self, pair: tuple[Image_t, Image_t]) -> tuple[Image_t, Image_t]:
        """
        `pair` holds an (image, segmentation mask) pair
        * See: https://pytorch.org/vision/main/generated/torchvision.transforms.RandomPerspective.html
        * The corner points are sampled once with `transforms.RandomPerspective.get_params`
          and applied to both the image and the mask with `torchvision.transforms.functional.perspective`
        * The mask uses interpolation=IM.NEAREST so that no new label values appear
        """
        if np.random.binomial(1, self.p):
            width, height = pair[0].size
            # Sample the source and destination corners once, so that the image
            # and the mask receive exactly the same warp
            startpoints, endpoints = transforms.RandomPerspective.get_params(
                width, height, self.distortion_scale
            )
            pair = (perspective(pair[0], startpoints, endpoints, interpolation=IM.BILINEAR),
                    perspective(pair[1], startpoints, endpoints, interpolation=IM.NEAREST))
        return pair

Let's apply the implemented transformations and make sure they work:

In [None]:
img_idx = np.random.randint(0, 100)
f, ax = plt.subplots(2, 4, figsize=(16, 8))
pair = train_data_hdf5[img_idx]

imgs2draw = {"Source": pair,
            "HorizontalFlip": HorizontalFlip(1.0)(pair),
            "ColorJitter": ColorJitter(1.0, (0.4, 0.4, 0.4))(pair),
            "RandomPerspective": RandomPerspective(1.0, 0.25)(pair)
}
for idx, (name, pair) in enumerate(imgs2draw.items()):
    ax[0, idx].imshow(pair[0])
    ax[0, idx].set_title(name, fontsize=20)
    ax[1, idx].imshow(colorize(np.array(pair[1])))

plt.show()

Next, we define our `SegmentationDataset` class and the operations that convert PIL.Image images into PyTorch tensors with ImageNet `normalization`. ImageNet normalization is a special case of [standard normalization](https://pytorch.org/vision/main/generated/torchvision.transforms.Normalize.html), in which the per-channel means (red, green and blue channels) and per-channel standard deviations were computed on [a huge sample of images](https://en.wikipedia.org/wiki/ImageNet).

`Why do you need to apply normalization to images?`

In general, normalization brings the input features to zero mean and unit variance, so that the level sets of a loss function built on them look more like a multidimensional ball than a stretched ellipsoid. Optimization in such a space is much easier and faster. It also helps to keep the model weights small, which tends to break down when features have different scales, and that is exactly what normalization addresses. Many other techniques for improving quality, such as normalizing intermediate outputs and preserving the distributions of outputs and gradients, also assume that the input data is well distributed to begin with. Finally, the pre-trained encoder used later was trained on ImageNet-normalized inputs, so it expects the same statistics.
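As a quick check of what this normalization does, the sketch below applies the ImageNet per-channel statistics to a random image and then inverts the operation. It uses NumPy instead of `transforms.Normalize` to stay self-contained; the function names `normalize` and `denormalize` are hypothetical.

```python
import numpy as np

IMAGENET_MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
IMAGENET_STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)


def normalize(image_uint8: np.ndarray) -> np.ndarray:
    """Map an (H, W, 3) uint8 image to float32 with per-channel (x - mean) / std."""
    scaled = image_uint8.astype(np.float32) / 255.0
    return (scaled - IMAGENET_MEAN) / IMAGENET_STD


def denormalize(image_norm: np.ndarray) -> np.ndarray:
    """Invert `normalize`: x * std + mean, then back to the 0..255 range."""
    restored = image_norm * IMAGENET_STD + IMAGENET_MEAN
    return np.clip(np.rint(restored * 255.0), 0, 255).astype(np.uint8)


rng = np.random.default_rng(0)
image = rng.integers(0, 256, size=(4, 4, 3), dtype=np.uint8)
restored = denormalize(normalize(image))
print(np.array_equal(image, restored))
```

The `backward_input` entry of `t_dict` performs the same inversion with two `transforms.Normalize` calls: the first multiplies by the standard deviation (it divides by `1/std`), and the second adds the mean back (it subtracts `-mean`).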

In [None]:
# Select the compute device (a GPU is strongly preferred)
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Transform configurations for converting images to tensors and back (for visualization and training)
t_dict = {
    "forward_input": transforms.Compose([
        transforms.PILToTensor(),
        transforms.Lambda(lambda x: x.float().to(DEVICE)/255.0),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    ]),
    "backward_input": transforms.Compose([
        transforms.Normalize(mean=[0.0, 0.0, 0.0],
                                     std=[1./0.229, 1./0.224, 1./0.225]),
        transforms.Normalize(mean=[-0.485, -0.456, -0.406],
                                     std=[1.0, 1.0, 1.0]),
        transforms.Lambda(lambda x: x.permute(1, 2, 0).cpu().numpy())
    ]),
    "forward_target": transforms.Compose([
        transforms.PILToTensor(),
        transforms.Lambda(lambda x: x.long().squeeze().to(DEVICE)),
    ]),
    "backward_target": transforms.Compose([
        transforms.Lambda(lambda x: x.cpu().numpy())
    ]),
    "augment": transforms.Compose([
        HorizontalFlip(0.5),
        ColorJitter(0.5, (0.4, 0.4, 0.4)),
        RandomPerspective(0.5, 0.25)
    ]),
}


class SegmentationDataset(Dataset):
    def __init__(self, dataset_raw: Type[storage_class], transforms: dict, train_flag: bool = True):
        """
        Inherits the `Dataset` functionality for our data `dataset_raw`
        `transforms` holds the PIL.Image <-> torch.tensor conversions and the augmentations
        `train_flag` toggles data augmentation (it is not needed for the test set)
        """
        super().__init__()
        self.dataset_raw = dataset_raw
        self.transforms = transforms
        self.train_flag = train_flag

    def __len__(self):
        return self.dataset_raw.dataset_size

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        input, target = self.dataset_raw[idx]

        if (self.train_flag):
            input, target = self.transforms["augment"]((input, target))

        return self.transforms["forward_input"](input), self.transforms["forward_target"](target)

In [None]:
from torch.utils.data import random_split

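# Split the data into training and validation sets
def split_train_val(train_data: Type[storage_class], train_portion: float = 0.8):
    """
    `train_data` is the preprocessed data
    `train_portion` is the fraction of objects assigned to the training set
    Note: both subsets wrap the same dataset object created with train_flag=True,
    so the augmentations are applied to the validation subset as well
    """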
    trainval_dataset = SegmentationDataset(train_data, t_dict, train_flag=True)

    train_size = int(len(trainval_dataset) * train_portion)
    val_size = len(trainval_dataset) - train_size
    return random_split(trainval_dataset, [train_size, val_size])

train_dataset_hdf5, val_dataset_hdf5 = split_train_val(train_data_hdf5)
train_dataset_memmap, val_dataset_memmap = split_train_val(train_data_memmap)
train_dataset_raw, val_dataset_raw = split_train_val(train_data_raw)

Let's draw a random image (after random augmentations have been applied):

In [None]:
img_idx = np.random.randint(0, 100)
draw(train_dataset_hdf5[img_idx], t_dict);

In [None]:
dataloader_config = {
    "batch_size": 16,
    "shuffle": True,
    "num_workers": 0
}
train_dataloader_hdf5 = DataLoader(train_dataset_hdf5, **dataloader_config)
val_dataloader_hdf5 = DataLoader(val_dataset_hdf5, **dataloader_config)

train_dataloader_memmap = DataLoader(train_dataset_memmap, **dataloader_config)
val_dataloader_memmap = DataLoader(val_dataset_memmap, **dataloader_config)

train_dataloader_raw = DataLoader(train_dataset_raw, **dataloader_config)
val_dataloader_raw = DataLoader(val_dataset_raw, **dataloader_config)

### 1.3 Measuring the speed of reading the dataset from disk

**Measure the time it takes to read our dataset in each of the storage formats:**

In [None]:
def speedtest(dataloader: Type[DataLoader]) -> None:
    for batch in dataloader:
        pass

In [None]:
%timeit speedtest(train_dataloader_hdf5)

In [None]:
%timeit speedtest(train_dataloader_memmap)

In [None]:
%timeit speedtest(train_dataloader_raw)

The memory-mapped format turned out to be the fastest. The loop iterates over the whole dataset, whose data is stored contiguously, and this is exactly the case in which memory mapping should, in theory, perform best. With non-contiguous data the result would most likely be different.
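The difference between contiguous and non-contiguous reads can be illustrated with `np.memmap`. This is a minimal sketch on a toy array in a temporary file; it is not the `storage_memmap` class from `utils.py`, whose internals are not shown here, and the array shape is an arbitrary choice for the example.

```python
import os
import tempfile

import numpy as np

shape = (64, 32, 32, 3)  # a toy "dataset" of 64 small RGB images
path = os.path.join(tempfile.mkdtemp(), "images.dat")

# Write the array to disk through a writable memory map
writer = np.memmap(path, dtype=np.uint8, mode="w+", shape=shape)
writer[:] = (np.arange(np.prod(shape)) % 256).astype(np.uint8).reshape(shape)
writer.flush()
del writer

# Re-open read-only: data is only read when a slice is accessed
reader = np.memmap(path, dtype=np.uint8, mode="r", shape=shape)

# 16 neighbouring images form one contiguous block in the file
contiguous_batch = np.array(reader[8:24])
# One channel of every image touches bytes scattered across the whole file
strided_channel = np.array(reader[:, :, :, 0])

print(contiguous_batch.shape, strided_channel.shape)
```

Reading whole images, as the `DataLoader` does, corresponds to the first access pattern, which is the favourable case for memory mapping.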

**Create a test DataLoader in the winning format.**

In [None]:
dataloader_config = {
    "batch_size": 16,
    "shuffle": False,
    "num_workers": 0
}
# Built from `config_test`, `prepare_dataset`, `SegmentationDataset` and the new `dataloader_config`

test_data = prepare_dataset(config_test, storage_memmap)
test_dataset = SegmentationDataset(test_data, t_dict, train_flag=False)
test_dataloader = DataLoader(test_dataset, **dataloader_config)
# Aliases for the dataloaders of the chosen format (memory-mapped)
train_dataloader = train_dataloader_memmap
val_dataloader = val_dataloader_memmap


## Part 2: Implementation of loss functions, metrics and the PSPNet decoder

Previously, we looked at the U-Net architecture, a convolutional encoder-decoder network used for image segmentation. In this task, we will study a more advanced segmentation architecture, [PSPNet](https://arxiv.org/abs/1612.01105). Its distinctive feature is the `Pyramid Pooling Module` which, unlike U-Net, takes the "global" context of the image into account when generating features for its "local" regions.

Let's consider the proposed architecture of the `PSPNet-like network`:
![picture](https://drive.google.com/uc?id=1WNumWndaJAbZBch0dLf6iT8KiSdUbIFG)

We will use the pre-trained [ResNeXt](https://pytorch.org/hub/pytorch_vision_resnext/) network as the `Encoder`. It gives us two deep representations of the input image `x`:
- **output `x_main`** (in the picture: the top output of the Encoder): an intermediate representation, a compromise between low-level features (colors, object outlines, strokes) and high-level features (abstract features that reflect the semantics of the image)
- **output `x_supp`** (in the picture: the bottom output of the Encoder): the final representation with the highest-level features, in which information about the exact spatial location of objects is largely lost

The output is split into two streams because semantic segmentation needs both encoded information about the spatial location of objects (`x_main`) and auxiliary information about the semantics of the image as a whole (`x_supp`). We cannot use only the `x_supp` output, as is done in classification, because we also need to know where the object is located in the image.

**You need to write the `Decoder`, namely the following blocks:**
- **`Pyramid Pooling Module`**. It extracts global context at different scales, which classical convolutional neural networks lack (they only see local context within the filter size).
     * Several pooling operations of different scales are applied in parallel to the input tensor `x_main`, producing representations of sizes `1x1`, `2x2`, `3x3` and `6x6`.
     * The channels of these intermediate tensors are compressed with `nn.Conv2d` with a `1x1` filter. This compresses the information and weighs each tensor individually (let's call them global context tensors of different scales).
     * The resulting tensors are then restored to the original spatial size using interpolation.
     * The output tensor is obtained by concatenating these global contexts. Each context describes the entire original image at a different level of detail. You need to implement the `forward` stage of this block. For details, refer to the [paper](https://arxiv.org/abs/1612.01105).
- **`Supplementary Module`** applies a nonlinear transformation to the input tensor `x_supp` and reduces the number of channels to the number at the output of the **`Pyramid Pooling Module`**. One architecture for this transformation (a composition of layers) is already provided, but you are welcome to experiment with it.
- **`Upsample Module`** alternates nonlinear transformations that reduce the number of channels with interpolation steps that double the spatial dimensions. As a result, the output of this block has the same spatial size as the image fed to the encoder.
- **`Segmentation Head`** applies a nonlinear transformation that turns the input tensor into a tensor of scores. For each pixel, the output holds `num_classes` scores (3 in our case). The index of the maximum score for a pixel is its class label (0, 1 or 2).
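The four steps of the `Pyramid Pooling Module` can be sketched as follows. This is a minimal illustration under assumptions, not the notebook's reference solution: the class name `PyramidPoolingSketch`, the use of `nn.AdaptiveAvgPool2d` for pooling, the choice of `in_channels // 4` channels per branch and the ReLU after the `1x1` convolution are all choices made for this example.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


class PyramidPoolingSketch(nn.Module):
    """Pools the input to several grid sizes, compresses the channels,
    upsamples back and concatenates the resulting context tensors."""

    def __init__(self, in_channels: int, bins: tuple[int, ...] = (1, 2, 3, 6)):
        super().__init__()
        assert in_channels % len(bins) == 0
        branch_channels = in_channels // len(bins)  # assumption: equal share per scale
        self.branches = nn.ModuleList([
            nn.Sequential(
                nn.AdaptiveAvgPool2d(bin_size),      # step 1: pool to bin_size x bin_size
                nn.Conv2d(in_channels, branch_channels, kernel_size=1, bias=False),  # step 2
                nn.ReLU(inplace=True),
            )
            for bin_size in bins
        ])

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        height, width = x.shape[2:]
        contexts = [
            # step 3: restore the spatial size of the input
            F.interpolate(branch(x), size=(height, width), mode="bilinear", align_corners=False)
            for branch in self.branches
        ]
        # step 4: concatenate the global contexts along the channel axis
        return torch.cat(contexts, dim=1)


x_main = torch.randn(2, 512, 32, 32)  # shape of x_main for a 256x256 input
out = PyramidPoolingSketch(in_channels=512)(x_main)
print(out.shape)
```

Following the description above, only the context tensors are concatenated; the original paper additionally concatenates the input feature map itself.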

### 2.1 Encoder and decoder of the PSPNet-like network

In [None]:
from torchvision.models.resnet import ResNet

pretrained_model = torch.hub.load('pytorch/vision:v0.10.0', 'resnext50_32x4d', pretrained=True)

# Switch to evaluation mode (affects the behavior of layers such as BatchNorm2d and Dropout)
pretrained_model.eval();

Since the encoder is pre-trained, we fix (freeze) its weights so that no gradients are computed for them. This ensures that the encoder does not change while the rest of the network is trained, and it also saves computational resources (no gradient graph is built for the encoder).
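The effect of freezing can be seen on a toy model. In the sketch below two small `nn.Linear` layers stand in for the encoder and the decoder; these stand-ins and their sizes are assumptions made for the example and have nothing to do with the real ResNeXt weights.

```python
import torch
import torch.nn as nn

encoder = nn.Linear(4, 8)  # stands in for the pre-trained encoder
decoder = nn.Linear(8, 3)  # stands in for the trainable decoder

# Freeze the encoder: its parameters no longer require gradients
for p in encoder.parameters():
    p.requires_grad = False

loss = decoder(encoder(torch.randn(2, 4))).sum()
loss.backward()

# Frozen parameters receive no gradient; decoder parameters do
frozen_grads = [p.grad for p in encoder.parameters()]
decoder_grads = [p.grad for p in decoder.parameters()]
trainable = sum(p.numel() for p in decoder.parameters() if p.requires_grad)
print(frozen_grads, trainable)
```

An optimizer that is given only parameters with `requires_grad=True` will therefore update the decoder and leave the encoder untouched.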

In [None]:
class EncoderBlock(nn.Module):
    def __init__(self, pretrained_model: Type[ResNet]):
        """
        Extracts the pre-trained named layers of the encoder `pretrained_model`
        and splits them into the `main` and `supp` streams (see the architecture above)

        Input: tensor (Batch_size, 3, Height, Width)

        Output: x_main tensor (Batch_size, 512, Height // 8, Width // 8)
        Output: x_supp tensor (Batch_size, 2048, Height // 32, Width // 32)
        """
        super().__init__()

        self.encoder_main = nn.Sequential()
        for name, child in list(pretrained_model.named_children())[:-4]:
            print(f"Pretrained main module {name} is loaded")
            self.encoder_main.add_module(name, child)

        self.encoder_supp = nn.Sequential()
        for name, child in list(pretrained_model.named_children())[-4:-2]:
            print(f"Pretrained supp module {name} is loaded")
            self.encoder_supp.add_module(name, child)

    def freeze(self) -> None:
        """
        Freezes the encoder weights
        """
        for p in self.parameters():
            p.requires_grad = False
        self.eval()

    def unfreeze(self) -> None:
        """
        Unfreezes the encoder weights
        """
        for p in self.parameters():
            p.requires_grad = True
        self.train()

    def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        x_main = self.encoder_main(x)
        x_supp = self.encoder_supp(x_main)
        return x_main, x_supp

In [None]:
encoder = EncoderBlock(pretrained_model)

**To estimate the complexity of the model, we need a function that counts its parameters; use [model.parameters()](https://pytorch.org/docs/stable/generated/torch.nn.Module.html#torch.nn.Module.parameters) for this. Implement it below:**

In [None]:
def count_parameters(model: nn.Module) -> int:
    """
    Counts the weights of `model` that require a gradient
    * Use model.parameters() to get the list of parameters:
        https://pytorch.org/docs/stable/generated/torch.nn.Module.html#torch.nn.Module.parameters
    * Use requires_grad to check whether a gradient is computed for a given parameter:
        https://pytorch.org/docs/stable/generated/torch.nn.parameter.Parameter.html#torch.nn.parameter.Parameter
    """
    total_params = 0
    for parameter in model.parameters():
        if parameter.requires_grad:
            total_params += parameter.numel()

    return total_params

Let's make sure that the `.freeze()` method actually freezes the weights:

In [None]:
print("Encoder #parameters before freeze():", count_parameters(encoder))
encoder.freeze()
print("Encoder #parameters after freeze():", count_parameters(encoder))

* **Implement `PyramidPoolingModule`, `Upsample` and `SegmentationHead`**
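
Each pyramid level pools the feature map down to a `bin_size` x `bin_size` grid with `nn.AdaptiveAvgPool2d`. The sketch below illustrates, in one dimension and in plain Python, how adaptive average pooling can choose its windows. It assumes the floor/ceil index rule used by PyTorch's adaptive pooling layers; `adaptive_windows` and `adaptive_avg_pool_1d` are hypothetical helper names, not part of the notebook.

```python
def adaptive_windows(in_size, out_size):
    """Half-open [start, end) windows: start = floor(i*in/out), end = ceil((i+1)*in/out)."""
    windows = []
    for i in range(out_size):
        start = (i * in_size) // out_size
        end = ((i + 1) * in_size + out_size - 1) // out_size  # integer ceiling division
        windows.append((start, end))
    return windows


def adaptive_avg_pool_1d(values, out_size):
    """Average the input over each window, producing exactly `out_size` outputs."""
    return [sum(values[s:e]) / (e - s) for s, e in adaptive_windows(len(values), out_size)]


print(adaptive_windows(6, 3))   # [(0, 2), (2, 4), (4, 6)]
print(adaptive_windows(5, 3))   # [(0, 2), (1, 4), (3, 5)]  (windows may overlap)
print(adaptive_avg_pool_1d([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 2))  # [2.0, 5.0]
```

The output size is fixed regardless of the input size, which is why the same `bin_sizes` work for feature maps of any resolution.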

In [None]:
class PyramidPoolingModule(nn.Module):
    def __init__(self, in_channels: int, out_channels: int, bin_sizes: tuple[int, ...]):
        """
        Input: tensor (Batch_size, `in_channels`, Height, Width)
        `bin_sizes` - spatial output sizes of the individual pooling operations
        Example: bin_sizes = (1, 2, 3, 6).

        Output: tensor (Batch_size, 2 * `in_channels` + len(`bin_sizes`) * `out_channels`, Height, Width)
        (`forward` concatenates the input twice; `DecoderBlock` sizes its layers accordingly)
        """
        super().__init__()
        self.bins = []

        for bin_size in bin_sizes:
            self.bins.append(nn.Sequential(
                nn.AdaptiveAvgPool2d(bin_size), # see: https://pytorch.org/docs/stable/generated/torch.nn.AdaptiveAvgPool2d.html
                nn.Conv2d(in_channels, out_channels, kernel_size=1), # see: https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html
                nn.BatchNorm2d(out_channels), # see: https://pytorch.org/docs/stable/generated/torch.nn.BatchNorm2d.html
                nn.ReLU(inplace=True)
            ))

        self.bins = nn.ModuleList(self.bins)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        h, w = x.shape[2:]
        out = [x,]
        # * Loop over self.bins and apply each Pooling + Conv + BatchNorm + ReLU
        #   block to the input tensor separately
        # * Upscale every block output back to (h, w) with F.interpolate
        #   and append the result to `out`
        # * Concatenate the tensors along the channel dimension with torch.cat
        for pool_block in self.bins:
            pooled = pool_block(x)
            upscaled = F.interpolate(pooled, size=(h, w))
            out.append(upscaled)
        # Note: `out` already starts with `x`, so `[x] + out` includes the input twice.
        # DecoderBlock builds UpsampleModule for 4 * in_channels and relies on this
        # channel count, so the duplication is left unchanged here.
        return torch.cat([x] + out, dim=1)


class SupplementaryModule(nn.Module):
    def __init__(self, in_channels: int, out_channels: int, dropout: float):
        """
        Input: tensor (Batch_size, `in_channels`, Height, Width)

        Output: tensor (Batch_size, `out_channels`, Height, Width)
        """
        super().__init__()
        self.suppl = nn.Sequential(
            nn.Conv2d(in_channels, in_channels // 2, kernel_size=3, padding=1),
            nn.BatchNorm2d(in_channels // 2),
            nn.ReLU(inplace=True),
            nn.Dropout2d(p=dropout),
            nn.Conv2d(in_channels // 2, out_channels, kernel_size=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x: torch.tensor) -> torch.tensor:
        return self.suppl(x)


class Upsample(nn.Module):
    def __init__(self, in_channels: int, out_channels: int):
        """
        Input: tensor (Batch_size, `in_channels`, Height, Width)

        Output: tensor (Batch_size, `out_channels`, 2 * Height, 2 * Width)
        """
        super().__init__()

        # Suggested composition: Conv2d (kernel: 3x3, padding=1) + BatchNorm2d + ReLU
        self.us_transform = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Upscales the input by a factor of two, then applies us_transform
        (F.interpolate with mode='bilinear', align_corners=True)
        """
        upscaled = F.interpolate(x, scale_factor=2, mode='bilinear', align_corners=True)
        return self.us_transform(upscaled)


class UpsampleModule(nn.Module):
    def __init__(self, in_channels: int, out_channels: int):
        """
        Input: tensor (Batch_size, `in_channels`, Height, Width)

        Output: tensor (Batch_size, `out_channels`, 8 * Height, 8 * Width)
        """
        super().__init__()
        self.upsample = nn.Sequential(
            Upsample(in_channels, in_channels // 4),
            Upsample(in_channels // 4, in_channels // 8),
            Upsample(in_channels // 8, out_channels)
        )

    def forward(self, x: torch.tensor) -> torch.tensor:
        return self.upsample(x)

In [None]:
class DecoderBlock(nn.Module):
    def __init__(self, in_channels: int, out_channels: int, bin_sizes: tuple[int, ...], dropout: float = 0.1):
        """
        Input  x_main: tensor (Batch_size, `in_channels`, Height, Width)
        Input  x_supp: tensor (Batch_size, 4 * `in_channels`, Height // 4, Width // 4)

        Output: tensor (Batch_size, `out_channels`, 8 * Height, 8 * Width)
        """
        super().__init__()
        assert in_channels % len(bin_sizes) == 0

        self.PPM = PyramidPoolingModule(in_channels, in_channels // len(bin_sizes), bin_sizes)
        self.SM = SupplementaryModule(4 * in_channels, in_channels, dropout)
        self.UM = UpsampleModule(4 * in_channels, out_channels)

    def forward(self, x_main: torch.Tensor, x_supp: torch.Tensor) -> torch.Tensor:
        # Bring x_supp to the spatial size of x_main (a 4x upscaling when the input
        # size is divisible by 32) so that the two streams can be concatenated
        x_supp = F.interpolate(input=x_supp, size=x_main.shape[2:], mode='bilinear', align_corners=True)

        x_supp = self.SM(x_supp)
        x_main = self.PPM(x_main)

        out = self.UM(torch.cat([x_main, x_supp], dim=1))
        return out


class SegmentationHead(nn.Module):
    def __init__(self, in_channels: int, num_classes: int, dropout: float = 0.1):
        """
        Computes the score of each class
        Input: tensor (Batch_size, `in_channels`, Height, Width)

        Output: tensor (Batch_size, `num_classes`, Height, Width)
        """
        super().__init__()
        # Suggested composition: BatchNorm2d + ReLU + Dropout2d + Conv2d (1 x 1)
        self.segmentation_head = nn.Sequential(
            nn.Conv2d(in_channels, num_classes, kernel_size=1)
        )

    def forward(self, x: torch.Tensor, x_supp: torch.Tensor) -> torch.Tensor:
        """
        `x_supp` is a placeholder argument reserved for future use; it is not used yet
        """
        return self.segmentation_head(x)

### 2.2 Implementation of metrics

In segmentation tasks, the following metrics are mainly used to assess the predictive ability of a neural network.

Let $\mathrm{P}$ denote the predicted segmentation mask (Prediction), $\mathrm{S}$ the per-class scores of the segmentation mask (Scores), and $\mathrm{T}$ the ground-truth segmentation mask (Target). Then:
- **Intersection over Union metric**:
$$
\mathrm{IoU}(P, T) = \dfrac{\sum_{i=1}^{M}\sum_{j=1}^{N}[P_{ij}*T_{ij}]}{\sum_{i=1}^{M}\sum_{j=1}^{N} [P_{ij} + T_{ij} - P_{ij}*T_{ij}]}\text{, where } P, T \in \{0, 1\}^{M \times N}
$$
- **Recall metric**:
$$
\mathrm{Recall}(P, T) = \dfrac{\sum_{i=1}^{M}\sum_{j=1}^{N}[P_{ij} * T_{ij}]}{\sum_{i=1}^{M}\sum_{j=1}^{N} T_{ij}}\text{, where } P, T \in \{0, 1\}^{M \times N}
$$
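
As a worked example of the two formulas, the plain-Python sketch below computes binary IoU and Recall for a tiny mask. The masks and the helper name `binary_iou_and_recall` are made up for illustration, and the sketch assumes that the target contains at least one positive pixel.

```python
def binary_iou_and_recall(prediction, target):
    """prediction, target: equally sized 2D lists with entries 0 or 1."""
    intersection = 0
    union = 0
    target_area = 0
    for pred_row, target_row in zip(prediction, target):
        for p, t in zip(pred_row, target_row):
            intersection += p * t          # numerator shared by both metrics
            union += p + t - p * t         # IoU denominator
            target_area += t               # Recall denominator
    return intersection / union, intersection / target_area


prediction = [[1, 1, 1],
              [1, 1, 1]]   # every pixel is predicted as the object
target = [[0, 1, 0],
          [0, 1, 0]]

iou, recall = binary_iou_and_recall(prediction, target)
print(f"IoU = {iou:.3f}, Recall = {recall:.3f}")   # IoU = 0.333, Recall = 1.000
```

Labelling every pixel as the object gives perfect Recall but a low IoU, because the union grows with every extra predicted pixel.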

The metrics above are defined for binary segmentation, which is not our case. Let us generalize them to multi-class segmentation: treat a K-class problem as K two-class problems and then `macro`- or `micro`-average their metrics. **You need to implement multi-class versions of these metrics with support for macro- and micro-averaging. Note that the metrics are computed for each element of the batch. The `reduce` argument controls how the metrics are reduced along the batch dimension (see below).**

We will also use two different but similar loss functions for training:
- Cross Entropy Loss:

$$
\mathrm{CE}(S, T) = - \dfrac{1}{MN}\sum_{c=1}^{K}\sum_{i=1}^{M}\sum_{j=1}^{N} \big[\log \mathrm{Softmax}(S)_{cij}*\mathbb{I}[T_{ij} == c]\big]\text{, where } S \in \mathbb{R}^{K \times M \times N}, T \in \{1, ..., K\}^{M \times N}
$$
- [Focal Loss](https://arxiv.org/abs/1708.02002):
$$
\mathrm{FL}(S, T) = - \dfrac{1}{MN}\sum_{c=1}^{K}\sum_{i=1}^{M}\sum_{j=1}^{N} \big[(1 - \mathrm{Softmax}(S)_{cij})^{\gamma}*\log \mathrm{Softmax}(S)_{cij}*\mathbb{I}[T_{ij} == c]\big]\text{, where } S \in \mathbb{R}^{K \times M \times N}, T \in \{1, ..., K\}^{M \times N}, \gamma \in \mathbb{R}_{+} - \text{hyperparameter}
$$
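
To see what the factor $(1 - \mathrm{Softmax}(S))^{\gamma}$ does, the sketch below evaluates the per-pixel terms of both losses for a single pixel in plain Python. `p_true` stands for the softmax probability of the correct class; the function names and the example probabilities are assumptions chosen for illustration.

```python
import math


def cross_entropy_term(p_true):
    """Per-pixel cross entropy: -log of the probability assigned to the correct class."""
    return -math.log(p_true)


def focal_term(p_true, gamma):
    """Per-pixel focal loss: cross entropy scaled by the modulating factor (1 - p)^gamma."""
    return (1.0 - p_true) ** gamma * cross_entropy_term(p_true)


for p_true in (0.9, 0.1):   # an easy pixel and a hard pixel
    ratio = focal_term(p_true, gamma=2.0) / cross_entropy_term(p_true)
    print(f"p = {p_true}: focal / cross-entropy = {ratio:.2f}")
# p = 0.9: focal / cross-entropy = 0.01
# p = 0.1: focal / cross-entropy = 0.81
```

With $\gamma = 0$ the factor equals 1 and the focal term coincides with the cross-entropy term; larger $\gamma$ suppresses confidently classified pixels more strongly.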

**The loss functions do not need to be implemented**. In addition, `ignore_index` values, equal to 255 in our case, must be handled correctly everywhere: such pixels do not take part in the computation of metrics or loss functions. If some classes have no pixels in $\mathrm{T}$, they are left out of the macro-average.
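
The rules above (one-vs-rest counting, skipping `ignore_index` pixels, and dropping absent classes from the macro-average) can be sketched for a single image in plain Python. This is an illustrative reference under those assumptions, not the batched PyTorch implementation the task asks for; `per_class_counts` and `iou` are hypothetical helper names, and the masks are the first image of the check cell further down.

```python
IGNORE_INDEX = 255


def per_class_counts(prediction, target, num_classes):
    """Per-class [intersection, union, target_area], skipping ignored pixels."""
    counts = [[0, 0, 0] for _ in range(num_classes)]
    for pred_row, target_row in zip(prediction, target):
        for p, t in zip(pred_row, target_row):
            if t == IGNORE_INDEX:
                continue                      # ignored pixels contribute nothing
            for c in range(num_classes):
                p_c, t_c = int(p == c), int(t == c)
                counts[c][0] += p_c * t_c
                counts[c][1] += p_c + t_c - p_c * t_c
                counts[c][2] += t_c
    return counts


def iou(prediction, target, num_classes, average):
    counts = per_class_counts(prediction, target, num_classes)
    if average == "micro":
        # Sum numerators and denominators over all classes, then divide
        return sum(c[0] for c in counts) / sum(c[1] for c in counts)
    # macro: average the per-class IoU over classes that occur in the target
    present = [c for c in counts if c[2] > 0]
    return sum(c[0] / c[1] for c in present) / len(present)


prediction = [[0, 0, 0, 0], [0, 0, 1, 0], [0, 1, 1, 0], [0, 0, 0, 0]]
target = [[0, 0, 0, 0], [0, 1, 255, 0], [0, 1, 255, 0], [0, 0, 0, 0]]

print(round(iou(prediction, target, 3, "micro"), 4))   # 0.8667
print(round(iou(prediction, target, 3, "macro"), 4))   # 0.7115
```

Class 2 does not occur in this target, so the macro-average is taken over classes 0 and 1 only.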

In [None]:
class MetricsCollection():
    def __init__(self, num_classes: int, ignore_index: int = 255):
        self.num_classes = num_classes
        self.ignore_index = ignore_index

    def IoUMetric(self, prediction: torch.tensor, target: torch.tensor, average: str = "macro", reduce: str = "mean") -> Union[torch.tensor, float]:
        """
        `prediction` predicted segmentation mask of shape (Batch_size, Height, Width)
        `target` ground-truth segmentation mask of shape (Batch_size, Height, Width)
        `average` type of multi-class averaging
        `reduce` reduction of the metric values along the Batch dimension; "none" - no reduction
        """

        # micro - sum the numerators over all classes and the denominators over all classes,
        #         then divide one by the other
        # macro - compute the metric for each class separately, then average
        assert average in ["micro", "macro"]

        # sum - sum of the metric over all images
        # mean - mean of the metric over all images
        # none - array with the metric of every image
        assert reduce in ["sum", "mean", "none"]
        device = prediction.device
        batch_size = target.shape[0]
        macro_num_classes = torch.zeros((batch_size)).to(device)
        iou_sum_for_macro = torch.zeros((batch_size)).to(device)
        numerator_sum_for_micro = torch.zeros((batch_size)).to(device)
        denominator_sum_for_micro = torch.zeros((batch_size)).to(device)
        for num_class in range(self.num_classes):
          target_class = (target == num_class) * 1
          prediction_class = (prediction == num_class) * 1
          ignore_index = target != self.ignore_index
          numerator_no_reduce = prediction_class * target_class
          denominator_no_reduce = (prediction_class + target_class - numerator_no_reduce) * ignore_index
          numerator = torch.sum(numerator_no_reduce, dim=(1, 2))
          denominator = torch.sum(denominator_no_reduce, dim=(1, 2))
          if average == 'macro':
            valid_idxs = torch.any(target_class.view(batch_size, -1), dim=1)
            iou_class = torch.zeros((batch_size)).to(device)
            iou_class[valid_idxs] = numerator[valid_idxs] / denominator[valid_idxs]
            iou_sum_for_macro += iou_class
            macro_num_classes += valid_idxs * 1
          else:
            numerator_sum_for_micro += numerator
            denominator_sum_for_micro += denominator

        if average == 'macro':
          iou_no_reduce = iou_sum_for_macro / macro_num_classes
        else:
          iou_no_reduce = numerator_sum_for_micro / denominator_sum_for_micro

        if reduce == 'sum':
          iou = torch.sum(iou_no_reduce)
        elif reduce == 'mean':
          iou = torch.mean(iou_no_reduce)
        else:
          iou = iou_no_reduce

        return iou

    def RecallMetric(self, prediction: torch.tensor, target: torch.tensor, average: str = "macro", reduce: str = "mean") -> Union[torch.tensor, float]:
        """
        `prediction` predicted segmentation mask of shape (Batch_size, Height, Width)
        `target` ground-truth segmentation mask of shape (Batch_size, Height, Width)
        `average` type of multi-class averaging
        `reduce` reduction of the metric values along the Batch dimension; "none" - no reduction
        """

        # micro - sum the numerators over all classes and the denominators over all classes,
        #         then divide one by the other
        # macro - compute the metric for each class separately, then average
        assert average in ["micro", "macro"]

        # sum - sum of the metric over all images
        # mean - mean of the metric over all images
        # none - array with the metric of every image
        assert reduce in ["sum", "mean", "none"]
        device = prediction.device
        batch_size = target.shape[0]
        macro_num_classes = torch.zeros((batch_size)).to(device)
        recall_sum_for_macro = torch.zeros((batch_size)).to(device)
        numerator_sum_for_micro = torch.zeros((batch_size)).to(device)
        denominator_sum_for_micro = torch.zeros((batch_size)).to(device)

        for num_class in range(self.num_classes):
          target_class = (target == num_class) * 1
          prediction_class = (prediction == num_class) * 1
          ignore_index = target != self.ignore_index
          numerator_no_reduce = prediction_class * target_class
          denominator_no_reduce = target_class * ignore_index
          numerator = torch.sum(numerator_no_reduce, dim=(1, 2))
          denominator = torch.sum(denominator_no_reduce, dim=(1, 2))
          if average == 'macro':
            valid_idxs = torch.any(target_class.view(batch_size, -1), dim=1)
            recall_class = torch.zeros((batch_size)).to(device)
            recall_class[valid_idxs] = numerator[valid_idxs] / denominator[valid_idxs]
            recall_sum_for_macro += recall_class
            macro_num_classes += valid_idxs * 1
          else:
            numerator_sum_for_micro += numerator
            denominator_sum_for_micro += denominator
        if average == 'macro':
          recall_no_reduce = recall_sum_for_macro / macro_num_classes
        else:
          recall_no_reduce = numerator_sum_for_micro / denominator_sum_for_micro
        if reduce == 'sum':
          recall = torch.sum(recall_no_reduce)
        elif reduce == 'mean':
          recall = torch.mean(recall_no_reduce)
        else:
          recall = recall_no_reduce

        return recall

    def FocalLoss(self, scores: torch.tensor, target: torch.tensor, reduce: str = "mean", gamma: float = 1.) -> Union[torch.tensor, float]:
        """
        `scores` per-class scores of the segmentation mask, shape (Batch_size, num_classes, Height, Width)
        `target` ground-truth segmentation mask of shape (Batch_size, Height, Width)
        `reduce` reduction of the loss values along the Batch dimension; "none" - no reduction
        """
        assert scores.shape[1] == self.num_classes
        assert reduce in ["sum", "mean", "none"]

        ce_loss = F.cross_entropy(scores, target, ignore_index=self.ignore_index, reduction="none")
        coef = (1 - torch.exp(-ce_loss))**gamma
        focal_loss = coef * ce_loss
        norm = (focal_loss.numel() - (target == self.ignore_index).sum())

        if (reduce == "sum"):
            return focal_loss.sum() / norm * scores.shape[0]
        elif (reduce == "mean"):
            return focal_loss.sum() / norm
        else:
            return focal_loss.sum(dim=[1, 2]) / norm * scores.shape[0]

    def CrossEntropyLoss(self, scores: torch.tensor, target: torch.tensor, reduce: str = "mean") -> Union[torch.tensor, float]:
        """
        `scores` per-class scores of the segmentation mask, shape (Batch_size, num_classes, Height, Width)
        `target` ground-truth segmentation mask of shape (Batch_size, Height, Width)
        `reduce` reduction of the loss values along the Batch dimension; "none" - no reduction
                 (here "none" returns the per-pixel loss of shape (Batch_size, Height, Width))
        """
        assert scores.shape[1] == self.num_classes
        assert reduce in ["sum", "mean", "none"]

        if (reduce == "sum"):
            return F.cross_entropy(scores, target, ignore_index=self.ignore_index, reduction="mean") * scores.shape[0]
        elif (reduce == "mean"):
            return F.cross_entropy(scores, target, ignore_index=self.ignore_index, reduction="mean")
        else:
            return F.cross_entropy(scores, target, ignore_index=self.ignore_index, reduction="none")

    @classmethod
    def ListMetrics(cls):
        return [method for method in dir(cls) if (method.endswith("Metric"))]

    @classmethod
    def ListLosses(cls):
        return [method for method in dir(cls) if (method.endswith("Loss"))]

In [None]:
metric_class = MetricsCollection(num_classes=3, ignore_index=255)

prediction = torch.tensor([[[0, 0, 0, 0], [0, 0, 1, 0], [0, 1, 1, 0], [0, 0, 0, 0]],
                           [[0, 0, 0, 0], [0, 2, 2, 0], [0, 2, 0, 0], [0, 0, 0, 0]]])

target = torch.tensor([[[0, 0, 0, 0], [0, 1, 255, 0], [0, 1, 255, 0], [0, 0, 0, 0]],
                       [[0, 0, 0, 0], [0, 255, 2, 0], [0, 255, 2, 0], [0, 0, 0, 0]]])

assert np.isclose(metric_class.RecallMetric(prediction, target, "micro", "mean").item(), 0.9286, atol=1e-3)
assert np.isclose(metric_class.RecallMetric(prediction, target, "macro", "mean").item(), 0.7500, atol=1e-3)
assert np.isclose(metric_class.IoUMetric(prediction, target, "micro", "mean").item(), 0.8667, atol=1e-3)
assert np.isclose(metric_class.IoUMetric(prediction, target, "macro", "mean").item(), 0.7115, atol=1e-3)

`What does the following situation say about the predictive ability of our network: high Recall and low IoU for some class? Is the reverse situation possible?`

Recall answers the question: "How completely does the predicted segmentation cover the target segmentation?" In other words: "How much of the original segmentation did we manage to find?"

IoU is more informative in this regard. A high value indicates that our segmentation is very similar to the original one; a low value means that the prediction and the target overlap poorly.

Thus, the degenerate case in which every pixel of the image is assigned to the predicted class is rewarded by Recall and penalized by IoU.

High Recall together with low IoU therefore means that the network over-predicts the class: it finds almost all of the target pixels but also labels many pixels that do not belong to the class.

The reverse situation is impossible, because the two metrics share the same numerator and the IoU denominator only adds a non-negative term to the Recall denominator. IoU can therefore never exceed Recall.
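
The argument can be written out in one line. For binary masks, $P_{ij} + T_{ij} - P_{ij}T_{ij} = T_{ij} + P_{ij}(1 - T_{ij})$ and $P_{ij}(1 - T_{ij}) \ge 0$, so, assuming the target contains at least one positive pixel:

$$
\mathrm{IoU}(P, T) = \dfrac{\sum_{i=1}^{M}\sum_{j=1}^{N} P_{ij} T_{ij}}{\sum_{i=1}^{M}\sum_{j=1}^{N} T_{ij} + \sum_{i=1}^{M}\sum_{j=1}^{N} P_{ij}(1 - T_{ij})} \le \dfrac{\sum_{i=1}^{M}\sum_{j=1}^{N} P_{ij} T_{ij}}{\sum_{i=1}^{M}\sum_{j=1}^{N} T_{ij}} = \mathrm{Recall}(P, T)
$$

Equality holds exactly when the prediction contains no false positives.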

`Which type of averaging is more appropriate for our problem: macro or micro? Why?`

Macro-averaging is the better choice for our task because the classes are imbalanced: cats and dogs are represented roughly equally, but the background occupies far more pixels. Macro-averaging gives every class the same weight, so it shows how well each class is actually segmented. Micro-averaging produces high values that are dominated by the background, the most represented class, which says little about how well the animals are segmented.
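
A small numeric illustration of this effect, in plain Python: a hypothetical 100-pixel image with 90 background and 10 cat pixels, for which the network predicts background everywhere. The pixel counts and the helper name `averaged_iou` are assumptions made up for this example.

```python
def averaged_iou(counts, average):
    """counts: list of (intersection, union) pairs, one per class present in the target."""
    if average == "micro":
        return sum(i for i, _ in counts) / sum(u for _, u in counts)
    return sum(i / u for i, u in counts) / len(counts)


# Prediction: all 100 pixels are labelled as background
background = (90, 100)   # intersection = 90 true background pixels, union = all 100 pixels
cat = (0, 10)            # no cat pixel is found, union = the 10 target cat pixels

print(round(averaged_iou([background, cat], "micro"), 3))   # 0.818
print(round(averaged_iou([background, cat], "macro"), 3))   # 0.45
```

The micro-average stays above 0.8 although the cat is missed completely, while the macro-average drops to 0.45.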

`What is the advantage of Focal Loss over Cross Entropy Loss? What does the hyperparameter γ control in Focal Loss?`

1) Cross Entropy Loss is the special case of Focal Loss with γ = 0. For γ > 0, Focal Loss is less sensitive to class imbalance, and the hyperparameter γ controls how strongly the loss of confidently classified pixels is down-weighted.

2) Focal Loss handles our problem better because the small classes contribute more evenly to the total loss.

3) Focal Loss was designed to combat class imbalance (here: the background versus the objects on it).

## Part 3: PSPNet training, experiments

Now **all that remains is to put together everything written so far and train our network**. To monitor the training process, we will compute average metrics and loss functions on the validation set. To display this information conveniently, **we will use the `tensorboard`** tool. For this we create an object of the `SummaryWriter` class, which creates a special `event` file for [tensorboard](https://pytorch.org/docs/stable/tensorboard.html) and opens it for writing. To visualize the contents, run `tensorboard --logdir=<PATH>` in a terminal. If you need to monitor several tensorboards, assign each of them its own unique port with `--port <PORT>`. Here is an [example](https://colab.research.google.com/github/pytorch/tutorials/blob/gh-pages/_downloads/tensorboard_with_pytorch.ipynb#scrollTo=lFKETpE2F2oE) of using tensorboard on Google Colab.

**You need to write the `train_model` and `test_model` methods. The whole training configuration is stored in the `train_config` dictionary; you may extend it with settings of your own.**

**You need to attach the tensorboard logs to the solution. To simplify checking, it is strongly recommended to use `inline-tensorboard`:**

```
%load_ext tensorboard
%tensorboard --logdir ./runs
```

In [None]:
%load_ext tensorboard
%tensorboard --logdir ./runs

### 3.1 Implementation of network training/testing procedures

In [None]:
class PSPNet(nn.Module):
    def __init__(self, pretrained_model: Type[ResNet], HeadBlock: Type[nn.Module], num_classes: int, train_config: dict, bin_sizes: tuple[int, ...] = (1, 2, 3, 6)):
        """
        `pretrained_model` pretrained encoder model
        `HeadBlock` class of the block that computes the scores for each class of the segmentation mask
        `num_classes` number of segmentation classes
        `train_config` dictionary with the configuration of the training process
        `bin_sizes` spatial sizes to which pooling reduces the features in the PPM block
        """
        super().__init__()
        self.encoder = EncoderBlock(pretrained_model)
        self.encoder.freeze()
        self.decoder = DecoderBlock(512, 128, bin_sizes)
        self.head = HeadBlock(128, num_classes)

        self.train_config = train_config
        self.metric_class = train_config["metric_class"]
        self.optimizer = train_config["optimizer"](self.parameters(), **train_config["optimizer_params"])
        self.scheduler = train_config["scheduler"](self.optimizer, **train_config["scheduler_params"])

    def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        # Guarantees that no gradients are computed for the encoder
        with torch.no_grad():
            x_main, x_supp = self.encoder(x)
        out = self.decoder(x_main, x_supp)
        out = self.head(out, x_supp)
        return out, torch.argmax(out.detach(), dim=1)

    def write_val_metrics(self, val_metrics: dict, iter_num: int, norm: float = 1.0) -> None:
        """
        Writes the averaged values of the metrics/loss functions to tensorboard

        `val_metrics` dictionary whose keys are metric/loss function names and whose values are their accumulated values
        `iter_num` global iteration number (#iterations_per_epoch * epoch_number + iteration_number)
        `norm` normalization factor; for averaging it equals the number of objects in the validation set
        """
        for method, value in val_metrics.items():
            self.train_config["writer"].add_scalar(f"Mean {method}", np.round(value.item() / norm, 2), iter_num)

    def validate_model(self, val_dataloader: Type[DataLoader], iter_num: int) -> None:
        """
        Validates the current model and computes the corresponding metrics/loss functions

        `val_dataloader` validation set
        `iter_num` global iteration number (#iterations_per_epoch * epoch_number + iteration_number)
        """
        # Switch the decoder and the head to evaluation mode (affects BatchNorm2d and Dropout)
        self.decoder.eval()
        self.head.eval()

        # Initialize the dictionary of metrics/loss functions
        val_metrics = dict([(method, 0.0) for method in (self.metric_class.ListMetrics() + self.metric_class.ListLosses())])

        # The model's own device, so that every batch can be moved to it
        device = next(self.parameters()).device

        # Always evaluate inside the torch.no_grad() context manager:
        # even without an optimizer step this saves memory (no gradients are tracked)
        with torch.no_grad():
            for input, target in val_dataloader:
                input = input.to(device)
                target = target.to(device)
                scores, prediction = self.forward(input)

                for metric in self.metric_class.ListMetrics():
                    val_metrics[metric] += getattr(self.metric_class, metric)(prediction, target, reduce="sum")

                for loss in self.metric_class.ListLosses():
                    val_metrics[loss] += getattr(self.metric_class, loss)(scores, target, reduce="sum")

        # Tensorboard can also store a visualization of our predictions during training
        # (the first item of the last validation batch is drawn, moved to the CPU for plotting)
        figure = draw((input[0].cpu(), target[0].cpu()), t_dict, prediction[0].cpu(), log=True)
        self.train_config["writer"].add_figure("image/GT/prediction", figure, iter_num)

        self.write_val_metrics(val_metrics, iter_num, norm=len(val_dataloader.dataset))
        # Switch the decoder and the head back to training mode
        self.decoder.train()
        self.head.train()

    def train_model(self, train_dataloader: Type[DataLoader], val_dataloader: Type[DataLoader]) -> None:
        """
        Trains the model on the training set and periodically (the period is set in train_config) validates it on the validation set
        Saves the model to disk at the end of every epoch

        `train_dataloader` training set
        `val_dataloader` validation set
        """
        # Switch the decoder and the head to training mode
        self.decoder.train()
        self.head.train()

        for epoch in range(self.train_config["num_epochs"]):
            for iter_num, (input, target) in enumerate(train_dataloader):

                self.optimizer.zero_grad()
                input = input.to(DEVICE)
                target = target.to(DEVICE)
                scores, pred = self(input)
                loss = self.train_config['loss_fn'](scores, target, reduce='mean')
                loss.backward()
                self.optimizer.step()
                self.scheduler.step()

                if (iter_num % self.train_config["validate_each_iter"] == 0):
                    print(f"Epoch: {epoch+1}/{self.train_config['num_epochs']} || Iter: {iter_num}/{len(train_dataloader)} || Loss: {loss.item()}")

                    self.validate_model(val_dataloader, epoch * len(train_dataloader) + iter_num)

            torch.save(self.state_dict(), self.train_config["save_model_path"] + f"_{epoch+1}.pth")

    def test_model(self, test_dataloader: Type[DataLoader]) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Runs model inference on the test set. Returns the tensor of predicted segmentation masks
        and the tensor of ground-truth segmentation masks

        `test_dataloader` test set
        """
        # Switch the decoder and the head to evaluation mode (affects BatchNorm2d and Dropout)
        self.decoder.eval()
        self.head.eval()
        device = next(self.parameters()).device
        dl_prediction_list = []
        dl_target_list = []

        with torch.no_grad():
            for input, target in test_dataloader:
                _, prediction = self(input.to(device))
                # Collect every batch (not only the last one) on the CPU,
                # so that predictions and targets end up on the same device
                dl_prediction_list.append(prediction.cpu())
                dl_target_list.append(target.cpu())

        dl_prediction = torch.cat(dl_prediction_list, dim=0)
        dl_target = torch.cat(dl_target_list, dim=0)

        return dl_prediction, dl_target

### 3.2 PSPNet training, experiments

You are given initial values of the network hyperparameters. Tune the hyperparameters (if necessary) and train the network with both loss functions, `CrossEntropyLoss` and `FocalLoss`. Achieve the following results on the test set for at least one of them:
- **`Mean IoU metric` > 0.82**
- **`Mean Recall metric` > 0.92**

You need to attach the tensorboard logs to your solution.
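
When tuning the hyperparameters, it helps to know what learning rate the schedule actually produces. `train_model` calls `scheduler.step()` after every optimizer step, so the `step_size` of `StepLR` counts iterations rather than epochs. The plain-Python sketch below evaluates the closed form of a step decay for the initial values used in this section (`lr=1e-3`, `step_size=50`, `gamma=0.85`); the function name `step_lr` and the figure of 200 iterations per epoch are assumptions for illustration only.

```python
def step_lr(initial_lr, step_size, gamma, num_steps):
    """Learning rate after `num_steps` scheduler steps of a step decay."""
    return initial_lr * gamma ** (num_steps // step_size)


iterations_per_epoch = 200   # hypothetical; depends on the dataset size and the batch size
for epoch in range(3):
    steps_done = epoch * iterations_per_epoch
    lr = step_lr(1e-3, step_size=50, gamma=0.85, num_steps=steps_done)
    print(f"start of epoch {epoch + 1}: lr = {lr:.2e}")
```

Under these assumptions the learning rate is multiplied by 0.85 four times per epoch, which is worth keeping in mind when changing `num_epochs` or the batch size.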

**CrossEntropyLoss**

In [None]:
%reload_ext tensorboard

In [None]:
from torch.optim.lr_scheduler import StepLR

train_config = {
    "num_epochs": 2, # approximate training time ~ 20 minutes on a GPU
    "optimizer": torch.optim.Adam,
    "optimizer_params": {
        "lr": 1e-3,
        "weight_decay": 1e-5
    },
    "loss_fn": metric_class.CrossEntropyLoss,
    "scheduler": StepLR,
    "scheduler_params": {
        "step_size": 50,
        "gamma": 0.85
    },
    "validate_each_iter": 10,
    "writer": SummaryWriter(comment="CEloss"),
    "save_model_path": "model_celoss",
    "metric_class": metric_class
}

net = PSPNet(pretrained_model, SegmentationHead, num_classes=3, train_config=train_config).to(DEVICE)
print("Number of parameters in the network:", count_parameters(net))
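
With `StepLR`, the learning rate is multiplied by `gamma` once every `step_size` calls to `scheduler.step()`. The pure-Python sketch below reproduces that closed-form rule for the values in `train_config`. Whether `train_model` steps the scheduler per iteration or per epoch is not shown in this section, so the unit of `num_steps` is an assumption; `step_lr` is a hypothetical helper, not part of PyTorch.

```python
def step_lr(base_lr, gamma, step_size, num_steps):
    """Learning rate after `num_steps` scheduler steps under a StepLR-style decay."""
    return base_lr * gamma ** (num_steps // step_size)


# Values taken from train_config: lr=1e-3, gamma=0.85, step_size=50
for n in (0, 49, 50, 100, 500):
    print(n, step_lr(1e-3, 0.85, 50, n))
```

If the scheduler is stepped once per epoch, two epochs never reach `step_size=50`, so the learning rate would stay at its initial value for the whole run.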

In [None]:
net.train_model(train_dataloader, val_dataloader)

In [None]:
# Test the model.
# The `_1` suffix is the epoch number; change `_1` -> `_n` to load a different checkpoint.
net.load_state_dict(torch.load("model_celoss_1.pth"))
net.eval()
dl_prediction, dl_target = net.test_model(test_dataloader)
print("Mean IoU metric: ", metric_class.IoUMetric(dl_prediction, dl_target))
print("Mean Recall metric: ", metric_class.RecallMetric(dl_prediction, dl_target))
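
`train_model` saves one checkpoint per epoch as `save_model_path + f"_{epoch+1}.pth"`, so the suffix is the 1-based epoch number. A small sketch of that naming rule, using the hypothetical helper `checkpoint_path`:

```python
def checkpoint_path(save_model_path, epoch_number):
    """File name written after the given 1-based epoch."""
    return f"{save_model_path}_{epoch_number}.pth"


num_epochs = 2
paths = [checkpoint_path("model_celoss", e) for e in range(1, num_epochs + 1)]
last = paths[-1]  # checkpoint written after the final epoch
print(paths, last)
```

With `num_epochs = 2`, loading `model_celoss_1.pth` evaluates the weights saved after the first epoch, not the final ones.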

In [None]:
# Examples of your network's predictions:
img_idx = np.random.randint(0, 100)
for idx, (input, target) in enumerate(test_dataset):
# for idx, (input, target) in enumerate(test_dataloader):
    if (idx < img_idx):
        continue
    draw((input.squeeze(), target.squeeze()), t_dict, dl_prediction[idx])
    plt.pause(0.1)
    if (idx == img_idx+2):
        break

**FocalLoss**
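
Focal loss (Lin et al., 2017) rescales the per-pixel cross-entropy by `(1 - p_t) ** gamma`, where `p_t` is the probability the model assigns to the true class, so confidently correct pixels contribute little to the loss. The sketch below evaluates this for a single pixel in plain Python. It illustrates the standard formula without class weighting and is not necessarily what `metric_class.FocalLoss` does here; the default `gamma=2.0` and the helper name `focal_loss_pixel` are assumptions.

```python
import math


def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def focal_loss_pixel(logits, target, gamma=2.0):
    """Focal loss for one pixel: -(1 - p_t)**gamma * log(p_t)."""
    p_t = softmax(logits)[target]
    return -((1.0 - p_t) ** gamma) * math.log(p_t)


easy = [4.0, 0.0, 0.0]  # confident, correct prediction for class 0
hard = [0.0, 0.0, 0.0]  # uniform prediction over the three classes

ce_easy = focal_loss_pixel(easy, 0, gamma=0.0)  # gamma = 0 reduces to cross-entropy
fl_easy = focal_loss_pixel(easy, 0)
fl_hard = focal_loss_pixel(hard, 0)
print(ce_easy, fl_easy, fl_hard)
```

The easy pixel is down-weighted much more strongly than the hard one, which is the intended effect when background pixels dominate the image.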

In [None]:
from torch.optim.lr_scheduler import StepLR

train_config = {
    "num_epochs": 2, # approximate training time: ~20 minutes on a GPU
    "optimizer": torch.optim.Adam,
    "optimizer_params": {
        "lr": 1e-3,
        "weight_decay": 1e-5
    },
    "loss_fn": metric_class.FocalLoss,
    "scheduler": StepLR,
    "scheduler_params": {
        "step_size": 50,
        "gamma": 0.85
    },
    "validate_each_iter": 10,
    "writer": SummaryWriter(comment="Floss"),
    "save_model_path": "model_floss",
    "metric_class": metric_class
}

net = PSPNet(pretrained_model, SegmentationHead, num_classes=3, train_config=train_config).to(DEVICE)
print("Number of parameters in the network:", count_parameters(net))

In [None]:
net.train_model(train_dataloader, val_dataloader)

In [None]:
# Test the second model and compare the metrics.
# The `_1` suffix is the epoch number; change `_1` -> `_n` to load a different checkpoint.
net.load_state_dict(torch.load("model_floss_1.pth"))
net.eval()
dl_prediction, dl_target = net.test_model(test_dataloader)
print("Mean IoU metric: ", metric_class.IoUMetric(dl_prediction, dl_target))
print("Mean Recall metric: ", metric_class.RecallMetric(dl_prediction, dl_target))

In [None]:
# Examples of your network's predictions:

# Debug output; `input` here is the last sample left over from the previous visualization cell
print("type of input:", type(input))
print("input.dim():", input.dim())
print("len(dl_prediction):", len(dl_prediction))

img_idx = np.random.randint(0, 100)
for idx, (input, target) in enumerate(test_dataset):
# for idx, (input, target) in enumerate(test_dataloader):
    if (idx < img_idx):
        continue
    draw((input.squeeze(), target.squeeze()), t_dict, dl_prediction[idx])
    plt.pause(0.1)
    if (idx == img_idx+2):
        break