In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, log_loss
from sklearn.preprocessing import label_binarize
from IPython.display import clear_output
import numpy as np
import os

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import EMNIST
from torch.utils.data import DataLoader




In [2]:
# Установка параметров
input_size = 784  # 28x28
hidden_size1 = 500
hidden_size2 = 250
num_classes = 10
num_epochs =15
batch_size = 500
learning_rate = 0.01
save_model = True
log_file_path = 'results/ResultsLog.log'

In [3]:
# Загрузка и нормализация данных EMNIST
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

train_dataset = EMNIST(root='../data', split='byclass', train=True, transform=transform, download=True)
test_dataset = EMNIST(root='../data', split='byclass', train=False, transform=transform, download=True)


train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)


In [4]:
class ImprovedConvNet(nn.Module):
    def __init__(self, num_classes=62):
        super(ImprovedConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.layer3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.fc1 = nn.Linear(128*3*3, 512)
        self.bn1 = nn.BatchNorm1d(512)
        self.dropout1 = nn.Dropout(p=0.5)
        self.relu1 = nn.ReLU()
        
        self.fc2 = nn.Linear(512, num_classes)
    
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc1(out)
        out = self.bn1(out)
        out = self.dropout1(out)
        out = self.relu1(out)
        out = self.fc2(out)
        return out

In [5]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
model = ImprovedConvNet(num_classes=62).to(device)

criterion_ce = nn.CrossEntropyLoss()

optimizer = optim.SGD(model.parameters(), lr=learning_rate)

Using device: cpu


In [6]:
# Инициализация для графиков
loss_ce_list = {'train': [], 'val': []}
accuracy_list = {'train': [], 'val': []}
precision_list = {'train': [], 'val': []}
recall_list = {'train': [], 'val': []}
f1_list = {'train': [], 'val': []}

In [7]:
# Проверка существования файла логирования и создание нового
if not os.path.exists(log_file_path):
    with open(log_file_path, 'w') as log_file:
        log_file.write("Test No\tAccuracy\tPrecision\tRecall\tF1-score\tROC AUC\n")

In [8]:

# Функция для вычисления метрик
def compute_metrics(loader, model):
    model.eval()
    all_labels = []
    all_predictions = []
    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())
    
    accuracy = accuracy_score(all_labels, all_predictions)
    precision = precision_score(all_labels, all_predictions, average='weighted')
    recall = recall_score(all_labels, all_predictions, average='weighted')
    f1 = f1_score(all_labels, all_predictions, average='weighted')

    
    return accuracy, precision, recall, f1


