In [1]:
# Import Libraries
import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from gensim.models import KeyedVectors
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, precision_score, recall_score, f1_score,ConfusionMatrixDisplay
import seaborn as sns
import nltk
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')
from nltk.tokenize import word_tokenize

In [None]:
# Pick the compute device: CUDA when PyTorch reports a usable GPU, CPU otherwise
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

## Step 2: Load and Preprocess the Dataset
Here, we load the dataset from the CSV files and preprocess it for training the model.


In [None]:
# Read the three pre-split CSV files (train / validation / test) into DataFrames
train_data, val_data, test_data = (
    pd.read_csv(csv_name) for csv_name in ("train.csv", "val.csv", "test.csv")
)

In [None]:
# Split every DataFrame into its input column ('text') and target column ('label')
X_train, y_train = train_data['text'], train_data['label']
X_val, y_val = val_data['text'], val_data['label']
X_test, y_test = test_data['text'], test_data['label']

## Step 3: Load Pre-trained Word2Vec Embeddings
We use pre-trained Word2Vec embeddings to represent words as dense vectors.
These embeddings improve the performance of the model by leveraging semantic relationships between words.

In [None]:
# Load the pre-trained Word2Vec vectors from the gzipped binary word2vec file
word2vec = KeyedVectors.load_word2vec_format(
    "GoogleNews-vectors-negative300.bin.gz",
    binary=True,
)

In [None]:
# Create a vocabulary
embedding_dim = 300
vocab = {"<PAD>": 0, "<UNK>": 1}  # Special tokens occupy the first two ids

# Row 0 (<PAD>) is all zeros; row 1 (<UNK>) is small uniform noise.
# The noise comes from a dedicated seeded generator so the embedding matrix is
# identical after every Restart & Run All (the global NumPy RNG is unseeded).
_unk_rng = np.random.default_rng(42)
embedding_matrix = [
    np.zeros(embedding_dim),
    _unk_rng.uniform(-0.01, 0.01, embedding_dim),
]

In [None]:
# Build vocabulary from Word2Vec
# A training-set token gets an id only if it is new to `vocab` and has a
# pre-trained vector; its vector is appended at the row matching that id.
for document in X_train:
    tokens = word_tokenize(document.lower())
    for token in tokens:
        if token in vocab or token not in word2vec:
            continue
        vocab[token] = len(vocab)
        embedding_matrix.append(word2vec[token])

# Freeze the list of row vectors into a single 2-D array
embedding_matrix = np.array(embedding_matrix)
vocab_size = len(vocab)

print(f"Vocabulary size: {vocab_size}")

In [None]:
# Sanity check: matrix dimensions and the vector stored in the final row
print(embedding_matrix.shape)
print(embedding_matrix[-1])


## Step 4: Tokenize and Pad Sequences
Convert the text into sequences of integers based on the vocabulary.
We also pad sequences to ensure they all have the same length for batch processing.

In [None]:
# Tokenize and convert text to sequences
def text_to_sequence(text, vocab, max_len=1000):
    """Map ``text`` to a list of exactly ``max_len`` vocabulary ids.

    The text is lower-cased and tokenized with ``word_tokenize``. Tokens that
    are missing from ``vocab`` map to the ``"<UNK>"`` id. Short sequences are
    right-padded with the ``"<PAD>"`` id; long sequences are truncated.
    """
    token_ids = []
    for token in word_tokenize(text.lower()):
        token_ids.append(vocab.get(token, vocab["<UNK>"]))

    missing = max_len - len(token_ids)
    if missing > 0:
        token_ids += [vocab["<PAD>"]] * missing
    return token_ids[:max_len]

# Apply tokenization
# Every split is encoded with the same vocabulary and the same fixed length.
max_len = 1000
X_train_seq, X_val_seq, X_test_seq = (
    [text_to_sequence(text, vocab, max_len) for text in split]
    for split in (X_train, X_val, X_test)
)

