### imports

In [None]:
import os
import sys
# Make the parent directory importable (added once only); presumably this is
# what lets the project-local imports in the next cell resolve.
module_path = os.path.abspath('..')
if not any(entry == module_path for entry in sys.path):
    sys.path.append(module_path)

In [None]:
from config import DATASET_CONFIG, PREPROCESS_CONFIG
from preprocess.preprocess_tools import STFT, Scaler
import preprocess.utility as sp
import numpy as np
import os
import torch
import norbert
from train.model import Generalised_Recurrent_Model
import librosa as lib
import librosa.display as lib_display
# matplotlib for graphs
import matplotlib.pyplot as plot
import matplotlib.cm as cm
from IPython.display import Audio, display

### creating instances for preprocessing tools

In [None]:
# STFT helper built from the dataset configuration (sample rate, segment
# length, overlap). The same object is used for the forward and the inverse
# transform further down.
transform = STFT(
    sr=DATASET_CONFIG.SR,
    n_per_seg=DATASET_CONFIG.N_PER_SEG,
    n_overlap=DATASET_CONFIG.N_OVERLAP,
)

# Scaler applied to magnitude spectrograms before they are fed to the model.
scaler = Scaler()

## Loading music file

In [None]:
track_path = r"../samples/Georgia Wonder - Siren/mixture.wav"
# time series data of the mixture (read with stereo=True) and its sample rate
data, sr = sp.read(track_path, stereo=True)
print("Mixture file time series data shape: ", data.shape)

# plotting data
# NOTE(review): a figure is created with plot.subplots, then subplot(3, 1, 2)
# is selected through the pyplot state machine, so the waveform is drawn in
# that subplot and not on `axes` - confirm this layout is intended.
_, axes = plot.subplots(1, figsize=(12, 10))
plot.subplot(3, 1, 2)
plot.title("Mixture")
# waveplot receives the transposed signal as a contiguous float32 array
lib_display.waveplot(np.ascontiguousarray(data.T, dtype=np.float32), sr=sr)

# collapse to mono; `data` is rebound here, so every later cell sees the
# mono signal rather than the stereo one that was just plotted
data = sp.to_mono(data)
print("Shape of mono mixture time series data: ", data.shape)

## Short-time Fourier transform (STFT) of the time series data

In [None]:
# Complex STFT of the mono mixture; per the original note the layout is
# (nbframes, nb_bins, nb_channels).
mixture_tf = transform.stft(data.T)

# Magnitude spectrogram |X| of the STFT, same layout as above.
mixture_stft = np.absolute(mixture_tf)

# Show channel 0, transposed for display.
f, axes = plot.subplots(nrows=1, figsize=(12, 10))
axes.set_title("Mixture STFT")
axes.pcolormesh(mixture_stft[..., 0].T)

### Checking how ordinary normalization looks (for comparison only)

In [None]:
# Demonstration only: `usual_normalization` is not consumed by any later
# cell, it just shows what a generic normalization of the spectrogram
# looks like next to the project scaler.
# NOTE(review): the title says "Min-Max" but librosa.util.normalize is called
# with its default arguments - confirm the label matches what it computes.
usual_normalization = lib.util.normalize(mixture_stft)
_, axes = plot.subplots(nrows=1, figsize=(12, 10))
axes.set_title("Min-Max Normalized spectrogram")
axes.pcolormesh(usual_normalization[..., 0].T, cmap=cm.gray)

### Scaling/Normalizing transformed data

In [None]:
# Scale the magnitude spectrogram with the project scaler. Per the original
# note the values end up in the 0 to 1 range and the layout stays
# (nbframes, nb_bins, nb_channels).
X_scaled = scaler.scale(mixture_stft)
print("Scaled data shape :", X_scaled.shape)
# Report the actual minimum (this line previously printed the maximum).
print("Scaled data min :", np.min(X_scaled))
print("Scaled data max :", np.max(X_scaled))
print("Scaled data mean :", np.mean(X_scaled))
# Keep the boundary the scaler exposes after scaling the mixture; later cells
# pass it back in so other spectrograms are scaled against the same boundary.
X_boundary = scaler.boundary
_, axes = plot.subplots(1, figsize=(12, 10))
axes.set_title("Scaled spectrogram")
axes.pcolormesh(X_scaled[..., 0].T, cmap=cm.gray)

