In [1]:
# %% Cell 0: Environment Setup and Dependency Installation
# Purpose: replace the preinstalled PyTorch stack with pinned versions, report the
# resulting environment, and fetch the NLTK packages used later in the notebook.
#
# Uninstall conflicting packages and install specific versions.
# torch, torchvision, torchaudio and triton are removed; only torch is reinstalled
# explicitly below (torchvision/torchaudio are not put back by this cell).
# NOTE(review): if torch was already imported in this kernel before running the
# cell, a runtime restart is presumably needed for the new version to load - confirm.
!pip uninstall -y torch torchvision torchaudio triton
!pip install torch==2.3.0 --no-cache-dir
!pip install transformers==4.41.2 datasets==2.20.0 nltk==3.8.1 scikit-learn==1.5.0 tqdm==4.66.4

# Verify PyTorch installation: print the version and whether CUDA is usable.
import torch
print(f"PyTorch Version: {torch.__version__}")
print(f"CUDA Available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    # Only meaningful when a CUDA device is visible.
    print(f"CUDA Version: {torch.version.cuda}")

# Check Python version
import sys
print(f"Python Version: {sys.version}")

# Download NLTK data: the 'wordnet' and 'omw-1.4' packages. The WordNet corpus is
# what the synonym-replacement augmentation queries later in the notebook.
import nltk
nltk.download('wordnet')
nltk.download('omw-1.4')

Found existing installation: torch 2.6.0+cu124
Uninstalling torch-2.6.0+cu124:
  Successfully uninstalled torch-2.6.0+cu124
Found existing installation: torchvision 0.21.0+cu124
Uninstalling torchvision-0.21.0+cu124:
  Successfully uninstalled torchvision-0.21.0+cu124
Found existing installation: torchaudio 2.6.0+cu124
Uninstalling torchaudio-2.6.0+cu124:
  Successfully uninstalled torchaudio-2.6.0+cu124
Found existing installation: triton 3.2.0
Uninstalling triton-3.2.0:
  Successfully uninstalled triton-3.2.0
Collecting torch==2.3.0
  Downloading torch-2.3.0-cp311-cp311-manylinux1_x86_64.whl.metadata (26 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch==2.3.0)
  Downloading nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch==2.3.0)
  Downloading nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch==2.3

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...


True

In [2]:
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel
!pip install datasets
from datasets import load_dataset
import numpy as np
from nltk.corpus import wordnet
import random
import nltk
from sklearn.metrics import classification_report, f1_score
from tqdm import tqdm

# Fetch the NLTK packages that the synonym-based augmentation reads from
for _nltk_package in ('wordnet', 'omw-1.4'):
    nltk.download(_nltk_package)

# Seed every random number generator this notebook draws from with one value
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')




