In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, random_split
import numpy as np
import random
import matplotlib.pyplot as plt
from datetime import datetime
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_auc_score, average_precision_score, matthews_corrcoef, balanced_accuracy_score
import pandas as pd
import seaborn as sns

# Set random seeds for reproducibility
def set_seed(seed=42):
    """Seed every random number generator this script relies on.

    Applies the same ``seed`` to PyTorch (CPU and all CUDA devices), NumPy
    and Python's ``random`` module, then switches cuDNN to deterministic
    mode with benchmarking turned off.

    Args:
        seed: Integer seed applied to every generator. Defaults to 42.
    """
    seeders = (
        torch.manual_seed,
        torch.cuda.manual_seed_all,
        np.random.seed,
        random.seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Seed all RNGs (default seed 42) before any model or data-split code runs.
set_seed()

# Define the device for computation: CUDA when available, otherwise CPU.
# `device` is read as a module-level global by the evaluation code below.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Mount Google Drive to save and load the model (if using Google Colab).
# Only ImportError is handled: when the google.colab package is missing the
# script falls back to the current working directory. Any error raised by
# drive.mount() itself is not caught here.
try:
    from google.colab import drive
    drive.mount('/content/drive')
    base_path = '/content/drive/My Drive/FYP'
except ImportError:
    base_path = '.'  # Use current directory if not in Colab

# Define the folder to save model checkpoints.
# date_str is computed once, at this point, and reused in every output file
# name, so all artefacts of one run share the same YYYYMMDD stamp.
date_str = datetime.now().strftime('%Y%m%d')
checkpoint_folder = f'{base_path}/VGGModel/HQ3latest_{date_str}/'
# exist_ok=True: re-running on the same day reuses the existing folder.
os.makedirs(checkpoint_folder, exist_ok=True)

# Preprocessing constants shared by the training and evaluation pipelines.
_INPUT_SIZE = (224, 224)  # spatial size every image is resized to
_NORMALIZE_MEAN = [0.485, 0.456, 0.406]  # per-channel mean passed to Normalize
_NORMALIZE_STD = [0.229, 0.224, 0.225]  # per-channel std passed to Normalize

# Training pipeline: resize, apply random augmentations, then convert to a
# tensor and normalise.
_train_steps = [
    transforms.Resize(_INPUT_SIZE),
    transforms.RandomHorizontalFlip(p=0.5),  # left/right flip
    transforms.RandomVerticalFlip(p=0.3),  # up/down flip
    transforms.RandomRotation(degrees=10),  # rotation of up to 10 degrees
    transforms.RandomAffine(degrees=0, translate=(0.05, 0.05)),  # small shifts, no extra rotation
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),  # mild colour changes
    transforms.ToTensor(),
    transforms.Normalize(_NORMALIZE_MEAN, _NORMALIZE_STD),
]
transform_train = transforms.Compose(_train_steps)

# Validation/test pipeline: the same resize and normalisation, with no
# random augmentation.
_eval_steps = [
    transforms.Resize(_INPUT_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(_NORMALIZE_MEAN, _NORMALIZE_STD),
]
transform_val_test = transforms.Compose(_eval_steps)

# Load datasets
data_path = f'{base_path}/Dataset/HQ3/train'
test_data_path = f'{base_path}/Dataset/HQ3/test'

# The training folder is opened twice, once per transform. Both subsets
# returned by random_split wrap the SAME underlying dataset object, so
# changing `subset.dataset.transform` for validation would also switch off
# augmentation for the training subset. Two independent ImageFolder views
# avoid that; they enumerate the same directory, so an index refers to the
# same image in both (assuming the folder is not modified in between).
_train_view = datasets.ImageFolder(data_path, transform=transform_train)
_val_view = datasets.ImageFolder(data_path, transform=transform_val_test)
test_dataset = datasets.ImageFolder(test_data_path, transform=transform_val_test)

# Split the training folder into 70% training and 30% validation
train_size = int(0.7 * len(_train_view))
val_size = len(_train_view) - train_size
train_dataset, _val_split = random_split(_train_view, [train_size, val_size])

# Re-point the validation indices at the non-augmented view of the data.
val_dataset = torch.utils.data.Subset(_val_view, _val_split.indices)

# Create data loaders (only the training loader is shuffled)
batch_size = 16
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)

