# Train Description

## Purpose
This notebook implements a deep neural network model to identify negation in text. It involves loading preprocessed data, defining a custom dataset, building the neural network model, and training it using PyTorch.

## Notebook Content

### 1. Dependency Installation
Imports the libraries the project needs (the notebook assumes they are already installed), including:
- `torch` for building and training the neural network.
- `numpy` for numerical operations.
- `json` for loading and saving data.

### 2. Data Loading
Loads the preprocessed training data from JSON files: word embeddings, the `w_fast_vectors_reduced` vectors that the model consumes as character embeddings, one-hot casing information, one-hot part-of-speech (POS) tags, and labels.

### 3. Dataset Definition
Defines a custom PyTorch `Dataset`, splits it 80/20 into training and validation subsets, and wraps each subset in a `DataLoader`.

### 4. Model Definition
Defines the neural network model architecture for negation detection. The model combines word embeddings, POS tags, casing information, and character embeddings.

### 5. Loss Function and Weights
Prepares the loss function (`CrossEntropyLoss`) with custom weights to handle class imbalances. The weights are computed based on the frequency of each class in the training data.

### 6. Model Initialization and Training
Initializes the model, loss function, and optimizer. The model is then trained on the training data with early stopping based on validation loss.

### 7. Saving the Model
Saves the model's weights to `top_model.pth` every time the validation loss improves, so the file always holds the best model seen during training.

In [None]:
# Import dependencies

import torch 
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import json 
from torch.utils.data import DataLoader, random_split

