In [3]:
from typing import List
import numpy as np

(a) Implement the PSO algorithm for clustering described in “Van der Merwe, D.
W., and Andries Petrus Engelbrecht. ”Data clustering using particle swarm
optimization.” Evolutionary Computation, 2003. CEC’03. The 2003 Congress
on. Vol. 1. IEEE, 2003.” (see also the lecture’s slides on swarm intelligence).

In [6]:
# Define Function to initialize population
def init_population(size:int, K:int, data_dim:int) -> np.ndarray:
    """Generates a population of particles as a Matrix of shape (num_particles, K, data_dim). 
    A particle is a Matrix of length <K> where each column represents a centroid. The amount of rows
    is equal to the dimensionality of the data that we want to cluster

    Args:
        size (int): The amount of particles 
        K (int): The dimensionality of each particle
        data_dim (int): Dimensionality of the Data to cluster

    Returns:
        (np.ndarray): The population in Matrix form
    """
    # Generate a population Matrix
    return np.random.uniform(-1,1,(size, K, data_dim))

# Sanity check to ensure that we get the right dimensionality
assert init_population(4,4,4).shape == (4,4,4)

$d(x,y) = \sqrt{\sum_{k=1}^{N_d} (x_k - y_k)^2}$

In [30]:
def calc_distance(x:np.ndarray, y:np.ndarray) -> float:
    """Calculates the euclidean distance between point <x> and point <y>.

    Args:
        p1 (np.ndarray): point1
        p2 (np.ndarray): point2

    Returns:
        float: The distance between the points
    """
    return np.linalg.norm(x-y)

# Sanity check
assert calc_distance(np.array([5,5]), np.array([6,6])) == 1.4142135623730951

2.121320343559643

In [27]:
# Define Fitness Function
def calc_fitness(assignments: np.ndarray, dataset: np.ndarray, particle:np.ndarray) -> float:
    """Calculates the fitness of a particle given a K-cluster problem setup. Each entry in the particle is
    treated as a centroid for a cluster. Calculates the normalized
    sum of euclidean distances between centroids and assigned datapoints. 

    Args:
        assignments (np.ndarray): Stores the cluster index for each datapoint
        dataset (np.ndarray): The datapoints 
        particle (np.ndarray): The centroid Matrix

    Returns:
        float: Fitness of the particle
    """
    _, num_clusters = particle.shape
    fitness = 0
    for cluster in range(num_clusters):
        # Get the centroid of the cluster 
        centroid = particle[:,cluster]
        # Get all datapoints that were assigned to this cluster
        boolean_mask = assignments==cluster
        datapoints = dataset[boolean_mask]
        # Calculate the sum of distances between the datapoints and the centroid
        total_distance = 0
        for datapoint in datapoints:
            total_distance += calc_distance(centroid, datapoint)
        #normalize by the amount of assigned datapoints and add to the fitness
        fitness +=  total_distance / len(assignments[boolean_mask])
    # Normalize by the amount of clusters
    fitness /= num_clusters
    return fitness

In [31]:
# Testing the fitness function

# single datapoint
test_ass = np.array([0])
test_data = np.zeros([1,2])+5
# one cluster
test_part = np.zeros([2,1])+6

assert calc_fitness(test_ass, test_data, test_part) == 1.4142135623730951

# two datapoints with single cluster
test_ass = np.array([0, 0])
test_data = np.zeros([2,2])
test_data[0] += 4
test_data[1] += 5

correct = (calc_distance(np.array([4,4]), np.array([6,6])) + calc_distance(np.array([5,5]), np.array([6,6]))) / 2
assert calc_fitness(test_ass, test_data, test_part) == correct


$v_i := \omega\times v_i + \alpha_1\times r_1\circ (\hat{x} - x_i) + \alpha_2\times r_2\circ (\hat{g} - x_i)$

In [None]:
# Define Function to update the velocity of a particle
def update_velocity(vel, local_best, global_best, current, omega, alpha1, alpha2, r1, r2) -> np.ndarray:
    """Updates the velocity <vel> of the <current> particle according to the PSO velocity update

    Args:
        vel (np.ndarray): Defines the velocity of the particle
        local_best (np.ndarray): Stores the fittest version of this particle found so far
        global_best (np.ndarray): Stores the fittest particle of the population found so far
        current (np.ndarray): The current particle
        omega (float): Inertia weight
        alpha1 (float): weight param for the local influence
        alpha2 (float): weight param for the social influence
        r1 (float): additional weight param for local influence
        r2 (float): additional weight param for social influence

    Returns:
        np.ndarray: The updated velocity
    """
    return omega*vel + alpha1*r1*(local_best - current) + alpha2*r2*(global_best - current)

