In [1]:
import tensorflow as tf
from tensorflow.python.keras import Input
from tensorflow.python.keras.models import Model
from tensorflow.python.keras.engine.keras_tensor import KerasTensor
from tensorflow.python.keras.engine.functional import Functional

import numpy as np
from matplotlib import pyplot as plt
from tensorflow.keras.layers import Dense, Lambda
from tensorflow.python.keras.regularizers import l2
import os

from tensorflow_model_optimization.python.core.sparsity.keras import prune, pruning_callbacks, pruning_schedule
from tensorflow_model_optimization.sparsity.keras import strip_pruning, prune_low_magnitude, UpdatePruningStep
tf.compat.v1.disable_eager_execution()

from callbacks import all_callbacks

from tensorflow.keras import backend as K



### Import the MNIST data class and call it.

Original dataset and modified dataset with modifiable resolution is shown below.

In [2]:
import MNIST_dataset as mnist
size_final = 8

data_zoom = mnist.MNISTData(size_final=size_final, color_depth=5)
test = data_zoom.x_test

#plt.imshow(test[0].reshape(size_final,size_final), cmap='gray')
print(data_zoom.x_train.shape)

(60000, 64)


In [3]:
print(data_zoom.x_train.dtype)

float64


# Autoencoder using Keras
The aim of this class is to implement a simple autoencoder for the MNIST data and then build an autoencoder that is
able to classify MNIST data in its latent dimension.

