In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input, Layer, InputSpec
from tensorflow.keras.optimizers import Adadelta, SGD
from tensorflow.keras.datasets import mnist
from tensorflow.keras import callbacks
from tensorflow.keras.initializers import VarianceScaling
import tensorflow.keras.backend as K

import time

import numpy as np

import os

import csv

from sklearn.metrics import normalized_mutual_info_score, adjusted_rand_score

from sklearn.cluster import KMeans

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

from sklearn.manifold import TSNE
from sklearn.decomposition import PCA

import matplotlib.pyplot as plt

# Support Functions

In [None]:
import numpy as np
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from tensorflow import keras

# Function to plot PCA and t-SNE embeddings side-by-side
def plotfun(pca_embedding, tsne_embedding, y_data, labels, title, fname=None):
    if fname is not None:
        # If saving plot do not display it so that execution is not blocked
        import matplotlib
        matplotlib.use('Agg')
    from matplotlib import pyplot as plt
    from mpl_toolkits.mplot3d import Axes3D

    fig = plt.figure()
    ax = fig.add_subplot(121, projection='3d')
    for i in labels:
        ax.scatter(pca_embedding[y_data == i, 0], pca_embedding[y_data == i, 1], pca_embedding[y_data == i, 2], label=i)
    ax.legend()
    ax.set_title(title + ' PCA Embedding')
    ax.set_xlabel('PCA 1')
    ax.set_ylabel('PCA 2')
    ax.set_zlabel('PCA 3')
    
    ax = fig.add_subplot(122, projection='3d')
    for i in labels:
        ax.scatter(tsne_embedding[y_data == i, 0], tsne_embedding[y_data == i, 1], tsne_embedding[y_data == i, 2], label=i)
    ax.legend()
    ax.set_title(title + ' t-SNE Embedding')
    ax.set_xlabel('t-SNE 1')
    ax.set_ylabel('t-SNE 2')
    ax.set_zlabel('t-SNE 3')

    if fname is None:
        plt.show()
    else:
        plt.savefig(fname)
        plt.close()


# Function to get and prepare the MNIST dataset
def get_MNIST_data(number_of_test_samples):
    ############
    # Prepare the data
    ############

    # Load the MNIST dataset
    mnist_dataset = keras.datasets.mnist.load_data()
    (trainset, testset) = (mnist_dataset[0], mnist_dataset[1])
    (X_train, y_train) = trainset
    (X_test, y_test) = testset

    # Preprocess data (convert to float and scale to between 0 and 1)
    X_train = X_train.astype('float32') / 255
    X_test = X_test.astype('float32') / 255

    # Flatten data (turn images into vectors)
    X_train_fl = X_train.reshape(X_train.shape[0], -1)
    X_test_fl = X_test.reshape(X_test.shape[0], -1)

    # Take a random subset of the data
    rndperm = np.random.permutation(X_test.shape[0])
    if number_of_test_samples > 0:
        X_test = X_test[rndperm[:number_of_test_samples],]
        X_test_fl = X_test_fl[rndperm[:number_of_test_samples],]
        y_test = y_test[rndperm[:number_of_test_samples],]

    target_ids = np.unique(y_train)

    return X_train, X_train_fl, y_train, X_test, X_test_fl, y_test, target_ids


# Function to embed data using t-SNE and PCA
def get_tsne_pca(data_fl, n_components=2, tsne_perplexity=30):
    # PCA
    pca = PCA(n_components=n_components)
    pca_embedding = pca.fit_transform(data_fl)
    print('Explained variation per principal component: {}'.format(pca.explained_variance_ratio_))

    # t-SNE
    tsne = TSNE(n_components=n_components, perplexity=tsne_perplexity)
    tsne_embedding = tsne.fit_transform(data_fl)
    print('t-SNE embedding KL Divergence: {}'.format(tsne.kl_divergence_))

    return pca_embedding, tsne_embedding

# Metrics

In [None]:
calcnmi = normalized_mutual_info_score
calcari = adjusted_rand_score


