# Эксперименты с CNN

## U-Net

![Детали тренировок CNN](Example_architecture_of_U-Net_for_producing_k_256-by-256_image_masks_for_a_256-by-256_RGB_image.png)

### Подготовка данных

Для разных CNN отличаются рекомендуемая предобработка. Так, скажем, для U-Net в статье **Exploring microstructureand petrophysical properties of microporous volcanic rocks through 3D multiscale and super‑resolution imaging**, Buono et al. (2023) говорится про следующую цепочку обработок: HR  -> Bicubic downsampling -> LR -> Bicubic upsampling -> LR того же разрешения, что и HR. Данный метод дает наивысшее качество среди всех CNN в испытаниях ($SSIM$ = 0.8, $PSNR$ = 28.58). В обучении других НС использовался внутренний архитектурный upsampling. Однако, при детальном рассмотрении выяснилось, что U-Net обучалась на 24300 патчах, в то время как остальные CNN - лишь на 3888:

![Детали тренировок CNN](buono_cnn_training.png)

Это означает, что у U-Net было изначально больше данных, чем у других моделей. В нашем исследовании будем использовать одинаковое количество training epochs и patches для всех архитектур.

Обучение baseline будет проходить на данных из shuffled2D.

**План обработки для U-Net**:
1. Перевод из 'RGB' в 'L' (серый);
2. Upscaling LR до размера HR;
3. Аугментации:
   * Нормализация: либо в диапазон [0, 1], либо по (mean, std). Возьмем по (mean, std). Обязательный этап;
   * `RandomHorizontalFlip`, `RandomVerticalFlip`: не нарушает статистику пор, но добавляет информации;
   * `RandomCrop`: полезно для экономии памяти;
   * `RandomGaussianBlur`: размывает изображение, имитирует настоящие ошибки при КТ;
4. Этап обучения, предсказание;
5. Обратная нормализация для интерпретации результатов.

Импорты библиотек:

In [1]:
import os
import re
from pathlib import Path
from typing import Tuple, List, Optional, Callable
import random
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import v2 as T
from torchvision.transforms.v2 import functional as F
from torchvision.transforms import InterpolationMode
from torchmetrics.image.ssim import StructuralSimilarityIndexMeasure
import torch
from pytorch_msssim import ssim
import numpy as np
import matplotlib.pyplot as plt
#import wandb
import math
from math import log10

In [2]:
import warnings
try:
    from pydantic._internal._generate_schema import UnsupportedFieldAttributeWarning
    warnings.filterwarnings("ignore", category=UnsupportedFieldAttributeWarning)
except Exception:
    from pydantic.warnings import PydanticUserWarning
    warnings.filterwarnings("ignore", category=PydanticUserWarning, module="pydantic._internal._generate_schema")

Зафиксируем один seed для воспроизводимости:

In [2]:
def seed_everything(seed: int = 42):
    torch.manual_seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    
seed_everything(42)

Мы хотим создать датасет из пар HR и LR. Для этого:
1. Обратимся к директориям с изображениями (_get_dirs);
2. Обрежем имена изображений из LR-директорий (_strip_lr_suffix. Сейчас они формата 0001x2 или 0001x4, приведем их к 0001.) Это нужно для сопоставления пар HR и LR по имени;
3. Создаем датасет (Shuffled2DPaired). На выходе получим словарь вида {'hr': ..., 'lr': ..., 'stem': x2 или x4}.

In [3]:
def _get_dirs(root: str, split: str, scale: str) -> Tuple[Path, Path]:
    root = Path(root)
    hr_dir = root / "shuffled2D" / f"shuffled2D_{split}_HR"
    lr_dir = root / "shuffled2D" / f"shuffled2D_{split}_LR_default_{scale}"
    if not lr_dir.exists():
        fb = root / "shuffled2D" / f"shuffled2D_{split}_LR_default_X2"
        if fb.exists():
            lr_dir = fb
    if not (hr_dir.exists() and lr_dir.exists()):
        raise FileNotFoundError(f"Не найдены HR/LR директории для split={split}, scale={scale}")
    return hr_dir, lr_dir
    
def _strip_lr_suffix(stem: str, scale: str) -> str:
    # scale: "X2"/"X4" → убрать ровно такой хвост; поддержим опционально '_' или '-'
    suf = scale.lower()
    if not suf.startswith('x'):
        suf = 'x' + suf
    return re.sub(fr'([_-]?){re.escape(suf)}$', '', stem, flags=re.IGNORECASE)

