In [None]:
import datasets
import torch
import numpy as np
from torch.utils.data import DataLoader
from string import punctuation
from nltk.corpus import stopwords
import pymorphy2
from gensim.models import Word2Vec
from gensim.test.utils import common_texts
from gensim.models import KeyedVectors
import matplotlib.pyplot as plt
from sklearn.preprocessing import normalize
from bokeh.io import output_notebook
import bokeh.models as bm
import bokeh.plotting as pl
import umap.umap_ as umap
from torch.nn.utils.rnn import pad_sequence
from torch import nn
import torch.optim as optim
from tqdm import tqdm
from sklearn.metrics import accuracy_score
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence




In [None]:
# --- Text preprocessing resources -------------------------------------------
STOP_WORDS = set(stopwords.words('english'))
# NOTE(review): the stop-word list is English but the morphological analyzer is
# created for Ukrainian ('uk') — confirm this is intended for the texts used here.
morph = pymorphy2.MorphAnalyzer(lang='uk')

# --- Paths and hyper-parameters ---------------------------------------------
model_path = "word2vec-google-news-300.model"
BATCH_SIZE = 64
NUM_EPOCHS = 5
device = torch.device("cpu")

# --- Training objects --------------------------------------------------------
criterion = nn.CrossEntropyLoss()
# `model` is only created in a later cell. On a fresh kernel the unguarded
# construction raised NameError here and aborted the cell before NUM_EPOCHS and
# `device` were defined. The guard lets the configuration complete; when
# `optimizer` is None it has to be built once the model exists, before training.
# NOTE(review): lr=0.1 is 100x Adam's default of 1e-3 — confirm it is intentional.
try:
    optimizer = optim.Adam(model.parameters(), lr=0.1)
except NameError:
    optimizer = None

In [None]:
# Echo the torch device chosen in the configuration cell.
device

In [None]:
# Load the "ag_news" dataset through the `datasets` library.
dataset = datasets.load_dataset("ag_news")
# Load the saved word vectors from `model_path`.
# NOTE: the variable is spelled `wor2vec_model` (no "d"); other cells refer to
# it by exactly this name, so it must not be renamed in isolation.
wor2vec_model = KeyedVectors.load(model_path)

Создаем класс, который делает предобработку текста, разбивку на токены, превращение предложений в эмбеддинги

