In [1]:
import numpy as np
from tqdm.auto import tqdm


## Q I

### Developing K-Means, GMM, Mean-Shift from scratch

##### K-Means Algorithm

In [2]:
import numpy as np
from tqdm.auto import tqdm

def ed(x1, x2):
    """Return the Euclidean (L2) distance between ``x1`` and ``x2``."""
    delta = x1 - x2
    return np.sqrt(np.sum(delta * delta))

class KMeans():
    """K-Means clustering implemented from scratch on top of NumPy.

    Parameters
    ----------
    k : int
        Number of clusters to form.
    max_iter : int
        Upper bound on the number of assign/update rounds run by ``fit``.
    plot_step : bool
        Stored on the instance; the implementation below does not read it.
    """

    def __init__(self, k=5, max_iter=100, plot_step=False):
        self.k = k
        self.max_iter = max_iter
        self.plot_step = plot_step

        # clusters[j] lists the row indices of the samples assigned to centroid j.
        self.clusters = [[] for _ in range(self.k)]
        self.centeroids = []

    def fit(self, x):
        """Cluster the rows of ``x`` and return one label per row.

        Parameters
        ----------
        x : numpy.ndarray of shape (n, d)
            Samples to cluster, one per row.

        Returns
        -------
        numpy.ndarray of shape (n,)
            Cluster index of every sample, stored as floats.

        Raises
        ------
        ValueError
            Raised by ``np.random.choice`` when ``k`` exceeds the number of
            samples, because the initial centroids are drawn without
            replacement.
        """
        self.x = x
        self.n, self.d = x.shape

        # Initialise the centroids with k distinct, randomly chosen samples.
        samples = np.random.choice(self.n, self.k, replace=False)
        self.centeroids = np.array([self.x[i] for i in samples])

        for _ in tqdm(range(self.max_iter), desc='Kmeans clustering fitting'):
            self.clusters = self._get_clusters(self.centeroids)
            old_centeroids = self.centeroids
            self.centeroids = self._get_centeroids(self.clusters)

            if self._is_converged(old_centeroids, self.centeroids):
                break

        return self._get_clusterlabels(self.clusters)

    def _get_clusterlabels(self, clusters):
        """Flatten the per-cluster index lists into one label per sample."""
        labels = np.empty(self.n)
        for cluster_idx, cluster in enumerate(clusters):
            for idx in cluster:
                labels[idx] = cluster_idx
        return labels

    def _get_clusters(self, centeroids):
        """Assign every sample index to the cluster of its nearest centroid."""
        clusters = [[] for _ in range(self.k)]
        for idx, sample in enumerate(self.x):
            new_center = self._closest_center(sample, centeroids)
            clusters[new_center].append(idx)
        return clusters

    def _closest_center(self, sample, centeroids):
        """Return the index of the centroid closest to ``sample``."""
        distance = [ed(sample, point) for point in centeroids]
        return np.argmin(distance)

    def _get_centeroids(self, clusters):
        """Return the mean of each cluster as a (k, d) array.

        A cluster that received no samples keeps its previous centroid.
        Averaging an empty selection would yield NaN, which would then win
        every ``argmin`` in the next assignment step and prevent convergence.
        """
        centeroids = []
        for cluster_idx, cluster in enumerate(clusters):
            if cluster:
                centeroids.append(np.mean(self.x[cluster], axis=0))
            else:
                centeroids.append(np.asarray(self.centeroids[cluster_idx]))
        return np.array(centeroids)

    def _is_converged(self, old_centeroids, centeroids):
        """Return True when no centroid moved during the last update."""
        distance = [ed(old_centeroids[i], centeroids[i]) for i in range(self.k)]
        return sum(distance) == 0

  from .autonotebook import tqdm as notebook_tqdm


##### Multimodal GMM Algorithm

In [None]:
def normal_dist(x, mn, var):
    """Evaluate the multivariate normal density N(x | mn, var).

    Parameters
    ----------
    x : numpy.ndarray of shape (n,)
        Point at which the density is evaluated.
    mn : numpy.ndarray of shape (n,)
        Mean vector.
    var : numpy.ndarray of shape (n, n)
        Covariance matrix.

    Returns
    -------
    float
        ``(2*pi)**(-n/2) * |var|**(-1/2) * exp(-0.5 * (x-mn)^T var^-1 (x-mn))``,
        or ``0.0`` when ``var`` is singular (zero determinant).
    """
    det = np.linalg.det(var)
    if det == 0:
        # A singular covariance has no inverse; report zero density.
        return 0.0
    n = len(x)
    diff = x - mn
    # Normalising constant of the n-dimensional Gaussian.
    factor = 1 / (((2 * np.pi) ** (n / 2)) * (abs(det) ** (1 / 2)))
    # Squared Mahalanobis distance; the density exponent is minus half of it.
    mahalanobis_sq = diff.T @ np.linalg.inv(var) @ diff
    return factor * np.exp(-0.5 * mahalanobis_sq)

