In [None]:
import numpy as np
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
from sklearn.cluster.k_means_ import _init_centroids
from matplotlib.colors import ListedColormap
from PIL import Image
import math
import random
import pandas as pd
from sklearn.metrics import davies_bouldin_score

In [None]:
# Reference RGB colours mapped to integer class codes.
# Code -1 (white) marks pixels that write_data_to_csv leaves out of the dataset.
rgb_code_dictionary = {
    (255, 255, 255): -1,
    (255, 0, 0): 0,
}

In [None]:
def distance(c1, c2):
    """Return the Euclidean distance between two 3-component colours.

    Both arguments must unpack into exactly three numbers (r, g, b).
    """
    r1, g1, b1 = c1
    r2, g2, b2 = c2
    delta_r = r1 - r2
    delta_g = g1 - g2
    delta_b = b1 - b2
    squared_length = delta_r ** 2 + delta_g ** 2 + delta_b ** 2
    return math.sqrt(squared_length)
def get_closest(point):
    """Return the class code of the reference colour nearest to `point`.

    `point` is an (r, g, b) triple. Candidates are the keys of
    `rgb_code_dictionary`; when several are equally close, the one inserted
    first wins.
    """
    # min() scans the keys once; sorting the whole list just to take the
    # first element did the same job in O(n log n).
    closest_color = min(rgb_code_dictionary, key=lambda color: distance(color, point))
    return rgb_code_dictionary[closest_color]
def add_noise(X):
    """Return X shifted by a single draw from a standard normal distribution (mean 0, sigma 1)."""
    return X + random.gauss(0, 1)

In [None]:
def write_data_to_csv(source: str, dest_path: str):
    """Turn the foreground pixels of an image into a noisy 2-D point dataset.

    Every pixel whose nearest reference colour has a code other than -1 is
    written to `dest_path` as one `x;y;code` line, where x and y are the pixel
    coordinates with independent Gaussian noise added.

    Parameters
    ----------
    source : str
        Path of the image to read.
    dest_path : str
        Path of the semicolon-separated file to (over)write.
    """
    # Both handles are context-managed so the image file is released as well.
    with Image.open(source) as image, open(dest_path, 'w') as dest_file:
        width, height = image.size
        for x in range(width):
            for y in range(height):
                # Classify the pixel once and reuse the code for both the
                # filter and the output line. [0:3] drops an alpha channel.
                code = get_closest(image.getpixel((x, y))[0:3])
                if code != -1:
                    dest_file.write(f'{add_noise(x)};{add_noise(y)};{code}\n')

In [None]:
# Generate the regular dataset: foreground pixels of the image become noisy points in a CSV file.
write_data_to_csv('k_means_normal.png','usual_dataset.csv' )

In [None]:
def read_dataset_from_csv(dataset):
    """Load the point coordinates from a semicolon-separated dataset file.

    The file is read without a header row as three columns (x, y, label).
    Only the first two columns are returned, as an array with one row per
    point; the label column is ignored.
    """
    dataset_df = pd.read_csv(dataset, sep=';', names=['x', 'y', 'label'])
    return np.array(dataset_df.iloc[:, 0:2])

In [None]:
# Coordinates of the regular dataset, one row per point.
x = read_dataset_from_csv('usual_dataset.csv')

In [None]:
def plot_dataset(X: np.ndarray):
    """Draw the points of X (column 0 against column 1) as red dots on a new 9x9 figure."""
    plt.figure(figsize=(9, 9))
    horizontal, vertical = X[:, 0], X[:, 1]
    plt.plot(horizontal, vertical, 'ro')

In [None]:
# Show the raw points of the regular dataset before any clustering.
plot_dataset(x)

In [None]:
# Number of clusters passed to KMeans in the fitting and plotting cells below.
k = 9

In [None]:
def init_random_partition(X: np.ndarray, k: int = 9, sample_size: int = 30):
    """Build `k` initial centroids as means of disjoint random samples of X.

    `sample_size * k` distinct row indices are drawn without replacement and
    split into `k` groups; each centroid is the mean of one group's rows.

    Parameters
    ----------
    X : np.ndarray
        Data with one row per point.
    k : int, default 9
        Number of centroids to produce.
    sample_size : int, default 30
        Number of rows averaged for each centroid.

    Returns
    -------
    np.ndarray
        Array with `k` rows, one centroid per row.

    Raises
    ------
    ValueError
        Raised by NumPy when X has fewer than `sample_size * k` rows.
    """
    # Column i of row_no holds the row indices of the i-th group.
    row_no = np.random.choice(X.shape[0], replace=False, size=(sample_size, k))
    return np.array([X[row_no[:, i], :].mean(axis=0) for i in range(k)])

In [None]:
def init_random_points(X: np.ndarray, k: int = 9):
    """Pick `k` distinct rows of X at random to serve as initial centroids.

    Parameters
    ----------
    X : np.ndarray
        Data with one row per point.
    k : int, default 9
        Number of rows to pick.

    Returns
    -------
    np.ndarray
        The selected rows, in the order they were drawn.

    Raises
    ------
    ValueError
        Raised by NumPy when X has fewer than `k` rows.
    """
    random_rows = np.random.choice(X.shape[0], size=k, replace=False)
    return X[random_rows, :]