[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


In [3]:
# Hyperparameters and Dataset Loading

# --- Optimisation schedule ---
EPOCHS = 5
BATCH_SIZE = 32
LEARNING_RATE = 2e-5   # handed to AdamW

# --- Input encoding ---
MAX_LEN = 128          # every text is padded/truncated to this many tokens

# --- Network shape ---
LSTM_LAYERS = 2
LSTM_HIDDEN_DIM = 256  # per direction; the LSTM is bidirectional
ATTENTION_HEADS = 4
DROPOUT = 0.3          # applied to the pooled vector before the classifier layer

# --- Focal loss ---
GAMMA_FOCAL = 2.0      # exponent on (1 - p_t)
ALPHA_FOCAL = 0.75     # scalar factor multiplying every sample's focal loss

# Load the emotion dataset and derive the label vocabulary from its metadata.
# Any failure is reported on stdout and then re-raised so the cell still stops.
try:
    dataset = load_dataset('emotion')
    label_feature = dataset['train'].features['label']
    emotions = label_feature.names
    NUM_CLASSES = len(emotions)
    print(f"Loaded dataset with {NUM_CLASSES} emotions: {emotions}")
except Exception as load_error:
    print(f"Error loading dataset: {load_error}")
    raise

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Downloading readme:   0%|          | 0.00/9.05k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/1.03M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/127k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/129k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/16000 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/2000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/2000 [00:00<?, ? examples/s]

Loaded dataset with 6 emotions: ['sadness', 'joy', 'love', 'anger', 'fear', 'surprise']


In [4]:
#Data Augmentation
def synonym_replacement(text, n=2):
    """Replace up to ``n`` distinct words of ``text`` with WordNet synonyms.

    Words are picked in random order (driven by the ``random`` module, so the
    result is reproducible after ``random.seed``). Every occurrence of a picked
    word is replaced by the same synonym.

    Args:
        text: Input sentence; it is split on whitespace.
        n: Maximum number of distinct words to replace. Values <= 0 disable
            replacement.

    Returns:
        The whitespace-joined sentence with the replacements applied. Words
        that have no synonym different from themselves are left unchanged.
    """
    words = text.split()
    if n <= 0 or not words:
        return ' '.join(words)

    # dict.fromkeys de-duplicates while keeping first-seen order, so the shuffle
    # below depends only on the `random` state. Iterating a set of strings can
    # change between interpreter runs because of hash randomisation.
    candidates = list(dict.fromkeys(words))
    random.shuffle(candidates)

    new_words = words
    num_replaced = 0
    for target in candidates:
        synonyms = set()
        for syn in wordnet.synsets(target):
            for lemma in syn.lemmas():
                # WordNet joins multi-word lemmas with underscores ("give_up").
                candidate = lemma.name().replace('_', ' ')
                # Skip the word itself, otherwise a no-op counts as a replacement.
                if candidate.lower() != target.lower():
                    synonyms.add(candidate)
        if not synonyms:
            continue

        # Sort before choosing so the pick is deterministic for a given seed.
        synonym = random.choice(sorted(synonyms))
        new_words = [synonym if word == target else word for word in new_words]
        num_replaced += 1
        if num_replaced >= n:
            break
    return ' '.join(new_words)

In [5]:
class EmotionDataset(Dataset):
    """Torch dataset that tokenizes one text/label pair per item.

    Args:
        texts: Indexable collection of input texts.
        labels: Indexable collection of integer class ids, same length as ``texts``.
        tokenizer: Object exposing ``encode_plus`` (e.g. a BERT tokenizer).
        max_len: Length every encoded example is padded or truncated to.
        augment: When True, items are randomly passed through
            ``synonym_replacement`` before tokenization.
        augment_prob: Probability of augmenting an item when ``augment`` is True.
            Defaults to 0.5.

    Raises:
        ValueError: If ``texts`` and ``labels`` differ in length, or
            ``augment_prob`` is outside [0, 1].
    """

    def __init__(self, texts, labels, tokenizer, max_len, augment=False, augment_prob=0.5):
        # Fail at construction time instead of with an IndexError mid-epoch.
        if len(texts) != len(labels):
            raise ValueError(
                f"texts and labels must have the same length, got {len(texts)} and {len(labels)}"
            )
        if not 0.0 <= augment_prob <= 1.0:
            raise ValueError(f"augment_prob must be in [0, 1], got {augment_prob}")

        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.augment = augment
        self.augment_prob = augment_prob

    def __len__(self):
        """Return the number of examples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return a dict with 1-D ``input_ids``, ``attention_mask`` and a scalar ``labels`` tensor."""
        text = str(self.texts[idx])
        label = self.labels[idx]

        # `> 1 - p` keeps the same draws as the former `> 0.5` test when p == 0.5.
        if self.augment and random.random() > 1.0 - self.augment_prob:
            text = synonym_replacement(text)

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        return {
            # The tokenizer returns (1, max_len); flatten so batching yields (B, max_len).
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }

In [6]:
class Attention(nn.Module):
    """Multi-head scaled dot-product self-attention over a sequence.

    Args:
        hidden_dim: Size of the last dimension of the input and output.
        n_heads: Number of attention heads; must divide ``hidden_dim``.

    Raises:
        AssertionError: If ``hidden_dim`` is not divisible by ``n_heads``.
    """

    def __init__(self, hidden_dim, n_heads):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.n_heads = n_heads
        self.head_dim = hidden_dim // n_heads
        assert self.head_dim * n_heads == hidden_dim, "Hidden dim must be divisible by n_heads"

        self.W_q = nn.Linear(hidden_dim, hidden_dim)
        self.W_k = nn.Linear(hidden_dim, hidden_dim)
        self.W_v = nn.Linear(hidden_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, hidden_dim)
        # A plain float needs no device/dtype handling when the module is moved;
        # a tensor attribute that is not a registered buffer would stay on the CPU.
        self.scale = float(self.head_dim) ** 0.5

    def forward(self, x, mask=None):
        """Apply self-attention.

        Args:
            x: Tensor of shape (batch, seq_len, hidden_dim).
            mask: Optional tensor broadcastable to (batch, n_heads, seq_len, seq_len);
                positions equal to 0 are excluded as attention keys.

        Returns:
            Tensor of shape (batch, seq_len, hidden_dim).
        """
        batch_size, seq_len, hidden_dim = x.size()

        # Project, then split the feature axis into heads: (B, heads, S, head_dim).
        Q = self.W_q(x).view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
        K = self.W_k(x).view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
        V = self.W_v(x).view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)

        scores = torch.matmul(Q, K.transpose(-2, -1)) / self.scale

        if mask is not None:
            # Use the most negative value the score dtype can represent. A literal
            # -1e9 is outside the float16 range and makes masked_fill raise.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

        attn = torch.softmax(scores, dim=-1)
        # Merge the heads back into the feature axis.
        context = torch.matmul(attn, V).transpose(1, 2).contiguous().view(batch_size, seq_len, hidden_dim)
        output = self.fc(context)

        return output



