# Session 12 Assignment 

## Víctor Vega Sobral
## Santiago Souto Ortega

---

## Libraries

In [5]:
import numpy as np
import time

## Contants

In [6]:
# Change this value to run different number of times the algorithms tested below
ITERATIONS = 10

## Implementing Von Neumann Neighborhood for PSO

### 1. Von Neumann Explanation

In standard PSO, each particle is influenced by:

* Its best personal position (cognitive component).
* Best global position of all the swarm (social component).

In a Von Neumann Neighborhood, particles are organized in a 2D net, where each particle is only connected tu its inmediate neighbors (north, south, west and east). Social component is based in best position found inside its neighborhood, not in all the swarm.

With this architecture, more diverse exploration in the search space is made, with more resistance to local optima and slower convergence, but potentially more robust.


## 1. Step by Step Implementation

In [7]:
class ParticleSwarmOptimizationVN(object):
    """
    Particle Swarm Optimization (PSO) implementation with Von Neumann neighborhood.
    
    This implementation arranges particles in a 2D grid, where each particle is
    influenced by its own best position and the best positions of its immediate
    neighbors (north, south, east, west) in the grid.
    """

    def __init__(
        self,
        objective_function: callable,
        n_particles: int = 25,  
        dimensions: int = 2,
        bounds: tuple[float, float] = (0, 5),
        w: float = 0.8,
        c1: float = 0.1,
        c2: float = 0.1,
        seed: int = 100
    ):
        """
        Initialize the PSO algorithm with Von Neumann neighborhood.

        Parameters:
        -----------
        objective_function : function
            The function to be minimized
        n_particles : int
            Number of particles in the swarm (preferably a perfect square)
            Preferably the value has to be a perfect square for grid arrangement
        dimensions : int
            Number of dimensions in the search space
        bounds : tuple
            (min, max) bounds for each dimension
        w : float
            Inertia weight - controls influence of previous velocity
        c1 : float
            Cognitive parameter - controls attraction to personal best
        c2 : float
            Social parameter - controls attraction to neighborhood best
        seed : int
            Random seed for reproducibility
        """
        self.objective_function = objective_function
        self.n_particles = n_particles
        self.dimensions = dimensions
        self.bounds = bounds
        self.w = w
        self.c1 = c1
        self.c2 = c2

        # Set random seed for reproducibility
        np.random.seed(seed)

        # Initialize particles' positions and velocities
        self.X = np.random.rand(dimensions, n_particles) * \
            (bounds[1] - bounds[0]) + bounds[0]
        self.V = np.random.randn(dimensions, n_particles) * 0.1

        # Initialize personal best positions and objective values
        self.pbest = self.X.copy()
        self.pbest_obj = self.evaluate(self.X)

        # Create grid topology and find neighbors
        self.grid_size = int(np.ceil(np.sqrt(n_particles)))
        self.neighbors = self._create_von_neumann_neighborhood()
        
        # Initialize neighborhood best positions and values
        self.nbest = np.zeros((dimensions, n_particles))
        self.nbest_obj = np.ones(n_particles) * float('inf')
        self._update_nbest()

        # Initialize global best (still tracked, but not used for updates)
        self.gbest = self.pbest[:, self.pbest_obj.argmin()]
        self.gbest_obj = self.pbest_obj.min()

        # Store optimization history
        self.history = {
            'positions': [self.X.copy()],
            'velocities': [self.V.copy()],
            'pbest': [self.pbest.copy()],
            'nbest': [self.nbest.copy()],
            'gbest': [self.gbest.copy()],
            'gbest_obj': [self.gbest_obj]
        }

    def _create_von_neumann_neighborhood(self):
        """
        Create Von Neumann neighborhood topology (north, south, east, west).
        
        Returns:
        --------
        list
            List of neighbor indices for each particle
        """
        neighbors = []
        for i in range(self.n_particles):
            # Convert particle index to 2D grid coordinates
            row = i // self.grid_size
            col = i % self.grid_size
            
            # Find neighbors (north, south, east, west with wrap-around)
            north = ((row - 1) % self.grid_size) * self.grid_size + col
            south = ((row + 1) % self.grid_size) * self.grid_size + col
            east = row * self.grid_size + ((col + 1) % self.grid_size)
            west = row * self.grid_size + ((col - 1) % self.grid_size)
            
            # Ensure indices are within range
            particle_neighbors = [
                north if north < self.n_particles else i,
                south if south < self.n_particles else i,
                east if east < self.n_particles else i,
                west if west < self.n_particles else i
            ]
            neighbors.append(particle_neighbors)
        
        return neighbors

    def _update_nbest(self):
        """
        Update neighborhood best positions based on current personal bests.
        """
        for i in range(self.n_particles):
            # Include the particle itself in the neighborhood
            neighborhood = self.neighbors[i] + [i]
            
            # Find the best particle in the neighborhood
            best_idx = neighborhood[np.argmin([self.pbest_obj[j] for j in neighborhood])]
            
            # Update neighborhood best if better
            if self.pbest_obj[best_idx] < self.nbest_obj[i]:
                self.nbest[:, i] = self.pbest[:, best_idx]
                self.nbest_obj[i] = self.pbest_obj[best_idx]

    def evaluate(self, X):
        """
        Evaluate the objective function for all particles.

        Parameters:
        -----------
        X : ndarray
            Particles' positions

        Returns:
        --------
        ndarray
            Objective function values for all particles
        """
        return self.objective_function(X[0], X[1])

    def update(self):
        """
        Perform one iteration of the PSO algorithm with Von Neumann neighborhood.

        Steps:
        1. Update velocities based on inertia, cognitive, and social components
        2. Update positions based on velocities
        3. Evaluate new positions
        4. Update personal and neighborhood bests

        Returns:
        --------
        dict
            Current state of the swarm
        """
        # Generate random coefficients for stochastic behavior
        r1, r2 = np.random.rand(2)

        # Update velocities using neighborhood best instead of global best
        self.V = (self.w * self.V +                             # Inertia component
                  self.c1 * r1 * (self.pbest - self.X) +        # Cognitive component
                  self.c2 * r2 * (self.nbest - self.X))         # Social component (neighborhood)

        # Update positions by adding velocities
        self.X = self.X + self.V
        
        # Enforce bounds if necessary
        self.X = np.clip(self.X, self.bounds[0], self.bounds[1])

        # Evaluate new positions
        obj = self.evaluate(self.X)

        # Update personal best positions and objective values
        mask = (self.pbest_obj >= obj)
        self.pbest[:, mask] = self.X[:, mask]
        self.pbest_obj = np.minimum(self.pbest_obj, obj)

        # Update neighborhood bests
        self._update_nbest()

        # Update global best (still tracked for comparison)
        min_idx = self.pbest_obj.argmin()
        self.gbest = self.pbest[:, min_idx]
        self.gbest_obj = self.pbest_obj[min_idx]

        # Store current state for history
        self.history['positions'].append(self.X.copy())
        self.history['velocities'].append(self.V.copy())
        self.history['pbest'].append(self.pbest.copy())
        self.history['nbest'].append(self.nbest.copy())
        self.history['gbest'].append(self.gbest.copy())
        self.history['gbest_obj'].append(self.gbest_obj)

        # Return current state for visualization
        return {
            'X': self.X,
            'V': self.V,
            'pbest': self.pbest,
            'nbest': self.nbest,
            'gbest': self.gbest,
            'gbest_obj': self.gbest_obj
        }

    def optimize(self, iterations=50):
        """
        Run the PSO algorithm for a specified number of iterations.

        Parameters:
        -----------
        iterations : int
            Number of iterations to run

        Returns:
        --------
        tuple
            (global best position, global best objective value)
        """
        for _ in range(iterations):
            self.update()

        return self.gbest, self.gbest_obj

    def get_history(self):
        """
        Get the optimization history.

        Returns:
        --------
        dict
            Optimization history containing positions, velocities, 
            personal bests, neighborhood bests, global best, and objective values
        """
        return self.history

