In [83]:
#Following this article 
#https://medium.com/@zeeshanahmad10809/train-in-no-time-numpy-vectorized-particle-swarm-optimization-af105be68f25

In [84]:
#Implementation with arrays

In [85]:
import numpy as np
import time as ti

#This is a PSO(interia weight) variation...
class Particle:
    """
    Particle class represents a solution inside a pool(Swarm).
    """
    def __init__(self, no_dim, x_range, v_range):
        """
        Particle class constructor
        :param no_dim: int
            No of dimensions.
        :param x_range: tuple(double)
            Min and Max value(range) of dimension.
        :param v_range: tuple(double)
            Min and Max value(range) of velocity.
        """
        self.x = np.random.uniform(x_range[0], x_range[1], (no_dim, )) #particle position in each dimension...
        self.v = np.random.uniform(v_range[0], v_range[1], (no_dim, )) #particle velocity in each dimension...
        self.pbest = np.inf
        self.pbestpos = np.zeros((no_dim, ))

In [86]:
class Swarm:
    """
    Swarm class represents a pool of solution(particle).
    """
    def __init__(self, no_particle, no_dim, x_range, v_range, iw_range, c):
        """
        Swarm class constructor.
        :param no_particle: int
            No of particles(solutions).
        :param no_dim: int
            No of dimensions.
        :param x_range: tuple(double)
            Min and Max value(range) of dimension.
        :param v_range: tuple(double)
            Min and Max value(range) of velocity.
        :param iw_range: tuple(double)
            Min and Max value(range) of interia weight.
        :param c: tuple(double)
            c[0] -> cognitive parameter, c[1] -> social parameter.
        """
        self.p = np.array([Particle(no_dim, x_range, v_range) for i in range(no_particle)])
        self.gbest = np.inf
        self.gbestpos = np.zeros((no_dim, ))
        self.x_range = x_range
        self.v_range = v_range
        self.iw_range = iw_range
        self.c0 = c[0]
        self.c1 = c[1]
        self.no_dim = no_dim
        
    def optimize(self, function, print_step,  iter):
        """
        optimize is used start optimization.
        :param function: function
            Function to be optimized.
        :param print_step: int
            Print pause between two adjacent prints.
        :param iter: int
            No of iterations.
        """
        for i in range(iter):
            for particle in self.p:
                fitness = function(particle.x)

                if fitness < particle.pbest:
                    particle.pbest = fitness
                    particle.pbestpos = particle.x.copy()

                if fitness < self.gbest:
                    self.gbest = fitness
                    self.gbestpos = particle.x.copy()


            for particle in self.p:
                #Here iw is inertia weight...
                iw = np.random.uniform(self.iw_range[0], self.iw_range[1], 1)[0]
                particle.v = iw * particle.v + (self.c0 * np.random.uniform(0.0, 1.0, (self.no_dim, )) * \
                (particle.pbestpos - particle.x)) + (self.c1 * np.random.uniform(0.0, 1.0, (self.no_dim, )) \
                * (self.gbestpos - particle.x))
                #particle.v = particle.v.clip(min=self.v_range[0], max=self.v_range[1])
                particle.x = particle.x + particle.v
                #particle.x = particle.x.clip(min=self.x_range[0], max=self.x_range[1])

            if i % print_step == 0:
                print('iteration#: ', i+1,  ' fitness: ', fitness)

        print("global best fitness: ", self.gbest)
        
    def get_best_solution(self):
        '''
        :return: array of parameters/weights.
        '''
        return self.gbestpos

In [87]:
def sphere(x):
    """
    sphere is an objective function used to test
    optimization algorithms.

    :param particle: 1d Numpy Array of Particle
        List of position of particle in all dimensions.
    :return: double
        Calculated value of objective function.
    """
    _sum = 0.0
    for x_d in x:
        _sum = _sum + x_d ** 2

    return _sum

In [None]:
no_particle = 100
x_range = (-5.12,5.12)
v_range = (-2.0,2.0)
iw_range = (0.4,0.9)
c = (1.49, 1.49)
s = Swarm(no_particle, no_dim,x_range,v_range,iw_range,c)

function = sphere
print_step = 1000
iterations = 5000

dimensions = [100, 1000, 10000]
for no_dim in dimensions:
    print(f'n = {no_dim}')
    s = Swarm(no_particle, no_dim,x_range,v_range,iw_range,c)
    tic = ti.time()
    s.optimize(function, print_step, iterations)
    toc = ti.time()
    print("Printing best solution....")
#     print(s.get_best_solution())
    print(f'Time to compute: {toc-tic}')
    print('---------------------------------------------')

n = 100
iteration#:  1  fitness:  827.0726592005346
iteration#:  1001  fitness:  0.9437802389191051
iteration#:  2001  fitness:  0.3429755169035865
iteration#:  3001  fitness:  0.04883144777826503
iteration#:  4001  fitness:  0.0007088175015016839
global best fitness:  0.0001889493575204673
Printing best solution....
Time to compute: 33.04661703109741
---------------------------------------------
n = 1000
iteration#:  1  fitness:  9288.731045398843
iteration#:  1001  fitness:  2360.7147313155538
iteration#:  2001  fitness:  2199.2636945135723
iteration#:  3001  fitness:  1981.5612018740535
iteration#:  4001  fitness:  1921.6127234567878
global best fitness:  1864.0020948263812
Printing best solution....
Time to compute: 252.62561511993408
---------------------------------------------
n = 10000
iteration#:  1  fitness:  86596.76325912282
iteration#:  1001  fitness:  54194.85283815496


