<a href="https://colab.research.google.com/github/Abeszz/SC4002-NLP-Assignment/blob/main/SC4002.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installation & Requirements

In [None]:
pip install datasets

In [None]:
pip install nltk

In [None]:
pip install numpy

In [None]:
pip install npm

In [None]:
pip install torch

In [None]:
pip install gdown

In [None]:
# Google Drive file id passed to gdown, and the local name the file is saved under.
glove_file_id = '17CUd7jxuh6ptIljKaz_9gJQ8-40JXJ-F'
glove_file = 'glove.6B.100d.txt'
# Shell escape: download the file with that id and write it to `glove_file`.
!gdown {glove_file_id} -O {glove_file}

In [None]:
import os
import nltk
import sys

# Store the NLTK resources in an `nltk_data` folder under the active
# Python environment's prefix.
env_base_path = sys.prefix
nltk_path = os.path.join(env_base_path, 'nltk_data')
# Second argument is the download directory. These punkt resources are
# presumably what word_tokenize relies on later in the notebook.
nltk.download('punkt', nltk_path)
nltk.download('punkt_tab', nltk_path)

In [None]:
from datasets import load_dataset
# Load the 'rotten_tomatoes' dataset; the splits are picked out by name below.
dataset = load_dataset('rotten_tomatoes')

In [None]:
# Module-level aliases for the three splits. call_training_and_record reads
# these names directly when it builds its DataLoaders.
train_dataset = dataset['train']
validation_dataset = dataset['validation']
test_dataset = dataset['test']

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence
import numpy as np
from nltk.tokenize import word_tokenize
from collections import Counter
import csv

In [None]:
# Global variables
# Placeholder vocabulary entry used for out-of-vocabulary words when the
# 'unknown_token' OOV handling method is selected.
UNKNOWN_TOKEN = '<UNKNOWN>'

In [None]:
# Functions to build vocabulary and create embedding matrix
def build_vocabulary(dataset, oov_handling_method='unknown_token'):
    """Collect the distinct lower-cased tokens found in ``dataset``.

    Args:
        dataset: iterable of samples, each exposing its sentence under the
            ``'text'`` key.
        oov_handling_method: when equal to ``'unknown_token'`` the
            ``UNKNOWN_TOKEN`` placeholder is appended (unless already present).

    Returns:
        list of tokens in first-seen order, optionally followed by
        ``UNKNOWN_TOKEN``.
    """
    token_counts = Counter(
        token
        for example in dataset
        for token in word_tokenize(example['text'].lower())
    )
    # A Counter keeps insertion order, so this is first-seen order.
    vocabulary = list(token_counts)

    wants_placeholder = oov_handling_method == 'unknown_token'
    if wants_placeholder and UNKNOWN_TOKEN not in vocabulary:
        vocabulary.append(UNKNOWN_TOKEN)
    return vocabulary

def create_embedding_matrix(embedding_dim, vocabulary, glove_embeddings, oov_handling_method='unknown_token'):
    """Build a ``(len(vocabulary), embedding_dim)`` matrix of word vectors.

    Row ``i`` belongs to ``vocabulary[i]``. Words found in
    ``glove_embeddings`` get their GloVe vector; the remaining words are
    filled according to ``oov_handling_method``:

    * ``'unknown_token'``: all share one random vector (std 0.6), which is
      also the row of ``UNKNOWN_TOKEN``;
    * ``'random'``: each gets its own random vector (std 0.6);
    * ``'none'``: zeros.

    Any other method name leaves those rows at zero.

    Returns:
        numpy array created with ``np.zeros`` (default float dtype).
    """
    embedding_matrix = np.zeros((len(vocabulary), embedding_dim))
    word_to_index = {word: idx for idx, word in enumerate(vocabulary)}

    share_unknown_vector = oov_handling_method == 'unknown_token'
    if share_unknown_vector:
        # Draw the shared vector first, then place it on the placeholder row.
        unknown_vector = np.random.normal(scale=0.6, size=(embedding_dim,))
        unknown_index = word_to_index[UNKNOWN_TOKEN]
        embedding_matrix[unknown_index] = unknown_vector

    for word, idx in word_to_index.items():
        if word in glove_embeddings:
            embedding_matrix[idx] = glove_embeddings[word]
            continue

        # Word has no pre-trained vector.
        if share_unknown_vector:
            embedding_matrix[idx] = embedding_matrix[unknown_index]
        elif oov_handling_method == 'random':
            embedding_matrix[idx] = np.random.normal(scale=0.6, size=(embedding_dim,))
        elif oov_handling_method == 'none':
            embedding_matrix[idx] = np.zeros(embedding_dim)

    return embedding_matrix



