In [19]:
import os
import re
import time
from collections import Counter

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, Dataset
from torchmetrics.classification import (
    MulticlassAccuracy,
    MulticlassPrecision,
    MulticlassF1Score,
    MulticlassRecall,
)

In [20]:
# Hyperparameters and runtime configuration read by the cells below.
# Number of tokens every sample is padded/truncated to (passed to load_data -> tokenize).
MAX_SEQ_LENGTH = 25
# Number of full passes over the training data performed by each train() call.
EPOCHS = 100
# Width of the embedding vectors (also the input width of the linear layer).
EMBED_DIM = 100
# Number of samples per DataLoader batch.
BATCH_SIZE = 128
# Run on the GPU when PyTorch can see one, otherwise on the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# To force CPU execution (e.g. for CPU-only timing runs), override with: device = "cpu"

In [21]:

# Tokenize and pad/truncate
def tokenize(text, max_length):
    """Split ``text`` into lowercase word tokens of exactly ``max_length`` items.

    Tokens are the ``\\w+`` runs of the lowercased text. Shorter results are
    right-padded with the literal ``"<PAD>"`` token; longer results are cut
    off after ``max_length`` tokens.

    Args:
        text (str): Raw sentence.
        max_length (int): Number of tokens to return.

    Returns:
        list[str]: The padded or truncated token list.
    """
    words = re.findall(r"\w+", text.lower())
    missing = max_length - len(words)
    if missing >= 0:
        # Short (or exactly fitting) sentence: fill the remainder with padding.
        return words + ["<PAD>"] * missing
    # Too long: keep only the leading tokens.
    return words[:max_length]


def load_data(file_path, max_length):
    """Read a ``text;label`` file and return tokenized texts with their raw labels.

    Every non-blank line is split at its LAST semicolon, so a semicolon inside
    the sentence itself does not break parsing. Blank lines (for example a
    trailing empty line at the end of the file) are skipped.

    Args:
        file_path (str): Path of a UTF-8 text file holding one ``text;label``
            record per line.
        max_length (int): Token count each text is padded/truncated to; it is
            forwarded unchanged to ``tokenize``.

    Returns:
        tuple[list[list[str]], list[str]]: The tokenized texts and, in the same
        order, the label string of each record.

    Raises:
        ValueError: If a non-blank line contains no ``;`` separator.
    """
    texts, labels = [], []
    with open(file_path, "r", encoding="utf-8") as file:
        for line_number, line in enumerate(file, start=1):
            record = line.strip()
            if not record:
                # Nothing to parse on an empty line.
                continue
            # rpartition splits on the last ";" so the label is always the final field.
            text, separator, label = record.rpartition(";")
            if not separator:
                raise ValueError(
                    f"{file_path}:{line_number}: expected 'text;label', got {record!r}"
                )
            texts.append(tokenize(text, max_length))
            labels.append(label)

    return texts, labels

In [22]:
class EmotionDataset(Dataset):
    """Dataset that stores index-encoded sentences and their labels as tensors."""

    def __init__(self, texts, labels, word_to_idx, device=None):
        """Encode all sentences once and keep the results as tensors on ``device``.

        Args:
            texts (list of list of str): Tokenized sentences. They are turned
                into a single tensor, so all sentences must have the same
                number of tokens.
            labels: Integer class ids, one per sentence.
            word_to_idx (dict): Maps each known token to its index. Tokens that
                are not in the mapping use the index stored under ``"<UNK>"``.
            device (torch.device or str, optional): Where the tensors are
                created. ``None`` selects the CPU.
        """
        if device is None:
            device = torch.device("cpu")
        self.device = device

        def encode(sentence):
            # Out-of-vocabulary tokens fall back to the "<UNK>" index.
            return [word_to_idx.get(token, word_to_idx["<UNK>"]) for token in sentence]

        encoded_rows = list(map(encode, texts))
        self.texts = torch.tensor(encoded_rows, dtype=torch.long, device=device)
        self.labels = torch.tensor(labels, dtype=torch.long, device=device)

    def __len__(self):
        """Return the number of stored sentences."""
        return self.texts.size(0)

    def __getitem__(self, idx):
        """Return the ``(encoded_sentence, label)`` pair stored at ``idx``."""
        sample = self.texts[idx]
        target = self.labels[idx]
        return sample, target

In [23]:
class EmotionClassifier(nn.Module):
    """Embed token indices, average over the sequence, then apply one linear layer."""

    def __init__(self, vocab_size, EMBED_DIM, num_classes):
        """Create the embedding table and the output layer.

        Args:
            vocab_size (int): Number of rows in the embedding table.
            EMBED_DIM (int): Size of each embedding vector.
            num_classes (int): Number of logits produced per input.
        """
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=EMBED_DIM)
        self.fc = nn.Linear(in_features=EMBED_DIM, out_features=num_classes)

    def forward(self, x):
        """Return class logits for a batch of index-encoded sentences.

        Args:
            x: Integer token indices; presumably shaped (batch, seq_len) as
                produced by batching ``EmotionDataset`` items.

        Returns:
            Tensor of logits with one row per input sentence.
        """
        embedded = self.embedding(x)
        # Plain average over dim 1; every position (padding included) contributes.
        pooled = embedded.mean(dim=1)
        logits = self.fc(pooled)
        return logits

