In [None]:
import json
import os

import pandas as pd
import numpy as np

from sklearn.preprocessing import LabelEncoder

from transformers import AutoTokenizer, DataCollatorWithPadding, AutoModelForSequenceClassification, TrainingArguments, Trainer, get_scheduler

import torch

from sklearn.utils import resample
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score, accuracy_score

import seaborn as sns
import matplotlib.pyplot as plt

from sentences_helper import sentence_es, sentence_en

In [None]:
# Hugging Face checkpoint used below to load both the tokenizer and the
# sequence-classification model. The commented-out lines are alternative
# checkpoints that can be swapped in.
# model_name = "google-bert/bert-base-multilingual-cased"
# model_name = "google-bert/bert-base-multilingual-uncased"
# model_name = "FacebookAI/xlm-roberta-base"
model_name = "FacebookAI/xlm-roberta-large"

# PREPROCESS DATASETS

## EmpatheticDialogues

Emotion labels:
Surprised, Excited, Angry, Proud, Sad, Annoyed, Grateful, Lonely, Afraid, Terrified, Guilty, Impressed, Disgusted, Hopeful, Confident, Furious, Anxious, Anticipating, Joyful, Nostalgic, Disappointed, Prepared, Jealous, Content, Devastated, Embarrassed, Caring, Sentimental, Trusting, Ashamed, Apprehensive, Faithful

In [None]:
# Load the EmpatheticDialogues segments (with their Spanish translation) and
# the per-dialogue metadata that carries the emotion label.
seg = 'data/MPATHY/MPATHY_translation_en2es.csv'
label = 'data/MPATHY/MPATHY_dialoginfo.csv'

data_seg = pd.read_csv(seg)
data_label = pd.read_csv(label)

# Each dialogue owns `turns + 1` consecutive segments (the original note says
# the +1 is "because index 0"), so the dialogue-level emotion is repeated that
# many times to get one label per segment.
# This replaces the former `ED_data['label'][row + j] = ...` loop: chained
# assignment is not guaranteed to write through to the frame (it is a silent
# no-op under pandas Copy-on-Write) and it is slow for large files.
segments_per_dialogue = data_label['turns'].to_numpy() + 1
segment_labels = pd.Series(
    np.repeat(data_label['emotion'].to_numpy(), segments_per_dialogue),
    dtype=object,
)

if len(segment_labels) != len(data_seg):
    # Keep going (as the loop-based version did) but make the mismatch visible.
    print(
        f"Warning: expanded {len(segment_labels)} labels for {len(data_seg)} segments; "
        "surplus labels are dropped and unlabeled segments are left as NaN."
    )

# Column order (label, text, uid, translation) matches what the frame had before.
ED_data = pd.DataFrame({
    'label': segment_labels.reindex(data_seg.index),
    'text': data_seg['SEG'],
    'uid': data_seg['UID'],
    'translation': data_seg['translation'],
})

In [None]:
# Other - Anticipating/Prepared: no counterpart among the target classes,
# so rows carrying these labels are removed.
dropped_emotions = ['anticipating', 'prepared']

# Remap Emotions: fine-grained EmpatheticDialogues label -> target class.
emotion_map = {
    # Anger - Angry/Annoyed/Furious/Jealous
    'angry': 'anger',
    'annoyed': 'anger',
    'furious': 'anger',
    'jealous': 'anger',
    # Disgust - Disgusted
    'disgusted': 'disgust',
    # Fear - Afraid/Anxious/Apprehensive/Terrified
    'afraid': 'fear',
    'anxious': 'fear',
    'apprehensive': 'fear',
    'terrified': 'fear',
    # Happiness - Caring/Confident/Content/Excited/Faithful/Grateful/Joyful/Hopeful/Proud/Trusting
    'caring': 'happiness',
    'confident': 'happiness',
    'content': 'happiness',
    'excited': 'happiness',
    'faithful': 'happiness',
    'grateful': 'happiness',
    'joyful': 'happiness',
    'hopeful': 'happiness',
    'proud': 'happiness',
    'trusting': 'happiness',
    # Sadness - Ashamed/Devastated/Disappointed/Embarrassed/Guilty/Lonely/Nostalgic/Sad/Sentimental
    'sad': 'sadness',
    'ashamed': 'sadness',
    'devastated': 'sadness',
    'disappointed': 'sadness',
    'embarrassed': 'sadness',
    'guilty': 'sadness',
    'lonely': 'sadness',
    'nostalgic': 'sadness',
    'sentimental': 'sadness',
    # Surprise - Impressed/Surprised
    'impressed': 'surprise',
    'surprised': 'surprise',
}