In [None]:
# Function to load GloVe embeddings
def load_glove_embeddings(glove_file_path):
    """Read a GloVe text file into a ``{word: vector}`` dictionary.

    Each useful line is expected to hold a token followed by its
    whitespace-separated float components. Blank lines and lines that carry
    a token without any component are skipped instead of crashing the load.

    Args:
        glove_file_path: path of the UTF-8 encoded embeddings file.

    Returns:
        dict mapping each word to a 1-D ``float32`` numpy array.
    """
    embeddings_index = {}
    with open(glove_file_path, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            if len(values) < 2:
                # Nothing to store: empty line or a token with no vector.
                continue
            embeddings_index[values[0]] = np.asarray(values[1:], dtype='float32')
    return embeddings_index

In [None]:
# TextDataset class for loading data
class TextDataset(Dataset):
    """Map-style dataset that turns raw sentences into vocabulary indices.

    Args:
        dataset: indexable collection whose items expose ``'text'`` and
            ``'label'``.
        vocabulary: list of known tokens (stored, not used for lookups).
        word_to_index: dict mapping token -> row in the embedding matrix.
        oov_handling_method: how tokens missing from ``word_to_index`` are
            treated:

            * ``'unknown_token'``: replaced by the ``UNKNOWN_TOKEN`` index;
            * ``'random'``: given index 0 and flagged, so the model can
              swap in a random embedding at that position;
            * ``'none'``: dropped from the sequence.
    """

    def __init__(self, dataset, vocabulary, word_to_index, oov_handling_method='unknown_token'):
        self.dataset = dataset
        self.vocabulary = vocabulary
        self.word_to_index = word_to_index
        self.oov_handling_method = oov_handling_method

    def __len__(self):
        """Number of samples in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(indices, label, oov_flags)`` for sample ``idx``.

        ``indices`` and ``label`` are long tensors. ``oov_flags`` is a bool
        tensor aligned with ``indices`` in ``'random'`` mode and ``None``
        otherwise.
        """
        # Fetch the row once; indexing the underlying dataset can be costly.
        sample = self.dataset[idx]
        tokens = word_tokenize(sample['text'].lower())
        label = sample['label']

        method = self.oov_handling_method
        track_flags = method == 'random'

        indices = []
        oov_flags = []
        for token in tokens:
            if token in self.word_to_index:
                indices.append(self.word_to_index[token])
                if track_flags:
                    oov_flags.append(0)  # Not an OOV word
            elif method == 'unknown_token':
                indices.append(self.word_to_index[UNKNOWN_TOKEN])
            elif track_flags:
                # Placeholder index; the flag marks the position as OOV.
                indices.append(0)
                oov_flags.append(1)
            # 'none' (or any other method): the OOV token is skipped.

        flags_tensor = torch.tensor(oov_flags, dtype=torch.bool) if track_flags else None
        return torch.tensor(indices, dtype=torch.long), torch.tensor(label, dtype=torch.long), flags_tensor

In [None]:
# Custom collate functions
def collate_fn(batch):
    """Batch variable-length samples by right-padding them with zeros.

    Args:
        batch: sequence of ``(indices, label, oov_flags)`` triples as
            produced by ``TextDataset``.

    Returns:
        ``(padded_inputs, labels, padded_oov_flags)``. Inputs are padded to
        the longest sequence in the batch with value 0. The flags are padded
        the same way, or ``None`` when the first sample carries no flags.
    """
    sequences, targets, flag_seqs = zip(*batch)

    labels = torch.stack(targets)
    padded_inputs = pad_sequence(list(sequences), batch_first=True, padding_value=0)

    # The first sample decides whether this batch carries OOV flags at all.
    if flag_seqs[0] is None:
        return padded_inputs, labels, None

    padded_oov_flags = pad_sequence(list(flag_seqs), batch_first=True, padding_value=0)
    return padded_inputs, labels, padded_oov_flags

def collate_fn_cnn(batch, max_length=100):
    """Batch samples at a fixed length of ``max_length`` tokens.

    Sequences with at least ``max_length`` tokens are truncated; shorter
    ones are right-padded with zeros. OOV flags, when present, are cut or
    padded by the same amount as their input sequence.

    Args:
        batch: sequence of ``(indices, label, oov_flags)`` triples.
        max_length: length every sequence is forced to.

    Returns:
        ``(inputs, labels, oov_flags)`` where ``inputs`` is a long tensor of
        shape ``(len(batch), max_length)`` and ``oov_flags`` is a bool tensor
        of the same shape, or ``None`` when the first sample has no flags.
    """
    def _fit(seq, seq_len, dtype):
        # Cut or zero-pad `seq`, sized from the input sequence's length.
        if seq_len >= max_length:
            return seq[:max_length]
        return torch.cat([seq, torch.zeros(max_length - seq_len, dtype=dtype)])

    sequences, targets, flag_seqs = zip(*batch)
    labels = torch.stack(targets)

    # Ensure inputs are LongTensor for embedding
    inputs = torch.stack([_fit(seq, len(seq), torch.long) for seq in sequences]).long()

    if flag_seqs[0] is None:
        return inputs, labels, None

    fitted_flags = []
    for seq, flags in zip(sequences, flag_seqs):
        if flags is None:
            fitted_flags.append(torch.zeros(max_length, dtype=torch.bool))
        else:
            fitted_flags.append(_fit(flags, len(seq), torch.bool))
    return inputs, labels, torch.stack(fitted_flags)


In [None]:
# SentimentRNN class
class SentimentRNN(nn.Module):
    """Recurrent sentence classifier on top of pre-trained embeddings.

    Args:
        embedding_matrix: 2-D array ``(vocab_size, embedding_dim)`` used to
            initialise the embedding layer.
        hidden_size: hidden units per direction of the recurrent layer.
        output_size: number of output logits.
        rnn_type: ``"LSTM"`` or ``"GRU"``; any other value selects ``nn.RNN``.
        num_layers: number of stacked recurrent layers.
        use_bidirectional: run the recurrent layer in both directions.
        use_dropout: apply dropout (p=0.3) before the classifier.
        use_batch_norm: apply ``BatchNorm1d`` to the sentence vector.
        use_layer_norm: apply ``LayerNorm`` to the sentence vector.
        aggregation_method: ``'last_hidden'``, ``'mean_pooling'`` or
            ``'max_pooling'``; checked only when ``forward`` runs.
        freeze_embeddings: when True the embedding weights get no gradient.
    """

    def __init__(self, embedding_matrix, hidden_size, output_size,
                 rnn_type="RNN", num_layers=1, use_bidirectional=False,
                 use_dropout=False, use_batch_norm=False, use_layer_norm=False,
                 aggregation_method='last_hidden', freeze_embeddings=True):
        super().__init__()

        num_embeddings, embedding_dim = embedding_matrix.shape

        # Embedding layer: created normally, then overwritten with the
        # pre-trained weights; freezing is controlled through requires_grad.
        self.embedding = nn.Embedding(num_embeddings, embedding_dim)
        pretrained = torch.tensor(embedding_matrix, dtype=torch.float32)
        self.embedding.weight = nn.Parameter(pretrained, requires_grad=not freeze_embeddings)

        # Pick the recurrent cell; unknown names fall back to the simple RNN.
        recurrent_cls = {"LSTM": nn.LSTM, "GRU": nn.GRU}.get(rnn_type, nn.RNN)
        self.rnn = recurrent_cls(embedding_dim, hidden_size, num_layers=num_layers,
                                 batch_first=True, bidirectional=use_bidirectional)

        self.aggregation_method = aggregation_method

        # Every aggregation yields one vector per direction-concatenated
        # hidden state, so only bidirectionality changes the width.
        final_hidden_size = hidden_size * 2 if use_bidirectional else hidden_size

        # Classifier head
        self.fc = nn.Linear(final_hidden_size, output_size)

        # Optional regularisation layers
        self.use_dropout = use_dropout
        self.use_batch_norm = use_batch_norm
        self.use_layer_norm = use_layer_norm

        if use_dropout:
            self.dropout = nn.Dropout(0.3)
        if use_batch_norm:
            self.batch_norm = nn.BatchNorm1d(final_hidden_size)
        if use_layer_norm:
            self.layer_norm = nn.LayerNorm(final_hidden_size)

    def forward(self, x, oov_flags=None):
        """Return logits of shape ``(batch, output_size)``.

        Args:
            x: long tensor ``(batch, seq)`` of vocabulary indices.
            oov_flags: optional bool tensor ``(batch, seq)``; flagged
                positions get a freshly drawn random embedding (std 0.6).

        Raises:
            ValueError: if ``aggregation_method`` is not a known name.
        """
        embedded = self.embedding(x)
        batch_size, seq_length, embedding_dim = embedded.shape

        if oov_flags is not None:
            noise = torch.randn(batch_size, seq_length, embedding_dim, device=embedded.device) * 0.6
            replace_mask = oov_flags.unsqueeze(-1).expand_as(embedded)
            embedded = torch.where(replace_mask, noise, embedded)

        output, hidden = self.rnn(embedded)
        if isinstance(hidden, tuple):
            # LSTM returns (h_n, c_n); only h_n is used.
            hidden = hidden[0]

        method = self.aggregation_method
        if method == 'last_hidden':
            if self.rnn.bidirectional:
                # Last layer: forward state at -2, backward state at -1.
                sentence_vector = torch.cat((hidden[-2], hidden[-1]), dim=1)
            else:
                sentence_vector = hidden[-1]
        elif method == 'mean_pooling':
            sentence_vector = output.mean(dim=1)
        elif method == 'max_pooling':
            sentence_vector = torch.max(output, dim=1)[0]
        else:
            raise ValueError(f"Unknown aggregation method: {self.aggregation_method}")

        if self.use_batch_norm:
            sentence_vector = self.batch_norm(sentence_vector)
        if self.use_layer_norm:
            sentence_vector = self.layer_norm(sentence_vector)
        if self.use_dropout:
            sentence_vector = self.dropout(sentence_vector)

        return self.fc(sentence_vector)

In [None]:
# SentimentCNN class
class SentimentCNN(nn.Module):
    """Convolutional sentence classifier over pre-trained embeddings.

    One ``Conv2d`` per filter size slides over the token axis; each feature
    map is max-pooled over time, the pooled features are concatenated, passed
    through dropout and a linear classifier.

    Args:
        embedding_matrix: 2-D array ``(vocab_size, embedding_dim)``.
        output_size: number of output logits.
        freeze_embeddings: when True the embedding weights get no gradient.
        num_filters: output channels of each convolution.
        filter_sizes: iterable of kernel heights (in tokens).
        dropout_rate: dropout probability applied before the classifier.
    """

    def __init__(self, embedding_matrix, output_size, freeze_embeddings=True,
                 num_filters=100, filter_sizes=(3, 4, 5), dropout_rate=0.5):
        super(SentimentCNN, self).__init__()

        vocab_size, embedding_dim = embedding_matrix.shape
        # Immutable default plus a private copy: the caller's list is never shared.
        filter_sizes = list(filter_sizes)

        # Embedding layer using pre-trained embeddings
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding.weight = nn.Parameter(torch.tensor(embedding_matrix, dtype=torch.float32))
        self.embedding.weight.requires_grad = not freeze_embeddings  # Control freezing

        # Convolutional layers with multiple filter sizes
        self.convs = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=num_filters, kernel_size=(fs, embedding_dim))
            for fs in filter_sizes
        ])

        # Shortest sequence every convolution can process.
        self.min_seq_length = max(filter_sizes, default=0)

        # Fully connected layer
        self.fc = nn.Linear(len(filter_sizes) * num_filters, output_size)

        # Dropout layer
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, oov_flags=None):
        """Return logits of shape ``(batch, output_size)``.

        Args:
            x: long tensor ``(batch, seq)`` of vocabulary indices.
            oov_flags: optional bool tensor ``(batch, seq)``; flagged
                positions get a freshly drawn random embedding (std 0.6).
        """
        embedded = self.embedding(x)
        batch_size, seq_length, embedding_dim = embedded.shape

        if oov_flags is not None:
            # Replace embeddings at OOV positions with random embeddings
            random_embeddings = torch.randn(batch_size, seq_length, embedding_dim, device=embedded.device) * 0.6
            oov_flags = oov_flags.unsqueeze(-1).expand_as(embedded)
            embedded = torch.where(oov_flags, random_embeddings, embedded)

        # A kernel taller than the sequence makes Conv2d fail, so extend
        # short inputs with zero vectors along the token axis.
        shortfall = self.min_seq_length - seq_length
        if shortfall > 0:
            embedded = nn.functional.pad(embedded, (0, 0, 0, shortfall))

        embedded = embedded.unsqueeze(1)  # Add channel dimension

        # Convolution + ReLU; the kernel spans the full embedding width, so
        # the last axis collapses to size 1 and is squeezed away.
        conv_outs = [torch.relu(conv(embedded)).squeeze(3) for conv in self.convs]

        # Max pooling over the sequence axis
        pooled_outs = [torch.max(conv_out, dim=2)[0] for conv_out in conv_outs]

        cat = torch.cat(pooled_outs, dim=1)
        out = self.dropout(cat)
        return self.fc(out)


