<a href="https://colab.research.google.com/github/ildimas/NeuralNetworksInManagment/blob/main/%D0%98%D0%BB%D1%8C%D1%8E%D1%89%D0%B5%D0%BD%D1%8F_%D0%9D%D0%A2%D0%92%D0%A3_2_%D0%97%D0%B0%D0%B4%D0%B0%D0%BD%D0%B8%D0%B5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# NLP

## 1. Настройка и импорты

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import gensim
from gensim.models import Word2Vec
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import re
import string

# NLTK resources needed by the preprocessing step further down:
# word_tokenize (punkt), the English stop-word list, and WordNet for
# lemmatization.
nltk.download('punkt')
# Recent NLTK releases load the tokenizer tables from 'punkt_tab' instead of
# the pickled 'punkt' models; without it word_tokenize raises LookupError.
# On older releases that do not know this resource the call just reports
# failure and returns False.
nltk.download('punkt_tab')
nltk.download('stopwords')
nltk.download('wordnet')
# NOTE(review): some NLTK releases also require the Open Multilingual WordNet
# data before WordNetLemmatizer works; downloading it is harmless elsewhere.
nltk.download('omw-1.4')

# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

## 2. Загрузка и подготовка данных

In [None]:
# Fetch the AG News corpus and keep handles to its two official splits.
dataset = load_dataset('ag_news')
train_data, test_data = dataset['train'], dataset['test']

# Hold out 15% of the official training split for validation; the fixed seed
# keeps the split reproducible between runs.
train_texts, val_texts, train_labels, val_labels = train_test_split(
    train_data['text'],
    train_data['label'],
    test_size=0.15,
    random_state=42,
)

# The official test split is used untouched for the final comparison.
test_texts, test_labels = test_data['text'], test_data['label']

In [None]:
# Call-invariant helpers, built once instead of on every preprocess_text call.
_DIGIT_RUN = re.compile(r'\d+')
_PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)
# Lazily filled cache for the NLTK-backed resources; lazy so that merely
# defining this function does not require the NLTK data to be present yet.
_PREPROCESS_CACHE = {}


def _cached_resource(key, factory):
    """Return the cached object for *key*, creating it with *factory* once."""
    if key not in _PREPROCESS_CACHE:
        _PREPROCESS_CACHE[key] = factory()
    return _PREPROCESS_CACHE[key]


def preprocess_text(text, remove_stopwords=True):
    """Normalize a raw text into a space-separated string of lemmas.

    Steps, in order: lowercase, delete digit runs, delete ASCII punctuation
    (``string.punctuation``), tokenize with NLTK ``word_tokenize``, optionally
    drop English stop words, lemmatize every token with WordNet.

    Args:
        text: Raw input string.
        remove_stopwords: When true (default), tokens found in NLTK's English
            stop-word list are removed before lemmatization.

    Returns:
        The processed tokens joined by single spaces (``''`` if none remain).

    Note:
        Punctuation is stripped before the stop-word filter runs, so
        contractions such as "don't" become "dont" and are compared to the
        stop-word list in that form.
    """
    text = text.lower()
    text = _DIGIT_RUN.sub('', text)
    text = text.translate(_PUNCTUATION_TABLE)
    tokens = word_tokenize(text)

    if remove_stopwords:
        # Reading the stop-word corpus is file I/O; do it once per process.
        stop_words = _cached_resource(
            'stop_words', lambda: frozenset(stopwords.words('english'))
        )
        tokens = [t for t in tokens if t not in stop_words]

    # One shared lemmatizer instead of a new instance per document.
    lemmatizer = _cached_resource('lemmatizer', WordNetLemmatizer)
    return ' '.join(lemmatizer.lemmatize(t) for t in tokens)

# Run the same normalization (default settings) over every split so that all
# downstream vectorizers and models see identically prepared text.
train_texts_processed = list(map(preprocess_text, train_texts))
val_texts_processed = list(map(preprocess_text, val_texts))
test_texts_processed = list(map(preprocess_text, test_texts))

## 3. Реализация моделей

