# FSEC Project - Diego Meloni 536041


## Particle Swarm Optimization (PSO) Project

This project implements the **Particle Swarm Optimization (PSO)** algorithm following carefully the requirements given by the professor.

The implementation is built around a custom `ParticleCandidate` class, which
represents individual particles with a position (the candidate solution) and
a velocity vector. The `ParticleSwarmOptimizer` class then manages a population
of such particles, updating them iteratively based on personal best, neighborhood
best, and global best solutions.

Finally, the implementation is tested on different fitness functions
(Sphere, Rastrigin, Rosenbrock) to validate correctness and robustness.


## Notes on the implementation

A few small changes have been made for compatibility and to ensure the code runs correctly, with the optimizer behaving as expected.

1.  Correction of the update formula  
   - The formula written in the project's specifications used `(candidate - local best)`, `(candidate - neighborhood best)` and `(candidate - global best)`, which makes particles move away from the best solution and leads to poor results and few updates.  
   - In my implementation of the Particle Swarm Optimization I use `(local best - candidate)`, `(neighborhood best - candidate)` and `(global best - candidate)`, which makes particles move towards the best solutions. With this correction the algorithm shows better and more consistent results with proper convergence behavior and numerous updates.

2. The `distribution` argument  
   - The base class `FloatVectorCandidate` requires a `distribution` parameter in its constructor.  
   - This was not explicitly specified in the project instructions, but without it the code raises an error.  
   - For this reason, `self.distribution` is passed around even if it is not directly used in the Particle Swarm Optimizer.  


3. Import Error
   - In Google Colab, the library `datasets` comes preinstalled.  
   - This causes a dependency conflict with `softpy` because `datasets` requires an older version of `dill`, while `softpy` installs a newer one.  
   - Since `datasets` is not needed in this project, I simply uninstall it at the beginning of the notebook. This ensures that `softpy` can be installed without errors and the project runs smoothly.

In [None]:
# --- Libraries management --- #
# !pip uninstall -y datasets  # Uncomment this line if you are using Google Colab
!pip install softpy



In [2]:
# --- Imports --- #
import numpy as np

from scipy import stats
from typing import Callable, List

import sys
sys.path.append("../")

from softpy.evolutionary.candidate import FloatVectorCandidate
from softpy.evolutionary.singlestate import MetaHeuristicsAlgorithm


### ParticleCandidate Class

The `ParticleCandidate` class represents a single particle in the swarm.  
It stores the particle's **position (`candidate`)**, **velocity**, and PSO parameters (`inertia`, `wl`, `wn`, `wg`).  

Key methods:
- `generate()`: Class method to initialize a particle with random position and velocity within specified bounds.
- `mutate()`: Updates the particle's position using its velocity and clips it within bounds.  
- `recombine()`: Updates the particle's velocity based on personal, neighborhood, and global best positions.
- `copy()`: Returns a copy of the particle.  

This class encapsulates all particle-specific logic needed for the PSO algorithm.


