In [1]:
!python3 -m pip install folium
# or
!python -m pip install folium


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
import sys
sys.path.append('..')

In [3]:
import random
import folium
import numpy as np
from copy import deepcopy

from library.solution import Solution
from library.problems.data.warehouse_data import customer_locations, delivery_cost

## Particle Swarm Optimization (PSO)

PSO is a population-based optimization algorithm inspired by the collective behavior of bird flocks, unlike Genetic Algorithms (GAs), which draw from evolutionary theory. In PSO, particles (potential solutions) move through the search space, adjusting their positions based on their own experience and that of the whole swarm. This social interaction guides the swarm toward optimal solutions. Because its inspiration comes from social behavior rather than evolution, PSO is not typically classified under Evolutionary Computation.

It is used to optimize continuous optimization problems where individuals can be represented $m$-dimensional arrays.

### Terminology

- **Individuals (particles):**  
  Each particle is an $m$-dimensional vector of real numbers: $\mathbf{x}_i = (x_{i1}, x_{i2}, \dots, x_{im}) \in \mathbb{R}^m$

- **Population (swarm):**  
  A set of $n$ particles: $\{\mathbf{x}_1, \mathbf{x}_2, \dots, \mathbf{x}_n\}$

  Where:  
  - $x_i$ is the $i$-th particle of the swarm ($i = 1, \dots, n$)  
  - $x_{ij}$ is the $j$-th component of particle $i$ ($j = 1, \dots, m$)

- **Velocities:**  
  Each particle has an associated velocity vector: $\mathbf{v}_i = (v_{i1}, v_{i2}, \dots, v_{im}) \in \mathbb{R}^m$

- **Local best (personal best):**  
  The best position ever visited by particle $i$: $\mathbf{b}_i \in \mathbb{R}^m$

- **Global best:**  
  The best position found by any particle in the swarm: $\mathbf{g} \in \mathbb{R}^m$

### Pseudo-code

1. **Initialize** particles $\mathbf{x}_i$ and velocities $\mathbf{v}_i$ for each particle $i = 1, \dots, n$:
    - Each $\mathbf{x}_i \in \mathbb{R}^m$ is initialized randomly, with each component $\mathbf{x}_{ij}$ drawn uniformly from the interval $[\boldsymbol{\alpha}_i, \boldsymbol{\beta}_i]$ for $j = 1, \dots, m$.
    - $\mathbf{v}_i$ is typically initialized as the zero vector.

2. **Set personal bests**:  
   $\mathbf{b}_i \leftarrow \mathbf{x}_i$ for all $i = 1, \dots, n$

3. **Set global best**:  
   $\mathbf{g} \leftarrow \arg\min_{\mathbf{x}_i} f(\mathbf{x}_i)$ (or $\arg\max$, depending on optimization goal)

4. **Repeat until termination condition is met**:
    - For each particle $i = 1, \dots, n$:
        1. **Update position**:  
           $\mathbf{x}_i \leftarrow \mathbf{x}_i + \mathbf{v}_i$
        2. **Update velocity**:  
           $\mathbf{v}_i \leftarrow w * \mathbf{v}_i + c_1 * r_1 \cdot (\mathbf{b}_i - \mathbf{x}_i) + c_2 * r_2 \cdot (\mathbf{g} - \mathbf{x}_i)$  
           where:
           - $w$ is the inertia weight  
           - $c_1$, $c_2$ are acceleration coefficients  
           - $\mathbf{r}_1, \mathbf{r}_2 \in \mathbb{R}^m$ are random vectors with each component drawn from $[0, 1]$
           - $\cdot$ denotes element-wise (Hadamard) product
        3. **Update personal best**:  
           If $f(\mathbf{x}_i) < f(\mathbf{b}_i)$, then $\mathbf{b}_i \leftarrow \mathbf{x}_i$
        4. **Update global best**:  
           If $f(\mathbf{x}_i) < f(\mathbf{g})$, then $\mathbf{g} \leftarrow \mathbf{x}_i$

5. **Return global best** $\mathbf{g}$


#### High-level intuition

Each particle represents a point in an $m$-dimensional space and has a velocity vector that dictates its movement. At every iteration, a particle updates its position based on its current velocity, and its velocity is updated according to three main influences:

