In [71]:
#get data
import itertools
import math
import random
from statistics import mean
from typing import Tuple, List, Dict, Set

# Helper types for the clustering task.
# One data point: a tuple of floats.
Embedding = Tuple[float, ...]
# Integer label identifying a cluster.
ClusterId = int
# Ground-truth grouping: cluster id -> embeddings generated for that cluster.
ClusteredDataset = Dict[ClusterId, List[Embedding]]

# Standard deviation shared by every generated cluster distribution.
GOLD_STANDARD_CLUSTER_STD = 0.2
# Number of components per embedding (default for generate_data, also passed to get_k_means).
DIMENSIONS = 32
# NOTE(review): not referenced anywhere in the cells shown here - confirm intended use.
EPOCHS = 10
# Number of clusters, used for data generation and as k for k-means.
N_CLUSTERS = 4
# Default number of data points produced by generate_data.
N_SAMPLES = 1000

def generate_data(
    samples: int = N_SAMPLES, dimensions: int = DIMENSIONS, clusters: int = N_CLUSTERS
) -> Tuple[ClusteredDataset, Dict[str, Embedding]]:
    """Generate a random clustered dataset plus an id-keyed view of its points.

    Each cluster gets a mean drawn uniformly from [0, 1) and the shared
    standard deviation GOLD_STANDARD_CLUSTER_STD. Every component of an
    embedding is drawn independently from its cluster's normal distribution.

    Args:
        samples: number of embeddings to generate.
        dimensions: number of components in each embedding.
        clusters: number of cluster distributions to sample from.

    Returns:
        A ``(dataset, data)`` pair. ``dataset`` maps each cluster id that
        received at least one sample to the list of its embeddings (the
        ground truth). ``data`` maps the string ids ``"id_0"``, ``"id_1"``,
        ... to the same embeddings in shuffled order.

    Raises:
        ValueError: from ``random.randrange`` when ``clusters`` is not
            positive and ``samples`` is greater than zero.
    """
    # One (mean, std) pair per cluster.
    distributions = [
        (random.random(), GOLD_STANDARD_CLUSTER_STD) for _ in range(clusters)
    ]

    # Generate samples, recording each one under its true cluster.
    dataset: ClusteredDataset = {}
    embeddings: List[Embedding] = []
    for _ in range(samples):
        cluster_id = random.randrange(0, clusters)
        # Named cluster_mean so the imported statistics.mean is not shadowed.
        cluster_mean, cluster_std = distributions[cluster_id]
        embedding = tuple(
            random.normalvariate(cluster_mean, cluster_std) for _ in range(dimensions)
        )
        dataset.setdefault(cluster_id, []).append(embedding)
        embeddings.append(embedding)

    # Shuffle before handing out ids, then key every point by "id_<position>".
    random.shuffle(embeddings)
    data = {f"id_{index}": point for index, point in enumerate(embeddings)}

    return dataset, data

# `embeddings` is the dict returned by generate_data: "id_<i>" -> embedding tuple.
# NOTE(review): no random seed is set in the cells shown here, so the data differs between runs.
gold_standard, embeddings = generate_data(clusters=N_CLUSTERS)

In [72]:
class Centroid:
    """A cluster centre together with the ids of the points assigned to it."""

    def __init__(self, location):
        # location: coordinates of the centre, stored exactly as given.
        # closest_points: ids of the data points currently nearest to it;
        # every instance starts with its own empty set.
        self.location, self.closest_points = location, set()

def get_k_means(feature_map, num_features_per_id, k, epochs=50):
    """Cluster the points in ``feature_map`` with k-means using Manhattan distance.

    Args:
        feature_map: dict mapping a point id to its feature sequence.
        num_features_per_id: dimensionality of every feature sequence.
        k: number of clusters; must not exceed ``len(feature_map)``.
        epochs: maximum number of assign/update rounds. Defaults to 50,
            the count that used to be hard-coded.

    Returns:
        List of ``k`` Centroid objects. ``location`` holds the final
        coordinates and ``closest_points`` holds the ids assigned to the
        centroid in the last round that ran.

    Raises:
        ValueError: from ``random.sample`` when ``k`` is negative or larger
            than the number of points.
    """
    # Seed the centroids with k distinct data points, drawn from the sorted ids.
    initial_centroid_ids = random.sample(sorted(feature_map), k)
    centroids = [Centroid(feature_map[point_id]) for point_id in initial_centroid_ids]
    if not centroids:
        # k == 0: there is nothing to assign points to.
        return centroids

    for _ in range(epochs):
        # Assignment step. Start every round from empty sets so members from
        # an earlier round never linger, and so the sets filled in the final
        # round are still populated when the centroids are returned.
        previous_assignment = [centroid.closest_points for centroid in centroids]
        for centroid in centroids:
            centroid.closest_points = set()
        for point_id, feature in feature_map.items():
            # min() returns the first centroid on ties, like a strict "<" scan.
            nearest = min(
                centroids,
                key=lambda centroid: get_distance(centroid.location, feature),
            )
            nearest.closest_points.add(point_id)

        # Same assignment as the previous round: the locations are already
        # the means of these sets, so further rounds would change nothing.
        if [centroid.closest_points for centroid in centroids] == previous_assignment:
            break

        # Update step: move each centroid to the mean of its members.
        for centroid in centroids:
            # An empty cluster has no mean; leave that centroid where it is.
            if centroid.closest_points:
                centroid.location = get_average_distance(
                    centroid, num_features_per_id, feature_map
                )
    return centroids


def get_average_distance(centroid, num_features_per_id, feature_map):
    """Return the component-wise mean of the points assigned to ``centroid``.

    Despite the name, the result is a position (the new centroid
    coordinates), not a distance.

    Args:
        centroid: object whose ``closest_points`` holds ids present in
            ``feature_map``.
        num_features_per_id: number of leading components to average.
        feature_map: dict mapping a point id to its feature sequence.

    Returns:
        List of ``num_features_per_id`` floats. When no points are assigned
        the list is all zeros.
    """
    total_records = len(centroid.closest_points)
    if total_records == 0:
        # Nothing to average; avoid dividing by zero.
        return [0.0] * num_features_per_id
    members = [feature_map[record] for record in centroid.closest_points]
    # Divide by the number of points, not by the number of features.
    return [
        sum(member[dim] for member in members) / total_records
        for dim in range(num_features_per_id)
    ]
                    
def get_distance(centroid,feature):
    """Manhattan (L1) distance between two coordinate sequences.

    Components are paired with zip, so trailing components of the longer
    sequence are ignored.
    """
    return sum(abs(c_value - f_value) for c_value, f_value in zip(centroid, feature))

In [73]:
# Run k-means on the id -> embedding dict; get_k_means returns a list of Centroid objects.
k_means_centroid = get_k_means(embeddings,DIMENSIONS,N_CLUSTERS)
