<a href="https://colab.research.google.com/github/Jarvis1000x/Variational_Autoencoder_for_Sound_Generation/blob/main/Sound_Generator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv2D, ReLU, BatchNormalization, \
    Flatten, Dense, Reshape, Conv2DTranspose, Activation, Lambda
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
import numpy as np
import os
import pickle
import tensorflow as tf
import os
import pickle
import numpy as np
import soundfile as sf
import librosa

import sys
sys.path.append('/content/drive/MyDrive/Colab Notebooks/Sound Generation using VAE')

In [2]:
# Switch TensorFlow to graph (non-eager) mode before any model is built.
# NOTE(review): presumably needed because the VAE loss defined below reads the
# encoder's `mu` / `log_variance` layer outputs directly instead of receiving
# them as arguments - confirm against the TensorFlow version in use.
tf.compat.v1.disable_eager_execution()

In [3]:
# Shared Google Drive prefix for every data/sample location used below.
_PROJECT_DIR = "/content/drive/MyDrive/Colab Notebooks/Sound Generation using VAE"

# Hop length handed to SoundGenerator, which forwards it to librosa.istft.
HOP_LENGTH = 256

# Output folders for the reference audio and for the VAE reconstructions.
# (The "ORIGNAL" spelling is kept because later cells use this exact name.)
SAVE_DIR_ORIGNAL = _PROJECT_DIR + "/sample/original"
SAVE_DIR_GENERATED = _PROJECT_DIR + "/sample/generated"

# Pickled mapping of spectrogram file path -> {"min": ..., "max": ...}.
MIN_MAX_VALUES_PATH = _PROJECT_DIR + "/data/min_max_values.pkl"

# Folder walked by load_fsdd to collect the saved spectrogram arrays.
SPECTROGRAM_PATH = _PROJECT_DIR + "/data/spectrograms"

In [4]:
class MinMaxNormaliser:
    """Rescales arrays linearly into the target range [min_val, max_val].

    `normalise` maps the smallest element of its input to `min_val` and the
    largest to `max_val`; `denormalise` reverses that mapping given the
    original extremes.
    """

    def __init__(self, min_val, max_val):
        """Store the lower (`min_val`) and upper (`max_val`) target bounds."""
        self.min = min_val
        self.max = max_val

    def normalise(self, array):
        """Return `array` rescaled so its values span [self.min, self.max].

        A constant array has no spread to rescale; instead of dividing by
        zero (which would fill the result with NaN) every element is mapped
        to the lower bound `self.min`.
        """
        array_min = array.min()
        value_range = array.max() - array_min
        shifted = array - array_min
        if value_range == 0:
            # Multiplying by 0.0 keeps the shape and yields float zeros.
            norm_array = shifted * 0.0
        else:
            norm_array = shifted / value_range
        norm_array = norm_array * (self.max - self.min) + self.min
        return norm_array

    def denormalise(self, norm_array, original_min, original_max):
        """Map `norm_array` from [self.min, self.max] back to
        [original_min, original_max]."""
        array = (norm_array - self.min) / (self.max - self.min)
        array = array * (original_max - original_min) + original_min
        return array


In [5]:
class SoundGenerator:
    """Produces audio signals from normalised log spectrograms.

    Spectrograms can either be passed through the wrapped VAE first
    (`generate`) or converted to audio directly
    (`convert_spectrograms_to_audio`).
    """

    def __init__(self, vae, hop_length):
        """Keep the VAE and hop length, and set up a [0, 1] normaliser."""
        self.vae = vae
        self.hop_length = hop_length
        self._min_max_normaliser = MinMaxNormaliser(0, 1)

    def generate(self, spectrograms, min_max_values):
        """Reconstruct `spectrograms` with the VAE and turn them into audio.

        Returns a tuple `(signals, latent_representations)`.
        """
        reconstructed, latent_representations = self.vae.reconstruct(spectrograms)
        return (
            self.convert_spectrograms_to_audio(reconstructed, min_max_values),
            latent_representations,
        )

    def convert_spectrograms_to_audio(self, spectrograms, min_max_values):
        """Convert each spectrogram, paired with its min/max entry, to a signal.

        Pairing is done with `zip`, so any surplus items in the longer of the
        two inputs are ignored. Returns a list of signals.
        """
        return [
            self._spectrogram_to_signal(spectrogram, bounds)
            for spectrogram, bounds in zip(spectrograms, min_max_values)
        ]

    def _spectrogram_to_signal(self, spectrogram, bounds):
        """Turn one normalised log spectrogram into an audio signal."""
        # Drop the trailing (third) axis by taking its first slice.
        log_spectrogram = spectrogram[:, :, 0]

        # Undo the [0, 1] scaling using the stored "min" / "max" values.
        denormalised = self._min_max_normaliser.denormalise(
            log_spectrogram, bounds["min"], bounds["max"])

        # Decibels -> amplitude.
        amplitude = librosa.db_to_amplitude(denormalised)

        # Invert with librosa.istft (note: this is the plain inverse STFT,
        # not an iterative Griffin-Lim phase estimate).
        return librosa.istft(amplitude, hop_length=self.hop_length)