In [None]:
# Custom Dataset Class
class TextDataset(Dataset):
    """Dataset pairing pre-encoded token-id sequences with their labels.

    ``texts`` is converted to a ``torch.long`` tensor and ``labels.values`` to
    a ``torch.float32`` tensor once, at construction time.
    """

    def __init__(self, texts, labels):
        # `labels` must expose `.values` (e.g. a pandas Series)
        self.texts = torch.tensor(texts, dtype=torch.long)
        self.labels = torch.tensor(labels.values, dtype=torch.float32)

    def __len__(self):
        """Number of samples, taken from the label tensor."""
        return self.labels.shape[0]

    def __getitem__(self, idx):
        """Return the ``(token_ids, label)`` pair stored at ``idx``."""
        sample_ids = self.texts[idx]
        sample_label = self.labels[idx]
        return sample_ids, sample_label

In [None]:
# Create Dataset and DataLoader
batch_size = 32

train_dataset, val_dataset, test_dataset = (
    TextDataset(sequences, targets)
    for sequences, targets in (
        (X_train_seq, y_train),
        (X_val_seq, y_val),
        (X_test_seq, y_test),
    )
)

# Only the training loader shuffles; validation and test keep their stored order
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
## Step 6: Define the MLP Model

In [None]:
class MLPModel(nn.Module):
    """MLP classifier over a flattened, fixed-length sequence of frozen embeddings.

    Each token id is looked up in a frozen pre-trained embedding table, the
    per-token vectors are concatenated into one long vector, and that vector is
    passed through ``Linear -> LayerNorm -> ReLU -> Dropout`` stacks followed by
    a final ``Linear`` that emits raw scores (no activation is applied).

    Args:
        embedding_matrix: 2-D NumPy array of shape (vocab_size, embedding_dim);
            row 0 is used as the padding index.
        hidden_dims: Sizes of the hidden layers, in order. Must be non-empty.
        output_dim: Number of output units.
        seq_len: Number of tokens in every input sequence. When ``None`` the
            notebook-level ``max_len`` is used, matching the original behaviour.

    Raises:
        ValueError: If ``hidden_dims`` is empty.
    """

    def __init__(self, embedding_matrix, hidden_dims=(512, 256, 128), output_dim=1, seq_len=None):
        super().__init__()

        # Tuple default avoids sharing one mutable list between instances
        hidden_dims = list(hidden_dims)
        if not hidden_dims:
            raise ValueError("hidden_dims must contain at least one layer size")

        if seq_len is None:
            # Fall back to the notebook-level constant used for padding/truncation
            seq_len = max_len
        self.seq_len = seq_len

        # Embedding Layer with frozen weights
        self.embedding = nn.Embedding.from_pretrained(
            torch.tensor(embedding_matrix, dtype=torch.float32),
            freeze=True,
            padding_idx=0
        )

        # The first Linear consumes every token embedding concatenated together
        input_dim = embedding_matrix.shape[1] * seq_len

        # Layer order is kept identical to the original so that state_dict keys
        # (model.0, model.1, ...) stay compatible with saved checkpoints.
        layer_sizes = [input_dim] + hidden_dims
        layers = []
        for idx, (fan_in, fan_out) in enumerate(zip(layer_sizes[:-1], layer_sizes[1:])):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.LayerNorm(fan_out))
            layers.append(nn.ReLU())
            # Heavier dropout on the (very wide) input layer, lighter afterwards
            layers.append(nn.Dropout(0.5 if idx == 0 else 0.2))

        # Output layer
        layers.append(nn.Linear(hidden_dims[-1], output_dim))

        # Combine all layers
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return raw scores of shape (batch, output_dim) for token ids ``x`` of shape (batch, seq_len)."""
        # Get embeddings and flatten everything except the batch dimension
        embedded = self.embedding(x)
        flattened = embedded.view(embedded.size(0), -1)

        # Forward pass through all layers
        return self.model(flattened)


## Step 7: Initialize the Model 
We initialize the model with the embedding matrix.

In [None]:
# Build the classifier from the embedding matrix, then move it to the selected device
model = MLPModel(embedding_matrix)
model = model.to(device)

## Step 7: Function to Evaluate the Model
This function evaluates the model on the validation and test sets.

In [None]:
def evaluate_model(model, data_loader, criterion):
    """Evaluate ``model`` on ``data_loader`` and return ``(mean_loss, metrics)``.

    The model is expected to emit raw logits (``train_model`` passes
    ``BCEWithLogitsLoss`` as ``criterion``), so class predictions are obtained
    by applying a sigmoid and rounding to 0/1.

    Args:
        model: Network returning one logit per sample.
        data_loader: Iterable yielding ``(texts, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(logits, labels)``.

    Returns:
        tuple: Loss averaged over batches, and a dict with the keys
        ``'accuracy'``, ``'precision'``, ``'recall'`` and ``'f1'``.
    """
    model.eval()
    total_loss = 0
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for texts, labels in data_loader:
            texts = texts.to(device)
            labels = labels.float().to(device)

            # squeeze(-1) removes only the output dimension, so a final batch
            # holding a single sample keeps its batch axis.
            logits = model(texts).squeeze(-1)

            loss = criterion(logits, labels)
            total_loss += loss.item()

            # Logits -> probabilities -> hard 0/1 predictions
            predicted = torch.round(torch.sigmoid(logits))
            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    metrics = {
        'accuracy': accuracy_score(all_labels, all_predictions),
        'precision': precision_score(all_labels, all_predictions),
        'recall': recall_score(all_labels, all_predictions),
        'f1': f1_score(all_labels, all_predictions)
    }

    return total_loss / len(data_loader), metrics

## Step 8: Function to Train the Model
We train the model using the training and validation datasets and monitor the loss and accuracy.
To calculate the loss, we use binary cross-entropy computed on the raw logits (`BCEWithLogitsLoss`), with the positive class weighted by the negative-to-positive ratio of the training labels.
We use the AdamW optimizer to update the model parameters based on the gradients.
To prevent overfitting, we make use of early stopping and learning rate scheduling.


In [None]:
def train_model(model, train_loader, val_loader, epochs=15, learning_rate=0.0005):
    """Train ``model`` and return the per-epoch metric history.

    Training uses ``BCEWithLogitsLoss`` with ``pos_weight = #negatives / #positives``
    (counted on the notebook-level ``y_train``), AdamW with weight decay 0.1,
    gradient-norm clipping at 1.0, a 100-step linear learning-rate warmup,
    ``ReduceLROnPlateau`` on validation accuracy, and early stopping on
    validation loss (patience 4). Each time validation loss improves, the
    weights are written to ``'best_mlp_model.pth'``; when early stopping
    triggers, the best weights are loaded back into ``model``.

    Args:
        model: Network returning one logit per sample.
        train_loader: Loader yielding ``(texts, labels)`` training batches.
        val_loader: Loader yielding ``(texts, labels)`` validation batches.
        epochs: Maximum number of epochs.
        learning_rate: Peak learning rate reached at the end of warmup.

    Returns:
        dict: Lists (one entry per completed epoch) of train/validation loss,
        accuracy, precision, recall and F1, plus ``'learning_rates'``.

    Raises:
        ValueError: If ``y_train`` contains no positive samples.
    """
    metrics_history = {
        'train_loss': [], 'val_loss': [],
        'train_acc': [], 'val_acc': [],
        'train_precision': [], 'val_precision': [],
        'train_recall': [], 'val_recall': [],
        'train_f1': [], 'val_f1': [],
        'learning_rates': []
    }

    # Class weighting: scale the positive-class loss by the imbalance ratio
    num_pos = int((y_train == 1).sum())
    num_neg = int((y_train == 0).sum())
    if num_pos == 0:
        raise ValueError("y_train contains no positive samples; cannot compute pos_weight")
    # Explicit float32 so the weight matches the dtype of the model outputs
    pos_weight = torch.tensor([num_neg / num_pos], dtype=torch.float32).to(device)
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=learning_rate,
        weight_decay=0.1
    )

    # `verbose` is deprecated in recent PyTorch releases; the current learning
    # rate is printed after every epoch below instead.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer,
        mode='max',
        factor=0.5,
        patience=2
    )

    best_model_state = None
    best_val_loss = float('inf')
    patience = 4
    patience_counter = 0

    num_warmup_steps = 100
    global_step = 0

    for epoch in range(epochs):
        # Training phase
        model.train()
        train_predictions = []
        train_true_labels = []
        total_train_loss = 0

        for i, (texts, labels) in enumerate(train_loader):
            # Linear warmup. The learning rate is set by hand ONLY while warming
            # up; afterwards it is left alone so reductions made by
            # ReduceLROnPlateau are not overwritten on the next batch.
            # (A scheduler reduction that happens before warmup ends is still
            # overridden by the warmup ramp.)
            if global_step <= num_warmup_steps:
                warmup_lr = learning_rate * (global_step / num_warmup_steps)
                for param_group in optimizer.param_groups:
                    param_group['lr'] = warmup_lr
            global_step += 1

            texts = texts.to(device)
            labels = labels.float().to(device)

            optimizer.zero_grad()

            # squeeze(-1) drops only the output dimension, so logits always keep
            # the batch axis and line up with the 1-D label tensor.
            outputs = model(texts).squeeze(-1)

            loss = criterion(outputs, labels)
            loss.backward()

            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            predicted = torch.round(torch.sigmoid(outputs))
            train_predictions.extend(predicted.detach().cpu().numpy())
            train_true_labels.extend(labels.detach().cpu().numpy())

            total_train_loss += loss.item()

            if (i+1) % 100 == 0:
                # 1-based epoch number, consistent with the epoch summary below
                print(f'Epoch: {epoch+1}, Batch: {i+1}/{len(train_loader)}, Loss: {loss.item():.4f}')

        # Learning rate actually in effect at the end of this epoch
        current_lr = optimizer.param_groups[0]['lr']

        # Calculate training metrics
        train_acc = accuracy_score(train_true_labels, train_predictions)
        train_precision = precision_score(train_true_labels, train_predictions)
        train_recall = recall_score(train_true_labels, train_predictions)
        train_f1 = f1_score(train_true_labels, train_predictions)
        avg_train_loss = total_train_loss / len(train_loader)

        # Validation phase
        val_loss, val_metrics = evaluate_model(model, val_loader, criterion)

        # Store metrics
        metrics_history['train_loss'].append(avg_train_loss)
        metrics_history['val_loss'].append(val_loss)
        metrics_history['train_acc'].append(train_acc)
        metrics_history['val_acc'].append(val_metrics['accuracy'])
        metrics_history['train_precision'].append(train_precision)
        metrics_history['val_precision'].append(val_metrics['precision'])
        metrics_history['train_recall'].append(train_recall)
        metrics_history['val_recall'].append(val_metrics['recall'])
        metrics_history['train_f1'].append(train_f1)
        metrics_history['val_f1'].append(val_metrics['f1'])
        metrics_history['learning_rates'].append(current_lr)

        # Print epoch metrics
        print(f"\nEpoch {epoch+1} Results:")
        print(f"Training Loss: {avg_train_loss:.4f}")
        print(f"Validation Loss: {val_loss:.4f}")
        print(f"Training Metrics: Acc={train_acc:.4f}, Prec={train_precision:.4f}, Rec={train_recall:.4f}, F1={train_f1:.4f}")
        print(f"Validation Metrics: Acc={val_metrics['accuracy']:.4f}, Prec={val_metrics['precision']:.4f}, Rec={val_metrics['recall']:.4f}, F1={val_metrics['f1']:.4f}")
        print(f"Learning Rate: {current_lr}")

        # Early stopping check
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # Clone every tensor: state_dict().copy() is a shallow copy whose
            # tensors keep changing as training continues, which would make the
            # later restore a no-op.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }
            patience_counter = 0
            torch.save(best_model_state, 'best_mlp_model.pth')
            print("► Saved new best model")
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("\nEarly stopping triggered!")
                model.load_state_dict(best_model_state)
                break

        # The scheduler tracks validation accuracy (mode='max')
        scheduler.step(val_metrics['accuracy'])

    return metrics_history

## Step 9: Plot Training History
We plot the training history to visualize the loss, accuracy, precision, recall, and F1 score over epochs.


In [None]:
def plot_training_history(metrics_history):
    """Draw a 2x2 grid of training curves from ``metrics_history``.

    Panels, row by row: loss, accuracy, precision together with recall, and F1.
    Every panel plots the training and validation series stored under the
    corresponding ``metrics_history`` keys, then the figure is shown.
    """
    _, axes_grid = plt.subplots(2, 2, figsize=(15, 12))
    loss_ax, acc_ax, pr_ax, f1_ax = axes_grid.ravel()

    # (axis, title, y-axis label, [(history key, legend label), ...])
    panels = [
        (loss_ax, 'Loss Over Time', 'Loss', [
            ('train_loss', 'Training Loss'),
            ('val_loss', 'Validation Loss'),
        ]),
        (acc_ax, 'Accuracy Over Time', 'Accuracy', [
            ('train_acc', 'Training Accuracy'),
            ('val_acc', 'Validation Accuracy'),
        ]),
        (pr_ax, 'Precision and Recall Over Time', 'Score', [
            ('train_precision', 'Training Precision'),
            ('val_precision', 'Validation Precision'),
            ('train_recall', 'Training Recall'),
            ('val_recall', 'Validation Recall'),
        ]),
        (f1_ax, 'F1 Score Over Time', 'F1 Score', [
            ('train_f1', 'Training F1'),
            ('val_f1', 'Validation F1'),
        ]),
    ]

    for axis, title, y_label, curves in panels:
        for history_key, legend_label in curves:
            axis.plot(metrics_history[history_key], label=legend_label)
        axis.set_title(title)
        axis.set_xlabel('Epoch')
        axis.set_ylabel(y_label)
        axis.legend()

    plt.tight_layout()
    plt.show()

## Step 10: Plot the Confusion Matrix
We plot the confusion matrix to visualize the model's performance on the test set.

In [None]:
def plot_confusion_matrix(model, test_loader):
    """Plot the confusion matrix of ``model`` on ``test_loader`` and print a classification report.

    Model outputs are treated as logits: a sigmoid is applied and the result is
    rounded to obtain 0/1 predictions.
    """
    model.eval()
    predicted_classes, true_classes = [], []

    with torch.no_grad():
        for batch_texts, batch_labels in test_loader:
            logits = model(batch_texts.to(device)).squeeze(-1)
            probabilities = torch.sigmoid(logits)
            predicted_classes.extend(torch.round(probabilities).cpu().numpy())
            true_classes.extend(batch_labels.cpu().numpy())

    # Heatmap of the count matrix
    matrix = confusion_matrix(true_classes, predicted_classes)
    plt.figure(figsize=(8, 6))
    sns.heatmap(matrix, annot=True, fmt='d', cmap='Blues')
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()

    # Per-class precision / recall / F1 summary
    print("\nClassification Report:")
    report = classification_report(true_classes, predicted_classes)
    print(report)

## Step 11: Train the Model
We train the model using the training and validation datasets and monitor the loss and accuracy.
This process may take a while, depending on the number of epochs and the complexity of the model.

In [None]:
# Run training and keep the per-epoch metric history for the plots below
metrics_history = train_model(
    model,
    train_loader,
    val_loader,
    epochs=2,
    learning_rate=5e-4,
)

## Save the model

In [None]:
# Write the model's current weights (its state dict) to disk
torch.save(
    model.state_dict(),
    "MLP_model.pth",
)

## Step 12: Evaluate the Model
Plot the training history and evaluate the model on the test set.

In [None]:
# Training curves first, then the test-set confusion matrix and classification report
plot_training_history(metrics_history=metrics_history)
plot_confusion_matrix(model=model, test_loader=test_loader)

### Plot the loss over epochs

In [None]:
# Training vs. validation loss per epoch
fig = plt.figure(figsize=(10, 5))
plt.plot(metrics_history['train_loss'], label='Training Loss')
plt.plot(metrics_history['val_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Loss vs. Epochs')
plt.legend()
plt.grid()
# Save BEFORE show(): with the inline backend the figure is closed once shown,
# so a later plt.savefig() writes an empty image.
fig.savefig("loss_vs_epochs_first try.png")
plt.show()


### Plot the accuracy over epochs

In [None]:
# Training vs. validation accuracy per epoch
fig = plt.figure(figsize=(10, 5))
plt.plot(metrics_history['train_acc'], label='Training Accuracy')
plt.plot(metrics_history['val_acc'], label='Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Accuracy vs. Epochs')
plt.legend()
plt.grid()
# Save BEFORE show(): with the inline backend the figure is closed once shown,
# so a later plt.savefig() writes an empty image.
fig.savefig("accuracy_vs_epochs_first_try.png")
plt.show()

### Plot the precision, recall, and F1-score over epochs

In [None]:
# Precision, Recall and F1-Score: one figure each, built by the same recipe.
# (history key suffix, display name, output file)
metric_plots = [
    ('precision', 'Precision', "precision_vs_epochs_first_try.png"),
    ('recall', 'Recall', "recall_vs_epochs_first_try.png"),
    ('f1', 'F1-Score', "f1_score_vs_epochs_first_try.png"),
]

for metric_key, metric_name, output_file in metric_plots:
    fig = plt.figure(figsize=(10, 5))
    plt.plot(metrics_history[f'train_{metric_key}'], label=f'Training {metric_name}')
    plt.plot(metrics_history[f'val_{metric_key}'], label=f'Validation {metric_name}')
    plt.xlabel('Epochs')
    plt.ylabel(metric_name)
    plt.title(f'{metric_name} vs. Epochs')
    plt.legend()
    plt.grid()
    # Save BEFORE show(): with the inline backend the figure is closed once
    # shown, so a later plt.savefig() writes an empty image.
    fig.savefig(output_file)
    plt.show()


## Step 13: Print out the final evaluation metrics

In [None]:
# Test the model
# Collect hard 0/1 predictions for the whole test set, then score them.
model.eval()
test_predictions, test_labels = [], []
with torch.no_grad():
    for texts, labels in test_loader:
        # squeeze(-1) drops only the output dimension; a bare squeeze() would
        # turn a final batch of one sample into a 0-d tensor that cannot be
        # passed to list.extend().
        logits = model(texts.to(device)).squeeze(-1)
        predictions = torch.round(torch.sigmoid(logits))
        test_predictions.extend(predictions.cpu().numpy())
        # Labels are only needed on the host, so they never go to the device
        test_labels.extend(labels.cpu().numpy())

test_accuracy = accuracy_score(test_labels, test_predictions)
test_precision = precision_score(test_labels, test_predictions)
test_recall = recall_score(test_labels, test_predictions)
test_f1 = f1_score(test_labels, test_predictions)

print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Precision: {test_precision:.4f}")
print(f"Test Recall: {test_recall:.4f}")
print(f"Test F1 Score: {test_f1:.4f}")