## Ejecutar el modelo

In [None]:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
from keras.models import Model
from keras import backend as K
from keras import metrics
from tensorflow.keras import Model
from keras.losses import mse
from tensorflow.keras.layers import MaxPooling2D, Input, Dense, Lambda, Conv2D, Flatten, Reshape, Conv2DTranspose, BatchNormalization, LeakyReLU
from tensorflow.keras import preprocessing
import time
import io
from tqdm import tqdm

# Create the Tensorboard writer; every run logs to its own "logs/Cars<unix timestamp>" directory
tensorboard= tf.summary.create_file_writer(logdir='logs/{}'.format("Cars{}".format(int(time.time()))))

# Specify the directory where the images to train the network are located
directory="" # Format: "{directory}"

# Specify the height and width (in pixels) the dataset images are resized to.
# NOTE: the decoder reshapes to 4x4 and upsamples five times by a factor of 2,
# so it always produces 128x128 images; changing this value alone will not
# change the size of the generated images.
img_size = 128

# Specify the folder where the trained model will be saved.
# File names are appended directly to this string, so it must end with a path separator.
trained_models_folder ="" # Format: "{directory}\ "

# Specify the folder where the images generated by the model will be saved.
# File names are appended directly to this string, so it must end with a path separator.
generated_images_folder="" # Format: "{directory}\ "

# Specify the batch size
batch_size = 70

# Specify the dimension of the latent space
latent_dim=100

# Specify how many images to show for reconstructions.
# The plotting helpers use a 5x5 grid and the images are sliced from a single
# batch, so this should not exceed 25 nor batch_size.
num_examples_to_generate = 25

# Specify the shape of the input vector (height, width, channels)
input_shape = (128,128,3)

# Load the image dataset, resize, normalize and randomly flip its images
def get_loader(img_size):
    """Build the batched training dataset.

    Images are read from the module-level ``directory`` without labels,
    resized to ``(img_size, img_size)``, shuffled (seed 123) and grouped in
    batches of the module-level ``batch_size``. Each batch is then cast to
    float32, scaled to [0, 1] and randomly flipped left-right.

    Args:
        img_size: Height and width the images are resized to.

    Returns:
        The mapped ``tf.data`` dataset of preprocessed image batches.
    """
    raw_batches = tf.keras.preprocessing.image_dataset_from_directory(
        directory,
        label_mode=None,
        batch_size=batch_size,
        shuffle=True,
        seed=123,
        image_size=(img_size, img_size),
    )

    def _scale_and_flip(batch):
        # Normalize pixel values to [0, 1] before applying the augmentation
        scaled = tf.cast(batch, tf.float32) / 255.0
        return tf.image.random_flip_left_right(scaled)

    return raw_batches.map(_scale_and_flip)
# Build the dataset pipeline once; it is reused to extract the seed images and to train the network
dataset=get_loader(img_size)


# Build the encoder
def get_encoder():
    """Create the convolutional encoder of the VAE.

    Five Conv2D -> BatchNormalization -> LeakyReLU -> MaxPooling2D stages
    (32, 64, 128, 256 and 512 filters) each halve the spatial size. The
    flattened result goes through dense layers that produce the mean and the
    log-variance of the approximate posterior, from which a latent sample is
    drawn with the reparameterization trick.

    Returns:
        A Keras ``Model`` named 'Encoder' whose outputs are
        ``[latent sample, mean, log-variance]``.
    """
    def Samples(args):
        # Reparameterization: z = mean + sigma * epsilon, with epsilon ~ N(0, 1)
        mean_vector, log_var_vector = args
        noise_shape = (K.shape(mean_vector)[0], K.shape(mean_vector)[1])
        epsilon = K.random_normal(shape=noise_shape, mean=0., stddev=1.)
        sigma = K.exp(0.5 * log_var_vector)
        return mean_vector + sigma * epsilon

    entradas = Input(shape=(input_shape[0], input_shape[0], 3))
    features = entradas
    # Each stage keeps the spatial size in the convolution and halves it in the pooling
    for index, filtro in enumerate([32, 64, 128, 256, 512]):
        conv_name = 'Encoder_Conv2D_' + str(index)
        features = Conv2D(filtro, kernel_size=(3, 3), strides=1, padding='same', name=conv_name)(features)
        features = BatchNormalization()(features)
        features = LeakyReLU()(features)
        features = MaxPooling2D()(features)

    # Flatten the feature maps and branch into the mean / log-variance heads
    flat = Flatten()(features)
    shared = Dense(latent_dim * 2, activation='relu')(flat)
    mean_hidden = Dense(latent_dim, activation='relu')(shared)
    var_hidden = Dense(latent_dim, activation='relu')(shared)
    media = Dense(latent_dim, name='Mean')(mean_hidden)
    # The layer is named 'Variance' but its output is used as the log-variance
    log_var = Dense(latent_dim, name='Variance')(var_hidden)

    # Draw the latent sample that will be passed to the decoder
    latent = Lambda(Samples, name='Samples')([media, log_var])
    return Model(entradas, [latent, media, log_var], name='Encoder')

