## Install and import dependencies

In [4]:
import os
import pickle

import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import gensim.downloader as api

from datasets import load_dataset
from gensim.models import KeyedVectors
from torch.utils.data import Dataset, DataLoader
import random
import spacy
from nltk.tokenize import word_tokenize
from sklearn.metrics import accuracy_score
import pandas as pd
import torch.nn.functional as F

nlp = spacy.load("en_core_web_sm")

def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True  # Ensures reproducibility in CUDA operations
    torch.backends.cudnn.benchmark = False     # Disables some optimizations to ensure determinism

# Set the seed
set_seed(42)


# Part 3.4: Replace RNN with CNN model

## Part 0. Dataset Preparation

In [3]:
from datasets import load_dataset
dataset = load_dataset("rotten_tomatoes")
train_dataset = dataset['train']
validation_dataset = dataset['validation']
test_dataset = dataset['test']

In [50]:
# import pickle
# with open('updated_embedding_matrix.pkl', 'rb') as f:
#     new_embedding_matrix = pickle.load(f)
# print(type(new_embedding_matrix))
# print(len(new_embedding_matrix))
# keys= new_embedding_matrix.keys()
# print(keys)
# print(new_embedding_matrix['word_to_index'])


<class 'dict'>
2
dict_keys(['embeddings', 'word_to_index'])


In [14]:
with open("updated_embedding_matrix.pkl", "rb") as f:
    data = pickle.load(f)
    embedding_matrix = data["embeddings"]
    word_to_index = data["word_to_index"]

embedding_matrix_array = np.array(embedding_matrix)
embedding_matrix_tensor = torch.tensor(embedding_matrix_array, dtype=torch.float32)

In [6]:
pre_tokenized_train_texts = []
for sentence in train_dataset['text']:
    # Tokenize the sentence using spaCy and store tokens as a list of strings
    tokens = [token.text for token in nlp(sentence.lower())]
    pre_tokenized_train_texts.append(tokens)

In [7]:
# Pre-tokenize validation and test sets
pre_tokenized_validation_texts = [[token.text for token in nlp(sentence.lower())] for sentence in validation_dataset['text']]
pre_tokenized_test_texts = [[token.text for token in nlp(sentence.lower())] for sentence in test_dataset['text']]

In [8]:
# Prepare Dataset for PyTorch
class SentimentDataset(Dataset):
    def __init__(self, tokenized_texts, labels, vocab, embedding_matrix, max_len=30):
        self.texts = tokenized_texts
        self.labels = labels
        self.vocab = word_to_index
        self.embedding_matrix = embedding_matrix
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        tokens = self.texts[idx]
        label = self.labels[idx]
        vectorized_text = self.vectorize(tokens)
        return torch.tensor(vectorized_text), torch.tensor(label)

    def vectorize(self, tokens):
        vectorized = [self.vocab.get(token, self.vocab['<UNK>']) for token in tokens]

        # Check for out-of-range indices
        for index in vectorized:
            if index >= len(self.embedding_matrix):
                raise ValueError(f"Index {index} is out of range for the embedding matrix.")

        # Pad or truncate to max_len
        if len(vectorized) < self.max_len:
            vectorized += [self.vocab['<PAD>']] * (self.max_len - len(vectorized))
        else:
            vectorized = vectorized[:self.max_len]
        return vectorized

In [9]:
# Create the CNN Model
import torch
import torch.nn as nn
import torch.nn.functional as F

class CNNModel(nn.Module):
    def __init__(self, embedding_matrix=embedding_matrix, num_filters=100, filter_sizes=[3,4,5], output_size=1, dropout=0.5):
        super(CNNModel, self).__init__()
        vocab_size, embedding_dim = embedding_matrix.shape

        # Define the embedding layer - Question has asked to unfreeze embedding layer
        self.embedding = nn.Embedding.from_pretrained(torch.tensor(embedding_matrix, dtype=torch.float32), freeze=False)

        # Define convolutional layers for different filter sizes
        self.convs = nn.ModuleList([
            nn.Conv2d(1, num_filters, (filter_size, embedding_dim))
            for filter_size in filter_sizes
        ])

        # Fully connected layer for classification
        self.fc = nn.Linear(num_filters * len(filter_sizes), output_size)

        # Dropout layer to prevent overfitting
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Embedding lookup and add channel dimension (batch_size, 1, seq_len, embedding_dim)
        x = self.embedding(x).unsqueeze(1)

        # Apply convolution + ReLU + Max Pooling in order to extract the most significant features
        conv_outs = [
            F.max_pool1d(F.relu(conv(x)).squeeze(3), x.size(2) - conv.kernel_size[0] + 1).squeeze(2)
            for conv in self.convs
        ]

        # Concatenate all convolution outputs along the filter dimension
        out = torch.cat(conv_outs, dim=1)

        # Apply dropout for regularization
        out = self.dropout(out)

        # Pass through the fully connected layer
        out = self.fc(out)

        return out


In [None]:
# Testing the model for now

In [10]:
# Prepare DataLoader
def create_data_loader(dataset, batch_size):
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)

