K means solution space is a function that has a global maximum among many local maximums.
We will use simulated annealing to escape local maximum solutions. 


K Means algorithm:



1.   Initialize cluster centers randomly.
2.   Repeat until converged:
  
  -Update cluster labels: Assign points to nearest cluster center(centroid).
  
  -Update cluster centers(centroids): Set center to mean of each cluster. 


Formula for finding nearest distance of point from centers(centroids):


> Euclidean distance = d(p,q) = sqrt(sum((pi - qi)**2))





In [129]:
import numpy as np
import matplotlib.pyplot as plt


def euclidean_distance(x1, x2):
  return np.sqrt(np.sum((x1-x2)**2))


class KMeans:

  def __init__(self, K=5, max_iters=100, plot_steps=False):
    self.K = K
    self.max_iters = max_iters
    self.plot_steps = plot_steps

    # list of sample indices for each cluster 
    self.clusters = [[] for _ in range(self.K) ]

    # the centers (mean vector) for each cluster
    self.centroids = []
    
  def predict(self,X):
    self.X = X
    self.n_samples, self.n_features = X.shape

    # initialize the centroids
    random_sample_idxs = np.random.choice(self.n_samples, self.K, replace=False) # select K cluster centers from n samples without replacement (no repeated points).
    self.centroids = [self.X[idx] for idx in random_sample_idxs]

    # optimize clusters
    for _ in range(self.max_iters):
      # assign samples to closest centroids (create clusters)
      self.clusters = self._create_clusters(self.centroids)

      if self.plot_steps:
        self.plot()

      #calculate new centroids from the clusters
      centroids_old = self.centroids # saving so that we can see if results have converged
      self.centroids = self._get_centroids(self.clusters)

      if self._is_converged(centroids_old,  self.centroids):
        break

      if self.plot_steps:
        self.plot()

    # classify samples as the index of their clusters
    return self._get_cluster_labels(self.clusters), self._get_objective_value(self.clusters,self.centroids)

  def _get_cluster_labels(self, clusters):
    # each sample will get the label of the cluster it was assigned to 
    labels = np.empty(self.n_samples) 
    for cluster_idx, cluster in enumerate(clusters):
      for sample_idx in cluster:
        labels[sample_idx] = cluster_idx
    return labels
    
  def _get_objective_value(self,clusters,centroids):
    clusters_assignment_value = 0
    for centroid, cluster in zip(centroids,clusters):
      cluster_objective_value = 0
      for sample_point in cluster:
        if not cluster_objective_value:
          cluster_objective_value = euclidean_distance(sample_point,centroid)
        else:
          cluster_objective_value = cluster_objective_value + euclidean_distance(sample_point,centroid)
      if not clusters_assignment_value:
        clusters_assignment_value = cluster_objective_value
      else:
        clusters_assignment_value = cluster_objective_value + clusters_assignment_value
    return clusters_assignment_value

  # function to create clusters from centroids
  def _create_clusters(self, centroids):
    #assign samples to the closest centroids
    clusters = [[] for _ in range(self.K)]
    for idx, sample in enumerate(self.X):
      centroid_idx = self._closest_centroid(sample, centroids)
      clusters[centroid_idx].append(idx)
    return clusters 

  def _closest_centroid(self, sample, centroids):
    #distance of the current sample to each centroid
    distances = [euclidean_distance(sample, point) for point in centroids]
    closest_idx = np.argmin(distances)
    return closest_idx


  # function to return new calculated centroids
  def _get_centroids(self, clusters):
    # assign mean value of clusters to centroids
    centroids = np.zeros((self.K, self.n_features))
    
    for cluster_idx, cluster in enumerate(clusters):
      cluster_mean = np.mean(self.X[cluster], axis=0)
      centroids[cluster_idx] = cluster_mean 
    return centroids

  # function to check if kmeans is converged
  def _is_converged(self, centroids_old, centroids):
    # distances between old and new centroids, for all centroids
    distances = [euclidean_distance(centroids_old[i], centroids[i]) for i in range(self.K)]
    return sum(distances) == 0

  
  # function to plot centoids and clusters
  def plot(self):
    fig, ax = plt.subplots(figsize=(12, 8))
    for i, index in enumerate(self.clusters):
      point = self.X[index].T
      ax.scatter(*point)
    for point in self.centroids:
      ax.scatter(*point, marker="x", color="black", linewidth=2)
    plt.show()

In [137]:
# Testing 

# if __name__ == "__main__":
#   np.random.seed(42)
#   from sklearn.datasets import make_blobs
   
#   X, y = make_blobs(
#       centers=3, n_samples=500, n_features=2, shuffle=True, random_state=40
#   )
#   print(X.shape)

#   clusters = len(np.unique(y))
#   print(clusters)

#   k = KMeans(K=clusters, max_iters=150, plot_steps=False)
#   y_pred, objective_value = k.predict(X)
#   print(f"objective value: {round(objective_value)}")
#   #k.plot()