# Labels are matched as whole values rather than substrings. With substring
# replacement, re-running the cell turned "sadness" into "sadnessness"
# (because "sad" is a prefix of "sadness"); an exact lookup is idempotent.
# Labels missing from the map (including NaN) pass through unchanged.
ED_data = ED_data[~ED_data['label'].isin(dropped_emotions)]
ED_data = ED_data.assign(
    label=ED_data['label'].map(lambda emotion: emotion_map.get(emotion, emotion))
)

ED_data = ED_data.reset_index(drop=True)

In [None]:
# Class distribution of the EmpatheticDialogues rows after remapping.
ED_data['label'].value_counts()

In [None]:
# Non-null count per column; differing counts point at missing values.
ED_data.count()

In [None]:
# Keep only the four working columns in a fixed order (the same order is
# applied to DD_data before the two frames are concatenated).
ED_data = ED_data[['uid', 'label', 'text', 'translation']]
ED_data

## DailyDialog

Emotion labels:
Anger, Disgust, Fear, Happiness, Sadness, Surprise and Other

In [None]:
# DailyDialog: segment file (text + Spanish translation) and label file.
seg = 'data/DAILYD/DAILYD_translation_en2es.csv'
label = 'data/DAILYD/DAILYD_dialoginfo.csv'

data_seg = pd.read_csv(seg)
data_label = pd.read_csv(label)

# Remap Emotions: DailyDialog's "no emotion" becomes the "neutral" class.
dd_labels = data_label['emotion'].str.replace("no emotion", "neutral")

# Assemble the frame in one step; labels are aligned to the segment index,
# and the column order is label, text, uid, translation.
DD_data = pd.DataFrame({
    'label': dd_labels.reindex(data_seg.index),
    'text': data_seg['SEG'],
    'uid': data_seg['UID'],
    'translation': data_seg['translation'],
})

In [None]:
# DD_data
# Class distribution of the DailyDialog rows after renaming "no emotion".
DD_data['label'].value_counts()

In [None]:
# Non-null count per column; differing counts point at missing values.
DD_data.count()

In [None]:
# Inspect the rows labeled "neutral" (renamed from "no emotion" above).
DD_data.loc[DD_data['label'] == 'neutral']

In [None]:
# Same column selection and order as ED_data, ready for concatenation.
DD_data = DD_data[['uid', 'label', 'text', 'translation']]
DD_data

## Concat all datasets

In [None]:
# Stack both corpora and renumber the rows 0..n-1 in a single call.
frames = [ED_data, DD_data]
data = pd.concat(frames, ignore_index=True)
data

In [None]:
# The segment id is not used from here on.
# NOTE: re-running this cell on its own raises KeyError because 'uid' is already gone.
data = data.drop(columns=['uid'])

In [None]:
# Turn each (text, translation) pair into two rows: the English text tagged
# 'en' and the Spanish translation tagged 'es', both under a 'text' column.
data_en = data.copy()
data_es = data.copy()
data_en['language'] = 'en'
data_es['language'] = 'es'

english_rows = data_en[['label', 'text', 'language']]
spanish_rows = data_es[['label', 'translation', 'language']].rename(columns={'translation': 'text'})

# English rows first, then Spanish rows, with a fresh 0..n-1 index.
data = pd.concat([english_rows, spanish_rows], ignore_index=True)

In [None]:
# Rows whose text already appeared earlier in the frame (the first
# occurrence of each text is not flagged).
duplicates = data[data.duplicated(subset=['text'])]
duplicates

In [None]:
# Remove duplicates: keep the first row for every distinct text.
data = data.drop_duplicates(subset=['text'])

In [None]:
# Check for duplicates in data
# This counts fully identical rows (all columns), whereas the removal above
# was keyed on 'text' only.
duplicates = data.duplicated().sum()
print(f'Duplicates in data: {duplicates}')

In [None]:
# Non-null count per column of the combined dataset.
data.count()

In [None]:
# Distinct label values present after merging both corpora.
data['label'].unique()

In [None]:
# Class distribution of the combined dataset.
data['label'].value_counts()

In [None]:
# Bar chart of the class distribution shown above.
data['label'].value_counts().plot.bar()

In [None]:
# Persist the combined (label, text, language) dataset without the index column.
data.to_csv('data/dataset_multi.csv', index = False)

# POSTPROCESS DATA

## TRAIN & TEST DATA

In [None]:
# Reload the combined dataset written by the preprocessing section.
df = pd.read_csv("data/dataset_multi.csv")

In [None]:
# Define the number of samples per class for the test and validation sets
num_samples_per_class_test = 250  # Adjust this value as needed
num_samples_per_class_val = 250   # Adjust this value as needed


