# NOTE: I am not sure whether this works. My Colab quota ran out, and the 32 GB of RAM on my own machine is not enough; please do not assign a task like this to anyone again.

In [3]:
import os
import time
import tqdm
import numpy as np
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torch.utils.data import Dataset

import help_l.tiny_img_dataset
from help_l.tiny_img import download_tinyImg200

In [5]:
# Fetch the Tiny ImageNet-200 archive via the project helper; the target
# directory is the current working directory.
data_path = '.'
download_tinyImg200(data_path)

Dataset was downloaded to '.\tiny-imagenet-200.zip'
Extract downloaded dataset to '.'


In [6]:
def get_computing_device():
    """Select the torch device to run on.

    Returns:
        ``torch.device('cuda:0')`` when CUDA is available, otherwise
        ``torch.device('cpu')``.
    """
    if torch.cuda.is_available():
        return torch.device('cuda:0')
    return torch.device('cpu')


# Resolve the device once at module level; eval_model and train_model read
# this global when moving batches.
device = get_computing_device()
print(f"Our main computing device is '{device}'")

Our main computing device is 'cuda:0'


In [7]:
# Train-time augmentation pipeline: the random transforms run first, the
# conversion to a tensor runs last.
train_transforms = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),  # random left-right mirroring
        transforms.RandomRotation(5),  # small random rotation; 5 is the degree bound
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),  # random photometric jitter
        transforms.ToTensor(),  # final step: convert the image to a torch tensor
    ]
)

In [9]:
# Build the training set with the augmentation pipeline attached.
# NOTE(review): judging by the class name this keeps the whole split in
# memory -- confirm against help_l.tiny_img_dataset.
train_dataset = help_l.tiny_img_dataset.TinyImagenetRAM(
    'tiny-imagenet-200/train', transform=train_transforms
)

tiny-imagenet-200/train: 100%|██████████| 200/200 [07:02<00:00,  2.11s/it]


In [10]:
class TinyImagenetValDataset(Dataset):
    """In-memory Tiny ImageNet validation split.

    Exposes ``classes``, ``class_to_idx``, ``images`` and ``targets`` so it can
    be compared against the training dataset (presumably the same attribute
    set as ``TinyImagenetRAM`` -- confirm against that class).

    Args:
        root: Directory containing ``val_annotations.txt`` and an ``images``
            sub-directory.
        transform: Callable applied to each stored PIL image on access. When
            ``None`` the PIL image is returned unchanged.

    Raises:
        ValueError: If a non-blank annotation line has fewer than two
            tab-separated fields.
        AssertionError: If the number of classes is not 200 or an image is
            not 64x64x3.
    """

    def __init__(self, root: str, transform=transforms.ToTensor()):
        super().__init__()
        self.root = root
        self.transform = transform

        # Parse annotations: tab-separated lines, of which only the first two
        # columns (image file name, class id) are used. The encoding is given
        # explicitly so parsing does not depend on the platform default, and
        # blank lines are skipped instead of failing during unpacking.
        annotations = []
        annotations_path = os.path.join(root, 'val_annotations.txt')
        with open(annotations_path, encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                fields = line.rstrip('\r\n').split('\t')
                if len(fields) < 2:
                    raise ValueError(f'Malformed annotation line: {line!r}')
                annotations.append((fields[0], fields[1]))

        # 1. Classes and mapping: sorted class ids give a deterministic index.
        self.classes = sorted({label for _, label in annotations})
        assert len(self.classes) == 200, 'Unexpected number of classes'
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}

        # 2. Load every image into RAM up front.
        self.images, self.targets = [], []
        for img_name, class_name in tqdm.tqdm(annotations, desc='Loading val images'):
            img_path = os.path.join(root, 'images', img_name)
            image = help_l.tiny_img_dataset.read_rgb_image(img_path)
            assert image.shape == (64, 64, 3), 'Image shape mismatch'
            # Stored as PIL so that PIL-based transforms can be applied later.
            self.images.append(Image.fromarray(image))
            self.targets.append(self.class_to_idx[class_name])

    def __len__(self):
        """Return the number of loaded validation images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        The image is passed through ``self.transform`` when one is set.
        """
        img = self.images[idx]
        if self.transform is not None:
            img = self.transform(img)
        return img, self.targets[idx]