In [None]:
import torch.nn.functional as F

# Attention layer for Seq2Seq
class Attention(nn.Module):
    """Additive attention scoring each encoder step against a summary state.

    Args:
        hidden_size: feature size shared by the encoder outputs and the
            summary hidden state.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.attention = nn.Linear(hidden_size * 2, hidden_size)
        self.v = nn.Parameter(torch.rand(hidden_size))

    def forward(self, encoder_outputs, final_hidden_state):
        """Return attention weights of shape ``(batch, steps)``.

        Args:
            encoder_outputs: tensor ``(batch, steps, hidden_size)``.
            final_hidden_state: tensor ``(batch, hidden_size)``.
        """
        batch_size = encoder_outputs.size(0)
        num_steps = encoder_outputs.size(1)

        # Copy the summary state to every time step and pair it with the
        # encoder output at that step.
        tiled_hidden = final_hidden_state.unsqueeze(1).repeat(1, num_steps, 1)
        paired = torch.cat((tiled_hidden, encoder_outputs), dim=2)

        # (batch, hidden, steps) so that v can be multiplied in from the left.
        energy = torch.tanh(self.attention(paired)).transpose(1, 2)

        scorer = self.v.repeat(batch_size, 1).unsqueeze(1)
        scores = torch.bmm(scorer, energy).squeeze(1)
        return F.softmax(scores, dim=1)

In [None]:
# SentimentSeq2Seq class
class SentimentSeq2Seq(nn.Module):
    """Recurrent encoder with attention pooling and a linear classifier.

    The encoder's final hidden state queries the ``Attention`` module; the
    resulting weights form a context vector from the encoder outputs, which
    is optionally normalised / dropped out and then classified.

    Args:
        embedding_matrix: 2-D array ``(vocab_size, embedding_dim)``.
        hidden_size: hidden units per direction of the encoder.
        output_size: number of output logits.
        rnn_type: ``"LSTM"`` or ``"GRU"``; any other value selects ``nn.RNN``.
        num_layers: number of stacked encoder layers.
        use_bidirectional: run the encoder in both directions.
        use_dropout: apply dropout (p=0.3) to the context vector.
        use_batch_norm: apply ``BatchNorm1d`` to the context vector.
        use_layer_norm: apply ``LayerNorm`` to the context vector.
        freeze_embeddings: when True the embedding weights get no gradient.
    """

    def __init__(self, embedding_matrix, hidden_size, output_size,
                 rnn_type="LSTM", num_layers=1, use_bidirectional=False,
                 use_dropout=False, use_batch_norm=False, use_layer_norm=False,
                 freeze_embeddings=True):
        super().__init__()

        num_embeddings, embedding_dim = embedding_matrix.shape

        self.embedding = nn.Embedding(num_embeddings, embedding_dim)
        pretrained = torch.tensor(embedding_matrix, dtype=torch.float32)
        self.embedding.weight = nn.Parameter(pretrained, requires_grad=not freeze_embeddings)

        # Unknown names fall back to the simple RNN.
        encoder_cls = {"LSTM": nn.LSTM, "GRU": nn.GRU}.get(rnn_type, nn.RNN)
        self.encoder = encoder_cls(embedding_dim, hidden_size, num_layers=num_layers,
                                   batch_first=True, bidirectional=use_bidirectional)

        # Both directions are concatenated when the encoder is bidirectional.
        decoder_input_size = hidden_size * 2 if use_bidirectional else hidden_size

        self.attention = Attention(decoder_input_size)
        self.decoder = nn.Linear(decoder_input_size, output_size)

        self.use_dropout = use_dropout
        self.use_batch_norm = use_batch_norm
        self.use_layer_norm = use_layer_norm

        if use_dropout:
            self.dropout = nn.Dropout(0.3)
        if use_batch_norm:
            self.batch_norm = nn.BatchNorm1d(decoder_input_size)
        if use_layer_norm:
            self.layer_norm = nn.LayerNorm(decoder_input_size)

    def forward(self, x, oov_flags=None):
        """Return logits of shape ``(batch, output_size)``.

        Args:
            x: long tensor ``(batch, seq)`` of vocabulary indices.
            oov_flags: optional bool tensor ``(batch, seq)``; flagged
                positions get a freshly drawn random embedding (std 0.6).
        """
        embedded = self.embedding(x)
        batch_size, seq_length, embedding_dim = embedded.shape

        if oov_flags is not None:
            noise = torch.randn(batch_size, seq_length, embedding_dim, device=embedded.device) * 0.6
            replace_mask = oov_flags.unsqueeze(-1).expand_as(embedded)
            embedded = torch.where(replace_mask, noise, embedded)

        encoder_outputs, hidden = self.encoder(embedded)
        if isinstance(hidden, tuple):
            # LSTM returns (h_n, c_n); only h_n is used.
            hidden = hidden[0]

        if self.encoder.bidirectional:
            # Last layer: forward state at -2, backward state at -1.
            summary = torch.cat((hidden[-2], hidden[-1]), dim=1)
        else:
            summary = hidden[-1]

        attention_weights = self.attention(encoder_outputs, summary)
        # Weighted sum of encoder outputs: (batch, 1, steps) x (batch, steps, feat)
        context_vector = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs).squeeze(1)

        if self.use_batch_norm:
            context_vector = self.batch_norm(context_vector)
        if self.use_layer_norm:
            context_vector = self.layer_norm(context_vector)
        if self.use_dropout:
            context_vector = self.dropout(context_vector)

        return self.decoder(context_vector)

In [None]:
# Function to get optimizer
def get_optimizer(params, model):
    """Create the optimizer described by ``params`` for ``model``.

    Args:
        params: dict with required keys ``"optimizer_type"`` (``"SGD"`` or
            ``"Adam"``) and ``"learning_rate"``; optional ``"weight_decay"``
            and, for SGD, ``"momentum"`` (both default to 0).
        model: module whose ``parameters()`` are optimised.

    Raises:
        ValueError: if ``optimizer_type`` is neither ``"SGD"`` nor ``"Adam"``.
    """
    optimizer_type = params["optimizer_type"]
    lr = params["learning_rate"]
    weight_decay = params.get("weight_decay", 0)

    if optimizer_type == "Adam":
        return optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    if optimizer_type == "SGD":
        return optim.SGD(
            model.parameters(),
            lr=lr,
            momentum=params.get("momentum", 0),
            weight_decay=weight_decay,
        )

    raise ValueError(f"Unknown optimizer type: {optimizer_type}")

In [None]:
# Function to train the model
def train_model(model, train_loader, valid_loader, test_loader, optimizer, epochs, patience, scheduler_step_size, scheduler_gamma, device):
    """Train ``model`` with early stopping, then score it on the test set.

    Each epoch runs one pass over ``train_loader`` with cross-entropy loss,
    measures validation accuracy and steps a ``StepLR`` scheduler. Whenever
    validation accuracy beats the best seen so far, the weights are saved to
    ``best_model_<epoch>.pt`` in the working directory. Training stops once
    ``patience`` consecutive epochs bring no improvement. The best saved
    weights are reloaded before the test evaluation; if no checkpoint was
    written (no epoch ran, or accuracy never rose above 0) the model is
    evaluated as it stands.

    Args:
        model: module called as ``model(inputs, oov_flags)``.
        train_loader, valid_loader, test_loader: iterables yielding
            ``(inputs, labels, oov_flags)`` batches.
        optimizer: optimizer already bound to the model's parameters.
        epochs: maximum number of epochs.
        patience: epochs without improvement tolerated before stopping.
        scheduler_step_size, scheduler_gamma: ``StepLR`` settings.
        device: device the batches are moved to.

    Returns:
        ``(train_losses, val_accuracies, test_accuracies)``: one entry per
        completed epoch. ``test_accuracies`` repeats the single final test
        accuracy so that all three lists have the same length.
    """
    criterion = nn.CrossEntropyLoss()
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=scheduler_step_size, gamma=scheduler_gamma)

    train_losses = []
    val_accuracies = []
    best_val_accuracy = 0
    epochs_no_improve = 0
    # Path of the latest best checkpoint; stays None until one is written.
    best_epoch_model = None

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels, oov_flags in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            if oov_flags is not None:
                oov_flags = oov_flags.to(device)
            optimizer.zero_grad()
            outputs = model(inputs, oov_flags)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        avg_train_loss = running_loss / len(train_loader)
        train_losses.append(avg_train_loss)

        # Validation
        val_accuracy = evaluate_accuracy(model, valid_loader, device)
        val_accuracies.append(val_accuracy)

        print(f'Epoch {epoch}, Loss: {avg_train_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')

        # Early stopping logic
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            epochs_no_improve = 0
            best_epoch_model = f'best_model_{epoch}.pt'
            # Save the best model
            torch.save(model.state_dict(), best_epoch_model)
            print("best_epoch: ", epoch)
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print(f"Early stopping at epoch {epoch}")
            break

        scheduler.step()

    # Restore the best weights before the test evaluation, when any exist.
    if best_epoch_model is not None:
        model.load_state_dict(torch.load(best_epoch_model))

    # Test Accuracy
    test_accuracy = evaluate_accuracy(model, test_loader, device)
    test_accuracies = [test_accuracy] * len(train_losses)
    print(f'Test Accuracy: {test_accuracy:.2f}%')

    return train_losses, val_accuracies, test_accuracies

In [None]:
# Function to evaluate accuracy
def evaluate_accuracy(model, data_loader, device):
    """Return the classification accuracy of ``model`` in percent.

    The model is switched to eval mode and run without gradient tracking.
    The predicted class is the arg-max over dimension 1 of the outputs.

    Args:
        model: module called as ``model(inputs, oov_flags)``.
        data_loader: iterable yielding ``(inputs, labels, oov_flags)``.
        device: device the batches are moved to.

    Returns:
        float in ``[0, 100]``; ``0.0`` when the loader yields no samples.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels, oov_flags in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            if oov_flags is not None:
                oov_flags = oov_flags.to(device)
            outputs = model(inputs, oov_flags)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        # Nothing was evaluated; avoid dividing by zero.
        return 0.0
    return 100 * correct / total


