# AUTOENCODERS

The objective of these scripts is to develop different autoencoder alternatives trained on spectrograms. More specifically, the spectrograms come from audio recordings of different sea-related scenarios, such as animals, ships, sea noise, etc. Each spectrogram is a 2-dimensional matrix, which can be thought of as a black-and-white image.

In [None]:
## AutoEncoder v1
import os
import numpy as np
import pickle
import tensorflow as tf
#print(tf.__version__)==> VERSION DE TENSORFLOW  1.10
import matplotlib

from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv2D, ReLU, BatchNormalization
from tensorflow.keras.layers import Flatten, Dense, Reshape, Conv2DTranspose
from tensorflow.keras.layers import Activation, Lambda, LSTM
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import mean_squared_error # MeanSquaredError
from tensorflow.keras import metrics
from matplotlib import pyplot as plt
import pickle   #Lo tuve que agregar nuevamente porque me decía que no estaba definido cuando guarda los datos
import skimage   #la version que se instla en conda es la 0.14.0 ==> hay que ver nombre de la funcion para metricas
from skimage import measure 
from sklearn.model_selection import train_test_split

The data is stored in files with extension npy (numpy). The file returns a 3-dimensional tensor (FxTxN) , where N is the number of files, F represents the rows and T the columns. 

There are two more files: frequency and time, which are used to visualize the data.

The data values are expressed in dB since the logarithmic scale allows a better visualization.

The data was previously normalized in both frequency and time. 

The re-sample frequency is 44100 Hz and the time of each spectrogram is 1 sec. 

The spectrograms are obtained from audio normalized between -1 and 1. 

The number of samples of the FFT is 256. The returned values are between -150 dB and 50 dB.


### LOADING OF THE DATA 


In [None]:
# Pick ONE dataset by leaving exactly one np.load line uncommented.
# The other lines are kept as a menu of the available recordings.
#Pxx=np.load('.../PECESlog.npy')            #windows
#Pxx=np.load('.../ballena_jorobada.npy')    #windows
#Pxx=np.load('.../blue_whale.npy')          #windows
#Pxx=np.load('.../MYSTICETlog.npy')         #windows
#Pxx=np.load('.../MYSTICETlog.npy')         #"linux"
Pxx = np.load('.../Bowhead_whale.npy')      #windows

# Report the shape of the loaded array as a sanity check.
print(Pxx.shape)

#### Length of Data

The dimensions of the tensors are forced to even values, in order to make them more appropriate for the 2D convolutions performed during the training process.


In [None]:
# Read the row/column counts straight from the array shape. Probing with a
# fixed index (Pxx[1, :, 1]) fails with IndexError as soon as any axis has
# length 1, e.g. when a single spectrogram file is loaded.
len_row = Pxx.shape[0]
len_col = Pxx.shape[1]

# Drop the last column / row when the count is odd, so both image
# dimensions are even before the strided convolutions are applied.
if len_col % 2 != 0:
    Pxx = Pxx[:, :-1, :]
if len_row % 2 != 0:
    Pxx = Pxx[:-1, :, :]

print(Pxx.shape)

#### Normalization between 0 and 1

This normalization is needed because the decoder defined below ends in a sigmoid activation, whose outputs lie between 0 and 1, so the training targets must lie in the same range.

In [None]:
class MinMaxNormalizationSpectograms:
    """Min-max scaler fitted on a reference set of spectrograms.

    The global minimum and maximum of the data given to the constructor are
    stored and later used to map values linearly so that the fitted minimum
    becomes 0 and the fitted maximum becomes 1.
    """

    def __init__(self, input_data):
        """Store the reference data and its global minimum and maximum.

        Args:
            input_data: array-like of spectrogram values used to fit the
                scaling range.
        """
        self.np_data = np.array(input_data)
        self.min = np.min(self.np_data)
        self.max = np.max(self.np_data)

    def spectograms_to_image(self, input_data):
        """Rescale ``input_data`` with the fitted minimum and maximum.

        Args:
            input_data: array-like to normalize. ``None`` falls back to the
                data given to the constructor, which is what the previous
                implementation always used.

        Returns:
            The normalized array, also kept in ``self.norm_array``. When the
            fitted data is constant (max == min) an all-zero array is
            returned instead of dividing by zero.
        """
        # The argument used to be ignored; honouring it lets the same fitted
        # range be applied to other data (e.g. a separate test set).
        data = self.np_data if input_data is None else np.asarray(input_data)
        span = self.max - self.min
        if span == 0:
            self.norm_array = np.zeros(data.shape, dtype=float)
        else:
            self.norm_array = (data - self.min) / span
        return self.norm_array

# Fit the scaler on the whole dataset, then map every value into [0, 1].
P_normalizer = MinMaxNormalizationSpectograms(Pxx)
p_train_norm = P_normalizer.spectograms_to_image(Pxx)

# The normalized array keeps the shape of the input.
print(p_train_norm.shape)

#### Data Splitting

Since our dataset has no temporal order, it is convenient to perform the splitting randomly to ensure maximum variability of the data in our train/test sets. This can be done with a specific library for this purpose: scikit-learn: train_test_split().

In [None]:
import numpy as np

# Axis sizes of the normalized (rows, cols, files) array.
a, b, c = p_train_norm.shape  # a: rows, b: cols, c: files

# Keras expects (samples, rows, cols, channels). The file axis has to be
# MOVED to the front with a transpose: np.reshape only reinterprets the flat
# buffer, which would mix values from different files into every "sample".
# A trailing axis of length 1 is then added as the single channel.
Pxx = np.transpose(p_train_norm, (2, 0, 1))[..., np.newaxis]
print(np.shape(Pxx))



In [None]:
# Random 90 % / 10 % partition of the samples (first axis of Pxx).
x_train, x_test = train_test_split(Pxx, test_size=0.10)
print(x_test.shape)
print(x_train.shape)

# Shape of the training tensor; indices 1 and 2 give the size of one image.
input_shape = x_train.shape
print(input_shape)

## CNN AUTOENCODER

