In [1]:
import torch
import pandas as pd
import torch.optim as optim
from typing import Tuple, List, Dict, Any
from torch.utils.data import DataLoader, Dataset

  from pandas.core.computation.check import NUMEXPR_INSTALLED
  from pandas.core import (


### LOAD DATA

In [None]:
train_texts_list = torch.load('train_embeddings_bert.pt')
test_texts_list = torch.load('test_embeddings_bert.pt')
val_texts_list = torch.load('val_embeddings_bert.pt')

In [23]:
train_texts_list[0].shape

torch.Size([1, 51, 768])

In [21]:
len(train_texts_list[0][0])

51

In [3]:
train_texts = []

for i in range(len(train_texts_list)):
    tensor = train_texts_list[i]
    train_texts.append(tensor.squeeze())

test_texts = []

for i in range(len(test_texts_list)):
    tensor = test_texts_list[i]
    test_texts.append(tensor.squeeze())

val_texts = []

for i in range(len(val_texts_list)):
    tensor = val_texts_list[i]
    val_texts.append(tensor.squeeze())

In [4]:
df_train_sentiment = pd.read_csv('sentiment_train.csv')
df_val_sentiment = pd.read_csv('sentiment_val.csv')
df_test_sentiment = pd.read_csv('sentiment_test.csv')

train_sentiment_word = df_train_sentiment["sentiment"]
val_sentiment_word = df_val_sentiment["sentiment"]
test_sentiment_word = df_test_sentiment["sentiment"]

train_sentiment = torch.tensor([0 if sentiment == "NEGATIVE" else 1 for sentiment in train_sentiment_word])
val_sentiment = torch.tensor([0 if sentiment == "NEGATIVE" else 1 for sentiment in val_sentiment_word])
test_sentiment = torch.tensor([0 if sentiment == "NEGATIVE" else 1 for sentiment in test_sentiment_word])

In [5]:
df_train_entities = pd.read_json('finer_train_data.json')
df_val_entities = pd.read_json('finer_validation_data.json')
df_test_entities = pd.read_json('finer_test_data.json')

df_train_entities.columns = ["texts", "entities"]
df_val_entities.columns = ["texts", "entities"]
df_test_entities.columns = ["texts", "entities"]

train_entities = df_train_entities["entities"]
val_entities = df_train_entities["entities"]
test_entities = df_train_entities["entities"]

In [24]:
train_entities[0]

{'entities': [[30, 35, 'PERSON'], [111, 116, 'PERSON'], [183, 189, 'LOC']]}

### DATASET

In [None]:
class SentimentDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts : List[torch.Tensor] = texts
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = torch.tensor(self.texts[idx], dtype=torch.float32)
        return text, self.labels[idx]

### COLLATE FUNCTION

In [7]:
def pad_collate(batch):
    texts, labels = zip(*batch)

    # Quitar la dimensión 1 si viene como [1, seq_len, 768]
    texts = [text.squeeze(0) if text.dim() == 3 and text.size(0) == 1 else text for text in texts]

    lengths = [text.size(0) for text in texts]
    max_len = max(lengths)
    
    padded_texts = [
        torch.cat([text, torch.zeros(max_len - text.size(0), text.size(1))], dim=0)
        for text in texts
    ]
    
    return torch.stack(padded_texts), torch.tensor(labels), torch.tensor(lengths)

In [8]:
BATCH_SIZE = 32

train_dataset = SentimentDataset(train_texts_list, train_sentiment)
val_dataset = SentimentDataset(val_texts_list, val_sentiment)
test_dataset = SentimentDataset(test_texts_list, test_sentiment)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad_collate)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, collate_fn=pad_collate)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, collate_fn=pad_collate)

  self.labels = torch.tensor(labels, dtype=torch.float32)


---------------------------------------------------------------

### MODELS

