In [3]:
import os
import numpy as np
from datasets import Dataset, DatasetDict, load_from_disk
import librosa as lib
import random as rd
import pickle

import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv2D, ReLU, BatchNormalization, \
    Flatten, Dense, Reshape, Conv2DTranspose, Activation, Lambda
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError

  from .autonotebook import tqdm as notebook_tqdm


# Useful functions

In [47]:
def load_from_file(file: str, slice_seconds: float = 5):

    """
    Load an audio file and compute the sample count of one slice.

    Parameters:
    --------------------------
    file : str
        Path of the audio file (e.g. a .wav file) you want to import
    slice_seconds : float, optional
        Duration of one slice in seconds. Defaults to 5, the value that
        used to be hard-coded.

    Outputs:
    --------------------------
    sig : ndarray
        Array containing signal amplitude values
    sr : int
        Value of the sampling rate returned by the loader
    t : int
        slice_seconds*sr, i.e. the number of samples in one slice
    """

    # lib.load is called with its default arguments, so the sampling rate
    # (and any resampling) is whatever librosa applies by default.
    sig, sr = lib.load(file)
    # Rounded to an int because t is later used as a slice length / range step.
    t = int(round(slice_seconds * sr))
    return sig, sr, t

In [48]:
def slices(sig, t: int):
    """
    Cut a signal into consecutive, non-overlapping slices of t samples.

    Only complete slices are kept: a trailing remainder shorter than t
    is dropped.

    Parameters:
    --------------------------
    sig : ndarray
        Array containing signal amplitude value
    t : int
        Number of samples per slice (a multiple of the sampling rate to
        have a specific time length). Must be non-zero.

    Outputs:
    --------------------------
    li : list[ndarray]
        List of the slices, in signal order
    """

    # The stop bound is len(sig) - t + 1 (not len(sig) - t) so that a signal
    # whose length is an exact multiple of t keeps its last complete slice.
    last_start_bound = len(sig) - t + 1
    return [
        np.asarray(sig[start:start + t])
        for start in range(0, last_start_bound, t)
    ]

In [49]:
def pitch_mod(data, sampling_rate:int, pitch_factor:int):

    """
    Shift the pitch of a signal (for data augmentation purposes)

    Parameters:
    --------------------------
    data: ndarray
        Array containing signal amplitude value
    sampling_rate : int
        Sampling rate of the signal
    pitch_factor : int
        Number of steps to shift by; with 12 bins per octave one step
        is one semitone

    Outputs:
    --------------------------
    pitch_shifted_data : ndarray
        Array containing the pitch-shifted signal
    """

    pitch_shifted_data = lib.effects.pitch_shift(
        y=data,
        sr=sampling_rate,
        n_steps=pitch_factor,
        bins_per_octave=12,
    )
    return pitch_shifted_data

In [50]:
def noising(data,noise_factor:float):

    """
    Add Gaussian noise to the signal (for data augmentation purposes)

    Parameters:
    --------------------------
    data: ndarray
        Array (or list) containing signal amplitude value; any shape,
        including empty
    noise_factor : float
        Hyperparameter scaling the standard-normal noise before it is
        added to the signal

    Outputs:
    --------------------------
    noised_data : ndarray
        Array containing noisy signal amplitude value, with the same
        shape and dtype as np.asarray(data)
    """

    signal = np.asarray(data)
    # Draw one noise sample per element from NumPy's global generator, so
    # np.random.seed() still controls the result. Using the full shape
    # (instead of len(data)) also covers empty and multi-dimensional input.
    noise = np.random.standard_normal(signal.shape)
    noisy_data = signal + noise_factor * noise
    # Cast back to the input dtype; reading it from the array rather than
    # from data[0] avoids an IndexError on empty input.
    return noisy_data.astype(signal.dtype)

In [12]:
def normalise(array):
    """
    Min-max normalise the array values between 0 and 1

    A constant array (max == min) is mapped to all zeros instead of
    producing NaN from a 0/0 division.

    Parameters:
    --------------------------
    array : ndarray
        2D array (typically spectrogram) we want to normalise

    Outputs:
    --------------------------
    norm_array : ndarray
        Normalised 2D array
    """

    lowest = array.min()
    value_range = array.max() - lowest
    # When every value is identical the numerator is already all zeros, so
    # dividing by 1 returns zeros and avoids the division by zero.
    denominator = value_range if value_range != 0 else 1
    norm_array = (array - lowest) / denominator
    return norm_array