# plot function needs to be improved in order to present results of clustering datasets with more than 3 features.

if __name__ == "__main__":
  np.random.seed(42)
  from sklearn import datasets
  
  iris = datasets.load_iris()
  X = iris.data
  y = iris.target
  print(X.shape)

  clusters = len(np.unique(y))
  print(clusters)

  k = KMeans(K=clusters, max_iters=150, plot_steps=False)
  y_pred, objective_value = k.predict(X)
  print(f"objective value: {round(objective_value)}")

#  k.plot()


(150, 4)
3
objective value: 21353


Simulated Annealing algorithm goes as follows for maximising given function h(x):

1.  Generate intial candidate solution.
2.  Get an initial temperature T > 0.
3.  for i in 1:N (N is number of iterations).
    
  * Generate new candidate solution.
  * Calculate probability p = exp(delta h / Ti), else accept x = x; delta h = h(x') - (x)
  * Accept candidate solution with probability p; u ~ U(0,1), accept x = x', if u <= p.
  * Update temperature (cooling), e.g. T = T*alpha, where 0 < alpha < 1



Objective function we are trying to minimize: total sum of distances of each sample to its cluster centroid.

If the sum is lower means each point is closer to its centroid, indicating a better solution.

In [135]:
import math
import numpy as np

def euclidean_distance(x1, x2):
  return np.sqrt(np.sum((x1-x2)**2))

class Simulated_Annealing:

  def __init__(self, temperature=1, alpha=0.95, K=5, max_iters=100, minimum_temperature=0.00001, plot_steps=False):
    self.K = K
    self.max_iters = max_iters
    self.plot_steps = plot_steps
    # list of sample indices for each cluster 
    self.clusters = [[] for _ in range(self.K) ]
    # the centers (mean vector) for each cluster
    self.centroids = []
    self.temperature = temperature
    self.minimum_temperature = minimum_temperature
    self.alpha = alpha
    self.accept_worse_state_count = 0 # to accept_worse_state_count step of algorithm
    self.worse_state_count = 0
    self.objective_value = 0 

  def predict(self,X):
    self.X = X
    self.n_samples, self.n_features = X.shape

    # checks if worse state gets rejected, so as to not calculate centroids, if there has been no change to clusters
    worse_state_rejected = False

    # initialize the centroids
    random_sample_idxs = np.random.choice(self.n_samples, self.K, replace=False) # select K cluster centers from n samples without replacement (no repeated points).
    self.centroids = [self.X[idx] for idx in random_sample_idxs]

    while self.temperature > self.minimum_temperature:

      print(f"temperature is: {self.temperature}")
      self.accept_worse_state_count = 0
      self.worse_state_count = 0
      # optimize clusters
      for _ in range(self.max_iters):
        # assign samples to closest centroids (create clusters)
        self.clusters = self._create_clusters(self.centroids)

        if self.plot_steps:
          self.plot()

        # generating a trial assignment by moving random point to random cluster.
        random_sample_point_idx = np.random.choice(self.n_samples, 1, replace=False)
        random_centroid_idx = np.random.choice(self.K, 1, replace=False)[0]

        # creating trial assignment
        clusters_trial_assignment = self.clusters.copy()
       
        for idx,cluster in enumerate(clusters_trial_assignment):
          if random_sample_point_idx in cluster:
            cluster.remove(random_sample_point_idx)
            #cluster.pop(cluster.index(random_sample_point_idx))        
        
        clusters_trial_assignment[random_centroid_idx].append(random_sample_point_idx[0])

        centroids_trial_assignment = self._get_centroids(clusters_trial_assignment)
        
        cluster_improvement_delta = self._get_objective_value(clusters_trial_assignment,centroids_trial_assignment) - self._get_objective_value(self.clusters,self.centroids)
        # if worse configuration than accept with probability
        if cluster_improvement_delta > 0:

          self.worse_state_count = self.worse_state_count + 1
          y = np.random.random(1)[0] 
          if (self._get_acceptance_probability(cluster_improvement_delta, self.temperature)) > y:
            self.clusters = clusters_trial_assignment
            self.centroids = centroids_trial_assignment
            self.objective_value = self._get_objective_value(self.clusters,self.centroids)
            self.accept_worse_state_count = self.accept_worse_state_count+1 # counting worse configuration acceptance ratio
          
          else:
            worse_state_rejected = True

        # if better state accept 
        elif cluster_improvement_delta < 0:
          self.clusters = clusters_trial_assignment
          self.centroids = centroids_trial_assignment
          self.objective_value = self._get_objective_value(self.clusters,self.centroids)
          worse_state_rejected = True

        if worse_state_rejected == False:
          # calculate new centroids from the clusters
          #centroids_old = self.centroids # saving so that we can see if results have converged
          self.centroids = self._get_centroids(self.clusters)
          worse_state_rejected = True
      
      print(f"percentage of worse states being accepted: {round(self.accept_worse_state_count/self.worse_state_count,2)*100}")
      self.temperature = self.temperature*self.alpha

      # if self._is_converged(centroids_old,  self.centroids):
      #   break

      if self.plot_steps:
        self.plot()

    # classify samples as the index of their clusters
    return self._get_cluster_labels(self.clusters), self._get_objective_value(self.clusters,self.centroids)

  def _get_objective_value(self,clusters,centroids):
    clusters_assignment_value = 0
    for centroid, cluster in zip(centroids,clusters):
      cluster_objective_value = 0
      for sample_point in cluster:
        if not cluster_objective_value:
          cluster_objective_value = euclidean_distance(sample_point,centroid)
        else:
          cluster_objective_value = cluster_objective_value + euclidean_distance(sample_point,centroid)
      if not clusters_assignment_value:
        clusters_assignment_value = cluster_objective_value
      else:
        clusters_assignment_value = cluster_objective_value + clusters_assignment_value
    return clusters_assignment_value

  def _get_acceptance_probability(self, delta, temperature):
    acceptance_probability =  math.exp(-(delta/temperature))
    return acceptance_probability

  def _get_cluster_labels(self, clusters):
    # each sample will get the label of the cluster it was assigned to 
    labels = np.empty(self.n_samples) 
    for cluster_idx, cluster in enumerate(clusters):
      for sample_idx in cluster:
        labels[sample_idx] = cluster_idx
    return labels
    
  # function to create clusters from centroids
  def _create_clusters(self, centroids):
    # assign samples to the closest centroids
    clusters = [[] for _ in range(self.K)]
    for idx, sample in enumerate(self.X):
      centroid_idx = self._closest_centroid(sample, centroids)
      clusters[centroid_idx].append(idx)
    return clusters 

  def _closest_centroid(self, sample, centroids):
    #distance of the current sample to each centroid
    distances = [euclidean_distance(sample, point) for point in centroids]
    closest_idx = np.argmin(distances)
    return closest_idx

  # function to return new calculated centroids
  def _get_centroids(self, clusters):
    # assign mean value of clusters to centroids
    centroids = np.zeros((self.K, self.n_features))
    for cluster_idx, cluster in enumerate(clusters):
      cluster_mean = np.mean(self.X[cluster], axis=0)
      centroids[cluster_idx] = cluster_mean 
    return centroids

  # function to check if kmeans is converged
  def _is_converged(self, centroids_old, centroids):
    # distances between old and new centroids, for all centroids
    distances = [euclidean_distance(centroids_old[i], centroids[i]) for i in range(self.K)]
    return sum(distances) == 0

  # function to plot centoids and clusters
  def plot(self):
    fig, ax = plt.subplots(figsize=(12, 8))
    for i, index in enumerate(self.clusters):
      point = self.X[index].T
      ax.scatter(*point)
    for point in self.centroids:
      ax.scatter(*point, marker="x", color="black", linewidth=2)
    plt.show()
    

In [138]:
# Testing 

# if __name__ == "__main__":
#   np.random.seed(42)
#   from sklearn.datasets import make_blobs
   
#   X, y = make_blobs(
#       centers=3, n_samples=100, n_features=2, shuffle=True, random_state=40
#   )
#   print(X.shape)

#   clusters = len(np.unique(y))
#   print(clusters)

#   k = Simulated_Annealing(temperature=100, alpha=0.95, K=clusters, max_iters=100, minimum_temperature=0.00001, plot_steps=False)
#   y_pred = k.predict(X)

  #k.plot()

if __name__ == "__main__":
  np.random.seed(42)
  from sklearn import datasets
  
  iris = datasets.load_iris()
  X = iris.data
  y = iris.target
  print(X.shape)

  clusters = len(np.unique(y))
  print(clusters)

  k = Simulated_Annealing(temperature=100, alpha=0.95, K=clusters, max_iters=100, minimum_temperature=0.00001, plot_steps=False)
  y_pred, objective_value = k.predict(X)
  print(f"objective_value:{round(objective_value)}")

(150, 4)
3
temperature is: 100
percentage of worse states being accepted: 100.0
temperature is: 95.0
percentage of worse states being accepted: 96.0
temperature is: 90.25
percentage of worse states being accepted: 98.0
temperature is: 85.7375
percentage of worse states being accepted: 93.0
temperature is: 81.45062499999999
percentage of worse states being accepted: 98.0
temperature is: 77.37809374999999
percentage of worse states being accepted: 98.0
temperature is: 73.50918906249998
percentage of worse states being accepted: 100.0
temperature is: 69.83372960937498
percentage of worse states being accepted: 95.0
temperature is: 66.34204312890623
percentage of worse states being accepted: 92.0
temperature is: 63.02494097246091
percentage of worse states being accepted: 100.0
temperature is: 59.87369392383786
percentage of worse states being accepted: 95.0
temperature is: 56.880009227645964
percentage of worse states being accepted: 95.0
temperature is: 54.03600876626366
percentage of wo