In [None]:
!pip install -q datasets transformers accelerate evaluate scikit-learn textstat

In [None]:
from datasets import load_dataset
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
)
import numpy as np
import torch
import evaluate
import textstat
from pprint import pprint

# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device


device(type='cuda')

In [None]:
# Load the "simplified" configuration of the GoEmotions dataset.
# The DatasetDict displayed below has train / validation / test splits, each
# with `text`, `labels` and `id` columns.
dataset = load_dataset("google-research-datasets/go_emotions", "simplified")
# Bare expression: show the DatasetDict summary as the cell output.
dataset

DatasetDict({
    train: Dataset({
        features: ['text', 'labels', 'id'],
        num_rows: 43410
    })
    validation: Dataset({
        features: ['text', 'labels', 'id'],
        num_rows: 5426
    })
    test: Dataset({
        features: ['text', 'labels', 'id'],
        num_rows: 5427
    })
})

In [None]:
# The class names are stored on the inner `.feature` of the `labels` column;
# a label's position in this list is the integer id used in the data.
label_names = dataset["train"].features["labels"].feature.names
# Size of the classifier output layer and of the multi-hot label vectors.
num_labels = len(label_names)
print("Number of labels:", num_labels)
# Only the first ten names are shown to keep the output short.
print("Some labels:", label_names[:10])

Number of labels: 28
Some labels: ['admiration', 'amusement', 'anger', 'annoyance', 'approval', 'caring', 'confusion', 'curiosity', 'desire', 'disappointment']


In [None]:
# Checkpoint name shared by the tokenizer here and the classifier loaded later.
model_name = "roberta-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Token budget per example; preprocess_batch pads/truncates every text to it.
max_length = 128  # good default for short comments

In [None]:
import numpy as np

def preprocess_batch(batch):
    """Tokenize a batch of examples and attach multi-hot label vectors.

    Relies on the notebook-level ``tokenizer``, ``max_length`` and
    ``num_labels``.

    Args:
        batch: Mapping with a ``"text"`` column (handed to the tokenizer) and
            a ``"labels"`` column whose entries are iterables of label indices.

    Returns:
        The tokenizer output with an extra ``"labels"`` entry: a list with one
        float32 vector of length ``num_labels`` per example, holding 1.0 at
        every index listed for that example and 0.0 elsewhere.
    """

    def to_multi_hot(active_ids):
        # One slot per class; switch on the slots named by this example.
        vector = np.zeros(num_labels, dtype="float32")
        for label_id in active_ids:
            vector[label_id] = 1.0
        return vector

    # Every text is padded or truncated to exactly `max_length` tokens.
    tokenized = tokenizer(
        batch["text"],
        max_length=max_length,
        padding="max_length",
        truncation=True,
    )

    tokenized["labels"] = [to_multi_hot(ids) for ids in batch["labels"]]
    return tokenized

# Apply preprocess_batch to every split; batched=True hands it dicts of
# column lists rather than single rows.
encoded_dataset = dataset.map(preprocess_batch, batched=True)

# Drop the raw `text` and `id` columns; the summary below shows that only
# `labels`, `input_ids` and `attention_mask` remain.
encoded_dataset = encoded_dataset.remove_columns(["text", "id"])
# Bare expression: display the resulting DatasetDict as the cell output.
encoded_dataset

Map:   0%|          | 0/43410 [00:00<?, ? examples/s]

Map:   0%|          | 0/5426 [00:00<?, ? examples/s]

Map:   0%|          | 0/5427 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['labels', 'input_ids', 'attention_mask'],
        num_rows: 43410
    })
    validation: Dataset({
        features: ['labels', 'input_ids', 'attention_mask'],
        num_rows: 5426
    })
    test: Dataset({
        features: ['labels', 'input_ids', 'attention_mask'],
        num_rows: 5427
    })
})

In [None]:
# Have the dataset hand back torch tensors when indexed.
encoded_dataset.set_format(type="torch")

# Grab one example (index 0 of the train split) to inspect keys and dtypes.
# NOTE: the recorded output shows `labels` as torch.int64 even though
# preprocess_batch builds float32 vectors; MultiLabelTrainer.compute_loss
# casts the labels to float before they reach the model.
sample = encoded_dataset["train"][0]
print(type(sample))
print("Keys:", sample.keys())
print("labels dtype:", sample["labels"].dtype)