train_dataset_instance = SentimentDataset(pre_tokenized_train_texts, train_dataset['label'], word_to_index, embedding_matrix)
val_dataset_instance = SentimentDataset(pre_tokenized_validation_texts, validation_dataset['label'], word_to_index, embedding_matrix)
test_dataset_instance = SentimentDataset(pre_tokenized_test_texts, test_dataset['label'], word_to_index, embedding_matrix)

In [11]:
def evaluate(model, data_loader):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for data, labels in data_loader:
            output = model.forward(data)
            probs = torch.sigmoid(output)  # Apply sigmoid to get probabilities
            predicted = (probs >= 0.5).long()  # Convert probabilities to binary predictions
            all_preds.extend(predicted.cpu().numpy().flatten().tolist())
            all_labels.extend(labels.cpu().numpy().tolist())
    acc = accuracy_score(all_labels, all_preds)
    return acc

In [12]:
# Train and validate function
def train_and_validate(model, train_loader, val_loader, optimizer, criterion, max_epochs=100, convergence_threshold=0.001):
    best_val_acc = 0
    epochs_without_improvement = 0

    for epoch in range(max_epochs):
        model.train()
        running_loss = 0
        for data, target in train_loader:
            optimizer.zero_grad()
            output = model(data).squeeze(1)
            loss = criterion(output, target.float())
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        val_acc = evaluate(model, val_loader)
        print(f"Epoch {epoch+1}/{max_epochs}, Loss: {running_loss/len(train_loader)}, Val Accuracy: {val_acc}")

        # Check for improvement
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            epochs_without_improvement = 0  # Reset counter
        else:
            epochs_without_improvement += 1

        # Check for convergence
        if epochs_without_improvement >= 10:  # Convergence condition (no improvement for 5 epochs)
            print("Convergence reached, stopping training.")
            break

    return best_val_acc, epoch


In [15]:
# Hyperparameter tuning
learning_rates = [0.001, 0.005, 0.01]
batch_sizes = [64]
num_filters_options = [100]
filter_sizes_options = [[2,3,4]]
dropout_rates = [0.6]
optimizers = ['rmsprop']
epochs = [10, 20]
weight_decays = [0, 1e-4, 1e-5]

vocab_size, embedding_dim = embedding_matrix_array.shape
output_size = 1


best_val_acc = 0
best_hyperparams = {}

for lr in learning_rates:
    for bs in batch_sizes:
        for num_filters in num_filters_options:
            for filter_sizes in filter_sizes_options:
                for dropout in dropout_rates:
                    for opt in optimizers:
                        print(f"\nTraining with: LR={lr}, Batch Size={bs}, Filters={num_filters}, "
                              f"Filter Sizes={filter_sizes}, Dropout={dropout}, Optimizer={opt}")

                        # Initialize CNN model with current hyperparameters
                        model = CNNModel(
                            embedding_matrix=embedding_matrix_array,
                            num_filters=num_filters,
                            filter_sizes=filter_sizes,
                            output_size=1,  # Binary classification
                            dropout=dropout
                        )
                        criterion = nn.BCEWithLogitsLoss()

                        # Select optimizer
                        if opt == 'adam':
                            optimizer = optim.Adam(model.parameters(), lr=lr)
                        elif opt == 'sgd':
                            optimizer = optim.SGD(model.parameters(), lr=lr)
                        elif opt == 'rmsprop':
                            optimizer = optim.RMSprop(model.parameters(), lr=lr)

                        # Create DataLoaders
                        train_loader = create_data_loader(train_dataset_instance, bs)
                        val_loader = create_data_loader(val_dataset_instance, bs)

                        # Train and validate the model
                        val_acc, epochs_used = train_and_validate(model, train_loader, val_loader, optimizer, criterion)

                        print(f"Validation Accuracy: {val_acc}")

                        # Update best hyperparameters if validation accuracy improves
                        if val_acc > best_val_acc:
                            best_val_acc = val_acc
                            best_hyperparams = {
                                'learning_rate': lr,
                                'batch_size': bs,
                                'num_filters': num_filters,
                                'filter_sizes': filter_sizes,
                                'dropout': dropout,
                                'optimizer': opt
                            }
                            best_epochs = epochs_used

# Print the best configuration found
print(f"\nBest Model Configuration: {best_hyperparams} with Validation Accuracy: {best_val_acc} over {best_epochs} epochs")


