In [1]:
import os
os.environ["WANDB_DISABLED"] = "true"

import torch
import numpy as np
from datasets import load_dataset
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments
)
from sklearn.metrics import f1_score


In [2]:
# Download GoEmotions from the Hugging Face Hub and bind its three splits.
dataset = load_dataset("google-research-datasets/go_emotions")

train_ds, val_ds, test_ds = (
    dataset[split] for split in ("train", "validation", "test")
)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

simplified/train-00000-of-00001.parquet:   0%|          | 0.00/2.77M [00:00<?, ?B/s]

simplified/validation-00000-of-00001.par(…):   0%|          | 0.00/350k [00:00<?, ?B/s]

simplified/test-00000-of-00001.parquet:   0%|          | 0.00/347k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/43410 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/5426 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/5427 [00:00<?, ? examples/s]

In [3]:
# Label vocabulary comes from the dataset's own feature metadata so that
# column i of every label vector corresponds to emotion_names[i].
emotion_names = train_ds.features["labels"].feature.names
num_labels = len(emotion_names)

print(num_labels, emotion_names, sep="\n")


28
['admiration', 'amusement', 'anger', 'annoyance', 'approval', 'caring', 'confusion', 'curiosity', 'desire', 'disappointment', 'disapproval', 'disgust', 'embarrassment', 'excitement', 'fear', 'gratitude', 'grief', 'joy', 'love', 'nervousness', 'optimism', 'pride', 'realization', 'relief', 'remorse', 'sadness', 'surprise', 'neutral']


In [4]:
def encode_labels(example):
    """Replace an example's list of label ids with a multi-hot vector.

    Args:
        example: Mapping with a "labels" entry holding the active class ids.

    Returns:
        The same mapping, with "labels" overwritten by a float32 array of
        length ``num_labels`` that is 1.0 at every listed id and 0.0 elsewhere.
    """
    multi_hot = np.zeros(num_labels, dtype=np.float32)
    # Index assignment sets every active position in one step.
    multi_hot[example["labels"]] = 1.0
    example["labels"] = multi_hot
    return example

# Apply the multi-hot encoding to every split (train, validation, test order).
train_ds, val_ds, test_ds = (
    split.map(encode_labels) for split in (train_ds, val_ds, test_ds)
)


Map:   0%|          | 0/43410 [00:00<?, ? examples/s]

Map:   0%|          | 0/5426 [00:00<?, ? examples/s]

Map:   0%|          | 0/5427 [00:00<?, ? examples/s]

In [5]:
# Base checkpoint shared by the tokenizer here and the classifier loaded later.
MODEL_NAME = "roberta-base"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

def tokenize(batch):
    """Tokenize a batch of texts, truncating each to at most 128 tokens.

    No padding is applied here. The Trainer in this notebook is given a
    DataCollatorWithPadding subclass, which pads each batch to its own longest
    sequence; padding every example to 128 up front only adds wasted compute.

    Args:
        batch: Mapping with a "text" entry containing a list of strings.

    Returns:
        The tokenizer output for the batch (variable-length sequences).
    """
    return tokenizer(
        batch["text"],
        truncation=True,
        max_length=128,
    )

# Tokenize every split in batched mode (train, validation, test order).
train_ds, val_ds, test_ds = (
    split.map(tokenize, batched=True) for split in (train_ds, val_ds, test_ds)
)


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/481 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Map:   0%|          | 0/43410 [00:00<?, ? examples/s]

Map:   0%|          | 0/5426 [00:00<?, ? examples/s]

Map:   0%|          | 0/5427 [00:00<?, ? examples/s]

In [6]:
# The raw strings are no longer needed once the splits are tokenized.
train_ds, val_ds, test_ds = (
    split.remove_columns(["text"]) for split in (train_ds, val_ds, test_ds)
)