In [None]:
def load_json(file_path):
    """Read a JSON document from disk and return the decoded Python object.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        Whatever ``json.load`` decodes from the file (dict, list, str, number, ...).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    # Decode as UTF-8 explicitly: without it open() falls back to the platform's
    # default encoding, which can mis-read non-ASCII text on some systems.
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return data

def save_json(data, file_path):
    """Serialize ``data`` as JSON into ``file_path``, overwriting any existing file."""
    with open(file_path, mode='w') as out_file:
        json.dump(data, fp=out_file)


In [None]:
# Load the pre-processed training data.
# Every file holds a nested list indexed as [sentence][word][feature values]:
#   [[[sent1 w1 feature], [sent1 w2 feature], ...], [[sent2 w1 feature], ...], ...]
# Depending on the file, a word's feature vector is either an embedding of
# length 100 or a one-hot vector.

train_word_embedding_reduced = load_json(file_path='train_word_embeddings_reduced.json')
train_casing_onehot = load_json(file_path='train_casing_onehot.json')
train_pos_onehot = load_json(file_path='train_pos_onehot.json')
y_train = load_json(file_path='y_train_numerical.json')
w_fast_vectors_reduced = load_json(file_path='w_fast_vectors_reduced.json')

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split

class CustomDataset(Dataset):
    """Dataset that bundles four per-sentence feature tensors with their labels.

    Every argument is converted to a ``float32`` tensor up front, so each one
    must be a rectangular (non-ragged) nested sequence of numbers.

    Args:
        w_fast: Features returned first by ``__getitem__``.
        w_vector: Features returned second.
        pos: Features returned third.
        casing: Features returned fourth.
        labels: Targets returned alongside the four feature tensors.

    Raises:
        ValueError: If the five inputs do not all contain the same number of
            items along their first dimension.
    """

    def __init__(self, w_fast, w_vector, pos, casing, labels):
        self.w_fast = torch.tensor(w_fast, dtype=torch.float32)
        self.w_vector = torch.tensor(w_vector, dtype=torch.float32)
        self.pos = torch.tensor(pos, dtype=torch.float32)
        self.casing = torch.tensor(casing, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.float32)

        # __len__ only looks at w_fast, so a length mismatch would otherwise
        # surface much later as an IndexError or as silently ignored rows.
        sizes = {
            "w_fast": len(self.w_fast),
            "w_vector": len(self.w_vector),
            "pos": len(self.pos),
            "casing": len(self.casing),
            "labels": len(self.labels),
        }
        if len(set(sizes.values())) > 1:
            raise ValueError(f"All inputs must have the same number of items, got {sizes}")

    def __len__(self):
        """Return the number of items (length of the first feature tensor)."""
        return len(self.w_fast)

    def __getitem__(self, idx):
        """Return ``((w_fast, w_vector, pos, casing), labels)`` for item ``idx``."""
        return (self.w_fast[idx], self.w_vector[idx], self.pos[idx], self.casing[idx]), self.labels[idx]


# Split the dataset into training (80%) and validation (20%) sets.
# The split is drawn from a seeded generator so that every Restart & Run All
# produces the same partition and validation losses stay comparable across runs.
dataset = CustomDataset(w_fast_vectors_reduced, train_word_embedding_reduced, train_pos_onehot, train_casing_onehot, y_train)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size  # remainder, so the two sizes always add up
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42),
)


In [None]:
# Wrap both splits in mini-batch loaders (32 items per batch).
# Only the training loader is shuffled; validation order stays fixed.
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=32,
    shuffle=True,
)
val_loader = DataLoader(
    dataset=val_dataset,
    batch_size=32,
    shuffle=False,
)

In [None]:
# Training is pinned to the CPU because the GPU crashed on every attempt.
# Set USE_GPU to True to opt back in to CUDA whenever it is available.
USE_GPU = False
device = torch.device("cuda" if USE_GPU and torch.cuda.is_available() else "cpu")

In [None]:
# Tag name -> class index, and the reverse lookup derived from it.
dict_index_tag = {
    "O": 0,
    "NEG": 1,
    "NSCO": 2,
    "UNC": 3,
    "USCO": 4,
    "PAD": 5,
}
dict_index_tag_inverted = {index: tag for tag, index in dict_index_tag.items()}

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np

class NegationModel(nn.Module):
    """Token tagger: Conv1d character features + word/POS/casing features -> BiLSTM -> linear.

    Args:
        word_embedding_dim: Stored on the instance; not used to size any layer.
        pos_embedding_dim: Width of the POS vectors (input and output of ``fc_pos``).
        char_embedding_dim: Width of the character vectors (``in_channels`` of the Conv1d).
        casing_dim: Width of the casing vectors (input and output of ``fc_casing``).
        lstm_dim: Hidden size of each LSTM direction.
        num_filters: ``out_channels`` of the Conv1d. Because the convolved features
            are concatenated with the other inputs along the last dimension,
            this must equal the size of dimension 1 of those inputs.
        kernel_size: Kernel size of the Conv1d.
        conv_dropout_rate: Dropout probability applied to the Conv1d output.
        lstm_dropout_rate: Dropout probability applied to the LSTM output.
        lstm_recurrent_dropout_rate: Passed to ``nn.LSTM(dropout=...)``. In PyTorch
            this is dropout between stacked LSTM layers, not recurrent dropout
            on the hidden-to-hidden connections.
        lstm_input_dim: Width of the concatenated feature vector fed to the LSTM.
            Defaults to 425, the value that used to be hard-coded.
            NOTE(review): with the shapes recorded in this notebook (word 100,
            POS 17, casing 8, and a pooled character width of
            (377 - 3 + 1) // 3 = 125) the concatenated width would be 250 -
            confirm against the real data and pass the matching value.
        num_classes: Number of output classes. Defaults to 6, the value that
            used to be hard-coded.
    """

    def __init__(self, word_embedding_dim, pos_embedding_dim, char_embedding_dim, casing_dim, lstm_dim, num_filters, kernel_size, conv_dropout_rate=0.5, lstm_dropout_rate=0.5, lstm_recurrent_dropout_rate=0.25, lstm_input_dim=425, num_classes=6):
        super().__init__()
        self.word_embedding_dim = word_embedding_dim
        self.pos_embedding_dim = pos_embedding_dim
        self.char_embedding_dim = char_embedding_dim
        self.casing_dim = casing_dim

        # Character branch: convolution -> dropout -> max pooling.
        self.conv1d = nn.Conv1d(in_channels=char_embedding_dim, out_channels=num_filters, kernel_size=kernel_size)
        self.conv_dropout = nn.Dropout(conv_dropout_rate)
        self.max_pool = nn.MaxPool1d(kernel_size=3)

        # Three stacked bidirectional layers over the concatenated features.
        self.bi_lstm = nn.LSTM(lstm_input_dim,
                               hidden_size=lstm_dim, batch_first=True, bidirectional=True, num_layers=3,
                               dropout=lstm_recurrent_dropout_rate)

        self.lstm_dropout = nn.Dropout(lstm_dropout_rate)
        # 2 * lstm_dim because forward and backward hidden states are concatenated.
        self.fc = nn.Linear(2 * lstm_dim, num_classes)

        # Width-preserving projections of the POS and casing vectors.
        self.fc_pos = nn.Linear(pos_embedding_dim, pos_embedding_dim)
        self.fc_casing = nn.Linear(casing_dim, casing_dim)

    def forward(self, x):
        """Compute per-position class scores.

        Args:
            x: Sequence of four batched 3-D tensors, in this order:
                ``(char_embeddings, word_embeddings, pos_embeddings, casing_info)``.
                All four must agree on dimensions 0 and 1 after the character
                branch has run (see ``num_filters`` in the class docstring).

        Returns:
            Tensor whose last dimension holds ``num_classes`` raw scores (logits).
        """
        char_embeddings, word_embeddings, pos_embeddings, casing_info = x

        # Conv1d convolves over the last dimension and treats dimension 1 as
        # channels, so move the embedding width into the channel position.
        char_embeddings = char_embeddings.permute(0, 2, 1)
        char_features = self.conv1d(char_embeddings)
        char_features = self.conv_dropout(char_features)  # Dropout after Conv1D
        # The pooled output is already (batch, num_filters, pooled_length), so
        # no further reshaping is needed before concatenation.
        char_features = self.max_pool(char_features)

        pos_embeddings = self.fc_pos(pos_embeddings)
        casing_info = self.fc_casing(casing_info)

        combined_features = torch.cat((word_embeddings, pos_embeddings, char_features, casing_info), dim=2)

        lstm_out, _ = self.bi_lstm(combined_features)
        lstm_out = self.lstm_dropout(lstm_out)  # Dropout after LSTM output

        output = self.fc(lstm_out)

        return output

def _model_device(model):
    """Return the device of the model's first parameter (CPU if it has none)."""
    first_param = next(model.parameters(), None)
    return first_param.device if first_param is not None else torch.device("cpu")


def _flatten_for_loss(outputs, labels):
    """Reshape logits and one-hot labels into the form CrossEntropyLoss expects.

    Both tensors are flattened to 2-D while keeping their last dimension, then
    the one-hot label rows are converted to class indices with argmax.
    """
    flat_outputs = outputs.view(-1, outputs.size(-1))
    flat_labels = torch.argmax(labels.view(-1, labels.size(-1)), dim=1)
    return flat_outputs, flat_labels


def train(model, train_loader, val_loader, criterion, optimizer, num_epochs, patience=1, save_path='top_model.pth'):
    """Train ``model`` with per-epoch validation, checkpointing and early stopping.

    Batches are moved to the device the model already lives on, so the function
    does not depend on any notebook-level ``device`` variable.

    Args:
        model: Module called with the list of input tensors of each batch.
        train_loader: Iterable of ``(inputs, labels)`` batches used for optimisation;
            ``labels`` are one-hot along their last dimension.
        val_loader: Iterable of ``(inputs, labels)`` batches used for validation.
        criterion: Loss called as ``criterion(flat_logits, class_indices)``.
        optimizer: Optimizer updating the model's parameters.
        num_epochs: Maximum number of epochs to run.
        patience: Number of consecutive epochs without validation-loss
            improvement after which training stops.
        save_path: File that receives ``model.state_dict()`` every time the
            validation loss improves.

    Returns:
        None. Progress is printed and the best weights are written to ``save_path``.
    """
    run_device = _model_device(model)
    model.train()
    best_loss = float('inf')
    epochs_no_improve = 0
    early_stop = False

    for epoch in range(num_epochs):
        if early_stop:
            print("Early stopping")
            break

        model.train()
        running_loss = 0.0
        # Per-class counters, sized from the logits of the first batch.
        class_correct = None
        class_total = None

        for idx, (inputs, labels) in enumerate(train_loader):
            inputs = [inp.to(run_device) for inp in inputs]
            labels = labels.to(run_device)

            optimizer.zero_grad()

            outputs, labels = _flatten_for_loss(model(inputs), labels)

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            # Accumulate per-class hit and occurrence counts for the accuracy report.
            if class_total is None:
                class_total = torch.zeros(outputs.size(-1), dtype=torch.long, device=outputs.device)
                class_correct = torch.zeros_like(class_total)
            _, predicted = torch.max(outputs, 1)
            class_total += torch.bincount(labels, minlength=class_total.numel())
            class_correct += torch.bincount(labels[predicted == labels], minlength=class_correct.numel())

            if idx % 25 == 0:
                print(f"Batch = {idx} Running loss= {running_loss}")

        # Per-class accuracy; classes that never occurred are reported as 0.
        totals = class_total.tolist() if class_total is not None else []
        hits = class_correct.tolist() if class_correct is not None else []
        accuracies = {label: (hits[label] / totals[label] if totals[label] > 0 else 0.0) for label in range(len(totals))}
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")
        for label, accuracy in accuracies.items():
            print(f"Accuracy for class {label}: {accuracy * 100:.2f}%")

        # Validate the model
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for val_inputs, val_labels in val_loader:
                val_inputs = [inp.to(run_device) for inp in val_inputs]
                val_outputs, val_labels = _flatten_for_loss(model(val_inputs), val_labels.to(run_device))
                val_loss += criterion(val_outputs, val_labels).item()

        val_loss /= len(val_loader)
        print(f"Validation Loss: {val_loss}")

        # Early stopping: checkpoint on improvement, otherwise count towards patience.
        if val_loss < best_loss:
            best_loss = val_loss
            epochs_no_improve = 0
            # Save the model
            torch.save(model.state_dict(), save_path)
            print(f"Model saved to {save_path}")
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f"Early stopping after {patience} epochs with no improvement")
                early_stop = True
