In [None]:
!pip install datasets

In [None]:
# Mount Google Drive so the project folder and the cached embedding models
# under /content/drive/MyDrive are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')
# Switch the working directory to the course project folder on the mounted Drive.
%cd /content/drive/MyDrive/SC4002 Natural Language Processing



In [None]:
# Change into the project folder on the mounted Drive (same target as the
# previous cell, here with the path quoted).
%cd "/content/drive/MyDrive/SC4002 Natural Language Processing"

In [None]:
from datasets import load_dataset

In [None]:
import sys

# Put the project checkout on the import path (presumably where the helper
# modules imported further down live - confirm). The membership check keeps
# the cell idempotent: re-running it no longer appends duplicate entries.
if '/content/SC4002-NLP-Project' not in sys.path:
    sys.path.append('/content/SC4002-NLP-Project')

In [None]:
# Fetch the Rotten Tomatoes corpus and bind each of its three splits to a
# notebook-level name used by the later cells.
dataset = load_dataset("rotten_tomatoes")
train_dataset, validation_dataset, test_dataset = (
    dataset[split_name] for split_name in ('train', 'validation', 'test')
)

In [None]:
import os
import gensim.downloader as api
from gensim.models import KeyedVectors

def save_model_to_drive(model_name):
    """Download a pretrained embedding model via gensim and cache it on Drive.

    Args:
        model_name (str): Either 'word2vec' or 'glove'.

    Returns:
        The downloaded model object, after it has been saved under
        /content/drive/MyDrive/models.

    Raises:
        ValueError: If model_name is not one of the supported names.
    """
    # Validate before touching the filesystem or the network. Previously an
    # unknown name fell through both branches and crashed on `return model`
    # with an UnboundLocalError.
    if model_name not in ('word2vec', 'glove'):
        raise ValueError("model_name must be either 'word2vec' or 'glove'")

    save_path = '/content/drive/MyDrive/models'
    os.makedirs(save_path, exist_ok=True)

    if model_name == 'word2vec':
        # return_path=True yields the downloaded file's location; the vectors
        # are then parsed from the binary word2vec format.
        path = api.load("word2vec-google-news-300", return_path=True)
        model = KeyedVectors.load_word2vec_format(path, binary=True)
        model.save(f'{save_path}/word2vec.model')
        print("Word2Vec model saved to Google Drive successfully!")
    else:
        model = api.load("glove-wiki-gigaword-300")
        model.save(f'{save_path}/glove.model')
        print("GloVe model saved to Google Drive successfully!")

    return model

def load_model_from_drive(model_name):
    """Return the cached embedding model from Drive, downloading it if absent.

    Args:
        model_name (str): Base name of the cached file, looked up as
            /content/drive/MyDrive/models/<model_name>.model.

    Returns:
        The model read from Drive, or the result of save_model_to_drive when
        no cached file exists yet.
    """
    model_path = f'/content/drive/MyDrive/models/{model_name}.model'

    # Guard clause: nothing cached yet, so download and save instead.
    if not os.path.exists(model_path):
        print(f"{model_name.capitalize()} model not found in Drive. Downloading and saving...")
        return save_model_to_drive(model_name)

    cached_model = KeyedVectors.load(model_path)
    print(f"{model_name.capitalize()} model loaded from Drive successfully!")
    return cached_model

def get_model(model_name):
    """Load a supported embedding model, reporting (not raising) load failures.

    Args:
        model_name (str): 'word2vec' or 'glove'.

    Returns:
        The loaded model, or None if loading raised an exception.

    Raises:
        ValueError: If model_name is not a supported name.
    """
    supported_names = ['word2vec', 'glove']
    if model_name not in supported_names:
        raise ValueError("model_name must be either 'word2vec' or 'glove'")

    # Best effort: any failure while loading is printed and mapped to None.
    try:
        loaded = load_model_from_drive(model_name)
    except Exception as err:
        print(f"Error loading {model_name} model: {str(err)}")
        loaded = None
    return loaded

# Load both pretrained embedding models; get_model returns None when loading fails.
word2vec_model = get_model('word2vec')

