In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from utils import make_regression_data, mse, log_epoch, RegressionDataset

In [3]:
# Модифицируйте существующую линейную регрессию:
# - Добавьте L1 и L2 регуляризацию
# - Добавьте early stopping

class LinearRegression(nn.Module):
    def __init__(self, in_features):
        super().__init__()
        self.linear = nn.Linear(in_features, 1)

    def forward(self, x):
        return self.linear(x)

def l1_l2_regularization(model, l1_lambda, l2_lambda):
    l1_norm = sum(p.abs().sum() for p in model.parameters())
    l2_norm = sum(p.pow(2.0).sum() for p in model.parameters())
    return l1_lambda * l1_norm + l2_lambda * l2_norm

if __name__ == '__main__':
    # Генерируем данные
    X, y = make_regression_data(n=200)
    
    # Создаём датасет и даталоадеры для тренировочных и валидационных данных
    dataset = RegressionDataset(X, y)
    
    # Разделяем данные на тренировочные и валидационные (80/20)
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
    
    train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=32)
    
    print(f'Размер тренировочного датасета: {len(train_dataset)}')
    print(f'Размер валидационного датасета: {len(val_dataset)}')
    print(f'Количество тренировочных батчей: {len(train_dataloader)}')
    print(f'Количество валидационных батчей: {len(val_dataloader)}')
    
    # Создаём модель, функцию потерь и оптимизатор
    model = LinearRegression(in_features=1)
    criterion = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1)
    
    # Параметры регуляризации и early stopping
    l1_lambda = 0.01
    l2_lambda = 0.01
    patience = 10
    min_delta = 0.001
    best_val_loss = float('inf')
    patience_counter = 0
    best_model_state = None
    
    # Обучаем модель
    epochs = 100
    for epoch in range(1, epochs + 1):
        # Тренировочный цикл
        model.train()
        total_train_loss = 0
        for i, (batch_X, batch_y) in enumerate(train_dataloader):
            optimizer.zero_grad()
            y_pred = model(batch_X)
            loss = criterion(y_pred, batch_y)
            
            # Добавляем L1 и L2 регуляризацию
            reg_loss = l1_l2_regularization(model, l1_lambda, l2_lambda)
            total_loss = loss + reg_loss
            
            total_loss.backward()
            optimizer.step()
            
            total_train_loss += loss.item()
        
        avg_train_loss = total_train_loss / (i + 1)
        
        # Валидационный цикл
        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for i, (batch_X, batch_y) in enumerate(val_dataloader):
                y_pred = model(batch_X)
                loss = criterion(y_pred, batch_y)
                total_val_loss += loss.item()
        
        avg_val_loss = total_val_loss / (i + 1)
        
        # Логирование
        if epoch % 10 == 0:
            log_epoch(epoch, avg_train_loss, validation_loss=avg_val_loss)
        
        # Early stopping
        if avg_val_loss < best_val_loss - min_delta:
            best_val_loss = avg_val_loss
            best_model_state = model.state_dict()
            patience_counter = 0
        else:
            patience_counter += 1
            
        if patience_counter >= patience:
            print(f'Early stopping на эпохе {epoch}')
            break
    
    # Сохраняем лучшую модель
    torch.save(best_model_state, 'linreg_torch.pth')
    
    # Загружаем лучшую модель
    new_model = LinearRegression(in_features=1)
    new_model.load_state_dict(torch.load('linreg_torch.pth'))
    new_model.eval()

Размер тренировочного датасета: 160
Размер валидационного датасета: 40
Количество тренировочных батчей: 5
Количество валидационных батчей: 2
Epoch 10: loss=0.0665, validation_loss=0.0703
Epoch 20: loss=0.0369, validation_loss=0.0391
Epoch 30: loss=0.0266, validation_loss=0.0278
Epoch 40: loss=0.0225, validation_loss=0.0238
Epoch 50: loss=0.0205, validation_loss=0.0219
Epoch 60: loss=0.0198, validation_loss=0.0208
Epoch 70: loss=0.0194, validation_loss=0.0207
Early stopping на эпохе 73


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, roc_auc_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

class RegressionDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

class ClassificationDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

def make_regression_data(n=100, noise=0.1, source='random'):
    if source == 'random':
        X = torch.rand(n, 1)
        w, b = 2.0, -1.0
        y = w * X + b + noise * torch.randn(n, 1)
        return X, y
    elif source == 'diabetes':
        from sklearn.datasets import load_diabetes
        data = load_diabetes()
        X = torch.tensor(data['data'], dtype=torch.float32)
        y = torch.tensor(data['target'], dtype=torch.float32).unsqueeze(1)
        return X, y
    else:
        raise ValueError('Unknown source')

