In [1]:
# Mount Google Drive under /content/drive/ — the dataset and the saved model
# weights referenced later in this notebook are read from paths below it.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [2]:
import numpy as np
import librosa
import keras
import tensorflow as tf
import pandas as pd
import yaml, os
from pathlib import Path

In [3]:
def configure_for_performance(ds, batch_size, mode):
    """Cache, optionally shuffle, batch and prefetch a dataset.

    Args:
        ds: Dataset exposing ``cache``/``shuffle``/``batch``/``prefetch``
            (a ``tf.data.Dataset`` everywhere this notebook calls it).
        batch_size: Number of elements per emitted batch.
        mode: Shuffling (buffer of 1000 elements) is applied only when this
            is exactly ``'train'``; any other value skips it.

    Returns:
        The transformed dataset.
    """
    pipeline = ds.cache()
    should_shuffle = (mode == 'train')
    if should_shuffle:
        # Shuffle after caching so the cached content itself stays in order.
        pipeline = pipeline.shuffle(buffer_size=1000)
    batched = pipeline.batch(batch_size)
    return batched.prefetch(buffer_size=tf.data.AUTOTUNE)

class training_DataLoader():
    """Builds (mix, stem) training datasets from a directory of tracks.

    Every directory below ``path_to_slakh`` that contains a ``metadata.yaml``
    is treated as one track with a ``mix.wav`` and a ``stems/`` folder.

    Args:
        path_to_slakh: Root directory that is searched recursively.
        instr_class: Value compared against each stem's ``inst_class`` entry.
        batch_size: Batch size of the datasets returned by ``get_data``.
        frame_step: Hop (in samples) between consecutive training frames.
            The previous code framed the first file with 2**13 and every
            other file with 2**14; a single value is now used for all files,
            defaulting to the hop most of the data was already framed with.
    """

    # Samples per frame; the model in this notebook takes inputs of this length.
    FRAME_LENGTH = 2**15

    def __init__(self, path_to_slakh, instr_class, batch_size = 8, frame_step = 2**14) -> None:
        self.path_to_slakh = Path(path_to_slakh)
        self.instr_class = instr_class
        self.batch_size = batch_size
        self.frame_step = frame_step
        self.metadata_df = self.get_metadata()

    def get_metadata(self):
        """Scan the dataset root and list the audio paths of every track.

        Returns:
            DataFrame with one row per track: a ``mix_path`` column plus
            ``instr_path_0``, ``instr_path_1``, ... for each stem whose
            ``inst_class`` equals ``self.instr_class``. Tracks with fewer
            matching stems have NaN in the unused columns.
        """
        records = []
        # Sorted so the row order (and any index-based split built on it)
        # does not depend on filesystem enumeration order.
        for metadata_path in sorted(self.path_to_slakh.glob('**/metadata.yaml')):
            root = metadata_path.parent
            with open(metadata_path, 'r') as file:
                metadata = yaml.safe_load(file)

            record = {'mix_path': root / 'mix.wav'}
            j = 0
            for key, stem in metadata['stems'].items():
                if stem['inst_class'] == self.instr_class:
                    record[f'instr_path_{j}'] = root / f'stems/{key}.wav'
                    j += 1
            records.append(record)

        if not records:
            return pd.DataFrame(columns = ['mix_path'])
        # Built in one go instead of growing the frame cell by cell.
        return pd.DataFrame(records)

    @staticmethod
    def _standardize(signal):
        """Return the signal with zero mean and, unless it is constant, unit variance."""
        mean, stddev = np.mean(signal), np.std(signal)
        if stddev == 0:
            # A silent/constant signal would otherwise become all-NaN.
            return signal - mean
        return (signal - mean) / stddev

    def _frame(self, signal):
        """Cut a 1-D signal into zero-padded overlapping frames (as a NumPy array)."""
        frames = tf.signal.frame(signal,
                                 self.FRAME_LENGTH,
                                 self.frame_step,
                                 pad_end=True,
                                 pad_value=0,
                                 axis=-1)
        return np.asarray(frames)

    def get_data(self, indexes, mode):
        """Load, standardize and frame the tracks at the given row positions.

        Only the first matching stem (``instr_path_0``) of each track is used
        as the target.

        Args:
            indexes: Positional row indexes into ``self.metadata_df``.
            mode: Forwarded to ``configure_for_performance`` (``'train'``
                enables shuffling).

        Returns:
            A batched dataset of ``(mix_frames, stem_frames)`` pairs.

        Raises:
            ValueError: If none of the selected tracks has a matching stem.
        """
        import warnings

        mix_frames, instr_frames = [], []
        for index, row in self.metadata_df.iloc[indexes].iterrows():
            mix_path, instr_path = row['mix_path'], row.get('instr_path_0')
            if pd.isna(instr_path):
                # Track has no stem of the requested class: nothing to learn from.
                warnings.warn(f"Skipping {mix_path}: no '{self.instr_class}' stem found")
                continue

            y_mix, _ = librosa.load(mix_path)
            y_instr, _ = librosa.load(instr_path)

            # Keep inputs and targets aligned frame-for-frame even if the two
            # files differ slightly in length.
            n_samples = min(len(y_mix), len(y_instr))
            y_mix, y_instr = y_mix[:n_samples], y_instr[:n_samples]

            mix_frames.append(self._frame(self._standardize(y_mix)))
            instr_frames.append(self._frame(self._standardize(y_instr)))

        if not mix_frames:
            raise ValueError(
                f"No track with a '{self.instr_class}' stem among the selected indexes")

        # Single concatenation instead of re-copying the growing array per file.
        X = tf.data.Dataset.from_tensor_slices(np.concatenate(mix_frames, axis = 0))
        y = tf.data.Dataset.from_tensor_slices(np.concatenate(instr_frames, axis = 0))

        return configure_for_performance(tf.data.Dataset.zip((X, y)), self.batch_size, mode)