# Use Pre-trained VGG-16 model and modify it for binary classification
class VGG16Modified(nn.Module):
    """Pre-trained VGG-16 whose last classifier entry is replaced by a new head.

    The module at ``classifier[6]`` of the torchvision VGG-16 is swapped for
    ``Dropout -> Linear(num_ftrs, num_classes)``, where ``num_ftrs`` is the
    ``in_features`` of the layer being replaced. Every other layer keeps its
    ImageNet (``IMAGENET1K_V1``) weights.

    Args:
        num_classes: Number of output logits. Defaults to 2.
        dropout_rate: Dropout probability applied before the new linear
            layer. Defaults to 0.4.
    """

    def __init__(self, num_classes=2, dropout_rate=0.4):
        super().__init__()
        from torchvision.models import vgg16, VGG16_Weights

        # Build the backbone first; it is attached as a submodule at the end.
        backbone = vgg16(weights=VGG16_Weights.IMAGENET1K_V1)

        # Swap the classifier entry at index 6 for the custom head, keeping
        # the input width of the layer it replaces.
        head_slot = 6
        head_inputs = backbone.classifier[head_slot].in_features
        new_head = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(head_inputs, num_classes),
        )
        backbone.classifier[head_slot] = new_head

        self.vgg = backbone

    def forward(self, x):
        """Return the output of the wrapped VGG network for input batch ``x``."""
        return self.vgg(x)

# Initialize the VGG model with its default arguments (num_classes=2,
# dropout_rate=0.4) and move it to the selected device.
model = VGG16Modified().to(device)

# Define loss function and optimizer with L2 regularization (weight decay).
# All model parameters are passed to the optimizer, so the pre-trained
# backbone is fine-tuned together with the new head.
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.0001, weight_decay=0.001)
# mode='max' because train() steps this scheduler with validation accuracy;
# the learning rate is multiplied by 0.1 once the metric has stopped improving
# for more than `patience` (3) epochs.
# NOTE(review): `verbose` is reported as deprecated in recent PyTorch
# releases -- confirm against the installed version before upgrading.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=3, verbose=True)

def calculate_metrics(y_true, y_pred, y_scores):
    """Calculate binary classification metrics.

    Args:
        y_true: Ground-truth class labels, one per sample.
        y_pred: Predicted class labels, one per sample.
        y_scores: Score or probability of the positive class per sample,
            used for the ranking metrics (ROC-AUC and PR-AUC).

    Returns:
        dict with the keys ``accuracy`` (a percentage, 0-100),
        ``balanced_accuracy``, ``precision``, ``recall``, ``f1``, ``mcc``,
        ``roc_auc`` and ``pr_auc``. Everything except ``accuracy`` is
        returned exactly as scikit-learn computes it.
        ``roc_auc`` is ``nan`` when ``y_true`` contains fewer than two
        distinct classes, because ROC-AUC is undefined in that case.
    """
    metrics = {}
    metrics['accuracy'] = accuracy_score(y_true, y_pred) * 100
    metrics['balanced_accuracy'] = balanced_accuracy_score(y_true, y_pred)

    # zero_division=0 keeps the value scikit-learn already falls back to when
    # there are no positive predictions/labels, but without the
    # undefined-metric warning on every such epoch.
    metrics['precision'] = precision_score(y_true, y_pred, average='binary', zero_division=0)
    metrics['recall'] = recall_score(y_true, y_pred, average='binary', zero_division=0)
    metrics['f1'] = f1_score(y_true, y_pred, average='binary', zero_division=0)
    metrics['mcc'] = matthews_corrcoef(y_true, y_pred)

    # For ROC-AUC and PR-AUC, we need probabilities of the positive class.
    # ROC-AUC needs both classes in y_true; with a small or skewed split that
    # may not hold, and a NaN is preferable to aborting the whole run.
    if len(np.unique(y_true)) < 2:
        metrics['roc_auc'] = float('nan')
    else:
        metrics['roc_auc'] = roc_auc_score(y_true, y_scores)
    metrics['pr_auc'] = average_precision_score(y_true, y_scores)

    return metrics

def evaluate(model, loader, criterion):
    """Run ``model`` over ``loader`` without gradients and score the result.

    Args:
        model: Network producing one row of class logits per sample.
        loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Loss callable applied to ``(logits, labels)``.

    Returns:
        Tuple ``(metrics, all_preds, all_labels, all_scores)``:
        ``metrics`` is the dict from ``calculate_metrics`` plus ``'loss'``
        (sum of per-batch losses divided by the number of batches); the three
        lists hold, per sample, the predicted class, the true label and the
        softmax probability of class index 1.
    """
    model.eval()

    loss_sum = 0.0
    all_preds, all_labels, all_scores = [], [], []

    with torch.no_grad():
        for batch_inputs, batch_labels in loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_inputs)
            loss_sum += criterion(logits, batch_labels).item()

            # Column 1 of the softmax output is the positive-class probability.
            positive_probs = torch.softmax(logits, dim=1)[:, 1]
            # Index of the largest logit per row is the predicted class.
            predicted_classes = torch.max(logits, 1)[1]

            all_preds.extend(predicted_classes.cpu().numpy())
            all_labels.extend(batch_labels.cpu().numpy())
            all_scores.extend(positive_probs.cpu().numpy())

    # Score the whole pass at once, then attach the mean batch loss.
    metrics = calculate_metrics(all_labels, all_preds, all_scores)
    metrics['loss'] = loss_sum / len(loader)

    return metrics, all_preds, all_labels, all_scores

