<a href="https://colab.research.google.com/github/PratikStar/google-colab/blob/main/2_Creating_%26_Training_Audio_VAE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

1.  Create VAE

### Mount Google Drive

In [1]:
from google.colab import drive
# Mount Google Drive at /content/drive so the notebook can read the
# spectrograms and write the trained model under /content/drive/MyDrive.
# force_remount=True asks Colab to mount again even if the drive is already
# mounted (e.g. when this cell is re-run).
drive.mount('/content/drive',  force_remount=True)

Mounted at /content/drive


### Install TensorFlow v2.4.0

In [None]:
# Replace whatever TensorFlow the runtime ships with by the pinned 2.4.0.
# -y answers pip's "Proceed (Y/n)?" confirmation automatically, so the cell
# does not block waiting for keyboard input.
!pip uninstall -y tensorflow
!pip install tensorflow==2.4.0

In [3]:
import os
import pickle
import csv
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv2D, ReLU, BatchNormalization, \
    Flatten, Dense, Reshape, Conv2DTranspose, Activation, Lambda
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError
import numpy as np
import tensorflow as tf


# Switch TensorFlow to graph (non-eager) mode before any layer is created.
# NOTE(review): presumably needed because the VAE's KL loss reads the
# encoder's `mu` / `log_variance` tensors stored on the instance rather than
# the loss function's arguments - confirm before removing this call.
tf.compat.v1.disable_eager_execution()


