In [11]:
import math
from typing import Tuple, List, Dict, Sized, Optional, Any

import torch
import torch.nn as nn
import torch.nn.functional as F
import lightning as L

import numpy as np

import seaborn as sns
import matplotlib.pyplot as plt

import torchmetrics

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Используемое устройство: {device}")

Используемое устройство: cuda


## 1. Выбор метрик

В задачах бинарной классификации существует 4 типа меток:
- *Истинно положительные (True Positives, TP)* - правильно предсказанные положительные метки;
- *Истинно отрицательные (True Negatives, TN)* - правильно предсказанные отрицательные метки;
- *Ложно положительные (False Positives, FP)* - ошибочно предсказанные положительные метки;
- *Ложно отрицательные (False Negatives, FN)* - ошибочно предсказанные отрицательные метки;

Для ***идеальной*** модели машинного обучения
$$ \text{FN}=\text{FP}=0 $$

Однако для реальных моделей множества FN и FP всегда не пустые *(а если пустые - нужно постараться отыскать такие новые данные, чтобы они стали непустыми)* и условная стоимость ошибки может варьироваться.

Примеры:
- Модель выявляет рак - FN или FP дороже?
- Модель определяет, является ли письмо спамом - FN или FP дороже?
- В суде FN или FP дороже?

Метрика Accuracy, по определению, никак не учитывает различия между этими классами:
$$
\text{Accuracy} = \frac{\text{TP}+\text{TN}}{\text{Total}} = \frac{\text{TP}+\text{TN}}{\text{TP}+\text{TN}+\text{FP}+\text{FN}} 
$$

Поэтому при классификации, в зависимости от задачи, важно отслеживать и другие метрики:

1. **Precision (точность)**: 

    Доля правильно предсказанных положительных меток среди всех предсказанных положительных меток:
    $$
    \text{Precision} = \frac{\text{TP}}{\text{TP}+\text{FP}}
    $$

2. **Recall (отзыв)**: 

    Доля правильно предсказанных положительных меток среди всех фактических положительных меток:
    $$ 
    \text{Recall} = \frac{\text{TP}}{\text{TP}+\text{FN}} 
    $$

3. **F1-мера**:

    Гармоническое среднее между Precision и Recall
    $$ 
    \text{F1} = 2 \cdot \frac{\text{Precision} \cdot \text{Recall}}{\text{Precision} + \text{Recall}} = \frac{2 \cdot \text{TP}}{2 \cdot \text{TP} + \text{FP} + \text{FN}}
    $$
    





![](../images/MetricsSummary.png)

In [None]:
from sklearn.metrics import confusion_matrix, roc_curve, auc, precision_recall_fscore_support


def generate_synthetic_data(n_samples=1000, class_balance=0.5):
    """
    Generates synthetic data for a binary classification problem.

    Args:
        n_samples (int): Total number of samples to generate.
        class_balance (float): The proportion of the positive class (class 1).

    Returns:
        torch.Tensor: Features (X).
        torch.Tensor: Labels (y).
    """
    n_class_1 = int(n_samples * class_balance)
    n_class_0 = n_samples - n_class_1

    # Class 0: centered around (-2, -2)
    class_0 = torch.randn(n_class_0, 2) - 2

    # Class 1: centered around (2, 2)
    class_1 = torch.randn(n_class_1, 2) + 2

    X = torch.cat([class_0, class_1], dim=0)
    y = torch.cat([torch.zeros(n_class_0), torch.ones(n_class_1)], dim=0).long()

    # Shuffle the data
    shuffle_indices = torch.randperm(n_samples)
    X = X[shuffle_indices]
    y = y[shuffle_indices]

    return X, y

def simulate_model_predictions(X, y, noise_level=0.5, bias_to_majority_class=0.0):
    """
    Simulates the output of a binary classification model.

    This function simulates logits (raw scores) from a model. A higher logit for a
    sample means the model is more confident that it belongs to the positive class.

    Args:
        X (torch.Tensor): Input features.
        y (torch.Tensor): True labels.
        noise_level (float): How "good" the model is. Lower noise means a better model.
        bias_to_majority_class (float): A factor to simulate a model biased towards
                                       predicting the majority class.

    Returns:
        torch.Tensor: Simulated logits (raw model scores).
    """
    # A perfect model would output scores based on the true centers.
    # We simulate a real model by adding noise.
    perfect_scores = (X[:, 0] + X[:, 1]) / 2  # Simple linear separation
    noise = torch.randn(X.shape[0]) * noise_level

    # This bias simulates a model that has learned to favor the majority class
    # A negative bias pushes predictions towards class 0.
    simulated_logits = perfect_scores + noise - bias_to_majority_class

    return simulated_logits

