# Импорт модулей

In [1]:
!pip install datasets transformers 1>/dev/null

In [2]:
# Подавление предупреждений
import warnings
for warn in [UserWarning, FutureWarning]: warnings.filterwarnings("ignore", category = warn)

# Импорт необходимых библиотек
import os
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import polars as pl
import pandas as pd
import sklearn
import networkx as nx
import ipywidgets
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import load_iris
from sklearn.preprocessing import MinMaxScaler
from mpl_toolkits.mplot3d import Axes3D

from datasets import load_dataset
import torch
from torch.utils.data import DataLoader
from transformers import AutoTokenizer

In [3]:
SEED = 42
BATCH_SIZE = 32
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DEVICE

device(type='cuda')

# Подготовка данных

In [4]:
MAX_LENGTH=32

In [5]:
dataset = load_dataset('imdb')
print(dataset)

# Используем предобученный токенизатор
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')

def tokenize_function(examples):
    return tokenizer(
        examples['text'],
        padding='max_length',
        truncation=True,
        max_length=MAX_LENGTH
    )

tokenized_datasets = dataset.map(tokenize_function, batched=True)

tokenized_datasets = tokenized_datasets.remove_columns(['text'])
tokenized_datasets = tokenized_datasets.rename_column('label', 'labels')
tokenized_datasets.set_format('torch')

README.md: 0.00B [00:00, ?B/s]

plain_text/train-00000-of-00001.parquet:   0%|          | 0.00/21.0M [00:00<?, ?B/s]

plain_text/test-00000-of-00001.parquet:   0%|          | 0.00/20.5M [00:00<?, ?B/s]

plain_text/unsupervised-00000-of-00001.p(…):   0%|          | 0.00/42.0M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating unsupervised split:   0%|          | 0/50000 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 25000
    })
    test: Dataset({
        features: ['text', 'label'],
        num_rows: 25000
    })
    unsupervised: Dataset({
        features: ['text', 'label'],
        num_rows: 50000
    })
})


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Map:   0%|          | 0/25000 [00:00<?, ? examples/s]

Map:   0%|          | 0/25000 [00:00<?, ? examples/s]

Map:   0%|          | 0/50000 [00:00<?, ? examples/s]

In [6]:
train_dataset = tokenized_datasets['train']
test_dataset = tokenized_datasets['test']

train_texts = train_dataset['input_ids']
test_texts = test_dataset['input_ids']
train_labels = train_dataset['labels']
test_labels = test_dataset['labels']

X_train_tensor = torch.stack(list(train_texts))
X_test_tensor = torch.stack(list(test_texts))
y_train_tensor = torch.stack(list(train_labels))
y_test_tensor = torch.stack(list(test_labels))

In [7]:
X_train_tensor = X_train_tensor.unsqueeze(1)
X_test_tensor = X_test_tensor.unsqueeze(1)

In [8]:
X_train_tensor = X_train_tensor.float().to(DEVICE)
X_test_tensor  = X_test_tensor.float().to(DEVICE)
y_train_tensor = y_train_tensor.long().to(DEVICE)
y_test_tensor  = y_test_tensor.long().to(DEVICE)

# Класс модели Transformer

In [9]:
class TransformerModelWithAttention(nn.Module):
    def __init__(self, input_dim, num_classes, num_heads = 8, num_layers = 6, hidden_dim = 256, dropout = 0.1):
        super(TransformerModelWithAttention, self).__init__()

        self.embedding = nn.Linear(input_dim, hidden_dim)

        # Позиционные кодировки, размерность фиксирована
        self.positional_encoding = nn.Parameter(torch.zeros(1, 100, hidden_dim)) # Длина последовательности 100 (или любая другая)

        encoder_layer = nn.TransformerEncoderLayer(d_model = hidden_dim, nhead = num_heads, dim_feedforward = hidden_dim, dropout = dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers = num_layers)

        self.fc_out = nn.Linear(hidden_dim, num_classes)
        self.attention_weights = []

    def forward(self, x):
        # Добавляем позиционные кодировки
        batch_size, seq_len, _ = x.size()
        x = self.embedding(x)
        x = x + self.positional_encoding[:, :seq_len, :]

        # Пропускаем через трансформер
        encoder_output = self.transformer_encoder(x)

        # Извлечение внимания из каждого слоя трансформера
        for layer in self.transformer_encoder.layers:
            attention_output, attention_weights = layer.self_attn(x, x, x)
            self.attention_weights.append(attention_weights)

        x = encoder_output.mean(dim = 1) # Глобальное усреднение для классификации
        return self.fc_out(x)