def calcacc(y_true, y_pred):
    """
    Calculate clustering accuracy. Require scikit-learn installed

    # Arguments
        y: true labels, numpy.array with shape `(n_samples,)`
        y_pred: predicted labels, numpy.array with shape `(n_samples,)`

    # Return
        accuracy, in [0,1]
    """
    y_true = y_true.astype(np.int64)
    assert y_pred.size == y_true.size
    D = max(y_pred.max(), y_true.max()) + 1
    w = np.zeros((D, D), dtype=np.int64)
    for i in range(y_pred.size):
        w[y_pred[i], y_true[i]] += 1
    from scipy.optimize import linear_sum_assignment
    row_ind, col_ind = linear_sum_assignment(w.max() - w)
    return sum([w[i, j] for i, j in zip(row_ind, col_ind)]) * 1.0 / y_pred.size

# DEC Framework

In [None]:
# Assuming calcacc, calcnmi, calcari functions are defined elsewhere
# These are utility functions to compute accuracy, NMI, and ARI

class ClusteringLayer(Layer):
    """
    Clustering layer converts input sample (feature) to soft label, i.e. a vector that represents the probability of the
    sample belonging to each cluster. The probability is calculated with student's t-distribution.

    # Example
    ```python
        model.add(ClusteringLayer(n_clusters=10))
    ```
    # Arguments
        n_clusters: number of clusters.
        weights: list of Numpy array with shape `(n_clusters, n_features)` which represents the initial cluster centers.
        alpha: parameter in Student's t-distribution. Default to 1.0.
    # Input shape
        2D tensor with shape: `(n_samples, n_features)`.
    # Output shape
        2D tensor with shape: `(n_samples, n_clusters)`.
    """

    def __init__(self, n_clusters, weights=None, alpha=1.0, **kwargs):
        if 'input_shape' not in kwargs and 'input_dim' in kwargs:
            kwargs['input_shape'] = (kwargs.pop('input_dim'),)
        super(ClusteringLayer, self).__init__(**kwargs)
        self.n_clusters = n_clusters
        self.alpha = alpha
        self.initial_weights = weights
        self.input_spec = InputSpec(ndim=2)

    def build(self, input_shape):
        assert len(input_shape) == 2
        input_dim = input_shape[1]
        self.input_spec = InputSpec(dtype=K.floatx(), shape=(None, input_dim))
        self.clusters = self.add_weight(shape=(self.n_clusters, input_dim), initializer='glorot_uniform', name='clusters')
        if self.initial_weights is not None:
            self.set_weights(self.initial_weights)
            del self.initial_weights
        self.built = True

    def call(self, inputs, **kwargs):
        """ student t-distribution, as same as used in t-SNE algorithm.
                 q_ij = 1/(1+dist(x_i, u_j)^2), then normalize it.
        Arguments:
            inputs: the variable containing data, shape=(n_samples, n_features)
        Return:
            q: student's t-distribution, or soft labels for each sample. shape=(n_samples, n_clusters)
        """
        q = 1.0 / (1.0 + (K.sum(K.square(K.expand_dims(inputs, axis=1) - self.clusters), axis=2) / self.alpha))
        q **= (self.alpha + 1.0) / 2.0
        q = K.transpose(K.transpose(q) / K.sum(q, axis=1))
        return q

    def compute_output_shape(self, input_shape):
        assert input_shape and len(input_shape) == 2
        return input_shape[0], self.n_clusters

    def get_config(self):
        config = {'n_clusters': self.n_clusters}
        base_config = super(ClusteringLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))