class VAE:
    """Deep convolutional variational autoencoder with a mirrored decoder.

    The encoder is a stack of Conv2D -> ReLU -> BatchNormalization blocks
    followed by two Dense heads (``mu`` and ``log_variance``) and a sampling
    layer. The decoder maps a latent vector back through Dense -> Reshape ->
    Conv2DTranspose blocks and ends in a single-channel sigmoid output.

    Args:
        input_shape: Shape of one input sample without the batch axis,
            e.g. ``(256, 64, 1)``.
        conv_filters: Number of filters for each encoder conv block.
        conv_kernels: Kernel size for each conv block.
        conv_strides: Stride for each conv block.
        latent_space_dim: Size of the latent vector (integer).

    ``conv_filters``, ``conv_kernels`` and ``conv_strides`` are indexed in
    lockstep, one entry per conv block; the number of blocks is taken from
    ``len(conv_filters)``.
    """

    def __init__(self,
                 input_shape,
                 conv_filters,
                 conv_kernels,
                 conv_strides,
                 latent_space_dim):
        self.input_shape = input_shape
        self.conv_filters = conv_filters
        self.conv_kernels = conv_kernels
        self.conv_strides = conv_strides
        self.latent_space_dim = latent_space_dim
        # Multiplier applied to the reconstruction (MSE) term before the KL
        # term is added in _calculate_combined_loss.
        self.reconstruction_loss_weight = 1000000

        self.encoder = None
        self.decoder = None
        self.model = None

        # Output tensors of the two bottleneck Dense heads. They are assigned
        # in _add_bottleneck and read by _calculate_kl_loss; declared here so
        # the attributes always exist.
        self.mu = None
        self.log_variance = None

        self._num_conv_layers = len(conv_filters)
        self._shape_before_bottleneck = None
        self._model_input = None

        self._build()

    def summary(self):
        """Print the Keras summaries of the encoder, decoder and full model."""
        self.encoder.summary()
        self.decoder.summary()
        self.model.summary()

    def compile(self, learning_rate=0.0001):
        """Compile the full model with Adam and the combined VAE loss.

        The reconstruction and KL terms are also registered as metrics so
        they are reported separately during training.
        """
        optimizer = Adam(learning_rate=learning_rate)
        self.model.compile(optimizer=optimizer,
                           loss=self._calculate_combined_loss,
                           metrics=[self._calculate_reconstruction_loss,
                                    self._calculate_kl_loss])

    def train(self, x_train, batch_size, num_epochs):
        """Fit the model to reproduce ``x_train`` (input and target are the same)."""
        self.model.fit(x_train,
                       x_train,
                       batch_size=batch_size,
                       epochs=num_epochs,
                       shuffle=True)

    def save(self, save_folder="."):
        """Write ``parameters.pkl`` and ``weights.h5`` into ``save_folder``.

        The folder is created first if it does not exist.
        """
        self._create_folder_if_it_doesnt_exist(save_folder)
        self._save_parameters(save_folder)
        self._save_weights(save_folder)

    def reconstruct(self, images):
        """Encode ``images`` and decode them again.

        Returns:
            A tuple ``(reconstructed_images, latent_representations)``. The
            latent representations are the encoder model's output, i.e. the
            points produced by the sampling layer.
        """
        latent_representations = self.encoder.predict(images)
        reconstructed_images = self.decoder.predict(latent_representations)
        return reconstructed_images, latent_representations

    def _save_parameters(self, save_folder):
        """Pickle the five constructor arguments, in constructor order."""
        parameters = [
            self.input_shape,
            self.conv_filters,
            self.conv_kernels,
            self.conv_strides,
            self.latent_space_dim
        ]
        save_path = os.path.join(save_folder, "parameters.pkl")
        print(parameters)
        with open(save_path, "wb") as f:
            pickle.dump(parameters, f)

    def _save_weights(self, save_folder):
        """Save the full model's weights as ``weights.h5`` in ``save_folder``."""
        save_path = os.path.join(save_folder, "weights.h5")
        self.model.save_weights(save_path)

    @classmethod
    def load(cls, save_folder="."):
        """Rebuild a model from a folder previously written by ``save``.

        The constructor arguments are read from ``parameters.pkl`` and the
        weights from ``weights.h5``. The instance is created through ``cls``
        so that calling ``load`` on a subclass returns that subclass.
        """
        parameters_path = os.path.join(save_folder, "parameters.pkl")
        # SECURITY: pickle.load can execute arbitrary code embedded in the
        # file. Only load parameter files that this code wrote itself.
        with open(parameters_path, "rb") as f:
            parameters = pickle.load(f)
        autoencoder = cls(*parameters)
        weights_path = os.path.join(save_folder, "weights.h5")
        autoencoder.load_weights(weights_path)
        return autoencoder

    def load_weights(self, weights_path):
        """Load weights for the full model from ``weights_path``."""
        self.model.load_weights(weights_path)

    def _calculate_combined_loss(self, y_target, y_predicted):
        """Return ``reconstruction_loss_weight * reconstruction + KL`` per sample."""
        reconstruction_loss = self._calculate_reconstruction_loss(y_target, y_predicted)
        kl_loss = self._calculate_kl_loss(y_target, y_predicted)
        combined_loss = (self.reconstruction_loss_weight * reconstruction_loss
                         + kl_loss)
        return combined_loss

    def _calculate_reconstruction_loss(self, y_target, y_predicted):
        """Return the mean squared error per sample, averaged over axes 1-3."""
        error = y_target - y_predicted
        reconstruction_loss = K.mean(K.square(error), axis=[1, 2, 3])
        return reconstruction_loss

    def _calculate_kl_loss(self, y_target, y_predicted):
        """Return the KL divergence term per sample.

        Both arguments are ignored; they exist only so the method has the
        ``(y_true, y_pred)`` signature Keras passes to losses and metrics.
        The value is computed from the encoder's ``mu`` and ``log_variance``
        tensors stored on the instance.
        """
        kl_loss = -0.5 * K.sum(1 + self.log_variance - K.square(self.mu) -
                               K.exp(self.log_variance), axis=1)
        return kl_loss

    def _create_folder_if_it_doesnt_exist(self, folder):
        """Create ``folder`` (and any missing parents); no-op if it exists."""
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(folder, exist_ok=True)

    def _build(self):
        """Build encoder, decoder and the end-to-end model, in that order.

        The encoder must come first: it records the shape the decoder needs
        to reshape back to.
        """
        self._build_encoder()
        self._build_decoder()
        self._build_autoencoder()

    def _build_autoencoder(self):
        """Chain encoder and decoder into the trainable ``autoencoder`` model."""
        model_input = self._model_input
        model_output = self.decoder(self.encoder(model_input))
        self.model = Model(model_input, model_output, name="autoencoder")

    def _build_decoder(self):
        """Assemble the decoder: input -> dense -> reshape -> conv transposes -> output."""
        decoder_input = self._add_decoder_input()
        dense_layer = self._add_dense_layer(decoder_input)
        reshape_layer = self._add_reshape_layer(dense_layer)
        conv_transpose_layers = self._add_conv_transpose_layers(reshape_layer)
        decoder_output = self._add_decoder_output(conv_transpose_layers)
        self.decoder = Model(decoder_input, decoder_output, name="decoder")

    def _add_decoder_input(self):
        """Create the decoder input: one latent vector per sample."""
        # Keras documents `shape` as a tuple, so wrap the integer dimension.
        return Input(shape=(self.latent_space_dim,), name="decoder_input")

    def _add_dense_layer(self, decoder_input):
        """Expand the latent vector to the flattened pre-bottleneck size."""
        num_neurons = np.prod(self._shape_before_bottleneck)  # [1, 2, 4] -> 8
        dense_layer = Dense(num_neurons, name="decoder_dense")(decoder_input)
        return dense_layer

    def _add_reshape_layer(self, dense_layer):
        """Reshape the dense output to the encoder's pre-bottleneck shape."""
        return Reshape(self._shape_before_bottleneck)(dense_layer)

    def _add_conv_transpose_layers(self, x):
        """Add conv transpose blocks."""
        # Walk the conv configuration in reverse order and stop before
        # index 0: the kernel and stride at index 0 are used by the output
        # layer in _add_decoder_output.
        for layer_index in reversed(range(1, self._num_conv_layers)):
            x = self._add_conv_transpose_layer(layer_index, x)
        return x

    def _add_conv_transpose_layer(self, layer_index, x):
        """Add one Conv2DTranspose -> ReLU -> BatchNormalization block.

        ``layer_index`` selects the filters/kernel/stride entry; layers are
        numbered from 1 in the order they are added to the decoder.
        """
        layer_num = self._num_conv_layers - layer_index
        conv_transpose_layer = Conv2DTranspose(
            filters=self.conv_filters[layer_index],
            kernel_size=self.conv_kernels[layer_index],
            strides=self.conv_strides[layer_index],
            padding="same",
            name=f"decoder_conv_transpose_layer_{layer_num}"
        )
        x = conv_transpose_layer(x)
        x = ReLU(name=f"decoder_relu_{layer_num}")(x)
        x = BatchNormalization(name=f"decoder_bn_{layer_num}")(x)
        return x

    def _add_decoder_output(self, x):
        """Add the final single-filter Conv2DTranspose followed by a sigmoid."""
        conv_transpose_layer = Conv2DTranspose(
            filters=1,
            kernel_size=self.conv_kernels[0],
            strides=self.conv_strides[0],
            padding="same",
            name=f"decoder_conv_transpose_layer_{self._num_conv_layers}"
        )
        x = conv_transpose_layer(x)
        output_layer = Activation("sigmoid", name="sigmoid_layer")(x)
        return output_layer

    def _build_encoder(self):
        """Assemble the encoder and remember its input for the full model."""
        encoder_input = self._add_encoder_input()
        conv_layers = self._add_conv_layers(encoder_input)
        bottleneck = self._add_bottleneck(conv_layers)
        self._model_input = encoder_input
        self.encoder = Model(encoder_input, bottleneck, name="encoder")

    def _add_encoder_input(self):
        """Create the encoder input with the configured ``input_shape``."""
        return Input(shape=self.input_shape, name="encoder_input")

    def _add_conv_layers(self, encoder_input):
        """Create all convolutional blocks in encoder."""
        x = encoder_input
        for layer_index in range(self._num_conv_layers):
            x = self._add_conv_layer(layer_index, x)
        return x

    def _add_conv_layer(self, layer_index, x):
        """Add a convolutional block to a graph of layers, consisting of
        conv 2d + ReLU + batch normalization.
        """
        layer_number = layer_index + 1
        conv_layer = Conv2D(
            filters=self.conv_filters[layer_index],
            kernel_size=self.conv_kernels[layer_index],
            strides=self.conv_strides[layer_index],
            padding="same",
            name=f"encoder_conv_layer_{layer_number}"
        )
        x = conv_layer(x)
        x = ReLU(name=f"encoder_relu_{layer_number}")(x)
        x = BatchNormalization(name=f"encoder_bn_{layer_number}")(x)
        return x

    def _add_bottleneck(self, x):
        """Flatten data and add bottleneck with Gaussian sampling (Dense
        layers).

        Records the pre-flatten shape for the decoder, stores the ``mu`` and
        ``log_variance`` tensors on the instance for the KL loss, and returns
        the sampled latent point.
        """
        self._shape_before_bottleneck = K.int_shape(x)[1:]
        x = Flatten()(x)
        self.mu = Dense(self.latent_space_dim, name="mu")(x)
        self.log_variance = Dense(self.latent_space_dim,
                                  name="log_variance")(x)

        def sample_point_from_normal_distribution(args):
            # Reparameterisation: z = mu + sigma * epsilon, epsilon ~ N(0, 1).
            # Only the tensors passed in as layer inputs are used, so the
            # Lambda does not depend on state captured from `self`.
            mu, log_variance = args
            epsilon = K.random_normal(shape=K.shape(mu), mean=0.,
                                      stddev=1.)
            sampled_point = mu + K.exp(log_variance / 2) * epsilon
            return sampled_point

        x = Lambda(sample_point_from_normal_distribution,
                   name="encoder_output")([self.mu, self.log_variance])
        return x


