In [13]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

In [6]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

Создадим простую модель

In [59]:
class SimpleFCN(nn.Module):
    def __init__(self, input_size=8):
        super(SimpleFCN, self).__init__()
        self.fc1 = nn.Linear(input_size, 4)
        self.fc2 = nn.Linear(4, 10)
        self.fc3 = nn.Linear(10, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [60]:
input_size = 8
simple_model = SimpleFCN(input_size)


In [61]:
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_regression

X, y = make_regression(n_samples=1000, n_features=input_size, n_informative=input_size, random_state=42)
X = torch.from_numpy(X).float()
y = torch.from_numpy(y).float()

dataset = list(zip(X, y))
train_dataset, test_dataset = train_test_split(dataset, test_size=0.2, random_state=42)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# metrics
Метрики для оценки нелинейности

In [47]:
criterion = nn.MSELoss()

In [92]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class NonlinearityMetric:
    def __init__(self, loss_fn):
        self.loss_fn = loss_fn

    @staticmethod
    def _get_last_linear_layer(model):
        layers = [module for module in model.modules() if isinstance(module, nn.Linear)]
        if not layers:
            raise ValueError("В модели отсутствуют слои nn.Linear.")
        return layers[-1]

    def calculate(self, model, X_arr, y_arr):
        raise NotImplementedError()


# Метрика 1: Средний градиент последнего слоя для каждого ребра
class GradientMeanEdgeMetric(NonlinearityMetric):
    def calculate(self, model, X_arr, y_arr):
        model.eval()
        model.zero_grad()
        y_pred = model(X_arr).squeeze()
        loss = self.loss_fn(y_pred, y_arr)
        loss.backward()

        last_layer = self._get_last_linear_layer(model)
        edge_gradients = last_layer.weight.grad.abs()
        model.zero_grad()
        return edge_gradients

    # Метрика 2: Стандартное отклонение активаций последнего слоя для каждого ребра


class ActivationStdEdgeMetric(NonlinearityMetric):
    def calculate(self, model, X_arr, y_arr):
        model.eval()

        # Пропуск через модель и получение активаций перед последним слоем
        x = X_arr
        for layer in list(model.children())[:-1]:  # Все слои, кроме последнего
            x = layer(x)
        activations = x  # Активации перед последним линейным слоем

        last_layer = self._get_last_linear_layer(model)
        edge_activations = activations.unsqueeze(-1) * last_layer.weight  # Активности, пропорциональные весам
        activation_std_edges = edge_activations.std(dim=0)
        return activation_std_edges


# Метрика 3: Чувствительность к возмущению для каждого ребра
class PerturbationSensitivityEdgeMetric(NonlinearityMetric):
    def __init__(self, loss_fn, epsilon=1e-2):
        super().__init__(loss_fn)
        self.epsilon = epsilon

    def calculate(self, model, X_arr, y_arr):
        model.eval()

        # Пропуск входа через модель и получение оригинального вывода
        original_output = model(X_arr).detach()

        last_layer = self._get_last_linear_layer(model)
        sensitivities = torch.zeros_like(last_layer.weight)

        # Возмущаем каждый вес по отдельности и измеряем чувствительность
        for i in range(last_layer.weight.size(0)):
            for j in range(last_layer.weight.size(1)):
                with torch.no_grad():
                    # Возмущение только одного веса
                    original_weight = last_layer.weight[i, j].clone()
                    last_layer.weight[i, j] += self.epsilon

                    # Пропуск с возмущением и вычисление чувствительности
                    perturbed_output = model(X_arr)
                    sensitivity = (perturbed_output - original_output).abs().mean().item()
                    sensitivities[i, j] = sensitivity

                    # Восстановление оригинального веса
                    last_layer.weight[i, j] = original_weight

        return sensitivities


# Метрика 4: Косинусное расстояние между градиентами для каждого ребра
class CosineGradientSimilarityEdgeMetric(NonlinearityMetric):
    def calculate(self, model, X_arr, y_arr):
        model.eval()
        outputs = model(X_arr)
        loss = outputs.mean()
        loss.backward()

        last_layer = self._get_last_linear_layer(model)
        grad_last_layer = last_layer.weight.grad

        # Косинусное сходство между соседними градиентами для каждого ребра
        similarities = torch.zeros_like(grad_last_layer)
        for i in range(grad_last_layer.size(0)):
            for j in range(grad_last_layer.size(1) - 1):  # Для соседних элементов
                # Убираем dim=1, поскольку каждый градиент - скаляр или одномерный тензор
                cos_sim = F.cosine_similarity(grad_last_layer[i, j].unsqueeze(0),
                                              grad_last_layer[i, j + 1].unsqueeze(0), dim=0)
                similarities[i, j] = cos_sim

        model.zero_grad()
        return similarities



metrics = [
    GradientMeanEdgeMetric(criterion),
    ActivationStdEdgeMetric(criterion),
    PerturbationSensitivityEdgeMetric(criterion),
    CosineGradientSimilarityEdgeMetric(criterion)
]

In [93]:
def calculate_edge_metric_for_dataloader(model, dataloader, edgeMetric: NonlinearityMetric):
    accumulated_grads = None
    for data, target in dataloader:
        data, target = data.to(device), target.to(device)

        metric = edgeMetric.calculate(model, data, target)
        
        if accumulated_grads is None:
            accumulated_grads = torch.zeros_like(metric).to(device)

        accumulated_grads += metric

    return accumulated_grads / len(dataloader)

In [94]:
for i in metrics:
    print(calculate_edge_metric_for_dataloader(simple_model, test_loader, i))

tensor([[ 2.4379,  0.5358, 85.4350, 15.9770,  2.7179, 39.3903, 23.5133,  6.8977,
         34.0691,  6.2802]])
tensor([[0.1160, 0.0511, 0.1099, 0.0179, 0.0011, 0.0096, 0.0931, 0.1156, 0.1033,
         0.0123]], grad_fn=<DivBackward0>)
tensor([[7.7956e-05, 8.5412e-06, 4.8188e-03, 1.7046e-03, 3.7740e-04, 4.0245e-03,
         3.8460e-03, 4.6285e-04, 1.3553e-03, 1.1440e-04]])
tensor([[0.5714, 0.7143, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         0.0000]])