class DEC(object):
    def __init__(self, autoencoder, n_clusters=10, alpha=1.0):
        super(DEC, self).__init__()

        self.n_clusters = n_clusters
        self.alpha = alpha
        self.autoencoder, self.encoder = autoencoder

        # prepare DEC model
        clustering_layer = ClusteringLayer(self.n_clusters, name='clustering')(self.encoder.output)
        self.model = Model(inputs=self.encoder.input, outputs=clustering_layer)

    def pretrain(self, x, y=None, optimizer='adam', epochs=200, batch_size=256, save_dir='results/temp'):
        print('...Pretraining...')
        self.autoencoder.compile(optimizer=optimizer, loss='mse')

        csv_logger = callbacks.CSVLogger(save_dir + '/pretrain_log.csv')
        cb = [csv_logger]
        if y is not None:
            class PrintACC(callbacks.Callback):
                def __init__(self, x, y):
                    self.x = x
                    self.y = y
                    super(PrintACC, self).__init__()

                def on_epoch_end(self, epoch, logs=None):
                    if int(epochs/10) != 0 and epoch % int(epochs/10) != 0:
                        return
                    feature_model = Model(self.model.input,
                                          self.model.get_layer('encoded').output)
                    features = feature_model.predict(self.x)
                    km = KMeans(n_clusters=len(np.unique(self.y)), n_init=20)
                    y_pred = km.fit_predict(features)
                    print(' '*8 + '|==>  acc: %.4f,  nmi: %.4f  <==|'
                          % (calcacc(self.y, y_pred), calcnmi(self.y, y_pred)))

            cb.append(PrintACC(x, y))

        # begin pretraining
        t0 = time.time()
        self.autoencoder.fit(x, x, batch_size=batch_size, epochs=epochs, callbacks=cb)
        print('Pretraining time: %ds' % round(time.time() - t0))
        self.autoencoder.save_weights(save_dir + '/ae_weights.h5')
        print('Pretrained weights are saved to %s/ae_weights.h5' % save_dir)
        self.pretrained = True

    def load_weights(self, weights):  # load weights of DEC model
        self.model.load_weights(weights)

    def extract_features(self, x):
        """Extract features from the encoder part of the autoencoder."""
        feature_model = Model(self.model.input, self.encoder.output)
        return feature_model.predict(x)

    def predict(self, x):  # predict cluster labels using the output of clustering layer
        q = self.model.predict(x, verbose=0)
        return q.argmax(1)

    @staticmethod
    def target_distribution(q):
        weight = q ** 2 / q.sum(0)
        return (weight.T / weight.sum(1)).T

    def compile(self, optimizer='sgd', loss='kld'):
        self.model.compile(optimizer=optimizer, loss=loss)

    def fit(self, x, y=None, maxiter=2e4, batch_size=256, tol=1e-3,
            update_interval=140, test_data=None, save_dir='./DEC_results/'):

        print('Update interval', update_interval)

        # Step 1: initialize cluster centers using k-means
        t1 = time.time()
        print('Initializing cluster centers with k-means.')
        kmeans = KMeans(n_clusters=self.n_clusters, n_init=20)  # <-- Completed
        y_pred = kmeans.fit_predict(self.extract_features(x))  # <-- Completed
        y_pred_last = np.copy(y_pred)
        self.model.get_layer(name='clustering').set_weights([kmeans.cluster_centers_])

        # Step 2: deep clustering
        # logging file
        logfile = open(save_dir + '/dec_log.csv', 'w')
        logwriter = csv.DictWriter(logfile, fieldnames=['iter', 'acc', 'nmi', 'ari', 'loss'])
        logwriter.writeheader()

        loss = 0
        index = 0
        index_array = np.arange(x.shape[0])
        for ite in range(int(maxiter)):
            if ite % update_interval == 0:
                q = self.model.predict(x, verbose=0)
                p = self.target_distribution(q)  # update the auxiliary target distribution p

                # evaluate the clustering performance
                y_pred = q.argmax(1)
                if y is not None:
                    acc = np.round(calcacc(y, y_pred), 5)
                    nmi = np.round(calcnmi(y, y_pred), 5)
                    ari = np.round(calcari(y, y_pred), 5)
                    loss = np.round(loss, 5)
                    logdict = dict(iter=ite, acc=acc, nmi=nmi, ari=ari, loss=loss)
                    logwriter.writerow(logdict)
                    print('Iter %d: acc = %.5f, nmi = %.5f, ari = %.5f' % (ite, acc, nmi, ari), ' ; loss=', loss)

                # Reconstruction
                decoded_images = self.autoencoder.predict(x)  # <-- Completed
                mse = np.mean(np.square(test_data[0] - decoded_images))
                print('Iter %d: test reconstruction mse = %.5f' % (ite, mse))

                n_images = 5
                import matplotlib
                matplotlib.use('Agg')
                from matplotlib import pyplot as plt
                for i in range(n_images):
                    fig = plt.figure()
                    ax = fig.add_subplot(121)
                    ax.imshow(test_data[0][i].reshape(28, 28), cmap='gray')
                    ax.set_title('Test Image ' + str(i + 1))
                    ax = fig.add_subplot(122)
                    ax.imshow(decoded_images[i].reshape(28, 28), cmap='gray')
                    ax.set_title('Reconstructed Image ' + str(i + 1))
                    plt.savefig(f"{save_dir}/reconstructed_image_{i+1}_iter_{ite}.png")
                    plt.close(fig)

            # mini-batch k-means
            idx = index_array[index: min(index + batch_size, x.shape[0])]
            q = self.model.predict(x[idx], verbose=0)
            p = self.target_distribution(q)  # update the auxiliary target distribution p
            loss = self.model.train_on_batch(x[idx], p)

            index += batch_size
            if index >= x.shape[0]:
                index = 0

        logfile.close()
        print('End of training.')