In [9]:
class SentimentAnalysisModel(torch.nn.Module):
    def __init__(self, embedding_dim, num_layers, hidden_dim, num_classes=2, dropout_rate=0.5):
        super(SentimentAnalysisModel, self).__init__()
        self.lstm = torch.nn.LSTM(embedding_dim, hidden_dim, batch_first=True, num_layers=num_layers, dropout=dropout_rate, bidirectional=True)
        self.dropout = torch.nn.Dropout(dropout_rate)
        self.fc = torch.nn.Linear(in_features=hidden_dim, out_features=num_classes)

    def forward(self, x):
        packed_output, (hidden, cell) = self.lstm(x)
        hidden = hidden[-1]  
        hidden = self.dropout(hidden)
        output = self.fc(hidden).squeeze()
        return output

In [10]:
class BidirectionalSentimentAnalysisModel(torch.nn.Module):
    def __init__(self, embedding_dim, num_layers, hidden_dim, num_classes=2, dropout_rate=0.5):
        super(BidirectionalSentimentAnalysisModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.bidirectional = True

        self.lstm = torch.nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            dropout=dropout_rate if num_layers > 1 else 0.0,
            bidirectional=self.bidirectional,
            batch_first=True
        )
        self.dropout = torch.nn.Dropout(dropout_rate)
        self.fc = torch.nn.Linear(in_features=hidden_dim * 2, out_features=num_classes) 

    def forward(self, x):
        lstm_out, (hidden, cell) = self.lstm(x)

        forward_hidden = hidden[-2]
        backward_hidden = hidden[-1]
        final_hidden = torch.cat((forward_hidden, backward_hidden), dim=1)

        final_hidden = self.dropout(final_hidden)
        output = self.fc(final_hidden).squeeze(1)
        return output


In [11]:
class UnidirectionalSentimentAnalysisModel(torch.nn.Module):
    def __init__(self, embedding_dim, num_layers, hidden_dim, num_classes=2, dropout_rate=0.5):
        super(UnidirectionalSentimentAnalysisModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        self.lstm = torch.nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            dropout=dropout_rate if num_layers > 1 else 0.0,
            bidirectional=False,
            batch_first=True
        )
        self.dropout = torch.nn.Dropout(dropout_rate)
        self.fc = torch.nn.Linear(in_features=hidden_dim, out_features=num_classes)

    def forward(self, x):
        lstm_out, (hidden, cell) = self.lstm(x)
        final_hidden = hidden[-1] 

        final_hidden = self.dropout(final_hidden)
        output = self.fc(final_hidden).squeeze(1)
        return output


In [12]:
class TCNBlock(torch.nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, dilation):
        super().__init__()
        self.conv = torch.nn.Conv1d(
            in_channels, 
            out_channels, 
            kernel_size=kernel_size, 
            padding=(kernel_size - 1) * dilation, 
            dilation=dilation
        )
        self.relu = torch.nn.ReLU()
        self.bn = torch.nn.BatchNorm1d(out_channels)
        self.dropout = torch.nn.Dropout(0.2)

    def forward(self, x):
        out = self.conv(x)
        out = out[:, :, :-self.conv.padding[0]]
        out = self.relu(out)
        out = self.bn(out)
        return self.dropout(out)

class SentimentTCN(torch.nn.Module):
    def __init__(self, embedding_dim=768, num_classes=2):
        super().__init__()
        self.tcn1 = TCNBlock(embedding_dim, 128, kernel_size=3, dilation=1)
        self.tcn2 = TCNBlock(128, 128, kernel_size=3, dilation=2)
        self.tcn3 = TCNBlock(128, 128, kernel_size=3, dilation=4)
        self.pool = torch.nn.AdaptiveMaxPool1d(1)
        self.fc = torch.nn.Linear(128, num_classes)

    def forward(self, x):
        x = x.transpose(1, 2)  # (batch, embed_dim, seq_len)
        x = self.tcn1(x)
        x = self.tcn2(x)
        x = self.tcn3(x)
        x = self.pool(x).squeeze(2)
        return self.fc(x)


