In [2]:
import os
import pickle

import numpy as np
import soundfile as sf
import librosa
import tensorflow as tf

In [3]:
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv2D, ReLU, BatchNormalization, Flatten, Dense, Reshape, Conv2DTranspose, Activation, Lambda
from tensorflow.keras import backend as K
from tensorflow import keras


In [1]:
class SoundGen:
    def __init__(self, vae, hop_len):
        self.vae = vae
        self.hop_len = hop_len
        self._min_max_norm = MinMaxNorm(0, 1)

    def generate(self, mel, min_max_val):
        gen_mel, latent = self.vae.reconstruct(mel)
        wav = self.convert_mel2wav(gen_mel, min_max_val)
        return wav, latent
    
    def convert_mel2wav(self, mel, min_max_val):
        wavs = []
        for mel, min_max_val in zip(mel, min_max_val):
            # reshape the log-mel
            log_spect = mel[:, :, 0]
            # apply de-norm
            denorm_log_spect = self._min_max_norm.denormalize(
                log_spect, min_max_val['min'], min_max_val['max'])
            # log-spec to spec
            spect = librosa.db_to_amplitude(denorm_log_spect)
            # griffin-lim
            wav = librosa.istft(spect, hop_length=self.hop_len)
            # wav to wav
            wavs.append(wav)
        return wavs


In [4]:
input_shape=[28,28,1]
conv_filters=[2,4,8]
conv_kernels=[3,5,3]
conv_strides=[1,2,2]
latent_dim=2

In [11]:
class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        # self.input_shape = input_shape
        # self.conv_filters = conv_filters
        # self.conv_kernels = conv_kernels
        # self.conv_strides = conv_strides
        # self.latent_dim = latent_dim
        self.reconst_loss_weight = 1000000

        self.encoder = encoder
        self.decoder = decoder

    def _calculate_combined_loss(self, y_traget, y_pred):
        reconst_loss = self._calculate_reconst_loss(y_traget, y_pred)
        kl_loss = self._calculate_kl_loss(y_traget, y_pred)
        combined_loss = self.reconst_loss_weight * reconst_loss + kl_loss
        return combined_loss

    def _calculate_recosnt_loss(self, y_target, y_pred):
        err = y_target - y_pred
        reconst_loss = K.mean(K.square(err), axis=[1,2,3])
        return reconst_loss
    
    def _calculate_kl_loss(self, y_traget, y_pred):
        kl_loss = -0.5 * K.sum(1 + self.log_var - K.square(self.mu) - K.exp(self.log_var), axis=1)
        return kl_loss

    
    def compile(self, lr=1e-6):
        optim = tf.keras.optimizers.Adam(learning_rate=lr)
        self.model.compile(optimizer=optim, loss=)

    


In [63]:
input_shape = [256,345,1]
latent_dim = 2

def sample_point_from_normal_distribution(args):
    mu, log_var = args
    epsilon = K.random_normal(shape=K.shape(mu), mean=0., stddev=1.)
    sample_point = mu + K.exp(log_var / 2) * epsilon
    return sample_point

def encoder():
    global shape_before_bottleneck
    en_input = Input(shape=input_shape)

    conv1 = Conv2D(filters=2, kernel_size=3, strides=1, padding='same')(en_input)
    relu1 = ReLU()(conv1)
    bnor1 = BatchNormalization()(relu1)

    conv2 = Conv2D(filters=4, kernel_size=5, strides=2, padding='same')(bnor1)
    relu2 = ReLU()(conv2)
    bnor2 = BatchNormalization()(relu2)
    
    conv3 = Conv2D(filters=8, kernel_size=3, strides=2, padding='same')(bnor2)
    relu3 = ReLU()(conv3)
    bnor3 = BatchNormalization()(relu3)

    shape_before_bottleneck = K.int_shape(bnor3)[1:]
    x = Flatten()(bnor3)

    mu = Dense(latent_dim)(x)
    log_var = Dense(latent_dim)(x)

    en_out = Lambda(sample_point_from_normal_distribution)([mu, log_var])

    return Model(en_input, [mu, log_var, en_out], name='encoder')