In [None]:
class TextDataset(Dataset):
    """Map-style dataset pairing texts with labels and tokenizing on access.

    The tokenizer is any callable invoked as
    ``tokenizer(text, max_length=..., padding='max_length', truncation=True,
    return_tensors='pt')`` whose result can be indexed with ``'input_ids'``
    and ``'attention_mask'`` to obtain tensors.
    """

    def __init__(self, texts, labels, tokenizer, max_len=128):
        """Store the parallel sequences, the tokenizer and the length cap."""
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_len = tokenizer, max_len

    def __len__(self):
        """Return the number of texts."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenize sample *idx* and return its tensors.

        Returns:
            A dict with flattened ``'input_ids'`` and ``'attention_mask'``
            tensors and a scalar ``torch.long`` ``'label'`` tensor.
        """
        text, label = self.texts[idx], self.labels[idx]

        tokenizer_options = dict(
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_tensors='pt',
        )
        encoded = self.tokenizer(text, **tokenizer_options)

        # The tokenizer output carries a leading batch axis; flatten drops it.
        sample = {
            field: encoded[field].flatten()
            for field in ('input_ids', 'attention_mask')
        }
        sample['label'] = torch.tensor(label, dtype=torch.long)
        return sample

In [None]:
class TextCNN(nn.Module):
    """Convolutional text classifier with parallel filter widths.

    Token ids are embedded, convolved with one ``Conv2d`` per entry of
    ``filter_sizes`` (each kernel spans the full embedding width), max-pooled
    over the sequence axis, concatenated, passed through dropout and projected
    to ``output_dim`` logits.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Width of each token embedding.
        n_filters: Output channels of every convolution.
        filter_sizes: Iterable of kernel heights (number of tokens covered).
        output_dim: Number of output logits.
        dropout: Dropout probability applied to the pooled features.
    """

    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.convs = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=n_filters, kernel_size=(fs, embedding_dim))
            for fs in filter_sizes
        ])
        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text, attention_mask=None):
        """Compute class logits for a batch of token-id sequences.

        Args:
            text: Integer tensor of token ids, laid out ``(batch, seq_len)``.
                ``seq_len`` must be at least the largest filter size or the
                convolution raises.
            attention_mask: Accepted so the model can be called the same way
                the training and evaluation loops call it
                (``model(ids, attention_mask=mask)``); it is not used.

        Returns:
            Tensor of shape ``(batch, output_dim)``.
        """
        # (batch, seq, emb) -> (batch, 1, seq, emb): one input channel for Conv2d.
        embedded = self.embedding(text).unsqueeze(1)

        # The original referenced an unimported alias ``F``; ``nn.functional``
        # is the same module and is reachable through the existing ``nn`` import.
        # Each kernel spans the whole embedding width, so the last axis
        # collapses to size 1 and is squeezed away.
        conved = [
            nn.functional.relu(conv(embedded)).squeeze(3)
            for conv in self.convs
        ]
        # Max over every position: one value per filter and sample.
        pooled = [
            nn.functional.max_pool1d(feature_map, feature_map.shape[2]).squeeze(2)
            for feature_map in conved
        ]

        cat = self.dropout(torch.cat(pooled, dim=1))
        return self.fc(cat)