In [None]:
class AutoEncoder:
    """Convolutional autoencoder for single-channel spectrogram images.

    Encoder: a stack of Conv2D -> ReLU -> BatchNormalization stages, then
    Flatten and a Dense layer that outputs the latent vector.
    Decoder: a Dense layer that expands a latent vector, a Reshape back to the
    encoder's last feature-map shape, Conv2DTranspose -> ReLU ->
    BatchNormalization stages, and a final Conv2DTranspose with one filter
    followed by a sigmoid.

    The three Keras models (``Encoder``, ``Decoder`` and the chained
    ``Model``) are built in the constructor.
    """

    def __init__(self,
                 input_shape,
                 conv_filters,
                 conv_kernels,
                 conv_strides,
                 latent_space_dim):
        """Store the architecture parameters and build the models.

        Args:
            input_shape: shape of one input sample, (rows, cols, channels).
            conv_filters: number of filters of each convolutional stage,
                e.g. ``[2, 4, 8]``; its length sets the number of stages.
            conv_kernels: kernel size of each stage (square kernels),
                e.g. ``[3, 5, 3]``.
            conv_strides: stride of each stage, e.g. ``[1, 2, 2]``.
            latent_space_dim: size of the bottleneck vector between encoder
                and decoder.
        """
        self.input_shape = input_shape
        self.conv_filters = conv_filters
        self.conv_kernels = conv_kernels
        self.conv_strides = conv_strides
        self.latent_space_dim = latent_space_dim
        self.Encoder = None
        self.Decoder = None
        self.Model = None
        self._model_input = None
        self._num_conv_layers = len(conv_filters)
        # Filled in by _add_bottleneck; the decoder needs it to undo Flatten.
        self._shape_before_bottleneck = None

        self._build()

    def _build(self):
        """Build encoder, decoder and the chained autoencoder, in that order."""
        self._build_encoder()
        self._build_decoder()
        self._build_autoencoder()

    # ------------------------------------------------------------------
    #                   ENCODER
    # ------------------------------------------------------------------
    def _build_encoder(self):
        """Create ``self.Encoder`` and remember its input tensor."""
        encoder_input = self._add_encoder_input()
        conv_layers = self._add_conv_layers(encoder_input)
        bottleneck = self._add_bottleneck(conv_layers)
        self._model_input = encoder_input
        self.Encoder = Model(encoder_input, bottleneck, name='Encoder_v1')

    def _add_encoder_input(self):
        """Return the Keras input placeholder for one spectrogram."""
        return Input(shape=self.input_shape, name="Encoder_Input_layer")

    def _add_conv_layers(self, temp_model):
        """Append every convolutional stage to ``temp_model`` and return the result."""
        for layer in range(self._num_conv_layers):
            temp_model = self._add_conv_layer(layer, temp_model)
        return temp_model

    def _add_conv_layer(self, layer, temp_model):
        """Append stage ``layer``: Conv2D -> ReLU -> BatchNormalization.

        Batch normalization is used here in the place where a pooling layer
        would usually go. It re-normalizes the inputs of the following layer,
        which reduces the drift of their distribution while earlier weights
        change during training; this tends to speed up training, adds a mild
        regularization effect and makes the network less sensitive to the
        initialization scheme and the learning rate.

        ``padding="same"`` lets Keras choose the padding so that the output
        size depends only on the stride.
        """
        conv_layer = Conv2D(
            filters=self.conv_filters[layer],
            kernel_size=self.conv_kernels[layer],
            strides=self.conv_strides[layer],
            padding="same",
            name="encoder_conv_layer_" + str(layer),
            # The convolution is created frozen: its weights are not updated
            # by fit(). NOTE(review): presumably intended for the
            # transfer-learning runs (load weights, retrain the rest) - confirm.
            trainable=False,
        )
        temp_model = conv_layer(temp_model)
        temp_model = ReLU(name="encoder_ReLU_layer_" + str(layer))(temp_model)
        temp_model = BatchNormalization(name="encoder_BN_layer_" + str(layer))(temp_model)
        return temp_model

    def _add_bottleneck(self, temp_model):
        """Flatten the feature maps and project them to the latent space.

        The feature-map shape (without the batch axis) is stored first so the
        decoder can reshape its dense output back to it.
        """
        self._shape_before_bottleneck = K.int_shape(temp_model)[1:]
        temp_model = Flatten()(temp_model)
        # NOTE(review): the layer name says "Decoder" although this is the
        # encoder output; it is kept unchanged so existing saved models and
        # summaries stay comparable.
        temp_model = Dense(self.latent_space_dim, name="Decoder_output")(temp_model)
        return temp_model

    # ------------------------------------------------------------------
    #                   DECODER
    # ------------------------------------------------------------------
    def _build_decoder(self):
        """Create ``self.Decoder``, which maps a latent vector to an image."""
        decoder_input = self._add_decoder_input()
        dense_layer = self._add_dense_layer(decoder_input)
        reshape_layer = self._add_reshape_layer(dense_layer)
        conv_transpose_layers = self._add_conv_transpose_layers(reshape_layer)
        decoder_output = self._add_decoder_output(conv_transpose_layers)
        self.Decoder = Model(decoder_input, decoder_output, name="Decoder_v1")

    def _add_decoder_input(self):
        """Return the Keras input placeholder for one latent vector."""
        return Input(shape=(self.latent_space_dim,), name="Decoder_input_layer")

    def _add_dense_layer(self, temp_model):
        """Expand the latent vector to the size of the flattened feature maps."""
        # int(): np.prod returns a NumPy integer, while Dense expects a plain int.
        num_units = int(np.prod(self._shape_before_bottleneck))
        return Dense(num_units, name="Decoder_dense_layer")(temp_model)

    def _add_reshape_layer(self, temp_model):
        """Reshape the dense output back to the encoder's last feature-map shape."""
        return Reshape(self._shape_before_bottleneck)(temp_model)

    def _add_conv_transpose_layers(self, temp_model):
        """Append the transposed-convolution stages in reverse encoder order.

        Stage 0 is left out on purpose: it is handled by
        ``_add_decoder_output``, which uses kernel/stride 0 with one filter.
        """
        for layer in reversed(range(1, self._num_conv_layers)):
            temp_model = self._add_conv_transpose_layer(layer, temp_model)
        return temp_model

    def _add_conv_transpose_layer(self, layer, temp_model):
        """Append one Conv2DTranspose -> ReLU -> BatchNormalization stage."""
        conv_transpose_layer = Conv2DTranspose(
            filters=self.conv_filters[layer],
            kernel_size=self.conv_kernels[layer],
            strides=self.conv_strides[layer],
            padding="same",
            name="decoder_conv_transpose_layer_" + str(layer),
            # Created frozen, like the encoder convolutions.
            # NOTE(review): presumably for transfer learning - confirm.
            trainable=False,
        )
        temp_model = conv_transpose_layer(temp_model)
        temp_model = ReLU(name="decoder_ReLU_transpose_layer_" + str(layer))(temp_model)
        temp_model = BatchNormalization(name="decoder_BN_transpose_layer_" + str(layer))(temp_model)
        return temp_model

    def _add_decoder_output(self, temp_model):
        """Append the final single-filter Conv2DTranspose and the sigmoid."""
        conv_transpose_layer = Conv2DTranspose(
            filters=1,
            kernel_size=self.conv_kernels[0],
            strides=self.conv_strides[0],
            padding="same",
            name="decoder_conv_transpose_layer_" + str(self._num_conv_layers),
        )
        temp_model = conv_transpose_layer(temp_model)
        return Activation("sigmoid", name="sigmoid_layer")(temp_model)

    # ------------------------------------------------------------------
    #                   AUTOENCODER
    # ------------------------------------------------------------------
    def _build_autoencoder(self):
        """Chain encoder and decoder into ``self.Model``."""
        model_input = self._model_input
        model_output = self.Decoder(self.Encoder(model_input))
        self.Model = Model(model_input, model_output, name="AutoEncoder_CNN")

    # ------------------------------------------------------------------
    #            COMPILATION AND TRAINING
    # ------------------------------------------------------------------
    def compile(self, learning_rate=0.0001):
        """Compile ``self.Model`` with the Adam optimizer and an MSE loss.

        Compiling fixes the loss used to score a set of weights and the
        optimizer used to search for better ones. Adam is used as a
        replacement for plain stochastic gradient descent.

        Args:
            learning_rate: step size handed to Adam.
        """
        # Passed positionally: the keyword was called `lr` in old Keras
        # releases and `learning_rate` in newer ones, but it has always been
        # the first parameter.
        optimizer = Adam(learning_rate)
        self.Model.compile(optimizer=optimizer, loss=mean_squared_error)

    def train(self, x_train, batch_size, num_epochs, porcentaje_validacion):
        """Fit the autoencoder to reproduce ``x_train`` from itself.

        Args:
            x_train: training samples; they are used both as inputs and as
                targets.
            batch_size: number of samples evaluated before each weight update.
            num_epochs: number of passes over the training data.
            porcentaje_validacion: fraction of ``x_train`` held out for
                validation (Keras ``validation_split``). To validate on
                separate data instead, ``fit`` would need
                ``validation_data=(x_val, x_val)`` in place of this fraction.

        Returns:
            The object returned by ``Model.fit`` (training history).
        """
        return self.Model.fit(
            x_train,
            x_train,
            batch_size=batch_size,
            epochs=num_epochs,
            validation_split=porcentaje_validacion,
            shuffle=True,
        )

    # ------------------------------------------------------------------
    #                   EVALUATION
    # ------------------------------------------------------------------
    def evaluation(self, x_test):
        """Return what ``Model.evaluate`` reports when reconstructing ``x_test``."""
        return self.Model.evaluate(x=x_test, y=x_test, verbose=1)

    # ------------------------------------------------------------------
    #                   STRUCTURE SUMMARY
    # ------------------------------------------------------------------
    def summary(self):
        """Print the Keras summaries of encoder, decoder and full model."""
        self.Encoder.summary()
        self.Decoder.summary()
        self.Model.summary()

    # ------------------------------------------------------------------
    #                  STORE THE MODEL
    # ------------------------------------------------------------------
    def save(self, save_folder="."):
        """Write the constructor parameters and the weights into ``save_folder``."""
        self._create_folder_if_it_doesnt_exist(save_folder)
        self._save_parameters(save_folder)
        self._save_weights(save_folder)

    def _create_folder_if_it_doesnt_exist(self, folder):
        """Create ``folder`` (and missing parents) unless it already exists."""
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(folder, exist_ok=True)

    def _save_parameters(self, save_folder):
        """Pickle the five constructor arguments to ``parameters.pkl``.

        They are stored as a list in constructor order so that ``load`` can
        pass them back positionally.
        """
        parameters = [
            self.input_shape,
            self.conv_filters,
            self.conv_kernels,
            self.conv_strides,
            self.latent_space_dim,
        ]
        save_path = os.path.join(save_folder, "parameters.pkl")
        with open(save_path, "wb") as f:
            pickle.dump(parameters, f)

    def _save_weights(self, save_folder):
        """Save the weights of the full model under ``weights.h5py``."""
        # NOTE(review): newer Keras releases pick the file format from the
        # extension and may expect ".h5"; the name is kept so that previously
        # saved models still load.
        save_path = os.path.join(save_folder, "weights.h5py")
        self.Model.save_weights(save_path)

    # ------------------------------------------------------------------
    #                  LOAD THE MODEL
    # ------------------------------------------------------------------
    def load_weights(self, weights_path):
        """Load weights from ``weights_path`` into the full model."""
        self.Model.load_weights(weights_path)

    @classmethod
    def load(cls, save_folder="."):
        """Rebuild a model from the files written by ``save``.

        Args:
            save_folder: folder containing ``parameters.pkl`` and
                ``weights.h5py``.

        Returns:
            A new instance of ``cls`` with the stored weights loaded.
        """
        parameters_path = os.path.join(save_folder, "parameters.pkl")
        # SECURITY: unpickling executes arbitrary code; only load folders
        # that were produced by save() from a trusted source.
        with open(parameters_path, "rb") as f:
            parameters = pickle.load(f)
        # cls (not the hard-coded class name) so subclasses load as themselves.
        autoencoder = cls(*parameters)
        weights_path = os.path.join(save_folder, "weights.h5py")
        autoencoder.load_weights(weights_path)
        return autoencoder

    # ------------------------------------------------------------------
    #                   PREDICTION
    # ------------------------------------------------------------------
    def reconstruct(self, images):
        """Encode ``images`` and decode them again.

        Args:
            images: batch of inputs for the encoder, e.g. shape (n, x, y, 1).

        Returns:
            A pair ``(reconstructed_images, latent_representations)``:
            position 0 holds the decoder output for every input image,
            position 1 the latent vector the encoder produced for each one.
        """
        latent_representations = self.Encoder.predict(images)
        reconstructed_images = self.Decoder.predict(latent_representations)
        return reconstructed_images, latent_representations
 

