<a href="https://colab.research.google.com/github/AnnaM-C/computational-intelligence/blob/main/CIFARPSO.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [25]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import operator
import random
import numpy
import math
!pip install deap
from deap import base
from deap import benchmarks
from deap import creator
from deap import tools
import torchvision
import torchvision.transforms as transforms




In [26]:
# Set up the transformations
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # Normalize to the range [-1, 1]
])

# Load CIFAR-10 training dataset
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=4, shuffle=True, num_workers=2)

# Load CIFAR-10 test dataset
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=4, shuffle=False, num_workers=2)

# Define classes in CIFAR-10
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')


Files already downloaded and verified
Files already downloaded and verified


In [27]:
class Classifier(nn.Module):
    def __init__(self, freeze_last_layer=False):
        super(Classifier, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.fc1 = nn.Linear(64 * 8 * 8, 256)
        self.fc2 = nn.Linear(256, 10)

        if freeze_last_layer:
            # Freeze all layers except the last layer (fc2)
            for param in self.parameters():
                param.requires_grad = False
            for param in self.fc2.parameters():
                param.requires_grad = True

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(x.size(0), -1)  # Flatten the tensor
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [28]:
model = Classifier(freeze_last_layer=False)

In [29]:
# initialise 50 particles with dimension 48
posMinInit      = -3
posMaxInit      = + 5
VMaxInit        = 1.5
VMinInit        = 0.5
populationSize  = 50
dimension       = sum(param.numel() for param in Classifier().parameters())
interval        = 10
iterations      = 400

#Parameter setup
wmax = 0.9 #weighting
wmin = 0.4
c1   = 2.0
c2   = 2.0

creator.create("FitnessMin", base.Fitness, weights=(-1.0,)) # -1 is for minimise
creator.create("Particle", list, fitness=creator.FitnessMin, speed=list, smin=None, smax=None, best=None)

# start generating and modifying their positions
def generate(size, smin, smax):
  part = creator.Particle(random.uniform(posMinInit, posMaxInit) for _ in range(size))
  part.speed = [random.uniform(VMinInit, VMaxInit) for _ in range(size)]
  part.smin = smin #speed clamping values
  part.smax = smax
  return part

def evaluate(individual, freeze_last_layer=False):
    params = [torch.tensor(p, dtype=torch.float32) for p in individual]

    # Load the state_dict excluding frozen parameters
    state_dict = model.state_dict()
    if freeze_last_layer:
        state_dict.update({'fc2.weight': torch.tensor(params[-2].view(model.fc2.weight.shape)),
                           'fc2.bias': torch.tensor(params[-1])})

    model.load_state_dict(state_dict)

    criterion = nn.CrossEntropyLoss()

    # Use the DataLoader to iterate through the batches in the test set
    correct = 0
    total = 0
    running_loss = 0.0
    with torch.no_grad():
        for data in testloader:
            images, labels = data

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)
            running_loss += loss.item()

            # Accuracy calculation
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = correct / total
    average_loss = running_loss / len(testloader)

    # return accuracy, average_loss
    return (1.0/loss.item()),

def updateParticle(part, best, weight, freeze_last_layer=False):
    #implementing speed = 0.7*(weight*speed + c1*r1*(localBestPos-currentPos) + c2*r2*(globalBestPos-currentPos))
    #Note that part and part.speed are both lists of size dimension
    #hence all multiplies need to apply across lists, so using e.g. map(operator.mul, ...

    r1 = (random.uniform(0, 1) for _ in range(len(part)))
    r2 = (random.uniform(0, 1) for _ in range(len(part)))

    v_r0 = [weight*x for x in part.speed]
    v_r1 = [c1*x for x in map(operator.mul, r1, map(operator.sub, part.best, part))] # local best
    v_r2 = [c2*x for x in map(operator.mul, r2, map(operator.sub, best, part))] # global best

    part.speed = [0.7*x for x in map(operator.add, v_r0, map(operator.add, v_r1, v_r2))]
    # update position with speed
    part[:] = list(map(operator.add, part, part.speed))

    # Transform the particle position to the neural network parameters
    params = [torch.tensor(p, dtype=torch.float32) for p in part]

    # Update only non-frozen parameters
    if not freeze_last_layer:
        model.load_state_dict({key: value for key, value in zip(model.state_dict(), params)})
    else:
        model.load_state_dict({key: value for key, value in zip(model.state_dict(), params[:-2])})

toolbox = base.Toolbox()
toolbox.register("particle", generate, size=dimension, smin=-3, smax=3)
toolbox.register("population", tools.initRepeat, list, toolbox.particle)
toolbox.register("update", updateParticle)
toolbox.register("evaluate", evaluate)

pop = toolbox.population(n=populationSize) # Population Size
stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", numpy.mean)
stats.register("std", numpy.std)
stats.register("min", numpy.min)
stats.register("max", numpy.max)

logbook = tools.Logbook()
logbook.header = ["gen", "evals"] + stats.fields

best = None

running_loss=[]
x_generations=10

for every_generation in range(x_generations):
  w = wmax - (wmax-wmin)*every_generation/x_generations #decaying inertia weight


  # calculate an individuals fitness. fitness = loss function = CrossEntropy
  # start off with random weights in the network
  for every_individual in pop:
    # the weights start random. But each individuals weights get added to the last
    # layer of the network
    # predicted = NN(input)
    # loss=CrossEntropy(predicted, label)
    # running_loss += loss

    every_individual.fitness.values = toolbox.evaluate(every_individual)

    if (not every_individual.best) or (every_individual.best.fitness < every_individual.fitness):
                every_individual.best = creator.Particle(every_individual)
                every_individual.best.fitness.values = every_individual.fitness.values

    if (not best) or best.fitness < every_individual.fitness:
                best = creator.Particle(every_individual)
                best.fitness.values = every_individual.fitness.values

    for every_individual in pop:
            toolbox.update(every_individual, best, w)

    if every_generation % interval == 0:
            logbook.record(gen=every_generation, evals=len(pop), **stats.compile(pop))
            print(logbook.stream)

    print('best particle position is ', best)

    print("Populaiton: ", pop)
    print("Logbook: ",logbook)
    print("Best: ", best)


    # TO DO:
    # get individuals weights
    # put them into the network


RuntimeError: ignored