class Shuffled2DPaired(Dataset):
    def __init__(
        self,
        root: str,
        split: str = "train",
        scale: str = "X2",
        exts: Tuple[str, ...] = (".png", ".jpg", ".jpeg", ".tif", ".tiff"),
        transform_pair: Optional[Callable] = None,  # <- весь пайплайн тут
    ):
        self.hr_dir, self.lr_dir = _get_dirs(root, split, scale)
        self.exts = exts
        self.transform_pair = transform_pair

        hr_files = sorted([p for p in self.hr_dir.iterdir() if p.suffix.lower() in exts])
        if not hr_files:
            raise RuntimeError(f"Нет HR-файлов в {self.hr_dir}")
        hr_map = {p.stem: p for p in hr_files}

        lr_files = sorted([p for p in self.lr_dir.iterdir() if p.suffix.lower() in exts])
        pairs = []
        for p in lr_files:
            hr_stem = _strip_lr_suffix(p.stem, scale)
            hr = hr_map.get(hr_stem)
            if hr is not None:
                pairs.append((p, hr))
        if not pairs:
            raise RuntimeError("Не найдено пар LR↔HR по совпадающим именам файлов.")
        self.pairs = pairs

    def __len__(self):
        return len(self.pairs)

    def _open_raw(self, p: Path) -> Image.Image:
        with Image.open(p) as img:
            return img.copy()

    def __getitem__(self, idx: int):
        lr_path, hr_path = self.pairs[idx]
        lr = self._open_raw(lr_path)
        hr = self._open_raw(hr_path)

        if self.transform_pair is not None:
            lr, hr = self.transform_pair(lr, hr)

        return {"lr": lr, "hr": hr, "stem": lr_path.stem}

Единоразово считаем среднее и стандартное отклонение по всем shuffled HR, предварительно переведя в grayscale:

In [27]:
def mean_std_via_hist_from_ds(ds) -> Tuple[np.ndarray, np.ndarray]:
    """
    Считает mean/std для HR через 256-биновую гистограмму, предварительно конвертируя в 'L'.
    Результат в [0, 1].
    Требования:
      - у датасета есть поля: hr_dir (Path) и exts (кортеж расширений)
    """
    hr_dir: Path = ds.hr_dir
    exts = set(e.lower() for e in getattr(ds, "exts", (".png", ".jpg", ".jpeg", ".tif", ".tiff")))

    hist = np.zeros(256, dtype=np.int64)
    total_pixels = 0

    for p in hr_dir.iterdir():
        if p.suffix.lower() not in exts:
            continue
        # Конвертация в 'L' здесь гарантирует нужный формат (8-бит, один канал)
        with Image.open(p) as img:
            img = img.convert("L")
            arr = np.asarray(img, dtype=np.uint8)  # 0..255

        h = np.bincount(arr.ravel(), minlength=256)  # int64
        hist += h
        total_pixels += arr.size

    if total_pixels == 0:
        raise RuntimeError(f"В {hr_dir} нет подходящих HR-изображений (расширения: {sorted(exts)})")

    bins = np.arange(256, dtype=np.float64)
    mean  = (bins * hist).sum() / total_pixels
    mean2 = (bins**2 * hist).sum() / total_pixels
    std   = np.sqrt(max(0.0, mean2 - mean * mean))

    # в [0,1]
    return np.array([mean / 255.0]), np.array([std / 255.0])

In [9]:
root = Path("DeepRockSR-2D/shuffled2D")  # корень, где лежат папки shuffled2D_*
hr_dir = root / "shuffled2D_train_HR"

In [28]:
'''
ds_stats = Shuffled2DPaired(root, split="train", scale="X2",
                            patch_size=None, augment=False, grayscale=True)

mean_hr, std_hr = mean_std_via_hist_from_ds(ds_stats)
print("HR mean:", mean_hr)
print("HR std :", std_hr)
'''

HR mean: [0.45161797]
HR std : [0.20893379]


In [4]:
hr_mean = (0.45161797,)
hr_std = (0.20893379,)

Аугментации и трансформации в виде классов. Обычный v2.Compose не позволяет по умолчанию применять рандомизированные трасформации с одним и тем же исходом, что важно для парных датасетов, поэтому были построены кастомные классы для каждого преобразования:

In [5]:
class PairCompose:
    def __init__(self, transforms):
        self.transforms = transforms
    def __call__(self, lr, hr):
        for t in self.transforms:
            lr, hr = t(lr, hr)
        return lr, hr

class PairGrayscale:
    def __init__(self, num_output_channels: int = 1):
        self.t = T.Grayscale(num_output_channels)
    def __call__(self, lr, hr):
        return self.t(lr), self.t(hr)

class PairUpscaleLRtoHR:
    """Апскейлит LR до точного размера HR (bicubic)"""
    def __call__(self, lr, hr):
        if lr.size != hr.size:
            lr = F.resize(lr, size=hr.size[::-1], interpolation=InterpolationMode.BICUBIC, antialias=True)
        return lr, hr

class PairRandomCrop:
    """Согласованный кроп после апскейла: одна и та же box к LR_up и HR."""
    def __init__(self, patch_size: Optional[int]):
        self.ps = patch_size
    def __call__(self, lr, hr):
        if self.ps is None:
            return lr, hr
        h, w = hr.size[1], hr.size[0]
        th = tw = self.ps
        i, j, h, w = T.RandomCrop.get_params(hr, output_size=(th, tw))
        return F.crop(lr, i, j, h, w), F.crop(hr, i, j, h, w)

class PairFlips:
    """Согласованные флипы."""
    def __init__(self, p_flip=0.5, p_vflip=0.5):
        self.pf, self.pv = p_flip, p_vflip
    def __call__(self, lr, hr):
        if torch.rand(()) < self.pf:
            lr, hr = F.hflip(lr), F.hflip(hr)
        if torch.rand(()) < self.pv:
            lr, hr = F.vflip(lr), F.vflip(hr)
        return lr, hr

class PairToTensor01:
    """ToImage -> float32 [0,1] для обеих."""
    def __init__(self):
        self.to_img = T.ToImage()
        self.to_f32 = T.ToDtype(torch.float32, scale=True)
    def __call__(self, lr, hr):
        lr = self.to_f32(self.to_img(lr))
        hr = self.to_f32(self.to_img(hr))
        return lr, hr
        
class PairGaussianBlur:
    """
    Согласованный GaussianBlur: один и тот же sigma/случайность для LR и HR.
    Работает на тензорах после ToTensor/ToDtype(scale=True).
    """
    def __init__(self, kernel_size, sigma=(0.1, 2.0), p=0.5):
        if isinstance(kernel_size, int):
            # kernel должен быть нечётным
            if kernel_size % 2 == 0:
                raise ValueError("kernel_size должен быть нечётным")
        self.kernel_size = kernel_size
        self.sigma = sigma
        self.p = float(p)

    def __call__(self, lr, hr):
        # одна монетка на пару
        if torch.rand(()) >= self.p:
            return lr, hr
        # одна sigma на пару
        if isinstance(self.sigma, (tuple, list)):
            s_low, s_high = self.sigma
            sigma = float(torch.empty(1).uniform_(s_low, s_high))
        else:
            sigma = float(self.sigma)
        lr = F.gaussian_blur(lr, kernel_size=self.kernel_size, sigma=sigma)
        hr = F.gaussian_blur(hr, kernel_size=self.kernel_size, sigma=sigma)
        return lr, hr
        
class PairNormalize:
    """Применяет одинаковый Normalize к обоим тензорам (каналы те же, параметры детерминированы)."""
    def __init__(self, mean, std):
        self.norm = T.Normalize(mean=mean, std=std)
    def __call__(self, lr, hr):
        return self.norm(lr), self.norm(hr)
        
def build_pair_transform(
    patch_size: Optional[int],
    do_flips: bool = True,
    do_blur: bool = True,
    blur_kernel: int = 3,
    blur_sigma: tuple[float, float] = (0.1, 1.5),
    mean: tuple[float, ...] = (0.45161797,),
    std: tuple[float, ...]  = (0.20893379,)
) -> PairCompose:
    stages = [
        PairGrayscale(num_output_channels=1),   # 1) grayscale
        PairUpscaleLRtoHR(),                    # 2) upscale LR -> HR size
        PairRandomCrop(patch_size),             # 3) согласованный кроп
    ]
    if do_flips:
        stages.append(PairFlips())              # 4) согласованные флипы
    stages.append(PairToTensor01())             # 5) к тензорам [0,1]
    if do_blur:
        stages.append(PairGaussianBlur(kernel_size=blur_kernel, sigma=blur_sigma, p=0.5))  # блюр
    stages.append(PairNormalize(mean=mean, std=std)) # нормализация
    return PairCompose(stages)