This architecture has a coding section based on convolutional networks, in which a feature extraction section and a densely connected flatten layer are performed to obtain a representative matrix or generator code. From this generator code and replicating the coding process in reverse, an attempt is made to replicate the input spectrogram.

The proposed architecture has 4 layers in the feature extraction stage. In all stages the activation function is ReLU, and the padding associated with the filtering stage is adjusted automatically so that the number of rows and columns at the output of the filtering stage is an integer. A BatchNormalization layer is added after each activation, in the place where a pooling layer would normally go. A Flatten layer is added to obtain a representative data vector.

* 1st layer: 32 Filters of 3x3x1. The Stride will be 1 and the padding will be adjusted.
* 2nd layer: 64 Filters of 3x3x1. The Stride will be 2 and the padding is accommodated.
* 3rd layer: 64 Filters of 3x3x1. The Stride will be 2 and the padding is accommodated.
* 4th layer: 64 Filters of 3x3x1. The Stride will be 1 and the padding will be adjusted.

In [None]:
# Build the 4-stage CNN autoencoder described above. The image size comes
# from the training tensor (axes 1 and 2); one channel is appended.
autoencoder_CNN = AutoEncoder(
    input_shape=input_shape[1:3] + (1,),
    conv_filters=(32, 64, 64, 64),
    conv_kernels=(3, 3, 3, 3),
    conv_strides=(1, 2, 2, 1),
    latent_space_dim=4,
)
#autoencoder_CNN.Model.layers[1].trainable = False