In [3]:
class ParticleCandidate(FloatVectorCandidate):
    """
    ParticleCandidate implementation for PSO.
    Inherits from softpy.evolutionary.candidate.FloatVectorCandidate.
    Represents a single particle with position (candidate) and velocity.
    """
    def __init__(self,
                 size: int,
                 lower: np.ndarray,
                 upper: np.ndarray,
                 candidate: np.ndarray,
                 velocity: np.ndarray,
                 inertia: float = 0.5,
                 wl: float = 0.33, wn: float = 0.33, wg: float = 0.34,
                 distribution = stats.uniform,
                 rng = np.random.default_rng()):

        # --- Store particle dimensions --- #
        self.size = int(size)
        self.lower = np.asarray(lower, dtype=float).reshape(self.size)
        self.upper = np.asarray(upper, dtype=float).reshape(self.size)
        self.candidate = np.asarray(candidate, dtype=float).reshape(self.size)  # current position in the search space
        self.velocity = np.asarray(velocity, dtype=float).reshape(self.size)

        # --- PSO coefficients --- #
        self.inertia = float(inertia)
        self.wl = float(wl)   # weight for personal best
        self.wn = float(wn)   # weight for neighborhood best
        self.wg = float(wg)   # weight for global best
        self.rng = rng        # random generator for stochastic updates

        # --- Boundary check: ensure candidate is inside [lower, upper] --- #
        if not (np.all(self.lower <= self.candidate) and np.all(self.candidate <= self.upper)):
            raise ValueError("Candidate must stay within [lower, upper] for every dimension.")

        # --- Limit the velocity to half of the search range --- #
        amplitude = np.abs(self.upper - self.lower)
        vmax = 0.5 * amplitude
        self.velocity = np.minimum(np.maximum(self.velocity, -vmax), vmax)

        # --- Constraint on weights: must sum to 1 --- #
        if not np.isclose(self.wl + self.wn + self.wg, 1.0):
            raise ValueError("Weights wl + wn + wg must sum to 1.")

        # Distribution (kept for compatibility with FloatVectorCandidate)
        self.distribution = distribution

        # --- Call the parent constructor (FloatVectorCandidate) --- #
        super().__init__(size=self.size,
                         candidate=self.candidate,
                         distribution=self.distribution,
                         lower=self.lower,
                         upper=self.upper,
                         mutate=None,
                         recombine=None)

    # --- Method to generate new Particle Candidates --- #
    @classmethod
    def generate(self,
                 size: int,
                 lower: np.ndarray,
                 upper: np.ndarray,
                 inertia: float = 0.5,
                 wl: float = 0.33, wn: float = 0.33, wg: float = 0.34,
                 distribution = stats.uniform,
                 rng = np.random.default_rng(),
                 **kwargs
                 ) -> "ParticleCandidate":
        """
        Method to generate a new ParticleCandidate.
        Initializes position uniformly in [lower, upper], and velocity
        in [-|upper-lower|, |upper-lower|].
        """
        lower = np.array(lower, dtype=float).reshape(size)
        upper = np.array(upper, dtype=float).reshape(size)

        # Random initial position
        candidate = rng.uniform(low=lower, high=upper, size=size)

        # Random initial velocity
        velocity_range = np.abs(upper - lower)
        velocity = rng.uniform(low=-velocity_range, high=velocity_range, size=size)

        return ParticleCandidate(
            size=size,
            lower=lower,
            upper=upper,
            candidate=candidate,
            velocity=velocity,
            inertia=inertia,
            wl=wl,
            wn=wn,
            wg=wg,
            distribution=distribution,
            rng=rng)

    # --- Method to update the position of the particle --- #
    def mutate(self) -> "ParticleCandidate":
        """
        Update the position of the particle by moving along its velocity.
        The new position is clipped to stay within [lower, upper].
        """
        self.candidate = self.candidate + self.velocity
        self.candidate = np.minimum(np.maximum(self.candidate, self.lower), self.upper)

        return self

    # --- Method to update the velocity of the particle --- #
    def recombine(self,
                  local_best: "ParticleCandidate",
                  neighborhood_best: "ParticleCandidate",
                  global_best: "ParticleCandidate") -> "ParticleCandidate":
        """
        Update velocity according to PSO rule:
            v <- inertia * velocity
                 + rl * wl * (local_best - candidate)
                 + rn * wn * (neighborhood_best - candidate)
                 + rg * wg * (global_best - candidate)
        where rl, rn, rg ~ U(0,1) independently.
        """
        # Random factors for stochastic weighting
        rl = self.rng.uniform(low=0, high=1, size=self.size)
        rn = self.rng.uniform(low=0, high=1, size=self.size)
        rg = self.rng.uniform(low=0, high=1, size=self.size)

        # Attraction terms
        term_local = rl * self.wl *  (local_best.candidate - self.candidate)
        term_neigh = rn * self.wn *  (neighborhood_best.candidate - self.candidate)
        term_global = rg * self.wg *  (global_best.candidate - self.candidate)

        # Update velocity
        self.velocity = (self.inertia * self.velocity) + term_local + term_neigh + term_global

        return self

    # --- Method to generate a copy of the particle, useful if multiple arrays are used --- #
    def copy(self) -> "ParticleCandidate":
        """Return a deep copy of this particle (used to store best positions)."""
        return ParticleCandidate(
            size=self.size,
            lower=self.lower.copy(),
            upper=self.upper.copy(),
            candidate=self.candidate.copy(),
            velocity=self.velocity.copy(),
            inertia=self.inertia,
            wl=self.wl,
            wn=self.wn,
            wg=self.wg,
            distribution=self.distribution,
            rng=self.rng)

    # --- Method to print the candidates, useful for testing purposes --- #
    def __repr__(self):
        return (f"ParticleCandidate(size={self.size}, "
                f"candidate={self.candidate}, "
                f"velocity={self.velocity})")

