# **Particle Swarm optimization**
class Particle:
- Define the particle object with the **__init__**. Every particle must have a position, a velocity.
- Define a function, called **FitnessCalculator**, that computes the fitness value of the particle. Its indipendent variable is the **accuracy metric** of the neural network.
- Define a function, called **PositionCalculator**
- Define a function, called **VelocityCalculator**


In [117]:
import numpy as np
import random

In [118]:
class Particle:

    def __init__(self, position, velocity ):
        '''position and velocity are vectors (lists)'''

        self.position = position
        self.velocity = velocity
        self.bestp = position

        self.iteration = 0


    def FitnessCalculator(self, position, accuracy):
        '''It takes as input the model and the parameter (which isthe particle position).
         Calculates the accuracy (or loss, we need to decide) of the model and return it. '''

        # Ho messo model.accuracy, poi quando definiamo il modello credo qui vada messa la parte proprio del training per ritornare poi l'accuracy o la loss in base a come vogliamo fare noi.

        self.fitness = accuracy(position)
        return self

    def inertia_coefficient(self, c1, c2, random_1, random_2, max_iter = None, old_w = None, schedule_type = 'constant'):
        
        min_w = (c1+c2)*(1/2)-1
        
        if schedule_type == 'costant':
            # w is set to be constant
            w = c1*random_1 + c2*random_2

        if schedule_type == 'random':
            # w is randomly selected ad each iteration from a gaussian distribution with center 0.72 and σ small enough to ensure that w is not predominantly greater than one
            w = np.random.normal(0.72, 0.4)

        if schedule_type == 'linearly decreasing':
            if max_iter != None:
                w = ((0.9-min_w)*(max_iter-self.iteration)/max_iter) + min_w
            else: 
                raise Exception('ERROR YOU MUST SPECIFY THE MAXIMUM NUMBER OF ITERATION')
        
        if schedule_type == 'nonlinearly decreasing':
            if old_w != None:
                w = 0.975*old_w
            else: 
                raise Exception('ERROR YOU MUST SPECIFY W AT PREVIOUS ITERATION')
        
        if schedule_type not in ['constant','random','linearly decreasing', 'nonlinearly decreasing']:
            raise Exception('You must specify a valid type for w')

        # w > min_w guarantees convergent particle trajectories. If this condition is not satisfied, divergent or cyclic behavior may occur.
        if w < min_w:
            w = min_w
        return w


    def VelocityCalculator(self, c1, c2, best_glob_pos, w = 0.9, v_max = None):

        random_1 = np.random.random(len(self.position))
        random_2 = np.random.random(len(self.position))

        velocity = w*self.velocity + c1*random_1*(self.bestp - self.position) + c2*random_2*(best_glob_pos - self.position)

        # velocity quickly explodes to large values, especially for particles far from the neighborhood best and personal best positions. Consequently, particles have large position updates, which result in particles leaving the boundaries of the search space – the particles diverge. To control the global exploration of particles, velocities are clamped to stay within boundary constraints.

        if v_max != None:
            new_velocity = np.zeros(len(velocity)) #I inizialize an array with all zeros and then I change the elements

            for i in range (len(velocity)):
                if velocity[i] > v_max[i]:
                    new_velocity[i] = v_max[i]
                else:
                    new_velocity[i] = velocity[i]
            self.velocity = new_velocity

        else:
            self.velocity = velocity
        
        #let's update w
        w = self.inertia_coefficient(self, c1, c2, random_1, random_2, old_w = w, schedule_type = 'nonlinearly decreasing')

        return self


    def PositionCalculator(self, new_vel):
        
        self.iteration += 1

        self.position = self.position + new_vel
        return self

    def BestLocal(self, problem):
        '''Takes as input the particle and the type of optimization problem (problem could be minimum or maximum) and calculates best fitness and best position'''
        if self.iteration == 0:
            self.bestfit = self.fitness

        if problem == 'minimum':
            if self.fitness < self.bestfit:
                self.bestfit = self.fitness
                self.bestp = self.position
        elif problem == 'maximum':
            if self.fitness > self.bestfit:
                self.bestfit = self.fitness
                self.bestp = self.position
        else:
            return "Error! problem must be: 'minimum' or 'maximum'"    
        return self

