# Particle Swarm Optimisation for Knights Covering Problem

In [1]:
#necessary python library imports
import numpy as np
from collections.abc import Callable
from tqdm.auto import tqdm

## Algorithm Implementation

### Particle Swarm Optimisation class

Fitness function is an error function, so we want to minimise it

In [2]:
def none_value_wrapper(value, default):
    if value is None:
        value = default
    return value

In [3]:
class PSO:
    def __init__(self, board_size: int, fitness_function: Callable[[np.ndarray], np.ndarray], 
                 penalty_function: Callable[[np.ndarray, int], np.ndarray] = (lambda x,y: 0), num_particles: int = 100, c1: float = 1.0, 
                 c2: float = 1.0, inertia: float = 1.0, max_velocity: float = 4.0, min_velocity: float = -4.0, 
                 rng: np.random.Generator = np.random.default_rng(None)) -> None:
        
        #check for sensible hyperparameter selection
        assert min_velocity < max_velocity, "Max velocity has to be greater than the min velocity." 
        assert num_particles > 1, "There has to be at least one particle"
        assert c1 >= 0, "c1 has to be greater than 0"
        assert c2 >= 0, "c2 has to be greater than 0"
        assert board_size > 3, "board size has to be at least 3"
        #condition on sensible inertia values as proposed in: "F. van den Bergh. An Analysis of Particle Swarm Optimizers (2002)"
        assert inertia > c1+c2 / 2 - 1 and inertia >= 0, "Inertia must be positive and greater than (c1 + c2) / 2 -1"

        #assign hyperparameters to object variables
        self.board_size = board_size
        self.fitness_function = fitness_function
        self.penalty_function = none_value_wrapper(penalty_function, default=(lambda x,y: 0))
        self.num_particles = none_value_wrapper(num_particles, default=100)
        self.c1 = none_value_wrapper(c1, default=1.0)
        self.c2 = none_value_wrapper(c2, default=1.0)
        self.max_velocity = none_value_wrapper(max_velocity,4.0)
        self.min_velocity = none_value_wrapper(min_velocity,-4.0)
        self.inertia = none_value_wrapper(inertia, default=1.0)
        self.rng = none_value_wrapper(rng, np.random.default_rng(None))

        #initialise particles and velocities
        self.particle_positions = self.rng.integers(low=0, high=1, endpoint=True, size=(self.num_particles, self.board_size**2))
        self.particle_best_pos = self.particle_positions.copy()
        self.particle_velocities = self.rng.random(size=(self.num_particles, self.board_size**2)) * self.max_velocity
        
        #initialise global best
        self.particle_fitness = self.compute_penalty_fitness(self.particle_positions, 0)
        self.particle_best_fitness = self.particle_fitness.copy()

        best_fitness = np.min(self.particle_fitness) #find best fitness value
        best_indices = np.argwhere(self.particle_fitness == best_fitness) #find all particles that have the best fitness value
        best_chosen_index = self.rng.choice(best_indices) #randomly choose one of the particles with the best fitness values

        self.global_best_location = self.particle_positions[best_chosen_index].copy()
        self.global_best_fitness = self.particle_fitness[best_chosen_index]
    
    def compute_penalty_fitness(self, particle_positions: np.ndarray, k: int):
        ''' Computes the fitness subtracted by the penalty for invalid solutions (i.e. not all squares attacked or occupied).
            The penalty function receives a parameter 'k', which indicates the current iteration number.
        '''
        return self.fitness_function(particle_positions) + self.penalty_function(particle_positions, k)

    def optimise(self, iterator):
        ''' Starts the optimisation loop. The num_iterations parameter specifies the terimination criterion as 
            the number of iterations that will be done.
        '''

        # initialise random numbers used in velocity computation
        r1 = np.empty(shape=(self.num_particles,1))
        r2 = np.empty(shape=(self.num_particles,1))
        r_sigmoid = np.empty(shape=(self.num_particles,1))

        for i in iterator:
            #update random numbers
            self.rng.random(size=(self.num_particles,1), out=r1)
            self.rng.random(size=(self.num_particles,1), out=r2)
            self.rng.random(size=(self.num_particles,1), out=r_sigmoid)

            # update velocities
            reduced_current_velocity = self.inertia * self.particle_velocities
            local_velocity_component = self.c1 * r1 * (self.particle_best_pos - self.particle_positions)
            global_velocity_component = self.c2 * r2 * (self.global_best_location - self.particle_positions)

            new_velocity = reduced_current_velocity + local_velocity_component + global_velocity_component
            self.particle_velocities = np.clip(new_velocity, a_max= self.max_velocity, a_min= self.min_velocity)

            # update positions
            #the original algorithm was flawed in regards to the position update
            #see https://www.researchgate.net/publication/224302958_A_novel_binary_particle_swarm_optimization
            sigmoid = 1/(1 + np.exp(-self.particle_velocities))
            change_indices = r_sigmoid < sigmoid

            #dimensions in which the normalised velocity value is greater than random value change their location value from 0 to 1 or vice versa
            self.particle_positions[change_indices] == (self.particle_positions[change_indices] + 1) % 2

            # calculate fitness
            self.particle_fitness = self.compute_penalty_fitness(self.particle_positions, i)

            # update best positions
            # local best positions
            new_best_local_fitness_indices = self.particle_fitness > self.particle_best_fitness
            self.particle_best_fitness[new_best_local_fitness_indices] = self.particle_fitness[new_best_local_fitness_indices]
            self.particle_best_pos[new_best_local_fitness_indices] = self.particle_positions[new_best_local_fitness_indices]

            # global best positions
            new_best_global_fitness = np.min(self.particle_fitness) #find best fitness value
            new_best_global_fitness_indices = np.argwhere(self.particle_fitness == new_best_global_fitness) #find all particles that have the best fitness value
            new_best_chosen_global_fitness_index = self.rng.choice(new_best_global_fitness_indices) #randomly choose one of the particles with the best fitness values

            #update best solution 
            if new_best_global_fitness < self.global_best_fitness:
                self.global_best_fitness = new_best_global_fitness
                self.global_best_location = self.particle_positions[new_best_chosen_global_fitness_index].copy()
            


