### Runtime & Environment Flags

In [None]:
import os
# CUDA runtime settings; they are set here, ahead of the `import torch` below.
# - PYTORCH_CUDA_ALLOC_CONF: option string for PyTorch's CUDA caching allocator
# - CUDA_LAUNCH_BLOCKING=1: synchronous kernel launches (a debugging aid that
#   slows training; see the CUDA / PyTorch notes before keeping it for real runs)
os.environ.update({
    'PYTORCH_CUDA_ALLOC_CONF': 'max_split_size_mb:128',
    'CUDA_LAUNCH_BLOCKING': '1',
})

### Core Imports

In [None]:
import pandas as pd
import math
import numpy as np
import torch 
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import (
    RobertaTokenizer, DebertaV2Model, DebertaV2Tokenizer,
    RobertaModel, AdamW, get_linear_schedule_with_warmup
)
from sklearn.metrics import (
    accuracy_score, precision_recall_fscore_support,
    roc_curve, auc, precision_recall_curve, roc_auc_score, average_precision_score, classification_report,
    confusion_matrix, ConfusionMatrixDisplay
)
from sklearn.model_selection import StratifiedKFold, train_test_split
import matplotlib.pyplot as plt
import copy
from collections import Counter
from sklearn.utils.class_weight import compute_class_weight

### Data Loading & Basic Cleaning

In [None]:
# Load the raw dataset ('dataset.csv' is resolved against the working directory)
df = pd.read_csv('dataset.csv')

# Replace missing 'Extracted Code' entries with empty strings.
# The result is assigned back to the column instead of calling
# fillna(inplace=True) on the column selection: the in-place form operates on
# an intermediate object, emits a FutureWarning on pandas 2.x and leaves `df`
# unchanged once Copy-on-Write is enabled.
df['Extracted Code'] = df['Extracted Code'].fillna('')

### Custom PyTorch Dataset (Text + Code)

In [None]:
class TextCodeDataset(Dataset):
    """Paired text/code samples that are tokenized lazily, one item at a time.

    Every item is encoded twice -- the text with ``tokenizer_text`` and the code
    with ``tokenizer_code`` -- padded/truncated to ``max_length`` and returned
    together with its label.
    """

    def __init__(self, texts, codes, labels, tokenizer_text, tokenizer_code, max_length=512):
        """Store the raw samples and the two tokenizers.

        Args:
            texts: Indexable sequence of text samples.
            codes: Indexable sequence of code samples, aligned with ``texts``.
            labels: Indexable sequence of labels, aligned with ``texts``;
                each label is converted to a ``torch.long`` tensor on access.
            tokenizer_text: Tokenizer callable used for the text samples.
            tokenizer_code: Tokenizer callable used for the code samples.
            max_length: Length every sequence is padded/truncated to.
        """
        self.texts = texts
        self.codes = codes
        self.labels = labels
        self.tokenizer_text = tokenizer_text
        self.tokenizer_code = tokenizer_code
        self.max_length = max_length

    def __len__(self):
        """Return the number of samples (taken from ``texts``)."""
        return len(self.texts)

    def _encode(self, tokenizer, raw):
        """Tokenize one sample and return ``(input_ids, attention_mask)``."""
        encoded = tokenizer(
            str(raw),  # non-string entries are stringified before tokenization
            padding='max_length', truncation=True,
            max_length=self.max_length, return_tensors="pt"
        )
        # squeeze(0) removes only the leading batch dimension. A bare squeeze()
        # would also collapse the sequence dimension when max_length == 1 and
        # hand 0-d tensors to the DataLoader.
        return encoded['input_ids'].squeeze(0), encoded['attention_mask'].squeeze(0)

    def __getitem__(self, idx):
        """Return the tokenized text, tokenized code and label for sample ``idx``.

        Returns:
            dict with keys ``input_ids_text``, ``attention_mask_text``,
            ``input_ids_code``, ``attention_mask_code`` and ``labels``.
        """
        label = self.labels[idx]
        input_ids_text, attention_mask_text = self._encode(self.tokenizer_text, self.texts[idx])
        input_ids_code, attention_mask_code = self._encode(self.tokenizer_code, self.codes[idx])

        return {
            'input_ids_text': input_ids_text,
            'attention_mask_text': attention_mask_text,
            'input_ids_code': input_ids_code,
            'attention_mask_code': attention_mask_code,
            'labels': torch.tensor(label, dtype=torch.long)
        }


