In [None]:
!pip install -q transformers datasets

In [None]:
from datasets import load_dataset

In [None]:
# Load the GoEmotions dataset through the `datasets` library; the 'train' and
# 'validation' splits of the returned object are used further down.
dataset = load_dataset('go_emotions')
# Bare last expression: show a summary of what was loaded.
dataset

In [None]:
# Peek at the first raw training row to see which columns are available.
example = dataset['train'][0]
example

In [None]:
# The 28 label names (27 emotions followed by 'neutral'). A name's position in
# this list is used as its integer class id in both lookup tables below.
label_names = [
    'admiration', 'amusement', 'anger', 'annoyance', 'approval', 'caring',
    'confusion', 'curiosity', 'desire', 'disappointment', 'disapproval',
    'disgust', 'embarrassment', 'excitement', 'fear', 'gratitude', 'grief',
    'joy', 'love', 'nervousness', 'optimism', 'pride', 'realization',
    'relief', 'remorse', 'sadness', 'surprise', 'neutral',
]
num_labels = len(label_names)

# Two-way lookup between class ids and names; both are handed to the model
# config later so predictions can be reported with readable names.
id2label = dict(enumerate(label_names))
label2id = {name: idx for idx, name in enumerate(label_names)}

# Bare last expression: display the number of classes.
num_labels


In [None]:
# Look at the second raw training row as another sample of the data.
dataset['train'][1]

In [None]:
# Show both lookup tables. Only the last expression of a cell is displayed, so
# they are combined into one tuple instead of two separate bare expressions
# (the first of which would be silently discarded).
id2label, label2id

In [None]:
from transformers import AutoTokenizer
import numpy as np

In [None]:
# Tokenizer for the same 'google/mobilebert-uncased' checkpoint that the model
# is loaded from further down.
tokenizer = AutoTokenizer.from_pretrained('google/mobilebert-uncased')

In [None]:
def preprocess_data(examples):
    """Tokenize a batch of texts and attach multi-hot float label vectors.

    Intended for `dataset.map(..., batched=True)`.

    Args:
        examples: Batch mapping with a "text" column (list of strings) and a
            "labels" column (one list of integer class ids per text).

    Returns:
        The tokenizer output for the batch, padded/truncated to 128 tokens,
        with an added "labels" entry: one list of `len(label2id)` floats per
        text, 1.0 at every active class id and 0.0 elsewhere.
    """
    sentences = examples["text"]
    batch = tokenizer(
        sentences,
        truncation=True,
        max_length=128,
        padding="max_length",
    )

    # Start from an all-zero float32 matrix so the targets are floats from
    # the outset, then switch on the columns of the active classes.
    multi_hot = np.zeros((len(sentences), len(label2id)), dtype=np.float32)
    for row, active_ids in enumerate(examples["labels"]):
        for class_id in active_ids:
            multi_hot[row, class_id] = 1.0

    batch["labels"] = multi_hot.tolist()
    return batch

In [None]:
from datasets import ClassLabel, Features, Sequence, Value

# Explicit schema for the columns produced by `preprocess_data`; it is passed
# to `dataset.map` so the stored dtypes are fixed instead of being inferred.
features = Features({
    "labels": Sequence(Value("float32")),  # multi-hot targets, kept as floats
    "input_ids": Sequence(Value("int32")), # token ids from the tokenizer
    "token_type_ids": Sequence(Value("int8")), # segment ids from the tokenizer
    "attention_mask": Sequence(Value("int8")), # 1 = real token, 0 = padding
})

In [None]:
# Exploratory cell: inspect the name -> id mapping and one raw training row.
print("label2id:",label2id)
example = dataset["train"][0]
print(example)
print(example.keys())
# Build a {label name: 0/1} dict for this row: 1 when the label's id appears in
# the row's "labels" list, 0 otherwise.
labels_batch ={k: 1 if id_num in example["labels"] else 0
    for k, id_num in label2id.items()}
# Only the type of the result is printed here, not its contents.
print("labels_batch:" , type(labels_batch))

In [None]:
# Tokenize every split in batches. The original columns are dropped, so only
# the columns returned by `preprocess_data` remain, stored with the explicit
# `features` schema.
encoded_dataset = dataset.map(preprocess_data, batched=True, remove_columns=dataset['train'].column_names, features=features)
encoded_dataset

In [None]:
# Grab one encoded training row and list the columns it carries.
q = encoded_dataset['train'][1]
print(q.keys())