In [None]:
#Implementation with numpy vectorization

In [None]:
class Particle:
    """
    Particle class represents a solution inside a pool(Swarm).
    """
    def __init__(self, dim_shape, x_range, v_range):
        """
        Particle class constructor

        :param dim_shape: tuple(no_dim, )
            Shape of x(position), v(velocity).
        :param x_range: tuple(double)
            Min and Max value(range) of dimension.
        :param v_range: tuple(double)
            Min and Max value(range) of velocity.
        """
        self.x = np.random.uniform(x_range[0], x_range[1], dim_shape)
        self.v = np.random.uniform(v_range[0], v_range[1], dim_shape)
        self.pbest = np.inf
        self.pbestpos = np.zeros(dim_shape)

In [None]:
class Swarm:
    """
    Swarm class represents a pool of solution(particle).
    """
    def __init__(self, no_particle, dim_shape, x_range, v_range, iw_range, c):
        """
        Swarm class constructor.

        :param no_particle: int
            No of particles(solutions).
        :param dim_shape:  tuple(no_dim, )
            Shape of x(position), v(velocity).
        :param x_range: tuple(double)
            Min and Max value(range) of dimension.
        :param v_range: tuple(double)
            Min and Max value(range) of velocity.
        :param iw_range: tuple(double)
            Min and Max value(range) of interia weight.
        :param c: tuple(double)
            c[0] -> cognitive parameter, c[1] -> social parameter.
        """
        self.p = np.array([Particle(dim_shape, x_range, v_range) for i in range(no_particle)])
        self.gbest = np.inf
        self.gbestpos = np.zeros(dim_shape)
        self.x_range = x_range
        self.v_range = v_range
        self.c0 = c[0]
        self.c1 = c[1]
        self.iw_range = iw_range
        self.dim_shape = dim_shape
        self.update_particle_pos = None
        self.update_particle_vel = None
        
    def _update_particle_pos(self, p,  fitness):
        """
        It updates particle position.
        :param p: Particle
            Particle to updated position.
        :param fitness: double
            Fitness value or loss(to be optimized).
        :return: Particle
            Updated Particle.
        """
        if fitness < p.pbest:
            p.pbest = fitness
            p.pbestpos = p.x

        return p
    
    def _update_particle_vel(self, p):
        """
        It updates velocity of a particle.
        It is used by optimize function.
        :param p: Particle
            Particle to update velocity.
        :return: Particle
            Particle with updated velocity.
        """
        iw = np.random.uniform(self.iw_range[0], self.iw_range[1], self.dim_shape)
        p.v = iw * p.v + (self.c0 * np.random.uniform(0.0, 1.0, self.dim_shape) *\
        (p.pbestpos - p.x)) + (self.c1 * np.random.uniform(0.0, 1.0, self.dim_shape) * (self.gbestpos - p.x))
        p.v = p.v.clip(min=self.v_range[0], max=self.v_range[1])
        p.x = p.x + p.v
        p.x = p.x.clip(min=self.x_range[0], max=self.x_range[1])
        return p
    
    def optimize(self, function, print_step, iter):
        """
        optimize is used start optimization.

        :param function: function
            Function to be optimized.
        :param print_step: int
            Print pause between two adjacent prints.
        :param iter: int
            No of iterations.
        """
        function = np.vectorize(function)
        self.update_particle_pos = np.vectorize(self._update_particle_pos)
        self.update_particle_vel =  np.vectorize(self._update_particle_vel)


        for i in range(iter):
            fitness = function(self.p)
            self.p = self.update_particle_pos(self.p, fitness)
            min_fitness = fitness[np.argmin(fitness)]
            if min_fitness < self.gbest:
                self.gbest = min_fitness
                self.gbestpos = self.p[np.argmin(fitness)].x


            self.p = self.update_particle_vel(self.p)

            if i % print_step == 0:
                print("Iteration: ", i, " Loss/gbest: ", self.gbest, "Fitness: ", min_fitness)

        print("global best fitness: ", self.gbest)
    def get_best_solution(self):
        '''
        :return: array of parameters/weights.
        '''
        return self.gbestpos

In [None]:
def sphere(particle):
    """
    sphere is an objective function used to test
    optimization algorithms.

    :param particle: 1d Numpy Array of Particle
        List of position of particle in all dimensions.
    :return: double
        Calculated value of objective function.
    """
    _sum = 0.0
    for _x in particle.x:
        _sum = _sum + _x**2
    
    return _sum

In [None]:
for no_dim in dimensions:
    print(f'n = {no_dim}')
    s = Swarm(no_particle, (no_dim,) ,x_range,v_range,iw_range,c)
    tic = ti.time()
    s.optimize(function, print_step, iterations)
    toc = ti.time()
    print("Printing best solution....")
    #print(s.get_best_solution())
    print(f'Time to compute: {toc-tic}')
    print('---------------------------------------------')