Установка зависимостей

In [None]:
# PyTorch, Lightning, timm, torchvision, shap, grad-cam, scikit-learn, pandas, matplotlib
!pip install torch torchvision timm pytorch-lightning pytorch-gradcam shap scikit-learn pandas matplotlib grad-cam

Импорты и директории

In [None]:
import os, time
import pandas as pd
import numpy as np
from PIL import Image
import torch, torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import torchvision.transforms as T
import pytorch_lightning as pl
import timm
from sklearn.svm import SVR
import matplotlib.pyplot as plt
from pytorch_lightning.callbacks import EarlyStopping
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import mean_absolute_error, mean_squared_error, accuracy_score

# папки для моделей и результатов
os.makedirs('models', exist_ok=True)
os.makedirs('results', exist_ok=True)

Конфигурация

In [None]:
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/My Drive/Diplom

In [None]:
# === Настраиваемый путь к CSV и изображениям===
# Файл должен содержать колонку 'new_filename' и 13 признаков (от 0.0 до 10.0)
CSV_PATH = 'CoRE_Dataset/unified_dataset.csv'
IMG_DIR = 'CoRE_Dataset/images/'

# === Гиперпараметры ===
BATCH_SIZE = 32
IMG_SIZE = 224
LR = 1e-4
MAX_EPOCHS = 30

# === Опции балансировки ===
loss_type = "class_balanced"            # или "mse"
loss_weighting_strategy = "gradnorm"    # или None
use_class_aware_augmentation = True
use_class_aware_sampler = True
use_oversampling = True
oversample_factor = 2.0
rare_class_threshold = 0.05

print(f'🔄 CSV путь: {CSV_PATH}')

In [None]:
# === Расширенная конфигурация ===
TRAIN_MODELS = True
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Используем устройство: {device}")

DataModule

In [None]:
class CompositionDataset(Dataset):
    """
    PyTorch Dataset для изображений и их 13 композиционных оценок.
    Ожидается DataFrame с колонками: 'new_filename' + 13 float-оценок.
    """
    def __init__(self, df: pd.DataFrame, transform=None):
        self.df = df.reset_index(drop=True)
        self.image_paths = df['new_filename'].values
        self.targets = df.iloc[:, 1:].astype(np.float32).values
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        try:
            image = Image.open(img_path).convert("RGB")
        except (FileNotFoundError, OSError) as e:
            print(f"⚠️ Пропущен файл: {img_path} — {e}")
            # возвращаем случайное корректное изображение вместо сбоя
            return self.__getitem__((idx + 1) % len(self))

        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(self.targets[idx])


class CompositionDataModule(pl.LightningDataModule):
    """
    PyTorch Lightning DataModule для подготовки train/val/test с поддержкой:
    - class-aware аугментаций
    - class-aware sampler
    - oversampling по редким правилам
    """
    def __init__(self, csv_path: str, img_size: int, batch_size: int):
        super().__init__()
        self.csv_path = csv_path
        self.img_size = img_size
        self.batch_size = batch_size

    def setup(self, stage=None):
        df = pd.read_csv(self.csv_path)

        df['new_filename'] = df['new_filename'].apply(lambda x: os.path.join(IMG_DIR, x))

        # Оставляем только 'new_filename' + 13 оценок
        rule_columns = [
            "center", "curved", "diagonal", "fill_the_frame", "pattern",
            "rule_of_thirds", "symmetric", "triangle", "vanishing_point",
            "golden_ratio", "horizontal", "radial", "vertical"
        ]
        df = df[["new_filename"] + rule_columns]
        print(f"✅ Загружено {len(df)} изображений с 13 правилами композиции.")

        # Три раздела: train/val/test (60/20/20)
        df = df.sample(frac=1.0, random_state=42).reset_index(drop=True)
        N = len(df)
        n_train = int(0.6 * N)
        n_val = int(0.2 * N)
        self.df_train = df[:n_train]
        self.df_val = df[n_train:n_train + n_val]
        self.df_test = df[n_train + n_val:]
        print(f"📊 Split: train={len(self.df_train)}, val={len(self.df_val)}, test={len(self.df_test)}")

        # Transforms
        train_tf = [T.RandomResizedCrop(self.img_size)]
        if use_class_aware_augmentation:
            train_tf += [T.ColorJitter(0.4, 0.4, 0.4, 0.1)]
        train_tf += [T.RandomHorizontalFlip(), T.ToTensor()]
        self.tf_train = T.Compose(train_tf)
        self.tf_val = T.Compose([
            T.Resize(self.img_size),
            T.CenterCrop(self.img_size),
            T.ToTensor()
        ])

        self.ds_train = CompositionDataset(self.df_train, transform=self.tf_train)
        self.ds_val   = CompositionDataset(self.df_val, transform=self.tf_val)
        self.ds_test  = CompositionDataset(self.df_test, transform=self.tf_val)

        # Class-aware oversampling по редким правилам
        if use_oversampling:
            binary_labels = (self.df_train.iloc[:, 1:] > 0).astype(int)
            freq = binary_labels.mean()
            rare = freq < rare_class_threshold
            weights = []
            for row in binary_labels.values:
                w = 1.0
                if any(row[rare.values]):
                    w *= oversample_factor
                weights.append(w)
            self.train_sampler = WeightedRandomSampler(weights, num_samples=len(weights), replacement=True)
            print(f"🔁 Используется oversampling с rare_class_threshold={rare_class_threshold}")
        else:
            self.train_sampler = None

    def train_dataloader(self):
        return DataLoader(self.ds_train, batch_size=self.batch_size,
                          shuffle=self.train_sampler is None,
                          sampler=self.train_sampler, num_workers=2)

    def val_dataloader(self):
        return DataLoader(self.ds_val, batch_size=self.batch_size, shuffle=False, num_workers=2)

    def test_dataloader(self):
        return DataLoader(self.ds_test, batch_size=self.batch_size, shuffle=False, num_workers=2)