# Print the layer tables of encoder, decoder and full model.
autoencoder_CNN.summary()

### Load Model

In [None]:
# Folder that holds "parameters.pkl" and "weights.h5py". An empty string
# makes the file names resolve against the current working directory.
autoencoder_path = ''

# Replaces the model built above with the stored one.
autoencoder_CNN = AutoEncoder.load(save_folder=autoencoder_path)  #WINDOWS
#autoencoder_CNN.summary()  #show model

In [None]:
# Hyperparameters
LEARNING_RATE = 0.05
BATCH_SIZE = 70
EPOCHS = 100

# Fraction of the training data that is held out for validation.
percent_validation = 0.2

# COMPILATION
# Compiling fixes the properties needed for training. The loss (mean squared
# error) and the optimizer (Adam, a replacement for stochastic gradient
# descent) are already chosen inside AutoEncoder.compile, so only the
# learning rate has to be supplied here.
autoencoder_CNN.compile(learning_rate=LEARNING_RATE)

# TRAINING
# Training means searching for the set of weights that makes the network do
# its job. It runs for a fixed number of passes over the data (epochs); the
# batch size is the number of samples evaluated before each weight update.
# Both are chosen experimentally, by trial and error.
# The call returns the training history. To validate on separate data
# instead of a fraction of the training set, the validation fraction would
# have to be replaced by explicit validation data, i.e. the test inputs and
# their targets (x_test, x_test).
History = autoencoder_CNN.train(
    x_train,
    batch_size=BATCH_SIZE,
    num_epochs=EPOCHS,
    porcentaje_validacion=percent_validation,
)

### Store Model

In [None]:
# Store the model (constructor parameters + weights).
#autoencoder_CNN.save(".../modelo_CNN")  #WINDOWS
autoencoder_CNN.save(".../CNN_transfer_bowhead100")

# Store the per-epoch loss curves.
# np.save returns None, so the *_save names carry no data; they are only
# kept so that any later cell referring to them still finds them defined.
error_train = History.history["loss"]
# The path separator after the root folder was missing here, unlike in every
# sibling path below.
error_train_save = np.save('.../vectores_error/train_CNN_bowhead_transfer100.npy', error_train)

error_validation = History.history["val_loss"]
error_validation_save = np.save('.../vectores_error/val_CNN_bowhead_transfer100.npy', error_validation)

# NOTE(review): this section used to be titled "Load Train vectors", but the
# calls WRITE the same two curves again under the "franca" file names.
# Confirm whether np.load was intended; behaviour is left unchanged.
error_train_save = np.save('.../vectores_error/train_CNN_franca_transfer100.npy', error_train)
error_validation_save = np.save('.../vectores_error/val_CNN_franca_transfer100.npy', error_validation)



## Evaluation

In [None]:
# Training vs. validation loss per epoch.
fig = plt.figure()
plt.plot(History.history["loss"], label="Training Loss")
plt.plot(History.history["val_loss"], label="Validation Loss")
plt.title("DATOS DE ENTRENAMIENTO")
plt.xlabel('épocas')
plt.ylabel('error')
plt.legend()

### EVALUATION OF THE FINAL IMAGE WITH TEST DATA
In this part we will evaluate with some test data, the similarity between the entered images and the obtained ones. Two different metrics will be used for the evaluation:

SSIM index (structural similarity index measure). SSIM is used to measure the similarity between two images.
The SSIM index is a full reference metric; in other words, the measurement or prediction of image quality is based on an initial uncompressed or undistorted image as a reference. SSIM is a perception-based model that considers image degradation as a perceived change in structural information, while incorporating important perceptual phenomena, including the terms luminance masking and contrast masking. The difference with other techniques such as MSE or PSNR is that these approaches estimate absolute errors. Structural information is the idea that pixels have strong interdependencies, especially when they are spatially close. These dependencies carry important information about the structure of objects in the visual scene.

Mean square error,
Adequacy of the original TEST image.