def _split_class_samples(class_samples, n_test, n_val, description, seed=42):
    """Split the rows of one (label, language) pair into train/val/test frames.

    Args:
        class_samples: DataFrame with at least a 'text' column.
        n_test: number of rows drawn for the test set (from text-unique rows).
        n_val: number of rows drawn for the validation set (from what is left).
        description: human-readable name of the slice, used in error messages.
        seed: random_state passed to both `sample` calls.

    Returns:
        (train, val, test) DataFrames that keep the original index labels.

    Raises:
        ValueError: if the slice has too few rows for the requested sizes.
    """
    unique_samples = class_samples[~class_samples.duplicated(subset=['text'])]
    if len(unique_samples) < n_test:
        raise ValueError(
            f"{description}: only {len(unique_samples)} unique texts, cannot draw {n_test} test samples"
        )
    test_part = unique_samples.sample(n=n_test, random_state=seed)

    remaining = class_samples.drop(test_part.index)
    if len(remaining) < n_val:
        raise ValueError(
            f"{description}: only {len(remaining)} rows left, cannot draw {n_val} validation samples"
        )
    val_part = remaining.sample(n=n_val, random_state=seed)

    # Remaining samples go to the train set; rows sharing a text with the
    # test set are filtered out as an extra safeguard.
    train_part = remaining.drop(val_part.index)
    train_part = train_part[~train_part['text'].isin(test_part['text'])]
    return train_part, val_part, test_part


# One list of per-class frames for every split and language, so that English
# and Spanish each get the same number of test/validation samples per class.
test_list_en, test_list_es = [], []
val_list_en, val_list_es = [], []
train_list_en, train_list_es = [], []

# NOTE(review): English and Spanish rows are split independently, so a sentence
# can end up in train while its translation ends up in val/test. Confirm
# whether this cross-language overlap is acceptable for the evaluation.

# Split each class separately to ensure the same class distribution in the test and validation sets
for label in df['label'].unique():
    for language, train_list, val_list, test_list in (
        ('en', train_list_en, val_list_en, test_list_en),
        ('es', train_list_es, val_list_es, test_list_es),
    ):
        class_samples = df[(df['label'] == label) & (df['language'] == language)]
        train_part, val_part, test_part = _split_class_samples(
            class_samples,
            num_samples_per_class_test,
            num_samples_per_class_val,
            description=f"label={label!r}, language={language!r}",
        )
        train_list.append(train_part)
        val_list.append(val_part)
        test_list.append(test_part)

# Concatenate train, validation, and test datasets and shuffle them
train_df_en = pd.concat(train_list_en).sample(frac=1, random_state=42).reset_index(drop=True)
train_df_es = pd.concat(train_list_es).sample(frac=1, random_state=42).reset_index(drop=True)
val_df_en = pd.concat(val_list_en).sample(frac=1, random_state=42).reset_index(drop=True)
val_df_es = pd.concat(val_list_es).sample(frac=1, random_state=42).reset_index(drop=True)
test_df_en = pd.concat(test_list_en).sample(frac=1, random_state=42).reset_index(drop=True)
test_df_es = pd.concat(test_list_es).sample(frac=1, random_state=42).reset_index(drop=True)

# Combine English and Spanish datasets
train_df = pd.concat([train_df_en, train_df_es]).sample(frac=1, random_state=42).reset_index(drop=True)
val_df = pd.concat([val_df_en, val_df_es]).sample(frac=1, random_state=42).reset_index(drop=True)
test_df = pd.concat([test_df_en, test_df_es]).sample(frac=1, random_state=42).reset_index(drop=True)

# Check new distributions
print("Train Class Distribution:\n", train_df['label'].value_counts())
print("\nValidation Class Distribution:\n", val_df['label'].value_counts())
print("\nTest Class Distribution:\n", test_df['label'].value_counts())

# Save to CSV
train_df.to_csv("data/train_dataset.csv", index=False)
val_df.to_csv("data/val_dataset.csv", index=False)
test_df.to_csv("data/test_dataset.csv", index=False)

In [None]:
# For each split (train, validation, test — in that order) count the rows per
# language and label, and print the resulting language x label table.
for split_df in (train_df, val_df, test_df):
    emotion_counts = split_df.groupby(['language', 'label']).size().unstack(fill_value=0)
    print(emotion_counts)

In [None]:
# Print the fully duplicated rows (all columns equal) found inside each split.
for heading, split_df in (
    ("Duplicate Rows in train_df:", train_df),
    ("Duplicate Rows in val_df:", val_df),
    ("Duplicate Rows in test_df:", test_df),
):
    duplicates = split_df[split_df.duplicated()]
    print(heading)
    print(duplicates)

In [None]:
# Inner-join the training split with the validation and test splits on 'text';
# any resulting row is a text shared between two splits.
common_rows_val = train_df.merge(val_df, on=['text'], how='inner')
common_rows_test = train_df.merge(test_df, on=['text'], how='inner')