### Dual-Encoder Model (DeBERTa-v3 for text + CodeBERT for code)

In [None]:
class CombinedModel(nn.Module):
    """Two-encoder classifier: DeBERTa-v3 for the text, CodeBERT for the code.

    The first-token embedding of each encoder is taken, the code embedding is
    projected to the text encoder's width, and the concatenation is fed both
    through an MLP and through a linear shortcut. The two 512-d results are
    concatenated and mapped to two logits.
    """

    def __init__(self):
        super().__init__()

        # Pretrained backbones, one per modality
        self.text_model = DebertaV2Model.from_pretrained('microsoft/deberta-v3-base')
        self.code_model = RobertaModel.from_pretrained('microsoft/codebert-base')

        text_dim = self.text_model.config.hidden_size
        code_dim = self.code_model.config.hidden_size

        # Projects the code embedding onto the text encoder's hidden width
        self.adjust_code_hidden_size = nn.Linear(code_dim, text_dim)

        # Width of [text ; projected code]
        fused_dim = 2 * text_dim

        # Light dropout applied to the fused embedding
        self.dropout = nn.Dropout(0.1)

        # Deep path: fused_dim -> 1024 -> 512 -> 512
        self.classifier = nn.Sequential(
            nn.Linear(fused_dim, 1024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 512)
        )

        # Shortcut path: a single linear map fused_dim -> 512
        self.skip_linear = nn.Linear(fused_dim, 512)

        # Output head over [deep path (512) ; shortcut (512)]
        self.final_classifier = nn.Linear(1024, 2)

    def forward(self, input_ids_text, attention_mask_text, input_ids_code, attention_mask_code=None):
        """Return the two-class logits for a batch of (text, code) pairs."""
        text_hidden = self.text_model(
            input_ids=input_ids_text, attention_mask=attention_mask_text
        ).last_hidden_state
        code_hidden = self.code_model(
            input_ids=input_ids_code, attention_mask=attention_mask_code
        ).last_hidden_state

        # Position 0 of each sequence serves as the sequence-level embedding
        text_vec = text_hidden[:, 0]
        code_vec = self.adjust_code_hidden_size(code_hidden[:, 0])

        fused = self.dropout(torch.cat((text_vec, code_vec), dim=1))

        deep_features = self.classifier(fused)
        shortcut_features = self.skip_linear(fused)

        return self.final_classifier(torch.cat((deep_features, shortcut_features), dim=1))


### Tokenizer Initialization

In [None]:
# CodeBERT tokenizer -- paired with the code encoder
tokenizer_codebert = RobertaTokenizer.from_pretrained('microsoft/codebert-base')

# DeBERTa-v3 tokenizer -- paired with the text encoder
tokenizer_deberta = DebertaV2Tokenizer.from_pretrained('microsoft/deberta-v3-base')


### Training & Evaluation Utilities

In [None]:
def train(model, train_loader, optimizer, scheduler, class_weights):
    """Run one training epoch.

    Args:
        model: Module called as ``model(**inputs)`` that returns class logits.
        train_loader: Iterable of batches (dicts of tensors). Every key except
            ``'labels'`` is forwarded to the model as a keyword argument.
        optimizer: Optimizer stepped once per batch.
        scheduler: Learning-rate scheduler stepped once per batch.
        class_weights: Per-class weight tensor for the cross-entropy loss,
            or ``None`` for an unweighted loss.

    Returns:
        ``(average_loss, train_accuracy)``: the mean of the per-batch losses
        and the fraction of correctly predicted samples over the epoch.
    """
    model.train()

    # Follow the device the model lives on instead of relying on a
    # module-level `device` global being defined and in sync with the model.
    run_device = next(model.parameters()).device
    if class_weights is not None:
        class_weights = class_weights.to(run_device)

    # Weighted loss to counter class imbalance
    loss_fn = nn.CrossEntropyLoss(weight=class_weights)

    total_loss = 0
    correct_predictions = 0
    total_predictions = 0

    for batch_idx, batch in enumerate(train_loader):
        optimizer.zero_grad()

        inputs = {k: v.to(run_device) for k, v in batch.items() if k != 'labels'}
        labels = batch['labels'].to(run_device)

        outputs = model(**inputs)
        loss = loss_fn(outputs, labels)

        # Accuracy is measured on the pre-update forward pass
        preds = outputs.argmax(dim=1)
        correct_predictions += torch.sum(preds == labels).item()
        total_predictions += labels.size(0)

        loss.backward()
        total_loss += loss.item()

        optimizer.step()
        scheduler.step()

        # Progress line every 10 batches
        if (batch_idx + 1) % 10 == 0:
            print(f'Batch {batch_idx + 1}/{len(train_loader)} - Training Loss: {loss.item():.4f}')

    average_loss = total_loss / len(train_loader)
    train_accuracy = correct_predictions / total_predictions

    return average_loss, train_accuracy