In [None]:
class TextLSTM(nn.Module):
    """(Bi)LSTM text classifier over batch-first token-id tensors.

    Token ids are embedded, run through a multi-layer LSTM, and the final
    hidden state of the top layer (both directions concatenated when
    bidirectional) is projected to ``output_dim`` logits.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Width of each token embedding.
        hidden_dim: LSTM hidden size per direction.
        output_dim: Number of output logits.
        n_layers: Number of stacked LSTM layers.
        bidirectional: Whether the LSTM runs in both directions.
        dropout: Dropout probability for embeddings, between LSTM layers and
            on the final hidden state.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            # Inter-layer dropout does nothing with a single layer and only
            # triggers a PyTorch warning, so disable it in that case.
            dropout=dropout if n_layers > 1 else 0.0,
            # Batches produced by a DataLoader are (batch, seq_len). With the
            # default sequence-first layout the LSTM would treat the batch
            # axis as time and return logits of shape (seq_len, output_dim).
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text, attention_mask=None):
        """Compute class logits for a batch of token-id sequences.

        Args:
            text: Integer tensor of token ids, laid out ``(batch, seq_len)``.
            attention_mask: Optional ``(batch, seq_len)`` tensor with 1 for
                real tokens and 0 for padding. When given, sequences are
                packed so the final hidden state is taken at the last real
                token rather than after the padding. Assumes padding sits at
                the end of each sequence.

        Returns:
            Tensor of shape ``(batch, output_dim)``.
        """
        embedded = self.dropout(self.embedding(text))

        if attention_mask is not None:
            # pack_padded_sequence needs CPU int64 lengths >= 1; a fully
            # padded row is treated as length 1.
            lengths = attention_mask.sum(dim=1).clamp(min=1).to(torch.int64).cpu()
            embedded = nn.utils.rnn.pack_padded_sequence(
                embedded, lengths, batch_first=True, enforce_sorted=False
            )

        _, (hidden, _) = self.lstm(embedded)

        # hidden is (num_layers * num_directions, batch, hidden_dim); the last
        # two entries belong to the top layer's forward and backward passes.
        if self.lstm.bidirectional:
            hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            hidden = hidden[-1, :, :]

        return self.fc(self.dropout(hidden))

In [None]:
class EncoderDecoder(nn.Module):
    """Sequence-to-sequence model: bidirectional LSTM encoder, LSTM decoder.

    The original ``forward`` could not run: ``random`` was never imported,
    integer target tokens were fed straight into an LSTM expecting
    ``input_dim + 2 * hidden_dim`` float features, and the bidirectional
    encoder state (``2 * n_layers`` entries) was handed to a unidirectional
    decoder (``n_layers`` entries). This version keeps the constructor and
    ``forward`` signatures and fills those gaps as follows:

    * target tokens are embedded to ``input_dim`` by ``trg_embedding``;
    * the decoder input at every step is that embedding concatenated with a
      fixed context vector (mean of the encoder outputs over time), matching
      the declared decoder input width;
    * forward and backward encoder states are summed per layer to initialize
      the decoder.

    NOTE(review): these bridging choices are one reasonable reading of the
    declared layer sizes, not something the original code specified. Confirm
    them against the intended architecture.

    Args:
        input_dim: Feature width of each source step (also the target token
            embedding width).
        hidden_dim: LSTM hidden size per direction.
        output_dim: Target vocabulary size (number of logits per step).
        n_layers: Number of stacked layers in both encoder and decoder.
        dropout: Dropout probability (between LSTM layers and on target
            embeddings).
    """

    def __init__(self, input_dim, hidden_dim, output_dim, n_layers, dropout):
        super().__init__()
        # Inter-layer dropout is meaningless (and warns) with one layer.
        lstm_dropout = dropout if n_layers > 1 else 0.0
        self.encoder = nn.LSTM(input_dim, hidden_dim, n_layers, dropout=lstm_dropout, bidirectional=True)
        self.decoder = nn.LSTM(input_dim + hidden_dim * 2, hidden_dim, n_layers, dropout=lstm_dropout)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
        # Turns target token ids into the float features the decoder consumes.
        self.trg_embedding = nn.Embedding(output_dim, input_dim)

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode ``trg_len - 1`` steps conditioned on ``src``.

        Args:
            src: Float tensor ``(src_len, batch, input_dim)`` (sequence-first,
                the default layout of ``nn.LSTM``).
            trg: Integer tensor ``(trg_len, batch)`` of target token ids;
                ``trg[0]`` is used as the first decoder input.
            teacher_forcing_ratio: Probability, drawn independently per step,
                of feeding the ground-truth token instead of the model's own
                argmax prediction as the next input.

        Returns:
            Tensor ``(trg_len, batch, output_dim)`` of logits; row 0 is left
            as zeros because no prediction is made for the start token.
        """
        import random  # not imported at file level; keep the block self-contained

        batch_size = src.shape[1]
        trg_len = trg.shape[0]
        trg_vocab_size = self.fc.out_features

        # Allocate on the input's device rather than on a module-level global.
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size, device=src.device)

        encoder_outputs, (hidden, cell) = self.encoder(src)

        # Fixed context for every decoding step: (1, batch, 2 * hidden_dim).
        context = encoder_outputs.mean(dim=0, keepdim=True)

        # Encoder states are (n_layers * 2, batch, hidden); split the direction
        # axis out and sum it so the shape matches the unidirectional decoder.
        n_layers = self.encoder.num_layers
        hidden = hidden.reshape(n_layers, 2, batch_size, -1).sum(dim=1)
        cell = cell.reshape(n_layers, 2, batch_size, -1).sum(dim=1)

        token = trg[0, :]

        for t in range(1, trg_len):
            embedded = self.dropout(self.trg_embedding(token)).unsqueeze(0)
            step_input = torch.cat((embedded, context), dim=2)

            output, (hidden, cell) = self.decoder(step_input, (hidden, cell))
            logits = self.fc(output.squeeze(0))
            outputs[t] = logits

            teacher_force = random.random() < teacher_forcing_ratio
            top1 = logits.argmax(1)
            token = trg[t] if teacher_force else top1

        return outputs

