In [None]:
import os
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
import musdb
import IPython.display as ipd

# Set the path to the dataset directory
dataset_dir = "/Users/shola/Downloads/musdb18/"
duration = 180  # Duration of the songs: those longer than this will be cut and those shorter padded
num_songs_to_use = 50
batch_size = 20

fs = 44100
length_songs = duration * fs

if not os.path.exists(dataset_dir):
    print("Error: provided path 'dataset_dir' doesn't exist")

# Define the function to extract spectrograms
def extract_spectrogram(audio):
    spectrogram = librosa.stft(audio, n_fft=2048, hop_length=512)  # Compute the spectrogram using STFT
    spectrogram = np.abs(spectrogram)  # Take the absolute value of the complex spectrogram
    return librosa.amplitude_to_db(spectrogram)


In [None]:
# Generate spectrograms of mixtures and vocal parts

mus = musdb.DB(root=dataset_dir, subsets='train')
print("Number of downloaded training songs: " + str(len(mus)))

# Iterate over the training dataset directory
print("Generating spectrograms for each song...")
nb = 0
idx = 0
for track in mus[0:num_songs_to_use]:
    vocals = track.targets['vocals'].audio[:, 0].T
    mixture = track.audio[:, 0].T

    if len(vocals) > length_songs:
        vocals = vocals[0:length_songs]
        mixture = mixture[0:length_songs]
    else:
        # Pad songs with zeros to make sure they all have the same duration (otherwise converting the list to numpy array doesn't work)
        vocals = np.append(vocals, np.zeros(length_songs - len(vocals)))
        mixture = np.append(mixture, np.zeros(length_songs - len(mixture)))

    vocal_spectrogram = extract_spectrogram(vocals)
    mixture_spectrogram = extract_spectrogram(mixture)
    
    if nb == 0: # Init arrays
        mixture_spectrograms = np.empty(((len(vocal_spectrogram[0])//batch_size)*num_songs_to_use, len(vocal_spectrogram), batch_size))
        vocal_spectrograms = np.empty(((len(vocal_spectrogram[0])//batch_size)*num_songs_to_use, len(vocal_spectrogram), batch_size))
        masks = np.empty(((len(vocal_spectrogram[0])//batch_size)*num_songs_to_use, len(vocal_spectrogram)), dtype='bool')
    
    # Split the spectrogram time samples in batches of size "batch_size", dropping those that don't fit
    for i in range(len(vocal_spectrogram[0])//batch_size):
        vocal_spectrograms[idx, :, :] =  np.array(vocal_spectrogram[:, i*batch_size:(i+1)*batch_size])
        mixture_spectrograms[idx, :, :] = np.array(mixture_spectrogram[:, i*batch_size:(i+1)*batch_size])
        for f in range(len(vocal_spectrogram)):
            non_vocal_spectrogram = mixture_spectrogram[f, i*batch_size:(i+1)*batch_size] - vocal_spectrogram [f, i*batch_size:(i+1)*batch_size]
            #print(vocal_spectrogram[f, i*batch_size:(i+1)*batch_size] > non_vocal_spectrogram)
            if np.count_nonzero(vocal_spectrogram[f, i*batch_size:(i+1)*batch_size] > 0.2*non_vocal_spectrogram) > batch_size/10:
                masks[idx, f] = 1 # If the spectrogram value at frequency f is larger in vocals than non_vocal at least in half of the time samples, consider it is voice
            else:
                masks[idx, f] = 0
        idx += 1

    if nb % (num_songs_to_use//10) == 0:
        print(str(100*nb/num_songs_to_use) + "%")
    nb += 1

print(len(vocal_spectrograms)) # Total number of 20 samples extracts in the selected songs
print(len(vocal_spectrograms[0])) # 1025 frequency bins
print(len(vocal_spectrograms[0][0])) # batch_size

X_train = np.reshape(mixture_spectrograms, (-1, 1, len(vocal_spectrograms[0]), batch_size))
y_train = np.reshape(masks, (-1, len(vocal_spectrograms[0])))

print("Done")

In [None]:
# Save numpy array in file
#np.save("vocal_spectrograms.npy", vocal_spectrograms)
#np.save("mixture_spectrograms.npy", mixture_spectrograms)

In [None]:
#X_train = np.load("vocal_spectrograms.npy")
#y_train = np.load("mixture_spectrograms.npy")

In [None]:
# Define the neural network architecture as a class
class SingerIsolationNet(nn.Module):
    def __init__(self):
        super(SingerIsolationNet, self).__init__()
        
        # Define the convolutional layers
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=1, kernel_size=3, stride=1, padding=1)

        self.fc1 = nn.Linear(20500, 20500)
        self.fc2 = nn.Linear(20500, 20500)
        self.fc3 = nn.Linear(20500, 1025)

    def forward(self, x):
        # Apply the convolutional layers
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        
        # Flatten the output for the fully connected layers
        x = x.view(x.size(0), -1)
        
        # Continue with the modified fully connected layers
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        #x = x > 0.5
        return x # Convert bool to int

# Create an instance of the SingerIsolationNet class
model = SingerIsolationNet()

# Define the loss function and optimizer
criterion = nn.L1Loss()
optimizer = optim.Adam(model.parameters(), lr=0.00004)

In [None]:
# Convert the NumPy arrays to PyTorch tensors
X_train = torch.from_numpy(X_train).float()
y_train = torch.from_numpy(y_train).float()

In [None]:
# Train the model
num_epochs = 40

for epoch in range(num_epochs):
    inputs = X_train
    targets = y_train
        
    outputs = model(inputs)
    loss = criterion(outputs, targets)
    loss.backward()
    optimizer.step()
    
    epoch_loss = loss.item()
    print("Epoch Number " + str(epoch+1) + ": loss = " + str(epoch_loss))
    
    
#try:
    # Save a checkpoint after each epoch
#    PATH = f'checkpoint_{epoch}.pt'
#    torch.save({
#                'epoch': epoch,
#                'model_state_dict': model.state_dict(),
#                'optimizer_state_dict': optimizer.state_dict(),
#                'train_loss': epoch_loss,
#                }, 
#                PATH)
#except:
#    print("Error while saving checkpoint")

In [None]:
# Use the trained neural network to compute the vocals from the song number song_nb in the chosen subset (either 'train' or 'test')
mus = musdb.DB(root=dataset_dir, subsets='train')
song_nb = 0

vocals = mus[song_nb].targets['vocals'].audio[:, 0].T
mixture = mus[song_nb].audio[:, 0].T

if len(vocals) > length_songs:
    vocals = vocals[0:length_songs]
    mixture = mixture[0:length_songs]
else:
    # Pad songs with zeros to make sure they all have the same duration (otherwise converting the list to numpy array doesn't work)
    vocals = np.append(vocals, np.zeros(length_songs - len(vocals)))
    mixture = np.append(mixture, np.zeros(length_songs - len(mixture)))

# Perform singer isolation on a sample mixture
sample_vocal_spectrogram = extract_spectrogram(vocals)
sample_mixture_spectrogram = extract_spectrogram(mixture)
sample_mixture_spectrogram_with_phase = librosa.stft(mixture, n_fft=2048, hop_length=512)
song_mask = np.zeros((len(vocal_spectrogram[0])//batch_size, 1025))
for i in range(song_nb*len(vocal_spectrogram[0])//batch_size, (song_nb+1)*len(vocal_spectrogram[0])//batch_size):
    song_mask[i%(len(vocal_spectrogram[0])//batch_size), :] = masks[i, :]

# Split the spectrogram time samples in batches of size "batch_size", dropping those that don't fit
vocal_spectrograms = np.empty(((len(sample_vocal_spectrogram[0])//batch_size)*len(mus), len(sample_vocal_spectrogram), batch_size))
mixture_spectrograms = np.empty(((len(sample_mixture_spectrogram[0])//batch_size)*len(mus), len(sample_mixture_spectrogram), batch_size))
mixture_spectrograms_with_phase = np.empty(((len(sample_mixture_spectrogram[0])//batch_size)*len(mus), len(sample_mixture_spectrogram), batch_size), dtype=np.complex_)
idx = 0
for i in range(len(vocal_spectrogram[0])//batch_size):
    vocal_spectrograms[idx, :, :] =  np.array(sample_vocal_spectrogram[:, i*batch_size:(i+1)*batch_size])
    mixture_spectrograms[idx, :, :] = np.array(sample_mixture_spectrogram[:, i*batch_size:(i+1)*batch_size])
    mixture_spectrograms_with_phase[idx, :, :] = np.array(sample_mixture_spectrogram_with_phase[:, i*batch_size:(i+1)*batch_size], dtype=np.complex_)
    idx += 1

# Compute the vocals from the mixture using the model
predicted_vocal_spectrogram = np.zeros((len(sample_vocal_spectrogram), len(sample_vocal_spectrogram[0])), dtype=np.complex_)
reconstructed_mask = np.zeros((idx, len(vocal_spectrograms[0])))
for i in range(idx):
    sample_mixture_spectrogram = mixture_spectrograms[i, :, :]
    sample_vocal_spectrogram = vocal_spectrograms[i, :, :]
    
    sample_mixture_spectrogram = np.reshape(sample_mixture_spectrogram, (-1, batch_size*len(mixture_spectrograms[0])))

    # Convert the sample mixture spectrogram to a PyTorch tensor and add a batch dimension
    sample_mixture_spectrogram = torch.from_numpy(sample_mixture_spectrogram).unsqueeze(0).float()

    # Pass the sample mixture spectrogram through the model to obtain the frequency mask
    with torch.no_grad():
        mask = model(sample_mixture_spectrogram).squeeze().numpy()
        
    predicted_vocal_spectrogram_extract = np.zeros((len(vocal_spectrograms[0]), batch_size), dtype=np.complex_)
    for f in range(len(vocal_spectrograms[0])):
        #if song_mask[i, f] > 0.5: # If mask is close to 0, the spectrogram stays at 0
        if mask[f] > 0.005:
            predicted_vocal_spectrogram_extract[f, :] = mixture_spectrograms_with_phase[i, f, :]
        else:
            predicted_vocal_spectrogram_extract[f, :] = 0.0
        reconstructed_mask[i, f] = mask[f] 

    # Convert the predicted vocal spectrogram to a NumPy array
    predicted_vocal_spectrogram_extract = np.reshape(predicted_vocal_spectrogram_extract, (len(vocal_spectrograms[0]), -1))

    predicted_vocal_spectrogram[:, i*batch_size:(i+1)*batch_size] = predicted_vocal_spectrogram_extract
    
# Perform inverse spectrogram to obtain the isolated vocal audio
#predicted_vocal_audio = librosa.griffinlim(predicted_vocal_spectrogram)
reconstructed_vocal = librosa.istft(predicted_vocal_spectrogram, n_fft=2048, hop_length=512)

In [None]:
plt.title("Initial vocals extract")
plt.plot(vocals)
audio = ipd.Audio(vocals, rate=44100)
with open('initial.wav', 'wb') as f:
    f.write(audio.data)
ipd.Audio(vocals, rate=44100)

In [None]:
plt.title("Reconstructed vocals extract")
plt.plot(reconstructed_vocal)
audio = ipd.Audio(reconstructed_vocal, rate=44100)
with open('reconstructed.wav', 'wb') as f:
    f.write(audio.data)
ipd.Audio(reconstructed_vocal, rate=44100)

In [None]:
plt.title("Initial mixture extract")
plt.plot(mixture)
audio = ipd.Audio(mixture, rate=44100)
with open('mixture.wav', 'wb') as f:
    f.write(audio.data)
ipd.Audio(mixture, rate=44100)

In [None]:
predicted_vocal_spectrogram = librosa.amplitude_to_db(np.abs(predicted_vocal_spectrogram)) # Don't need the phase anymore
sample_vocal_spectrogram = extract_spectrogram(vocals)
print(len(sample_vocal_spectrogram))
print(len(sample_vocal_spectrogram[0]))

librosa.display.specshow(sample_vocal_spectrogram, sr=44100, hop_length=512, x_axis='time', y_axis='hz')
plt.colorbar(format='%+2.0f dB')

# Add labels and title
plt.title('Spectrogram of original vocals')  # Title of the plot
plt.xlabel('Time (s)')  # X-axis label representing time
plt.ylabel('Frequency (Hz)')  # Y-axis label representing frequency
plt.show()



librosa.display.specshow(predicted_vocal_spectrogram, sr=44100, hop_length=512, x_axis='time', y_axis='hz')
plt.colorbar(format='%+2.0f dB')

# Add labels and title
plt.title('Spectrogram of reconstructed vocals')  # Title of the plot
plt.xlabel('Time (s)')  # X-axis label representing time
plt.ylabel('Frequency (Hz)')  # Y-axis label representing frequency
plt.show()



error_spectrogram = librosa.amplitude_to_db(np.abs(librosa.db_to_amplitude(sample_vocal_spectrogram)-librosa.db_to_amplitude(predicted_vocal_spectrogram)))
librosa.display.specshow(error_spectrogram, sr=44100, hop_length=512, x_axis='time', y_axis='hz')
plt.colorbar(format='%+2.0f dB')

# Add labels and title
plt.title('Spectrogram of error = abs(original vocals - reconstructed vocals)')  # Title of the plot
plt.xlabel('Time (s)')  # X-axis label representing time
plt.ylabel('Frequency (Hz)')  # Y-axis label representing frequency
plt.show()



sample_mixture_spectrogram = extract_spectrogram(mixture)
librosa.display.specshow(sample_mixture_spectrogram, sr=44100, hop_length=512, x_axis='time', y_axis='hz')
plt.colorbar(format='%+2.0f dB')

# Add labels and title
plt.title('Spectrogram of original mixture')  # Title of the plot
plt.xlabel('Time (s)')  # X-axis label representing time
plt.ylabel('Frequency (Hz)')  # Y-axis label representing frequency
plt.show()




librosa.display.specshow(sample_mixture_spectrogram-sample_vocal_spectrogram, sr=44100, hop_length=512, x_axis='time', y_axis='hz')
plt.colorbar(format='%+2.0f dB')

# Add labels and title
plt.title('Spectrogram of non-vocals (= mixture-vocals)')  # Title of the plot
plt.xlabel('Time (s)')  # X-axis label representing time
plt.ylabel('Frequency (Hz)')  # Y-axis label representing frequency
plt.show()




librosa.display.specshow(sample_vocal_spectrogram > sample_mixture_spectrogram-sample_vocal_spectrogram, sr=44100, hop_length=512, x_axis='time', y_axis='hz')
plt.colorbar(format='%+2.0f dB')

# Add labels and title
plt.title('Result of vocal > non-vocal')  # Title of the plot
plt.xlabel('Time (s)')  # X-axis label representing time
plt.ylabel('Frequency (Hz)')  # Y-axis label representing frequency
plt.show()

In [None]:
# Target mask
song_nb = 0
target_mask_song = np.zeros((len(vocal_spectrogram[0])//batch_size, 1025))
for i in range(song_nb*len(vocal_spectrogram[0])//batch_size, (song_nb+1)*len(vocal_spectrogram[0])//batch_size):
    target_mask_song[i%(len(vocal_spectrogram[0])//batch_size), :] = masks[i, :]
plt.pcolormesh(np.transpose(target_mask_song))
plt.colorbar()
plt.show()

# Reconstructed mask
plt.pcolormesh(np.transpose(reconstructed_mask))
plt.colorbar()
plt.show()

In [None]:
L1error = np.sum(librosa.db_to_amplitude(error_spectrogram))/np.size(error_spectrogram)
print(L1error)
MSE = np.sum(librosa.db_to_amplitude(error_spectrogram)**2)/np.size(error_spectrogram)
print(MSE)