# Natural Computing - Particle Swarm Optimization
This notebook is used on Kaggle for the Natural Computing project. In this notebook the dataset Metastatic Tissue Classification - PatchCamelyon (metastatic-tissue-classification-patchcamelyon) is used.

# Imports

In [None]:
%%capture
import sys
import numpy as np
import os
import cv2
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

import tensorflow_datasets as tfds
import tensorflow as tf

import keras
from keras.models import Sequential
from keras.applications.vgg16 import VGG16
from keras.preprocessing import image
from keras.applications.vgg16 import preprocess_input
from keras.layers import Input, Flatten, Dense
from keras.models import Model
from keras import backend as K
from keras.datasets import mnist

import pyswarms
from pyswarms.utils.plotters import plot_cost_history, plot_contour, plot_surface
from pyswarms.utils.plotters.formatters import Mesher
from pyswarms.utils.functions import single_obj as fx

from IPython.display import Image

import h5py

# Pretrained model (VGG16)

Setup of the Keras model VGG16 (pretrained weights on ImageNet), all layers are pretrained except the last layer. A Dense layer is added.

In [None]:
def pretrained_model_last(img_shape, num_classes, layer_type, summary_print=False):
  vgg16 = VGG16(weights='imagenet', include_top=False, input_shape=img_shape)

  for layer in vgg16.layers:
    if layer.name == 'block5_conv3':
      break
    layer.trainable = False
  
  last = vgg16.layers[-1].output
  print(vgg16.input.shape)

  x = Flatten(name='flatten')(last)
  x = Dense(num_classes, activation=layer_type, name='predictions')(x)
    
  pretrained_model = Model(inputs=vgg16.input, outputs=x)
  
  layer_last = pretrained_model.layers[-1]

  if summary_print==True:
    pretrained_model.summary()

  return pretrained_model

# Particle Swarm Optimization (class optimizer)

Possibility to implement the PSO_optimizer (class) in Keras network. Computation times are longer so therefore this implementation is not used.

In [None]:
'''
class PSO_optimizer (keras.optimizers.Optimizer):
    
    def __init__(self, c1=0.5, c2=2, w=1, dimensions=0, n_particles=100,
                 name="PSO_optimizer", **kwargs):
        super().__init__(name, **kwargs)
        self._set_hyper("c1", kwargs.get("c1", c1))
        self._set_hyper("c2", kwargs.get("c2", c2))
        self._set_hyper("w", kwargs.get("w", w))
        self._set_hyper("dimensions", kwargs.get("dimensions", dimensions))
        self._set_hyper("n_particles", kwargs.get("n_particles", n_particles))
        self._is_first = True 
        
    def _create_slots(self, var_list):
        for var in var_list:
            self.add_slot(var, "pw") 
            
    @tf.function
    def _resource_apply_dense(self, var, y): 
        
        def logits_function(p):
            start = 0
            end = n_inputs*n_hidden
            W1 = p[start:end].reshape((n_inputs,n_hidden))

            start = end 
            end = start+n_hidden
            b1 = p[start:end].reshape((n_hidden,))

            start = end 
            end = start+(n_classes*n_hidden)
            W2 = p[start:end].reshape((n_hidden,n_classes))

            start = end 
            end = start+n_classes
            b2 = p[start:end].reshape((n_classes,))

            z1 = X.dot(W1) + b1  
            a1 = np.tanh(z1)    
            logits = a1.dot(W2) + b2 
            return logits  

        def forward_prop(params):
            logits = logits_function(params)
            exp_scores = np.exp(logits)
            probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)
            corect_logprobs = -np.log(probs[range(num_samples), y])
            loss = np.sum(corect_logprobs) / num_samples
            return loss

        def f(x):
            n_particles = x.shape[0]
            j = [forward_prop(x[i]) for i in range(n_particles)]
            return np.array(j)

        var_dtype = var.dtype.base_dtype
        
        X = flatten_output.reshape((num_samples, n_inputs))
        y = y_train_sample

        n_particles = self._get_hyper('n_particles')
        dimensions = self._get_hyper('dimensions')
        w = self._get_hyper('w')
        c1 = self._get_hyper('c1')
        c2 = self._get_hyper('c2')
        options = {"c1": int(c1.initialized_value()),
                   "c2": int(c2.initialized_value()),
                   "w": int(w.initialized_value())}
        n_particles = int(n_particles.initialized_value())
        dimensions = int(dimensions.initialized_value())

        optimizer_pso = pyswarms.single.GlobalBestPSO(n_particles=n_particles, dimensions=dimensions, options=options)

        cost, pos = optimizer_pso.optimize(f, iters=10, verbose=0)

        pw_var = self.get_slot(var, "pw")

        if self._is_first:
            self._is_first = False
            new_var = pos
        else:
            cost, new_var = optimizer_pso.optimize(f, iters=50, verbose=0) 

        pw_var.assign(new_var)
         
    def _resource_apply_sparse(self, var, y):
        raise NotImplementedError
        
    def get_config(self):
        base_config = super().get_config()
        return {
            **base_config,
            "c1": self._serialize_hyperparameter("c1"),
            "c2": self._serialize_hyperparameter("c2"),
            "w": self._serialize_hyperparameter("w"),
            "dimensions": self._serialize_hyperparameter("dimensions"),
            "n_particles": self._serialize_hyperparameter("n_particles"),
        }
'''