In [None]:
# Index of the test sample that is inspected in detail.
N = 10
test_image = x_test[N, :]
# Cast to float32 so both images passed to the metrics share one dtype
# (assumes the Keras prediction is float32 - TODO confirm).
original_image = np.float32(test_image)

# Reconstruct the whole test set: result[0] holds the output images,
# result[1] the latent vectors (used further below).
result = autoencoder_CNN.reconstruct(x_test)
aux = result[0]
output_image = aux[N, :]  # reconstruction of the selected sample

# Mean squared error between the original and the reconstruction.
MSE = skimage.measure.compare_mse(original_image, output_image)
print('MSE:', MSE)

# SSIM. data_range is given explicitly: for floating-point images
# scikit-image otherwise derives it from the dtype limits (-1..1, a range
# of 2), while these images are min-max normalized / sigmoid outputs in [0, 1].
SSIM = skimage.measure.compare_ssim(
    original_image,
    output_image,
    multichannel=True,
    data_range=1.0,
)
print('SSIM:', SSIM)

Image comparison

In [None]:
#PLOT SPECTROGRAMS
# The axis vectors are generated when the spectrograms are computed and are
# read back from disk here (they could also be derived from the sampling
# frequency and the duration).
# NOTE(review): the names look swapped - `time` is read from frecuencia.npy
# and `freq` from tiempo.npy. The plot is still consistent because `freq1`
# is used as the x axis (labelled Time) and `time1` as the y axis (labelled
# Frequency). The names are kept in case later cells rely on them.
time  =   np.load('.../frecuencia.npy')
freq  =   np.load('.../tiempo.npy')

# pcolormesh needs 2-D matrices, so the channel axis is dropped.
original        = original_image[:,:,0]
reconstruct     = output_image[:,:,0]

# Trim the axes to the image size. The lengths come from the image itself
# instead of the former hard-coded 128 x 196, so other spectrogram sizes work.
time1     =   time[0:original.shape[0]]
freq1     =   freq[0:original.shape[1]]

# Number of training samples, shown in the caption.
L = len(x_train)

#Create Figure
plt.figure(figsize=(12,4),dpi=150)

# Left panel: original test spectrogram.
plt.subplot(1,2,1)
plt.text(0.8,26000,"BALLENA DE GREONLANDIA", fontsize=12)
plt.xlabel('Time [s]')
plt.ylabel('Frequency [Hz]')
# The caption previously contained an unmatched ")" after the SSIM value.
plt.text(0.8,-4100,f'MSE={MSE:.5f} ; SSIM={SSIM:.5f}; N= {L}',fontsize=10)
plt.title("Original Spectrogram TEST")
Pxx_original=plt.pcolormesh(freq1, time1, original, cmap='Spectral')
plt.colorbar()

# Right panel: reconstruction produced by the autoencoder.
plt.subplot(1,2,2)
plt.xlabel('Time [s]')
plt.ylabel('Frequency [Hz]')
plt.title("Reconstructed Spectrogram TEST")
Pxx_reconstruc=plt.pcolormesh(freq1, time1, reconstruct, cmap='Spectral')
plt.colorbar()

### Generation of Synthetic images

At this point, the aim is to obtain a synthetic image from the code generated by the autoencoder. Actually, with this model a code is generated for each trained image. One should choose one at random and generate a sample.

In [None]:
# The encoder produced one latent code per image passed to reconstruct().
# The codes are discrete points, so one specific code has to be handed to
# the decoder; here it is chosen at random.
import random

code    =   result[1]
print(np.shape(code))
# randrange excludes the upper bound. random.randint(0, len(code)) could
# return len(code) itself and make the indexing below fail with IndexError.
M       =   random.randrange(len(code))
print(M)
unique_code    =   code[M,:]
print(unique_code)
print(np.shape(unique_code),type(unique_code))

# The decoder expects a batch, so a leading axis of length 1 is added.
# This is done in NumPy: the former tf.expand_dims + Session().run round trip
# opened a session that was never closed, only to obtain the same array.
unique_code_final   =   np.expand_dims(unique_code, axis=0)

Image_final = autoencoder_CNN.Decoder.predict(unique_code_final)


## VAE AUTOENCODER

The first improvement that can be made to the previous version of AutoEncoder is to use a variational AutoEncoder approach.

The main change from the standard AutoEncoder is that the VAE uses the TRAIN set to learn a multidimensional distribution that is continuous in latent space. The CNN autoencoder maps each input to a single point in the latent space, which makes it difficult to generate new cohesive outputs when the latent code is relatively far from the known points belonging to the TRAIN set.