Общий LightningModule

In [None]:
class CompositionLitModel(pl.LightningModule):
    def __init__(self, backbone_name, lr, num_tasks=13, task_type='regression',
                 loss_type='default', loss_weighting_strategy=None):
        """
        task_type: 'regression', 'classification', 'both'
        loss_type: 'default' or 'class_balanced'
        loss_weighting_strategy: None or 'gradnorm'
        """
        super().__init__()
        self.save_hyperparameters()

        self.task_type = task_type
        self.lr = lr
        self.loss_type = loss_type
        self.loss_weighting_strategy = loss_weighting_strategy
        self.num_tasks = num_tasks

        # Backbone
        self.backbone = timm.create_model(backbone_name, pretrained=True, num_classes=0, global_pool='avg')
        feat_dim = self.backbone.num_features

        # Heads
        if task_type in ['regression', 'both']:
            # Выход с сигмоидой: всегда в [0, 1] -> *10 => [0, 10]
            self.reg_heads = nn.ModuleList([
                nn.Sequential(
                    nn.Linear(feat_dim, 1),
                    nn.Sigmoid()  # Строго [0, 1]
                ) for _ in range(num_tasks)
            ])
            self.mse = nn.MSELoss()
        else:
            self.reg_heads = None

        if task_type in ['classification', 'both']:
            self.cls_heads = nn.ModuleList([
                nn.Linear(feat_dim, 1) for _ in range(num_tasks)
            ])
            self.bce = nn.BCEWithLogitsLoss()
            if loss_type == 'class_balanced':
                self.register_buffer("class_weights", torch.ones(num_tasks))
        else:
            self.cls_heads = None

    def forward(self, x):
        f = self.backbone(x)
        out = {}

        if self.task_type in ['regression', 'both']:
            reg_outs = [head(f).squeeze(1) * 10 for head in self.reg_heads]  # [0, 10]
            out['regression'] = torch.stack(reg_outs, dim=1)

        if self.task_type in ['classification', 'both']:
            cls_outs = [head(f).squeeze(1) for head in self.cls_heads]
            out['classification'] = torch.stack(cls_outs, dim=1)  # logits

        return out

    def _compute_loss(self, preds, targets):
        reg_loss = torch.tensor(0.0, device=self.device)
        cls_loss = torch.tensor(0.0, device=self.device)

        if self.task_type in ['regression', 'both']:
            reg_loss = self.mse(preds['regression'], targets)

        if self.task_type in ['classification', 'both']:
            cls_targets = (targets > 0).float()
            if self.loss_type == 'class_balanced':
                weights = self.class_weights.unsqueeze(0)  # [1, num_tasks]
                cls_loss = F.binary_cross_entropy_with_logits(preds['classification'], cls_targets, weight=weights)
            else:
                cls_loss = self.bce(preds['classification'], cls_targets)

        if self.loss_weighting_strategy == 'gradnorm':
            # Пример gradnorm: (для сложных задач можно заменить на свой)
            try:
                reg_grad = torch.autograd.grad(reg_loss, self.reg_heads.parameters(), retain_graph=True, allow_unused=True)
                reg_norm = sum([g.norm() for g in reg_grad if g is not None]) + 1e-8
            except:
                reg_norm = torch.tensor(1.0, device=self.device)
            try:
                cls_grad = torch.autograd.grad(cls_loss, self.cls_heads.parameters(), retain_graph=True, allow_unused=True)
                cls_norm = sum([g.norm() for g in cls_grad if g is not None]) + 1e-8
            except:
                cls_norm = torch.tensor(1.0, device=self.device)
            total = reg_norm + cls_norm
            loss = (reg_loss * cls_norm + cls_loss * reg_norm) / total
        else:
            loss = reg_loss + cls_loss

        return loss

    def training_step(self, batch, batch_idx):
        x, y = batch
        preds = self(x)
        loss = self._compute_loss(preds, y)
        self.log('train_loss', loss, on_step=False, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        preds = self(x)
        loss = self._compute_loss(preds, y)
        self.log('val_loss', loss, prog_bar=True)

        # Cохраняем для метрик после эпохи
        if not hasattr(self, 'val_outputs'):
            self.val_outputs = []
        # detach, чтобы не тащить граф
        preds_detached = {k: v.detach().cpu() for k, v in preds.items()}
        self.val_outputs.append({'preds': preds_detached, 'target': y.detach().cpu()})

    def on_validation_epoch_end(self):
        if not hasattr(self, 'val_outputs') or len(self.val_outputs) == 0:
            return
        preds_all = {k: self.val_outputs[0]['preds'][k] for k in self.val_outputs[0]['preds']}
        for o in self.val_outputs[1:]:
            for k in preds_all:
                preds_all[k] = torch.cat([preds_all[k], o['preds'][k]], dim=0)
        targets = torch.cat([o['target'] for o in self.val_outputs], dim=0)

        if 'regression' in preds_all:
            mae = torch.mean(torch.abs(preds_all['regression'] - targets))
            self.log('val_mae', mae, prog_bar=True)

        if 'classification' in preds_all:
            preds_bin = (torch.sigmoid(preds_all['classification']) > 0.5).float()
            true_bin = (targets > 0).float()
            acc = (preds_bin == true_bin).float().mean()
            self.log('val_cls_acc', acc, prog_bar=True)

        self.val_outputs.clear()

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)

    def set_class_weights(self, y_bin):
        """Установить веса классов по бинарной матрице (0 или 1)"""
        weights = []
        for i in range(y_bin.shape[1]):
            try:
                w = compute_class_weight(class_weight='balanced', classes=[0,1], y=y_bin[:, i])
                weights.append(torch.tensor(w[1], dtype=torch.float32))
            except:
                weights.append(torch.tensor(1.0))
        self.class_weights = torch.stack(weights).to(self.device)

