# Particle Swarm Optimization

We have a $m$-dimensional sarch space, usually $\mathbb{R}^m$.
The swarm is composed of $n$ particles, the $i$-th particle is charachterized by:

- the position $x_{i}(t)$ at time $t$;

- the velocity $v_{i}(t)$ at time $t$.


Suppose that our system is updated at each discrte time step $\Delta t$, the position of each particle is updated as
$$
    x_i(t+\Delta t) = x_i(t) + \Delta t \;v_i(t)
$$

If we assume that $\Delta t = 1$, we have that
$$
    x_i(t+1) = x_i(t) + \; v_i(t)
$$

![v_commponents](img/pso/v_components.png)

The velocity of each particle is updated as

$$
    v_i(t+\Delta t) = w_{\text{inertia}} \; v_i(t) + w_{\text{social}} \; r_1 \odot (g_i - x_i(t)) + w_{\text{cognitive}} \; r_2  \odot (b_i - x_i(t))
$$

where $\odot$ denotes the Hadamard product (element-wise product) and $r_1, r_2$ are random vectors in $[0,1]^m$.

It is possible to define a behaviour of the particle when it reaches the boundaries of the search space.

![boundary conditions](img/pso/boundary.png)

# PSO for solving Auckely function

In [52]:
import numpy as np
import matplotlib.pyplot as plt
import ipywidgets as widgets
from ipywidgets import interact

np.random.seed(0)


In [53]:
class PSO2d():
    def __init__(self, boundaries, max_velocities, fitness_fun):
        # Attributes of the problem
        self.num_dimensions = 2
        self.boundaries = boundaries
        self.fitness_fun = fitness_fun

        # Parameters of the algorithm
        self.swarm_size = None
        self.max_velocities = max_velocities
        self.c_social = None
        self.c_cognitive = None
        self.w_inertia = None
        self.n_iter = None

        # State of the swarm
        self.positions = None
        self.velocities = None
        self.local_best_positions = None #The local best of each particle is the best position it found so far

        #Results
        self.history = None
        self.global_best = None
    
    def initialize_swarm(self):
        '''
            Method to randomly initialize the swarm.
        '''
        self.positions = [np.array([np.random.random() * (b[1] - b[0]) + b[0] 
                                    for b in self.boundaries])
                          for _ in range(0, self.swarm_size)]
        
        self.velocities = [np.array([np.random.choice([-1,1]) * np.random.uniform(v[0],v[1]) 
                                     for v in self.max_velocities])
                           for _ in range(0, self.swarm_size)]
        
        self.local_best_positions = self.positions
        self.global_best = min(self.positions, key=self.fitness_fun)
        self.history = [self.positions]

    
    def update_position(self, position, velocity):
        '''
            Method to update the position of a particle.
            The damping boundary condition is implemented here.
            It returns the updated position.
        '''
        #Update the position
        new_position = position + velocity
        #Check if the new position is within the boundaries
        for i, b in enumerate(self.boundaries):
            if new_position[i] < b[0]:
                new_position[i] = b[0]
                velocity[i] = - np.random.random() * velocity[i]
            elif new_position[i] > b[1]:
                new_position[i] = b[1]
                velocity[i] = - np.random.random() * velocity[i]
        return new_position
    
    def update_velocity(self, position, velocity, local_best):
        '''
            Method to update the velocity of a particle.
            If the new velocity is outside the boundaries, it is set to the boundary value.
            It returns the update velocity.
        '''
        #Update the velocity
        r1 = np.random.random(self.num_dimensions)
        r2 = np.random.random(self.num_dimensions)
        social_component = self.c_social * r1 * (self.global_best - position)
        cognitive_component = self.c_cognitive * r2 * (local_best - position)
        inertia = self.w_inertia*velocity
        new_velocity = inertia + social_component + cognitive_component
        #Check if the new velocity is within the boundaries
        for i, v in enumerate(self.max_velocities):
            if np.abs(new_velocity[i]) < v[0]:
                new_velocity[i] = np.sign(new_velocity[i]) * v[0]
            elif np.abs(new_velocity[i]) > v[1]:
                new_velocity[i] = np.sign(new_velocity[i]) * v[1]
        
        return new_velocity

    def plot_evolution(self):
        '''
            Method to plot the evolution of the swarm.
            A slider it's used.
        '''

        def plot_state(time):
            pop = self.history[time]
            ax = plt.figure(figsize=(20,10)).add_subplot(projection='3d')
            xvals = np.linspace(-5, 5, 201)
            yvals = np.linspace(-5, 5, 201)
            xx, yy = np.meshgrid(xvals, yvals)
            z = self.fitness_fun([xx, yy])
            ax.plot_surface(xx, yy, z, antialiased=True, alpha=0.2)
            x_pop = [p[0] for p in pop]
            y_pop = [p[1] for p in pop]
            z_pop = [self.fitness_fun(p) for p in pop]
            ax.scatter(x_pop, y_pop, z_pop, c="red")
            plt.show()
        
        interact(plot_state, time=widgets.IntSlider(min=1,max=len(self.history)-1,step=1,value=0))
        plt.show()

    def run(self, swarm_size, c_cognitive, c_social, w_inertia, n_iter):
        '''
            Method to run the PSO algorithm.
        '''
        #Set the parameters of the algorithm
        self.swarm_size = swarm_size
        self.c_cognitive = c_cognitive
        self.c_social = c_social
        self.w_inertia = w_inertia
        self.n_iter = n_iter

        #Initialize the swarm
        self.initialize_swarm()
        
        for _ in range(self.n_iter):
            #Update the swarm
            self.velocities = [self.update_velocity(p, v, lb)
                               for p, v, lb in zip(self.positions, self.velocities, self.local_best_positions)]
            
            self.positions = [self.update_position(p, v)
                              for p, v in zip(self.positions, self.velocities)]
            
            #Update the local best positions  and global best position
            self.local_best_positions = [min([p, lb], key=self.fitness_fun)
                                         for p, lb in zip(self.positions, self.local_best_positions)]
            
            self.global_best = min([min(self.positions, key=self.fitness_fun), self.global_best], key=self.fitness_fun)

            #Update the history
            self.history.append(self.positions)

Define the fitnessness function

In [54]:
def ackley(x, y):
    return -20 * np.exp(-0.2 * np.sqrt(0.5 * (x**2 + y**2))) - np.exp(0.5 * (np.cos(2 * np.pi * x) + np.cos(2 * np.pi * y))) + np.e + 20

Run the PSO

In [55]:
pso_instance = PSO2d(boundaries=[(-5,5), (-5,5)], max_velocities=[(-1,1), (-1,1)], fitness_fun= lambda x: ackley(*x))
pso_instance.run(swarm_size=100, c_cognitive=0.1, c_social=0.1, w_inertia=0.1, n_iter=100)

In [56]:
pso_instance.plot_evolution()

interactive(children=(IntSlider(value=1, description='time', min=1), Output()), _dom_classes=('widget-interact…