In [121]:
%matplotlib inline

import numpy as np
import matplotlib.pyplot as plt
from ipywidgets import *
from IPython.display import display  

In [122]:
## create a global array to store points
## (shaped (0, 2) so that the column slices points[:,0] / points[:,1] used by
## the plotting code are valid before any points have been generated)
points = np.empty(shape = (0, 2))

def rand_2D_points(num_points, num_centroids, **kwargs):
    """
    Generate random 2D points scattered around randomly placed centroids.
    
    Parameters
    ----------
    num_points : int
        number of points to generate
    num_centroids : int
        number of centres the points are drawn around
    space : float, optional (default 20)
        side length of the square, centred on the origin, in which the
        centroids are placed
    noise : float, optional (default 0.1)
        spread of the points; each point is drawn from a normal distribution
        with standard deviation noise * space around its centroid
    seed : int, optional (default None)
        seed passed to np.random.seed; None reseeds from fresh entropy, so
        each call produces a different data set
        
    Returns
    -------
    numpy array of shape (num_points, 2)
    """
    
    space = kwargs.get('space', 20)
    noise = kwargs.get('noise' , 0.1)
    seed  = kwargs.get('seed')
    
    points    = np.zeros(shape = (num_points, 2))
    centroids = np.zeros(shape = (num_centroids, 2))
    
    ## seed = None behaves exactly like a bare np.random.seed()
    np.random.seed(seed)
    
    ## place the centroids uniformly inside the square
    for i in range(num_centroids):
        centroids[i] = space*(np.random.rand(2) - 0.5)
    
    ## each point picks a centroid at random and is scattered around it
    for i in range(num_points):
        
        points[i] = np.random.normal(
            loc = centroids[np.random.choice(num_centroids)],
            scale = noise * space,
            size = (2)
        )
        
    return points

def plot_2D_points(points, **kwargs):
    """
    Scatter-plot a set of 2D points on a new 6x6 figure without tick marks.
    
    Parameters
    ----------
    points : array whose first two columns are the x and y coordinates;
        an empty array is accepted and gives an empty plot
    color : optional, passed to plt.plot (None lets matplotlib choose)
    alpha : optional (default 1), marker transparency
    title : optional, figure title
    xlim, ylim : optional (low, high) pairs; when given they are applied as
        explicit axis limits
        
    Returns
    -------
    fig, ax : the figure and axes that were created
    """
    
    ## an empty data set may arrive as a 1-D array (e.g. np.array([])), which
    ## cannot be sliced by column; view it as zero rows of 2D points
    points = np.asarray(points)
    if points.size == 0:
        points = points.reshape(0, 2)
    
    fig, ax = plt.subplots(figsize = (6,6))
    
    color = kwargs.get('color')
    alpha = kwargs.get('alpha', 1)
    xlim  = kwargs.get('xlim')
    ylim  = kwargs.get('ylim')

    plt.plot(points[:,0], points[:,1], '.',
             color = color,
             markerfacecolor = 'None',
             alpha = alpha)
    
    plt.xticks([])
    plt.yticks([])
    
    plt.title(kwargs.get('title'))
    
    ## axis('equal') switches autoscaling back on and recomputes the limits,
    ## so it has to run before the explicit limits or they are thrown away
    plt.axis('equal')
    
    ## only touch the limits when the caller asked for them; setting a limit
    ## turns autoscaling off, which would stop later artists (e.g. contours)
    ## from extending the view
    ## NOTE(review): with equal aspect matplotlib may still widen one of the
    ## requested ranges when drawing - confirm this is acceptable
    if ylim is not None: plt.ylim(ylim)
    if xlim is not None: plt.xlim(xlim)
    
    plt.tight_layout()
    
    return fig, ax

def generate_on_click(b):
    """
    Button callback: draw a fresh random data set and display it.
    
    Replaces the global points array using the current slider settings and
    shows the new scatter plot in output_points. The argument b (the object
    passed by the button) is not used.
    """
    
    global points
    
    ## slider values are read in the order: points, centroids, noise
    points = rand_2D_points(
        points_slider.value,
        centroids_slider.value,
        noise = noise_slider.value
    )
    
    plot_2D_points(points)
    
    ## wait = True holds the old output until the new one is ready
    output_points.clear_output(wait = True)
    with output_points:
        plt.show()
    