# MNIST

In [None]:
(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_test = [cv2.cvtColor(cv2.resize(i, (32,32)), cv2.COLOR_GRAY2BGR) for i in x_test]
x_test = np.concatenate([arr[np.newaxis] for arr in x_test]).astype('float32')

x_train = [cv2.cvtColor(cv2.resize(i, (32,32)), cv2.COLOR_GRAY2BGR) for i in x_train]
x_train = np.concatenate([arr[np.newaxis] for arr in x_train]).astype('float32')

y_train_onehot = tf.keras.utils.to_categorical(y_train, 10)
y_test_onehot = tf.keras.utils.to_categorical(y_test, 10)

print(x_train.shape)
print(y_train.shape)

Characteristics of the data

In [None]:
num_classes = y_train_onehot.shape[-1]
input_shape = x_train.shape[-3:]

## Select part of the data (n samples)

Import the data from the dataset and define the number of samples from the train set to run with the PSO.

In [None]:
num_samples = 1000

x_train_sample = x_train[:num_samples]
print(x_train_sample.shape)

y_train_sample = y_train[:num_samples]
print(y_train_sample.shape)

y_train_oh_sample = y_train_onehot[:num_samples]
print(y_train_oh_sample.shape)

## Stochastic Gradient Descent

Apply SGD in order to compare the performances and evaluate


In [None]:
batch_size = 100
epochs = 5

In [None]:
model_mnist_sgd = pretrained_model_last(img_shape=input_shape, num_classes=num_classes, layer_type='softmax', summary_print=True) #num_classes

In [None]:
model_mnist_sgd.compile(optimizer=keras.optimizers.Adam(1e-4), loss='categorical_crossentropy', metrics=['acc'])
model_mnist_sgd.fit(x_train_sample, y_train_oh_sample, epochs=epochs, batch_size=batch_size)
loss_sgd, acc_sgd = model_mnist_sgd.evaluate(x_test, y_test_onehot)

In [None]:
print('Accuracy: ', acc_sgd)

In [None]:
prediction_sgd = np.argmax(model_mnist_sgd.predict(x_test), axis=1)
confusion_matrix(y_test,prediction_sgd)

## Particle Swarm Optimization

In [None]:
'''
model_mnist_pso = pretrained_model_last(img_shape=input_shape, num_classes=num_classes, layer_type='softmax', summary_print=True)
model_mnist_pso.compile(loss="mse", 
                        optimizer=PSO_optimizer(dimensions=dimensions, 
                                                n_particles=100))
model_mnist_pso.fit(x_train_sample,y_train_sample, epochs=epochs, batch_size=batch_size)
'''

In [None]:
model_mnist_pso = pretrained_model_last(img_shape=input_shape, num_classes=num_classes, layer_type='softmax', summary_print=True)

* Define number of inputs, hidden and classes
* Calculate the dimensions
* Retrieve the layer to train
* Define input for the PSO
* Apply PSO optimizer
    * Logits function to update weights and biases of a particle
    * Forward propagation (for one particle)
        * apply softmax and negative log likelihood
    * Compute forward propagation and thus the loss for the whole swarm


In [None]:
n_inputs = 512 
n_hidden = 1 
n_classes = 10 

In [None]:
dimensions = (n_inputs * n_hidden) + (n_hidden * n_classes) + n_hidden + n_classes
print(dimensions)

In [None]:
layer_flatten = K.function([model_mnist_pso.layers[0].output],
                                  [model_mnist_pso.layers[-2].output])
flatten_output = layer_flatten([x_train_sample])[0]


In [None]:
X = flatten_output.reshape((num_samples, n_inputs))
y = y_train_sample 
print(X.shape)
print(y.shape)

In [None]:
layer_last = model_mnist_pso.layers[-1]
weights_ll = np.array(layer_last.get_weights())
print(weights_ll[0].shape)

Compute the logits (weights and biases)

In [None]:
def logits_function(p):
    start = 0
    end = n_inputs*n_hidden
    W1 = p[start:end].reshape((n_inputs,n_hidden))

    start = end 
    end = start+n_hidden
    b1 = p[start:end].reshape((n_hidden,))

    start = end 
    end = start+(n_classes*n_hidden)
    W2 = p[start:end].reshape((n_hidden,n_classes))

    start = end 
    end = start+n_classes
    b2 = p[start:end].reshape((n_classes,))

    z1 = X.dot(W1) + b1 
    a1 = np.tanh(z1)   
    logits = a1.dot(W2) + b2 
    return logits   

* Compute the softmax activation for the returned logits,
* Compute the negative log-likelihood loss and return this.
The negative log-likelihood loss is the error between the predictions and theground truth.

In [None]:
def forward_prop(params):
    logits = logits_function(params)

    # Softmax (logits)
    exp_scores = np.exp(logits)
    probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)

    # Negative log likelihood
    corect_logprobs = -np.log(probs[range(num_samples), y])
    loss = np.sum(corect_logprobs) / num_samples

    return loss

