In [None]:
# Shell escape: run the NVIDIA `nvidia-smi` tool and show its output in the
# cell (needs the NVIDIA driver tools to be installed on this machine).
!nvidia-smi

In [None]:
from typing import Optional, Union, Tuple, List
from pathlib import Path
import functools

import pandas as pd
import torch
import torch.nn as nn
from transformers.models.deberta_v2.modeling_deberta_v2 import *
from transformers import AutoTokenizer, AutoModel, Trainer
from transformers.modeling_outputs import SequenceClassifierOutput
import datasets

from metrics import compute_metrics

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Project folders, relative to the directory the notebook is run from.
models_dir = Path("../hub")  # model / tokenizer checkpoints are loaded from here
datasets_dir = Path("../datasets")  # datasets are loaded from here
results_dir = Path("../results/baseline")  # evaluation CSVs are written here

# exist_ok=True keeps the cell idempotent on re-runs and avoids the
# check-then-create race of a separate exists() test.
results_dir.mkdir(parents=True, exist_ok=True)

In [None]:
# ---- configuration ----
# Name of the checkpoint folder to evaluate.
model_name = "appropriateness-classifier-multilabel"

# Which fields of an instance are fed to the model; "text_only" is the only
# mode implemented in this notebook.
model_inputs = "text_only"

# Label columns of the corpus. The number after each name is its class layer.
classes = [
    'Inappropriateness',  # 0

    'Toxic Emotions',  # 1
    'Excessive Intensity',  # 2
    'Emotional Deception',  # 2

    'Missing Commitment',  # 1
    'Missing Seriousness',  # 2
    'Missing Openness',  # 2

    'Missing Intelligibility',  # 1
    'Unclear Meaning',  # 2
    'Missing Relevance',  # 2
    'Confusing Reasoning',  # 2

    'Other Reasons',  # 1
    'Detrimental Orthography',  # 2
    'Reason Unclassified',  # 2
]

# Two-way lookup between a class name and its position in `classes`.
class2id = dict(zip(classes, range(len(classes))))
id2class = dict(enumerate(classes))

