In [None]:
!pip install datasets

In [None]:
import pandas as pd

# Load the bbkpi CSV into a DataFrame.
bbkpi_df = pd.read_csv('/content/bbkpi_gold.csv')

# Partition every column from the third one onward by name:
# names ending in '_pred' are collected as prediction columns,
# all remaining names as ground-truth columns (original order is kept).
ground_truth_cols = []
verity_prediction_cols = []
for col in bbkpi_df.columns[2:]:
    if col.endswith('_pred'):
        verity_prediction_cols.append(col)
    else:
        ground_truth_cols.append(col)

bbkpi_df.head()

In [None]:
# Fixed columns kept from the CSV, followed by one column per ground-truth label.
silver_base_cols = ['TIMESTAMP', 'VIDEO_UUID', 'INTERVAL_ID', 'TEXT', 'LANGUAGE_CODE']
prod_silver_df = pd.read_csv('/content/prod_silver.csv')[silver_base_cols + ground_truth_cols]
prod_silver_df.head()

In [None]:
import numpy as np
from datasets import Dataset
# One row per example: the raw text plus a float vector holding the values of
# the ground_truth_cols columns for that row.
train_label_rows = list(prod_silver_df[ground_truth_cols].values.astype(float))
pre_train_df = pd.DataFrame({'text' : prod_silver_df['TEXT'].tolist(),
                             'labels' : train_label_rows})

# Keep only rows whose text is a non-empty string (drops NaN / non-string cells).
has_train_text = pre_train_df['text'].apply(lambda s : isinstance(s, str) and len(s) >= 1)
pre_train_df = pre_train_df[has_train_text]

print(len(pre_train_df))
# The filter above leaves a non-default index; preserve_index=False stops it
# from being stored in the dataset as an extra '__index_level_0__' column.
dataset = Dataset.from_pandas(pre_train_df, preserve_index=False)
pre_train_df.head()

In [None]:
# One row per example: the raw text plus a float vector holding the values of
# the ground_truth_cols columns for that row.
test_label_rows = list(bbkpi_df[ground_truth_cols].values.astype(float))
pre_test_df = pd.DataFrame({'text' : bbkpi_df['text'].tolist(),
                            'labels' : test_label_rows})

# Keep only rows whose text is a non-empty string (drops NaN / non-string cells).
has_test_text = pre_test_df['text'].apply(lambda s : isinstance(s, str) and len(s) >= 1)
pre_test_df = pre_test_df[has_test_text]

print(len(pre_test_df))
# The filter above leaves a non-default index; preserve_index=False stops it
# from being stored in the dataset as an extra '__index_level_0__' column.
test_dataset = Dataset.from_pandas(pre_test_df, preserve_index=False)
pre_test_df.head()

In [None]:
from transformers import AutoTokenizer

# Tokenizer matching the 'bert-base-uncased' checkpoint.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")


def tokenize_function(examples):
    """Tokenize the ``"text"`` entry of ``examples`` with the module-level tokenizer.

    The tokenizer is called with ``padding="max_length"`` and ``truncation=True``.

    Args:
        examples: mapping with a ``"text"`` key (a batch when used with
            ``Dataset.map(..., batched=True)``).

    Returns:
        Whatever the tokenizer call returns for those texts.
    """
    texts = examples["text"]
    encoded = tokenizer(texts, truncation=True, padding="max_length")
    return encoded

# Fixed seed so the train/validation partition is identical on every run.
SPLIT_SEED = 42

# Tokenize both datasets in batches.
tokenized_train_dataset = dataset.map(tokenize_function, batched=True)
test_dataset = test_dataset.map(tokenize_function, batched=True)

# Hold out 20% of the tokenized training data for validation.
all_datasets = tokenized_train_dataset.train_test_split(test_size=0.2, seed=SPLIT_SEED)
train_dataset = all_datasets["train"]
val_dataset = all_datasets["test"]

In [None]:
from transformers import AutoModelForSequenceClassification

import torch

# Index <-> label-name lookup tables, following the order of ground_truth_cols.
id2label = dict(enumerate(ground_truth_cols))
label2id = {label: idx for idx, label in id2label.items()}

