In [2]:
from datasets import load_dataset
import random
from sklearn.metrics import f1_score
from tokenizers import Tokenizer, models, trainers
from tokenizers.pre_tokenizers import Whitespace
import torch.nn as nn
import torch
from tqdm import tqdm
from torch.nn.utils.rnn import pad_sequence
dataset = load_dataset("dair-ai/emotion", "split")
labels = ["sadness", "joy", "love", "anger", "fear", "surprise"]
train_data = dataset["train"]
validation_data = dataset["validation"]
test_data = dataset["test"]
# Tokenization
vocab_n = 5000
sequence_len = 64

# Initialize a tokenizer using BPE (Byte Pair Encoding)
tokenizer = Tokenizer(models.BPE())
tokenizer.pre_tokenizer = Whitespace()
tokenizer.enable_padding(length=sequence_len)
tokenizer.enable_truncation(max_length=sequence_len)
tokenizer_trainer = trainers.BpeTrainer(vocab_size=vocab_n)
tokenizer.train_from_iterator(train_data["text"], trainer=tokenizer_trainer)
def preprocess_text(text: str, tokenizer: Tokenizer):
    """ 
    Helper function to tokenize text and return corresponding token IDs as tensors.

    Args:
        text, str: Text instance from training data.
        tokenizer, Tokenizer: The respective tokenizer to be used for tokenization.
    Returns:
        Tensor: One-dimensional PyTorch tensor with token IDs.
    """
    return torch.tensor(tokenizer.encode(text).ids)


def preprocess_label(label: int):
    """ 
    Helper function to return label as tensor.

    Args:
        label, int: Label from instance.
    Returns:
        Tensor: One-dimensional PyTorch tensor containing the label index.
    """
    return torch.tensor(label)


def preprocess(data: dict, tokenizer: Tokenizer):
    """ 
    Transforms input dataset to tokenized vector representations.

    Args:
        data, dict: Dictionary with text instances and labels.
        tokenizer, Tokenizer: The respective tokenizer to be used for tokenization.
    Returns:
        list: List with tensors for the input texts and labels.
    """
    instances = []

    for text, label in zip(data["text"], data["label"]):
        input = preprocess_text(text, tokenizer)
        label = preprocess_label(label)
        
        instances.append((input, label))

    return instances
train_instances = preprocess(train_data, tokenizer)
val_instances = preprocess(validation_data, tokenizer)
test_instances = preprocess(test_data, tokenizer)
# Batching for LSTM input

def batching_lstm(instances: list, batch_size: int, shuffle: bool):
    """
    Batching for LSTM input: with padding support.

    Args:
        instances: List of (input_tensor, label_tensor) pairs.
        batch_size: Number of instances per batch.
        shuffle: Whether to shuffle the dataset before batching.
    
    Returns:
        batches: List of (padded_input_tensor, label_tensor) for each batch.
    """
    if shuffle:
        random.shuffle(instances)

    batches = []

    for i in range(0, len(instances), batch_size):
        batch = instances[i : i + batch_size]

        # Take out a batch of inputs and labels
        batch_inputs = [item[0] for item in batch]  # list of tensors (seq_len,)
        batch_labels = torch.stack([item[1] for item in batch])  # tensor of shape [batch_size]

        # Automatic padding becomes [batch_size, max_seq_len]
        padded_inputs = pad_sequence(batch_inputs, batch_first=True, padding_value=0)

        batches.append((padded_inputs, batch_labels))

    return batches