In [24]:
def calc_metrics(predictions, labels, num_classes: int):
    """Compute accuracy, precision, recall and F1 for multiclass predictions.

    Accuracy uses ``average="micro"``; precision, recall and F1 use
    ``average="macro"``.

    Args:
        predictions: Predicted class ids (anything ``torch.tensor`` accepts).
        labels: True class ids, aligned with ``predictions``.
        num_classes (int): Number of classes given to every metric.

    Returns:
        tuple: ``(accuracy, precision, recall, f1)`` as returned by the
        torchmetrics metric objects.
    """
    pred_tensor = torch.tensor(predictions)
    label_tensor = torch.tensor(labels)

    # Order here fixes the order of the returned tuple.
    metrics = (
        MulticlassAccuracy(num_classes=num_classes, average="micro"),
        MulticlassPrecision(num_classes=num_classes, average="macro"),
        MulticlassRecall(num_classes=num_classes, average="macro"),
        MulticlassF1Score(num_classes=num_classes, average="macro"),
    )
    return tuple(metric(pred_tensor, label_tensor) for metric in metrics)

In [25]:
# Evaluate the model
def evaluate_model(model: nn.Module, data_loader: DataLoader, num_classes: int):
    """Run ``model`` over ``data_loader`` and print accuracy, precision, recall and F1.

    The model is switched to eval mode and stays in it afterwards. Gradient
    tracking is disabled while predicting. The predicted class of a sample is
    the index of its largest output value.

    Args:
        model (nn.Module): Model to evaluate.
        data_loader (DataLoader): Yields ``(texts, labels)`` batches.
        num_classes (int): Number of classes, forwarded to ``calc_metrics``.

    Returns:
        None. The four scores are printed as percentages.
    """
    model.eval()

    all_predictions, all_targets = [], []
    with torch.no_grad():
        for batch_texts, batch_labels in data_loader:
            logits = model(batch_texts)
            batch_predictions = logits.data.max(dim=1).indices
            all_predictions += batch_predictions.tolist()
            all_targets += batch_labels.tolist()

    accuracy, precision, recall, f1 = calc_metrics(
        all_predictions, all_targets, num_classes
    )

    print(f"Accuracy: {accuracy*100:.2f}%")
    print(f"Precision: {precision*100:.2f}%")
    print(f"Recall: {recall*100:.2f}%")
    print(f"F1 score: {f1*100:.2f}%")

In [26]:
# Load and process data
train_texts, train_labels = load_data(
    os.path.join(os.getcwd(), "data/train.txt"), MAX_SEQ_LENGTH
)
test_texts, test_labels = load_data(
    os.path.join(os.getcwd(), "data/test.txt"), MAX_SEQ_LENGTH
)

# Encode labels: the mapping is fitted on the training labels only and then
# reused for every other split.
label_encoder = LabelEncoder()
train_labels = label_encoder.fit_transform(train_labels)
test_labels = label_encoder.transform(test_labels)

# Build vocabulary
# The special tokens get the first two indices ("<PAD>" -> 0, "<UNK>" -> 1).
SPECIAL_TOKENS = ["<PAD>", "<UNK>"]
all_words = [word for text in train_texts for word in text]
word_counts = Counter(all_words)
# Keep words seen more than once; words seen only once are left out and are
# therefore encoded as "<UNK>". The texts are already padded, so "<PAD>" is
# itself among the counted tokens -- it must be excluded here, otherwise it
# would appear twice in `vocab`, shift away from index 0 and inflate vocab_size.
non_unique_words = [
    word
    for word, count in word_counts.items()
    if count > 1 and word not in SPECIAL_TOKENS
]
vocab = SPECIAL_TOKENS + non_unique_words

word_to_idx = {word: idx for idx, word in enumerate(vocab)}
assert len(word_to_idx) == len(vocab), "vocabulary contains duplicate tokens"

# Model parameters
# NOTE: vocab_size determines the embedding shape, so checkpoints saved with a
# differently built vocabulary cannot be loaded; retrain after changing this cell.
vocab_size = len(vocab)

num_classes = len(label_encoder.classes_)

