# DEEP LEARNING - PRACTICUM 03

- Diego Roca Rodríguez
- Roi Santos Ríos

Variational Autoencoders

In [1]:
import numpy as np
import glob, os

from numpy import iscomplexobj
from scipy.linalg import sqrtm

import tensorflow as tf

from keras.preprocessing.image import ImageDataGenerator
from keras.layers import Input, Conv2D, Flatten, Dense, Conv2DTranspose, Reshape, Lambda, Activation, BatchNormalization, LeakyReLU, Dropout
from keras.models import Model
from keras import backend as K
from tensorflow.keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint
from keras.utils.vis_utils import plot_model
from tensorflow.keras import callbacks

import tensorflow_hub as hub

import matplotlib.pyplot as plt

from tensorflow.python.framework.ops import disable_eager_execution

from utils import display, sample_batch, display_one, display_list

disable_eager_execution()

2023-05-12 12:58:37.807252: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


### Variables

In [2]:
DATA_FOLDER = "./data/img_align_celeba/"

filenames = np.array(glob.glob("./data/img_align_celeba/*.jpg"))
NUM_IMAGES = len(filenames)
print("Total number of images : " + str(NUM_IMAGES))
# prints : Total number of images : 100000

INPUT_DIM = (16,16,1) # Image dimension
BATCH_SIZE = 128 # Batch size
Z_DIM = 128 # Dimension of the latent vector (z)

LEARNING_RATE = 0.0005
N_EPOCHS = 2  # No. of epochs to show advance
N_BLOCKS = 10
LOSS_FACTOR = 10000

Total number of images : 202599


### Dataset

In [3]:
data_flow = ImageDataGenerator(rescale=1./255).flow_from_directory(DATA_FOLDER, 
    target_size = INPUT_DIM[:2],
    batch_size = BATCH_SIZE,
    shuffle = True,
    class_mode = 'input',
    subset = 'training',
    color_mode='grayscale'
    )

Found 0 images belonging to 0 classes.


In [None]:
# Load the pre-trained Inception v3 model
inception_v3 = hub.KerasLayer('https://tfhub.dev/google/imagenet/inception_v3/feature_vector/5', input_shape=(299, 299, 3))

### Encoder

In [4]:
# ENCODER
def build_vae_encoder(input_dim, output_dim, conv_filters, conv_kernel_size, conv_strides):
  
    # Clear tensorflow session to reset layer index numbers to 0 for LeakyRelu, 
    # BatchNormalization and Dropout.
    # Otherwise, the names of above mentioned layers in the model 
    # would be inconsistent
    global K
    K.clear_session()
    
    # Number of Conv layers
    n_layers = len(conv_filters)

    # Define model input
    encoder_input = Input(shape = input_dim, name = 'encoder_input')
    x = encoder_input

    # Add convolutional layers
    for i in range(n_layers):
        x = Conv2D(filters = conv_filters[i], 
            kernel_size = conv_kernel_size[i],
            strides = conv_strides[i], 
            padding = 'same',
            name = 'encoder_conv_' + str(i)
            )(x)

        x = LeakyReLU()(x)
        
    # Required for reshaping latent vector while building Decoder
    shape_before_flattening = K.int_shape(x)[1:] 
    
    x = Flatten()(x)
    mean_mu = Dense(output_dim, name = 'mu')(x)
    log_var = Dense(output_dim, name = 'log_var')(x)

    # Defining a function for sampling
    def sampling(args):
        mean_mu, log_var = args
        epsilon = K.random_normal(shape=K.shape(mean_mu), mean=0., stddev=1.) 
        return mean_mu + K.exp(log_var/2)*epsilon   

    # Using a Keras Lambda Layer to include the sampling function as a layer 
    # in the model
    encoder_output = Lambda(sampling, name='encoder_output')([mean_mu, log_var])


    return encoder_input, encoder_output, mean_mu, log_var, shape_before_flattening, Model(encoder_input, encoder_output)


vae_encoder_input, vae_encoder_output,  mean_mu, log_var, vae_shape_before_flattening, vae_encoder  = build_vae_encoder(
    input_dim = INPUT_DIM,
    output_dim = Z_DIM, 
    conv_filters = [16, 32, 32],
    conv_kernel_size = [3,3,3],
    conv_strides = [2,2,2])

vae_encoder.summary()