def make_classification_data(n=100, source='random', n_classes=3):
    if source == 'random':
        # Генерируем сбалансированные данные
        X = torch.rand(n, 2)
        # Создаём сбалансированные классы
        samples_per_class = n // n_classes
        y = torch.zeros(n, dtype=torch.long)
        X_list = []
        
        for i in range(n_classes):
            # Генерируем данные для каждого класса с разными центрами
            X_class = torch.rand(samples_per_class, 2) + torch.tensor([i * 2.0, i * 2.0])
            X_list.append(X_class)
            y[i * samples_per_class:(i + 1) * samples_per_class] = i
        
        # Добавляем оставшиеся примеры для последнего класса, если n не делится на n_classes
        remaining = n - samples_per_class * n_classes
        if remaining > 0:
            X_class = torch.rand(remaining, 2) + torch.tensor([(n_classes - 1) * 2.0, (n_classes - 1) * 2.0])
            X_list.append(X_class)
            y[-remaining:] = n_classes - 1
        
        X = torch.cat(X_list, dim=0)
        return X, y
    elif source == 'breast_cancer':
        from sklearn.datasets import load_breast_cancer
        data = load_breast_cancer()
        X = torch.tensor(data['data'], dtype=torch.float32)
        y = torch.tensor(data['target'], dtype=torch.long)
        return X, y
    else:
        raise ValueError('Unknown source')

def mse(y_pred, y_true):
    return ((y_pred - y_true) ** 2).mean().item()

def accuracy(y_pred, y_true):
    y_pred_classes = torch.argmax(y_pred, dim=1) if y_pred.dim() > 1 else (y_pred > 0.5).float()
    return (y_pred_classes == y_true).float().mean().item()

def log_epoch(epoch, loss, **metrics):
    msg = f"Epoch {epoch}: loss={loss:.4f}"
    for k, v in metrics.items():
        msg += f", {k}={v:.4f}"
    print(msg)

class LogisticRegression(nn.Module):
    def __init__(self, in_features, num_classes):
        super().__init__()
        self.linear = nn.Linear(in_features, num_classes)

    def forward(self, x):
        return self.linear(x)

def compute_metrics(y_true, y_pred, y_pred_proba, num_classes):
    y_true = y_true.cpu().numpy()
    y_pred = y_pred.cpu().numpy()
    
    # Вычисляем precision, recall, F1-score с zero_division=0
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average='weighted', zero_division=0
    )
    
    # Вычисляем ROC-AUC
    try:
        if num_classes == 2:
            roc_auc = roc_auc_score(y_true, y_pred_proba[:, 1])
        else:
            roc_auc = roc_auc_score(y_true, y_pred_proba, multi_class='ovr')
    except ValueError:
        roc_auc = float('nan')
    
    return precision, recall, f1, roc_auc

def plot_confusion_matrix(y_true, y_pred, num_classes, epoch):
    # Явно указываем все возможные метки классов
    cm = confusion_matrix(y_true.cpu().numpy(), y_pred.cpu().numpy(), labels=range(num_classes))
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                xticklabels=[f'Class {i}' for i in range(num_classes)],
                yticklabels=[f'Class {i}' for i in range(num_classes)])
    plt.title(f'Confusion Matrix at Epoch {epoch}')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.savefig(f'confusion_matrix_epoch_{epoch}.png')
    plt.close()