In [None]:
# (initialiser, label) pairs compared throughout the notebook.
# Each initialiser takes the data and returns a value for KMeans' `init`
# argument: either one of sklearn's built-in strategy names or an explicit
# array of starting centroids.
cases = [
    (lambda X: 'random', 'random'),
    (lambda X: init_random_points(X), 'forgy'),
    (lambda X: init_random_partition(X), 'random_partition'),
    (lambda X: 'k-means++', 'k-means++'),
]

In [None]:
# One plotting colour per cluster index (nine entries).
colours = [
    '#FF0000',
    '#00FF00',
    '#0000FF',
    '#FF8000',
    '#FFFF00',
    '#00FFFF',
    '#FF00FF',
    '#808080',
    '#99CCFF',
]

In [None]:
# Fit one KMeans model per initialisation method on the regular dataset,
# keeping each fitted model together with the method's label.
k_means = []
for init, name in cases:
    # init(x) yields either a strategy name or explicit starting centroids.
    kmeans = KMeans(n_clusters=k, init=init(x))
    kmeans.fit(x)
    k_means.append((kmeans, name))

In [None]:
# Draw one subplot per fitted model showing the clusters of `x` and their centres.
plt.figure(figsize=(16,16))
plt.suptitle("Clusters found using k means with particular init methods")
for i, (kmeans, name) in enumerate(k_means):
    # 2x2 grid: room for the four entries of `cases`.
    plt.subplot(2,2,i+1)
    clusters = kmeans.predict(x)
    for cluster in range(k):
        my_members =  x[clusters==cluster]
        my_center = kmeans.cluster_centers_[cluster]
        plt.scatter(my_members[:,0], my_members[:,1], c=colours[cluster])
        # Centre marker: a black dot with a cluster-coloured cross drawn on top.
        plt.scatter(my_center[0], my_center[1],marker='o', c='black')
        plt.scatter(my_center[0], my_center[1], marker='x', c=colours[cluster])
    plt.title(name)
plt.show()
    

In [None]:
def kmeans_score(X: np.ndarray, k: int, init, max_iter: int):
    """Fit k-means once and return the Davies-Bouldin score of the clustering.

    Parameters
    ----------
    X : np.ndarray
        Data with one row per point.
    k : int
        Number of clusters.
    init : callable
        Called as `init(X)`; its result is passed to KMeans as `init`
        (a strategy name or an array of starting centroids).
    max_iter : int
        Iteration cap for the single k-means run.

    Returns
    -------
    float
        Davies-Bouldin score of the resulting labels (lower is better).
    """
    # n_init=1 keeps the comparison fair: sklearn runs only one initialisation
    # when explicit centroids are passed, but would otherwise restart the
    # string strategies ('random', 'k-means++') several times and keep the best.
    kmeans = KMeans(n_clusters=k, init=init(X), n_init=1, max_iter=max_iter).fit(X)
    return davies_bouldin_score(X, kmeans.labels_)

def calculate_scores(X: np.ndarray,init, k: int =9, max_iter:int=25):
    """Score k-means for every iteration cap from 1 up to, but excluding, `max_iter`.

    Returns a 1-D array whose element j is `kmeans_score(X, k, init, j + 1)`.
    """
    collected = []
    for iteration_cap in range(1, max_iter):
        collected.append(kmeans_score(X, k, init, iteration_cap))
    return np.array(collected)

def calculate_average_score(X: np.ndarray, cases, repeat: int, k: int = 9, max_iter: int = 40):
    """Collect repeated score curves for every initialisation method.

    Despite the name, no averaging happens here: the raw curves are returned
    so the caller can compute means and spreads.

    Parameters
    ----------
    X : np.ndarray
        Data with one row per point.
    cases : iterable of (callable, str)
        Pairs of initialiser and label, as consumed by `calculate_scores`.
    repeat : int
        Number of independent score curves to compute per method.
    k : int, default 9
        Number of clusters.
    max_iter : int, default 40
        Exclusive upper bound of the iteration caps that are scored.

    Returns
    -------
    dict
        Maps each label to an array with one row per repetition and one
        column per iteration cap.
    """
    average_results = {}
    for init, name in cases:
        # Build all rows first and convert once, rather than re-allocating
        # the array with np.r_ on every repetition.
        runs = [calculate_scores(X, init, k, max_iter) for _ in range(repeat)]
        average_results[name] = np.array(runs)
    return average_results

In [None]:
# Score curves for the regular dataset: 25 repetitions per initialisation method.
result_dict = calculate_average_score(x, cases, 25)