plot_model(vae_encoder, show_shapes=True, show_layer_names=True)

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_input (InputLayer)     [(None, 16, 16, 1)]  0           []                               
                                                                                                  
 encoder_conv_0 (Conv2D)        (None, 8, 8, 16)     160         ['encoder_input[0][0]']          
                                                                                                  
 leaky_re_lu (LeakyReLU)        (None, 8, 8, 16)     0           ['encoder_conv_0[0][0]']         
                                                                                                  
 encoder_conv_1 (Conv2D)        (None, 4, 4, 32)     4640        ['leaky_re_lu[0][0]']            
                                                                                              

### Decoder

In [5]:
def build_decoder(input_dim, shape_before_flattening, conv_filters, conv_kernel_size, conv_strides):

    # Number of Conv layers
    n_layers = len(conv_filters)

    # Define model input
    decoder_input = Input(shape = (input_dim,) , name = 'decoder_input')

    # To get an exact mirror image of the encoder
    x = Dense(np.prod(shape_before_flattening))(decoder_input)
    x = Reshape(shape_before_flattening)(x)

    # Add convolutional layers
    for i in range(n_layers):
        x = Conv2DTranspose(
            filters = conv_filters[i], 
            kernel_size = conv_kernel_size[i],
            strides = conv_strides[i], 
            padding = 'same',
            name = 'decoder_conv_' + str(i)
            )(x)
        
        # Adding a sigmoid layer at the end to restrict the outputs 
        # between 0 and 1
        if i < n_layers - 1:
            x = LeakyReLU()(x)
        else:
            x = Activation('sigmoid')(x)

    # Define model output
    decoder_output = x

    return decoder_input, decoder_output, Model(decoder_input, decoder_output)

decoder_input, decoder_output, vae_decoder = build_decoder(input_dim = Z_DIM,
    shape_before_flattening = vae_shape_before_flattening,        
    conv_filters = [32,16,1],
    conv_kernel_size = [3,3,3],
    conv_strides = [2,2,2]
    )

vae_decoder.summary()

Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 decoder_input (InputLayer)  [(None, 128)]             0         
                                                                 
 dense (Dense)               (None, 128)               16512     
                                                                 
 reshape (Reshape)           (None, 2, 2, 32)          0         
                                                                 
 decoder_conv_0 (Conv2DTrans  (None, 4, 4, 32)         9248      
 pose)                                                           
                                                                 
 leaky_re_lu_3 (LeakyReLU)   (None, 4, 4, 32)          0         
                                                                 
 decoder_conv_1 (Conv2DTrans  (None, 8, 8, 16)         4624      
 pose)                                                     

### Encoder + Decoder

In [6]:
vae_input = vae_encoder_input
vae_output = vae_decoder(vae_encoder_output)

# Input to the combined model will be the input to the encoder.
# Output of the combined model will be the output of the decoder.
vae = Model(vae_input, vae_output)

vae.summary()

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_input (InputLayer)     [(None, 16, 16, 1)]  0           []                               
                                                                                                  
 encoder_conv_0 (Conv2D)        (None, 8, 8, 16)     160         ['encoder_input[0][0]']          
                                                                                                  
 leaky_re_lu (LeakyReLU)        (None, 8, 8, 16)     0           ['encoder_conv_0[0][0]']         
                                                                                                  
 encoder_conv_1 (Conv2D)        (None, 4, 4, 32)     4640        ['leaky_re_lu[0][0]']            
                                                                                            

### Reconstructing images

In [8]:
def plot_compare_VAE(images, add_noise=False):
    
    n_to_show = images.shape[0]

    if add_noise:
        encodings = vae_encoder.predict(images)
        encodings += np.random.normal(0.0, 1.0, size = (n_to_show,200))
        reconst_images = vae_decoder.predict(encodings)

    else:
        reconst_images = vae.predict(images)


    fig = plt.figure(figsize=(12, 3))
    fig.subplots_adjust(left=0.05, bottom=0.05, right=0.95, top=0.95, hspace=0.1, wspace=0.1)

    for i in range(n_to_show):
        #img = images[i].squeeze()
        img = images[i]
        sub = fig.add_subplot(2, n_to_show, i+1)
        sub.axis('off')        
        sub.imshow(img,cmap='gray')

    for i in range(n_to_show):
        img = reconst_images[i].squeeze()
        sub = fig.add_subplot(2, n_to_show, i+n_to_show+1)
        sub.axis('off')
        sub.imshow(img,cmap='gray')
    plt.show()    