In [6]:
class VAE:
    """
    VAE represents a Deep Convolutional Variational autoencoder
    architecture with a mirrored encoder and decoder component.

    The encoder, decoder and combined model are built in the constructor.
    The KL part of the loss reads the encoder's `mu` / `log_variance`
    layer outputs stored on the instance rather than the loss arguments.
    NOTE(review): this presumably requires graph (non-eager) execution -
    confirm against the TensorFlow version in use.
    """

    # Folder `load` falls back to when `save_folder` holds no parameters file.
    DEFAULT_MODEL_DIR = "/content/drive/MyDrive/Colab Notebooks/Sound Generation using VAE/model"

    def __init__(self,
                 input_shape,
                 conv_filters,
                 conv_kernels,
                 conv_strides,
                 latent_space_dim):
        """Store the architecture description and build the three models.

        Args:
            input_shape: shape of one input sample, e.g. [28, 28, 1].
            conv_filters: filters per convolutional block, e.g. [2, 4, 8].
            conv_kernels: kernel size per convolutional block, e.g. [3, 5, 3].
            conv_strides: stride per convolutional block, e.g. [1, 2, 2].
            latent_space_dim: size of the latent vector, e.g. 2.
        """
        self.input_shape = input_shape # [28, 28, 1]
        self.conv_filters = conv_filters # [2, 4, 8]
        self.conv_kernels = conv_kernels # [3, 5, 3]
        self.conv_strides = conv_strides # [1, 2, 2]
        self.latent_space_dim = latent_space_dim # 2
        # Factor applied to the reconstruction term before adding the KL term.
        self.reconstruction_loss_weight = 1000000

        self.encoder = None
        self.decoder = None
        self.model = None

        # Outputs of the "mu" / "log_variance" layers; set by _add_bottleneck.
        self.mu = None
        self.log_variance = None

        self._num_conv_layers = len(conv_strides)
        self._shape_before_bottleneck = None
        self._model_input = None

        self._build()

    def summary(self):
        """Print the Keras summaries of the encoder, decoder and full model."""
        self.encoder.summary()
        self.decoder.summary()
        self.model.summary()

    def compile(self, learning_rate=0.0001):
        """Compile the full model with Adam and the combined VAE loss."""
        optimizer = Adam(learning_rate=learning_rate)
        self.model.compile(optimizer=optimizer,
                           loss=self._calculate_combined_loss)

    def train(self, x_train, batch_size, num_epochs):
        """Fit the model using `x_train` as both input and target."""
        self.model.fit(x_train,
                       x_train,
                       batch_size=batch_size,
                       epochs=num_epochs,
                       shuffle=True)

    def save(self, save_folder="."):
        """Write `parameters.pkl` and `weights.h5` into `save_folder`,
        creating the folder when it does not exist."""
        self._create_folder(save_folder)
        self._save_parameters(save_folder)
        self._save_weights(save_folder)

    def _create_folder(self, folder):
        """Create `folder` (including parents) if it is missing."""
        if not os.path.exists(folder):
            os.makedirs(folder)

    def _save_parameters(self, save_folder):
        """Pickle the constructor arguments, in constructor order."""
        parameters = [
            self.input_shape,
            self.conv_filters,
            self.conv_kernels,
            self.conv_strides,
            self.latent_space_dim
        ]
        save_path = os.path.join(save_folder, "parameters.pkl")
        with open(save_path, "wb") as f:
            pickle.dump(parameters, f)

    def _save_weights(self, save_folder):
        """Save the full model's weights as `weights.h5`."""
        save_path = os.path.join(save_folder, "weights.h5")
        self.model.save_weights(save_path)

    def load_weights(self, weights_path):
        """Load weights for the full model from `weights_path`."""
        self.model.load_weights(weights_path)

    def reconstruct(self, images):
        """Encode `images`, decode the result and return
        `(reconstructed_images, latent_representations)`."""
        latent_representations = self.encoder.predict(images)
        reconstructed_images = self.decoder.predict(latent_representations)
        return reconstructed_images, latent_representations

    @classmethod
    def load(cls, save_folder="."):
        """Rebuild a model from the files written by `save`.

        `parameters.pkl` and `weights.h5` are read from `save_folder`. When
        `save_folder` does not contain `parameters.pkl`, both files are read
        from `DEFAULT_MODEL_DIR` instead, which keeps callers that relied on
        the previously hard-coded location working.

        NOTE: the parameters are read with `pickle.load`, which can execute
        arbitrary code - only load files from a trusted source.
        """
        parameters_path = os.path.join(save_folder, "parameters.pkl")
        if not os.path.isfile(parameters_path):
            save_folder = cls.DEFAULT_MODEL_DIR
            parameters_path = os.path.join(save_folder, "parameters.pkl")
        with open(parameters_path, "rb") as f:
            parameters = pickle.load(f)
        # Use `cls` so subclasses get an instance of their own type.
        autoencoder = cls(*parameters)
        weights_path = os.path.join(save_folder, "weights.h5")
        autoencoder.load_weights(weights_path)
        return autoencoder

    def _calculate_combined_loss(self, y_target, y_predicted):
        """Weighted reconstruction loss plus KL loss."""
        reconstruction_loss = self._calculate_reconstruction_loss(y_target, y_predicted)
        kl_loss = self._calculate_kl_loss(y_target, y_predicted)
        combined_loss = self.reconstruction_loss_weight * reconstruction_loss + kl_loss
        return combined_loss

    def _calculate_reconstruction_loss(self, y_target, y_predicted):
        """Mean squared error taken over axes 1, 2 and 3."""
        error = y_target - y_predicted
        reconstruction_loss = K.mean(K.square(error), axis=[1, 2, 3])
        return reconstruction_loss

    def _calculate_kl_loss(self, y_target, y_predicted):
        """KL term computed from `self.mu` and `self.log_variance`.

        Both arguments are unused; they are accepted so the method has the
        same signature as the other loss helpers.
        """
        kl_loss = -0.5 * K.sum(1 + self.log_variance - K.square(self.mu) -
                               K.exp(self.log_variance), axis=1)
        return kl_loss

    def _build(self):
        """Build encoder, decoder and the combined model, in that order."""
        self._build_encoder()
        self._build_decoder()
        self._build_autoencoder()

    def _build_autoencoder(self):
        """Chain encoder and decoder into the end-to-end model."""
        model_input = self._model_input
        model_output = self.decoder(self.encoder(model_input))
        self.model = Model(model_input, model_output, name="autoencoder")

    def _build_decoder(self):
        """Assemble the decoder: input -> dense -> reshape -> transposed
        convolutions -> sigmoid output."""
        decoder_input = self._add_decoder_input()
        dense_layer = self._add_dense_layer(decoder_input)
        reshape_layer = self._add_reshape_layer(dense_layer)
        conv_transpose_layers = self._add_conv_transpose_layers(reshape_layer)
        decoder_output = self._add_decoder_output(conv_transpose_layers)
        self.decoder = Model(decoder_input, decoder_output, name="decoder")

    def _add_decoder_input(self):
        """Input layer taking one latent vector per sample."""
        # Pass the shape as a 1-tuple rather than a bare int.
        return Input(shape=(self.latent_space_dim,), name="decoder_input")

    def _add_dense_layer(self, decoder_input):
        """Dense layer sized to the flattened pre-bottleneck feature map."""
        # np.prod returns a NumPy integer; hand Keras a plain int.
        num_neurons = int(np.prod(self._shape_before_bottleneck))
        dense_layer = Dense(num_neurons, name="decoder_dense")(decoder_input)
        return dense_layer

    def _add_reshape_layer(self, dense_layer):
        """Reshape the dense output back to the pre-bottleneck shape."""
        return Reshape(self._shape_before_bottleneck)(dense_layer)

    def _add_conv_transpose_layers(self, x):
        """Add conv transpose blocks
        loop through all the conv layers in reverse order and
        stop at the first layer"""
        # Index 0 is skipped here; _add_decoder_output handles it.
        for layer_index in reversed(range(1, self._num_conv_layers)):
            x = self._add_conv_transpose_layer(layer_index, x)
        return x

    def _add_conv_transpose_layer(self, layer_index, x):
        """Add one block: Conv2DTranspose + ReLU + BatchNormalization."""
        layer_num = self._num_conv_layers - layer_index
        conv_transpose_layer = Conv2DTranspose(
            filters=self.conv_filters[layer_index],
            kernel_size=self.conv_kernels[layer_index],
            strides=self.conv_strides[layer_index],
            padding="same",
            name=f"decoder_conv_transpose_layer_{layer_num}"
        )
        x = conv_transpose_layer(x)
        x = ReLU(name=f"decoder_relu_{layer_num}")(x)
        x = BatchNormalization(name=f"decoder_bn_{layer_num}")(x)
        return x

    def _add_decoder_output(self, x):
        """Final single-filter transposed convolution followed by a sigmoid."""
        conv_transpose_layer = Conv2DTranspose(
            filters=1,
            kernel_size=self.conv_kernels[0],
            strides=self.conv_strides[0],
            padding="same",
            name=f"decoder_conv_transpose_layer_{self._num_conv_layers}"
        )
        x = conv_transpose_layer(x)
        output_layer = Activation("sigmoid", name="sigmoid_layer")(x)
        return output_layer

    def _build_encoder(self):
        """Assemble the encoder: input -> conv blocks -> bottleneck."""
        encoder_input = self._add_encoder_input()
        conv_layers = self._add_conv_layers(encoder_input)
        bottleneck = self._add_bottleneck(conv_layers)
        self._model_input = encoder_input
        self.encoder = Model(encoder_input, bottleneck, name="encoder")

    def _add_encoder_input(self):
        """Input layer with the configured sample shape."""
        return Input(shape=self.input_shape, name="encoder_input")

    def _add_conv_layers(self, encoder_input):
        """Create all convolutional blocks of the encoder."""
        x = encoder_input
        for layer_index in range(self._num_conv_layers):
            x = self._add_conv_layer(layer_index, x)
        return x

    def _add_conv_layer(self, layer_index, x):
        """
        Adds a convolutional block to the graph of layers,
        consisting of conv 2D + ReLU + batch normalisation.
        """
        layer_number = layer_index + 1
        conv_layer = Conv2D(
            filters=self.conv_filters[layer_index],
            kernel_size=self.conv_kernels[layer_index],
            strides=self.conv_strides[layer_index],
            padding="same",
            name=f"encoder_conv_layer_{layer_number}"
        )
        x = conv_layer(x)
        x = ReLU(name=f"encoder_relu_{layer_number}")(x)
        x = BatchNormalization(name=f"encoder_bn_{layer_number}")(x)
        return x

    def _add_bottleneck(self, x):
        """Flatten the features and add the Gaussian-sampling bottleneck.

        Two dense layers produce `mu` and `log_variance`; a Lambda layer
        draws `mu + exp(log_variance / 2) * epsilon` with standard-normal
        `epsilon`.
        """
        # Remember the feature-map shape (without batch axis) for the decoder.
        self._shape_before_bottleneck = K.int_shape(x)[1:]
        x = Flatten()(x)
        self.mu = Dense(self.latent_space_dim, name="mu")(x)
        self.log_variance = Dense(self.latent_space_dim,
                                  name="log_variance")(x)

        def sample_point_from_normal_distribution(args):
            mu, log_variance = args
            # Shape the noise from the tensor passed in, not from instance state.
            epsilon = K.random_normal(shape=K.shape(mu), mean=0., stddev=1.)
            sampled_point = mu + K.exp(log_variance / 2) * epsilon
            return sampled_point

        x = Lambda(sample_point_from_normal_distribution,
                   name="encoder_output")([self.mu, self.log_variance])
        return x