# Build the decoder
def get_decoder():
    """Create the convolutional decoder of the VAE.

    A latent vector of size ``latent_dim`` is expanded by two dense layers,
    reshaped to (4, 4, 512) and upsampled by five stride-2 transposed
    convolutions up to 128x128. A final 1x1 convolution with sigmoid
    activation produces a 3-channel image with values in [0, 1].

    Returns:
        A Keras ``Model`` named 'Decoder' mapping latent vectors to images.
    """
    entradas = Input(shape=(latent_dim,))
    # Expand the latent vector to (batch, latent_dim*2) and then to (batch, 4*4*512)
    hidden = Dense(latent_dim * 2, activation='relu')(entradas)
    hidden = Dense(4 * 4 * 512)(hidden)
    # Arrange the vector as a (4, 4, 512) feature map
    hidden = Reshape((4, 4, 512))(hidden)
    # Spatial sizes after each stage: 8, 16, 32, 64, 128
    for index, filtro in enumerate([512, 256, 128, 64, 32]):
        layer_name = 'Decoder_Conv2DTranspose_' + str(index)
        hidden = Conv2DTranspose(filtro, 3, strides=2, padding='same', name=layer_name)(hidden)
        hidden = BatchNormalization()(hidden)
        hidden = LeakyReLU()(hidden)
    # Project the last feature map to a 3-channel image
    salidas = Conv2D(3, kernel_size=1, strides=1, padding='same', activation='sigmoid')(hidden)
    return Model(entradas, salidas, name='Decoder')

# Build the VAE
def get_vae(codificador, decodificador, optimizador):
    """Chain the encoder and decoder into a compiled VAE model.

    Args:
        codificador: Encoder model returning ``[latent sample, mean, log-variance]``.
        decodificador: Decoder model mapping latent samples to images.
        optimizador: Optimizer the VAE is compiled with.

    Returns:
        The compiled Keras ``Model`` named 'VAE', with the combined loss and
        the 'kl_loss' / 'mse_loss' metrics attached.
    """
    entradas = Input(shape=(input_shape[0],input_shape[0],3))
    # Run the encoder ONCE and unpack its three outputs; calling it separately
    # for each output would repeat the whole forward pass (and the sampling)
    z, media, log_var = codificador(entradas)
    # Reconstruct the images from the latent samples
    salidas = decodificador(z)
    # Build the VAE model
    vae = Model(entradas, salidas, name='VAE')
    # Reconstruction loss: MSE over all the pixels of the batch
    mse_loss = mse(K.flatten(entradas), K.flatten(salidas))
    # KL divergence between the approximate posterior and a standard normal (one value per sample)
    kl_loss = 1 + log_var - K.square(media) - K.exp(log_var)
    kl_loss = K.sum(kl_loss, axis=-1)
    kl_loss *= -0.5
    # Total loss: KL divergence plus the reconstruction loss weighted by 10000 to give it more relevance
    loss = K.mean(mse_loss*10000 + kl_loss)
    # Attach the total loss and both terms (as metrics) to the VAE
    vae.add_loss(loss)
    vae.add_metric(kl_loss, name='kl_loss')
    vae.add_metric(mse_loss, name='mse_loss')
    # Compile the VAE with the given optimizer
    vae.compile(optimizer=optimizador)
    return vae

# Create the variables that will contain the encoder and decoder models
encoder = get_encoder()
decoder = get_decoder()
# Create the VAE optimizer (Adam with learning rate 0.0005)
vae_optimizer=tf.keras.optimizers.Adam(0.0005)
vae = get_vae(encoder,decoder,vae_optimizer)

# Print the summary of the VAE, encoder and decoder
vae.summary()
encoder.summary()
decoder.summary()