### ParticleSwarmOptimizer Class

The `ParticleSwarmOptimizer` class manages the PSO process:
- Initializes a **population** of `ParticleCandidate` instances.
- Tracks **personal bests** and the **global best**.
- Iteratively updates particles using the `recombine` and `mutate` methods.
- Handles the influence of neighbors with a **neighbor selection**.  

It serves as the main interface to run PSO on a given fitness function and ensures particles converge towards optimal solutions.


In [4]:
class ParticleSwarmOptimizer(MetaHeuristicsAlgorithm):
    def __init__(self,
                 pop_size: int,
                 fitness_func: Callable[[ParticleCandidate], float],
                 n_neighbors: int,
                 **kwargs):
        """
        PSO implementation.
        - pop_size: number of particles in the swarm
        - fitness_func: function to evaluate candidates
        - n_neighbors: number of neighbors for each particle
        - kwargs: arguments passed to ParticleCandidate.generate
        """
        self.pop_size = pop_size
        self.fitness_func = fitness_func
        self.n_neighbors = n_neighbors
        self.rng = np.random.default_rng()

        # --- Storage structures --- #
        self.population: List[ParticleCandidate] = []                           # current particles
        self.particle_best: np.ndarray = np.empty(self.pop_size, dtype=object)  # best position of each particle
        self.fitness_best: np.ndarray = np.full(self.pop_size, -np.inf)         # best fitness of each particle
        self.global_best: ParticleCandidate = None                              # overall best particle
        self.global_fitness_best: float = -np.inf                               # overall best fitness
        # np.inf is minus infinity, useful for initializations

        # kwargs are passed when generating particles
        self.kwargs = kwargs

    def initialize_population(self):
        """
        Create the initial swarm population.
        Each particle is generated with the parameters from kwargs.
        """
        for idx in range(self.pop_size):
            # Generate a new particle with given bounds/params
            particle = ParticleCandidate.generate(rng=self.rng, **self.kwargs)
            self.population.append(particle)
            # Store its best position (initially, the starting point)
            self.particle_best[idx] = particle.copy()

    def fit(self, n_iters: int, verbose: bool = False):
        """
        Run the PSO optimization loop for n_iters iterations.
        """
        # 1) Initialize population
        self.initialize_population()

        # 2) Iterative optimization process
        for it in range(int(n_iters)):
            # --- (a) Evaluate fitness of all particles --- #
            fitness_values = np.empty(self.pop_size, dtype=float)

            for idx, particle in enumerate(self.population):
                fitness_values[idx] = float(self.fitness_func(particle))

                # Update personal best if current fitness is higher
                if fitness_values[idx] > self.fitness_best[idx]:
                    self.fitness_best[idx] = fitness_values[idx]
                    self.particle_best[idx] = particle.copy()

                # Update global best if current fitness is higher
                if fitness_values[idx] > self.global_fitness_best:
                    self.global_fitness_best = fitness_values[idx]
                    self.global_best = particle.copy()

            # --- (b) For each particle, select neighbors and find best one --- #
            best_neighbor_indices = [None] * self.pop_size

            for i in range(self.pop_size):
                # Candidate neighbor indices (exclude itself)
                candidate_neighbors = []
                for j in range(self.pop_size):
                    if j != i:
                        candidate_neighbors.append(j)

                # If requested neighbors exceed available, take all
                if self.n_neighbors >= len(candidate_neighbors):
                    neighbors = candidate_neighbors
                else:
                    neighbors = self.rng.choice(candidate_neighbors,
                                                size=self.n_neighbors,
                                                replace=False).tolist()

                # Find the neighbor with best fitness
                best_index = neighbors[0] # Initialization
                for n in neighbors:
                    if self.fitness_best[n] > self.fitness_best[best_index]:
                        best_index = n
                best_neighbor_indices[i] = best_index

            # --- (c) Update each particle with recombine + mutate --- #
            for i in range(self.pop_size):
                local_best = self.particle_best[i]
                neighbor_best = self.particle_best[best_neighbor_indices[i]]
                global_best = self.global_best

                # Update velocity (recombine) and then move (mutate)
                self.population[i].recombine(
                    local_best=local_best,
                    neighborhood_best=neighbor_best,
                    global_best=global_best)
                self.population[i].mutate()

            if verbose:
                print(f"[Iter {it+1}/{n_iters}] Global best fitness = {self.global_fitness_best:.6g}, "
                      f"First candidate position = {self.population[0].candidate}, "
                      f"First velocity = {self.population[0].velocity}")


