<a href="https://colab.research.google.com/github/FlowSight/MlAlgoFromScratch/blob/master/pso.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from __future__ import print_function, division
import numpy as np
import copy
from sklearn import datasets
from sklearn.model_selection import train_test_split

In [None]:
class Loss(object):
    """ Base interface for the loss functions in this file.

    Subclasses override `loss` and `gradient`; `acc` is optional and
    reports 0 unless a subclass provides its own implementation.
    """

    def loss(self, y_true, y_pred):
        """ Return the loss of `y_pred` with respect to `y_true`.

        Raises NotImplementedError in this base class.
        """
        # Raise (instead of returning the exception object) so a missing
        # override fails loudly rather than handing callers an exception
        # instance that would later be treated as a loss value.
        raise NotImplementedError()

    def gradient(self, y, y_pred):
        """ Return the gradient of the loss with respect to `y_pred`.

        Raises NotImplementedError in this base class.
        """
        raise NotImplementedError()

    def acc(self, y, y_pred):
        """ Return an accuracy score; the base implementation always returns 0. """
        return 0

class SquareLoss(Loss):
    """ Half squared error: 0.5 * (y - y_pred)^2, evaluated elementwise. """

    def __init__(self):
        # Stateless loss; nothing to set up.
        pass

    def loss(self, y, y_pred):
        """ Return half of the squared residual for every element. """
        residual = y - y_pred
        squared = np.power(residual, 2)
        return 0.5 * squared

    def gradient(self, y, y_pred):
        """ Return the derivative of the loss with respect to `y_pred`. """
        residual = y - y_pred
        return -residual

    def hessian(self, y, y_pred):
        """ Return the second derivative with respect to `y_pred` (constant 1). """
        return 1

class CrossEntropy(Loss):
    """ Elementwise binary cross-entropy between targets `y` and probabilities `p`.

    Probabilities are clipped to [1e-15, 1 - 1e-15] before every log or
    division so that the result stays finite.
    """

    def __init__(self): pass

    def loss(self, y, p):
        """ Return -y*log(p) - (1-y)*log(1-p) for every element. """
        # Avoid log(0)
        p = np.clip(p, 1e-15, 1 - 1e-15)
        return - y * np.log(p) - (1 - y) * np.log(1 - p)

    def acc(self, y, p):
        """ Return the fraction of rows whose arg-max in `p` matches the arg-max in `y`.

        Both arguments are indexed along axis 1, so they must be at least
        two-dimensional (one row per sample).
        """
        # Computed with numpy directly: `accuracy_score` is not imported
        # anywhere in this file, so calling it raised a NameError.
        return np.mean(np.argmax(y, axis=1) == np.argmax(p, axis=1))

    def gradient(self, y, p):
        """ Return the derivative of the loss with respect to `p`. """
        # Avoid division by zero
        p = np.clip(p, 1e-15, 1 - 1e-15)
        return - (y / p) + (1 - y) / (1 - p)

    def hessian(self,y,p):
        """ Return the second derivative of the loss with respect to `p`. """
        # Avoid division by zero
        p = np.clip(p, 1e-15, 1 - 1e-15)
        return y/(p**2) + (1-y)/(1-p)**2

class LogisticLoss():
    """ Logistic (log) loss on raw scores.

    `y_pred` holds un-squashed scores; every method first maps them to
    probabilities with `self.log_func`.
    """

    def __init__(self):
        # NOTE(review): `Sigmoid` is not defined in the visible part of this
        # file; it has to be provided elsewhere before this class is instantiated.
        sigmoid = Sigmoid()
        self.log_func = sigmoid
        self.log_grad = sigmoid.gradient

    def loss(self, y, y_pred):
        """ Return the negative log-likelihood -[y*log(p) + (1-y)*log(1-p)] per element. """
        # Clip the probabilities (not the raw scores) so the logs stay finite;
        # clipping the scores would squeeze every prediction into (0, 1)
        # before the sigmoid is applied.
        p = np.clip(self.log_func(y_pred), 1e-15, 1 - 1e-15)
        # Negated so that this is the quantity whose derivative with respect
        # to the scores is `gradient` below, i.e. lower is better.
        return -(y * np.log(p) + (1 - y) * np.log(1 - p))

    def gradient(self, y, y_pred):
        """ Return the derivative of the loss with respect to the raw scores. """
        p = self.log_func(y_pred)
        return -(y - p)

    def hessian(self, y, y_pred):
        """ Return the second derivative of the loss with respect to the raw scores. """
        p = self.log_func(y_pred)
        return p * (1 - p)

    def acc(self, y, y_pred):
        """ Return the fraction of rows whose arg-max prediction matches the arg-max of `y`. """
        p = self.log_func(y_pred)
        # Computed with numpy directly: `accuracy_score` is not imported
        # anywhere in this file, so calling it raised a NameError.
        return np.mean(np.argmax(y, axis=1) == np.argmax(p, axis=1))