In [119]:
def InitializeSwarm(swarm_size, dimensionality, lower_bound, upper_bound):
    '''It takes as input the swarm size (number of particles I want to create, the dimensionality of the swarm (number of parameter for each particle) and the lower and upper bound of the parameter (that are two lists))'''

    #np.random.seed(3)
    # Usually the positions of particles are initialized to uniformly cover the search space
    swarm_list = []
    for particle in range(swarm_size):
        position = np.random.uniform(lower_bound, upper_bound, dimensionality)

        velocity = np.zeros(dimensionality)
        # velocity = np.random.random(dimensionality)

        part = Particle(position, velocity)
        swarm_list.append(part)

    return swarm_list

In [120]:
# Let's first define the function we want to use for the evaluation:

def f(lista):
    '''Definisco una funzione f che prende in input una lista di tre elementi (ovviamente va cambiata se cambio la swarm size)'''
    return(lista[0]**2+lista[1]**2+lista[2]**2)

In [121]:
def global_optimum(swarm):
    '''takes as input a swarm of particles and return the position of global optimum and the value of the fitness function in that global optimum'''

    global_opt = (min([particle.bestfit for particle in swarm]))
    best_global_position = swarm[np.argmin(np.array([particle.bestfit for particle in swarm]))].position

    return best_global_position, global_opt  

In [122]:
def acceleration_coefficient(lower_bound, upper_bound):
    '''It takes as input two vector: lower_bound is a vector containing the minimum element which can assume each parameter and upper_buond contains the maximum. e.g. for parameter j which assume value in range (aj,bj), the j-esimo element of lower_bound will be aj and the j-esimo element of upper_bund will be bj'''

    #random.seed(3)
    a1 = random.random()
    a2 = random.random()

    c1 = a1*(np.array(upper_bound)-np.array(lower_bound))
    c2 = a2*(np.array(upper_bound)-np.array(lower_bound))

    return c1,c2


In [123]:
def PSO_inizialization(swarm_size, dim, evaluation_funct, lower_bound, upper_bound):
    swarm = InitializeSwarm(swarm_size, dim, lower_bound, upper_bound)

    #First we need to evaluate each particle

    for particle in swarm:
        particle.FitnessCalculator(particle.position, evaluation_funct)

        # When we inizialize the swarm we need also to calculate the local optimum:
        particle.BestLocal(problem = 'minimum')

    return(swarm)

In [124]:
def PSO_alg(swarm, lower_bound, upper_bound, v_max, evaluation_funct):

    # First we calculate the starting global best position of the swarm
    best_global_position, global_opt = global_optimum(swarm)

    # Calculate the coefficient needed for updating the velocity of the particles.
    c1, c2 = acceleration_coefficient(lower_bound,upper_bound)

    #w = 0.9

    # set the termination criteria
    criteria_not_reach = True

    while criteria_not_reach:

        for particle in swarm:
            ## We need to calculate the new velocity:",
            particle.VelocityCalculator(c1, c2, best_global_position, v_max = v_max)

            ## Now we can calculate the new position:\n",
            particle.PositionCalculator(particle.velocity)

            ## We need to evaluate again the fitness function\n",
            particle.FitnessCalculator(particle.position, evaluation_funct)

            ## With the new position calculated we have to update the local best position:
            particle.BestLocal(problem = 'minimum')

            #print('New velocity: ', particle.velocity, '\n New position: ', particle.position, '\n New fitness: ', particle.fitness , '\n Best local position: ', particle.bestp, 'Best local fitness', particle.bestfit ,'\n')


            # w /= 2 #at each iteraton w is halved (IN THIS CASE I HAVE SET W TO A FIXED VALUE DIRECTLY INTO THE CLASS PARTICLE )

        # Then we update the global optimum
        best_position, opt = global_optimum(swarm)
        if opt< global_opt:
            global_opt = opt
            best_global_position = best_position
        
        # print('GLOBAL OPTIMUM: ', global_opt, 'GLOBAL OPT POSITION: ', best_global_position)

        # we set the criteria depending on the number of iteration
        if swarm[0].iteration == 200:
            criteria_not_reach = False
    
    return best_global_position, global_opt

In [125]:
def main():
    swarm_size = 10
    dim = 3
    evaluation_funct = f
    lower_bound=[0,0,0]
    upper_bound = [10,10,10]
    v_max = [3,3,3]

    swarm = PSO_inizialization(swarm_size, dim, evaluation_funct, lower_bound, upper_bound)

    best_global_position, global_opt = PSO_alg(swarm, lower_bound, upper_bound, v_max, evaluation_funct)
    
    print('Best position found: ', best_global_position, 'with an evaluation of: ',global_opt)

In [126]:
main()

TypeError: unsupported operand type(s) for +: 'Particle' and 'float'