In [9]:
# Функция для обновления и сохранения графиков
def update_plots(loss_ce_list, accuracy_list, precision_list, recall_list, f1_list, epoch, save_path='../temp/training_progress.png'):
    clear_output(wait=True)
    plt.figure(figsize=(18, 18))
    
    # График функции потерь CrossEntropy
    plt.subplot(3, 2, 1)
    plt.plot(loss_ce_list['train'], label='Train CrossEntropy Loss')
    plt.plot(loss_ce_list['val'], label='Validation CrossEntropy Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('CrossEntropy Loss')
    plt.legend()
    
    # График точности
    plt.subplot(3, 2, 2)
    plt.plot(accuracy_list['train'], label='Train Accuracy')
    plt.plot(accuracy_list['val'], label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Accuracy')
    plt.legend()

    # График Precision
    plt.subplot(3, 2, 3)
    plt.plot(precision_list['train'], label='Train Precision')
    plt.plot(precision_list['val'], label='Validation Precision')
    plt.xlabel('Epoch')
    plt.ylabel('Precision')
    plt.title('Precision')
    plt.legend()

    # График Recall
    plt.subplot(3, 2, 4)
    plt.plot(recall_list['train'], label='Train Recall')
    plt.plot(recall_list['val'], label='Validation Recall')
    plt.xlabel('Epoch')
    plt.ylabel('Recall')
    plt.title('Recall')
    plt.legend()

    # График F1-score
    plt.subplot(3, 2, 5)
    plt.plot(f1_list['train'], label='Train F1-score')
    plt.plot(f1_list['val'], label='Validation F1-score')
    plt.xlabel('Epoch')
    plt.ylabel('F1-score')
    plt.title('F1-score')
    plt.legend()

    
    # Номер теста
    plt.figtext(0.5, 0.05, f'Test No: {get_test_number(log_file_path)}', wrap=True, horizontalalignment='center', fontsize=12)

    # Добавление информации о модели
    plt.suptitle(f'Loss Function: CE Optimizer: SGD, LR: {learning_rate}, Epochs: {num_epochs}, Batch Size: {batch_size}')
    
    # Информация о слоях модели
    model_info = (
        f"Model Architecture:\n"
        f"Input Layer: {input_size} neurons\n"
        f"Hidden Layer 1: {hidden_size1} neurons with Batch Normalization and ReLU activation\n"
        f"Hidden Layer 2: {hidden_size2} neurons with Batch Normalization and ReLU activation\n"
        f"Output Layer: {num_classes} neurons"
    )
    
    plt.figtext(0.5, -0.1, model_info, wrap=True, horizontalalignment='center', fontsize=12)
    
    # Сохранение графиков в файл
    plt.savefig(save_path, bbox_inches='tight')
    plt.show()

In [10]:
# Функция для получения номера теста
def get_test_number(log_file_path):
    with open(log_file_path, 'r') as log_file:
        lines = log_file.readlines()
        return len(lines) - 1  # Исключаем заголовок

# Функция для сохранения модели
def save_model(model, test_number):
    model_path = f'model_{test_number}.ckpt'
    torch.save(model.state_dict(), model_path)
    return model_path

In [11]:
total_step = len(train_loader)
for epoch in range(num_epochs):
    model.train()
    running_loss_ce = 0.0

    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)
        
        outputs = model(images)
        
        loss_ce = criterion_ce(outputs, labels)

        optimizer.zero_grad()
        loss_ce.backward()
        optimizer.step()
        
        running_loss_ce += loss_ce.item()
        
        if (i+1) % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{total_step}], Loss CE: {loss_ce.item():.4f}')
    
    # Тестирование модели
    model.eval()
    
    # Метрики для обучающего набора данных
    train_accuracy, train_precision, train_recall, train_f1 = compute_metrics(train_loader, model)
    # Метрики для валидационного набора данных
    val_accuracy, val_precision, val_recall, val_f1 = compute_metrics(test_loader, model)
    
    avg_loss_ce = running_loss_ce / total_step
    
    loss_ce_list['train'].append(avg_loss_ce)
    accuracy_list['train'].append(train_accuracy)
    precision_list['train'].append(train_precision)
    recall_list['train'].append(train_recall)
    f1_list['train'].append(train_f1)

    loss_ce_list['val'].append(avg_loss_ce)
    accuracy_list['val'].append(val_accuracy)
    precision_list['val'].append(val_precision)
    recall_list['val'].append(val_recall)
    f1_list['val'].append(val_f1)
    
    # Обновление и сохранение графиков (функция update_plots должна быть определена ранее)append
    update_plots(loss_ce_list, accuracy_list, precision_list, recall_list, f1_list, epoch)

    model.train()

KeyboardInterrupt: 

In [None]:
# Запись результатов тестов в файл логирования
with open(log_file_path, 'a') as log_file:
    log_file.write(f"{get_test_number(log_file_path)}\t{val_accuracy:.4f}\t{val_precision:.4f}\t{val_recall:.4f}\t{val_f1:.4f}\t\n")

In [None]:
# Сохранение финальных графиков
update_plots(loss_ce_list, accuracy_list, precision_list, recall_list, f1_list, num_epochs-1, f'results/test_{get_test_number(log_file_path)}.png')

In [None]:
# Функция для сохранения модели
def save_model(model, test_number):
    model_path = f'models/model_{test_number}.ckpt'
    torch.save(model.state_dict(), model_path)
    return model_path

if save_model:
    test_number = get_test_number(log_file_path)
    model_path = save_model(model, test_number)
    print(f"Model saved as {model_path}")