if __name__ == '__main__':
    # Параметры
    num_classes = 3
    in_features = 2
    
    # Генерируем данные
    X, y = make_classification_data(n=200, source='random', n_classes=num_classes)
    
    # Проверяем, что все классы присутствуют
    unique_classes = torch.unique(y).numel()
    if unique_classes < num_classes:
        print(f"Предупреждение: в данных присутствует только {unique_classes} из {num_classes} классов")
    
    # Создаём датасет и разделяем на тренировочный и валидационный
    dataset = ClassificationDataset(X, y)
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
    
    train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=32)
    
    print(f'Размер тренировочного датасета: {len(train_dataset)}')
    print(f'Размер валидационного датасета: {len(val_dataset)}')
    print(f'Количество тренировочных батчей: {len(train_dataloader)}')
    print(f'Количество валидационных батчей: {len(val_dataloader)}')
    
    # Создаём модель, функцию потерь и оптимизатор
    model = LogisticRegression(in_features=in_features, num_classes=num_classes)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1)
    
    # Обучаем модель
    epochs = 100
    for epoch in range(1, epochs + 1):
        # Тренировочный цикл
        model.train()
        total_loss = 0
        total_acc = 0
        
        for i, (batch_X, batch_y) in enumerate(train_dataloader):
            optimizer.zero_grad()
            logits = model(batch_X)
            loss = criterion(logits, batch_y)
            loss.backward()
            optimizer.step()
            
            acc = accuracy(logits, batch_y)
            
            total_loss += loss.item()
            total_acc += acc
        
        avg_loss = total_loss / (i + 1)
        avg_acc = total_acc / (i + 1)
        
        # Валидационный цикл
        model.eval()
        val_y_true = []
        val_y_pred = []
        val_y_pred_proba = []
        total_val_loss = 0
        total_val_acc = 0
        
        with torch.no_grad():
            for i, (batch_X, batch_y) in enumerate(val_dataloader):
                logits = model(batch_X)
                loss = criterion(logits, batch_y)
                y_pred = torch.argmax(logits, dim=1)
                y_pred_proba = torch.softmax(logits, dim=1)
                
                val_y_true.append(batch_y)
                val_y_pred.append(y_pred)
                val_y_pred_proba.append(y_pred_proba)
                
                acc = accuracy(logits, batch_y)
                total_val_loss += loss.item()
                total_val_acc += acc
        
        avg_val_loss = total_val_loss / (i + 1)
        avg_val_acc = total_val_acc / (i + 1)
        
        # Объединяем валидационные предсказания
        val_y_true = torch.cat(val_y_true)
        val_y_pred = torch.cat(val_y_pred)
        val_y_pred_proba = torch.cat(val_y_pred_proba)
        
        # Проверяем, что все классы присутствуют в валидационной выборке
        unique_val_classes = torch.unique(val_y_true).numel()
        if unique_val_classes < num_classes:
            print(f"Предупреждение на эпохе {epoch}: в валидационной выборке только {unique_val_classes} из {num_classes} классов")
        
        # Вычисляем метрики
        precision, recall, f1, roc_auc = compute_metrics(val_y_true, val_y_pred, val_y_pred_proba, num_classes)
        
        # Логирование
        if epoch % 10 == 0:
            log_epoch(epoch, avg_loss, accuracy=avg_acc, validation_loss=avg_val_loss, 
                      validation_accuracy=avg_val_acc, precision=precision, recall=recall, 
                      f1_score=f1, roc_auc=roc_auc)
            
            # Визуализация confusion matrix
            plot_confusion_matrix(val_y_true, val_y_pred, num_classes, epoch)
    
    # Сохраняем модель
    torch.save(model.state_dict(), 'logreg_torch.pth')
    
    # Загружаем модель
    new_model = LogisticRegression(in_features=in_features, num_classes=num_classes)
    new_model.load_state_dict(torch.load('logreg_torch.pth'))
    new_model.eval()

Размер тренировочного датасета: 160
Размер валидационного датасета: 40
Количество тренировочных батчей: 5
Количество валидационных батчей: 2
Epoch 10: loss=0.7567, accuracy=0.4750, validation_loss=0.8647, validation_accuracy=0.3594, precision=0.5714, recall=0.3500, f1_score=0.3329, roc_auc=0.8581
Epoch 20: loss=0.5963, accuracy=0.6937, validation_loss=0.6378, validation_accuracy=0.9219, precision=0.9591, recall=0.9500, f1_score=0.9502, roc_auc=1.0000
Epoch 30: loss=0.5043, accuracy=0.8250, validation_loss=0.5460, validation_accuracy=0.7031, precision=0.9016, recall=0.8250, f1_score=0.8135, roc_auc=1.0000
Epoch 40: loss=0.4314, accuracy=0.9625, validation_loss=0.4625, validation_accuracy=0.9844, precision=0.9775, recall=0.9750, f1_score=0.9751, roc_auc=1.0000
Epoch 50: loss=0.3884, accuracy=0.9938, validation_loss=0.4146, validation_accuracy=0.9844, precision=0.9775, recall=0.9750, f1_score=0.9751, roc_auc=1.0000
Epoch 60: loss=0.3642, accuracy=0.9563, validation_loss=0.3764, validation