In [None]:
!pip install emnist

In [None]:
import numpy as np
from keras.layers import *
from keras.models import Model, load_model
from keras import backend as K
from keras import losses
from keras.optimizers import SGD,Adam
import imageio,os
from PIL import Image
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from emnist import extract_training_samples, extract_test_samples
from keras.regularizers import l1, l2

In [None]:
import tensorflow as tf

# Report how many GPU devices TensorFlow can see.
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))

In [None]:
# Hyperparameters and experiment configuration.
batch_size = 100        # samples per gradient step
latent_dim = 20         # size of the latent vector z
epochs = 100            # number of single-epoch fit() calls in the training cell
img_dim = 28            # images are img_dim x img_dim with one channel
filters = 16            # base number of convolution filters
intermediate_dim = 256  # hidden units of the latent-space classifier
# NOTE(review): learning_rate is not referenced by any cell visible in this
# notebook (the model is compiled with optimizer='adam') -- confirm intent.
learning_rate = 0.005
# Dataset to train on; the loading cell accepts 'mnist' or 'fashion'.
dataset = 'fashion'

In [None]:
# Pick the Keras dataset module for the configured dataset. The module is
# bound to the name `mnist` in both cases, as in the original cell.
if dataset == 'mnist':
    from keras.datasets import mnist
elif dataset == 'fashion':
    from keras.datasets import fashion_mnist as mnist
else:
    # Fail here with a clear message instead of a NameError on x_train later.
    raise ValueError(f"Unknown dataset {dataset!r}; expected 'mnist' or 'fashion'")

# Both supported datasets use ten classes.
num_classes = 10
(x_train, y_train), (x_test, y_test) = mnist.load_data()

In [None]:
def _scale_and_add_channel(images):
    """Cast to float32, divide by 255 and reshape to (-1, img_dim, img_dim, 1)."""
    scaled = images.astype('float32') / 255.
    return scaled.reshape((-1, img_dim, img_dim, 1))


x_train = _scale_and_add_channel(x_train)
x_test = _scale_and_add_channel(x_test)

In [None]:
# encoder
x = Input(shape=(img_dim, img_dim, 1))

# Four 3x3 'same'-padded convolutions, each followed by LeakyReLU(0.2).
# Layers are listed as (number of filters, stride).
h = x
for n_filters, stride in ((filters, 2), (filters, 1), (filters * 2, 2), (filters * 2, 1)):
    h = Conv2D(filters=n_filters, kernel_size=3, strides=stride, padding='same')(h)
    h = LeakyReLU(0.2)(h)

# Keep the feature-map shape (batch axis dropped) so the decoder can
# reshape its dense projection back to it.
h_shape = K.int_shape(h)[1:]
h = Flatten()(h)

# Two linear heads on the flattened features.
z_mean = Dense(latent_dim)(h)
z_log_var = Dense(latent_dim)(h)

# The standalone encoder model outputs z_mean only.
encoder = Model(x, z_mean)

In [None]:
# decoder
z = Input(shape=(latent_dim,))

# Project the latent vector to the size of the encoder's last feature map
# and restore that shape.
h = Dense(np.prod(h_shape))(z)
h = Reshape(h_shape)(h)

# Four 3x3 'same'-padded transposed convolutions, each followed by
# LeakyReLU(0.2). Layers are listed as (number of filters, stride).
for n_filters, stride in ((filters * 2, 1), (filters * 2, 2), (filters, 1), (filters, 2)):
    h = Conv2DTranspose(filters=n_filters, kernel_size=3, strides=stride, padding='same')(h)
    h = LeakyReLU(0.2)(h)

# Single-channel output squashed to (0, 1) by the sigmoid.
x_recon = Conv2DTranspose(filters=1, kernel_size=3, activation='sigmoid', padding='same')(h)

decoder = Model(z, x_recon)
generator = decoder  # alias used by the sampling / visualisation cells

In [None]:
# classifier (the variable keeps the notebook's spelling `classfier`,
# which other cells reference)
z = Input(shape=(latent_dim,))
hidden = Dense(intermediate_dim, activation='relu')(z)
y = Dense(num_classes, activation='softmax')(hidden)