### Fitness and penalty functions

#### Utility functions

In [4]:
def compute_covered_squares(particle_position: np.ndarray, binary_space: bool = True, board_size: int = None) -> np.ndarray:
    ''' 
    Computes all covered squares given the position of a single particle. 
    Covered squares are defined as positions on the chessboard that are either occupied or attacked by a knight.
    '''
    if binary_space:
        board_size = int(np.sqrt(particle_position.shape[0]))
        particle_position = particle_position.reshape((board_size, board_size))
    else:
        assert board_size is not None, "When using integer space, you have to specify the board_size parameter"
        particle_position = convert_integer_to_binary_space(particle_position, board_size)

    knight_position_indices = np.argwhere(particle_position)

    covered_positions = []
    relative_jump_positions = [
        #(x_change, y_change)
        ( 0, 0),#current position
        (-1,-2),#left top
        (+1,-2),#left bottom
        (-1,+2),#right top
        (+1,+2),#right bottom
        (-2,-1),#top left
        (-2,+1),#top right
        (+2,-1),#bottom left
        (+2,+1),#bottom right
    ]

    for relative_jump_position in relative_jump_positions:
        #calculate positions that the knights can jump to
        positions = knight_position_indices.copy()
        positions += relative_jump_position   

        #calculated positions are only valid if they are within the bounds of the chessboard     
        valid_indices = np.all((positions >= 0) & (positions < board_size), axis=1)

        #append valid positions to the list of attacked positions
        covered_positions.extend(positions[valid_indices])
    
    covered_positions = np.array(covered_positions)
    unique_positions = np.unique(covered_positions, axis=0)

    return unique_positions