$\mathbf{v}_i \leftarrow w \cdot \mathbf{v}_i + c_1 * r_1 \cdot (\mathbf{b}_i - \mathbf{x}_i) + c_2 * r_2 \cdot (\mathbf{g} - \mathbf{x}_i)$  


Intuition Behind Each Term:
- $w * \mathbf{v}_i$: Keeps some momentum from the previous direction.
- $c_1 * \mathbf{r}_1 \cdot (\mathbf{b}_i - \mathbf{x}_i)$: Pulls the particle toward its own best-known position (individual memory).
- $c_2 * \mathbf{r}_2 \cdot (\mathbf{g} - \mathbf{x}_i)$: Pulls the particle toward the best position found by the entire swarm (collective wisdom).

This balance between exploration (inertia and randomness) and exploitation (personal and social bests) drives the swarm to converge toward optimal or near-optimal solutions over time.

![image.png](images/pso.png)

### Algorithm

First let's define the `PSOSolution` class, which extends the `Solution` class by introducing additional attributes during initialization. This is still an abstract class, so every class that inherits from this one (problem-specific classes) will have to implement the `fitness` and `random_initial_representation` methods.

In [4]:
class PSOSolution(Solution):
    def __init__(
        self,
        repr = None,
    ):
        super().__init__(repr=repr)

        self.best_repr = self.repr
        self.best_fitness = self.fitness()
        self.velocity = np.array([0 for _ in range(len(self.repr))])

Now let's implement the algorithm.

In [5]:
def particle_swarm_optimization(
    population: list[PSOSolution],
    w=0.5,          # inertia weight
    c1=1.5,         # cognitive coefficient
    c2=1.5,         # social coefficient
    maximization=False,
    max_iter=100,
    verbose=False
):
    # 1. Initialize particles and velocities
    # and
    # 2. Set personal bests
    # are already done in the initial population, as it consists of PSOSolutions

    # 3. Set global best
    if maximization:
        global_best = deepcopy(max(population, key=lambda p: p.fitness()))
    else:
        global_best = deepcopy(min(population, key=lambda p: p.fitness()))

    global_best_history = []

    # 2. Repeat until termination condition
    for iter in range(max_iter):
        if verbose:
            print(f"Iteration: {iter+1}")
        # 2.1. For each particle
        for particle in population:
            if verbose:
                print(f"Initial particle location: {particle.repr}")
            
            # Update particle position
            new_position = particle.repr + particle.velocity

            # Update particle velocity
            r1 = np.array([random.random() for _ in range(len(particle.repr))])
            r2 = np.array([random.random() for _ in range(len(particle.repr))])

            inertia = w * particle.velocity
            cognitive = np.multiply(c1 * r1 , particle.best_repr - particle.repr)
            social = np.multiply(c2 * r2, global_best.repr - particle.repr)

            particle.velocity = inertia + cognitive + social

            particle.repr = new_position

            # Update personal best
            if particle.fitness() < particle.best_fitness:
                particle.best_repr = deepcopy(particle.repr)
                particle.best_fitness = particle.fitness()

            if verbose:
                print(f"New particle location: {particle.repr}")
                print(f"Updated velocity: {particle.velocity}")

        # Update global best
        if maximization:
            global_best_iter = deepcopy(max(population, key=lambda p: p.fitness()))
            if global_best_iter.fitness() > global_best.fitness():
                global_best = global_best_iter
        else:
            global_best_iter = deepcopy(min(population, key=lambda p: p.fitness()))
            if global_best_iter.fitness() < global_best.fitness():
                global_best = deepcopy(global_best_iter)

        global_best_history.append(global_best)

        if verbose:
            print(f"Best Fitness: {global_best.fitness()}")
            print("---------------------")

    return global_best, global_best_history


## Warehouse Location Optimization Problem

**Goal:** Find the optimal location for a warehouse that minimizes the total delivery cost to a set of customer locations. 


Let:

- $\mathbf{x} = [x, y]$: coordinates of the warehouse (decision variables)
- $(x_i, y_i)$ for $i=1, \ldots, n$: coordinates of $n$ customer locations
- $c_i$: delivery cost weight for customer $i$