def plot_confusion_matrix(y_true, y_pred, title='Confusion Matrix'):
    """
    Plots a confusion matrix using seaborn.
    """
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(6, 5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['Predicted 0', 'Predicted 1'],
                yticklabels=['Actual 0', 'Actual 1'])
    plt.title(title)
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()

def plot_roc_curve(y_true, y_scores, title='ROC Curve'):
    """
    Plots the ROC curve and shows the AUC score.
    """
    fpr, tpr, _ = roc_curve(y_true, y_scores)
    roc_auc = auc(fpr, tpr)

    plt.figure(figsize=(7, 6))
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate (1 - Specificity)')
    plt.ylabel('True Positive Rate (Recall)')
    plt.title(title)
    plt.legend(loc='lower right')
    plt.grid(True)
    plt.show()

def print_metrics(y_true, y_pred, y_scores):
    """
    Calculates and prints key classification metrics.
    """
    accuracy = (y_true == y_pred).float().mean()
    precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='binary')
    _, _, roc_auc = roc_curve(y_true, y_scores)

    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-Score: {f1:.4f}")

In [None]:
# --- Scenario 1: The misleading nature of accuracy on an imbalanced dataset ---
print("--- Scenario 1: Imbalanced Dataset ---")
print("We have a dataset with 95% of samples belonging to Class 0 (negative) "
        "and 5% to Class 1 (positive).")

X_imbalanced, y_imbalanced = generate_synthetic_data(n_samples=1000, class_balance=0.05)

# Let's simulate a "dumb" model that always predicts the majority class (Class 0)
y_pred_dumb = torch.zeros(y_imbalanced.shape[0])
# For scores, let's assume it gives a low score for all samples
y_scores_dumb = torch.full((y_imbalanced.shape[0],), -1.0)


print("\nMetrics for a 'Dumb' Model (always predicts majority class 0):")
print_metrics(y_imbalanced, y_pred_dumb, y_scores_dumb)
plot_confusion_matrix(y_imbalanced, y_pred_dumb, title="Confusion Matrix for 'Dumb' Model")

print("\nObservation:")
print("The accuracy is very high (95%)! However, the model is useless.")
print("It completely fails to identify any positive samples (Recall = 0.0).")
print("Precision is also 0.0 because it never makes a positive prediction.")
print("This shows why accuracy is not a good metric for imbalanced problems.")

In [None]:
# --- Scenario 2: A slightly better model on the same imbalanced dataset ---
print("\n--- Scenario 2: A 'Better' Model on the Imbalanced Dataset ---")
# This model has some predictive power but is biased towards the majority class.
y_scores_better = simulate_model_predictions(X_imbalanced, y_imbalanced, noise_level=1.5, bias_to_majority_class=1.0)
y_pred_better = (y_scores_better > 0).float() # Threshold at 0

print("\nMetrics for the 'Better' Model:")
print_metrics(y_imbalanced, y_pred_better, y_scores_better)
plot_confusion_matrix(y_imbalanced, y_pred_better, title="Confusion Matrix for 'Better' Model")

print("\nObservation:")
print("Accuracy is still high, but now we have non-zero precision and recall.")
print("The F1-score gives a single number to balance Precision and Recall.")

Перечисленные метрики в случае бинарной классификации зависят от порога классификации (classification treshold).

Лучше всего понять качество классификационной модели позволяет **ROC кривая**:

**ROC-кривая** (Receiver Operating Characteristic) отображает соотношение истинных положительных показателей к ложным положительным показателям при различных порогах.

**AUC** (Area Under Curve) представляет собой площадь под ROC-кривой и измеряет общую эффективность классификатора.

