In [1]:
import transformers
from transformers import BertModel, AutoTokenizer, BertTokenizer, PreTrainedTokenizerFast, AdamW, get_linear_schedule_with_warmup, AutoModelForSequenceClassification
import torch.nn.functional as F

import torch
import numpy as np
import pandas as pd
import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from collections import defaultdict
from textwrap import wrap
from torch import nn, optim
from torch.nn.utils import clip_grad_norm_
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split


# Seed the NumPy and PyTorch random number generators with one shared constant.
RANDOM_SEED = 1
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [3]:
df = pd.read_csv("../input/app-reviews/reviews.csv")

In [4]:
def to_sentiment(rating):
    """Map a star rating to a sentiment class id.

    The rating is converted with ``int()`` first. Ratings above 3 give 2,
    a rating of exactly 3 gives 1, and everything else gives 0.
    """
    stars = int(rating)
    if stars > 3:
        return 2
    if stars == 3:
        return 1
    return 0

# Derive the integer sentiment label from the `score` column.
df['sentiment'] = df.score.apply(to_sentiment)
# Display names indexed by label id: 0 -> negative, 1 -> neutral, 2 -> positive.
class_names = ['negative', 'neutral', 'positive']

In [5]:
df

In [6]:
# Class balance of the derived sentiment labels.
# The data vector is passed as `x=`: recent seaborn releases no longer accept
# it as the first positional argument.
ax = sns.countplot(x=df.sentiment)
ax.set_xlabel('review sentiment')
# NOTE(review): assumes the bars are drawn in label-id order (0, 1, 2), which is
# the order of class_names — confirm on the rendered plot.
ax.set_xticklabels(class_names);

In [7]:
# Hold out 10% of the reviews, then split that hold-out in half:
# roughly 90% train / 5% validation / 5% test.
# NOTE(review): the split is not stratified by sentiment — confirm that is intended.
df_train, df_test = train_test_split(df, test_size=0.1, random_state=RANDOM_SEED)
df_val, df_test = train_test_split(df_test, test_size=0.5, random_state=RANDOM_SEED)

Я пробовал и другие модели, но при большом количестве экспериментов в одной тетрадке сильно засоряется видеопамять.

In [8]:
from transformers import AutoTokenizer, AutoModel
# Hugging Face checkpoint used for the tokenizer and for every model built below.
model_name = 'j-hartmann/emotion-english-distilroberta-base'
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [9]:
# Integer sentiment labels of each split as plain Python lists.
train_labels, val_labels, test_labels = list(df_train.sentiment), list(df_val.sentiment), list(df_test.sentiment)

In [10]:
# Review texts (the `content` column) of each split as plain Python lists.
train_texts, val_texts, test_texts = list(df_train.content), list(df_val.content), list(df_test.content)

In [11]:
# Tokenize every split up front with identical settings: truncate to at most
# 512 tokens, pad, return attention masks and skip token_type_ids.
# NOTE(review): with padding=True each split is presumably padded to its own
# longest sequence — confirm against the tokenizer documentation.
train_encodings = tokenizer(train_texts, truncation=True, padding=True, return_token_type_ids=False, max_length=512, return_attention_mask = True)
val_encodings = tokenizer(val_texts, truncation=True, padding=True, return_token_type_ids=False, max_length=512, return_attention_mask = True)
test_encodings = tokenizer(test_texts, truncation=True, padding=True, return_token_type_ids=False, max_length=512, return_attention_mask = True)