print(f"Number of common rows between train_df and val_df: {len(common_rows_val)}")
print(f"Number of common rows between train_df and test_df: {len(common_rows_test)}")

# Show the overlapping rows themselves, but only when there are any.
for heading, overlap in (
    ("Common rows between train_df and val_df:", common_rows_val),
    ("Common rows between train_df and test_df:", common_rows_test),
):
    if overlap.empty:
        continue
    print(heading)
    print(overlap)

In [None]:
# All training texts (before shuffling), gathered once from the per-class lists.
train_texts = pd.concat(train_list_en + train_list_es)['text']

# Validation rows whose text also occurs in the training data.
common_rows = val_df[val_df['text'].isin(train_texts)]
print("Common Rows between val_df and train_df:")
print(common_rows)

# Test rows whose text also occurs in the training data.
common_rows = test_df[test_df['text'].isin(train_texts)]
print("Common Rows between test_df and train_df:")
print(common_rows)

## Downsampling Majority Class (Neutral)
## Upsampling Minority Classes

In [None]:
# METHOD 1: Downsampling (frac), Upsampling (ratio)

# Check initial class distribution
print("Before Balancing:\n", train_df['label'].value_counts())

# This cell overwrites `train_df`, so running it twice would resample data that
# was already resampled. Repeated texts are the visible symptom of that.
if train_df.duplicated(subset=['text']).any():
    print("Warning: train_df already contains repeated texts; "
          "if this cell was run before, re-run the split cell first.")

### STEP 1: DOWNSAMPLE ONLY THE NEUTRAL CLASS ###
neutral_class = train_df[train_df['label'] == "neutral"]

# Fraction of neutral rows kept, applied per language so both stay in proportion.
frac = 0.6
neutral_downsampled_en = neutral_class[neutral_class['language'] == 'en'].sample(frac=frac, random_state=42)
neutral_downsampled_es = neutral_class[neutral_class['language'] == 'es'].sample(frac=frac, random_state=42)
neutral_downsampled = pd.concat([neutral_downsampled_en, neutral_downsampled_es])

### STEP 2: UPSAMPLE OTHER CLASSES ###
# Target size of each class as a multiple of its current size.
upsample_ratios = {
    "anger": 2,
    "fear": 3,
    "disgust": 9,
    "happiness": 1,
    "sadness": 1,
    "surprise": 4
}

# Initialize a list to store upsampled data
train_df_upsampled = []

# Upsample the specified classes.
# Every original row is kept and only the additional `(ratio - 1) * n` rows are
# drawn with replacement. Drawing all `ratio * n` rows with replacement (as
# before) bootstrapped even ratio-1 classes, silently dropping unique examples.
for label, ratio in upsample_ratios.items():
    class_samples = train_df[train_df['label'] == label]
    if class_samples.empty:
        continue  # nothing to resample for a class absent from the training set
    n_extra = len(class_samples) * (ratio - 1)
    if n_extra > 0:
        extra_samples = resample(class_samples, replace=True, n_samples=n_extra, random_state=42)
        class_samples = pd.concat([class_samples, extra_samples])
    train_df_upsampled.append(class_samples)

# Leave the other classes (not upsampled) unchanged
for label in train_df['label'].unique():
    if label not in upsample_ratios and label != "neutral":
        class_samples = train_df[train_df['label'] == label]
        train_df_upsampled.append(class_samples)

### STEP 3: COMBINE DOWNSAMPLED NEUTRAL CLASS WITH UPSAMPLED CLASSES ###
train_df_balanced = pd.concat([neutral_downsampled] + train_df_upsampled)

# Shuffle the dataset to mix classes
train_df_balanced = train_df_balanced.sample(frac=1, random_state=42).reset_index(drop=True)

# Check new class distribution
print("After Balancing:\n", train_df_balanced['label'].value_counts())

# Update train_df
train_df = train_df_balanced

# Save the balanced dataset to a CSV file (replaces the unbalanced train split on disk)
train_df.to_csv("data/train_dataset.csv", index=False)

In [None]:
# Language x label counts for the (now balanced) train split, followed by the
# validation and test splits, printed in that order.
for split_df in (train_df, val_df, test_df):
    emotion_counts = split_df.groupby(['language', 'label']).size().unstack(fill_value=0)
    print(emotion_counts)

# TEXT CLASSIFICATION MODEL

In [None]:
# Reload the train/validation splits written to disk by the cells above.
train_df = pd.read_csv("data/train_dataset.csv")
val_df = pd.read_csv("data/val_dataset.csv")

In [None]:
# Encode the labels
# The encoder is fitted on the training labels and then reused for the
# validation labels, so both splits share one label -> integer id mapping.
# `predict_emotion` further below uses the same encoder to decode predictions.
label_encoder = LabelEncoder()
train_df['label'] = label_encoder.fit_transform(train_df['label'])
val_df['label'] = label_encoder.transform(val_df['label'])