In [None]:
# Move the last axis to the front so the array reads
# (nb_batch, nb_frames, nb_bins), as the original note describes.
# Caution: `X_scaled` is rebound in place, so re-running this cell on its own
# permutes the axes a second time - re-run the scaling cell first.
X_scaled = np.transpose(X_scaled, axes=(2, 0, 1))

## Loading model and predicting the results

In [None]:
# loading the model
# The checkpoint location defaults to the original hard-coded path, but can be
# overridden with the SEPARATION_MODEL_PATH environment variable so the
# notebook can run on machines where that absolute path does not exist.
path = os.environ.get(
    "SEPARATION_MODEL_PATH",
    r'H:\FYP\application/controllers/models/30_2019-04-07_11-49_Generalised_Recurrent_Model_relu_accompaniment_B16_H512_S5000_adam.pt',
)
# Use the first CUDA device when available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# SECURITY NOTE: torch.load unpickles the file, which can execute arbitrary
# code - only load checkpoints that come from a trusted source.
# The object is deserialized onto the CPU first and then moved to `device`.
dnn_model = torch.load(path, map_location='cpu')
dnn_model.to(device)
# Switch to evaluation mode before running inference.
dnn_model.eval()

In [None]:
# Build the float32 input directly on the target device (so no extra
# transfer is needed), then run the forward pass without gradient tracking.
mixture_tensor = torch.tensor(X_scaled, dtype=torch.float32, device=device)
with torch.no_grad():
    estimate = dnn_model(mixture_tensor)

## Filtering the results to generate the output

In [None]:
# The model output is described as (nb_batch, nb_frames, nb_bins); take the
# first batch entry and bring it back to NumPy on the host.
estimate_np = estimate[0].detach().cpu().numpy()

_, axes = plot.subplots(nrows=1, figsize=(12, 10))
axes.set_title("Estimates generated by model")
# Transposed for display.
axes.pcolormesh(estimate_np.T, cmap=cm.gray)

In [None]:
# stacking the output to make it in stereo shape
# and transposing it back to shape (nb_frames, nb_bins, nb_channels)
# (the same estimate is duplicated into both channels)
estimate_stereo = np.stack([estimate_np, estimate_np]).transpose(1, 2, 0)
# squares the estimate element-wise and appends a trailing axis of length 1
# NOTE(review): presumably this turns magnitudes into power values and adds a
# source axis for norbert - confirm against the installed norbert version
estimate_stereo = estimate_stereo[..., None] ** 2

# stacking the mixture stft to make it in stereo shape
# and transposing it back to shape (nb_frames, nb_bins, nb_channels)
# (np.squeeze first drops every length-1 axis of mixture_tf)
mixture_tf_squeeze = np.squeeze(mixture_tf)
mixture_tf_stereo = np.stack([mixture_tf_squeeze, mixture_tf_squeeze]).transpose(1, 2, 0)

# models the estimates to stft, frequency wise.
# NOTE(review): assumed to add a residual source next to the model estimate,
# since later cells read two sources (indices 0 and 1) from the result - confirm
estimate_residual = norbert.residual(estimate_stereo, mixture_tf_stereo)
# applying wiener filters to get the sources; copies are passed in, so the
# arrays built above are not handed to norbert directly
estimate_filter_results = norbert.wiener(np.copy(estimate_residual), np.copy(mixture_tf_stereo))

