# Классификация текстов с использованием LSTM в PyTorch

В этом ноутбуке мы построим модель классификации текстов на основе LSTM-сетей с использованием PyTorch. 
Датасет содержит статьи на английском языке из 5 тематических классов. Цель — достичь максимально возможного качества классификации.

In [None]:
# Импорт необходимых библиотек
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import warnings
warnings.filterwarnings('ignore')

### Загрузим словари 

In [None]:
# Загрузка необходимых данных NLTK
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')
    
try:
    nltk.data.find('corpora/stopwords')
except LookupError:
    nltk.download('stopwords')

In [None]:
# Загрузка датасета
df = pd.read_csv('labs_data/lab3_5/Documents topics (Politics 0, Sport 1, Technology 2, Entertainment 3, Business 4).csv')
print(f'Размер датасета: {df.shape}')
print(df.head())

In [None]:
# Проверка пропущенных значений
print(f'Пропущенных значений в столбце Text: {df["Text"].isnull().sum()}')
print(f'Пропущенных значений в столбце Label: {df["Label"].isnull().sum()}')

# Удаление строк с пропущенными значениями (если есть)
df = df.dropna()
print(f'Размер датасета после удаления NaN: {df.shape}')

In [None]:
# Анализ распределения классов
class_counts = df['Label'].value_counts().sort_index()
print('Распределение классов:')
for i, count in enumerate(class_counts):
    print(f'Класс {i} ({get_class_name(i)}): {count} примеров')

In [None]:
# Функция для получения названия класса по метке
def get_class_name(label):
    class_names = {
        0: 'Политика',
        1: 'Спорт',
        2: 'Технологии',
        3: 'Развлечения',
        4: 'Бизнес'
    }
    return class_names.get(label, f'Неизвестный класс {label}')

In [None]:
# Построение гистограммы распределения классов
plt.figure(figsize=(10, 6))
sns.countplot(data=df, x='Label')
plt.title('Распределение классов в датасете')
plt.xlabel('Метка класса')
plt.ylabel('Количество примеров')

# Добавление числовых меток над столбцами
for i, v in enumerate(class_counts.values):
    plt.text(i, v + 10, str(v), ha='center', va='bottom')
    
# Настройка подписей оси X
plt.xticks(ticks=range(5), labels=[get_class_name(i) for i in range(5)])
plt.tight_layout()
plt.show()

# Анализ дисбаланса классов
print('\nАнализ дисбаланса классов:')
max_class = max(class_counts)
min_class = min(class_counts)
imbalance_ratio = max_class / min_class
print(f'Коэффициент дисбаланса (наибольший/наименьший): {imbalance_ratio:.2f}')

In [None]:
# Функция предобработки текста
def preprocess_text(text):
    # Приведение к нижнему регистру
    text = text.lower()
    
    # Удаление специальных символов и цифр
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    
    # Токенизация
    tokens = word_tokenize(text)
    
    # Удаление стоп-слов
    stop_words = set(stopwords.words('english'))
    tokens = [word for word in tokens if word not in stop_words]
    
    # Объединение токенов обратно в строку
    processed_text = ' '.join(tokens)
    
    return processed_text

In [None]:
# Предобработка текстовых данных
print('Предобработка текстовых данных...')
df['processed_text'] = df['Text'].apply(preprocess_text)

# Просмотр примеров
print('\nПримеры оригинального и обработанного текста:')
for i in range(3):
    print(f'Оригинал: {df.iloc[i]["Text"][:100]}...')
    print(f'Обработано: {df.iloc[i]["processed_text"][:100]}...')
    print()

In [None]:
# Определение максимальной длины последовательности
text_lengths = [len(text.split()) for text in df['processed_text']]
max_length = int(np.percentile(text_lengths, 95))  # Используем 95-й перцентиль
print(f'Максимальная длина последовательности (95-й перцентиль): {max_length}')

