In [1]:
!pip install transformers huggingface_hub sentencepiece



In [2]:
from huggingface_hub import hf_hub_download
import pandas as pd
from tqdm import tqdm
import numpy as np
import torch.optim as optim

REPO_ID = "MonoHime/ru_sentiment_dataset"
FILENAME = "datasets.csv"

dataset = pd.read_csv(
    hf_hub_download(repo_id=REPO_ID, filename=FILENAME, repo_type="dataset"), index_col=0
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [3]:
file_ = 'texts.txt'
with open(file_, 'a') as f:
  for text in dataset.text:
    f.write(text)

In [4]:
from sklearn.model_selection import train_test_split
train, test = train_test_split(dataset, test_size=0.2, random_state=42)

In [5]:
from sentencepiece import SentencePieceTrainer, SentencePieceProcessor

In [6]:
import os
import torch
from typing import Union, List, Tuple
from sentencepiece import SentencePieceTrainer, SentencePieceProcessor
from torch.utils.data import Dataset, DataLoader
import random
from sklearn.model_selection import train_test_split


class TextDataset(Dataset):
    TRAIN_VAL_RANDOM_SEED = 42
    VAL_RATIO = 0.05

    def __init__(self, data_file: str, dataset, train: bool = True, sp_model_prefix: str = None,
                 vocab_size: int = 5000, normalization_rule_name: str = 'nmt_nfkc_cf',
                 model_type: str = 'bpe', max_length: int = 256):

        if not os.path.isfile(sp_model_prefix + '.model'):
            SentencePieceTrainer.train(
                input=data_file, vocab_size=vocab_size,
                model_type=model_type, model_prefix=sp_model_prefix,
                normalization_rule_name=normalization_rule_name,
                unk_id=0, bos_id=1, eos_id=2, pad_id=3
            )
        self.sp_model = SentencePieceProcessor(model_file=sp_model_prefix + '.model')

        with open(data_file) as file:
            texts = file.readlines()

        random.seed(self.TRAIN_VAL_RANDOM_SEED)
        random.shuffle(texts)
        df_train, df_val = train_test_split(dataset, test_size=self.VAL_RATIO, random_state=self.TRAIN_VAL_RANDOM_SEED)

        self.df = df_train if train else df_val
        self.indices = self.sp_model.encode(self.df.text.tolist())

        self.pad_id, self.unk_id, self.bos_id, self.eos_id = \
            self.sp_model.pad_id(), self.sp_model.unk_id(), \
            self.sp_model.bos_id(), self.sp_model.eos_id()
        self.max_length = max_length
        self.vocab_size = self.sp_model.vocab_size()

    def text2ids(self, texts: Union[str, List[str]]) -> Union[List[int], List[List[int]]]:
        return self.sp_model.encode(texts)

    def ids2text(self, ids: Union[torch.Tensor, List[int], List[List[int]]]) -> Union[str, List[str]]:

        return self.sp_model.decode(ids)

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, item: int) -> Tuple[torch.Tensor, int]:
        indices = torch.tensor([self.bos_id, *self.indices[item], self.eos_id])
        if len(indices) < self.max_length:
            padding = torch.full((self.max_length - len(indices),), self.pad_id)
            indices = torch.cat((indices, padding))
        else:
            indices = indices[:self.max_length-1]
            indices = torch.cat((indices, torch.tensor([self.eos_id])))
        return indices, self.df.sentiment.iloc[item]


In [7]:
VOCAB_SIZE = 5000
MAX_LENGTH = 256

train_set = TextDataset(data_file='texts.txt', dataset=dataset, vocab_size=VOCAB_SIZE, train=True, sp_model_prefix='bpe', max_length=MAX_LENGTH)
valid_set = TextDataset(data_file='texts.txt', dataset=dataset, vocab_size=VOCAB_SIZE, train=False, sp_model_prefix='bpe', max_length=MAX_LENGTH)

In [8]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [9]:
from torch import nn

class TextClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(TextClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(hidden_dim, 3)

    def forward(self, x):
        embedded = self.embedding(x)
        output, _ = self.rnn(embedded)
        output = self.dropout(output)
        last_hidden = output[:, -1, :]
        logits = self.fc(last_hidden)
        return logits

In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [11]:
def train(model, criterion, optimizer, train_loader, val_loader, epoch, scheduler=None):
  for epoch in range(num_epochs):
      model.train()
      total_loss = 0.0
      total_samples = 0
      train_acc = []


      for inputs, targets in tqdm(train_loader):
          inputs = inputs.to(device)
          targets = targets.to(device)

          optimizer.zero_grad()
          outputs = model(inputs)
          loss = criterion(outputs.view(-1, 3), targets.view(-1))
          loss.backward()
          optimizer.step()

          total_loss += loss.item() * len(inputs)
          acc = (targets.view(-1) == outputs.view(-1, 3).argmax(-1)).sum() / len(inputs)
          train_acc.append(acc.cpu())
          total_samples += len(inputs)
          if scheduler is not None:
            scheduler.step()

      model.eval()
      total_val_loss = 0.0
      total_val_samples = 0
      val_acc = []
      with torch.no_grad():
          for inputs, targets in valid_loader:
              inputs = inputs.to(device)
              targets = targets.to(device)
              outputs = model(inputs)
              val_loss = criterion(outputs.view(-1, 3), targets.view(-1))

              total_val_loss += val_loss.item() * len(inputs)
              acc = (targets.view(-1) == outputs.view(-1, 3).argmax(-1)).sum() / len(inputs)
              val_acc.append(acc.cpu())
              total_val_samples += len(inputs)

      avg_loss = total_loss / total_samples
      train_acc = np.mean(train_acc)
      val_acc = np.mean(val_acc)
      avg_val_loss = total_val_loss / total_val_samples

      print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")
  return model

In [12]:
vocab_size = train_set.vocab_size
embedding_dim = 256
hidden_dim = 128
batch_size = 32
num_epochs = 5
learning_rate = 0.001

model = TextClassifier(vocab_size, embedding_dim, hidden_dim).to(device)

criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=4, eta_min=0)

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, pin_memory=True)
valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=False, pin_memory=True)

model = train(model, criterion, optimizer, train_loader, valid_loader, num_epochs, scheduler)

100%|██████████| 6264/6264 [00:52<00:00, 118.25it/s]


Epoch 1/5, Train Loss: 1.0009, Val Loss: 0.7802, Train Acc: 0.4915, Val Acc: 0.6109


100%|██████████| 6264/6264 [00:52<00:00, 118.46it/s]


Epoch 2/5, Train Loss: 0.6644, Val Loss: 0.5819, Train Acc: 0.6783, Val Acc: 0.7200


100%|██████████| 6264/6264 [00:54<00:00, 114.18it/s]


Epoch 3/5, Train Loss: 0.5516, Val Loss: 0.5295, Train Acc: 0.7432, Val Acc: 0.7581


100%|██████████| 6264/6264 [00:53<00:00, 117.59it/s]


Epoch 4/5, Train Loss: 0.4995, Val Loss: 0.5144, Train Acc: 0.7715, Val Acc: 0.7656


100%|██████████| 6264/6264 [00:53<00:00, 116.22it/s]


Epoch 5/5, Train Loss: 0.4602, Val Loss: 0.5221, Train Acc: 0.7936, Val Acc: 0.7679


In [13]:
torch.save(model.state_dict(), 'model.pth')

In [14]:
from transformers import AutoTokenizer, AutoModelForTokenClassification
from transformers import pipeline

tokenizer = AutoTokenizer.from_pretrained("yqelz/xml-roberta-large-ner-russian")
ner_model = AutoModelForTokenClassification.from_pretrained("yqelz/xml-roberta-large-ner-russian")

In [15]:
ner_pipeline = pipeline("ner", model=ner_model, tokenizer=tokenizer)
sp_model = SentencePieceProcessor(model_file='bpe.model')
model = TextClassifier(vocab_size, embedding_dim, hidden_dim)
model.load_state_dict(torch.load('model.pth'), strict=False)

In [20]:
def split_by_title(text, ner_pipeline):
  sentences = text.split('.')
  result = []
  temp_text = ''
  for i in range(len(sentences)):
    text = sentences.pop(0)
    if ner_pipeline(text):
      if not result:
        result.append(temp_text)
      else:
        result[-1] = result[-1] + temp_text
      temp_text = text
    else:
      temp_text = '.'.join([temp_text, text])
  else:
    result.append(temp_text)
  return result


def get_sentiments(classification_model, classification_tokenizer, ner_pipeline, text, device):
  mapping = {
      0: 'neutral',
      1: 'positive',
      2: 'negative'
  }
  result = []
  classification_model = classification_model.to(device)
  texts = split_by_title(text, ner_pipeline)
  model.eval()
  for text in texts:
    tokens = torch.tensor(classification_tokenizer.encode(text)).unsqueeze(0).to(device)
    logits = model(tokens).cpu()
    ner_res = ner_pipeline(text)
    if ner_res:
      movie_title = ''
      end = 0
      for entity in ner_res:
        if entity.get('entity') == 'B-ORG':
          movie_title += entity.get('word', '').replace('▁', ' ')
          end = entity.get('end')
        if (entity.get('entity') == 'I-ORG'):
          movie_title += entity.get('word', '').replace('▁', ' ')
          end = entity.get('end')
    result.append({
          'title': movie_title.lstrip(),
          'setiment': mapping[logits.argmax(-1).numpy()[0]]
          })
  return result

In [21]:
text = 'Во все тяжкие это лучший сериал из всех что я смотрел. А вот Игра престолов оказалась абсолютным бредом'
result = get_sentiments(model, sp_model, ner_pipeline, text, device)
result

[{'title': 'Во все тяжкие', 'setiment': 'positive'},
 {'title': 'Игра престолов', 'setiment': 'negative'}]