# Установка библиотек и загрузка данных

In [None]:
# Install Hugging Face Transformers, which provides the BertModel and BertTokenizer imported below.
!pip install transformers

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import torch
import numpy as np
from torch import nn
from transformers import BertModel, BertTokenizer
from torch.optim import Adam
from torch.utils.data import DataLoader
from tqdm import tqdm
from sklearn.metrics import classification_report

In [None]:
# Load the labelled training set and the test set from tab-separated files.
# NOTE: the name `train` is rebound further down by `def train(...)`, so this
# DataFrame has to be consumed (plot, MAX_LENGTH, split) before that cell runs.
train = pd.read_csv('train.tsv', sep='\t')
test = pd.read_csv('test.tsv', sep='\t')

Посмотрим на распределение классов (в нашем случае классы распределены равномерно)

In [None]:
# Bar chart of how many training rows fall into each value of `is_fake`.
sns.countplot(x='is_fake', data=train)
plt.show()

# Мои мысли по поводу решения

В самом начале я решил попробовать лёгкие модели: Logistic Regression, MultinomialNB, Multinomial Classifier, Passive Aggressive Classifier. Также попробовал базовые решения: CatBoost, XGBoost. Лучший результат показали MultinomialNB и Logistic Regression: примерно 0.85. Это неплохой результат, и можно было бы пошаманить: подобрать хорошие гиперпараметры, регуляризацию и прочее. Но это достаточно просто и немного скучно. Следующим логичным шагом была нейросеть, но в поисках решений похожих задач я нашёл статью [Detecting Inappropriate Messages on Sensitive Topics that Could Harm a Company’s Reputation](https://aclanthology.org/2021.bsnlp-1.4.pdf). Поэтому я решил сделать ставку на RuBERT + Russian Sensitive Topics.

# RuBERT

In [None]:
# Length, in characters, of the longest title in the training data.
# Dataset passes MAX_LENGTH + 3 to the tokenizer as its padding/truncation length.
MAX_LENGTH = train.title.apply(len).max()

Загрузим токенизатор модели Russian Sensitive Topics

In [None]:
# Hugging Face model identifier; the same name is used for the tokenizer here
# and for the BertModel weights loaded inside BertClassifier.
MODEL_NAME = 'Skoltech/russian-sensitive-topics'
tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)

Создадим класс датасета (немного ООП)