In [7]:
def load_fsdd(spectrogram_path):
    """Load every array stored under `spectrogram_path` (searched recursively).

    Returns `(x_train, file_paths)`: the loaded arrays combined into one
    array with an extra trailing axis, and the path each one came from, in
    the same order.

    NOTE: files are read with `allow_pickle=True`, which can execute
    arbitrary code - only point this at trusted data.
    """
    file_paths = [
        os.path.join(directory, entry)
        for directory, _, entries in os.walk(spectrogram_path)
        for entry in entries
    ]
    loaded = [np.load(path, allow_pickle=True) for path in file_paths]
    # Append a trailing axis of length 1 to the combined array.
    x_train = np.expand_dims(np.array(loaded), axis=-1)
    return x_train, file_paths


In [8]:
def select_spectrograms(spectrograms,
                        file_paths,
                        min__max_values,
                        num_spectrograms=2):
    """Randomly pick `num_spectrograms` spectrograms and their min/max entries.

    Indexes are drawn with `np.random.choice` using its default settings,
    so the same spectrogram may be picked more than once. The chosen file
    paths and min/max entries are printed before returning
    `(sampled_spectrograms, sampled_min_max_values)`.
    """
    candidate_indexes = np.arange(len(spectrograms))
    sampled_indexes = np.random.choice(candidate_indexes, num_spectrograms)

    sampled_spectrograms = spectrograms[sampled_indexes]
    chosen_paths = []
    sampled_min_max_values = []
    for index in sampled_indexes:
        path = file_paths[index]
        chosen_paths.append(path)
        # `min__max_values` is keyed by spectrogram file path.
        sampled_min_max_values.append(min__max_values[path])

    print(chosen_paths)
    print(sampled_min_max_values)
    return sampled_spectrograms, sampled_min_max_values