In [12]:
class GPReviewDataset(torch.utils.data.Dataset):
    """Pairs pre-tokenized encodings with their labels.

    ``encodings`` is a mapping from field name to a per-example sequence;
    ``labels`` is an indexable collection with one entry per example.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # The number of examples is defined by the labels.
        return len(self.labels)

    def __getitem__(self, idx):
        # One tensor per encoding field, plus the label under the 'labels' key.
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

# Wrap each tokenized split together with its labels.
train_dataset = GPReviewDataset(train_encodings, train_labels)
val_dataset = GPReviewDataset(val_encodings, val_labels)
test_dataset = GPReviewDataset(test_encodings, test_labels)

In [13]:
# Fetch the first test example (a dict of tensors), presumably for inspection;
# the variable is not used anywhere else in the visible notebook.
batch_ex = next(iter(test_dataset))

### Вроде тут все стандартно, кроме того, что функция потерь считается внутри форварда для трейнера. Взято из примера huggingface на гитхабе

In [None]:
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

class SentimentClassifier(nn.Module):
    """Pretrained encoder followed by dropout and one linear classification layer.

    The loss is computed inside ``forward`` so that the module can be handed
    directly to the Hugging Face ``Trainer``.
    """

    def __init__(self, n_classes):
        """Load the encoder named by ``model_name`` and build an ``n_classes``-way head."""
        super().__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        self.drop = nn.Dropout(p=0.3)
        self.out = nn.Linear(self.bert.config.hidden_size, n_classes)
        self.n_classes = n_classes

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        labels=None
    ):
        """Return ``(logits,)``, or ``(loss, logits)`` when ``labels`` is given."""
        # With return_dict=False the encoder returns a tuple, unpacked here as
        # (per-token states, pooled output); only the pooled output is used.
        _, pooled_output = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            return_dict=False
        )
        logits = self.out(self.drop(pooled_output))

        if labels is None:
            return (logits,)

        # Move the labels to wherever the logits live rather than to the
        # notebook-level `device` global, so the module keeps working when it
        # is placed on another device (CPU, a different GPU, a replica).
        loss = CrossEntropyLoss()(
            logits.view(-1, self.n_classes),
            labels.view(-1).to(logits.device)
        )
        return (loss, logits)

In [None]:
# Build the classifier with one output per sentiment class and move it to the selected device.
model = SentimentClassifier(len(class_names))
model = model.to(device)

### Поставил weighted, т.к. у нас три класса

In [14]:
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

def compute_metrics(pred):
    """Compute accuracy and weighted precision / recall / F1 for a prediction object.

    ``pred.predictions`` is arg-maxed over its last axis to obtain the predicted
    class ids, which are compared against ``pred.label_ids``.

    Returns:
        dict with the keys 'accuracy', 'f1', 'precision' and 'recall'.
    """
    y_true = pred.label_ids
    y_pred = pred.predictions.argmax(-1)
    # Result tuple is (precision, recall, f1, support); support is not reported.
    weighted = precision_recall_fscore_support(y_true, y_pred, average='weighted')
    return dict(
        accuracy=accuracy_score(y_true, y_pred),
        f1=weighted[2],
        precision=weighted[0],
        recall=weighted[1],
    )

### Я тестировал на маленьком BERT, а потом взял модель побольше, предназначенную для анализа тональности, так что трёх эпох для модели такого размера должно хватить. `report_to` задан для запуска в Kaggle

In [15]:
from transformers import Trainer, TrainingArguments

# Training configuration reused by the Trainer cells that follow, until
# `training_args` is redefined further down with a smaller batch size.
training_args = TrainingArguments(
    output_dir='./results',          # output directory
    num_train_epochs=3,              # total number of training epochs
    per_device_train_batch_size=16,  # batch size per device during training
    per_device_eval_batch_size=16,   # batch size for evaluation
    warmup_steps=500,                # number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # strength of weight decay
    logging_dir='./logs',            # directory for storing logs
    logging_steps=10,                # log every 10 steps
    report_to='tensorboard'          # send logs to TensorBoard only
)

In [None]:
# Experiment 1: fine-tune SentimentClassifier (pooled output -> dropout -> linear).
trainer = Trainer(
    model=model,                         # the instantiated 🤗 Transformers model to be trained
    args=training_args,                  # training arguments, defined above
    train_dataset=train_dataset,         # training dataset
    eval_dataset=val_dataset,            # evaluation dataset
    compute_metrics = compute_metrics
    # metrics to evaluate
)

trainer.train()

In [None]:
trainer.evaluate()

In [None]:
trainer.evaluate(eval_dataset=test_dataset, metric_key_prefix="test")

In [None]:
import gc

# The Trainer keeps its own reference to the model (and its optimizer state),
# so deleting only `model` leaves the weights on the GPU. Drop both names,
# collect garbage, and only then ask PyTorch to release its cached blocks.
del model, trainer
gc.collect()
torch.cuda.empty_cache()

# Используем cls без линейного слоя

In [None]:
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

class SentimentClassifierWithCLS(nn.Module):
    """Classifier fed with the raw first-token state concatenated with the pooled output.

    Dropout is applied to the pooled output only; the first-token state is used
    as is. The linear head therefore takes ``2 * hidden_size`` features.
    """

    def __init__(self, n_classes):
        """Load the encoder named by ``model_name`` and build an ``n_classes``-way head."""
        super().__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        self.drop = nn.Dropout(p=0.3)
        self.out = nn.Linear(self.bert.config.hidden_size*2, n_classes)
        self.n_classes = n_classes

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        labels=None
    ):
        """Return ``(logits,)``, or ``(loss, logits)`` when ``labels`` is given."""
        last_hidden_state, pooled_output = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            return_dict=False
        )
        # State of the first token of every sequence.
        cls_token = last_hidden_state[:, 0, :]
        features = torch.cat((cls_token, self.drop(pooled_output)), 1)
        logits = self.out(features)

        if labels is None:
            return (logits,)

        # Follow the logits' device instead of the notebook-level `device`
        # global, so the module works wherever it has been placed.
        loss = CrossEntropyLoss()(
            logits.view(-1, self.n_classes),
            labels.view(-1).to(logits.device)
        )
        return (loss, logits)

In [None]:
# Three output classes, one per entry of class_names.
model2 = SentimentClassifierWithCLS(3)

In [None]:
# Experiment 2: fine-tune SentimentClassifierWithCLS with the same training arguments.
trainer = Trainer(
    model=model2,                         # the instantiated 🤗 Transformers model to be trained
    args=training_args,                  # training arguments, defined above
    train_dataset=train_dataset,         # training dataset
    eval_dataset=val_dataset,            # evaluation dataset
    compute_metrics = compute_metrics
    # metrics to evaluate
)

trainer.train()

In [None]:
trainer.evaluate()

In [None]:
import gc

# Evaluate on the held-out test split. The result is printed explicitly because
# it is not the last expression of the cell and would otherwise not be shown.
test_metrics = trainer.evaluate(eval_dataset=test_dataset, metric_key_prefix="test")
print(test_metrics)

# The Trainer keeps its own reference to the model, so both names must go
# before the GPU memory can actually be released.
del model2, trainer
gc.collect()
torch.cuda.empty_cache()

# Готовая голова

In [None]:
# Use the library's ready-made sequence-classification model with a 3-way output.
# NOTE(review): ignore_mismatched_sizes=True presumably lets the classification head be
# re-initialised when the checkpoint's head has a different number of labels — confirm.
model3 = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=3, ignore_mismatched_sizes=True)

In [None]:
# Experiment 3: fine-tune the ready-made sequence-classification model.
trainer = Trainer(
    model=model3,                         # the instantiated 🤗 Transformers model to be trained
    args=training_args,                  # training arguments, defined above
    train_dataset=train_dataset,         # training dataset
    eval_dataset=val_dataset,            # evaluation dataset
    compute_metrics = compute_metrics
    # metrics to evaluate
)

trainer.train()

In [None]:
trainer.evaluate()

In [None]:
import gc

# Evaluate on the held-out test split. The result is printed explicitly because
# it is not the last expression of the cell and would otherwise not be shown.
test_metrics = trainer.evaluate(eval_dataset=test_dataset, metric_key_prefix="test")
print(test_metrics)

# The Trainer keeps its own reference to the model, so both names must go
# before the GPU memory can actually be released.
del model3, trainer
gc.collect()
torch.cuda.empty_cache()

# CLS на нескольких слоях

### Я конкатенирую CLS-векторы всех слоёв и передам результат в линейный слой (его входной размер равен длине получившегося вектора)

In [16]:
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

class SentimentClassifierWithMultipleCLS(nn.Module):
    """Classifier over the concatenated first-token states of several encoder layers.

    Every entry of the encoder's ``hidden_states`` except the first contributes
    its first-token vector; the linear head is sized for
    ``hidden_size * num_hidden_layers`` input features.
    """

    def __init__(self, n_classes):
        """Load the encoder named by ``model_name`` and build an ``n_classes``-way head."""
        super().__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        # NOTE(review): this dropout layer is created but never applied in
        # forward(); kept so the module's attributes stay unchanged.
        self.drop = nn.Dropout(p=0.3)
        self.out = nn.Linear(self.bert.config.hidden_size*self.bert.config.num_hidden_layers, n_classes)
        self.n_classes = n_classes

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        labels=None
    ):
        """Return ``(logits,)``, or ``(loss, logits)`` when ``labels`` is given."""
        _, _, hidden_states = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            return_dict=False,
            output_hidden_states=True
        )

        # One concatenation instead of a growing torch.cat inside a loop. The
        # states are taken last-to-first (skipping entry 0), which is the same
        # feature order the previous prepend-in-a-loop version produced.
        features = torch.cat(
            [state[:, 0, :] for state in reversed(hidden_states[1:])], 1
        )
        logits = self.out(features)

        if labels is None:
            return (logits,)

        # Follow the logits' device instead of the notebook-level `device`
        # global, so the module works wherever it has been placed.
        loss = CrossEntropyLoss()(
            logits.view(-1, self.n_classes),
            labels.view(-1).to(logits.device)
        )
        return (loss, logits)

In [17]:
# Three output classes, one per entry of class_names.
model4 = SentimentClassifierWithMultipleCLS(3)

In [18]:
from transformers import Trainer, TrainingArguments

# Redefines `training_args` with the per-device batch size lowered from 16 to 8;
# every other value matches the earlier configuration.
# NOTE(review): presumably lowered to fit the multi-layer models in GPU memory — confirm.
training_args = TrainingArguments(
    output_dir='./results',          # output directory
    num_train_epochs=3,              # total number of training epochs
    per_device_train_batch_size=8,  # batch size per device during training
    per_device_eval_batch_size=8,   # batch size for evaluation
    warmup_steps=500,                # number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # strength of weight decay
    logging_dir='./logs',            # directory for storing logs
    logging_steps=10,                # log every 10 steps
    report_to='tensorboard'          # send logs to TensorBoard only
)

In [20]:
# Experiment 4: fine-tune the classifier built on concatenated per-layer first-token states.
trainer = Trainer(
    model=model4,                         # the instantiated 🤗 Transformers model to be trained
    args=training_args,                  # training arguments, defined above
    train_dataset=train_dataset,         # training dataset
    eval_dataset=val_dataset,            # evaluation dataset
    compute_metrics = compute_metrics
    # metrics to evaluate
)

trainer.train()

In [None]:
trainer.evaluate()

In [None]:
# Evaluate on the held-out test split. The result is printed explicitly because
# it is not the last expression of the cell and would otherwise not be shown.
test_metrics = trainer.evaluate(eval_dataset=test_dataset, metric_key_prefix="test")
print(test_metrics)

# model4 is deliberately NOT deleted here: the inference cells at the end of
# the notebook call get_sent(..., model4), so deleting it would make a
# top-to-bottom run fail with a NameError.
torch.cuda.empty_cache()

### Выше я конкатенировал векторы, теперь попробую взять среднее по всем

In [None]:
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

class SentimentClassifierWithMultipleCLS2(nn.Module):
    """Classifier over the mean of the first-token states of all returned hidden states.

    Every entry of the encoder's ``hidden_states`` tuple, including the first
    one, takes part in the average.
    """

    def __init__(self, n_classes):
        """Load the encoder named by ``model_name`` and build an ``n_classes``-way head."""
        super().__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        # NOTE(review): this dropout layer is created but never applied in
        # forward(); kept so the module's attributes stay unchanged.
        self.drop = nn.Dropout(p=0.3)
        self.out = nn.Linear(self.bert.config.hidden_size, n_classes)
        self.n_classes = n_classes

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        labels=None
    ):
        """Return ``(logits,)``, or ``(loss, logits)`` when ``labels`` is given."""
        _, _, hidden_states = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            return_dict=False,
            output_hidden_states=True
        )

        # Slice out the first token before stacking, so the full per-token
        # states of every layer are not copied only to be discarded.
        cls_embs = torch.stack([state[:, 0] for state in hidden_states]).mean(0)
        logits = self.out(cls_embs)

        if labels is None:
            return (logits,)

        # Follow the logits' device instead of the notebook-level `device`
        # global, so the module works wherever it has been placed.
        loss = CrossEntropyLoss()(
            logits.view(-1, self.n_classes),
            labels.view(-1).to(logits.device)
        )
        return (loss, logits)

In [None]:
# Three output classes, one per entry of class_names.
model5 = SentimentClassifierWithMultipleCLS2(3)

In [None]:
# Experiment 5: fine-tune the classifier built on the averaged first-token states.
trainer = Trainer(
    model=model5,                         # the instantiated 🤗 Transformers model to be trained
    args=training_args,                  # training arguments, defined above
    train_dataset=train_dataset,         # training dataset
    eval_dataset=val_dataset,            # evaluation dataset
    compute_metrics = compute_metrics
    # metrics to evaluate
)

trainer.train()

In [None]:
trainer.evaluate()

In [None]:
import gc

# Evaluate on the held-out test split. The result is printed explicitly because
# it is not the last expression of the cell and would otherwise not be shown.
test_metrics = trainer.evaluate(eval_dataset=test_dataset, metric_key_prefix="test")
print(test_metrics)

# The Trainer keeps its own reference to the model, so both names must go
# before the GPU memory can actually be released.
del model5, trainer
gc.collect()
torch.cuda.empty_cache()

# Тест на отзывах

In [21]:
twostar = """Why react isn't registering fast, I liked something and then scroll down, After sometime I saw the post and my reaction wasn't there. Not one time, this happens everytime. Don't blame the internet, it's pretty fast here.Fix the problem , otherwise I'm gonna decrease my rating even more."""