# Гистограмма распределения длин текстов
plt.figure(figsize=(10, 6))
plt.hist(text_lengths, bins=50, edgecolor='black')
plt.axvline(max_length, color='red', linestyle='--', label=f'Макс. длина: {max_length}')
plt.title('Распределение длин текстов (в словах)')
plt.xlabel('Количество слов')
plt.ylabel('Частота')
plt.legend()
plt.show()

In [None]:
# Инициализация токенизатора и обучение на текстах
tokenizer = Tokenizer()
tokenizer.fit_on_texts(df['processed_text'])

# Размер словаря
vocab_size = len(tokenizer.word_index) + 1
print(f'Размер словаря: {vocab_size}')

In [None]:
# Преобразование текстов в последовательности и паддинг
sequences = tokenizer.texts_to_sequences(df['processed_text'])
X = pad_sequences(sequences, maxlen=max_length, padding='post', truncating='post')
y = df['Label'].values

print(f'Размер входных данных: {X.shape}')
print(f'Размер меток: {y.shape}')

In [None]:
# Разделение данных на обучающую, валидационную и тестовую выборки
X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.25, random_state=42, stratify=y_temp)

print(f'Обучающая выборка: {X_train.shape[0]} примеров')
print(f'Валидационная выборка: {X_val.shape[0]} примеров')
print(f'Тестовая выборка: {X_test.shape[0]} примеров')

In [None]:
# Класс датасета для PyTorch
class TextDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = torch.tensor(texts, dtype=torch.long)
        self.labels = torch.tensor(labels, dtype=torch.long)
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        return self.texts[idx], self.labels[idx]

In [None]:
# Создание загрузчиков данных
batch_size = 32
train_dataset = TextDataset(X_train, y_train)
val_dataset = TextDataset(X_val, y_val)
test_dataset = TextDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Определение модели LSTM
class LSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_classes, num_layers=2, dropout=0.3):
        super(LSTMClassifier, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            embedding_dim, 
            hidden_dim, 
            num_layers=num_layers, 
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True  # Используем двунаправленную LSTM
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim * 2, num_classes)  # *2 из-за двунаправленности
        
    def forward(self, x):
        embedded = self.embedding(x)
        lstm_out, (hidden, _) = self.lstm(embedded)
        
        # Используем последние скрытые состояния
        hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1)
        
        output = self.dropout(hidden)
        output = self.fc(output)
        
        return output

In [None]:
# Инициализация модели
embedding_dim = 100
hidden_dim = 128
num_classes = 5
num_layers = 2
dropout = 0.3

model = LSTMClassifier(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
    num_classes=num_classes,
    num_layers=num_layers,
    dropout=dropout
)

# Перемещение модели на GPU (если доступен)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

print(model)
print(f'Количество параметров модели: {sum(p.numel() for p in model.parameters()):,}')

In [None]:
# Определение функции потерь и оптимизатора
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5)

In [None]:
# Функция обучения модели
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=20):
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []
    
    best_val_loss = float('inf')
    best_model_state = None
    patience_counter = 0
    patience = 5
    
    for epoch in range(num_epochs):
        # Этап обучения
        model.train()
        running_loss = 0.0
        correct_train = 0
        total_train = 0
        
        for texts, labels in train_loader:
            texts, labels = texts.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(texts)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()
        
        epoch_train_loss = running_loss / len(train_loader)
        epoch_train_acc = 100 * correct_train / total_train
        
        # Этап валидации
        model.eval()
        val_running_loss = 0.0
        correct_val = 0
        total_val = 0
        
        with torch.no_grad():
            for texts, labels in val_loader:
                texts, labels = texts.to(device), labels.to(device)
                outputs = model(texts)
                loss = criterion(outputs, labels)
                
                val_running_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total_val += labels.size(0)
                correct_val += (predicted == labels).sum().item()
        
        epoch_val_loss = val_running_loss / len(val_loader)
        epoch_val_acc = 100 * correct_val / total_val
        
        train_losses.append(epoch_train_loss)
        val_losses.append(epoch_val_loss)
        train_accuracies.append(epoch_train_acc)
        val_accuracies.append(epoch_val_acc)
        
        # Обновление скорости обучения
        scheduler.step(epoch_val_loss)
        
        print(f'Эпоха [{epoch+1}/{num_epochs}]')
        print(f'  Потери (обучение): {epoch_train_loss:.4f}, Точность (обучение): {epoch_train_acc:.2f}%')
        print(f'  Потери (валидация): {epoch_val_loss:.4f}, Точность (валидация): {epoch_val_acc:.2f}%')
        
        # Ранняя остановка
        if epoch_val_loss < best_val_loss:
            best_val_loss = epoch_val_loss
            best_model_state = model.state_dict().copy()
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f'Ранняя остановка на эпохе {epoch+1}')
                break
    
    # Загрузка лучшей модели
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
    
    return train_losses, val_losses, train_accuracies, val_accuracies

