# U-net Implementation for Guitar String Separation

Firstly, you need to install librosa. If you don't have librosa, make sure you run the requirements.txt.

In [1]:
!pip install librosa

[0m

In [1]:
#Importing the essential libraries
import os
import librosa
import numpy as np
from tensorflow import keras 
import matplotlib.pyplot as plt
from keras.callbacks import ReduceLROnPlateau


ModuleNotFoundError: No module named 'librosa'

### We are using U-net model which is based on the following two papers:
1. "Singing Voice Separation with Deep U-Net Convolutional Networks" by Jansson et al., 2017
2. "U-Net: Convolutional Networks for Biomedical Image Segmentation" by Ronneberger et al., 2015

In [3]:
# Model Creation

def unet(hyper_params,
         input=keras.Input((512, 2**7, 1))): 
  # encoder
  x = input
  encoder_layers = []
  for n_layer in range(hyper_params['NUM_BLOCKS']):
    num_filters = 2 ** n_layer * hyper_params['FILTER_MULTIPLIER']
    x = keras.layers.Conv2D(num_filters, 5, strides=2, padding='same')(x)
    x = keras.layers.BatchNormalization()(x)
    if hyper_params['DROPOUT_ENCODER'][n_layer]: x = keras.layers.Dropout(hyper_params['DROPOUT_RATE'])(x)
    x = keras.layers.LeakyReLU(alpha=0.2)(x)
    encoder_layers.append(x)

  # decoder
  for n_layer in range(hyper_params['NUM_BLOCKS']-1):
    if not n_layer == 0: x = keras.layers.Concatenate(axis=3)([x, encoder_layers[hyper_params['NUM_BLOCKS'] - 1 - n_layer]])
    num_filters = 2 ** (hyper_params['NUM_BLOCKS'] - 1 - n_layer) * hyper_params['FILTER_MULTIPLIER']
    x = keras.layers.Conv2DTranspose(num_filters, 5, strides=2, padding='same')(x)
    x = keras.layers.BatchNormalization()(x)
    if hyper_params['DROPOUT_DECODER'][n_layer]: x = keras.layers.Dropout(hyper_params['DROPOUT_RATE'])(x)
    x = keras.layers.Activation('relu')(x)

  output = keras.layers.Concatenate(axis=3)([x, encoder_layers[-hyper_params['NUM_BLOCKS']]])
  output = keras.layers.Conv2DTranspose(1, 5, strides=2, padding='same')(output)
  output = keras.layers.Activation('relu')(output)

  # mask
  if hyper_params['MASK']:
    output = keras.layers.multiply([input, output])

  model = keras.Model(inputs=input, outputs=output)
  return model

## Computing the Metrics

In [4]:
def compute_SDR_frame(true_frame, predicted_frame):
    numerator = np.sum(true_frame**2)
    denominator = np.sum((true_frame - predicted_frame)**2)
    return 10 * np.log10(numerator / denominator)

def compute_SIR_frame(true_frame, predicted_frame):
    interference = true_frame - (np.dot(true_frame, predicted_frame) / np.dot(predicted_frame, predicted_frame)) * predicted_frame
    numerator = np.sum(true_frame**2)
    denominator = np.sum(interference**2)
    return 10 * np.log10(numerator / denominator)

def compute_SAR_frame(true_frame, predicted_frame):
    artifacts = predicted_frame - (np.dot(predicted_frame, true_frame) / np.dot(true_frame, true_frame)) * true_frame
    interference = true_frame - (np.dot(true_frame, predicted_frame) / np.dot(predicted_frame, predicted_frame)) * predicted_frame
    numerator = np.sum((true_frame + interference)**2)
    denominator = np.sum(artifacts**2)
    return 10 * np.log10(numerator / denominator)

def compute_metric_for_signal(true_signal, predicted_signal, metric_func):
    # Reshaping to 2D (num_frames, frame_length)
    true_signal = true_signal.reshape(true_signal.shape[0], -1)
    predicted_signal = predicted_signal.reshape(predicted_signal.shape[0], -1)
    
    metric_values = [metric_func(true_frame, predicted_frame) for true_frame, predicted_frame in zip(true_signal, predicted_signal)]
    return np.mean(metric_values)

In [5]:
# Initialize the Directory

resized_input_dir = "/notebooks/IN+AUG"
resized_target_dir = "/notebooks/tar + AUG"

### Checks for Inspecting the Spectrograms to know the mean and the shape

In [8]:
def inspect_saved_spectrograms(spectrogram_dir, limit=5):
    """
    Loads and prints details or content of saved spectrograms.
    
    Parameters:
    - spectrogram_dir: Path to directory containing saved spectrogram numpy files.
    - limit: Number of spectrogram files to inspect. Default is 5.
    """
    files = sorted(os.listdir(spectrogram_dir))
    for i, file in enumerate(files[:limit]):
        file_path = os.path.join(spectrogram_dir, file)
        spectrogram = np.load(file_path)
        
        print(f"Spectrogram for {file}:")
        print(f"Shape: {spectrogram.shape}")
        print(f"Max: {np.max(spectrogram)}, Min: {np.min(spectrogram)}, Mean: {np.mean(spectrogram)}")
        print("\n" + "="*40 + "\n")

In [9]:
inspect_saved_spectrograms(resized_input_dir)
inspect_saved_spectrograms(resized_target_dir)

Spectrogram for 00_BN1-129-Eb_comp_mix.wav.npy:
Shape: (512, 128, 1)
Max: 22.248661041259766, Min: -57.23155212402344, Mean: -49.51835250854492


Spectrogram for 00_BN1-129-Eb_comp_mix_noise_injection.npy:
Shape: (512, 128, 1)
Max: 69.12982940673828, Min: -0.23467423021793365, Mean: 0.189961239695549


Spectrogram for 00_BN1-129-Eb_comp_mix_pitch_shifting.npy:
Shape: (512, 128, 1)
Max: 49.37890625, Min: -0.07383453845977783, Mean: 0.09082947671413422


Spectrogram for 00_BN1-129-Eb_comp_mix_time_stretching.npy:
Shape: (512, 128, 1)
Max: 64.82308197021484, Min: -0.116488978266716, Mean: 0.08924165368080139


Spectrogram for 00_BN1-129-Eb_solo_mix.wav.npy:
Shape: (512, 128, 1)
Max: 26.652942657470703, Min: -52.62190246582031, Mean: -42.67133331298828


Spectrogram for 00_BN1-129-Eb_comp_hex_cln.wav.npy:
Shape: (512, 128, 1)
Max: 15.900431632995605, Min: -64.6517105102539, Mean: -56.013648986816406


Spectrogram for 00_BN1-129-Eb_comp_hex_cln_noise_injection.npy:
Shape: (512, 128, 1)
Max:

## Learning Rate Finder for efficient model training

In [11]:
class LearningRateFinder(keras.callbacks.Callback):
    def __init__(self, min_lr=1e-6, max_lr=1e-1, steps_per_epoch=None, epochs=None):
        super().__init__()
        
        self.min_lr = min_lr
        self.max_lr = max_lr
        self.total_iterations = steps_per_epoch * epochs
        self.iteration = 0
        self.history = {}
        
    def clr(self):
        """Compute the current learning rate."""
        x = self.iteration / self.total_iterations 
        return self.min_lr + (self.max_lr-self.min_lr) * x
        
    def on_train_begin(self, logs=None):
        """Initialize the learning rate to the minimum value at the start of training."""
        logs = logs or {}
        keras.backend.set_value(self.model.optimizer.lr, self.min_lr)
        
    def on_batch_end(self, epoch, logs=None):
        """Record the previous batch's statistics and update the learning rate."""
        logs = logs or {}
        self.iteration += 1

        self.history.setdefault('lr', []).append(keras.backend.get_value(self.model.optimizer.lr))
        self.history.setdefault('iterations', []).append(self.iteration)

        for k, v in logs.items():
            self.history.setdefault(k, []).append(v)
            
        keras.backend.set_value(self.model.optimizer.lr, self.clr())


def plot_lr_finder(history):
    """Plot the loss versus learning rate."""
    plt.plot(history['lr'], history['loss'])
    plt.xscale('log')
    plt.xlabel('Learning Rate')
    plt.ylabel('Loss')
    plt.title('Learning Rate Finder')
    plt.show()

## Normalize your spectrograms if not already normalized

In [12]:

def normalize_spectrogram(spectrogram):
    """
    Normalize the given spectrogram.
    
    :param spectrogram: numpy array of the spectrogram
    :return: normalized spectrogram
    """
    # Subtract the mean and divide by the standard deviation
    normalized_spectrogram = (spectrogram - np.mean(spectrogram)) / np.std(spectrogram)
    return normalized_spectrogram


def normalize_spectrograms(input_dir, output_dir):
    """
    Normalize all spectrograms in the input directory and save to the output directory.
    
    :param input_dir: directory containing the original spectrograms
    :param output_dir: directory where normalized spectrograms will be saved
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    for file_name in os.listdir(input_dir):
        if file_name.endswith('.npy'):
            file_path = os.path.join(input_dir, file_name)
            spectrogram = np.load(file_path)
            normalized_spectrogram = normalize_spectrogram(spectrogram)
            output_path = os.path.join(output_dir, file_name)
            np.save(output_path, normalized_spectrogram)


# Example usage: for mix
# input_directory = "/notebooks/mix/combined_mix_resize"
# output_directory = "/notebooks/mix/norm_mix_resz"
# normalize_spectrograms(input_directory, output_directory)


# input_directory_hex = "/notebooks/hex/combined_hex_resize"
# output_directory_hex = "/notebooks/hex/norm_hex_resz"
# normalize_spectrograms(input_directory_hex, output_directory_hex)


In [10]:
input_files = sorted(os.listdir(resized_input_dir))
target_files = sorted(os.listdir(resized_target_dir))

## Some sanity checks

In [11]:

# Function to determine the maximum shape from a list of .npy files
def determine_max_shape(files, directory):
    max_shape = (0, 0)
    for f in files:
        data_shape = np.load(os.path.join(directory, f)).shape
        max_shape = (max(max_shape[0], data_shape[0]), max(max_shape[1], data_shape[1]))
    return max_shape

# Function to load and pad the numpy arrays
def load_and_pad(file_path, max_shape):
    data = np.load(file_path)
    padding = [(0, max_shape[i] - data.shape[i]) for i in range(2)] + [(0,0)]  # the last padding is for the channel dimension
    return np.pad(data, padding)

max_input_shape = determine_max_shape(input_files, resized_input_dir)
max_target_shape = determine_max_shape(target_files, resized_target_dir)
# Here, I'm assuming that max_input_shape is same as max_target_shape, 
# if that's not the case you might need to pad them to a common maximum shape

def data_generator(input_files, target_files, batch_size=32, max_shape=max_input_shape):
    while True:
        for i in range(0, len(input_files), batch_size):
            input_batch = np.array([load_and_pad(os.path.join(resized_input_dir, f), max_shape) for f in input_files[i:i+batch_size]])
            target_batch = np.array([load_and_pad(os.path.join(resized_target_dir, f), max_shape) for f in target_files[i:i+batch_size]])
            yield (input_batch, target_batch)

Making sure that the directories are correct.

In [12]:


def count_files_in_directory(directory):
    return len([f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))])

directory_path = resized_input_dir  # Change this to the path of your directory
print(f"Number of files in {directory_path}: {count_files_in_directory(directory_path)}")

directory_path = resized_target_dir  # Change this to the path of your directory
print(f"Number of files in {directory_path}: {count_files_in_directory(directory_path)}")


Number of files in /notebooks/IN+AUG: 1439


## Defining Train/Validation/Test Splits

In [14]:
indices = np.arange(len(input_files))
np.random.shuffle(indices)

train_split = int(0.8 * len(input_files))
val_split = int(0.9 * len(input_files))

train_indices = indices[:train_split]
val_indices = indices[train_split:val_split]
test_indices = indices[val_split:]

train_input_files = [input_files[i] for i in train_indices]
train_target_files = [target_files[i] for i in train_indices]
val_input_files = [input_files[i] for i in val_indices]
val_target_files = [target_files[i] for i in val_indices]
test_input_files = [input_files[i] for i in test_indices]
test_target_files = [target_files[i] for i in test_indices]

## The following are the model Hyperparameters. 
What each parameter does is described below.


- NUM_BLOCKS -> Blocks in the encoder and decoder, for example, if NUM_BLOCKS is set to 6, there are 6 blocks in the encoder and 6 blocks in the decoder. 2D convolution, batch normalisation, and leaky ReLU layers make up each encoder block. Each decoder block includes batch normalisation, transposed 2D convolution,  potentially dropout, and ReLU layers
- FILTER_MULTIPLIER -> Sets the number of filters in 2D convolution layers.
- DROPOUT_ENCODER -> A list of boolean values of lengths equal to the number of blocks in the encoder. Each boolean value corresponds to a block and decides if that block should have dropout or not.
- DROPOUT_DECODER -> A list of boolean values of lengths equal to the number of blocks in the decoder. Each boolean value corresponds to a block and decides if that block should have dropout or not.
- DROPOUT_RATE -> Set the dropout rate in the dropout layers.
- MASK -> Estimate a mask (and multiply it with the input) instead of estimating the output directly.


In [15]:
hyper_params = {
    'NUM_BLOCKS': 6,
    'FILTER_MULTIPLIER': 16,
    'DROPOUT_ENCODER': [False, False, True, True, True, True],
    'DROPOUT_DECODER': [False, False, True, True, True, True],
    'DROPOUT_RATE': 0.5,
    'MASK': True
}

model = unet(hyper_params)
model.compile(optimizer='adam', loss='mean_squared_error')


## Small test to identify the best learning rate.
NOTE: Rerun the model after finding the appropriate LR.

In [16]:
batch_size = 32
steps_per_epoch = len(train_input_files) // batch_size
validation_steps = len(val_input_files) // batch_size
train_gen = data_generator(train_input_files, train_target_files, batch_size)
val_gen = data_generator(val_input_files, val_target_files, batch_size) 


# Initialize Learning Rate Finder
lr_finder = LearningRateFinder(min_lr=1e-6, max_lr=1e-1, steps_per_epoch=steps_per_epoch, epochs=1)

# Using only 1 epoch for the Learning Rate Finder. You can use more if you want a more detailed curve.
model.fit(train_gen, epochs=5, steps_per_epoch=steps_per_epoch, callbacks=[lr_finder])

# Plot the loss against learning rates
plot_lr_finder(lr_finder.history)

## Begin Training

In [17]:


batch_size = 32
steps_per_epoch = len(train_input_files) // batch_size
validation_steps = len(val_input_files) // batch_size
train_gen = data_generator(train_input_files, train_target_files, batch_size)
val_gen = data_generator(val_input_files, val_target_files, batch_size)

# Set up ReduceLROnPlateau callback
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6)

# Add Early Stopping callback
early_stopping = keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, verbose=1, restore_best_weights=True)

model.fit(train_gen,
          steps_per_epoch=steps_per_epoch,
          epochs=100,
          validation_data=val_gen,
          validation_steps=validation_steps,
          callbacks = early_stopping, reduce_lr)


Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

<keras.callbacks.History at 0x7fab2c13bbe0>

## Test Evaluation

In [18]:
test_gen = data_generator(test_input_files, test_target_files, batch_size)
loss = model.evaluate(test_gen, steps=len(test_input_files) // batch_size)
print(f"Test Loss: {loss:.4f}")

Test Loss: 882.1355


## Main Evaluation Metric

In [19]:
SDRs, SIRs, SARs = [], [], []

# Iterate over test dataset to compute the metrics:
for i, (inputs, targets) in enumerate(test_gen):
    if i == len(test_input_files) // batch_size:
        break
    predictions = model.predict(inputs)
    for true_signal, predicted_signal in zip(targets, predictions):
        SDRs.append(compute_metric_for_signal(true_signal, predicted_signal, compute_SDR_frame))
        SIRs.append(compute_metric_for_signal(true_signal, predicted_signal, compute_SIR_frame))
        SARs.append(compute_metric_for_signal(true_signal, predicted_signal, compute_SAR_frame))

# Calculate average metrics for the entire test set:
average_SDR = np.mean(SDRs)
average_SIR = np.mean(SIRs)
average_SAR = np.mean(SARs)

print(f"Average Test SDR: {average_SDR:.4f}")
print(f"Average Test SIR: {average_SIR:.4f}")
print(f"Average Test SAR: {average_SAR:.4f}")



  interference = true_frame - (np.dot(true_frame, predicted_frame) / np.dot(predicted_frame, predicted_frame)) * predicted_frame
  interference = true_frame - (np.dot(true_frame, predicted_frame) / np.dot(predicted_frame, predicted_frame)) * predicted_frame


Average Test SDR: -7.3551
Average Test SIR: nan
Average Test SAR: nan
