In [7]:
import numpy as np
import matplotlib.animation as animation
import matplotlib.pyplot as plt
import torch

plt.rcParams['figure.figsize'] = [7, 7]
plt.rcParams['figure.dpi'] = 80
plt.rcParams['font.size'] = 12

def normalize(x, lower, upper):
    return (x - lower) / (upper - lower)

def unnormalize(x, lower, upper):
    return x * (upper - lower) + lower


class ParticleSwarmOptimizer:
    def __init__(self, func, lb, ub, args=(), kwargs={}, swarmsize=100, omega=0.5, phip=0.5, 
                 phig=0.5, maxiter=25, minstep=1e-8, minfunc=1e-8, debug=False, minimize=True, use_net=False,
                 net=None, normalize=False):
        self.func = func
        self.lb = np.array(lb)
        self.ub = np.array(ub)
        self.args = args
        self.kwargs = kwargs
        self.swarmsize = swarmsize
        self.omega = omega
        self.phip = phip
        self.phig = phig
        self.maxiter = maxiter
        self.minstep = minstep
        self.minfunc = minfunc
        self.debug = debug
        self.global_best_values = []
        self.collected_data = []
        self.minimize = minimize
        self.use_net = use_net
        self.net = net
        self.normalize = normalize

        self._validate_inputs()
        self._initialize_swarm()

    def _validate_inputs(self):
        assert len(self.lb) == len(self.ub), 'Lower- and upper-bounds must be the same length'
        assert np.all(self.ub > self.lb), 'All upper-bound values must be greater than lower-bound values'

    def _initialize_swarm(self):
        self.vhigh = np.abs(self.ub - self.lb)
        self.vlow = -self.vhigh

        if not self.normalize:
            self.x = np.random.uniform(low=0, high=1, size=(self.swarmsize, len(self.lb))) * (self.ub - self.lb) + self.lb
        else:
            self.x = np.random.uniform(low=0, high=1, size=(self.swarmsize, len(self.lb)))
        self.v = np.random.uniform(low=self.vlow, high=self.vhigh, size=(self.swarmsize, len(self.lb)))
        self.p = np.copy(self.x)
        self.fp = np.array([self._evaluate_objective(p) for p in unnormalize(self.p, self.lb, self.ub)])
        self.pv = self.fp.copy()
        self.g = self.p[np.argmin(self.fp)] if self.minimize else self.p[np.argmax(self.fp)]
        self.fg = np.min(self.fp) if self.minimize else np.max(self.fp)
        self.fw = np.max(self.fp) if self.minimize else np.min(self.fp)

    def _evaluate_objective(self, x):
        try:
            y =  self.func.evaluate(np.array([x]).reshape(1, -1))
        except Exception as e:
            y = self.func(x, *self.args, **self.kwargs)
        return y.item()

    def _update_velocity(self, i):
        rp = np.random.uniform(size=len(self.lb))
        rg = np.random.uniform(size=len(self.lb))
        self.v[i] = self.omega * self.v[i] + self.phip * rp * (self.p[i] - self.x[i]) + self.phig * rg * (self.g - self.x[i])
        for dim in range(len(self.v[i])):
            self.data_point[f"velocity_{dim}"] = self.v[i][dim]
            self.data_point[f"cognitive_velocity_{dim}"] = (self.p[i][dim] - self.x[i][dim])
            self.data_point[f"social_velocity_{dim}"] = (self.g[dim] - self.x[i][dim])
            self.data_point[f"position_{dim}"] = self.x[i][dim] if not self.normalize else unnormalize(self.x[i][dim], self.lb[dim], self.ub[dim])

    def _update_velocity_with_net(self, i):
        inference_input = []
        for dim in range(len(self.v[i])):
            inference_input.append(self.x[i][dim])
            inference_input.append(self.p[i][dim] - self.x[i][dim])
            inference_input.append(self.g[dim] - self.x[i][dim])
            inference_input.append(self.fp[i])
            inference_input.append(self.fg)
            inference_input.append((self.pv[i] - self.fw) / (self.fg - self.fw))

            inference_input = np.nan_to_num(inference_input, nan=0)
            inference_tensor = torch.tensor(inference_input).float().unsqueeze(0)  # Add batch dimension
            with torch.no_grad():  # Ensure no gradients are computed
                predicted_velocity_tensor = self.net(inference_tensor)
                self.data_point[f"velocity_{dim}"] = predicted_velocity_tensor.item()
            inference_input = []
        self.v[i] = predicted_velocity_tensor.item()

    def _update_position(self, i):
        self.x[i] += self.v[i]
        if not self.normalize:
            self.x[i] = np.clip(self.x[i], self.lb, self.ub)
        else:
            self.x[i] = np.clip(self.x[i], 0, 1)


    def _update_personal_best(self, i):
        fx = self._evaluate_objective(self.x[i]) if not self.normalize else self._evaluate_objective(unnormalize(self.x[i], self.lb, self.ub))
        self.pv[i] = fx
        if self.minimize:
            if fx < self.fp[i]:
                self.p[i] = self.x[i]
                self.fp[i] = fx
        else:
            if fx > self.fp[i]:
                self.p[i] = self.x[i]
                self.fp[i] = fx
        self.data_point["personal_best"] = self.fp[i]

    def _update_global_best(self, i):
        if self.minimize:
            if self.fp[i] < self.fg:
                self.g = self.p[i]
                self.fg = self.fp[i]
                if self.debug:
                    print(f'New best for swarm at iteration {self.it}: {self.g} {self.fg}')
            if self.fp[i] > self.fw:
                self.fw = self.fp[i]
        else:
            if self.fp[i] > self.fg:
                self.g = self.p[i]
                self.fg = self.fp[i]
                if self.debug:
                    print(f'New best for swarm at iteration {self.it}: {self.g} {self.fg}')
            if self.fp[i] < self.fw:
                self.fw = self.fp[i]
        self.data_point["global_best"] = self.fg

    def optimize(self):
        self.it = 1
        self.pos_history = np.array([self.x]) if not self.normalize else np.array([unnormalize(self.x, self.lb, self.ub)])
        self.history = [self.fg]
        while self.it <= self.maxiter:
            for i in range(self.swarmsize):
                self.data_point = {}
                self.data_point["iteration"] = self.it
                self._update_velocity(i) if not self.use_net else self._update_velocity_with_net(i)
                self._update_position(i)
                self._update_personal_best(i)
                self._update_global_best(i)
            self.pos_history = np.append(self.pos_history, [self.x], axis=0) if not self.normalize else np.append(self.pos_history, [unnormalize(self.x, self.lb, self.ub)], axis=0)
            self.history.append(self.fg)
            if self.debug:
                print(f'Best after iteration {self.it}: {self.g} {self.fg}')

            # if np.linalg.norm(self.g - self.p[i]) <= self.minstep and np.abs(self.fg - self.fp[i]) <= self.minfunc:
            #     if self.debug:
            #         print(f'Stopping search: Swarm best objective change less than {self.minfunc}')
            #     break

            self.it += 1
            self.collected_data.append(self.data_point)

        if self.debug:
            print(f'Stopping search: maximum iterations reached --> {self.maxiter}')
        return self.g, self.fg
    
    def multiple_run(self, runs=10, plot_particles=False, plot_history=False, fps=10):
        for _ in range(runs):
            self._initialize_swarm()
            self.optimize()
            self.global_best_values.append(self.history)
            if plot_particles:
                self.plot_particles_state()
            if plot_history:
                _ = self.plot_particles_history(fps=fps, save_dir=f"pso_{_}.gif")
        return self.global_best_values
    
    def plot_particles_state(self):
        # plot all particles position in the 2D function space: if the function is 2D else raise an error
        if len(self.lb) != 2:
            raise ValueError("The function must be 2D")
        else:
            plt.figure(figsize=(10, 10))
            plt.scatter(self.x[:, 0], self.x[:, 1], marker='o', c='b', s=30, label='particles')
            plt.scatter(self.g[0], self.g[1], marker='*', c='r', s=60, label='global best')
            plt.legend(loc='upper right', fontsize=15)
            plt.xlabel('x1', fontsize=15)
            plt.ylabel('x2', fontsize=15)
            plt.title('Particles position', fontsize=15)
            plt.show()

    def plot_particles_history(self, save_dir=None, fps=10):
        # plot a gif of the particle positions at each time step
        if len(self.lb) != 2:
            raise ValueError("The function must be 2D")
        else:
            x = np.linspace(self.lb[0], self.ub[0], 1000)
            y = np.linspace(self.lb[1], self.ub[1], 1000)
            X, Y = np.meshgrid(x, y)
            Z = self.func.evaluate(np.array([X.flatten(), Y.flatten()]).T).reshape(X.shape)

            fig = plt.figure(figsize=(10, 10))
            ax = plt.axes(xlim=(self.lb[0], self.ub[0]), ylim=(self.lb[1], self.ub[1]))
            
            ax.contour(X, Y, Z, 50)
            ax.scatter(self.g[0], self.g[1], marker='*', c='r', s=60, label='global best')
            ax.set_xlabel('x1', fontsize=15)
            ax.set_ylabel('x2', fontsize=15)
            ax.set_title('Particles position', fontsize=15)

            scat = ax.scatter([], [], c="red", s=100, marker="^", edgecolors="black")
            # add a text box to display the iteration number
            text = ax.text(0.05, 0.95, "", transform=ax.transAxes)

            def animate(i):
                scat.set_offsets(self.pos_history[i])
                text.set_text(f"Iteration: {i}")
                return scat, text
            print(f"Number of iterations: {len(self.pos_history)}")
            anim = animation.FuncAnimation(fig, animate, frames=len(self.pos_history), interval=100, blit=True)
            if save_dir is not None:
                anim.save(save_dir, writer="imagemagick", fps=fps)
            # close the animation figure to avoid displaying it below the cell
            plt.close()
            return anim


