# Libs

In [1]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split

from sklearn.metrics import f1_score

import torch
import torch.nn.functional as F
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms

import torchvision
from torchvision.models import ResNet18_Weights

from torchsummary import summary

In [2]:
# Miscellaneous
from IPython.display import clear_output
from collections import defaultdict
from PIL import Image
from tqdm import tqdm
import warnings, os, random
from copy import deepcopy
from pathlib import Path


warnings.filterwarnings('ignore')

# Seeds

In [3]:
def set_all_seeds(seed=1990):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (``torch.manual_seed`` and ``torch.cuda.manual_seed``), exports
    ``PYTHONHASHSEED`` and puts cuDNN into deterministic mode with
    benchmarking switched off.

    Args:
        seed: Integer seed shared by all generators.
    """
    # NOTE(review): PYTHONHASHSEED is presumably only honoured at interpreter
    # start-up, so this mainly affects child processes — confirm.
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Python and NumPy generators
    for seed_fn in (random.seed, np.random.seed):
        seed_fn(seed)

    # PyTorch generators
    for seed_fn in (torch.manual_seed, torch.cuda.manual_seed):
        seed_fn(seed)

    # Trade cuDNN autotuning for reproducible kernels
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


set_all_seeds()

# Cuda

In [4]:
# Pick the GPU when one is available, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Only query the GPU name when CUDA is actually usable; the unconditional
# call raised on CPU-only machines even though the fallback above exists.
if DEVICE.type == 'cuda':
    print(torch.cuda.get_device_name())
else:
    print('CUDA is not available, using CPU')

NVIDIA GeForce RTX 3060 Laptop GPU


# Useful functions

In [5]:
def train(model, data_loader, optimizer, loss_fn):
    """Run one training pass over ``data_loader``.

    The model is moved to ``DEVICE`` and put into training mode. For every
    batch the gradients are reset, the loss is back-propagated and the
    optimizer takes one step. The predicted class of a sample is the argmax
    of the model output along dimension 1.

    Args:
        model: Module to optimise.
        data_loader: Iterable yielding ``(inputs, labels)`` batches.
        optimizer: Optimizer bound to the model's parameters.
        loss_fn: Callable invoked as ``loss_fn(output, labels)``.

    Returns:
        Tuple ``(mean_loss, macro_f1)``: the sum of per-batch losses divided
        by the number of batches, and the macro-averaged F1 over all samples
        seen in this pass.
    """
    model = model.to(DEVICE)
    model.train()

    running_loss = 0
    targets, predictions = [], []

    for inputs, labels in tqdm(data_loader):
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)

        optimizer.zero_grad()
        logits = model(inputs)
        batch_loss = loss_fn(logits, labels)
        batch_loss.backward()
        optimizer.step()

        # Bookkeeping uses tensors computed before the step, so the recorded
        # values describe the weights that produced this batch's output.
        running_loss += batch_loss.item()
        targets += labels.tolist()
        predictions += logits.argmax(dim=1).tolist()

    mean_loss = running_loss / len(data_loader)
    return mean_loss, f1_score(targets, predictions, average='macro')

In [6]:
def evaluate(model, data_loader, loss_fn):
    """Evaluate ``model`` on ``data_loader`` without updating its weights.

    The model is moved to ``DEVICE`` (the same device ``train`` uses) and put
    into eval mode; the whole pass runs under ``torch.no_grad()``. The
    predicted class of a sample is the argmax of the model output along
    dimension 1.

    Args:
        model: Module to evaluate.
        data_loader: Iterable yielding ``(inputs, labels)`` batches.
        loss_fn: Callable invoked as ``loss_fn(output, labels)``.

    Returns:
        Tuple ``(mean_loss, macro_f1)``: the sum of per-batch losses divided
        by the number of batches, and the macro-averaged F1 over all samples.
    """
    # Use the notebook-wide DEVICE instead of a hard-coded .cuda() so that
    # evaluation also works on CPU-only machines.
    model = model.to(DEVICE)
    model.eval()

    total_loss = 0
    y_true = list()
    y_pred = list()

    with torch.no_grad():
        for x, y in tqdm(data_loader):
            x, y = x.to(DEVICE), y.to(DEVICE)
            output = model(x)

            loss = loss_fn(output, y)
            total_loss += loss.item()

            y_true.extend(y.tolist())
            y_pred.extend(output.argmax(dim=1).tolist())

    return total_loss / len(data_loader), f1_score(y_true, y_pred, average='macro')


In [7]:
def plot_stats(
    train_loss: list[float],
    valid_loss: list[float],
    train_f1: list[float],
    valid_f1: list[float],
    title: str
):
    """Show the training history as two separate 16x8 figures.

    The first figure holds the train/valid loss curves, the second one the
    train/valid F1 curves. Each figure is titled ``title`` plus a metric
    suffix and displayed immediately.

    Args:
        train_loss: Per-epoch training loss values.
        valid_loss: Per-epoch validation loss values.
        train_f1: Per-epoch training F1 values.
        valid_f1: Per-epoch validation F1 values.
        title: Prefix for both figure titles.
    """
    # (title suffix, (series, legend label), (series, legend label))
    panels = (
        (' loss', (train_loss, 'Train loss'), (valid_loss, 'Valid loss')),
        (' f1', (train_f1, 'Train f1'), (valid_f1, 'Valid f1')),
    )

    for suffix, *curves in panels:
        plt.figure(figsize=(16, 8))
        plt.title(title + suffix)

        for series, label in curves:
            plt.plot(series, label=label)

        plt.legend()
        plt.show()

In [8]:
def fit(model, train_loader, valid_loader, optimizer, loss_fn, num_epochs, title='Model'):
    """Train ``model`` for several epochs and keep the best validation snapshot.

    Every epoch runs ``train`` on ``train_loader`` and ``evaluate`` on
    ``valid_loader``, records the loss/F1 histories, clears the cell output
    and prints the epoch number and validation F1. From the second epoch on
    the histories are also plotted with ``plot_stats``.

    The first epoch is executed unconditionally, so at least one epoch runs
    even when ``num_epochs`` is smaller than 1.

    Args:
        model: Module to train; it is updated in place.
        train_loader: Batches used for optimisation.
        valid_loader: Batches used for validation.
        optimizer: Optimizer bound to the model's parameters.
        loss_fn: Callable invoked as ``loss_fn(output, labels)``.
        num_epochs: Number of the last epoch to run (epochs are 1-based).
        title: Title prefix forwarded to ``plot_stats``.

    Returns:
        Tuple ``(best_model, best_valid_f1)``: a deep copy of the model taken
        at the epoch with the highest validation F1, and that F1 value.
    """
    train_loss_history, valid_loss_history = [], []
    train_f1_history, valid_f1_history = [], []

    # Start below any attainable score so the first epoch always produces a
    # snapshot. Starting at 0.0 with a strict ">" returned best_model=None
    # whenever validation F1 stayed at 0.0.
    best_valid_f1 = float('-inf')
    best_model = None

    def epoch(count):
        """Run one train/validation cycle and update histories and best snapshot."""
        nonlocal best_valid_f1, best_model

        train_loss, train_f1 = train(model, train_loader, optimizer, loss_fn)
        valid_loss, valid_f1 = evaluate(model, valid_loader, loss_fn)

        train_loss_history.append(train_loss)
        valid_loss_history.append(valid_loss)

        train_f1_history.append(train_f1)
        valid_f1_history.append(valid_f1)

        clear_output()

        print(f"Epoch: {count}")
        print(f"F1: {valid_f1:.4f}")

        if valid_f1 > best_valid_f1:
            best_valid_f1 = valid_f1
            # Deep copy so later epochs cannot overwrite the best weights
            best_model = deepcopy(model)

    # First epoch: no plot, the histories hold a single point
    epoch(1)

    for i in range(2, num_epochs + 1):
        epoch(i)

        plot_stats(
            train_loss_history, valid_loss_history,
            train_f1_history, valid_f1_history,
            title
        )

    return best_model, best_valid_f1

# Data preprocessing

## data reading

In [9]:
relative_path = os.path.join('..', 'data')

# Data lives one directory above the notebook: ../data/train.csv and ../data/train/
base_path = Path(relative_path)
data = pd.read_csv(base_path / "train.csv")
images_path = base_path / "train"

# Split the data into a training part and a held-out part (used to check model quality),
# then renumber the rows of each part from zero
train_df, val = (
    part.reset_index(drop=True)
    for part in train_test_split(data, test_size=0.3, random_state=1)
)

# Per the original author's note: roughly 19.6k images for training and 8.4k images for computing metrics
print(train_df.shape, val.shape)

FileNotFoundError: [Errno 2] No such file or directory: '..\\data\\train.csv'

## image showing

In [None]:
from torchvision.utils import make_grid
from torchvision.io import decode_image
from pathlib import Path
# Aliased as TF rather than F: the first cell already binds F to
# torch.nn.functional, and reusing the alias here silently overwrote it.
import torchvision.transforms.functional as TF


def show(imgs):
    """Display one image tensor or a list of them side by side.

    Each image is detached, converted to a PIL image and drawn on its own
    axis of a single-row figure with tick marks and tick labels removed.

    Args:
        imgs: A single image or a list of images; a single image is wrapped
            into a one-element list.
            NOTE(review): presumably tensors accepted by ``to_pil_image`` —
            confirm expected shape/dtype against callers.
    """
    if not isinstance(imgs, list):
        imgs = [imgs]
    # squeeze=False keeps axs two-dimensional even for a single image
    fig, axs = plt.subplots(ncols=len(imgs), squeeze=False)
    for i, img in enumerate(imgs):
        img = img.detach()
        img = TF.to_pil_image(img)
        axs[0, i].imshow(np.asarray(img))
        axs[0, i].set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])


In [None]:
from torchvision.io import read_image


# A single example image: row 1 of the full table
idx = 1
img = read_image(images_path / data.iloc[idx]["image_name"])

# Every 10th row from 20 up to (not including) 200
img_list = [
    read_image(images_path / image_name)
    for image_name in (data.iloc[row]["image_name"] for row in range(20, 200, 10))
]

# Tile the sampled images into one grid image and display it
# NOTE(review): assumes all sampled images share the same shape — confirm
grid = make_grid(img_list)
show(grid)

## Dataset implementation

In [None]:
# Spatial size of the images produced by both pipelines
SIZE = (224, 224)

# Training pipeline: random augmentations followed by tensor conversion and
# per-channel normalisation.
# NOTE(review): the mean/std values are presumably the ImageNet statistics
# matching the pretrained weights used later — confirm.
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(SIZE, scale=(0.8, 1.0)),  # random crop, resized to SIZE
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(degrees=15),
    transforms.ColorJitter(
        brightness=0.2,
        contrast=0.2,
        saturation=0.2,
        hue=0.1,
    ),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Validation pipeline: plain resize, then the same tensor conversion and
# normalisation as the training pipeline, with no random augmentation.
val_transform = transforms.Compose([
    transforms.Resize(SIZE),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

In [None]:
# The base class is spelled out in full: `class Dataset(Dataset)` rebinds the
# imported name, so re-running the cell made the class inherit from its own
# previous definition instead of torch's Dataset.
class Dataset(torch.utils.data.Dataset):
    """Image dataset backed by a DataFrame of file names and labels.

    Each row of the frame must provide an ``image_name`` column (file name
    relative to ``path_to_images``) and a ``class_id`` column (the label).
    """

    def __init__(self, dataframe: pd.DataFrame, path_to_images: Path, transforms: "transforms.Compose | None") -> None:
        """Store the sample table, the image directory and the transform.

        Args:
            dataframe: Table with ``image_name`` and ``class_id`` columns.
            path_to_images: Directory that contains the image files.
            transforms: Callable applied to every loaded PIL image, or
                ``None`` to return the PIL image unchanged.
        """
        self.df = dataframe
        self.path_to_images = path_to_images
        self.transforms = transforms

    def __len__(self):
        """Return the number of rows in the backing DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the ``idx``-th image as RGB and return ``(image, class_id)``."""
        # Positional lookup: idx counts rows, it is not an index label
        row = self.df.iloc[idx]
        image = Image.open(self.path_to_images / row["image_name"]).convert('RGB')
        if self.transforms is not None:
            image = self.transforms(image)
        return image, row["class_id"]

In [None]:
# Training samples go through the augmenting pipeline, held-out samples
# through the plain resize/normalise pipeline.
train_dataset = Dataset(
    dataframe=train_df,
    path_to_images=images_path,
    transforms=train_transform,
)
val_dataset = Dataset(
    dataframe=val,
    path_to_images=images_path,
    transforms=val_transform,
)

In [None]:
batch_size = 32

# Training batches are reshuffled every epoch.
trainloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Validation batches keep a fixed order: shuffling brings no benefit for
# evaluation, and with a partial last batch it makes the per-batch mean loss
# vary from run to run.
testloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Fitting

In [None]:
class ResNet18(nn.Module):
    """torchvision ResNet-18 with IMAGENET1K_V1 weights and a new final layer."""

    def __init__(self, num_classes: int):
        """Build the network.

        Args:
            num_classes: Number of output units of the replaced ``fc`` layer.
        """
        super().__init__()
        # Initialise the model with pretrained weights
        backbone = torchvision.models.resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)
        # Swap the classification layer for our own, keeping its input width
        backbone.fc = nn.Linear(backbone.fc.in_features, num_classes)
        self.model = backbone

    def forward(self, batch):
        """Return the wrapped model's output for ``batch``."""
        return self.model(batch)

In [None]:
# One output unit per distinct value of the `unified_class` column.
# NOTE(review): the dataset returns labels from `class_id`, not
# `unified_class` — confirm that both columns describe the same label set.
model = ResNet18(num_classes=data["unified_class"].nunique())
model = model.to(DEVICE)

# Set up the loss function (criterion) and the optimizer that controls how
# the model's weights are updated
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=3e-5, weight_decay=0.01)

best_model, best_f1 = fit(
    model,
    trainloader,
    testloader,
    optimizer,
    criterion,
    num_epochs=10,
)