In [None]:
# Обучение модели
print('Начало обучения...')
train_losses, val_losses, train_accuracies, val_accuracies = train_model(
    model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=30
)

In [None]:
# Визуализация процесса обучения
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# График потерь
ax1.plot(train_losses, label='Обучение', marker='o')
ax1.plot(val_losses, label='Валидация', marker='s')
ax1.set_title('Динамика функции потерь')
ax1.set_xlabel('Эпоха')
ax1.set_ylabel('Потери')
ax1.legend()
ax1.grid(True)

# График точности
ax2.plot(train_accuracies, label='Обучение', marker='o')
ax2.plot(val_accuracies, label='Валидация', marker='s')
ax2.set_title('Динамика точности')
ax2.set_xlabel('Эпоха')
ax2.set_ylabel('Точность (%)')
ax2.legend()
ax2.grid(True)

plt.tight_layout()
plt.show()

In [None]:
# Функция оценки модели
def evaluate_model(model, test_loader):
    model.eval()
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for texts, labels in test_loader:
            texts, labels = texts.to(device), labels.to(device)
            outputs = model(texts)
            _, predicted = torch.max(outputs, 1)
            
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    return np.array(all_preds), np.array(all_labels)

In [None]:
# Оценка на тестовой выборке
test_preds, test_labels = evaluate_model(model, test_loader)

# Точность
test_accuracy = accuracy_score(test_labels, test_preds)
print(f'Точность на тестовой выборке: {test_accuracy:.4f}')

# Отчёт по классификации
class_names = [get_class_name(i) for i in range(5)]
print('\nОтчёт по классификации:')
print(classification_report(test_labels, test_preds, target_names=class_names))

In [None]:
# Матрица ошибок
cm = confusion_matrix(test_labels, test_preds)

plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
            xticklabels=class_names, yticklabels=class_names)
plt.title('Матрица ошибок')
plt.xlabel('Предсказанный класс')
plt.ylabel('Истинный класс')
plt.show()

In [None]:
# Функция предсказания для нового текста
def predict_text(text, model, tokenizer, max_length):
    # Предобработка
    processed_text = preprocess_text(text)
    
    # Преобразование в последовательность
    sequence = tokenizer.texts_to_sequences([processed_text])
    padded_sequence = pad_sequences(sequence, maxlen=max_length, padding='post', truncating='post')
    
    # Тензор
    tensor = torch.tensor(padded_sequence, dtype=torch.long).to(device)
    
    # Предсказание
    model.eval()
    with torch.no_grad():
        output = model(tensor)
        probabilities = torch.softmax(output, dim=1)
        predicted_class = torch.argmax(output, dim=1).item()
        confidence = probabilities[0][predicted_class].item()
    
    return predicted_class, confidence

In [None]:
# Примеры предсказаний
sample_texts = [
    "Правительство объявило о новых мерах в международной торговле.",
    "Футбольная команда выиграла чемпионат после напряжённого матча.",
    "Учёные разработали новый алгоритм для приложений искусственного интеллекта.",
    "Актёр получил награду за выдающуюся игру в фильме.",
    "Компания сообщила о высокой прибыли и расширила долю на рынке."
]