class TanH():
    """ Hyperbolic tangent activation, written as 2 / (1 + exp(-2x)) - 1. """

    def __call__(self, x):
        """ Apply the activation elementwise to `x`. """
        denominator = 1 + np.exp(-2*x)
        return 2 / denominator - 1

    def gradient(self, x):
        """ Return 1 - tanh(x)^2, the derivative of the activation at `x`. """
        activated = self.__call__(x)
        return 1 - np.power(activated, 2)

class ReLU():
    """ Rectified linear unit: keeps non-negative inputs, zeroes the rest. """

    def __call__(self, x):
        """ Return `x` where x >= 0 and 0 elsewhere. """
        keep = x >= 0
        return np.where(keep, x, 0)

    def gradient(self, x):
        """ Return 1 where x >= 0 and 0 elsewhere. """
        keep = x >= 0
        return np.where(keep, 1, 0)

class Softmax():
    """ Softmax activation taken over the last axis of the input. """

    def __call__(self, x):
        """ Return exp(x) normalised to sum to one along the last axis. """
        # Subtract the maximum along the last axis before exponentiating so
        # that large inputs do not overflow.
        shifted = x - np.max(x, axis=-1, keepdims=True)
        exponentials = np.exp(shifted)
        totals = np.sum(exponentials, axis=-1, keepdims=True)
        return exponentials / totals

    def gradient(self, x):
        """ Return p * (1 - p) elementwise, where p is the softmax of `x`. """
        probabilities = self.__call__(x)
        return probabilities * (1 - probabilities)

In [None]:
class Layer(object):
    """ Common interface of the network layers defined in this file.

    Concrete layers override `forward_pass`, `backward_pass` and
    `output_shape`; the remaining methods have usable defaults.
    """

    def set_input_shape(self, shape):
        """ Store `shape` as the shape this layer expects as input. """
        self.input_shape = shape

    def layer_name(self):
        """ Return the class name of the layer. """
        return type(self).__name__

    def parameters(self):
        """ Return the number of parameters of the layer (0 by default). """
        return 0

    def forward_pass(self, X, training):
        """ Compute the layer output for `X`; must be overridden. """
        raise NotImplementedError()

    def backward_pass(self, accum_grad):
        """ Propagate `accum_grad` back through the layer; must be overridden. """
        raise NotImplementedError()

    def output_shape(self):
        """ Return the shape of the layer output; must be overridden. """
        raise NotImplementedError()