In [7]:
# Model Definition
class BERT_RNN_EmotionClassifier(nn.Module):
    """Frozen BERT encoder -> BiLSTM -> multi-head attention -> linear classifier.

    BERT is used purely as a feature extractor: its forward pass runs under
    ``torch.no_grad()`` and it is kept in eval mode even while the rest of the
    model trains.

    Args:
        bert_model: Encoder exposing ``config.hidden_size`` and returning an
            object with ``last_hidden_state``.
        lstm_hidden_dim: Hidden size of the LSTM, per direction.
        lstm_layers: Number of stacked LSTM layers.
        num_classes: Number of output classes.
        dropout: Dropout probability applied to the pooled representation.
        attention_heads: Number of heads for the attention layer.
    """

    def __init__(self, bert_model, lstm_hidden_dim, lstm_layers, num_classes, dropout, attention_heads):
        super().__init__()
        self.bert = bert_model
        self.lstm = nn.LSTM(
            input_size=bert_model.config.hidden_size,
            hidden_size=lstm_hidden_dim,
            num_layers=lstm_layers,
            batch_first=True,
            bidirectional=True
        )
        # Bidirectional LSTM output is twice the per-direction hidden size.
        self.attention = Attention(lstm_hidden_dim * 2, attention_heads)
        self.batch_norm = nn.BatchNorm1d(lstm_hidden_dim * 2)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(lstm_hidden_dim * 2, num_classes)
        self.bert.eval()

    def train(self, mode=True):
        """Switch the trainable layers to ``mode`` while keeping BERT in eval mode.

        BERT never receives gradients here, so enabling its dropout during
        training would only add noise to the extracted features.
        """
        super().train(mode)
        self.bert.eval()
        return self

    def forward(self, input_ids, attention_mask):
        """Return class logits of shape (batch, num_classes).

        Args:
            input_ids: Token ids, shape (batch, seq_len).
            attention_mask: 1 for real tokens and 0 for padding, shape (batch, seq_len).
        """
        with torch.no_grad():
            bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        sequence_output = bert_output.last_hidden_state

        lstm_output, _ = self.lstm(sequence_output)
        attn_output = self.attention(lstm_output, attention_mask.unsqueeze(1).unsqueeze(2))

        # Average over real tokens only. Inputs are padded to a fixed length, so
        # a plain mean would be dominated by padding positions.
        token_mask = attention_mask.unsqueeze(-1).to(attn_output.dtype)
        token_count = token_mask.sum(dim=1).clamp(min=1.0)  # guard all-padding rows
        pooled_output = (attn_output * token_mask).sum(dim=1) / token_count

        pooled_output = self.batch_norm(pooled_output)
        pooled_output = self.dropout(pooled_output)
        logits = self.fc(pooled_output)

        return logits

