In [1]:
import torch
import torch.nn as nn
import torchvision.models
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import torch.nn.functional as F

import albumentations as A
from albumentations.pytorch import ToTensorV2

from tqdm import tqdm
from PIL import Image
import cv2
import matplotlib.pyplot as plt
import numpy as np

import os
from time import time

  # NOTE(review): `fetch_version_info` is neither defined nor imported anywhere
  # in the visible part of this notebook, and this statement is indented even
  # though it sits at the top level of the cell — confirm whether this line
  # belongs here at all before relying on `data`.
  data = fetch_version_info()


### Get the data

In [37]:
import gdown

# Google Drive location of the dataset archive and the local file it is saved to.
url = 'https://drive.google.com/uc?id=10f1H2T-5W-BiqabHHtlZ4ASs19TZmg8R'
output = 'data.zip'

# The archive is close to 1 GB, so only download it when it is not already on
# disk; this keeps "Restart Kernel -> Run All" cheap.
if not os.path.exists(output):
    gdown.download(url, output, quiet=False)

# Keep the archive path as the cell's displayed value.
output

Downloading...
From (original): https://drive.google.com/uc?id=10f1H2T-5W-BiqabHHtlZ4ASs19TZmg8R
From (redirected): https://drive.google.com/uc?id=10f1H2T-5W-BiqabHHtlZ4ASs19TZmg8R&confirm=t&uuid=7e68dbe3-d05c-4877-8bfe-dbd97ab2e023
To: /mnt/wd15/projects/python/hse/deep-learning-course/hw02/data.zip
100%|██████████| 979M/979M [00:27<00:00, 35.2MB/s] 


'data.zip'

In [38]:
!unzip data.zip

Archive:  data.zip
   creating: data/
   creating: data/val/
   creating: data/val/gt/
   creating: data/val/gt/193.Bewick_Wren/
  inflating: data/val/gt/193.Bewick_Wren/Bewick_Wren_0124_184771.png  
  inflating: data/val/gt/193.Bewick_Wren/Bewick_Wren_0109_185170.png  
  inflating: data/val/gt/193.Bewick_Wren/Bewick_Wren_0139_184839.png  
  inflating: data/val/gt/193.Bewick_Wren/Bewick_Wren_0110_185216.png  
  inflating: data/val/gt/193.Bewick_Wren/Bewick_Wren_0005_184699.png  
 extracting: data/val/gt/193.Bewick_Wren/Bewick_Wren_0050_184919.png  
   creating: data/val/gt/175.Pine_Warbler/
  inflating: data/val/gt/175.Pine_Warbler/Pine_Warbler_0105_170983.png  
  inflating: data/val/gt/175.Pine_Warbler/Pine_Warbler_0120_172340.png  
  inflating: data/val/gt/175.Pine_Warbler/Pine_Warbler_0003_171639.png  
  inflating: data/val/gt/175.Pine_Warbler/Pine_Warbler_0035_98396.png  
  inflating: data/val/gt/175.Pine_Warbler/Pine_Warbler_0132_171936.png  
  inflating: data/val/gt/175.Pine_Warb

### Utilities (0.5 point)

Complete dataset to load prepared images and masks. Don't forget to use augmentations.

Some of the images have only 1 channel, so use `gray2rgb`.

In [15]:
# Spatial size (in pixels) that the dataset transforms resize/crop every sample to.
IMG_HEIGHT = IMG_WIDTH = 224
# Number of colour channels every image is expected to have after `gray2rgb`.
CHANNELS_COUNT = 3
# Largest raw pixel value; used to scale pixel values into [0, 1].
PIXEL_MAX_VALUE = 255