print('Примеры предсказаний:')
for text in sample_texts:
    pred_class, confidence = predict_text(text, model, tokenizer, max_length)
    print(f'Текст: "{text[:50]}..."')
    print(f'Предсказанный класс: {get_class_name(pred_class)} (уверенность: {confidence:.3f})')
    print()

In [None]:
# Функция подбора гиперпараметров (опционально)
def hyperparameter_tuning():
    # Комбинации гиперпараметров
    hyperparams = [
        {'embedding_dim': 100, 'hidden_dim': 64, 'num_layers': 1, 'dropout': 0.2},
        {'embedding_dim': 100, 'hidden_dim': 128, 'num_layers': 2, 'dropout': 0.3},
        {'embedding_dim': 150, 'hidden_dim': 128, 'num_layers': 2, 'dropout': 0.3},
        {'embedding_dim': 100, 'hidden_dim': 128, 'num_layers': 1, 'dropout': 0.2},
        {'embedding_dim': 200, 'hidden_dim': 256, 'num_layers': 2, 'dropout': 0.4},
    ]
    
    results = []
    
    for i, params in enumerate(hyperparams):
        print(f'\nТестируется комбинация гиперпараметров {i+1}: {params}')
        
        model = LSTMClassifier(
            vocab_size=vocab_size,
            embedding_dim=params['embedding_dim'],
            hidden_dim=params['hidden_dim'],
            num_classes=num_classes,
            num_layers=params['num_layers'],
            dropout=params['dropout']
        ).to(device)
        
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5)
        
        train_losses, val_losses, train_accuracies, val_accuracies = train_model(
            model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=15
        )
        
        val_preds, val_labels = evaluate_model(model, val_loader)
        val_accuracy = accuracy_score(val_labels, val_preds)
        
        results.append({
            'params': params,
            'val_accuracy': val_accuracy,
            'final_val_loss': val_losses[-1] if val_losses else float('inf')
        })
        
        print(f'Результат — точность на валидации: {val_accuracy:.4f}, потери: {val_losses[-1]:.4f}')
    
    best_result = max(results, key=lambda x: x['val_accuracy'])
    print(f'\nЛучшие гиперпараметры: {best_result["params"]}')
    print(f'Лучшая точность на валидации: {best_result["val_accuracy"]:.4f}')
    
    return results, best_result

In [None]:
# Подбор гиперпараметров (раскомментируйте для запуска)
# results, best_result = hyperparameter_tuning()

In [None]:
# Итоговый отчёт
print('=== ИТОГИ ===')
print(f'Точность на тестовой выборке: {test_accuracy:.4f}')
print(f'Размер словаря: {vocab_size}')
print(f'Максимальная длина последовательности: {max_length}')
print(f'Архитектура модели: LSTM с {num_layers} слоями, {hidden_dim} скрытыми нейронами')
print(f'Размерность эмбеддингов: {embedding_dim}')
print(f'Вероятность dropout: {dropout}')

# Анализ результатов
print('\n=== АНАЛИЗ ===')
if test_accuracy > 0.8:
    print('Модель достигла высокой точности (>80%), что указывает на хорошее качество')
else:
    print('Точность модели можно улучшить — рассмотрите дальнейшую оптимизацию')

# Проверка переобучения
final_train_acc = train_accuracies[-1]
final_val_acc = val_accuracies[-1]
acc_diff = final_train_acc - final_val_acc

if acc_diff > 10:
    print('Обнаружено значительное переобучение (точность на обучении сильно выше, чем на валидации)')
elif acc_diff > 5:
    print('Наблюдается умеренное переобучение')
else:
    print('Хорошая обобщающая способность, переобучение минимально')

print(f'Разрыв между точностью на обучении и валидации: {acc_diff:.2f}%')