def evaluate(model, val_loader):
    """Evaluate the model on a validation loader.

    Args:
        model: Module called as ``model(**inputs)`` that returns class logits.
        val_loader: Iterable of batches (dicts of tensors). Every key except
            ``'labels'`` is forwarded to the model as a keyword argument.

    Returns:
        ``(average_loss, accuracy, precision, recall, f1, predictions,
        true_labels)``. The loss is the mean of the per-batch unweighted
        cross-entropy losses; ``predictions`` holds the argmax class indices
        (hard labels, not probabilities) and ``true_labels`` the matching
        ground truth, both in the order the loader produced them.
    """
    # eval() switches layers such as dropout to inference behaviour;
    # gradient tracking is disabled separately by torch.no_grad() below.
    model.eval()

    # Follow the device the model lives on instead of relying on a
    # module-level `device` global being defined and in sync with the model.
    run_device = next(model.parameters()).device

    # Built once: the loss module is identical for every batch
    loss_fn = nn.CrossEntropyLoss()

    total_loss = 0
    predictions, true_labels = [], []

    with torch.no_grad():
        for batch_idx, batch in enumerate(val_loader):
            inputs = {k: v.to(run_device) for k, v in batch.items() if k != 'labels'}
            labels = batch['labels'].to(run_device)

            outputs = model(**inputs)

            loss = loss_fn(outputs, labels)
            total_loss += loss.item()

            preds = torch.argmax(outputs, dim=1).cpu().numpy()

            predictions.extend(preds)
            true_labels.extend(labels.cpu().numpy())

            # Progress line every 10 batches
            if (batch_idx + 1) % 10 == 0:
                print(f'Batch {batch_idx + 1}/{len(val_loader)} - Validation Loss: {loss.item():.4f}')

    accuracy = accuracy_score(true_labels, predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(
        true_labels, predictions, average='binary', zero_division=0
    )

    return total_loss / len(val_loader), accuracy, precision, recall, f1, predictions, true_labels


### Plotting Helpers (Loss, Accuracy, ROC, PR)

In [None]:
def plot_training_validation_loss(train_losses, val_losses):
    """Plot per-epoch training and validation loss on one figure.

    With two or more epochs the series are drawn as lines; otherwise they are
    drawn as scatter points and the x-axis is narrowed around epoch 1.
    """
    epoch_axis = range(1, len(train_losses) + 1)
    multi_epoch = len(train_losses) > 1

    plt.figure(figsize=(10, 6))

    # Lines need at least two points; fall back to markers otherwise
    draw = plt.plot if multi_epoch else plt.scatter
    draw(epoch_axis, train_losses, label='Training Loss')
    draw(epoch_axis, val_losses, label='Validation Loss')
    if not multi_epoch:
        plt.xlim(0.5, 1.5)

    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.xticks(epoch_axis)
    plt.legend()
    plt.show()


def plot_training_validation_accuracy(train_accuracies, val_accuracies):
    """Plot per-epoch training and validation accuracy on one figure.

    With two or more epochs the series are drawn as lines; otherwise they are
    drawn as scatter points and the x-axis is narrowed around epoch 1.
    """
    epoch_axis = range(1, len(train_accuracies) + 1)
    multi_epoch = len(train_accuracies) > 1

    plt.figure(figsize=(10, 6))

    # Lines need at least two points; fall back to markers otherwise
    draw = plt.plot if multi_epoch else plt.scatter
    draw(epoch_axis, train_accuracies, label='Training Accuracy')
    draw(epoch_axis, val_accuracies, label='Validation Accuracy')
    if not multi_epoch:
        plt.xlim(0.5, 1.5)

    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.xticks(epoch_axis)
    plt.legend()
    plt.show()


def plot_auc_roc_curve(true_labels, predictions):
    """Draw the ROC curve for ``predictions`` against ``true_labels``.

    The area under the curve is shown in the legend and a dashed diagonal is
    drawn as the chance-level reference.
    """
    false_positive_rate, true_positive_rate, _thresholds = roc_curve(true_labels, predictions)
    roc_auc = auc(false_positive_rate, true_positive_rate)

    plt.figure(figsize=(10, 6))

    # Curve first, then the diagonal reference line
    plt.plot(false_positive_rate, true_positive_rate, lw=2, label=f'ROC (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], lw=2, linestyle='--')

    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic')
    plt.legend(loc="lower right")
    plt.show()


def plot_precision_recall_curve(true_labels, predictions):
    """Draw the precision-recall curve for ``predictions`` against ``true_labels``.

    The area under the curve (integrated over recall) is shown in the legend.
    """
    precision_values, recall_values, _thresholds = precision_recall_curve(true_labels, predictions)
    pr_auc = auc(recall_values, precision_values)

    plt.figure(figsize=(10, 6))
    plt.plot(recall_values, precision_values, lw=2, label=f'PR (AUC = {pr_auc:.2f})')

    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.legend(loc="upper right")
    plt.show()


### Saving Model & Tokenizers

In [None]:
def save_model_and_tokenizer(model, tokenizer_text, tokenizer_code, path='.'):
    """Persist the model weights and both tokenizers under ``path``.

    Writes ``model_state_dict.pt`` (the model's ``state_dict``) and the
    sub-directories ``tokenizer_text`` and ``tokenizer_code`` (via each
    tokenizer's ``save_pretrained``).

    Args:
        model: Module whose ``state_dict()`` is saved.
        tokenizer_text: Object exposing ``save_pretrained(directory)``.
        tokenizer_code: Object exposing ``save_pretrained(directory)``.
        path: Target directory; created (including parents) when missing.
    """
    # exist_ok=True replaces the check-then-create pair, which could fail if
    # the directory appeared between the check and the makedirs call.
    os.makedirs(path, exist_ok=True)

    torch.save(model.state_dict(), os.path.join(path, 'model_state_dict.pt'))

    tokenizer_text.save_pretrained(os.path.join(path, 'tokenizer_text'))
    tokenizer_code.save_pretrained(os.path.join(path, 'tokenizer_code'))

    print(f'Model state dictionary and tokenizers saved to {path}')


### Hyperparameters & Device Selection

In [None]:
# Optimisation schedule
LEARNING_RATE = 2e-5
EPOCHS = 30
EARLY_STOPPING_PATIENCE = 7  # epochs without improvement before a fold stops

# Batching / tokenization
BATCH_SIZE = 16
MAX_LENGTH = 512

# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

### K-Fold Cross-Validation (Model Selection + Plots)

In [None]:
def cross_validate(dataset, class_weights, k_folds=10, random_state=None):
    """Train one model per stratified fold, keep the best snapshot and save it.

    For every fold a fresh ``CombinedModel`` is trained for up to ``EPOCHS``
    epochs with early stopping (``EARLY_STOPPING_PATIENCE``). Within a fold the
    best epoch is the one with the lowest validation loss (ties broken by a
    higher AUC); the same rule selects the best fold overall. Loss/accuracy,
    ROC and PR plots are shown per fold and for the overall best model.

    Relies on the module-level ``device``, ``EPOCHS``, ``BATCH_SIZE``,
    ``LEARNING_RATE``, ``EARLY_STOPPING_PATIENCE``, ``tokenizer_deberta`` and
    ``tokenizer_codebert``.

    Side effects:
        Writes ``deberta_codebert.pt`` (best state dict) and the directory
        ``sm_deberta_codebert/deberta_codebert`` (state dict + tokenizers).

    Args:
        dataset: Dataset exposing ``labels`` (used for stratification).
        class_weights: Class-weight tensor forwarded to ``train``.
        k_folds: Number of stratified folds.
        random_state: Optional seed for the fold shuffling. ``None`` (the
            default) keeps the splits non-deterministic, as before.

    Returns:
        ``(train_accuracy, val_accuracy, precision, recall, f1)`` of the
        overall best snapshot.

    Raises:
        RuntimeError: If no fold produced a snapshot (the validation loss
            never improved, e.g. because it was NaN in every epoch).
    """
    # Stratified K-Fold to preserve label distribution across folds
    skf = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=random_state)

    # Track global best model metrics across all folds
    best_auc = 0
    best_model_info = {}

    # Curves / predictions of the overall best model
    fold_train_losses = []
    fold_val_losses = []
    fold_train_accuracies = []
    fold_val_accuracies = []
    fold_true_labels = []
    fold_predictions = []

    # Split indices for each fold using labels for stratification
    for fold, (train_idx, val_idx) in enumerate(skf.split(np.zeros(len(dataset)), dataset.labels)):
        print(f'Fold {fold+1}/{k_folds}')

        # Initialize a new model and optimizer per fold
        model = CombinedModel().to(device)
        optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)

        # Samplers for train/validation subsets
        train_subsampler = torch.utils.data.SubsetRandomSampler(train_idx)
        val_subsampler = torch.utils.data.SubsetRandomSampler(val_idx)

        # DataLoaders using the samplers
        train_loader = DataLoader(dataset, batch_size=BATCH_SIZE, sampler=train_subsampler)
        val_loader = DataLoader(dataset, batch_size=BATCH_SIZE, sampler=val_subsampler)

        # Free unused GPU memory and set up LR scheduler
        torch.cuda.empty_cache()
        scheduler = get_linear_schedule_with_warmup(
            optimizer, num_warmup_steps=0, num_training_steps=len(train_loader) * EPOCHS
        )

        # Per-epoch logs for this fold
        epoch_train_losses = []
        epoch_val_losses = []
        epoch_train_accuracies = []
        epoch_val_accuracies = []

        # Best trackers within the fold (for early stopping & best snapshot)
        best_val_loss = float('inf')
        early_stopping_counter = 0
        best_fold_metrics = {
            'epoch': 0, 'val_accuracy': 0, 'precision': 0, 'recall': 0, 'f1': 0,
            'train_loss': 0, 'train_accuracy': 0, 'val_loss': 0, 'auc_score': 0
        }
        best_true_labels = []
        best_predictions = []
        # Reset per fold so a fold that never improves cannot silently reuse
        # the previous fold's weights.
        best_model_wts = None

        torch.cuda.empty_cache()

        # Train/validate over epochs for this fold
        for epoch in range(EPOCHS):
            # One epoch of training and evaluation
            train_loss, train_accuracy = train(model, train_loader, optimizer, scheduler, class_weights)
            val_loss, val_accuracy, val_precision, val_recall, val_f1, predictions, true_labels = evaluate(model, val_loader)

            torch.cuda.empty_cache()

            # Log epoch metrics
            epoch_train_losses.append(train_loss)
            epoch_val_losses.append(val_loss)
            epoch_train_accuracies.append(train_accuracy)
            epoch_val_accuracies.append(val_accuracy)

            # NOTE(review): `predictions` are the hard argmax labels returned
            # by evaluate(), so this AUC (and the ROC/PR plots below) are
            # computed from 0/1 predictions rather than class probabilities.
            # Proper ranking metrics need evaluate() to expose scores.
            epoch_auc_score = roc_auc_score(true_labels, predictions)

            # Console summary for this epoch
            print(
                f'Fold {fold+1}, Epoch {epoch+1}, '
                f'Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, '
                f'Train Acc: {train_accuracy:.4f}, Val Acc: {val_accuracy:.4f}, '
                f'Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, F1: {val_f1:.4f}, '
                f'AUC: {epoch_auc_score:.4f}'
            )

            torch.cuda.empty_cache()

            # Improvement check: prioritize lower val loss; tie-break with higher AUC
            improved = (val_loss < best_val_loss) or (val_loss <= best_val_loss and epoch_auc_score > best_fold_metrics['auc_score'])
            if improved:
                # Update best fold snapshot
                best_val_loss = min(val_loss, best_val_loss)
                best_fold_metrics = {
                    'epoch': epoch + 1,
                    'val_accuracy': val_accuracy,
                    'precision': val_precision,
                    'recall': val_recall,
                    'f1': val_f1,
                    'train_loss': train_loss,
                    'train_accuracy': train_accuracy,
                    'val_loss': val_loss,
                    'auc_score': epoch_auc_score
                }
                # Keep the snapshot on the CPU: a deepcopy of the state dict
                # would hold extra full copies of the weights in GPU memory
                # (one per fold plus the global best).
                best_model_wts = {
                    name: tensor.detach().to('cpu', copy=True)
                    for name, tensor in model.state_dict().items()
                }
                best_true_labels = true_labels
                best_predictions = predictions
                early_stopping_counter = 0
            else:
                # Increment early stopping counter and break if patience exceeded
                early_stopping_counter += 1
                if early_stopping_counter >= EARLY_STOPPING_PATIENCE:
                    print(f'Early stopping at epoch {epoch+1}')
                    break

        # Plot full-epoch curves for this fold
        print("Training-Validation Loss Curve for all epochs (this fold)")
        plot_training_validation_loss(epoch_train_losses, epoch_val_losses)

        print("Training-Validation Accuracy Curve for all epochs (this fold)")
        plot_training_validation_accuracy(epoch_train_accuracies, epoch_val_accuracies)

        if best_model_wts is None:
            # No epoch counted as an improvement (validation loss was never
            # finite/lower), so there is no snapshot to plot or to select.
            print(f'Fold {fold+1}: validation loss never improved; fold skipped for model selection.')
            torch.cuda.empty_cache()
            continue

        # Plot curves truncated up to the best epoch for clarity
        best_epoch = best_fold_metrics['epoch']
        print(f"Curves up to best epoch ({best_epoch})")
        plot_training_validation_loss(epoch_train_losses[:best_epoch], epoch_val_losses[:best_epoch])
        plot_training_validation_accuracy(epoch_train_accuracies[:best_epoch], epoch_val_accuracies[:best_epoch])

        # Plot ROC and PR curves for the best snapshot in this fold
        print("AUC-ROC curve for the fold-wise best model")
        plot_auc_roc_curve(best_true_labels, best_predictions)

        print("Precision-Recall curve for the fold-wise best model")
        plot_precision_recall_curve(best_true_labels, best_predictions)

        # Fold summary of best metrics
        print(
            f"Fold {fold+1} — Best Epoch: {best_fold_metrics['epoch']}, "
            f"AUC: {best_fold_metrics['auc_score']:.4f}, "
            f"Train Loss: {best_fold_metrics['train_loss']:.4f}, Val Loss: {best_fold_metrics['val_loss']:.4f}, "
            f"Train Acc: {best_fold_metrics['train_accuracy']:.4f}, Val Acc: {best_fold_metrics['val_accuracy']:.4f}, "
            f"Precision: {best_fold_metrics['precision']:.4f}, Recall: {best_fold_metrics['recall']:.4f}, "
            f"F1: {best_fold_metrics['f1']:.4f}"
        )

        torch.cuda.empty_cache()

        # Update global best model across folds:
        # Prefer lower val loss; tie-break on higher AUC
        previous_loss = best_model_info.get('val_loss', float('inf'))
        previous_auc = best_model_info.get('auc', 0)
        if (best_fold_metrics['val_loss'] < previous_loss) or \
           (best_fold_metrics['val_loss'] <= previous_loss and
            best_fold_metrics['auc_score'] > previous_auc):
            best_auc = best_fold_metrics['auc_score']
            best_model_info = {
                'state_dict': best_model_wts,
                'fold': fold,
                'epoch': best_fold_metrics['epoch'],
                'auc': best_auc,
                'val_loss': best_fold_metrics['val_loss'],
                'performance': (
                    best_fold_metrics['train_accuracy'],
                    best_fold_metrics['val_accuracy'],
                    best_fold_metrics['precision'],
                    best_fold_metrics['recall'],
                    best_fold_metrics['f1']
                ),
                'train_losses': epoch_train_losses,
                'val_losses': epoch_val_losses,
                'train_accuracies': epoch_train_accuracies,
                'val_accuracies': epoch_val_accuracies
            }
            # Store data for final overall plots
            fold_train_losses = epoch_train_losses
            fold_val_losses = epoch_val_losses
            fold_train_accuracies = epoch_train_accuracies
            fold_val_accuracies = epoch_val_accuracies
            fold_true_labels = best_true_labels
            fold_predictions = best_predictions

    # Final cleanup before overall plots
    torch.cuda.empty_cache()

    if not best_model_info:
        # Fail with a clear message instead of a KeyError/NameError below
        raise RuntimeError(
            'cross_validate: no fold produced a usable model '
            '(the validation loss never improved in any fold).'
        )

    # Plot curves for the overall best model across folds (all epochs)
    print("Train-Val Loss Curve for all epochs — Overall Best Model")
    plot_training_validation_loss(fold_train_losses, fold_val_losses)

    print("Train-Val Accuracy Curve for all epochs — Overall Best Model")
    plot_training_validation_accuracy(fold_train_accuracies, fold_val_accuracies)

    # Plot curves truncated to the best epoch for the overall best model
    print("Train-Val Loss curve up to best epoch — Overall Best Model")
    plot_training_validation_loss(fold_train_losses[:best_model_info['epoch']], fold_val_losses[:best_model_info['epoch']])

    print("Train-Val Accuracy curve up to best epoch — Overall Best Model")
    plot_training_validation_accuracy(fold_train_accuracies[:best_model_info['epoch']], fold_val_accuracies[:best_model_info['epoch']])

    # Plot PR and ROC for the overall best model
    print("Precision-Recall curve — Overall Best Model")
    plot_precision_recall_curve(fold_true_labels, fold_predictions)

    print("AUC-ROC curve — Overall Best Model")
    plot_auc_roc_curve(fold_true_labels, fold_predictions)

    # Save & package best model: persist the state dict, then load it into the
    # last fold's model instance (straight from memory) so the full artifact
    # bundle (weights + tokenizers) can be written.
    torch.save(best_model_info['state_dict'], 'deberta_codebert.pt')
    model.load_state_dict(best_model_info['state_dict'])
    save_model_and_tokenizer(model, tokenizer_deberta, tokenizer_codebert, 'sm_deberta_codebert/deberta_codebert')

    torch.cuda.empty_cache()

    # Final summary of which fold/epoch produced the best model
    print(f"Best model from fold {best_model_info['fold']+1} with AUC: {best_auc:.4f} at epoch {best_model_info['epoch']} saved.")
    return best_model_info['performance']