In [64]:
def decoder():
    
    dec_in = Input(shape=(latent_dim,))
    
    num_neurons = np.prod(shape_before_bottleneck)
    x = Dense(num_neurons)(dec_in)
    x = Reshape(shape_before_bottleneck)(x)

    conv3 = Conv2DTranspose(filters=8, kernel_size=3, strides=2, padding='same')(x)
    relu3 = ReLU()(conv3)
    bnor3 = BatchNormalization()(relu3)

    conv2 = Conv2DTranspose(filters=4, kernel_size=5, strides=2, padding='same')(bnor3)
    relu2 = ReLU()(conv2)
    bnor2 = BatchNormalization()(relu2)

    conv1 = Conv2DTranspose(filters=2, kernel_size=3, strides=1, padding='same')(bnor2)
    relu1 = ReLU()(conv1)
    bnor1 = BatchNormalization()(relu1)

    out = Conv2DTranspose(filters=1, kernel_size=3, strides=1, padding='same')(bnor1)
    dec_out = Activation('sigmoid')(out)

    return Model(dec_in, dec_out, name='decoder')

In [65]:
class own_VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(own_VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name='total_loss')
        self.reconst_loss_tracker = keras.metrics.Mean(name='reconst_loss')
        self.kl_loss_tracker = keras.metrics.Mean(name='kl_loss')

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconst_loss_tracker,
            self.kl_loss_tracker
        ]
    
    def train_stpe(self, data):
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconst = self.decoder
            reconst_loss = tf.reduce_mean(tf.reduce_sum(keras.losses.binary_crossentropy(data, reconst), axis=(1,2)))
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            total_loss = reconst_loss + kl_loss

        grad = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grad, self.trainalble_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconst_loss_tracker.update_state(reconst_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            'loss' : self.total_loss_tracker.result(),
            'reconst_loss' : self.reconst_loss_tracker.result(),
            'kl_loss' : self.kl_loss_tracker.result(),
        }


In [66]:
def load_fsdd(spectrograms_path):
    x_train = []
    file_paths = []
    for root, _, file_names in os.walk(spectrograms_path):
        for file_name in file_names:
            file_path = os.path.join(root, file_name)
            spectrogram = np.load(file_path) # (n_bins, n_frames, 1)
            x_train.append(spectrogram)
            file_paths.append(file_path)
    x_train = np.array(x_train)
    x_train = x_train[..., np.newaxis] # -> (3000, 256, 64, 1)
    return x_train, file_paths

In [67]:
def select_spectrograms(spectrograms,
                        file_paths,
                        min_max_values,
                        num_spectrograms=2):
    sampled_indexes = np.random.choice(range(len(spectrograms)), num_spectrograms)
    sampled_spectrogrmas = spectrograms[sampled_indexes]
    file_paths = [file_paths[index] for index in sampled_indexes]
    sampled_min_max_values = [min_max_values[file_path] for file_path in
                           file_paths]
    print(file_paths)
    print(sampled_min_max_values)
    return sampled_spectrogrmas, sampled_min_max_values

In [51]:
def save_signals(signals, save_dir, sample_rate=22050):
    for i, signal in enumerate(signals):
        save_path = os.path.join(save_dir, str(i) + ".wav")
        sf.write(save_path, signal, sample_rate)

In [52]:
imsi = []
SR = 22050

for i in imsi:
    wav = librosa.load(i, sr=SR, mono=True)[0]
    stft = librosa.stfr(wav)[:-1]
    spec = np.abs(stft)
    log_spec = librosa.amplitude_to_db(spec)
    norm = (log_spec - log_spec.min()) / (log_spec.max() - log_spec.min())
    



---

In [53]:
import os
import pickle

import librosa
import numpy as np


class Loader:
    """Loader is responsible for loading an audio file."""

    def __init__(self, sample_rate, duration, mono):
        self.sample_rate = sample_rate
        self.duration = duration
        self.mono = mono

    def load(self, file_path):
        signal = librosa.load(file_path,
                              sr=self.sample_rate,
                              duration=self.duration,
                              mono=self.mono)[0]
        return signal


class Padder:
    """Padder is responsible to apply padding to an array."""

    def __init__(self, mode="constant"):
        self.mode = mode

    def left_pad(self, array, num_missing_items):
        padded_array = np.pad(array,
                              (num_missing_items, 0),
                              mode=self.mode)
        return padded_array

    def right_pad(self, array, num_missing_items):
        padded_array = np.pad(array,
                              (0, num_missing_items),
                              mode=self.mode)
        return padded_array