In [8]:
#Focal Loss
# CRITICAL: This cell defines FocalLoss, which is needed for Cell 11
class FocalLoss(nn.Module):
    """Focal loss built on per-sample cross-entropy.

    Each sample's cross-entropy is scaled by ``alpha * (1 - p_t) ** gamma``,
    where ``p_t = exp(-cross_entropy)``.

    Args:
        alpha: Scalar factor applied to every sample's loss.
        gamma: Exponent of the ``(1 - p_t)`` modulating term.
        reduction: 'mean' or 'sum' to aggregate over the batch; any other value
            returns the unreduced per-sample losses.
    """

    def __init__(self, alpha=ALPHA_FOCAL, gamma=GAMMA_FOCAL, reduction='mean'):
        super().__init__()
        self.alpha, self.gamma, self.reduction = alpha, gamma, reduction

    def forward(self, inputs, targets):
        """Compute the loss for logits ``inputs`` and class-index ``targets``."""
        per_sample_ce = nn.CrossEntropyLoss(reduction='none')(inputs, targets)
        prob_true_class = torch.exp(-per_sample_ce)
        modulator = (1 - prob_true_class) ** self.gamma
        weighted = self.alpha * modulator * per_sample_ce

        if self.reduction == 'mean':
            return weighted.mean()
        if self.reduction == 'sum':
            return weighted.sum()
        return weighted


In [9]:
#Training Function
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs, device,
                save_path='best_model.pt'):
    """Train ``model`` and checkpoint the weights with the best validation F1.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` returning logits.
        train_loader: Iterable of batches with 'input_ids', 'attention_mask', 'labels'.
        val_loader: Same batch format, used for validation after every epoch.
        criterion: Loss callable taking ``(logits, labels)``.
        optimizer: Optimizer over the model's parameters.
        epochs: Number of passes over ``train_loader``.
        device: Device the batches are moved to.
        save_path: Where the best ``state_dict`` is written.

    Returns:
        The best weighted validation F1 seen, or None if ``epochs`` is 0.
    """
    # None instead of 0.0: the first epoch always writes a checkpoint, even when
    # its F1 is exactly 0.0, so a later load of `save_path` cannot miss the file.
    best_f1 = None
    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        total_loss = 0
        for batch in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{epochs}'):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        # max(..., 1) avoids a ZeroDivisionError on an empty loader.
        avg_train_loss = total_loss / max(len(train_loader), 1)

        # ---- validation pass ----
        model.eval()
        val_preds = []
        val_labels = []
        val_loss = 0
        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)

                outputs = model(input_ids, attention_mask)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                preds = torch.argmax(outputs, dim=1).cpu().numpy()
                val_preds.extend(preds)
                val_labels.extend(labels.cpu().numpy())

        avg_val_loss = val_loss / max(len(val_loader), 1)
        f1 = f1_score(val_labels, val_preds, average='weighted')

        print(f'Epoch {epoch + 1}/{epochs}')
        print(f'Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Val F1: {f1:.4f}')

        if best_f1 is None or f1 > best_f1:
            best_f1 = f1
            torch.save(model.state_dict(), save_path)

    return best_f1

In [10]:
# Evaluation Function
def evaluate_model(model, test_loader, device):
    """Print a per-class classification report for ``model`` on ``test_loader``.

    Class names come from the module-level ``emotions`` list; class id ``i`` is
    reported under ``emotions[i]``.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` returning logits.
        test_loader: Iterable of batches with 'input_ids', 'attention_mask', 'labels'.
        device: Device the batches are moved to.
    """
    model.eval()
    preds = []
    labels = []
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            lbls = batch['labels'].to(device)

            outputs = model(input_ids, attention_mask)
            batch_preds = torch.argmax(outputs, dim=1).cpu().numpy()
            preds.extend(batch_preds)
            labels.extend(lbls.cpu().numpy())

    print('Test Classification Report:')
    # Pass the full label set explicitly. Without it the report infers the
    # classes from the data and rejects `target_names` when a class is missing
    # from both the targets and the predictions.
    print(classification_report(
        labels,
        preds,
        labels=list(range(len(emotions))),
        target_names=emotions
    ))

