# Домашнее задание 2: Создание и оптимизация ResNet18


In [23]:
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
from pathlib import Path
from collections import defaultdict
import random
from tqdm.notebook import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from sklearn.metrics import classification_report, confusion_matrix, precision_recall_fscore_support


def set_seed(seed=42):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random`` module, NumPy and PyTorch (plus the CUDA
    generators when CUDA is available), then configures cuDNN with
    ``deterministic=True`` and ``benchmark=False``.

    Args:
        seed: Integer seed handed to every generator.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    # cuDNN flags are set unconditionally, even on CPU-only machines.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False
# Fix all RNG seeds once, before any dataset or model is created.
set_seed(42)


# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# NOTE(review): `{device}` is a set literal, so this prints a one-element set
# (e.g. "{device(type='cuda')}"); it looks like a leftover from an f-string.
print({device})

{device(type='cuda')}


## 1. Подготовка данных

In [None]:
class TinyImageNetDataset(Dataset):
    """Tiny ImageNet images with integer labels for the ``train`` or ``val`` split.

    Directory layout read by this class (relative to ``root_dir``)::

        train/<class_id>/images/*.JPEG
        val/val_annotations.txt      # tab-separated: <image name>, <class_id>, ...
        val/images/<image name>

    Args:
        root_dir: Dataset root directory.
        split: ``'train'`` or ``'val'``.
        selected_classes: Optional collection of class ids to keep. When given,
            a class's label is its position in ``sorted(selected_classes)``.
            When omitted, every class is kept and labels follow sorted class
            ids, identically for both splits.
        transform: Optional callable applied to each loaded RGB image.

    Raises:
        ValueError: If ``split`` is neither ``'train'`` nor ``'val'``.
    """

    def __init__(self, root_dir, split='train', selected_classes=None, transform=None):
        self.root_dir = Path(root_dir)
        self.split = split
        self.transform = transform
        self.selected_classes = selected_classes
        self.images = []
        self.labels = []
        self.class_to_idx = {}
        if selected_classes:
            self.class_to_idx = {cls: idx for idx, cls in enumerate(sorted(selected_classes))}
        self._load_data()

    def _load_data(self):
        """Fill ``self.images`` and ``self.labels`` for the configured split."""
        # With an explicit selection, class_to_idx already holds exactly the
        # wanted ids, so its keys double as an O(1) membership filter.
        wanted = set(self.class_to_idx) if self.selected_classes else None
        if self.split == 'train':
            self._load_train(wanted)
        elif self.split == 'val':
            self._load_val(wanted)
        else:
            # Previously an unknown split silently produced an empty dataset.
            raise ValueError(f"Unsupported split {self.split!r}; expected 'train' or 'val'")

    def _register_class(self, class_name):
        """Return the label for ``class_name``, assigning the next free index if new."""
        return self.class_to_idx.setdefault(class_name, len(self.class_to_idx))

    def _load_train(self, wanted):
        """Collect images from ``train/<class_id>/images``; ``wanted`` is a set or None."""
        train_dir = self.root_dir / 'train'
        for class_dir in sorted(train_dir.iterdir()):
            if not class_dir.is_dir():
                continue
            class_name = class_dir.name
            if wanted is not None and class_name not in wanted:
                continue
            label = self._register_class(class_name)
            images_dir = class_dir / 'images'
            if not images_dir.exists():
                continue
            # Sorted so the sample order does not depend on filesystem listing
            # order (otherwise seeded shuffling is not reproducible).
            for img_path in sorted(images_dir.glob('*.JPEG')):
                self.images.append(str(img_path))
                self.labels.append(label)

    def _load_val(self, wanted):
        """Collect images listed in ``val/val_annotations.txt``; ``wanted`` is a set or None."""
        val_dir = self.root_dir / 'val'
        entries = []
        with open(val_dir / 'val_annotations.txt', 'r', encoding='utf-8') as f:
            for line in f:
                parts = line.strip().split('\t')
                if len(parts) < 2:
                    continue  # blank or malformed line
                img_name, class_name = parts[0], parts[1]
                if wanted is not None and class_name not in wanted:
                    continue
                entries.append((img_name, class_name))

        if wanted is None:
            # Without an explicit selection the train split numbers classes by
            # sorted directory name; reproduce that numbering here instead of
            # using the order in which classes appear in the annotation file.
            train_dir = self.root_dir / 'train'
            class_names = []
            if train_dir.is_dir():
                class_names = sorted(p.name for p in train_dir.iterdir() if p.is_dir())
            class_names += sorted({class_name for _, class_name in entries})
            for class_name in class_names:
                self._register_class(class_name)

        for img_name, class_name in entries:
            img_path = val_dir / 'images' / img_name
            if img_path.exists():
                self.images.append(str(img_path))
                self.labels.append(self.class_to_idx[class_name])

    def __len__(self):
        """Return the number of collected images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``; the image is RGB and transformed if configured."""
        img_path = self.images[idx]
        label = self.labels[idx]
        # convert() returns a new image, so the file handle can be closed right away.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label

In [None]:
# Root directory of the dataset, passed to TinyImageNetDataset below.
DATA_DIR = 'tiny-imagenet-200'
# Earlier hyperparameter set, kept for reference:
# BATCH_SIZE = 8
# NUM_EPOCHS = 30
# LEARNING_RATE = 0.001
# NUM_WORKERS = 0 

# Hyperparameters shared by every experiment in this notebook.
BATCH_SIZE = 8
NUM_EPOCHS = 25
LEARNING_RATE = 0.001
NUM_WORKERS = 0


# Class ids kept from the dataset. TinyImageNetDataset assigns labels by
# position in sorted(selected_classes); this list is already sorted, so
# label i corresponds to SELECTED_CLASSES[i].
SELECTED_CLASSES = [
    'n01443537', 'n01629819', 'n01641577', 'n01644900', 'n01698640',
    'n01742172', 'n01768244', 'n01770393', 'n01774384', 'n01774750'
]

# Display names, index-aligned with SELECTED_CLASSES.
# NOTE(review): the id -> name pairing is not checked anywhere in this file;
# verify it against the dataset's class-name listing.
CLASS_NAMES = [
    'goldfish', 'salamander', 'bullfrog', 'tailed_frog', 'alligator',
    'boa', 'trilobite', 'scorpion', 'black_widow', 'tarantula'
]

print(f" Выбранные классы: {CLASS_NAMES}")

# Training pipeline: random flip and padded 64x64 random crop as augmentation,
# then tensor conversion and per-channel normalisation with mean 0.5 / std 0.5
# (visualize_predictions undoes this with x * 0.5 + 0.5).
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(64, padding=4),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Evaluation pipeline: same normalisation, no augmentation.
val_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])


train_dataset = TinyImageNetDataset(DATA_DIR, split='train', selected_classes=SELECTED_CLASSES, transform=train_transform)
val_dataset = TinyImageNetDataset(DATA_DIR, split='val', selected_classes=SELECTED_CLASSES, transform=val_transform)
# NOTE(review): the "test" dataset is built from the same 'val' split as
# val_dataset, so it is not an independent hold-out set.
test_dataset = TinyImageNetDataset(DATA_DIR, split='val', selected_classes=SELECTED_CLASSES, transform=val_transform) 

print(f"Размер train dataset: {len(train_dataset)}")
print(f"Размер validation dataset: {len(val_dataset)}")

# Only the training loader shuffles; evaluation loaders keep dataset order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

## ResNet18

In [None]:
class BasicBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm layers plus a shortcut.

    The shortcut is the input itself, or a 1x1 conv + batch-norm projection
    when ``stride != 1`` or the channel count changes.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels.
        stride: Stride of the first convolution (and of the projection).
        activation: ``'leakyrelu'``, ``'elu'`` or ``'gelu'``; any other value
            selects ReLU.
    """

    def __init__(self, in_channels, out_channels, stride=1, activation='relu'):
        super().__init__()
        conv3x3 = dict(kernel_size=3, padding=1, bias=False)
        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride, **conv3x3)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, stride=1, **conv3x3)
        self.bn2 = nn.BatchNorm2d(out_channels)

        shape_changes = stride != 1 or in_channels != out_channels
        if shape_changes:
            projection = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            projection = None
        self.downsample = projection

        self.activation = self._get_activation(activation)

    def _get_activation(self, activation):
        """Build the activation module named by ``activation`` (ReLU if unrecognised)."""
        known = (
            ('leakyrelu', lambda: nn.LeakyReLU(inplace=True)),
            ('elu', lambda: nn.ELU(inplace=True)),
            ('gelu', lambda: nn.GELU()),
        )
        for name, build in known:
            if activation == name:
                return build()
        return nn.ReLU(inplace=True)

    def forward(self, x):
        """Apply the residual branch, add the shortcut, then activate."""
        residual = self.conv1(x)
        residual = self.bn1(residual)
        residual = self.activation(residual)
        residual = self.conv2(residual)
        residual = self.bn2(residual)

        shortcut = x if self.downsample is None else self.downsample(x)
        residual += shortcut
        return self.activation(residual)

class ResNet18(nn.Module):
    """Configurable ResNet-style classifier built from ``BasicBlock`` stages.

    Three stages are always built; a fourth is added only when both
    ``channels`` and ``num_blocks`` have more than three entries. Entries
    beyond the stages actually built are ignored (a warning is emitted).

    Args:
        num_classes: Size of the final linear layer's output.
        channels: Output channels per stage; ``channels[0]`` is also the stem width.
        num_blocks: Number of ``BasicBlock``s per stage.
        activation: ``'leakyrelu'``, ``'elu'`` or ``'gelu'``; any other value
            selects ReLU. Also forwarded to every block.
        use_maxpool: If true, use a 7x7 stride-2 stem convolution followed by
            a 3x3 stride-2 max-pool; otherwise a 3x3 stride-1 stem without pooling.
    """

    # Tuple defaults instead of lists: avoids shared mutable default arguments.
    def __init__(self, num_classes=10, channels=(64, 128, 256, 512), num_blocks=(2, 2, 2, 2), activation='relu', use_maxpool=False):
        super(ResNet18, self).__init__()
        self.in_channels = channels[0]
        self.activation_name = activation

        has_stage4 = len(channels) > 3 and len(num_blocks) > 3
        num_stages = 4 if has_stage4 else 3
        if len(channels) > num_stages or len(num_blocks) > num_stages:
            # Make the silent truncation visible, e.g. three channel widths
            # combined with four block counts builds only three stages.
            import warnings
            warnings.warn(
                f"ResNet18 builds {num_stages} stages; extra entries in "
                f"channels={list(channels)} / num_blocks={list(num_blocks)} are ignored",
                stacklevel=2,
            )

        if use_maxpool:
            self.conv1 = nn.Conv2d(3, channels[0], kernel_size=7, stride=2, padding=3, bias=False)
            self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        else:
            self.conv1 = nn.Conv2d(3, channels[0], kernel_size=3, stride=1, padding=1, bias=False)
            self.maxpool = None

        self.bn1 = nn.BatchNorm2d(channels[0])
        self.activation = self._get_activation(activation)

        # Stage 1 keeps the resolution; each later stage uses stride 2.
        self.layer1 = self._make_layer(channels[0], num_blocks[0], stride=1)
        self.layer2 = self._make_layer(channels[1], num_blocks[1], stride=2)
        self.layer3 = self._make_layer(channels[2], num_blocks[2], stride=2)

        if has_stage4:
            self.layer4 = self._make_layer(channels[3], num_blocks[3], stride=2)
            final_channels = channels[3]
        else:
            self.layer4 = None
            final_channels = channels[2]

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(final_channels, num_classes)

    def _get_activation(self, activation):
        """Build the activation module named by ``activation`` (ReLU if unrecognised)."""
        if activation == 'leakyrelu': return nn.LeakyReLU(inplace=True)
        if activation == 'elu': return nn.ELU(inplace=True)
        if activation == 'gelu': return nn.GELU()
        return nn.ReLU(inplace=True)

    def _make_layer(self, out_channels, num_blocks, stride):
        """Build one stage; only its first block uses ``stride`` and changes the width.

        Updates ``self.in_channels`` so the next stage starts from ``out_channels``.
        """
        layers = [BasicBlock(self.in_channels, out_channels, stride, self.activation_name)]
        self.in_channels = out_channels
        for _ in range(1, num_blocks):
            layers.append(BasicBlock(out_channels, out_channels, stride=1, activation=self.activation_name))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits for a batch of 3-channel images."""
        x = self.activation(self.bn1(self.conv1(x)))
        if self.maxpool is not None:
            x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        if self.layer4 is not None:
            x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        return self.fc(x)

def count_parameters(model):
    """Return the total element count of the model's parameters that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

## Класс для обучения (Trainer)

In [None]:
class Trainer:
    """Train a model, validate it every epoch and checkpoint the best weights.

    Args:
        model: Module to train; expected to already be on ``device``.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer over the model's parameters.
        device: Device that batches are moved to.
        model_name: Used in log output and as the checkpoint file prefix
            (``'<model_name>_best.pth'``).
    """

    def __init__(self, model, train_loader, val_loader, criterion, optimizer, device, model_name='model'):
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.criterion = criterion
        self.optimizer = optimizer
        self.device = device
        self.model_name = model_name
        self.train_losses, self.train_accs, self.val_losses, self.val_accs = [], [], [], []
        self.best_val_acc = 0.0
        self.best_model_path = f'{model_name}_best.pth'
        # Overwritten by train(); initialised here so train_epoch() and
        # validate() can also be called on their own.
        self.current_epoch = 0
        self.num_epochs = 0

    def _run_epoch(self, loader, training):
        """Run one pass over ``loader`` and return ``(mean loss, accuracy in %)``.

        With ``training`` true the model is put in train mode and the optimizer
        is stepped on every batch; otherwise the model is in eval mode and
        gradients are disabled.

        Raises:
            RuntimeError: If ``loader`` yields no samples.
        """
        phase = 'Train' if training else 'Val'
        self.model.train(training)
        running_loss, correct, total = 0.0, 0, 0
        loop = tqdm(loader, desc=f"Epoch {self.current_epoch}/{self.num_epochs} [{phase}]", leave=False)
        with torch.set_grad_enabled(training):
            for images, labels in loop:
                images, labels = images.to(self.device), labels.to(self.device)
                if training:
                    self.optimizer.zero_grad()
                outputs = self.model(images)
                loss = self.criterion(outputs, labels)
                if training:
                    loss.backward()
                    self.optimizer.step()
                # Weight the batch loss by batch size so the epoch mean is per-sample.
                running_loss += loss.item() * images.size(0)
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()
                loop.set_postfix(loss=running_loss / total, acc=100. * correct / total)
        if total == 0:
            raise RuntimeError(f"{phase} loader produced no samples; cannot compute metrics")
        return running_loss / total, 100. * correct / total

    def train_epoch(self):
        """Train for one epoch; return ``(mean loss, accuracy in %)``."""
        return self._run_epoch(self.train_loader, training=True)

    def validate(self):
        """Evaluate on the validation loader; return ``(mean loss, accuracy in %)``."""
        return self._run_epoch(self.val_loader, training=False)

    def train(self, num_epochs):
        """Train for ``num_epochs`` epochs, saving the weights with the best validation accuracy.

        Returns:
            Dict with the per-epoch lists ``train_losses``, ``train_accs``,
            ``val_losses``, ``val_accs``, plus ``best_val_acc`` and ``params``
            (trainable parameter count).
        """
        self.num_epochs = num_epochs
        print(f"\n Обучение модели: {self.model_name}")
        print(f"Количество параметров: {count_parameters(self.model):,}")
        print('-' * 100)
        for epoch in range(num_epochs):
            self.current_epoch = epoch + 1
            train_loss, train_acc = self.train_epoch()
            val_loss, val_acc = self.validate()
            self.train_losses.append(train_loss); self.train_accs.append(train_acc)
            self.val_losses.append(val_loss); self.val_accs.append(val_acc)
            print(f"Эпоха [{self.current_epoch}/{self.num_epochs}] Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}% | Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}%")
            if val_acc > self.best_val_acc:
                self.best_val_acc = val_acc
                torch.save(self.model.state_dict(), self.best_model_path)
                print(f"Обновлена лучшая модель с Val Acc: {val_acc:.2f}%" )
        print('-' * 100)
        print(f"Обучение завершено. Лучшая Val Accuracy: {self.best_val_acc:.2f}%")
        return {'train_losses': self.train_losses, 'train_accs': self.train_accs, 'val_losses': self.val_losses, 'val_accs': self.val_accs, 'best_val_acc': self.best_val_acc, 'params': count_parameters(self.model)}

In [None]:
def plot_training_curves(history, title, save_path=None):
    """Plot train/validation loss and accuracy curves side by side.

    Args:
        history: Dict with per-epoch lists under ``'train_losses'``,
            ``'val_losses'``, ``'train_accs'`` and ``'val_accs'``.
        title: Prefix for both subplot titles.
        save_path: If truthy, the figure is also written to this path.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(14, 5))
    epochs = range(1, len(history['train_losses']) + 1)

    # Left panel: loss.
    loss_ax.plot(epochs, history['train_losses'], 'b-', label='Train Loss')
    loss_ax.plot(epochs, history['val_losses'], 'r-', label='Validation Loss')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.set_title(f'{title} - Loss')
    loss_ax.legend()
    loss_ax.grid(True, alpha=0.3)

    # Right panel: accuracy.
    acc_ax.plot(epochs, history['train_accs'], 'b-', label='Train Accuracy')
    acc_ax.plot(epochs, history['val_accs'], 'r-', label='Validation Accuracy')
    acc_ax.set_xlabel('Epoch')
    acc_ax.set_ylabel('Accuracy (%)')
    acc_ax.set_title(f'{title} - Accuracy')
    acc_ax.legend()
    acc_ax.grid(True, alpha=0.3)

    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.show()

def plot_multiple_experiments(experiments, metric='accuracy', save_path=None):
    """Overlay one validation curve per experiment on a single figure.

    Args:
        experiments: Mapping of experiment name to a history dict containing
            ``'val_accs'`` and ``'val_losses'``.
        metric: ``'accuracy'`` plots ``'val_accs'``; any other value plots
            ``'val_losses'``.
        save_path: If truthy, the figure is also written to this path.
    """
    if metric == 'accuracy':
        series_key = 'val_accs'
    else:
        series_key = 'val_losses'

    plt.figure(figsize=(12, 6))
    for label, run in experiments.items():
        # The x axis is always sized from the accuracy history.
        epoch_axis = range(1, len(run['val_accs']) + 1)
        plt.plot(epoch_axis, run[series_key], label=label)

    plt.xlabel('Epoch')
    plt.ylabel(f'Validation {metric.capitalize()}')
    plt.title(f'Сравнение экспериментов - {metric.capitalize()}')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.show()

def evaluate_model(model, test_loader, device, class_names, save_path='confusion_matrix.png'):
    """Print accuracy and a per-class report, and plot the confusion matrix.

    Args:
        model: Trained module; switched to eval mode here.
        test_loader: Iterable of ``(images, labels)`` batches.
        device: Device the image batches are moved to.
        class_names: Display names; index ``i`` is taken to name label ``i``.
        save_path: Where the confusion-matrix figure is written; pass a falsy
            value to skip saving.
    """
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for images, labels in test_loader:
            outputs = model(images.to(device))
            all_preds.extend(outputs.max(1)[1].cpu().numpy())
            # .cpu() keeps this working if the loader yields non-CPU labels.
            all_labels.extend(labels.cpu().numpy())

    # Pin the label set explicitly: without it, a class absent from the data
    # makes target_names mismatch and shifts the heatmap tick labels.
    class_ids = list(range(len(class_names)))

    print(f"\nTest Accuracy: {100. * np.mean(np.array(all_preds) == np.array(all_labels)):.2f}%\n")
    print(classification_report(all_labels, all_preds, labels=class_ids, target_names=class_names, digits=4))
    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)

    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.show()

def visualize_predictions(model, test_dataset, device, class_names, num_samples=10):
    """Show randomly chosen samples with their true and predicted class names.

    Titles are green for correct predictions and red otherwise. The figure is
    saved to ``'predictions_visualization.png'``.

    Args:
        model: Trained module; switched to eval mode here.
        test_dataset: Indexable dataset returning ``(image_tensor, label)``.
        device: Device each image is moved to for inference.
        class_names: Display names indexed by label.
        num_samples: Number of samples to draw; capped at the dataset size.
            Nothing is plotted when this ends up below 1.
    """
    model.eval()
    # random.sample raises if asked for more items than exist, so cap the count.
    num_shown = min(num_samples, len(test_dataset))
    if num_shown < 1:
        return
    indices = random.sample(range(len(test_dataset)), num_shown)

    # Up to 5 columns and as many rows as needed (ceil division); the default
    # of 10 samples gives the 2x5 grid at figsize (15, 6).
    cols = min(5, num_shown)
    rows = -(-num_shown // cols)
    fig, axes = plt.subplots(rows, cols, figsize=(3 * cols, 3 * rows), squeeze=False)
    axes = axes.flatten()

    for i, idx in enumerate(indices):
        image, true_label = test_dataset[idx]
        with torch.no_grad():
            pred_label = model(image.unsqueeze(0).to(device)).max(1)[1].item()
        # Undo normalisation for display; assumes mean 0.5 / std 0.5 per
        # channel and a channels-first tensor -- TODO confirm if the
        # dataset transforms change.
        img_display = np.clip((image.permute(1, 2, 0).numpy() * 0.5) + 0.5, 0, 1)
        axes[i].imshow(img_display)
        axes[i].axis('off')
        true_class, pred_class = class_names[true_label], class_names[pred_label]
        axes[i].set_title(f'True: {true_class}\nPred: {pred_class}', color='green' if true_label == pred_label else 'red')

    # Hide grid cells left over when num_shown does not fill the last row.
    for ax in axes[num_shown:]:
        ax.axis('off')

    plt.tight_layout()
    plt.savefig('predictions_visualization.png', dpi=300, bbox_inches='tight')
    plt.show()


# Loss shared by every experiment below.
criterion = nn.CrossEntropyLoss()
# Experiment name -> history dict returned by Trainer.train(); read by the
# summary table at the end of the notebook.
all_results = {}

## 2. Базовая архитектура ResNet18

In [None]:
# Baseline: four stages with widths 32-64-128-256 and two blocks per stage.
baseline_model = ResNet18(num_classes=10, channels=[32, 64, 128, 256], num_blocks=[2, 2, 2, 2], activation='relu').to(device)
print(f"Архитектура базовой модели:\n{baseline_model}\n\nКоличество параметров: {count_parameters(baseline_model):,}")
optimizer = optim.Adam(baseline_model.parameters(), lr=LEARNING_RATE)
# The best weights are written to 'baseline_best.pth' by the Trainer.
trainer = Trainer(baseline_model, train_loader, val_loader, criterion, optimizer, device, model_name='baseline')
baseline_history = trainer.train(NUM_EPOCHS)
all_results['Baseline'] = baseline_history
plot_training_curves(baseline_history, 'Baseline ResNet18', save_path='baseline_curves.png')

### 3.1: Оптимизация количества каналов

In [None]:
# Variant A: same channels / blocks / activation as the baseline cell above.
print('Вариант A: 32  64  128  256 каналов')
model_3_1_a = ResNet18(num_classes=10, channels=[32, 64, 128, 256], num_blocks=[2, 2, 2, 2], activation='relu').to(device)
optimizer_3_1_a = optim.Adam(model_3_1_a.parameters(), lr=LEARNING_RATE)
trainer_3_1_a = Trainer(model_3_1_a, train_loader, val_loader, criterion, optimizer_3_1_a, device, model_name='3_1_a')
history_3_1_a = trainer_3_1_a.train(NUM_EPOCHS)
all_results['3.1-A (32-64-128-256)'] = history_3_1_a


# Variant B: three channel widths. ResNet18 only builds a fourth stage when
# `channels` has more than three entries, so this is a three-stage network
# and the fourth entry of num_blocks is unused.
print('Вариант B: 64 128  256 каналов')
model_3_1_b = ResNet18(num_classes=10, channels=[64, 128, 256], num_blocks=[2, 2, 2,2], activation='relu').to(device)
optimizer_3_1_b = optim.Adam(model_3_1_b.parameters(), lr=LEARNING_RATE)
trainer_3_1_b = Trainer(model_3_1_b, train_loader, val_loader, criterion, optimizer_3_1_b, device, model_name='3_1_b')
history_3_1_b = trainer_3_1_b.train(NUM_EPOCHS)
all_results['3.1-B (64-128-256)'] = history_3_1_b


# Compare the validation accuracy curves of both variants.
experiments_3_1 = {'Вариант A': history_3_1_a, 'Вариант B': history_3_1_b}
plot_multiple_experiments(experiments_3_1, metric='accuracy', save_path='3_1_comparison.png')

### 3.2: Эксперименты с количеством residual блоков

In [None]:


# Channel configuration carried over from experiment 3.1 (variant B).
# NOTE(review): with three channel widths ResNet18 builds three stages, so
# only the first three entries of each num_blocks list below take effect,
# although the printed labels and result keys show four.
BEST_CHANNELS_3_1 = [ 64, 128, 256]

print('Вариант A: [1, 1, 1, 1] блоков')
model_3_2_a = ResNet18(num_classes=10, channels=BEST_CHANNELS_3_1, num_blocks=[1, 1, 1, 1], activation='relu').to(device)
optimizer_3_2_a = optim.Adam(model_3_2_a.parameters(), lr=LEARNING_RATE)
trainer_3_2_a = Trainer(model_3_2_a, train_loader, val_loader, criterion, optimizer_3_2_a, device, model_name='3_2_a')
history_3_2_a = trainer_3_2_a.train(NUM_EPOCHS)
all_results['3.2-A [1,1,1,1]'] = history_3_2_a


print('\n Вариант B: [2, 2, 2, 2] блоков')

# Same configuration as model_3_1_b in experiment 3.1.
model_3_2_b = ResNet18(num_classes=10, channels=BEST_CHANNELS_3_1, num_blocks=[2, 2, 2, 2], activation='relu').to(device)
optimizer_3_2_b = optim.Adam(model_3_2_b.parameters(), lr=LEARNING_RATE)
trainer_3_2_b = Trainer(model_3_2_b, train_loader, val_loader, criterion, optimizer_3_2_b, device, model_name='3_2_b')
history_3_2_b = trainer_3_2_b.train(NUM_EPOCHS)
all_results['3.2-B [2,2,2,2]'] = history_3_2_b

print('\n Вариант C: [3, 3, 3, 3] блоков')
model_3_2_c = ResNet18(num_classes=10, channels=BEST_CHANNELS_3_1, num_blocks=[3, 3, 3, 3], activation='relu').to(device)
optimizer_3_2_c = optim.Adam(model_3_2_c.parameters(), lr=LEARNING_RATE)
trainer_3_2_c = Trainer(model_3_2_c, train_loader, val_loader, criterion, optimizer_3_2_c, device, model_name='3_2_c')
history_3_2_c = trainer_3_2_c.train(NUM_EPOCHS)
all_results['3.2-C [3,3,3,3]'] = history_3_2_c


# Compare the validation accuracy curves of the three depth variants.
experiments_3_2 = {'Вариант A [1,1,1,1]': history_3_2_a, 'Вариант B [2,2,2,2]': history_3_2_b, 'Вариант C [3,3,3,3]': history_3_2_c}
plot_multiple_experiments(experiments_3_2, metric='accuracy', save_path='3_2_comparison.png')







### 3.3: Эксперименты с функциями активации

In [None]:
# Architecture fixed for the activation comparison: channels from 3.1 and
# the block counts of variant C from 3.2.
BEST_CHANNELS_3_2 = BEST_CHANNELS_3_1
BEST_BLOCKS_3_2 = [3,3,3,3]

# Activation name (upper-case) -> training history, used for the plot below.
experiments_activations = {}





# Train one model per activation. The loop rebinds the globals `model`,
# `optimizer`, `trainer` and `history` on every iteration; each run's best
# weights go to 'act_<name>_best.pth'.
# NOTE(review): set_seed is not called again between runs in this cell.
for act in ['relu','leakyrelu', 'elu', 'gelu']:
    act_name = act.upper()
    print(f'\n Вариант: {act_name}')
    model = ResNet18(num_classes=10, channels=BEST_CHANNELS_3_2, num_blocks=BEST_BLOCKS_3_2, activation=act).to(device)
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    trainer = Trainer(model, train_loader, val_loader, criterion, optimizer, device, model_name=f'act_{act}')
    history = trainer.train(NUM_EPOCHS)
    all_results[f'Activation-{act_name}'] = history
    experiments_activations[act_name] = history


plot_multiple_experiments(experiments_activations, metric='accuracy', save_path='activations_comparison.png')

## 6. Финальная модель и оценка

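Параметры финальной модели: каналы [64, 128, 256], по 3 residual-блока на стадию, активация LeakyReLU, 40 эпох обучения.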

In [None]:
# Final configuration: three channel widths (so three stages are built),
# block counts [3,3,3,3] and LeakyReLU.
final_model = ResNet18(num_classes=10, channels=[64, 128, 256], num_blocks=[3,3,3,3], activation='leakyrelu').to(device)
optimizer = optim.Adam(final_model.parameters(), lr=LEARNING_RATE)
# The best weights are written to 'final_model_best.pth' by the Trainer.
trainer = Trainer(final_model, train_loader, val_loader, criterion, optimizer, device, model_name='final_model')
# Trained for 40 epochs rather than NUM_EPOCHS.
final_history = trainer.train(40)
all_results['Final Model'] = final_history
plot_training_curves(final_history, 'Final Model', save_path='final_model_curves.png')

In [None]:
# Restore the best checkpoint written during training. map_location makes the
# load work on whatever device is active now (e.g. a CPU-only session reading
# a checkpoint that was saved from CUDA tensors).
final_model.load_state_dict(torch.load('final_model_best.pth', map_location=device))
# NOTE(review): evaluation uses the validation data that also selected the
# best checkpoint, so the reported metrics are not from an independent test set.
evaluate_model(final_model, val_loader, device, CLASS_NAMES)
visualize_predictions(final_model, val_dataset, device, CLASS_NAMES, num_samples=10)

In [None]:
# Summary of every experiment stored in all_results: best validation
# accuracy, training accuracy of the last epoch, and trainable parameter count.
results_table = []
for exp_name, history in all_results.items():
    results_table.append({'Эксперимент': exp_name, 'Val Accuracy': f"{history['best_val_acc']:.2f}%", 'Train Accuracy': f"{history['train_accs'][-1]:.2f}%", 'Parameters': f"{history['params']:,}" })
# Header row, separator, then one left-aligned row per experiment.
print("\n{:<30} {:<20} {:<20} {:<20}".format('Эксперимент', 'Val Accuracy', 'Train Accuracy', 'Parameters'))
print('__' * 42)
for row in results_table:
    print("{:<30} {:<20} {:<20} {:<20}".format(row['Эксперимент'], row['Val Accuracy'], row['Train Accuracy'], row['Parameters']))