def gray2rgb(img):
    """Return `img` as a 3-channel, channel-last array.

    Supported inputs:
      * (H, W)     grayscale -> the plane is replicated into 3 channels;
      * (H, W, 1)  grayscale with an explicit channel axis -> replicated;
      * (H, W, 3)  RGB -> returned unchanged (same object, no copy);
      * (H, W, 4)  RGBA -> the alpha channel is dropped.

    Raises:
        ValueError: for any other shape.
    """
    if img.ndim == 2:
        return np.dstack([img, img, img])
    if img.ndim == 3:
        channels = img.shape[2]
        if channels == 3:
            return img
        if channels == 1:
            return np.repeat(img, 3, axis=2)
        if channels == 4:
            # Keep only the colour planes; alpha carries no colour information.
            return img[:, :, :3]
    raise ValueError(f"Invalid img.shape: {img.shape}")

def get_iou(gt, pred):
    """Intersection-over-union between a boolean mask and a thresholded prediction.

    Args:
        gt: boolean ground-truth mask.
        pred: prediction of the same shape as `gt`; values above 0.5 count as
            foreground (an already-boolean mask is therefore left as is).

    Returns:
        |gt AND pred| / |gt OR pred|. When both masks are empty the union is
        zero; in that case 1.0 is returned (the two masks agree perfectly)
        instead of dividing by zero and propagating NaN into averaged scores.
    """
    pred = pred > 0.5
    union = (gt | pred).sum()
    if union == 0:
        return 1.0
    return (gt & pred).sum() / union

