In [None]:
# Installing libraries
!pip install musdb
!pip install librosa

In [None]:
import numpy as np
import musdb
import librosa
import random
import tensorflow.keras
from time import time

In [None]:
# Mount Google Drive at /gdrive (this cell only works inside Google Colab,
# which provides the `google.colab` package).
from google.colab import drive
drive.mount('/gdrive')

In [None]:
# Import DATABASE
# Recursively copy the MUSDB18 folder into /root. Replace the placeholder
# "path-to-musdb18-on-gdrive" with the real location of the dataset; the
# `generator` class in this notebook opens it with root='../../root/musdb18'.

# Disabled debugging helpers (change directory / list files):
# %cd ..
# !ls
!gsutil -m cp -r "path-to-musdb18-on-gdrive" "/root"

In [None]:
class generator(tensorflow.keras.utils.Sequence):
  """Keras ``Sequence`` that builds training batches from random MUSDB18 excerpts.

  For every batch, ``tracks_in_batch`` random excerpts of ``duration`` seconds
  are drawn. One channel of each excerpt is resampled (44100 Hz is assumed as
  the source rate) to 22050 Hz and converted to an STFT magnitude spectrogram
  with ``freq_bins`` rows. The spectrogram is cut into ``chunk_per_track``
  windows of ``num_frames`` frames, taken every ``hop_num_frames`` frames, and
  each window is flattened frame by frame into ``num_ft_bins`` values. The
  label of a window is the identically flattened ideal binary mask, which is 1
  where the vocals magnitude exceeds the mixture magnitude and 0 elsewhere.

  A batch therefore has ``tracks_in_batch * chunk_per_track`` rows; memory use
  grows linearly with ``tracks_in_batch``.

  Args:
    steps_per_epoch: Number of batches per epoch (value returned by ``len()``).
    tracks_in_batch: Number of usable track excerpts combined into one batch.
    subsets: Forwarded to ``musdb.DB(subsets=...)``.
    split: Forwarded to ``musdb.DB(split=...)``.
  """

  def __init__(self, steps_per_epoch, tracks_in_batch, subsets, split):
    super().__init__()
    self.steps_per_epoch = steps_per_epoch
    self.tracks_in_batch = tracks_in_batch
    self.subsets = subsets
    self.split = split
    self.mus = musdb.DB(root='../../root/musdb18', subsets=subsets, split=split)
    # Random visiting order of the tracks; reshuffled once exhausted.
    self.track_number = np.arange(len(self.mus))
    np.random.shuffle(self.track_number)
    self.cur_index = 0
    # Audio / STFT settings.
    self.orig_sr = 44100
    self.target_sr = 22050
    self.n_fft = 4096
    self.hop_length = 256
    self.win_length = 1024
    self.freq_bins = 2049  # n_fft // 2 + 1
    # Context-window settings.
    self.num_frames = 9
    self.num_ft_bins = self.freq_bins * self.num_frames  # 18441
    self.medium_frame = self.num_frames // 2  # 4
    self.hop_num_frames = 8
    self.chunk_per_track = 323
    self.duration = 30  # 30 sec track chunks

  def __len__(self):
    """Return the number of batches per epoch."""
    return self.steps_per_epoch

  def __getitem__(self, index):
    """Build one batch.

    Args:
      index: Batch index requested by Keras; unused because every batch is
        drawn at random.

    Returns:
      Tuple ``(mix_batch, target_batch)`` of float32 arrays, both of shape
      ``(n_tracks * chunk_per_track, num_ft_bins)``, where ``n_tracks`` is
      ``tracks_in_batch`` unless too many unusable excerpts were drawn.

    Raises:
      RuntimeError: If no usable (non-silent, long enough) excerpt was found.
    """
    mixes = []
    targets = []
    # Silent or too-short excerpts are replaced by a new draw; the number of
    # draws is bounded so a degenerate dataset cannot hang training.
    max_draws = 10 * max(1, self.tracks_in_batch)
    draws = 0
    while len(mixes) < self.tracks_in_batch and draws < max_draws:
      draws += 1
      mix, target_vocals = self.get_random_track_piece()
      if self.is_source_silent(mix):
        continue
      examples = self._piece_to_examples(mix, target_vocals)
      if examples is None:
        continue
      mixes.append(examples[0])
      targets.append(examples[1])

    if not mixes:
      raise RuntimeError(
          'generator could not find a usable track excerpt after %d draws' % draws)

    mix_batch = np.concatenate(mixes, axis=0)
    target_batch = np.concatenate(targets, axis=0)
    return mix_batch, target_batch

  def _magnitude_spectrogram(self, audio):
    """Resample a mono signal and return its float32 STFT magnitude."""
    resampled = librosa.resample(audio, orig_sr=self.orig_sr, target_sr=self.target_sr)
    spectrum = librosa.stft(resampled, n_fft=self.n_fft, hop_length=self.hop_length,
                            win_length=self.win_length, window='hann')
    return np.float32(np.abs(spectrum))

  def _context_windows(self, padded):
    """Cut a padded (freq, time) array into flattened context windows.

    Row ``j`` of the result holds frames ``j*hop_num_frames`` to
    ``j*hop_num_frames + num_frames - 1``, flattened frame by frame (all
    frequency bins of the first frame, then of the second, and so on).
    """
    frames_first = padded.T  # (time, freq)
    windows = np.empty((self.chunk_per_track, self.num_ft_bins), dtype=np.float32)
    for j in range(self.chunk_per_track):
      start = j * self.hop_num_frames
      windows[j] = frames_first[start:start + self.num_frames].reshape(-1)
    return windows

  def _piece_to_examples(self, mix, vocals):
    """Turn one excerpt into ``(inputs, labels)`` arrays, or ``None`` if too short."""
    mixture_ft_magn = self._magnitude_spectrogram(mix)
    vocals_ft_magn = self._magnitude_spectrogram(vocals)

    # Number of unpadded frames required to extract chunk_per_track windows.
    frames_needed = ((self.chunk_per_track - 1) * self.hop_num_frames
                     + self.num_frames - 2 * self.medium_frame)
    if mixture_ft_magn.shape[1] < frames_needed:
      return None

    # Create Binary Mask
    ideal_binary_mask = (vocals_ft_magn > mixture_ft_magn).astype('float32')

    # Pad medium_frame frames on both sides so the first and last windows are
    # complete: the mixture is padded with its minimum magnitude, the mask
    # with zeros.
    pad_width = ((0, 0), (self.medium_frame, self.medium_frame))
    mixture_padded = np.pad(mixture_ft_magn, pad_width, mode='constant',
                            constant_values=mixture_ft_magn.min())
    ibm_padded = np.pad(ideal_binary_mask, pad_width, mode='constant',
                        constant_values=0)
    return self._context_windows(mixture_padded), self._context_windows(ibm_padded)

  def get_random_track_piece(self):
    """Return one random channel of a random excerpt as ``(mixture, vocals)``."""
    # Getting random track
    # The RNG is re-seeded from the sub-second part of the wall clock on every
    # call (kept from the original implementation).
    random.seed(int(time() % 1 * 6000))
    if self.cur_index == self.track_number.shape[0]:
      np.random.shuffle(self.track_number)
      self.cur_index = 0
    track = self.mus[self.track_number[self.cur_index]]
    self.cur_index += 1
    # Getting random track chunk of given duration
    track.chunk_duration = self.duration
    # Clamp at 0 so a track shorter than the chunk never gets a negative start.
    latest_start = max(0.0, track.duration - track.chunk_duration)
    track.chunk_start = random.uniform(0, latest_start)
    mix = track.audio.T
    vocals = track.targets['vocals'].audio.T
    # Random swapping channels
    channel = random.randint(0, mix.shape[0] - 1)
    return mix[channel], vocals[channel]

  def is_source_silent(self, source):
    """Return True if ``source`` is fully silent (contains only zeros)."""
    return not np.any(source)


