In [36]:
import numpy as np
import logging

# Module-level logger used by the clustering helpers below.
logger = logging.getLogger(__name__)

# Ask the root logger for INFO-and-above records, rendered as
# "HH:MM:SS LEVEL    message" (level name left-aligned in 8 columns).
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
)


def dist(a, b):
    """Return the Euclidean (L2) distance between two points.

    Parameters
    ----------
    a, b : array_like
        Coordinates of the two points. Lists, tuples and NumPy arrays of any
        numeric dtype are accepted as long as they broadcast against each
        other.

    Returns
    -------
    numpy.float64
        Square root of the sum of squared coordinate differences.
    """
    # Compute in float64: this lets plain lists/tuples be subtracted, and it
    # prevents integer inputs (notably unsigned dtypes such as uint8) from
    # wrapping around or overflowing when they are subtracted and squared.
    delta = np.asarray(a, dtype=np.float64) - np.asarray(b, dtype=np.float64)
    return np.sqrt((delta ** 2).sum())


def find_best_centroid(centroids, point):
    """Return the index of the centroid nearest to ``point``.

    Parameters
    ----------
    centroids : iterable of points
        Candidate cluster centres; each is compared to ``point`` via ``dist``.
    point : point
        The observation to classify.

    Returns
    -------
    numpy.intp
        Position (within ``centroids``) of the smallest distance. When
        several centroids are equally close, the first one wins
        (``argmin`` semantics).
    """
    gaps = []
    for centre in centroids:
        gaps.append(dist(centre, point))
    return np.array(gaps).argmin()


def assign_points(centroids, points):
    """Group ``points`` by their nearest centroid.

    Parameters
    ----------
    centroids : sequence of points
        Current cluster centres.
    points : iterable of points
        Observations to distribute among the centroids.

    Returns
    -------
    list of list
        One inner list per centroid, in centroid order, holding the points
        whose nearest centroid (per ``find_best_centroid``) is that one.
        Inner lists may be empty.
    """
    # Start with one empty bucket per centroid.
    clusters = []
    for _ in centroids:
        clusters.append([])

    # Drop every point into the bucket of its closest centroid.
    for candidate in points:
        clusters[find_best_centroid(centroids, candidate)].append(candidate)

    return clusters


def find_average_point(points):
    """Return the coordinate-wise mean of a collection of points.

    Parameters
    ----------
    points : array_like
        Sequence of equally sized points (one point per row).

    Returns
    -------
    numpy.ndarray
        Mean taken along axis 0, i.e. one value per coordinate.

    Notes
    -----
    No special handling is done for an empty collection: ``np.mean`` then
    evaluates to NaN, so callers are expected to guard against that case.
    """
    return np.mean(points, axis=0)

In [37]:
def step(centroids, points, thresh, max_iterations):
    """Iterate assign/update rounds until the centroids stop moving.

    Each round assigns every point to its nearest centroid, then replaces
    each centroid with the mean of its assigned points.

    Parameters
    ----------
    centroids : numpy.ndarray
        Starting centroids, one per row.
    points : iterable of points
        Observations being clustered.
    thresh : float
        Convergence tolerance: iteration stops as soon as every centroid
        moved by strictly less than this Euclidean distance in a round.
    max_iterations : int
        Upper bound on the number of rounds.

    Returns
    -------
    numpy.ndarray
        The centroids of the round that converged, or the centroids held
        after ``max_iterations`` rounds (the input itself if no round ran).
    """
    for _ in range(max_iterations):
        clusters = assign_points(centroids, points)

        updated = []
        for position, members in enumerate(clusters):
            # A centroid stays where it was when it attracted no points or
            # when the mean of its points is not a usable number.
            keep_previous = len(members) == 0
            if not keep_previous:
                mean_point = find_average_point(members)
                keep_previous = np.isnan(mean_point).any()
            updated.append(centroids[position] if keep_previous else mean_point)
        updated = np.array(updated)

        # Per-centroid displacement between consecutive rounds.
        movement = np.linalg.norm(updated - centroids, axis=1)
        if np.all(movement < thresh):
            return updated

        centroids = updated

    return centroids

In [38]:
class KMeans:
    """Small k-means model: random centroid initialisation plus ``step``.

    Attributes
    ----------
    k : int
        Number of clusters.
    points : numpy.ndarray
        The data, shape ``(n, features)``.
    n : int
        Number of points.
    features : int
        Number of coordinates per point.
    centroids : numpy.ndarray
        Current centroids, shape ``(k, features)``.
    """

    def __init__(self, points, k, seed=None):
        """Store the data and draw ``k`` random starting centroids.

        Parameters
        ----------
        points : array_like
            Non-empty 2-D collection of shape ``(n_points, n_features)``.
        k : int
            Number of clusters; must be at least 1.
        seed : int or None, optional
            When given, initialisation uses ``np.random.default_rng(seed)``
            and is reproducible. When ``None`` (default) NumPy's global
            random state is used, so ``np.random.seed`` keeps working.

        Raises
        ------
        ValueError
            If ``points`` is empty or not 2-D, or if ``k`` is below 1.
        """
        self.k = k

        self.points = np.array(points)
        if self.points.ndim != 2 or self.points.size == 0:
            raise ValueError(
                "points must be a non-empty 2-D array-like of shape "
                "(n_points, n_features)"
            )
        if k < 1:
            raise ValueError("k must be at least 1")

        self.n = len(self.points)

        self.features = len(self.points[0])

        min_feature_value = np.min(self.points, axis=0)
        max_feature_value = np.max(self.points, axis=0)

        # Sample uniformly inside the data's bounding box. Drawing from a
        # standard normal and clipping to the box (the previous approach)
        # collapses every centroid onto the same corner whenever the data
        # lies away from the origin, leaving all but one cluster empty.
        rng = np.random if seed is None else np.random.default_rng(seed)
        self.centroids = rng.uniform(
            min_feature_value,
            max_feature_value,
            size=(self.k, self.features),
        )

    def fit(self, thresh, max_iterations):
        """Refine ``self.centroids`` in place by running ``step``.

        Parameters
        ----------
        thresh : float
            Convergence tolerance forwarded to ``step``.
        max_iterations : int
            Iteration cap forwarded to ``step``.
        """
        self.centroids = step(self.centroids, self.points, thresh, max_iterations)

In [39]:
# Demo: cluster four 2-D points into three groups.
k = KMeans(points=[[1, 0], [2, 0], [3, 1], [3, 4]], k=3)

# Stop once every centroid moves by less than 0.001, or after 10 rounds.
k.fit(thresh=0.001, max_iterations=10)

# Trailing expression so the notebook displays the fitted centroids.
k.centroids

array([[2.5, 0.5],
       [1. , 0. ],
       [3. , 4. ]])