# The GloVe vectors are the ones handed to SentimentDataset in the data-preparation cell.
glove_model = get_model('glove')

In [None]:
import os
import gensim.downloader as api
import pickle

def load_and_save_fasttext_model(drive_path="/content/drive/MyDrive/models/fasttext_model.model"):
    """
    Load the FastText model for OOV handling. Save the model to Google Drive if it does not exist.

    Args:
        drive_path (str): Path in Google Drive to save/load the FastText model.

    Returns:
        model: Loaded FastText model.
    """
    # Imported locally so this cell does not rely on another cell's imports.
    from gensim.models import KeyedVectors

    # Check if the model already exists in Google Drive
    if os.path.exists(drive_path):
        print("Loading FastText model from Drive...")
        # A file written with `.save()` must be read back with
        # KeyedVectors.load; `api.load` only accepts names from gensim's
        # download catalogue and rejects a filesystem path.
        fasttext_model = KeyedVectors.load(drive_path)
        print("FastText model loaded from Drive successfully.")
    else:
        print("Downloading FastText model...")
        # Load FastText model from gensim
        fasttext_model = api.load("fasttext-wiki-news-subwords-300")
        print("FastText model loaded successfully.")

        # Persist the download so later sessions take the fast path above.
        save_dir = os.path.dirname(drive_path)
        if save_dir:
            os.makedirs(save_dir, exist_ok=True)
        fasttext_model.save(drive_path)
        print("FastText model saved to Drive successfully.")

    return fasttext_model


In [None]:
import numpy as np
# Load the FastText model used for OOV (out-of-vocabulary) handling; it is
# passed to SentimentDataset as the last argument further down.
fasttext_model = load_and_save_fasttext_model()
print("FastText model loaded successfully.")

In [None]:
from utils import get_embedding, get_improved_embedding

# Dict shared by every SentimentDataset built below; presumably a
# word -> vector cache filled by the embedding helpers (confirm in utils).
word_to_vector_map = {}

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from collections import Counter
import re

In [None]:
from utils import SentimentDataset
from utils import get_device

# Device chosen by the project helper; the models are moved onto it and it is
# also passed to train_model and to the evaluation routine.
device = get_device()

In [None]:
# Peek at the training split: its type on one line, its first example on the next.
print(type(train_dataset), train_dataset[0], sep='\n')

In [None]:
def prepare_data(dataset):
    """Split a dataset of examples into parallel lists of texts and labels.

    Args:
        dataset: Iterable of mappings that each provide 'text' and 'label'.

    Returns:
        tuple: (texts, labels), two lists in dataset order.
    """
    texts = []
    for record in dataset:
        texts.append(record['text'])

    # Second pass, as before, so the two lists are collected independently.
    labels = []
    for record in dataset:
        labels.append(record['label'])

    return texts, labels

# Read the raw splits straight from `dataset` instead of the per-split names:
# `train_dataset` is rebound to a SentimentDataset a few lines below, so going
# through it would break this cell the second time it is run.
train_texts, train_labels = prepare_data(dataset['train'])
val_texts, val_labels = prepare_data(dataset['validation'])
test_texts, test_labels = prepare_data(dataset['test'])

train_labels = np.array(train_labels)
val_labels = np.array(val_labels)
test_labels = np.array(test_labels)

# Train and validation share the same embedding function and lookup models.
train_dataset = SentimentDataset(
    train_texts, train_labels, glove_model, word_to_vector_map, get_improved_embedding, fasttext_model
)

val_dataset = SentimentDataset(
    val_texts, val_labels, glove_model, word_to_vector_map, get_improved_embedding, fasttext_model
)


In [None]:
batch_size = 32
# Only the training split is shuffled.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
# No test loader is created here: at this point `test_dataset` is still the raw
# text split, not a SentimentDataset. The loader used for evaluation is built in
# the RESULTS section once the test split has been wrapped.


# **BiLSTMModel**

In [None]:
from BiLSTM import BiLSTMModel
from utils import train_model