# Обучение модели

In [53]:
losses = []
accuracies = []

def train_with_logging(model, X_train, y_train, X_test, y_test, optimizer, criterion, epochs = 100):
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        output = model(X_train)
        loss = criterion(output, y_train)
        loss.backward()
        optimizer.step()

        losses.append(loss.item())
        accuracy = evaluate(model, X_test, y_test)
        accuracies.append(accuracy)

        # if (epoch + 1) % 2 == 0:
        print(f'Эпоха {epoch+1}/{epochs}, Потери: {loss.item()}')

def evaluate(model, X_test, y_test):
    model.eval()
    with torch.no_grad():
        output = model(X_test)
        _, predictions = torch.max(output, dim=1)
        accuracy = (predictions == y_test).float().mean()
    return accuracy.item()

In [57]:
import torch
import gc

# Очистка кеша CUDA
torch.cuda.empty_cache()

# Сборка мусора Python
gc.collect()

0

In [58]:
model = TransformerModelWithAttention(input_dim = MAX_LENGTH, num_classes = 2, num_layers=1, hidden_dim=16, num_heads=1).to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr = 0.001)
criterion = nn.CrossEntropyLoss()

train_with_logging(model, X_train_tensor, y_train_tensor, X_test_tensor, y_test_tensor, optimizer, criterion, epochs = 1)
# при большем числе эпох, элементов в модели вылетает CUDA out of memory

Эпоха 1/1, Потери: 0.7906590700149536


In [60]:
model.cpu()

TransformerModelWithAttention(
  (embedding): Linear(in_features=32, out_features=16, bias=True)
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0): TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=16, out_features=16, bias=True)
        )
        (linear1): Linear(in_features=16, out_features=16, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=16, out_features=16, bias=True)
        (norm1): LayerNorm((16,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((16,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (fc_out): Linear(in_features=16, out_features=2, bias=True)
)

In [None]:
def plot_attention_weights(weights, layer):
    attention_weights = weights[layer]

    # Если веса внимания имеют больше чем 2 размерности, усредняем по головам
    if len(attention_weights.shape) > 2:
        attention_weights = np.array([weight.to('cpu') for weight in model.attention_weights]).mean(dim=1).squeeze().detach().numpy() # Усреднение по головам

    # Проверяем размерность после усреднения
    if attention_weights.ndim == 1:
        attention_weights = attention_weights[None, :] # Преобразование в двумерный массив (1, seq_len)

    plt.figure(figsize = (10, 8))
    plt.imshow(attention_weights, cmap = 'viridis', aspect = 'auto')
    plt.colorbar()
    plt.title(f"Веса внимания на слое {layer + 1}")
    plt.xlabel("Длина последовательности")
    plt.ylabel("Длина последовательности")
    plt.show()

# Количество графиков (5)
num_plots = 5
layers_to_plot = np.linspace(0, len(model.attention_weights) - 1, num_plots, dtype = int)

# Визуализация выбранных слоев
for i in layers_to_plot:
    plot_attention_weights(model.attention_weights, i)

# Визуализация графиков потерь и точности
plt.figure(figsize=(12, 6))

plt.subplot(1, 2, 1)
plt.plot(losses, label = 'Потери')
plt.title('Потери во время обучения')
plt.xlabel('Эпоха')
plt.ylabel('Потери')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(accuracies, label = 'Точность')
plt.title('Точность во время обучения')
plt.xlabel('Эпоха')
plt.ylabel('Точность')
plt.legend()

plt.tight_layout()
plt.show()

# Оценка модели на тестовых данных
accuracy = evaluate(model, X_test_tensor, y_test_tensor)
print(f'Финальная точность: {accuracy:.4f}')

# Выводы

В данной работе представлен пример обучения модели Transformer. Модель получилось обучить лишь на одной эпохе ввиду проблем с оперативной памятью, что не помешало демонстрации практического использования этой гениальной архитектуры