In [None]:
!pip install datasets transformers evaluate

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from datasets import load_dataset

In [None]:
from datasets import load_dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer,TrainingArguments, Trainer
import numpy as np
import evaluate
import torch
import torch.nn as nn

In [None]:
# Checkpoint to fine-tune and the size of the classification head.
model_id = "FacebookAI/roberta-base"
num_labels = 28

# Tokenizer and model are both loaded from the same checkpoint id.
tokenizer = AutoTokenizer.from_pretrained(model_id)

# The number of labels and the multi-label problem type are handed to
# from_pretrained together with the checkpoint id.
model = AutoModelForSequenceClassification.from_pretrained(
    model_id,
    problem_type="multi_label_classification",
    num_labels=num_labels,
)

In [None]:
dataset = load_dataset("go_emotions")

dataset

In [None]:
# Pick the training and validation splits out of the loaded dataset dict.
train_dataset = dataset["train"]
eval_dataset = dataset["validation"]

# Class names, read from the schema of the "labels" column.
labels = train_dataset.features["labels"].feature.names

# Show the index -> class-name mapping.
print(dict(enumerate(labels)))


In [None]:
train_dataset

In [None]:
from collections import Counter

# Class names in index order, taken from the "labels" column schema.
labels = train_dataset.features["labels"].feature.names

# Tally every label index over all training samples; a sample can carry
# several labels, and each of them is counted.
label_counts = Counter(
    label_idx
    for sample_labels in train_dataset["labels"]
    for label_idx in sample_labels
)

# Re-key the tally by class name instead of class index.
label_distribution = {labels[idx]: total for idx, total in label_counts.items()}

# Report the classes from most to least frequent.
ranked = sorted(label_distribution.items(), key=lambda item: item[1], reverse=True)
for emotion, count in ranked:
    print(f"{emotion}: {count}")


In [None]:
import random

# Index of the "neutral" class in the label vocabulary.
neutral_idx = labels.index("neutral")

# Read the label column once. Indexing the dataset row by row
# (train_dataset[i]["labels"]) decodes a full row per access and was done
# in two separate passes over the whole split.
train_label_lists = list(train_dataset["labels"])

# Samples whose only label is "neutral".
purely_neutral = [
    i for i, sample_labels in enumerate(train_label_lists)
    if sample_labels == [neutral_idx]
]

# Samples where "neutral" appears together with at least one other label.
neutral_with_other = [
    i for i, sample_labels in enumerate(train_label_lists)
    if neutral_idx in sample_labels and len(sample_labels) > 1
]

print(f"Total purely neutral sentences: {len(purely_neutral)}")
print(f"Total neutral + other emotion sentences: {len(neutral_with_other)}")

# Fraction of the purely neutral samples to drop (0.5 removes half of them).
remove_fraction = 0.5  # Adjust based on dataset balance
num_to_remove = int(len(purely_neutral) * remove_fraction)

# Draw the indices to drop from a dedicated, seeded generator so that the
# down-sampling is reproducible across kernel restarts and the global
# `random` state is left untouched. The seed value itself is arbitrary.
to_remove = random.Random(42).sample(purely_neutral, num_to_remove)

# Set membership is O(1); testing `i not in to_remove` against the list made
# the filter below quadratic in the dataset size.
removed_indices = set(to_remove)

# Keep every row (as a plain dict) except the sampled purely neutral ones.
filtered_train_dataset = [
    row for i, row in enumerate(train_dataset)
    if i not in removed_indices
]

print(f"New dataset size: {len(filtered_train_dataset)}")




In [None]:
filtered_train_dataset[1]

In [None]:
from datasets import Dataset, ClassLabel, Sequence, Features, Value

# Rebuild a Hugging Face Dataset from the filtered list of row dicts.
# The schema is spelled out explicitly: "text" and "id" are strings and
# "labels" is a sequence of ClassLabel values that reuses the class names of
# the original training split.
label_names = dataset["train"].features["labels"].feature.names
features = Features(
    {
        "text": Value("string"),
        "labels": Sequence(ClassLabel(names=label_names)),
        "id": Value("string"),
    }
)
filtered_train_dataset = Dataset.from_list(filtered_train_dataset, features=features)

In [None]:
print(filtered_train_dataset)

In [None]:
filtered_train_dataset[0]["labels"]

In [None]:
from collections import Counter

# Class names, now read from the schema of the filtered training set.
labels = filtered_train_dataset.features["labels"].feature.names