class BirdsDataset(Dataset):
    """Image/mask pairs for segmentation, read from a class-per-directory tree.

    Expected layout::

        <folder>/images/<class_name>/<name>.<ext>
        <folder>/gt/<class_name>/<name>.png

    On construction every image is read once to estimate per-channel
    normalization statistics and to check that its mask file exists. Samples
    are indexed in sorted (class name, file name) order so that indices are
    reproducible between runs.

    `__getitem__` returns `(image, mask)` after the albumentations pipeline:
    random augmentations for training folders, a plain resize otherwise, then
    normalization and conversion to tensors.
    """

    def __init__(self, folder) -> None:
        """Scan `folder`, validate image/mask pairing and build the transform.

        Args:
            folder: dataset root containing the `images` and `gt` sub-folders.
                The training pipeline (with augmentations) is used when the
                last path component contains "train".
        """
        images_folder: str = os.path.join(folder, 'images')
        gt_folder: str = os.path.join(folder, 'gt')

        # These values are not the real mean and std (they are averages of
        # per-image statistics), but calculating the exact ones would require
        # iterating over the images twice.
        approx_mean = np.zeros(shape=(CHANNELS_COUNT,))
        approx_std = np.zeros(shape=(CHANNELS_COUNT,))
        img_index_to_class_name: list[str] = []
        img_index_to_file_name: list[str] = []
        # os.listdir returns entries in arbitrary order; sort for reproducibility.
        for class_name in sorted(os.listdir(images_folder)):
            class_images_dir: str = os.path.join(images_folder, class_name)
            class_gt_masks_dir: str = os.path.join(gt_folder, class_name)
            assert os.path.exists(class_gt_masks_dir), f"no dir with ground truth masks {class_gt_masks_dir} for the class {class_name}"
            for fname in sorted(os.listdir(class_images_dir)):
                gt_mask_file_path = os.path.join(class_gt_masks_dir, BirdsDataset._gt_mask_filename_from_img_filename(fname))
                assert os.path.exists(gt_mask_file_path), \
                    f"no ground truth mask {gt_mask_file_path} for the file {fname} with class {class_name}"

                img_pixels: np.ndarray = gray2rgb(plt.imread(fname=os.path.join(class_images_dir, fname))).astype(np.float32)
                assert len(img_pixels.shape) == 3 and img_pixels.shape[2] == CHANNELS_COUNT
                # Assumes the decoded image holds 0..PIXEL_MAX_VALUE values
                # (matplotlib returns uint8 for non-PNG files) — TODO confirm
                # if PNG source images are ever added.
                img_pixels /= PIXEL_MAX_VALUE
                approx_mean += img_pixels.mean(axis=(0, 1))
                approx_std += img_pixels.std(axis=(0, 1))

                img_index_to_file_name.append(fname)

                # Note: every entry references the same `class_name` string
                # object, so the name is not duplicated in memory per image.
                img_index_to_class_name.append(class_name)

        total_images = len(img_index_to_class_name)
        assert total_images == len(img_index_to_file_name) > 0
        approx_mean /= total_images
        approx_std /= total_images

        assert approx_mean.ndim == 1 and \
            len(approx_mean) == CHANNELS_COUNT and \
                np.all((0 < approx_mean) & (approx_mean < 1))
        assert approx_std.ndim == 1 and \
            len(approx_std) == CHANNELS_COUNT \
                and np.all((0 < approx_std) & (approx_std < 1))

        # Look only at the last path component so that an unrelated parent
        # directory containing "train" does not switch augmentations on.
        folder_name = os.path.basename(os.path.normpath(folder))
        self.transform = BirdsDataset._make_data_transformer(
            is_train="train" in folder_name,
            mean=approx_mean.tolist(),
            std=approx_std.tolist()
        )
        self._img_index_to_class_name = img_index_to_class_name
        self._img_index_to_file_name = img_index_to_file_name
        self._images_folder = images_folder
        self._gt_folder = gt_folder

    def __getitem__(self, index: int):
        """Return the transformed `(image, mask)` pair for sample `index`."""
        img, gt_mask = self._read_img_and_gt_mask_by_index(index)
        if gt_mask.ndim == 3:
            # Some masks are stored with several channels; keep the first one
            # (the same reduction the evaluation cells apply) so that every
            # mask in a batch has the same 2-D shape.
            gt_mask = gt_mask[:, :, 0]
        transformed = self.transform(image=gray2rgb(img), mask=gt_mask)
        return transformed["image"], transformed["mask"]

    def __len__(self) -> int:
        """Number of image/mask pairs found under the dataset folder."""
        return len(self._img_index_to_file_name)

    @staticmethod
    def _make_data_transformer(is_train: bool, mean: list, std: list):
        """Build the albumentations pipeline.

        Order matters: spatial and pixel-level augmentations run on the raw
        image first, normalization is applied afterwards. Running fog or
        brightness/contrast on already standardized (zero-centred) values
        would feed them data outside the range they are designed for.
        """
        assert len(mean) == len(std) == 3
        actions: list[A.BasicTransform] = []
        if is_train:
            actions.extend([
                A.RandomResizedCrop(size=(IMG_HEIGHT, IMG_WIDTH)),
                A.HorizontalFlip(p=0.3),
                A.RandomFog(alpha_coef=0.07, fog_coef_range=(0.1, 0.5), p=0.3),
                A.RandomBrightnessContrast(brightness_limit=(-0.1, 0.1), contrast_limit=(-0.1, 0.1), p=0.3),
            ])
        else:
            actions.extend([
                A.Resize(height=IMG_HEIGHT, width=IMG_WIDTH),
            ])
        actions.extend([
            A.Normalize(mean=mean, std=std, max_pixel_value=PIXEL_MAX_VALUE),
            ToTensorV2(),
        ])
        return A.Compose(actions)

    def _read_img_and_gt_mask_by_index(self, i: int):
        """Read the raw (untransformed) image and mask arrays of sample `i`."""
        cn = self._img_index_to_class_name[i]
        fn = self._img_index_to_file_name[i]
        return (
            plt.imread(os.path.join(self._images_folder, cn, fn)),
            plt.imread(os.path.join(self._gt_folder, cn, self._gt_mask_filename_from_img_filename(fn))),
        )

    @staticmethod
    def _gt_mask_filename_from_img_filename(img_fname: str) -> str:
        """Map an image file name to its mask file name (same stem, `.png`).

        Works for any image extension and any letter case, e.g.
        `bird.jpg`, `bird.JPG` and `bird.jpeg` all map to `bird.png`.
        """
        stem, _ext = os.path.splitext(img_fname)
        return stem + ".png"

# Smoke test: constructing the training dataset walks the whole folder and
# asserts that every image has a matching ground-truth mask.
BirdsDataset(os.path.join("data", "train"))

<__main__.BirdsDataset at 0x72d4261b7080>