def store_min_max(save_path:str,array):
    """
    Collect the min and max values of an array under a given key

    Nothing is written to disk here: save_path is only used as the key
    of the returned mapping. The mapping is returned so that the values
    are not lost when the function exits.

    Parameters:
    --------------------------
    save_path: str
        Key under which the values are recorded (intended to be the path
        of the file the array belongs to)
    array : ndarray
        Array whose min and max values we want to keep

    Outputs:
    --------------------------
    min_max_values : dict
        {save_path: {"min": <array min>, "max": <array max>}}
    """

    min_max_values = {
        save_path: {
            "min": array.min(),
            "max": array.max(),
        }
    }
    return min_max_values

def denormalise(norm_array, original_min, original_max):
    """
    Map a min-max normalised array back to its original value range

    Parameters:
    --------------------------
    norm_array : ndarray
        2D normalised array
    original_min: float
        Min value of the array before normalisation
    original_max : float
        Max value of the array before normalisation

    Outputs:
    --------------------------
    array : ndarray
        Denormalised 2D array
    """

    # Inverse of (x - min) / (max - min): rescale by the range, then shift.
    value_range = original_max - original_min
    return norm_array * value_range + original_min

In [51]:
def gen_spectrogramm(li: list, sr: int, n_mels: int = 216, hop_length: int = 512):

    """
    Generate normalised mel-spectrograms from slices.

    Parameters:
    --------------------------
    li: list[ndarray]
        List containing slices
    sr : int
        Sampling rate
    n_mels : int, optional
        Number of mel bands passed to the mel-spectrogram call (height
        of the image). Defaults to 216, the value that used to be
        hard-coded.
    hop_length : int, optional
        Number of samples per time-step in the spectrogram. Defaults to
        512, the value that used to be hard-coded.

    Outputs:
    --------------------------
    spec : list[ndarray]
        List of generated spectrograms, each passed through normalise()
    """

    spec = []
    for el in li:
        S = lib.feature.melspectrogram(y=el, sr=sr, n_mels=n_mels, hop_length=hop_length)
        spec.append(normalise(S))
        # NOTE(review): the result of this call is not used here, and every
        # slice is recorded under the same literal key "path".
        store_min_max("path",S)

    return spec

# Complete pipeline

In [52]:
def preprocess_pipeline(list_of_wavefile:list, noise_factor: float = 0.1, pitch_steps: int = 3)->list:
    """
    Turn audio files into a shuffled list of normalised mel-spectrograms.

    The files are loaded and concatenated into one signal. Three versions
    of that signal (original, noisy, pitch-shifted) are each cut into
    slices and converted to spectrograms.

    Parameters:
    --------------------------
    list_of_wavefile : list[str]
        Paths of the audio files to process
    noise_factor : float, optional
        Noise factor handed to noising(). Defaults to 0.1, the value that
        used to be hard-coded.
    pitch_steps : int, optional
        Number of steps handed to pitch_mod(). Defaults to 3, the value
        that used to be hard-coded.

    Outputs:
    --------------------------
    spec : list[ndarray]
        Spectrograms of the original, noisy and pitch-shifted slices, in
        random order. Empty if no file is given.
    """

    # Without any file there is no sampling rate or slice length to work
    # with, so return early instead of failing on unbound variables.
    if not list_of_wavefile:
        return []

    signals = []
    for wavefile in list_of_wavefile:
        # sr and t are kept from the last file loaded.
        # NOTE(review): this assumes every file yields the same sampling
        # rate — confirm against load_from_file.
        (s, sr, t) = load_from_file(wavefile)
        signals.append(s)

    # Concatenate whole arrays rather than extending a Python list sample
    # by sample, which creates one Python object per audio sample.
    array_of_signals = np.concatenate(signals)

    # Augmented signals are built from the array, in the same order as before
    # (noise first, then pitch shift).
    variants = (
        array_of_signals,
        noising(array_of_signals, noise_factor),
        pitch_mod(array_of_signals, sr, pitch_steps),
    )

    spec = []
    for variant in variants:
        spec.extend(gen_spectrogramm(slices(variant, t), sr))
    rd.shuffle(spec)

    return spec