# Count label occurrences; update() adds one count for each index that a
# sample carries.
label_counts = Counter()
for sample_labels in filtered_train_dataset["labels"]:
    label_counts.update(sample_labels)

# Same tally, keyed by class name.
label_distribution = {labels[idx]: total for idx, total in label_counts.items()}

# Print the classes in descending order of frequency.
for emotion, count in sorted(
    label_distribution.items(), key=lambda pair: pair[1], reverse=True
):
    print(f"{emotion}: {count}")


In [None]:
# Emotions to inspect in the filtered training set.
emotions = ["grief","pride","relief","nervousness","embarrassment","remorse"
,"fear","desire","disgust","excitement"]

# Read the label column once, outside the loop. Indexing the dataset row by
# row inside the loop decoded every row again for each emotion.
filtered_label_lists = list(filtered_train_dataset["labels"])

for sample in emotions:
    emo_idx = labels.index(sample)

    print(sample)
    # Total occurrences of this emotion, alone or together with other labels.
    print(label_distribution[sample])
    # Samples for which this emotion is the only label.
    purely_emo = [
        i for i, sample_labels in enumerate(filtered_label_lists)
        if sample_labels == [emo_idx]
    ]
    print(len(purely_emo))

In [None]:
import numpy as np
X = filtered_train_dataset["text"]  # Extract input text
Y = filtered_train_dataset["labels"]  # Extract labels (lists of class indices)


num_labels = len(labels)  # Total number of emotions

# Convert each list of label indices into one multi-hot row.
Y_multi_hot = np.zeros((len(Y), num_labels))
for i, label_list in enumerate(Y):
    Y_multi_hot[i, label_list] = 1  # Set 1 for each emotion present

print(Y_multi_hot.shape)  # (num_samples, num_labels)
print(num_labels)
print(Y_multi_hot[0])
# Print the index list that row 0 was built from. The key was misspelled
# ("lables", a KeyError), and the row has to come from the filtered dataset:
# row 0 of the unfiltered split may be a different sample.
print(filtered_train_dataset[0]["labels"])

In [None]:
print(Y_multi_hot[1040])
print(filtered_train_dataset[1040]["labels"])

In [None]:
# Number of positives per label (column sums of the multi-hot matrix) and the
# total number of samples.
label_counts = np.sum(Y_multi_hot, axis=0)
total_samples = len(Y_multi_hot)

# Per-label weight = negatives / positives.
pos = label_counts
neg = total_samples - pos

# Divide only where a label has positives. Labels without any positive keep
# the weight 0.0, which is the value the previous nan_to_num() clean-up
# produced, but no divide-by-zero warning is raised on the way.
class_weights = np.divide(
    neg, pos, out=np.zeros_like(neg, dtype=np.float64), where=pos > 0
)

# Convert to a float32 tensor: the loss in WeightedTrainer calls
# class_weights.to(<device>), which a NumPy array does not support. The
# tensor is left on the CPU here; it is moved to the model's device when used.
class_weights = torch.tensor(class_weights, dtype=torch.float32)

print(label_counts)
print(pos)
print(neg)
print(class_weights)

In [None]:
def encode_labels(example):
    """Replace the list of label indices in ``example`` by a multi-hot list.

    Args:
        example: A single dataset row (dict-like) whose ``"labels"`` entry is
            an iterable of integer class indices.

    Returns:
        The same ``example`` object, with ``"labels"`` replaced by a list of
        ``num_labels`` floats (1.0 at every listed index, 0.0 elsewhere) and
        with the ``"id"`` key dropped from it if it was present.
    """
    vector = np.zeros(num_labels, dtype=np.float32)
    for class_idx in example["labels"]:
        vector[class_idx] = 1.0
    example["labels"] = vector.tolist()
    example.pop("id", None)
    return example

# Encode the labels of both splits.
# NOTE(review): both names are rebound to their encoded versions, so this cell
# is meant to run once per kernel session.
filtered_train_dataset = filtered_train_dataset.map(encode_labels)
eval_dataset = eval_dataset.map(encode_labels)

