In [67]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import ipywidgets as widgets
from IPython.core.debugger import set_trace
from scipy.stats import rankdata
import matplotlib.colors as mcolors
import itertools as it
import functools as ft
import dataclasses
from typing import NamedTuple

# Settings

In [2]:
# Select matplotlib's interactive widget backend (provided by ipympl) so the
# figures below can be redrawn in place by the ipywidgets sliders.
%matplotlib widget

In [86]:
# Apply seaborn's default theme to the matplotlib figures created afterwards.
sns.set()

In [3]:
def step_kmeans(params: np.ndarray, x: np.ndarray):
    """Make one kmeans step.

    Every point in ``x`` is assigned to its nearest centroid and each centroid
    is then moved to the mean of the points assigned to it.

    Parameters
    ----------
    params : np.ndarray
        Current centroids, shape ``(k, d)``.
    x : np.ndarray
        Data points, shape ``(n, d)``.

    Returns
    -------
    np.ndarray
        Updated centroids as a float array of shape ``(k, d)``.  A centroid
        that attracts no point keeps its previous position instead of turning
        into NaN.
    """
    # Squared distances are enough to find the nearest centroid; shape (k, n).
    sq_dist = np.sum((x - params[:, np.newaxis]) ** 2, axis=-1)
    # argmin gives every point exactly one owner, even when distances tie
    # (rank-based masks drop tied points because ties get fractional ranks).
    nearest = np.argmin(sq_dist, axis=0)

    new_params = np.array(params, dtype=float)  # copy; input is left untouched
    for k in range(len(params)):
        members = x[nearest == k]
        if len(members):
            new_params[k] = members.mean(axis=0)
    return new_params

In [4]:
def kmeans(x, params):
    """Yield the successive centroid estimates of k-means, without end.

    The first value yielded is ``params`` itself (reshaped to 2-D if needed);
    every following value is the result of applying ``step_kmeans`` once more.
    The generator never terminates on its own, so the caller decides when to
    stop.  Because this is a generator, the shape checks below only run when
    the first value is requested.

    Parameters
    ----------
    x : np.ndarray
        Data points, 1-D (treated as ``n`` one-dimensional points) or 2-D.
    params : np.ndarray
        Initial centroids, 1-D (treated as ``k`` one-dimensional centroids)
        or 2-D.

    Raises
    ------
    ValueError
        If ``x`` or ``params`` has more than two dimensions (or zero).
    """
    if x.ndim == 1:
        x = x[:, np.newaxis]
    elif x.ndim != 2:
        raise ValueError(f"x should have 2 dims not {x.ndim}")
    if params.ndim == 1:
        params = params[:, np.newaxis]
    elif params.ndim != 2:
        # Name the argument that is actually at fault.
        raise ValueError(f"params should have 2 dims not {params.ndim}")

    while True:
        yield params
        params = step_kmeans(params, x)
    

In [5]:
def run_until_eps(x, params, epsilon=0.01):
    """Run k-means until the centroids stop moving and return them.

    Iteration stops as soon as the mean Euclidean displacement of the
    centroids between two consecutive steps is no longer greater than
    ``epsilon``; the centroids of the last step are returned.
    """
    steps = kmeans(x, params)
    current = next(steps)
    # Start above the threshold so the loop body runs for ordinary epsilons.
    shift = epsilon + 1
    while shift > epsilon:
        candidate = next(steps)
        delta = current - candidate
        shift = np.mean(np.sqrt((delta ** 2).sum(axis=-1)))
        current = candidate
    return current
        