In [None]:
class CalculateFid(callbacks.Callback):
    def __init__(self, num_img, latent_dim):
        self.num_img = num_img
        self.latent_dim = latent_dim

    # Define a function to calculate FID
    def calculate_fid(self, real_features, generated_features):
        # Calculate the mean and covariance of the real and generated features
        mu_real = np.mean(real_features, axis=0)
        mu_generated = np.mean(generated_features, axis=0)
        cov_real = np.cov(real_features, rowvar=False)
        cov_generated = np.cov(generated_features, rowvar=False)

        # Calculate the squared difference between the means
        diff = mu_real - mu_generated
        diff_squared = np.dot(diff, diff)

        # Calculate the square root of the product of the covariances
        cov_sqrt = sqrtm(cov_real.dot(cov_generated))
        
        if iscomplexobj(cov_sqrt):
            cov_sqrt = cov_sqrt.real

        # Calculate the FID
        fid = diff_squared + np.trace(cov_real + cov_generated - 2 * cov_sqrt)

        return fid

    def preprocess_inception(self, image):
        image = tf.image.resize(image, (299, 299))
        image = tf.keras.applications.inception_v3.preprocess_input(image)
        return image
    
    def on_epoch_end(self, epoch, logs=None):
        
        real_images = np.array([tf.keras.preprocessing.image.img_to_array(
            tf.keras.preprocessing.image.load_img(image_path, target_size=(32, 32)))
                                for image_path in np.random.choice(
                                    glob.glob("./data/img_align_celeba/*.jpg"), self.num_img)])

        # Generate a batch of images using the model
        generated_images = vae.predict(real_images)

        # Preprocess the images
        prepd_gen_images = np.array([self.preprocess_inception(image) for image in generated_images])

        # Extract features from the images
        generated_features = inception_v3(prepd_gen_images)
        generated_features = tf.keras.backend.eval(generated_features)

        print("GENERATED IMAGES, EPOCH: ", epoch)
        display_list(generated_images, cmap="None")

        # Preprocess the images
        prepd_real_images = np.array([self.preprocess_inception(image) for image in real_images])

        # Extract features from the images
        real_features = inception_v3(prepd_real_images)
        real_features = tf.keras.backend.eval(real_features)

        #print(real_features.shape)
        print("REAL IMAGES, EPOCH: ", epoch)
        display_list(real_images, cmap="None")
        
        # Calculate the FID
        fid = self.calculate_fid(real_features, generated_features)
        print("-------------------------")
        print("EPOCH: ", epoch, " FID: ", fid)
        
calculate_fid = CalculateFid(num_img=10, latent_dim=Z_DIM)

### Compile and train

In [9]:
adam_optimizer = Adam(learning_rate = LEARNING_RATE)

def r_loss(y_true, y_pred):
    return K.mean(K.square(y_true - y_pred), axis = [1,2,3])

def kl_loss(y_true, y_pred):
    kl_loss =  -0.5 * K.sum(1 + log_var - K.square(mean_mu) - K.exp(log_var), axis = 1)
    return kl_loss

def total_loss(y_true, y_pred):
    return LOSS_FACTOR*r_loss(y_true, y_pred) + kl_loss(y_true, y_pred)



vae.compile(optimizer=adam_optimizer, loss = total_loss, metrics = [r_loss, kl_loss])


example_batch = next(data_flow)
example_batch = example_batch[0]
example_images = example_batch[:8]

for i in range(N_BLOCKS):
    vae.fit(
        data_flow, 
        shuffle=True, 
        epochs = N_EPOCHS, 
        initial_epoch = 0, 
        steps_per_epoch=NUM_IMAGES / BATCH_SIZE,
        callbacks=[calculate_fid])
    plot_compare_VAE(example_images) 


Epoch 1/2


2023-05-12 12:58:40.659010: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-05-12 12:58:40.659883: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
2023-05-12 12:58:40.667195: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:357] MLIR V1 optimization pass is not enabled
2023-05-12 12:58:40.683516: W tensorflow/c/c_api.cc:291] Operation '{name:'mu/bias/Assign' id:96 op device:{requested: '', assigned: ''} def:{{{node mu/bias/Assign}} = AssignVariableOp[_has_manual_control_dependencies=true, dtype=DT_FLOAT, validate_shape=false](mu/bias, mu/bias/Initializer/zeros)}}' was