In [13]:
class SentimentDAN(torch.nn.Module):
    def __init__(self, embedding_dim=768, hidden_dim=256, num_classes=2):
        super().__init__()
        self.fc1 = torch.nn.Linear(embedding_dim, hidden_dim)
        self.relu = torch.nn.ReLU()
        self.dropout = torch.nn.Dropout(0.3)
        self.fc2 = torch.nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        x = x.mean(dim=1) 
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        return self.fc2(x)

------------------------------------------------------------------

### TRAIN MODEL

In [14]:
EMBEDDING_DIM = 768
NUM_LAYERS = 2
DROPOUT_RATE = 0.5
HIDDEN_DIM = 64
EPOCHS = 100
PRINT_EVERY = 10
PATIENCE = 5
LEARNING_RATE = 2e-4  
LEARNING_RATE = 2e-4         
WEIGHT_DECAY = 1e-5 
NUM_CLASSES = 2    
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# SentimentAnalysisModel
# BidirectionalSentimentAnalysisModel BUENO
# UnidirectionalSentimentAnalysisModel MALO

# model = BidirectionalSentimentAnalysisModel(
#     embedding_dim=EMBEDDING_DIM, 
#     num_layers=NUM_LAYERS,
#     dropout_rate=DROPOUT_RATE,
#     hidden_dim=HIDDEN_DIM, 
#     num_classes=NUM_CLASSES)

# model = SentimentTCN(
#     embedding_dim=EMBEDDING_DIM, 
#     num_classes=NUM_CLASSES)

model = SentimentDAN(
    embedding_dim=EMBEDDING_DIM, 
    hidden_dim=HIDDEN_DIM, 
    num_classes=NUM_CLASSES)


criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)

In [15]:
def calculate_accuracy(model: torch.nn.Module, dataloader: DataLoader, threshold: float = 0.5, device: str = 'cpu') -> float:
    """
    Calculate the accuracy of a PyTorch model given a DataLoader.

    The function moves the model to the specified device, sets it to evaluation mode, and computes
    the accuracy by comparing the model's predictions against the true labels. The predictions are
    determined based on a specified threshold.

    Args:
        model (torch.nn.Module): The PyTorch model to evaluate.
        dataloader (DataLoader): The DataLoader containing the dataset to evaluate against.
        threshold (float, optional): Probability threshold to predict a sample as positive. Defaults to 0.5.
        device (str, optional): Device to which the model and data are moved ('cpu' or 'cuda'). Defaults to 'cpu'.

    Returns:
        float: The accuracy of the model on the given dataset.
    """
    # TODO: Calculate accuracy of a model given a dataloader
    model.eval()
    model.to(device)
    correct = 0
    total = 0

    with torch.no_grad():
        for features, labels, text_len in dataloader:
            features, labels = features.to(device), labels.to(device)
            outputs = model(features)
            predicted = (torch.sigmoid(outputs)[:,1] >= threshold).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = correct / total
    return accuracy