Pipeline

In [None]:
early_stop = EarlyStopping(
    monitor='val_mae',    # метрика, которую отслеживаем
    mode='min',            # минимизация ошибки
    patience=3,            # сколько эпох ждать без улучшений
    verbose=True
)

if TRAIN_MODELS and not os.path.exists('models/resnet50.ckpt'):
    dm = CompositionDataModule(CSV_PATH, IMG_SIZE, BATCH_SIZE)
    model = CompositionLitModel('resnet50', lr=LR, task_type='both')
    if loss_type == 'class_balanced':
        dm.setup()
        y_bin = (dm.df_train.iloc[:, 1:] > 0).astype(int).values
        model.set_class_weights(y_bin)
    trainer = pl.Trainer(
        max_epochs=MAX_EPOCHS,
        default_root_dir='.',
        devices=1,
        accelerator=device,
        callbacks=[early_stop])
    trainer.fit(model, dm)
    trainer.save_checkpoint('models/resnet50.ckpt')
else:
    print("✅ Модель уже обучена: models/resnet50.ckpt")

In [None]:
early_stop = EarlyStopping(
    monitor='val_mae',    # метрика, которую отслеживаем
    mode='min',            # минимизация ошибки
    patience=3,            # сколько эпох ждать без улучшений
    verbose=True
)

if TRAIN_MODELS and not os.path.exists('models/efficientnet_b3.ckpt'):
    dm = CompositionDataModule(CSV_PATH, IMG_SIZE, BATCH_SIZE)
    model = CompositionLitModel('tf_efficientnet_b3_ns', lr=LR, task_type='both')
    if loss_type == 'class_balanced':
        dm.setup()
        y_bin = (dm.df_train.iloc[:, 1:] > 0).astype(int).values
        model.set_class_weights(y_bin)
    trainer = pl.Trainer(
        max_epochs=MAX_EPOCHS,
        default_root_dir='.',
        devices=1,
        accelerator=device,
        callbacks=[early_stop])
    trainer.fit(model, dm)
    trainer.save_checkpoint('models/efficientnet_b3.ckpt')
else:
    print("✅ Модель уже обучена: models/efficientnet_b3.ckpt")

In [None]:
early_stop = EarlyStopping(
    monitor='val_mae',    # метрика, которую отслеживаем
    mode='min',            # минимизация ошибки
    patience=3,            # сколько эпох ждать без улучшений
    verbose=True
)

if TRAIN_MODELS and not os.path.exists('models/vit_b16.ckpt'):
    dm = CompositionDataModule(CSV_PATH, IMG_SIZE, BATCH_SIZE)
    model = CompositionLitModel('vit_base_patch16_224', lr=LR, task_type='both')
    if loss_type == 'class_balanced':
        dm.setup()
        y_bin = (dm.df_train.iloc[:, 1:] > 0).astype(int).values
        model.set_class_weights(y_bin)
    trainer = pl.Trainer(
        max_epochs=MAX_EPOCHS,
        default_root_dir='.',
        devices=1,
        accelerator=device,
        callbacks=[early_stop])
    trainer.fit(model, dm)
    trainer.save_checkpoint('models/vit_b16.ckpt')
else:
    print("✅ Модель уже обучена: models/vit_b16.ckpt")

In [None]:
early_stop = EarlyStopping(
    monitor='val_mae',    # метрика, которую отслеживаем
    mode='min',            # минимизация ошибки
    patience=3,            # сколько эпох ждать без улучшений
    verbose=True
)