# Reference note kept as a bare string literal: the captions below are the ones
# printed by the shape-inspection cell at the end of the notebook.
""" 
Labels shape: torch.Size([377, 6])
Character embedding_dim: torch.Size([377, 100])
Word embedding_dim: torch.Size([377, 100])
OnehotPOS_dim: torch.Size([377, 17])
OnehotCASING_dim: torch.Size([377, 8])

"""



In [None]:
dict_index_tag = {"O": 0, "NEG": 1, "NSCO": 2, "UNC": 3, "USCO": 4, "PAD": 5}
dict_index_tag_inverted = {index: tag for tag, index in dict_index_tag.items()}

# Weight preparation: accumulate how often each tag occurs by summing the
# per-word label vectors of every sentence.
frecuencias = np.zeros(len(dict_index_tag))
for sentence in y_train:
    frecuencias += np.sum(sentence, axis=0)

# Inverse square-root frequency; the small epsilon avoids dividing by zero.
pesos = 1.0 / np.sqrt(frecuencias + 1e-8)
# The "PAD" weight must be 0.
pesos[dict_index_tag["PAD"]] = 0.0

# Normalise so the weights add up to 1.
pesos /= pesos.sum()

pesos_tensor = torch.tensor(pesos, dtype=torch.float32, device=device)
print(pesos_tensor)

