In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, random_split, Subset
from tqdm import tqdm
import multiprocessing
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix, roc_auc_score
import matplotlib.pyplot as plt
import torch.nn.functional as F
import numpy as np
import os
num_workers = multiprocessing.cpu_count()

In [5]:
# Set the path to the dataset directory
data_dir = "/home/rikisu/NNDL/CNN/cell_images"

# Set the number of training epochs (you can increase this later for better accuracy)
epochs = 2

# Automatically set the number of worker processes for data loading
# based on the number of available CPU cores
num_workers = multiprocessing.cpu_count()

In [6]:
# Define CNN model
class MalariaCNN(nn.Module):
    def __init__(self):
        super(MalariaCNN, self).__init__()

        #---------- CONVOLUTIONAL LAYERS ----------#
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)

        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()

        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.relu3 = nn.ReLU()

        #---------- FULLY CONNECTED LAYERS ----------#
        self.fc1 = nn.Linear(128 * 16 * 16, 128)  # 128 feature maps * 16x16 spatial size
        self.relu4 = nn.ReLU()
        self.fc2 = nn.Linear(128, 2)  # Binary classification


    def forward(self, x):
        x = self.pool(self.relu1(self.conv1(x)))  # 128x128 -> 64x64
        x = self.pool(self.relu2(self.conv2(x)))  # 64x64 -> 32x32
        x = self.pool(self.relu3(self.conv3(x)))  # 32x32 -> 16x16
        x = x.view(x.size(0), -1)                 # Flatten
        x = self.relu4(self.fc1(x))
        x = self.fc2(x)
        return x
   

In [7]:
#---------- DATA PREPARATION ----------#
# Define data transformations
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # Resize all images to 128 x 128
    transforms.ToTensor(),          # Convert images to PyTorch tensors
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize pixel values
])

# Load dataset
dataset = datasets.ImageFolder(root=data_dir, transform=transform)  # Expects data in folders named by class
train_size = int(0.8 * len(dataset))  # 80% training data
test_size = len(dataset) - train_size  # 20% testing data
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=num_workers, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=num_workers, pin_memory=True)#---------- MODEL SETUP ----------#
# Initialize model, loss function, and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Use GPU if available
model = MalariaCNN().to(device)  # Move model to GPU if available
criterion = nn.CrossEntropyLoss()  # Loss function for classification
optimizer = optim.Adam(model.parameters(), lr=0.001)  # Adam optimizer

In [8]:
#---------- TRAINING LOOP ----------#


print("\n--- Starting Training ---")
train_losses = []
val_losses = []
val_accuracies = []
test_accuracies = [] # Initialize list to store test accuracies per epoch


# Enable mixed precision training
scaler = torch.amp.GradScaler()  # Fixed: removed 'cuda' parameter


--- Starting Training ---