In [7]:
# Classifier with one output per emotion. The multi-label problem type is what
# makes float label vectors necessary (see the collator's BCEWithLogitsLoss note).
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    num_labels=num_labels,
    problem_type="multi_label_classification"
)


model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [8]:
def compute_metrics(eval_pred):
    """Compute micro- and macro-averaged F1 for multi-label predictions.

    Args:
        eval_pred: Pair ``(logits, labels)`` as passed by the Trainer.

    Returns:
        Dict with "micro_f1" and "macro_f1", where a label counts as predicted
        when its sigmoid probability is strictly greater than 0.5.
    """
    logits, labels = eval_pred

    probabilities = torch.sigmoid(torch.tensor(logits))
    y_pred = (probabilities > 0.5).int().numpy()
    y_true = labels.astype(int)

    micro = f1_score(y_true, y_pred, average="micro")
    macro = f1_score(y_true, y_pred, average="macro")
    return {"micro_f1": micro, "macro_f1": macro}


In [9]:
from transformers import DataCollatorWithPadding

class MultiLabelDataCollator(DataCollatorWithPadding):
    """Padding collator that always hands the model float32 label tensors."""

    def __call__(self, features):
        """Collate ``features`` with the parent class, then cast "labels" to float."""
        collated = super().__call__(features)
        # BCEWithLogitsLoss needs floating-point targets.
        collated["labels"] = collated["labels"].float()
        return collated


In [10]:
# Collator instance handed to the Trainer; it reuses the training tokenizer.
data_collator = MultiLabelDataCollator(tokenizer)

In [11]:
# Training configuration: evaluation and checkpointing both happen once per
# epoch, and the checkpoint with the best validation "micro_f1" (as reported
# by compute_metrics) is restored when training ends. External experiment
# reporting is switched off.
training_args = TrainingArguments(
    output_dir="./emotion_model",
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=2,
    weight_decay=0.01,
    logging_steps=200,
    load_best_model_at_end=True,
    metric_for_best_model="micro_f1",
    report_to="none"
)