class Dense(Layer):
    """ Fully connected layer computing X.dot(W) + w0.

    Parameters:
    -----------
    n_units: int
        Number of output units (columns of the weight matrix).
    input_shape: tuple
        Shape of one input sample. Only `input_shape[0]` is read, as the
        number of rows of the weight matrix. May be left as None and set
        later through `set_input_shape`, but must be set before `initialize`.
    """
    def __init__(self, n_units, input_shape=None):
        self.layer_input = None
        self.input_shape = input_shape
        self.n_units = n_units
        self.trainable = True
        self.W = None
        self.w0 = None

    def initialize(self, optimizer):
        """ Create the weights and give each of them its own copy of `optimizer`. """
        # Initialize the weights uniformly in [-limit, limit] and the biases to zero.
        # np.sqrt is used because `math` is never imported in this file, so
        # the previous `math.sqrt` call raised a NameError.
        limit = 1 / np.sqrt(self.input_shape[0])
        self.W  = np.random.uniform(-limit, limit, (self.input_shape[0], self.n_units))
        self.w0 = np.zeros((1, self.n_units))
        # Weight optimizers (separate shallow copies for weights and biases)
        self.W_opt  = copy.copy(optimizer)
        self.w0_opt = copy.copy(optimizer)

    def parameters(self):
        """ Return the total number of entries in W and w0. """
        return np.prod(self.W.shape) + np.prod(self.w0.shape)

    def forward_pass(self, X, training=True):
        """ Remember `X` for the backward pass and return X.dot(W) + w0. """
        self.layer_input = X
        return X.dot(self.W) + self.w0

    def backward_pass(self, accum_grad):
        """ Update the weights (if trainable) and return the gradient for the previous layer. """
        # Keep the weights used in the forward pass: the gradient handed to
        # the previous layer must not be computed from the updated weights.
        W = self.W
        if self.trainable:
            # Gradients with respect to the weights and the biases
            grad_w = self.layer_input.T.dot(accum_grad)
            grad_w0 = np.sum(accum_grad, axis=0, keepdims=True)

            # Update the layer weights
            self.W = self.W_opt.update(self.W, grad_w)
            self.w0 = self.w0_opt.update(self.w0, grad_w0)

        accum_grad = accum_grad.dot(W.T)
        return accum_grad

    def output_shape(self):
        """ Return the shape of one output sample, `(n_units,)`. """
        return (self.n_units, )

class Activation(Layer):
    """ Layer that applies a named activation function to its input.

    Parameters:
    -----------
    name: string
        Key looked up in `activation_functions` to obtain the activation class.
        NOTE(review): `activation_functions` is not defined in the visible
        part of this file; it has to be provided elsewhere.
    """

    def __init__(self, name):
        self.activation_name = name
        activation_cls = activation_functions[name]
        self.activation_func = activation_cls()
        self.trainable = True

    def layer_name(self):
        """ Return a label that includes the activation's class name. """
        func_name = type(self.activation_func).__name__
        return "Activation (%s)" % (func_name)

    def forward_pass(self, X, training=True):
        """ Remember `X` for the backward pass and return the activation of `X`. """
        self.layer_input = X
        activated = self.activation_func(X)
        return activated

    def backward_pass(self, accum_grad):
        """ Scale `accum_grad` by the activation gradient at the stored input. """
        local_grad = self.activation_func.gradient(self.layer_input)
        return accum_grad * local_grad

    def output_shape(self):
        """ Return the input shape; the activation does not change it. """
        return self.input_shape

class Adam():
    """ Adam optimizer (Kingma & Ba, "Adam: A Method for Stochastic Optimization").

    Parameters:
    -----------
    learning_rate: float
        Step size applied to the bias-corrected update.
    b1: float
        Decay rate of the first moment (running mean of the gradient).
    b2: float
        Decay rate of the second moment (running mean of the squared gradient).
    """
    def __init__(self, learning_rate=0.001, b1=0.9, b2=0.999):
        self.learning_rate = learning_rate
        self.eps = 1e-8
        self.m = None
        self.v = None
        # Number of updates performed so far; needed for the bias correction
        self.t = 0
        # Decay rates
        self.b1 = b1
        self.b2 = b2

    def update(self, w, grad_wrt_w):
        """ Return `w` after one Adam step along `grad_wrt_w`.

        The moment estimates and the step counter are stored on the instance,
        so one instance should be used for one parameter tensor only.
        """
        # If not initialized
        if self.m is None:
            self.m = np.zeros(np.shape(grad_wrt_w))
            self.v = np.zeros(np.shape(grad_wrt_w))

        self.t += 1

        self.m = self.b1 * self.m + (1 - self.b1) * grad_wrt_w
        self.v = self.b2 * self.v + (1 - self.b2) * np.power(grad_wrt_w, 2)

        # Bias correction: the moments start at zero, so after t steps they
        # are scaled by (1 - b**t). Dividing by (1 - b) regardless of t is
        # only right for the first step and inflates every later estimate.
        m_hat = self.m / (1 - self.b1 ** self.t)
        v_hat = self.v / (1 - self.b2 ** self.t)

        self.w_updt = self.learning_rate * m_hat / (np.sqrt(v_hat) + self.eps)

        return w - self.w_updt