In [6]:
def snap_until_eps(x, params, epsilon=0.0):
    """Yield every k-means snapshot until the centroids stop moving.

    The initial centroids are yielded first, then one snapshot per k-means
    step.  Iteration ends after the first step whose mean Euclidean centroid
    displacement is no longer greater than ``epsilon``; that final snapshot
    is still yielded.

    Parameters
    ----------
    x : np.ndarray
        Data points, passed to ``kmeans``.
    params : np.ndarray
        Initial centroids, passed to ``kmeans``.
    epsilon : float
        Convergence threshold on the mean centroid displacement.
    """
    solutions = kmeans(x, params)
    # Use the snapshot kmeans yields as the comparison baseline: it is already
    # reshaped to 2-D, whereas a raw 1-D ``params`` would broadcast against
    # the (k, 1) snapshots and give a meaningless (k, k) difference.
    params = next(solutions)
    yield params
    error = epsilon + 1
    while error > epsilon:
        new_params = next(solutions)
        yield new_params
        error = np.mean(np.sqrt(np.sum(
            (params - new_params) ** 2, axis=-1
        )))
        params = new_params
        

In [7]:
def mean_distance(x, params):
    """Return the mean Euclidean distance from each point to its nearest centroid.

    ``x`` holds the points and ``params`` the centroids; the distance of every
    point to every centroid is computed, the smallest one per point is kept,
    and those minima are averaged.
    """
    offsets = x - params[:, np.newaxis]
    sq_dist = (offsets * offsets).sum(axis=-1)
    nearest = np.sqrt(sq_dist).min(axis=0)
    return nearest.mean()

In [8]:
def elbow(x, ks, generator=None):
    """Yield ``(centroids, mean_distance)`` for every cluster count in ``ks``.

    For each ``k`` the initial centroids come from ``generator(x, k)``
    (``seedpp_max`` when no generator is given), k-means is run with
    ``run_until_eps`` and the fitted centroids are scored with
    ``mean_distance``.

    NOTE: the name ``elbow`` is rebound further down in this notebook to a
    ``functools.partial`` of ``iterate_dist``.
    """
    make_inits = seedpp_max if generator is None else generator
    for k in ks:
        centroids = run_until_eps(x, make_inits(x, k))
        yield centroids, mean_distance(x, centroids)

In [80]:
class Cluster(NamedTuple):
    """A centroid together with the data points assigned to it."""

    centroid: np.ndarray  # coordinates of the cluster centre
    points: np.ndarray    # rows of the data set whose nearest centroid this is


def silhouette_dist(x, clusters):
    """Return the mean silhouette-style score of ``x`` for the given centroids.

    Each point is assigned to its nearest centroid.  Its score is
    ``(b - a) / max(a, b)`` where ``a`` is the mean squared coordinate
    difference to the points of its own cluster and ``b`` the same quantity
    for the cluster of its second-nearest centroid.  The scores of all points
    are averaged.

    Works for data of any dimensionality.  Centroids that own no points are
    ignored when looking for the second-nearest cluster.

    Parameters
    ----------
    x : array-like
        Data points, shape ``(n, d)`` (a 1-D input is treated as ``(n, 1)``).
    clusters : array-like
        Centroids, shape ``(k, d)`` (a 1-D input is treated as ``(k, 1)``).

    Returns
    -------
    float
        Mean score, or ``0.0`` when fewer than two clusters own any points
        (the score is undefined in that case).
    """
    x = np.asarray(x, dtype=float)
    clusters = np.asarray(clusters, dtype=float)
    if x.ndim == 1:
        x = x[:, np.newaxis]
    if clusters.ndim == 1:
        clusters = clusters[:, np.newaxis]

    def _dist(point, points):
        # Mean squared coordinate difference between one point and a point set.
        return np.mean((point - points) ** 2)

    def _silhouette(point, st, nd):
        st_dist = _dist(point, st)
        nd_dist = _dist(point, nd)
        scale = max(nd_dist, st_dist)
        # All points coincide: no separation information, score it as 0.
        return (nd_dist - st_dist) / scale if scale > 0 else 0.0

    # Squared distance of every point to every centroid; shape (k, n).
    sq_dist = np.sum((x - clusters[:, np.newaxis]) ** 2, axis=-1)
    nearest = np.argmin(sq_dist, axis=0)
    occupied = np.unique(nearest)  # centroids that own at least one point
    if len(occupied) < 2:
        return 0.0

    # Rank only the occupied centroids; a stable sort breaks ties the same
    # way argmin does (lowest index first), so row 0 matches ``nearest``.
    order = np.argsort(sq_dist[occupied], axis=0, kind="stable")
    st_closest, nd_closest = order[0], order[1]

    cluster_map = [
        Cluster(centroid=clusters[k], points=x[st_closest == j])
        for j, k in enumerate(occupied)
    ]

    # Indices stay integers here instead of being packed into the float data.
    scores = [
        _silhouette(point, cluster_map[st].points, cluster_map[nd].points)
        for point, st, nd in zip(x, st_closest, nd_closest)
    ]
    return np.mean(scores)