if TRAIN_MODELS and not os.path.exists('models/clip_vit_b32.ckpt'):
    dm = CompositionDataModule(CSV_PATH, IMG_SIZE, BATCH_SIZE)
    model = CompositionLitModel('vit_base_patch16_clip_224.openai', lr=LR, task_type='both')
    if loss_type == 'class_balanced':
        dm.setup()
        y_bin = (dm.df_train.iloc[:, 1:] > 0).astype(int).values
        model.set_class_weights(y_bin)
    trainer = pl.Trainer(
        max_epochs=MAX_EPOCHS,
        default_root_dir='.',
        devices=1,
        accelerator=device,
        callbacks=[early_stop])
    trainer.fit(model, dm)
    trainer.save_checkpoint('models/clip_vit_b32.ckpt')
else:
    print("✅ Модель уже обучена: models/clip_vit_b32.ckpt")

Ансамбль предсказаний

In [None]:
# === 1. Пути к чекпойнтам моделей ===
ckpts = {
    'vitb16':    'models/vit_b16.ckpt',
    'efnb3':     'models/efficientnet_b3.ckpt',
    'resnet50':  'models/resnet50.ckpt'
}

# === 2. Проверка наличия всех моделей ===
if all(os.path.exists(path) for path in ckpts.values()):
    print("✅ Все модели для ансамбля найдены. Загружаем...")

    # Загрузка моделей
    models = {
        name: CompositionLitModel.load_from_checkpoint(path).eval().to(device)
        for name, path in ckpts.items()
    }

    # === 3. Инференс ансамбля ===
    @torch.no_grad()
    def ensemble_predict(dl):
        all_preds, all_targets = [], []
        for x, y in dl:
            x = x.to(device)
            preds_list = [model(x)['regression'] for model in models.values()]
            ensemble = torch.stack(preds_list).mean(dim=0)
            all_preds.append(ensemble.cpu())
            all_targets.append(y)
        return torch.cat(all_preds), torch.cat(all_targets)

    # Получение данных
    dm = CompositionDataModule(CSV_PATH, IMG_SIZE, BATCH_SIZE)
    dm.setup()
    val_dl = dm.val_dataloader()
    ens_preds, targets = ensemble_predict(val_dl)

    # === 4. Метрики ===
    mae_mean = mean_absolute_error(targets.numpy(), ens_preds.numpy())
    mae_per_rule = torch.mean(torch.abs(ens_preds - targets), dim=0).numpy()

    print(f"📊 MAE ансамбля (среднее): {mae_mean:.4f}")
    for i, mae in enumerate(mae_per_rule):
        print(f"Rule {i+1:02d}: MAE = {mae:.4f}")

    # === 5. Добавление в таблицу метрик ===
    if "metrics_df" not in locals():
        metrics_df = pd.DataFrame(columns=["model", "val_mae", "val_cls_acc"])
    metrics_df.loc[len(metrics_df)] = ['ensemble_vit_effnet_resnet', mae_mean, np.nan]

else:
    print("⚠️ Не все модели найдены. Пропуск ансамбля:")
    for name, path in ckpts.items():
        if not os.path.exists(path):
            print(f"  ⛔ отсутствует: {path}")

SVM на фичах

In [None]:
from sklearn.svm import SVR
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
import numpy as np
import os
import joblib
import torch

SVR_MODEL_DIR = 'models/svr_vitb16'
os.makedirs(SVR_MODEL_DIR, exist_ok=True)

# === 1. Извлечение фичей из ViT-B/16 ===
feature_extractor = models['vitb16'].backbone
feature_extractor.eval().to(device)

X_train, Y_train = [], []
for x, y in dm.train_dataloader():
    with torch.no_grad():
        feats = feature_extractor(x.to(device))
    X_train.append(feats.cpu().numpy())
    Y_train.append(y.numpy())

X_train = np.vstack(X_train)
Y_train = np.vstack(Y_train)

# === 2. Обучение или загрузка моделей SVR ===
svm_models = []
retrained = False

if TRAIN_MODELS and not all(os.path.exists(f'{SVR_MODEL_DIR}/rule_{i+1:02d}.joblib') for i in range(13)):
    print("🚀 Обучение моделей SVR...")

    for i in range(13):
        # Разделим на подвалидацию для упрощённого варианта "early stopping"
        X_subtrain, X_subval, y_subtrain, y_subval = train_test_split(
            X_train, Y_train[:, i], test_size=0.2, random_state=42
        )

        best_mae = float('inf')
        best_model = None

        for c_val in [0.1, 1.0, 10.0]:  # аналог подбора по метрике
            model = SVR(kernel='rbf', C=c_val, epsilon=0.1)
            model.fit(X_subtrain, y_subtrain)
            pred_val = model.predict(X_subval)
            mae = mean_absolute_error(y_subval, pred_val)
            if mae < best_mae:
                best_mae = mae
                best_model = model

        joblib.dump(best_model, f'{SVR_MODEL_DIR}/rule_{i+1:02d}.joblib')
        svm_models.append(best_model)
        print(f"Rule {i+1:02d} — best C={best_model.C}, val MAE={best_mae:.4f}")

    retrained = True
else:
    print("✅ Модели SVR уже обучены. Загрузка из файлов...")
    for i in range(13):
        model = joblib.load(f'{SVR_MODEL_DIR}/rule_{i+1:02d}.joblib')
        svm_models.append(model)