#####
def calc_ɣ(x, l_mn, l_var, l_wt):
    """E-step: compute the responsibility of every component for every sample.

    For sample ``i`` and component ``j`` the unnormalised value is
    ``l_wt[j] * normal_dist(x[i], l_mn[j], l_var[j])``; each row is then
    divided by its own sum. A row whose weighted densities are all zero
    therefore becomes NaN (0/0).

    Returns an array of shape ``(len(x), len(l_mn))``.
    """
    n_components = len(l_mn)
    rows = []
    for i in range(len(x)):
        weighted = [
            l_wt[j] * normal_dist(x[i], l_mn[j], l_var[j])
            for j in range(n_components)
        ]
        rows.append(np.array(weighted) / sum(weighted))
    return np.array(rows)



#####
def calc_N_k(l_ɣ):
    """Sum the responsibility matrix over its first axis (one total per column)."""
    column_totals = l_ɣ.sum(axis=0)
    return column_totals

####
def calc_μ(x, l_ɣ, l_N_k):
    """M-step: responsibility-weighted mean of the samples for each component.

    Component ``j`` gets ``sum_i l_ɣ[i][j] * x[i] / l_N_k[j]``.
    Returns an array with one mean vector per entry of ``l_N_k``.
    """
    n_samples, _ = x.shape
    means = []
    for comp, comp_mass in enumerate(l_N_k):
        weighted_total = sum(l_ɣ[i][comp] * x[i] for i in range(n_samples))
        means.append(weighted_total / comp_mass)
    return np.array(means)
        


####
def calc_sigma(x, l_ɣ, l_N_k, l_μ):
    """M-step: responsibility-weighted covariance matrix for each component.

    Component ``j`` gets
    ``sum_i l_ɣ[i][j] * (x[i]-l_μ[j]) (x[i]-l_μ[j])^T / l_N_k[j]``.
    Returns an array of shape ``(len(l_N_k), d, d)``.
    """
    n_samples, n_features = x.shape
    covariances = []
    for comp, comp_mass in enumerate(l_N_k):
        scatter = np.zeros((n_features, n_features))
        for i in range(n_samples):
            # Column vector so that column @ column.T is the outer product.
            column = (x[i] - l_μ[comp]).reshape(n_features, 1)
            scatter += l_ɣ[i][comp] * (column @ column.T)
        covariances.append(scatter / comp_mass)
    return np.array(covariances)


####
def calc_wt(l_N_k):
    """Normalise the per-component totals so that they sum to one."""
    total = l_N_k.sum()
    return l_N_k / total

####
def calc_LL(x, l_wt, l_mu, l_sigma):
    """Log-likelihood of ``x`` under the Gaussian mixture.

    Sums, over the rows of ``x``, the log of the mixture density
    ``sum_j l_wt[j] * normal_dist(x[i], l_mu[j], l_sigma[j])``.
    """
    n_samples, _ = x.shape
    n_components = len(l_mu)
    log_likelihood = 0
    for i in range(n_samples):
        mixture_density = sum(
            l_wt[j] * normal_dist(x[i], l_mu[j], l_sigma[j])
            for j in range(n_components)
        )
        log_likelihood += np.log(mixture_density)
    return log_likelihood