# Convert a matplotlib figure to an image tensor that can be stored in Tensorboard
def plot_to_image(figure):
    """Render ``figure`` as a PNG and return it as a 4-D image tensor.

    Args:
        figure: The matplotlib figure to serialize. ``None`` falls back to
            the current pyplot figure.

    Returns:
        A tensor with a leading batch dimension of 1 holding the decoded
        4-channel PNG. The figure is closed after being rendered.
    """
    # Save the figure that was passed in, not whichever one happens to be
    # current: the caller may have created other figures in between
    target = figure if figure is not None else plt.gcf()
    buf = io.BytesIO()
    target.savefig(buf, format='png')
    # Close the figure so it is neither displayed nor kept in memory
    plt.close(target)
    buf.seek(0)
    image = tf.image.decode_png(buf.getvalue(), channels=4)
    # Add the batch dimension expected by tf.summary.image
    image = tf.expand_dims(image, 0)
    return image

# Create a 5x5 grid with the images it receives
def image_grid(images):
    """Plot ``images`` on a 5x5 grid and return the resulting figure.

    Args:
        images: Array or tensor of images indexed along the first axis;
            the grid has room for at most 25 of them.

    Returns:
        The matplotlib figure containing the grid, so that callers can
        serialize or close it.
    """
    figure = plt.figure(figsize=(10,10))
    for i in range(images.shape[0]):
        img = preprocessing.image.array_to_img(images[i])
        axes = figure.add_subplot(5, 5, i + 1)
        # Hide ticks and grid lines: only the image itself is of interest
        axes.set_xticks([])
        axes.set_yticks([])
        axes.grid(False)
        axes.imshow(img)
    # Callers use the return value as the figure to serialize
    return figure

# Generate and save the images produced by the model in a local folder
def generate_and_save_images(folder,model, epoch, seed, dim=(5, 5), figsize=(5, 5)):
    """Run ``model`` on ``seed`` and save its outputs as a single PNG grid.

    Args:
        folder: Destination prefix; the file name is appended directly to it,
            so it must end with a path separator.
        model: Callable applied to ``seed`` to obtain the images.
        epoch: Epoch number used in the file name.
        seed: Batch of inputs fed to ``model``.
        dim: Rows and columns of the grid.
        figsize: Size of the matplotlib figure.

    Returns:
        The images returned by ``model(seed)``.
    """
    generated_images = model(seed)
    figure = plt.figure(figsize=figsize)
    for i in range(generated_images.shape[0]):
        plt.subplot(dim[0], dim[1], i+1)
        # The images are converted as they are: the previous `+ 1 / 2` was
        # evaluated as `+ 0.5` (apparently a leftover of a (x + 1) / 2
        # rescale) and is not needed for outputs that are already in [0, 1]
        img = preprocessing.image.array_to_img(generated_images[i])
        plt.imshow(img)
        plt.xticks([])
        plt.yticks([])
    plt.tight_layout()
    plt.savefig(folder + 'generated_image_epoch_%d.png' % epoch)
    # Close the figure created here so it does not linger as the current figure
    plt.close(figure)
    return generated_images

# Function to calculate the VAE network error
def vae_loss(x, training=True):
    """Compute the VAE loss for a batch of images.

    Args:
        x: Batch of input images.
        training: Passed to the encoder and decoder calls. It defaults to
            True because this function is the forward pass of the training
            step: without it the BatchNormalization layers would run in
            inference mode and never update their moving statistics.

    Returns:
        A tuple ``(total loss, KL divergence per sample, MSE reconstruction loss)``.
    """
    encoded, mean, logvar = encoder(x, training=training)
    reconstruction = decoder(encoded, training=training)
    # Reconstruction loss: MSE over all the pixels of the batch
    mse_loss = mse(K.flatten(x), K.flatten(reconstruction))
    # KL divergence between the approximate posterior and a standard normal
    kl_loss = 1 + logvar - K.square(mean) - K.exp(logvar)
    kl_loss = K.sum(kl_loss, axis=-1)
    kl_loss *= -0.5
    # Same weighting as the loss attached to the VAE model: reconstruction * 10000 + KL
    total_loss = K.mean(mse_loss*10000 + kl_loss)
    return total_loss,kl_loss,mse_loss

# Run one optimization step of the VAE on a batch of images
@tf.function
def train_step(images):
    """Apply a single gradient update using ``images``.

    Args:
        images: Batch of real images from the dataset.

    Returns:
        The tuple ``(loss, kl_loss, mse_loss)`` computed by ``vae_loss``.
    """
    # Record the forward pass so the loss can be differentiated
    with tf.GradientTape() as tape:
        loss, kl_loss, mse_loss = vae_loss(images)
    # Differentiate with respect to the VAE weights and let the optimizer update them
    gradients = tape.gradient(loss, vae.trainable_variables)
    grads_and_vars = zip(gradients, vae.trainable_variables)
    vae_optimizer.apply_gradients(grads_and_vars)
    return loss, kl_loss, mse_loss