### Architecture (1 point)
Your task for today is to build your own Unet to solve the segmentation problem.

As an encoder, you can use models (or parts of them) from torchvision that were pre-trained on ImageNet. The decoder must be trained from scratch.
It is forbidden to use data not from the `data` folder.

I advise you to experiment with the number of blocks so as not to overfit on the training sample and get good quality on validation.

In [108]:
# B = 2
# CR = 256
# CX = 512
# N = 102
# r = torch.randn(size=(B, CR, N, N))
# x = torch.randn(size=(B, CX, N, N))
# assert torch.cat((r, x), dim=1).shape == torch.Size((B, CR + CX, N, N))
# nn.Conv2d(in_channels=CHANNELS_COUNT, out_channels=256, kernel_size=3, stride=1, padding=1)(torch.randn((1, CHANNELS_COUNT, IMG_HEIGHT, IMG_WIDTH))).shape
# nn.MaxPool2d(kernel_size=2)(torch.randn((1, 256, IMG_HEIGHT, IMG_WIDTH))).shape
# nn.MaxPool2d(kernel_size=3, stride=2, padding=1)(torch.randn((1, 256, IMG_HEIGHT, IMG_WIDTH))).shape
# from torchvision import models
# from torchvision.models.segmentation.deeplabv3 import DeepLabV3
# from torchvision.models.segmentation.fcn import FCN
# m: DeepLabV3 = models.segmentation.deeplabv3_mobilenet_v3_large()
# m: torchvision.models.resnet.ResNet = models.resnet152(weights=models.ResNet152_Weights.DEFAULT)

# from torchvision import models
# # m: torchvision.models.resnet.ResNet = models.resnet152(weights=models.ResNet152_Weights.DEFAULT)
# m = models.segmentation.fcn_resnet50(weights_backbone=models.ResNet50_Weights.IMAGENET1K_V1).eval().backbone
# [s for s in dir(m) if not s.startswith('_') and ("layer" in s or "conv" in s or "relu" in s or "sampl" in s or "pool" in s)]
# B = 2
# N = 32
# M = 16
# assert m.conv1(torch.zeros((B, CHANNELS_COUNT, IMG_HEIGHT, IMG_WIDTH))).shape == torch.Size((B, 64, IMG_HEIGHT // 2, IMG_WIDTH // 2))
# assert m.bn1(torch.zeros((B, 64, IMG_HEIGHT // 2, IMG_WIDTH // 2))).shape == torch.Size((B, 64, IMG_HEIGHT // 2, IMG_WIDTH // 2))
# assert m.relu(torch.zeros((B, 64, IMG_HEIGHT // 2, IMG_WIDTH // 2))).shape == torch.Size((B, 64, IMG_HEIGHT // 2, IMG_WIDTH // 2))
# assert m.maxpool(torch.zeros((B, 64, IMG_HEIGHT // 2, IMG_WIDTH // 2))).shape == torch.Size((B, 64, IMG_HEIGHT // 4, IMG_WIDTH // 4))
# assert m.layer1(torch.zeros((B, 64, IMG_HEIGHT // 4, IMG_WIDTH // 4))).shape == torch.Size((B, 256, IMG_HEIGHT // 4, IMG_WIDTH // 4))
# assert m.layer2(torch.zeros((B, 256, IMG_HEIGHT // 4, IMG_WIDTH // 4))).shape == torch.Size((B, 512, IMG_HEIGHT // 8, IMG_WIDTH // 8))
# # assert m.layer3(torch.zeros((B, 512, IMG_HEIGHT // 8, IMG_WIDTH // 8))).shape == torch.Size((B, 1024, IMG_HEIGHT // 16, IMG_WIDTH // 16))
# assert m.layer3(torch.zeros((B, 512, IMG_HEIGHT // 8, IMG_WIDTH // 8))).shape == torch.Size((B, 1024, IMG_HEIGHT // 8, IMG_WIDTH // 8))

In [110]:
from torchvision import models