# Maps a latent vector to a probability distribution over num_classes.
classfier = Model(inputs=z, outputs=y)

In [None]:
# Reparameterization
def sampling(args):
    """Return mean + exp(log_var / 2) * eps, with eps drawn by K.random_normal.

    args: a pair (z_mean, z_log_var) of tensors. The noise is sampled with
    shape (batch, latent_dim), where batch is the first dimension of z_mean
    and latent_dim is the notebook-level setting.
    """
    mu, log_var = args
    batch = K.shape(mu)[0]
    noise = K.random_normal(shape=(batch, latent_dim))
    sigma = K.exp(log_var / 2)
    return mu + sigma * noise


z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])
# Feed the sampled latent code to both the decoder and the classifier.
x_recon = decoder(z)
y = classfier(z)

In [None]:
class Gaussian(Layer):
    """Layer holding one trainable mean vector per class.

    For an input z of shape (batch, latent_dim) it returns z minus every
    class mean, i.e. a tensor of shape (batch, num_classes, latent_dim).

    Args:
        num_classes: number of mean vectors (rows of the `mean` weight).
        **kwargs: forwarded to the base `Layer` constructor.
    """

    def __init__(self, num_classes, **kwargs):
        # Initialise the base layer before setting attributes on it.
        super(Gaussian, self).__init__(**kwargs)
        self.num_classes = num_classes

    def build(self, input_shape):
        """Create the (num_classes, latent_dim) weight of means, initialised to zeros."""
        latent_dim = input_shape[-1]
        self.mean = self.add_weight(name='mean',
                                    shape=(self.num_classes, latent_dim),
                                    initializer='zeros')

    def call(self, inputs):
        """Return inputs[:, None, :] - mean[None, :, :] via broadcasting."""
        z = K.expand_dims(inputs, 1)  # (batch, 1, latent_dim)
        return z - K.expand_dims(self.mean, 0)

    def compute_output_shape(self, input_shape):
        return (None, self.num_classes, input_shape[-1])

    def get_config(self):
        """Include `num_classes` in the config so the layer can be re-created from it."""
        config = super(Gaussian, self).get_config()
        config['num_classes'] = self.num_classes
        return config


gaussian = Gaussian(num_classes)
z_prior_mean = gaussian(z)

In [None]:
# End-to-end model: input image -> [reconstruction, z minus class means,
# class probabilities].
vae = Model(inputs=x, outputs=[x_recon, z_prior_mean, y])

In [None]:
# loss functions
# Insert an axis at position 1 into the encoder statistics so they broadcast
# against z_prior_mean, which has shape (batch, num_classes, latent_dim).
# Note: this rebinds the notebook-level z_mean / z_log_var.
z_mean, z_log_var = K.expand_dims(z_mean, 1), K.expand_dims(z_log_var, 1)
lamb = 1  # weight of the reconstruction term

# Reconstruction term: half the squared error, averaged over the batch axis
# (the variable is called xent_loss but the expression is a squared error).
squared_error = (x - x_recon)**2
xent_loss = 0.5 * K.mean(squared_error, axis=0)

# Per-class term, then weighted by the class probabilities y via batch_dot
# and averaged over the batch axis.
per_class_kl = - 0.5 * (z_log_var - K.square(z_prior_mean))
class_weights = K.expand_dims(y, 1)
kl_loss = K.mean(K.batch_dot(class_weights, per_class_kl), axis=0)

# Batch mean of y * log(y); K.epsilon() guards against log(0).
cat_loss = K.mean(y * K.log(y + K.epsilon()), axis=0)

vae_loss = lamb * K.sum(xent_loss) + K.sum(kl_loss) + K.sum(cat_loss)
vae.add_loss(vae_loss)
# NOTE(review): the `learning_rate` setting from the configuration cell is not
# used here; optimizer='adam' runs with the optimizer's defaults -- confirm.
vae.compile(optimizer='adam')
vae.summary()