<class 'dict'>
Keys: dict_keys(['labels', 'input_ids', 'attention_mask'])
labels dtype: torch.int64


In [None]:
# Build the sequence classifier from the same checkpoint as the tokenizer,
# with one output per emotion label, and move it to the selected device.
# The recorded warning below reports that the classifier head weights are
# newly initialized, i.e. they are learned during fine-tuning.
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=num_labels,
    problem_type="multi_label_classification",  # ensures BCEWithLogitsLoss
).to(device)

# Sanity check: display the configured problem type and number of labels.
model.config.problem_type, model.num_labels

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


('multi_label_classification', 28)

In [None]:
from sklearn.metrics import f1_score

In [None]:
def sigmoid(x):
    """Apply the logistic function 1 / (1 + e^-x) element-wise.

    Args:
        x: A number or NumPy array of logits.

    Returns:
        Values in [0, 1] with the same shape as ``x``.
    """
    denominator = 1 + np.exp(-x)
    return 1 / denominator

def compute_metrics(eval_pred, threshold=0.5):
    """Compute the micro-averaged F1 score for multi-label predictions.

    Args:
        eval_pred: Pair ``(logits, labels)``; both are arrays of shape
            (num_examples, num_labels). ``labels`` holds 0/1 indicators.
        threshold: Probability at or above which a label counts as predicted.

    Returns:
        ``{"f1_micro": score}``. The key must stay ``"f1_micro"`` because the
        training arguments select the best checkpoint by that name.
    """
    logits, labels = eval_pred

    # Independent per-label probabilities, then a hard 0/1 decision per label.
    probs = sigmoid(logits)
    y_pred = (probs >= threshold).astype(int)
    y_true = labels.astype(int)

    # Keep the 2-D indicator layout so scikit-learn treats this as a
    # multi-label problem and pools TP/FP/FN over the *positive* label
    # decisions. Flattening to a 1-D 0/1 vector would instead be scored as a
    # two-class problem, where micro-F1 collapses to plain element-wise
    # accuracy and is dominated by the many true negatives.
    # zero_division=0 returns 0.0 (without a warning) when nothing is
    # predicted and nothing is labelled positive.
    f1_micro = f1_score(y_true, y_pred, average="micro", zero_division=0)

    return {"f1_micro": f1_micro}