In [None]:
# Build the BiLSTM from the hyper-parameters stored on the class
# (`BiLSTMModel.config`) and move it to the selected device.
bilstm = BiLSTMModel(**BiLSTMModel.config).to(device)
# train_model returns three values; only the first is kept and is loaded below
# as a state dict.
bilstm_state,_,_ = train_model(
    bilstm,
    train_loader,
    val_loader,
    num_epochs=30,
    device=device,
    model_name='BiLSTM'
)
# Restore the returned weights into the model (presumably the best checkpoint
# seen during training - confirm in utils.train_model).
bilstm.load_state_dict(bilstm_state)

# **BiGRUModel**

In [None]:
from BiGRU import BiGRUModel

In [None]:

# Build the BiGRU from the hyper-parameters stored on the class
# (`BiGRUModel.config`) and move it to the selected device.
bigru = BiGRUModel(**BiGRUModel.config).to(device)
# Same training call as for the BiLSTM; only the first of the three returned
# values is kept.
bigru_state,_,_ = train_model(
    bigru,
    train_loader,
    val_loader,
    num_epochs=30,
    device=device,
    model_name='BiGRU'
)
# Restore the returned weights into the model (presumably the best checkpoint
# seen during training - confirm in utils.train_model).
bigru.load_state_dict(bigru_state)

# **CNN Model**

In [None]:
from CNN import CNNModel

In [None]:

# Build the CNN from the hyper-parameters stored on the class
# (`CNNModel.config`) and move it to the selected device.
cnn = CNNModel(**CNNModel.config).to(device)

# NOTE(review): trained for a single epoch, unlike the 30 epochs used for the
# BiLSTM and BiGRU above - confirm this is intended and not a debugging leftover.
cnn_state,_,_ = train_model(
    cnn,
    train_loader,
    val_loader,
    num_epochs=1,
    device=device,
    model_name='CNN'
)
# Restore the returned weights into the model (presumably the best checkpoint
# seen during training - confirm in utils.train_model).
cnn.load_state_dict(cnn_state)

# Enhanced Bi-LSTM using Attention

In [None]:
from AttentiveBiLSTM import AttentiveBiLSTMModel

In [None]:


# Build the attention-enhanced BiLSTM from the hyper-parameters stored on the
# class (`AttentiveBiLSTMModel.config`) and move it to the selected device.
attentive_bilstm = AttentiveBiLSTMModel(**AttentiveBiLSTMModel.config).to(device)
# NOTE(review): trained for a single epoch, unlike the 30 epochs used for the
# BiLSTM and BiGRU above - confirm this is intended.
attentive_bilstm_state,_,_ = train_model(
    attentive_bilstm,
    train_loader,
    val_loader,
    num_epochs=1,
    device=device,
    model_name='AttentiveBiLSTM'
)
# Load the returned weights into the MODEL. The previous code called
# load_state_dict on the state dict itself, which has no such method.
attentive_bilstm.load_state_dict(attentive_bilstm_state)

# **RESULTS**

In [None]:
# Embed the test split with the same function that was used for the training
# and validation splits (get_improved_embedding), so the models are evaluated
# on inputs produced by the pipeline they were trained on.
test_dataset = SentimentDataset(
    test_texts, test_labels, glove_model, word_to_vector_map, get_improved_embedding, fasttext_model
)


In [None]:
# Loader over the embedded test split, using the same batch size as training
# and no shuffling argument.
test_loader = DataLoader(test_dataset, batch_size=batch_size)

In [None]:
import numpy as np
from sklearn.metrics import (
    accuracy_score, precision_recall_fscore_support,
    confusion_matrix, roc_curve, auc, mean_squared_error
)
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd



def evaluate_model_comprehensive(model, test_loader, criterion, device, model_name, glove_model, fasttext_model, word_to_vector_map):
    """Run a trained binary classifier over a test loader and collect metrics.

    Args:
        model: Module called as ``model(inputs)``; its output is treated as
            per-class scores of shape (batch, num_classes), with column 1 taken
            as the positive class.
        test_loader: Sized iterable yielding ``(inputs, labels)`` batches whose
            inputs are already embedded.
        criterion: Loss callable applied as ``criterion(outputs, labels)``.
        device: Device the batch tensors are moved to.
        model_name (str): Label used in the log lines and in the result dict.
        glove_model: Unused; kept so existing call sites keep working.
        fasttext_model: Unused; kept so existing call sites keep working.
        word_to_vector_map: Unused; kept so existing call sites keep working.

    Returns:
        dict: Keys 'model_name', 'loss' (mean per-batch loss), 'accuracy',
        'precision', 'recall', 'f1', 'rmse', 'roc_auc', 'conf_matrix', 'fpr',
        'tpr', 'predictions', 'labels' and 'probabilities'.

    Raises:
        ValueError: If test_loader contains no batches.
    """
    num_batches = len(test_loader)
    if num_batches == 0:
        # Fail with a clear message instead of an opaque metric or
        # division-by-zero error further down.
        raise ValueError("test_loader contains no batches; nothing to evaluate")

    model.eval()
    all_predictions = []
    all_labels = []
    all_probs = []
    total_loss = 0.0

    print(f"Starting evaluation of {model_name}")
    print(f"Number of batches in test loader: {num_batches}")
    with torch.no_grad():
        for batch_words, labels in test_loader:
            # The loader already yields embedded inputs, so one bulk
            # conversion replaces the old per-token copy loop. This also
            # removes the hard-coded embedding size of 300.
            if not isinstance(batch_words, torch.Tensor):
                batch_words = torch.tensor(np.array(batch_words))
            processed_inputs = batch_words.to(device=device, dtype=torch.float32)

            labels = labels.to(device)
            if labels.dim() > 1:
                # Flatten rather than squeeze(): squeeze() would turn a batch
                # holding a single sample into a 0-d tensor.
                labels = labels.reshape(-1)

            outputs = model(processed_inputs)

            loss = criterion(outputs, labels)
            total_loss += loss.item()

            probs = torch.softmax(outputs, dim=1)
            _, predicted = outputs.max(1)

            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            # Positive-class probability, used for the ROC curve.
            all_probs.extend(probs[:, 1].cpu().numpy())

    print("\nEvaluation completed successfully")
    print(f"Total samples processed: {len(all_predictions)}")

    all_predictions = np.array(all_predictions)
    all_labels = np.array(all_labels)
    all_probs = np.array(all_probs)

    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_predictions, average='binary')
    conf_matrix = confusion_matrix(all_labels, all_predictions)
    rmse = np.sqrt(mean_squared_error(all_labels, all_predictions))
    fpr, tpr, _ = roc_curve(all_labels, all_probs)
    roc_auc = auc(fpr, tpr)
    # Mean of the per-batch losses.
    avg_loss = total_loss / num_batches

    print(f"\nTest Results for {model_name}")
    print("=" * 50)
    print(f"Test Loss: {avg_loss:.4f}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"ROC AUC: {roc_auc:.4f}")

    return {
        'model_name': model_name,
        'loss': avg_loss,
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'rmse': rmse,
        'roc_auc': roc_auc,
        'conf_matrix': conf_matrix,
        'fpr': fpr,
        'tpr': tpr,
        'predictions': all_predictions,
        'labels': all_labels,
        'probabilities': all_probs
    }

def plot_confusion_matrix(conf_matrix, model_name):
    """Draw a confusion matrix as an annotated heatmap and show it.

    Args:
        conf_matrix: 2-D array of integer counts (rendered with format 'd').
        model_name (str): Name appended to the figure title.
    """
    _, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', ax=ax)
    ax.set_title(f'Confusion Matrix - {model_name}')
    ax.set_xlabel('Predicted Label')
    ax.set_ylabel('True Label')
    plt.show()

def plot_roc_curves(results_dict):
    """Overlay the ROC curve of every evaluated model on a single figure.

    Args:
        results_dict (dict): Maps a key to a result dict providing 'fpr',
            'tpr', 'model_name' and 'roc_auc'.
    """
    _, ax = plt.subplots(figsize=(10, 8))

    for entry in results_dict.values():
        legend_text = f"{entry['model_name']} (AUC = {entry['roc_auc']:.3f})"
        ax.plot(entry['fpr'], entry['tpr'], label=legend_text)

    # Dashed diagonal as the reference line.
    ax.plot([0, 1], [0, 1], 'k--')
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title('ROC Curves Comparison')
    ax.legend(loc="lower right")
    ax.grid(True)
    plt.show()

def plot_metrics_comparison(results_dict):
    """Show one bar chart per metric, comparing all evaluated models.

    Args:
        results_dict (dict): Maps model name to a result dict providing
            'accuracy', 'precision', 'recall', 'f1', 'rmse' and 'roc_auc'.

    Returns:
        None. When results_dict is empty a notice is printed and nothing is
        drawn.
    """
    # With no models the per-metric value lists are empty and max() on them
    # would raise, so bail out early with a message instead.
    if not results_dict:
        print("No results to plot.")
        return

    metrics = ['accuracy', 'precision', 'recall', 'f1', 'rmse', 'roc_auc']
    model_names = list(results_dict.keys())

    # 2 x 3 grid: one panel for each of the six metrics.
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))

    for ax, metric in zip(axes.ravel(), metrics):
        values = [results_dict[name][metric] for name in model_names]
        ax.bar(model_names, values)
        ax.set_title(metric.upper())
        # At least 1.1 so bounded metrics share a scale; taller if a value
        # exceeds 1.
        ax.set_ylim(0, max(1.1, max(values) * 1.1))
        ax.tick_params(axis='x', rotation=45)

    plt.tight_layout()
    plt.show()


