In [None]:
import pandas as pd
import numpy as np
import random
import torch
import transformers
import torch.nn as nn
from transformers import AutoModel, BertTokenizer, BertForSequenceClassification
from transformers import TrainingArguments, Trainer
from datasets import load_metric, Dataset
from sklearn.metrics import classification_report, f1_score
from pytorch_lightning import seed_everything
import os

In [None]:
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt

def load_data(
    path: str,
    min_views: int = 50000,
    threshold: float = 0.4,
    max_count_train=None,
    max_count_val=5000,
    test_size: float = 0.33,
    random_state: int = 70,
) -> tuple[dict, dict]:
    """Read the CSV at ``path`` and build train/validation dictionaries.

    Rows with ``views <= min_views`` are dropped, ``likes_div_views`` is
    min-max scaled to ``[0, 1]`` over the remaining rows, and a binary label
    is derived by comparing the scaled value against ``threshold``.

    Args:
        path: CSV file containing at least the columns ``views``,
            ``likes_div_views`` and ``text``.
        min_views: rows must have strictly more views than this to be kept.
        threshold: scaled ``likes_div_views`` strictly above this value gets
            label ``1.0``, otherwise ``0.0``.
        max_count_train: keep at most this many training rows (``None`` keeps all).
        max_count_val: keep at most this many validation rows (``None`` keeps all).
        test_size: fraction of rows assigned to validation by ``train_test_split``.
        random_state: seed forwarded to ``train_test_split``.

    Returns:
        ``(train, val)``; each is a dict with the keys ``'text'`` (strings),
        ``'y_prob'`` (scaled ``likes_div_views``) and ``'y_label'`` (0.0 / 1.0).

    Raises:
        ValueError: if no row passes the ``min_views`` filter.
    """
    df = pd.read_csv(path)

    # Work on an independent copy: assigning columns on a filtered slice
    # triggers pandas' SettingWithCopyWarning and may not write where expected.
    df = df[df['views'] > min_views].copy()
    if df.empty:
        raise ValueError(f'No rows with views > {min_views} in {path!r}')

    ratio = df['likes_div_views']
    low = ratio.min()
    span = ratio.max() - low
    if span == 0:
        # A constant column would give 0/0 = NaN everywhere; map it to 0 instead.
        df['likes_div_views'] = 0.0
    else:
        df['likes_div_views'] = (ratio - low) / span

    df['likes_div_views_trunced'] = (df['likes_div_views'] > threshold).astype(float)
    df['text'] = df['text'].astype(str)

    # train_test_split returns (train, test) pairs in the order the arrays are passed.
    text_tr, text_val, prob_tr, prob_val, label_tr, label_val = train_test_split(
        df['text'].values,
        df['likes_div_views'].values,
        df['likes_div_views_trunced'].values,
        test_size=test_size,
        random_state=random_state,
    )

    # Slicing with ``None`` as the stop keeps the whole array.
    train = {
        'text': text_tr[:max_count_train],
        'y_prob': prob_tr[:max_count_train],
        'y_label': label_tr[:max_count_train],
    }
    val = {
        'text': text_val[:max_count_val],
        'y_prob': prob_val[:max_count_val],
        'y_label': label_val[:max_count_val],
    }

    print('Train labels:', len(train['text']))
    print('Val labels:', len(val['text']))
    return train, val


train, val = load_data('./data/df_all.csv')

# One labelled figure per split so the two target distributions can be told
# apart when the notebook is skimmed.
for split_name, split in (('train', train), ('validation', val)):
    fig, ax = plt.subplots()
    ax.hist(split['y_prob'], bins=50)
    ax.set(
        title=f'Scaled likes_div_views ({split_name})',
        xlabel='y_prob (min-max scaled likes_div_views)',
        ylabel='count',
    )
    plt.show()

In [None]:
from torch.utils.data import DataLoader
from torch import nn
from transformers import Trainer
from torch.nn import functional as F
from sklearn.metrics import precision_recall_curve

seed_everything(42)
from transformers import AutoTokenizer

# Checkpoint shared by the model and its tokenizer.
MODEL_NAME = 'DeepPavlov/rubert-base-cased-sentence'

# `device` was referenced below without being defined anywhere in the notebook,
# which raises NameError on a fresh kernel; pick the GPU when one is available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# num_labels=1 gives a single-output head; its logit is turned into a score
# by the sigmoid applied in the custom loss.
model = BertForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=1).to(device)
# id2label is keyed by the integer class index (string keys only appear in the
# serialized JSON), so use 0 rather than "0".
model.config.id2label = {0: "Haha score"}
model.config.label2id = {"Haha score": 0}

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

def tokenize_batch(tokenizer, texts: list, batch_size: int = 8, max_length: int = 512):
    """Tokenize ``texts`` in chunks and concatenate the results.

    Every text is padded or truncated to exactly ``max_length`` tokens, so the
    per-chunk tensors can be concatenated along the first dimension.

    Args:
        tokenizer: callable tokenizer accepting a list of strings together with
            ``padding``, ``max_length``, ``truncation`` and ``return_tensors``
            and returning a mapping from field name to tensor.
        texts: sequence of strings (list, numpy array, ...).
        batch_size: number of texts passed to the tokenizer per call.
        max_length: fixed token length every text is padded/truncated to.

    Returns:
        dict mapping each field returned by the tokenizer to one tensor holding
        all texts in their input order; an empty dict when ``texts`` is empty.
    """
    texts = list(texts)
    if not texts:
        # Nothing to tokenize; previously this failed with IndexError.
        return {}

    # Plain slicing replaces the DataLoader that was used only for chunking.
    # `padding='max_length'` already covers the deprecated `pad_to_max_length`.
    chunks = [
        tokenizer(
            texts[start:start + batch_size],
            padding='max_length',
            max_length=max_length,
            truncation=True,
            return_tensors='pt',
        )
        for start in range(0, len(texts), batch_size)
    ]
    return {key: torch.cat([chunk[key] for chunk in chunks]) for key in chunks[0].keys()}