In [None]:
plot_roc_curve(y_imbalanced, y_scores_better, title="ROC Curve for 'Better' Model")
print("The ROC-AUC score (Area Under the Curve) is a great summary metric.")
print("It measures the model's ability to distinguish between the two classes across all possible thresholds.")
print("An AUC of 0.5 is random guessing, and 1.0 is a perfect model. Our 0.88 is quite good!")

In [None]:

# --- Scenario 3: Precision vs. Recall Trade-off ---
print("\n--- Scenario 3: The Precision-Recall Trade-off ---")
print("By changing the classification threshold, we can trade Precision for Recall.")
print("A high threshold makes the model more 'cautious' about predicting Class 1 (higher Precision, lower Recall).")
print("A low threshold makes the model predict Class 1 more often (lower Precision, higher Recall).")

# High threshold -> High Precision
y_pred_high_precision = (y_scores_better > 2.0).float()
print("\nMetrics with a HIGH threshold (2.0):")
print_metrics(y_imbalanced, y_pred_high_precision, y_scores_better)

# Low threshold -> High Recall
y_pred_high_recall = (y_scores_better > -1.0).float()
print("\nMetrics with a LOW threshold (-1.0):")
print_metrics(y_imbalanced, y_pred_high_recall, y_scores_better)

## 2. Стохастическое усреднение весов

![](../images/SWA.png)

## 3. SWIN Трансформер

Изображения в датасете CIFAR10 достаточно были достаточно маленькими - всего 32х32

При применении ViT с той же архитектурой к изображениям с высоким разрешением, мы столкнёмся со следующей проблемой:

- *Количество* патчей растёт пропорционально квадрату разрешения, а количество операций внимания - пропорционально квадрату числа патчей. Итого для картинок с разрешением в x раз больше понадобится в x^4 раз больше операций внимания:
$$
\Omega (\text{MSA}) = 4hwC^2 + 2(hw)^2 C
$$

- Если же увеличивать *размер* патчей, снижается способность модели обрабатывать мелкие детали - годится для классификации, но не для задач детекции и сегментации.

Одна из архитектур, успешно решающих данную проблему - Shifted Window Transformer: 
https://arxiv.org/pdf/2103.14030

![](../images/SWIN_Transformer.png)

Эта архитектура базируется на двух основных идеях:

**Patch Merging**: 

В каждом следующем блоке модели (обозначен пунктиром) соседние патчи объединяются в один блоками 2х2 - они конкатенируются по размерности вложения, после чего эта размерность понижается с помощью небольшого обучаемого слоя (линейного, например).

Такой механизм даёт модели иерархическое представление изображение, которое позволяет хорошо решать задачи сешментации, детекции и т.д.:

![](../images/SWIN_patch_merging.jpg)


**Windowed self-attention and shifted windowed self-attention:**

В модель уже зашит механизм для работы с признаками изображения на разных уровнях - это позволяет без потери качества перейти к обработке токенов в рамках небольших локальных окон (windowed attention).

![](../images/SWIN_window.jpg)

Такой подход уже применялся при обработке языка, например, в модели Longformer. Но есть особенность - чтобы обеспечить взаимодействие между окнами, на следующем шаге считается MHSA для сдвинутых по диагонали окон:

![](../images/SWIN_shifted_window_msa.jpg)


### Реализуем SWIN Transformer с нуля.

Кроме перечисленных модулей, отличие от уже реализованного ViT в том, что нам удобнее делать flattening патчей непосредственно перед MHSA - т.е. в основном модель будет работать с размерностью batch, h_patches, w_patches, embedding_dim