In [None]:
import tensorflow as tf
from tensorflow.keras import Model, layers

In [None]:
# Creating model
# Each input is a flattened window of NUM_FRAMES spectrogram frames with
# FREQ_BINS bins each, matching the vectors produced by the `generator` class
# (frame-major order), so it is reshaped to (frames, bins, 1) for the
# convolutions. The output has one sigmoid unit per input value.
NUM_FRAMES = 9
FREQ_BINS = 2049
NUM_FT_BINS = NUM_FRAMES * FREQ_BINS  # 18441


def _bias_kwargs():
  """Bias settings shared by all trainable layers (fresh initializer per layer)."""
  return dict(use_bias=True, bias_initializer=tf.constant_initializer(0.1))


def _conv_kwargs():
  """Settings shared by all convolution layers."""
  return dict(kernel_size=(3, 12), padding='same', activation='relu', **_bias_kwargs())


# `shape` must be a tuple: `(18441)` is just the integer 18441.
inputs = layers.Input(shape=(NUM_FT_BINS,))
x = layers.Reshape(target_shape=[NUM_FRAMES, FREQ_BINS, 1])(inputs)
x = layers.Conv2D(filters=32, kernel_initializer='glorot_uniform', **_conv_kwargs())(x)
x = layers.Conv2D(filters=16, **_conv_kwargs())(x)
# Pooling only along the frequency axis; the number of frames is preserved.
x = layers.MaxPool2D(pool_size=(1, 12), padding='same')(x)
x = layers.Conv2D(filters=64, **_conv_kwargs())(x)
x = layers.Conv2D(filters=32, **_conv_kwargs())(x)
x = layers.MaxPool2D(pool_size=(1, 12), padding='same')(x)
x = layers.Dropout(rate=0.5)(x)
x = layers.Flatten()(x)
x = layers.Dense(2048, activation='relu', **_bias_kwargs())(x)
x = layers.Dropout(rate=0.5)(x)
x = layers.Dense(512, activation='relu', **_bias_kwargs())(x)
outputs = layers.Dense(NUM_FT_BINS, activation='sigmoid', **_bias_kwargs())(x)
model = Model(inputs, outputs)