In [None]:
class MultiLabelTrainer(Trainer):
    """Trainer that hands float targets to the model before computing loss.

    The dataset inspection earlier in the notebook shows the ``labels`` column
    arriving as ``torch.int64``; this subclass casts it to float on the model's
    device so the model's own multi-label loss can consume it.
    """

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Run the model on ``inputs`` and return the loss it reports.

        Args:
            model: The model being trained or evaluated.
            inputs: Batch dict; its ``"labels"`` entry, when present, is
                replaced in place by a float copy on ``model.device``.
            return_outputs: When true, also return the raw model outputs.
            num_items_in_batch: Accepted for signature compatibility; unused.

        Returns:
            ``loss`` or, when ``return_outputs`` is true, ``(loss, outputs)``.
        """
        targets = inputs.get("labels")
        if targets is not None:
            inputs["labels"] = targets.to(model.device).float()

        model_outputs = model(**inputs)
        batch_loss = model_outputs.loss
        if return_outputs:
            return batch_loss, model_outputs
        return batch_loss

In [None]:
# Per-device training batch size; evaluation uses twice this value.
batch_size = 16

# Fine-tuning configuration. Evaluation and checkpointing both happen once per
# epoch, and the checkpoint with the highest `f1_micro` (the key returned by
# compute_metrics) is reloaded when training finishes.
training_args = TrainingArguments(
    output_dir="./roberta-goemotions",
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="steps",
    logging_steps=200,
    learning_rate=2e-5,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size * 2,
    num_train_epochs=3,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model="f1_micro",
    greater_is_better=True,
)

# Train on the train split and evaluate on the validation split; the test
# split is kept aside for the final evaluation cell.
trainer = MultiLabelTrainer(
    model=model,
    args=training_args,
    train_dataset=encoded_dataset["train"],
    eval_dataset=encoded_dataset["validation"],
    compute_metrics=compute_metrics,
)

In [None]:
# Run fine-tuning with the configuration above. The per-epoch table and the
# TrainOutput summary (steps, loss, runtime) are recorded below.
train_result = trainer.train()
# Bare expression: display the TrainOutput as the cell output.
train_result

Epoch,Training Loss,Validation Loss,F1 Micro
1,0.0507,0.097276,0.966945
2,0.0691,0.08691,0.968557
3,0.0602,0.089348,0.967945


TrainOutput(global_step=8142, training_loss=0.06310747636679319, metrics={'train_runtime': 702.9801, 'train_samples_per_second': 185.254, 'train_steps_per_second': 11.582, 'total_flos': 8568237917583360.0, 'train_loss': 0.06310747636679319, 'epoch': 3.0})

In [None]:
# Evaluate on the held-out test split. The returned keys carry an `eval_`
# prefix (e.g. `eval_f1_micro`, `eval_loss`), as the recorded output shows.
test_metrics = trainer.evaluate(encoded_dataset["test"])
pprint(test_metrics)

{'epoch': 3.0,
 'eval_f1_micro': 0.96865540024744,
 'eval_loss': 0.08649284392595291,
 'eval_runtime': 8.3698,
 'eval_samples_per_second': 648.403,
 'eval_steps_per_second': 20.311}


In [None]:
def readability_flesch(text: str) -> float:
    """Return the Flesch Reading Ease of ``text``; a higher score means easier."""
    ease_score = textstat.flesch_reading_ease(text)
    return ease_score

def readability_grade(text: str) -> float:
    """Return the Flesch–Kincaid Grade Level of ``text`` (approximate school grade)."""
    grade_level = textstat.flesch_kincaid_grade(text)
    return grade_level


In [None]:
import json
import torch
import numpy as np

import os

# Load trained model + tokenizer
inference_dir = "./roberta-goemotions-final"
labels_path = f"{inference_dir}/labels.json"

# In top-to-bottom order this cell runs before the cell that exports the
# fine-tuned model, so on a fresh kernel the directory does not exist yet and
# the loads below would fail. labels.json is the last file that export writes,
# so its absence means the export is missing or incomplete: in that case write
# the in-memory model, tokenizer and label names here first.
if not os.path.isfile(labels_path):
    trainer.save_model(inference_dir)
    tokenizer.save_pretrained(inference_dir)
    with open(labels_path, "w", encoding="utf-8") as f:
        json.dump(label_names, f)

tokenizer_inf = AutoTokenizer.from_pretrained(inference_dir)
model_inf = AutoModelForSequenceClassification.from_pretrained(inference_dir).to(device)

# Load labels
with open(labels_path, "r", encoding="utf-8") as f:
    label_names_inf = json.load(f)

# Class index -> label name, used to decode predicted indices.
id2label_inf = dict(enumerate(label_names_inf))

def sigmoid(x):
    """Apply the logistic function 1 / (1 + e^-x) element-wise to ``x``."""
    denominator = 1 + np.exp(-x)
    return 1 / denominator

# ---- Readability ----
def readability_flesch(text: str) -> float:
    """Return the Flesch Reading Ease of ``text``; a higher score means easier."""
    ease_score = textstat.flesch_reading_ease(text)
    return ease_score

def readability_grade(text: str) -> float:
    """Return the Flesch–Kincaid Grade Level of ``text`` (approximate school grade)."""
    grade_level = textstat.flesch_kincaid_grade(text)
    return grade_level

# ---- Emotion + Readability ----
def analyze_text(text: str, threshold: float = 0.3):
    """Predict emotions for ``text`` and describe how readable it is.

    Uses the notebook-level ``tokenizer_inf``, ``model_inf``, ``id2label_inf``
    and ``device``.

    Args:
        text: The text to analyze.
        threshold: Minimum sigmoid probability for an emotion to be reported.
            If no label reaches it, the single most probable label is returned.

    Returns:
        A dict with the original ``text``, the ``emotions`` (label/score dicts
        sorted by descending score), the two readability scores, a readability
        description and a comment on how clearly the emotion comes across.
    """
    encoded = tokenizer_inf(
        text,
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=128,
    ).to(device)

    model_inf.eval()
    with torch.no_grad():
        # Only the first row of logits is used.
        logits = model_inf(**encoded).logits[0].cpu().numpy()

    probs = sigmoid(logits)

    # Keep every label at or above the threshold; fall back to the top label.
    selected = np.flatnonzero(probs >= threshold)
    if selected.size == 0:
        selected = [int(np.argmax(probs))]

    ranked = sorted(selected, key=lambda i: probs[i], reverse=True)
    emotions = [{"label": id2label_inf[i], "score": float(probs[i])} for i in ranked]

    fres = readability_flesch(text)
    grade = readability_grade(text)

    # Reading-ease bands, highest floor first; the first floor reached wins.
    bands = (
        (80, "very easy to read", "emotion is very easy to understand"),
        (60, "easy to read", "emotion should be easy to understand"),
        (40, "fairly difficult to read", "emotion might require more effort to understand"),
    )
    desc = "difficult to read"
    clarity = "emotion may be hard to interpret from the text"
    for floor, band_desc, band_clarity in bands:
        if fres >= floor:
            desc, clarity = band_desc, band_clarity
            break

    return {
        "text": text,
        "emotions": emotions,
        "readability_flesch": fres,
        "readability_grade": grade,
        "readability_description": desc,
        "emotion_clarity_comment": clarity,
    }

def print_analysis(result):
    """Print a human-readable report for a dict produced by ``analyze_text``.

    Args:
        result: Mapping with the keys ``text``, ``emotions`` (iterable of
            dicts with ``label`` and ``score``), ``readability_flesch``,
            ``readability_grade``, ``readability_description`` and
            ``emotion_clarity_comment``.
    """
    print("TEXT:")
    print(result["text"])

    print("\nPREDICTED EMOTIONS:")
    for emotion in result["emotions"]:
        label, score = emotion["label"], emotion["score"]
        print(f"  - {label}: {score:.3f}")

    print("\nREADABILITY:")
    reading_ease = result["readability_flesch"]
    print(f"  Flesch Reading Ease: {reading_ease:.2f}")
    grade_level = result["readability_grade"]
    print(f"  Flesch-Kincaid Grade: {grade_level:.2f}")
    interpretation = result["readability_description"]
    print(f"  Interpretation: {interpretation}")

    print("\nCONCLUSION:")
    print(result["emotion_clarity_comment"])

In [None]:
# Demo: run the full emotion + readability analysis on one sentence and print
# the report. The threshold is passed explicitly and equals analyze_text's default.
example_text = "I felt really anxious today because every decision seemed more difficult than usual, and I couldn’t focus the way I wanted to"
res = analyze_text(example_text, threshold=0.3)
print_analysis(res)

TEXT:
I felt really anxious today because every decision seemed more difficult than usual, and I couldn’t focus the way I wanted to

PREDICTED EMOTIONS:
  - nervousness: 0.561
  - disappointment: 0.312

READABILITY:
  Flesch Reading Ease: 42.22
  Flesch-Kincaid Grade: 12.84
  Interpretation: fairly difficult to read

CONCLUSION:
emotion might require more effort to understand


In [None]:
# Export directory for the fine-tuned model. The inference cell earlier in the
# notebook reads the model, tokenizer and labels.json from this same path.
save_dir = "./roberta-goemotions-final"

# Write the trainer's current model and the tokenizer side by side so the
# directory can be loaded with `from_pretrained`.
trainer.save_model(save_dir)
tokenizer.save_pretrained(save_dir)

# Store the label names (list position = class index) next to the model.
import json
with open(f"{save_dir}/labels.json", "w") as f:
    json.dump(label_names, f)

# Bare expression: display the export path as the cell output.
save_dir

'./roberta-goemotions-final'

In [None]:
import shutil
from google.colab import files

# NOTE: `files` comes from `google.colab`, so this cell presumably only works
# when the notebook runs inside Google Colab.

# Name of the folder to download
folder_path = "./roberta-goemotions-final"
# Name of the output zip file (without extension)
output_filename = "roberta-goemotions-final"

# Create a zip archive of the folder; make_archive appends the ".zip" suffix.
shutil.make_archive(output_filename, 'zip', folder_path)

# Download the zip file through the Colab file helper.
files.download(output_filename + ".zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>