In [16]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

In [None]:
class CustomKMeans:
    """K-means clustering implemented with NumPy.

    The model alternates between assigning every sample to its nearest
    centroid and moving each centroid to the mean of its assigned samples,
    until the centroids stop moving or ``max_iters`` is reached.

    Parameters
    ----------
    K : int
        Number of clusters.
    max_iters : int
        Upper bound on the number of assign/update rounds.
    plot_steps : bool
        When True, ``predict`` draws the current state after each assignment
        step and after each centroid update.
    random_state : int or None
        Seed for the initial centroid draw. ``None`` (default) draws from
        NumPy's global random state, so ``np.random.seed`` still applies.
    """

    def __init__(self, K=5, max_iters=1000, plot_steps=False, random_state=None) -> None:
        self.K = K
        self.max_iters = max_iters
        self.plot_steps = plot_steps
        self.random_state = random_state
        # list of sample indices for each cluster
        self.clusters = [[] for _ in range(self.K)]
        # mean feature vector for each cluster
        self.centroids = []

    def _eculidean_distance(self, x1: np.ndarray, x2: np.ndarray) -> float:
        """Return the Euclidean (L2) distance between two feature vectors."""
        return np.sqrt(np.sum((x1 - x2)**2))

    def _closest_centroid(self, sample, centroids):
        """Return the index of the centroid nearest to ``sample``.

        Ties go to the lowest index, because ``np.argmin`` returns the first
        minimum.
        """
        distances = [self._eculidean_distance(sample, point) for point in centroids]
        return np.argmin(distances)

    def _create_clusters(self, centroids: list):
        """Assign every sample in ``self.x`` to its nearest centroid.

        Returns a list of ``K`` lists; entry ``k`` holds the row indices of the
        samples assigned to centroid ``k``.
        """
        clusters = [[] for _ in range(self.K)]
        for idx, sample in enumerate(self.x):
            centroid_idx = self._closest_centroid(sample, centroids)
            clusters[centroid_idx].append(idx)
        return clusters

    def _update_centroids(self, clusters: list) -> np.ndarray:
        """Return a ``(K, n_features)`` array of new centroids.

        Each centroid becomes the mean of the samples assigned to it. A cluster
        with no samples keeps its previous centroid (read from
        ``self.centroids``): the mean of an empty selection is NaN, and a NaN
        centroid would win every later ``argmin`` and corrupt the whole fit.
        """
        centroids = np.zeros((self.K, self.n_features))
        for cluster_idx, cluster in enumerate(clusters):
            if len(cluster) == 0:
                # Nothing to average; stay where this centroid was.
                centroids[cluster_idx] = self.centroids[cluster_idx]
                continue
            centroids[cluster_idx] = np.mean(self.x[cluster], axis=0)

        return centroids

    def _is_converged(self, centroids_old, centroids) -> bool:
        """Return True when no centroid moved between two iterations."""
        # distance between each old and new centroids, for all centroids
        distances = [self._eculidean_distance(centroids_old[i], centroids[i]) for i in range(self.K)]
        return sum(distances) == 0

    def _get_cluster_labels(self, clusters) -> np.ndarray:
        """Turn per-cluster index lists into one label per sample.

        Returns a float array of length ``n_samples`` whose entry ``i`` is the
        index of the cluster that contains sample ``i``.
        """
        labels = np.empty(self.n_samples)
        for cluster_idx, cluster in enumerate(clusters):
            for sample_index in cluster:
                labels[sample_index] = cluster_idx

        return labels

    def plot(self):
        """Scatter-plot the current clusters and mark the centroids with an x.

        Each feature column is passed to ``ax.scatter`` as a positional
        argument, so the picture is meaningful for two-feature data.
        """
        fig, ax = plt.subplots(figsize=(12, 8))

        # one scatter call per cluster, so each cluster gets its own colour
        for index in self.clusters:
            point = self.x[index].T
            ax.scatter(*point)

        for point in self.centroids:
            ax.scatter(*point, marker="x", color="black", linewidth=2)

        plt.show()

    def predict(self, x: np.ndarray) -> np.ndarray:
        """Cluster ``x`` and return the cluster index of every sample.

        Parameters
        ----------
        x : np.ndarray
            Two-dimensional array with one sample per row.

        Returns
        -------
        np.ndarray
            Float array of length ``n_samples`` with values in ``0..K-1``.

        Raises
        ------
        ValueError
            Raised by the random draw when ``K`` exceeds the number of samples,
            since the initial centroids are sampled without replacement.
        """
        self.x = x
        self.n_samples, self.n_features = x.shape

        # Initialize centroids from K distinct samples. Without a seed, fall
        # back to the global NumPy RNG so existing np.random.seed calls apply.
        if self.random_state is None:
            rng = np.random
        else:
            rng = np.random.RandomState(self.random_state)
        random_sample_indices = rng.choice(self.n_samples, self.K, replace=False)
        self.centroids = [self.x[idx] for idx in random_sample_indices]

        # Optimize clusters
        for _ in range(self.max_iters):
            self.clusters = self._create_clusters(self.centroids)

            if self.plot_steps:
                self.plot()

            # self.centroids must still hold the old values while
            # _update_centroids runs: empty clusters reuse them.
            centroids_old = self.centroids
            self.centroids = self._update_centroids(self.clusters)

            # Check for convergence
            if self._is_converged(centroids_old, self.centroids):
                break

            if self.plot_steps:
                self.plot()

        # classify samples as the index of their clusters
        return self._get_cluster_labels(self.clusters)

In [None]:
# Synthetic 2-D data: 500 points drawn around 3 blob centres (fixed seed).
x, y = make_blobs(n_samples=500, n_features=2, centers=3, shuffle=True, random_state=42)
# Number of distinct ground-truth labels, used as K for the model below.
clusters = np.unique(y).size

In [None]:
# The initial centroids are drawn with NumPy's global RNG; seed it so the
# plots and labels are the same on every Restart & Run All.
np.random.seed(42)

custom_model = CustomKMeans(K=clusters, max_iters=150, plot_steps=True)
custom_labels = custom_model.predict(x)

# Show a short preview instead of dumping all 500 labels into the output.
custom_labels[:10]