In [12]:
# Wire model, data, collator and metrics into the Trainer.
# The tokenizer is passed as `processing_class`: the `tokenizer=` keyword is
# deprecated on Trainer.__init__ in recent transformers releases, and the
# warning captured under this cell is presumably that deprecation notice.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    processing_class=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)


  trainer = Trainer(


In [13]:
# Sanity check on one processed example (despite the name, `batch` holds a
# single row): show the container type and first five values of each column.
batch = train_ds[0]

for column in ("input_ids", "labels"):
    print(type(batch[column]), batch[column][:5])


<class 'list'> [0, 2387, 5548, 689, 16]
<class 'list'> [0, 0, 0, 0, 0]


In [14]:
# Fine-tune; the per-epoch metrics and the final TrainOutput are shown below.
trainer.train()


Epoch,Training Loss,Validation Loss,Micro F1,Macro F1
1,0.1052,0.09735,0.501349,0.234423
2,0.0903,0.089925,0.540368,0.326272


TrainOutput(global_step=2714, training_loss=0.11322368096145709, metrics={'train_runtime': 1807.119, 'train_samples_per_second': 48.043, 'train_steps_per_second': 1.502, 'total_flos': 5712158611722240.0, 'train_loss': 0.11322368096145709, 'epoch': 2.0})

In [15]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Reload from the checkpoint the Trainer recorded as best instead of a
# hard-coded absolute path: the step number in the directory name changes with
# dataset size, batch size and epoch count, and the last checkpoint is not
# guaranteed to be the best one.
model_path = trainer.state.best_model_checkpoint
if model_path is None:
    raise RuntimeError(
        "No best checkpoint recorded; run trainer.train() before reloading the model."
    )

tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForSequenceClassification.from_pretrained(model_path)


In [16]:
def predict_emotions(text):
    """Score a single text against every emotion label.

    Args:
        text: Input string to classify.

    Returns:
        Dict mapping each name in ``emotion_names`` to the sigmoid probability
        of the first (only) sequence in the model output.
    """
    encoded = tokenizer(text, return_tensors="pt", truncation=True)

    # Inputs must live on whichever device currently holds the model.
    target_device = model.device
    encoded = {name: tensor.to(target_device) for name, tensor in encoded.items()}

    with torch.no_grad():
        logits = model(**encoded).logits

    scores = torch.sigmoid(logits)[0].detach().cpu().numpy()
    return {name: score for name, score in zip(emotion_names, scores)}


In [17]:
# Spot-check the raw per-emotion probabilities for one sentence.
predict_emotions("This should have been done already.")


{'admiration': np.float32(0.013008962),
 'amusement': np.float32(0.0056204097),
 'anger': np.float32(0.0066556428),
 'annoyance': np.float32(0.016785089),
 'approval': np.float32(0.13715439),
 'caring': np.float32(0.014229466),
 'confusion': np.float32(0.009350911),
 'curiosity': np.float32(0.008461683),
 'desire': np.float32(0.008354471),
 'disappointment': np.float32(0.010272068),
 'disapproval': np.float32(0.015627591),
 'disgust': np.float32(0.00411252),
 'embarrassment': np.float32(0.00262652),
 'excitement': np.float32(0.006370675),
 'fear': np.float32(0.0028111916),
 'gratitude': np.float32(0.0035440454),
 'grief': np.float32(0.0018856146),
 'joy': np.float32(0.006584857),
 'love': np.float32(0.003748238),
 'nervousness': np.float32(0.0021063706),
 'optimism': np.float32(0.024033267),
 'pride': np.float32(0.002284998),
 'realization': np.float32(0.034636717),
 'relief': np.float32(0.0036004456),
 'remorse': np.float32(0.0023711403),
 'sadness': np.float32(0.005736262),
 'surpris

In [18]:
# Coarse tone buckets used by aggregate_tone. Each bucket lists the emotion
# labels whose probabilities are summed into that tone's score.
# NOTE(review): several dataset labels (e.g. "fear", "grief", "nervousness",
# "amusement") belong to no bucket, and the buckets differ in size, so the
# summed scores are not directly comparable. Confirm this is intended.
NEGATIVE_EMOTIONS = [
    "anger", "annoyance", "disapproval", "disappointment",
    "disgust", "sadness", "remorse",
]

POSITIVE_EMOTIONS = [
    "gratitude", "joy", "love", "optimism",
    "approval", "admiration", "relief", "pride",
]

NEUTRAL_EMOTIONS = ["neutral", "realization", "confusion", "curiosity", "surprise"]


In [19]:
def aggregate_tone(emotion_scores):
    """Collapse per-emotion scores into negative / positive / neutral totals.

    Args:
        emotion_scores: Mapping from emotion name to score; it must contain
            every name listed in the three bucket constants.

    Returns:
        Tuple ``(tone, scores)`` where ``scores`` maps "negative", "positive"
        and "neutral" to the sum of their bucket's scores, and ``tone`` is the
        key with the largest sum (the earliest key wins a tie).
    """
    buckets = (
        ("negative", NEGATIVE_EMOTIONS),
        ("positive", POSITIVE_EMOTIONS),
        ("neutral", NEUTRAL_EMOTIONS),
    )
    scores = {
        label: sum(emotion_scores[name] for name in members)
        for label, members in buckets
    }

    dominant = max(scores, key=scores.get)
    return dominant, scores


In [20]:
# Per-emotion weights applied to the model probabilities when computing the
# base risk score; emotions not listed contribute nothing.
RISK_WEIGHTS = {
    "anger": 0.35, "annoyance": 0.30, "disapproval": 0.30,
    "disgust": 0.15, "sadness": 0.10,
}


# Lower-case substrings that mark a message as urgent.
URGENT_KEYWORDS = [
    "immediately", "asap", "urgent", "unacceptable",
    "right now", "fix this", "must be done", "do this now",
]

import unicodedata

# Apostrophe look-alikes that are folded to ASCII "'". They are written as
# escapes so the literals survive any re-encoding of this file.
_APOSTROPHE_VARIANTS = (
    "\u00e2\u20ac\u2122",  # U+2019 whose UTF-8 bytes were mis-decoded as cp1252
    "\u00e2\u20ac\u02dc",  # U+2018 whose UTF-8 bytes were mis-decoded as cp1252
    "\u2019",  # right single quotation mark
    "\u2018",  # left single quotation mark
)


def normalize_text(text: str) -> str:
    """Return a lower-cased form of ``text`` suitable for phrase matching.

    Curly apostrophes (and their common mojibake forms) are replaced with an
    ASCII apostrophe, then NFKD normalization expands compatibility
    characters such as ligatures.

    Args:
        text: Raw input string.

    Returns:
        The normalized, lower-cased string.
    """
    # Replace before normalizing: NFKD leaves U+2018/U+2019 untouched, but it
    # would decompose the mojibake sequences so they could no longer match.
    for variant in _APOSTROPHE_VARIANTS:
        text = text.replace(variant, "'")
    text = unicodedata.normalize("NFKD", text)
    return text.lower()


def urgency_multiplier(text):
    """Return 1.5 when the normalized text contains an urgent keyword, else 1.0.

    Matching is a plain substring test against ``URGENT_KEYWORDS``.
    """
    normalized = normalize_text(text)
    if any(keyword in normalized for keyword in URGENT_KEYWORDS):
        return 1.5
    return 1.0




In [21]:
# Lower-case substrings treated as polite, hedged disagreement.
SOFT_DISAGREEMENT_PHRASES = [
    "i don't think", "i dont think", "i do not think",
    "doesn't make sense", "does not make sense",
    "i disagree", "i'm not sure this",
]

def disagreement_discount(text):
    """Return 0.7 when the normalized text contains a soft-disagreement phrase, else 1.0."""
    normalized = normalize_text(text)
    is_soft = any(phrase in normalized for phrase in SOFT_DISAGREEMENT_PHRASES)
    return 0.7 if is_soft else 1.0



In [22]:
def override_risk_level_for_soft_disagreement(risk_level, text):
    """Force the level to "medium" for soft disagreement without urgency.

    Args:
        risk_level: Level computed from the risk score.
        text: Original message.

    Returns:
        "medium" when the text contains a soft-disagreement phrase and no
        urgent keyword; otherwise ``risk_level`` unchanged.

    NOTE(review): the override applies in both directions, so a "low" input is
    raised to "medium" as well as a "high" input being lowered. Confirm that
    this is the intended behaviour.
    """
    normalized = normalize_text(text)

    # Urgent wording always keeps the computed level.
    if urgency_multiplier(normalized) > 1.0:
        return risk_level

    for phrase in SOFT_DISAGREEMENT_PHRASES:
        if phrase in normalized:
            return "medium"
    return risk_level


In [23]:
def compute_risk_score(emotion_scores, text):
    """Combine weighted emotion scores with the text-based adjustments.

    Args:
        emotion_scores: Mapping from emotion name to score; emotions missing
            from the mapping count as 0.0.
        text: Original message, used for the urgency and disagreement factors.

    Returns:
        The weighted sum over ``RISK_WEIGHTS`` multiplied by the urgency
        multiplier and then by the disagreement discount.
    """
    weighted_sum = sum(
        (weight * emotion_scores.get(emotion, 0.0)
         for emotion, weight in RISK_WEIGHTS.items()),
        0.0,
    )
    return weighted_sum * urgency_multiplier(text) * disagreement_discount(text)


In [24]:
def risk_level(risk_score, high_threshold=0.18, medium_threshold=0.08):
    """Map a numeric risk score to "high", "medium" or "low".

    Args:
        risk_score: Score to classify.
        high_threshold: Minimum score (inclusive) for "high".
        medium_threshold: Minimum score (inclusive) for "medium".

    Returns:
        "high" when ``risk_score >= high_threshold``, "medium" when
        ``risk_score >= medium_threshold``, otherwise "low".

    Raises:
        ValueError: If ``medium_threshold`` is greater than ``high_threshold``.
    """
    if medium_threshold > high_threshold:
        raise ValueError("medium_threshold must not exceed high_threshold")
    if risk_score >= high_threshold:
        return "high"
    if risk_score >= medium_threshold:
        return "medium"
    return "low"


In [25]:
# Canned replacement message per aggregated tone. The suggestion depends only
# on the tone label, not on the content of the original text.
REWRITE_TEMPLATES = {
    "negative": "Could you please share an update on this?",
    "neutral": "Just checking in on this.",
    "positive": "Thanks for the update!"
}


In [26]:
def analyze_tone(text):
    """Run the full tone and risk pipeline on one message.

    Args:
        text: Message to analyze.

    Returns:
        Dict with the input text, the dominant tone, the per-tone totals as
        plain floats, the risk score rounded to three decimals, the final risk
        level (after the soft-disagreement override) and a suggested rewrite.
    """
    scores_by_emotion = predict_emotions(text)
    dominant_tone, bucket_totals = aggregate_tone(scores_by_emotion)

    score = compute_risk_score(scores_by_emotion, text)
    # Final step: the phrase-based override is applied on top of the
    # threshold-based level.
    level = override_risk_level_for_soft_disagreement(risk_level(score), text)

    report = {"input_text": text, "tone": dominant_tone}
    report["tone_scores"] = {
        name: float(total) for name, total in bucket_totals.items()
    }
    report["risk_score"] = round(float(score), 3)
    report["risk_level"] = level
    report["suggested_rewrite"] = REWRITE_TEMPLATES[dominant_tone]
    return report


In [27]:
# Terse complaint containing no urgent keyword and no soft-disagreement phrase.
analyze_tone("This should have been done already.")


{'input_text': 'This should have been done already.',
 'tone': 'neutral',
 'tone_scores': {'negative': 0.061560310423374176,
  'positive': 0.19395922124385834,
  'neutral': 0.8259173035621643},
 'risk_score': 0.013,
 'risk_level': 'low',
 'suggested_rewrite': 'Just checking in on this.'}

In [28]:
# Grateful message; none of the keyword rules apply.
analyze_tone("Thanks a lot for helping me with this!")


{'input_text': 'Thanks a lot for helping me with this!',
 'tone': 'positive',
 'tone_scores': {'negative': 0.0832420364022255,
  'positive': 1.1157952547073364,
  'neutral': 0.07774502784013748},
 'risk_score': 0.015,
 'risk_level': 'low',
 'suggested_rewrite': 'Thanks for the update!'}

In [29]:
# Soft disagreement written with a typographic apostrophe (U+2019). The escape
# keeps the literal intact regardless of how this file is re-encoded.
analyze_tone("I don\u2019t think this approach makes sense.")


{'input_text': 'I don’t think this approach makes sense.',
 'tone': 'negative',
 'tone_scores': {'negative': 0.9054042100906372,
  'positive': 0.1517563909292221,
  'neutral': 0.2791873514652252},
 'risk_score': 0.169,
 'risk_level': 'medium',
 'suggested_rewrite': 'Could you please share an update on this?'}

In [30]:
# Contains the urgent keywords "fix this", "immediately" and "unacceptable",
# so the urgency multiplier applies.
analyze_tone("Please fix this immediately, this is unacceptable.")


{'input_text': 'Please fix this immediately, this is unacceptable.',
 'tone': 'negative',
 'tone_scores': {'negative': 0.8663831353187561,
  'positive': 0.10481319576501846,
  'neutral': 0.13574345409870148},
 'risk_score': 0.354,
 'risk_level': 'high',
 'suggested_rewrite': 'Could you please share an update on this?'}