In [11]:
# User Input Prediction
def predict_emotion(model, tokenizer, text, max_len, device):
    """Predict the emotion label for a single piece of text.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` returning logits.
        tokenizer: Object exposing ``encode_plus``.
        text: Raw input string.
        max_len: Length the encoded text is padded or truncated to.
        device: Device the encoded tensors are moved to.

    Returns:
        The entry of the module-level ``emotions`` list at the highest-scoring
        class index, or an error message string when ``text`` is blank.
    """
    if not text.strip():
        return "Error: Empty input. Please provide some text."

    model.eval()

    tokenizer_options = dict(
        add_special_tokens=True,
        max_length=max_len,
        return_token_type_ids=False,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )
    encoded = tokenizer.encode_plus(text, **tokenizer_options)

    ids_on_device = encoded['input_ids'].to(device)
    mask_on_device = encoded['attention_mask'].to(device)

    with torch.no_grad():
        logits = model(ids_on_device, mask_on_device)
        # Batch of one: take the class index of the first (only) row.
        predicted_index = torch.argmax(logits, dim=1).cpu().numpy()[0]

    return emotions[predicted_index]

In [12]:
# Model training and evaluation
def train_and_evaluate():
    """Build the data pipeline and model, train, then report test metrics.

    Reads the module-level ``dataset``, hyperparameter constants and ``device``.

    Returns:
        ``(model, tokenizer, MAX_LEN, device)`` on success, or four ``None``
        values when the pretrained BERT assets or the saved checkpoint cannot
        be loaded.

    Raises:
        NameError: If ``FocalLoss`` has not been defined in the notebook.
    """
    failure = (None, None, None, None)

    try:
        tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        bert_model = BertModel.from_pretrained('bert-base-uncased')
    except Exception as load_error:
        print(f"Error loading BERT models: {load_error}")
        return failure

    # Text and label columns of each split, in train -> validation -> test order.
    columns = {}
    for split_name in ('train', 'validation', 'test'):
        columns[split_name] = (dataset[split_name]['text'], dataset[split_name]['label'])

    # Only the training split is augmented.
    train_dataset = EmotionDataset(*columns['train'], tokenizer, MAX_LEN, augment=True)
    val_dataset = EmotionDataset(*columns['validation'], tokenizer, MAX_LEN)
    test_dataset = EmotionDataset(*columns['test'], tokenizer, MAX_LEN)

    # Only the training loader shuffles.
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

    model_options = {
        'bert_model': bert_model,
        'lstm_hidden_dim': LSTM_HIDDEN_DIM,
        'lstm_layers': LSTM_LAYERS,
        'num_classes': NUM_CLASSES,
        'dropout': DROPOUT,
        'attention_heads': ATTENTION_HEADS,
    }
    model = BERT_RNN_EmotionClassifier(**model_options).to(device)

    optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)

    # Fail with a pointer to the defining cell if FocalLoss was never executed.
    if 'FocalLoss' not in globals():
        raise NameError("FocalLoss is not defined. Ensure Cell 7 is executed.")
    criterion = FocalLoss()

    print("Training model...")
    train_model(model, train_loader, val_loader, criterion, optimizer, EPOCHS, device)

    print("Loading best model for evaluation...")
    try:
        best_state = torch.load('best_model.pt')
        model.load_state_dict(best_state)
    except Exception as checkpoint_error:
        print(f"Error loading best model: {checkpoint_error}")
        return failure
    evaluate_model(model, test_loader, device)

    # Everything the interactive prediction cell needs.
    return model, tokenizer, MAX_LEN, device

In [14]:
# User Input Prediction
def predict_interactively(model, tokenizer, max_len, device, max_inputs=10):
    """Prompt for texts on stdin and print the predicted emotion for each.

    The loop ends when the user types 'exit' (case-insensitive) or after
    ``max_inputs`` prompts. Blank input is reported as an error rather than
    being printed as a prediction.

    Args:
        model: Trained classifier, or None if training failed.
        tokenizer: Tokenizer matching the model, or None if training failed.
        max_len: Length inputs are padded or truncated to.
        device: Device used for inference.
        max_inputs: Upper bound on the number of prompts. Defaults to 10.
    """
    if model is None or tokenizer is None:
        print("Error: Model or tokenizer not initialized. Run Cell 11 first.")
        return
    print("\nEmotion Recognition Ready!")
    print("Enter text to predict its emotion. Type 'exit' to quit.")
    for _ in range(max_inputs):
        user_input = input("Your text: ").strip()
        if user_input.lower() == 'exit':
            print("Exiting...")
            break
        if not user_input:
            # Previously this showed up as "Predicted Emotion: Error: ...".
            print("Error: Empty input. Please provide some text.\n")
            continue
        prediction = predict_emotion(model, tokenizer, user_input, max_len, device)
        print(f"Predicted Emotion: {prediction}\n")