In [81]:
def iterate_dist(x, ks, generator=None, dist=None):
    """Yield ``(centroids, score)`` for every cluster count in ``ks``.

    Parameters
    ----------
    x : np.ndarray
        Data points.
    ks : iterable of int
        Cluster counts to try.
    generator : callable, optional
        ``generator(x, k)`` returns the initial centroids.  Defaults to
        ``seedpp_max``.
    dist : callable, optional
        ``dist(x, centroids)`` scores a fitted clustering.  Defaults to
        ``mean_distance`` (previously the ``None`` default was called as-is
        and raised ``TypeError``).

    Yields
    ------
    tuple
        The centroids returned by ``run_until_eps`` and their score.
    """
    if generator is None:
        generator = seedpp_max
    if dist is None:
        dist = mean_distance
    for k in ks:
        inits = generator(x, k)
        params = run_until_eps(x, inits)
        yield params, dist(x, params)

In [82]:
# Convenience wrappers around iterate_dist with the scoring function fixed.
# NOTE: this assignment rebinds ``elbow`` and so replaces the earlier
# ``def elbow`` defined above in this notebook.
elbow = ft.partial(iterate_dist, dist=mean_distance)
silhouette = ft.partial(iterate_dist, dist=silhouette_dist)

In [9]:
def snd(x):
    """Return the element at index 1 of ``x``."""
    return x[1]
def fst(x):
    """Return the element at index 0 of ``x``."""
    return x[0]

In [10]:
def seedpp(x, no=3):
    """Pick ``no`` initial centroids from ``x`` with k-means++ style sampling.

    The first centroid is a uniformly random row of ``x``.  Every further
    centroid is a row of ``x`` drawn with probability proportional to its
    squared distance to the nearest centroid chosen so far.

    Parameters
    ----------
    x : np.ndarray
        Data points, shape ``(n, d)``; integer data is accepted.
    no : int
        Number of centroids to pick (at least one is always returned).

    Returns
    -------
    np.ndarray
        The chosen rows of ``x``, shape ``(no, d)``.
    """
    seed = x[np.random.choice(len(x))]
    seed = seed[np.newaxis]

    while len(seed) < no:
        # Squared distance of every point to its nearest chosen centroid.
        # Cast to float so normalising also works for integer-valued data.
        sq_dist = np.min(
            np.sum((x - seed[:, np.newaxis]) ** 2, axis=-1),
            axis=0
        ).astype(float)
        total = sq_dist.sum()
        # If every point coincides with a chosen centroid the weights are all
        # zero; fall back to a uniform draw (p=None) instead of dividing by 0.
        p = sq_dist / total if total > 0 else None
        new_loc = x[np.random.choice(np.arange(len(x)), p=p)]
        seed = np.r_[seed, new_loc[np.newaxis]]
    return seed

In [11]:
def seedpp_max(x, no=3):
    """Pick ``no`` initial centroids from ``x`` by farthest-point selection.

    The first centroid is a uniformly random row of ``x``; each further one is
    the row of ``x`` whose squared distance to its nearest already-chosen
    centroid is largest.
    """
    start = np.random.choice(len(x))
    chosen = x[start][np.newaxis]

    while len(chosen) < no:
        sq_dist = ((x - chosen[:, np.newaxis]) ** 2).sum(axis=-1)
        gap_to_nearest = sq_dist.min(axis=0)
        farthest = x[gap_to_nearest.argmax()]
        chosen = np.r_[chosen, farthest[np.newaxis]]
    return chosen