$x_i := x_i + v_i$

In [None]:
# Define Function for the position update
def update_position(current, velocity) -> np.ndarray:
    """Updates the position <current>, using the <velocity> according to the PSO position update rule
    Args:
        current (np.ndarray): The current particle
        velocity (_type_): The velocity of the particle

    Returns:
        np.ndarray: The updated particle position
    """
    new_pos = current + velocity
    return new_pos

<ol>
  <li>Initialize each particle to contain N randomly selected centroids</li>
  <li>
    For iter=1 to maxiter:
    <ul>
      <li> 
        For each particle x:
        <ul>
          <li> For each data point z:</li>
          <ul>
            <li> Calculate the distance of z to each cluster centroid </li>
            <li> Assign z to the cluster (centroid) with minimal distance; </li>
          </ul>
          <li> Compute the fitness (e.g. average weighted distance of data points 
            from their centroid); </li>
          <li> Update the local best; </li>
        </ul>
        <li> Update the global best; </li>
        <li> Update each particle !"using PSO update rules </li>
      </li>
    </ul>
  </li>
</ol> 

In [32]:
# Define a function that runs the K-Means PSO algorithm
def k_means_PSO(K:int, data_dim: int, dataset:np.ndarray, pop_size:int, max_iter:int, omega:float, alpha1:float, alpha2:float, r1:float, r2:float) -> np.ndarray:
    """Runs a K-Means PSO algorithm for <max_iter> iterations to find a set of <K> clusters for a dataset.

    Args:
        K (int): The amount of clusters to separate the data into
        data_dim (int): The dimensionality of the data
        dataset (np.ndarray): The datapoints that should be clustered
        pop_size (int): The amount of particles to run the PSO algorithm with
        max_iter (int): The maximum amount of iterations to run PSO for
        omega (float): Inertia weight
        alpha1 (float): local influence param
        alpha2 (float): social influence param
        r1 (float): additional local influence param
        r2 (float): additional social influence param

    Returns:
        np.ndarray: The best clustering found through the algorithm
    """
    # Define initial population (1.)
    population = init_population(pop_size, K, data_dim)
    # Create a copy of the population to store the best versions found so far for each particle
    local_best = population.copy()
    # Store the local best fitnesses to avoid recomputing
    local_best_f = np.zeros([pop_size])
    # Zero-initialize the velocities of each particle
    velocities = np.zeros([pop_size, K, data_dim])
    # Run the iterations (2.)
    for i in range(max_iter):
        # Loop over each particle 
        for idx, particle in enumerate(population):
            # Collect the assignments of each datapoint for the particle
            assignments = []
            # Loop over each datapoint
            for z in dataset:
                # Collect the distance to each centroid
                distances = []
                # Calculate the distance for every cluster centroid
                for cluster in range(K):
                    centroid = particle[:,cluster]
                    # Get the distance between the centroid and the datapoint
                    distance = calc_distance(centroid, z)
                    # Add the distance to the collector
                    distances.append(distance)
                # The datapoint is assigned to the cluster that minimizes the distance between centroid and datapoint
                assignments.append(np.argmin(distances))
            # Calculate the fitness of the particle given the new assignments
            fitness = calc_fitness(assignments, dataset, particle)
            # Update the local best if the new fitness is higher than that of the previous local best
            if fitness < local_best_f[idx]:
                local_best[idx] = particle.copy()
                local_best_f = fitness
        # Update the global best to be the best of the local_best
        best_idx = np.argmin(local_best_f)
        global_best = local_best[best_idx].copy()
        # Update the position and velocity of each particle
        for i in range(pop_size):
            velocities[i] = update_velocity(velocities[i], local_best[i], global_best, population[i], omega, alpha1, alpha2, r1, r2)
            population[i] = update_position(population[i], velocities[i])
    return global_best   

(b) Implement the k-means clustering method.

(c) Generate Artificial dataset 1 using the description given in the above mentioned
paper.