# === 3. Предсказания на валидации ===
X_val, Y_val = [], []
for x, y in dm.val_dataloader():
    with torch.no_grad():
        feats = feature_extractor(x.to(device))
    X_val.append(feats.cpu().numpy())
    Y_val.append(y.numpy())

X_val = np.vstack(X_val)
Y_val = np.vstack(Y_val)

# === 4. Предсказания и метрики ===
preds_val = np.stack([svm.predict(X_val) for svm in svm_models], axis=1)
mae_svm = mean_absolute_error(Y_val, preds_val)
mae_per_rule = np.mean(np.abs(preds_val - Y_val), axis=0)

print(f"📊 SVR (ViT) MAE (среднее): {mae_svm:.4f}")
for i, mae in enumerate(mae_per_rule):
    print(f"Rule {i+1:02d}: MAE = {mae:.4f}")

# === 5. Добавление в общую таблицу метрик ===
if "metrics_df" not in locals():
    metrics_df = pd.DataFrame(columns=["model", "val_mae", "val_cls_acc"])

metrics_df.loc[len(metrics_df)] = ['svm_vitb16_features', mae_svm, None]

Унификация предсказаний

In [None]:
@torch.no_grad()
def predict_model(model, dataloader, device='cuda'):
    preds_reg, preds_cls, targets = [], [], []
    is_torch_model = isinstance(model, torch.nn.Module)
    if is_torch_model:
        model.eval().to(device)
    for x, y in dataloader:
        if is_torch_model:
            x = x.to(device)
            y = y.to(device)
        out = model(x)
        targets.append(y.cpu() if is_torch_model else y)
        if 'regression' in out:
            preds_reg.append(out['regression'].detach().cpu() if is_torch_model else out['regression'])
        if 'classification' in out:
            preds_cls.append(torch.sigmoid(out['classification'].detach().cpu()) if is_torch_model else out['classification'])
    result = {}
    if preds_reg:
        result['regression'] = torch.cat(preds_reg, dim=0)
    if preds_cls:
        result['classification'] = torch.cat(preds_cls, dim=0)
    result['target'] = torch.cat(targets, dim=0)
    return result

Сбор и логирование метрик

In [None]:
class SVMWrapper:
    def __init__(self, models, feature_extractor, device='cuda'):
        self.models = models
        self.feature_extractor = feature_extractor.eval().to(device)
        self.device = device

    @torch.no_grad()
    def __call__(self, x):
        feats = self.feature_extractor(x.to(self.device)).cpu().numpy()
        preds = np.stack([m.predict(feats) for m in self.models], axis=1)
        return {'regression': torch.tensor(preds, dtype=torch.float32)}

class EnsembleWrapper:
    def __init__(self, models_dict, device='cuda'):
        self.models = {k: m.eval().to(device) for k, m in models_dict.items()}
        self.device = device

    @torch.no_grad()
    def __call__(self, x):
        x = x.to(self.device)
        preds = [m(x)['regression'] for m in self.models.values()]
        mean_preds = torch.stack(preds).mean(dim=0)
        return {'regression': mean_preds}

In [None]:
@torch.no_grad()
def evaluate_model(name, model, dataloader):
    # Приведение модели в eval и перенос на устройство — только для torch-моделей
    if isinstance(model, torch.nn.Module):
        model.eval().to(device)

    all_targets, all_reg_preds, all_cls_preds = [], [], []

    for x, y in dataloader:
        x, y = x.to(device), y.to(device)
        output = model(x)
        all_targets.append(y.cpu())

        if 'regression' in output:
            all_reg_preds.append(output['regression'].cpu())
        if 'classification' in output:
            all_cls_preds.append(output['classification'].cpu())

    scores = {'model': name}
    y_true = torch.cat(all_targets).numpy()

    # MAE + RMSE
    if all_reg_preds:
        y_pred = torch.cat(all_reg_preds).numpy()
        scores['MAE'] = mean_absolute_error(y_true, y_pred)
        scores['RMSE'] = mean_squared_error(y_true, y_pred)

    # Accuracy (предпочтительно по classification, иначе по threshold от regression)
    if all_cls_preds:
        y_pred_cls = torch.cat(all_cls_preds).numpy() > 0.5
        y_true_cls = y_true > 0
        scores['Accuracy'] = accuracy_score(y_true_cls.flatten(), y_pred_cls.flatten())
    elif all_reg_preds:
        y_pred_cls = y_pred > 0
        y_true_cls = y_true > 0
        scores['Accuracy'] = accuracy_score(y_true_cls.flatten(), y_pred_cls.flatten())

    return scores


def benchmark_all(models_dict, dataloader):
    metrics = []

    for name, model in models_dict.items():
        print(f"🔍 Оценка модели: {name}")

        # Засекаем время только на инференс
        start = time.time()
        scores = evaluate_model(name, model, dataloader)
        duration = time.time() - start

        # Подсчёт latency (если доступен размер датасета)
        try:
            num_samples = len(dataloader.dataset)
            scores['Latency (ms/img)'] = 1000 * duration / num_samples
        except Exception as e:
            scores['Latency (ms/img)'] = None
            print(f"⚠️ Не удалось рассчитать latency: {e}")

        metrics.append(scores)

        # Отладочный вывод
        print(f"✅ {name}: MAE={scores.get('MAE', '—')}, Accuracy={scores.get('Accuracy', '—')}, Latency={scores['Latency (ms/img)']:.2f} ms")

    return pd.DataFrame(metrics)