In [4]:
from keras import layers

def downsample_block(x, n_filters):
    """Encoder stage: strided convolution -> batch normalisation -> LeakyReLU.

    Args:
        x: Input feature map (channels-last, since normalisation uses axis -1).
        n_filters: Number of output channels of the convolution.

    Returns:
        The activated feature map produced by the stride-2, 5x5 convolution.
    """
    conv = layers.Conv2D(
        n_filters,
        5,
        strides=2,
        padding='same',
        kernel_initializer="he_normal",
    )
    norm = layers.BatchNormalization(axis=-1, momentum=0.01, epsilon=1e-3)
    activation = layers.LeakyReLU(alpha=0.2)
    return activation(norm(conv(x)))


def upsample_block(x, conv_features, n_filters, dropout, activation_func):
    """Decoder stage: transposed convolution, activation, batch norm, optional
    dropout and optional skip connection.

    Args:
        x: Input feature map.
        conv_features: Encoder feature map to concatenate on the channel axis,
            or ``None`` for no skip connection.
        n_filters: Number of output channels of the transposed convolution.
        dropout: When truthy, apply ``Dropout(0.5)`` after normalisation.
        activation_func: ``'relu'`` or ``'sigmoid'``; any other value leaves
            the transposed-convolution output without an activation.

    Returns:
        The resulting feature map.
    """
    # upsample
    x = layers.Conv2DTranspose(n_filters, 5, strides=2, padding='same', kernel_initializer = "he_normal")(x)
    if activation_func == 'relu':
        x = layers.ReLU()(x)
    elif activation_func == 'sigmoid':
        x = keras.activations.sigmoid(x)
    # Unrecognised values intentionally fall through with no activation.
    x = layers.BatchNormalization(axis=-1)(x)
    # dropout
    if dropout:
        x = layers.Dropout(0.5)(x)
    # concatenate
    # Identity check: `!=` on a tensor-like object can be dispatched to an
    # overloaded comparison operator instead of answering "is this None?".
    if conv_features is not None:
        x = layers.Concatenate(axis=-1)([x, conv_features])
    return x