#######
def em_estimate(x, initial_mean, initial_var, initial_wt, max_iterations, threshold):
    """Fit a Gaussian mixture to ``x`` with the EM algorithm.

    Parameters
    ----------
    x : numpy.ndarray of shape (n, d)
        Samples, one per row.
    initial_mean, initial_var, initial_wt
        Starting means, covariance matrices and mixing weights, one entry
        per component.
    max_iterations : int
        Maximum number of EM rounds.
    threshold : float
        Stop once the log-likelihood changes by less than this amount
        between two consecutive rounds.

    Returns
    -------
    dict
        ``"means"``, ``"sigmas"``, ``"weights"``: the final parameters.
        ``"list_ll"``: the log-likelihood of the initial parameters followed
        by the value after every completed round, including the last one.
        ``"center_update_list"``: the initial means followed by the means
        after every completed round, so its last entry equals ``"means"``.
    """
    l_mean = initial_mean
    l_var = initial_var
    l_wt = initial_wt

    ll = calc_LL(x, l_wt, l_mean, l_var)
    ll_list = [ll]
    l_cntrs = [l_mean]

    for _ in tqdm(range(max_iterations), desc='GMM clustering'):
        # E-step: responsibilities and their per-component totals.
        l_ɣ = calc_ɣ(x, l_mean, l_var, l_wt)
        l_N_k = calc_N_k(l_ɣ)

        # M-step: re-estimate weights, means and covariances.
        l_wt = calc_wt(l_N_k)
        l_mean = calc_μ(x, l_ɣ, l_N_k)
        l_var = calc_sigma(x, l_ɣ, l_N_k, l_mean)

        ll_latest = calc_LL(x, l_wt, l_mean, l_var)

        # Record the round before testing convergence so the histories
        # always include the state that is actually returned.
        ll_list.append(ll_latest)
        l_cntrs.append(l_mean)

        converged = abs(ll - ll_latest) < threshold
        ll = ll_latest
        if converged:
            break

    return {"means":l_mean, "sigmas":l_var, "weights":l_wt, "list_ll":ll_list, "center_update_list": l_cntrs}
        

##### Mean-Shift Algorithm

In [None]:
# Convergence tolerance for mean shift: a point is frozen once a single update
# moves it less than this distance, and iteration stops after a pass in which
# no point moved farther than this.
min_dist = 0.001

def cluster(self, points, kernel_bandwidth, iteration_callback=None):
    """Run mean shift on ``points`` and group the converged positions.

    Defined at module level but written as a method: ``self`` must provide
    ``_shift_point(point, points, kernel_bandwidth)``.

    Every point is shifted repeatedly until its step is shorter than
    ``min_dist``; the loop ends after a pass in which no step exceeded
    ``min_dist``. If given, ``iteration_callback(positions, iteration)`` is
    called with the initial points (iteration 0) and after every pass.

    Returns a ``MeanShiftResult`` holding the original points, the shifted
    points and the group assignments produced by ``pg.PointGrouper``
    (``pg`` is not defined in the visible part of this notebook).
    """
    points = np.array([[float(value) for value in row] for row in points])
    if iteration_callback:
        iteration_callback(points, 0)

    shift_points = np.array(points)
    active = [True] * points.shape[0]
    largest_step = 1
    iteration_number = 0

    while largest_step > min_dist:
        largest_step = 0
        iteration_number += 1
        for idx in range(len(shift_points)):
            if active[idx]:
                start = shift_points[idx]
                moved = self._shift_point(start, points, kernel_bandwidth)
                step = ed(moved, start)
                largest_step = max(largest_step, step)
                if step < min_dist:
                    # This point has settled; skip it in later passes.
                    active[idx] = False
                shift_points[idx] = moved
        if iteration_callback:
            iteration_callback(shift_points, iteration_number)

    grouper = pg.PointGrouper()
    assignments = grouper.group_points(shift_points.tolist())
    return MeanShiftResult(points, shift_points, assignments)

def _shift_point(self, point, points, kernel_bandwidth):
    """Return the kernel-weighted mean of ``points`` as seen from ``point``.

    ``self.kernel(point - points, kernel_bandwidth)`` supplies one weight per
    row of ``points``; the result is the weighted sum of the rows divided by
    the sum of the weights.
    """
    data = np.array(points)
    weights = self.kernel(point - data, kernel_bandwidth)

    # Repeat the weights across the coordinate axis so they line up with data.
    weight_columns = np.tile(weights, [len(point), 1]).transpose()
    weighted_sum = np.multiply(weight_columns, data).sum(axis=0)
    return weighted_sum / sum(weights)

class MeanShiftResult:
    """Plain container for the three outputs of a mean-shift run."""

    def __init__(self, original_points, shifted_points, cluster_ids):
        # Input points as passed to the constructor.
        self.original_points = original_points
        # Positions of the points after shifting.
        self.shifted_points = shifted_points
        # Group assignment for the points.
        self.cluster_ids = cluster_ids

Reference: YouTube / learmML

### Importing Image files