# Tokenize the training split first, then the validation split.
tokens_train, tokens_val = (
    tokenize_batch(tokenizer, split['text']) for split in (train, val)
)

class DatasetT(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer output with one target per sample.

    Args:
        encodings: mapping from field name to an indexable container
            (tensor, array or list) whose first axis enumerates the samples.
        target: indexable container of per-sample targets; its length defines
            the dataset length.
    """

    def __init__(self, encodings, target):
        self.encodings = encodings
        self.target = target

    @staticmethod
    def _as_new_tensor(value):
        """Return ``value`` as a tensor that does not share storage with it."""
        if isinstance(value, torch.Tensor):
            # torch.tensor(tensor) emits a copy-construct UserWarning on every
            # item; clone().detach() is the recommended equivalent.
            return value.clone().detach()
        return torch.tensor(value)

    def __getitem__(self, idx):
        """Return a dict with every encoding field for ``idx`` plus ``'labels'``."""
        item = {k: self._as_new_tensor(v[idx]) for k, v in self.encodings.items()}
        label = self._as_new_tensor(self.target[idx])
        if label.is_floating_point():
            # numpy float64 targets would otherwise produce float64 labels,
            # which do not match the model's default-precision outputs.
            label = label.to(torch.get_default_dtype())
        item['labels'] = label
        return item

    def __len__(self):
        return len(self.target)

# Both datasets use the binary 'y_label' targets of their split.
# `test_dataset` wraps the validation split and is used as the eval set.
train_dataset = DatasetT(encodings=tokens_train, target=train['y_label'])
test_dataset = DatasetT(encodings=tokens_val, target=val['y_label'])


from sklearn.metrics import roc_auc_score

def compute_metrics(pred):
    """Compute ROC AUC and the best achievable F1 for an evaluation run.

    Args:
        pred: object exposing ``label_ids`` (per-sample targets) and
            ``predictions`` (2-D array whose first column holds the raw logit).

    Returns:
        dict with
        ``'ROC AUC'``: area under the ROC curve of the logits,
        ``'maxF1'``: highest F1 over all thresholds of the precision-recall curve,
        ``'best_threshold'``: sigmoid of the logit threshold achieving ``maxF1``,
        i.e. the threshold expressed on the scale the loss is trained on.
    """
    LABEL_THRESHOLD = 0.4  # targets at or above this count as the positive class

    labels = np.asarray(pred.label_ids).reshape(-1) >= LABEL_THRESHOLD
    # Use a 1-D score vector instead of relying on sklearn to flatten an (n, 1) column.
    scores = np.asarray(pred.predictions)[:, 0]

    prec, recall, thresholds = precision_recall_curve(labels, scores)
    # precision/recall carry one trailing point (precision=1, recall=0) that has
    # no matching threshold; drop it so an argmax index is always valid for
    # `thresholds`.
    prec, recall = prec[:-1], recall[:-1]
    # precision == recall == 0 gives 0/0; those entries become NaN and are
    # ignored by the nan-aware reductions below, so silence the warning.
    with np.errstate(divide='ignore', invalid='ignore'):
        f1_scores = 2 * recall * prec / (recall + prec)

    roc = roc_auc_score(labels, scores)
    best = np.nanargmax(f1_scores)
    best_threshold = 1 / (1 + np.exp(-thresholds[best]))

    return {'ROC AUC': roc, 'maxF1': f1_scores[best], 'best_threshold': best_threshold}


# Configuration handed to the trainer below.
training_args = TrainingArguments(
    # output locations
    output_dir='./UmoreskiAI',
    logging_dir='.logs',
    # optimisation
    num_train_epochs=2,
    learning_rate=2e-5,
    weight_decay=0.01,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    # evaluation, logging and checkpointing all happen once per epoch
    evaluation_strategy='epoch',
    logging_strategy='epoch',
    save_strategy='epoch',
    save_total_limit=1,
    load_best_model_at_end=True,
    seed=21,
)


class CustomTrainer(Trainer):
    """Trainer whose loss is the MSE between sigmoid(logits) and the labels."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Return the MSE between the sigmoid of the model logits and the labels.

        Args:
            model: model called as ``model(**inputs)``; its output must expose
                ``"logits"``.
            inputs: batch mapping that includes ``"labels"``.
            return_outputs: when true, return ``(loss, outputs)`` instead of
                only the loss.
            num_items_in_batch: accepted because newer ``Trainer`` versions
                pass it to ``compute_loss``; unused, since the loss is a plain
                mean over the batch.
        """
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")

        scores = torch.sigmoid(logits)
        # Reshape to a column to match the logits and cast to the prediction
        # dtype so a float64 label tensor cannot cause a dtype mismatch.
        target = labels.view(-1, 1).to(scores.dtype)
        loss = nn.MSELoss()(scores, target)
        return (loss, outputs) if return_outputs else loss

# Assemble the custom-loss trainer from the model, datasets and metric function.
trainer = CustomTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)


In [None]:
# Shell escape: run the Hugging Face CLI login command from the notebook.
# Presumably required so the push in the next cell is authenticated; confirm.
!huggingface-cli login

In [None]:
# Push the trainer's model to the Hugging Face Hub.
# NOTE(review): no `trainer.train()` call is visible in this notebook before
# this point. Confirm training actually ran in the session that pushes,
# otherwise the uploaded weights have not been fine-tuned by this notebook.
trainer.push_to_hub()