In [None]:
class ParticleSwarmOptimizedNN():
    """ Particle Swarm Optimization of Neural Network.
        https://visualstudiomagazine.com/articles/2013/12/01/neural-network-training-using-particle-swarm-optimization.aspx 

    Parameters:
    -----------
    population_size: int
        Number of networks (particles) in the swarm.
    model_builder: callable
        Called as model_builder(n_inputs=..., n_outputs=...); must return an
        object with a `layers` list and a `predict(X, y)` method returning
        (loss, accuracy).
    inertia_weight: float
        Factor applied to the previous velocity.
    cognitive_weight: float
        Factor applied to the pull towards the particle's own best weights.
    social_weight: float
        Factor applied to the pull towards the swarm's best weights.
    max_velocity: float
        Every velocity component is clipped to [-max_velocity, max_velocity].
    """
    def __init__(self, population_size, 
                        model_builder, 
                        inertia_weight=0.8, 
                        cognitive_weight=2, 
                        social_weight=2, 
                        max_velocity=20):
        self.population_size = population_size
        self.model_builder = model_builder
        self.best_individual = None
        # Parameters used to update velocity
        self.cognitive_w = cognitive_weight
        self.inertia_w = inertia_weight
        self.social_w = social_weight
        self.min_v = -max_velocity
        self.max_v = max_velocity

    @staticmethod
    def _snapshot(individual):
        """ Return a copy of `individual` whose layers are independent of the original """
        # Layer weights are updated in place, so a shallow copy would keep
        # following the live particle. Only the layers are deep-copied; all
        # other attributes are shared with / copied from the original.
        frozen = copy.copy(individual)
        frozen.layers = copy.deepcopy(individual.layers)
        return frozen

    def _build_model(self, id):
        """ Returns a new individual """
        model = self.model_builder(n_inputs=self.X.shape[1], n_outputs=self.y.shape[1])
        model.id = id
        model.fitness = 0
        model.highest_fitness = 0
        model.accuracy = 0
        # Set initial best as the current initialization (independent copy)
        model.best_layers = copy.deepcopy(model.layers)

        # Set initial velocity to zero
        model.velocity = []
        for layer in model.layers:
            if hasattr(layer, 'W'):
                velocity = {"W": np.zeros_like(layer.W), "w0": np.zeros_like(layer.w0)}
            else:
                # Placeholder that keeps the velocity list aligned with the layer list
                velocity = {"W": 0, "w0": 0}
            model.velocity.append(velocity)

        return model

    def _initialize_population(self):
        """ Initialization of the neural networks forming the population"""
        self.population = [self._build_model(id=i) for i in range(self.population_size)]

    def _new_velocity(self, previous, current, personal_best, global_best, r1, r2):
        """ Return the clipped velocity for one weight array """
        inertia = self.inertia_w * previous
        cognitive = self.cognitive_w * r1 * (personal_best - current)
        social = self.social_w * r2 * (global_best - current)
        return np.clip(inertia + cognitive + social, self.min_v, self.max_v)

    def _update_weights(self, individual):
        """ Calculate the new velocity and update weights for each layer """
        # Two random parameters used to update the velocity
        r1 = np.random.uniform()
        r2 = np.random.uniform()
        for i, layer in enumerate(individual.layers):
            if not hasattr(layer, 'W'):
                continue
            own_best = individual.best_layers[i]
            swarm_best = self.best_individual.layers[i]
            velocity = individual.velocity[i]

            # Layer weights velocity
            velocity["W"] = self._new_velocity(velocity["W"], layer.W,
                                               own_best.W, swarm_best.W, r1, r2)
            # Bias weight velocity
            velocity["w0"] = self._new_velocity(velocity["w0"], layer.w0,
                                                own_best.w0, swarm_best.w0, r1, r2)

            # Update layer weights with velocity (in place)
            layer.W += velocity["W"]
            layer.w0 += velocity["w0"]

    def _calculate_fitness(self, individual):
        """ Evaluate the individual on the data passed to fit() to get fitness scores """
        loss, acc = individual.predict(self.X, self.y)
        # Lower loss => higher fitness; the epsilon guards against a zero loss
        individual.fitness = 1 / (loss + 1e-8)
        individual.accuracy = acc

    def fit(self, X, y, n_generations):
        """ Will evolve the population for n_generations based on dataset X and labels y.

        `y` must be two-dimensional: its second dimension is passed to the
        model builder as the number of outputs. Returns a snapshot of the
        individual that reached the highest fitness.
        """
        self.X, self.y = X, y

        self._initialize_population()

        # The best individual of the population is initialized as population's first ind.
        self.best_individual = self._snapshot(self.population[0])

        for epoch in range(n_generations):
            for individual in self.population:
                self._update_weights(individual)
                self._calculate_fitness(individual)

                # If the current fitness is higher than the individual's previous highest
                # => update the individual's best layer setup
                if individual.fitness > individual.highest_fitness:
                    individual.best_layers = copy.deepcopy(individual.layers)
                    individual.highest_fitness = individual.fitness
                # If the individual's fitness is higher than the highest recorded fitness for the
                # whole population => update the best individual
                if individual.fitness > self.best_individual.fitness:
                    self.best_individual = self._snapshot(individual)

            print ("[%d Best Individual - ID: %d Fitness: %.5f, Accuracy: %.1f%%]" % (epoch,
                                                                            self.best_individual.id,
                                                                            self.best_individual.fitness,
                                                                            100*float(self.best_individual.accuracy)))
        return self.best_individual