In [12]:
class RotaryPositionalEmbedding(nn.Module):
    """
    Rotary Positional Embeddings for Transformers.

    This module applies rotary positional embeddings to input tensors, allowing the model to utilize 
    continuous position information in a more flexible manner compared to traditional learned embeddings.

    :param d: Dimension of the embeddings. Should be even.
    :param base: Base used for calculating positional encodings (default: 10,000).
    """
    
    def __init__(self, d: int, base: int = 10_000):
        super().__init__()
        if d % 2 != 0:
            raise ValueError("Dimension `d` for Rotary Positional Embedding must be even.")
        self.base = base
        self.d = d
        self.cos_cached = None
        self.sin_cached = None

    def _build_cache(self, max_pos: int, device: torch.device, dtype: torch.dtype):
        """Builds the cache for cosine and sine values."""
        if self.cos_cached is not None and max_pos <= self.cos_cached.shape[0]:
            return

        theta = 1. / (self.base ** (torch.arange(0, self.d, 2).float() / self.d)).to(device)
        seq_idx = torch.arange(max_pos, device=device, dtype=dtype)
        idx_theta = torch.einsum('n,d->nd', seq_idx, theta)
        idx_theta2 = torch.cat([idx_theta, idx_theta], dim=1)

        self.cos_cached = idx_theta2.cos()
        self.sin_cached = idx_theta2.sin()

    @staticmethod
    def _rotate_half(x: torch.Tensor):
        """Rotates half of the embedding dimension."""
        x1, x2 = x.chunk(2, dim=-1)
        
        return torch.cat([-x2, x1], dim=-1)

    def forward(self, x: torch.Tensor, pos: torch.Tensor=None):
        """
        Forward pass for the Rotary Positional Embeddings.

        This method applies the rotary positional embeddings to the input tensor.

        :param x: Input tensor of shape [batch, seq_len, d] 
        :param pos: Optional tensor of position indices, shape [batch, seq_len]. If None, indices are inferred as range(seq_len).
        :return: Tensor with applied rotary embeddings of the same shape as x.
        """
        batch_size, seq_len = x.shape[0], x.shape[1]
        
        if pos is None:
            pos = torch.arange(seq_len, device=x.device).unsqueeze(0).expand(batch_size, -1)

        max_pos = pos.max().item() + 1
        
        self._build_cache(max_pos, device=x.device, dtype=x.dtype)
        
        x_rotated = self._rotate_half(x)
       
        x_rope = x * self.cos_cached[pos] + x_rotated * self.sin_cached[pos]

        return x_rope


In [None]:
class RopeEmbeddingXY(nn.Module):

    def __init__(self, emb_dim: int, max_patches_xy: int = 128, freezed=True):
        '''
        :param emb_dim: 
        :param max_patches_xy:
        :param freezed:
        :param separate: 
        '''
        super(RopeEmbeddingXY, self).__init__()
        assert emb_dim % 4 == 0, f'Embedding dimension must be divisible by 4'
        assert emb_dim > 4, f'Embedding dimension must be greater than 4'
   
        self.emb_dim = emb_dim
        # self.ax_dim = emb_dim // 2 if separate else emb_dim
        self.h_emb = RotaryPositionalEmbedding(emb_dim)
        self.w_emb = RotaryPositionalEmbedding(emb_dim)        

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        '''
        :param x: input patches [B, H, W, D]
        :returns: output patches [B, H, W, D]
        '''
        batch_size, patch_h, patch_w, emb_dim = x.shape
        device = x.device
        
        h_pos, w_pos = torch.meshgrid(
            torch.arange(0, patch_h), torch.arange(0, patch_w), indexing='ij',
        )

        x = self.h_emb(x, h_pos.unsqueeze(0))
        x = self.w_emb(x, w_pos.unsqueeze(0))
            
        return x 

In [None]:
class PatchEmbedding(nn.Module):

    def __init__(self, patch_size: int, channels: int, emb_dim: int):
        '''
        :param patch_size: int - size of the patch square (size of convolution kernel)
        :param channels: int - channels of input image
        :param emb_dim: int - embedding dimension
        '''
        super().__init__()
        self.patch_size = patch_size
        self.conv = nn.Conv2d(
            in_channels=channels,
            out_channels=emb_dim,
            kernel_size=patch_size,
            stride=patch_size,
            )
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        '''
        :param x: torch.Tensor [B, C, H, W] - batched images
        :returns: torch.Tensor [B, D, pH, pW] - patches embeddings
        '''
        return self.conv(x)        