In [16]:
def train_torch_model(model: torch.nn.Module, train_dataloader: DataLoader,
                val_dataloader: DataLoader, criterion: torch.nn.Module,
                optimizer: optim.Optimizer, epochs: int,
                print_every: int, patience: int,
                device: str = 'cpu') -> Tuple[Dict[int, float],Dict[int, float]]:
    """
    Train and validate the logistic regression model.

    Args:
        model (torch.nn.Module): An instance of the model to be trained.
        train_dataloader (DataLoader): DataLoader for the training dataset.
        val_dataloader (DataLoader): DataLoader for the validation dataset.
        learning_rate (float): The learning rate for the optimizer.
        criterion (nn.Module): Loss function to use for training.
        optimizer (optim.Optimizer): Optimizer to use for training.
        epochs (int): The number of epochs to train the model.
        print_every (int): Frequency of epochs to print training and validation loss.
        patience (int): The number of epochs to wait for improvement on the validation loss before stopping training early.
        device (str): device where to train the model.

    Returns:
        Tuple[Dict[int, float],Dict[int, float]]: Dictionary of accuracies at each `print_every` interval for the training and validation datasets.
    """
    # TODO: Initialize dictionaries to store training and validation accuracies
    train_accuracies: Dict[int, float] = {}
    val_accuracies: Dict[int, float] = {}

    # TODO: Initialize variables for Early Stopping
    best_loss: float = float('inf')
    epochs_no_improve: int = 0

    # TODO: Move the model to the specified device (CPU or GPU)
    model.to(device)


    # TODO: Implement the training loop over the specified number of epochs
    for epoch in range(epochs):
        # TODO: Set the model to training mode
        model.train(True)
        total_loss: int = 0

        # TODO: Implement the loop for training over each batch in the training dataloader
        for features, labels, text_len in train_dataloader:
            # TODO: Move features and labels to the specified device
            features = features.to(device).float()
            labels = labels.to(device).long()

            # TODO: Clear the gradients
            optimizer.zero_grad()

            # TODO: Forward pass (compute the model output)
            outputs = model(features)

            # TODO: Compute the loss
            loss = criterion(outputs, labels)

            # TODO: Backward pass (compute the gradients)
            loss.backward()

            # TODO: Update model parameters
            optimizer.step()

            # TODO: Accumulate the loss
            total_loss += loss.item()


        # TODO: Implement the evaluation phase
        val_loss: int = 0
        with torch.no_grad():
            # TODO: Loop over the validation dataloader
            for features, labels, text_len in val_dataloader:
                # TODO: Move features and labels to the specified device
                features = features.to(device).float()
                labels = labels.to(device).long()

                # TODO: Forward pass (compute the model output)
                outputs = model(features)

                # TODO: Compute the loss
                loss = criterion(outputs, labels)

                # TODO: Accumulate validation loss
                val_loss += loss.item()


        # TODO: Print training and validation results every 'print_every' epochs
        if epoch % print_every == 0 or epoch == epochs - 1:
            # TODO: Calculate training and validation accuracy
            train_accuracy = calculate_accuracy(model, train_dataloader, device=device)
            val_accuracy = calculate_accuracy(model, val_dataloader, device=device)

            # TODO: Store accuracies
            train_accuracies[epoch] = train_accuracy
            val_accuracies[epoch] = val_accuracy

            # TODO: Calculate and print average losses and accuracies
            train_loss = total_loss / len(train_dataloader)
            val_loss = val_loss / len(val_dataloader)

            print(f"Epoch {epoch}/{epochs} - Train Loss: {train_loss:.4f} - Val Loss: {val_loss:.4f} - Train Accuracy: {train_accuracy:.4f} - Val Accuracy: {val_accuracy:.4f}")

        # TODO: Implement Early Stopping
        if val_loss < best_loss:
            best_loss = val_loss
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            
        if epochs_no_improve >= patience:
            print(f"Early stopping: {epoch}")
            break

    return train_accuracies, val_accuracies

In [17]:
rnn_train_accuracies, rnn_val_accuracies = train_torch_model(model, train_loader,
                                                             val_loader, criterion,
                                                             optimizer, epochs=EPOCHS, print_every=PRINT_EVERY,
                                                             patience=PATIENCE, device=DEVICE)

  text = torch.tensor(self.texts[idx], dtype=torch.long)


Epoch 0/100 - Train Loss: 0.6814 - Val Loss: 0.6570 - Train Accuracy: 0.5699 - Val Accuracy: 0.6393
Early stopping: 5


In [18]:
accuracy_train = calculate_accuracy(model, train_loader, device=DEVICE)
accuracy_val = calculate_accuracy(model, val_loader, device=DEVICE)
accuracy_test = calculate_accuracy(model, test_loader, device=DEVICE)

print(f"SA Model - Training Accuracy: {accuracy_train}")
print(f"SA Model - Validation Accuracy: {accuracy_val}")
print(f"SA Model - Test Accuracy: {accuracy_test}")

  text = torch.tensor(self.texts[idx], dtype=torch.long)


SA Model - Training Accuracy: 0.7676272225628449
SA Model - Validation Accuracy: 0.8034825870646766
SA Model - Test Accuracy: 0.773953488372093