In [None]:
# Class names known to the encoder; position i is the name for integer id i.
label_encoder.classes_

In [None]:
# Path to save model and tokenizer
# (the Trainer output directory is created underneath it as `<path>/results`)
model_name_save = './classifier_model_multi/final_model'

## TOKENIZER

In [None]:
# Load the tokenizer that belongs to the checkpoint selected in `model_name`
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [None]:
def tokenize_function(examples):
    """Tokenize an iterable of texts with the notebook-level `tokenizer`.

    Every text is truncated/padded to exactly 128 tokens.
    """
    texts = list(examples)
    return tokenizer(texts, truncation=True, padding='max_length', max_length=128)

# Tokenize the training texts first, then the validation texts.
train_encodings, val_encodings = map(tokenize_function, (train_df['text'], val_df['text']))

In [None]:
# Torch dataset wrapper around the tokenizer output
class EmotionDataset(torch.utils.data.Dataset):
    """Map-style dataset that pairs pre-computed encodings with their labels.

    `encodings` is a mapping from field name to an indexable collection with
    one entry per sample; `labels` is an indexable collection of the same length.
    """

    def __init__(self, encodings, labels):
        self.labels = labels
        self.encodings = encodings

    def __len__(self):
        # The number of samples is defined by the labels.
        return len(self.labels)

    def __getitem__(self, idx):
        # Build one sample: every encoding field plus the label, all as tensors.
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

# Wrap the encodings together with the integer-encoded labels (as NumPy arrays).
train_dataset = EmotionDataset(train_encodings, train_df['label'].values)
val_dataset = EmotionDataset(val_encodings, val_df['label'].values)

## TRAIN MODEL

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
# Load the model
# The classification head gets one output per class known to the label
# encoder, and the model is moved to the selected device.
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=len(label_encoder.classes_)).to(device)

In [None]:
def compute_metrics(p):
    """Compute accuracy and macro-averaged F1/precision/recall for an evaluation pass.

    Args:
        p: pair `(predictions, labels)`; `predictions` holds the logits with one
           row per sample (or a tuple whose first element is that array) and
           `labels` holds the integer class ids.

    Returns:
        dict with the keys 'accuracy', 'f1', 'precision' and 'recall'.
    """
    predictions, labels = p
    # When several outputs are returned, the logits are expected to come first.
    if isinstance(predictions, tuple):
        predictions = predictions[0]
    preds = np.argmax(predictions, axis=1)  # Convert logits to predicted class ids

    # zero_division=0 yields the same score as the default (0 for a class that
    # is never predicted / never present) without emitting a warning each time.
    return {
        'accuracy': accuracy_score(labels, preds),
        'f1': f1_score(labels, preds, average="macro", zero_division=0),
        'precision': precision_score(labels, preds, average="macro", zero_division=0),
        'recall': recall_score(labels, preds, average="macro", zero_division=0)
    }

In [None]:
class CustomTrainer(Trainer):
    """Trainer with (optionally) class-weighted cross-entropy and a custom
    AdamW optimizer + warmup/cosine learning-rate schedule.

    Args:
        class_weights: optional 1-D tensor with one weight per class; it is
            moved to the training device. `None` means unweighted loss.
        *args, **kwargs: forwarded unchanged to `Trainer`.
    """

    def __init__(self, class_weights=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights.to(self.args.device) if class_weights is not None else None

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None, **kwargs):
        """Return the cross-entropy loss (and the model outputs if requested).

        `num_items_in_batch` and any further keyword arguments are accepted and
        ignored so the override keeps working with Trainer versions that pass
        extra arguments to `compute_loss`.
        """
        # The labels are removed from `inputs` so the model does not compute
        # its own (unweighted) loss.
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits

        # weight=None is plain, unweighted cross-entropy.
        loss_fn = torch.nn.CrossEntropyLoss(weight=self.class_weights)
        loss = loss_fn(logits, labels)

        return (loss, outputs) if return_outputs else loss

    def create_optimizer_and_scheduler(self, num_training_steps=None, num_warmup_steps=None):
        """Create AdamW and a cosine schedule, store them on the trainer and return them.

        Args:
            num_training_steps: total number of optimizer steps (required).
            num_warmup_steps: warmup steps; defaults to 10% of the total.

        Raises:
            ValueError: if `num_training_steps` is not given.
        """
        if num_training_steps is None:
            raise ValueError("num_training_steps is required to build the learning-rate schedule")

        # Biases and LayerNorm weights are excluded from weight decay.
        no_decay_markers = ["bias", "LayerNorm.weight"]
        decay_params, no_decay_params = [], []
        for name, param in self.model.named_parameters():
            if any(marker in name for marker in no_decay_markers):
                no_decay_params.append(param)
            else:
                decay_params.append(param)

        optimizer = torch.optim.AdamW(
            [
                {"params": decay_params, "weight_decay": self.args.weight_decay},
                {"params": no_decay_params, "weight_decay": 0.0}
            ],
            lr=self.args.learning_rate
        )

        if num_warmup_steps is None:
            num_warmup_steps = int(0.1 * num_training_steps)  # 10% Warmup

        # You can choose from 'linear', 'cosine', 'constant' etc.
        lr_scheduler = get_scheduler("cosine", optimizer=optimizer, num_warmup_steps=num_warmup_steps, num_training_steps=num_training_steps)

        self.optimizer, self.lr_scheduler = optimizer, lr_scheduler
        return optimizer, lr_scheduler