In [53]:
# Run the full preprocessing pipeline on the three recordings.
spec = preprocess_pipeline(
    [
        "Grego_chant.wav",
        "Capella_greg.wav",
        "Mass_grego.wav",
    ]
)

In [54]:
# Flatten every spectrogram to a 1-D vector.
output = [np.reshape(spectrogram, (-1,)) for spectrogram in spec]

# Dataset generation

In [55]:
# Single column "X" holding the flattened spectrograms.
buf_dict = dict(X=output)

In [56]:
# Build a dataset with a single 'train' split from the column dictionary,
# then write it under `path` (the directory is created if missing).
train_dataset = Dataset.from_dict(buf_dict)
dataset = DatasetDict({'train': train_dataset})

path = 'data/hugging_face_dataset/'
os.makedirs(path, exist_ok=True)
dataset.save_to_disk(path)

Saving the dataset (3/3 shards): 100%|██████████| 6393/6393 [00:01<00:00, 4381.42 examples/s]


In [57]:
# Reload what was just written to check the round trip; `path` holds a saved
# DatasetDict, so load_from_disk returns a DatasetDict.
dataset_dict = load_from_disk(path)
print(dataset_dict)
dataset_dict['train'].features

DatasetDict({
    train: Dataset({
        features: ['X'],
        num_rows: 6393
    })
})


{'X': Sequence(feature=Value(dtype='float32', id=None), length=-1, id=None)}

In [None]:
# Turn off TensorFlow eager execution through the TF1 compatibility API.
# NOTE(review): presumably needed by the VAE model trained below — confirm.
tf.compat.v1.disable_eager_execution()




In [None]:
# Training hyperparameters, passed to train() in the cell that runs the training.
LEARNING_RATE = 1e-5  # forwarded to autoencoder.compile()
BATCH_SIZE = 64  # forwarded to autoencoder.train()
EPOCHS = 20  # forwarded to autoencoder.train()

def load_fsdd(spectrograms_path):
    """
    Load every array file found under a directory into one training array.

    Parameters:
    --------------------------
    spectrograms_path : str
        Directory walked recursively; every file found is read with
        np.load (no extension filtering)

    Outputs:
    --------------------------
    x_train : ndarray
        The loaded arrays stacked along a new first axis, with one extra
        trailing axis of length 1 appended
    """

    # NOTE(review): files are stacked in os.walk order, and stacking presumably
    # expects all stored spectrograms to share one 2-D shape — confirm.
    spectrograms = [
        np.load(os.path.join(directory, file_name))
        for directory, _, file_names in os.walk(spectrograms_path)
        for file_name in file_names
    ]
    stacked = np.array(spectrograms)
    # Append the trailing axis of length 1.
    return np.expand_dims(stacked, axis=-1)


def train(x_train, learning_rate, batch_size, epochs):
    """
    Build, compile and train the VAE, then return it.

    Parameters:
    --------------------------
    x_train : ndarray
        Training data handed to the model's train() method
    learning_rate : float
        Value handed to the model's compile() method
    batch_size : int
        Batch size handed to the model's train() method
    epochs : int
        Number of epochs handed to the model's train() method

    Outputs:
    --------------------------
    autoencoder : VAE
        The model after training
    """

    # NOTE(review): VAE is not defined or imported in the visible cells; it
    # must already exist in the kernel — confirm where it comes from.
    architecture = dict(
        input_shape=(256, 256, 1),
        conv_filters=(512, 256, 128, 64, 32),
        conv_kernels=(3, 3, 3, 3, 3),
        conv_strides=(2, 2, 2, 2, (2, 1)),
        latent_space_dim=128,
    )
    vae = VAE(**architecture)

    vae.summary()
    vae.compile(learning_rate)
    vae.train(x_train, batch_size, epochs)
    return vae

In [None]:
# NOTE(review): SPECTROGRAMS_SAVE_DIR is not defined in any visible cell; it
# must be set before this cell can run on a fresh kernel.
x_train = load_fsdd(SPECTROGRAMS_SAVE_DIR)
autoencoder = train(x_train, LEARNING_RATE, BATCH_SIZE, EPOCHS)
# Build the output name from EPOCHS so it cannot drift from the number of
# epochs actually trained (with EPOCHS = 20 this is "model_PRS_20_epochs").
autoencoder.save(f"model_PRS_{EPOCHS}_epochs")