In [12]:
def get_voronoi(x=(-5, 20), y=(-5, 15)):
    """Return a regular 50 x 50 grid of 2-D points as a ``(2500, 2)`` array.

    ``x`` and ``y`` are ``(start, stop)`` pairs giving the extent of the grid
    along each axis.  Rows are ordered with the x coordinate varying fastest.
    """
    xs = np.linspace(*x, 50)
    ys = np.linspace(*y, 50)
    grid_x, grid_y = np.meshgrid(xs, ys)
    return np.stack([grid_x, grid_y], axis=-1).reshape(-1, 2)

In [13]:
def take(i, iterable):
    """Return a list with at most the first ``i`` items of ``iterable``."""
    picked = []
    # Pairing with range(i) stops the loop without pulling an extra item
    # from the iterable once ``i`` items have been collected.
    for _, item in zip(range(i), iterable):
        picked.append(item)
    return picked

In [14]:
class Seq2D:
    """Plot the evolution of a 2-D k-means run, one snapshot at a time."""

    def __init__(self, ax, snaps, x, ba, grid):
        """Store everything needed to draw a snapshot.

        Parameters
        ----------
        ax :
            Axes object that is cleared and drawn on by ``plot``.
        snaps :
            Indexable sequence of centroid arrays, one per k-means step.
        x :
            Data points to scatter, coloured by their nearest centroid.
        ba :
            One colour per cluster.
        grid :
            Background grid points, also coloured by their nearest centroid.
        """
        self.ax = ax
        self.snaps = snaps
        self.x = x
        self.ba = ba
        self.grid = grid

    def plot(self, i):
        """Redraw the axes for snapshot ``i``.

        For every cluster the assigned data points, the assigned grid points
        and the centroid itself (large ``x`` marker) are drawn in the
        cluster's colour.
        """
        self.ax.clear()
        params = self.snaps[i]
        # Boolean membership masks, one row per centroid.
        indxs = compute_indx(self.x, params)
        grid_indxs = compute_indx(self.grid, params)
        for num, (param, indx, grid_index, bz) in enumerate(zip(params, indxs, grid_indxs, self.ba)):
            self.ax.scatter(*self.x[indx].T, c=bz, alpha=.4)
            self.ax.scatter(*self.grid[grid_index].T, c=bz, s=5, alpha=.4, marker='.')
            self.ax.scatter(*param.T, c=bz, s=1000, marker='x', label=f'cluster{num}')


In [15]:
def compute_indx(x, centroids):
    """Return a boolean membership mask of shape ``(k, n)``.

    ``mask[j, i]`` is True exactly when centroid ``j`` is the nearest
    centroid of point ``i``.  Every point belongs to exactly one centroid;
    when distances tie, the centroid with the lowest index wins.

    Parameters
    ----------
    x : np.ndarray
        Points, shape ``(n, d)``.
    centroids : np.ndarray
        Centroids, shape ``(k, d)``.
    """
    # Squared distances give the same nearest centroid as Euclidean ones.
    sq_dist = np.sum((x - centroids[:, np.newaxis]) ** 2, axis=-1)
    nearest = np.argmin(sq_dist, axis=0)
    # Compare each point's owner against every centroid index -> (k, n) mask.
    return nearest == np.arange(len(centroids))[:, np.newaxis]

# Data Generation

In [16]:
# Seed NumPy's global generator so the synthetic data set is the same on
# every Restart & Run All (later cells that draw from np.random in sequence
# become repeatable as well).
SEED = 42
np.random.seed(SEED)