In [None]:
def plot_averages(results):
    """Plot the mean Davies-Bouldin score per iteration cap for each method.

    Parameters
    ----------
    results : dict
        Maps a method label to a 2-D array with one row per repetition and
        one column per iteration cap, as returned by
        `calculate_average_score`.

    Error bars show the standard error of the mean across repetitions.
    """
    plt.figure(figsize=(15,15))
    for name, array in results.items():
        means = array.mean(axis=0)
        # Standard error of the mean: std / sqrt(number of repetitions).
        errors = array.std(axis=0) / np.sqrt(array.shape[0])
        # Column j was computed with max_iter = j + 1, so the axis starts at 1.
        iterations = np.arange(1, len(means) + 1)
        plt.scatter(iterations, means, label=name)
        plt.errorbar(iterations, means, errors)
    plt.title(
        "K-means score using Davies-Bouldin score. "
        f"Initialization methods: {', '.join(results)}"
    )
    plt.ylabel("Davies - Bouldin score")
    plt.xlabel("iteration")
    plt.legend()
    plt.show()

In [None]:
# Compare the initialisation methods on the regular dataset.
plot_averages(result_dict)

In [None]:
# Generate the second ("malformed") dataset from its image, then load its coordinates.
write_data_to_csv('k_means.png','malformed_dataset.csv' )
x_malformed = read_dataset_from_csv('malformed_dataset.csv')

In [None]:
# Show the raw points of the malformed dataset before any clustering.
plot_dataset(x_malformed)

In [None]:
# Fit one KMeans model per initialisation method on the malformed dataset,
# keeping each fitted model together with the method's label.
k_means = []
for init, name in cases:
    kmeans = KMeans(n_clusters=k, init=init(x_malformed))
    # Fit on the same data the centroids were initialised from; fitting on `x`
    # here would cluster the regular dataset instead of the malformed one.
    kmeans.fit(x_malformed)
    k_means.append((kmeans, name))

In [None]:
# Draw one subplot per model in `k_means`, colouring the points of
# `x_malformed` by predicted cluster and marking the cluster centres.
plt.figure(figsize=(16,16))
plt.suptitle("Clusters found using k means with particular init methods")
for i, (kmeans, name) in enumerate(k_means):
    # 2x2 grid: room for the four entries of `cases`.
    plt.subplot(2,2,i+1)
    clusters = kmeans.predict(x_malformed)
    for cluster in range(k):
        my_members =  x_malformed[clusters==cluster]
        my_center = kmeans.cluster_centers_[cluster]
        plt.scatter(my_members[:,0], my_members[:,1], c=colours[cluster])
        # Centre marker: a black dot with a cluster-coloured cross drawn on top.
        plt.scatter(my_center[0], my_center[1],marker='o', c='black')
        plt.scatter(my_center[0], my_center[1], marker='x', c=colours[cluster])
    plt.title(name)
plt.show()
    

In [None]:
# Score curves for the malformed dataset: 25 repetitions per initialisation method.
# Note: this rebinds `result_dict`, replacing the results of the regular dataset.
result_dict = calculate_average_score(x_malformed, cases, 25)

In [None]:
# Compare the initialisation methods on the malformed dataset.
plot_averages(result_dict)

In [None]:
def scores_from_K(X: np.ndarray, k_min: int = 2, k_max: int = 20):
    """Yield the Davies-Bouldin score of k-means for each cluster count.

    Parameters
    ----------
    X : np.ndarray
        Data with one row per point.
    k_min : int, default 2
        Smallest number of clusters to try.
    k_max : int, default 20
        Largest number of clusters to try (inclusive).

    Yields
    ------
    float
        Score for k = k_min, k_min + 1, ..., k_max, in that order. Nothing is
        yielded when `k_min > k_max`.
    """
    for k in range(k_min, k_max + 1):
        labels = KMeans(n_clusters=k).fit(X).labels_
        yield davies_bouldin_score(X, labels)
    

In [None]:
# Davies-Bouldin scores over the range of k for both datasets, materialised as lists.
x_scores = list(scores_from_K(x))
x_malformed_scored = list(scores_from_K(x_malformed))

In [None]:
def plot_scores(scores, name, k_min: int = 2):
    """Scatter Davies-Bouldin scores against the number of clusters.

    Parameters
    ----------
    scores : iterable of float
        One score per consecutive cluster count, starting at `k_min`.
    name : str
        Dataset name inserted into the plot title.
    k_min : int, default 2
        Cluster count that the first score belongs to.
    """
    scores = list(scores)
    # Derive the k axis from the data so it always matches the number of
    # scores instead of assuming exactly the values 2..20.
    k_values = range(k_min, k_min + len(scores))
    plt.scatter(k_values, scores)
    plt.xticks(k_values)
    plt.title(f'Davies-Bouldin score to k in {name} dataset')
    plt.xlabel('number of clusters (k)')
    plt.ylabel('Davies-Bouldin score')
    plt.show()

In [None]:
# Score as a function of k for the regular dataset.
plot_scores(x_scores, 'normal')

In [None]:
# Score as a function of k for the malformed dataset.
plot_scores(x_malformed_scored, 'malformed')