def train(model, train_loader, val_loader, criterion, optimizer, scheduler, epochs=30):
    """Train the model, checkpoint it every epoch and track validation metrics.

    Each epoch runs one optimisation pass over ``train_loader`` (gradient
    norm clipped to 2.0), evaluates on ``val_loader``, steps ``scheduler``
    with the validation accuracy, overwrites the rolling checkpoint and, when
    validation accuracy improves, overwrites the best-model file. After the
    last epoch the metrics are written to CSV, the training curves are
    plotted, and the best weights are loaded back into ``model`` to draw the
    validation confusion matrix.

    Args:
        model: Network to optimise; updated in place.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss callable applied to ``(outputs, labels)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        scheduler: LR scheduler whose ``step`` accepts the validation accuracy.
        epochs: Number of epochs to run. Defaults to 30.

    Returns:
        pandas.DataFrame with one row per epoch and the columns ``epoch``
        (1-based), ``training_loss``, ``val_loss`` and the ``val_*`` metrics.

    Side effects:
        Writes the checkpoint, best-model weights, metrics CSV and plots into
        the module-level ``checkpoint_folder``. On return, ``model`` holds the
        best weights found in this call (if any epoch ran).
    """
    metric_columns = ['epoch', 'training_loss', 'val_loss', 'val_accuracy',
                      'val_balanced_accuracy', 'val_precision', 'val_recall',
                      'val_f1', 'val_mcc', 'val_roc_auc', 'val_pr_auc']
    # Rows are collected in a plain list and turned into a DataFrame once.
    # Concatenating onto an empty DataFrame every epoch triggers a pandas
    # FutureWarning and re-copies the whole table each time.
    history = []

    # Start below any reachable accuracy so the first epoch always produces a
    # best-model file, even if its validation accuracy is 0%.
    best_val_acc = float('-inf')
    best_model_saved = False
    checkpoint_model_path = os.path.join(checkpoint_folder, f"checkpoint_model_vgg_{date_str}.pth")
    best_model_path = os.path.join(checkpoint_folder, f"best_model_vgg_{date_str}.pth")

    for epoch in range(epochs):
        # Training phase
        model.train()
        running_loss = 0.0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=2.0)
            optimizer.step()

            running_loss += loss.item()

        # Mean of the per-batch losses for this epoch
        train_loss = running_loss / len(train_loader)

        # Validation phase
        val_metrics, _, _, _ = evaluate(model, val_loader, criterion)
        val_loss = val_metrics['loss']
        val_acc = val_metrics['accuracy']

        # Update learning rate scheduler on validation accuracy
        scheduler.step(val_acc)

        # Rolling checkpoint, overwritten every epoch ('epoch' is 0-based here)
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'train_loss': train_loss,
            'val_loss': val_loss,
            'val_acc': val_acc
        }, checkpoint_model_path)

        # Save best model (strict improvement only, so ties keep the earlier epoch)
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), best_model_path)
            best_model_saved = True
            print(f"New best model saved with validation accuracy: {val_acc:.2f}%")

        # Print epoch results
        print(f"Epoch [{epoch+1}/{epochs}] - "
              f"Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}, "
              f"Val Acc: {val_acc:.2f}%, "
              f"Val F1: {val_metrics['f1']:.4f}, "
              f"Val ROC-AUC: {val_metrics['roc_auc']:.4f}")

        # Record this epoch's metrics
        history.append({
            'epoch': epoch + 1,
            'training_loss': train_loss,
            'val_loss': val_loss,
            'val_accuracy': val_metrics['accuracy'],
            'val_balanced_accuracy': val_metrics['balanced_accuracy'],
            'val_precision': val_metrics['precision'],
            'val_recall': val_metrics['recall'],
            'val_f1': val_metrics['f1'],
            'val_mcc': val_metrics['mcc'],
            'val_roc_auc': val_metrics['roc_auc'],
            'val_pr_auc': val_metrics['pr_auc'],
        })

    # `columns=` fixes the column order and keeps the header when no epoch ran.
    metrics_df = pd.DataFrame(history, columns=metric_columns)

    # Save metrics to CSV
    metrics_csv_path = os.path.join(checkpoint_folder, f"metrics_{date_str}.csv")
    metrics_df.to_csv(metrics_csv_path, index=False)
    print(f"Training metrics saved to {metrics_csv_path}")

    # Plot and save training curves
    plot_training_curves(metrics_df, checkpoint_folder)

    # Plot confusion matrix for the best model. Only reload when this call
    # actually wrote the file; otherwise the load would fail or silently pick
    # up a stale file from an earlier run on the same day.
    if best_model_saved:
        model.load_state_dict(torch.load(best_model_path))
        val_metrics, val_preds, val_labels, _ = evaluate(model, val_loader, criterion)
        # NOTE(review): the class-name order is hard-coded; verify it matches
        # the label indices assigned by the dataset.
        plot_confusion_matrix(val_labels, val_preds, ['Normal', 'Tumor'], checkpoint_folder, "validation")

    return metrics_df