# Six Gaussian blobs with identity covariance, of different sizes.
a = np.random.multivariate_normal([10, 10], [[1, 0], [0, 1]], 300)
b = np.random.multivariate_normal([5, 0], [[1, 0], [0, 1]], 200)
c = np.random.multivariate_normal([5, 10], [[1, 0], [0, 1]], 200)
d = np.random.multivariate_normal([10, 0], [[1, 0], [0, 1]], 100)
e = np.random.multivariate_normal([15, 0], [[1, 0], [0, 1]], 200)
f = np.random.multivariate_normal([15, 10], [[1, 0], [0, 1]], 200)
# Stack all blobs into one (1200, 2) data set.
x = np.r_[a, b, c, d, e, f]

# Number of clusters to fit: one per generated blob.
no_cluster = 6

## Without seedpp inits

In [17]:
no_cluster = 6
# One base colour per cluster.
colors = it.cycle(list(mcolors.BASE_COLORS))
cms = take(no_cluster, colors)

# Uniformly random data points as initial centroids.  Sample without
# replacement: the default (with replacement) can pick the same point twice,
# which yields two identical centroids.
inits = x[np.random.choice(len(x), no_cluster, replace=False)]
kmeans_snaps = list(snap_until_eps(x, inits))
fig, ax = plt.subplots()
smth = Seq2D(ax, kmeans_snaps, x, cms, get_voronoi(x=(0, 20)))
# Slider over the recorded snapshots; each move redraws the axes.
_ = widgets.interact(smth.plot, i=(0, len(kmeans_snaps) - 1, 1))

FigureCanvasNbAgg()

interactive(children=(IntSlider(value=11, description='i', max=22), Output()), _dom_classes=('widget-interact'…

## With seedpp inits

In [18]:
# One base colour per cluster.
colors = it.cycle(list(mcolors.BASE_COLORS))
cms = take(no_cluster, colors)

# Initial centroids drawn with seedpp (distance-weighted random sampling).
inits = seedpp(x, no_cluster)
# Record every k-means snapshot until the centroids stop moving.
kmeans_snaps = list(snap_until_eps(x, inits))
fig, ax = plt.subplots()
smth = Seq2D(ax, kmeans_snaps, x, cms, get_voronoi(x=(0, 20)))
# Slider over the recorded snapshots; each move redraws the axes.
_ = widgets.interact(smth.plot, i=(0, len(kmeans_snaps) - 1, 1))

FigureCanvasNbAgg()

interactive(children=(IntSlider(value=6, description='i', max=13), Output()), _dom_classes=('widget-interact',…

## With seedpp_max inits

In [19]:
# One base colour per cluster.
colors = it.cycle(list(mcolors.BASE_COLORS))
cms = take(no_cluster, colors)

# Initial centroids chosen with seedpp_max (farthest-point selection).
inits = seedpp_max(x, no_cluster)
# Record every k-means snapshot until the centroids stop moving.
kmeans_snaps = list(snap_until_eps(x, inits))
fig, ax = plt.subplots()
smth = Seq2D(ax, kmeans_snaps, x, cms, get_voronoi(x=(0, 20)))
# Slider over the recorded snapshots; each move redraws the axes.
_ = widgets.interact(smth.plot, i=(0, len(kmeans_snaps) - 1, 1))

FigureCanvasNbAgg()

interactive(children=(IntSlider(value=2, description='i', max=4), Output()), _dom_classes=('widget-interact',)…

# Elbow method for choosing the number of clusters

In [89]:
# Fit k-means for k = 2..9 and collect the (centroids, score) pairs.
ma = [*elbow(x, [*range(2, 10)])]

# Keep only the score of each run.
mdist = [snd(pair) for pair in ma]

# Score against the number of clusters.
plt.figure()
plt.plot([*range(2, 10)], mdist)
plt.show()

FigureCanvasNbAgg()

# Silhouette Method

In [88]:
# Fit k-means for k = 2..9 and collect the (centroids, score) pairs.
ma = [*silhouette(x, [*range(2, 10)])]

# Keep only the score of each run.
mdist = [snd(pair) for pair in ma]

# Score against the number of clusters.
plt.figure()
plt.plot([*range(2, 10)], mdist)
plt.show()

FigureCanvasNbAgg()