In [31]:
import os
import time
from collections import Counter

import pandas as pd
import torch
import torch.nn as nn
from sklearn.preprocessing import LabelEncoder
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset
from torchmetrics.classification import (
    MulticlassAccuracy,
    MulticlassPrecision,
    MulticlassF1Score,
    MulticlassRecall,
)

In [None]:
# Parameters
BATCH_SIZE = 128
EMBEDDING_DIM = 100
HIDDEN_DIM = 128
EPOCHS = 20

device = "cuda" if torch.cuda.is_available() else "cpu"
dataset = "dair_ai_emotion"

model_name = f"LSTM_{HIDDEN_DIM}-hidden_{EMBEDDING_DIM}-embed_{BATCH_SIZE}-batch_{EPOCHS}-epochs_{dataset}-dataset_{device}"

In [33]:
def load_data(file_path):
    df = pd.read_csv(file_path, delimiter=";", header=None, names=["text", "emotion"])
    return df["text"].values, df["emotion"].values


# Tokenize and encode labels
def tokenize(text, word_to_idx):
    return [word_to_idx.get(word, word_to_idx["<UNK>"]) for word in text.split()]


def encode_labels(labels):
    label_encoder = LabelEncoder()
    return label_encoder.fit_transform(labels), label_encoder

In [34]:
train_texts, train_labels = load_data(f"data/{dataset}/train.txt")
test_texts, test_labels = load_data(f"data/{dataset}/test.txt")

# Build vocabulary
word_counter = Counter()
for text in train_texts:
    word_counter.update(text.split())
vocab = ["<PAD>", "<UNK>"] + [word for word, freq in word_counter.items() if freq > 1]
word_to_idx = {word: idx for idx, word in enumerate(vocab)}
vocab_size = len(vocab)


train_labels, label_encoder = encode_labels(train_labels)
test_labels, _ = encode_labels(test_labels)

In [35]:
# Dataset
class EmotionDataset(Dataset):
    def __init__(self, texts, labels, word_to_idx, device=None):
        """
        Args:
            texts (list of list of str): List of tokenized sentences.
            labels (list of int): List of corresponding labels.
            word_to_idx (dict): Dictionary mapping words to indices.
            device (torch.device, optional): Device to store the tensors on (e.g., 'cuda' or 'cpu').
        """
        self.device = device if device is not None else torch.device("cpu")

        # Preprocess texts and labels into tensors directly
        # token_texts = []
        # for text in texts:
        #     tokens = tokenize(text, word_to_idx)
        #     token_texts.append(torch.tensor(tokens, dtype=torch.float32, device=self.device))
        # self.texts = token_texts

        self.texts = [
            torch.tensor(tokenize(text, word_to_idx), device=device) for text in texts
        ]
        self.labels = torch.tensor(labels, dtype=torch.long, device=self.device)

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        # Return preprocessed tensors
        return self.texts[idx], self.labels[idx]

In [36]:
# LSTM Model
class LSTMEmotionClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes):
        super(LSTMEmotionClassifier, self).__init__()
        self.embedding = nn.Embedding(
            vocab_size, embed_dim, padding_idx=word_to_idx["<PAD>"]
        )
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        x = self.embedding(x)
        lstm_out, _ = self.lstm(x)
        x = lstm_out[:, -1, :]
        return self.fc(x)

In [37]:
# Padding function
def collate_fn(batch):
    texts, labels = zip(*batch)
    texts = pad_sequence(texts, batch_first=True, padding_value=word_to_idx["<PAD>"])
    return texts.to(device), torch.tensor(labels).to(device)

In [38]:
def calc_metrics(predictions, labels, num_classes: int):
    accuracy_metric = MulticlassAccuracy(num_classes=num_classes, average="micro")
    precision_metric = MulticlassPrecision(num_classes=num_classes, average="macro")
    recall_metric = MulticlassRecall(num_classes=num_classes, average="macro")
    f1_metric= MulticlassF1Score(num_classes=num_classes, average="macro")

    predictions = torch.tensor(predictions)
    labels = torch.tensor(labels)

    accuracy = accuracy_metric(predictions, labels)
    precision = precision_metric(predictions, labels)
    recall = recall_metric(predictions, labels)
    f1 = f1_metric(predictions, labels)

    return accuracy, precision, recall, f1