In [None]:
# Training Arguments
# Evaluation and checkpointing both happen once per epoch, and the checkpoint
# with the best validation F1 is reloaded when training ends.
training_args = TrainingArguments(
    output_dir=f"{model_name_save}/results",  # Where checkpoints are written
    evaluation_strategy="epoch",  # Evaluate after each epoch
    save_strategy="epoch",  # Save a checkpoint after each epoch
    learning_rate=5e-6,  # Lower learning rate for better fine-tuning
    per_device_train_batch_size=32,  # Adjust based on GPU memory (try 8, 16, or 32)
    per_device_eval_batch_size=32,  # Same as train
    num_train_epochs=3,  # Number of passes over the training set
    weight_decay=0.01,  # Regularization to prevent overfitting
    logging_dir='./logs',
    logging_steps=500,  # Log every X steps
    save_total_limit=2,  # Limit the number of checkpoints kept on disk to save space
    metric_for_best_model="f1",  # Best checkpoint based on F1-score
    load_best_model_at_end=True,
    report_to="none",  # Disable logging to external platforms (e.g., wandb)
)

In [None]:
# Class weights: inverse class frequency, normalized so the weights sum to 1.
# Sorting by label id keeps position i aligned with class id i.
label_counts = train_df['label'].value_counts().sort_index().values
inverse_frequency = torch.tensor(1.0 / label_counts, dtype=torch.float32)
class_weights = (inverse_frequency / inverse_frequency.sum()).to(device)

In [None]:
# Dataset and Trainer setup
# CustomTrainer adds the class-weighted loss and the custom optimizer/scheduler
# on top of the regular Trainer arguments.
trainer = CustomTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
    compute_metrics=compute_metrics,  # Pass the custom compute_metrics function
    class_weights=class_weights  # Pass class weights to the custom trainer
)

In [None]:
# Apply dropout regularization
dropout_prob = 0.2
model.config.hidden_dropout_prob = dropout_prob
model.config.attention_probs_dropout_prob = dropout_prob
# The model has already been instantiated, so editing the config alone does not
# change the dropout layers that were built from it. Their probability is
# therefore updated in place as well (both config values are the same here, so
# every dropout layer gets the same probability).
# NOTE(review): attention implementations that read the probability from an
# attribute other than an nn.Dropout module are not covered — confirm for the
# installed transformers version.
for module in model.modules():
    if isinstance(module, torch.nn.Dropout):
        module.p = dropout_prob

# Create optimizer and scheduler
# Steps per epoch are rounded up because the last, smaller batch is also a step.
batch_size = training_args.per_device_train_batch_size
steps_per_epoch = (len(train_dataset) + batch_size - 1) // batch_size
num_training_steps = int(steps_per_epoch * training_args.num_train_epochs)
trainer.create_optimizer_and_scheduler(num_training_steps=num_training_steps)

In [None]:
# Train the model (checkpoints are written under training_args.output_dir)
trainer.train()

In [None]:
# Evaluate the model on the validation set passed as eval_dataset
results = trainer.evaluate()
results

In [None]:
# Save the model and tokenizer side by side so they can be reloaded together
model.save_pretrained(model_name_save)
tokenizer.save_pretrained(model_name_save)

# TEST

In [None]:
# Path to save model and tokenizer
model_name_save = './classifier_model_multi/final_model-roberta-large-m1-lr-5e-06-cosine'

# Get latest checkpoint
checkpoint_dir = model_name_save + '/results'


def _checkpoint_step(folder_name):
    """Return the numeric suffix of a `checkpoint-<step>` folder name, or -1 if there is none."""
    suffix = folder_name.rsplit('-', 1)[-1]
    return int(suffix) if suffix.isdigit() else -1


# Compare checkpoints by their numeric step: a plain string sort would rank
# "checkpoint-500" after "checkpoint-1000" and pick the wrong folder.
checkpoint_folders = max(
    (f for f in os.listdir(checkpoint_dir) if f.startswith('checkpoint')),
    key=_checkpoint_step,
    default=None,
)
if checkpoint_folders is None:
    raise FileNotFoundError(f"No checkpoint folder found in {checkpoint_dir}")