def convert_integer_to_binary_space(particle_position: np.ndarray, board_size: int):
    binary_position = np.zeros(shape=(board_size, board_size)) #initialise board
    knight_position_indices = np.cumsum(particle_position) #calculate knight positions on board
    knight_position_indices = knight_position_indices - 1 #set the reference point for the first knight to -1 so that 0 always indicates an unused knight
    
    #keep only valid positions
    valid_position_indices = knight_position_indices >= 0 & knight_position_indices < board_size**2 
    knight_position_indices = knight_position_indices[valid_position_indices]

    binary_position[knight_position_indices] = 1 #set knights on board
    return binary_position


def compute_num_of_covered_squares(particle_position: np.ndarray, binary_space: bool = True, board_size: int = None) -> int:
    return compute_covered_squares(particle_position, binary_space, board_size).shape[0]


a = np.array([0,1,1,0,0, 
              0,0,0,0,0, 
              0,0,1,0,1, 
              0,1,0,0,0,
              1,0,0,0,0,])
covered_squares = compute_covered_squares(a)

#### Fitness function implementations

In [5]:
def fitness_binary_space_pso_paper(particle_positions: np.ndarray) -> np.ndarray:
    '''From paper Investigating binary PSO parameter influence on the knights cover problem by N. Franken and A.P. Engelbrecht (2005)'''
    num_covered_squares = np.apply_along_axis(compute_num_of_covered_squares, axis=1, arr = particle_positions)
    num_knights = np.sum(particle_positions, axis=1)
    total_num_squares = particle_positions.shape[1]
    num_empty_squares = total_num_squares - num_covered_squares

    fitness = num_empty_squares + num_knights + total_num_squares/num_covered_squares
    return fitness

def fitness_integer_space_pso_paper(particle_positions: np.ndarray, board_size:int = None) -> np.ndarray:
    num_covered_squares = np.apply_along_axis(compute_num_of_covered_squares, axis=1, arr=particle_positions, binary_space=True, board_size=board_size)
    num_knights = np.sum((particle_positions > 0), axis=1)
    total_num_squares = board_size**2
    num_empty_squares = total_num_squares - num_covered_squares

    fitness = num_empty_squares + num_knights + total_num_squares/num_covered_squares
    return fitness

def fitness_binary_space_simple(particle_positions: np.ndarray) -> np.ndarray:
    num_knights = np.sum(particle_positions, axis=1)
    return num_knights

def fitness_integer_space_simple(particle_positions: np.ndarray, board_size:int = None) -> np.ndarray:
    num_knights = np.sum((particle_positions > 0), axis=1)
    return num_knights

#### Penalty function implementations

In [6]:
#computes penalty for conventional search space depending on the ratio of empty squares
def penalty_binary_space(particle_positions: np.ndarray, iteration_number: int):
    ratio_of_empty_squares = (particle_positions.shape[1] - compute_num_of_covered_squares(particle_positions)) / particle_positions.shape[1]

    penalties = iteration_number * ratio_of_empty_squares #use ratio so that the penalty is independent of the board size
    return penalties

#computes penalty for integer search space depending on ratio of empty squares and whether the knights are positioned outside the board
def penalty_integer_space(particle_positions:np.ndarray, iteration_number: int, board_size):
    ratio_of_empty_squares = (board_size**2 - compute_num_of_covered_squares(particle_positions)) / board_size**2
    last_knight_location = np.sum(particle_positions, axis=1)
    distance_from_last_square = last_knight_location - board_size**2

    penalties = iteration_number * (ratio_of_empty_squares + np.maximum(0, distance_from_last_square))
    return penalties

### Random Search Algorithm class