## 4. Функции обучения и оценки

In [None]:
def train_model(model, train_loader, optimizer, criterion, device):
    """Run one optimization pass over ``train_loader``.

    Puts the model in training mode, and for every batch clears gradients,
    moves the batch's ``'input_ids'``, ``'attention_mask'`` and ``'label'``
    tensors to ``device``, calls ``model(input_ids, attention_mask=...)``,
    back-propagates ``criterion(outputs, labels)`` and steps the optimizer.

    Returns:
        The mean per-batch loss (sum of batch losses divided by
        ``len(train_loader)``).
    """
    model.train()
    running_loss = 0

    for batch in train_loader:
        optimizer.zero_grad()

        ids, mask, targets = (
            batch[field].to(device)
            for field in ('input_ids', 'attention_mask', 'label')
        )

        batch_loss = criterion(model(ids, attention_mask=mask), targets)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()

    n_batches = len(train_loader)
    return running_loss / n_batches

In [None]:
def evaluate_model(model, val_loader, criterion, device):
    """Score the model on ``val_loader`` without updating it.

    Puts the model in eval mode and, with gradients disabled, feeds every
    batch's ``'input_ids'`` and ``'attention_mask'`` to the model, sums the
    per-batch ``criterion`` losses and collects argmax predictions alongside
    the batch ``'label'`` values.

    Returns:
        A ``(mean_batch_loss, accuracy)`` tuple, where the mean is taken over
        ``len(val_loader)`` batches and accuracy comes from ``accuracy_score``.
    """
    model.eval()
    running_loss = 0
    predicted, expected = [], []

    with torch.no_grad():
        for batch in val_loader:
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['label'].to(device)

            logits = model(ids, attention_mask=mask)
            running_loss += criterion(logits, targets).item()

            # Highest logit per row is the predicted class.
            predicted.extend(logits.argmax(dim=1).cpu().numpy())
            expected.extend(targets.cpu().numpy())

    mean_loss = running_loss / len(val_loader)
    return mean_loss, accuracy_score(expected, predicted)

In [None]:
def plot_metrics(train_losses, val_losses, train_accs, val_accs):
    """Show loss and accuracy curves side by side.

    Draws a 1x2 figure: the left panel plots the train and validation loss
    sequences, the right panel the train and validation accuracy sequences,
    each against its index on an axis labelled 'Epoch'.
    """
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))

    # One entry per panel: (train series, train label, validation series,
    # validation label, panel title, y-axis label).
    panels = (
        (train_losses, 'Train Loss', val_losses, 'Validation Loss',
         'Loss Curves', 'Loss'),
        (train_accs, 'Train Accuracy', val_accs, 'Validation Accuracy',
         'Accuracy Curves', 'Accuracy'),
    )

    for axis, panel in zip(axes, panels):
        train_series, train_label, val_series, val_label, title, y_label = panel
        axis.plot(train_series, label=train_label)
        axis.plot(val_series, label=val_label)
        axis.set_title(title)
        axis.set_xlabel('Epoch')
        axis.set_ylabel(y_label)
        axis.legend()

    plt.tight_layout()
    plt.show()