In [8]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class eloAct(nn.Module):
    def forward(self, x):
        # For positive values, use a logarithm; for negative values, use an exponentiation
        return torch.where(x > 0, torch.log(1 + x), torch.exp(x))

class negRelu(nn.Module):
    ## Negative ReLU
    def forward(self, x):
        return -F.relu(-x)

features = ["position", "cognitive_velocity", "social_velocity", "best_value", "global_best_value", "normalized_value"]
# Define the neural network architecture
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(len(features), 16)
        self.fc2 = nn.Linear(16, 16)
        self.fc3 = nn.Linear(16, 1)
        self.tanh = nn.Tanh()
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
        self.eloact = eloAct()
        self.negrelu = negRelu()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.negrelu(self.fc2(x))
        x = self.eloact(self.fc3(x))
        return x

In [9]:
# Example usage:
def objective_function(x, plot=False):
    if plot == False:
        return 0.1 * np.sum(np.cos(5 * np.pi * x) - x**2)
    else:
        return 0.1 * np.sum(np.cos(5 * np.pi * x), axis=1) - np.sum(x**2, axis=1)

from environment.optimization_functions.barrel import *
objective_function = CosineMixtureFunction()

lb = [-1, -1]
ub = [1, 1]

net = Net()
optimizer = ParticleSwarmOptimizer(objective_function, lb, ub, swarmsize=20, maxiter=200, debug=False,
                                   minimize=False, phip=0.5, phig=0.5, omega=0.5, 
                                   use_net=False, normalize=True, net=net)