In [None]:
# training
# Path used by ModelCheckpoint and by the final vae.save(). The notebook used
# `file_name` without defining it, so this cell failed with NameError on a
# fresh kernel. Change the path here if the model should be stored elsewhere.
file_name = 'vae_cluster_' + dataset
os.makedirs('compare', exist_ok=True)

early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, verbose=1, min_lr=1e-6)

# An integer save_freq is counted in batches, so "every 10 epochs" is
# expressed as 10 * batches-per-epoch (replaces the deprecated `period=10`).
steps_per_epoch = (len(x_train) + batch_size - 1) // batch_size
checkpoint_callback = ModelCheckpoint(
    filepath=file_name,
    save_freq=10 * steps_per_epoch,
)

# NOTE(review): these callbacks are built but not passed to fit() below (the
# original had `callbacks=callbacks` commented out). Because fit() is invoked
# once per epoch, patience-based callbacks would presumably restart their
# counters on every call -- confirm before enabling them.
callbacks = [early_stopping, reduce_lr]

# Test images shown in the original-vs-reconstruction figure.
preview_start, preview_count = 15, 5
preview_images = x_test[preview_start:preview_start + preview_count]

for i in range(epochs):
    vae.fit(x_train,
            shuffle=True,
            epochs=1,
            batch_size=batch_size,
            validation_data=(x_test, None))

    if i % 10 == 0:
        # Encode and decode only the preview images rather than the whole test set.
        recon = generator.predict(encoder.predict(preview_images))

        # Top row: originals; bottom row: reconstructions.
        fig = np.zeros((img_dim * 2, img_dim * preview_count))
        for col in range(preview_count):
            cols = slice(col * img_dim, (col + 1) * img_dim)
            fig[:img_dim, cols] = preview_images[col].reshape((img_dim, img_dim))
            fig[img_dim:, cols] = recon[col].reshape((img_dim, img_dim))

        # Convert once to uint8 and use the same pixels for the saved file and
        # the displayed image; the file is written once per preview.
        fig_uint8 = (fig * 255).astype(np.uint8)
        fit_path = './compare/epoch' + str(i) + '_compare.png'
        imageio.imwrite(fit_path, fig_uint8)

        fig_pil = Image.fromarray(fig_uint8)
        display(fig_pil)

vae.save(file_name)

In [None]:
# Learned class means of the Gaussian layer as a NumPy array.
means = K.eval(gaussian.mean)


def _encode_and_assign(images):
    """Return (encoder output, argmax class of the classifier on that output)."""
    encoded = encoder.predict(images)
    assigned = classfier.predict(encoded).argmax(axis=1)
    return encoded, assigned


x_train_encoded, y_train_pred = _encode_and_assign(x_train)
x_test_encoded, y_test_pred = _encode_and_assign(x_test)

In [None]:
def cluster_sample(path, category=0):
    """Save an 8x8 grid of training images whose predicted class is `category`.

    Images are drawn at random (with replacement) from `x_train` entries where
    `y_train_pred == category`. If no training image was assigned to the
    category, a message is printed and a blank (all-zero) grid is written
    instead of raising from `np.random.choice` on an empty array.

    Args:
        path: output image path passed to `imageio.imwrite`.
        category: predicted class index to sample from.
    """
    n = 8
    figure = np.zeros((img_dim * n, img_dim * n))
    idxs = np.where(y_train_pred == category)[0]
    if idxs.size == 0:
        print(f'cluster_sample: no training samples assigned to category {category}; writing a blank grid')
    else:
        for i in range(n):
            for j in range(n):
                digit = x_train[np.random.choice(idxs)]
                digit = digit.reshape((img_dim, img_dim))
                figure[i * img_dim: (i + 1) * img_dim,
                       j * img_dim: (j + 1) * img_dim] = digit
    # Write explicit uint8 so the saved pixels do not depend on imageio's
    # implicit float-to-uint8 conversion.
    imageio.imwrite(path, np.rint(figure * 255).astype(np.uint8))