def plot_training_curves(metrics_df, save_folder):
    """Draw a 2x2 grid of training curves, save it as a PNG and show it.

    Args:
        metrics_df: DataFrame with an ``epoch`` column plus the
            ``training_loss``, ``val_loss`` and ``val_*`` metric columns
            plotted below.
        save_folder: Directory that receives
            ``training_curves_<date_str>.png`` (300 dpi).
    """
    # One entry per panel: (title, y-axis label, [(column, line format, legend label)]).
    # A legend label of None means the curve is drawn unlabelled and the
    # panel gets no legend.
    panel_specs = [
        ('Training vs Validation Loss', 'Loss', [
            ('training_loss', 'b-', 'Training Loss'),
            ('val_loss', 'r-', 'Validation Loss'),
        ]),
        ('Validation Accuracy', 'Accuracy (%)', [
            ('val_accuracy', 'g-', None),
        ]),
        ('Validation F1, Precision, Recall', 'Score', [
            ('val_f1', 'c-', 'F1 Score'),
            ('val_precision', 'm-', 'Precision'),
            ('val_recall', 'y-', 'Recall'),
        ]),
        ('Validation ROC-AUC, PR-AUC, and MCC', 'Score', [
            ('val_roc_auc', 'b-', 'ROC-AUC'),
            ('val_pr_auc', 'r-', 'PR-AUC'),
            ('val_mcc', 'g-', 'MCC'),
        ]),
    ]

    epoch_axis = metrics_df['epoch']
    plt.figure(figsize=(20, 15))

    for slot, (panel_title, y_label, curves) in enumerate(panel_specs, start=1):
        plt.subplot(2, 2, slot)

        needs_legend = False
        for column, line_format, legend_label in curves:
            if legend_label is None:
                plt.plot(epoch_axis, metrics_df[column], line_format)
            else:
                plt.plot(epoch_axis, metrics_df[column], line_format, label=legend_label)
                needs_legend = True

        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.title(panel_title)
        if needs_legend:
            plt.legend()
        plt.grid(True)

    plt.tight_layout()
    output_path = os.path.join(save_folder, f"training_curves_{date_str}.png")
    plt.savefig(output_path, dpi=300)
    plt.show()