if __name__ == "__main__":
    # Smoke test: building the model is enough to verify that the
    # architecture wires up for this configuration.
    autoencoder = VAE(
        (512, 64, 1),      # input_shape
        (32, 64, 64, 64),  # conv_filters
        (3, 3, 3, 3),      # conv_kernels
        (1, 2, 2, 1),      # conv_strides
        32,                # latent_space_dim
    )
    # autoencoder.summary()

2. Train VAE

In [25]:
# from autoencoder import VAE
import re
import os
import numpy as np

# Training hyper-parameters. BATCH_SIZE, EPOCHS and LATENT_SPACE_DIM are also
# used further down to build the name of the folder the model is saved to.
LEARNING_RATE = 1e-4
BATCH_SIZE = 20
EPOCHS = 100
LATENT_SPACE_DIM = 8  # try less than 10?

# Folder that load_fsdd walks recursively to find the spectrogram files.
SPECTROGRAMS_PATH = "/content/drive/MyDrive/Music/VAE-test/spectrogram-01"
# FILE_NAME_REGEX = "^512.*"

def load_fsdd(spectrograms_path):
    """Load every file found under ``spectrograms_path`` with ``np.load``.

    The directory tree is walked recursively. Each loaded array gets a new
    trailing axis of length 1 appended.

    Returns:
        dict mapping the bare file name (without its directory) to the
        expanded array. Files with the same name in different sub-folders
        therefore overwrite each other; the one visited last wins.
    """
    dataset = {}
    for root, _, file_names in os.walk(spectrograms_path):
        dataset.update(
            (file_name,
             np.load(os.path.join(root, file_name))[..., np.newaxis])
            for file_name in file_names
        )
    return dataset