# Use the GPU when one is available; fall back to the CPU instead of raising
# on runtimes without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Sequence-classification model with one output per label, configured for
# multi-label classification.
model = AutoModelForSequenceClassification.from_pretrained(
    'bert-base-uncased',
    problem_type="multi_label_classification",
    num_labels=len(id2label),
    id2label=id2label,
    label2id=label2id,
).to(device)

In [None]:
from transformers import TrainingArguments
import inspect

# The keyword selecting the evaluation schedule was renamed from
# `evaluation_strategy` to `eval_strategy` in newer transformers releases.
# Pick whichever name the installed version accepts so the cell runs on both.
_eval_strategy_key = (
    "eval_strategy"
    if "eval_strategy" in inspect.signature(TrainingArguments.__init__).parameters
    else "evaluation_strategy"
)

training_args = TrainingArguments(
    output_dir="./results",
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=3,
    logging_dir='./logs',
    **{_eval_strategy_key: "epoch"},  # evaluate at the end of every epoch
)


In [None]:
from sklearn.metrics import accuracy_score, f1_score

def compute_multilabel_metrics(predictions, labels, threshold=0.5):
    """Score raw multi-label logits against binary targets.

    Args:
        predictions: array-like of logits, shape (num_examples, num_labels).
        labels: array-like of 0/1 targets with the same shape.
        threshold: a label is predicted positive when its sigmoid
            probability is >= this value.

    Returns:
        dict with keys 'f1_micro', 'f1_macro' and 'accuracy'. For multi-label
        input, sklearn's accuracy_score is subset accuracy: a row only counts
        as correct when every label matches.
    """
    logits = np.asarray(predictions, dtype=np.float64)
    # sigmoid(x) == 0.5 * (1 + tanh(x / 2)); this form cannot overflow for
    # large-magnitude logits and needs no torch dependency.
    probs = 0.5 * (1.0 + np.tanh(0.5 * logits))
    # turn probabilities into hard 0/1 predictions
    y_pred = (probs >= threshold).astype(float)

    y_true = labels
    # zero_division=0 yields the same score as the default for labels with no
    # positive samples or predictions, without emitting a warning each time.
    f1_micro_average = f1_score(y_true=y_true, y_pred=y_pred, average='micro', zero_division=0)
    f1_macro_average = f1_score(y_true=y_true, y_pred=y_pred, average='macro', zero_division=0)
    accuracy = accuracy_score(y_true, y_pred)

    return {'f1_micro': f1_micro_average,
            'f1_macro': f1_macro_average,
            'accuracy': accuracy}

def compute_metrics(p):
    """Adapt a prediction object to ``compute_multilabel_metrics``.

    Args:
        p: object exposing ``predictions`` and ``label_ids`` attributes.

    Returns:
        The result of ``compute_multilabel_metrics`` for those predictions
        and labels.
    """
    raw_predictions = p.predictions
    if isinstance(raw_predictions, tuple):
        # When predictions come as a tuple, only its first element is scored.
        raw_predictions = raw_predictions[0]

    return compute_multilabel_metrics(predictions=raw_predictions, labels=p.label_ids)

In [None]:
from transformers import Trainer
import torch

# Custom Trainer to use BCEWithLogitsLoss
class MultilabelTrainer(Trainer):
    """Trainer that computes its loss with ``BCEWithLogitsLoss`` on the model's logits."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute binary cross-entropy-with-logits between logits and labels.

        Args:
            model: the model being trained; called as ``model(**inputs)``.
            inputs: batch mapping that contains a ``"labels"`` entry.
            return_outputs: when true, return ``(loss, outputs)`` instead of
                only the loss.
            num_items_in_batch: unused. Newer transformers releases pass this
                keyword to ``compute_loss``; accepting it keeps the override
                compatible with both old and new ``Trainer`` versions.

        Returns:
            The loss tensor, or ``(loss, outputs)`` if ``return_outputs``.
        """
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")
        loss_fct = torch.nn.BCEWithLogitsLoss()
        # BCEWithLogitsLoss needs floating-point targets.
        loss = loss_fct(logits, labels.float())
        return (loss, outputs) if return_outputs else loss

# Wire the model, training arguments, train/validation splits and the metric
# callback into the custom trainer.
trainer = MultilabelTrainer(
    compute_metrics=compute_metrics,
    eval_dataset=val_dataset,
    train_dataset=train_dataset,
    args=training_args,
    model=model,
)

# Run training; left as the last expression so the notebook displays its result.
trainer.train()