In [11]:
# Validation images get no augmentation: ToTensor is the only transform.
val_dataset = TinyImagenetValDataset('tiny-imagenet-200/val', transform=transforms.ToTensor())

Loading val images: 100%|██████████| 10000/10000 [02:29<00:00, 66.94it/s] 


In [12]:
# Consistency checks: both splits must list classes in the same order and map
# them to the same indices, otherwise validation targets would not line up
# with the logits of a model trained on train_dataset.
assert train_dataset.classes == val_dataset.classes, 'Class order mismatch'
assert train_dataset.class_to_idx == val_dataset.class_to_idx, 'Class‑index mapping differs'

In [13]:
batch_size = 64

# The validation dataset (and, judging by its name, TinyImagenetRAM) keeps all
# images in memory, so loader workers only parallelise the per-sample
# transforms. On Windows worker processes are started with `spawn`: the
# dataset object is pickled into every worker, so 12 workers per loader
# multiply the memory footprint of the in-RAM image lists, and dataset classes
# defined interactively in a notebook may not be importable in the workers.
# Load in the main process there; elsewhere keep up to the original 12
# workers, capped by the number of available CPUs.
num_workers = 0 if os.name == 'nt' else min(12, os.cpu_count() or 1)

# Training batches are reshuffled every epoch.
train_batch_gen = torch.utils.data.DataLoader(train_dataset,
                                              batch_size=batch_size,
                                              shuffle=True,
                                              num_workers=num_workers)

# Validation keeps a fixed order.
val_batch_gen = torch.utils.data.DataLoader(val_dataset,
                                            batch_size=batch_size,
                                            shuffle=False,
                                            num_workers=num_workers)

In [14]:
class GlobalAveragePool(nn.Module):
    """Module that averages its input over the configured dimension(s).

    Args:
        dim: Dimension index, or tuple of indices, to reduce with a mean.
    """

    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, x):
        """Return the mean of ``x`` over ``self.dim`` (reduced dims are dropped)."""
        pooled = x.mean(dim=self.dim)
        return pooled


class ConvBNRelu(nn.Module):
    """Bias-free Conv2d followed by BatchNorm2d and an in-place ReLU.

    Args:
        in_channels: Number of input channels of the convolution.
        out_channels: Number of output channels (also the BatchNorm width).
        kernel_size: Convolution kernel size.
        stride: Convolution stride.
        padding: Either the string ``'same'``, which is translated to
            ``kernel_size // 2``, or an explicit padding value that is passed
            to ``nn.Conv2d`` unchanged.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding='same'):
        super().__init__()
        if padding == 'same':
            padding = kernel_size // 2
        # The convolution bias is disabled because BatchNorm follows directly.
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            bias=False,
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Apply convolution, batch normalisation and ReLU in that order."""
        features = self.conv(x)
        normalized = self.bn(features)
        return self.relu(normalized)

In [15]:
def create_vgg_like_network(config=None):
    """Build a VGG-style classifier as an ``nn.Sequential``.

    Args:
        config: List of blocks, each a list of output channel counts for the
            3x3 ``ConvBNRelu`` layers of that block. A falsy value selects
            the default ``[[16, 16], [32, 32], [64, 64], [128, 128]]``.

    Returns:
        ``nn.Sequential`` with modules ``conv_<block>_<layer>``, a max-pool
        ``mp_<block>`` after every block except the last, then ``gap``
        (mean over dims 2 and 3) and ``logits`` (linear layer with 200
        outputs). The input is expected to have 3 channels.
    """
    if not config:
        config = [[16, 16], [32, 32], [64, 64], [128, 128]]

    last_block = len(config) - 1
    network = nn.Sequential()
    channels = 3
    for block_i, widths in enumerate(config):
        for layer_i, width in enumerate(widths):
            conv_layer = ConvBNRelu(channels, width, kernel_size=3, stride=1, padding='same')
            network.add_module(f'conv_{block_i}_{layer_i}', conv_layer)
            channels = width
        # Downsample between blocks, but not after the final one.
        if block_i < last_block:
            network.add_module(f'mp_{block_i}', nn.MaxPool2d(kernel_size=3, stride=2))

    network.add_module('gap', GlobalAveragePool((2, 3)))
    network.add_module('logits', nn.Linear(channels, 200))
    return network