def get_spectrogram(waveform):
  """Compute a log-magnitude spectrogram and its phase from a waveform.

  The STFT uses a Hamming window with frame_length=512, frame_step=128 and
  fft_length=510 (i.e. 510 // 2 + 1 = 256 frequency bins), padding the end of
  the signal. A trailing channels axis is appended so the result can be fed
  to 2-D convolution layers.

  Returns:
    Tuple ``(log1p(|STFT|), angle(STFT))``, each with the extra last axis.
  """
  stft = tf.signal.stft(
      waveform,
      frame_length=512,
      frame_step=128,
      fft_length = 510,
      pad_end= True,
      window_fn=tf.signal.hamming_window,
  )
  # Trailing `channels` axis: convolution layers expect
  # (`batch_size`, `height`, `width`, `channels`).
  stft = stft[..., tf.newaxis]
  log_magnitude = tf.math.log1p(tf.abs(stft))
  phase = tf.math.angle(stft)
  return log_magnitude, phase

def build_unet_stft_model():
    """Build and compile the waveform-in / waveform-out U-Net.

    The model turns a 2**15-sample waveform into a log-magnitude spectrogram,
    predicts a sigmoid mask with an encoder/decoder of six stages each,
    applies the mask to the log-magnitude, reuses the input phase and inverts
    the STFT back to a waveform cropped to the input length.

    Returns:
        A compiled model (Adam optimizer, mean absolute error loss).
    """
    # inputs
    # 2**15 = 32768 samples per frame, i.e. roughly 1.5 s at the 22050 Hz rate
    # used for the training audio. The shape must be a tuple: `(2**15)` is
    # just the integer 32768, which not every Keras version accepts.
    inputs = layers.Input(shape=(2**15,))

    mag_spec, angle_spec = get_spectrogram(inputs)

    # encoder: contracting path - downsample
    p1 = downsample_block(mag_spec, 16)
    # 2 - downsample
    p2 = downsample_block(p1, 32)
    # 3 - downsample
    p3 = downsample_block(p2, 64)
    # 4 - downsample
    p4 = downsample_block(p3, 128)
    # 5 - downsample
    p5 = downsample_block(p4, 256)
    # 6 - bottleneck
    bottleneck = downsample_block(p5, 512)
    # decoder: expanding path - upsample
    # 1 - upsample
    u1 = upsample_block(bottleneck, p5, 256, True, 'relu')
    # 2 - upsample
    u2 = upsample_block(u1, p4, 128, True, 'relu')
    # 3 - upsample
    u3 = upsample_block(u2, p3, 64, False, 'relu')
    # 4 - upsample
    u4 = upsample_block(u3, p2, 32, False, 'relu')
    # 5 - upsample
    u5 = upsample_block(u4, p1, 16, False, 'relu')
    # 6 - upsample (no skip connection at full resolution)
    u6 = upsample_block(u5, None, 1, False, 'relu')
    # Sigmoid mask in [0, 1] with the same spatial size as the spectrogram.
    u7 = layers.Conv2D(
            1,
            (4, 4),
            dilation_rate=(2, 2),
            activation="sigmoid",
            padding="same",
            kernel_initializer="he_normal")(u6)

    # outputs (signal_reconstruction)
    # The mask scales the log1p magnitude; expm1 undoes the log1p afterwards.
    outputs_mag_spec = tf.math.expm1(layers.Multiply()([u7, mag_spec]))
    # Re-attach the phase of the input mixture: magnitude * exp(i * phase).
    outputs_spec = tf.math.multiply(tf.cast(outputs_mag_spec, tf.complex64), tf.complex(tf.cos(angle_spec),tf.sin(angle_spec)))
    # Invert with the same STFT parameters as get_spectrogram, then crop the
    # end padding so the output is as long as the input.
    outputs = tf.signal.inverse_stft(tf.squeeze(outputs_spec, axis=-1), frame_length=512, frame_step=128, fft_length = 510,
                                     window_fn=tf.signal.inverse_stft_window_fn(128, forward_window_fn=tf.signal.hamming_window))[:,:inputs.shape[1]]

    # unet model with Keras Functional API
    unet_model = tf.keras.Model(inputs, outputs, name="U-Net")
    unet_model.compile(optimizer="adam",
                        loss=tf.keras.metrics.mean_absolute_error)

    return unet_model

In [6]:
import tensorflow.keras.backend as K
import gc
from sklearn.model_selection import KFold

