In [None]:
from typing import Optional
from matplotlib.image import imread
from matplotlib import pyplot as plt
import torch
import torchvision.transforms.functional as fn
from torchvision import transforms
from sklearn.model_selection import train_test_split
import os
import sys
import sklearn
from tqdm import tqdm
from torch import nn, Tensor
import numpy as np
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import DataLoader
import random

# Seed every random number generator the notebook can reach, so that a
# "Restart & Run All" reproduces the same numbers. NumPy is included because
# scikit-learn falls back to NumPy's global generator when no random_state
# is given.
SEED = 42
torch.manual_seed(SEED)
random.seed(SEED)
np.random.seed(SEED)

In [None]:
# Notebook-wide configuration.
# Root folder of the dataset; it is listed to obtain one sub-folder per class.
dataset_dir = "/local/tmp/BostonGene_application_data/images/"
# Side length, in pixels, of the square images produced by the preprocessing.
processed_image_size = 64
# Prefer the GPU, but fall back to the CPU so that the notebook still runs on
# machines where PyTorch cannot see a CUDA device.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
# Every sub-directory of dataset_dir is treated as one class. os.listdir
# returns entries in arbitrary order, so the names are sorted: positions in
# `targets` are used as integer labels and must not change between runs.
# Stray non-directory entries are skipped because they cannot be listed below.
targets = sorted(
    entry for entry in os.listdir(dataset_dir)
    if os.path.isdir(os.path.join(dataset_dir, entry))
)
# Class name -> sorted names of the files stored in that class directory.
_filenames = {target: sorted(os.listdir(f"{dataset_dir}/{target}")) for target in targets}

In [None]:
def load_and_preprocess(filename: str) -> torch.Tensor:
    """Load an image file and convert it to a square, fixed-size float tensor.

    The image is centre-cropped to a square whose side is its shorter edge,
    resized to ``processed_image_size`` and scaled to the [0, 1] range.

    The axes of the loaded array are reversed with ``permute(2, 1, 0)``: the
    channel axis becomes the first one and the two spatial axes swap places.
    Applying the same permutation to the result restores the channels-last
    layout that ``plt.imshow`` expects.

    Args:
        filename: path of the image file to read.

    Returns:
        A float32 tensor with 3 channels in the first dimension and two
        spatial dimensions of ``processed_image_size`` each.
    """
    # Load the image
    pixels = np.asarray(imread(filename))

    # imread returns PNG files as floats that are already in [0, 1] and other
    # formats as integers, so the divisor must follow the dtype; dividing
    # float data by 255 would squash the image to almost black.
    if np.issubdtype(pixels.dtype, np.integer):
        scale = float(np.iinfo(pixels.dtype).max)  # 255 for 8-bit images
    else:
        scale = 1.0

    # Always produce exactly three channels, which is what the model consumes
    if pixels.ndim == 2:
        # Grayscale: replicate the single channel
        pixels = np.stack([pixels] * 3, axis=-1)
    elif pixels.shape[2] == 4:
        # RGBA: drop the alpha channel
        pixels = pixels[:, :, :3]

    raw_image = torch.tensor(pixels.astype(np.float32), dtype=torch.float32).permute(2, 1, 0)
    # Crop it to the square shape
    cropped_image = fn.center_crop(raw_image, output_size=min(raw_image.shape[1:]))
    # Resize image
    resized_image = fn.resize(cropped_image, size=processed_image_size)
    # Normalise image
    normalised_image = resized_image / scale
    return normalised_image

In [None]:
# Show one sample image as stored on disk and after preprocessing
img_name = f"{dataset_dir}/{targets[0]}/{_filenames[targets[0]][3]}"
views = (
    ("Before:", lambda: imread(img_name)),
    # Reverse the axes again so that imshow receives a channels-last image
    ("After:", lambda: load_and_preprocess(img_name).permute(2, 1, 0)),
)
for caption, make_view in views:
    print(caption)
    plt.imshow(make_view())
    plt.show()

Building the dataset and splitting the data

The processed dataset is small, so we can easily fit it in RAM

In [None]:
# Integer label of every class: its position in `targets`
target_encoding = {name: index for index, name in enumerate(targets)}

# Preprocess every file of every class; `_data` and `_targets` stay aligned
# index by index (sample i has label _targets[i])
_data = []
_targets = []
for target in targets:
    class_dir = f"{dataset_dir}/{target}"
    label = target_encoding[target]
    for file in os.listdir(class_dir):
        _data.append(load_and_preprocess(f"{class_dir}/{file}"))
        _targets.append(label)

In [None]:
# Footprint of one sample: 3 channels x side^2 values x 4 bytes per float32 value
bytes_per_image = (3 * processed_image_size**2) * 4
print(f"Estimated processed dataset_size: {len(_data) * bytes_per_image // 10**6 } MB")