def train(x_train, learning_rate, batch_size, epochs):
    """Build, compile and fit a VAE on ``x_train``; return the trained model.

    The architecture is fixed inside this function (input shape
    ``(256, 64, 1)``, four conv blocks); only the latent size comes from the
    module-level ``LATENT_SPACE_DIM``. The model summary is printed before
    training starts.
    """
    architecture = {
        "input_shape": (256, 64, 1),
        "conv_filters": (32, 16, 16, 8),  # 16, 8, 8, 4 ->
        "conv_kernels": (3, 3, 3, 3),
        "conv_strides": (1, 2, 2, 1),
        "latent_space_dim": LATENT_SPACE_DIM,
    }
    vae = VAE(**architecture)
    vae.summary()
    vae.compile(learning_rate)
    vae.train(x_train, batch_size, epochs)
    return vae


dataset = load_fsdd(SPECTROGRAMS_PATH)

# Stack all loaded spectrograms into one array and train on it.
autoencoder = train(
    np.array(list(dataset.values())),
    learning_rate=LEARNING_RATE,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
)

# The target folder name encodes latent size, batch size and epoch count.
model_suffix = "-" + "-".join(
    str(setting) for setting in (LATENT_SPACE_DIM, BATCH_SIZE, EPOCHS)
)
autoencoder.save("/content/drive/MyDrive/Music/VAE-test/model" + model_suffix)
print("Model Saved!!")

