# Импортируем библиотеки

In [None]:
import os
import random
import time
from collections import Counter

import numpy as np
import pandas as pd

import cv2
from PIL import Image

import matplotlib.pyplot as plt
import matplotlib
import IPython
from IPython import display

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset

from torchvision import transforms, datasets, models
import torchvision
from torchvision.models import ResNet18_Weights
from torchvision.utils import make_grid, save_image

from torch.optim.lr_scheduler import ReduceLROnPlateau

import albumentations as A
from albumentations.pytorch import ToTensorV2

from sklearn.model_selection import train_test_split
import sklearn
from tqdm import tqdm
import tqdm

In [None]:
# versions = {
#     "os": "N/A",  
#     "random": "N/A",
#     "time": "N/A",
#     "collections": "N/A",
#     "numpy": np.__version__,
#     "pandas": pd.__version__,
#     "cv2": cv2.__version__,
#     "Pillow": Image.__version__,
#     "matplotlib": matplotlib.__version__,
#     "IPython": IPython.__version__,
#     "torch": torch.__version__,
#     "torchvision": torchvision.__version__,  
#     "albumentations": A.__version__,
#     "sklearn": sklearn.__version__,
#     "tqdm": tqdm.__version__,
# }

# with open("requirements.txt", "w") as f:
#     for library, version in versions.items():
#         f.write(f"{library}=={version}\n")

# from IPython.display import FileLink
# FileLink(r'requirements.txt')

*Запишем пути к директориям с данными*

In [None]:
# Absolute, machine-specific paths: `data_dir` holds one sub-folder per class,
# `test_dir` holds the test images read by the prediction cell.
data_dir = "/Users/vladislav/Documents/DL/dl-2025-competition-1/data/train"
test_dir = "/Users/vladislav/Documents/DL/dl-2025-competition-1/data/test"

# Проанализируем наши данные

In [None]:
# Class folders located directly under data_dir, in lexicographic order.
class_names = sorted(
    filter(
        lambda name: os.path.isdir(os.path.join(data_dir, name)),
        os.listdir(data_dir),
    )
)
print("Классы:", len(class_names))

# Count the regular files stored in every class folder.
class_counts = {}
for class_name in class_names:
    class_path = os.path.join(data_dir, class_name)
    images = list(
        filter(
            lambda f: os.path.isfile(os.path.join(class_path, f)),
            os.listdir(class_path),
        )
    )
    class_counts[class_name] = len(images)

# Bar chart of the per-class image counts.
plt.figure(figsize=(20, 8))
plt.bar(class_counts.keys(), class_counts.values())
plt.title("Баланс классов")
plt.xticks(rotation=90)
plt.show()

*Выведем некоторые случайные картинки*

In [None]:
def visualize_random_samples(data_dir, num_samples=10):
    """Display randomly chosen images from randomly chosen class folders.

    Args:
        data_dir: Directory whose sub-folders are treated as classes.
        num_samples: Number of grid cells to draw. Each cell picks a random
            class and then a random file from it; a class folder without
            files leaves its cell empty.
    """
    class_names = sorted([d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d))])

    # Size the grid from num_samples (rows of at most five cells) instead of
    # a fixed 2 x 5 layout; the default of 10 still yields 2 x 5 at 15 x 8 in.
    n_cols = max(1, min(5, num_samples))
    n_rows = max(1, -(-num_samples // n_cols))  # ceiling division

    plt.figure(figsize=(3 * n_cols, 4 * n_rows))
    for i in range(num_samples):
        class_name = random.choice(class_names)
        class_path = os.path.join(data_dir, class_name)
        images = [f for f in os.listdir(class_path) if os.path.isfile(os.path.join(class_path, f))]
        if not images:
            continue

        random_image = random.choice(images)
        # The context manager releases the file handle once the cell is drawn.
        with Image.open(os.path.join(class_path, random_image)) as img:
            plt.subplot(n_rows, n_cols, i + 1)
            plt.imshow(img)
            plt.title(f"Class: {class_name}\nSize: {img.size}")
            plt.axis("off")

    plt.tight_layout()
    plt.show()

# Preview ten random training images.
visualize_random_samples(data_dir, num_samples=10)

*Основные выводы:*

Мы видим 100 классов картинок с большим дисбалансом (в некоторых классах всего 30 изображений, что может сказаться на обучении). Значит, нам нужно очень аккуратно делить выборку на тренировочную и валидационную. Кроме того, качество изображений достаточно низкое, поэтому аугментации будем пробовать очень осторожно и оставим только наиболее подходящие.

# Перейдем к обучению наших моделей

Создадим кастомный датасет. Будем аккуратно **сортировать** индексы (долго не мог найти ошибку именно в этом месте: оказалось, что мы неверно сортировали наши картинки).

In [None]:
class CustomDataset(Dataset):
    """Image-folder dataset whose class folders have integer names.

    Class folders are ordered numerically (``"2"`` before ``"10"``) and the
    position in that order is the label. Files inside every class folder are
    indexed in sorted order so that sample indices do not depend on the
    order in which the filesystem lists them.

    Args:
        data_dir: Directory containing one sub-folder per class.
        transform: Optional callable invoked as ``transform(image=...)`` whose
            result is a mapping with an ``"image"`` entry.
    """

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform

        # Numeric sort: a plain string sort would put "10" before "2".
        self.classes = sorted(
            [d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d))],
            key=lambda x: int(x)
        )

        self.class_to_idx = {cls: i for i, cls in enumerate(self.classes)}
        self.idx_to_class = {i: cls for i, cls in enumerate(self.classes)}

        # (path, label) pairs for every file with an image extension.
        self.samples = []
        for class_name in self.classes:
            class_path = os.path.join(data_dir, class_name)
            for img_name in sorted(os.listdir(class_path)):
                if img_name.lower().endswith(('.png', '.jpg', '.jpeg')):
                    self.samples.append((
                        os.path.join(class_path, img_name),
                        self.class_to_idx[class_name]
                    ))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image, label)``; the image is RGB, transformed if a transform is set.

        Raises:
            OSError: If the file cannot be read or decoded.
        """
        img_path, label = self.samples[idx]
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals failure with None instead of raising.
            raise OSError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transform:
            image = self.transform(image=image)["image"]

        return image, label

*Попробуем разные аугментации*

In [None]:
# train_transform = A.Compose([
#     A.Resize(32, 32),
#     A.HorizontalFlip(p=0.5),
#     A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=15, p=0.5),
#     A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
#     ToTensorV2(),
# ])

# Training pipeline: resize to 224x224, then a random horizontal flip, a small
# shift/scale/rotate and brightness/contrast jitter, followed by normalization
# with the listed per-channel mean/std (the usual ImageNet statistics) and
# conversion to a tensor.
train_transform = A.Compose([
    A.Resize(224, 224),
    A.HorizontalFlip(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.05, rotate_limit=15), 
    A.ColorJitter(brightness=0.2, contrast=0.2),            
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  
    ToTensorV2(),  
])    
# Validation pipeline: same resize and normalization, no random augmentation.
val_transform = A.Compose([
    A.Resize(224, 224),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

In [None]:
# Both datasets index the whole of data_dir; they differ only in the transform.
train_dataset = CustomDataset(data_dir, transform=train_transform)
val_dataset = CustomDataset(data_dir, transform=val_transform)

In [None]:
def get_loaders(data_dir, batch_size=32, train_ds=None, val_ds=None):
    """Build the training and validation DataLoaders.

    Args:
        data_dir: Kept for backward compatibility; not used by this function.
        batch_size: Batch size for both loaders.
        train_ds: Dataset for the training loader. Defaults to the
            module-level ``train_dataset`` as bound at call time.
        val_ds: Dataset for the validation loader. Defaults to the
            module-level ``val_dataset`` as bound at call time.

    Returns:
        ``(train_loader, val_loader)``; only the training loader shuffles.
    """
    if train_ds is None:
        train_ds = train_dataset
    if val_ds is None:
        val_ds = val_dataset

    # Pinned host memory only helps host-to-CUDA copies; without CUDA PyTorch
    # cannot use it and warns, so request it only when CUDA is present.
    common = dict(
        batch_size=batch_size,
        num_workers=0,
        pin_memory=torch.cuda.is_available(),
    )

    train_loader = DataLoader(train_ds, shuffle=True, **common)
    val_loader = DataLoader(val_ds, shuffle=False, **common)

    return train_loader, val_loader


In [None]:
# Loaders over the datasets defined above, i.e. over the full, un-split data.
train_loader, val_loader = get_loaders(data_dir)

*Посмотрим на аугментированные картинки*

In [None]:
def show_augmented_samples(dataloader, num_samples=8):
    """Plot the first images of one batch after undoing the normalization.

    Args:
        dataloader: Loader yielding ``(images, labels)`` batches; images are
            permuted from (N, C, H, W) to (N, H, W, C) for plotting.
        num_samples: Maximum number of images to show; capped at the size of
            the fetched batch.
    """
    images, labels = next(iter(dataloader))

    # Never index past the end of the batch.
    num_samples = min(num_samples, len(images))
    if num_samples <= 0:
        return

    # Invert Normalize(mean, std) so the colors are displayable.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    images = images[:num_samples].permute(0, 2, 3, 1).numpy()
    images = std * images + mean
    images = np.clip(images, 0, 1)

    # squeeze=False keeps `axes` two-dimensional even for a single image.
    fig, axes = plt.subplots(1, num_samples, figsize=(15, 3), squeeze=False)
    for ax, img, label in zip(axes[0], images, labels):
        ax.imshow(img)
        ax.set_title(f"Class: {label.item()}")
        ax.axis("off")
    plt.tight_layout()
    plt.show()
    
# Preview one augmented training batch.
show_augmented_samples(train_loader)

Получилось достаточно неплохо, наши аугментации не сильно испортили картинки, значит можем идти дальше

*Разделим нашу выборку на трейн-валидацию, учитывая дисбаланс*

In [None]:
class StratifiedSubset(Subset):
    """Subset of a ``CustomDataset``-like dataset with its own optional transform.

    Mirrors the parent's ``classes``, ``class_to_idx`` and ``idx_to_class``
    and exposes ``samples`` restricted to the chosen indices.

    Args:
        dataset: Parent dataset providing the attributes listed above.
        indices: Positions in ``dataset`` that belong to this subset.
        transform: Optional callable invoked as ``transform(image=...)`` whose
            result is a mapping with an ``"image"`` entry.
    """

    def __init__(self, dataset, indices, transform=None):
        super().__init__(dataset, indices)
        self.transform = transform
        self.idx_to_class = dataset.idx_to_class
        self.class_to_idx = dataset.class_to_idx
        self.classes = dataset.classes
        self.samples = [dataset.samples[i] for i in indices]

    def __getitem__(self, idx):
        image, label = self.dataset[self.indices[idx]]

        if self.transform:
            image = self.transform(image=image)["image"]

        return image, label

    def __getitems__(self, indices):
        """Batched access; routes every item through ``__getitem__``.

        Recent PyTorch versions let DataLoader fetch whole batches through
        ``Subset.__getitems__``, which reads the parent dataset directly and
        would therefore skip this subset's transform.
        """
        return [self[i] for i in indices]

def get_stratified_split(dataset, val_size=0.2, random_state=None):
    """Split sample indices into train/validation parts, stratified by label.

    Args:
        dataset: Object with ``samples`` (a sequence of ``(path, label)``
            pairs) and a length.
        val_size: Passed to ``train_test_split`` as ``test_size``.
        random_state: Seed forwarded to ``train_test_split``. The default
            ``None`` keeps the previous behavior (a different split on every
            call); pass an int to make the split reproducible.

    Returns:
        ``(train_idx, val_idx)`` index collections into ``dataset``.
    """
    labels = [label for _, label in dataset.samples]
    train_idx, val_idx = train_test_split(
        range(len(dataset)),
        test_size=val_size,
        stratify=labels,
        random_state=random_state,
    )
    return train_idx, val_idx


In [None]:
import copy

# Index the training folder once, without transforms, and split its indices.
full_dataset = CustomDataset(data_dir, transform=None)
train_idx, val_idx = get_stratified_split(full_dataset)

# Shallow copies share the sample index but carry their own transform.
# Attaching the transform to the underlying dataset (rather than to the subset
# wrapper) means it is applied whichever access path DataLoader uses.
train_base = copy.copy(full_dataset)
train_base.transform = train_transform
val_base = copy.copy(full_dataset)
val_base.transform = val_transform

train_dataset = StratifiedSubset(train_base, train_idx)
val_dataset = StratifiedSubset(val_base, val_idx)

# Rebuild the loaders: the ones created earlier still wrap the un-split
# datasets, where every image is in both "train" and "validation", so
# validation metrics would be measured on training data.
train_loader, val_loader = get_loaders(data_dir)

In [None]:
# from collections import Counter          # здесь можно посмотреть на распределение выборки

# def check_split(dataset, name):
#     labels = [dataset[i][1] for i in range(len(dataset))]
#     print(f"{name} распределение:", Counter(labels))
# check_split(train_dataset, "Train")
# check_split(val_dataset, "Val")

# Обучение моделей

Сперва создадим различные функции, которые пригодятся нам в обучении. Затем будем пробовать разные модельки

In [None]:
def get_scheduler(optimizer, mode='min', patience=5, factor=0.1):
    """Create a ReduceLROnPlateau scheduler for ``optimizer``.

    Args:
        optimizer: Optimizer whose learning rate is to be reduced.
        mode: ``'min'`` or ``'max'``, forwarded to the scheduler.
        patience: Forwarded to the scheduler.
        factor: Multiplier applied to the learning rate on a reduction.

    Returns:
        The configured ``ReduceLROnPlateau`` instance.
    """
    # `verbose` is deprecated in recent PyTorch releases and is no longer
    # passed; read the current LR from `optimizer.param_groups` when needed.
    return ReduceLROnPlateau(
        optimizer,
        mode=mode,
        patience=patience,
        factor=factor,
    )

In [None]:
def apply_warmup(optimizer, current_epoch, warmup_epochs=5, base_lr=0.001):
    """Linearly ramp the learning rate during the first ``warmup_epochs`` epochs.

    While ``current_epoch < warmup_epochs`` every parameter group gets
    ``base_lr * (current_epoch + 1) / warmup_epochs``; afterwards the
    optimizer is left untouched.
    """
    if current_epoch >= warmup_epochs:
        return

    warm_lr = base_lr * (current_epoch + 1) / warmup_epochs
    for group in optimizer.param_groups:
        group['lr'] = warm_lr

In [None]:
class EarlyStopping:
    """Track a metric and raise ``early_stop`` after ``patience`` calls without improvement.

    An improvement is a value that beats the best one seen so far by more
    than ``delta`` (lower for ``mode='min'``, higher for any other mode).
    """

    def __init__(self, patience=7, delta=0, mode='min'):
        self.patience = patience
        self.delta = delta
        self.mode = mode
        self.counter = 0
        # Start from the worst possible value so the first call always improves.
        self.best_metric = float('inf') if mode == 'min' else -float('inf')
        self.early_stop = False

    def _is_improvement(self, value):
        """Return True when ``value`` beats the stored best by more than ``delta``."""
        if self.mode == 'min':
            return value < (self.best_metric - self.delta)
        return value > (self.best_metric + self.delta)

    def __call__(self, current_metric):
        if self._is_improvement(current_metric):
            self.best_metric = current_metric
            self.counter = 0
            return

        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True

In [None]:
def get_device():
    """Pick the compute device: MPS if available, else CUDA, else CPU.

    Prints the choice and returns it as a ``torch.device``.
    """
    candidates = (
        ("mps", torch.backends.mps.is_available),
        ("cuda", torch.cuda.is_available),
    )
    # First available backend in priority order; CPU is the fallback.
    chosen = next((name for name, available in candidates if available()), "cpu")
    device = torch.device(chosen)
    print(f"Using device: {device}")
    return device

device = get_device()

In [None]:
def plot_training_history(history, live_update=True):
    """Draw loss and accuracy curves from ``history`` and print the latest epoch.

    Args:
        history: Mapping with the lists ``train_loss``, ``val_loss``,
            ``train_acc`` and ``val_acc``.
        live_update: When True, the figure stored on the function object is
            reused and re-displayed in place; otherwise a new figure is
            created and shown with ``plt.show()``.
    """
    reuse_figure = live_update and hasattr(plot_training_history, 'fig')
    if reuse_figure:
        ax1, ax2 = plot_training_history.fig.axes
    else:
        plot_training_history.fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
        plt.ion()

    ax1.clear()
    ax2.clear()

    # (axis, train key, validation key, title, y-axis label) for each panel.
    panels = (
        (ax1, 'train_loss', 'val_loss', 'Training and Validation Loss', 'Loss'),
        (ax2, 'train_acc', 'val_acc', 'Training and Validation Accuracy', 'Accuracy (%)'),
    )
    for ax, train_key, val_key, title, y_label in panels:
        ax.plot(history[train_key], label='Train', color='blue')
        ax.plot(history[val_key], label='Validation', color='orange')
        ax.set_title(title)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(y_label)
        ax.legend()
        ax.grid(True)

    epochs_done = len(history['train_loss'])
    print(f"Epoch {epochs_done}: "
          f"Train Loss = {history['train_loss'][-1]:.4f}, "
          f"Val Loss = {history['val_loss'][-1]:.4f} | "
          f"Train Acc = {history['train_acc'][-1]:.2f}%, "
          f"Val Acc = {history['val_acc'][-1]:.2f}%")

    plt.tight_layout()

    if not live_update:
        plt.show()
        return

    display.clear_output(wait=True)
    display.display(plot_training_history.fig)

In [None]:
def evaluate_model(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without gradient tracking.

    Puts the model in eval mode and returns ``(loss, accuracy)``: the loss is
    the mean of the per-batch criterion values, the accuracy a percentage of
    correctly classified samples.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for batch_inputs, batch_labels in dataloader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_inputs)
            loss_sum += criterion(logits, batch_labels).item()

            predicted = logits.max(dim=1).indices
            n_seen += batch_labels.size(0)
            n_correct += (predicted == batch_labels).sum().item()

    return loss_sum / len(dataloader), 100 * n_correct / n_seen

In [None]:
def train_model_with_logging(
    model: nn.Module,
    train_loader: DataLoader,
    val_loader: DataLoader,
    criterion: nn.Module,
    optimizer: optim.Optimizer,
    device: str = "cuda",
    num_epochs: int = 20,
    warmup_epochs: int = 3,
    patience: int = 5,
    model_save_path: str = 'best_model.pth' 
) -> dict:
    """Train ``model`` with warm-up, LR reduction on plateau and early stopping.

    After every epoch the model is evaluated on ``val_loader``, the checkpoint
    with the best validation accuracy so far is written to
    ``model_save_path``, and the loss/accuracy curves are redrawn in place.

    Args:
        model: Network to train; moved to ``device``.
        train_loader: Batches of ``(inputs, labels)`` for training.
        val_loader: Batches of ``(inputs, labels)`` for validation.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer over the model parameters. The learning rate of
            its first parameter group at call time is the warm-up target.
        device: Device the model and the batches are moved to.
        num_epochs: Maximum number of epochs.
        warmup_epochs: Number of initial epochs with a linearly ramped LR.
        patience: Epochs without validation-loss improvement before stopping.
        model_save_path: Destination of the best checkpoint.

    Returns:
        Dict with the per-epoch lists ``train_loss``, ``val_loss``,
        ``val_acc`` and ``train_acc``.
    """
    model.to(device)
    history = {
        'train_loss': [],
        'val_loss': [],
        'val_acc': [],
        'train_acc': []
    }

    best_val_acc = 0.0
    best_epoch = 0

    scheduler = get_scheduler(optimizer, mode='min', patience=3)
    early_stopping = EarlyStopping(patience=patience, mode='min')

    # Warm up towards the LR the optimizer was configured with rather than
    # towards apply_warmup's built-in default.
    base_lr = optimizer.param_groups[0]['lr']

    plt.ion()
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

    metrics_text = ""

    for epoch in range(num_epochs):
        start_time = time.time()

        apply_warmup(optimizer, epoch, warmup_epochs, base_lr=base_lr)

        model.train()
        epoch_train_loss = 0.0
        correct_train = 0
        total_train = 0

        for inputs, labels in train_loader:
            # Inputs need no gradient; tracking one only costs memory and time.
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            epoch_train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()

        val_loss, val_acc = evaluate_model(model, val_loader, criterion, device)

        train_loss = epoch_train_loss / len(train_loader)
        train_acc = 100 * correct_train / total_train

        # Plateau-based LR reduction starts once warm-up is over; otherwise
        # the next warm-up step would overwrite the reduced LR.
        if epoch >= warmup_epochs:
            scheduler.step(val_loss)
        # Feed the early-stopping tracker; its flag is checked at the end of
        # the epoch, after the metrics have been logged.
        early_stopping(val_loss)

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_epoch = epoch
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_acc': val_acc,
                'train_acc': train_acc,
            }, model_save_path)
            print(f"Новая лучшая модель сохранена с val_acc: {val_acc:.2f}%")

        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        history['train_acc'].append(train_acc)

        epoch_time = time.time() - start_time
        epoch_info = (
            f"Epoch {epoch+1}/{num_epochs} | "
            f"Train Loss: {train_loss:.4f} | "
            f"Val Loss: {val_loss:.4f} | "
            f"Train Acc: {train_acc:.2f}% | "
            f"Val Acc: {val_acc:.2f}% | "
            f"Time: {epoch_time:.2f}s\n"
        )

        metrics_text += epoch_info

        ax1.clear()
        ax1.plot(history['train_loss'], label='Train', color='blue')
        ax1.plot(history['val_loss'], label='Validation', color='orange')
        ax1.set_title('Training and Validation Loss')
        ax1.legend()

        ax2.clear()
        ax2.plot(history['train_acc'], label='Train', color='blue')
        ax2.plot(history['val_acc'], label='Validation', color='orange')
        ax2.set_title('Training and Validation Accuracy')
        ax2.legend()

        # Replace the previous cell output with the full log and the figure.
        display.clear_output(wait=True)
        print(metrics_text)
        display.display(fig)

        plt.pause(0.1)

        if early_stopping.early_stop:
            print(f"Early stopping triggered at epoch {epoch+1}")
            break

    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    plt.ioff()
    plt.show()

    print(f"\nЛучшая модель достигнута на эпохе {best_epoch+1} с val_acc: {best_val_acc:.2f}%")
    return history

# Простая моделька (без Transfer learning)

In [None]:
class SimpleCNN(nn.Module):
    """Small three-stage CNN classifier.

    Each stage is Conv(3x3) -> BatchNorm -> ReLU -> MaxPool(2), so the feature
    extractor reduces the spatial size by a factor of 8. The classifier
    expects 64 x 28 x 28 features, which is what a 224 x 224 input produces;
    feature maps of any other size are adaptively average-pooled to 28 x 28
    so that other input resolutions work as well.

    Args:
        num_classes: Size of the output layer.
    """

    # Spatial size of the feature map the first Linear layer is sized for.
    _FEATURE_MAP = (28, 28)

    def __init__(self, num_classes=100):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(64 * 28 * 28, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        # 224 x 224 inputs already give 28 x 28 maps and skip this branch.
        if tuple(x.shape[-2:]) != self._FEATURE_MAP:
            x = F.adaptive_avg_pool2d(x, self._FEATURE_MAP)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

# Baseline model with a 100-way output, moved to the selected device.
model_simple = SimpleCNN(num_classes=100).to(device)

def init_weights(m):
    """Kaiming-normal (fan_out, ReLU) init for Conv2d/Linear weights; biases set to zero.

    Modules of any other type are left untouched, so the function can be
    passed to ``Module.apply``.
    """
    if not isinstance(m, (nn.Conv2d, nn.Linear)):
        return

    nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
    if m.bias is not None:
        nn.init.constant_(m.bias, 0)

# model_simple.apply(init_weights)

*Обучение!*

In [None]:
# Train the baseline CNN: Adam (lr=0.001) with cross-entropy, up to 20 epochs,
# using the train_loader / val_loader currently bound in the notebook.
criterion = nn.CrossEntropyLoss()
model_simple = model_simple.to(device)
optimizer = optim.Adam(model_simple.parameters(), lr=0.001)

history_simple = train_model_with_logging(
    model_simple,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    device=device,
    num_epochs=20,
    warmup_epochs=3,
    patience=5,
)

# EfficientNet_B3

Будем использовать эту модель. Она справилась лучше **ResNet-18**, но требует больше времени для обучения

*Подгрузим модель*

In [None]:
# Pretrained EfficientNet-B3 backbone.
model = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.DEFAULT)
# The pretrained classifier ends in a 1000-way ImageNet layer; replace it
# with a fresh layer sized for the 100 classes of this task, so the model
# cannot predict indices outside the label range.
model.classifier[1] = nn.Linear(model.classifier[1].in_features, 100)

In [None]:
# for name, param in model.named_parameters():       #Можно посмотреть параметры модели, какие слои где находятся
#      print(name, param.requires_grad) 

*Обучение!*

In [None]:
# Fine-tune EfficientNet-B3: Adam (lr=0.001) with cross-entropy, up to 20
# epochs, using the train_loader / val_loader currently bound in the notebook.
criterion = nn.CrossEntropyLoss()
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)

history = train_model_with_logging(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    device=device,
    num_epochs=20,
    warmup_epochs=3,
    patience=5,
)

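Кажется, модель обучилась достаточно неплохо. Можно попробовать увеличить количество эпох. Примечательно, что трейн и валидация сходятся, а значит, мы все делаем правильно (при условии, что валидационная выборка не пересекается с тренировочной: иначе близость кривых ни о чем не говорит).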

# Предсказания

*Запишем предсказания в файл. Не забудем создать трансформ: нормализуем и делаем ресайз как в обучении!*

In [None]:
# Rebind `tqdm` to the progress-bar callable (a plain `import tqdm` earlier in
# the file binds the name to the module).
from tqdm.auto import tqdm

# Test files ordered by their numeric name; the extension check is
# case-insensitive, matching how the training dataset filters files.
file_list = sorted(
    [f for f in os.listdir(test_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))],
    key=lambda x: int(x.split('.')[0])
)

# Same resize and normalization as the validation transform.
test_transform = A.Compose([
    A.Resize(224, 224),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2()
])

submission = pd.DataFrame({
    'id': [int(f.split('.')[0]) for f in file_list],
    'file_path': [os.path.join(test_dir, f) for f in file_list],
    'target': 0
})

model.eval()
predictions = []

# NOTE(review): the value written is the class index produced by the model;
# confirm that it coincides with the numeric class-folder names expected in
# the submission.
with torch.no_grad():
    for file_path in tqdm(submission['file_path'], total=len(submission)):
        image = cv2.imread(file_path)
        if image is None:
            # cv2.imread signals failure with None instead of raising.
            raise OSError(f"Could not read test image: {file_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        augmented = test_transform(image=image)
        image_tensor = augmented["image"].unsqueeze(0).to(device)

        output = model(image_tensor)
        pred = output.argmax(dim=1).item()

        predictions.append(pred)

submission['target'] = predictions
submission = submission.sort_values('id').drop('file_path', axis=1)
submission.to_csv('submission_newly.csv', index=False)

print("Первые 5 предсказаний:")
print(submission.head())
print("\nСтатистика предсказаний:")
print(submission['target'].value_counts())

# Генератор

Создадим простой генератор (типа GAN): начнем с реализации генератора и дискриминатора

In [None]:
# GAN hyperparameters.
img_size = 224
batch_size = 128
num_classes = 100
latent_dim = 100

# The generator ends in Tanh, so real images are scaled to [-1, 1]
# (mean 0.5 / std 0.5) to live in the same range as generated ones.
transform = A.Compose([
    A.Resize(img_size, img_size),
    A.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ToTensorV2(),
])

# Feed the GAN from the dataset built here, without the classifier's
# augmentations and with the batch size declared above.
dataset = CustomDataset(data_dir, transform=transform)
dataloader = DataLoader(
    dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=0,
)
# `loader` stays bound to the classifier's validation loader, as before.
_, loader = get_loaders(data_dir)

In [None]:
class Generator(nn.Module):
    """Class-conditional image generator.

    The noise vector is concatenated with a learned label embedding,
    projected to a 256-channel map of side ``img_size // 16`` and upsampled
    four times by a factor of two; the output passes through Tanh.

    Args:
        latent_dim: Length of the noise vector.
        num_classes: Number of labels (also the embedding width).
        img_size: Side of the generated image.
    """

    def __init__(self, latent_dim=100, num_classes=100, img_size=224):
        super().__init__()
        self.img_size = img_size
        self.label_emb = nn.Embedding(num_classes, num_classes)
        self.init_size = img_size // 16  # 224 -> 14

        seed_features = 256 * self.init_size * self.init_size
        self.l1 = nn.Sequential(nn.Linear(latent_dim + num_classes, seed_features))

        # Three upsample/conv/norm/activation stages, then the output stage.
        layers = [nn.BatchNorm2d(256)]
        for in_ch, out_ch in ((256, 128), (128, 128), (128, 64)):
            layers.extend([
                nn.Upsample(scale_factor=2),
                nn.Conv2d(in_ch, out_ch, 3, stride=1, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.LeakyReLU(0.2, inplace=True),
            ])
        layers.extend([
            nn.Upsample(scale_factor=2),
            nn.Conv2d(64, 3, 3, stride=1, padding=1),
            nn.Tanh(),
        ])
        self.conv_blocks = nn.Sequential(*layers)

    def forward(self, noise, labels):
        conditioned = torch.cat((noise, self.label_emb(labels)), dim=1)
        seed = self.l1(conditioned)
        seed = seed.view(seed.size(0), 256, self.init_size, self.init_size)
        return self.conv_blocks(seed)


In [None]:
class Discriminator(nn.Module):
    """Class-conditional discriminator.

    The label is embedded into a single ``img_size x img_size`` map that is
    stacked onto the image as a fourth channel. Four stride-2 convolution
    blocks reduce the spatial size by 16, and a linear layer with a sigmoid
    yields one validity score per sample.

    Args:
        num_classes: Number of labels.
        img_size: Side of the (square) input images.
    """

    def __init__(self, num_classes=100, img_size=224):
        super().__init__()
        self.img_size = img_size
        self.label_embedding = nn.Embedding(num_classes, 1 * img_size * img_size)

        def discriminator_block(in_filters, out_filters, bn=True):
            """Conv(stride 2) -> LeakyReLU -> Dropout2d, optionally followed by BatchNorm."""
            block = [nn.Conv2d(in_filters, out_filters, 3, 2, 1),
                     nn.LeakyReLU(0.2, inplace=True),
                     nn.Dropout2d(0.25)]
            if bn:
                # The second positional argument of BatchNorm2d is `eps`, so a
                # bare 0.8 set eps=0.8; pass the value as momentum explicitly.
                block.append(nn.BatchNorm2d(out_filters, momentum=0.8))
            return block

        self.model = nn.Sequential(
            *discriminator_block(4, 64, bn=False),
            *discriminator_block(64, 128),
            *discriminator_block(128, 256),
            *discriminator_block(256, 512),
        )

        ds_size = img_size // 2 ** 4  # 224 / 16 = 14
        self.adv_layer = nn.Sequential(nn.Flatten(), nn.Linear(512 * ds_size ** 2, 1), nn.Sigmoid())

    def forward(self, img, labels):
        class_map = self.label_embedding(labels).view(labels.size(0), 1, self.img_size, self.img_size)
        d_in = torch.cat((img, class_map), 1)
        out = self.model(d_in)
        validity = self.adv_layer(out)
        return validity


In [None]:
# Instantiate both networks on the selected device.
generator = Generator(latent_dim=latent_dim, num_classes=num_classes, img_size=img_size).to(device)
discriminator = Discriminator(num_classes=num_classes, img_size=img_size).to(device)

# Binary cross-entropy on the discriminator's sigmoid output.
adversarial_loss = nn.BCELoss()

# Separate Adam optimizers; the generator uses half the discriminator's LR.
optimizer_G = optim.Adam(generator.parameters(), lr=0.0001, betas=(0.5, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))

*Запустим обучение и будем следить за изменением лосса на генераторе и дискриминаторе*

In [None]:
n_epochs = 20
sample_interval = 5
n_critic = 5  # discriminator updates per generator update
os.makedirs("generated_images", exist_ok=True)

for epoch in range(n_epochs):
    for i, (real_imgs, labels) in enumerate(dataloader):
        # Size of this particular batch; kept in its own name so the
        # `batch_size` hyperparameter is not overwritten.
        cur_batch_size = real_imgs.size(0)
        real_imgs, labels = real_imgs.to(device), labels.to(device)

        # Targets for "real" and "fake" samples.
        valid = torch.ones((cur_batch_size, 1), device=device)
        fake = torch.zeros((cur_batch_size, 1), device=device)

        # --- Generator step: make the discriminator call fakes real. ---
        optimizer_G.zero_grad()

        z = torch.randn(cur_batch_size, latent_dim, device=device)
        gen_labels = torch.randint(0, num_classes, (cur_batch_size,), device=device)
        gen_imgs = generator(z, gen_labels)

        g_loss = adversarial_loss(discriminator(gen_imgs, gen_labels), valid)
        g_loss.backward()
        optimizer_G.step()

        # --- Discriminator steps on the same real and generated batch. ---
        for _ in range(n_critic):
            optimizer_D.zero_grad()
            real_loss = adversarial_loss(discriminator(real_imgs, labels), valid)
            # detach(): do not backpropagate into the generator here.
            fake_loss = adversarial_loss(discriminator(gen_imgs.detach(), gen_labels), fake)
            d_loss = (real_loss + fake_loss) / 2
            d_loss.backward()
            optimizer_D.step()

        if i % 100 == 0:
            print(f"[Epoch {epoch}/{n_epochs}] [Batch {i}/{len(dataloader)}] "
                  f"[D loss: {d_loss.item():.4f}] [G loss: {g_loss.item():.4f}]")

    if epoch % sample_interval == 0:
        # Sampling needs no gradients; skip building the autograd graph.
        with torch.no_grad():
            z = torch.randn(10, latent_dim, device=device)
            labels_sample = torch.arange(10, device=device)
            gen_imgs = generator(z, labels_sample)
        gen_imgs = (gen_imgs + 1) / 2
        save_image(gen_imgs, f"generated_images/epoch_{epoch}.png", nrow=5, normalize=True)

*Посмотрим на сгенерированные картинки*

In [None]:
def generate_images(generator, z_dim, device, n_images=15, images_per_row=5, n_classes=100):
    """Generate images for random labels and show them in a grid.

    Args:
        generator: Module called as ``generator(noise, labels)``.
        z_dim: Length of the noise vector.
        device: Device the noise and labels are created on.
        n_images: Number of images to generate.
        images_per_row: Number of grid columns.
        n_classes: Labels are drawn uniformly from ``[0, n_classes)``.
    """
    # Sample in eval mode and restore the previous mode afterwards.
    was_training = generator.training
    generator.eval()
    z = torch.randn(n_images, z_dim).to(device)
    random_labels = torch.randint(0, n_classes, (n_images,), device=device)

    with torch.no_grad():
        gen_imgs = generator(z, random_labels).cpu()
    generator.train(was_training)

    # Round the row count up so every image gets a cell; squeeze=False keeps
    # `axs` an array even for a 1 x 1 grid.
    n_rows = max(1, -(-n_images // images_per_row))
    fig, axs = plt.subplots(
        n_rows,
        images_per_row,
        figsize=(images_per_row * 4, n_rows * 4),
        squeeze=False,
    )
    axs = axs.flatten()
    for ax in axs:
        ax.axis("off")  # also blanks the cells that stay unused

    for i, img in enumerate(gen_imgs):
        img_np = img.permute(1, 2, 0).numpy()
        # Min-max scale to [0, 1]; a constant image would divide by zero.
        low = img_np.min()
        span = img_np.max() - low
        scaled = (img_np - low) / span if span > 0 else np.zeros_like(img_np)
        axs[i].imshow(scaled)
        axs[i].set_title(f"Class {random_labels[i].item()}")

    plt.tight_layout()
    plt.show()

# Show 15 generated images, five per row.
generate_images(generator, z_dim=100, device=device, n_images=15, images_per_row=5)

# Пайплайн с лучшим скором для Kaggle

## Напомним необходимые функции и классы

In [None]:
import os
import random
import time
from collections import Counter

import numpy as np
import pandas as pd

import cv2
from PIL import Image

import matplotlib.pyplot as plt
from IPython import display

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset

from torchvision import transforms, datasets, models
from torchvision.models import ResNet18_Weights
from torchvision.utils import make_grid, save_image

from torch.optim.lr_scheduler import ReduceLROnPlateau

import albumentations as A
from albumentations.pytorch import ToTensorV2

from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [None]:
# Train/test image folders for this pipeline; these rebind the paths set
# earlier in the file.
data_dir = "/home/jupyter/work/resources/data/train"
test_dir = "/home/jupyter/work/resources/data/test"

class CustomDataset(Dataset):
    """Image-folder dataset whose class folders have integer names.

    Class folders are ordered numerically (``"2"`` before ``"10"``) and the
    position in that order is the label. Files inside every class folder are
    indexed in sorted order so that sample indices do not depend on the
    order in which the filesystem lists them.

    Args:
        data_dir: Directory containing one sub-folder per class.
        transform: Optional callable invoked as ``transform(image=...)`` whose
            result is a mapping with an ``"image"`` entry.
    """

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        # Numeric sort: a plain string sort would put "10" before "2".
        self.classes = sorted(
            [d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d))],
            key=lambda x: int(x)
        )
        self.class_to_idx = {cls: i for i, cls in enumerate(self.classes)}
        self.idx_to_class = {i: cls for i, cls in enumerate(self.classes)}

        # (path, label) pairs for every file with an image extension.
        self.samples = []
        for class_name in self.classes:
            class_path = os.path.join(data_dir, class_name)
            for img_name in sorted(os.listdir(class_path)):
                if img_name.lower().endswith(('.png', '.jpg', '.jpeg')):
                    self.samples.append((
                        os.path.join(class_path, img_name),
                        self.class_to_idx[class_name]
                    ))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image, label)``; the image is RGB, transformed if a transform is set.

        Raises:
            OSError: If the file cannot be read or decoded.
        """
        img_path, label = self.samples[idx]
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals failure with None instead of raising.
            raise OSError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transform:
            image = self.transform(image=image)["image"]
        return image, label

# Training pipeline: resize to 224x224, then a random horizontal flip, a small
# shift/scale/rotate and brightness/contrast jitter, followed by normalization
# with the listed per-channel mean/std (the usual ImageNet statistics) and
# conversion to a tensor.
train_transform = A.Compose([
    A.Resize(224, 224),
    A.HorizontalFlip(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.05, rotate_limit=15), 
    A.ColorJitter(brightness=0.2, contrast=0.2),            
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  
    ToTensorV2(),  
])    
# Validation pipeline: same resize and normalization, no random augmentation.
val_transform = A.Compose([
    A.Resize(224, 224),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

# Both datasets index the whole of data_dir; they differ only in the transform.
train_dataset = CustomDataset(data_dir, transform=train_transform)
val_dataset = CustomDataset(data_dir, transform=val_transform)

def get_loaders(data_dir, batch_size=32, train_ds=None, val_ds=None):
    """Build the training and validation DataLoaders.

    Args:
        data_dir: Kept for backward compatibility; not used by this function.
        batch_size: Batch size for both loaders.
        train_ds: Dataset for the training loader. Defaults to the
            module-level ``train_dataset`` as bound at call time.
        val_ds: Dataset for the validation loader. Defaults to the
            module-level ``val_dataset`` as bound at call time.

    Returns:
        ``(train_loader, val_loader)``; only the training loader shuffles.
    """
    if train_ds is None:
        train_ds = train_dataset
    if val_ds is None:
        val_ds = val_dataset

    # Pinned host memory only helps host-to-CUDA copies; without CUDA PyTorch
    # cannot use it and warns, so request it only when CUDA is present.
    common = dict(
        batch_size=batch_size,
        num_workers=0,
        pin_memory=torch.cuda.is_available(),
    )

    train_loader = DataLoader(train_ds, shuffle=True, **common)
    val_loader = DataLoader(val_ds, shuffle=False, **common)
    return train_loader, val_loader

# Loaders over the datasets defined above, i.e. over the full, un-split data.
train_loader, val_loader = get_loaders(data_dir)

class StratifiedSubset(Subset):
    """Subset of a ``CustomDataset``-like dataset with its own optional transform.

    Mirrors the parent's ``classes``, ``class_to_idx`` and ``idx_to_class``
    and exposes ``samples`` restricted to the chosen indices.

    Args:
        dataset: Parent dataset providing the attributes listed above.
        indices: Positions in ``dataset`` that belong to this subset.
        transform: Optional callable invoked as ``transform(image=...)`` whose
            result is a mapping with an ``"image"`` entry.
    """

    def __init__(self, dataset, indices, transform=None):
        super().__init__(dataset, indices)
        self.transform = transform
        self.idx_to_class = dataset.idx_to_class
        self.class_to_idx = dataset.class_to_idx
        self.classes = dataset.classes
        self.samples = [dataset.samples[i] for i in indices]

    def __getitem__(self, idx):
        image, label = self.dataset[self.indices[idx]]
        if self.transform:
            image = self.transform(image=image)["image"]
        return image, label

    def __getitems__(self, indices):
        """Batched access; routes every item through ``__getitem__``.

        Recent PyTorch versions let DataLoader fetch whole batches through
        ``Subset.__getitems__``, which reads the parent dataset directly and
        would therefore skip this subset's transform.
        """
        return [self[i] for i in indices]

def get_stratified_split(dataset, val_size=0.2, random_state=None):
    """Split sample indices into train/validation parts, stratified by label.

    Args:
        dataset: Object with ``samples`` (a sequence of ``(path, label)``
            pairs) and a length.
        val_size: Passed to ``train_test_split`` as ``test_size``.
        random_state: Seed forwarded to ``train_test_split``. The default
            ``None`` keeps the previous behavior (a different split on every
            call); pass an int to make the split reproducible.

    Returns:
        ``(train_idx, val_idx)`` index collections into ``dataset``.
    """
    labels = [label for _, label in dataset.samples]
    train_idx, val_idx = train_test_split(
        range(len(dataset)),
        test_size=val_size,
        stratify=labels,
        random_state=random_state,
    )
    return train_idx, val_idx

import copy

# Index the training folder once, without transforms, and split its indices.
full_dataset = CustomDataset(data_dir, transform=None)
train_idx, val_idx = get_stratified_split(full_dataset)

# Shallow copies share the sample index but carry their own transform.
# Attaching the transform to the underlying dataset (rather than to the subset
# wrapper) means it is applied whichever access path DataLoader uses.
train_base = copy.copy(full_dataset)
train_base.transform = train_transform
val_base = copy.copy(full_dataset)
val_base.transform = val_transform

train_dataset = StratifiedSubset(train_base, train_idx)
val_dataset = StratifiedSubset(val_base, val_idx)

# Rebuild the loaders: the ones created earlier still wrap the un-split
# datasets, where every image is in both "train" and "validation", so
# validation metrics would be measured on training data.
train_loader, val_loader = get_loaders(data_dir)

In [None]:
def get_scheduler(optimizer, mode='min', patience=5, factor=0.1):
    """Create a ReduceLROnPlateau scheduler for ``optimizer``.

    Args:
        optimizer: Optimizer whose learning rate is to be reduced.
        mode: ``'min'`` or ``'max'``, forwarded to the scheduler.
        patience: Forwarded to the scheduler.
        factor: Multiplier applied to the learning rate on a reduction.

    Returns:
        The configured ``ReduceLROnPlateau`` instance.
    """
    # `verbose` is deprecated in recent PyTorch releases and is no longer
    # passed; read the current LR from `optimizer.param_groups` when needed.
    return ReduceLROnPlateau(
        optimizer,
        mode=mode,
        patience=patience,
        factor=factor,
    )

def apply_warmup(optimizer, current_epoch, warmup_epochs=5, base_lr=0.001):
    """Linearly ramp the learning rate during the first ``warmup_epochs`` epochs.

    While ``current_epoch < warmup_epochs`` every parameter group gets
    ``base_lr * (current_epoch + 1) / warmup_epochs``; afterwards the
    optimizer is left untouched.
    """
    if current_epoch >= warmup_epochs:
        return

    warm_lr = base_lr * (current_epoch + 1) / warmup_epochs
    for group in optimizer.param_groups:
        group['lr'] = warm_lr

class EarlyStopping:
    """Track a monitored metric and raise a flag once it stops improving.

    Call the instance with the latest metric value after each epoch. A value
    counts as an improvement when it beats the best value seen so far by more
    than ``delta`` (lower is better for ``mode='min'``, higher otherwise).
    After ``patience`` consecutive calls without improvement ``early_stop``
    becomes ``True``.
    """

    def __init__(self, patience=7, delta=0, mode='min'):
        self.patience = patience
        self.delta = delta
        self.mode = mode
        self.counter = 0
        # Start from the worst possible value so the first call always improves.
        if mode == 'min':
            self.best_metric = float('inf')
        else:
            self.best_metric = -float('inf')
        self.early_stop = False

    def _has_improved(self, current_metric):
        """Tell whether ``current_metric`` beats the best value by more than ``delta``."""
        if self.mode == 'min':
            return current_metric < (self.best_metric - self.delta)
        return current_metric > (self.best_metric + self.delta)

    def __call__(self, current_metric):
        if self._has_improved(current_metric):
            self.best_metric = current_metric
            self.counter = 0
            return

        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True

def get_device():
    """Pick the compute device, preferring MPS, then CUDA, then CPU.

    Prints the chosen device and returns it as a ``torch.device``.
    """
    # Probed in priority order; the first available backend wins.
    probes = (
        ("mps", torch.backends.mps.is_available),
        ("cuda", torch.cuda.is_available),
    )
    backend = next((name for name, is_available in probes if is_available()), "cpu")
    device = torch.device(backend)
    print(f"Using device: {device}")
    return device
    
# Resolve the compute device once; the training and inference cells below reuse it.
device = get_device()

def plot_training_history(history, live_update=True):
    """Draw loss and accuracy curves from ``history`` and print the last epoch.

    ``history`` must provide the lists ``train_loss``, ``val_loss``,
    ``train_acc`` and ``val_acc``. With ``live_update=True`` the figure cached
    on the function object is reused (it is created on first use) and
    re-displayed in place through IPython. With ``live_update=False`` a new
    figure is created and shown with ``plt.show()``.
    """
    can_reuse = live_update and hasattr(plot_training_history, 'fig')
    if can_reuse:
        loss_ax, acc_ax = plot_training_history.fig.axes
    else:
        plot_training_history.fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 4))
        plt.ion()

    for axis in (loss_ax, acc_ax):
        axis.clear()

    # (axis, train key, validation key, title, y-axis label) for each panel.
    panels = (
        (loss_ax, 'train_loss', 'val_loss', 'Training and Validation Loss', 'Loss'),
        (acc_ax, 'train_acc', 'val_acc', 'Training and Validation Accuracy', 'Accuracy (%)'),
    )
    for axis, train_key, val_key, title, y_label in panels:
        axis.plot(history[train_key], label='Train', color='blue')
        axis.plot(history[val_key], label='Validation', color='orange')
        axis.set_title(title)
        axis.set_xlabel('Epoch')
        axis.set_ylabel(y_label)
        axis.legend()
        axis.grid(True)

    epochs_done = len(history['train_loss'])
    print(f"Epoch {epochs_done}: "
          f"Train Loss = {history['train_loss'][-1]:.4f}, "
          f"Val Loss = {history['val_loss'][-1]:.4f} | "
          f"Train Acc = {history['train_acc'][-1]:.2f}%, "
          f"Val Acc = {history['val_acc'][-1]:.2f}%")
    plt.tight_layout()

    if not live_update:
        plt.show()
        return

    # Replace the previous cell output with the refreshed figure.
    display.clear_output(wait=True)
    display.display(plot_training_history.fig)

def evaluate_model(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without tracking gradients.

    The model is switched to eval mode. Returns ``(loss, accuracy)`` where
    ``loss`` is the mean of the per-batch criterion values and ``accuracy`` is
    the percentage of samples whose arg-max prediction equals the label.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for batch_inputs, batch_labels in dataloader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_inputs)
            loss_sum += criterion(logits, batch_labels).item()

            # Index of the largest logit along the class dimension.
            predicted = logits.max(dim=1).indices
            n_seen += batch_labels.size(0)
            n_correct += (predicted == batch_labels).sum().item()

    return loss_sum / len(dataloader), 100 * n_correct / n_seen

def _draw_training_curves(ax_loss, ax_acc, history):
    """Redraw the loss and accuracy curves stored in ``history`` on the two axes."""
    ax_loss.clear()
    ax_loss.plot(history['train_loss'], label='Train', color='blue')
    ax_loss.plot(history['val_loss'], label='Validation', color='orange')
    ax_loss.set_title('Training and Validation Loss')
    ax_loss.legend()

    ax_acc.clear()
    ax_acc.plot(history['train_acc'], label='Train', color='blue')
    ax_acc.plot(history['val_acc'], label='Validation', color='orange')
    ax_acc.set_title('Training and Validation Accuracy')
    ax_acc.legend()


def train_model_with_logging(
    model: nn.Module,
    train_loader: DataLoader,
    val_loader: DataLoader,
    criterion: nn.Module,
    optimizer: optim.Optimizer,
    device: str = "cuda",
    num_epochs: int = 20,
    warmup_epochs: int = 3,
    patience: int = 5,
    model_save_path: str = 'best_model.pth'
) -> dict:
    """Train ``model`` with warm-up, LR scheduling, early stopping and live plots.

    After every epoch the model is evaluated on ``val_loader``, the metrics
    are appended to the history, the curves are redrawn in the notebook and a
    checkpoint is written whenever validation accuracy reaches a new best.

    Args:
        model: Network to train; it is moved to ``device``.
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        val_loader: Loader used for the per-epoch evaluation.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating the model parameters. The learning rate
            of its first parameter group is the warm-up target.
        device: Device identifier (a ``torch.device`` works as well).
        num_epochs: Maximum number of epochs.
        warmup_epochs: Number of initial epochs with a linearly ramped LR.
        patience: Epochs without validation-loss improvement tolerated before
            training stops early.
        model_save_path: Destination of the best-model checkpoint.

    Returns:
        Dict with the per-epoch lists ``train_loss``, ``val_loss``,
        ``val_acc`` and ``train_acc``.
    """
    model.to(device)
    history = {
        'train_loss': [],
        'val_loss': [],
        'val_acc': [],
        'train_acc': []
    }
    best_val_acc = 0.0
    best_epoch = 0

    # Warm up towards the LR the optimizer was configured with rather than
    # apply_warmup's hard-coded default, which would override a custom LR.
    base_lr = optimizer.param_groups[0]['lr']

    scheduler = get_scheduler(optimizer, mode='min', patience=3)
    early_stopping = EarlyStopping(patience=patience, mode='min')

    plt.ion()
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

    metrics_text = ""

    for epoch in range(num_epochs):
        start_time = time.time()

        apply_warmup(optimizer, epoch, warmup_epochs, base_lr)

        model.train()
        epoch_train_loss = 0.0
        correct_train = 0
        total_train = 0

        for inputs, labels in train_loader:
            # Inputs do not need gradients; only the model parameters are trained.
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            epoch_train_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()

        val_loss, val_acc = evaluate_model(model, val_loader, criterion, device)

        train_loss = epoch_train_loss / len(train_loader)
        train_acc = 100 * correct_train / total_train

        # The plateau scheduler takes over only after warm-up; a reduction
        # made earlier would be overwritten by the next warm-up step.
        if epoch >= warmup_epochs:
            scheduler.step(val_loss)
        # Feed the stopper every epoch so that its flag can actually trigger.
        early_stopping(val_loss)

        is_new_best = val_acc > best_val_acc
        if is_new_best:
            best_val_acc = val_acc
            best_epoch = epoch
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_acc': val_acc,
                'train_acc': train_acc,
            }, model_save_path)

        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        history['train_acc'].append(train_acc)

        epoch_time = time.time() - start_time
        epoch_info = (
            f"Epoch {epoch+1}/{num_epochs} | "
            f"Train Loss: {train_loss:.4f} | "
            f"Val Loss: {val_loss:.4f} | "
            f"Train Acc: {train_acc:.2f}% | "
            f"Val Acc: {val_acc:.2f}% | "
            f"Time: {epoch_time:.2f}s\n"
        )

        metrics_text += epoch_info
        if is_new_best:
            # Keep the notice in the persistent log: anything printed directly
            # is wiped by clear_output() a moment later.
            metrics_text += f"Новая лучшая модель сохранена с val_acc: {val_acc:.2f}%\n"

        _draw_training_curves(ax1, ax2, history)

        display.clear_output(wait=True)
        print(metrics_text)
        display.display(fig)

        plt.pause(0.1)

        if early_stopping.early_stop:
            print(f"Early stopping triggered at epoch {epoch+1}")
            break
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    plt.ioff()
    plt.show()
    print(f"\nЛучшая модель достигнута на эпохе {best_epoch+1} с val_acc: {best_val_acc:.2f}%")
    return history

## Само обучение

In [None]:
# Fine-tune an EfficientNet-B3 initialised with torchvision's default pretrained weights.
# NOTE(review): the pretrained classifier head is left unchanged here; confirm
# its output size is what is intended for the number of classes in the dataset.
model = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.DEFAULT)
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# train_loader and val_loader come from the data-loading cell above.
history = train_model_with_logging(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
    num_epochs=20,
    warmup_epochs=3,
    patience=5
)

## Предсказания (если необходимо)

In [None]:
# Re-import the progress-bar callable under the name used by the loop below.
from tqdm.auto import tqdm

# Test images, ordered by the integer in front of the first dot of the file
# name. The extension check is case-insensitive so files such as "12.JPG"
# are not silently skipped.
file_list = sorted(
    [f for f in os.listdir(test_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))],
    key=lambda x: int(x.split('.')[0])
)

# NOTE(review): confirm this preprocessing matches the validation transform
# used during training (it is defined outside this cell).
test_transform = A.Compose([
    A.Resize(224, 224),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2()
])

submission = pd.DataFrame({
    'id': [int(f.split('.')[0]) for f in file_list],
    'file_path': [os.path.join(test_dir, f) for f in file_list],
})

model.eval()
predictions = []

# A single no_grad context covers the whole loop.
with torch.no_grad():
    for file_path in tqdm(submission['file_path'], total=len(submission)):
        image = cv2.imread(file_path)
        if image is None:
            # cv2.imread signals failure by returning None; fail with a clear
            # message instead of a cryptic cvtColor error.
            raise RuntimeError(f"cv2.imread could not read image: {file_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image_tensor = test_transform(image=image)["image"].unsqueeze(0).to(device)

        output = model(image_tensor)
        # Arg-max over the class dimension of the single-image batch.
        predictions.append(output.argmax(dim=1).item())

submission['target'] = predictions
submission = submission.sort_values('id').drop('file_path', axis=1)
submission.to_csv('submission_newly.csv', index=False)