In [None]:
# Estimated vocals: source index 1 of the filtered result. After the
# transpose, index 1 along the first axis is selected, and the magnitude is
# scaled with the boundary taken from the mixture.
pre_vocals_tf = np.absolute(estimate_filter_results[..., 1].T[1])
pre_vocals_scaled = scaler.scale(pre_vocals_tf, boundary=X_boundary)
_, axes = plot.subplots(nrows=1, figsize=(12, 10))
axes.set_title("Estimated vocals")
axes.pcolormesh(pre_vocals_scaled, cmap=cm.gray)

# Estimated accompaniment: same steps for source index 0.
pre_acc_tf = np.absolute(estimate_filter_results[..., 0].T[1])
pre_acc_scaled = scaler.scale(pre_acc_tf, boundary=X_boundary)
_, axes = plot.subplots(nrows=1, figsize=(12, 10))
axes.set_title("Estimated accompaniment")
axes.pcolormesh(pre_acc_scaled, cmap=cm.gray)

In [None]:
# --- Reference vocals stem ---
vocals_path = r"../samples/Georgia Wonder - Siren/vocals.wav"
# time series data of the original vocals (note: `sr` is rebound here)
vocals_data, sr = sp.read(vocals_path, stereo=True)
# magnitude spectrogram, scaled with the boundary taken from the mixture
vocals_tf = np.absolute(transform.stft(vocals_data.T))
vocals_scaled = scaler.scale(vocals_tf, boundary=X_boundary)
_, axes = plot.subplots(nrows=1, figsize=(12, 10))
axes.set_title("Original vocals")
axes.pcolormesh(vocals_scaled[..., 0].T, cmap=cm.gray)

# --- Reference accompaniment stem ---
acc_path = r"../samples/Georgia Wonder - Siren/accompaniment.wav"
# time series data of the original accompaniment (`sr` is rebound again)
acc_data, sr = sp.read(acc_path, stereo=True)
# magnitude spectrogram, scaled with the boundary taken from the mixture
acc_tf = np.absolute(transform.stft(acc_data.T))
acc_scaled = scaler.scale(acc_tf, boundary=X_boundary)
_, axes = plot.subplots(nrows=1, figsize=(12, 10))
axes.set_title("Original accompaniment")
axes.pcolormesh(acc_scaled[..., 0].T, cmap=cm.gray)

In [None]:
# Inverse-STFT each filtered source back to a time series and transpose it:
# index 1 is used as the vocals, index 0 as the accompaniment. The vocals are
# converted first, then the accompaniment.
vocals_estimate, acc_estimate = (
    transform.istft(estimate_filter_results[..., source_idx]).T
    for source_idx in (1, 0)
)

In [None]:
# Play the separated accompaniment and vocals. The playback rate is the
# sample rate returned when the audio was read (`sr`) instead of a hard-coded
# 44100, so tracks with another sample rate are not played at the wrong speed.
display(Audio(acc_estimate.T, rate=sr))
display(Audio(vocals_estimate.T, rate=sr))

## TESTING Cells

In [None]:
# testing: apply the model output directly as a mask on the mixture STFT
# (no residual / Wiener filtering step) and listen to it next to the mono mixture.
# output tensor shape (nb_batch, nb_frames, nb_bins)
estimate_np = estimate[0].cpu().detach().numpy()
print("estimate shape", estimate_np[..., None].shape)
print("estimate mean", np.mean(estimate_np))
print("estimate max", np.max(estimate_np))
print("estimate min", np.min(estimate_np))
print(mixture_tf.shape)
# Use a dedicated name for the masked STFT so that `acc_estimate` (the
# time-domain accompaniment produced by the filtering cells) is not
# overwritten with STFT-domain data when this cell runs.
masked_mixture_tf = mixture_tf * estimate_np[..., None]
res = transform.istft(masked_mixture_tf).T
# Play back at the sample rate the audio was read with, not a fixed 44100.
display(Audio(data.T, rate=sr))
display(Audio(res.T, rate=sr))