In [None]:
from time import time

In [None]:
# ---- Training configuration ----
TIB = 8        # Tracks in batch for random mixing
EPOCHS = 15    # Total epochs
SPE = 2000     # Steps per epoch
learning_rate = 1e-4

# Placeholders: replace with real locations before running.
checkpoint_path = "your-model-saving-path-to-gdrive/model.h5"
stats_path = "your-csv-saving-path-to-gdrive/model.csv"

# ---- Data ----
print('<--[INFO] creating batch generators...')
train_gen = generator(SPE, TIB, 'train', 'train')
valid_gen = generator(SPE, TIB, 'train', 'valid')

# ---- Model compilation ----
print('<--[INFO] creating and compiling model...')
optimizer = tf.keras.optimizers.Adam(learning_rate)
accuracy_metric = tf.keras.metrics.BinaryAccuracy(threshold=0.5)
loss = tf.keras.losses.BinaryCrossentropy(from_logits=False, name='binary_crossentropy')
# The loss object is also listed as a metric so that `binary_crossentropy` /
# `val_binary_crossentropy` are reported and can be monitored by the callbacks.
model.compile(
    optimizer=optimizer,
    loss=loss,
    metrics=[accuracy_metric, loss],
)
model.summary()

# ---- Training ----
print('<--[INFO] training network...')
t0 = time()
my_callbacks = [
    # Keep only the weights with the lowest validation cross-entropy.
    tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path,
        monitor='val_binary_crossentropy',
        verbose=1,
        save_best_only=True,
        mode='min',
    ),
    # Append per-epoch statistics to a CSV file.
    tf.keras.callbacks.CSVLogger(
        filename=stats_path,
        separator=',',
        append=True,
    ),
    # Stop after 3 epochs without improvement and restore the best weights.
    tf.keras.callbacks.EarlyStopping(
        monitor='val_binary_crossentropy',
        min_delta=0,
        patience=3,
        verbose=0,
        mode="min",
        baseline=None,
        restore_best_weights=True,
    ),
]

history = model.fit(
    train_gen,
    epochs=EPOCHS,
    steps_per_epoch=SPE,
    validation_data=valid_gen,
    validation_steps=SPE // 5,  # Validation set is about 1/5 of training set
    callbacks=my_callbacks,
)
t1 = time()
print(f"<--[INFO] model was trained in {round((t1 - t0) / 60, 1)} minutes")