The loss for each particle of the swarm is the result of the forward propagation.

In [None]:
def f(x):
    n_particles = x.shape[0]
    j = [forward_prop(x[i]) for i in range(n_particles)]
    return np.array(j)

Prediction function for the given positions (sum up to 1)

In [None]:
def predict(pos):
    logits = logits_function(pos)
    y_pred = np.argmax(logits, axis=1)
    return y_pred

* Define the intializations for the swarm
* Apply PSO
    * Define the optimzer for the MNIST data
    * Optimize the optimizer given the swarm optimizer function defined earlier (f)
* Plot the cost of the optimization
* Accuracy
* Confusion matrix

The personal best position (given the loss) is compared to the current position for each particle in the swarm.  After this, the personal best is compared to the global best, and the matrices (position andcosts) are updated.  We want the cost as low as possible (close to zero), and the cost represents the lowest value of the loss.

Optimizer is done for 3 different combinations of number of particles and generations

In [None]:

options = {'c1': 0.25, 'c2': 2, 'w':0.8} #options in order to initialize swarm

dimensions = dimensions
optimizer_mnist_pso = pyswarms.single.GlobalBestPSO(n_particles=100, dimensions=dimensions, options=options)
cost_mnist, pos_mnist = optimizer_mnist_pso.optimize(f, iters=1000, verbose=0)

print('Cost: ', cost_mnist)

plot_cost_history(optimizer_mnist_pso.cost_history)
plt.show()

print('Accuracy: ', (predict(pos_mnist) == y).mean())

prediction_pso = predict(pos_mnist) 
confusion_matrix(y_test[:num_samples],prediction_pso)

In [None]:
options = {'c1': 0.25, 'c2': 2, 'w':0.8} #options in order to initialize swarm

dimensions = dimensions
optimizer_mnist_pso = pyswarms.single.GlobalBestPSO(n_particles=100, dimensions=dimensions, options=options)
cost_mnist, pos_mnist = optimizer_mnist_pso.optimize(f, iters=2000, verbose=0) 

print('Cost: ', cost_mnist)

plot_cost_history(optimizer_mnist_pso.cost_history)
plt.show()

print('Accuracy: ', (predict(pos_mnist) == y).mean())

prediction_pso = predict(pos_mnist) 
confusion_matrix(y_test[:num_samples],prediction_pso)



In [None]:
options = {'c1': 0.25, 'c2': 2, 'w':0.8} #options in order to initialize swarm

dimensions = dimensions
optimizer_mnist_pso = pyswarms.single.GlobalBestPSO(n_particles=250, dimensions=dimensions, options=options)
cost_mnist, pos_mnist = optimizer_mnist_pso.optimize(f, iters=1000, verbose=0)

print('Cost: ', cost_mnist)

plot_cost_history(optimizer_mnist_pso.cost_history)
plt.show()

print('Accuracy: ', (predict(pos_mnist) == y).mean())

prediction_pso = predict(pos_mnist)
confusion_matrix(y_test[:num_samples],prediction_pso)

# Patch Camelyon

Characteristics of the data

In [None]:
num_classes = 2
input_shape = (96,96,3)

## Select part of the data (n samples)

Import the data from the dataset and define the number of samples from the train set to run with the PSO.

In [None]:
num_samples = 1000

x_train_sample = []
y_train_sample = []

x_test = []
y_test = []

with h5py.File('/kaggle/input/metastatic-tissue-classification-patchcamelyon/pcam/training_split.h5', 'r') as f:
    x_train_sample = np.array(f['x'][:num_samples])