In [None]:
def random_sample(path, category=0, std=1):
    """Save an 8x8 grid of images decoded from latent codes around a class mean.

    Latent codes are `means[category] + std * noise` with standard-normal
    noise. All 64 codes are decoded in a single `generator.predict` call
    instead of one call per grid cell.

    Args:
        path: output image path passed to `imageio.imwrite`.
        category: row of `means` used as the centre of the samples.
        std: scale applied to the standard-normal noise.
    """
    n = 8
    noise = np.random.randn(n * n, latent_dim)
    z_samples = noise * std + means[category]
    decoded = generator.predict(z_samples)

    figure = np.zeros((img_dim * n, img_dim * n))
    for k, image in enumerate(decoded):
        i, j = divmod(k, n)  # row-major position of sample k in the grid
        figure[i * img_dim: (i + 1) * img_dim,
               j * img_dim: (j + 1) * img_dim] = image.reshape((img_dim, img_dim))
    # Write explicit uint8 so the saved pixels do not depend on imageio's
    # implicit float-to-uint8 conversion.
    imageio.imwrite(path, np.rint(figure * 255).astype(np.uint8))

In [None]:
# Write one grid of real cluster members and one grid of generated samples
# for every class, instead of a hard-coded count of 10.
os.makedirs('samples', exist_ok=True)
for category in range(num_classes):
    cluster_sample('samples/cluster_sample_%s.png' % category, category)
    random_sample('samples/random_sample_%s.png' % category, category)

In [None]:
def cluster_accuracy(y_true, y_pred, n_clusters):
    """Fraction of samples whose true label is the majority label of their cluster.

    For each cluster id in range(n_clusters), the count of the most frequent
    true label among the samples assigned to it is added up; the total is
    divided by the number of samples. Clusters with no assigned samples
    contribute nothing (np.bincount(...).max() would raise on them).

    Args:
        y_true: integer array of true labels.
        y_pred: integer array of predicted cluster ids, same length as y_true.
        n_clusters: number of cluster ids to consider.
    """
    right = 0.
    for cluster in range(n_clusters):
        members = y_true[y_pred == cluster]
        if members.size:
            right += np.bincount(members).max()
    return right / len(y_true)


print (f'train acc: {cluster_accuracy(y_train, y_train_pred, num_classes)}')
print (f'test acc: {cluster_accuracy(y_test, y_test_pred, num_classes)}')

In [None]:
from sklearn.metrics import adjusted_rand_score, mutual_info_score
from sklearn.metrics import fowlkes_mallows_score

# (true labels, predicted cluster ids) for each split, training split first.
label_pairs = ((y_train, y_train_pred), (y_test, y_test_pred))

# Adjusted Rand index
ari_train, ari_test = [adjusted_rand_score(true, pred) for true, pred in label_pairs]
# Mutual information
mi_train, mi_test = [mutual_info_score(true, pred) for true, pred in label_pairs]
# Fowlkes-Mallows index
score_train, score_test = [fowlkes_mallows_score(true, pred) for true, pred in label_pairs]

print("ARI for Training Set:", ari_train)
print("ARI for Test Set:", ari_test)
print("Mutual Information for Training Set:", mi_train)
print("Mutual Information for Test Set:", mi_test)
print("Fowlkes-Mallows Index for Training Set:", score_train)
print("Fowlkes-Mallows Index for Test Set:", score_test)

In [None]:
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

# Project the encoded test set to two dimensions with t-SNE.
tsne = TSNE(n_components=2, random_state=0)
x_test_2d = tsne.fit_transform(x_test_encoded)

# One colour per predicted class; zip stops at the shorter of the two
# sequences, so at most len(colors) classes are drawn.
colors = "red", "green", "blue", "cyan", "yellow", "magenta", "black", "orange", "purple", "pink"

plt.figure(figsize=(6, 5))
for cluster, color in zip(range(num_classes), colors):
    in_cluster = y_test_pred == cluster
    plt.scatter(x_test_2d[in_cluster, 0], x_test_2d[in_cluster, 1], c=color, label=cluster)
plt.legend()
plt.show()