def plot_confusion_matrix(y_true, y_pred, class_names, save_folder, dataset_name):
    """Render the confusion matrix as an annotated heatmap, save it and show it.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        class_names: Tick labels used on both axes of the heatmap.
        save_folder: Directory that receives
            ``confusion_matrix_<dataset_name>_<date_str>.png`` (300 dpi).
        dataset_name: Split name; used verbatim in the file name and
            capitalised in the plot title.
    """
    figure_title = f'Confusion Matrix - {dataset_name.capitalize()} Set'
    output_path = os.path.join(save_folder, f"confusion_matrix_{dataset_name}_{date_str}.png")

    counts = confusion_matrix(y_true, y_pred)

    plt.figure(figsize=(10, 8))
    # fmt='d' prints the cell counts as integers.
    sns.heatmap(
        counts,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title(figure_title)
    plt.tight_layout()
    plt.savefig(output_path, dpi=300)
    plt.show()

def test_model(model, test_loader, criterion):
    """Score ``model`` on the test set, print a report and persist the results.

    Args:
        model: Trained network to evaluate.
        test_loader: Iterable of ``(inputs, labels)`` test batches.
        criterion: Loss callable forwarded to ``evaluate``.

    Returns:
        The metrics dict produced by ``evaluate`` for the test set.

    Side effects:
        Prints the metrics, saves and shows the test confusion matrix, and
        writes ``test_metrics_<date_str>.csv`` into ``checkpoint_folder``.
    """
    print("\nEvaluating model on test set...")
    test_metrics, test_preds, test_labels, test_scores = evaluate(model, test_loader, criterion)

    # Assemble the report first, then emit it one line at a time.
    report_lines = [
        "\nTest Set Metrics:",
        f"Loss: {test_metrics['loss']:.6f}",
        f"Accuracy: {test_metrics['accuracy']:.2f}%",
        f"Balanced Accuracy: {test_metrics['balanced_accuracy']:.4f}",
        f"Precision: {test_metrics['precision']:.4f}",
        f"Recall: {test_metrics['recall']:.4f}",
        f"F1 Score: {test_metrics['f1']:.4f}",
        f"MCC: {test_metrics['mcc']:.4f}",
        f"ROC-AUC: {test_metrics['roc_auc']:.4f}",
        f"PR-AUC: {test_metrics['pr_auc']:.4f}",
    ]
    for report_line in report_lines:
        print(report_line)

    # Confusion matrix for the test split
    plot_confusion_matrix(test_labels, test_preds, ['Normal', 'Tumor'], checkpoint_folder, "test")

    # One-row table: one column per metric, in dict order
    single_row = {name: [score] for name, score in test_metrics.items()}
    csv_path = os.path.join(checkpoint_folder, f"test_metrics_{date_str}.csv")
    pd.DataFrame(single_row).to_csv(csv_path, index=False)

    return test_metrics

# ------------------- Main Training & Testing Routine -------------------
# Runs the full pipeline when executed as a script or notebook cell:
# train for 30 epochs, reload the best weights, then score the test set.
if __name__ == "__main__":
    # Train the model and collect per-epoch metrics
    metrics_df = train(model, train_loader, val_loader, criterion, optimizer, scheduler, epochs=30)

    # Load best model for testing.
    # The path is rebuilt with the same name pattern train() uses; train()
    # already loads these weights before returning, so this is a second load
    # of the same file that keeps this section self-contained.
    best_model_path = os.path.join(checkpoint_folder, f"best_model_vgg_{date_str}.pth")
    model.load_state_dict(torch.load(best_model_path))

    # Evaluate the model on the test set (prints metrics, saves plot and CSV)
    test_metrics = test_model(model, test_loader, criterion)

    print(f"\nTraining and evaluation complete. Results saved to {checkpoint_folder}")

Using device: cpu
Mounted at /content/drive


Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to /root/.cache/torch/hub/checkpoints/vgg16-397923af.pth
100%|██████████| 528M/528M [00:08<00:00, 69.1MB/s]


New best model saved with validation accuracy: 67.84%
Epoch [1/30] - Train Loss: 0.388848, Val Loss: 0.845492, Val Acc: 67.84%, Val F1: 0.7638, Val ROC-AUC: 0.9564


  metrics_df = pd.concat([metrics_df, pd.DataFrame({


New best model saved with validation accuracy: 97.80%
Epoch [2/30] - Train Loss: 0.140219, Val Loss: 0.049306, Val Acc: 97.80%, Val F1: 0.9867, Val ROC-AUC: 0.9997
Epoch [3/30] - Train Loss: 0.113170, Val Loss: 0.180741, Val Acc: 95.15%, Val F1: 0.9704, Val ROC-AUC: 0.9939
New best model saved with validation accuracy: 98.24%
Epoch [4/30] - Train Loss: 0.021982, Val Loss: 0.056263, Val Acc: 98.24%, Val F1: 0.9895, Val ROC-AUC: 0.9980
Epoch [5/30] - Train Loss: 0.053431, Val Loss: 0.032870, Val Acc: 98.24%, Val F1: 0.9894, Val ROC-AUC: 0.9997
Epoch [6/30] - Train Loss: 0.061378, Val Loss: 0.152767, Val Acc: 97.80%, Val F1: 0.9871, Val ROC-AUC: 0.9843
Epoch [7/30] - Train Loss: 0.000153, Val Loss: 0.246252, Val Acc: 97.36%, Val F1: 0.9840, Val ROC-AUC: 0.9983
Epoch [8/30] - Train Loss: 0.019992, Val Loss: 0.300291, Val Acc: 96.48%, Val F1: 0.9786, Val ROC-AUC: 0.9951
Epoch [9/30] - Train Loss: 0.005959, Val Loss: 0.052999, Val Acc: 98.24%, Val F1: 0.9895, Val ROC-AUC: 0.9983
Epoch [10/30