In [None]:
def train_and_evaluate(model, train_loader, val_loader, optimizer, criterion, n_epochs, device):
    """Train for ``n_epochs`` epochs, report metrics per epoch and plot them.

    Each epoch runs ``train_model`` once, then scores the model with
    ``evaluate_model`` on both the training and the validation loader. The
    collected curves are handed to ``plot_metrics`` at the end.

    Args:
        model: Model to optimize in place.
        train_loader: Batches used for training and for the train accuracy.
        val_loader: Batches used for validation loss and accuracy.
        optimizer: Optimizer bound to ``model``'s parameters.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        n_epochs: Number of passes over ``train_loader``.
        device: Device the batches are moved to.

    Returns:
        The trained ``model`` (same object that was passed in).
    """
    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []

    for epoch in range(n_epochs):
        train_loss = train_model(model, train_loader, optimizer, criterion, device)
        # The original never filled train_accs, so the "Train Accuracy" curve
        # was always empty. train_model only returns a loss, so accuracy is
        # measured with an extra forward-only pass; this also measures it in
        # eval mode, like the validation figure. Costs one extra pass over
        # the training data per epoch.
        _, train_acc = evaluate_model(model, train_loader, criterion, device)
        val_loss, val_acc = evaluate_model(model, val_loader, criterion, device)

        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_accs.append(train_acc)
        val_accs.append(val_acc)

        print(f'Epoch {epoch+1}/{n_epochs}:')
        print(f'Train Loss: {train_loss:.4f}')
        print(f'Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.4f}')

    plot_metrics(train_losses, val_losses, train_accs, val_accs)
    return model

## 5. Главный процесс

In [None]:
class _VocabTokenizer:
    """Whitespace tokenizer with the call signature ``TextDataset`` uses.

    Ids 0 and 1 are reserved for padding and unknown words; the given words
    receive ids 2, 3, ... in the order supplied.
    """

    PAD_ID = 0
    UNK_ID = 1

    def __init__(self, words):
        self.word_to_id = {word: index + 2 for index, word in enumerate(words)}

    def __call__(self, text, max_length=128, padding='max_length', truncation=True, return_tensors='pt'):
        """Encode ``text`` into ``(1, length)`` id and mask tensors.

        ``return_tensors`` is accepted for call compatibility only; PyTorch
        tensors are always returned.
        """
        ids = [self.word_to_id.get(token, self.UNK_ID) for token in text.split()]
        if truncation:
            ids = ids[:max_length]
        mask = [1] * len(ids)
        if padding == 'max_length':
            missing = max_length - len(ids)
            ids += [self.PAD_ID] * missing
            mask += [0] * missing
        return {
            'input_ids': torch.tensor([ids], dtype=torch.long),
            'attention_mask': torch.tensor([mask], dtype=torch.long),
        }


def _init_embeddings(model, vectors):
    """Copy pretrained ``vectors`` into ``model.embedding`` from row 2 onward.

    Rows 0 and 1 (padding / unknown) keep their random initialization.
    """
    with torch.no_grad():
        weight = model.embedding.weight
        weight[2:2 + vectors.shape[0]] = vectors.to(weight.device)