# Extract a set of images from the dataset to observe how reconstructions are performed:
# the first num_examples_to_generate images of one batch are kept as a fixed
# input whose reconstructions are plotted during training
for image in dataset.take(1):
    seed = image[0:num_examples_to_generate, :, :, :]

def train(dataset, epochs):
    """Train the VAE and log its progress.

    After every epoch the reconstructions of the module-level ``seed`` are
    saved to ``generated_images_folder`` and, together with the last batch's
    loss values, written to Tensorboard. The weights are saved every 10 epochs.

    Args:
        dataset: Iterable of image batches.
        epochs: Number of passes over ``dataset``.
    """
    # Save the reconstructions once, before the network starts training
    generate_and_save_images(generated_images_folder,vae, 0, seed)
    for epoch in range(1, epochs + 1):
        # `epoch` already starts at 1, so it is reported as is
        print('Current training epoch {} (out of a total of {}).'.format(epoch, epochs))
        # Loop that iterates over the dataset to train the network
        for image_batch in tqdm(dataset):
            loss, kl_loss, mse_loss=train_step(image_batch)
        # Generate and save reconstructions as the network evolves
        generated_images=generate_and_save_images(generated_images_folder,vae, epoch, seed)
        # Serialize each grid right after drawing it, so that the figure being
        # converted is always the one that has just been created
        generated_grid=plot_to_image(image_grid(generated_images))
        real_grid=plot_to_image(image_grid(seed))
        with tensorboard.as_default():
            # Log the loss values of the last batch of the epoch and both image grids
            tf.summary.scalar('Loss VAE', loss, step=epoch)
            tf.summary.scalar('kl_loss VAE', K.mean(kl_loss), step=epoch)
            tf.summary.scalar('mse_loss VAE', K.mean(mse_loss), step=epoch)
            tf.summary.image('Generated images', generated_grid, step=epoch)
            tf.summary.image('Real images', real_grid, step=epoch)
        # If the current epoch is a multiple of 10 then save the VAE
        if epoch % 10 == 0:
            vae.save_weights(trained_models_folder + "VAE_epoch_%d" % epoch)

    # Save the last reconstructions produced by the VAE; `epochs` equals the
    # last loop value and is also defined when the loop did not run
    generate_and_save_images(generated_images_folder,vae, epochs, seed)

# Call the train function to start training for 200 epochs
train(dataset, 200)

## Generar imágenes nuevas

In [None]:
# Function to create new samples (not reconstructions), from a standard distribution with zero mean and standard deviation 1
def vae_generate_images(samples):
    """Decode random latent vectors and plot the resulting images.

    Args:
        samples: Number of images to generate; they are plotted with
            ``image_grid``, which has room for at most 25.
    """
    # The latent vectors must have the width the decoder was built with
    # (latent_dim); any other width is rejected by the decoder input
    latent_vectors = np.random.normal(0, 1, size=(samples, latent_dim))
    generated_images = decoder.predict(latent_vectors)
    image_grid(generated_images)

# Call the function to generate samples indicating the number of samples to take
# (image_grid draws on a 5x5 grid, so keep it at 25 or fewer unless that function is adapted)
vae_generate_images(samples=25)

## Imprimir la aproximación a la distribución normal

In [None]:
from scipy.stats import norm
import numpy as np
import matplotlib.pyplot as plt

# Encode the seed images and keep only the sampled latent vectors, to later see
# how each latent dimension approximates the standard normal distribution
z_aux, _, _ = encoder.predict(seed)
# Points at which the standard normal density is evaluated
x = np.linspace(-3, 3, 200)

fig = plt.figure(figsize=(20, 20))
fig.subplots_adjust(hspace=0.6, wspace=0.4)

# One subplot for each of the first 50 latent dimensions (requires latent_dim >= 50)
for i in range(50):
    ax = fig.add_subplot(5, 10, i+1)
    # Normalized histogram of the i-th latent coordinate over the seed images
    ax.hist(z_aux[:,i], density=True, bins = 20)
    ax.axis('off')
    # Label the subplot with the index of the latent dimension
    ax.text(0.5, -0.35, str(i), fontsize=10, ha='center', transform=ax.transAxes)
    # Overlay the standard normal density for comparison
    ax.plot(x, norm.pdf(x))

plt.show()