In [None]:
def check_model_type(model_type, embedding_matrix, config):
    """Instantiate the model named by ``model_type`` and pick its collate function.

    Args:
        model_type: ``'RNN'``, ``'CNN'`` or ``'Seq2Seq'``.
        embedding_matrix: pre-trained embedding matrix handed to the model.
        config: dict of hyper-parameters; the keys read depend on the model
            type (see the constructor calls below).

    Returns:
        ``(model, collate_function)``. The CNN is paired with
        ``collate_fn_cnn``; the other two models use ``collate_fn``.

    Raises:
        ValueError: if ``model_type`` is not one of the supported names.
    """
    if model_type == 'RNN':
        model = SentimentRNN(
            embedding_matrix=embedding_matrix,
            hidden_size=config['hidden_size'],
            output_size=config['output_size'],
            rnn_type=config['rnn_type'],
            num_layers=config['num_layers'],
            use_bidirectional=config['use_bidirectional'],
            use_dropout=config['use_dropout'],
            use_batch_norm=config['use_batch_norm'],
            use_layer_norm=config['use_layer_norm'],
            aggregation_method=config['aggregation_method'],
            freeze_embeddings=config['freeze_embeddings']
        )
        collate_function = collate_fn
        return model, collate_function
    elif model_type == 'CNN':
        model = SentimentCNN(
            embedding_matrix=embedding_matrix,
            output_size=config['output_size'],
            freeze_embeddings=config['freeze_embeddings'],
            num_filters=config['num_filters'],
            filter_sizes=config['filter_sizes'],
            dropout_rate=config['dropout_rate']
        )
        collate_function = collate_fn_cnn
        return model, collate_function
    elif model_type == 'Seq2Seq':
        model = SentimentSeq2Seq(
            embedding_matrix=embedding_matrix,
            hidden_size=config['hidden_size'],
            output_size=config['output_size'],
            rnn_type=config['rnn_type'],
            num_layers=config['num_layers'],
            use_bidirectional=config['use_bidirectional'],
            use_dropout=config['use_dropout'],
            use_batch_norm=config['use_batch_norm'],
            use_layer_norm=config['use_layer_norm'],
            freeze_embeddings=config['freeze_embeddings']
        )
        collate_function = collate_fn
        return model, collate_function
    else:
        # Report the argument actually tested, not a config entry that may
        # be missing or differ from it.
        raise ValueError(f"Unknown model type: {model_type}")