class LogSpectrogramExtractor:
    """LogSpectrogramExtractor extracts log spectrograms (in dB) from a
    time-series signal.
    """

    def __init__(self, frame_size, hop_length):
        self.frame_size = frame_size
        self.hop_length = hop_length

    def extract(self, signal):
        stft = librosa.stft(signal,
                            n_fft=self.frame_size,
                            hop_length=self.hop_length)[:-1]
        spectrogram = np.abs(stft)
        log_spectrogram = librosa.amplitude_to_db(spectrogram)
        return log_spectrogram


class MinMaxNormaliser:
    """MinMaxNormaliser applies min max normalisation to an array."""

    def __init__(self, min_val, max_val):
        self.min = min_val
        self.max = max_val

    def normalise(self, array):
        norm_array = (array - array.min()) / (array.max() - array.min())
        norm_array = norm_array * (self.max - self.min) + self.min
        return norm_array

    def denormalise(self, norm_array, original_min, original_max):
        array = (norm_array - self.min) / (self.max - self.min)
        array = array * (original_max - original_min) + original_min
        return array


class Saver:
    """saver is responsible to save features, and the min max values."""

    def __init__(self, feature_save_dir, min_max_values_save_dir):
        self.feature_save_dir = feature_save_dir
        self.min_max_values_save_dir = min_max_values_save_dir

    def save_feature(self, feature, file_path):
        save_path = self._generate_save_path(file_path)
        np.save(save_path, feature)
        return save_path

    def save_min_max_values(self, min_max_values):
        save_path = os.path.join(self.min_max_values_save_dir,
                                 "min_max_values.pkl")
        self._save(min_max_values, save_path)

    @staticmethod
    def _save(data, save_path):
        with open(save_path, "wb") as f:
            pickle.dump(data, f)

    def _generate_save_path(self, file_path):
        file_name = os.path.split(file_path)[1]
        save_path = os.path.join(self.feature_save_dir, file_name + ".npy")
        return save_path


class PreprocessingPipeline:
    """PreprocessingPipeline processes audio files in a directory, applying
    the following steps to each file:
        1- load a file
        2- pad the signal (if necessary)
        3- extracting log spectrogram from signal
        4- normalise spectrogram
        5- save the normalised spectrogram

    Storing the min max values for all the log spectrograms.
    """

    def __init__(self):
        self.padder = None
        self.extractor = None
        self.normaliser = None
        self.saver = None
        self.min_max_values = {}
        self._loader = None
        self._num_expected_samples = None

    @property
    def loader(self):
        return self._loader

    @loader.setter
    def loader(self, loader):
        self._loader = loader
        self._num_expected_samples = int(loader.sample_rate * loader.duration)

    def process(self, audio_files_dir):
        for root, _, files in os.walk(audio_files_dir):
            files = [f for f in files if not f[0] == '.']
            for file in files:
                file_path = os.path.join(root, file)
                self._process_file(file_path)
                print(f"Processed file {file_path}")
        self.saver.save_min_max_values(self.min_max_values)

    def _process_file(self, file_path):
        signal = self.loader.load(file_path)
        if self._is_padding_necessary(signal):
            signal = self._apply_padding(signal)
        feature = self.extractor.extract(signal)
        norm_feature = self.normaliser.normalise(feature)
        save_path = self.saver.save_feature(norm_feature, file_path)
        self._store_min_max_value(save_path, feature.min(), feature.max())

    def _is_padding_necessary(self, signal):
        if len(signal) < self._num_expected_samples:
            return True
        return False

    def _apply_padding(self, signal):
        num_missing_samples = self._num_expected_samples - len(signal)
        padded_signal = self.padder.right_pad(signal, num_missing_samples)
        return padded_signal

    def _store_min_max_value(self, save_path, min_val, max_val):
        self.min_max_values[save_path] = {
            "min": min_val,
            "max": max_val
        }


In [55]:
FRAME_SIZE = 512
HOP_LENGTH = 256
DURATION = 4  # in seconds
SAMPLE_RATE = 22050
MONO = True

SPECTROGRAMS_SAVE_DIR = "./datasets/fsdd/spectrograms/"
MIN_MAX_VALUES_SAVE_DIR = "./datasets/fsdd/"
FILES_DIR = "./LOOP/"