AutoModelSelector

In [None]:
def normalize_column(df, column, inverse=False):
    col = df[column].copy()
    if col.isnull().all():
        return pd.Series([0.0] * len(df))
    col = (col - col.min()) / (col.max() - col.min() + 1e-8)
    return 1.0 - col if inverse else col

def compute_total_score(df, weights={'MAE': 0.5, 'Accuracy': 0.4, 'Latency': 0.1}):
    norm_mae = normalize_column(df, 'MAE', inverse=True)
    norm_acc = normalize_column(df, 'Accuracy')
    norm_latency = normalize_column(df, 'Latency (ms/img)', inverse=True)

    total = (
        weights['MAE'] * norm_mae +
        weights['Accuracy'] * norm_acc +
        weights['Latency'] * norm_latency
    )
    return total

def select_best_model(df: pd.DataFrame):
    df_copy = df.copy()
    df_copy['total_score'] = compute_total_score(df_copy)
    df_sorted = df_copy.sort_values(by='total_score', ascending=False)
    best_model_row = df_sorted.iloc[0]
    return df_sorted, best_model_row


# === Инициализация моделей ===
model_dict = {
    'ResNet50': CompositionLitModel.load_from_checkpoint('models/resnet50.ckpt'),
    'EffNet-B3': CompositionLitModel.load_from_checkpoint('models/efficientnet_b3.ckpt'),
    'ViT-B/16': CompositionLitModel.load_from_checkpoint('models/vit_b16.ckpt'),
    'CLIP ViT-B/32': CompositionLitModel.load_from_checkpoint('models/clip_vit_b32.ckpt'),
}
for m in model_dict.values():
    m.to(device)

# SVM
svm_model = SVMWrapper(svm_models, feature_extractor=models['vitb16'].backbone, device=device)
model_dict['SVM (ViT features)'] = svm_model

# Ensemble
ensemble_model = EnsembleWrapper({
    'resnet50': model_dict['ResNet50'],
    'efnb3': model_dict['EffNet-B3'],
    'vitb16': model_dict['ViT-B/16'],
})
model_dict['Ensemble (ResNet+EffNet+ViT)'] = ensemble_model

# === Оценка ===
df_metrics = benchmark_all(model_dict, dm.val_dataloader())
df_metrics.to_csv('results/model_comparison.csv', index=False)

# === Рейтинг и выбор лучшей ===
df_ranked, best_model_row = select_best_model(df_metrics)
df_ranked.to_csv('results/ranked_models.csv', index=False)

# === Вывод ===
print("🥇 Best model overall:")
display(best_model_row.to_frame().T)
BEST_MODEL_NAME = best_model_row['model']
BEST_MODEL = model_dict[BEST_MODEL_NAME]

In [None]:
import os
import torch
import re

def sanitize_filename(name: str) -> str:
    """Заменяет недопустимые символы для имени файла."""
    return re.sub(r'[^a-zA-Z0-9_.-]', '_', name)

def save_best_model(model, name: str, save_dir: str = "models"):
    os.makedirs(save_dir, exist_ok=True)
    safe_name = sanitize_filename(name)

    path_ckpt = os.path.join(save_dir, f"best_model_{safe_name}.ckpt")
    path_pt = os.path.join(save_dir, f"best_model_{safe_name}.pt")
    path_full = os.path.join(save_dir, f"best_model_{safe_name}_full.pt")

    if isinstance(model, pl.LightningModule):
        model = model.cpu().eval()
        try:
            model.save_checkpoint(path_ckpt)
            print(f"💾 Сохранено как Lightning checkpoint: {path_ckpt}")
        except Exception as e:
            print(f"⚠️ Ошибка при сохранении .ckpt: {e}")

        try:
            torch.save(model.state_dict(), path_pt)
            print(f"💾 Сохранено только state_dict: {path_pt}")
        except Exception as e:
            print(f"⚠️ Ошибка при сохранении state_dict: {e}")

        try:
            torch.save(model, path_full)
            print(f"💾 Сохранено как полная модель: {path_full}")
        except Exception as e:
            print(f"⚠️ Ошибка при сохранении полной модели: {e}")

    elif isinstance(model, SVMWrapper):
        path_svm = os.path.join(save_dir, f"{safe_name}_svm.joblib")
        try:
            joblib.dump(model, path_svm)
            print(f"💾 Сохранён SVMWrapper: {path_svm}")
        except Exception as e:
            print(f"⚠️ Ошибка при сохранении SVMWrapper: {e}")
    else:
        print(f"⚠️ Неизвестный тип модели: {type(model)}. Пропуск сохранения.")

# === Вызов ===
save_best_model(BEST_MODEL, BEST_MODEL_NAME)

In [None]:
from sklearn.calibration import calibration_curve
import matplotlib.pyplot as plt