### Train/Test Split, Class Weights & Dataset

In [None]:
# Hold out 10% of the rows as a test set.
# The split is stratified on 'TDR' and seeded so it can be reproduced.
train_df, test_df = train_test_split(
    df,
    test_size=0.1,
    stratify=df['TDR'],
    random_state=42,
)

# Training labels drive the class weighting
labels = train_df['TDR'].tolist()

# 'balanced' weights are inversely proportional to the class frequencies
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(labels),
    y=labels,
)

# Weight tensor placed on the training device
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float, device=device)

# Persist the held-out rows so the test cells can reload exactly this split
test_df.to_csv('test_deberta_codebert.csv', index=False)

# Training dataset: 'TB' -> text, 'Extracted Code' -> code, 'TDR' -> label
dataset = TextCodeDataset(
    texts=train_df['TB'].tolist(),
    codes=train_df['Extracted Code'].tolist(),
    labels=train_df['TDR'].tolist(),
    tokenizer_text=tokenizer_deberta,
    tokenizer_code=tokenizer_codebert,
    max_length=MAX_LENGTH,
)


### Run Cross-Validation & Final Report

In [None]:
# Run the k-fold procedure; the returned tuple describes the overall best
# snapshot as (train acc, val acc, precision, recall, F1)
fold_performance = cross_validate(dataset, class_weights_tensor)