In [None]:
def tokenize_function(examples, max_length=128):
    """Tokenize a batch of rows to fixed-length sequences.

    Every sequence is truncated and padded to exactly ``max_length`` tokens.
    Padding to 512 made every batch as long as the model's maximum input,
    whatever the real text length; the smaller default reduces memory use and
    compute per step.

    NOTE(review): 128 assumes the texts are short - check the token-length
    distribution of the dataset and raise ``max_length`` if texts get cut.

    Args:
        examples: Batch dict with a ``"text"`` entry (list of strings).
        max_length: Fixed sequence length to pad/truncate to.

    Returns:
        The tokenizer output for the batch.
    """
    return tokenizer(
        examples["text"],
        padding="max_length",
        truncation=True,
        max_length=max_length,
    )

tokenized_train_dataset = filtered_train_dataset.map(tokenize_function, batched=True)
tokenized_eval_dataset = eval_dataset.map(tokenize_function, batched=True)


In [None]:
class WeightedTrainer(Trainer):
    """Trainer whose loss is a BCE-with-logits loss with per-label positive weights.

    The weights are read from the module-level ``class_weights`` (one value
    per label).
    """

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the weighted multi-label loss for one batch.

        Args:
            model: The model being trained/evaluated.
            inputs: Batch dict passed to the model; must contain ``"labels"``.
            return_outputs: If true, return ``(loss, outputs)`` instead of
                only the loss.
            **kwargs: Extra arguments passed by the Trainer; accepted and
                ignored.

        Returns:
            The loss tensor, or ``(loss, outputs)`` when ``return_outputs``
            is true.
        """
        outputs = model(**inputs)
        # Compute the loss in float32 even if the forward pass ran in half precision.
        logits = outputs.logits.float()

        # Take the device from the logits rather than from `model.device`,
        # which is not available on every model wrapper.
        target_device = logits.device
        labels = inputs.get("labels").float().to(target_device)

        # `class_weights` may be a NumPy array or a tensor; as_tensor accepts
        # both, whereas calling `.to()` directly fails on a NumPy array.
        pos_weight = torch.as_tensor(
            class_weights, dtype=torch.float32, device=target_device
        )

        # pos_weight scales the positive term of each label's loss.
        loss_fn = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
        loss = loss_fn(logits, labels)

        return (loss, outputs) if return_outputs else loss

In [None]:
# Load the metrics in their "multilabel" configuration so that they accept
# 2-D (samples x labels) indicator arrays.
metric = evaluate.load("f1", "multilabel")  # F1-score
precision_metric = evaluate.load("precision", "multilabel")
recall_metric = evaluate.load("recall", "multilabel")

def compute_metrics(eval_pred):
    """Compute macro-averaged F1, precision and recall over the labels.

    The predictions are kept as a (samples x labels) matrix. Flattening them
    to one long 0/1 vector, as was done before, turns "macro" into an average
    over the two classes {0, 1} of a single binary problem, not an average
    over the labels.

    Args:
        eval_pred: Pair ``(logits, labels)``; ``labels`` holds the multi-hot
            reference vectors.

    Returns:
        Dict with the keys ``"f1"``, ``"precision"`` and ``"recall"``.
    """
    logits, labels = eval_pred  # Unpack predictions and labels

    # Convert logits to per-label probabilities.
    probs = torch.sigmoid(torch.tensor(logits))

    # Apply the 0.5 threshold to get one binary decision per label.
    preds = (probs > 0.5).int().numpy()
    labels = np.array(labels, dtype=np.int32)

    # Macro average = unweighted mean of the per-label scores.
    f1 = metric.compute(predictions=preds, references=labels, average="macro")
    precision = precision_metric.compute(predictions=preds, references=labels, average="macro")
    recall = recall_metric.compute(predictions=preds, references=labels, average="macro")

    return {
        "f1": f1["f1"],
        "precision": precision["precision"],
        "recall": recall["recall"],
    }

In [None]:
print(tokenized_eval_dataset[0]["labels"])  # Should be a multi-hot list like [0, 1, 0, ...]

In [None]:
training_args = TrainingArguments(
    output_dir="/content/drive/MyDrive/llm-finetuning",
    # Current transformers releases name this argument `eval_strategy`;
    # the older spelling `evaluation_strategy` is no longer accepted.
    eval_strategy="epoch",
    save_strategy="epoch",  # must match the evaluation strategy for load_best_model_at_end
    per_device_train_batch_size=8,   # Reduce batch size to save memory
    per_device_eval_batch_size=8,
    learning_rate=3e-5,
    num_train_epochs=5,
    weight_decay=0.01,
    fp16=True,  # Use mixed precision training to save memory
    logging_dir="./logs",
    logging_steps=500,
    save_total_limit=2,  # Limit the number of checkpoints kept on disk
    load_best_model_at_end=True,  # Load best model based on eval loss
)

trainer = WeightedTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train_dataset,
    eval_dataset=tokenized_eval_dataset,
    compute_metrics=compute_metrics,  # Add evaluation function
)


trainer.train()

# Write the model that is loaded at the end of training to the top level of
# output_dir. The per-epoch saves go to checkpoint sub-folders, so without
# this step from_pretrained(output_dir) finds no model files to load.
trainer.save_model(training_args.output_dir)


In [None]:
from transformers import pipeline

# The pipeline takes its label names from the model configuration, so the
# index <-> emotion-name mapping is attached there. `id2label` is not a
# pipeline argument.
model.config.id2label = {i: label for i, label in enumerate(labels)}
model.config.label2id = {label: i for i, label in enumerate(labels)}

# Wrap the trained model. top_k=None returns the score of every label;
# the deprecated `return_all_scores` flag is redundant with it and is dropped.
classifier = pipeline(
    "text-classification",
    model=model,
    tokenizer=tokenizer,
    top_k=None,
)

# Test sentence
sentence = "I'm so excited for the weekend!"

# Get predictions
preds = classifier(sentence)

# Print results
for p in preds[0]:
    print(f"{p['label']}: {p['score']:.4f}")


In [None]:
# Run evaluation on the trainer's eval_dataset (the validation split, not the test split)
results = trainer.evaluate()
print(results)


In [None]:
# Round-trip sanity check: encode one sentence, then decode its token ids back to text.
sample = tokenizer("I love this movie!", return_tensors="pt")
# decode() is called without skip_special_tokens, so any special tokens the
# tokenizer added are printed together with the original text.
print(tokenizer.decode(sample["input_ids"][0]))  # Should reconstruct the original text

In [None]:
# A few hand-written sentences to try the classifier on.
test_sentences = [
    "I love this movie!",
    "I'm so annoyed with my internet connection.",
    "This is amazing!",
    "I'm feeling really sad today.",
    "I feel nervous about my exam."
]

# Classify all sentences in one call; one result list is returned per sentence.
batch_preds = classifier(test_sentences)

# Print each sentence followed by its label scores.
for sentence, sentence_preds in zip(test_sentences, batch_preds):
    print(f"\nSentence: {sentence}")
    for p in sentence_preds:
        print(f"  {p['label']}: {p['score']:.4f}")


In [None]:

import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Directory the fine-tuned model is loaded from.
# NOTE(review): from_pretrained() is pointed at this directory itself, so the
# model files must sit directly inside it - confirm the training run saved them there.
model_path = "/content/drive/MyDrive/llm-finetuning"  # Change if needed

# The tokenizer comes from the base checkpoint id, the weights from model_path.
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForSequenceClassification.from_pretrained(model_path)

# Run on the GPU when one is available, otherwise on the CPU, and switch the
# model to evaluation mode.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)
model.eval()

# Emotion names in output-index order (28 entries).
emotion_labels = [
    "admiration", "amusement", "anger", "annoyance", "approval", "caring", "confusion",
    "curiosity", "desire", "disappointment", "disapproval", "disgust", "embarrassment",
    "excitement", "fear", "gratitude", "grief", "joy", "love", "nervousness", "optimism",
    "pride", "realization", "relief", "remorse", "sadness", "surprise", "neutral",
]

# Function to predict emotions
def predict_emotions(text, threshold=0.5):
    """Return the emotion names whose predicted probability exceeds ``threshold``.

    Args:
        text: Input sentence to classify.
        threshold: Probability cut-off applied to every label independently.
            Defaults to 0.5, the value that was previously hard-coded.

    Returns:
        List of names from ``emotion_labels``, in label-index order; empty if
        no label passes the threshold.
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    with torch.no_grad():
        outputs = model(**inputs)

    # Sigmoid gives one independent probability per label (multi-label);
    # softmax would be the choice for single-label multi-class models.
    probabilities = torch.sigmoid(outputs.logits)[0].tolist()

    # Compare plain floats rather than 0-dim tensors.
    return [
        emotion_labels[i]
        for i, probability in enumerate(probabilities)
        if probability > threshold
    ]

# Example sentence
text = "I am really happy today!"
predicted_emotions = predict_emotions(text)
print("Predicted Emotions:", predicted_emotions)
