In [None]:
import csv
import numpy as np
import matplotlib.pyplot as plt
import scipy.io as spi
import sklearn.preprocessing as skp
import sklearn.metrics as skmt
rng = np.random.default_rng(0)
scaler = skp.StandardScaler()

In [None]:
dataset = "segment"

In [None]:
file = open(dataset + ".csv", "w")
csvwriter = csv.writer(file)
csvwriter.writerow(["k*", "ari", "nnmi"])
file.close()

In [None]:
temp_data = np.loadtxt("keel_datasets/" + dataset + ".dat", dtype = str, delimiter = ",", comments = "@")
data = np.array(temp_data[:, :-1], dtype = float)
label = skp.LabelEncoder().fit_transform(temp_data[:, -1])

In [None]:
n, d = data.shape
p = 0.1
k_max = 2 * np.unique(label).shape[0]
B_ = 25

t = np.arange(n)

In [None]:
def add_noise(X):
    n, d = X.shape
    r = int(p * n)
    noise = rng.uniform(0, 1, (r, d))
    noise = X.min(0) + (X.max(0) - X.min(0)) * noise
    X = np.concatenate((X, noise))
    X = scaler.fit_transform(X)
    return X

In [None]:
def calculate_U(X, V):
    n, d = X.shape
    U = np.zeros(n)
    for i in range(n):
        dist = ((X[i] - V) * (X[i] - V)).sum(1)
        j = dist.argmin()
        U[i] = j
    return U

In [None]:
def calculate_V(X, U, k):
    n, d = X.shape
    V = np.zeros([k, d])
    for j in range(k):
        index = np.where(U == j)[0]
        r = index.shape[0]
        if r == 0:
            V[j] = rng.choice(X)
        else:
            for i in index:
                V[j] = V[j] + X[i]
            V[j] = V[j] / r
    return V

In [None]:
def k_means(X, k):
    n, d = X.shape
    U = np.zeros(n)
    V = rng.choice(X, k)
    
    condition = True
    while(condition):
        temp = np.copy(U)
        U = calculate_U(X, V)
        V = calculate_V(X, U, k)
        condition = not np.array_equal(U, temp)

    return U

In [None]:
def calculate_W(X, k_max):
    W = np.zeros(k_max)

    for k in range(k_max):
        valid = False
        while not valid:
            Z = k_means(X, k + 1)
            valid = True
            for r in range(k + 1):
                I = np.where(Z == r)[0]
                nr = I.shape[0]
                if nr == 0:
                    valid = False
                    break
        for r in range(k + 1):
            I = np.where(Z == r)[0]
            nr = I.shape[0]
            Dr = 0
            for i in range(nr):
                for j in range(i + 1, nr):
                    Dr = Dr + ((X[I[i]] - X[I[j]]) * (X[I[i]] - X[I[j]])).sum()
            W[k] = W[k] + (Dr / (2 * nr))

    return W

In [None]:
def calculate_optimal(X):
    n, d = X.shape
    W = calculate_W(X, k_max)

    W_B = np.zeros([B_, k_max])
    for b in range(B_):
        X_b = rng.uniform(0, 1, (n, d))
        X_b = X.min(0) + (X.max(0) - X.min(0)) * X_b
        W_B[b] = calculate_W(X_b, k_max)

    k_opt = 1

    gap = (np.log(W_B[:, 0])).mean() - np.log(W[0])
    sd = ((np.log(W_B[:, 0]) - (np.log(W_B[:, 0])).mean()) ** 2).mean()
    s = sd * np.sqrt(1 + (1 / B_))
    for k in range(1, k_max):
        prev_gap = gap
        prev_sd = sd
        prev_s = s
        gap = (np.log(W_B[:, k])).mean() - np.log(W[k])
        sd = ((np.log(W_B[:, k]) - (np.log(W_B[:, k])).mean()) ** 2).mean()
        s = sd * np.sqrt(1 + (1 / B_))
        if prev_gap > gap - s:
            k_opt = k
            break

    return k_opt

In [None]:
k_star = []
ari = []
nnmi = []

for m in range(75):
    
    if m % 5 == 0:
        X = add_noise(data)
        X = scaler.fit_transform(X)
        n, d = X.shape
        k_opt = calculate_optimal(X)
        print()
        
    Z = k_means(X, k_opt)
    Z = Z[t]

    k_star.append(np.unique(Z).shape[0])
    ari.append(skmt.adjusted_rand_score(label, Z))
    nnmi.append(skmt.adjusted_mutual_info_score(label, Z))

    print((m + 1), end = " ")

In [None]:
file = open(dataset + ".csv", "a")
csvwriter = csv.writer(file)
for m in range(m):
    csvwriter.writerow([k_star[m], ari[m], nnmi[m]])
file.close()