class DecoderBlock(nn.Module):
    """U-Net decoder stage: two 3x3 convolutions, each followed by ReLU.

    `forward` applies only the convolutions. `forward_with_resid` is the
    U-Net step: upsample the deeper feature map by 2, concatenate it with the
    skip connection along the channel axis and then apply the convolutions.

    Args:
        in_channels: channels of the concatenated input (skip + upsampled).
        mid_channels: channels between the two convolutions.
        out_channels: channels of the block output.
    """

    def __init__(self, in_channels, mid_channels, out_channels):
        super().__init__()
        # Parameter-free nearest-neighbour upsampling (nn.UpsamplingNearest2d
        # is deprecated in favour of nn.Upsample / F.interpolate).
        self.upsample = nn.Upsample(scale_factor=2, mode="nearest")
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=mid_channels, kernel_size=3, padding=1)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(in_channels=mid_channels, out_channels=out_channels, kernel_size=3, padding=1)
        self.relu2 = nn.ReLU()

    def forward_with_resid(self, resid: torch.Tensor, x: torch.Tensor) -> torch.Tensor:
        """Upsample `x`, concatenate it after `resid` on dim 1 and run `forward`.

        When the encoder halved an odd spatial size, doubling `x` does not
        reproduce the skip tensor's size exactly; in that case the upsampled
        map is resized to match `resid` so the concatenation cannot fail.
        """
        upsampled = self.upsample(x)
        if upsampled.shape[-2:] != resid.shape[-2:]:
            upsampled = F.interpolate(upsampled, size=resid.shape[-2:], mode="nearest")
        return self.forward(torch.cat((resid, upsampled), dim=1))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply conv -> ReLU -> conv -> ReLU to `x`."""
        return self.relu2(self.conv2(self.relu1(self.conv1(x))))

class Unet(nn.Module):
    """U-Net with a ResNet-152 encoder and a convolutional decoder.

    Encoder stages (taken from torchvision's ResNet-152) and their outputs
    for an (IMG_HEIGHT, IMG_WIDTH) input:
        inp       conv1 + bn1 + relu   ->   64 channels at 1/2 resolution
        encoder1  maxpool + layer1     ->  256 channels at 1/4
        encoder2  layer2               ->  512 channels at 1/8
        encoder3  layer3               -> 1024 channels at 1/16

    The decoder mirrors these stages with skip connections, and the head
    upsamples back to full resolution and produces 2 output channels
    (per-pixel class logits).

    Args:
        pretrained: load the default pre-trained ResNet-152 weights into the
            encoder (default). Pass False to skip loading them, e.g. when a
            full state dict is loaded right after construction.
    """

    def __init__(self, pretrained: bool = True):
        super().__init__()

        # Model whose layers are reused as the encoder.
        weights = models.ResNet152_Weights.DEFAULT if pretrained else None
        m: torchvision.models.resnet.ResNet = models.resnet152(weights=weights)

        Unet._assert_encoder_shapes(m)

        # encoder blocks
        self.inp = nn.Sequential(m.conv1, m.bn1, m.relu)
        self.encoder1 = nn.Sequential(m.maxpool, m.layer1)
        self.encoder2 = m.layer2
        self.encoder3 = m.layer3

        # decoder blocks
        self.decoder1 = DecoderBlock(in_channels=512 + 1024, mid_channels=1024, out_channels=512)
        self.decoder2 = DecoderBlock(in_channels=256 + 512, mid_channels=512, out_channels=256)
        self.decoder3 = DecoderBlock(in_channels=64 + 256, mid_channels=128, out_channels=64)

        self.out = nn.Sequential(
            nn.UpsamplingNearest2d(scale_factor=2),
            nn.Conv2d(in_channels=64, out_channels=2, kernel_size=1),
        )

    @staticmethod
    def _assert_encoder_shapes(m) -> None:
        """Check that the encoder stages produce the shapes the decoder expects.

        The check runs in eval mode and without autograd. Running it in
        training mode would push an all-zero batch through every BatchNorm
        layer and overwrite part of the pre-trained running statistics.
        """
        was_training = m.training
        m.eval()
        try:
            with torch.no_grad():
                x = torch.zeros((1, CHANNELS_COUNT, IMG_HEIGHT, IMG_WIDTH))
                x = m.relu(m.bn1(m.conv1(x)))
                assert x.shape == torch.Size((1, 64, IMG_HEIGHT // 2, IMG_WIDTH // 2))
                x = m.layer1(m.maxpool(x))
                assert x.shape == torch.Size((1, 256, IMG_HEIGHT // 4, IMG_WIDTH // 4))
                x = m.layer2(x)
                assert x.shape == torch.Size((1, 512, IMG_HEIGHT // 8, IMG_WIDTH // 8))
                x = m.layer3(x)
                assert x.shape == torch.Size((1, 1024, IMG_HEIGHT // 16, IMG_WIDTH // 16))
        finally:
            # Restore whatever mode the model was in before the check.
            m.train(was_training)

    def forward(self, x: torch.Tensor):
        """Return per-pixel logits for a batch of images.

        The output has 2 channels and, for inputs whose sides are divisible
        by 16, the same spatial size as `x`.
        """
        r1 = self.inp(x)
        r2 = self.encoder1(r1)
        r3 = self.encoder2(r2)
        x = self.encoder3(r3)
        x = self.decoder1.forward_with_resid(r3, x)
        x = self.decoder2.forward_with_resid(r2, x)
        x = self.decoder3.forward_with_resid(r1, x)
        return self.out(x)

    def mutable_parameters(self, recurse=True):
        """Parameters handed to the optimizer; currently every parameter of the model."""
        return self.parameters(recurse)

# Unet()(torch.zeros(1, CHANNELS_COUNT, IMG_HEIGHT, IMG_WIDTH)).shape == torch.Size((1, 2, IMG_HEIGHT, IMG_WIDTH))

### Train script (0.5 point)

Complete the train and predict scripts.

In [None]:
def _masks_to_targets(masks: torch.Tensor) -> torch.Tensor:
    """Turn a batch of dataset masks into (B, H, W) int64 class indices.

    Foreground (1) is every pixel whose mask value exceeds 0.5, background (0)
    everything else.
    """
    if masks.ndim == 4:
        # Assumes a channel-last mask batch (B, H, W, C) whose channels all
        # carry the same mask — TODO confirm against the dataset output.
        masks = masks[..., 0]
    return (masks > 0.5).long()


def _run_epoch(model, dataloader, criterion, device, optimizer=None):
    """Run one pass over `dataloader`; train when `optimizer` is given.

    The caller is responsible for `model.train()` / `model.eval()`.

    Returns:
        (mean loss per sample, mean per-image IoU) over the whole pass.
    """
    is_training = optimizer is not None
    total_loss, total_iou, n_samples = 0.0, 0.0, 0

    with torch.set_grad_enabled(is_training):
        for inputs, masks in tqdm(dataloader):
            inputs = inputs.to(device)
            targets = _masks_to_targets(masks.to(device))

            logits = model(inputs)
            loss = criterion(logits, targets)

            if is_training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            batch_size = inputs.shape[0]
            # The criterion averages over the batch; weight by batch size so a
            # smaller last batch does not skew the epoch mean.
            total_loss += loss.item() * batch_size

            predicted = logits.detach().argmax(dim=1).bool()
            expected = targets.bool()
            intersection = (predicted & expected).flatten(1).sum(dim=1).float()
            union = (predicted | expected).flatten(1).sum(dim=1).float()
            # An image where both masks are empty counts as a perfect match.
            iou = torch.where(union > 0, intersection / union.clamp(min=1.0), torch.ones_like(union))
            total_iou += iou.sum().item()
            n_samples += batch_size

    return total_loss / max(n_samples, 1), total_iou / max(n_samples, 1)


def train_segmentation_model(data_path):
    """Train `Unet` on `<data_path>/train` and validate on `<data_path>/val`.

    After every epoch the weights are saved to `model_<epoch>.pth` in the
    current working directory and the epoch's train/validation loss and IoU
    are printed.

    Args:
        data_path: directory containing the `train` and `val` dataset folders
            (with or without a trailing path separator).
    """
    BATCH_SIZE = 8
    N_EPOCH = 15
    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # os.path.join works with and without a trailing separator in `data_path`.
    train_dataset = BirdsDataset(os.path.join(data_path, 'train'))
    val_dataset = BirdsDataset(os.path.join(data_path, 'val'))
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    model = Unet().to(DEVICE)
    optimizer = torch.optim.Adam(params=model.mutable_parameters(), lr=0.0007)
    # The network emits 2 logits per pixel, so plain cross-entropy over
    # {background, foreground} class indices is used.
    criterion = nn.CrossEntropyLoss()
    losses_train, losses_val, ious_train, ious_val = [], [], [], []

    for epoch in range(N_EPOCH):
        model.train()
        train_loss, train_iou = _run_epoch(model, train_dataloader, criterion, DEVICE, optimizer)
        losses_train.append(train_loss)
        ious_train.append(train_iou)

        model.eval()
        val_loss, val_iou = _run_epoch(model, val_dataloader, criterion, DEVICE)
        losses_val.append(val_loss)
        ious_val.append(val_iou)

        torch.save(model.state_dict(), f'model_{epoch}.pth')

        print(f"Epoch: {epoch}, train loss: {losses_train[-1]}, val loss: {losses_val[-1]}, train iou: {ious_train[-1]}, val iou: {ious_val[-1]}")

In [None]:
def predict(model, img_path, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Predict a foreground-probability map for a single image file.

    The image is converted to 3 channels, scaled to [0, 1], standardized with
    `mean`/`std`, resized to (IMG_HEIGHT, IMG_WIDTH) and passed through
    `model`. The softmax probability of class 1 is then resized back to the
    original image size.

    Args:
        model: network returning (1, 2, H, W) logits for a (1, 3, H, W) input.
        img_path: path to the image file.
        mean, std: per-channel normalization statistics on the [0, 1] scale.
            NOTE(review): the defaults are the usual ImageNet statistics; the
            training dataset derives its own statistics from the data, so pass
            those values here if exact train/inference parity is required.

    Returns:
        float32 numpy array of shape (original_height, original_width) with
        values in [0, 1]; threshold at 0.5 to obtain a binary mask.
    """
    image = gray2rgb(plt.imread(img_path))
    orig_height, orig_width = image.shape[:2]

    pixels = image.astype(np.float32)
    if image.dtype == np.uint8:
        # 8-bit images are on the 0..255 scale; float images are left as read.
        pixels /= PIXEL_MAX_VALUE
    pixels = (pixels - np.asarray(mean, dtype=np.float32)) / np.asarray(std, dtype=np.float32)
    pixels = cv2.resize(pixels, (IMG_WIDTH, IMG_HEIGHT), interpolation=cv2.INTER_LINEAR)

    # HWC -> CHW, plus a leading batch dimension.
    batch = torch.from_numpy(np.ascontiguousarray(pixels.transpose(2, 0, 1))).unsqueeze(0)
    first_param = next(iter(model.parameters()), None)
    if first_param is not None:
        # Run on whatever device / precision the model currently lives on.
        batch = batch.to(device=first_param.device, dtype=first_param.dtype)

    with torch.no_grad():
        logits = model(batch)
        foreground = torch.softmax(logits.float(), dim=1)[:, 1:2]
        foreground = F.interpolate(
            foreground, size=(orig_height, orig_width), mode="bilinear", align_corners=False
        )
        segm = foreground[0, 0].cpu().numpy()
        return segm

def get_model(path):
    """Build a `Unet`, load the weights stored at `path` and switch to eval mode.

    The checkpoint is always mapped to the CPU, so a file saved from a GPU
    run also loads on a machine without CUDA; the caller moves the model to
    the desired device afterwards. `weights_only=True` restricts unpickling
    to tensors and plain containers, which is all a state dict contains.
    """
    model = Unet()
    state_dict = torch.load(path, map_location="cpu", weights_only=True)
    model.load_state_dict(state_dict)
    model.eval()
    return model

In [None]:
# Train on the extracted dataset; per-epoch checkpoints are written as model_<epoch>.pth.
train_segmentation_model(data_path='data/')

You can also experiment with models and write a small report about the results. If the report is meaningful, you will receive an extra point.

### Testing (8 points)
Your model will be tested on the new data, similar to validation, so use techniques to prevent overfitting the model.

* IoU > 0.85 — 8 points
* IoU > 0.80 — 7 points
* IoU > 0.75 — 6 points
* IoU > 0.70 — 5 points
* IoU > 0.60 — 4 points
* IoU > 0.50 — 3 points
* IoU > 0.40 — 2 points
* IoU > 0.30 — 1 point

In [None]:
# Checkpoint of the last training epoch (epochs are numbered from 0).
# Fall back to the CPU when no CUDA device is available.
model = get_model('model_14.pth').to('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Evaluate `model` on the validation set: per-image IoU and prediction time.
ious, times = [], []
test_dir = 'data/val/'
images_root = os.path.join(test_dir, 'images')

for class_name in tqdm(sorted(os.listdir(images_root))):
    for img_name in sorted(os.listdir(os.path.join(images_root, class_name))):

        t_start = time()
        pred = predict(model, os.path.join(images_root, class_name, img_name))
        times.append(time() - t_start)

        # Same stem with a .png extension; str.replace would also rewrite a
        # "jpg" that happens to occur elsewhere in the file name.
        gt_name = os.path.splitext(img_name)[0] + '.png'
        # Close the file handle right after the pixels are copied out.
        with Image.open(os.path.join(test_dir, 'gt', class_name, gt_name)) as gt_image:
            gt = np.asarray(gt_image, dtype=np.uint8)
        if len(gt.shape) > 2:
            gt = gt[:, :, 0]

        iou = get_iou(gt == 255, pred > 0.5)
        ious.append(iou)

# (mean IoU, mean seconds per prediction)
np.mean(ious), np.mean(times)

### Compression (1 point)

Try to speed up the model in any way without losing more than 1% in iou score.
For example [torch2trt](https://github.com/NVIDIA-AI-IOT/torch2trt)

In [None]:
def get_fast_model():
    """Return the speed-optimized model used by the compression benchmark.

    NOTE(review): this is still the unfilled assignment template. No local
    `model` is created here, so the `return` resolves to the module-level
    `model` (the one assigned from `get_model(...)` in the testing section,
    if that cell has been run) — i.e. no speed-up is applied yet — and raises
    NameError when no such global exists.
    """
    # YOUR CODE HERE
    return model

In [None]:
# Fall back to the CPU when no CUDA device is available.
fast_model = get_fast_model().to('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Evaluate `fast_model` on the validation set: per-image IoU and prediction time.
ious, times = [], []
test_dir = 'data/val/'
images_root = os.path.join(test_dir, 'images')

for class_name in tqdm(sorted(os.listdir(images_root))):
    for img_name in sorted(os.listdir(os.path.join(images_root, class_name))):

        t_start = time()
        pred = predict(fast_model, os.path.join(images_root, class_name, img_name))
        times.append(time() - t_start)

        # Same stem with a .png extension; str.replace would also rewrite a
        # "jpg" that happens to occur elsewhere in the file name.
        gt_name = os.path.splitext(img_name)[0] + '.png'
        # Close the file handle right after the pixels are copied out.
        with Image.open(os.path.join(test_dir, 'gt', class_name, gt_name)) as gt_image:
            gt = np.asarray(gt_image, dtype=np.uint8)
        if len(gt.shape) > 2:
            gt = gt[:, :, 0]

        iou = get_iou(gt == 255, pred > 0.5)
        ious.append(iou)

# (mean IoU, mean seconds per prediction)
np.mean(ious), np.mean(times)

**Bonus:** For the best IoU score on the test set (without compression) in your group, you will get 1.5, 1, or 0.5 extra points (for 1st, 2nd, and 3rd place, respectively).