# DEC Autoencoder

In [None]:
from keras.layers import Input, Dense
from keras.models import Model

def create_autoencoder(input_size, hidden_units, act='relu'):
    """
    Fully connected auto-encoder model, symmetric.
    Arguments:
        input_size: number of units in the input layer
        hidden_units: list of number of units in each layer of encoder. hidden_units[-1] is size (number of units) of the encoded representation.
            The decoder is symmetric with encoder.
        act: activation, only applied to hidden layers
    return:
        (ae_model, encoder_model), Model of autoencoder and model of encoder
    """
    output_size = input_size

    # Input layer
    input_layer = Input(shape=(input_size,))

    # Encoder
    x = input_layer
    for units in hidden_units:
        x = Dense(units, activation=act)(x)
    encoded = Dense(hidden_units[-1], activation='linear', name='encoded')(x)  # Encoded representation

    # Decoder (symmetric to encoder)
    x = encoded
    for units in reversed(hidden_units[:-1]):  # Skip the last layer since it's the encoded representation
        x = Dense(units, activation=act)(x)
    decoded = Dense(output_size, activation='sigmoid')(x)  # Output layer

    return Model(inputs=input_layer, outputs=decoded, name='AE'), Model(inputs=input_layer, outputs=encoded, name='encoder')


# DEC Execution

In [None]:
if __name__ == "__main__":

    number_of_test_samples = 500  # Limiting to 500 test samples
    tsne_perplexity = 25

    X_train, X_train_fl, y_train, X_test, X_test_fl, y_test, target_ids = get_MNIST_data(number_of_test_samples)

    #########
    # Deep Clustering
    #########

    n_clusters = len(target_ids)
    input_size = 28 * 28
    hidden_units = [128, 64, 32]
    epochs = 20
    batch_size = 256

    save_dir = './DEC_results/'
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    autoencoder = create_autoencoder(input_size, hidden_units)
    optimiser = Adadelta(learning_rate=10.0)
    dec = DEC(autoencoder, n_clusters=n_clusters)

    # Pretrain the autoencoder
    dec.pretrain(x=X_train_fl, y=y_train,
                 optimizer=optimiser,
                 epochs=epochs,
                 batch_size=batch_size,
                 save_dir=save_dir)

    dec.model.summary()
    dec.compile(optimizer=SGD(0.01, 0.9), loss='kld')

    update_interval = 100

    # Ensure that the test_data contains the same number of samples
    y_pred = dec.fit(X_train_fl, y=y_train, 
                     maxiter=3e3, 
                     update_interval=update_interval, 
                     save_dir=save_dir, 
                     test_data=(X_test_fl[:number_of_test_samples], y_test[:number_of_test_samples], target_ids))