In [None]:
def call_training_and_record(config, model, collate_function, vocabulary, word_to_index, file_name, header):
    """Train ``model`` for one configuration and append the results to a CSV.

    The data loaders are built from the module-level ``train_dataset``,
    ``validation_dataset`` and ``test_dataset``. The model is moved to CUDA
    when available, otherwise MPS, otherwise CPU. One CSV row is written per
    completed epoch, holding the configuration plus that epoch's metrics;
    the header row is written only when the file is empty.

    Args:
        config: dict of hyper-parameters for this run. ``momentum`` and
            ``weight_decay`` are optional and default to 0, matching
            ``get_optimizer``.
        model: model to train.
        collate_function: collate function passed to every ``DataLoader``.
        vocabulary: token list handed to ``TextDataset``.
        word_to_index: token -> index mapping handed to ``TextDataset``.
        file_name: CSV file the rows are appended to.
        header: CSV column names; must cover every key of ``config`` plus
            the four metric columns.

    Returns:
        None (a confirmation message is printed).
    """
    # Get the optimizer dynamically based on the config
    optimizer_params = {
        'optimizer_type': config['optimizer_type'],
        'learning_rate': config['learning_rate'],
        'momentum': config.get('momentum', 0),
        'weight_decay': config.get('weight_decay', 0)
    }
    optimizer = get_optimizer(optimizer_params, model)

    # Create DataLoaders using the appropriate collate function
    oov_method = config['oov_handling_method']
    batch_size = config['batch_size']
    train_loader = DataLoader(TextDataset(train_dataset, vocabulary, word_to_index, oov_method), batch_size=batch_size, shuffle=True, collate_fn=collate_function)
    valid_loader = DataLoader(TextDataset(validation_dataset, vocabulary, word_to_index, oov_method), batch_size=batch_size, collate_fn=collate_function)
    test_loader = DataLoader(TextDataset(test_dataset, vocabulary, word_to_index, oov_method), batch_size=batch_size, collate_fn=collate_function)

    device = torch.device('cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu')
    print(device)
    model.to(device)

    # Train the model
    train_loss, val_accuracy, test_accuracy = train_model(
        model=model,
        train_loader=train_loader,
        valid_loader=valid_loader,
        test_loader=test_loader,
        optimizer=optimizer,
        epochs=config['epochs'],
        patience=config['patience'],
        scheduler_step_size=3,
        scheduler_gamma=0.1,
        device=device
    )

    # Open the CSV file for appending
    with open(file_name, mode='a', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=header)

        # If the file is empty, write the header
        csvfile.seek(0, 2)  # Move the cursor to the end of the file
        if csvfile.tell() == 0:
            writer.writeheader()

        # One row per completed epoch; the 'Epoch' column is 1-based.
        for i in range(len(train_loss)):
            epoch_result = {
                **config,  # Copy the current config
                'Epoch': i+1,
                'Train Loss': round(train_loss[i], 2),
                'Validation Accuracy': round(val_accuracy[i], 2),
                'Test Accuracy': round(test_accuracy[i], 2)
            }
            writer.writerow(epoch_result)

    print(f"Parameter combinations and performances recorded in {file_name}")
    return None