with h5py.File('/kaggle/input/metastatic-tissue-classification-patchcamelyon/Labels/Labels/camelyonpatch_level_2_split_train_y.h5', 'r') as f:
    y_train_sample = np.array(f['y'][:num_samples].flat)

with h5py.File('/kaggle/input/metastatic-tissue-classification-patchcamelyon/pcam/test_split.h5', 'r') as f:
    x_test = np.array(f['x'][:])

with h5py.File('/kaggle/input/metastatic-tissue-classification-patchcamelyon/Labels/Labels/camelyonpatch_level_2_split_test_y.h5', 'r') as f:
    y_test = np.array(f['y'][:].flat)

In [None]:
print(x_train_sample.shape)
print(y_train_sample.shape)

Convert samples to Tensor

In [None]:
def convert_sample(sample):
    image, label = sample[0], sample[1]
    image = tf.image.convert_image_dtype(image, tf.float32)
    label = tf.one_hot(label, 2, dtype=tf.float32)
    return image, label
    
x_train_tf, y_train_tf = map(convert_sample, (x_train_sample, y_train_sample))
x_test_tf, y_test_tf = map(convert_sample, (x_test, y_test))

## Stochastic Gradient Descent 

Apply SGD in order to compare the performances and evaluate

In [None]:
batch_size = 100
epochs = 2

In [None]:
model_patch_sgd = pretrained_model_last(img_shape=input_shape, num_classes=num_classes-1, layer_type='softmax', summary_print=True)

In [None]:
model_patch_sgd.compile(optimizer=keras.optimizers.Adam(1e-4), loss='binary_crossentropy', metrics=['acc'])
model_patch_sgd.fit(x_train_sample, y_train_sample, epochs=epochs, batch_size=batch_size)
loss_sgd, acc_sgd = model_patch_sgd.evaluate(x_test, y_test)


In [None]:
print('Accuracy: ', acc_sgd)

## Particle Swarm Optimization

In [None]:
model_patch_pso = pretrained_model_last(img_shape=input_shape, num_classes=num_classes, layer_type='softmax', summary_print=True)

* Define number of inputs, hidden and classes
* Calculate the dimensions
* Retrieve the layer to train
* Define input for the PSO
* Apply PSO optimizer
    * Logits function to update weights and biases of a particle
    * Forward propagation (for one particle)
        * apply softmax and negative log likelihood
    * Compute forward propagation and thus the loss for the whole swarm

In [None]:
n_inputs = 4608
n_hidden = 1 
n_classes = 2 

In [None]:
dimensions = (n_inputs * n_hidden) + (n_hidden * n_classes) + n_hidden + n_classes
print(dimensions)

In [None]:
layer_flatten = K.function([model_patch_pso.layers[0].output],
                                  [model_patch_pso.layers[-2].output])
flatten_output = layer_flatten([x_train_sample])[0]

In [None]:
X = flatten_output.reshape((num_samples, n_inputs))
y = y_train_sample
print(X.shape)
print(y.shape)

In [None]:
layer_last = model_patch_pso.layers[-1]
weights_ll = np.array(layer_last.get_weights())
print(weights_ll[0].shape)

Compute the logits (weights and biases)

In [None]:
def logits_function(p):
    start = 0
    end = n_inputs*n_hidden
    W1 = p[start:end].reshape((n_inputs,n_hidden))

    start = end 
    end = start+n_hidden
    b1 = p[start:end].reshape((n_hidden,))

    start = end 
    end = start+(n_classes*n_hidden)
    W2 = p[start:end].reshape((n_hidden,n_classes))

    start = end 
    end = start+n_classes
    b2 = p[start:end].reshape((n_classes,))

    z1 = X.dot(W1) + b1
    a1 = np.tanh(z1)     
    logits = a1.dot(W2) + b2 
    return logits     


* Compute the softmax activation for the returned logits,
* Compute the negative log-likelihood loss and return this.
The negative log-likelihood loss is the error between the predictions and theground truth.

In [None]:
def forward_prop(params):
    logits = logits_function(params)

    # Softmax (logits)
    exp_scores = np.exp(logits)
    probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)

    # Negative log likelihood
    corect_logprobs = -np.log(probs[range(num_samples), y])
    loss = np.sum(corect_logprobs) / num_samples

    return loss

The loss for each particle of the swarm is the result of the forward propagation.

In [None]:
def f(x):
    n_particles = x.shape[0]
    j = [forward_prop(x[i]) for i in range(n_particles)]
    return np.array(j)

Prediction function for the given positions (sum up to 1)