class NeuralNetwork():
    """ Sequential neural network built from the layers defined in this file.

    Parameters:
    -----------
    optimizer: object
        Passed to every added layer that has an `initialize` method.
    loss: class or callable
        Called without arguments to create the loss object; that object must
        provide `loss`, `gradient` and `acc`.
    validation_data: tuple
        Optional (X, y) pair evaluated after every epoch in `fit`.
    """
    def __init__(self, optimizer, loss, validation_data=None):
        self.optimizer = optimizer
        self.layers = []
        self.errors = {"training": [], "validation": []}
        self.loss_function = loss()
        try:
            self.progressbar = progressbar.ProgressBar(widgets= [
                    'Training: ', progressbar.Percentage(), ' ', progressbar.Bar(marker="-", left="[", right="]"),
                    ' ', progressbar.ETA()])
        except NameError:
            # `progressbar` is not imported in this file. Fall back to plain
            # iteration so that networks can still be built (and trained,
            # without a progress display) when the module is absent.
            self.progressbar = lambda iterable: iterable
        self.val_set = None
        if validation_data:
            X, y = validation_data
            self.val_set = {"X": X, "y": y}

    def set_trainable(self, trainable):
        """ Set the `trainable` flag of every layer. """
        for layer in self.layers:
            layer.trainable = trainable

    def add(self, layer):
        """ Append `layer`, wiring its input shape to the previous layer's output shape. """
        # The first layer keeps the input shape it was constructed with
        if self.layers:
            layer.set_input_shape(shape=self.layers[-1].output_shape())

        # Layers with weights create them here
        if hasattr(layer, 'initialize'):
            layer.initialize(optimizer=self.optimizer)
        self.layers.append(layer)

    def predict(self, X, y=None):
        """ Run `X` through the network in inference mode.

        With `y` omitted the raw network output is returned. With `y` given
        the tuple (mean loss, accuracy) for that output is returned instead.
        The class used to define `predict` twice, so the later one-argument
        version silently replaced the two-argument one; this single method
        serves both call styles.
        """
        y_pred = self._forward_pass(X, training=False)
        if y is None:
            return y_pred
        loss = np.mean(self.loss_function.loss(y, y_pred))
        acc = self.loss_function.acc(y, y_pred)

        return loss, acc

    def train_on_batch(self, X, y):
        """ Do one forward/backward pass on (X, y) and return (mean loss, accuracy). """
        y_pred = self._forward_pass(X)
        loss = np.mean(self.loss_function.loss(y, y_pred))
        acc = self.loss_function.acc(y, y_pred)
        # Gradient of the loss with respect to the network output
        loss_grad = self.loss_function.gradient(y, y_pred)
        # Backpropagate; trainable layers update their weights on the way
        self._backward_pass(loss_grad=loss_grad)
        return loss, acc

    def fit(self, X, y, n_epochs, batch_size):
        """ Train for `n_epochs` and return the (training, validation) error histories.

        NOTE(review): `batch_iterator` is not defined in the visible part of
        this file; it has to be provided elsewhere for this method to run.
        """
        for _epoch in self.progressbar(range(n_epochs)):
            batch_error = []
            for X_batch, y_batch in batch_iterator(X, y, batch_size=batch_size):
                loss, _acc = self.train_on_batch(X_batch, y_batch)
                batch_error.append(loss)
            self.errors["training"].append(np.mean(batch_error))
            if self.val_set is not None:
                val_loss, _acc = self.predict(self.val_set["X"], self.val_set["y"])
                self.errors["validation"].append(val_loss)

        return self.errors["training"], self.errors["validation"]

    def _forward_pass(self, X, training=True):
        """ Feed `X` through every layer in order and return the final output. """
        layer_output = X
        for layer in self.layers:
            layer_output = layer.forward_pass(layer_output, training)
        return layer_output

    def _backward_pass(self, loss_grad):
        """ Propagate `loss_grad` through the layers in reverse order. """
        for layer in reversed(self.layers):
            loss_grad = layer.backward_pass(loss_grad)

    def summary(self, name="Model Summary"):
        """ Print a table with name, parameter count and output shape of every layer.

        NOTE(review): `AsciiTable` is not defined in the visible part of this
        file; it has to be provided elsewhere for this method to run.
        """
        print (AsciiTable([[name]]).table)
        print ("Input Shape: %s" % str(self.layers[0].input_shape))
        table_data = [["Layer Type", "Parameters", "Output Shape"]]
        tot_params = 0
        for layer in self.layers:
            layer_name = layer.layer_name()
            params = layer.parameters()
            out_shape = layer.output_shape()
            table_data.append([layer_name, str(params), str(out_shape)])
            tot_params += params
        print (AsciiTable(table_data).table)
        print ("Total Parameters: %d\n" % tot_params)

