In [20]:
import numpy as np
import random
from itertools import product


In [21]:
class Particle:
    def __init__(self, num_subsets, k):
        self.num_subsets = num_subsets
        self.k = k
        
        # Initialize position (random selection of k subsets)
        self.position = np.zeros(num_subsets, dtype=int)
        selected_indices = np.random.choice(num_subsets, k, replace=False)
        self.position[selected_indices] = 1
        
        # Initialize velocity (random small changes)
        self.velocity = np.random.uniform(-1, 1, num_subsets)
        
        # Best personal solution
        self.personal_best = np.copy(self.position)
        self.personal_best_score = float('-inf')


In [22]:
class PSO_MCP:
    def __init__(self, coverage_matrix, num_particles=30, k=3, max_iter=100, w=0.7, c1=1.5, c2=1.5):
        self.coverage_matrix = coverage_matrix
        self.num_elements = coverage_matrix.shape[1]
        self.num_subsets = coverage_matrix.shape[0]
        self.k = k
        self.max_iter = max_iter
        self.w = w
        self.c1 = c1
        self.c2 = c2
        # Initialisation des particules
        self.particles = [Particle(self.num_subsets, k) for _ in range(num_particles)]
        self.global_best = np.copy(self.particles[0].position)
        self.global_best_score = self.fitness_function(self.global_best)


    def fitness_function(self, position):
        covered_elements = np.any(self.coverage_matrix[position == 1], axis=0)
        coverage_score = np.sum(covered_elements)

        # Encourage diversity (éviter que toutes les particules aient les mêmes scores)
        diversity_bonus = np.sum(position) / self.k  

        return coverage_score + 0.1 * diversity_bonus

    
    def update_velocity(self, particle, w=0.7, c1=1.5, c2=1.5):
        """Update the velocity of a particle using PSO equations."""
        r1, r2 = np.random.rand(), np.random.rand()
        cognitive = c1 * r1 * (particle.personal_best - particle.position)
        social = c2 * r2 * (self.global_best - particle.position)
        particle.velocity = w * particle.velocity + cognitive + social
    
    def update_position(self, particle):
        """Update the position of a particle based on velocity using sigmoid activation."""
        sigmoid = lambda x: 1 / (1 + np.exp(-x / 2))  # Diviser par 2 pour réduire l'effet explosif
        probabilities = sigmoid(particle.velocity)
        new_position = (np.random.rand(self.num_subsets) < probabilities).astype(int)
        
        # Ensure exactly k subsets are chosen
        if np.sum(new_position) != self.k:
            ones = np.where(new_position == 1)[0]
            zeros = np.where(new_position == 0)[0]
            if len(ones) > self.k:
                np.random.shuffle(ones)
                new_position[ones[self.k:]] = 0
            elif len(ones) < self.k:
                np.random.shuffle(zeros)
                new_position[zeros[:(self.k - len(ones))]] = 1
        
        particle.position = new_position
    
    def optimize(self, patience=10):
        best_score = float('-inf')
        patience_counter = 0
        
        for iteration in range(self.max_iter):
            patience_counter = 0  # Reset at the beginning of each iteration
            for particle in self.particles:
                fitness = self.fitness_function(particle.position)

                if fitness > particle.personal_best_score:
                    particle.personal_best = np.copy(particle.position)
                    particle.personal_best_score = fitness

                if fitness > self.global_best_score:
                    self.global_best = np.copy(particle.position)
                    self.global_best_score = fitness
                    patience_counter = 0  # Reset patience on improvement

                self.update_velocity(particle)
                self.update_position(particle)

            # Stop early if no improvement for `patience` iterations
            if patience_counter >= patience:
                print(f"Early stopping at iteration {iteration}")
                break

        return self.global_best, self.global_best_score





In [23]:
# Function to generate a random coverage matrix
def generate_random_coverage_matrix(num_subsets, num_elements, coverage_prob=0.5):
    """
    Generates a random binary coverage matrix.
    
    Parameters:
        num_subsets (int): Number of subsets (rows)
        num_elements (int): Number of elements covered (columns)
        coverage_prob (float): Probability of an element being covered by a subset (default 50%)
    
    Returns:
        np.array: Randomly generated coverage matrix
    """
    return (np.random.rand(num_subsets, num_elements) < coverage_prob).astype(int)

In [24]:
if __name__ == "__main__":
    # Parameters for random coverage matrix
    num_subsets = 5  # Number of available subsets
    num_elements = 6  # Number of elements to cover
    
    # Generate a random coverage matrix
    coverage_matrix = generate_random_coverage_matrix(num_subsets, num_elements)
    
    print("Generated Coverage Matrix:\n", coverage_matrix)
    
    # Run PSO for MCP
    pso_mcp = PSO_MCP(coverage_matrix, num_particles=30, k=2, max_iter=100)
    best_solution, best_score = pso_mcp.optimize()
    
    print("Best subset selection:", best_solution)
    print("Maximum elements covered:", best_score)

Generated Coverage Matrix:
 [[1 1 0 0 1 1]
 [0 1 1 0 0 1]
 [0 0 1 0 0 1]
 [1 0 0 1 1 0]
 [1 0 1 0 1 0]]
Best subset selection: [0 1 0 1 0]
Maximum elements covered: 6.1


In [25]:
def tune_pso(coverage_matrix, k, num_particles_list, w_list, c1_list, c2_list, max_iter_list):
    """
    Tunes the PSO algorithm to find the best hyperparameters for MCP.
    
    Parameters:
        coverage_matrix (np.array): Binary coverage matrix
        k (int): Number of subsets to select
        num_particles_list (list): Different values for number of particles
        w_list (list): Different values for inertia weight
        c1_list (list): Different values for cognitive factor
        c2_list (list): Different values for social factor
        max_iter_list (list): Different values for number of iterations
    
    Returns:
        dict: Best hyperparameters
        float: Best fitness score achieved
    """
    best_params = None
    best_score = float('-inf')

    for num_particles, w, c1, c2, max_iter in product(num_particles_list, w_list, c1_list, c2_list, max_iter_list):
        pso_mcp = PSO_MCP(coverage_matrix, num_particles=num_particles, k=k, w=w, c1=c1, c2=c2, max_iter=max_iter)
        pso_mcp.optimize()
        
        if pso_mcp.global_best_score > best_score:
            best_score = pso_mcp.global_best_score
            best_params = {
                'num_particles': num_particles,
                'w': w,
                'c1': c1,
                'c2': c2,
                'max_iter': max_iter
            }

    return best_params, best_score

# Define hyperparameter search space
num_particles_list = [10, 20, 30, 50]
w_list = [0.4, 0.7, 0.9]
c1_list = [1.0, 1.5, 2.0]
c2_list = [1.0, 1.5, 2.0]
max_iter_list = [50, 100, 200]  # Adding iteration tuning

# Example usage
best_params, best_score = tune_pso(
    coverage_matrix, k=2, num_particles_list=num_particles_list, 
    w_list=w_list, c1_list=c1_list, c2_list=c2_list, max_iter_list=max_iter_list
)
print("Best Parameters:", best_params)
print("Best Fitness Score:", best_score)

Best Parameters: {'num_particles': 10, 'w': 0.4, 'c1': 1.0, 'c2': 1.0, 'max_iter': 50}
Best Fitness Score: 6.1