def main():
    """Train and compare TF-IDF baselines with the CNN and LSTM classifiers.

    Uses the module-level preprocessed splits and labels. Steps:

    1. Train Word2Vec on the training texts; its most frequent words form the
       neural models' vocabulary and its vectors initialize their embeddings.
    2. Fit logistic regression and a linear-kernel SVM on TF-IDF features.
    3. Build DataLoaders and train the CNN and the LSTM for 10 epochs each.
    4. Print test accuracy for all four models.
    """
    vocab_size = 10000
    embedding_dim = 100
    batch_size = 64

    sentences = [text.split() for text in train_texts_processed]
    word2vec_model = Word2Vec(sentences, vector_size=embedding_dim, window=5, min_count=5, workers=4)

    # index_to_key lists words by descending frequency; two ids are reserved
    # for padding and unknown words, hence vocab_size - 2.
    vocab_words = word2vec_model.wv.index_to_key[:vocab_size - 2]
    tokenizer = _VocabTokenizer(vocab_words)
    pretrained_vectors = torch.tensor(word2vec_model.wv.vectors[:len(vocab_words)])

    # The original referenced train_loader / val_loader / test_loader without
    # ever creating them (NameError); build them from the preprocessed splits.
    train_loader = DataLoader(
        TextDataset(train_texts_processed, train_labels, tokenizer),
        batch_size=batch_size,
        shuffle=True,
    )
    val_loader = DataLoader(
        TextDataset(val_texts_processed, val_labels, tokenizer),
        batch_size=batch_size,
    )
    test_loader = DataLoader(
        TextDataset(test_texts_processed, test_labels, tokenizer),
        batch_size=batch_size,
    )

    # Classical baselines on TF-IDF features. The count-vectorizer features
    # and the validation TF-IDF matrix of the original were computed but
    # never used, so they are no longer built.
    tfidf = TfidfVectorizer(max_features=10000)
    X_train_tfidf = tfidf.fit_transform(train_texts_processed)
    X_test_tfidf = tfidf.transform(test_texts_processed)

    lr_tfidf = LogisticRegression(max_iter=1000)
    lr_tfidf.fit(X_train_tfidf, train_labels)

    # NOTE(review): SVC training time grows at least quadratically with the
    # number of samples; on the full training split this fit may be very
    # slow. Consider sklearn.svm.LinearSVC if that becomes a problem.
    svm_tfidf = SVC(kernel='linear')
    svm_tfidf.fit(X_train_tfidf, train_labels)

    cnn_model = TextCNN(
        vocab_size=vocab_size,
        embedding_dim=embedding_dim,
        n_filters=100,
        filter_sizes=[3, 4, 5],
        output_dim=4,
        dropout=0.5
    ).to(device)
    _init_embeddings(cnn_model, pretrained_vectors)

    lstm_model = TextLSTM(
        vocab_size=vocab_size,
        embedding_dim=embedding_dim,
        hidden_dim=256,
        output_dim=4,
        n_layers=2,
        bidirectional=True,
        dropout=0.5
    ).to(device)
    _init_embeddings(lstm_model, pretrained_vectors)

    cnn_optimizer = optim.Adam(cnn_model.parameters())
    cnn_criterion = nn.CrossEntropyLoss()
    cnn_model = train_and_evaluate(cnn_model, train_loader, val_loader, cnn_optimizer, cnn_criterion, n_epochs=10, device=device)

    lstm_optimizer = optim.Adam(lstm_model.parameters())
    lstm_criterion = nn.CrossEntropyLoss()
    lstm_model = train_and_evaluate(lstm_model, train_loader, val_loader, lstm_optimizer, lstm_criterion, n_epochs=10, device=device)

    lr_pred = lr_tfidf.predict(X_test_tfidf)
    svm_pred = svm_tfidf.predict(X_test_tfidf)

    # evaluate_model returns (loss, accuracy); only the accuracy is reported.
    cnn_acc = evaluate_model(cnn_model, test_loader, cnn_criterion, device)[1]
    lstm_acc = evaluate_model(lstm_model, test_loader, lstm_criterion, device)[1]

    print("\nResults:")
    print(f"Logistic Regression Accuracy: {accuracy_score(test_labels, lr_pred):.4f}")
    print(f"SVM Accuracy: {accuracy_score(test_labels, svm_pred):.4f}")
    print(f"CNN Accuracy: {cnn_acc:.4f}")
    print(f"LSTM Accuracy: {lstm_acc:.4f}")

if __name__ == "__main__":
    main()