# Assemble the one-line report first, then print it
summary = (
    f'Overall Performance: '
    f'Train Acc: {fold_performance[0]:.4f}, '
    f'Val Acc: {fold_performance[1]:.4f}, '
    f'Precision: {fold_performance[2]:.4f}, '
    f'Recall: {fold_performance[3]:.4f}, '
    f'F1: {fold_performance[4]:.4f}'
)
print(summary)


### Load Trained Weights

In [None]:
# Build a fresh DeBERTa + CodeBERT model on the selected device
model = CombinedModel().to(device)

# Restore the weights written by cross_validate ('deberta_codebert.pt').
# map_location=device remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU machine can also be loaded on a CPU-only one.
model.load_state_dict(torch.load('deberta_codebert.pt', map_location=device))

# Switch to evaluation mode (dropout layers become inactive). This does not
# disable gradient tracking; the inference code uses torch.no_grad() for that.
model.eval()

# Confirm successful model loading
print("Model loaded successfully and set to evaluation mode.")


### Prepare the Test Dataset & DataLoader

In [None]:
# Load the held-out test split from the CSV file saved earlier
test_df = pd.read_csv('test_deberta_codebert.csv')

# The CSV round-trip undoes the earlier NaN -> '' cleaning: read_csv parses
# empty fields as NaN by default, and the dataset would then tokenize the
# literal string 'nan' for samples without code, unlike during training.
# Re-apply the same fill so test inputs match the training inputs.
test_df['Extracted Code'] = test_df['Extracted Code'].fillna('')