# instantiate all objects
loader = Loader(SAMPLE_RATE, DURATION, MONO)
padder = Padder()
log_spectrogram_extractor = LogSpectrogramExtractor(FRAME_SIZE, HOP_LENGTH)
min_max_normaliser = MinMaxNormaliser(0, 1)
saver = Saver(SPECTROGRAMS_SAVE_DIR, MIN_MAX_VALUES_SAVE_DIR)

preprocessing_pipeline = PreprocessingPipeline()
preprocessing_pipeline.loader = loader
preprocessing_pipeline.padder = padder
preprocessing_pipeline.extractor = log_spectrogram_extractor
preprocessing_pipeline.normaliser = min_max_normaliser
preprocessing_pipeline.saver = saver

preprocessing_pipeline.process(FILES_DIR)

Processed file ./LOOP/Track(74).wav
Processed file ./LOOP/dlvbk(95).wav
Processed file ./LOOP/olbk(17).wav
Processed file ./LOOP/ovbb(72).wav
Processed file ./LOOP/OVB80A.wav
Processed file ./LOOP/Track(23).wav
Processed file ./LOOP/dlvbk(138).wav
Processed file ./LOOP/ovbb(25).wav
Processed file ./LOOP/DLVBK021.wav
Processed file ./LOOP/02.wav
Processed file ./LOOP/DLVBK035.wav
Processed file ./LOOP/OLBK091.wav
Processed file ./LOOP/DLVBK009.wav
Processed file ./LOOP/Track(35).wav
Processed file ./LOOP/OLBK085.wav
Processed file ./LOOP/OLBK003A.wav
Processed file ./LOOP/OLBK052.wav
Processed file ./LOOP/ovbb(33).wav
Processed file ./LOOP/OLBK046.wav
Processed file ./LOOP/Track(62).wav
Processed file ./LOOP/dlvbk(83).wav
Processed file ./LOOP/ovbb(64).wav
Processed file ./LOOP/Track(19).wav
Processed file ./LOOP/dlvbk(102).wav
Processed file ./LOOP/dlvbk(17).wav
Processed file ./LOOP/OLBK047.wav
Processed file ./LOOP/OLBK053.wav
Processed file ./LOOP/OLBK084.wav
Processed file ./LOOP/d

In [56]:
spectrograms_path = './datasets/fsdd/spectrograms/'
x_train = []
for root, _, file_names in os.walk(spectrograms_path):
    for file_name in file_names:
        file_path = os.path.join(root, file_name)
        spectrogram = np.load(file_path) # (n_bins, n_frames, 1)
        x_train.append(spectrogram)
x_train = np.array(x_train)
x_train = x_train[..., np.newaxis] # -> (3000, 256, 64, 1)

In [57]:
x_train.shape

(604, 256, 345, 1)

In [68]:
encoder = encoder()

In [69]:
decoder = decoder()

In [70]:
vae = own_VAE(encoder=encoder, decoder=decoder)

In [71]:
vae.compile(optimizer=tf.keras.optimizers.Adam())

In [72]:
vae.fit(x_train, epochs=100, batch_size=16)

Epoch 1/100


NotImplementedError: in user code:

    File "/Users/cooky/miniforge3/lib/python3.9/site-packages/keras/engine/training.py", line 1051, in train_function  *
        return step_function(self, iterator)
    File "/Users/cooky/miniforge3/lib/python3.9/site-packages/keras/engine/training.py", line 1040, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/Users/cooky/miniforge3/lib/python3.9/site-packages/keras/engine/training.py", line 1030, in run_step  **
        outputs = model.train_step(data)
    File "/Users/cooky/miniforge3/lib/python3.9/site-packages/keras/engine/training.py", line 889, in train_step
        y_pred = self(x, training=True)
    File "/Users/cooky/miniforge3/lib/python3.9/site-packages/keras/utils/traceback_utils.py", line 67, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "/Users/cooky/miniforge3/lib/python3.9/site-packages/keras/engine/training.py", line 517, in call
        raise NotImplementedError('Unimplemented `tf.keras.Model.call()`: if you '

    NotImplementedError: Exception encountered when calling layer "own_vae_2" (type own_VAE).
    
    Unimplemented `tf.keras.Model.call()`: if you intend to create a `Model` with the Functional API, please provide `inputs` and `outputs` arguments. Otherwise, subclass `Model` with an overridden `call()` method.
    
    Call arguments received by layer "own_vae_2" (type own_VAE):
      • inputs=tf.Tensor(shape=(None, 256, 345, 1), dtype=float32)
      • training=True
      • mask=None