In [22]:
fivestar = """Hi I really love this app cause it's light on storage, less data consumption but you said it work in any network condition ahh. No it's not working on any network condition even in 4G network still slow. And this issue I encounter is so annoying . When I'm watching videos and when I'm at the best moment or best scene of the video it kicks me out of the app and automatically back to start where I open the app and I can't find the video so please fix this issue 😊"""

In [23]:
threestar = """Hi, good evening. I have a challenge. I've been trying to update my profile, and upload my stories but it's not working. And whenever i tried to do so, it automatically exits. Please help me out. Thanks, though your app is amazing, but i think there should be more features like . Been able to change my color, just the way I can switch to night mode."""

In [24]:
from torch.nn.functional import softmax

def get_sent(sentence, model):
    """Return softmax class probabilities for one piece of text.

    Args:
        sentence: the text to classify.
        model: a classifier that accepts ``input_ids`` and ``attention_mask``
            and whose first output is the logits tensor.

    Returns:
        The softmax of the logits taken over dimension 1.

    The model is switched to evaluation mode for the call (so dropout is
    disabled) and restored to its previous mode afterwards; no gradients are
    recorded.
    """
    # Follow the model's own device rather than the notebook-level `device`
    # global, so the call also works after the model has been moved.
    model_device = next(model.parameters()).device

    # A single sequence needs no padding, and token_type_ids were never passed
    # to the model, so neither is requested.
    encoding = tokenizer(
        sentence,
        max_length=512,
        truncation=True,
        add_special_tokens=True,
        return_attention_mask=True,
        return_tensors='pt',
    ).to(model_device)

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            logits = model(
                input_ids=encoding['input_ids'],
                attention_mask=encoding['attention_mask'],
            )[0]
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    return softmax(logits, dim=1)

In [25]:
get_sent(twostar, model4)

In [26]:
get_sent(threestar, model4)

In [27]:
get_sent(fivestar, model4)

Результаты, конечно, не очень: слишком сильный перекос в средний класс. Посмотрю ещё пример

In [28]:
fivestar_2 = """Great experience when one uses the face life app. It's the best of the season. We encourage everyone to share it with friends and family as well as with the different social media groups. Its good for business, social activities, relationships, friends search, research, marketing, learning new ideas. This is a global market for the season. It's free for people with open minded ideas about life ideas. It's an opportunity for jobs and creativity. It's a meeting place for tourists and visiting"""

In [38]:
get_sent(fivestar_2, model4)

А вот это уже лучше! Проверю на простых примерах

In [41]:
get_sent('this app is so not good but not bad', model4)

In [42]:
get_sent('this app is so not good', model4)

In [44]:
get_sent('this app is so good', model4)

Все же модель научилась чему-то, это не случайность)