In [9]:
def save_signal(signals, save_dir, sample_rate=22050):
    """Write each signal to `save_dir` as `<index>.wav`.

    Args:
        signals: iterable of audio signals; the position in the iterable
            becomes the file name.
        save_dir: destination folder; created (with parents) when missing
            so a fresh output location does not make the write fail.
        sample_rate: sample rate passed to `soundfile.write`.
    """
    os.makedirs(save_dir, exist_ok=True)
    for i, signal in enumerate(signals):
        save_path = os.path.join(save_dir, str(i) + ".wav")
        sf.write(save_path, signal, sample_rate)


In [13]:
# Restore the trained VAE and wrap it in a sound generator.
vae = VAE.load("model")
sound_generator = SoundGenerator(vae, HOP_LENGTH)

# Read the stored min/max values (keyed by spectrogram file path).
# NOTE: pickle.load can execute arbitrary code - only open trusted files.
with open(MIN_MAX_VALUES_PATH, "rb") as f:
    min_max_values = pickle.load(f)

# Load every saved spectrogram together with the path it came from.
specs, file_paths = load_fsdd(SPECTROGRAM_PATH)

# Draw five random spectrograms and their matching min/max values.
sampled_specs, sampled_min_max_values = select_spectrograms(
    specs, file_paths, min_max_values, num_spectrograms=5)