Model: "encoder"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
encoder_input (InputLayer)      [(None, 256, 64, 1)] 0                                            
__________________________________________________________________________________________________
encoder_conv_layer_1 (Conv2D)   (None, 256, 64, 32)  320         encoder_input[0][0]              
__________________________________________________________________________________________________
encoder_relu_1 (ReLU)           (None, 256, 64, 32)  0           encoder_conv_layer_1[0][0]       
__________________________________________________________________________________________________
encoder_bn_1 (BatchNormalizatio (None, 256, 64, 32)  128         encoder_relu_1[0][0]             
____________________________________________________________________________________________

In [26]:
# # https://github.com/musikalkemist/generating-sound-with-neural-networks/blob/49d7db32c43d1a04c596cbbb282a9521be1e7fc8/11%20Implementing%20VAE/code/analysis.py

import numpy as np
import matplotlib.pyplot as plt
import librosa
import librosa.display

# Folder the embedding TSV files are written to. The trailing slash matters:
# download_vectors appends the file names by plain string concatenation.
LATENT_REPRESENTATIONS_PATH = (
    '/content/drive/MyDrive/Music/VAE-test/model-'
    + '-'.join(str(setting) for setting in (LATENT_SPACE_DIM, BATCH_SIZE, EPOCHS))
    + '/'
)
# NOTE(review): this path is hard-coded while the one above is derived from
# the hyper-parameters; they only name the same folder while
# LATENT_SPACE_DIM, BATCH_SIZE and EPOCHS are 8, 20 and 100.
MODEL_PATH = '/content/drive/MyDrive/Music/VAE-test/model-8-20-100'

def select_random_images(dataset, num_images=10):
    """Return a random subset of ``dataset`` with distinct entries.

    Args:
        dataset: Mapping of key (e.g. file name) to item.
        num_images: Number of entries wanted.

    Returns:
        A new dict with ``num_images`` distinct entries of ``dataset``, or
        all of them when the dataset holds fewer than ``num_images``. An
        empty dataset yields an empty dict.
    """
    keys = list(dataset)
    if not keys:
        return {}

    # Sample positions without replacement: sampling with replacement can
    # draw the same key twice, and the duplicates would collapse in the
    # result dict, silently returning fewer entries than requested.
    sample_size = min(num_images, len(keys))
    positions = np.random.choice(len(keys), size=sample_size, replace=False)
    return {keys[position]: dataset[keys[position]] for position in positions}


def _plot_spectrogram(spectrogram):
    """Draw one spectrogram on its own new figure with a colour bar."""
    fig, ax = plt.subplots()
    # squeeze() drops length-1 axes before the array is handed to specshow.
    mesh = librosa.display.specshow(spectrogram.squeeze(), y_axis='log',
                                    x_axis='time', ax=ax)
    # NOTE(review): the "dB" label assumes the values are in decibels -
    # confirm against how the spectrograms were generated.
    fig.colorbar(mesh, ax=ax, format="%+2.0f dB")


def plot_reconstructed_images(images, reconstructed_images):
    """Plot each original spectrogram and its reconstruction.

    For every (original, reconstruction) pair two separate figures are
    created, original first. All figures are shown together at the end.

    Args:
        images: Iterable of original spectrogram arrays.
        reconstructed_images: Iterable of reconstructed arrays, paired with
            ``images`` by position.
    """
    for image, reconstructed_image in zip(images, reconstructed_images):
        _plot_spectrogram(image)
        _plot_spectrogram(reconstructed_image)
    plt.show()


def plot_images_encoded_in_latent_space(latent_representations, sample_labels):
    """Scatter-plot the first two latent dimensions, coloured by label.

    Args:
        latent_representations: 2-D array indexed as [sample, dimension];
            only columns 0 and 1 are plotted.
        sample_labels: One colour value per sample, mapped through the
            "rainbow" colour map.
    """
    marker_style = {
        "cmap": "rainbow",
        "c": sample_labels,
        "alpha": 0.5,
        "s": 2,
    }

    plt.figure(figsize=(10, 10))
    first_dimension = latent_representations[:, 0]
    second_dimension = latent_representations[:, 1]
    plt.scatter(first_dimension, second_dimension, **marker_style)
    plt.colorbar()
    plt.show()

def download_vectors(autoencoder, sample_ds, download_path):
    """Encode the sampled items and write the embeddings to TSV files.

    Two files are (re)written, both named by appending to ``download_path``
    as a plain string prefix:

    * ``embeddings.tsv`` - one tab-separated row per latent vector
    * ``embedding-filenames.tsv`` - one key of ``sample_ds`` per line, in
      the same order as the rows above

    Args:
        autoencoder: Object exposing ``encoder.predict``.
        sample_ds: Mapping of file name to input array.
        download_path: String prefix for the two output files.

    Returns:
        Tuple ``(latent_representations, filenames)`` where ``filenames`` is
        the key view of ``sample_ds``.
    """
    filenames = sample_ds.keys()
    images = np.array(list(sample_ds.values()))
    latent_representations = autoencoder.encoder.predict(images)

    embeddings_file = download_path + 'embeddings.tsv'
    filenames_file = download_path + 'embedding-filenames.tsv'

    with open(embeddings_file, 'w', newline='') as tsv_file:
        csv.writer(tsv_file, delimiter='\t').writerows(latent_representations)

    # Write the corresponding file names; 'w' discards any previous content.
    with open(filenames_file, 'w') as names_file:
        names_file.writelines(name + '\n' for name in filenames)

    return latent_representations, filenames


# Restore the trained model and reload the spectrograms it was trained on.
autoencoder = VAE.load(MODEL_PATH)
dataset = load_fsdd(SPECTROGRAMS_PATH)

num_sample_images_to_show = 30

# Draw a random sample, reconstruct it, and export its latent vectors.
sample_ds = select_random_images(dataset, num_images=num_sample_images_to_show)
reconstructed_images, _ = autoencoder.reconstruct(np.array([*sample_ds.values()]))
download_vectors(autoencoder, sample_ds, download_path=LATENT_REPRESENTATIONS_PATH)

# plot_reconstructed_images(sample_images, reconstructed_images)

# num_images = 6000
# sample_images, sample_labels = select_images(x_test, y_test, num_images)
# _, latent_representations = autoencoder.reconstruct(sample_images)
# plot_images_encoded_in_latent_space(latent_representations, sample_labels)



(array([[ 1.9309763 ,  0.7092061 , -2.96149   , -2.3542452 ,  2.639731  ,
          1.6745735 ,  1.2930074 , -2.6118832 ],
        [ 5.0017257 ,  1.8820826 , -1.0214549 , -2.5994651 ,  4.3479624 ,
          4.225523  ,  6.6070724 , -5.3922234 ],
        [ 3.0887406 ,  1.2084757 , -2.8370576 , -3.0406609 ,  2.996209  ,
          2.1580844 ,  2.2900174 , -3.5587275 ],
        [ 4.402467  ,  1.6606225 , -2.5379202 , -3.3595417 ,  3.883297  ,
          3.3562305 ,  4.240099  , -4.7738056 ],
        [ 4.4007053 ,  1.6625434 , -1.8928865 , -2.8953092 ,  3.933433  ,
          3.6980517 ,  4.9839478 , -4.973613  ],
        [ 3.331373  ,  1.2960435 , -2.7855191 , -3.0281904 ,  3.3010225 ,
          2.4388483 ,  2.5970416 , -3.7551234 ],
        [ 5.113605  ,  2.145961  , -2.6577506 , -3.630461  ,  4.5887175 ,
          4.1634016 ,  5.4116945 , -5.602186  ],
        [ 4.111113  ,  1.7056317 , -1.5980251 , -2.5654817 ,  3.7988243 ,
          3.671735  ,  4.601552  , -4.5234203 ],
        [ 4.2746