In [9]:
for epoch in range(epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", leave=False)

    for images, labels in progress_bar:
        # Move data to device
        images, labels = images.to(device, non_blocking=True), labels.to(device, non_blocking=True)

        # Forward pass
        optimizer.zero_grad()  # Clear gradients
        with torch.amp.autocast(device_type='cuda' if torch.cuda.is_available() else 'cpu'):  # Mixed precision
            outputs = model(images)  # Get model predictions
            loss = criterion(outputs, labels)  # Calculate loss

        # Backward pass with gradient scaling for mixed precision
        scaler.scale(loss).backward()  # Compute gradients
        scaler.step(optimizer)  # Update weights
        scaler.update()  # Update scaler

        # Track loss
        running_loss += loss.item()
        progress_bar.set_postfix(loss=running_loss / (progress_bar.n + 1))

    # Calculate average loss for this epoch
    avg_loss = running_loss / len(train_loader)
    train_losses.append(avg_loss)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

    #---------- EVALUATION ----------#
    model.eval()  # Set model to evaluation mode
    correct = 0
    total = 0
    with torch.no_grad():  # Disable gradient calculation
        for images, labels in test_loader:
            images, labels = images.to(device, non_blocking=True), labels.to(device, non_blocking=True)
            outputs = model(images)  # Get model predictions
            _, predicted = torch.max(outputs, 1)  # Get class with highest probability
            total += labels.size(0)
            correct += (predicted == labels).sum().item()  # Count correct predictions

    # Calculate and store accuracy
    accuracy = 100 * correct / total
    test_accuracies.append(accuracy)
    print(f"Test Accuracy after epoch {epoch+1}: {accuracy:.2f}%")

                                                                        

Epoch 1/2, Loss: 0.2367


Epoch 2/2:   0%|          | 0/689 [00:00<?, ?it/s]

Test Accuracy after epoch 1: 95.85%


                                                                        

Epoch 2/2, Loss: 0.1370




Test Accuracy after epoch 2: 95.85%


In [10]:
#---------- FINAL EVALUATION ON TEST SET ----------#
model.eval()
test_all_preds = []
test_all_labels = []
test_loss = 0.0
correct = 0
total = 0

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)
        test_loss += loss.item() * images.size(0)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        test_all_preds.extend(predicted.cpu().numpy())
        test_all_labels.extend(labels.cpu().numpy())

test_loss /= len(test_loader.dataset)
test_accuracy = 100 * correct / total
test_precision = precision_score(test_all_labels, test_all_preds, average='weighted', zero_division=0)
test_recall = recall_score(test_all_labels, test_all_preds, average='weighted', zero_division=0)
test_f1 = f1_score(test_all_labels, test_all_preds, average='weighted', zero_division=0)

# For AUC, we need probabilities for each class.
# Assuming binary classification (0 and 1), we can take the probability of class 1.
try:
    probabilities = F.softmax(outputs, dim=1)[:, 1].cpu().numpy()
    test_auc = roc_auc_score(test_all_labels, probabilities)
except ValueError:
    print("AUC score cannot be calculated for this setup.")
    test_auc = 0.0

cm = confusion_matrix(test_all_labels, test_all_preds)
print("\nConfusion Matrix:")
print(cm)
print(f"Final Test Loss: {test_loss:.4f}")
print(f"Final Test Accuracy: {test_accuracy:.2f}%")
print(f"Final Test Precision: {test_precision:.4f}")
print(f"Final Test Recall: {test_recall:.4f}")
print(f"Final Test F1-Score: {test_f1:.4f}")
print(f"Final Test AUC: {test_auc:.4f}")


# --- Store Results for Plotting ---
results = {
    'train_losses': train_losses,
    'val_losses': val_losses,
    'val_accuracies': val_accuracies,
    'test_accuracy': test_accuracy,
    'test_precision': test_precision,
    'test_recall': test_recall,
    'test_f1': test_f1,
    'test_auc': test_auc,
    'confusion_matrix': cm
}

# Get class names from the dataset
class_names = dataset.classes
# Define the save directory for the plot
save_dir = "results"

AUC score cannot be calculated for this setup.

Confusion Matrix:
[[2601  127]
 [ 102 2682]]
Final Test Loss: 0.1324
Final Test Accuracy: 95.85%
Final Test Precision: 0.9585
Final Test Recall: 0.9585
Final Test F1-Score: 0.9585
Final Test AUC: 0.0000


In [None]:


def plot_results(results_dict, class_names_list, save_path):
    # Get actual epochs run
    num_epochs_run = len(results_dict.get('train_losses', []))
    epochs_axis = range(1, num_epochs_run + 1)

    plt.figure(figsize=(16, 12))

    # Plot training and validation loss
    plt.subplot(2, 2, 1)
    plt.plot(epochs_axis, results_dict.get('train_losses', []), label='Training Loss', marker='o', linestyle='-')
    if 'val_losses' in results_dict and results_dict['val_losses']:
        plt.plot(epochs_axis, results_dict.get('val_losses', []), label='Validation Loss', marker='o', linestyle='-')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.grid(True)
    plt.xticks(epochs_axis)

    # Plot validation accuracy (only if we have it)
    plt.subplot(2, 2, 2)
    if 'val_accuracies' in results_dict and results_dict['val_accuracies']:
        plt.plot(epochs_axis, results_dict.get('val_accuracies', []), label='Validation Accuracy',
                 color='g', marker='o', linestyle='-')
    # Add test accuracy as a horizontal line if available
    if 'test_accuracy' in results_dict:
        plt.axhline(y=results_dict['test_accuracy'], color='r', linestyle='--',
                    label=f'Test Accuracy: {results_dict["test_accuracy"]:.2f}%')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.title('Validation Accuracy')
    plt.legend()
    plt.grid(True)
    plt.xticks(epochs_axis)

    # Plot confusion matrix
    cm_plot = results_dict.get('confusion_matrix')
    if cm_plot is not None and len(class_names_list) > 0:
        plt.subplot(2, 2, 3)
        im = plt.imshow(cm_plot, interpolation='nearest', cmap=plt.cm.Blues)
        plt.title('Confusion Matrix (Test Set)')
        plt.colorbar(im, fraction=0.046, pad=0.04)
        tick_marks = np.arange(len(class_names_list))
        plt.xticks(tick_marks, class_names_list, rotation=45, ha="right")
        plt.yticks(tick_marks, class_names_list)

        # Add text annotations
        thresh = cm_plot.max() / 2.
        for i in range(cm_plot.shape[0]):
            for j in range(cm_plot.shape[1]):
                plt.text(j, i, format(cm_plot[i, j], 'd'),
                         ha="center", va="center",
                         color="white" if cm_plot[i, j] > thresh else "black")

        plt.ylabel('True label')
        plt.xlabel('Predicted label')
        plt.tight_layout(rect=[0, 0, 0.9, 1])  # Adjust layout for confusion matrix

    # Plot additional test metrics bar chart
    plt.subplot(2, 2, 4)
    metrics = ['Accuracy', 'Precision', 'Recall', 'F1-Score', 'AUC']
    values = [
        results_dict.get('test_accuracy', 0) / 100.0,  # Scale accuracy to 0-1 range
        results_dict.get('test_precision', 0),
        results_dict.get('test_recall', 0),
        results_dict.get('test_f1', 0),
        results_dict.get('test_auc', 0)
    ]
    bars = plt.bar(metrics, values, color=['#1f77b4', '#2ca02c', '#d62728', '#9467bd', '#ff7f0e'])
    plt.title('Final Test Set Metrics (Best Model)')
    plt.ylabel('Score')

    # Adjust y-limit based on the minimum value (with some padding)
    min_val = min(values)
    plt.ylim(max(0, min_val - 0.1), 1.1)  # Dynamic y-limit with lower bound never below 0

    # Add text labels above bars
    for bar in bars:
        yval = bar.get_height()
        plt.text(bar.get_x() + bar.get_width()/2.0, yval + 0.01, f"{yval:.3f}",
                ha='center', va='bottom', fontsize=9)

    plt.suptitle('Malaria Cell Classification Results (Team 5 Improved)', fontsize=16)
    plt.tight_layout(rect=[0, 0.03, 1, 0.95])  # Adjust layout considering suptitle

    # Make sure the save directory exists
    os.makedirs(save_path, exist_ok=True)

    # Save the plot
    plot_filename = os.path.join(save_path, 'training_validation_test_results_Team5_improved.png')
    try:
        plt.savefig(plot_filename, dpi=300)  # Higher DPI for better quality
        print(f"\nResults plot saved to {plot_filename}")
    except Exception as e:
        print(f"Error saving plot: {e}")

    plt.close()  # Close the plot

# Call the plotting function
plot_results(results, class_names, save_dir)

print("\n--- Script Finished ---")

SyntaxError: invalid syntax (3167887546.py, line 105)