**Initialization**

In [1]:
import numpy as np
from sklearn.manifold import TSNE

%tensorflow_version 2.x
import tensorflow as tf
import tensorflow_probability as tfp
from tensorflow_probability import distributions as tfd
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
from tensorflow_probability import layers as tfpl
import tensorflow_datasets as tfds

import matplotlib.pyplot as plt

**Task 1: Data set**

In [2]:
datasets, datasets_info = tfds.load(name='fashion_mnist',
                                    with_info=True,
                                    as_supervised=False)

def _preprocess(sample):
  image = tf.cast(sample['image'], tf.float32) / 255.  # Scale to unit interval.
  image = image < tf.random.uniform(tf.shape(image))   # Randomly binarize.
  return image, image

training_dataset = (datasets['train']
                 .map(_preprocess)
                 .batch(256)
                 .prefetch(tf.data.experimental.AUTOTUNE)
                 .shuffle(int(10e3)))
test_dataset = (datasets['test']
                .map(_preprocess)
                .batch(256)
                .prefetch(tf.data.experimental.AUTOTUNE))

for i,j in training_dataset:
  print(len([i][0]))
  print(len([i][0][0]))
  print(len([i][0][0][0]))
  print(len([i][0][0][0][0]))
  print([i][0])
  print([i][0][0])
  print([i][0][0][0])
  print([i][0][0][0][0])
  break