In [39]:
# Evaluate the model
def evaluate_model(model: nn.Module, data_loader: DataLoader, num_classes: int):
    model.eval()

    predicted_acum = []
    labels_acum = []
    with torch.no_grad():
        for texts, labels in data_loader:
            outputs = model(texts)
            _, predicted = torch.max(outputs.data, 1)
            predicted_acum.extend(predicted.tolist())
            labels_acum.extend(labels.tolist())

    return calc_metrics(predicted_acum, labels_acum, num_classes)

In [40]:


# Data Loaders
train_dataset = EmotionDataset(train_texts, train_labels, word_to_idx, device)
train_loader = DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn
)

test_dataset = EmotionDataset(test_texts, test_labels, word_to_idx, device)
test_loader = DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn
)

In [41]:
def train():
    # Model initialization
    model = LSTMEmotionClassifier(
        vocab_size, EMBEDDING_DIM, HIDDEN_DIM, len(label_encoder.classes_)
    )
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    for epoch in range(EPOCHS):
        for texts, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(texts)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        end = time.time()
    
    return model

In [42]:
times = []
for i in range(1):
    start = time.time()
    model = train()
    end = time.time()
    times.append(end-start)

torch.save(model.state_dict(), model_name + ".pth")
train_time = sum(times)/len(times)

In [43]:
def validate():
    # Validate the loaded model
    loaded_model = LSTMEmotionClassifier(
        vocab_size, EMBEDDING_DIM, HIDDEN_DIM, len(label_encoder.classes_)
    )
    loaded_model.load_state_dict(torch.load(model_name + ".pth"))
    loaded_model.to(device)
    # Load and process validation data
    val_texts, val_labels = load_data(os.path.join(os.getcwd(), f"data/{dataset}/val.txt"))

    val_labels = label_encoder.transform(val_labels)
    val_dataset = EmotionDataset(val_texts, val_labels, word_to_idx, device)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

    # Evaluate the loaded model on validation data
    return evaluate_model(loaded_model, val_loader, len(label_encoder.classes_))

In [44]:
accuracy, precision, recall, f1 = validate()

  loaded_model.load_state_dict(torch.load(model_name + ".pth"))


In [47]:
def predict():
    loaded_model = LSTMEmotionClassifier(
        vocab_size, EMBEDDING_DIM, HIDDEN_DIM, len(label_encoder.classes_)
    )
    loaded_model.load_state_dict(torch.load(model_name + ".pth"))
    loaded_model.to(device)
    # Load and process validation data
    val_texts, val_labels = load_data(os.path.join(os.getcwd(), f"data/{dataset}/val.txt"))

    val_labels = label_encoder.transform(val_labels)
    val_dataset = EmotionDataset(val_texts, val_labels, word_to_idx, device)
    val_loader = DataLoader(
        val_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn
    )

    start = time.time()
    with torch.no_grad():
        for texts, labels in val_loader:
            outputs = loaded_model(texts)
            _, predicted_labels = torch.max(outputs.data, 1)

    end = time.time()

    return end - start

In [48]:
inference_times = []
for i in range(100):
    inference_times.append(predict())

inference_time = sum(inference_times)/len(inference_times)

  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.load(model_name + ".pth"))
  loaded_model.load_state_dict(torch.loa

In [None]:
results = pd.DataFrame(
    {
        "model": [model_name],
        "dataset": [dataset],
        "accuracy": [accuracy.item()],
        "precision": [precision.item()],
        "recall": [recall.item()],
        "f1": [f1.item()],
        "train_time": [train_time],
        "inference_time": [inference_time],
    }
)

filename = "results.csv"

if os.path.exists(filename):
    results.to_csv(filename, mode="a", header=False, index=False)
else:
    results.to_csv(filename, index=False)