In [45]:
class CNN_Classifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, output_dim, padding_idx):
        super(CNN_Classifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        self.conv1 = nn.Conv1d(in_channels=embedding_dim, out_channels=100, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=100, out_channels=100, kernel_size=3, padding=1)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        embedded = self.embedding(x)  # [batch, seq_len, emb_dim]
        embedded = embedded.transpose(1, 2)  # [batch, emb_dim, seq_len]
        
        conv1_out = F.relu(self.conv1(embedded))  # [batch, 100, seq_len]
        conv2_out = F.relu(self.conv2(conv1_out))  # [batch, 100, seq_len]
        
        return self.dropout(conv2_out.transpose(1, 2))  # [batch, seq_len, 100]

class LSTM_Classifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, padding_idx=None): 
        super(LSTM_Classifier, self).__init__()

        # Single-layer bidirectional LSTM
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=1,
            bidirectional=True,
            batch_first=True
        )

        # Dropout layer
        self.dropout = nn.Dropout(0.3)

        # Fully connected layer, output 6 types of emotions
        self.fc = nn.Linear(hidden_dim * 2, output_dim)

    def forward(self, cnn_features):
         # cnn_features: [batch, seq_len, 100]
        lstm_out, (hidden, _) = self.lstm(cnn_features)

        # Take the last hidden layer of the forward and reverse directions and concatenate them
        hidden_forward = hidden[-2, :, :]  # [batch, hidden_dim]
        hidden_backward = hidden[-1, :, :]  # [batch, hidden_dim]
        combined = torch.cat((hidden_forward, hidden_backward), dim=1)  # [batch, hidden_dim * 2]

        return self.fc(self.dropout(combined))  # [batch, output_dim]
    
# Get the vocabulary dictionary from the tokenizer (word → ID)
word2idx = tokenizer.get_vocab()  # e.g., {'i': 4, 'love': 5, 'this': 6, ...}

# Reversal
idx2word = {idx: word for word, idx in word2idx.items()}

vocab_size = len(word2idx)
padding_idx = word2idx.get("[PAD]", 0) 

class CNN_LSTM_Ensemble(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, padding_idx):
        super(CNN_LSTM_Ensemble, self).__init__()
        self.cnn = CNN_Classifier(vocab_size=vocab_size, embedding_dim=embedding_dim, output_dim=100, padding_idx=padding_idx)
        self.lstm = LSTM_Classifier(input_dim=100, hidden_dim=hidden_dim, output_dim=output_dim)
        self.fc = nn.Linear(output_dim * 2, output_dim)

    def forward(self, x):
        cnn_features = self.cnn(x)  # [batch, seq_len, 100]
        return self.lstm(cnn_features)  # [batch, output_dim]
        
from sklearn.metrics import accuracy_score, f1_score

def train_and_evaluate(model, train_batches, val_batches, num_epochs=25, lr=1e-3, device="gpu"):
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = torch.nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        model.train()
        train_losses = []
        all_preds, all_labels = [], []

        for batch_x, batch_y in train_batches:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)

            optimizer.zero_grad()
            outputs = model(batch_x)  # shape: [batch_size, 6]
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

            train_losses.append(loss.item())
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch_y.cpu().numpy())

        train_acc = accuracy_score(all_labels, all_preds)
        train_f1 = f1_score(all_labels, all_preds, average='weighted')

        # ---- verify ----
        model.eval()
        val_preds, val_labels = [], []
        with torch.no_grad():
            for val_x, val_y in val_batches:
                val_x, val_y = val_x.to(device), val_y.to(device)
                val_out = model(val_x)
                val_pred = torch.argmax(val_out, dim=1)
                val_preds.extend(val_pred.cpu().numpy())
                val_labels.extend(val_y.cpu().numpy())

        val_acc = accuracy_score(val_labels, val_preds)
        val_f1 = f1_score(val_labels, val_preds, average='weighted')

        print(f"Epoch {epoch+1}/{num_epochs} | "
              f"Train Loss: {sum(train_losses)/len(train_losses):.4f} | "
              f"Train Acc: {train_acc:.4f} | F1: {train_f1:.4f} || "
              f"Val Acc: {val_acc:.4f} | Val F1: {val_f1:.4f}")