criterion = nn.CrossEntropyLoss()
results_dict = {}

# Only the models trained earlier in this notebook are evaluated. The former
# 'AttentiveBiGRU' entry referenced `attentive_bigru`, which is never defined
# and made this cell fail with a NameError.
# evaluate_model_comprehensive switches each model to eval mode itself, so no
# separate .eval() calls are needed here.
models = {
    'BiLSTM': bilstm,
    'BiGRU': bigru,
    'CNN': cnn,
    'AttentiveBiLSTM': attentive_bilstm,
}

for model_name, model in models.items():
    print(f"\nEvaluating {model_name}...")
    results = evaluate_model_comprehensive(
        model, test_loader, criterion, device, model_name, glove_model, fasttext_model, word_to_vector_map
    )
    results_dict[model_name] = results

    plot_confusion_matrix(results['conf_matrix'], model_name)

plot_roc_curves(results_dict)

plot_metrics_comparison(results_dict)

# The summary table is produced by create_summary_table below; the empty
# placeholder DataFrame that used to be created here was never read.
import pandas as pd

def create_summary_table(results_dict):
    """Tabulate the scalar metrics of every model, print them and save a CSV.

    Args:
        results_dict (dict): Maps model name to a result dict providing
            'loss', 'accuracy', 'precision', 'recall', 'f1', 'rmse' and
            'roc_auc'.

    Returns:
        pandas.DataFrame: One row per model. The same table is printed and
        written to 'model_comparison_results.csv' in the working directory.
    """
    # (column header, key in the per-model result dict), in output order.
    column_sources = (
        ('Loss', 'loss'),
        ('Accuracy', 'accuracy'),
        ('Precision', 'precision'),
        ('Recall', 'recall'),
        ('F1 Score', 'f1'),
        ('RMSE', 'rmse'),
        ('ROC AUC', 'roc_auc'),
    )

    rows = [
        {'Model': name, **{header: metrics[key] for header, key in column_sources}}
        for name, metrics in results_dict.items()
    ]
    table = pd.DataFrame(rows)

    print("\nSummary of Results:")
    print("=" * 100)
    print(table.to_string(index=False, float_format='{:.4f}'.format))

    table.to_csv('model_comparison_results.csv', index=False)

    return table

# create_summary_table already prints the table and writes
# 'model_comparison_results.csv', so a single call is enough. The earlier
# per-model loop and the manual print and to_csv only repeated that work.
summary_df = create_summary_table(results_dict)

In [None]:
# Quick look at the ground-truth test labels (the NumPy array built in the
# data-preparation cell).
print(test_labels)