In [None]:
import numpy as np
import matplotlib.pyplot as plt
import sklearn.datasets as skd
import sklearn.preprocessing as skp
import sklearn.neighbors as skn
import sklearn.metrics as skmt
rng = np.random.default_rng(0)
scaler = skp.StandardScaler()

In [None]:
alpha = 0.5
beta1 = 0.9
beta2 = 0.999
epsilon = 10 ** (-8)

In [None]:
def add_noise(X, p):
    n, d = X.shape
    r = int(p * n)
    noise = rng.uniform(0, 1, (r, d))
    noise = X.min(0) + (X.max(0) - X.min(0)) * noise
    noise_index = np.zeros(r)
    noise_index.fill(-1)
    
    new_X = np.concatenate((X, noise))
    new_label = np.concatenate((label, noise_index))
    return new_X, new_label

In [None]:
def calculate_U(X, V):
    n, d = X.shape
    U = np.zeros(n)
    for i in range(n):
        dist = ((X[i] - V) * (X[i] - V)).sum(1)
        j = dist.argmin()
        U[i] = j
    return U

In [None]:
def calculate_V(X, U, k):
    n, d = X.shape
    V = np.zeros([k, d])
    for j in range(k):
        index = np.where(U == j)[0]
        r = index.shape[0]
        if r == 0:
            V[j] = rng.choice(X)
        else:
            for i in index:
                V[j] = V[j] + X[i]
            V[j] = V[j] / r
    return V

In [None]:
def k_means(X, k):
    n, d = X.shape
    U = np.zeros(n)
    V = rng.choice(X, k)
    
    condition = True
    while(condition):
        temp = np.copy(U)
        U = calculate_U(X, V)
        V = calculate_V(X, U, k)
        condition = not np.array_equal(U, temp)

    return U

In [None]:
def calculate_W(X, k_max):
    W = np.zeros(k_max)

    for k in range(k_max):
        valid = False
        while not valid:
            Z = k_means(X, k + 1)
            valid = True
            for r in range(k + 1):
                I = np.where(Z == r)[0]
                nr = I.shape[0]
                if nr == 0:
                    valid = False
                    break
        for r in range(k + 1):
            I = np.where(Z == r)[0]
            nr = I.shape[0]
            Dr = 0
            for i in range(nr):
                for j in range(i + 1, nr):
                    Dr = Dr + ((X[I[i]] - X[I[j]]) * (X[I[i]] - X[I[j]])).sum()
            W[k] = W[k] + (Dr / (2 * nr))

    return W

In [None]:
def calculate_optimal(X, k_max):
    n, d = X.shape
    W = calculate_W(X, k_max)

    W_B = np.zeros([B_, k_max])
    for b in range(B_):
        X_b = rng.uniform(0, 1, (n, d))
        X_b = X.min(0) + (X.max(0) - X.min(0)) * X_b
        W_B[b] = calculate_W(X_b, k_max)

    k_opt = 1

    gap = (np.log(W_B[:, 0])).mean() - np.log(W[0])
    sd = ((np.log(W_B[:, 0]) - (np.log(W_B[:, 0])).mean()) ** 2).mean()
    s = sd * np.sqrt(1 + (1 / B_))
    for k in range(1, k_max):
        prev_gap = gap
        prev_sd = sd
        prev_s = s
        gap = (np.log(W_B[:, k])).mean() - np.log(W[k])
        sd = ((np.log(W_B[:, k]) - (np.log(W_B[:, k])).mean()) ** 2).mean()
        s = sd * np.sqrt(1 + (1 / B_))
        if prev_gap > gap - s:
            k_opt = k
            break

    return k_opt

In [None]:
def calculate_cost(X, U, BI, k_opt):
    n, d = X.shape
    C = np.zeros([n, k_opt])
    for i in BI:
        dist = ((U - X[i]) * (U - X[i])).sum(1)
        j = dist.argmin()
        C[i, j] = 1
    cost = 0
    for i in BI:
        for j in range(k_opt):
            cost = cost + C[i, j] * ((X[i] - U[j]) * (X[i] - U[j])).sum()
    return cost

In [None]:
def find_median(X, U, B, k_opt):
    cost = np.zeros(l)
    for i in range(l):
        cost[i] = calculate_cost(X, U, B[i], k_opt)
    lt = np.where(cost >= np.median(cost))[0][0]
    return lt

In [None]:
def calculate_grad(X, U, BI, k_opt):
    n, d = X.shape
    grad = np.zeros([k_opt, d])
    for i in BI:
        dist = ((U - X[i]) * (U - X[i])).sum(1)
        j = dist.argmin()
        grad[j] = grad[j] + 2 * (U[j] - X[i])
    grad = grad / b
    return grad

In [None]:
data, label = skd.make_blobs(n_samples = 500, random_state = 100)
# data, label = skd.make_circles(n_samples = 500, factor = 0.25, noise = 0.05, random_state = 100)
# data, label = skd.make_moons(n_samples = 500, noise = 0.05, random_state = 100)
t = np.arange(500)

for m in range(5):
    
    print("Noise :", 0.05 * m)
    print("Iterations :", end = " ")

    k_star = []
    ari = []
    nnmi = []

    for h in range(15):

        X, label = add_noise(data, 0.05 * m)
        X = scaler.fit_transform(X)
    
        n, d = X.shape
        k_max = 2 * np.unique(label).shape[0]
        B_ = 25
        l = n // 25
        b = n // l
        N = 100
        k_opt = calculate_optimal(X, k_max)
    
        U = rng.choice(X, k_opt)
        M = np.zeros([k_opt, d])
        V = 0

        for i in range(1, N + 1):
            temp = rng.permutation(np.arange(l * b))
            B = np.array(np.split(temp, l))
            lt = find_median(X, U, B, k_opt)
            G = calculate_grad(X, U, B[lt], k_opt)
            M = beta1 * M + (1 - beta1) * G
            V = beta2 * V + (1 - beta2) * (G * G)
            M_hat = M / (1 - beta1 ** i)
            V_hat = V / (1 - beta2 ** i)
            U = U - (alpha * M_hat) / np.sqrt(V_hat + epsilon)

        Z = np.zeros(n)
        for i in range(n):
            dist = ((X[i] - U) * (X[i] - U)).sum(1)
            j = dist.argmin()
            Z[i] = j

        if (m == 2) and (h == 8):

            plt.figure()
            plt.scatter(X[:, 0], X[:, 1], c = label, alpha = 0.5)
            plt.grid()
    
            plt.figure()
            plt.scatter(X[:, 0], X[:, 1], c = Z, alpha = 0.5)
            plt.grid()

        label = label[t]
        Z = Z[t]

        print((h + 1), end = " ")
        k_star.append(np.unique(Z).shape[0])
        ari.append(skmt.adjusted_rand_score(label, Z))
        nnmi.append(skmt.adjusted_mutual_info_score(label, Z))

    print()
    print("Number of clusters :", np.array(k_star).mean())
    print("ARI :", np.array(ari).mean())
    print("NNMI :", np.array(nnmi).mean())
    print()