### Fitness Functions

In the project requests no particular fitness function was specified.  
To properly test the PSO implementation across different optimization landscapes,
I selected three well-known benchmark functions with the help of generative AI:


- **Sphere function**: a simple convex function with a single global optimum
  at the origin. It is useful to check that the algorithm converges properly
  in a smooth landscape.

- **Rastrigin function**: a function containing many local optima.
  This tests the algorithm’s ability to avoid premature convergence and to
  explore the search space effectively.

- **Rosenbrock function**: a non-convex function with a narrow curved valley,
  often considered difficult for optimization algorithms. It is useful to test stability.

Together, these three functions allow us to validate both the correctness and robustness of the implementation.


In [5]:
# ------------------ Fitness Functions ------------------ #

def sphere_fitness(p: ParticleCandidate) -> float:
    """Sphere function: f(x) = -sum(x_i^2).
       A simple convex function, optimal at x=0.
       We negate it so PSO maximizes instead of minimizes."""
    return -float(np.sum(p.candidate ** 2))


def rastrigin_fitness(p: ParticleCandidate) -> float:
    """Rastrigin function: f(x) = -[10*n + sum(x_i^2 - 10*cos(2*pi*x_i))].
       Highly multimodal with many local minima, often used to test robustness.
       Maximum is at x=0 (negated for maximization)."""
    x = p.candidate
    n = len(x)
    return -float(10*n + np.sum(x**2 - 10*np.cos(2*np.pi*x)))


def rosenbrock_fitness(p: ParticleCandidate) -> float:
    """Rosenbrock function: f(x) = -sum(100*(x_{i+1}-x_i^2)^2 + (x_i-1)^2).
       Narrow curved valley, global optimum at x_i=1.
       Negated to turn minimization into maximization."""
    x = p.candidate
    return -float(np.sum(100*(x[1:] - x[:-1]**2)**2 + (x[:-1] - 1)**2))

### Test Parameters

This setup is sufficient to validate the correctness of the algorithm and to
demonstrate that it works on both simple and more complex fitness landscapes.

**Note:** The PSO class only requires the population size, the fitness function,
and the number of neighbors as explicit arguments. All other hyperparameters
(`size`, `lower`, `upper`, `inertia`, `wl`, `wn`, `wg` ...) are passed
as `**kwargs` and used internally when generating the `ParticleCandidate`
instances.