Собираем трансформы:

In [6]:
# Комментариями описан порядок строго встроенных преобразований
train_tf = build_pair_transform(
    patch_size=256,
    # stages из функции,
    do_flips=True,
    # автоматически T.ToImage(),
    # автоматически T.ToDtype(torch.float32, scale=True),
    do_blur=True,
    # автоматически T.Normalize(mean=mean, std=std))
)

eval_test_tf = build_pair_transform(
    # те же обязательные stages, одинаковы для train, val, test
    patch_size=None, 
    do_flips=False,
    # автоматически T.ToImage(),
    # автоматически T.ToDtype(torch.float32, scale=True),
    do_blur=False,
    # автоматически T.Normalize(mean=mean, std=std))
)

Применяем:

In [7]:
train_ds_x2 = Shuffled2DPaired(root="DeepRockSR-2D", split="train", scale="X2", transform_pair=train_tf)
valid_ds_x2 = Shuffled2DPaired(root="DeepRockSR-2D", split="valid", scale="X2", transform_pair=eval_test_tf)
test_ds_x2  = Shuffled2DPaired(root="DeepRockSR-2D", split="test",  scale="X2", transform_pair=eval_test_tf)

train_ds_x4 = Shuffled2DPaired(root="DeepRockSR-2D", split="train", scale="X4", transform_pair=train_tf)
valid_ds_x4 = Shuffled2DPaired(root="DeepRockSR-2D", split="valid", scale="X4", transform_pair=eval_test_tf)
test_ds_x4  = Shuffled2DPaired(root="DeepRockSR-2D", split="test",  scale="X4", transform_pair=eval_test_tf)

sample = train_ds_x2[0]
# sample["lr"], sample["hr"] — тензоры [1,H,W] в [0,1]

In [8]:
sample

{'lr': Image([[[-1.4671, -1.4644, -1.4597,  ..., -1.4058, -1.4172, -1.4242],
         [-1.4671, -1.4634, -1.4570,  ..., -1.4020, -1.4144, -1.4222],
         [-1.4671, -1.4621, -1.4533,  ..., -1.4020, -1.4158, -1.4245],
         ...,
         [-1.8900, -1.8923, -1.8964,  ..., -1.4185, -1.4080, -1.4020],
         [-1.8850, -1.8887, -1.8951,  ..., -1.4159, -1.3970, -1.3870],
         [-1.8888, -1.8914, -1.8961,  ..., -1.4131, -1.3932, -1.3832]]], ),
 'hr': Image([[[-1.4576, -1.4551, -1.4556,  ..., -1.4116, -1.4292, -1.4390],
         [-1.4674, -1.4630, -1.4583,  ..., -1.4134, -1.4333, -1.4419],
         [-1.4823, -1.4745, -1.4597,  ..., -1.4207, -1.4411, -1.4468],
         ...,
         [-1.8973, -1.8964, -1.8923,  ..., -1.4130, -1.4043, -1.4037],
         [-1.8827, -1.8860, -1.8928,  ..., -1.4185, -1.4021, -1.3952],
         [-1.8800, -1.8850, -1.8961,  ..., -1.4231, -1.4034, -1.3932]]], ),
 'stem': '0001x2'}

In [8]:
train_loader_x2 = DataLoader(train_ds_x2, batch_size=16, shuffle=True,
                          num_workers=0, pin_memory=True, drop_last=True, persistent_workers=False)
valid_loader_x2 = DataLoader(valid_ds_x2, batch_size=4, shuffle=False,
                          num_workers=4, pin_memory=True)
test_loader_x2  = DataLoader(test_ds_x2,  batch_size=4, shuffle=False,
                          num_workers=4, pin_memory=True)

train_loader_x4 = DataLoader(train_ds_x4, batch_size=16, shuffle=True,
                          num_workers=4, pin_memory=True, drop_last=True)
valid_loader_x4 = DataLoader(valid_ds_x4, batch_size=4, shuffle=False,
                          num_workers=4, pin_memory=True)
test_loader_x4  = DataLoader(test_ds_x4,  batch_size=4, shuffle=False,
                          num_workers=4, pin_memory=True)

### Архитектура

Архитектура была вынесена в отдельный файл (см. `unet2d.py`)

