In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_blobs
from sklearn.model_selection import train_test_split
from sklearn.metrics import adjusted_rand_score , silhouette_score
from sklearn.cluster import MeanShift

# Mean Shift With Dynamic Bandwidth from scratch

In [48]:
# Scratch cell: builds a 2-element float array from a literal tuple; the bare
# expression is the cell's displayed output.
# NOTE(review): presumably a leftover debugging value -- confirm whether this cell is still needed.
np.array((-4496.045107576282, -19664.3244935471))

array([ -4496.04510758, -19664.32449355])

In [125]:
class My_MeanShiftDynamicBandwidth:
    """Mean-shift style clustering with a distance-bucketed ("dynamic") weight kernel.

    Every distinct sample starts as a centroid. On each iteration every centroid
    is moved to the weighted average of *all* samples, where a sample's weight
    depends on which distance bucket it falls into:

        bucket = min(floor(distance / radius), radius_norm_step - 1)
        weight = (radius_norm_step - 1 - bucket) ** 2

    so samples in the farthest bucket get weight 0. After each pass, centroids
    closer than ``radius`` to another centroid are pruned. Iteration stops when
    the centroid set no longer changes, when a single centroid remains, or when
    ``max_iter`` passes have been made.

    Parameters
    ----------
    radius : float or None, default None
        Width of one distance bucket. When None, ``fit`` derives it as
        ``norm(mean(x)) / radius_norm_step`` and stores it on the instance, so
        later ``fit`` calls reuse the stored value.
        NOTE: this heuristic depends on how far the data mean is from the
        origin, so centred/standardised data yields a (near-)zero radius; pass
        an explicit ``radius`` for such data.
    radius_norm_step : int, default 100
        Number of distance buckets (and divisor of the automatic radius).
    max_iter : int or None, default 300
        Upper bound on the number of shift passes; None means unbounded.

    Attributes
    ----------
    centroids : numpy.ndarray of shape (n_clusters, n_features) or None
        Cluster centres found by ``fit``; None before fitting.
    """

    def __init__(self, radius=None, radius_norm_step=100, max_iter=300):
        self.radius = radius
        self.radius_norm_step = radius_norm_step
        self.max_iter = max_iter
        self.centroids = None

    def is_convergence(self, new_centroids, old_centroids):
        """Return True when both centroid sets are exactly equal.

        The comparison is exact (no tolerance). Sets of different size are
        never considered converged: a change in the number of centroids means
        the previous pass still merged something.
        """
        return bool(np.array_equal(np.asarray(new_centroids), np.asarray(old_centroids)))

    def remove_close_centroids(self, centroids):
        """Prune centroids lying within ``self.radius`` of another centroid.

        Parameters
        ----------
        centroids : list of tuple
            Candidate centroids; the list is modified in place.

        Returns
        -------
        list of tuple
            The same list object with the pruned entries removed.
        """
        to_pop = []
        for c1 in centroids:
            if c1 in to_pop:
                continue
            for c2 in centroids:
                if c1 == c2 or c2 in to_pop:
                    continue
                if np.linalg.norm(np.array(c1) - np.array(c2)) <= self.radius:
                    # Only the first close neighbour is dropped per centroid per
                    # pass; remaining ones are handled on later passes.
                    to_pop.append(c2)
                    break
        for doomed in to_pop:
            centroids.remove(doomed)
        return centroids

    def fit(self, x):
        """Find cluster centroids for ``x`` and store them in ``self.centroids``.

        Parameters
        ----------
        x : array-like of shape (n_samples, n_features)
            Samples to cluster.

        Raises
        ------
        ValueError
            If ``x`` is not a non-empty 2-D array, or if the (given or derived)
            radius is not a positive finite number.
        """
        x = np.asarray(x, dtype=float)
        if x.ndim != 2 or x.shape[0] == 0:
            raise ValueError("x must be a non-empty 2-D array of shape (n_samples, n_features)")

        radius = self.radius
        if radius is None:
            radius = np.linalg.norm(np.mean(x, axis=0)) / self.radius_norm_step
        if not (np.isfinite(radius) and radius > 0):
            raise ValueError(
                "radius must be a positive finite number; the automatic radius "
                "is zero when the data mean is at the origin - pass radius explicitly"
            )
        self.radius = radius

        last_bucket = self.radius_norm_step - 1
        # weights[k] is the weight of bucket k: largest for the nearest bucket,
        # zero for the farthest one.
        weights = (np.arange(self.radius_norm_step)[::-1] ** 2).astype(float)

        # Sorted distinct samples are the initial centroids.
        centroids = np.unique(x, axis=0)
        iteration = 0
        while self.max_iter is None or iteration < self.max_iter:
            iteration += 1
            shifted = []
            for centroid in centroids:
                distances = np.linalg.norm(x - centroid, axis=1)
                # Clamp before the integer cast so huge ratios cannot overflow.
                buckets = np.minimum(distances / radius, last_bucket).astype(int)
                sample_weights = weights[buckets]
                total_weight = sample_weights.sum()
                if total_weight > 0:
                    # True weighted average: normalise by the sum of weights,
                    # not by the number of samples.
                    moved = (sample_weights[:, None] * x).sum(axis=0) / total_weight
                else:
                    # No sample carries weight for this centroid; leave it in place.
                    moved = centroid
                shifted.append(tuple(moved))

            survivors = self.remove_close_centroids(sorted(set(shifted)))
            new_centroids = np.array(survivors)
            is_converged = self.is_convergence(new_centroids, centroids)
            centroids = new_centroids
            if is_converged or len(centroids) == 1:
                break
        self.centroids = centroids

    def predict(self, x):
        """Assign each sample to the index of its nearest fitted centroid.

        Parameters
        ----------
        x : array-like of shape (n_samples, n_features)

        Returns
        -------
        numpy.ndarray of shape (n_samples,)
            Row index into ``self.centroids`` of the closest centroid
            (Euclidean distance).

        Raises
        ------
        AttributeError
            If called before ``fit``.
        """
        if self.centroids is None:
            raise AttributeError("centroids are not available; call fit() before predict()")
        x = np.asarray(x, dtype=float)
        if x.shape[0] == 0:
            return np.array([], dtype=int)
        centroids = np.asarray(self.centroids, dtype=float)
        # Pairwise sample-to-centroid distances, shape (n_samples, n_clusters).
        distances = np.linalg.norm(x[:, None, :] - centroids[None, :, :], axis=2)
        return np.argmin(distances, axis=1)
                
            

In [137]:
def plot(X,centroids):
    """Draw the samples as dots and overlay the centroids as red crosses.

    Both arguments are indexed as ``arr[:, 0]`` / ``arr[:, 1]``, so only the
    first two columns are shown. The figure is displayed via ``plt.show()``;
    nothing is returned.
    """
    figure, axes = plt.subplots(figsize=(12, 8))

    # Samples first, so the centroid markers are drawn on top of them.
    sample_x, sample_y = X[:,0], X[:,1]
    axes.scatter(sample_x, sample_y, marker='o')

    center_x, center_y = centroids[:,0], centroids[:,1]
    axes.scatter(center_x, center_y, marker="x", c="red", linewidth=4)

    plt.show()