In [None]:
class CustomDataset():
    """Dataset that turns raw texts into sequences of word2vec embeddings.

    Every item of ``dataset`` is read as a mapping with a ``'text'`` string and a
    ``'label'`` value. Texts are cleaned and tokenized once at construction time
    (see ``clean``); ``__getitem__`` then maps the cached tokens to vectors.

    Args:
        dataset: indexable collection supporting ``len()`` and ``dataset[i]['text']``
            / ``dataset[i]['label']``.
        word2vec_model: pretrained vectors supporting ``word in model``,
            ``model[word]`` and ``model.vector_size``.
        fine_tune: when True, a new ``Word2Vec`` model is trained on the cleaned
            corpus, initialised from the pretrained vectors, and its vectors
            replace ``word2vec_model`` for lookups.

    Attributes:
        corpus: list of token lists, one per dataset item.
        word2vec_model: the vectors used for lookups (the ``.wv`` of the
            fine-tuned model when ``fine_tune`` is True).
    """

    # Translation table that deletes every character listed in string.punctuation.
    _PUNCTUATION_TABLE = str.maketrans('', '', punctuation)

    def __init__(self, dataset, word2vec_model, fine_tune=False):
        self.dataset = dataset
        self.fine_tune = fine_tune
        # Clean and tokenize each text exactly once; reused by __getitem__ and
        # by the optional fine-tuning step.
        self.corpus = [self.clean(self.dataset[idx]['text']).split() for idx in range(len(self.dataset))]
        self.word2vec_model = word2vec_model

        if self.fine_tune:
            self.word2vec_model = self._fine_tune(word2vec_model)

    def _fine_tune(self, pretrained):
        """Train a new Word2Vec model on ``self.corpus`` starting from ``pretrained`` vectors."""
        model = Word2Vec(vector_size=pretrained.vector_size, min_count=1)

        # A brand-new model has no vocabulary yet, so this must be a regular
        # build; `update=True` is only valid for extending an existing vocabulary.
        model.build_vocab(self.corpus)

        # Known words start from their pretrained vector, unknown words from
        # uniform noise. np.random.uniform yields float64, which would upcast the
        # whole stacked matrix, so cast back to the float32 gensim works with.
        model.wv.vectors = np.vstack(
            [pretrained[word] if word in pretrained else np.random.uniform(-1, 1, pretrained.vector_size)
             for word in model.wv.index_to_key]
        ).astype(np.float32)

        # Continue training on the new corpus.
        model.train(self.corpus, total_examples=len(self.corpus), epochs=10)

        # Expose the KeyedVectors, not the Word2Vec wrapper: membership tests and
        # `[...]` lookups used by this class are provided by the vectors object.
        return model.wv

    def clean(self, text):
        """Lower-case ``text``, drop punctuation, stop words and pure-digit tokens, and normalise each word.

        Relies on the module-level ``STOP_WORDS`` set and ``morph`` analyzer.
        Returns the remaining normal forms joined by single spaces.
        """
        text = text.lower().translate(self._PUNCTUATION_TABLE)
        words = [word for word in text.split() if word not in STOP_WORDS and not word.isdigit()]
        return ' '.join(morph.parse(word)[0].normal_form for word in words)

    def _embed_tokens(self, tokens):
        """Return a ``(num_known_tokens, vector_size)`` tensor for ``tokens``; unknown tokens are skipped."""
        vectors = [self.word2vec_model[token] for token in tokens if token in self.word2vec_model]
        if not vectors:
            # No known word at all: emit one zero vector so the sequence has
            # length 1. A zero-length sequence cannot be padded together with
            # 2-D sequences and is rejected by pack_padded_sequence.
            return torch.zeros((1, self.word2vec_model.vector_size), dtype=torch.float32)
        # Stack into a single ndarray first; building a tensor from a Python
        # list of ndarrays is very slow.
        return torch.from_numpy(np.stack(vectors))

    def embedded_text(self, text):
        """Embed an already cleaned, whitespace-separated ``text``."""
        return self._embed_tokens(text.split())

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(embeddings, label)`` for item ``idx`` using the tokens cached in ``self.corpus``."""
        return self._embed_tokens(self.corpus[idx]), self.dataset[idx]['label']

In [None]:
# Wrap the training split. `fine_tune` keeps its default (False), so the loaded
# word vectors are used for lookups as they are.
train_dataset = CustomDataset(dataset['train'], wor2vec_model)

Рисуем график эмбеддингов


In [None]:
# Render Bokeh plots inline in the notebook.
output_notebook()

# Collect the unique tokens of the training set. The dataset object already
# cleaned and tokenized every text when it was constructed
# (`train_dataset.corpus`), so reuse that instead of cleaning everything again.
all_words = set()
for tokens in train_dataset.corpus:
    all_words.update(tokens)

# Look up embeddings for the tokens the model knows. Iterating in sorted order
# keeps the row order stable across kernel restarts; plain `set` iteration order
# for strings changes from one Python process to the next.
word_embeddings = []
filtered_words = []  # tokens that are present in the model
for word in sorted(all_words):
    if word in train_dataset.word2vec_model:
        word_embeddings.append(train_dataset.word2vec_model[word])
        filtered_words.append(word)

# Project the embeddings to 2-D with UMAP.
embedding_2d = umap.UMAP(n_neighbors=10, min_dist=0.1, metric='cosine').fit_transform(word_embeddings)

# Bokeh scatter-plot helper
def draw_vectors(
    x,
    y,
    radius=10,
    alpha=0.5,
    color="blue",
    width=800,
    height=600,
    show=True,
    **kwargs,
):
    """Draw an interactive scatter plot; extra keyword columns are shown in the hover tooltip.

    ``x`` and ``y`` are the point coordinates. ``color`` is either one colour
    name applied to every point or a per-point sequence. Each keyword argument
    becomes a data-source column and a tooltip entry. The figure is displayed
    when ``show`` is true and is always returned.
    """
    point_colors = [color] * len(x) if isinstance(color, str) else color

    columns = {"x": x, "y": y, "color": point_colors}
    columns.update(kwargs)
    source = bm.ColumnDataSource(columns)

    figure = pl.figure(active_scroll="wheel_zoom", width=width, height=height)
    figure.scatter("x", "y", size=radius, color="color", alpha=alpha, source=source)

    hover_fields = [(name, "@" + name) for name in kwargs]
    figure.add_tools(bm.HoverTool(tooltips=hover_fields))

    if show:
        pl.show(figure)
    return figure

# Visualisation of the 2-D projection
draw_vectors(
    embedding_2d[:, 0], 
    embedding_2d[:, 1], 
    radius=10,
    alpha=0.6,
    color="blue",
    token=filtered_words  # pass the words so they show up in the hover tooltip
)

In [None]:
def collate_fn(batch):
    """Merge ``(sequence, label)`` samples into one padded batch.

    Returns a tuple ``(padded, labels, lengths)``: the sequences zero-padded to
    the longest one with the batch dimension first, the labels as a tensor, and
    the original length of every sequence.
    """
    sequences, targets = zip(*batch)
    seq_lengths = torch.tensor([len(sequence) for sequence in sequences])
    label_tensor = torch.tensor(targets)
    padded_batch = pad_sequence(sequences, batch_first=True, padding_value=0)
    return padded_batch, label_tensor, seq_lengths
    

In [None]:
# Batches of the training set, reshuffled on every pass and padded by collate_fn.
dataloader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_fn,
)

# Sanity check: inspect the first batch only.
for text, label, length in dataloader:
    for shown in (text, label, length, text.shape):
        print(shown)
    break
    

In [None]:
class BidirLSTM(nn.Module):
    """Bidirectional LSTM classifier over padded sequences of embeddings.

    Args:
        input_size: size of each input embedding vector.
        hidden_size: hidden state size per direction.
        num_layers: number of stacked LSTM layers.
        num_classes: number of output logits.

    The defaults reproduce the original fixed configuration (300 / 100 / 2 / 4).
    """

    def __init__(self, input_size=300, hidden_size=100, num_layers=2, num_classes=4):
        super().__init__()
        self.lstm = nn.LSTM(input_size=input_size,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            bias=True,
                            batch_first=True,
                            bidirectional=True)

        # The classifier sees the final forward and backward hidden states
        # concatenated, hence twice the per-direction hidden size.
        self.linear = nn.Linear(in_features=2 * hidden_size, out_features=num_classes)

    def forward(self, x, lengths):
        """Return logits of shape ``(batch, num_classes)``.

        Args:
            x: padded batch, batch dimension first (the LSTM is ``batch_first``).
            lengths: true length of every sequence in ``x`` (tensor or list of ints).
        """
        # pack_padded_sequence requires the lengths to live on the CPU, whatever
        # device the inputs are on.
        lengths = torch.as_tensor(lengths).cpu()
        packed_input = pack_padded_sequence(x, lengths, batch_first=True, enforce_sorted=False)
        _, (h_n, _) = self.lstm(packed_input)
        # h_n stacks (layer, direction): the last two entries are the top
        # layer's forward and backward final states.
        out = torch.cat((h_n[-2], h_n[-1]), dim=1)
        return self.linear(out)


model = BidirLSTM()
# The optimizer keeps references to the parameters of one specific model
# instance, so it is (re)created together with the model. Otherwise re-running
# this cell would leave `optimizer` updating the weights of the previous model
# while training reports losses for the new one. The learning rate is the value
# used in the configuration cell.
optimizer = optim.Adam(model.parameters(), lr=0.1)



In [None]:
from tqdm import tqdm

def train(model, criterion, dataloader, optimizer, epochs):
    """Train ``model`` for ``epochs`` passes over ``dataloader``.

    Args:
        model: module called as ``model(texts, lengths)`` and returning logits.
        criterion: loss called as ``criterion(outputs, labels)``.
        dataloader: iterable yielding ``(texts, labels, lengths)`` batches.
        optimizer: optimizer over the model's parameters.
        epochs: number of passes over ``dataloader``.

    Prints the mean per-batch loss after every epoch (``nan`` if the dataloader
    produced no batches). Returns nothing.
    """
    # Feed the batches on whatever device the model's parameters live on, as the
    # evaluation loop does for its inputs. `lengths` is left untouched.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    for epoch in range(epochs):
        print(f'Start Epoch {epoch + 1}/{epochs}')
        model.train()
        total_loss = 0.0
        num_batches = 0

        for texts, labels, lengths in tqdm(dataloader, desc=f'Epoch {epoch + 1}/{epochs}', leave=False):
            if target_device is not None:
                texts, labels = texts.to(target_device), labels.to(target_device)

            optimizer.zero_grad()
            outputs = model(texts, lengths)

            loss = criterion(outputs, labels)
            total_loss += loss.item()
            num_batches += 1

            loss.backward()
            optimizer.step()

        # Count the batches actually seen so an empty dataloader cannot cause a
        # division by zero.
        avg_loss = total_loss / num_batches if num_batches else float('nan')
        print(f'EPOCH {epoch + 1} / {epochs}, Loss: {avg_loss:.4f}')

In [None]:
# Train for NUM_EPOCHS passes over the training dataloader.
train(model, criterion, dataloader, optimizer, NUM_EPOCHS)

In [None]:

# Wrap the test split the same way as the training split; the loader keeps the
# original order (shuffle=False) and pads batches with collate_fn.
test_dataset = CustomDataset(dataset['test'], wor2vec_model)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)


def evaluate(model, dataloader):
    """Print the accuracy of ``model`` over all batches of ``dataloader``.

    The model is switched to eval mode and run without gradient tracking. Each
    batch is moved to the module-level ``device``; the predicted class is the
    index of the largest logit.
    """
    model.eval()
    targets, predictions = [], []

    with torch.no_grad():
        for texts, labels, lengths in tqdm(dataloader):
            texts = texts.to(device)
            labels = labels.to(device)

            logits = model(texts, lengths)
            predicted = logits.argmax(dim=1)

            targets.extend(labels.cpu().numpy())
            predictions.extend(predicted.cpu().numpy())

    accuracy = accuracy_score(targets, predictions)
    print(f'Test Accuracy: {accuracy:.4f}')


# Report accuracy on the test split.
evaluate(model, test_dataloader)