# Pick a distribution strategy: mirrored across GPUs when several are present,
# otherwise a single device (the GPU if there is one, the CPU if not).
gpus = tf.config.list_physical_devices('GPU')
if len(gpus)<=1:
    # Previously "/gpu:0" was requested even when no GPU was available.
    device = "/gpu:0" if gpus else "/cpu:0"
    strategy = tf.distribute.OneDeviceStrategy(device=device)
    print(f'Using {len(gpus)} GPU')
else:
    strategy = tf.distribute.MirroredStrategy()
    print(f'Using {len(gpus)} GPUs')

EPOCHS = 5
n_folds = 5
BATCH_SIZE = 8
instr_class = 'Drums'

# The loader batches the datasets itself, so the batch size is set here once.
dataset = training_DataLoader('/content/drive/MyDrive/babyslakh', instr_class, batch_size=BATCH_SIZE)

gkf = KFold(n_splits=n_folds)

# Train one model per fold (splitting by track) and save its weights.
for i, (train_index, valid_index) in enumerate(gkf.split(dataset.metadata_df)):

    print('#'*25)
    print(f'### Fold {i+1}')
    print('#'*25)

    train_ds = dataset.get_data(train_index, 'train')
    val_ds = dataset.get_data(valid_index, 'test')

    with strategy.scope():
        model = build_unet_stft_model()

    # No `batch_size` here: the inputs are already-batched datasets, and
    # Keras documents that it must not be specified for dataset inputs.
    model.fit(train_ds, verbose=1,
              validation_data = val_ds,
              epochs=EPOCHS)#, callbacks = [LR] )
    model.save_weights(f'Unet_{instr_class}_f{i}.h5')

    # Release the graph and the in-memory datasets before the next fold.
    K.clear_session()
    del model
    del train_ds
    del val_ds
    gc.collect()


Using 1 GPU
#########################
### Fold 1
#########################
Epoch 1/5




Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
#########################
### Fold 2
#########################
Epoch 1/5




Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
#########################
### Fold 3
#########################
Epoch 1/5




Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
#########################
### Fold 4
#########################
Epoch 1/5




Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
#########################
### Fold 5
#########################
Epoch 1/5




Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [7]:
from google.colab import files
# Download the weights file of every trained fold; iterate over `n_folds`
# (set in the training cell) rather than a hard-coded count so the two cannot
# drift apart.
for i in range(n_folds):
  files.download(f'Unet_{instr_class}_f{i}.h5')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [16]:
import soundfile as sf

class predict_DataLoader():
    """Loads one audio file and frames it for inference.

    Args:
        path_to_file: Path of the audio file to separate.
        batch_size: Batch size of the dataset returned by ``get_data``.
    """

    SAMPLE_RATE = 22050   # rate the audio is loaded at
    FRAME_LENGTH = 2**15  # samples per frame (model input length)
    FRAME_STEP = 2**13    # hop between consecutive frames

    def __init__(self, path_to_file, batch_size = 8) -> None:
        self.path_to_file = path_to_file
        self.batch_size = batch_size

    def get_data(self):
        """Load, standardize and frame the file.

        Also stores the number of loaded samples in ``self.len_signal`` so the
        reconstructed output can be cropped to the same length.

        Returns:
            Tuple ``(dataset, mean, stddev)``: the batched dataset of frames
            and the statistics used for standardization (needed to undo it).
        """
        # Request the target rate explicitly. librosa.load already resampled
        # to 22050 Hz by default, so the former `sr != 22050` branch could
        # never run, and `len_signal` was measured before it anyway.
        y, _ = librosa.load(self.path_to_file, sr=self.SAMPLE_RATE)
        self.len_signal = len(y)

        mean, stddev = np.mean(y), np.std(y)
        if stddev == 0:
            # Silent/constant input: avoid dividing by zero (all-NaN frames).
            stddev = type(stddev)(1)
        y = (y-mean)/stddev

        X = tf.signal.frame(y,
                            self.FRAME_LENGTH,
                            self.FRAME_STEP,
                            pad_end=True,
                            pad_value=0,
                            axis=-1)

        return configure_for_performance(tf.data.Dataset.from_tensor_slices(X), self.batch_size, 'test'), mean, stddev