In [None]:
def predict(pos):
    logits = logits_function(pos)
    y_pred = np.argmax(logits, axis=1)
    return y_pred

* Define the intializations for the swarm
* Apply PSO
    * Define the optimzer for the patch data
    * Optimize the optimizer given the swarm optimizer function defined earlier (f)
* Plot the cost of the optimization
* Accuracy
* Confusion matrix

The personal best position (given the loss) is compared to the current position for each particle in the swarm.  After this, the personal best is compared to the global best, and the matrices (position andcosts) are updated.  We want the cost as low as possible (close to zero), and the cost represents the lowest value of the loss.

Optimizer is done for 5 different combinations of number of particles and generations

In [None]:
options = {'c1': 0.25, 'c2': 3, 'w':0.8} #options in order to initialize swarm

dimensions = dimensions
optimizer_patch_pso = pyswarms.single.GlobalBestPSO(n_particles=100, dimensions=dimensions, options=options) 
cost_patch, pos_patch = optimizer_patch_pso.optimize(f, iters=100, verbose=0) 

print('Cost: ', cost_patch)

plot_cost_history(optimizer_patch_pso.cost_history)
plt.show()

print('Accuracy: ', (predict(pos_patch) == y).mean())

prediction_pso = predict(pos_patch)
confusion_matrix(y_test[:num_samples],prediction_pso)

In [None]:
options = {'c1': 0.25, 'c2': 3, 'w':0.8} #options in order to initialize swarm

dimensions = dimensions
optimizer_patch_pso = pyswarms.single.GlobalBestPSO(n_particles=250, dimensions=dimensions, options=options) 
cost_patch, pos_patch = optimizer_patch_pso.optimize(f, iters=100, verbose=0) 

print('Cost: ', cost_patch)

plot_cost_history(optimizer_patch_pso.cost_history)
plt.show()

print('Accuracy: ', (predict(pos_patch) == y).mean())

prediction_pso = predict(pos_patch)
confusion_matrix(y_test[:num_samples],prediction_pso)

In [None]:
options = {'c1': 0.25, 'c2': 3, 'w':0.8} #options in order to initialize swarm

dimensions = dimensions
optimizer_patch_pso = pyswarms.single.GlobalBestPSO(n_particles=500, dimensions=dimensions, options=options) 
cost_patch, pos_patch = optimizer_patch_pso.optimize(f, iters=100, verbose=0) 

print('Cost: ', cost_patch)

plot_cost_history(optimizer_patch_pso.cost_history)
plt.show()

print('Accuracy: ', (predict(pos_patch) == y).mean())

prediction_pso = predict(pos_patch)
confusion_matrix(y_test[:num_samples],prediction_pso)

In [None]:
options = {'c1': 0.25, 'c2': 3, 'w':0.8} #options in order to initialize swarm

dimensions = dimensions
optimizer_patch_pso = pyswarms.single.GlobalBestPSO(n_particles=100, dimensions=dimensions, options=options) 
cost_patch, pos_patch = optimizer_patch_pso.optimize(f, iters=500, verbose=0) 

print('Cost: ', cost_patch)

plot_cost_history(optimizer_patch_pso.cost_history)
plt.show()

print('Accuracy: ', (predict(pos_patch) == y).mean())

prediction_pso = predict(pos_patch)
confusion_matrix(y_test[:num_samples],prediction_pso)

In [None]:
options = {'c1': 0.25, 'c2': 3, 'w':0.8} #options in order to initialize swarm

dimensions = dimensions
optimizer_patch_pso = pyswarms.single.GlobalBestPSO(n_particles=100, dimensions=dimensions, options=options) 
cost_patch, pos_patch = optimizer_patch_pso.optimize(f, iters=1000, verbose=0) 

print('Cost: ', cost_patch)

plot_cost_history(optimizer_patch_pso.cost_history)
plt.show()

print('Accuracy: ', (predict(pos_patch) == y).mean())

prediction_pso = predict(pos_patch)
confusion_matrix(y_test[:num_samples],prediction_pso)

Possibility to make GIF of the swarm

In [None]:
'''
m = Mesher(func=fx.sphere)
optimizer_patch_pso_gif = pyswarms.single.GlobalBestPSO(n_particles=100, dimensions=2, options=options) 
cost_patch, pos_patch = optimizer_patch_pso_gif.optimize(fx.sphere, iters=100, verbose=0) 
'''

In [None]:
'''
%%capture
animation = plot_contour(pos_history=optimizer_patch_pso_gif.pos_history,
                         mesher=m,
                         mark=(0,0))
'''

In [None]:
'''
animation.save('plot0.gif', writer='imagemagick', fps=10)
Image(url='plot0.gif')
'''