# Build a dataset for the test data
# - 'TB' column contains the natural language text
# - 'Extracted Code' column contains the source code snippets
# - 'TDR' column provides the ground truth labels
test_dataset = TextCodeDataset(
    test_df['TB'].tolist(),
    test_df['Extracted Code'].tolist(),
    test_df['TDR'].tolist(),
    tokenizer_deberta,
    tokenizer_codebert,
    MAX_LENGTH
)

# Batched, order-preserving loader so predictions line up with test_df rows
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Confirm successful preparation of the test dataset
print(f"Test set prepared with {len(test_dataset)} samples.")


### Run Inference on the Test Set

In [None]:
# Accumulators: ground truth, hard predictions, positive-class probabilities
all_labels, all_preds, all_scores = [], [], []

# Row-wise softmax turns the two logits into class probabilities
softmax = nn.Softmax(dim=1)

# No gradients are needed for inference
with torch.no_grad():
    for batch in test_loader:
        labels = batch['labels'].to(device)
        # Everything except the labels is a model input
        inputs = {name: tensor.to(device) for name, tensor in batch.items() if name != 'labels'}

        probs = softmax(model(**inputs))

        all_labels.extend(labels.cpu().numpy())
        # Predicted class = most probable class
        all_preds.extend(torch.argmax(probs, dim=1).cpu().numpy())
        # Column 1 holds the positive-class probability
        all_scores.extend(probs[:, 1].cpu().numpy())

