## Particle Swarm Optimizer 
### code from scikit-opt (https://github.com/guofei9987/scikit-opt/tree/master/sko)

In [1]:
import pandas as pd
import numpy as np
import torch
import matplotlib.pyplot as plt
import os

import multiprocessing
from pathos.pools import ProcessPool


In [2]:
inp = np.ones((64, 100)) # input signal shape from matlab
tar = np.zeros_like(inp) # target signal shape from matlab
learnable_params = np.ones(100) # parameters from S-Functions
epoch = 50

def s_function(inp, params): # S-Function
    return inp * params

# sample = s_function(inp, learnable_params)
# print(sample.shape)


In [26]:
# todo : set upper bound and lower bound different for each parameters

class PSO():
    
    def __init__(self, func, n_dim=None, pop=40, max_iter=150, lb=-1e5, ub=1e5, w=0.8, c1=0.5, c2=0.5,
                 constraint_eq=tuple(), constraint_ueq=tuple(), verbose=False
                 , dim=None, data = tuple()):

        n_dim = n_dim or dim  # support the earlier version

        self.func = func
        self.w = w  # inertia
        self.cp, self.cg = c1, c2  # parameters to control personal best, global best respectively
        self.pop = pop  # number of particles
        self.n_dim = n_dim  # dimension of particles, which is the number of variables of func
        self.max_iter = max_iter  # max iter
        self.verbose = verbose  # print the result of each iter or not
        # self.input = data[0]
        self.input = np.tile(data[0], (self.pop, 1, 1))
        self.target = data[1]
        # self.target = np.tile(data[1], (self.pop, 1, 1))

        self.lb, self.ub = np.array(lb) * np.ones(self.n_dim), np.array(ub) * np.ones(self.n_dim)
        assert self.n_dim == len(self.lb) == len(self.ub), 'dim == len(lb) == len(ub) is not True'
        assert np.all(self.ub > self.lb), 'upper-bound must be greater than lower-bound'

        self.has_constraint = bool(constraint_ueq)
        self.constraint_ueq = constraint_ueq
        self.is_feasible = np.array([True] * pop)

        self.X = np.random.uniform(low=self.lb, high=self.ub, size=(self.pop, self.n_dim))
        v_high = self.ub - self.lb
        self.V = np.random.uniform(low=-v_high, high=v_high, size=(self.pop, self.n_dim))  # speed of particles
        self.Y = self.cal_y()  # y = f(x) for all particles
        self.pbest_x = self.X.copy()  # personal best location of every particle in history
        self.pbest_y = np.array([[np.inf]] * pop)  # best image of every particle in history
        self.gbest_x = self.pbest_x.mean(axis=0).reshape(1, -1)  # global best location for all particles
        self.gbest_y = np.inf  # global best y for all particles
        self.gbest_y_hist = []  # gbest_y of every iteration
        self.update_gbest()

        # record verbose values
        self.record_mode = True
        self.record_value = {'X': [], 'V': [], 'Y': []}
        self.best_x, self.best_y = self.gbest_x, self.gbest_y  # history reasons, will be deprecated

    def check_constraint(self, x):
        # gather all unequal constraint functions
        for constraint_func in self.constraint_ueq:
            if constraint_func(x) > 0:
                return False
        return True

    def update_V(self):
        r1 = np.random.rand(self.pop, self.n_dim)
        r2 = np.random.rand(self.pop, self.n_dim)
        self.V = self.w * self.V + \
                 self.cp * r1 * (self.pbest_x - self.X) + \
                 self.cg * r2 * (self.gbest_x - self.X)

    def update_X(self):
        self.X = self.X + self.V
        self.X = np.clip(self.X, self.lb, self.ub)

    def cal_y(self):
        # calculate y for every x in X
        # self.Y = self.func(self.X).reshape(-1, 1)
        # self.Y = np.sum(np.abs(self.func(self.input, self.X).flatten() - self.target.flatten())) / len(self.target)

        p = ProcessPool(nodes=os.cpu_count())
        output = p.map(self.func, self.input, self.X)
        self.Y = np.expand_dims(np.array([np.mean(np.abs(self.target - i)) for i in output]), axis=1)

        # output = []
        # for idx in range(len(self.X)):
        #     p = multiprocessing.Process(target=self.func, args=(self.X[idx], self.input[idx]))
        #     p.start()
        #     output.append(p)
        # for p in output:
        #     p.join()
        # self.Y = np.expand_dims(np.array([np.mean(np.abs(self.target - i)) for i in output]), axis=1)

        return self.Y



    def update_pbest(self):
        '''
        personal best
        :return:
        '''
        self.need_update = self.pbest_y > self.Y
        for idx, x in enumerate(self.X):
            if self.need_update[idx]:
                self.need_update[idx] = self.check_constraint(x)

        self.pbest_x = np.where(self.need_update, self.X, self.pbest_x)
        self.pbest_y = np.where(self.need_update, self.Y, self.pbest_y)

    def update_gbest(self):
        '''
        global best
        :return:
        '''
        idx_min = self.pbest_y.argmin()
        if self.gbest_y > self.pbest_y[idx_min]:
            self.gbest_x = self.X[idx_min, :].copy()
            self.gbest_y = self.pbest_y[idx_min]

    def recorder(self):
        if not self.record_mode:
            return
        self.record_value['X'].append(self.X)
        self.record_value['V'].append(self.V)
        self.record_value['Y'].append(self.Y)

    def run(self, max_iter=None, precision=None, N=20):
        '''
        precision: None or float
            If precision is None, it will run the number of max_iter steps
            If precision is a float, the loop will stop if continuous N difference between pbest less than precision
        N: int
        '''
        self.max_iter = max_iter or self.max_iter
        c = 0
        for iter_num in range(self.max_iter):
            self.update_V()
            self.recorder()
            self.update_X()
            self.cal_y()
            self.update_pbest()
            self.update_gbest()
            if precision is not None:
                tor_iter = np.amax(self.pbest_y) - np.amin(self.pbest_y)
                if tor_iter < precision:
                    c = c + 1
                    if c > N:
                        break
                else:
                    c = 0
            if self.verbose and ((iter_num % (self.max_iter // 10) == 0) or (iter_num+1 == self.max_iter)):
                print('Iter: {}, Best fit: {}'.format(iter_num, self.gbest_y))
                # print('Iter: {}, Best fit: {} at {}'.format(iter_num, self.gbest_y, self.gbest_x))

            self.gbest_y_hist.append(self.gbest_y)
        self.best_x, self.best_y = self.gbest_x, self.gbest_y
        return self.best_x, self.best_y



In [34]:
pso = PSO(func=s_function, n_dim=len(learnable_params), pop=300, max_iter=100, lb=-2, ub=2, w=0.75, c1=1, c2=1, data=(inp, tar), verbose=True)
learned_params, best_loss = pso.run(max_iter=100)


Iter: 0, Best fit: [1.15044116]
Iter: 10, Best fit: [0.82307478]
Iter: 20, Best fit: [0.70760319]
Iter: 30, Best fit: [0.60291284]
Iter: 40, Best fit: [0.52875411]
Iter: 50, Best fit: [0.47711216]
Iter: 60, Best fit: [0.44518207]
Iter: 70, Best fit: [0.42745285]
Iter: 80, Best fit: [0.41032155]
Iter: 90, Best fit: [0.39881566]
Iter: 99, Best fit: [0.39171101]


In [43]:
# Second Generation of PSO
class SGPSO():
    
    def __init__(self, func, n_dim=None, pop=40, max_iter=150, lb=-1e5, ub=1e5, w=0.8, c1=0.5, c2=0.5, c3=0.5,
                 constraint_eq=tuple(), constraint_ueq=tuple(), verbose=False
                 , dim=None, data = tuple(), p_interval = 0):

        n_dim = n_dim or dim  # support the earlier version

        self.func = func
        self.w = w  # inertia
        self.cp, self.cg, self.cc = c1, c2, c3 # parameters to control personal best, global best respectively
        self.pop = pop  # number of particles
        self.n_dim = n_dim  # dimension of particles, which is the number of variables of func
        self.max_iter = max_iter  # max iter
        self.verbose = verbose  # print the result of each iter or not
        # self.input = data[0]
        self.input = np.tile(data[0], (self.pop, 1, 1))
        self.target = data[1]
        # self.target = np.tile(data[1], (self.pop, 1, 1))
        if not p_interval:
            self.p_interval = self.max_iter // 10
        else:
            self.p_interval = p_interval

        self.lb, self.ub = np.array(lb) * np.ones(self.n_dim), np.array(ub) * np.ones(self.n_dim)
        assert self.n_dim == len(self.lb) == len(self.ub), 'dim == len(lb) == len(ub) is not True'
        assert np.all(self.ub > self.lb), 'upper-bound must be greater than lower-bound'

        self.has_constraint = bool(constraint_ueq)
        self.constraint_ueq = constraint_ueq
        self.is_feasible = np.array([True] * pop)

        self.X = np.random.uniform(low=self.lb, high=self.ub, size=(self.pop, self.n_dim))
        v_high = self.ub - self.lb
        self.V = np.random.uniform(low=-v_high, high=v_high, size=(self.pop, self.n_dim))  # speed of particles
        self.Y = self.cal_y()  # y = f(x) for all particles
        self.pbest_x = self.X.copy()  # personal best location of every particle in history
        self.pbest_y = np.array([[np.inf]] * pop)  # best image of every particle in history
        self.gbest_x = self.pbest_x.mean(axis=0).reshape(1, -1)  # global best location for all particles
        self.gbest_y = np.inf  # global best y for all particles
        self.gbest_y_hist = []  # gbest_y of every iteration
        self.update_gbest()

        self.centre = np.sum(self.pbest_x, axis=0) / self.pop

        # record verbose values
        self.record_mode = True
        self.record_value = {'X': [], 'V': [], 'Y': []}
        self.best_x, self.best_y = self.gbest_x, self.gbest_y  # history reasons, will be deprecated

    def check_constraint(self, x):
        # gather all unequal constraint functions
        for constraint_func in self.constraint_ueq:
            if constraint_func(x) > 0:
                return False
        return True

    def update_V(self):
        r1 = np.random.rand(self.pop, self.n_dim)
        r2 = np.random.rand(self.pop, self.n_dim)
        r3 = np.random.rand(self.pop, self.n_dim)
        self.V = self.w * self.V + \
                 self.cp * r1 * (self.pbest_x - self.X) + \
                 self.cg * r2 * (self.gbest_x - self.X) + \
                 self.cc * r3 * (self.centre - self.X)

    def update_X(self):
        self.X = self.X + self.V
        self.X = np.clip(self.X, self.lb, self.ub)

    def cal_y(self):
        p = ProcessPool(nodes=os.cpu_count())
        output = p.map(self.func, self.input, self.X)
        self.Y = np.expand_dims(np.array([np.mean(np.abs(self.target - i)) for i in output]), axis=1)

        return self.Y
    
    def update_centre(self):
        self.centre = np.sum(self.pbest_x, axis=0) / self.pop
        return 

    def update_pbest(self):
        '''
        personal best
        :return:
        '''
        self.need_update = self.pbest_y > self.Y
        for idx, x in enumerate(self.X):
            if self.need_update[idx]:
                self.need_update[idx] = self.check_constraint(x)

        self.pbest_x = np.where(self.need_update, self.X, self.pbest_x)
        self.pbest_y = np.where(self.need_update, self.Y, self.pbest_y)

    def update_gbest(self):
        '''
        global best
        :return:
        '''
        idx_min = self.pbest_y.argmin()
        if self.gbest_y > self.pbest_y[idx_min]:
            self.gbest_x = self.X[idx_min, :].copy()
            self.gbest_y = self.pbest_y[idx_min]

    def recorder(self):
        if not self.record_mode:
            return
        self.record_value['X'].append(self.X)
        self.record_value['V'].append(self.V)
        self.record_value['Y'].append(self.Y)

    def run(self, max_iter=None, precision=None, N=20, p_interval=0):
        '''
        precision: None or float
            If precision is None, it will run the number of max_iter steps
            If precision is a float, the loop will stop if continuous N difference between pbest less than precision
        N: int
        '''
        self.max_iter = max_iter or self.max_iter
        if p_interval:
            self.p_interval=p_interval
        c = 0
        for iter_num in range(self.max_iter):
            self.update_V()
            self.recorder()
            self.update_X()
            self.cal_y()
            self.update_pbest()
            self.update_gbest()
            if iter_num % self.p_interval == 0:
                self.update_centre()
                
            if precision is not None:
                tor_iter = np.amax(self.pbest_y) - np.amin(self.pbest_y)
                if tor_iter < precision:
                    c = c + 1
                    if c > N:
                        break
                else:
                    c = 0
            if self.verbose and ((iter_num % (self.max_iter // 10) == 0) or (iter_num+1 == self.max_iter)):
                print('Iter: {}, Best fit: {}'.format(iter_num, self.gbest_y))
                # print('Iter: {}, Best fit: {} at {}'.format(iter_num, self.gbest_y, self.gbest_x))

            self.gbest_y_hist.append(self.gbest_y)
        self.best_x, self.best_y = self.gbest_x, self.gbest_y
        return self.best_x, self.best_y


In [40]:
pso = SGPSO(func=s_function, n_dim=len(learnable_params), pop=300, max_iter=100, lb=-2, ub=2, w=0.85, c1=1, c2=1, c3=1, data=(inp, tar), verbose=True)
print(pso.p_interval)
learned_params, best_loss = pso.run(max_iter=1000)


10
Iter: 0, Best fit: [1.20792043]
Iter: 100, Best fit: [0.06850617]
Iter: 200, Best fit: [0.02869607]
Iter: 300, Best fit: [0.01509811]
Iter: 400, Best fit: [0.00996598]
Iter: 500, Best fit: [0.00723233]
Iter: 600, Best fit: [0.00676323]
Iter: 700, Best fit: [0.00631957]
Iter: 800, Best fit: [0.00592183]
Iter: 900, Best fit: [0.00545455]
Iter: 999, Best fit: [0.00245854]


In [46]:
pso = SGPSO(func=s_function, n_dim=len(learnable_params), pop=300, max_iter=100, lb=0, ub=2, w=0.85, c1=1, c2=1, c3=1, data=(inp, tar), verbose=True)
learned_params, best_loss = pso.run(max_iter=1000, p_interval=10)

Iter: 0, Best fit: [0.79144832]
Iter: 100, Best fit: [0.0150517]
Iter: 200, Best fit: [0.00025988]
Iter: 300, Best fit: [3.58601087e-06]
Iter: 400, Best fit: [3.16949477e-08]
Iter: 500, Best fit: [3.86051036e-10]
Iter: 600, Best fit: [4.8293239e-12]
Iter: 700, Best fit: [4.943345e-14]
Iter: 800, Best fit: [4.69617517e-16]
Iter: 900, Best fit: [4.32417676e-18]
Iter: 999, Best fit: [2.69675308e-20]


In [48]:
import gc
gc.collect(generation=2)

24