<a href="https://colab.research.google.com/github/Denev6/practice/blob/main/transformer/RoBERTa_base.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!python --version

Python 3.8.15


In [None]:
!pip install transformers
!pip install datasets

In [None]:
import os
import warnings

import torch
import pandas as pd
from tqdm import tqdm
from datasets import Dataset
from transformers import (
    Trainer,
    TrainingArguments,
    RobertaTokenizerFast,
    RobertaForSequenceClassification,
    EarlyStoppingCallback
)
from sklearn.metrics import accuracy_score, f1_score
from google.colab import drive

# Mount Google Drive at /content/drive; CWD-based paths below point inside it.
drive.mount("/content/drive")
# Suppress every Python warning for the rest of the session.
# NOTE(review): this also hides deprecation notices from the libraries in use.
warnings.filterwarnings("ignore")

In [None]:
# Root folder that every data and checkpoint path is resolved against.
CWD = "/content/drive/MyDrive"


def join_path(*args):
    """Return the path obtained by appending ``args`` to ``CWD``.

    The segments are combined with ``os.path.join``, so its rules apply
    (for example, an absolute segment discards everything before it).
    """
    segments = (CWD,) + args
    return os.path.join(*segments)


# Use the GPU when torch can see one, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Input files, both inside the "data" folder under CWD.
TRAIN_CSV = join_path("data", "train.csv")
TEST_CSV = join_path("data", "test.csv")

# Pretrained checkpoint name and the hyper-parameters a reader may want to tune.
MODEL = "tae898/emoberta-base"
BATCH_SIZE = 32
EPOCHS = 5
MAX_LENGTH = 256

TRAIN_ARGS = TrainingArguments(
    # Where checkpoints are written.
    output_dir=join_path("emoberta"),
    # Schedule and batch sizes (the same batch size is used for train and eval).
    num_train_epochs=EPOCHS,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    # Optimisation settings.
    learning_rate=2e-5,
    warmup_steps=500,
    weight_decay=0.01,
    # Evaluation and checkpointing share the same per-epoch schedule.
    evaluation_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=1,
    load_best_model_at_end=True,
)

# Label encoder

In [None]:
class LabelEncoder:
    """Translate between emotion names and integer class ids.

    The id of an emotion is its position in the fixed list created in
    ``__init__``; ``target_size`` holds the number of emotions in that list.
    """

    def __init__(self):
        # Order matters: the position in this list is the class id.
        self._targets = [
            "neutral", "joy", "surprise", "anger", "sadness", "disgust", "fear",
        ]
        self.target_size = len(self._targets)

    def encode(self, labels):
        """Return a list with the class id of every emotion name in ``labels``.

        A name that is not in the list makes ``list.index`` raise ``ValueError``.
        """
        return list(map(self._targets.index, labels))

    def decode(self, labels):
        """Return a list with the emotion name of every class id in ``labels``."""
        names = []
        for class_id in labels:
            names.append(self._targets[class_id])
        return names

# Dataset

In [None]:
# Read the training CSV located at TRAIN_CSV into a DataFrame.
train_csv = pd.read_csv(TRAIN_CSV)

In [None]:
roberta_tokenizer = RobertaTokenizerFast.from_pretrained(MODEL, truncation=True)
label_encoder = LabelEncoder()
label_size = label_encoder.target_size

# Encode the string labels only once: `train_csv` is loaded in a separate cell,
# so re-running this cell would otherwise feed already-encoded ids back into
# `encode` and fail with ValueError.
if not pd.api.types.is_integer_dtype(train_csv["Target"]):
    train_csv["Target"] = label_encoder.encode(train_csv["Target"])

# NOTE(review): this sends the first 1/8 of the rows to training and the
# remaining 7/8 to evaluation -- confirm that this ratio is intended.
train_id = len(train_csv) // 8
# Dialogue id of the boundary row; kept for inspection, not used by the split.
dialogue_id = train_csv.loc[train_id, "Dialogue_ID"]

# Split right after row `train_id` so the two slices share no row
# (a shared row would leak a training example into the evaluation set).
split_at = train_id + 1
df_train, df_eval = train_csv[:split_at], train_csv[split_at:]

# Keep only the text and the label, using the column name "label" for the target.
df_train = df_train.loc[:, ["Utterance", "Target"]].rename(columns={"Target": "label"})
df_eval = df_eval.loc[:, ["Utterance", "Target"]].rename(columns={"Target": "label"})