def get_signal_from_frames(Frames, step_size, input_signal_len):
    """Rebuild a 1-D signal from overlapping frames by averaging the overlaps.

    Frame ``i`` is placed at sample offset ``i * step_size``; every output
    sample is the mean of all frame values that land on it. Samples not
    covered by any frame (only possible when ``step_size`` exceeds the frame
    width) are set to 0.

    Args:
        Frames: 2-D array-like of shape ``(n_frames, frame_width)``.
        step_size: Non-negative hop, in samples, between consecutive frames.
        input_signal_len: The result is truncated to at most this many samples.

    Returns:
        1-D NumPy array; floating inputs keep their dtype, others give float64.

    Raises:
        ValueError: If ``Frames`` is not 2-D or ``step_size`` is negative.
    """
    frames = np.asarray(Frames)
    if frames.ndim != 2:
        raise ValueError(f"Frames must be 2-D, got shape {frames.shape}")
    if step_size < 0:
        raise ValueError("step_size must be non-negative")

    out_dtype = frames.dtype if np.issubdtype(frames.dtype, np.floating) else np.float64
    n_frames, frame_width = frames.shape
    if n_frames == 0 or frame_width == 0:
        return np.zeros(0, dtype=out_dtype)

    total_len = (n_frames - 1) * step_size + frame_width
    # Accumulate in float64: one running sum and one hit counter per sample,
    # updated a whole frame at a time instead of sample by sample.
    sums = np.zeros(total_len, dtype=np.float64)
    counts = np.zeros(total_len, dtype=np.int64)
    for frame_idx in range(n_frames):
        start = frame_idx * step_size
        sums[start:start + frame_width] += frames[frame_idx]
        counts[start:start + frame_width] += 1

    signal = np.zeros(total_len, dtype=np.float64)
    covered = counts > 0
    signal[covered] = sums[covered] / counts[covered]
    return signal[:input_signal_len].astype(out_dtype)



def predict(wav_path, model_weights_paths):
    """Separate one audio file with an ensemble of trained models.

    Each weights file is loaded into the U-Net, the framed input is predicted,
    the frames are merged back into a signal, and the per-model signals are
    averaged. The standardization applied on loading is undone at the end.

    Args:
        wav_path: Path of the audio file to process.
        model_weights_paths: Iterable of weights files, one per ensemble member.

    Returns:
        1-D NumPy array with the averaged, de-standardized output signal.

    Raises:
        ValueError: If ``model_weights_paths`` is empty.
    """
    weights_paths = list(model_weights_paths)
    if not weights_paths:
        # Averaging over zero models would silently yield NaN.
        raise ValueError("model_weights_paths must contain at least one weights file")

    data_loader = predict_DataLoader(wav_path)
    X, mean, stddev = data_loader.get_data()
    print(mean, stddev)
    len_input_signal = data_loader.len_signal

    # Every checkpoint shares one architecture: build the graph once and only
    # swap the weights, instead of constructing a new model per checkpoint.
    model = build_unet_stft_model()

    signals = []
    for model_path in weights_paths:
        model.load_weights(model_path)

        y = model.predict(X)
        # 2**13 must equal the hop used to frame the input in predict_DataLoader.
        signals.append(get_signal_from_frames(y, 2**13, len_input_signal))

    signal = np.mean(np.array(signals), axis = 0)*stddev + mean
    return signal





# Run the ensemble on the test track: predict() averages the outputs of the
# five fold checkpoints (Unet_Drums_f0 ... Unet_Drums_f4) stored on Drive.
signals = predict('/content/drive/MyDrive/test_music/Dani_California.wav', [f'/content/drive/MyDrive/music_seg_models/Unet_Drums_f{i}.h5' for i in range(5)])

-9.960328e-06 0.29883942


In [17]:
# Save the separated signal as a 24-bit PCM WAV at 22050 Hz, then pass it to
# Colab's `files.download`. NOTE: `files` is not imported in this cell; it
# comes from the `from google.colab import files` in an earlier cell, so that
# cell must have been run first.
sf.write('Dani_California_drums_5folds.wav', signals, 22050, subtype='PCM_24')
files.download('Dani_California_drums_5folds.wav')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>