In [27]:
def train():
    """Train a fresh ``EmotionClassifier`` on the training split and return it.

    Configuration and data come from module-level names: ``vocab_size``,
    ``EMBED_DIM``, ``num_classes``, ``device``, ``BATCH_SIZE``, ``EPOCHS``,
    ``train_texts``, ``train_labels`` and ``word_to_idx``.

    Only the work needed for training happens here, so timing a call to this
    function measures training alone. Nothing is evaluated or saved.

    Returns:
        EmotionClassifier: The trained model, placed on ``device``.
    """
    # Model, Loss, and Optimizer
    model = EmotionClassifier(vocab_size, EMBED_DIM, num_classes)
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Data loader; the dataset tensors are created directly on `device`.
    train_dataset = EmotionDataset(train_texts, train_labels, word_to_idx, device)
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

    # Training loop
    for _ in range(EPOCHS):
        for texts, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(texts)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

    return model

In [28]:
# Benchmark: train the model several times and record the wall-clock time of each run.
NUM_TRAINING_RUNS = 5
times = []

print(f"Training model {NUM_TRAINING_RUNS} times...")
for _ in range(NUM_TRAINING_RUNS):
    start = time.time()
    model = train()
    if str(device).startswith("cuda"):
        # CUDA work is queued asynchronously; wait for it to finish so the
        # measured interval covers the whole training run.
        torch.cuda.synchronize()
    end = time.time()
    times.append(end - start)

# Only the model from the last run is written to disk.
torch.save(model.state_dict(), "emotion_classifier_model.pth")

print(f"Model trained {NUM_TRAINING_RUNS} times")
print(times)
print(f"Average training time: {sum(times)/len(times):.2f}s")

Training model 5 times...
Model trained 5 times
[14.280154705047607, 14.150193691253662, 14.089772939682007, 14.065330743789673, 13.933905124664307]
Average training time: 14.10s


In [33]:
def validate():
    """Load the saved checkpoint and print its metrics on the validation split.

    Reads ``emotion_classifier_model.pth`` from the working directory and
    ``data/val.txt`` below it. Labels are encoded with the module-level
    ``label_encoder`` and tokens with ``word_to_idx``. The metrics are printed
    by ``evaluate_model``; nothing is returned.
    """
    loaded_model = EmotionClassifier(vocab_size, EMBED_DIM, num_classes)
    # map_location places the stored tensors on `device`, so a checkpoint that
    # was saved from a GPU run can also be loaded on a CPU-only machine.
    state_dict = torch.load("emotion_classifier_model.pth", map_location=device)
    loaded_model.load_state_dict(state_dict)
    loaded_model.to(device)

    # Load and process validation data
    val_texts, val_labels = load_data(
        os.path.join(os.getcwd(), "data/val.txt"), MAX_SEQ_LENGTH
    )

    val_labels = label_encoder.transform(val_labels)
    val_dataset = EmotionDataset(val_texts, val_labels, word_to_idx, device)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    print("Validation results:")
    evaluate_model(loaded_model, val_loader, num_classes)


In [34]:
# Print the validation metrics of the checkpoint stored in emotion_classifier_model.pth.
validate()

Validation results:
Accuracy: 84.90%
Precision: 81.99%
Recall: 80.85%
F1 score: 81.38%


In [35]:
def predict():
    """Time one inference pass of the saved model over the validation split.

    The checkpoint and the validation data are loaded before the timer starts,
    so only the forward passes and the per-batch arg-max are measured.

    Returns:
        float: Elapsed wall-clock seconds for the inference pass.
    """
    loaded_model = EmotionClassifier(vocab_size, EMBED_DIM, num_classes)
    # map_location places the stored tensors on `device`, so a checkpoint that
    # was saved from a GPU run can also be loaded on a CPU-only machine.
    loaded_model.load_state_dict(
        torch.load("emotion_classifier_model.pth", map_location=device)
    )
    loaded_model.to(device)
    loaded_model.eval()

    # Load and process validation data
    val_texts, val_labels = load_data(
        os.path.join(os.getcwd(), "data/val.txt"), MAX_SEQ_LENGTH
    )

    val_labels = label_encoder.transform(val_labels)
    val_dataset = EmotionDataset(val_texts, val_labels, word_to_idx, device)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    # CUDA kernels are launched asynchronously. Synchronising before starting
    # and before stopping the timer makes the interval cover the actual
    # computation instead of just the kernel launches.
    on_cuda = str(device).startswith("cuda")
    if on_cuda:
        torch.cuda.synchronize()
    start = time.time()
    with torch.no_grad():
        for texts, _ in val_loader:
            outputs = loaded_model(texts)
            # The predictions are discarded; computing them is part of the timed work.
            torch.max(outputs, 1)
    if on_cuda:
        torch.cuda.synchronize()
    end = time.time()

    return end - start


In [36]:
# Repeat the timed validation pass and report the mean duration of one pass.
print("Inferring 100 times, 2000 inferences each...")
prediction_times = [predict() for _ in range(100)]

print(f"Average inference time: {sum(prediction_times)/len(prediction_times):.5f}s")

Inferring 100 times, 2000 inferences each...
Average inference time: 0.00936s