---

## 2. Test  it  compared  to  the  previous  implementation 
Explain also the functions that you use and why they are used as test cases.

Functions:
- [Wikipedia Functions](https://en.wikipedia.org/wiki/Test_functions_for_optimization)

In order to compare the performance of both implementations, we will use 3 different funcions from the link provided **[HERE](https://en.wikipedia.org/wiki/Test_functions_for_optimization)**

### Ackley function

![Ackley funtion](./images/ackley.png)

In [8]:
from pso_class_old import ParticleSwarmOptimization

In [15]:
def ackley_function(x, y):
    """
    ackley funciton implementation
    """
    x = np.array(x, ndmin=1)
    y = np.array(y, ndmin=1)

    a = 20
    b = 0.2
    c = 2 * np.pi

    part1 = -a * np.exp(-b * np.sqrt((x**2 + y**2) / 2.0))
    part2 = -np.exp((np.cos(c*x) + np.cos(c*y)) / 2.0)

    return (part1 + part2 + a + np.e).flatten()


pso_ackley = ParticleSwarmOptimization(
    objective_function = ackley_function,
    n_particles=25,
    dimensions=2,
    bounds=(-5, 5),
    w=0.8,
    c1=0.1,
    c2=0.1,
    seed=100
)

time_runs =  []
pos_ackle_results= []
val_ackley_results = []

for _ in range(ITERATIONS):

    t_0 = time.time()
    best_pos_ackley, best_val_ackley = pso_ackley.optimize(iterations=50)
    t_f = time.time() - t_0

    pos_ackle_results.append(best_pos_ackley)
    val_ackley_results.append(best_val_ackley)
    time_runs.append(t_f)

print("=== Results ackley function ===")
print(f"Average run per run: {np.mean(time_runs):.2f} segundos")
print(f"Average pos: {np.mean(pos_ackle_results):.2f}")
print(f"Average value: {np.mean(val_ackley_results):.2f}\n")


pso_ackley_vn = ParticleSwarmOptimizationVN(
    objective_function = ackley_function,
    n_particles=25,
    dimensions=2,
    bounds=(-5, 5),
    w=0.8,
    c1=0.1,
    c2=0.1,
    seed=100
)
time_runs_vn =  []
pos_ackle_results_vn= []
val_ackley_results_vn = []

for _ in range(ITERATIONS):
    t_0 = time.time()
    best_pos_ackley_vn, best_val_ackley_vn = pso_ackley_vn.optimize(iterations=50)
    t_f = time.time() - t_0

    pos_ackle_results_vn.append(best_pos_ackley_vn)
    val_ackley_results_vn.append(best_val_ackley_vn)
    time_runs_vn.append(t_f)
print("=== Results ackley function with Von Neumann neighborhood ===")
print(f"Average run per run: {np.mean(time_runs_vn):.2f} segundos")
print(f"Average pos: {np.mean(pos_ackle_results_vn):.2f}")
print(f"Average value: {np.mean(val_ackley_results_vn):.2f}")



=== Results ackley function ===
Average run per run: 0.00 segundos
Average pos: -0.00
Average value: 0.00

=== Results ackley function with Von Neumann neighborhood ===
Average run per run: 0.02 segundos
Average pos: 0.00
Average value: 0.00


The reults show that the old implementation is ``0.02 seconds`` faster than the **Von Neumann**

### Sphere function

![Sphere](./images/sphere.png)

In [20]:
def sphere_function(x, y):
    """
    Implementation of the Sphere function for two variables x, y.
    """
    x = np.array(x, ndmin=1)
    y = np.array(y, ndmin=1)
    return (x**2 + y**2).flatten()

# Standard PSO
pso_sphere = ParticleSwarmOptimizationVN(
    objective_function=sphere_function,
    n_particles=25,
    dimensions=2,
    bounds=(-5, 5),  
    w=0.8,
    c1=0.1,
    c2=0.1,
    seed=100
)

time_runs_sphere = []
pos_sphere_results = []
val_sphere_results = []

ITERATIONS = 10
for _ in range(ITERATIONS):
    t0 = time.time()
    best_pos_sphere, best_val_sphere = pso_sphere.optimize(iterations=50)
    t_elapsed = time.time() - t0

    pos_sphere_results.append(best_pos_sphere)
    val_sphere_results.append(best_val_sphere)
    time_runs_sphere.append(t_elapsed)

print("=== Sphere Function Results ===")
print(f"Average execution time: {np.mean(time_runs_sphere):.2f} seconds")
print(f"Average position found: {np.mean(pos_sphere_results):.2f}")
print(f"Average function value: {np.mean(val_sphere_results):.2f}\n")

# PSO with Von Neumann Neighborhood
pso_sphere_vn = ParticleSwarmOptimizationVN(
    objective_function=sphere_function,
    n_particles=25,
    dimensions=2,
    bounds=(-5, 5),  
    w=0.8,
    c1=0.1,
    c2=0.1,
    seed=100
)

time_runs_sphere_vn = []
pos_sphere_results_vn = []
val_sphere_results_vn = []

for _ in range(ITERATIONS):
    t0 = time.time()
    best_pos_sphere_vn, best_val_sphere_vn = pso_sphere_vn.optimize(iterations=50)
    t_elapsed = time.time() - t0

    pos_sphere_results_vn.append(best_pos_sphere_vn)
    val_sphere_results_vn.append(best_val_sphere_vn)
    time_runs_sphere_vn.append(t_elapsed)
    
print("=== Sphere Function Results with Von Neumann Neighborhood ===")
print(f"Average execution time: {np.mean(time_runs_sphere_vn):.2f} seconds")
print(f"Average position found: {np.mean(pos_sphere_results_vn):.2f}")
print(f"Average function value: {np.mean(val_sphere_results_vn):.2f}")


=== Sphere Function Results ===
Average execution time: 0.02 seconds
Average position found: -0.00
Average function value: 0.00

=== Sphere Function Results with Von Neumann Neighborhood ===
Average execution time: 0.01 seconds
Average position found: -0.00
Average function value: 0.00


The reults show that the old implementation is ``0.01 seconds`` faster than the **Von Neumann**

### Three-hump camel function

![Three-hump camel function](./images/camel.png)

In [19]:
def threehump_camel_function(x, y):
    """
    Implementación de la Three-Hump Camel Function para dos variables x, y.
    """
    x = np.array(x, ndmin=1)
    y = np.array(y, ndmin=1)

    term1 = 2.0 * x**2
    term2 = -1.05 * x**4
    term3 = (x**6) / 6.0
    term4 = x * y
    term5 = y**2

    return (term1 + term2 + term3 + term4 + term5).flatten()


pso_threehump = ParticleSwarmOptimizationVN(
    objective_function=threehump_camel_function,
    n_particles=25,
    dimensions=2,
    bounds=(-5, 5),
    w=0.8,
    c1=0.1,
    c2=0.1,
    seed=100
)

time_runs_threehump = []
pos_threehump_results = []
val_threehump_results = []


for _ in range(ITERATIONS):
    t_inicio = time.time()
    best_pos_threehump, best_val_threehump = pso_threehump.optimize(iterations=50)
    t_final = time.time() - t_inicio
    
    pos_threehump_results.append(best_pos_threehump)
    val_threehump_results.append(best_val_threehump)
    time_runs_threehump.append(t_final)

print("=== Resultados Three-Hump Camel Function ===")
print(f"Tiempo promedio por ejecución: {np.mean(time_runs_threehump):.2f} segundos")
print(f"Posición promedio encontrada: {np.mean(pos_threehump_results, axis=0)}")
print(f"Valor promedio de la función: {np.mean(val_threehump_results):.4f}\n")


pso_threehump_vn = ParticleSwarmOptimizationVN(
    objective_function=threehump_camel_function,
    n_particles=25,
    dimensions=2,
    bounds=(-5, 5),
    w=0.8,
    c1=0.1,
    c2=0.1,
    seed=100
)
time_runs_threehump_vn = []
pos_threehump_results_vn = []
val_threehump_results_vn = []

for _ in range(ITERATIONS):
    t_inicio = time.time()
    best_pos_threehump_vn, best_val_threehump_vn = pso_threehump_vn.optimize(iterations=50)
    t_final = time.time() - t_inicio
    
    pos_threehump_results_vn.append(best_pos_threehump_vn)
    val_threehump_results_vn.append(best_val_threehump_vn)
    time_runs_threehump_vn.append(t_final)
print("=== Resultados Three-Hump Camel Function con Vecindario Von Neumann ===")
print(f"Tiempo promedio por ejecución: {np.mean(time_runs_threehump_vn):.2f} segundos")
print(f"Posición promedio encontrada: {np.mean(pos_threehump_results_vn, axis=0)}")
print(f"Valor promedio de la función: {np.mean(val_threehump_results_vn):.4f}")



=== Resultados Three-Hump Camel Function ===
Tiempo promedio por ejecución: 0.01 segundos
Posición promedio encontrada: [1.32379242e-26 3.76062678e-26]
Valor promedio de la función: 0.0000

=== Resultados Three-Hump Camel Function con Vecindario Von Neumann ===
Tiempo promedio por ejecución: 0.01 segundos
Posición promedio encontrada: [1.32379242e-26 3.76062678e-26]
Valor promedio de la función: 0.0000


**No** differences were shown in both implementations

---

## 3. Ant Conoly Optimization Pseudocode


### 3.1 ACO Parameters

- G = (V, E): Graph with vertices V and edges E
- τ(i,j): Pheromone level on edge (i,j)
- η(i,j): Heuristic information (1/distance) for edge (i,j)
- α: Pheromone influence parameter
- β: Heuristic influence parameter
- ρ: Pheromone evaporation rate
- m: Number of ants
- Q: Constant for pheromone updates
- source, destination: Start and end nodes of the path
- max_iter: Maximum number of iterations

### 3.2 Initialization

1. For each edge (i,j) in E:
   - τ(i,j) = τ₀ (small positive constant, e.g., 0.1)
2. best_path = NULL
3. best_length = ∞

### 3.3 ACO Main Loop

**For** `iter = 1` **to** `max_iter`:

#### Ant Tour Construction

**For** `k = 1` **to** `m` (for each ant):
- Place ant *k* at the source node  
- `path_k = [source]`  
- `current_node = source`

**While** `current_node ≠ destination`:
- Calculate the selection probability for each unvisited node *j*:

  \[
  p(j) = \frac{[\tau(current\_node, j)]^\alpha \cdot [\eta(current\_node, j)]^\beta}{\sum [\tau(current\_node, l)]^\alpha \cdot [\eta(current\_node, l)]^\beta}
  \]

- Select the next node based on these probabilities (roulette wheel method)  
- Add the selected node to `path_k`  
- `current_node = selected node`

- Calculate `length_k` (total length of the found path)  
- **If** `length_k < best_length`:
  - `best_path = path_k`
  - `best_length = length_k`


### 3.4 Pheromone update

#### Evaporation
For each edge (i,j):
  - τ(i,j) = (1-ρ) * τ(i,j)
   
#### Deposit
For each ant k:
  For each edge (i,j) in path_k:
    - Δτ(i,j) = Q / length_k
    - τ(i,j) = τ(i,j) + Δτ(i,j)

### 3.5 Return
Return best_path, best_length

### 3.6 Helper Functions

SelectNextNode(current_node, unvisited_nodes, τ, η, α, β):
1. Calculate total sum:
   - sum = Σ [τ(current_node,j)]^α * [η(current_node,j)]^β for all j in unvisited_nodes
2. For each node j in unvisited_nodes:
   - Calculate probability p(j) = [τ(current_node,j)]^α * [η(current_node,j)]^β / sum
3. Generate random number r between 0 and 1
4. Select node using roulette wheel method based on p(j) and r
5. Return selected node

CalculatePathLength(path, G):
1. length = 0
2. For i = 0 to length(path)-2:
   - length += distance between path[i] and path[i+1] in G
3. Return length

---

## 4. Solving the TSP With Pseudocode

### Reasoning

For this implementation of the TSP we thought that a possible implementation could follow the next reasoning:

This PSO for the TSP starts by giving each particle its own random tour of the cities. Every particle saves its tour as its best solution so far, called the personal best. In addition, each particle looks at the tours of its neighbors and remembers the best one as the neighborhood best. In each round, every particle figures out which swaps in its tour would make it more like its personal best and which swaps would make it more like the neighborhood best. Then, some of these swaps are chosen based on a set probability and applied to change the current tour. The new tour is measured by its total distance, and if this distance is less than the distance of the old personal best, the personal best is updated. After all particles have updated their tours in this round, the neighborhood best for every particle is refreshed by finding the best tour among the personal bests of its neighbors. This process repeats for the maximum number of iterations, and at the end, the tour with the lowest cost among all the personal bests is returned along with its total distance.

### **PSEUDOCODE**

Input:
- cities: list of cities (or their coordinates)
- num_particles: number of particles in the swarm
- num_iterations: maximum number of iterations
- parameters: for example, swap application probability

**Initialization:**

For each particle i from 1 to num_particles:

    solution_i = generate a random permutation of the cities
    pbest_i = solution_i
    cost_pbest_i = Evaluate(solution_i)   # Calculate the total tour distance

For each particle i:

    nbest_i = best pbest among the neighbors of particle i
    (using a fixed neighborhood topology, e.g., Von Neumann)

TSP funtion (for iter = 1 to num_iterations):

    For each particle i:

        # Compute swaps to move current solution toward personal best (pbest)
        swaps_pbest = compute_swaps(current solution_i, pbest_i)

        # Compute swaps to move current solution toward neighborhood best (nbest)
        swaps_nbest = compute_swaps(current solution_i, nbest_i)

        # Select some swaps from both sets (using a defined probability)
        selected_swaps = select_swaps(swaps_pbest, swaps_nbest)

        # Update the current solution by applying the selected swap operations
        new_solution = apply_swaps(solution_i, selected_swaps)

        new_cost = evaluate(new_solution) # Compute total distance of new tour

        If new_cost < cost_pbest_i then:
            pbest_i = new_solution
            cost_pbest_i = new_cost

        solution_i = new_solution

    # Update the neighborhood best for each particle:
    For each particle i:
        nbest_i = best pbest among the neighbors of particle i


Output:
    best_tour = pbest of the particle with the lowest cost_pbest
    best_cost = evaluate(best_tour)
    Return best_tour, best_cost