def plot_reliability(y_true, y_prob, title='Reliability Diagram'):
    prob_true, prob_pred = calibration_curve(y_true, y_prob, n_bins=10)
    plt.figure(figsize=(6, 6))
    plt.plot(prob_pred, prob_true, marker='o', label='Model')
    plt.plot([0, 1], [0, 1], linestyle='--', label='Perfectly calibrated')
    plt.xlabel('Confidence')
    plt.ylabel('Accuracy')
    plt.title(title)
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

# Оценим калибровку по всем правилам
res = predict_model(BEST_MODEL, dm.test_dataloader(), device=device)
if 'classification' in res:
    y_true_bin = (res['target'] > 0).numpy()
    y_pred_prob = res['classification'].numpy()

    for i in range(min(5, y_true_bin.shape[1])):  # Покажем первые 5 правил
        plot_reliability(y_true_bin[:, i], y_pred_prob[:, i], title=f'Rule {i+1} calibration')

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Отчёт: графики по всем метрикам
def plot_metric(df, metric_name, title=None, save_path=None):
    plt.figure(figsize=(8, 5))
    sns.barplot(x='model', y=metric_name, data=df, palette='viridis')
    plt.title(title or metric_name)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.grid(True)
    if save_path:
        plt.savefig(save_path, dpi=300)
    plt.show()

plot_metric(df_ranked, 'MAE', 'Средняя абсолютная ошибка (MAE)', 'results/plot_mae.png')
plot_metric(df_ranked, 'RMSE', 'Среднеквадратичная ошибка (RMSE)', 'results/plot_rmse.png')
if 'Accuracy' in df_ranked.columns:
    plot_metric(df_ranked, 'Accuracy', 'Точность классификации (Accuracy)', 'results/plot_accuracy.png')
plot_metric(df_ranked, 'Latency (ms/img)', 'Скорость инференса', 'results/plot_latency.png')
plot_metric(df_ranked, 'total_score', 'Итоговый рейтинг моделей', 'results/plot_score.png')

In [None]:
from IPython.display import display, Markdown

def generate_comparative_report(df):
    summary = []

    best_mae = df.sort_values('MAE').iloc[0]
    best_rmse = df.sort_values('RMSE').iloc[0]
    best_acc = df.sort_values('Accuracy', ascending=False).iloc[0] if 'Accuracy' in df.columns else None
    best_latency = df.sort_values('Latency (ms/img)').iloc[0]
    best_total = df.sort_values('total_score', ascending=False).iloc[0]

    summary.append(f"**Лучшая модель по MAE:** {best_mae['model']} ({best_mae['MAE']:.4f})")
    summary.append(f"**Лучшая модель по RMSE:** {best_rmse['model']} ({best_rmse['RMSE']:.4f})")
    if best_acc is not None:
        summary.append(f"**Лучшая модель по Accuracy:** {best_acc['model']} ({best_acc['Accuracy']:.4f})")
    summary.append(f"**Самая быстрая модель:** {best_latency['model']} ({best_latency['Latency (ms/img)']:.2f} ms/img)")
    summary.append(f"**Лучшая модель по совокупной оценке:** {best_total['model']} (score: {best_total['total_score']:.4f})")

    display(Markdown("### 📊 Сводный сравнительный отчёт"))
    for line in summary:
        display(Markdown("- " + line))

generate_comparative_report(df_ranked)

Интерпретация

In [None]:
import numpy as np
import torch.nn as nn
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.image import show_cam_on_image
import matplotlib.pyplot as plt

class RegressionChannelWrapper(nn.Module):
    def __init__(self, model, channel_index=0):
        super().__init__()
        self.model = model
        self.channel_index = channel_index

    def forward(self, x):
        out = self.model(x)
        return out['regression'][:, self.channel_index].unsqueeze(1)

def get_last_conv_layer(model):
    if hasattr(model, 'layer4'):
        return model.layer4[-1], False
    elif hasattr(model, 'blocks'):
        return model.blocks[-1], True
    elif hasattr(model, 'conv_head'):
        return model.conv_head, False
    else:
        raise ValueError("Не удалось определить последний conv-слой.")

def reshape_transform_vit(tensor, height=14, width=14):
    result = tensor[:, 1:, :].reshape(tensor.size(0), height, width, tensor.size(2))
    return result.permute(0, 3, 1, 2)

# --- Конфигурация
rule_index = 0  # "center"
wrapped_model = RegressionChannelWrapper(BEST_MODEL, channel_index=rule_index).to(device)
target_layer, use_vit = get_last_conv_layer(BEST_MODEL.backbone)
reshape_transform = reshape_transform_vit if use_vit else None

cam = GradCAM(model=wrapped_model,
              target_layers=[target_layer],
              reshape_transform=reshape_transform)

best_score = -1
best_img_rgb = None
best_heatmap = None