In [None]:
# Load the iris data set, scale every sample to unit L2 norm and one-hot
# encode the integer class labels. numpy is used for both steps because no
# `normalize` or `to_categorical` helper is defined or imported in this file.
# NOTE(review): assumes those helpers meant row-wise L2 normalisation and
# one-hot encoding - confirm against the original utilities.
data = datasets.load_iris()
X = data.data
row_norms = np.linalg.norm(X, axis=1, keepdims=True)
row_norms[row_norms == 0] = 1  # keep all-zero rows as they are instead of dividing by zero
X = X / row_norms
labels = data.target.astype("int")
y = np.eye(labels.max() + 1)[labels]

# Model builder
def model_builder(n_inputs, n_outputs):
    """ Return a new network: Dense(16) -> relu -> Dense(n_outputs) -> softmax """
    network = NeuralNetwork(optimizer=Adam(), loss=CrossEntropy)
    stack = (
        Dense(16, input_shape=(n_inputs,)),
        Activation('relu'),
        Dense(n_outputs),
        Activation('softmax'),
    )
    # Layers are added in order; add() wires up shapes and initializes weights
    for layer in stack:
        network.add(layer)

    return network

# Print the model summary of an individual in the population
print ("")
model_builder(n_inputs=X.shape[1], n_outputs=y.shape[1]).summary()

population_size = 100
n_generations = 10

# scikit-learn's train_test_split takes `random_state`; it has no `seed`
# parameter, so passing seed=1 raised a TypeError.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=1)

inertia_weight = 0.8
cognitive_weight = 0.8
social_weight = 0.8

print ("Population Size: %d" % population_size)
print ("Generations: %d" % n_generations)
print ("")
print ("Inertia Weight: %.2f" % inertia_weight)
print ("Cognitive Weight: %.2f" % cognitive_weight)
print ("Social Weight: %.2f" % social_weight)
print ("")

model = ParticleSwarmOptimizedNN(population_size=population_size, 
                    inertia_weight=inertia_weight,
                    cognitive_weight=cognitive_weight,
                    social_weight=social_weight,
                    max_velocity=5,
                    model_builder=model_builder)

# fit() returns the best network found; from here on `model` is that network
model = model.fit(X_train, y_train, n_generations=n_generations)

# Evaluate the best network on the held-out split
loss, accuracy = model.predict(X_test, y_test)

print ("Accuracy: %.1f%%" % float(100*accuracy))