In [7]:
class RandomSearch:
    def __init__(self, board_size: int, fitness_function: Callable[[np.ndarray], np.ndarray], 
                 penalty_function: Callable[[np.ndarray, int], np.ndarray] = (lambda _: 0), 
                 rng: np.random.Generator = np.random.default_rng(None)):
        
        self.board_size = board_size
        self.fitness_function = fitness_function
        self.penalty_function = penalty_function
        self.rng = rng

        self.current_solution = self.rng.integers(low=0, high=1, endpoint=True, size=board_size**2)
        self.best_solution = self.current_solution.copy()
        self.best_fitness = self.compute_penalty_fitness(self.best_solution, 0)
        
    def compute_penalty_fitness(self, particle_positions: np.ndarray, k: int):
        ''' Computes the fitness subtracted by the penalty for invalid solutions (i.e. not all squares attacked or occupied).
            The penalty function receives a parameter 'k', which indicates the current iteration number.
        '''
        return self.fitness_function(particle_positions) + self.penalty_function(particle_positions, k)
        
    def optimise(self, iterator):
        for i in iterator:
            #select random position to be altered
            mutation_index = self.rng.integers(low=0, high=self.current_solution.shape[0])
            #set element at index position to it's binary complement
            self.current_solution[mutation_index] = (self.current_solution[mutation_index]+1) % 2 
            #compute fitness of altered solution
            fitness = self.compute_penalty_fitness(self.current_solution, i)
            
            #update best solution
            if(fitness < self.best_fitness):
                self.best_solution = self.current_solution.copy()
                self.best_fitness = fitness

### Stochastic Hillclimb Algorithm class

In [8]:
#implements First choice stochastic hill climb as per Artificial Intelligence: A Modern Approach, Global Edition, Fourth Edition
class StochasticHillclimb:
    def __init__(self, board_size: int, fitness_function: Callable[[np.ndarray], np.ndarray], 
                 penalty_function: Callable[[np.ndarray, int], np.ndarray] = (lambda _: 0), allowed_plateau_steps: int = 100,
                 rng: np.random.Generator = np.random.default_rng(None)):
        
        self.board_size = board_size
        self.fitness_function = fitness_function
        self.penalty_function = penalty_function
        self.rng = rng
        self.allowed_plateau_steps = allowed_plateau_steps

        self.current_solution = self.rng.integers(low=0, high=1, endpoint=True, size=board_size**2)
        self.current_fitness = self.compute_penalty_fitness(self.current_solution, 0)

    def compute_penalty_fitness(self, particle_positions: np.ndarray, k: int):
        ''' Computes the fitness subtracted by the penalty for invalid solutions (i.e. not all squares attacked or occupied).
            The penalty function receives a parameter 'k', which indicates the current iteration number.
        '''
        return self.fitness_function(particle_positions) + self.penalty_function(particle_positions, k)

    def optimise(self, iterator):
        num_plateau_steps = 0

        for i in iterator:
            neighbour_permutation = self.rng.permutation(self.current_solution.shape[0])

            for neighbour_index in neighbour_permutation:
                neighbour_solution = self.current_solution.copy()
                neighbour_solution[neighbour_index] = (neighbour_solution[neighbour_index]+1) % 2 
                neighbour_fitness = self.compute_penalty_fitness(neighbour_fitness, i)

                #choose first neighbour with more optimal fitness (in this case, lower fitness value as we want to minimise the function)
                if neighbour_fitness <= self.current_fitness:
                    #check if fintess is on a plateau
                    if neighbour_fitness == self.current_fitness:
                        num_plateau_steps += 1 #increment plateau step counter

                        #cancel optimisation if maximum number of plateau steps is reached
                        if num_plateau_steps > self.allowed_plateau_steps: return
                        
                    else: #reset plateau step counter if fitness is reduced
                        num_plateau_steps = 0

                    self.current_solution = neighbour_solution
                    self.current_fitness = neighbour_fitness
                    break #dont consider the remaining neighbours
            
                return # no improved solution was found among the neighbours -> peak has been reached

## Experimentation

### Utility functions

In [9]:
def increment_indices(current_indices, max_indices):
    current_indices[-1] += 1
    for i in reversed(range(len(current_indices))):
        if current_indices[i] > max_indices[i]:
            current_indices[i] = 0
            if i != 0:
                current_indices[i-1] +=1
        else:
            break
    return current_indices