class ImageEmbedding(nn.Module):

    def __init__(self, patch_size: int, in_channels: int, emb_dim: int, dropout_rate: float = 0.1, freezed_pe = True):
        '''
        :param patch_size: int - size of the square patch
        :param in_channels: int - number of input channels
        :param emb_dim: int - embedding dimension
        :param dropout_rate: float - dropout rate
        :param freezed_pe: bool - freeze positional embeddings
        '''
        super().__init__()
        self.patch_size = patch_size
        self.in_channels = in_channels
        self.emb_dim = emb_dim
        self.dropout_rate = dropout_rate
        self.freezed_pe = freezed_pe

        self.dropout = nn.Dropout(dropout_rate)
        self.img_emb = PatchEmbedding(patch_size=patch_size, channels=in_channels, emb_dim=emb_dim)
        
        self.pos_emb = RopeEmbeddingXY(emb_dim=emb_dim)


    def forward(self, x: torch.Tensor) -> torch.Tensor:
        '''
        :param x: torch.Tensor [B, C, H, W] - input image
        :returns: torch.Tensor [B, pH, pW, D] - output tokens (L=H*W//patch^2)
        '''
        x_patches = self.img_emb(x) # [B, D, pH, pW]

        return self.pos_emb(x_patches) # SinusoidalEncodingXY expects [B, D, pH, pW]


In [None]:
class PatchMerging(nn.Module):

    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.proj = nn.Linear(4 * input_dim, output_dim)
        

    def forward(self, x):
        '''
        :param x: input patches [B, H, W, D_in]
        :returns: output patches [B, H//2, W//2, D_out]
        '''
        batch, patches_H, patches_W, emb_dim = x.shape
        
        x_tl = x[:, 0::2, 0::2, :]
        x_tr = x[:, 0::2, 1::2, :]
        x_bl = x[:, 1::2, 0::2, :]
        x_br = x[:, 1::2, 1::2, :]
        
        x = torch.cat([x_tl, x_tr, x_bl, x_br], dim=-1)
        
        return self.proj(x)        

In [None]:
class SwinBlock(nn.Module):
     
    def __init__(self, d_model, nhead, dim_feedforward, dropout):

        self.MHSA_W = nn.MultiheadAttention(d_model, nhead, batch_first=True)
        self.MHSA_SW = nn.MultiheadAttention(d_model, nhead, batch_first=True)
        
        self.MLP = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.SELU(),
            nn.Linear(dim_feedforward, d_model)
        )

        self.LN_W = nn.LayerNorm(d_model)
        self.LN_SW = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor):
        '''
        :param x: input patches [B, H, W, D]
        :returns: output patches [B, H, W, D]
        '''

        x = self.LN_W(x)

        x_w = self.fetch_window(x)
        x_w, _ = self.MHSA_W.forward(x_w, x_w, x_w)

        x = x + self.unfetch_window(x_sw)

        x = self.LN_SW(x)
        x_sw = self.fetch_window(self.diag_shift(x, 1))
        x_sw = self.MHSA_SW.forward(x_sw, x_sw, x_sw)
        
        x = x + self.diag_shift(self.unfetch_window(x_sw), -1)



        return x

    def fetch_window(self, x: torch.Tensor) -> torch.Tensor:
        '''
        :param x: input patches [B, H, W, D]
        :returns: attn windows [B*4, L, D]
        '''
         
        batch, patches_H, patches_W, emb_dim = x.shape

        h_center = patches_H//2
        w_center = patches_W//2
        # [B, H, W, D] -> [4*B, L, D]
        x_w = torch.cat([
            x[:, :h_center, :w_center, :],
            x[:, :h_center, w_center:, :],
            x[:, h_center:, :w_center, :],
            x[:, h_center:, :w_center, :],
        ], dim=0)

        return x_w 

    def unfetch_window(self, x: torch.Tensor):
        '''
        :param x: attn windows [B*4, L, D]
        :returns: input patches [B, H, W, D]
        '''
         
        batch4, l, emb_dim = x.shape

        h_center = patches_H//2
        w_center = patches_W//2
        # [B, H, W, D] -> [4*B, L, D]
        x_w = torch.cat([
            x[:, :h_center, :w_center, :],
            x[:, :h_center, w_center:, :],
            x[:, h_center:, :w_center, :],
            x[:, h_center:, :w_center, :],
        ], dim=0)

        return x_w 
    
    def diag_shift(self, x: torch.Tensor):
        '''
        :param x: input patches [B, H, W, D]
        :returns: output patches [B, H, W, D]
        '''


        batch, patches_H, patches_W, emb_dim = x.shape

        return x_s 


Для дальнейшего изучения:

https://github.com/ZTX-100/Efficient_ViT_with_DW