#### Основные компоненты
1. Блок `ConvBNAct` - содержит последовательность: `Conv2D` -> `BatchNorm` (опц.) -> `ReLU` -> `Dropout` (опц.). Базовый конвейер для извлечения признаков при стабильном градиенте (BN) и нелинейности (ReLU).
2. Блок `DoubleConv` - повторение `ConvBNAct` два раза. Классическая связка U-Net: после конкатенации со скипом перерабатывает признаки и склеивает информацию из энкодера и декодера.
3. `Down`: `MaxPool2D`(2) -> `DoubleConv`. Уменьшает H×W в 2 раза и удваивает число каналов. Это энкодерный шаг: больше контекста, грубее пространственное разрешение.
4. `Up`: `Upsample` (два способа, см. `unet2d.py`) -> concat со skip из соответсутвующего энкодера -> `DoubleConv`. Обратное повышение разрешения и убавление количества каналов.
5. `OutConv`: `Conv2D`(kernel_size=1) до нужного out_ch (в нашем случае 1).

#### Конфиг для UNetConfig
1. `in_channels` - входные каналы (1 для greyscale);
2. `out_channels` - число классов или каналов цели;
3. `base_channels` - ширина сети на первом уровне (обычно 32/64);
4. `depth` - число понижающих шагов (глубина энкодера);
5. `bilinear` — тип апсемплинга (True: `Upsample` + `Conv2D`, False: `TransposedConv2D`);
6. `norm` / `dropout` — включение BN и Dropout.

#### Пример пайплайна преобразований данных
Пусть `base_channels=64` и `depth=4`:

**Энкодер** (Down path):
1. `inc`: in -> 64 (DoubleConv);
2. `down1`: 64 -> 128 (Pool×2, DoubleConv);
3. `down2`: 128 -> 256;
4. `down3`: 256 -> 512;
5. `down4`: 512 -> 1024 <- bottleneck;

**Декодер** (Up path):

Каждый up-шаг: upsample текущего тензора, concat со skip connection из энкодера того же уровня, прогоняем через DoubleConv, при этом число каналов уменьшается в 2 раза:
1. `up1`: (1024 ↑) ⊕ (512) → 512;
2. `up2`: (512 ↑) ⊕ (256) → 256;
3. `up3`: (256 ↑) ⊕ (128) → 128;
4. `up4`: (128 ↑) ⊕ (64) → 64;
5. `outc`: 64 → out_channels.

#### Форматы данных после преобразований

Для примера: вход `x` формы `[B, 1, 256, 256]`, `out_channels=1`.

**Энкодер**:

1. `x1 = inc(x)` -> `[B, 64, 256, 256]` (сохраняем в skips);
2. `x2 = down1(x1)` -> `[B, 128, 128, 128]` (skips);
3. `x3 = down2(x2)` -> `[B, 256, 64, 64]` (skips);
4. `x4 = down3(x3)` -> `[B, 512, 32, 32]` (skips);
5. `x5 = down4(x4)` -> `[B, 1024, 16, 16]` (дно).

**Декодер**:
1. `u1 = up1(x5, x4)` -> `[B, 512, 32, 32]`;
2. `u2 = up2(u1, x3)` -> `[B, 256, 64, 64]`;
3. `u3 = up3(u2, x2)` -> `[B, 128, 128, 128]`;
4. `u4 = up4(u3, x1)` -> `[B, 64, 256, 256]`;
5. `logits = outc(u4)` -> `[B, 1, 256, 256]`.

#### Инициализация

In [14]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [15]:
from unet2d import UNet2D, UNetConfig

cfg = UNetConfig(
    in_channels=1,
    out_channels=1,
    base_channels=64,
    depth=4,
    bilinear=True,
    norm=True,
    dropout=0.0,
)
model = UNet2D(cfg)
sum(p.numel() for p in model.parameters())

28599361

In [37]:
wandb.login(key="e178f2b1ee4008c4903f56c9a600498f420802cc")

wandb: Appending key for api.wandb.ai to your netrc file: C:\Users\Вячеслав\_netrc
wandb: Currently logged in as: vyacheslav-timofeev (vyacheslav-timofeev-tomsk-polytechnic-university) to https://api.wandb.ai. Use `wandb login --relogin` to force relogin


True

In [None]:
run = wandb.init(
    project="unet2d",
    name="baseline",
    config={
        "epochs": 150,
        "batch_size": 16,
        "lr": 2e-4,
        "weight_decay": 1e-4,
        "optimizer": "AdamW",
        "stages": 4,
        "activation": "ReLU",
        "scheduler": "OneCycleLR",
    },
)