We define the cost function as:

$f(\mathbf{x}) = \sum_{i=1}^n c_i \cdot \sqrt{(x - x_i)^2 + (y - y_i)^2}$

The goal is to minimize the cost function.

Let's start by defining the `WarehousePSOSolution` class.


In [20]:
class WarehousePSOSolution(PSOSolution):
    def __init__(
        self,
        customer_locations : list[list[float]],
        delivery_cost: list[float],
        repr=None,
    ):
        self.customer_locations = customer_locations
        self.delivery_cost = delivery_cost

        super().__init__(repr=repr)

    def fitness(self):
        # Compute the Euclidean distance from the warehouse (self.repr) to each customer
        distances = np.linalg.norm(np.array(self.customer_locations) - self.repr, axis=1)

        # Total delivery cost is the dot product of distances and corresponding delivery costs
        return np.dot(np.array(self.delivery_cost), distances)

    def random_initial_representation(self):
        # Randomly initialize a warehouse location within the bounding box of customer locations
        lats, lons = zip(*self.customer_locations)  # Unzips into two tuples
        # Get min and max bounds for each coordinate
        min_lat, max_lat = min(lats), max(lats)
        min_lon, max_lon = min(lons), max(lons)

        # Define custom range margins (alpha and beta) to control the range of random values
        # Since latitude and longitude are so different from each other, let's initilize the latitude between
        # a certain alpha_lat and beta_lat, and the longitude between a certain alpha_lon and beta_lon
        alpha_lat, beta_lat = random.uniform(min_lat, max_lat), random.uniform(min_lat, max_lat)
        alpha_lon, beta_lon = random.uniform(min_lon, max_lon), random.uniform(min_lon, max_lon)

        # Make sure alpha is smaller than beta
        if alpha_lat > beta_lat:
            alpha_lat, beta_lat = beta_lat, alpha_lat
        if alpha_lon > beta_lon:
            alpha_lon, beta_lon = beta_lon, alpha_lon

        # Randomly generate a latitude and longitude within the scaled bounds
        return np.array([random.uniform(alpha_lat, beta_lat), random.uniform(alpha_lon, beta_lon)])

Let's test it

In [21]:
solution = WarehousePSOSolution(customer_locations=customer_locations, delivery_cost=delivery_cost)

print(f"Solution: {solution} with fitness {solution.fitness()}")

Solution: [38.89955858 -9.16210319] with fitness 0.9466566292381775


### Solving the Warehouse Location Problem using PSO

In [24]:
POP_SIZE = 50

best_solution, global_best_history = particle_swarm_optimization(
    population=[WarehousePSOSolution(customer_locations=customer_locations, delivery_cost=delivery_cost) for _ in range(POP_SIZE)],
    maximization=False,
    max_iter=1000,
    verbose=False
)

print(f"Best solution {best_solution} with fitness {best_solution.fitness()}")

Best solution [38.92345529 -9.1387052 ] with fitness 0.9144875084631903


Now, let’s visualize the customer locations, the final warehouse position, and all the global best solutions identified throughout the search process.

In [25]:
lats, lons = zip(*customer_locations)
# Center of the map
map_center = [np.mean(lats), np.mean(lons)]

# Create the map
my_map = folium.Map(location=map_center, zoom_start=12)

# Add customer markers
for coord in customer_locations:
    folium.Marker(location=coord).add_to(my_map)

# Add markers for each step in the evolution (except the last)
for idx, global_best in enumerate(global_best_history[:-1]):
    if (idx == 0) or (np.any(global_best.repr != global_best_history[idx-1].repr)):
        folium.Marker(
            location=global_best.repr,
            popup=f'Warehouse Position #{idx + 1}',
            tooltip=f'Step {idx + 1}',
            icon=folium.Icon(color="gray", icon="building")
        ).add_to(my_map)

# Add the final warehouse position with the building icon
final_position = global_best_history[-1].repr
folium.Marker(
    location=final_position,
    popup='Final Warehouse Location',
    tooltip='Final Position',
    icon=folium.Icon(color='red', icon='building', prefix="fa")
).add_to(my_map)

my_map