In [None]:
# Hold out 10% of the samples for validation; the split is stratified on the
# labels and uses a fixed random_state
x_train, x_test, y_train, y_test = train_test_split(
    _data,
    _targets,
    test_size=0.1,
    random_state=42,
    shuffle=True,
    stratify=_targets,
)

In [None]:
# Augmentations: a random rotation of up to 10 degrees, followed by a
# horizontal and a vertical flip, each applied with probability 0.1
augmentation_pipeline = transforms.Compose(
    [
        transforms.RandomRotation(degrees=10),
        transforms.RandomHorizontalFlip(p=0.1),
        transforms.RandomVerticalFlip(p=0.1),
    ]
)

In [None]:
# Let's create a simple baseline - a ResNet model
class ResBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm stages plus a skip connection.

    Both 3x3 convolutions use stride 1 and padding 1. When ``in_channels``
    differs from ``out_channels`` the skip path goes through a bias-free 1x1
    convolution so that it can be added to the main path; otherwise the input
    is added unchanged. Despite its name, the ``downsample`` flag only marks
    that channel projection: the 1x1 convolution has stride 1.
    """

    def __init__(self, in_channels: int, out_channels: int):
        super().__init__()

        def conv3x3(channels_in: int) -> nn.Conv2d:
            # Bias-free 3x3 convolution with stride 1 and padding 1
            return nn.Conv2d(channels_in, out_channels, kernel_size=3, stride=1, padding=1, bias=False)

        # Main path
        self.conv1 = conv3x3(in_channels)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.activation = nn.ReLU()
        self.conv2 = conv3x3(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Skip path: a projection exists only when the channel count changes
        self.downsample = in_channels != out_channels
        if self.downsample:
            self.downsample_layer = nn.Conv2d(in_channels, out_channels,
                                              kernel_size=1, stride=1, padding=0, bias=False)

    def forward(self, _input: Tensor) -> Tensor:
        """Return ``activation(main_path(_input) + skip_path(_input))``."""
        shortcut = self.downsample_layer(_input) if self.downsample else _input

        # conv -> bn -> activation -> conv -> bn
        features = self.activation(self.bn1(self.conv1(_input)))
        features = self.bn2(self.conv2(features))

        # Merge both paths, then apply the final non-linearity
        features += shortcut
        return self.activation(features)


class MyResNet(nn.Module):
    """Stack of ``ResBlock`` layers followed by global average pooling and a linear head.

    Args:
        input_channels: channel count of the tensor that reaches the first
            residual block, i.e. of the output of ``initial_layers``.
        layers: one ``(num_of_blocks, channels_per_block)`` pair per layer. A
            2x2 max-pooling is inserted after every layer except the last one.
        num_of_classes: number of outputs of the final linear layer.
        initial_layers: optional module applied to the input before the
            residual layers; ``nn.Identity`` is used when omitted.
    """

    def __init__(self, input_channels: int, layers: list[tuple[int, int]], num_of_classes: int,
                 initial_layers: Optional[nn.Module] = None):
        super().__init__()

        self.initial_layers = nn.Identity() if initial_layers is None else initial_layers

        self.layers = nn.Sequential()
        last_layer_id = len(layers) - 1
        previous_channels = input_channels
        for layer_id, (num_of_blocks, channels_per_block) in enumerate(layers):
            for block_id in range(num_of_blocks):
                # Only the first block of a layer receives the previous layer's channel count
                block_in = previous_channels if block_id == 0 else channels_per_block
                self.layers.add_module(f"ResBlock{layer_id}_{block_id}",
                                       ResBlock(in_channels=block_in, out_channels=channels_per_block))

            if layer_id != last_layer_id:
                self.layers.add_module(f"pool{layer_id}", nn.MaxPool2d(2))
            previous_channels = channels_per_block

        self.avg_pool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
        self.fc = nn.Linear(in_features=layers[-1][1], out_features=num_of_classes)

    @property
    def device(self):
        """Device on which the first parameter of the model is stored."""
        first_parameter = next(self.parameters())
        return first_parameter.device

    def forward(self, _input: Tensor) -> Tensor:
        """Return one row of ``num_of_classes`` outputs per input sample."""
        features = self.layers(self.initial_layers(_input))
        # Collapse the spatial dimensions, then drop them before the linear head
        pooled = torch.flatten(self.avg_pool(features), 1)
        return self.fc(pooled)

In [None]:
# The classifier needs exactly one output per encoded class, so the count is
# taken from the label encoding instead of being hard-coded.
model = MyResNet(input_channels=3, layers=[(4, 64), (4, 128)], num_of_classes=len(target_encoding))

In [None]:
# Standard, reusable code for training models:
# TODO:
# add augmentation during training

def trainer(number_of_epochs,
            dataset,
            val_dataset,
            batch_size,
            model,
            loss_function,
            optimizer,
            writer,
            lr = 0.001):
    """Train ``model`` and log the training loss and validation metrics every epoch.

    Args:
        number_of_epochs: how many passes over ``dataset`` to run.
        dataset: training samples, wrapped in a shuffling ``DataLoader``.
        val_dataset: validation samples, wrapped in a non-shuffling
            ``DataLoader`` with batch size 1.
        batch_size: batch size of the training loader.
        model: the model to optimise; it is not moved to another device here.
        loss_function: callable invoked as ``loss_function(output, target)``.
        optimizer: optimizer factory, invoked as
            ``optimizer(model.parameters(), lr=lr)``.
        writer: object providing ``add_scalar(tag, value, step)`` and
            ``flush()``, e.g. a TensorBoard ``SummaryWriter``.
        lr: learning rate handed to the optimizer factory.
    """
    # Neither loader depends on the epoch, so both are built once instead of
    # once per epoch; a shuffling loader still reshuffles on every pass.
    train_loader = DataLoader(dataset, batch_size, shuffle=True, num_workers=8)
    # Validation runs one sample at a time, in a fixed order.
    val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False, num_workers=8)

    def make_val_report(iteration):
        """Log every averaged validation metric (except 'support') and the accuracy."""
        report = calculate_val_performance(model, val_loader)
        for which_avg in ('weighted avg', 'macro avg'):
            for metric, value in report[which_avg].items():
                if metric != 'support':
                    writer.add_scalar(f'metrics/{which_avg}/{metric}', value, iteration)
        writer.add_scalar('metrics/accuracy', report['accuracy'], iteration)

    optima = optimizer(model.parameters(), lr=lr)

    iterations = tqdm(range(number_of_epochs), desc='epoch')
    iterations.set_postfix({'train epoch loss': np.nan})
    try:
        for it in iterations:
            epoch_loss = train_epoch(train_generator=train_loader,
                                     model=model,
                                     loss_function=loss_function,
                                     optimizer=optima)

            iterations.set_postfix({'train epoch loss': epoch_loss})
            writer.add_scalar('metrics/train_loss', epoch_loss, it)
            make_val_report(it)
    finally:
        # Push buffered scalars to disk even when training is interrupted
        writer.flush()


def train_epoch(train_generator, model, loss_function, optimizer):
    """Run one optimisation pass over ``train_generator``.

    Args:
        train_generator: iterable yielding ``(inputs, targets)`` batches.
        model: module to train; it is switched to training mode.
        loss_function: callable invoked as ``loss_function(output, targets)``.
        optimizer: optimizer already bound to the parameters of ``model``.

    Returns:
        The mean of the per-batch loss values as a float, or ``nan`` when
        ``train_generator`` yields no batch at all.
    """
    model.train()

    # Read the device from the parameters so that any nn.Module can be trained
    # here, not only models that expose a ``device`` attribute.
    device = next(model.parameters()).device

    epoch_loss = 0.0
    total = 0
    for x, y in train_generator:
        optimizer.zero_grad()

        output = model(x.to(device))

        loss = loss_function(output, y.to(device))
        loss.backward()

        optimizer.step()

        # .item() already copies the scalar to the host
        epoch_loss += loss.item()
        total += 1

    if total == 0:
        # Nothing was processed; report that explicitly instead of dividing by zero
        return float("nan")
    return epoch_loss / total


def calculate_val_performance(model, val_dataset):
    """Evaluate ``model`` and return scikit-learn's classification report as a dict.

    Args:
        model: module to evaluate; it is switched to evaluation mode.
        val_dataset: iterable yielding ``(inputs, labels)`` batches of any
            batch size; it is traversed exactly once.

    Returns:
        The dictionary produced by ``classification_report`` with
        ``output_dict=True`` and ``zero_division=0``.
    """
    # Imported here so that the function does not depend on ``sklearn.metrics``
    # having been loaded as a side effect of some other sklearn import.
    from sklearn.metrics import classification_report

    model.eval()
    device = next(model.parameters()).device

    y_pred = []
    y_target = []
    # Predictions and labels are collected in the same pass, so they stay
    # aligned even if the iterable yields a different order on each traversal.
    # no_grad avoids building autograd graphs that are never used.
    with torch.no_grad():
        for x, y in val_dataset:
            logits = model(x.to(device))
            if logits.dim() == 1:
                # A single un-batched output row
                logits = logits.unsqueeze(0)
            y_pred.extend(logits.argmax(dim=1).cpu().tolist())
            y_target.extend(int(label) for label in torch.as_tensor(y).reshape(-1).tolist())

    return classification_report(y_target, y_pred, output_dict=True, zero_division=0)


In [None]:

# Train on the device chosen in the configuration cell. The TensorBoard writer
# is closed afterwards, even on failure, so that every logged scalar reaches
# the disk.
model.to(device)
writer = SummaryWriter(log_dir='/local/tmp/logs/BG_application/resnet')
try:
    trainer(number_of_epochs=10,
            dataset=list(zip(x_train, y_train)),
            val_dataset=list(zip(x_test, y_test)),
            batch_size=16,
            model=model,
            loss_function=nn.CrossEntropyLoss(),
            optimizer=torch.optim.Adam,
            writer=writer)
finally:
    writer.close()