In [16]:
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4, weight_decay=1e-4)

In [17]:
EPOCHS = 150
steps_per_epoch = len(train_loader_x2)
scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=2e-4, epochs=EPOCHS, steps_per_epoch=steps_per_epoch)

In [18]:
steps_per_epoch

600

In [9]:
import time, torch
from torch.utils.data import DataLoader

def time_loader(dl, n_batches=5):
    t0 = time.time()
    for i, batch in enumerate(dl):
        if i==0:
            print(f"⏱ first batch: {time.time()-t0:.2f}s")
        if i+1 >= n_batches:
            break
    print(f"⏱ {n_batches} batches: {time.time()-t0:.2f}s")

In [10]:
time_loader(train_loader_x2, n_batches=5)

⏱ first batch: 0.20s
⏱ 5 batches: 0.61s


In [16]:
def ssim_val(pred, target):
    # ожидаем [0,1]; возвращает скаляр в [0,1]
    return ssim(pred, target, data_range=1.0, size_average=True)

def psnr_val(pred, target, eps=1e-8):
    # ожидаем [0,1]
    mse = F.mse_loss(pred, target, reduction="mean")
    return 10.0 * torch.log10(1.0 / (mse + eps))

# ---- Комбинированный лосс: L1 + λ(1-SSIM)
def sr_loss(pred, target, lambda_ssim=0.1):
    pred_01   = pred.clamp(0, 1)
    target_01 = target.clamp(0, 1)
    l1 = F.l1_loss(pred_01, target_01)
    
    with torch.cuda.amp.autocast(enabled=False):  # SSIM более стабилен в float32
        ssim_score = ssim_val(pred_01.float(), target_01.float())
        
    return l1 + lambda_ssim * (1.0 - ssim_score), {'l1': l1.detach(), 'ssim': ssim_score.detach()}

def train_one_epoch(model, loader, optimizer, scheduler, device, epoch, amp=True, lambda_ssim=0.1, step_scheduler_in_batch=True):
    model.train()
    scaler = torch.cuda.amp.GradScaler(enabled=(amp and (device == "cuda" or "cuda" in str(device))))
    total_loss = total_psnr = total_ssim = 0.0
    total_samples = 0

    for step, (lr_img, hr_img) in enumerate(loader, 1):
        lr_img = lr_img.to(device, non_blocking=True)
        hr_img = hr_img.to(device, non_blocking=True)
        optimizer.zero_grad(set_to_none=True)

        with torch.cuda.amp.autocast(device_type="cuda", enabled=(amp and (device == "cuda" or "cuda" in str(device)))):
            pred = model(lr_img)
            loss, parts = sr_loss(pred, hr_img, lambda_ssim=lambda_ssim)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        if step_scheduler_in_batch and scheduler is not None:
            scheduler.step()

        # метрики (в float32, [0,1])
        with torch.no_grad():
            pred_01   = pred.detach().clamp(0, 1)
            hr_01     = hr_img.detach().clamp(0, 1)
            psnr = psnr_val(pred_01, hr_01)
            ssim_score = parts['ssim']

        bsz = lr_img.size(0)
        total_samples += bsz
        total_loss += loss.item() * bsz
        total_psnr += psnr.item() * bsz
        total_ssim += ssim_score.item() * bsz

    epoch_loss = total_loss / total_samples
    epoch_psnr = total_psnr / total_samples
    epoch_ssim = total_ssim / total_samples
    return {'loss': epoch_loss, 'psnr': epoch_psnr, 'ssim': epoch_ssim}

@torch.no_grad()
def evaluate(model, loader, device, amp=False, lambda_ssim=0.1):
    model.eval()
    total_loss = total_psnr = total_ssim = 0.0
    total_samples = 0

    for lr_img, hr_img in loader:
        lr_img = lr_img.to(device, non_blocking=True)
        hr_img = hr_img.to(device, non_blocking=True)

        with torch.cuda.amp.autocast(device_type="cuda", enabled=(amp and (device == "cuda" or "cuda" in str(device)))):
            pred = model(lr_img)
            loss, parts = sr_loss(pred, hr_img, lambda_ssim=lambda_ssim)

        pred_01   = pred.clamp(0, 1)
        hr_01     = hr_img.clamp(0, 1)
        psnr = psnr_val(pred_01, hr_01)
        ssim_score = parts['ssim']

        bsz = lr_img.size(0)
        total_samples += bsz
        total_loss += loss.item() * bsz
        total_psnr += psnr.item() * bsz
        total_ssim += ssim_score.item() * bsz

    val_loss = total_loss / total_samples
    val_psnr = total_psnr / total_samples
    val_ssim = total_ssim / total_samples
    return {'loss': val_loss, 'psnr': val_psnr, 'ssim': val_ssim}

