# Custom algorithm and problem
In this notebook, we will show how to use the Algorithm and Problem classes to create a custom algorithm and problem. Here we give an example of **implementing a PSO algorithm that solves the Sphere problem**.



In [6]:
from typing import List

import torch

from evox.core import Algorithm, Mutable, Parameter, Problem, jit_class
from evox.utils import clamp
from evox.workflows import EvalMonitor, StdWorkflow

## Algorithm example: PSO algorithm

Particle Swarm Optimization (PSO) is a population-based metaheuristic algorithm inspired by the social behavior of birds and fish. It is widely used for solving continuous and discrete optimization problems.

**Here is an implementation example of PSO algorithm in EvoX:**

In [7]:
@jit_class
class PSO(Algorithm):
    #Initialize the PSO algorithm with the given parameters.
    def __init__(
        self,
        pop_size: int,
        lb: torch.Tensor,
        ub: torch.Tensor,
        w: float = 0.6,
        phi_p: float = 2.5,
        phi_g: float = 0.8,
        device: torch.device | None = None,
    ):
        super().__init__()
        assert lb.shape == ub.shape and lb.ndim == 1 and ub.ndim == 1 and lb.dtype == ub.dtype
        self.pop_size = pop_size
        self.dim = lb.shape[0]
        # Here, Parameter is used to indicate that these values are hyper-parameters
        # so that they can be correctly traced and vector-mapped
        self.w = Parameter(w, device=device)
        self.phi_p = Parameter(phi_p, device=device)
        self.phi_g = Parameter(phi_g, device=device)
        lb = lb[None, :].to(device=device)
        ub = ub[None, :].to(device=device)
        length = ub - lb
        population = torch.rand(self.pop_size, self.dim, device=device)
        population = length * population + lb
        velocity = torch.rand(self.pop_size, self.dim, device=device)
        velocity = 2 * length * velocity - length
        self.lb = lb
        self.ub = ub
        # Mutable parameters
        self.population = Mutable(population)
        self.velocity = Mutable(velocity)
        self.local_best_location = Mutable(population)
        self.local_best_fitness = Mutable(torch.empty(self.pop_size, device=device).fill_(torch.inf))
        self.global_best_location = Mutable(population[0])
        self.global_best_fitness = Mutable(torch.tensor(torch.inf, device=device))

    def step(self):
        # Compute fitness
        fitness = self.evaluate(self.population)

        # Update the lbest and the gbest
        compare = self.local_best_fitness - fitness
        self.local_best_location = torch.where(
            compare[:, None] > 0, self.population, self.local_best_location
        )
        self.local_best_fitness = self.local_best_fitness - torch.relu(compare)
        self.global_best_location, self.global_best_fitness = self._min_by(
            [self.global_best_location.unsqueeze(0), self.population],
            [self.global_best_fitness.unsqueeze(0), fitness],
        )

        # Update the velocity
        rg = torch.rand(self.pop_size, self.dim, dtype=fitness.dtype, device=fitness.device)
        rp = torch.rand(self.pop_size, self.dim, dtype=fitness.dtype, device=fitness.device)
        velocity = (
            self.w * self.velocity
            + self.phi_p * rp * (self.local_best_location - self.population)
            + self.phi_g * rg * (self.global_best_location - self.population)
        )

        # Udpdate the population
        population = self.population + velocity
        self.population = clamp(population, self.lb, self.ub)
        self.velocity = clamp(velocity, self.lb, self.ub)

    def _min_by(self, values: List[torch.Tensor],keys: List[torch.Tensor],):
        # Find the value with the minimum key
        values = torch.cat(values, dim=0)
        keys = torch.cat(keys, dim=0)
        min_index = torch.argmin(keys)
        return values[min_index], keys[min_index]

## Problem example: Sphere problem

The Sphere problem is a simple, yet fundamental benchmark optimization problem used to test optimization algorithms.

The Sphere function is defined as:

$$
\textbf{min} f(x)= \sum_{i=1}^{n} x_{i}^{2}
$$
**Here is an implementation example of Sphere problem in EvoX:**

In [8]:
class Sphere(Problem):
    def __init__(self):
        super().__init__()

    def evaluate(self, pop: torch.Tensor):
        return (pop**2).sum(-1)

## Use the algorithm to solve the problem

### Initiate the algorithm, problem and monitor

In [9]:
algorithm = PSO(
    pop_size=100,
    lb=torch.tensor([-10.0]),
    ub=torch.tensor([10.0]),
    w=0.6,
    phi_p=2.5,
    phi_g=0.8,
    )
problem = Sphere()
monitor = EvalMonitor(full_fit_history=False, full_sol_history=False)

### Initiate the workflow and run it

In [10]:
workflow = StdWorkflow()
workflow.setup(
    algorithm=algorithm,
    problem=problem,
    monitor=monitor
    )

for _ in range(100):
    workflow.step()
workflow.get_submodule("monitor").topk_fitness

tensor([1.6247e-08])