In [None]:
class Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenized titles with their `is_fake` labels.

    Every title is tokenized once, at construction time, with the module-level
    `tokenizer`, padded/truncated to `MAX_LENGTH + 3`.
    """

    def __init__(self, df):
        """Read labels from `df['is_fake']` and tokenize each entry of `df['title']`."""
        self.labels = list(df['is_fake'])
        self.texts = []
        for title in df['title']:
            encoded = tokenizer(
                title,
                padding='max_length',
                max_length=MAX_LENGTH + 3,
                truncation=True,
                return_tensors="pt",
            )
            self.texts.append(encoded)

    def classes(self):
        """Return the stored list of labels."""
        return self.labels

    def __len__(self):
        """Return the number of samples (one per label)."""
        return len(self.labels)

    def get_batch_labels(self, idx):
        """Return the label(s) selected by `idx` wrapped in a NumPy array."""
        selected = self.labels[idx]
        return np.array(selected)

    def get_batch_texts(self, idx):
        """Return the tokenizer output stored at `idx`."""
        return self.texts[idx]

    def __getitem__(self, idx):
        """Return the `(tokenized_text, label)` pair for position `idx`."""
        return self.get_batch_texts(idx), self.get_batch_labels(idx)

Разделим наши данные

In [None]:
# Shuffle once with a fixed seed, then cut the rows into 80% train / 10% validation /
# 10% hold-out. Explicit positional slicing yields the same three frames as splitting
# at these two indices, without passing a DataFrame through NumPy's array splitter.
_shuffled = train.sample(frac=1, random_state=42)
_train_end, _val_end = int(.8 * len(train)), int(.9 * len(train))
df_train = _shuffled.iloc[:_train_end]
df_val = _shuffled.iloc[_train_end:_val_end]
df_test = _shuffled.iloc[_val_end:]

Так как у нас бинарная классификация, в качестве функции активации выходного слоя я выбираю сигмоиду

In [None]:
class BertClassifier(nn.Module):
    """Binary classifier: pretrained BERT encoder, dropout, linear layer, sigmoid.

    The encoder weights are loaded from the module-level `MODEL_NAME`.

    Args:
        dropout: Dropout probability applied to the encoder's pooled output.
    """

    def __init__(self, dropout=0.2):
        super().__init__()
        self.bert = BertModel.from_pretrained(MODEL_NAME)
        self.dropout = nn.Dropout(dropout)
        # Size the head from the loaded encoder's configuration instead of assuming
        # a hidden width of 768, so a different checkpoint does not need a code change.
        self.linear = nn.Linear(self.bert.config.hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, input_id, mask):
        """Return one sigmoid output per input row.

        Args:
            input_id: Token ids, forwarded to the encoder as `input_ids`.
            mask: Attention mask, forwarded to the encoder as `attention_mask`.

        Returns:
            Tensor produced by `Linear(hidden, 1)` followed by a sigmoid, i.e. values
            in (0, 1) with a trailing dimension of size 1.
        """
        # With return_dict=False the encoder returns a tuple; only its second
        # element (the pooled output) is used here.
        _, pooled_output = self.bert(input_ids=input_id, attention_mask=mask, return_dict=False)
        dropout_output = self.dropout(pooled_output)
        linear_output = self.linear(dropout_output)
        return self.sigmoid(linear_output)

In [None]:
def _run_batch(model, criterion, batch_input, batch_label, device):
    """Run one forward pass; return the batch loss tensor and the count of correct predictions."""
    labels = batch_label.to(device).unsqueeze(1).float()
    # Dataset tokenizes each title on its own with return_tensors="pt", so the collated
    # tensors appear to carry a singleton axis at position 1; drop it from both inputs
    # so the encoder receives (batch, sequence) tensors.
    mask = batch_input['attention_mask'].squeeze(1).to(device)
    input_id = batch_input['input_ids'].squeeze(1).to(device)

    output = model(input_id, mask)
    loss = criterion(output, labels)
    correct = (output.round() == labels).sum().item()
    return loss, correct


def train(model, train_data, val_data, lr, epochs, batch_size=2):
    """Fine-tune `model` with Adam and binary cross-entropy, validating after every epoch.

    Args:
        model: Module called as `model(input_ids, attention_mask)` that returns
            probabilities with a trailing dimension of size 1.
        train_data: DataFrame wrapped in `Dataset` for training.
        val_data: DataFrame wrapped in `Dataset` for validation.
        lr: Learning rate passed to Adam.
        epochs: Number of passes over the training data.
        batch_size: Batch size of both data loaders.

    Returns:
        None. After each epoch one line is printed with the mean per-batch loss and
        the accuracy on the training and validation data. The model is moved to the
        GPU when one is available and is left in evaluation mode.
    """
    train_dataset, val_dataset = Dataset(train_data), Dataset(val_data)

    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.BCELoss().to(device)
    optimizer = Adam(model.parameters(), lr=lr)

    # BCELoss averages over the batch by default, so summed batch losses are normalised
    # by the number of batches; accuracy is normalised by the number of samples.
    # max(..., 1) keeps the report printable when a split is empty.
    n_train_batches = max(len(train_dataloader), 1)
    n_val_batches = max(len(val_dataloader), 1)
    n_train_samples = max(len(train_dataset), 1)
    n_val_samples = max(len(val_dataset), 1)

    for epoch_num in range(epochs):

        # Enable dropout for the optimisation pass.
        model.train()
        total_acc_train = 0
        total_loss_train = 0

        for train_input, train_label in tqdm(train_dataloader):
            batch_loss, correct = _run_batch(model, criterion, train_input, train_label, device)
            total_loss_train += batch_loss.item()
            total_acc_train += correct

            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

        # Disable dropout so that validation metrics are deterministic.
        model.eval()
        total_acc_val = 0
        total_loss_val = 0

        with torch.no_grad():
            for val_input, val_label in val_dataloader:
                batch_loss, correct = _run_batch(model, criterion, val_input, val_label, device)
                total_loss_val += batch_loss.item()
                total_acc_val += correct

        print(
            f'Epochs: {epoch_num + 1}'
            f' | Train Loss: {total_loss_train / n_train_batches: .3f}'
            f' | Train Accuracy: {total_acc_train / n_train_samples: .3f}'
            f' | Val Loss: {total_loss_val / n_val_batches: .3f}'
            f' | Val Accuracy: {total_acc_val / n_val_samples: .3f}'
        )


# Training hyperparameters and the model instance that will be fine-tuned.
EPOCHS = 15
model = BertClassifier(dropout=0.2)
LR = 1e-5

Обучение будет долгим: около 3 часов

In [None]:
# Fine-tune on the training split; losses are printed after every epoch.
train(model, df_train, df_val, LR, EPOCHS)

Функция для оценки модели на тестовой выборке

In [None]:
def evaluate(model, test_data, batch_size=2):
    """Run `model` over labelled data, print the accuracy and return labels and predictions.

    Args:
        model: Module called as `model(input_ids, attention_mask)` that returns
            probabilities with a trailing dimension of size 1.
        test_data: DataFrame wrapped in `Dataset`.
        batch_size: Batch size of the data loader.

    Returns:
        A `(test_labels, outputs)` tuple of lists holding one NumPy array per batch:
        the true labels, and the model outputs rounded to 0/1. The model is moved to
        the GPU when one is available and is left in evaluation mode.
    """
    test_dataset = Dataset(test_data)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    # Switch dropout off; otherwise predictions would vary randomly between calls.
    model.eval()

    outputs = []
    test_labels = []
    total_acc_test = 0

    with torch.no_grad():
        for test_input, test_label in test_dataloader:
            test_labels.append(test_label.numpy())

            target = test_label.to(device).unsqueeze(1).float()
            # Dataset tokenizes each title separately with return_tensors="pt", so the
            # collated tensors appear to carry a singleton axis at position 1; drop it
            # so the encoder receives (batch, sequence) tensors.
            mask = test_input['attention_mask'].squeeze(1).to(device)
            input_id = test_input['input_ids'].squeeze(1).to(device)

            predicted = model(input_id, mask).round()
            outputs.append(predicted.cpu().numpy())
            total_acc_test += (predicted == target).sum().item()

    n_samples = len(test_dataset)
    # Avoid a ZeroDivisionError when there is nothing to evaluate.
    accuracy = total_acc_test / n_samples if n_samples else float('nan')
    print(f'Test Accuracy: {accuracy: .3f}')

    return test_labels, outputs

In [None]:
# Score the model on the held-out split; both results are lists with one array per batch.
true_labels, predictions = evaluate(model, df_test)

In [None]:
# The per-batch arrays may differ in length (the last batch can be short), so join them
# with np.concatenate instead of stacking them into one rectangular array.
print(classification_report(np.concatenate(true_labels).ravel(), np.concatenate(predictions).ravel()))

Функция для предсказаний

In [None]:
def inference(model, df_test, batch_size=1):
    """Predict a 0/1 label for every title in `df_test`.

    Args:
        model: Module called as `model(input_ids, attention_mask)` that returns
            probabilities with a trailing dimension of size 1.
        df_test: DataFrame with a `title` column. An `is_fake` column is optional;
            its values are ignored.
        batch_size: Batch size of the data loader.

    Returns:
        List with one NumPy array of rounded model outputs per batch. The model is
        moved to the GPU when one is available and is left in evaluation mode.
    """
    # Dataset reads df['is_fake'] unconditionally; supply placeholder labels on a copy
    # so that unlabeled data can be scored without mutating the caller's frame.
    if 'is_fake' not in df_test.columns:
        df_test = df_test.assign(is_fake=0)

    test_data = Dataset(df_test)
    test_loader = DataLoader(dataset=test_data, batch_size=batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    # Switch dropout off; otherwise predictions would vary randomly between calls.
    model.eval()

    outputs = []
    with torch.no_grad():
        for test_input, _ in test_loader:
            # Dataset tokenizes each title separately with return_tensors="pt", so the
            # collated tensors appear to carry a singleton axis at position 1; drop it
            # so the encoder receives (batch, sequence) tensors.
            mask = test_input['attention_mask'].squeeze(1).to(device)
            input_id = test_input['input_ids'].squeeze(1).to(device)

            predicted = model(input_id, mask).round()
            outputs.append(predicted.cpu().numpy())

    return outputs

In [None]:
# Predict labels for the rows loaded from test.tsv (one array of rounded outputs per batch).
test_predictions = inference(model, test)

И наконец сохраняем результаты

In [None]:
# Store the predictions on the frame they were computed for (`test`) and write that
# frame out. `df_test` is the hold-out slice of the training data and must not be
# used here: its labels are not predictions and its rows are not the test rows.
test['is_fake'] = np.concatenate(test_predictions).ravel().astype(np.int8)
test.to_csv('predictions.tsv', sep='\t', index=False)