In [None]:
def roberta_tokenize(data):
    """Tokenize the "Utterance" field of ``data`` with ``roberta_tokenizer``.

    Every sequence is truncated and padded to exactly ``MAX_LENGTH`` tokens.
    Returns whatever the tokenizer call returns.
    """
    utterances = data["Utterance"]
    encoded = roberta_tokenizer(
        utterances,
        truncation=True,
        padding="max_length",
        max_length=MAX_LENGTH,
    )
    return encoded


def _tokenized_torch_dataset(frame):
    """Turn a DataFrame into a tokenized ``Dataset`` that yields torch tensors."""
    dataset = Dataset.from_pandas(frame.reset_index(drop=True))
    # The whole split is tokenized as one batch.
    dataset = dataset.map(roberta_tokenize, batched=True, batch_size=len(dataset))
    dataset.set_format("torch", columns=["input_ids", "attention_mask", "label"])
    return dataset


train_set = _tokenized_torch_dataset(df_train)
eval_set = _tokenized_torch_dataset(df_eval)

# Model

In [None]:
def compute_metrics(pred):
    """Compute macro F1 and accuracy for one evaluation pass.

    ``pred.label_ids`` holds the reference labels; the predicted class is the
    argmax of ``pred.predictions`` over its last axis. Returns a dict with the
    keys "f1-macro" and "accuracy".
    """
    y_true = pred.label_ids
    y_pred = pred.predictions.argmax(-1)
    return {
        "f1-macro": f1_score(y_true, y_pred, average="macro"),
        "accuracy": accuracy_score(y_true, y_pred),
    }


# Release cached GPU memory before the model is loaded.
torch.cuda.empty_cache()

# Classification head sized to the number of emotion labels.
model = RobertaForSequenceClassification.from_pretrained(
    MODEL,
    num_labels=label_size,
)

# Early stopping is configured with a patience of 2.
early_stopping = EarlyStoppingCallback(early_stopping_patience=2)

trainer = Trainer(
    args=TRAIN_ARGS,
    model=model,
    train_dataset=train_set,
    eval_dataset=eval_set,
    compute_metrics=compute_metrics,
    callbacks=[early_stopping],
)

In [None]:
# Run fine-tuning with the arguments, datasets and callbacks given to `trainer`.
trainer.train()

In [None]:
# Evaluate on the evaluation set and report both metrics with five decimals.
model_eval = trainer.evaluate()
for title, key in (("Accuracy", "eval_accuracy"), ("F1-macro", "eval_f1-macro")):
    print(f"{title}: {model_eval[key]:.5f}")

# Prediction

In [None]:
# `roberta_tokenize` from the dataset section is reused here instead of being
# defined a second time under the same name (the later copy would silently
# shadow the earlier one). Tensor conversion is handled by `set_format` below,
# so the tokenizer does not need `return_tensors`.
test_csv = pd.read_csv(TEST_CSV)
df_test = test_csv.loc[:, "Utterance"].to_frame()

test_set = Dataset.from_pandas(df_test.reset_index(drop=True))
# The whole test split is tokenized as one batch.
test_set = test_set.map(roberta_tokenize, batched=True, batch_size=len(test_set))
# The test data carries no labels, so only the model inputs are exposed.
test_set.set_format("torch", columns=["input_ids", "attention_mask"])

In [None]:
def predict(model, test_set):
    """Return the predicted class id for every example in ``test_set``.

    Args:
        model: Classification model. It is moved to ``DEVICE`` and switched to
            eval mode; its output must expose a ``logits`` attribute.
        test_set: Iterable of examples, each providing ``input_ids`` and
            ``attention_mask`` tensors without a batch axis.

    Returns:
        list: Index of the largest logit for each example, in input order.
    """
    model.to(DEVICE)
    model.eval()

    test_predict = []
    # Inference only: disable autograd so no graph is built and kept per example.
    with torch.no_grad():
        for data in tqdm(test_set):
            # Add a batch axis of size 1 before handing the example to the model.
            input_id = data["input_ids"].unsqueeze(0).to(DEVICE)
            mask = data["attention_mask"].unsqueeze(0).to(DEVICE)
            output = model(input_id, mask)
            test_predict.extend(output.logits.argmax(1).cpu().tolist())
    return test_predict
     
# Predict class ids for the test set and map them back to emotion names.
preds = label_encoder.decode(predict(model, test_set))

# Attach the predictions and keep only the two columns to report.
test_csv["Target"] = preds
result_csv = test_csv[["ID", "Target"]]
result_csv.head(10)