In [16]:
class ResNetBlock2(nn.Module):
    """Residual block with two conv + batch-norm layers and a skip connection.

    Args:
        in_channels: Channels of the block input.
        out_channels: Channels produced by both convolutions.
        kernel_size: Kernel size of both convolutions.
        stride: Stride of the first convolution (the second always uses 1).
        padding: ``'same'`` is translated to ``kernel_size // 2``; any other
            value is passed to both convolutions unchanged.

    When the channel count changes or ``stride`` is not 1, the skip path goes
    through a bias-free 1x1 convolution with the same stride; otherwise
    ``downsample`` is ``None`` and the input is added as is.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding='same'):
        super().__init__()
        if padding == 'same':
            padding = kernel_size // 2

        self.conv1 = nn.Conv2d(
            in_channels, out_channels, kernel_size, stride=stride, padding=padding, bias=False
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels, kernel_size, stride=1, padding=padding, bias=False
        )
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu2 = nn.ReLU(inplace=True)

        needs_projection = stride != 1 or in_channels != out_channels
        self.downsample = (
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False)
            if needs_projection
            else None
        )

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + skip(x))``."""
        shortcut = x if self.downsample is None else self.downsample(x)
        hidden = self.conv1(x)
        hidden = self.bn1(hidden)
        hidden = self.relu1(hidden)
        hidden = self.conv2(hidden)
        hidden = self.bn2(hidden)
        return self.relu2(hidden + shortcut)

In [17]:
def create_resnet_like_network():
    """Build a small ResNet-style classifier as an ``nn.Sequential``.

    Layout: an ``init`` stem (7x7 ``ConvBNRelu``, stride 2, padding 3, 32
    channels), three stages of two ``ResNetBlock2`` each with 32, 64 and 128
    channels, then ``gap`` (mean over dims 2 and 3) and ``logits`` (linear
    layer with 200 outputs). The first block of every stage except the first
    uses stride 2.
    """
    stage_widths = [[32, 32], [64, 64], [128, 128]]

    network = nn.Sequential()
    network.add_module('init', ConvBNRelu(3, 32, kernel_size=7, stride=2, padding=3))

    channels = 32
    for block_i, widths in enumerate(stage_widths):
        for layer_i, width in enumerate(widths):
            # Only the opening block of a later stage reduces resolution.
            if block_i != 0 and layer_i == 0:
                stride = 2
            else:
                stride = 1
            residual = ResNetBlock2(channels, width, kernel_size=3, stride=stride, padding='same')
            network.add_module(f'res_{block_i}_{layer_i}', residual)
            channels = width

    network.add_module('gap', GlobalAveragePool((2, 3)))
    network.add_module('logits', nn.Linear(channels, 200))
    return network


def compute_loss(predictions, targets):
    """Return the mean cross-entropy between logits and class targets.

    Args:
        predictions: Unnormalised logits accepted by ``F.cross_entropy``.
        targets: Target class indices accepted by ``F.cross_entropy``.

    Returns:
        Scalar tensor with the loss averaged over the batch.
    """
    # The reduction is stated explicitly instead of calling .mean() on a
    # value that cross_entropy has already reduced to a scalar.
    return F.cross_entropy(predictions, targets, reduction='mean')