In [None]:
class VAE:
  """
  Deep CNN Variational AutoEncoder whose encoder and decoder components
  are mirrored.

  The encoder stacks Conv2D -> ReLU -> BatchNormalization blocks, flattens the
  result and feeds it to two Dense heads (``mu`` and ``log_variance``) followed
  by a sampling layer. The decoder is a Dense layer, a Reshape and a stack of
  Conv2DTranspose blocks ending in a sigmoid activation.

  :param input_shape: shape of one input sample, e.g. [28, 28, 1]
  :param conv_filters: number of filters of each conv layer, e.g. [2, 4, 8]
  :param conv_kernels: kernel size of each conv layer, e.g. [3, 5, 3]
  :param conv_strides: stride of each conv layer, e.g. [1, 2, 2]
  :param latent_space_dim: number of dimensions of the latent space, e.g. 2
  """

  def __init__(self,
               input_shape,
               conv_filters,
               conv_kernels,
               conv_strides,
               latent_space_dim):
    self.input_shape = input_shape
    self.conv_filters = conv_filters
    self.conv_kernels = conv_kernels
    self.conv_strides = conv_strides
    self.latent_space_dim = latent_space_dim
    # Weight applied to the reconstruction term of the combined loss.
    self.reconstruction_loss_weight = 1000
    self.Encoder = None
    self.Decoder = None
    self.Model = None
    # Output tensors of the two latent-space heads; set by _add_bottleneck and
    # read by _calculate_kl_loss.
    self.mu = None
    self.log_variance = None
    self._model_input = None
    self._num_conv_layers = len(conv_filters)  # number of conv layers in the model
    self._shape_before_bottleneck = None

    self._build()

  def _build(self):
    """Build the encoder, the decoder and the model that chains them."""
    self._build_encoder()
    self._build_decoder()
    self._build_autoencoder()

  #----------------------------------------------
  #                   ENCODER
  #----------------------------------------------
  def _build_encoder(self):
    """Create ``self.Encoder`` and remember its input tensor."""
    encoder_input = self._add_encoder_input()
    conv_layers = self._add_conv_layers(encoder_input)
    bottleneck = self._add_bottleneck(conv_layers)
    self._model_input = encoder_input
    self.Encoder = Model(encoder_input, bottleneck, name='Encoder_VAE')

  def _add_encoder_input(self):
    """Return the encoder input layer, shaped like one input sample."""
    return Input(
        shape=self.input_shape,
        name="Encoder_Input_layer"
    )

  def _add_conv_layers(self, temp_model):
    """Creation of all convolutional layers in the model."""
    aux = temp_model

    # Stack the conv blocks one after another.
    for layer in range(self._num_conv_layers):
      aux = self._add_conv_layer(layer, aux)

    return aux

  def _add_conv_layer(self, layer, temp_model):
    """
    Add a single convolutional layer to the model (temp_model)
    Each convolutional layer consist on:
    Conv2D: convolution with filters
    ReLU : activation function applied to the result of the convolution
    BatchNormalization : Operation included to speed up the training process

    :param: layer: the actual layer index in the model building process
    :param: temp_model: the in-developing model to which the conv layer will be added
    """
    conv_layer = Conv2D(
        filters=self.conv_filters[layer],
        kernel_size=self.conv_kernels[layer],
        strides=self.conv_strides[layer],
        padding="same",
        name=f"encoder_conv_layer_{layer}"
    )
    temp_model = conv_layer(temp_model)
    temp_model = ReLU(name=f"encoder_ReLU_layer_{layer}")(temp_model)
    temp_model = BatchNormalization(name=f"encoder_BN_layer_{layer}")(temp_model)
    return temp_model

  # LATENT SPACE
  def _add_bottleneck(self, temp_model):
    """
    Add the latent-space layers to the model.

    The output of the convolutional layers is flattened to a vector and fed
    to two dense (fully connected) layers that give the mean and the
    logarithm of the variance of a normal distribution. A Lambda layer then
    draws one point from that distribution.

    :param: temp_model: output tensor of the last convolutional block
    :return: the sampled latent point (the encoder output)
    """
    # Shape of the tensor before flattening; the decoder needs it to mirror
    # the encoder.
    self._shape_before_bottleneck = K.int_shape(temp_model)[1:]
    temp_model = Flatten()(temp_model)

    # The latent space is described by a normal distribution, which is fully
    # defined by its mean and its deviation.
    self.mu = Dense(self.latent_space_dim, name="mu_latent_space")(temp_model)
    self.log_variance = Dense(self.latent_space_dim, name="log_variance_latent_space")(temp_model)

    def map_to_normal_distribution(args):
      """Sample a point from the normal distribution given by (mu, log_variance)."""
      mu, log_variance = args
      # Use the shape of the `mu` argument rather than of self.mu, so the
      # function depends only on what it is given.
      base_gaussian = K.random_normal(
          shape=K.shape(mu),
          mean=0.,
          stddev=1.
      )
      # The standard deviation is the square root of the variance.
      point = mu + K.exp(log_variance / 2) * base_gaussian
      return point

    # Lambda layers wrap a hand-written function as a layer. Here the function
    # samples from the parametrized normal distribution whose parameters are
    # learned during training.
    temp_model = Lambda(
        map_to_normal_distribution,
        name="Encoder_output")([self.mu, self.log_variance])

    return temp_model

  #----------------------------------------------
  #                   DECODER
  #----------------------------------------------
  def _build_decoder(self):
    """Create ``self.Decoder``, mirroring the encoder."""
    decoder_input = self._add_decoder_input()
    dense_layer = self._add_dense_layer(decoder_input)
    reshape_layer = self._add_reshape_layer(dense_layer)
    conv_transpose_layers = self._add_conv_transpose_layers(reshape_layer)
    decoder_output = self._add_decoder_output(conv_transpose_layers)
    self.Decoder = Model(decoder_input, decoder_output, name="Decoder_VAE")

  def _add_decoder_input(self):
    """Return the decoder input layer: one latent-space vector."""
    return Input(
        shape=(self.latent_space_dim,),
        name="Decoder_input_layer"
    )

  def _add_dense_layer(self, temp_model):
    """Expand the latent vector to as many units as the encoder had before flattening."""
    dense_layer = Dense(
        np.prod(self._shape_before_bottleneck),
        name="Decoder_dense_layer"
    )(temp_model)
    return dense_layer

  def _add_reshape_layer(self, temp_model):
    """Reshape the dense output back to the pre-flatten shape of the encoder."""
    return Reshape(self._shape_before_bottleneck)(temp_model)

  def _add_conv_transpose_layers(self, temp_model):
    """ Add all the transpose convolutional layers to the model"""
    # Walk the conv layers in reverse order. Index 0 is skipped here because
    # _add_decoder_output adds the last transpose convolution using the
    # kernel and stride of layer 0.
    for layer in reversed(range(1, self._num_conv_layers)):
      temp_model = self._add_conv_transpose_layer(layer, temp_model)

    return temp_model

  def _add_conv_transpose_layer(self, layer, temp_model):
    """Add a single transpose conv layer to the model """
    conv_transpose_layer = Conv2DTranspose(
        filters=self.conv_filters[layer],
        kernel_size=self.conv_kernels[layer],
        strides=self.conv_strides[layer],
        padding="same",
        name=f"decoder_conv_transpose_layer_{layer}"
    )

    temp_model = conv_transpose_layer(temp_model)
    temp_model = ReLU(name=f"decoder_ReLU_transpose_layer_{layer}")(temp_model)
    temp_model = BatchNormalization(name=f"decoder_BN_transpose_layer_{layer}")(temp_model)

    return temp_model

  def _add_decoder_output(self, temp_model):
    """Add the final single-filter transpose convolution and the sigmoid output."""
    conv_transpose_layer = Conv2DTranspose(
        filters=1,
        kernel_size=self.conv_kernels[0],
        strides=self.conv_strides[0],
        padding="same",
        name=f"decoder_conv_transpose_layer_{self._num_conv_layers}"
    )
    temp_model = conv_transpose_layer(temp_model)
    output_layer = Activation("sigmoid", name="sigmoid_layer")(temp_model)

    return output_layer

  #----------------------------------------------
  #                   AUTOENCODER
  #----------------------------------------------
  def _build_autoencoder(self):
    """Chain encoder and decoder into ``self.Model``."""
    model_input = self._model_input
    model_output = self.Decoder(self.Encoder(model_input))
    self.Model = Model(model_input, model_output, name="Variational_AutoEncoder(VAE)")

  #----------------------------------------------
  #            COMPILATION AND TRAINING
  #----------------------------------------------
  def compile(self, loss_function="MSE", learning_rate=0.0001):
    """
    Compile ``self.Model`` with an Adam optimizer and the combined VAE loss.

    :param: loss_function: not used; the loss is always the combined
        reconstruction + KL loss. Kept so existing calls keep working.
    :param: learning_rate: learning rate of the Adam optimizer. Pass it by
        keyword: a single positional argument binds to ``loss_function``.
    """
    optimizer = Adam(learning_rate=learning_rate)
    self.Model.compile(
        optimizer=optimizer,
        loss=self._calculate_combined_loss,
        # Metrics are evaluated by the model during training and testing.
        metrics=[self._calculate_reconstruction_loss, self._calculate_kl_loss]
    )

  def _calculate_combined_loss(self, y_target, y_predicted):
    """
    the reconstruction loss weight may be optimized during the training process or using some hyperparameter optimization method
    """
    reconstruction_loss = self._calculate_reconstruction_loss(y_target, y_predicted)
    kl_loss = self._calculate_kl_loss(y_target, y_predicted)
    combined_loss = self.reconstruction_loss_weight * reconstruction_loss + kl_loss
    return combined_loss

  def _calculate_reconstruction_loss(self, y_target, y_predicted):
    """Mean squared error between target and prediction, averaged over axes 1, 2 and 3."""
    error = y_target - y_predicted
    reconstruction_loss = K.mean(K.square(error), axis=[1, 2, 3])
    return reconstruction_loss

  def _calculate_kl_loss(self, y_target, y_predicted):
    """
    The main objetive of KL loss is to take the resulting multivariational distribution the closest possible to
    a standard normal distribution. It also improves the symmetry around the origin, which helps to avoid gaps in the
    latent space.

    ``y_target`` and ``y_predicted`` are not used; they are part of the
    signature because the method is passed to Keras as a loss/metric function.
    """
    kl_loss = -0.5 * K.sum(1 + self.log_variance - K.square(self.mu) - K.exp(self.log_variance), axis=1)
    return kl_loss

  def train(self, x_train, batch_size, num_epochs, porcentaje_validacion):
    """
    Fit the model to reproduce ``x_train`` from itself.

    :param: x_train: training samples, used both as input and as target
    :param: batch_size: batch size passed to ``fit``
    :param: num_epochs: number of epochs passed to ``fit``
    :param: porcentaje_validacion: value passed to ``fit`` as ``validation_split``
    :return: the History object returned by ``fit``
    """
    History = self.Model.fit(x_train,
                             x_train,
                             batch_size=batch_size,
                             epochs=num_epochs,
                             validation_split=porcentaje_validacion,
                             shuffle=True)
    return History

  #----------------------------------------------
  #                   STRUCTURE SUMMARY
  #----------------------------------------------
  def summary(self):
    """Print the summaries of the encoder, the decoder and the full model."""
    self.Encoder.summary()
    self.Decoder.summary()
    self.Model.summary()

  #----------------------------------------------
  #                   SAVE THE MODEL
  #----------------------------------------------
  def save(self, save_folder="."):
    """Save the constructor parameters and the model weights into ``save_folder``."""
    self._create_folder_if_it_doesnt_exist(save_folder)
    self._save_parameters(save_folder)
    self._save_weights(save_folder)

  def _create_folder_if_it_doesnt_exist(self, folder):
    """Create ``folder`` (and missing parents); do nothing if it already exists."""
    # exist_ok avoids the check-then-create race of a separate os.path.exists test.
    os.makedirs(folder, exist_ok=True)

  def _save_parameters(self, save_folder):
    """Pickle the constructor arguments, in constructor order, to parameters.pkl."""
    parameters = [
        self.input_shape,
        self.conv_filters,
        self.conv_kernels,
        self.conv_strides,
        self.latent_space_dim,
    ]
    save_path = os.path.join(save_folder, "parameters.pkl")
    with open(save_path, "wb") as f:
      pickle.dump(parameters, f)

  def _save_weights(self, save_folder):
    """Save the weights of ``self.Model`` inside ``save_folder``."""
    # NOTE(review): the file name ends in ".h5py", not ".h5"; which on-disk
    # format Keras selects for this suffix may depend on the installed
    # version -- confirm. The name is kept so previously saved models load.
    save_path = os.path.join(save_folder, "weights.h5py")
    self.Model.save_weights(save_path)

  #----------------------------------------------
  #                   LOAD THE MODEL
  #----------------------------------------------
  def load_weights(self, weights_path):
    """Load weights from ``weights_path`` into ``self.Model``."""
    self.Model.load_weights(weights_path)

  @classmethod
  def load(cls, save_folder="."):
    """
    Rebuild a model from a folder written by ``save``.

    :param: save_folder: folder holding parameters.pkl and the weights file
    :return: a new instance with the stored parameters and weights
    """
    parameters_path = os.path.join(save_folder, "parameters.pkl")
    # pickle can execute arbitrary code while loading: only load folders
    # that come from a trusted source.
    with open(parameters_path, "rb") as f:
      parameters = pickle.load(f)
    # Instantiate through cls so subclasses are rebuilt as their own type.
    autoencoder = cls(*parameters)
    weights_path = os.path.join(save_folder, "weights.h5py")
    autoencoder.load_weights(weights_path)
    return autoencoder

  #----------------------------------------------
  #                   PREDICTION
  #----------------------------------------------
  def reconstruct(self, images):
    """
    Encode ``images`` and decode the resulting latent representations.

    :return: tuple (reconstructed_images, latent_representations)
    """
    latent_representations = self.Encoder.predict(images)
    reconstructed_images = self.Decoder.predict(latent_representations)
    return reconstructed_images, latent_representations