best_position, best_value = optimizer.optimize()
print(f"The best position is {best_position} with a value of {best_value}")

The best position is [0.5 0.5] with a value of 0.2


In [145]:
optimizer.plot_particles_history(fps=3, save_dir="pso5net.gif")

MovieWriter imagemagick unavailable; using Pillow instead.


Number of iterations: 201


<matplotlib.animation.FuncAnimation at 0x19146eb8b20>

In [10]:
global_bests = optimizer.multiple_run(runs=10, plot_particles=False, plot_history=False, fps=3)

In [12]:
global_bests

[[-0.04513035132816428,
  -0.04513035132816428,
  -0.04513035132816428,
  -0.002021648792108005,
  0.14256086846398527,
  0.14256086846398527,
  0.1915579240149125,
  0.1915579240149125,
  0.19481189603021506,
  0.19681150553207635,
  0.19681150553207635,
  0.19703034248136356,
  0.1996579507825301,
  0.1996579507825301,
  0.1996579507825301,
  0.19984183707407627,
  0.19993428290400497,
  0.19997655474165515,
  0.19999165325687732,
  0.19999943239510715,
  0.19999943239510715,
  0.199999973046752,
  0.199999973046752,
  0.199999973046752,
  0.199999973046752,
  0.19999999314272823,
  0.19999999314272823,
  0.19999999314272823,
  0.19999999314272823,
  0.1999999994308676,
  0.1999999994308676,
  0.1999999994308676,
  0.19999999998573312,
  0.19999999998573312,
  0.19999999998573312,
  0.1999999999942963,
  0.1999999999942963,
  0.1999999999942963,
  0.19999999999556492,
  0.19999999999556492,
  0.1999999999994004,
  0.19999999999969792,
  0.19999999999969792,
  0.19999999999995277,
  0

In [13]:
import scipy
import scipy.stats as stats
import scipy

def mean_confidence_interval(data, confidence=0.95):
    a = 1.0 * np.array(data)
    n = len(a)
    m, se = np.mean(a, axis = 0), stats.sem(a)
    h = se * stats.t.ppf((1 + confidence) / 2., n-1)
    return m, m-h, m+h

def num_function_evaluation(fopt, n_agents, save_dir, opt_value):
    mf1 = np.mean(fopt, axis = 0)
    err = np.std(fopt, axis = 0)
    mf1, ml1, mh1 = mean_confidence_interval(fopt,0.95)

    fig = plt.figure(figsize=(6,4), dpi=200)
    plt.rcParams["figure.figsize"] = [6, 4]
    plt.rcParams["figure.autolayout"] = True
    plt.fill_between((np.arange(len(mf1))+1)*n_agents, ml1, mh1, alpha=0.1, edgecolor='#3F7F4C', facecolor='#7EFF99')
    plt.plot((np.arange(len(mf1))+1)*n_agents, mf1, linewidth=2.0, label = 'RL OPT', color='#3F7F4C')
    plt.plot((np.arange(len(mf1))+1)*n_agents, np.ones(len(mf1))*opt_value, linewidth=1.0, label = 'True OPT', color='#CC4F1B')

    plt.xlabel('number of function evaluations', fontsize = 14)
    plt.ylabel('best fitness value', fontsize = 14)

    plt.legend(fontsize = 14, frameon=False)
    plt.xscale('log')
    plt.yticks(fontsize = 14)
    plt.savefig(save_dir)
    # close the figure
    plt.close(fig)

In [14]:
global_bests = np.array(global_bests)

In [15]:
global_bests.shape

(10, 201)

In [16]:
num_function_evaluation(global_bests, 10, "pso_multi.png", 0.2)