## sliders that control the generated data set
points_slider    = IntSlider(value = 100, min = 50, max = 500)
centroids_slider = IntSlider(value = 3, min = 1, max = 6)
noise_slider     = FloatSlider(value = 0.1, min = 0.01, max = 0.20, step = 0.01, readout_format = '0.2f')

## button that triggers generate_on_click
generate_button = Button(description = 'Generate Points')
generate_button.on_click(generate_on_click)

## area in which generate_on_click shows the plot
output_points = Output()

## labels in the left column, the matching sliders in the right column
display(
    HBox([
        VBox([Label(text) for text in ('Number of points: ', 'Number of centroids: ', 'Noise: ')]),
        VBox([points_slider, centroids_slider, noise_slider])
    ]),
    generate_button,
    output_points
)

HBox(children=(VBox(children=(Label(value='Number of points: '), Label(value='Number of centroids: '), Label(v…

Button(description='Generate Points', style=ButtonStyle())

Output()

In [123]:
import time

## length of the coherence history (one entry per k-means iteration)
max_iter = 20

## placeholder; centroids_on_click replaces it with the centroid history
centroids = np.empty(0)

## one coherence value per iteration, zero until that iteration has run
coherence = np.zeros(shape = max_iter)

## Euclidean distance from the single point b to every row of A,
## returned as a vector with one entry per row
def euclid_dist(A, b):
    offsets = A - b
    return np.sqrt(np.square(offsets).sum(axis = 1))

def find_coherence(points, current, s):
    """
    Sum of the distances between every point and its assigned centroid.
    
    Parameters
    ----------
    points : array with one row per point
    current : array with one row per centroid
    s : for each point, the row index in current of its centroid
    
    Returns
    -------
    the summed Euclidean distance (0.0 when there are no points)
    """
    
    s = np.asarray(s, dtype = int)
    
    ## nothing assigned, nothing to sum
    if s.size == 0:
        return np.float64(0.0)
    
    ## current[s] lines every point up with its own centroid, so a single
    ## vectorised norm replaces the per-point Python loop
    offsets = np.asarray(points) - np.asarray(current)[s]
    
    return np.sum(np.linalg.norm(offsets, axis = 1))

def init_clusters(points):
    """
    Show the clustering produced by the initial centroids (iteration 0).
    
    Assigns every point to its closest initial centroid, resets the global
    coherence history so that only entry 0 is filled, and displays the plot
    together with the coherence value in output_clusters.
    """
    
    global coherence
    global centroids
    
    start = centroids[0]
    
    ## only the assignment is needed here, not the updated centroids
    assignment = kmeans_step(points, start)[0]
    
    ## fresh history: everything after iteration 0 is zero again
    coherence = np.zeros(max_iter)
    coherence[0] = find_coherence(points, start, assignment)
    
    plot_2D_clusters(points, centroids, assignment, 0)
    
    output_clusters.clear_output(wait = True)
    with output_clusters:
        print('Iteration 0')
        print(f'Coherence = {coherence[0]:.2f}')
        plt.show()

def kmeans_step(points, current):
    """
    Run a single k-means iteration.
    
    Parameters
    ----------
    points : array with one row per point
    current : array with one row per centroid
    
    Returns
    -------
    s : for each point, the index of its closest centroid in current
    new_centroids : the mean of the points assigned to each centroid; a
        centroid that received no points keeps its current position
    coherence : find_coherence(points, new_centroids, s)
    """
    
    k_num = current.shape[0]
    
    ## create distance matrix (one row per centroid, one column per point)
    dist = np.array([euclid_dist(points, current[k]) for k in range(k_num)])
    
    ## assign points to the closest centroid
    s = np.argmin(dist, axis = 0)
        
    ## update centroids, starting from a copy of the current positions:
    ## the mean of an empty cluster is NaN, and a NaN centroid would then
    ## capture every point on the next step, so empty clusters stay put
    new_centroids = np.array(current, dtype = float)
    
    for k in range(k_num):
        
        members = points[s == k]
        
        if len(members) > 0:
            new_centroids[k] = np.mean(members, axis = 0)
        
    return s, new_centroids, find_coherence(points, new_centroids, s)

def plot_2D_clusters(points, centroids, s, num_iter):
    """
    Plot the points grouped by cluster together with the centroid history.
    
    Parameters
    ----------
    points : array with one row per point
    centroids : array indexed as [iteration, cluster, coordinate]
    s : cluster index of every point
    num_iter : index of the most recent iteration stored in centroids
    """
    
    fig, ax = plt.subplots(figsize = (6, 6))
    
    num_clusters = centroids.shape[1]

    for k in range(num_clusters):
        
        ## points currently assigned to cluster k
        members = points[s == k]
        plt.plot(members[:,0], members[:,1], '.', markerfacecolor = 'None')
        
        ## dotted path through every position the centroid has had so far
        path = centroids[:num_iter + 1, k]
        plt.plot(path[:,0], path[:,1], linestyle = ':', color = 'k', alpha = 0.6)
        
        ## earlier positions as dots ...
        for past in centroids[:num_iter, k]:
            plt.plot(*past, '.', color = 'k', alpha = 0.6)
        
        ## ... and the latest position as a cross
        plt.plot(*centroids[num_iter, k], 'x', color = 'k')

    plt.xticks([])
    plt.yticks([])
        
    plt.axis('equal')
    plt.tight_layout()
    
def centroids_on_click(b):
    """
    Button callback: pick random initial centroids and show iteration 0.
    
    Rebuilds the global centroid history, seeds its first entry with points
    drawn at random from the data set, displays the resulting clusters and
    enables the 'Find clusters' button. The argument b is not used.
    """
    
    global centroids
    
    ## nothing to seed from until a data set has been generated
    if points.ndim != 2 or points.shape[0] == 0:
        
        output_clusters.clear_output(wait = True)
        
        with output_clusters:
            print('Generate points before picking centroids')
            
        return
    
    ## never ask for more centroids than there are points to pick from
    num_k = min(num_k_slider.value, points.shape[0])
    
    ## create an array to store the centroid history
    centroids = np.zeros((coherence.shape[0], num_k, points.shape[1]))
    
    ## randomly pick the initial centroids; replace = False keeps them
    ## distinct, since picking one point twice would start two clusters at
    ## the same place and leave one of them empty
    centroids[0] = points[np.random.choice(points.shape[0], num_k, replace = False), :]
    
    init_clusters(points)
        
    cluster_button.disabled = False
    
def cluster_on_click(b):
    """
    Button callback: animate k-means starting from the current centroids.
    
    Runs kmeans_step repeatedly, storing every iteration in the global
    centroids and coherence histories and redrawing output_clusters after
    each one. Stops as soon as the coherence no longer changes or the
    history is full. The argument b is not used.
    """
    
    global centroids
    global coherence
    
    delay = speed_slider.value
    
    ## a non-zero second entry means an earlier run left results behind;
    ## go back to iteration 0 first
    if coherence[1] != 0:
        init_clusters(points)
    
    num_steps = coherence.shape[0]
    
    for i in range(1, num_steps):
    
        s, centroids[i], coherence[i] = kmeans_step(points, centroids[i-1])
        
        ## same coherence as the previous iteration: stop
        if coherence[i] == coherence[i-1]:
            break
        
        plot_2D_clusters(points, centroids, s, i)
        
        time.sleep(delay)
        
        ## coherence of every iteration so far, rounded to one decimal
        history = " ".join(str(value.round(1)) for value in coherence[:i+1])
        
        output_clusters.clear_output(wait = True)
        with output_clusters:
            print(f'Iteration {i}')
            print(f'Coherence = {history}')
            plt.show()
    
## sliders for the number of clusters and the pause between iterations
num_k_slider = IntSlider(value = 3, min = 1, max = 6)
speed_slider = FloatSlider(value = 2.0, min = 0, max = 3, step = 0.1, readout_format = '0.1f')

centroids_button = Button(description = 'Random centroids')
centroids_button.on_click(centroids_on_click)

## starts disabled; centroids_on_click enables it
cluster_button = Button(description = 'Find clusters', disabled = True)
cluster_button.on_click(cluster_on_click)

output_clusters = Output()

## labels in the left column, the matching sliders in the right column
display(
    HBox([
        VBox([Label(text) for text in ('Number of centroids: ', 'Speed of Animation: ')]),
        VBox([num_k_slider, speed_slider])
    ]),
    centroids_button,
    cluster_button,
    output_clusters
)

## show the unclustered points until centroids have been picked
plot_2D_points(points)

with output_clusters:
    print('Iteration N/A')
    print('Coherence = N/A')
    plt.show()

HBox(children=(VBox(children=(Label(value='Number of centroids: '), Label(value='Speed of Animation: '))), VBo…

Button(description='Random centroids', style=ButtonStyle())

Button(description='Find clusters', disabled=True, style=ButtonStyle())

Output()

In [124]:
## working copy of the points; shift_on_click moves these, leaving the
## global points array untouched
new_points = np.array(points, copy = True)

## gaussian kernel of width h, evaluated on the distance between the
## point xj and every row of x
def kernel(x, xj, h):
    normaliser = 1/(h * np.sqrt(2 * np.pi))
    scaled = euclid_dist(x, xj)/h
    return normaliser * np.exp(-0.5 * scaled**2)

def mean_shift_step(X, bandwidth):
    """
    Move every point to the kernel-weighted mean of all points.
    
    The weights are always computed from the positions in X, so the result
    does not depend on the order in which the rows are processed. X itself
    is left unchanged; the shifted points are returned as a new array.
    """
    
    shifted = np.copy(X)
    
    for row in range(len(shifted)):
        
        ## weight of every point in X relative to the point being moved,
        ## as a column so that it scales whole rows of X
        weight = kernel(X, shifted[row], bandwidth)[:,None]
        
        shifted[row] = (weight * X).sum(axis = 0)/weight.sum()
    
    return shifted

def contours_on_click(b):
    """
    Button callback: draw the kernel density contours of the points.
    
    Evaluates the summed gaussian kernel of all points on a 100 x 100 grid
    spanning the plotted axes, stores the grid and the density in the
    globals x1, y1 and kde, shows the contour plot in output_contours and
    enables the mean-shift button. The argument b is not used.
    """
    
    global kde, x1, y1
    
    ## there is no density to estimate without data
    if len(points) == 0:
        
        output_contours.clear_output(wait = True)
        
        with output_contours:
            print('Generate points before drawing KDE contours')
            
        return
    
    bandwidth = bandwidth_slider.value
    
    fig, ax = plot_2D_points(points)
    
    ## rebuild the grid from the axes just drawn, so that it follows the data
    ## set currently held in points rather than the one present when the
    ## cell was run
    x1 = np.linspace(*ax.get_xlim(), 100)
    y1 = np.linspace(*ax.get_ylim(), 100)
    
    ## Create an array of 2D points
    xy = np.array(np.meshgrid(x1, y1)).reshape(2,-1).T
    
    ## Find the kernel density at each point then reshape to match x-y coordinates
    ## (meshgrid output has one row per y value and one column per x value)
    kde = np.sum([kernel(xy, p, bandwidth) for p in points], axis = 0).reshape(len(y1), len(x1))

    plt.contour(x1, y1, kde)
    
    output_contours.clear_output(wait = True)
    
    with output_contours:
        print('Iteration 0')
        print(f'KDE contour with bandwidth = {bandwidth}')
        plt.show()
        
    shift_button.disabled = False
        
def shift_on_click(b):
    """
    Button callback: animate mean shift on the points.
    
    Applies mean_shift_step to the global new_points up to 10 times,
    redrawing the shifted points over the stored KDE contours after every
    step, and stops early once the points move by less than the tolerance.
    If an earlier run has already moved new_points, they are first reset to
    the original points. The argument b is not used.
    """
    
    global new_points, kde, x1, y1
    
    ## read the bandwidth before anything else: the reset message below
    ## needs it as well as the iterations
    bandwidth = bandwidth_slider.value
    
    ## reset data if means-shift has already been run
    if not np.array_equal(new_points, points):
        
        new_points = np.copy(points)
        plot_2D_points(points)
        plt.contour(x1, y1, kde)
            
        output_contours.clear_output(wait = True)
    
        with output_contours:
            print('Iteration 0')
            print(f'KDE contour with bandwidth = {bandwidth}')
            plt.show()
    
    max_iter  = 10
    tolerance = 1e-2
    
    for i in range(max_iter):
        
        ## kept to measure how far the points move in this step
        old_points = np.copy(new_points)
    
        new_points = mean_shift_step(new_points, bandwidth)
    
        ## request the extent of the contour grid as the axis limits
        fig, ax = plot_2D_points(
            new_points,
            xlim = (x1[0], x1[-1]),
            ylim = (y1[0], y1[-1]),
        )
        
        plt.contour(x1, y1, kde)
        
        time.sleep(shift_speed_slider.value)
        
        output_contours.clear_output(wait = True)
    
        with output_contours:
            print(f'Iteration {i + 1}')
            print(f'KDE contour with bandwidth = {bandwidth}')
            plt.show()
            
        ## converged: the whole point set barely moved
        if np.linalg.norm(new_points - old_points) < tolerance: break
    
## bandwidth of the gaussian kernel; the lower bound is kept above zero
## because kernel() divides by the bandwidth
bandwidth_slider = FloatSlider(
    value = 1.5,
    min = 0.1,
    max = 3,
    step = 0.1,
    readout_format = '0.1f'
)

## pause between two mean-shift iterations (passed to time.sleep)
shift_speed_slider = FloatSlider(
    value = 2.0,
    min = 0,
    max = 3,
    step = 0.1,
    readout_format = '0.1f'
)

contours_button = Button(
    description = 'Show KDE contours'
)

contours_button.on_click(contours_on_click)

## starts disabled; contours_on_click enables it
shift_button = Button(
    description = 'Means shift',
    disabled = True
)

shift_button.on_click(shift_on_click)

output_contours = Output()

fig, ax = plot_2D_points(points)

## grid on which the kernel density is evaluated, spanning the axes of the
## plot that was just created
x1 = np.linspace(*ax.get_xlim(), 100)
y1 = np.linspace(*ax.get_ylim(), 100)

## placeholder density until contours_on_click computes the real one
kde = np.zeros((len(x1), len(y1)))

display(
    HBox([VBox([Label('Bandwidth: '), Label('Speed of Animation: ')]),
          VBox([bandwidth_slider, shift_speed_slider])]),
    
    contours_button,
    shift_button,
    output_contours
)

with output_contours:
    print('Iteration 0')
    print('Raw points')
    plt.show()

HBox(children=(VBox(children=(Label(value='Bandwidth: '), Label(value='Speed of Animation: '))), VBox(children…

Button(description='Show KDE contours', style=ButtonStyle())

Button(description='Means shift', disabled=True, style=ButtonStyle())

Output()

In [125]:
def DBSCAN(points, epsilon, minPts):
    """
    Cluster the points with DBSCAN.
    
    Parameters
    ----------
    points : array with one row per point
    epsilon : radius within which two points count as neighbours
    minPts : number of neighbours (the point itself included) needed for a
        point to start or extend a cluster
        
    Returns
    -------
    array with one integer label per point: 0 for noise, 1, 2, ... for the
    clusters in the order in which they were found
    
    When the animate_box checkbox is ticked, update_plot is called after
    every label that is assigned.
    """
    
    animate = animate_box.value
    
    n = points.shape[0]    
    
    ## -1 marks a point that has not been visited yet
    s = np.full(shape = (n), fill_value = -1, dtype = int)
    
    cluster = 0
    
    for i in range(n):
        
        if s[i] != -1: continue
        
        neighbors = np.where(euclid_dist(points, points[i]) < epsilon)[0]
        
        ## too few neighbours: noise for now, a later cluster may still
        ## claim the point
        if len(neighbors) < minPts:
            s[i] = 0
            if animate: update_plot(points, s, i)
            continue
        
        cluster += 1
        
        s[i] = cluster
        if animate: update_plot(points, s, i)
            
        ## start a fresh queue for every cluster so that the entries of
        ## clusters that are already finished are not scanned again
        queue = list(neighbors)
        
        ## the queue grows while it is being iterated over
        for q in queue:
            
            ## former noise point within reach: it joins the cluster but
            ## does not extend it
            if s[q] == 0:
                
                s[q] = cluster
                if animate: update_plot(points, s, q)
                continue
                    
            if s[q] != -1: continue
                
            s[q] = cluster
            if animate: update_plot(points, s, q)
            
            neighbors = np.where(euclid_dist(points, points[q]) < epsilon)[0]
            
            ## only points with enough neighbours extend the cluster
            if len(neighbors) >= minPts:
                queue.extend(list(neighbors))
                
    return s

def plot_dbscan_clusters(points, labels, **kwargs):
    """
    Plot the points coloured by DBSCAN label on a new 6x6 figure.
    
    Parameters
    ----------
    points : array whose first two columns are the x and y coordinates
    labels : one integer per point; 0 and -1 are both drawn as 'noise',
        1, 2, ... as 'Cluster 1', 'Cluster 2', ...
    current : optional index of a point to highlight with a circle
    
    Empty input is accepted and gives a plot without any clusters.
    """
    
    labels = np.asarray(labels)
    points = np.asarray(points)
    
    ## an empty data set may arrive as a 1-D array, which cannot be sliced
    ## by column; view it as zero rows of 2D points
    if points.size == 0:
        points = points.reshape(0, 2)
    
    fig, ax = plt.subplots(figsize = (6, 6))
    
    ## noise (0) and not yet visited (-1) points share one style
    unassigned = points[(labels == 0) | (labels == -1)]
    
    plt.plot(unassigned[:,0], 
             unassigned[:,1],
             '.',
             color = 'k',
             markerfacecolor = 'None',
             alpha = 0.6,
             label = 'noise'
            )
    
    ## np.amax raises on an empty array, so treat no labels as no clusters
    num_clusters = np.amax(labels) if labels.size > 0 else 0
    
    for i in range(1, num_clusters + 1):
        
        members = points[labels == i]
        
        plt.plot(members[:,0], members[:,1], '.',
                 markerfacecolor = 'None',
                 label = f'Cluster {i}'
                )
        
    if 'current' in kwargs:
        plt.plot(*points[kwargs['current']], 'o',
                 color = 'k',
                 markerfacecolor = 'None',
                 markersize = 10,
                 alpha = 0.8)

    plt.xticks([])
    plt.yticks([])
        
    plt.axis('equal')
    plt.tight_layout()
    plt.legend()
    
def update_plot(points, labels, current):
    """
    Redraw the DBSCAN plot with one point highlighted, then pause.
    
    The plot replaces the content of output_dbscan; the length of the pause
    is read from dbscan_speed_slider after the plot has been shown.
    """
    
    plot_dbscan_clusters(points, labels, current = current)
    
    output_dbscan.clear_output(wait = True)
    with output_dbscan:
        plt.show()
    
    pause = dbscan_speed_slider.value
    time.sleep(pause)
    
def dbscan_on_click(b):
    """
    Button callback: run DBSCAN on the points and show the final clusters.
    
    Epsilon and the minimum number of points are read from their sliders.
    The argument b is not used.
    """
    
    epsilon = epsilon_slider.value
    minPts = minPts_slider.value
    
    labels = DBSCAN(points, epsilon, minPts)
    
    ## final picture, without a highlighted point
    plot_dbscan_clusters(points, labels)
    
    output_dbscan.clear_output(wait = True)
    with output_dbscan:
        plt.show()
        
## DBSCAN parameters
epsilon_slider = FloatSlider(value = 2, min = 1, max = 5, step = 0.1, readout_format = '0.1f')
minPts_slider  = IntSlider(value = 3, min = 2, max = 10)

## pause after every animated step (read by update_plot)
dbscan_speed_slider = FloatSlider(value = 0.2, min = 0.1, max = 0.5, step = 0.01, readout_format = '0.2f')

## when ticked, DBSCAN redraws the plot after every label it assigns
animate_box = Checkbox(value = False, description = 'Animate')

dbscan_button = Button(description = 'DBSCAN')
dbscan_button.on_click(dbscan_on_click)

output_dbscan = Output()

## unclustered points, drawn in the style used for noise
fig, ax = plot_2D_points(points, color = 'k', alpha = 0.6)

## labels in the left column, the matching sliders in the right column
display(
    HBox([
        VBox([Label(text) for text in ('Epsilon: ', 'Minimum points: ', 'Speed of Animation: ')]),
        VBox([epsilon_slider, minPts_slider, dbscan_speed_slider])
    ]),
    HBox([dbscan_button, animate_box]),
    output_dbscan
)

with output_dbscan:
    plt.show()

HBox(children=(VBox(children=(Label(value='Epsilon: '), Label(value='Minimum points: '), Label(value='Speed of…

HBox(children=(Button(description='DBSCAN', style=ButtonStyle()), Checkbox(value=False, description='Animate')…

Output()