In [None]:
# Decode the token ids of the encoded row `q` back to text. The raw `example`
# row comes from the un-tokenized dataset and has no 'input_ids' field, so
# indexing it here raised a KeyError.
tokenizer.decode(q['input_ids'])

In [None]:
# Show the multi-hot label vector of the encoded row.
print(q['labels'])

In [None]:
# Map the multi-hot vector of the encoded row `q` back to label names. The raw
# `example['labels']` holds class ids rather than 0/1 flags, so enumerating it
# paired positions with ids and produced wrong names.
[id2label[idx] for idx, label in enumerate(q['labels']) if label == 1.0]

In [None]:
# From here on, indexing the encoded dataset yields PyTorch tensors.
encoded_dataset.set_format("torch")

In [None]:
from transformers import AutoModelForSequenceClassification
from transformers import (
    MobileBertTokenizer,
    MobileBertForSequenceClassification,
    TrainingArguments,
    Trainer,
    EarlyStoppingCallback,
)

In [None]:
# Load pretrained MobileBERT with a classification head sized for `num_labels`.
# `problem_type='multi_label_classification'` configures the head for
# independent per-label decisions (float multi-hot targets), and the
# id2label/label2id maps are stored in the model config.
model = MobileBertForSequenceClassification.from_pretrained(
    'google/mobilebert-uncased',
    num_labels=num_labels,
    problem_type='multi_label_classification',
    id2label=id2label,
    label2id=label2id,
)

In [None]:
# Per-device batch size used for both training and evaluation.
batch_size = 8
# Key of the metric (as returned by `compute_metrics`) used to pick the best
# checkpoint.
metric_name = "f1"

In [None]:
from transformers import TrainingArguments, Trainer


In [None]:
# `torch` is needed for the fp16 switch below. It is imported here because the
# notebook's other `import torch` sits in a later cell, which made this cell
# fail with a NameError on a fresh kernel.
import torch

# Training configuration: evaluate and checkpoint once per epoch, and reload
# the checkpoint with the best `metric_name` score when training ends.
args = TrainingArguments(
    output_dir='./mobilebert-goemotions',
    eval_strategy='epoch',
    save_strategy='epoch',  # must match eval_strategy for load_best_model_at_end
    learning_rate=2e-5,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=7,
    weight_decay=0.01,
    load_best_model_at_end=True,
    fp16=torch.cuda.is_available(),  # mixed precision only when a GPU is present
    metric_for_best_model=metric_name,

    report_to=['none'],  # disable external experiment loggers
    #push_to_hub=True,
)

In [None]:
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
from transformers import EvalPrediction
import torch

In [None]:
# source: https://jesusleal.io/2021/04/21/Longformer-multilabel-classification/
def multi_label_metrics(predictions, labels, threshold=0.5):
    """Compute micro-averaged multi-label metrics from raw logits.

    Args:
        predictions: Array-like of logits with shape (batch_size, num_labels).
        labels: Multi-hot ground truth with the same shape.
        threshold: Probability at or above which a label counts as predicted.

    Returns:
        Dict with keys 'f1' (micro-averaged F1), 'roc_auc' (micro-averaged
        ROC AUC) and 'accuracy' (as reported by `accuracy_score`, which for
        multi-label input is the exact-match rate per sample).
    """
    # Sigmoid rather than softmax: every label is an independent yes/no
    # decision. Convert to NumPy once so all later steps share one array type.
    logits = torch.as_tensor(predictions, dtype=torch.float32)
    probs = torch.sigmoid(logits).detach().cpu().numpy()

    # Hard 0/1 decisions for the threshold-dependent metrics.
    y_pred = (probs >= threshold).astype(np.float64)
    y_true = labels

    f1_micro_average = f1_score(y_true=y_true, y_pred=y_pred, average='micro')
    # ROC AUC is a ranking metric and needs the continuous scores; feeding it
    # the thresholded predictions collapses the curve to a single point.
    roc_auc = roc_auc_score(y_true, probs, average='micro')
    accuracy = accuracy_score(y_true, y_pred)

    return {'f1': f1_micro_average,
            'roc_auc': roc_auc,
            'accuracy': accuracy}

def compute_metrics(p: EvalPrediction):
    """Adapter between the Trainer's evaluation output and `multi_label_metrics`.

    Args:
        p: Evaluation result; `p.predictions` is either the logits themselves
            or a tuple whose first element is the logits, and `p.label_ids`
            holds the ground-truth labels.

    Returns:
        The metrics dict produced by `multi_label_metrics`.
    """
    logits = p.predictions
    if isinstance(logits, tuple):
        # Only the first element of a tuple output is scored.
        logits = logits[0]
    return multi_label_metrics(predictions=logits, labels=p.label_ids)