checkpoint_path = os.path.join(checkpoint_dir, checkpoint_folders)

# Load model from checkpoint
model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path)

# Read the training history straight from the state file stored with the
# checkpoint. Resuming training with default TrainingArguments just to reach
# the log history could continue training (and write new checkpoints) with
# hyper-parameters different from the original run.
with open(os.path.join(checkpoint_path, 'trainer_state.json'), encoding='utf-8') as state_file:
    trainer_state = json.load(state_file)  # plain dict with the saved trainer state

# Now, let's access the log history for detailed metrics for each epoch
log_history = trainer_state['log_history']

# Training-loss entries and evaluation entries are logged at different moments,
# so each group keeps its own x-axis instead of sharing one trimmed list.
train_logs = [log for log in log_history if 'loss' in log]
eval_logs = [log for log in log_history if 'eval_loss' in log]
missing = float('nan')  # placeholder for a metric absent from a log entry

train_epochs = [log.get('epoch', missing) for log in train_logs]
train_loss = [log['loss'] for log in train_logs]

epochs = [log.get('epoch', missing) for log in eval_logs]
val_loss = [log['eval_loss'] for log in eval_logs]
accuracy = [log.get('eval_accuracy', missing) for log in eval_logs]
f1 = [log.get('eval_f1', missing) for log in eval_logs]
precision = [log.get('eval_precision', missing) for log in eval_logs]
recall = [log.get('eval_recall', missing) for log in eval_logs]

fig, axs = plt.subplots(3, 1, figsize=(5, 10))

# Plot Training and Validation Loss
axs[0].plot(train_epochs, train_loss, label="Training Loss", marker='o')
axs[0].plot(epochs, val_loss, label="Validation Loss", marker='o')
axs[0].set_title('Training and Validation Loss')
axs[0].set_xlabel('Epoch')
axs[0].set_ylabel('Loss')
axs[0].legend()

# Plot Accuracy
axs[1].plot(epochs, accuracy, label="Accuracy", marker='o', color='g')
axs[1].set_title('Accuracy over Epochs')
axs[1].set_xlabel('Epoch')
axs[1].set_ylabel('Accuracy')
axs[1].legend()

# Plot F1, Precision, and Recall
axs[2].plot(epochs, f1, label="F1 Score", marker='o', color='b')
axs[2].plot(epochs, precision, label="Precision", marker='o', color='r')
axs[2].plot(epochs, recall, label="Recall", marker='o', color='orange')
axs[2].set_title('F1, Precision, and Recall over Epochs')
axs[2].set_xlabel('Epoch')
axs[2].set_ylabel('Score')
axs[2].legend()

# Show the plots
plt.tight_layout()
plt.show()

In [None]:
# Load the saved model and tokenizer
# Both are read from `model_name_save`, i.e. the directory set in the cell above.
model = AutoModelForSequenceClassification.from_pretrained(model_name_save)
tokenizer = AutoTokenizer.from_pretrained(model_name_save)

In [None]:
# Single-text inference helper
def predict_emotion(text):
    """Classify one text with the notebook-level `model` and `tokenizer`.

    Returns:
        (predicted_label, confidence_score): the label decoded with
        `label_encoder` and the softmax probability of that class.
    """
    # Put the model on the available device and switch it to evaluation mode.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    # Tokenize (truncate/pad to 128 tokens) and move every tensor to the device.
    encoded = tokenizer(text, return_tensors='pt', truncation=True, padding='max_length', max_length=128)
    inputs = {}
    for key, val in encoded.items():
        inputs[key] = val.to(device)

    # Forward pass without gradient tracking.
    with torch.no_grad():
        outputs = model(**inputs)

    # Logits -> class probabilities.
    probabilities = torch.softmax(outputs.logits, dim=1)

    # Most probable class and its probability.
    predicted_class_id = probabilities.argmax(dim=1).item()
    confidence_score = probabilities[0, predicted_class_id].item()

    # Integer id -> label name.
    predicted_label = label_encoder.inverse_transform([predicted_class_id])[0]

    return predicted_label, confidence_score

In [None]:
# Quick manual check on four variants of the same Spanish greeting that differ
# in accents, punctuation and wording — presumably to see how sensitive the
# prediction is to such differences.
sentence = ['Hola Ray, ¿Qué tal estás?', 'Hola Ray, ¿Que tal estas', 'Hola Ray, que tal estas', 'Hola Ray, ¿Cómo estás?']
count = 0  # not used in this cell
for text in sentence:
    predicted_label, confidence = predict_emotion(text)
    print(f"Text: {text}\nPredicted: {predicted_label}\nConfidence: {confidence:.4f}\n")