Training with: LR=0.001, Batch Size=64, Filters=100, Filter Sizes=[2, 3, 4], Dropout=0.6, Optimizer=rmsprop
Epoch 1/100, Loss: 0.5582745911470101, Val Accuracy: 0.7091932457786116
Epoch 2/100, Loss: 0.3613274007368444, Val Accuracy: 0.7504690431519699
Epoch 3/100, Loss: 0.23905576987942653, Val Accuracy: 0.7504690431519699
Epoch 4/100, Loss: 0.15371353596226492, Val Accuracy: 0.7579737335834896
Epoch 5/100, Loss: 0.08462402604019909, Val Accuracy: 0.7654784240150094
Epoch 6/100, Loss: 0.051056167326255966, Val Accuracy: 0.7673545966228893
Epoch 7/100, Loss: 0.03188472049686113, Val Accuracy: 0.7654784240150094
Epoch 8/100, Loss: 0.020907760886197436, Val Accuracy: 0.7542213883677298
Epoch 9/100, Loss: 0.015221409291574565, Val Accuracy: 0.7523452157598499
Epoch 10/100, Loss: 0.010351553257839726, Val Accuracy: 0.7542213883677298
Epoch 11/100, Loss: 0.008564261173779047, Val Accuracy: 0.7532833020637899
Epoch 12/100, Loss: 0.005215130450330855, Val Accuracy: 0.7617260787992496
Epoch 13

In [16]:
#Trainig the model with the best hyperparameters
batch_size = 64
lr=0.005
num_filters=100
filter_sizes=[2,3,4]
dropout=0.6
model = CNNModel(embedding_matrix_array, num_filters=num_filters, filter_sizes=filter_sizes, output_size=1, dropout=dropout)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.RMSprop(model.parameters(), lr=lr)

train_loader = create_data_loader(train_dataset_instance, batch_size)
val_loader = create_data_loader(val_dataset_instance, batch_size)

# Train and validate
val_acc, epochs_used = train_and_validate(model, train_loader, val_loader, optimizer, criterion)
print(f"Validation Accuracy: {val_acc}, over {epochs_used} epochs")

Epoch 1/100, Loss: 0.783202776268347, Val Accuracy: 0.6894934333958724
Epoch 2/100, Loss: 0.2749965820850721, Val Accuracy: 0.7607879924953096
Epoch 3/100, Loss: 0.12343946154882658, Val Accuracy: 0.7532833020637899
Epoch 4/100, Loss: 0.07709409655836313, Val Accuracy: 0.7420262664165104
Epoch 5/100, Loss: 0.058222315564348516, Val Accuracy: 0.7439024390243902
Epoch 6/100, Loss: 0.036730214913459076, Val Accuracy: 0.7307692307692307
Epoch 7/100, Loss: 0.035013813217567566, Val Accuracy: 0.7410881801125704
Epoch 8/100, Loss: 0.03602388301813186, Val Accuracy: 0.7317073170731707
Epoch 9/100, Loss: 0.03389830855818089, Val Accuracy: 0.7345215759849906
Epoch 10/100, Loss: 0.03148825367729269, Val Accuracy: 0.7288930581613509
Epoch 11/100, Loss: 0.017365599017402963, Val Accuracy: 0.725140712945591
Epoch 12/100, Loss: 0.035425150948545696, Val Accuracy: 0.7185741088180112
Convergence reached, stopping training.
Validation Accuracy: 0.7607879924953096, over 11 epochs


In [18]:
# Step 7: Evaluate on Test Set
test_loader = create_data_loader(test_dataset_instance, batch_size)
test_acc = evaluate(model, test_loader)
print(f"Test Accuracy: {test_acc}")

# Report the configuration
print(f"Final Configuration:\nEpochs: {epochs_used}\nLearning Rate: {lr}\nOptimizer: RmsProp\nBatch Size: {batch_size}")

Test Accuracy: 0.7373358348968105
Final Configuration:
Epochs: 11
Learning Rate: 0.005
Optimizer: RmsProp
Batch Size: 64


In [19]:
# Step 8: Get a sample sentence from the test set and predict
# Select a random index from the test dataset
random_index = random.randint(0, len(test_dataset) - 1)

# Get the corresponding sentence and its label from the test dataset
sample_sentence = test_dataset[random_index]['text']  # Assuming the dataset contains a 'text' field
true_label = test_dataset[random_index]['label']  # Assuming there's a label field

# Tokenize the sample sentence
sample_tokens = nlp(sample_sentence.lower())

# Convert tokens to indices
sample_indices = []
for token in sample_tokens:
    if token in word_to_index:
        sample_indices.append(list(word_to_index).index(token))
    else:
        sample_indices.append(list(word_to_index).index("<UNK>"))
sample_tensor = torch.tensor(sample_indices).unsqueeze(0)  # Add batch dimension
# Make prediction using the model
model.eval()  # Set the model to evaluation mode
with torch.no_grad():  # No need to compute gradients during inference
    output = model(sample_tensor)  # Pass the tensor to the model
    _, predicted = torch.max(output, 1)  # Get the index of the max log-probability

# Map predicted index to sentiment label
sentiment_labels = ['negative', 'positive']  # Adjust according to your label encoding
predicted_label = sentiment_labels[predicted.item()]

# Print results
print(f"Sample Sentence: '{sample_sentence}'")
print(f"True Label: {true_label}")
print(f"Predicted Label: {predicted_label}")

Sample Sentence: 'noyce creates a film of near-hypnotic physical beauty even as he tells a story as horrifying as any in the heart-breakingly extensive annals of white-on-black racism .'
True Label: 1
Predicted Label: negative