# Audio from the VAE reconstructions (latent vectors are not needed here).
signals, _ = sound_generator.generate(sampled_specs, sampled_min_max_values)

# Audio straight from the sampled spectrograms, for comparison.
original_signals = sound_generator.convert_spectrograms_to_audio(
    sampled_specs, sampled_min_max_values)

# Write both sets of signals to their output folders.
save_signal(signals, SAVE_DIR_GENERATED)
save_signal(original_signals, SAVE_DIR_ORIGNAL)

['/content/drive/MyDrive/Colab Notebooks/Sound Generation using VAE/data/spectrograms/4_george_29.wav.npy', '/content/drive/MyDrive/Colab Notebooks/Sound Generation using VAE/data/spectrograms/1_lucas_45.wav.npy', '/content/drive/MyDrive/Colab Notebooks/Sound Generation using VAE/data/spectrograms/3_george_19.wav.npy', '/content/drive/MyDrive/Colab Notebooks/Sound Generation using VAE/data/spectrograms/0_yweweler_29.wav.npy', '/content/drive/MyDrive/Colab Notebooks/Sound Generation using VAE/data/spectrograms/0_theo_20.wav.npy']
[{'min': -51.683426, 'max': 28.316572}, {'min': -55.34108, 'max': 24.658918}, {'min': -58.328014, 'max': 21.671986}, {'min': -68.99773, 'max': 11.002273}, {'min': -81.03109, 'max': -1.0310881}]


