In [None]:
from google.colab import drive
drive.mount('/gdrive')

In [None]:
%cd /gdrive/My Drive/AN2DL/ExerciseSession5v2

In [None]:
!pip install visualkeras
import visualkeras

In [None]:
import tensorflow as tf
import numpy as np
import os
import random
import pandas as pd
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import confusion_matrix
from PIL import Image
from sklearn.decomposition import PCA

tfk = tf.keras
tfkl = tf.keras.layers
print(tf.__version__)

In [None]:
# Random seed for reproducibility
seed = 42

random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

In [None]:
import warnings
import logging

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)
tf.get_logger().setLevel('INFO')
tf.autograph.set_verbosity(0)

tf.get_logger().setLevel(logging.ERROR)
tf.get_logger().setLevel('ERROR')
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

In [None]:
(X_train, y_train), (X_test, y_test) = tfk.datasets.mnist.load_data()
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, random_state=seed, test_size=len(X_test), stratify=y_train)

In [None]:
# Inspect the data
num_row = 2
num_col = 5
fig, axes = plt.subplots(num_row, num_col, figsize=(10*num_row,2*num_col))
for i in range(num_row*num_col):
    ax = axes[i//num_col, i%num_col]
    ax.imshow(np.squeeze(X_train[i]), cmap='gray')
plt.tight_layout()
plt.show()

In [None]:
X_train = (np.expand_dims(X_train, axis=-1)/255.).astype(np.float32)
print('Data shape', X_train.shape)
print('Data min {:0.2f}\nData max {:0.2f}\nData mean {:0.2f}\nData std {:0.2f}'.format(
    X_train.min(), X_train.max(), X_train.mean(), X_train.std()))

In [None]:
X_val = (np.expand_dims(X_val, axis=-1)/255.).astype(np.float32)
print('Data shape', X_val.shape)
print('Data min {:0.2f}\nData max {:0.2f}\nData mean {:0.2f}\nData std {:0.2f}'.format(
    X_val.min(), X_val.max(), X_val.mean(), X_val.std()))

In [None]:
X_test = (np.expand_dims(X_test, axis=-1)/255.).astype(np.float32)
print('Data shape', X_test.shape)
print('Data min {:0.2f}\nData max {:0.2f}\nData mean {:0.2f}\nData std {:0.2f}'.format(
    X_test.min(), X_test.max(), X_test.mean(), X_test.std()))

In [None]:
input_shape = X_train.shape[1:]
input_shape

In [None]:
latent_dim = 2

### Autoencoder

In [None]:
def get_encoder(enc_input_shape=input_shape, enc_output_shape=latent_dim, seed=seed):
    tf.random.set_seed(seed)
    input_layer = tfkl.Input(shape=enc_input_shape, name='input_layer')
    x = tfkl.ZeroPadding2D((2,2))(input_layer)

    x = tfkl.Conv2D(64, 3, padding='same', strides=2)(x)
    x = tfkl.BatchNormalization()(x)
    x = tfkl.ReLU()(x)

    x = tfkl.Conv2D(128, 3, padding='same', strides=2)(x)
    x = tfkl.BatchNormalization()(x)
    x = tfkl.ReLU()(x)

    x = tfkl.Conv2D(256, 3, padding='same', strides=2)(x)
    x = tfkl.BatchNormalization()(x)
    x = tfkl.ReLU()(x)

    x = tfkl.Flatten()(x)
    output_layer = tfkl.Dense(enc_output_shape, name='output_layer')(x)

    # Connect input and output through the Model class
    model = tfk.Model(inputs=input_layer, outputs=output_layer, name='encoder')

    # Return the discriminator
    return model
encoder = get_encoder(input_shape)
encoder.summary()
display(visualkeras.layered_view(encoder, legend=True, scale_xy=6))
tfk.utils.plot_model(encoder, show_shapes=True, expand_nested=True, to_file='encoder.png')

In [None]:
def get_decoder(dec_input_shape=latent_dim, dec_output_shape=input_shape, seed=seed):
    tf.random.set_seed(seed)
    input_layer = tfkl.Input(shape=dec_input_shape, name='input_layer')
    x = tfkl.Dense(4*4*256)(input_layer)
    x = tfkl.BatchNormalization()(x)
    x = tfkl.ReLU()(x)
    x = tfkl.Reshape((4,4,256))(x)

    x = tfkl.Conv2DTranspose(128, 3, padding='same', strides=2)(x)
    x = tfkl.BatchNormalization()(x)
    x = tfkl.ReLU()(x)

    x = tfkl.Conv2DTranspose(64, 3, padding='same', strides=2)(x)
    x = tfkl.BatchNormalization()(x)
    x = tfkl.ReLU()(x)

    x = tfkl.Conv2DTranspose(32, 3, padding='same', strides=2)(x)
    x = tfkl.BatchNormalization()(x)
    x = tfkl.ReLU()(x)

    x = tfkl.Conv2D(dec_output_shape[-1], 3, padding='same')(x)
    x = tfkl.Activation('sigmoid')(x)
    output_layer = tfkl.Cropping2D((2,2))(x)

    # Connect input and output through the Model class
    model = tfk.Model(inputs=input_layer, outputs=output_layer, name='decoder')

    # Return the discriminator
    return model
decoder = get_decoder()
decoder.summary()
display(visualkeras.layered_view(decoder, legend=True, scale_xy=6))
tfk.utils.plot_model(decoder, show_shapes=True, expand_nested=True, to_file='decoder.png')

In [None]:
def get_autoencoder(ae_input_shape=input_shape, ae_output_shape=input_shape):
    tf.random.set_seed(seed)
    
    encoder = get_encoder()
    decoder = get_decoder()

    input_layer = tfkl.Input(shape=ae_input_shape)
    z = encoder(input_layer)
    output_layer = decoder(z)

    model = tfk.Model(inputs=input_layer, outputs=output_layer, name='autoencoder')
    return model
autoencoder = get_autoencoder()
autoencoder.summary()
tfk.utils.plot_model(autoencoder, show_shapes=True, expand_nested=True, to_file='autoencoder.png')

In [None]:
learning_rate = 1e-3
optimizer = tf.optimizers.Adam(learning_rate)
autoencoder.compile(optimizer=optimizer, loss=tfk.losses.binary_crossentropy, metrics=['mse', 'mae'])

batch_size = 128
epochs = 1000

In [None]:
history = autoencoder.fit(
    X_train,
    X_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(X_val,X_val),
    callbacks=[
        tfk.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
        tfk.callbacks.ReduceLROnPlateau(monitor='val_loss', patience=5, factor=0.5, min_lr=1e-5),
    ]
).history

In [None]:
best_epoch = np.argmin(history['val_loss'])
plt.figure(figsize=(18,3))
plt.plot(history['loss'], label='Training', alpha=.8, color='#ff7f0e', linewidth=3)
plt.plot(history['val_loss'], label='Validation', alpha=.9, color='#5a9aa5', linewidth=3)
plt.axvline(x=best_epoch, label='Best epoch', alpha=.3, ls='--', color='#5a9aa5')
plt.title('Entropy (Loss)')
plt.legend()
plt.grid(alpha=.3)
plt.show()

plt.figure(figsize=(18,3))
plt.plot(history['mse'], label='Training', alpha=.8, color='#ff7f0e', linewidth=3)
plt.plot(history['val_mse'], label='Validation', alpha=.9, color='#5a9aa5', linewidth=3)
plt.axvline(x=best_epoch, label='Best epoch', alpha=.3, ls='--', color='#5a9aa5')
plt.title('Mean Squared Error')
plt.legend()
plt.grid(alpha=.3)
plt.show()

plt.figure(figsize=(18,3))
plt.plot(history['mae'], label='Training', alpha=.8, color='#ff7f0e', linewidth=3)
plt.plot(history['val_mae'], label='Validation', alpha=.9, color='#5a9aa5', linewidth=3)
plt.axvline(x=best_epoch, label='Best epoch', alpha=.3, ls='--', color='#5a9aa5')
plt.title('Mean Absolute Error')
plt.legend()
plt.grid(alpha=.3)
plt.show()

plt.figure(figsize=(18,3))
plt.plot(history['lr'], label='Learning Rate', alpha=.8, color='#ff7f0e', linewidth=3)
plt.axvline(x=best_epoch, label='Best epoch', alpha=.3, ls='--', color='#5a9aa5')
plt.legend()
plt.grid(alpha=.3)
plt.show()

In [None]:
autoencoder.save('autoencoder_latent2')

In [None]:
autoencoder_latent2 = tfk.models.load_model('autoencoder_latent2')
encoder_latent2 = autoencoder_latent2.get_layer('encoder')
decoder_latent2 = autoencoder_latent2.get_layer('decoder')

autoencoder_latent16 = tfk.models.load_model('autoencoder_latent16')
encoder_latent16 = autoencoder_latent16.get_layer('encoder')
decoder_latent2 = autoencoder_latent16.get_layer('decoder')

In [None]:
def get_reconstructions(model, X, imgs=10, verbose=True):
    predictions = model.predict(X, verbose=0)
    fig, axs = plt.subplots(2, imgs, figsize=(imgs*2, 4))
    for i in range(imgs):
        axs[0, i].imshow(np.squeeze(X[i]), cmap=plt.get_cmap('gray'))
        axs.flat[i].axis('off')
        axs[1, i].imshow(np.squeeze(predictions[i]), cmap=plt.get_cmap('gray'))
        axs.flat[i+imgs].axis('off')
    axs[0,imgs//2].set_title('Real data')
    axs[1,imgs//2].set_title('Reconstructions')
    plt.show()
    if verbose:
        entropy_score = np.mean(tfk.losses.binary_crossentropy(X, predictions))
        mse_score = np.mean(tfk.losses.mean_squared_error(X, predictions))
        mae_score = np.mean(tfk.losses.mean_absolute_error(X, predictions))
        print('Entropy:',entropy_score)
        print('MSE:',mse_score)
        print('MAE:',mae_score)

get_reconstructions(autoencoder_latent16, X_test)
get_reconstructions(autoencoder_latent2, X_test)

#### Project the training set into the latent space

In [None]:
def plot_labels_clusters(encoder, data, labels):
    # display a 2D plot of the digit classes in the latent space
    z_mean = encoder.predict(data, verbose=0)
    if z_mean.shape[-1] != 2:
        pca = PCA(n_components=2)
        z_mean = pca.fit_transform(z_mean)
    plt.figure(figsize=(12, 10))
    plt.scatter(z_mean[:, 0], z_mean[:, 1], c=labels)
    plt.colorbar()
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.show()

In [None]:
plot_labels_clusters(encoder_latent16, X_train, y_train)
plot_labels_clusters(encoder_latent16, X_val, y_val)
plot_labels_clusters(encoder_latent16, X_test, y_test)

In [None]:
plot_labels_clusters(encoder_latent2, X_train, y_train)
plot_labels_clusters(encoder_latent2, X_val, y_val)
plot_labels_clusters(encoder_latent2, X_test, y_test)

#### Interpolate the latent space and reconstruct

In [None]:
def plot_latent_space(decoder, x_lim, y_lim, n_per_dim=20, digit_size=28, figsize=15):
    # display a n*n 2D manifold of digits
    figure = np.zeros((digit_size * n_per_dim, digit_size * n_per_dim))
    # linearly spaced coordinates corresponding to the 2D plot
    # of digit classes in the latent space
    grid_x = np.linspace(x_lim[0], x_lim[1], n_per_dim)
    grid_y = np.linspace(y_lim[0], y_lim[1], n_per_dim)[::-1]

    for i, yi in enumerate(grid_y):
        for j, xi in enumerate(grid_x):
            # z_sample = np.array([[xi, yi]])
            latent = decoder.layers[1].input_shape[-1]
            z_sample = np.reshape(np.linspace(xi, yi, latent), (1, latent))
            x_decoded = decoder.predict(z_sample, verbose=0)
            digit = x_decoded[0].reshape(digit_size, digit_size)
            figure[
                i * digit_size : (i + 1) * digit_size,
                j * digit_size : (j + 1) * digit_size,
            ] = digit

    plt.figure(figsize=(figsize, figsize))
    start_range = digit_size // 2
    end_range = n_per_dim * digit_size + start_range
    pixel_range = np.arange(start_range, end_range, digit_size)
    sample_range_x = np.round(grid_x, 1)
    sample_range_y = np.round(grid_y, 1)
    plt.xticks(pixel_range, sample_range_x)
    plt.yticks(pixel_range, sample_range_y)
    plt.xlabel(r'$z_0$', fontsize=24)
    plt.ylabel(r'$z_1$', fontsize=24)
    plt.xticks(fontsize=18)
    plt.yticks(fontsize=18)
    plt.title('Latent space - Uniform samples', fontsize=28)
    plt.imshow(figure, cmap="Greys_r")
    plt.show()

In [None]:
plot_latent_space(decoder_latent16, x_lim=(-20,-10), y_lim=(-20,-10))

In [None]:
plot_latent_space(decoder_latent2, x_lim=(0,10), y_lim=(-5,5))