256
28
28
1
tf.Tensor(
[[[[ True]
   [ True]
   [ True]
   ...
   [ True]
   [ True]
   [ True]]

  [[ True]
   [ True]
   [ True]
   ...
   [ True]
   [ True]
   [ True]]

  [[ True]
   [ True]
   [ True]
   ...
   [ True]
   [ True]
   [ True]]

  ...

  [[ True]
   [ True]
   [ True]
   ...
   [ True]
   [ True]
   [ True]]

  [[ True]
   [ True]
   [ True]
   ...
   [ True]
   [ True]
   [ True]]

  [[ True]
   [ True]
   [ True]
   ...
   [ True]
   [ True]
   [ True]]]


 [[[ True]
   [ True]
   [ True]
   ...
   [ True]
   [ True]
   [ True]]

  [[ True]
   [ True]
   [ True]
   ...
   [ True]
   [ True]
   [ True]]

  [[ True]
   [ True]
   [ True]
   ...
   [ True]
   [ True]
   [ True]]

  ...

  [[ True]
   [ True]
   [ True]
   ...
   [ True]
   [ True]
   [ True]]

  [[ True]
   [ True]
   [ True]
   ...
   [ True]
   [ True]
   [ True]]

  [[ True]
   [ True]
   [ True]
   ...
   [ True]
   [ True]
   [ True]]]


 [[[ True]
   [ True]
   [ True]
   ...
   [ True]
   [ Tru

**Task 2: Model**

**Task 2.2: Variational Autoencoder**

Class Encoder

In [3]:
# Description: The class Encoder defines the encoder of a variational autoencoder.
class Encoder(tf.keras.layers.Layer): 
  
  def __init__(self):
    super(Encoder, self).__init__()

    self.encoded_size = 10
    self.base_depth = 32

    self.prior = tfd.Independent(tfd.Normal(loc=tf.zeros(self.encoded_size), scale=1), reinterpreted_batch_ndims=1)

    self.input_layer = tfkl.InputLayer(input_shape=(28, 28, 1))
    self.lambda_layer = tfkl.Lambda(lambda x: tf.cast(x, tf.float32) - 0.5)
    self.conv_1 = tfkl.Conv2D(self.base_depth, 5, strides=1, padding='same', activation=tf.nn.leaky_relu)
    self.conv_2 = tfkl.Conv2D(self.base_depth, 5, strides=2, padding='same', activation=tf.nn.leaky_relu)
    self.conv_3 = tfkl.Conv2D(2 * self.base_depth, 5, strides=1, padding='same', activation=tf.nn.leaky_relu)
    self.conv_4 = tfkl.Conv2D(2 * self.base_depth, 5, strides=2, padding='same', activation=tf.nn.leaky_relu)
    self.conv_5 = tfkl.Conv2D(4 * self.encoded_size, 7, strides=1, padding='valid', activation=tf.nn.leaky_relu)
    self.flatten_layer = tfkl.Flatten()
    self.dense_layer = tfkl.Dense(tfpl.MultivariateNormalTriL.params_size(self.encoded_size), activation=None)
    self.output_layer = tfpl.MultivariateNormalTriL(self.encoded_size, activity_regularizer=tfpl.KLDivergenceRegularizer(self.prior))     #probability distribution

  # Description: This function conducts
  #              The python decorator @tf.function is used to bundle multiple computations into one computational graph.
  #              @parameters: (input) x, training necessary??????????????
  #              @returns: (prediction) x
  #@tf.function
  def call(self, x, training = True):
    x = self.input_layer(x)
    x = self.lambda_layer(x)
    x = self.conv_1(x)
    x = self.conv_2(x)
    x = self.conv_3(x)
    x = self.conv_4(x)
    x = self.conv_5(x)
    x = self.flatten_layer(x)
    x = self.dense_layer(x)
    x = self.output_layer(x)
    return x 

Class Decoder

In [4]:
# Description: The class Decoder defines the decoder of a variational autoencoder.
class Decoder(tf.keras.layers.Layer): 
  
  def __init__(self):
    super(Decoder, self).__init__()

    self.encoded_size = 10
    self.base_depth = 32

    self.input_layer = tfkl.InputLayer(input_shape=[self.encoded_size])
    self.reshape_layer = tfkl.Reshape([1, 1, self.encoded_size])
    self.transp_conv_1 = tfkl.Conv2DTranspose(2 * self.base_depth, 7, strides=1, padding='valid', activation=tf.nn.leaky_relu)
    self.transp_conv_2 = tfkl.Conv2DTranspose(2 * self.base_depth, 5, strides=1, padding='same', activation=tf.nn.leaky_relu)
    self.transp_conv_3 = tfkl.Conv2DTranspose(2 * self.base_depth, 5, strides=2, padding='same', activation=tf.nn.leaky_relu)
    self.transp_conv_4 = tfkl.Conv2DTranspose(self.base_depth, 5, strides=1, padding='same', activation=tf.nn.leaky_relu)
    self.transp_conv_5 = tfkl.Conv2DTranspose(self.base_depth, 5, strides=2, padding='same', activation=tf.nn.leaky_relu)
    self.transp_conv_6 = tfkl.Conv2DTranspose(self.base_depth, 5, strides=1, padding='same', activation=tf.nn.leaky_relu)
    self.conv_1 = tfkl.Conv2D(filters=1, kernel_size=5, strides=1, padding='same', activation=None)
    self.flatten_layer = tfkl.Flatten()
    self.output_layer = tfpl.IndependentBernoulli((28, 28, 1), tfd.Bernoulli.logits)

  # Description: This function conducts
  #              The python decorator @tf.function is used to bundle multiple computations into one computational graph.
  #              @parameters: (input) x, training necessary??????????????
  #              @returns: (prediction) x
  #@tf.function
  def call(self, x, training = True):
    x = self.input_layer(x)
    x = self.reshape_layer(x)
    x = self.transp_conv_1(x)
    x = self.transp_conv_2(x)
    x = self.transp_conv_3(x)
    x = self.transp_conv_4(x)
    x = self.transp_conv_5(x)
    x = self.transp_conv_6(x)
    x = self.conv_1(x)
    x = self.flatten_layer(x)
    x = self.output_layer(x)
    return x

Class Variational Autoencoder

In [5]:
# Description: The class VarAutoencoder defines the variational autoencoder consisting of an encoder and a decoder.
class VarAutoencoder(tf.keras.Model): 
  
  def __init__(self):
    super(VarAutoencoder, self).__init__()

    self.encoder = Encoder()
    self.decoder = Decoder()

  # Description: This function conducts 
  #              The python decorator @tf.function is used to bundle multiple computations into one computational graph.
  #              @parameters: (input) x, training necessary??????????????
  #              @returns: (prediction) x
  #@tf.function
  def call(self, x, training = True):
    embeddings = self.encoder(x)
    output = self.decoder(embeddings)

    return output, embeddings  

**ONLINE SOURCE**

In [6]:
encoded_size = 10
base_depth = 32

prior = tfd.Independent(tfd.Normal(loc=tf.zeros(encoded_size), scale=1),
                        reinterpreted_batch_ndims=1)

encoder = tfk.Sequential([
    tfkl.InputLayer(input_shape=(28, 28, 1)),
    tfkl.Lambda(lambda x: tf.cast(x, tf.float32) - 0.5),
    tfkl.Conv2D(base_depth, 5, strides=1, padding='same', activation=tf.nn.leaky_relu),
    tfkl.Conv2D(base_depth, 5, strides=2, padding='same', activation=tf.nn.leaky_relu),
    tfkl.Conv2D(2 * base_depth, 5, strides=1, padding='same', activation=tf.nn.leaky_relu),
    tfkl.Conv2D(2 * base_depth, 5, strides=2, padding='same', activation=tf.nn.leaky_relu),
    tfkl.Conv2D(4 * encoded_size, 7, strides=1, padding='valid', activation=tf.nn.leaky_relu),
    tfkl.Flatten(),
    tfkl.Dense(tfpl.MultivariateNormalTriL.params_size(encoded_size), activation=None),
    tfpl.MultivariateNormalTriL( encoded_size, activity_regularizer=tfpl.KLDivergenceRegularizer(prior)),
])

decoder = tfk.Sequential([
    tfkl.InputLayer(input_shape=[encoded_size]),
    tfkl.Reshape([1, 1, encoded_size]),
    tfkl.Conv2DTranspose(2 * base_depth, 7, strides=1, padding='valid', activation=tf.nn.leaky_relu),
    tfkl.Conv2DTranspose(2 * base_depth, 5, strides=1, padding='same', activation=tf.nn.leaky_relu),
    tfkl.Conv2DTranspose(2 * base_depth, 5, strides=2, padding='same', activation=tf.nn.leaky_relu),
    tfkl.Conv2DTranspose(base_depth, 5, strides=1, padding='same', activation=tf.nn.leaky_relu),
    tfkl.Conv2DTranspose(base_depth, 5, strides=2, padding='same', activation=tf.nn.leaky_relu),
    tfkl.Conv2DTranspose(base_depth, 5, strides=1, padding='same', activation=tf.nn.leaky_relu),
    tfkl.Conv2D(filters=1, kernel_size=5, strides=1, padding='same', activation=None),
    tfkl.Flatten(),
    tfpl.IndependentBernoulli((28, 28, 1), tfd.Bernoulli.logits),
])

vae = tfk.Model(inputs=encoder.inputs, outputs=decoder(encoder.outputs[0]))

negloglik = lambda x, rv_x: -rv_x.log_prob(x)

vae.compile(optimizer=tf.optimizers.Adam(learning_rate=1e-3), loss=negloglik)

_ = vae.fit(training_dataset, epochs=10, validation_data=test_dataset)

Instructions for updating:
Do not pass `graph_parents`.  They will  no longer be used.


Instructions for updating:
Do not pass `graph_parents`.  They will  no longer be used.


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


**Task 3 and 4: Training and Latent Space Analysis**

In [7]:
# Description: This function conducts a forward-step and the backpropagation. Additionally, the average training loss and accuracy is determined.
#              @parameters: model, training_data, loss_fn, optimizer, training 
#              @returns: training_loss, training_accuracy
#@tf.function
def training_step(model, training_data, loss_fn, optimizer, training = True):
  training_losses = []
  #training_accuracies = []

  for (input, target) in training_data:
    with tf.GradientTape() as tape:
      prediction, _ = model(input, training)                                                      # Embedded input is not relevant during training.                                                    
      current_training_loss = loss_fn(input, prediction) #+ tf.math.reduce_sum(model.losses)    # loss is calculated between original image (input) and the predicted image (prediction)
      gradients = tape.gradient(current_training_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    training_losses.append(current_training_loss.numpy())

    #current_training_accuracy = np.argmax(target, axis=1) == np.argmax(prediction, axis=1)
    #training_accuracies.append(np.mean(current_training_accuracy))  
  
  training_loss = np.mean(training_losses)
  #training_accuracy = np.mean(training_accuracies)
  return training_loss #, training_accuracy


# Description: This function determines the average test loss and accuracy of an autoencoder.
#              @parameters: model, test_data, loss_fn, training
#              @returns: test_loss, test_accuracy
#@tf.function
def test(model, test_data, loss_fn, training = False):
  test_losses = []
  #test_accuracies = []
  embeddings = []
  corresponding_embedding_labels = []

  test_image_counter = 0
  example_input_img = tf.zeros([28, 28, 1], tf.int32)
  example_prediction_img = tf.zeros([28, 28, 1], tf.int32)
  for (input, target) in test_data:
    prediction, embedded_input = model(input, training)                                       # Embedded input is relevant during testing.

    current_test_loss = loss_fn(input, prediction)
    test_losses.append(current_test_loss.numpy())

    #current_test_accuracy = np.argmax(target, axis=1) == np.argmax(prediction, axis=1)
    #test_accuracies.append(np.mean(current_test_accuracy))   

    if test_image_counter < 1000:                                           # The first 1000 embedded test images are used for the latent space analysis.
      embeddings.append(embedded_input)
      corresponding_embedding_labels.append(target)
      test_image_counter += batch_size

      if test_image_counter == 1000:                                        # One original image and predicted image pair is printed per test epoch to get an overview over the training progress. 
        example_input_img = input[batch_size - 1]
        example_prediction_img = prediction[batch_size - 1]
    
  test_loss = np.mean(test_losses)
  #test_accuracy = np.mean(test_accuracies)
  return test_loss, example_input_img, example_prediction_img, embeddings, corresponding_embedding_labels   #, test_accuracy

In [8]:
# Description: This part creates objects of ....... and executes the training and testing of these models in the training and test loop. The training 
#              takes place over an amount of epochs (n_epochs) with a predefined learning rate. The loss function defines the kind of loss-calculation. The optimizer 
#              is needed to adjust the gradients in the training steps. Moreover, the data for the visualization of the training and test progress is collected.
#              In order to better monitor the training progress, the loss and accuracy graphs are provided in addtion to the numerical outputs when the test accuracy 
#              has significantly improved.
tf.keras.backend.clear_session()

model = VarAutoencoder()

n_epochs = 10
learning_rate = 0.001
loss_fn = lambda x, rv_x: -rv_x.log_prob(x) #tf.keras.losses.MeanSquaredError()                             # Mean squared error as loss-function.
optimizer = tf.keras.optimizers.Adam((learning_rate), amsgrad = True)    # Optimizer Adam (Adaptive Moment Estimation) with AMSGrad activated.

training_losses = []
#training_accuracies = []
test_losses = []
#test_accuracies = []

# Training and test loop
for epoch in range(n_epochs):
    print('Epoch ' + str(epoch))

    training_loss = training_step(model, training_dataset, loss_fn, optimizer, training = True)   #, training_accuracy 
    training_losses.append(training_loss)
    #training_accuracies.append(training_accuracy)

    test_loss, example_input_img, example_prediction_img, embeddings, corresponding_embedding_labels = test(model, test_dataset, loss_fn, training = False)                              #, test_accuracy
    test_losses.append(test_loss)
    #test_accuracies.append(test_accuracy)

    embeddings = tf.reshape(embeddings, [1000, 10])
    corresponding_embedding_labels = tf.reshape(corresponding_embedding_labels, [1000])
    reduced_embeddings = TSNE(n_components = 2).fit_transform(embeddings)

    plt.imshow(tf.squeeze(example_input_img).numpy())
    plt.show()
    plt.imshow(tf.squeeze(example_prediction_img).numpy())
    plt.show()

    label_color_coding = ["black", "brown", "red", "blue", "lime", "yellow", "darkorange", "cyan", "darkviolet", "deeppink"]
    for embed in range(1000):
      current_label_color = label_color_coding[corresponding_embedding_labels[embed]]
      plt.scatter(reduced_embeddings[embed][0], reduced_embeddings[embed][1], color = current_label_color)
    plt.show()

    # Interpolation check between two images in a seperate function which does the same like a test step, but just for two images and without all the loss shit and so on.. And I need an additional seperation in the call function of the autoencoder (another boolean??) -> after the encoder I need to interpolate the two vectors and then apply the decoder.

    print("Training loss: " + str(training_loss))
    print("Test loss: " + str(test_loss))

Epoch 0


ValueError: ignored

Visualization


In [None]:
# Description: Figure 1 shows the loss for each epoch during the training and testing of the model.
#              Figure 2 shows the accuracy for each epoch during the training and testing of the model.
plt.figure()
line1, = plt.plot(training_losses)
line2, = plt.plot(test_losses)
plt.xlabel("Training steps")
plt.ylabel("Loss")
plt.legend((line1, line2),("Training", "Test"))
plt.show()