def eval_model(model, loader):
    """Compute top-1 accuracy of ``model`` over every sample in ``loader``.

    The model is switched to eval mode and gradients are disabled. Batches
    are moved to the module-level ``device`` before the forward pass.

    Accuracy is computed as correct predictions divided by total samples, so
    a shorter final batch is weighted by its size instead of counting as much
    as a full batch.

    Args:
        model: Callable module mapping a batch of inputs to logits with the
            class dimension at index 1.
        loader: Iterable yielding ``(inputs, targets)`` tensor pairs.

    Returns:
        Accuracy in ``[0, 1]`` as a float, or ``nan`` if ``loader`` yields no
        samples.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for X, y in loader:
            X, y = X.to(device), y.to(device)
            preds = model(X).argmax(1)
            correct += (preds == y).sum().item()
            total += y.numel()
    if total == 0:
        return float('nan')
    return correct / total

def train_model(model, optimizer, loader):
    """Run one optimisation pass over ``loader`` and report the mean loss.

    The model is switched to train mode. Every batch is moved to the
    module-level ``device``, the loss from ``compute_loss`` is
    back-propagated and ``optimizer`` takes one step per batch.

    Args:
        model: Module to optimise.
        optimizer: Optimiser holding the parameters of ``model``.
        loader: Iterable yielding ``(inputs, targets)`` tensor pairs; it is
            wrapped in a tqdm progress bar that is removed when finished.

    Returns:
        ``np.mean`` of the per-batch loss values.
    """
    model.train()
    batch_losses = []
    progress = tqdm.tqdm(loader, leave=False)
    for inputs, labels in progress:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        batch_loss = compute_loss(model(inputs), labels)
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())
    return np.mean(batch_losses)


def train_loop(model, optimizer, train_loader, val_loader, num_epochs):
    """Train for ``num_epochs`` epochs, validating and logging after each one.

    Each epoch calls ``train_model`` on ``train_loader`` and ``eval_model``
    on ``val_loader``, then prints one line with the epoch number, elapsed
    time, mean training loss and validation accuracy in percent.

    Args:
        model: Module to train.
        optimizer: Optimiser passed through to ``train_model``.
        train_loader: Batches used for training.
        val_loader: Batches used for validation.
        num_epochs: Number of epochs to run.
    """
    for epoch in range(num_epochs):
        # perf_counter is a monotonic clock, so the measured duration cannot
        # be distorted by system clock adjustments during a long epoch.
        start = time.perf_counter()
        train_loss = train_model(model, optimizer, train_loader)
        val_acc = eval_model(model, val_loader)
        elapsed = time.perf_counter() - start
        print(
            f"Epoch {epoch + 1}/{num_epochs} | time: {elapsed:.1f}s | "
            f"train loss: {train_loss:.4f} | val acc: {val_acc * 100:.2f}%"
        )

In [None]:
# Experiment 1: default VGG-like network, Adam with default hyperparameters,
# 30 epochs.
vgg_model = create_vgg_like_network().to(device)
vgg_opt = torch.optim.Adam(vgg_model.parameters())
train_loop(vgg_model, vgg_opt, train_batch_gen, val_batch_gen, num_epochs=30)

  0%|          | 0/1563 [00:00<?, ?it/s]

In [None]:
# Experiment 2: ResNet-like network trained with the same optimiser settings
# and epoch budget as the VGG baseline.
resnet_model = create_resnet_like_network().to(device)
resnet_opt = torch.optim.Adam(resnet_model.parameters())
train_loop(resnet_model, resnet_opt, train_batch_gen, val_batch_gen, num_epochs=30)

In [None]:
# Experiment 3: deeper VGG variant -- four conv layers instead of two in every
# block after the first, with unchanged channel widths.
larger_cfg = [[16, 16], [32, 32, 32, 32], [64, 64, 64, 64], [128, 128, 128, 128]]
large_vgg_model = create_vgg_like_network(config=larger_cfg).to(device)
large_vgg_opt = torch.optim.Adam(large_vgg_model.parameters())
train_loop(large_vgg_model, large_vgg_opt, train_batch_gen, val_batch_gen, num_epochs=30)