Code partially adapted from [Keras Documentation](https://blog.keras.io/building-autoencoders-in-keras.html).

In [16]:
class VAE:
    """Variational Autencoder Class"""
    def __init__(self, data: mnist.MNISTData, num_classes=10, prune = False):
        self.x_train = data.x_train
        self.x_test = data.x_test
        self.y_train = data.y_train
        self.y_test = data.y_test

        self.num_classes = num_classes
        self.prune = prune
        self.input_shape = self.x_train[0].shape
        self.input = Input(shape=self.input_shape,name='encoder_input')
        self.latent_dim = 2

        self.encoder = None
        self.encoded_mean = None
        self.encoded_var = None
        self.encoder_model = None

        self.decoder = None
        self.VAE = None
        
        self.decoder_model=None

        self.history = None
        self.pruning_params = {"pruning_schedule" : pruning_schedule.ConstantSparsity(0.75, begin_step=10, frequency=100)}

    def build_encoder(self):
        """Building the encoder architecture for the variational autoencoder. 
        The final encoding dimension is 2. 
        """
        self.encoder = Dense(32, activation='relu')(self.input)
        """if self.prune:
            self.encoder = prune_low_magnitude(Dense(16, activation='relu'), **self.pruning_params)(self.encoder)
            self.encoder = Dense(16, activation='relu')(self.encoder)"""

        if self.prune:
            self.encoder = Dense(16, activation='relu')(self.encoder)

        self.encoded_mean = Dense(self.latent_dim)(self.encoder)
        self.encoded_var = Dense(self.latent_dim)(self.encoder)
        self.encoder = Lambda(self.sampling, output_shape=(self.latent_dim,), name = 'encoder_output')([self.encoded_mean, self.encoded_var])

        #building a model for the encoder in order to be able to predict and plot the latent dimension
        self.encoder_model = Model(self.input, [self.encoded_mean, self.encoded_var, self.encoder], name='encoder')
    
    def build_decoder(self):
        """Building the decoder architecture and storing the output in self.decoder."""
        if self.encoder is None:
            raise RuntimeError("The encoder has to be built before you can build the decoder!")        

        self.decoder = Dense(16, activation='relu')(self.encoder)
        self.decoder = Dense(32, activation='relu')(self.decoder)
        self.decoder = Dense(self.input_shape[0], activation='sigmoid', name='decoder_output')(self.decoder)
        
    def build_classifier(self):
        """ Building the classifier architecture, using self.encoder as input."""

        if self.encoder is None:
            raise RuntimeError("The encoder has to be built before you can build the classifier!")
        self.latent_classifier = prune_low_magnitude(Dense(32), **self.pruning_params)(self.encoder)
        self.latent_classifier = Dense(self.num_classes, activation='softmax',name='classifier_output')(self.latent_classifier)
        
    def build_vae(self, use_latent_classifier=False):
        """ Building the whole variational autoencoder Model from self.encoder and self.decoder with 
        self.input as input. It is used self.custom_loss as the model loss function.
        """
        if self.encoder is None:
            raise RuntimeError("The encoder has to be built before you can build the autoencoder!")
        if self.decoder is None:
            raise RuntimeError("The decoder has to be built before you can build the autoencoder!")
        
        self.dir()

        if use_latent_classifier:
            if self.latent_classifier is None:
                raise RuntimeError("If you want to use the option with the latent classifier, you have to build it "
                                   "beforehand!")
            self.VAE = Model(self.input, outputs=[self.decoder, self.latent_classifier])
            self.VAE.compile(loss=self.custom_loss(self.encoded_mean, self.encoded_var), loss_weights=[1, 0.1], optimizer='adam', metrics=["accuracy"])
        else:
            self.VAE = Model(self.input, outputs=self.decoder)
            self.VAE.compile(optimizer='adam', loss=self.custom_loss(self.encoded_mean, self.encoded_var))
        self.VAE.summary()

    def sampling(self, args):
        """ Implement the Reparameterization trick. The function returns
         a vector randomly sampled from the latent space.
         # Arguments
        args (tensor): mean and log of variance of Q(z|X)
         # Returns
            z (tensor): sampled latent vector   
        """
        z_mean, z_log_var = args
        batch = K.shape(z_mean)[0]
        dim = K.int_shape(z_mean)[1]
        # we apply the multigaussian noise to every points at once
        epsilon = K.random_normal(shape=(batch, dim))
        return z_mean + K.exp(0.5 * z_log_var) * epsilon
    

    def custom_loss(self, mean, var):
        """Implement the loss function for the variational autoencoder.
        Sort of recepy for tf to how to compute loss function from two values."""
        
        def loss(y_true, y_pred):

            reconstruction_loss = tf.losses.mean_squared_error(y_true, y_pred)*784
            kl_loss = 1. + var - K.square(mean) - K.exp(var)
            kl_loss = K.sum(kl_loss, axis=-1)
            kl_loss *= -0.5
            beta = 1.0*10**-2
            return K.mean(reconstruction_loss + beta*kl_loss)

        return loss
        
        
    def fit_data(self, batch_size=128, epochs=40, use_latent_classifier=False):
        """Write the fit function for the autoencoder. 
        Store the fit history in self.history to be able to plot the fitting scores."""
        
        """callbacks = all_callbacks(stop_patience = 1000,
                              lr_factor = 0.5,
                              lr_patience = 10,
                              lr_epsilon = 0.000001,
                              lr_cooldown = 2,
                              lr_minimum = 0.0000001,
                              outputDir = 'model/AE_model/model_2')
        callbacks.callbacks.append(pruning_callbacks.UpdatePruningStep())"""

        if use_latent_classifier:
            self.history = self.VAE.fit(self.x_train, [self.x_train, self.y_train],
                                                validation_data=(self.x_test, [self.x_test, self.y_test]),
                                                batch_size=batch_size, epochs=epochs,
                                                shuffle=True,callbacks=[UpdatePruningStep()]
                                                )
        else:
            self.history = self.VAE.fit(x = self.x_train, y = self.x_train,
                                        validation_data=(self.x_test, self.x_test),
                                        batch_size=batch_size, epochs=epochs,
                                        shuffle=True, callbacks=[UpdatePruningStep()] )
            self.VAE=strip_pruning(self.VAE)

        self.history = self.history.history

    def dir(self):
        """Creation of the directories"""
        dir = os.path.join("images")
        if not os.path.exists(dir):
            os.mkdir(dir)
        dir2 = os.path.join("./images/VAE")
        if not os.path.exists(dir2):
            os.mkdir(dir2)
    
    def plot_score(self, use_latent_classifier = False, model_name=None):
        """Plots the scores achieved during the fitting."""
        if use_latent_classifier is False:
            plt.plot(self.history['loss'])
            plt.plot(self.history['val_loss'])
            plt.ylabel('Model Accuracy')
        else:
            plt.plot(self.history['classifier_output_accuracy'])
            plt.plot(self.history['val_classifier_output_accuracy'])
            plt.ylabel('Classifier Accuracy')
            
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Test'], loc='best')
        plt.title('Accuracy of {model_name}'.format(model_name=model_name))
        
        plt.savefig('./images/VAE/Accuracy of {model_name}.png'.format(model_name=model_name))
        
        plt.show()

    def plot_latent_dimension(self, model_name=None):
        """Plots the latent dimension of the autoencoder."""
        prediction = self.encoder_model.predict(self.x_test)
        fig = plt.figure(figsize=(10, 10))
        fig.patch.set_facecolor("white")
        plt.scatter(prediction[2][:, 0], prediction[2][:, 1], c=np.argmax(self.y_test, axis=1) , cmap="Set3")
        plt.colorbar()
        plt.title('Latent space {model_name}'.format(model_name=model_name))
        
        plt.savefig('./images/VAE/Latent dimension of {model_name}.png'.format(model_name=model_name))

        plt.show()  

It is built the standard autoencoder, *without classifier* here, and the MNIST data class called before is used. 
The model is fitted and adequate plots are reproduced, 
as well as the comparison between orginal MNIST dataset and reconstructed images.

In [8]:
network = VAE(data_zoom, prune=False)
network.build_encoder()
network.build_decoder()
network.build_vae(use_latent_classifier=False)
network.fit_data(epochs=3)
#network.plot_latent_dimension("Autoencoder without classifier")
#network.plot_reco()
#network.plot_score(use_latent_classifier=False,model_name="Autoencoder without classifier")


Model: "functional_5"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
encoder_input (InputLayer)      [(None, 64)]         0                                            
__________________________________________________________________________________________________
dense_13 (Dense)                (None, 32)           2080        encoder_input[0][0]              
__________________________________________________________________________________________________
dense_14 (Dense)                (None, 16)           528         dense_13[0][0]                   
__________________________________________________________________________________________________
dense_15 (Dense)                (None, 2)            34          dense_14[0][0]                   
_______________________________________________________________________________________

In [6]:
network = VAE(data_zoom, prune=True)
network.build_encoder()
network.build_decoder()
network.build_vae()
network.fit_data(epochs=3)
#network.plot_latent_dimension("Compressed AE without classifier")
#network.plot_reco()
#network.plot_score(use_latent_classifier=False,model_name="Compressed AE without classifier")


Instructions for updating:
Please use `layer.add_weight` method instead.
Model: "functional_3"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
encoder_input (InputLayer)      [(None, 64)]         0                                            
__________________________________________________________________________________________________
dense_6 (Dense)                 (None, 32)           2080        encoder_input[0][0]              
__________________________________________________________________________________________________
prune_low_magnitude_dense_7 (Pr (None, 16)           1042        dense_6[0][0]                    
__________________________________________________________________________________________________
dense_8 (Dense)                 (None, 16)           272         prune_low_magnitude_dense_7[0][0]
______________

InvalidArgumentError: assertion failed: [Prune() wrapper requires the UpdatePruningStep callback to be provided during training. Please add it as a callback to your model.fit call.] [Condition x >= y did not hold element-wise:] [x (assert_greater_equal/ReadVariableOp:0) = ] [-1] [y (assert_greater_equal/y:0) = ] [1]
	 [[{{node Assert}}]]

In [17]:
network_with_classifier = VAE(data_zoom, prune=False)
network_with_classifier.build_encoder()
network_with_classifier.build_decoder()
network_with_classifier.build_classifier()
network_with_classifier.build_vae(use_latent_classifier=True)
network_with_classifier.fit_data(use_latent_classifier=True, epochs=3)
#network_with_classifier.plot_latent_dimension("VAE with classifier")
#network_with_classifier.plot_score(use_latent_classifier=True,model_name="VAE with classifier")


Model: "functional_15"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
encoder_input (InputLayer)      [(None, 64)]         0                                            
__________________________________________________________________________________________________
dense_43 (Dense)                (None, 32)           2080        encoder_input[0][0]              
__________________________________________________________________________________________________
dense_44 (Dense)                (None, 2)            66          dense_43[0][0]                   
__________________________________________________________________________________________________
dense_45 (Dense)                (None, 2)            66          dense_43[0][0]                   
______________________________________________________________________________________

InvalidArgumentError: assertion failed: [Prune() wrapper requires the UpdatePruningStep callback to be provided during training. Please add it as a callback to your model.fit call.] [Condition x >= y did not hold element-wise:] [x (assert_greater_equal/ReadVariableOp:0) = ] [-1] [y (assert_greater_equal/y:0) = ] [1]
	 [[{{node Assert}}]]