In [None]:
from sklearn.model_selection import ParameterGrid

# Part 2 grid: simple RNN, embeddings kept frozen, OOV method "none".
# Every key maps to the list of values to sweep; run_experiments_from_paramgrid
# trains one model per combination of values.
part_2_rnn_experiments_param_grid = { # rnn, fixed embeddings no OOV soln
    "optimizer_type": ["Adam", "SGD"],
    "learning_rate": [0.001, 0.0001],
    "momentum": [0.9],  # Used only for SGD
    "weight_decay": [0.0001],
    "batch_size": [32, 64],
    "epochs": [100],
    "patience": [10],

    "model_type": ["RNN"],
    "rnn_type": ["RNN"],

    "num_layers": [1, 2, 3],
    "use_bidirectional": [False],
    "use_dropout": [True],
    "use_batch_norm": [True],
    "use_layer_norm": [True],
    "aggregation_method": ["last_hidden"],

    "hidden_size": [64, 128],
    "output_size": [2],
    "freeze_embeddings": [True], # keep embeddings fixed

    "oov_handling_method": ["none"], # no OOV soln
    "embedding_dim": [100], # default
    "glove_file_path": ["glove.6B.100d.txt"] # default
}

# Part 3 grid: simple RNN with trainable embeddings and OOV method "none";
# additionally sweeps the dropout / batch-norm / layer-norm switches.
part_3_rnn_no_oov_experiments_param_grid = { # rnn, update embeddings without OOV soln
    "optimizer_type": ["Adam", "SGD"],
    "learning_rate": [0.001, 0.0001],
    "momentum": [0.9],  # Used only for SGD
    "weight_decay": [0.0001],
    "batch_size": [32, 64],
    "epochs": [100],
    "patience": [10],

    "model_type": ["RNN"],
    "rnn_type": ["RNN"],

    "num_layers": [1, 2, 3],
    "use_bidirectional": [False],
    "use_dropout": [True, False],
    "use_batch_norm": [True, False],
    "use_layer_norm": [True, False],
    "aggregation_method": ["last_hidden"],

    "hidden_size": [64, 128],
    "output_size": [2],
    "freeze_embeddings": [False], # update embeddings during training

    "oov_handling_method": ["none"], # no OOV soln
    "embedding_dim": [100], # default
    "glove_file_path": ["glove.6B.100d.txt"] # default
}

# Part 3 grid: simple RNN with trainable embeddings, comparing the
# "unknown_token" and "random" OOV handling methods.
part_3_rnn_with_oov_experiments_param_grid = { # rnn, update embeddings with OOV soln
    "optimizer_type": ["Adam", "SGD"],
    "learning_rate": [0.001, 0.0001],
    "momentum": [0.9],  # Used only for SGD
    "weight_decay": [0.0001],
    "batch_size": [32, 64],
    "epochs": [100],
    "patience": [10],

    "model_type": ["RNN"],
    "rnn_type": ["RNN"],

    "num_layers": [1, 2, 3],
    "use_bidirectional": [False],
    "use_dropout": [True, False],
    "use_batch_norm": [True, False],
    "use_layer_norm": [True, False],
    "aggregation_method": ["last_hidden"],

    "hidden_size": [64, 128],
    "output_size": [2],
    "freeze_embeddings": [False], # update embeddings during training

    "oov_handling_method": ["unknown_token", "random"], # apply soln of OOV and train
    "embedding_dim": [100], # default
    "glove_file_path": ["glove.6B.100d.txt"] # default
}

# Part 3 grid: bidirectional LSTM / GRU (still built through the "RNN" model
# type), trainable embeddings, all three aggregation methods and both OOV
# handling methods.
part_3_blstm_bgru_experiments_param_grid = { # replace rnn with blstm/bgru, update embeddings with OOV soln
    "optimizer_type": ["Adam", "SGD"],
    "learning_rate": [0.001, 0.0001],
    "momentum": [0.9],  # Used only for SGD
    "weight_decay": [0.0001],
    "batch_size": [32, 64],
    "epochs": [100],
    "patience": [10],

    "model_type": ["RNN"],
    "rnn_type": ["LSTM", "GRU"],

    "num_layers": [1, 2, 3],
    "use_bidirectional": [True],
    "use_dropout": [True, False],
    "use_batch_norm": [True, False],
    "use_layer_norm": [True, False],
    "aggregation_method": ["last_hidden", "mean_pooling", "max_pooling"],

    "hidden_size": [64, 128],
    "output_size": [2],
    "freeze_embeddings": [False], # update embeddings during training

    "oov_handling_method": ["unknown_token", "random"], # apply soln of OOV and train
    "embedding_dim": [100], # default
    "glove_file_path": ["glove.6B.100d.txt"] # default
}

