<a href="https://colab.research.google.com/github/SharlotteManganye/Deep-Learning-CNN/blob/main/CNN_CPSO_2025_02_05.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
import numpy as np
import matplotlib.pyplot as plt

# CNN Architecture (same as before)
class CNN(nn.Module):
    # NOTE(review): the class body is elided in this export; only the
    # placeholder below is present, so the model definition has to be restored
    # here before this cell can run.
    # ... (CNN definition - same as previous examples)

# Data loading: one preprocessing pipeline shared by both MNIST splits.
# Images are converted to tensors and then normalized with mean 0.1307 and
# std 0.3081 on their single channel.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.1307,), std=(0.3081,)),
    ]
)

# Everything except the train/test switch is identical for the two splits.
_mnist_options = dict(root='./data', download=True, transform=transform)
train_dataset = datasets.MNIST(train=True, **_mnist_options)
test_dataset = datasets.MNIST(train=False, **_mnist_options)


# Cooperative PSO Implementation
def cooperative_pso(fitness_function, lb, ub, swarmsize=10, maxiter=10, num_subswarms=2):
    """Maximize ``fitness_function`` with a particle swarm split into subswarms.

    The swarm is partitioned into ``num_subswarms`` contiguous groups of
    ``swarmsize // num_subswarms`` particles; the last group also receives any
    remainder. In every iteration each particle is evaluated once and then
    moved toward its own best position and toward the best position found so
    far by any particle of its subswarm. Positions are clipped to
    ``[lb, ub]`` after every move.

    Args:
        fitness_function: Callable that takes a 1-D position array and returns
            ``(fitness, train_losses, val_losses, train_accuracies,
            val_accuracies)``. Higher fitness is better; the four histories
            are iterables that get appended to the per-particle logs.
        lb: Lower bound per dimension (list, tuple or array).
        ub: Upper bound per dimension, same length as ``lb``.
        swarmsize: Total number of particles.
        maxiter: Number of iterations.
        num_subswarms: Number of groups the swarm is divided into.

    Returns:
        Tuple ``(global_best_fitness, global_best, all_train_losses,
        all_val_losses, all_train_accuracies, all_val_accuracies)``.
        ``global_best`` is an independent copy of the position that produced
        ``global_best_fitness``. Each ``all_*`` value is a list with one entry
        per particle holding the concatenated histories of all its
        evaluations.
    """
    n_dimensions = len(lb)

    # Bounds may arrive as plain lists; the arithmetic below needs arrays.
    lower = np.asarray(lb, dtype=float)
    upper = np.asarray(ub, dtype=float)

    # Velocity-update coefficients (loop-invariant, adjust as needed).
    inertia_weight = 0.729
    cognitive_coeff = 1.49445
    social_coeff = 1.49445

    personal_bests = np.zeros((swarmsize, n_dimensions))
    personal_best_fitness = np.full(swarmsize, -np.inf)  # maximization: start at -inf
    global_best = np.zeros(n_dimensions)
    global_best_fitness = -np.inf

    # Random start positions inside the bounds; start velocities within half
    # of the search range in either direction.
    swarm = np.random.uniform(lower, upper, size=(swarmsize, n_dimensions))
    half_span = np.abs(upper - lower) / 2
    velocity = np.random.uniform(-half_span, half_span, size=(swarmsize, n_dimensions))

    all_train_losses = [[] for _ in range(swarmsize)]
    all_val_losses = [[] for _ in range(swarmsize)]
    all_train_accuracies = [[] for _ in range(swarmsize)]
    all_val_accuracies = [[] for _ in range(swarmsize)]

    subswarm_size = swarmsize // num_subswarms

    for _ in range(maxiter):
        for k in range(num_subswarms):
            start = k * subswarm_size
            # The last subswarm absorbs the particles left over by the division.
            stop = (k + 1) * subswarm_size if k < num_subswarms - 1 else swarmsize
            if start == stop:
                # More subswarms than particles: nothing to evaluate or move.
                continue

            for j in range(start, stop):
                fitness, train_losses, val_losses, train_accuracies, val_accuracies = fitness_function(swarm[j])
                all_train_losses[j].extend(train_losses)
                all_val_losses[j].extend(val_losses)
                all_train_accuracies[j].extend(train_accuracies)
                all_val_accuracies[j].extend(val_accuracies)

                if fitness > personal_best_fitness[j]:
                    personal_best_fitness[j] = fitness
                    personal_bests[j] = swarm[j]  # row assignment copies the values

                if fitness > global_best_fitness:
                    global_best_fitness = fitness
                    # Copy: swarm[j] is a view that the position update below
                    # overwrites, which would silently change the stored best.
                    global_best = swarm[j].copy()

            members = slice(start, stop)

            # Subswarm attractor: the best position any member has found so
            # far, so a good position from an earlier iteration is not lost.
            leader = start + int(np.argmax(personal_best_fitness[members]))
            subswarm_best = personal_bests[leader]

            r1 = np.random.rand(stop - start, n_dimensions)
            r2 = np.random.rand(stop - start, n_dimensions)

            # Only the personal and subswarm bests steer the particles; the
            # global best is tracked for the return value.
            velocity[members] = (
                inertia_weight * velocity[members]
                + cognitive_coeff * r1 * (personal_bests[members] - swarm[members])
                + social_coeff * r2 * (subswarm_best - swarm[members])
            )

            # Keep every particle inside the search bounds.
            swarm[members] = np.clip(swarm[members] + velocity[members], lower, upper)

    return global_best_fitness, global_best, all_train_losses, all_val_losses, all_train_accuracies, all_val_accuracies