In [None]:
# Build a VAE with four convolutional layers, a single-channel input taken from
# the first two entries of `input_shape`, and a 3-dimensional latent space;
# then print the encoder, decoder and full-model summaries.
autoencoder_VAE = VAE(
    input_shape=(input_shape[0], input_shape[1], 1),
    conv_filters=(32, 64, 64, 64),
    conv_kernels=(3, 3, 3, 3),
    conv_strides=(1, 2, 2, 1),
    latent_space_dim=3,
)
autoencoder_VAE.summary()

In [None]:
# Load a previously saved model (Windows path).
autoencoder_VAE = VAE.load(save_folder=".../VAE3(48)")


In [None]:
# Hyperparameters
LEARNING_RATE = 0.0005
BATCH_SIZE = 70
EPOCHS = 60
percent_validacion = 0.2  # passed to fit() as validation_split

# Compiling
# learning_rate has to be passed by keyword: the first positional parameter of
# VAE.compile is `loss_function`, so a positional value would bind to it and
# the optimizer would silently keep its default rate of 0.0001.
autoencoder_VAE.compile(learning_rate=LEARNING_RATE)

# Training
History = autoencoder_VAE.train(x_train, BATCH_SIZE, EPOCHS, percent_validacion)

In [None]:
# Store the reconstruction-loss curves of the training and validation splits.
# np.save returns None, so its result is not bound to a name.
error_train = History.history["_calculate_reconstruction_loss"]
np.save('.../train_VAE3_transfers(60).npy', error_train)