In [None]:
# Check the tensor type of the stored labels (the features schema declares
# them as float32).
encoded_dataset['train'][0]['labels'].type()

In [None]:
# Token ids of the first training row. Index the row first: selecting the
# column before the row can materialize the whole 'input_ids' column just to
# read one entry.
encoded_dataset['train'][0]['input_ids']

In [None]:
# Smoke test: one forward pass on a single training row, with labels so the
# loss is computed too. The row is fetched once (instead of pulling the whole
# 'input_ids' column), and the attention mask is passed because every sequence
# is padded to 128 tokens; without it the padding would be attended to.
sample = encoded_dataset['train'][0]
outputs = model(
    input_ids=sample['input_ids'].unsqueeze(0),            # add batch dimension
    attention_mask=sample['attention_mask'].unsqueeze(0),
    labels=sample['labels'].unsqueeze(0),
)
outputs

In [None]:
# Wire model, training arguments, data splits and metrics into a Trainer.
# NOTE(review): recent transformers releases deprecate the `tokenizer=`
# argument in favour of `processing_class=` — confirm against the installed
# version before upgrading.
trainer = Trainer(
    model,
    args,
    train_dataset=encoded_dataset["train"],
    eval_dataset=encoded_dataset["validation"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

In [None]:
# Run the full fine-tuning loop configured in `args`.
trainer.train()

In [None]:
# Continue training from the most recent checkpoint in `args.output_dir`.
# NOTE(review): this runs right after a complete `trainer.train()`; it looks
# intended for restarting an interrupted session rather than for a normal
# top-to-bottom run — confirm whether it should stay.
trainer.train(resume_from_checkpoint=True)

In [None]:
# Report whether a CUDA GPU is available.
# NOTE(review): `DEVICE` is not referenced by any other cell visible in this
# notebook; later cells use `trainer.model.device` instead.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Device:', DEVICE)

In [None]:
# Score the current model on the evaluation split given to the Trainer
# (the 'validation' split) using `compute_metrics`.
trainer.evaluate()

In [None]:
# Save final model + tokenizer side by side so the folder can be reloaded
# with `from_pretrained`.
trainer.save_model("./senti")
tokenizer.save_pretrained("./senti")

import shutil

# Zip the saved model directory (writes senti.zip to the working directory)
shutil.make_archive("senti", 'zip', "./senti")

# `google.colab` is only importable inside Google Colab; this triggers a
# browser download of the archive.
from google.colab import files
files.download("senti.zip")

In [None]:
# NOTE(review): this cell saves the model a second time (the previous cell
# already wrote ./senti); both imports below repeat earlier ones.
from transformers import Trainer
import shutil

# Save the *final* model inside the training output directory
trainer.save_model("./mobilebert-goemotions/final")
tokenizer.save_pretrained("./mobilebert-goemotions/final")

# Zip entire folder (includes checkpoints + final model); the archive can be
# large because every per-epoch checkpoint is included.
shutil.make_archive("mobilebert-goemotions", 'zip', "./mobilebert-goemotions")

In [None]:
# Colab-only: download the archive of the whole training output directory.
from google.colab import files
files.download("mobilebert-goemotions.zip")

In [None]:
# Run the fine-tuned model on one hand-written sentence.
text = "I'm happy I can finally train a model for multi-label classification"

encoding = tokenizer(text, return_tensors="pt")
# Move the inputs to whatever device the trained model lives on.
encoding = {k: v.to(trainer.model.device) for k,v in encoding.items()}

# Inference only: switch to eval mode explicitly (rather than relying on the
# state the Trainer left behind) so dropout is off, and skip building the
# autograd graph.
trainer.model.eval()
with torch.no_grad():
    outputs = trainer.model(**encoding)

In [None]:
# Raw (pre-sigmoid) scores for the sentence; the shape is displayed as a check.
logits = outputs.logits
logits.shape

In [None]:
# Convert the logits into independent per-label probabilities.
sigmoid = torch.nn.Sigmoid()
probs = sigmoid(logits.squeeze().cpu())

# Mark every label whose probability reaches 0.5 with a 1 in a 0/1 vector.
predictions = np.zeros(probs.shape)
above_threshold = np.where(probs >= 0.5)
predictions[above_threshold] = 1

# Translate the positions of the 1s back into label names.
predicted_labels = [
    id2label[int(position)]
    for position in np.flatnonzero(predictions == 1.0)
]
print(predicted_labels)