In [None]:
# Accuracy on the Spanish helper sentences: each entry holds the target label
# at position 0 and the text at position 1.
sentence = sentence_es
count = sum(1 for s in sentence if predict_emotion(s[1])[0] == s[0])
total = count / len(sentence) * 100
print(f"Total Accuracy es: {total:.2f}%")

In [None]:
# Accuracy on the English helper sentences: each entry holds the target label
# at position 0 and the text at position 1.
sentence = sentence_en
count = sum(1 for s in sentence if predict_emotion(s[1])[0] == s[0])
total = count / len(sentence) * 100
print(f"Total Accuracy en: {total:.2f}%")

## PREDICTIONS

In [None]:
# Confusion matrix analysis function
def cm_analysis(y_true, y_pred, labels, ymap=None, figsize=(10,10)):
    """
    Plot a confusion matrix as an annotated heatmap and show it.

    Every cell shows the row-normalized percentage and the raw count; diagonal
    cells additionally show the row total (`count/total`). Cells with a count of
    zero outside the diagonal are left blank.

    args:
      y_true:    true label of the data, with shape (nsamples,)
      y_pred:    prediction of the data, with shape (nsamples,)
      labels:    string array, name the order of class labels in the confusion matrix.
                 use `clf.classes_` if using scikit-learn models.
                 with shape (nclass,).
      ymap:      dict: any -> string, length == nclass.
                 if not None, map the labels & ys to more understandable strings.
                 Caution: original y_true, y_pred and labels must align.
      figsize:   the size of the figure plotted.

    returns:
      None; the figure is displayed with `plt.show()`.
    """
    if ymap is not None:
        y_pred = [ymap[yi] for yi in y_pred]
        y_true = [ymap[yi] for yi in y_true]
        labels = [ymap[yi] for yi in labels]

    # Compute confusion matrix (rows: true class, columns: predicted class)
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    cm_sum = np.sum(cm, axis=1, keepdims=True)
    # Row-wise percentages; a class without any true sample gets 0% instead of
    # NaN (and no division-by-zero warning).
    cm_perc = np.divide(
        cm * 100.0,
        cm_sum,
        out=np.zeros(cm.shape, dtype=float),
        where=cm_sum != 0,
    )

    # Create annotations for the heatmap
    nrows, ncols = cm.shape
    annot_rows = []
    for i in range(nrows):
        row_total = cm_sum[i, 0]  # scalar, so '%d' does not receive an array
        annot_row = []
        for j in range(ncols):
            c = cm[i, j]
            p = cm_perc[i, j]
            if i == j:
                annot_row.append('%.1f%%\n%d/%d' % (p, c, row_total))
            elif c == 0:
                annot_row.append('')
            else:
                annot_row.append('%.1f%%\n%d' % (p, c))
        annot_rows.append(annot_row)
    # Building the array from the finished strings sizes the string dtype to
    # the longest annotation, so nothing is truncated.
    annot = np.array(annot_rows, dtype=str).reshape(cm.shape)

    # Convert confusion matrix into DataFrame
    cm = pd.DataFrame(cm, index=labels, columns=labels)
    cm.index.name = 'True'
    cm.columns.name = 'Predicted'

    # Plot the heatmap
    fig, ax = plt.subplots(figsize=figsize)
    sns.heatmap(cm, annot=annot, fmt='', ax=ax, cmap='Oranges')
    # plt.title('Confusion Matrix of the Classifier')
    plt.show()

In [None]:
# Held-out test split and its true labels (as strings).
test_df = pd.read_csv("data/test_dataset.csv")
y_test = test_df['label'].tolist()

# Run the classifier on every test text; each prediction is a
# (label, confidence) pair, split here into two parallel tuples.
predictions = [predict_emotion(test) for test in test_df['text']]
y_pred = tuple(prediction[0] for prediction in predictions)
confidence_score = tuple(prediction[1] for prediction in predictions)

# Class labels in the order produced by value_counts on the test split.
labels = test_df['label'].value_counts().index.tolist()

# Metrics: plain accuracy plus macro-averaged F1 / precision / recall.
correct = sum(true == pred for true, pred in zip(y_test, y_pred))
accuracy = correct / len(y_test)
f1 = f1_score(y_test, y_pred, average="macro")
precision = precision_score(y_test, y_pred, average="macro")
recall = recall_score(y_test, y_pred, average="macro")

# Report
print("Accuracy: %.3f %%" % (accuracy * 100))
print("F1 Score: %.4f" % f1)
print("Precision Score: %.4f" % precision)
print("Recall Score: %.4f" % recall, "\n")

# Confusion matrix of true vs. predicted labels
cm_analysis(y_test, y_pred, labels, ymap=None, figsize=(10,10))