def fetch_algorithm_parameters(algorithm_config, parameter_indices):
    config_value_list = list(algorithm_config.values())
    parameters = [config_value_list[parameter][parameter_index] for parameter, parameter_index in enumerate(parameter_indices)]
    
    #flatten tuples in parameter list
    flattened_parameters = []
    for element in parameters:
        if isinstance(element,tuple):
            flattened_parameters.extend(element)
        else:
            flattened_parameters.append(element)
            
    return flattened_parameters


### Experimentation config

In [10]:
#run configuration for particle swarm optimisation
pso_config = {
    "fitness_and_penalty_functions": [(fitness_binary_space_pso_paper,None)],
    "num_particles": [100],
    "c1_c2_and_inertia": [(1,1,1)],
    "max_min_velocity": [(4,-4)],
}

#run configuration for random search
rs_config = {
    "fitness_and_penalty_functions": [(),()]
}

#run configuration for stochastic hillclimb
shc_config = {
    "fitness_and_penalty_functions": [(),()],
    "allowed_plateau_steps": []
}

#meta run config for all experiments
run_config = {
    "optimisation_algorithm": [PSO, RandomSearch, StochasticHillclimb],
    "algorithm_configs":[pso_config, rs_config, shc_config],
    "board_sizes": list(np.arange(start=4, stop=25)),
    "runs_per_experiment": 10,
    "iterations_per_run": 100,
    "rng_seed": 1234567
}

### Run Experiments

In [11]:
#TODO: switch from iterations to fitness function evaluations for PSO
#TODO: store results in numpy array and then to pandas dataframe
rng = np.random.default_rng(run_config["rng_seed"])

# run each optimisation algorithm
for optimiser_id, algorithm in tqdm(enumerate(run_config["optimisation_algorithm"]), 
    desc="Optimisation Algorithms", total=len(run_config["optimisation_algorithm"]), position=0, leave=False):

    algorithm_config = run_config["algorithm_configs"][optimiser_id] #fetch experiment config for the current optimisation algorithm
    
    #add board_sizes from run_config as first element of current config
    extended_config = {"board_sizes":run_config["board_sizes"]} 
    extended_config.update(algorithm_config)
    algorithm_config = extended_config

    max_parameter_indices = np.array([len(x)-1 for x in algorithm_config.values()], dtype=int) #compute the maximum indices for each config parameter
    num_of_experiments = np.cumprod(max_parameter_indices + 1)[-1] #compute the number of experiments to run based on number of values in config
    current_parameter_indices = np.zeros(len(algorithm_config), dtype= int) #initialise current parameter indices to 0

    #run all experiments
    for experiment_index in tqdm(range(num_of_experiments), desc="Experiments", position=1, leave=False):
        parameters = fetch_algorithm_parameters(algorithm_config, current_parameter_indices)
        parameters.append(rng) #append random number generator to parameters

        #run all runs of the same experiment
        for run in tqdm(range(run_config["runs_per_experiment"]), desc="Runs", position=2, leave=False):
            optimiser = algorithm(*parameters) #initialise optimisation algorithm with parameters
            iterator = tqdm(range(run_config["iterations_per_run"]), desc="Iterations", position=3, leave=False)
            optimiser.optimise(iterator) #perform optimisation
        #increment indices of the current algorithm's parameters
        current_parameter_indices = increment_indices(current_parameter_indices, max_parameter_indices)

Optimisation Algorithms:   0%|          | 0/3 [00:00<?, ?it/s]

Experiments:   0%|          | 0/21 [00:00<?, ?it/s]

Runs:   0%|          | 0/10 [00:00<?, ?it/s]

Runs:   0%|          | 0/10 [00:00<?, ?it/s]

Runs:   0%|          | 0/10 [00:00<?, ?it/s]

Runs:   0%|          | 0/10 [00:00<?, ?it/s]

Runs:   0%|          | 0/10 [00:00<?, ?it/s]

Runs:   0%|          | 0/10 [00:00<?, ?it/s]

KeyboardInterrupt: 

### Store experiment results

## Result Visualisation

### Load results file

### Visualisation