# Execute training and prediction
if __name__ == '__main__':
    # train_and_evaluate() returns four Nones on failure. Bind the returned
    # device to its own name so that a failed run does not overwrite the
    # notebook-wide `device` with None for every later cell.
    model, tokenizer, max_len, run_device = train_and_evaluate()
    predict_interactively(model, tokenizer, max_len, run_device)

Training model...


Epoch 1/5:   0%|          | 0/500 [00:12<?, ?it/s]


KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt

# Per-class F1 values, index-aligned with `emotions`.
# NOTE(review): these numbers are typed in by hand rather than computed in this
# notebook - confirm they match the classification report they were taken from.
emotions = ['sadness', 'joy', 'love', 'anger', 'fear', 'surprise']
f1_scores = [0.74, 0.42, 0.60, 0.66, 0.53, 0.72]
bar_colors = ['#FF9999', '#66B2FF', '#99FF99', '#FFCC99', '#FF99FF', '#99CCCC']

fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(emotions, f1_scores, color=bar_colors)
ax.set_title('F1-Scores by Emotion', fontsize=14)
ax.set_xlabel('Emotion', fontsize=12)
ax.set_ylabel('F1-Score', fontsize=12)
ax.set_ylim(0, 1)

# Print each value just above its bar.
for position, score in enumerate(f1_scores):
    ax.text(position, score + 0.02, f'{score:.2f}', ha='center', fontsize=10)

ax.grid(axis='y', linestyle='--', alpha=0.7)
fig.tight_layout()
fig.savefig('f1_scores_by_emotion.png')
plt.show()

In [None]:
import matplotlib.pyplot as plt

emotions = ['sadness', 'joy', 'love', 'anger', 'fear', 'surprise']
# plt.pie pairs labels[i] with support[i], so the counts must be listed in the
# same order as `emotions`. The previous ordering labelled 'surprise' with 695
# samples and 'joy' with 159. These are the per-class sizes of the 2,000-example
# test split: sadness 581, joy 695, love 159, anger 275, fear 224, surprise 66.
# NOTE(review): verify against collections.Counter(dataset['test']['label']).
support = [581, 695, 159, 275, 224, 66]

plt.figure(figsize=(8, 8))
plt.pie(support, labels=emotions, autopct='%1.1f%%', colors=['#FF9999', '#66B2FF', '#99FF99', '#FFCC99', '#FF99FF', '#99CCCC'], startangle=90)
plt.title('Distribution of Emotion Samples', fontsize=14)
plt.axis('equal')  # keep the pie circular
plt.tight_layout()
plt.savefig('emotion_distribution.png')
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Per-class metrics, each list index-aligned with `emotions`.
# NOTE(review): these numbers are typed in by hand rather than computed in this
# notebook - confirm they match the classification report they were taken from.
emotions = ['sadness', 'joy', 'love', 'anger', 'fear', 'surprise']
precision = [0.77, 0.43, 0.59, 0.66, 0.51, 0.71]
recall = [0.71, 0.42, 0.61, 0.66, 0.55, 0.73]
f1_scores = [0.74, 0.42, 0.60, 0.66, 0.53, 0.72]

# (values, legend label, line colour) for each curve, drawn in this order.
metric_series = [
    (precision, 'Precision', 'b'),
    (recall, 'Recall', 'g'),
    (f1_scores, 'F1-Score', 'r'),
]

fig, ax = plt.subplots(figsize=(10, 6))
for values, legend_label, line_color in metric_series:
    ax.plot(emotions, values, marker='o', label=legend_label, color=line_color)

ax.set_title('Precision, Recall, and F1-Score by Emotion', fontsize=14)
ax.set_xlabel('Emotion', fontsize=12)
ax.set_ylabel('Score', fontsize=12)
ax.set_ylim(0, 1)
ax.legend()
ax.grid(True, linestyle='--', alpha=0.7)
plt.xticks(rotation=45)
fig.tight_layout()
fig.savefig('precision_recall_f1.png')
plt.show()