In [17]:
class EarlyStopping:
    """
    Ранняя остановка по метрике.
    mode='max' — метрика должна расти (PSNR/SSIM), 'min' — падать (loss).
    Если улучшение <= min_delta — не считаем за улучшение.
    """
    def __init__(self, patience=20, min_delta=0.0, mode='max', restore_best=True):
        assert mode in ('max', 'min')
        self.patience = patience
        self.min_delta = min_delta
        self.mode = mode
        self.restore_best = restore_best

        self.best_value = -math.inf if mode == 'max' else math.inf
        self.best_state = None
        self.best_epoch = 0
        self.num_bad_epochs = 0

    def _is_better(self, value):
        if self.mode == 'max':
            return value > (self.best_value + self.min_delta)
        else:
            return value < (self.best_value - self.min_delta)

    def step(self, value, model, epoch):
        """
        Возвращает True, если нужно ОСТАНОВИТЬСЯ.
        """
        if self._is_better(value):
            self.best_value = value
            self.best_epoch = epoch
            self.num_bad_epochs = 0
            if self.restore_best:
                # копируем лучшие веса в память (чтобы не читать файл)
                self.best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
            return False
        else:
            self.num_bad_epochs += 1
            return self.num_bad_epochs > self.patience

    def restore(self, model):
        if self.restore_best and self.best_state is not None:
            model.load_state_dict(self.best_state)

In [None]:
EPOCHS = 200
best_psnr = -math.inf
best_path = "best_sr_unet.pt"

STEP_SCHED_IN_BATCH = isinstance(scheduler, torch.optim.lr_scheduler.OneCycleLR)

early = EarlyStopping(
    patience=15,     # попробуйте 10–30
    min_delta=0.05,  # 0.05 dB — игнорируем микро-флуктуации PSNR
    mode='max',
    restore_best=True
)

for epoch in range(1, EPOCHS + 1):
    train_stats = train_one_epoch(
        model, train_loader_x2, optimizer, scheduler if STEP_SCHED_IN_BATCH else None,
        device, epoch=epoch, amp=True, lambda_ssim=0.1, step_scheduler_in_batch=STEP_SCHED_IN_BATCH
    )
    val_stats = evaluate(model, valid_loader_x2, device, amp=False, lambda_ssim=0.1)

    if not STEP_SCHED_IN_BATCH and scheduler is not None:
        scheduler.step()

    # сохранение лучшего чекпойнта по PSNR (или поменяйте на SSIM)
    if val_stats['psnr'] > best_psnr + 1e-9:
        best_psnr = val_stats['psnr']
        torch.save(
            {
                "state_dict": model.state_dict(),
                "best_psnr": best_psnr,
                "best_ssim": val_stats['ssim'],
                "epoch": epoch,
                "cfg": getattr(model, "cfg", None),
            },
            best_path,
        )

    print(
        f"Epoch {epoch:03d}/{EPOCHS} | "
        f"train: loss {train_stats['loss']:.4f}, PSNR {train_stats['psnr']:.2f}, SSIM {train_stats['ssim']:.4f} | "
        f"valid: loss {val_stats['loss']:.4f}, PSNR {val_stats['psnr']:.2f}, SSIM {val_stats['ssim']:.4f} | "
        f"best PSNR: {best_psnr:.2f}"
    )

    # шаг ранней остановки по PSNR
    should_stop = early.step(val_stats['psnr'], model, epoch)
    if should_stop:
        print(f"Early stopping: no improvement in {early.patience} epochs "
              f"(best PSNR {early.best_value:.2f} at epoch {early.best_epoch}).")
        break

# восстановим лучшие веса из памяти (если не грузите из файла)
early.restore(model)
print(f"Best PSNR: {early.best_value:.2f} dB @ epoch {early.best_epoch} (also saved to {best_path})")

  scaler = torch.cuda.amp.GradScaler(enabled=(amp and (device == "cuda" or "cuda" in str(device))))