In [None]:
class AppropriatenessMultilabelModel(DebertaV2PreTrainedModel):
    """DeBERTa-v2 encoder with a pooled classification head.

    The encoder output is passed through ``ContextPooler``, dropout and a
    single linear layer that produces ``num_labels`` logits. When ``labels``
    are given, the loss is chosen from ``config.problem_type`` (regression,
    single-label or multi-label classification).
    """

    def __init__(self, config):
        """Build the encoder, pooler, dropout and linear head from ``config``."""
        super().__init__(config)

        num_labels = getattr(config, "num_labels", 2)
        self.num_labels = num_labels

        self.deberta = DebertaV2Model(config)
        self.pooler = ContextPooler(config)
        output_dim = self.pooler.output_dim

        self.classifier = nn.Linear(output_dim, num_labels)
        # Prefer the dedicated classifier dropout; fall back to the hidden dropout.
        drop_out = getattr(config, "cls_dropout", None)
        drop_out = self.config.hidden_dropout_prob if drop_out is None else drop_out
        self.dropout = StableDropout(drop_out)

        # Optional per-class positive weights handed to BCEWithLogitsLoss as
        # `pos_weight` in the multi-label branch of forward(). Defined here so
        # forward() never hits a missing attribute; None means unweighted.
        # Callers may still overwrite it after construction.
        self.pos_weights = None

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, SequenceClassifierOutput]:
        """Encode the inputs, compute logits and, if ``labels`` is given, the loss.

        All arguments except ``labels`` are forwarded unchanged to the
        DeBERTa-v2 encoder.

        Args:
            labels: optional targets. If ``config.problem_type`` is unset it is
                inferred once from ``num_labels`` and ``labels.dtype``
                (1 label -> regression, integer labels -> single-label,
                anything else -> multi-label) and stored on the config.
            return_dict: return a ``SequenceClassifierOutput`` instead of a
                tuple; defaults to ``config.use_return_dict``.

        Returns:
            ``SequenceClassifierOutput`` with ``loss`` (``None`` without
            labels), ``logits``, ``hidden_states`` and ``attentions``; or, when
            ``return_dict`` is false, the tuple ``(loss?, logits, *extras)``
            where ``extras`` are the encoder outputs after the first element.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.deberta(
            input_ids,
            token_type_ids=token_type_ids,
            attention_mask=attention_mask,
            position_ids=position_ids,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        encoder_layer = outputs[0]
        pooled_output = self.pooler(encoder_layer)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = nn.MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = nn.CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                pos_weight = self.pos_weights
                if pos_weight is not None:
                    # Accept lists as well as tensors, and keep the weights on
                    # the same device / dtype as the logits.
                    pos_weight = torch.as_tensor(pos_weight, dtype=logits.dtype, device=logits.device)
                loss_fct = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
                # BCE-with-logits needs floating point targets.
                loss = loss_fct(logits, labels.to(logits.dtype))

        if not return_dict:
            # The encoder output has no pooled element (the pooler lives in this
            # class), so the optional hidden states / attentions start at index 1.
            output = (logits,) + outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


In [None]:
# Tokenizer and model weights are both read from the same checkpoint folder
# under `models_dir`; `local_files_only=True` is passed to both loaders.
tokenizer = AutoTokenizer.from_pretrained(models_dir / model_name, local_files_only=True)

model = AppropriatenessMultilabelModel.from_pretrained(
    models_dir / model_name,
    local_files_only=True,
    problem_type="multi_label_classification",
    num_labels=len(classes),
)
model = model.to(device)

# forward() passes `model.pos_weights` to the BCE loss as `pos_weight`, so the
# attribute has to exist before any loss is computed (None = unweighted).
model.pos_weights = None

In [None]:
# Load the corpus from its folder under `datasets_dir` and pull out the
# train / validation / test splits.
ds = datasets.load_dataset(str(datasets_dir / "appropriateness-corpus"))
train_set, val_set, test_set = (ds[split] for split in ("train", "validation", "test"))

# align labels from columns to "labels" list for given dataset
def align_labels(dataset: "datasets.Dataset", classes: List[str]):
    """Add a ``labels`` column holding each row's class values as floats.

    Args:
        dataset: dataset with one column per entry of ``classes``.
        classes: names of the label columns, in the order their values should
            appear inside each ``labels`` list.

    Returns:
        The result of ``dataset.add_column("labels", ...)``, where every entry
        of the new column is a list of ``len(classes)`` Python floats.

    Raises:
        KeyError: if a name in ``classes`` is not a column of ``dataset``.
    """
    label_frame = pd.DataFrame(dataset)[classes]
    # Labels are needed with float type for the loss calculation; convert the
    # whole block at once instead of iterating row by row.
    labels = label_frame.astype(float).to_numpy().tolist()
    return dataset.add_column("labels", labels)

# Attach the float "labels" column to each of the three splits.
train_set, val_set, test_set = (
    align_labels(split, classes) for split in (train_set, val_set, test_set)
)

In [None]:
# tokenize datasets
def tokenize(instances, tokenizer: "AutoTokenizer", model_inputs: str):
    """Tokenize a batch of instances according to the selected input mode.

    Args:
        instances: batch mapping that contains a ``"post_text"`` entry.
        tokenizer: callable tokenizer; invoked as
            ``tokenizer(instances["post_text"], padding=True)``.
        model_inputs: input mode; only ``"text_only"`` is implemented.

    Returns:
        Whatever ``tokenizer`` returns for the ``"post_text"`` entries.

    Raises:
        NotImplementedError: if ``model_inputs`` is not ``"text_only"``.
    """
    if model_inputs != "text_only":
        # Name the rejected value so a config typo is easy to spot.
        raise NotImplementedError(
            f"Unsupported model_inputs {model_inputs!r}; only 'text_only' is implemented."
        )
    return tokenizer(instances["post_text"], padding=True)
    
# Tokenize each split in batched mode, with the tokenizer and input mode bound
# into the mapped function.
tokenized_train_set, tokenized_val_set, tokenized_test_set = (
    split.map(
        functools.partial(tokenize, tokenizer=tokenizer, model_inputs=model_inputs),
        batched=True,
    )
    for split in (train_set, val_set, test_set)
)

In [None]:
# convert dataset to tensors
# `with_format` returns a new dataset and leaves the original untouched, so the
# result has to be assigned back; otherwise this cell has no effect.
tokenized_train_set = tokenized_train_set.with_format("torch")
tokenized_val_set = tokenized_val_set.with_format("torch")
tokenized_test_set = tokenized_test_set.with_format("torch")

In [None]:
# Trainer built with only the model and the metric function (no other
# arguments are passed); the id -> class-name mapping is bound into
# `compute_metrics`.
trainer = Trainer(
    model=model,
    compute_metrics=functools.partial(compute_metrics, id2class=id2class),
)

In [None]:
# Evaluate on the validation split, print the metrics and store them as a
# one-row CSV in the results folder.
val_metrics = trainer.evaluate(eval_dataset=tokenized_val_set, metric_key_prefix="validation")
print(f"{val_metrics=}")
pd.DataFrame(val_metrics, index=[0]).to_csv(results_dir.joinpath("validation.csv"))

In [None]:
# Evaluate on the test split, print the metrics and store them as a one-row
# CSV in the results folder.
test_metrics = trainer.evaluate(eval_dataset=tokenized_test_set, metric_key_prefix="test")
print(f"{test_metrics=}")
pd.DataFrame(test_metrics, index=[0]).to_csv(results_dir.joinpath("test.csv"))