error_validation = History.history["val__calculate_reconstruction_loss"]
np.save('.../val_VAE3_transfers(60).npy', error_validation)

In [None]:
# Write the model parameters and weights to the given folder.
autoencoder_VAE.save(save_folder=".../VAE3_jtransfer60")

In [None]:
# Reload the saved VAE through the VAE class, so the rebuilt architecture is
# the variational one the stored weights belong to.
autoencoder_VAE = VAE.load(".../VAE3(48)")
autoencoder_VAE.summary()

### VAE MODEL EVALUATION

In [None]:
# Reconstruction loss per epoch, one curve for training and one for validation.
fig = plt.figure()
plt.title("TRAIN DATA")
plt.ylabel('error')
plt.xlabel('epochs')
for history_key, curve_label in (
    ("_calculate_reconstruction_loss", "Training reconstruction_Loss"),
    ("val__calculate_reconstruction_loss", "Validation reconstructio Loss"),
):
    plt.plot(History.history[history_key], label=curve_label)
plt.legend()

In [None]:
###---------------------------------------------------------------------------------------
#                              TEST IMAGE PROCESSING
###---------------------------------------------------------------------------------------
N = 10  # index of the test sample that is inspected
original_test_image = x_test[N, :]
original_image = np.float32(original_test_image)

###---------------------------------------------------------------------------------------
#                              PERFORMANCE MEASUREMENT
###---------------------------------------------------------------------------------------

# reconstruct() returns (reconstructed images, latent codes); RESULT is kept
# whole because the latent codes are reused when generating synthetic images.
RESULT = autoencoder_VAE.reconstruct(x_test)
aux = RESULT[0]
output_image = aux[N, :]

# MSE
MSE = skimage.measure.compare_mse(original_image, output_image)
print('The MSE is:', MSE)

# SSIM
SSIM = skimage.measure.compare_ssim(original_image, output_image, multichannel=True)
print('The SSIM figure is:', SSIM)

###---------------------------------------------------------------------------------------
#                             SPECTROGRAM PLOTTING
###---------------------------------------------------------------------------------------

# 'frecuencia.npy' is the frequency axis and 'tiempo.npy' the time axis; each
# is bound to the matching name (they were swapped before).
freq_axis = np.load('.../frecuencia.npy')
time_axis = np.load('.../tiempo.npy')

# The axes are trimmed to the size of the images: 128 rows and 196 columns.
freq_axis = freq_axis[0:128]
time_axis = time_axis[0:196]

original_image = original_image[:, :, 0]   # (128,196)
reconstruction = output_image[:, :, 0]     # (128,196)

# Create the figure
plt.figure(figsize=(12, 4), dpi=150)
plt.subplot(1, 2, 1)
plt.xlabel('Time [s]')
plt.ylabel('Frequency [Hz]')
plt.text(0.8, 26000, "Spectrograms", fontsize=12)
plt.text(0.4, -4000, f"(MSE={MSE}; SSIM={SSIM}; N={len(x_train)})", fontsize=10)
plt.title("Original TEST image")
# pcolormesh takes (X, Y, C): X spans the columns of C (time), Y its rows (frequency).
Pxx_original = plt.pcolormesh(time_axis, freq_axis, original_image, cmap='Spectral')
plt.colorbar()
plt.subplot(1, 2, 2)
plt.xlabel('Time [s]')
plt.ylabel('Frequency [Hz]')
plt.title("Reconstruction")
Pxx_reconstruida = plt.pcolormesh(time_axis, freq_axis, reconstruction, cmap='Spectral')
plt.colorbar()

In [None]:
import random

# RESULT[1] holds the latent codes returned by reconstruct(), one row per image.
generation_code = RESULT[1]
print(np.shape(generation_code))

# randrange excludes its upper bound. random.randint(0, len(...)) is inclusive
# and could return an index one past the last row.
M = random.randrange(len(generation_code))
print(M)
generation_code = generation_code[M, :]
print(generation_code)
print(np.shape(generation_code), type(generation_code))

# The decoder predicts on batches, so a leading batch axis is added. This is done
# in NumPy: no TensorFlow session has to be opened (and left unclosed) just to
# reshape an array.
generation_code_tensor_final = np.expand_dims(generation_code, axis=0)

Imagen_final = autoencoder_VAE.Decoder.predict(generation_code_tensor_final)