# --- Проход по датасету
for imgs, ys in dm.val_dataloader():
    for i in range(imgs.size(0)):
        img = imgs[i].unsqueeze(0).to(device)
        rgb = imgs[i].permute(1, 2, 0).cpu().numpy()

        grayscale = cam(input_tensor=img)[0]

        # Оценка: сколько "внимания" в центре
        h, w = grayscale.shape
        ch, cw = h // 3, w // 3
        center_mask = np.zeros_like(grayscale)
        center_mask[h//3:2*h//3, w//3:2*w//3] = 1
        center_score = (grayscale * center_mask).sum() / grayscale.sum()

        if center_score > best_score:
            best_score = center_score
            best_img_rgb = rgb
            best_heatmap = show_cam_on_image(rgb, grayscale, use_rgb=True)

    # ограничим одним батчем, если долго:
    break

# --- Сохранение лучшего примера
plt.imshow(best_heatmap)
plt.axis('off')
plt.title(f"Grad-CAM: центр внимания в центре (score={best_score:.2f})")
plt.show()

In [None]:
import torch.nn as nn
import matplotlib.pyplot as plt
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.image import show_cam_on_image

# Обёртка для модели с выбором одного регрессионного канала
class RegressionChannelWrapper(nn.Module):
    def __init__(self, model, channel_index=0):
        super().__init__()
        self.model = model
        self.channel_index = channel_index

    def forward(self, x):
        out = self.model(x)
        return out['regression'][:, self.channel_index].unsqueeze(1)

# Определение последнего слоя + является ли ViT
def get_last_conv_layer(model):
    if hasattr(model, 'layer4'):
        return model.layer4[-1], False  # ResNet
    elif hasattr(model, 'blocks'):
        return model.blocks[-1], True   # ViT / CLIP
    elif hasattr(model, 'conv_head'):
        return model.conv_head, False   # EfficientNet
    else:
        raise ValueError("Не удалось определить последний conv-слой модели.")

# Преобразование для ViT: [B, N, C] → [B, C, H, W]
def reshape_transform_vit(tensor, height=14, width=14):
    result = tensor[:, 1:, :].reshape(tensor.size(0), height, width, tensor.size(2))
    result = result.permute(0, 3, 1, 2)  # [B, C, H, W]
    return result

# Признак 0 (например, "center")
rule_index = 0
wrapped_model = RegressionChannelWrapper(BEST_MODEL, channel_index=rule_index).to(device)
target_layer, use_vit = get_last_conv_layer(BEST_MODEL.backbone)
reshape_transform = reshape_transform_vit if use_vit else None

cam = GradCAM(
    model=wrapped_model,
    target_layers=[target_layer],
    reshape_transform=reshape_transform
)

imgs, ys = next(iter(dm.val_dataloader()))
img = imgs[0].unsqueeze(0).to(device)
rgb = imgs[0].permute(1, 2, 0).numpy()

grayscale = cam(input_tensor=img)[0]
heatmap = show_cam_on_image(rgb, grayscale, use_rgb=True)

plt.imshow(heatmap)
plt.axis('off')
plt.title(f"Grad-CAM | Признак {rule_index + 1}")
plt.savefig("results/gradcam_vit.png", dpi=300)
plt.show()

In [None]:
import shap
import torch.nn as nn

# Обёртка для вывода одного признака
class SHAPWrapper(nn.Module):
    def __init__(self, model, channel_index=0):
        super().__init__()
        self.model = model
        self.channel_index = channel_index

    def forward(self, x):
        out = self.model(x)
        return out['regression'][:, self.channel_index].unsqueeze(1)

# Настройка
rule_index = 0  # например, центр
shap_model = SHAPWrapper(BEST_MODEL, channel_index=rule_index).to(device)

# Данные
background = next(iter(dm.train_dataloader()))[0][:16].to(device)
test_imgs, _ = next(iter(dm.val_dataloader()))
test_imgs = test_imgs[:5].to(device)

# SHAP (градиентный)
e = shap.GradientExplainer(shap_model, background)
shap_vals = e.shap_values(test_imgs)

# Приведение к [B, H, W, C]
test_imgs_nhwc = test_imgs.permute(0, 2, 3, 1).cpu().numpy()

# Исправляем: (5, 3, 224, 224, 1) → (5, 224, 224, 3)
shap_vals_correct = shap_vals.transpose(0, 2, 3, 1, 4).squeeze(-1)

# Визуализация
shap.image_plot([shap_vals_correct], test_imgs_nhwc)

In [None]:
# SHAP summary plot (SVM на ViT)
import joblib
svms = [joblib.load(f'models/svr_vitb16/rule_{i+1:02d}.joblib') for i in range(13)]
feature_extractor = models['vitb16'].backbone.eval().to(device)

X_val = []
for x, y in dm.val_dataloader():
    with torch.no_grad():
        feats = feature_extractor(x.to(device)).cpu().numpy()
    X_val.append(feats)
X_val = np.vstack(X_val)

explainer = shap.Explainer(svms[0].predict, X_val)
shap_vals = explainer(X_val[:100])

shap.summary_plot(shap_vals.values, X_val[:100], feature_names=[f"f{i+1}" for i in range(X_val.shape[1])], show=False)
plt.savefig('results/shap_summary.png', dpi=300)
plt.show()

In [None]:
ranking.plot(x='model', y='score', kind='bar', legend=False)
plt.title('Сводный рейтинг моделей')
plt.ylabel('Score')
plt.grid()
plt.show()