# Fitness Function (same structure, but now uses numpy arrays)
def fitness_function(particle):
    """Train a fresh CNN with the hyperparameters encoded in ``particle``.

    ``particle[0]`` is the base-10 exponent of the learning rate and
    ``particle[1]`` the base-2 exponent of the batch size (the resulting
    batch size is truncated to an int).

    Returns:
        Tuple ``(accuracy, train_losses, val_losses, train_accuracies,
        val_accuracies)`` where ``accuracy`` is the ``val_accuracy`` left over
        from the last epoch and each list holds one entry per epoch.

    NOTE(review): the training/validation loop is elided in this export, so
    ``avg_train_loss``, ``avg_val_loss``, ``train_accuracy`` and
    ``val_accuracy`` are not defined anywhere in view; they are presumably
    produced by that loop.
    """
    # Decode the particle from exponent space into actual hyperparameters.
    learning_rate = 10**particle[0]
    batch_size = int(2**particle[1])

    # Loaders are rebuilt on every call because the batch size is being tuned.
    train_loader_pso = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader_pso = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # New model and optimizer for each evaluation.
    model = CNN()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Per-epoch metric histories returned to the caller.
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []

    num_epochs = 3  # Reduced for PSO

    for epoch in range(num_epochs):
      # ... (Training and validation loops - same as before, using train_loader_pso and test_loader_pso)
        # NOTE(review): per the placeholder above, the validation metrics
        # presumably come from test_loader_pso, i.e. the test split doubles as
        # the validation set during the search - confirm this is intended.
        train_losses.append(avg_train_loss)
        val_losses.append(avg_val_loss)
        train_accuracies.append(train_accuracy)
        val_accuracies.append(val_accuracy)

    # Fitness is the validation accuracy of the final epoch.
    accuracy = val_accuracy
    return accuracy, train_losses, val_losses, train_accuracies, val_accuracies


# PSO configuration.
# Every particle is evaluated once per iteration, so this setup performs
# n_particles * iterations calls to fitness_function.
n_particles = 20  # swarm size
n_dimensions = 2  # particle = (learning-rate exponent, batch-size exponent)
iterations = 20  # number of PSO iterations
num_subswarms = 4  # number of groups the swarm is divided into

# Search bounds per dimension, in the order fitness_function decodes them:
# [log10(learning rate), log2(batch size)].
# They are NumPy arrays because cooperative_pso computes `ub - lb`, which
# raises TypeError for plain Python lists.
# NOTE(review): both bounds of the batch-size exponent are 8, which pins the
# batch size to 2**8 = 256 and leaves only the learning rate to search -
# confirm this is intended.
lb = np.array([-5.0, 8.0])
ub = np.array([0.0, 8.0])

# Run Cooperative PSO
(
    best_accuracy,
    best_hyperparameters,
    all_train_losses,
    all_val_losses,
    all_train_accuracies,
    all_val_accuracies,
) = cooperative_pso(
    fitness_function,
    lb,
    ub,
    swarmsize=n_particles,
    maxiter=iterations,
    num_subswarms=num_subswarms,
)

# Report the best result, both in exponent form and decoded.
print("Best Accuracy:", best_accuracy)
print("Best Hyperparameters (learning rate exponent, batch size exponent):", best_hyperparameters)
print("Best Learning Rate:", 10**best_hyperparameters[0])
print("Best Batch Size:", int(2**best_hyperparameters[1]))

# Plotting (after Cooperative PSO) - Corrected and filled in
# ... (same plotting code as before)

# Final Training (same as before)
# ... (same final training loop and plotting as before)