# Report the weight assigned to each tag, in index order.
for label, weight in zip(dict_index_tag, pesos):
    print(f"Peso para la etiqueta {label}: {weight:.4f}")

criterion = torch.nn.CrossEntropyLoss(weight=pesos_tensor)

In [None]:
# Build the model and move it to the selected device.
model = NegationModel(
    word_embedding_dim=100,
    pos_embedding_dim=17,
    char_embedding_dim=100,
    casing_dim=8,
    lstm_dim=150,
    num_filters=377,
    kernel_size=3,
)
model = model.to(device)

# Build the loss criterion with the class weights placed on the same device.
criterion = nn.CrossEntropyLoss(weight=pesos_tensor.to(device))

# Build the optimizer.
optimizer = optim.Adam(params=model.parameters(), lr=0.001, weight_decay=1e-5)

# Train the model.
train(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=5,
)


In [None]:
# Sanity check: take the first training example and report the shape of its
# label tensor and of each of its four feature tensors.
inputs, labels = train_dataset[0]
shapes = [feature.shape for feature in inputs]

print(f"Labels shape: {labels.shape}")
print(
    f"Character embedding_dim: {shapes[0]}",
    f"Word embedding_dim: {shapes[1]}",
    f"OnehotPOS_dim: {shapes[2]}",
    f"OnehotCASING_dim: {shapes[3]}",
    sep="\n",
)