In [47]:
# Parameters tuning
model = LSTMClassifier(
    vocab_size=len(word2idx),        # vocabulary size
    embedding_dim=100,               # Dimensions of word vectors
    hidden_dim=128,                  # Hidden layer dimensions
    output_dim=6,                    # The number of output categories
    padding_idx=word2idx.get("[PAD]", 0)  # Index of pad token
)
train_instances = preprocess(dataset["train"], tokenizer)
val_instances = preprocess(dataset["validation"], tokenizer)

train_batches = batching_lstm(train_instances, batch_size=32, shuffle=True)
val_batches = batching_lstm(val_instances, batch_size=32, shuffle=False)

device = "cuda" if torch.cuda.is_available() else "cpu"
train_and_evaluate(model, train_batches, val_batches, num_epochs=25, device=device)

Epoch 1/25 | Train Loss: 1.4742 | Train Acc: 0.4093 | F1: 0.3411 || Val Acc: 0.5630 | Val F1: 0.4939
Epoch 2/25 | Train Loss: 0.8787 | Train Acc: 0.6914 | F1: 0.6578 || Val Acc: 0.7625 | Val F1: 0.7408
Epoch 3/25 | Train Loss: 0.4917 | Train Acc: 0.8257 | F1: 0.8198 || Val Acc: 0.8280 | Val F1: 0.8291
Epoch 4/25 | Train Loss: 0.3176 | Train Acc: 0.8904 | F1: 0.8895 || Val Acc: 0.8620 | Val F1: 0.8605
Epoch 5/25 | Train Loss: 0.2182 | Train Acc: 0.9230 | F1: 0.9228 || Val Acc: 0.8775 | Val F1: 0.8772
Epoch 6/25 | Train Loss: 0.1648 | Train Acc: 0.9417 | F1: 0.9417 || Val Acc: 0.8865 | Val F1: 0.8864
Epoch 7/25 | Train Loss: 0.1249 | Train Acc: 0.9561 | F1: 0.9561 || Val Acc: 0.8905 | Val F1: 0.8911
Epoch 8/25 | Train Loss: 0.0994 | Train Acc: 0.9654 | F1: 0.9654 || Val Acc: 0.8945 | Val F1: 0.8944
Epoch 9/25 | Train Loss: 0.0744 | Train Acc: 0.9754 | F1: 0.9754 || Val Acc: 0.9005 | Val F1: 0.9004
Epoch 10/25 | Train Loss: 0.0858 | Train Acc: 0.9729 | F1: 0.9729 || Val Acc: 0.8855 | Val 

In [48]:
from sklearn.metrics import classification_report

# Get the label name order
label_names = dataset["train"].features["label"].names  # ['sadness', 'joy', 'love', 'anger', 'fear', 'surprise']

# Model prediction validation set
true_labels = []
pred_labels = []

model.eval()
with torch.no_grad():
    for x_batch, y_batch in val_batches:
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)

        outputs = model(x_batch)
        predictions = torch.argmax(outputs, dim=1)

        true_labels.extend(y_batch.cpu().numpy())
        pred_labels.extend(predictions.cpu().numpy())

# Print each category's precision, recall, f1-score
print(classification_report(true_labels, pred_labels, target_names=label_names))

              precision    recall  f1-score   support

     sadness       0.92      0.94      0.93       550
         joy       0.91      0.93      0.92       704
        love       0.84      0.81      0.83       178
       anger       0.91      0.88      0.90       275
        fear       0.85      0.83      0.84       212
    surprise       0.82      0.79      0.81        81

    accuracy                           0.90      2000
   macro avg       0.88      0.86      0.87      2000
weighted avg       0.90      0.90      0.90      2000



In [34]:
import torch
print("CUDA is supported:", torch.cuda.is_available())
print("CUDA version:", torch.version.cuda)
print("Current device:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "无")

CUDA is supported: True
CUDA version: 11.8
Current device: NVIDIA GeForce RTX 4060 Laptop GPU