In [6]:
# ------------------ Test Parameters ------------------ #

dim = 4                         # Problem dimension (search space has 4 variables)
lower = np.full(dim, -5.0)      # Lower bound for each dimension
upper = np.full(dim, 5.0)       # Upper bound for each dimension
pop_size = 10                   # Number of particles in the swarm
n_neighbors = 3                 # Number of neighbors for each particle
n_iters = 50                    # Number of iterations
inertia = 0.7                   # Inertia
wl, wn, wg = 0.2, 0.3, 0.5      # Weights for local, neighbor, and global bests
rng_seed = 42                   # Seed for reproducibility

In [7]:
# ------------------ Test ------------------ #

fitness_list = [sphere_fitness, rastrigin_fitness, rosenbrock_fitness]
fitness_names = ["Sphere", "Rastrigin", "Rosenbrock"]

# Run PSO for each fitness function
for name, fit_func in zip(fitness_names, fitness_list):
    print(f"\n--- Running PSO for {name} function ---")

    # Create optimizer instance with given parameters
    pso = ParticleSwarmOptimizer(
        fitness_func=fit_func,
        pop_size=pop_size,
        n_neighbors=n_neighbors,
        rng_seed=rng_seed,
        size=dim,
        lower=lower,
        upper=upper,
        inertia=inertia,
        wl=wl,
        wn=wn,
        wg=wg
    )

    # Run optimization process
    pso.fit(n_iters=n_iters, verbose=True)

    # Print the results
    print(f"Best particle for {name}: {pso.global_best}")
    print(f"Best fitness value: {pso.global_fitness_best:.6f}")



--- Running PSO for Sphere function ---
[Iter 1/50] Global best fitness = -15.1826, First candidate position = [0.83351078 3.36359222 4.42027193 3.08696818], First velocity = [-3.96952781  2.73529217  2.24832264  4.79259632]
[Iter 2/50] Global best fitness = -15.1826, First candidate position = [-0.95654822  1.4165667   2.99868934  3.75876574], First velocity = [-1.790059   -1.94702552 -1.42158259  0.67179756]
[Iter 3/50] Global best fitness = -15.1826, First candidate position = [-0.92550382 -2.6665455   1.46793873  3.4347181 ], First velocity = [ 0.0310444  -4.08311219 -1.53075061 -0.32404764]
[Iter 4/50] Global best fitness = -12.9888, First candidate position = [-0.38092354 -4.92243113 -0.8765251   2.66861548], First velocity = [ 0.54458028 -2.25588563 -2.34446383 -0.76610262]
[Iter 5/50] Global best fitness = -4.16137, First candidate position = [ 0.90666869 -4.47613624 -2.2372445   1.88944274], First velocity = [ 1.28759224  0.44629489 -1.3607194  -0.77917274]
[Iter 6/50] Global

## Conclusions

The project successfully implements the **Particle Swarm Optimization (PSO)** algorithm using a custom `ParticleCandidate` class, compatible with the `softpy` framework. Key points from the implementation and testing are:

- The `ParticleCandidate` class encapsulates the particle's position, velocity, and PSO-specific parameters (`inertia`, `wl`, `wn`, `wg`), providing methods for **mutation**, **recombination**, and **copying**.  
- The `ParticleSwarmOptimizer` class handles the overall optimization, maintaining the population, tracking personal and global bests, and updating particles according to the PSO dynamics.  
- Adjustments were made to ensure compatibility with `FloatVectorCandidate` (notably passing the `distribution` parameter) and to correct the PSO velocity update formula so that particles move **towards** the best solutions rather than away from them.  
- The algorithm was tested on three standard benchmark functions (Sphere, Rastrigin, Rosenbrock) to verify convergence and behavior across smooth, multi-modal, and curved landscapes, obtaining good results in each case.  