# Signal the end of the inference pass
print("Inference complete.")


### Evaluate Model Performance

In [None]:
# Metrics based on the hard predictions
accuracy = accuracy_score(all_labels, all_preds)
# zero_division=0 reports 0 instead of warning when a denominator is empty
precision, recall, f1, _ = precision_recall_fscore_support(
    all_labels, all_preds, average='binary', zero_division=0
)

# Metrics based on the positive-class probabilities
roc_auc = roc_auc_score(all_labels, all_scores)
ap_score = average_precision_score(all_labels, all_scores)

# One summary line per metric
print("\n".join([
    f"Accuracy: {accuracy:.4f}",
    f"Precision: {precision:.4f}",
    f"Recall: {recall:.4f}",
    f"F1 Score: {f1:.4f}",
    f"ROC AUC: {roc_auc:.4f}",
    f"Average Precision (PR AUC): {ap_score:.4f}",
]))

# Per-class breakdown
print("\nClassification Report:")
print(classification_report(all_labels, all_preds, zero_division=0))


### Visualize Confusion Matrix

In [None]:
# Counts of true vs. predicted labels on the test set
conf_matrix = confusion_matrix(all_labels, all_preds)

# Render the matrix with a blue colour map, then title and show the figure
disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix)
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix — Test Set")
plt.show()


### Save Predictions

In [None]:
# Attach the predicted label and the positive-class probability to each test row
for column, values in (('pred_label', all_preds), ('pred_score', all_scores)):
    test_df[column] = values

# Write the augmented test table to disk
test_df.to_csv('test_predictions.csv', index=False)

# Report where the results went
print("Predictions saved to 'test_predictions.csv'.")