# Part 3 grid: CNN classifier with trainable embeddings and both OOV handling
# methods. Holds only the keys check_model_type reads for the "CNN" model
# type, so no recurrent-layer settings appear here.
part_3_cnn_experiments_param_grid = { # replace rnn with cnn, update embeddings with OOV soln
    "optimizer_type": ["Adam", "SGD"],
    "learning_rate": [0.001, 0.0001],
    "momentum": [0.9],  # Used only for SGD
    "weight_decay": [0.0001],
    "batch_size": [32, 64],
    "epochs": [100],
    "patience": [10],

    "model_type": ["CNN"],

    "num_filters": [128, 256],  # CNN only
    "filter_sizes": [[3, 4, 5], [2, 3]],
    "dropout_rate": [0.3, 0.7],  # CNN only
    "output_size": [2],
    "freeze_embeddings": [False], # update embeddings during training

    "oov_handling_method": ["unknown_token", "random"], # apply soln of OOV and train
    "embedding_dim": [100], # default
    "glove_file_path": ["glove.6B.100d.txt"] # default
}

# Part 3 grid: attention-based Seq2Seq classifier with a bidirectional
# LSTM / GRU encoder and trainable embeddings. It has no "aggregation_method"
# key because check_model_type does not read one for the "Seq2Seq" model type.
part_3_s2s_experiments_param_grid = { # seq2seq
    "optimizer_type":  ["Adam", "SGD"],
    "learning_rate": [0.001, 0.0001],
    "momentum": [0.9],  # Used only for SGD
    "weight_decay": [0.0001],
    "batch_size": [32, 64],
    "epochs": [100],
    "patience": [10],

    "model_type": ["Seq2Seq"],
    "rnn_type": ["LSTM", "GRU"],

    "num_layers": [1, 2, 3],
    "use_bidirectional": [True],
    "use_dropout": [True, False],
    "use_batch_norm": [True, False],
    "use_layer_norm": [True, False],

    "hidden_size": [64, 128],
    "output_size": [2],
    "freeze_embeddings": [False], # update embeddings during training

    "oov_handling_method": ["unknown_token"],  # OOV words map to the single <UNKNOWN> token
    "embedding_dim": [100],  # Default
    "glove_file_path": ["glove.6B.100d.txt"]  # Default
}

In [None]:
import itertools
import csv

# Module-level list; nothing in the visible code appends to it
# (run_experiments_from_paramgrid assigns to a local of the same name).
results = [] # Initialize a list to hold results

In [None]:
# Function to run one training experiment per parameter-grid combination
def run_experiments_from_paramgrid(train_dataset, validation_dataset, test_dataset, param_grid, output_file):
    """Train and record one model for every combination in ``param_grid``.

    GloVe files and vocabularies are loaded / built once per distinct
    ``glove_file_path`` / ``oov_handling_method`` and reused across
    combinations. The embedding matrix is rebuilt for each run because it
    may contain freshly drawn random vectors.

    Args:
        train_dataset: dataset the vocabulary is built from.
        validation_dataset, test_dataset: accepted for symmetry but not used
            here; ``call_training_and_record`` reads the module-level
            datasets when it builds its data loaders.
        param_grid: dict mapping each hyper-parameter name to the list of
            values to sweep. Must contain ``glove_file_path``,
            ``oov_handling_method``, ``embedding_dim`` and ``model_type``.
        output_file: CSV file the per-epoch results are appended to.

    Returns:
        None.
    """
    param_names = list(param_grid.keys())
    # CSV columns: every hyper-parameter followed by the per-epoch metrics.
    header = param_names + ['Epoch', 'Train Loss', 'Validation Accuracy', 'Test Accuracy']

    glove_cache = {}       # glove_file_path -> loaded embeddings
    vocabulary_cache = {}  # oov_handling_method -> vocabulary list

    # Iterate through each combination of parameters
    for values in itertools.product(*param_grid.values()):
        config = dict(zip(param_names, values))
        print(config)

        # Load GloVe embeddings (only the first time a path is seen)
        glove_path = config['glove_file_path']
        if glove_path not in glove_cache:
            glove_cache[glove_path] = load_glove_embeddings(glove_path)
        glove_embeddings = glove_cache[glove_path]

        # Build vocabulary (only the first time an OOV method is seen)
        oov_method = config['oov_handling_method']
        if oov_method not in vocabulary_cache:
            vocabulary_cache[oov_method] = build_vocabulary(train_dataset, oov_method)
        vocabulary = vocabulary_cache[oov_method]

        embedding_matrix = create_embedding_matrix(config['embedding_dim'], vocabulary, glove_embeddings, oov_method)

        # Build word_to_index mapping
        word_to_index = {word: idx for idx, word in enumerate(vocabulary)}

        model, collate_function = check_model_type(model_type=config['model_type'], embedding_matrix=embedding_matrix, config=config)

        call_training_and_record(config=config, model=model, collate_function=collate_function, vocabulary=vocabulary, word_to_index=word_to_index, file_name=output_file, header=header)


In [None]:
# run_experiments_from_paramgrid(train_dataset, validation_dataset, test_dataset, part_2_rnn_experiments_param_grid, 'part_2_rnn_experiments_results.csv')
# run_experiments_from_paramgrid(train_dataset, validation_dataset, test_dataset, part_3_rnn_no_oov_experiments_param_grid, 'part_3_rnn_no_oov_experiments_results.csv')
# run_experiments_from_paramgrid(train_dataset, validation_dataset, test_dataset, part_3_rnn_with_oov_experiments_param_grid, 'part_3_rnn_with_oov_experiments_results.csv')
# run_experiments_from_paramgrid(train_dataset, validation_dataset, test_dataset, part_3_blstm_bgru_experiments_param_grid, 'part_3_blstm_bgru_experiments_results.csv')
# run_experiments_from_paramgrid(train_dataset, validation_dataset, test_dataset, part_3_cnn_experiments_param_grid, 'part_3_cnn_experiments_results.csv')
# run_experiments_from_paramgrid(train_dataset, validation_dataset, test_dataset, part_3_s2s_experiments_param_grid, 'part_3_seq2seq_experiments_results.csv')

In [None]:
# Best parameters for each model, to run as a demo

# Demo configuration: rnn, fixed embeddings, no OOV soln.
# Every list holds a single value, so the grid expands to exactly one run.
part_2_rnn_demo_param_grid = dict(
    # -- optimisation --
    optimizer_type=["SGD"],
    learning_rate=[0.001],
    momentum=[0.9],  # Used only for SGD
    weight_decay=[0.0001],
    batch_size=[64],
    epochs=[100],
    patience=[10],

    # -- architecture --
    model_type=["RNN"],
    rnn_type=["RNN"],

    num_layers=[2],
    use_bidirectional=[False],
    use_dropout=[True],
    use_batch_norm=[True],
    use_layer_norm=[True],
    aggregation_method=["last_hidden"],

    hidden_size=[64],
    output_size=[2],
    freeze_embeddings=[True],  # keep embeddings fixed

    # -- embeddings / OOV --
    oov_handling_method=["none"],  # no OOV soln
    embedding_dim=[100],  # default
    glove_file_path=["glove.6B.100d.txt"],  # default
)

# Demo configuration: rnn, update embeddings during training, no OOV soln.
# Every list holds a single value, so the grid expands to exactly one run.
part_3_rnn_no_oov_demo_param_grid = dict(
    # -- optimisation --
    optimizer_type=["Adam"],
    learning_rate=[0.0001],
    momentum=[0.9],  # Used only for SGD
    weight_decay=[0.0001],
    batch_size=[64],
    epochs=[100],
    patience=[10],

    # -- architecture --
    model_type=["RNN"],
    rnn_type=["RNN"],

    num_layers=[2],
    use_bidirectional=[False],
    use_dropout=[True],
    use_batch_norm=[True],
    use_layer_norm=[True],
    aggregation_method=["last_hidden"],

    hidden_size=[128],
    output_size=[2],
    freeze_embeddings=[False],  # update embeddings during training

    # -- embeddings / OOV --
    oov_handling_method=["none"],  # no OOV soln
    embedding_dim=[100],  # default
    glove_file_path=["glove.6B.100d.txt"],  # default
)

# Demo configuration: rnn, update embeddings with OOV soln.
# Every list holds a single value, so the grid expands to exactly one run.
part_3_rnn_with_oov_demo_param_grid = dict(
    # -- optimisation --
    optimizer_type=["SGD"],
    learning_rate=[0.0001],
    momentum=[0.9],  # Used only for SGD
    weight_decay=[0.0001],
    batch_size=[32],
    epochs=[100],
    patience=[10],

    # -- architecture --
    model_type=["RNN"],
    rnn_type=["RNN"],

    num_layers=[3],
    use_bidirectional=[False],
    use_dropout=[True],
    use_batch_norm=[True],
    use_layer_norm=[True],
    aggregation_method=["last_hidden"],

    hidden_size=[128],
    output_size=[2],
    freeze_embeddings=[False],  # update embeddings during training

    # -- embeddings / OOV --
    oov_handling_method=["random"],  # apply soln of OOV and train
    embedding_dim=[100],  # default
    glove_file_path=["glove.6B.100d.txt"],  # default
)

# Demo configuration: replace rnn with blstm/bgru, update embeddings with OOV soln.
# Every list holds a single value, so the grid expands to exactly one run.
part_3_blstm_bgru_demo_param_grid = dict(
    # -- optimisation --
    optimizer_type=["Adam"],
    learning_rate=[0.001],
    momentum=[0.9],  # Used only for SGD
    weight_decay=[0.0001],
    batch_size=[64],
    epochs=[100],
    patience=[10],

    # -- architecture --
    model_type=["RNN"],
    rnn_type=["GRU"],

    num_layers=[3],
    use_bidirectional=[True],
    use_dropout=[False],
    use_batch_norm=[False],
    use_layer_norm=[False],
    aggregation_method=["mean_pooling"],

    hidden_size=[64],
    output_size=[2],
    freeze_embeddings=[False],  # update embeddings during training

    # -- embeddings / OOV --
    oov_handling_method=["unknown_token"],  # apply soln of OOV and train
    embedding_dim=[100],  # default
    glove_file_path=["glove.6B.100d.txt"],  # default
)

# Demo configuration: replace rnn with cnn, update embeddings with OOV soln.
# Every list holds a single value, so the grid expands to exactly one run.
part_3_cnn_demo_param_grid = dict(
    # -- optimisation --
    optimizer_type=["Adam"],
    learning_rate=[0.001],
    momentum=[0.9],  # Used only for SGD
    weight_decay=[0.0001],
    batch_size=[32],
    epochs=[100],
    patience=[10],

    # -- architecture --
    model_type=["CNN"],

    num_filters=[128],  # CNN only
    filter_sizes=[[2, 3]],
    dropout_rate=[0.3],  # CNN only
    output_size=[2],
    freeze_embeddings=[False],  # update embeddings during training

    # -- embeddings / OOV --
    oov_handling_method=["unknown_token"],  # apply soln of OOV and train
    embedding_dim=[100],  # default
    glove_file_path=["glove.6B.100d.txt"],  # default
)

# Demo configuration: seq2seq.
# Every list holds a single value, so the grid expands to exactly one run.
part_3_s2s_demo_param_grid = dict(
    # -- optimisation --
    optimizer_type=["Adam"],
    learning_rate=[0.001],
    momentum=[0.9],  # Used only for SGD
    weight_decay=[0.0001],
    batch_size=[32],
    epochs=[100],
    patience=[10],

    # -- architecture --
    model_type=["Seq2Seq"],
    rnn_type=["LSTM"],

    num_layers=[1],
    use_bidirectional=[True],
    use_dropout=[True],
    use_batch_norm=[True],
    use_layer_norm=[True],

    hidden_size=[64],
    output_size=[2],
    freeze_embeddings=[False],  # update embeddings during training

    # -- embeddings / OOV --
    oov_handling_method=["unknown_token"],  # uses the 'unknown_token' OOV strategy
    embedding_dim=[100],  # default
    glove_file_path=["glove.6B.100d.txt"],  # default
)

In [None]:
# Run each demo grid in turn, each with its own results file name.
# The order of the pairs is the order in which the experiments execute.
for demo_param_grid, demo_output_file in [
    (part_2_rnn_demo_param_grid, 'part_2_rnn_demo_results.csv'),
    (part_3_rnn_no_oov_demo_param_grid, 'part_3_rnn_no_oov_demo_results.csv'),
    (part_3_rnn_with_oov_demo_param_grid, 'part_3_rnn_with_oov_demo_results.csv'),
    (part_3_blstm_bgru_demo_param_grid, 'part_3_blstm_bgru_demo_results.csv'),
    (part_3_cnn_demo_param_grid, 'part_3_cnn_demo_results.csv'),
    (part_3_s2s_demo_param_grid, 'part_3_seq2seq_demo_results.csv'),
]:
    run_experiments_from_paramgrid(train_dataset, validation_dataset, test_dataset, demo_param_grid, demo_output_file)