## Simulation d'une situation réel avec EnsembleSet

In [None]:
import IPython.display as ipd
from IPython.display import display
import pickle
import librosa
import soundfile as sf
import os
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import itertools
import operator
import scipy as sp
import numpy as np
from numpy import typing
import pyroomacoustics as pra
from mir_eval.separation import bss_eval_sources

path_in = "./data/prise_de_son/Sounds/Full_Version/Take_4"
dictionnary = "./data/prise_de_son/Sounds/Scales/Matrix/"
save_path = "./test/prise_de_son_perf/"

In [None]:
from src import performance, data_processing, fast_nmf

### Chargmenent de la data et preprocess

In [None]:
# load the data
files_in, files_tilte = data_processing.get_files(path_in, ".wav")

In [None]:
# Microphone à utiliser pour la séparation de sources
# microphone_names = ["Cello", "Center", "Clarinet", "Flute", "Left", "Right", "Violin1", "Violin2"]
microphone_names = ["Left", "Center", "Right", "Violin1", "Violin2", "Flute", "Clarinet", "Cello"]

nb_mics = len(microphone_names)  # number of microphones
start_time = 37  # seconds
audio_length = 10  # seconds
samplerate = 48000  # sampling rate

mics_signals = np.zeros((nb_mics, audio_length * samplerate), dtype=np.float32)

# extract data
for mics_index, name in enumerate(microphone_names):
    for i, file in enumerate(files_in):
        if name in file:
            print(file)
            data, samplerate = librosa.load(file, sr=None, mono=True)
            mics_signals[mics_index, :] =  data[start_time*samplerate : (start_time+audio_length)*samplerate]

In [None]:
for i, name in enumerate(microphone_names):
    print(f"Microphone: {microphone_names[i]}")
    display(ipd.Audio(mics_signals[i], rate=samplerate))

### Définition des variables globales et création des dossiers de visualisations

In [None]:
SAVE_FIG = (
    False  # Si True, on sauvegarde toutes les figures dans les sous-dossiers de 'test'
)
SAVE_AUDIO = False  # Si True, on sauvegarde tous les fichiers audio dans le sous-dossiers 'audio' de 'test'
SAVE_PERF = (
    False  # Si True, on sauvegarde toutes les figures dans les sous-dossiers de 'test'
)
TYPE = "stft"

In [None]:
# Permet de créé les dossiers de sauvegarde si les variables permettant la sauvegarde
# sont à True

try:
    os.mkdir("./test")

except OSError as error:
    pass


if SAVE_FIG:
    try:
        os.mkdir(save_path)

        try:
            os.mkdir(save_path + "activation")
            os.mkdir(save_path + "base")
            os.mkdir(save_path + "spectro")
            os.mkdir(save_path + "mix")
        except OSError as error:
            pass

    except OSError as error:
        pass

if SAVE_AUDIO:
    try:
        os.mkdir(save_path + "audios")
        os.mkdir(save_path + "audios/separation")
        os.mkdir(save_path + "audios/no_separation")
        
        for name in microphone_names:
            os.mkdir(save_path + "audios/separation/micro_" + name)

    except OSError as error:
        pass

In [None]:
L = 16384  # Taille de la fenêtre - Nfft
hop = L // 4  # Pas de la fenêtre

# Transformations des signaux audio en STFT_multichannel en prennant en compte la room et l'emplacement des micros et sources
X = data_processing.spectrogram_from_mics_signal(
    mics_signals,
    microphone_names,
    rate=samplerate,
    L=L,
    hop=hop,
    type=TYPE,
    display_audio=False,
    display_spectrogram=False,
)

#### Définitions des W (Dictionnaires)

In [None]:
W_files = [f for f in os.listdir(dictionnary) if f.endswith(".npy")]

E = np.zeros((5, L // 2+1, 78))

# Load the audio files
for i in range(5):
    E[i] = np.load(dictionnary + W_files[i])
    print(np.min(E[i]))
    plt.imshow(np.log(E[i]), aspect="auto")
    plt.gca().invert_yaxis()
    plt.tight_layout()
    plt.title(W_files[i])
    plt.show()

### Fast MNMF pour la séparation

In [None]:
# # Parametres de l'algorithme
# n_basis = 32  # Nombre de bases
# n_iter = 200 # Nombre d'itérations de l'algorithme

# print("Running fastmnmf2 with auxiva algorithm")
# Y = pra.bss.auxiva(X, 
#                 n_src=5, 
#                 n_iter=200, 
#                 proj_back=True, 
#                 W0=None, 
#                 model='laplace', 
#                 init_eig=False, 
#                 return_filters=False, 
#                 callback=None
# )

In [None]:
# Parametres de l'algorithme
n_basis = 32  # Nombre de bases
n_iter = 1 # Nombre d'itérations de l'algorithme

print("Running fastmnmf2 with Louis custom algorithm")
Y, W_NFK, H_NKT, g_NM, Q_FMM, Qx_FTM, X_tilde_FTM, Y_tilde_FTM = fast_nmf.fast_MNMF2(
    X.transpose(1, 0, 2),
    n_iter=n_iter,
    n_microphones=mics_signals.shape[0],
    n_sources=5,
    n_time_frames=X.shape[0],
    n_freq_bins=X.shape[1],
    n_basis=n_basis,
    algo="IP",
    mic_index=None,
    split=True,
    n_activations=X.shape[0] - 1,
    n_notes=78,
    E_NFL=E
)
Y = Y.transpose(0, 3, 2, 1)

In [None]:
if TYPE == 'stft':
    # STFT parameters
    win_a = pra.hamming(L)
    win_s = pra.transform.stft.compute_synthesis_window(win_a, hop)


    y = []
    for i in range(len(Y)):
        signal_ = pra.transform.stft.synthesis(Y[i], L, hop, win=win_s)
        signal_ = signal_[L - hop :, :].T
        y.append(signal_)
        
elif TYPE == 'cqt':
    y = []
    for i in range(len(Y)):
        signal_ = librosa.icqt(Y[i].transpose(2,1,0),
                               sr=samplerate, 
                               hop_length=hop, 
                               fmin=None, 
                               bins_per_octave=12, 
                               tuning=0.0, 
                               filter_scale=1, 
                               norm=1, 
                               sparsity=0.01, 
                               window='hann', 
                               scale=True, 
                               length=None, 
                               res_type='fft', 
                               dtype=None)
        y.append(signal_)


# shape of y = (n_mics, n_sources, n_samples)
y = np.array(y)

### Un peu de visualisation

In [None]:
mic_plot_sep = 1

plt.specgram(mics_signals[mic_plot_sep], NFFT=1024, Fs=samplerate)
plt.title("Signal micro " + microphone_names[mic_plot_sep])
plt.show()

In [None]:
# Correspond à la séparation de source, micro_plot_sep

fig = plt.figure()
fig.set_size_inches(10, 8)

gs = gridspec.GridSpec(3, 2)

for i in range(2):
    ax = fig.add_subplot(gs[0, i])  
    ax.specgram(y[mic_plot_sep][i, :], NFFT=1024, Fs=samplerate)
    ax.set_title(f"Source {i} (séparé)")

for i in range(2):
    ax = fig.add_subplot(gs[1, i])  
    ax.specgram(y[mic_plot_sep][i+2, :], NFFT=1024, Fs=samplerate)
    ax.set_title(f"Source {i+2} (séparé)")


ax = fig.add_subplot(gs[2, :])  
ax.specgram(y[mic_plot_sep][4, :], NFFT=1024, Fs=samplerate)
ax.set_title(f"Source {5} (séparé)")


plt.tight_layout()

fig.tight_layout(pad=2.5)
fig.suptitle(
    "spectro_source0123_micro_"
    + microphone_names[mic_plot_sep]
    + "_n_basis_"
    + str(n_basis)
    + "_n_fft_"
    + str(L)
)

if SAVE_FIG:
    fig.savefig(
        save_path
        + "spectro/"
        + "spectro_source0123_micro_"
        + microphone_names[mic_plot_sep]
        + "_n_basis_"
        + str(n_basis)
        + "_n_fft_"
        + str(L)
        + ".pdf"
    )

#### Ecoute maintenant 

##### Pas de séparation

In [None]:
print("Listening of Audio at each microphones without separation")

for micro_n in range(mics_signals.shape[0]):
    print("Microphone " + microphone_names[micro_n] + " : ")
    display(ipd.Audio(mics_signals[micro_n, :], rate=samplerate))
    if SAVE_AUDIO:
        sp.io.wavfile.write(
            save_path
            + "audios/no_separation/"
            + "micro_" + microphone_names[micro_n]
            + ".wav",
            samplerate,
            mics_signals[micro_n],
        )

##### Séparation des sources

In [None]:
def ecoute_separation_micro(mic, microphone_names, y, save, rate):
    """Fonction permettant d'écouter les audios séparés pour un micro donné et de les sauvegarder si save=True

    Args:
        mic (int): index du microphone dont on veut écouter la séparation
        y (array): array contenant les signaux audios séparés pour chaque microphones
        save (boolean): True si on veut sauvegarder les audios séparés, False sinon
    """

    for source_n in range(len(y[0])):
        print(
            "Listening of Audio at microphone "
            + microphone_names[mic]
            + " for source "
            + str(source_n)
        )
        display(ipd.Audio(y[mic][source_n], rate=rate))
        if save:
                sp.io.wavfile.write(
                    save_path
                    + "audios/separation/micro_"
                    + microphone_names[mic]
                    + "/"
                    + "source_"+str(source_n)+"_micro_"
                    + microphone_names[mic]
                    + "_audio_length_"
                    + str(audio_length)
                    + "_n_basis_"
                    + str(n_basis)
                    + "_n_fft_"
                    + str(L)
                    + ".wav",
                    rate,
                    y[mic][source_n].astype(np.float32),
                )

In [None]:
ecoute_separation_micro(0, microphone_names, y, SAVE_AUDIO, rate=samplerate)

In [None]:
ecoute_separation_micro(1, microphone_names, y, SAVE_AUDIO, rate=samplerate)

In [None]:
ecoute_separation_micro(2, microphone_names, y, SAVE_AUDIO, rate=samplerate)

In [None]:
ecoute_separation_micro(3, microphone_names, y, SAVE_AUDIO, rate=samplerate)

In [None]:
ecoute_separation_micro(4, microphone_names, y, SAVE_AUDIO, rate=samplerate)

In [None]:
ecoute_separation_micro(5, microphone_names, y, SAVE_AUDIO, rate=samplerate)

In [None]:
ecoute_separation_micro(6, microphone_names, y, SAVE_AUDIO, rate=samplerate)

In [None]:
ecoute_separation_micro(7, microphone_names, y, SAVE_AUDIO, rate=samplerate)

### Visualisation des matrices intermédiaires

#### Représentation of g, and G (covariance et matrice spatiale)

In [None]:
plt.imshow(g_NM[:, :], cmap="inferno", aspect="auto")
plt.title("g_NM")
plt.tight_layout()

if SAVE_FIG:
    plt.savefig(
        save_path
        + "mix/g_mn_n_basis_"
        + str(n_basis)
        + "_n_fft_"
        + str(L)
        + ".pdf"
    )

In [None]:
def G_mix(Q_FMM, g_NM):
    g_NMM = []
    for n_source in range(g_NM.shape[0]):
        g_NMM.append(np.diag(g_NM[n_source]))

    g_NMM = np.array(g_NMM)

    G_NF = np.einsum(
        "fij, nij, ijf -> ni",
        np.linalg.inv(Q_FMM),
        g_NMM,
        np.linalg.inv(Q_FMM).conj().T,
    )
    return G_NF

In [None]:
# G_NF
G_NF = G_mix(Q_FMM, g_NM)

In [None]:
plt.imshow(np.log(np.abs(G_NF)), cmap="inferno", aspect="auto")
plt.colorbar()
plt.title("G_NF_n_basis_" + str(n_basis) + "_n_fft_" + str(L))
plt.tight_layout()

if SAVE_FIG:
    plt.savefig(
        save_path
        + "mix/G_NF_n_basis_"
        + str(n_basis)
        + "_n_fft_"
        + str(L)
        + ".pdf"
    )

#### Représentation de U

In [None]:
# Correspond à la séparation de source, micro_plot_sep

fig = plt.figure()
fig.set_size_inches(10, 6)

gs = gridspec.GridSpec(3, 2)

for i in range(2):
    ax = fig.add_subplot(gs[0, i])
    ax.imshow(np.log(W_NFK[i, :, :]), cmap="inferno", aspect="auto")
    ax.invert_yaxis()
    ax.set_title(f"Matrice de base W{i}")

for i in range(2):
    ax = fig.add_subplot(gs[1, i])  
    ax.imshow(np.log(W_NFK[i+2, :, :]), cmap="inferno", aspect="auto")
    ax.invert_yaxis()
    ax.set_title(f"Matrice de base W{i+2}")


ax = fig.add_subplot(gs[2, :])  
ax.imshow(np.log(W_NFK[4, :, :]), cmap="inferno", aspect="auto")
ax.invert_yaxis()
ax.set_title(f"Matrice de base W{4}")


plt.tight_layout()

fig.tight_layout(pad=2.5)
fig.suptitle("W_NFK_n_basis_" + str(n_basis) + "_n_fft_" + str(L))

if SAVE_FIG:
    fig.savefig(
        save_path
        + "base/W_NFK_n_basis_"
        + str(n_basis)
        + "_n_fft_"
        + str(L)
        + ".pdf"
    )

#### Représentation de T

In [None]:
# Correspond à la séparation de source, micro_plot_sep

fig = plt.figure()
fig.set_size_inches(10, 8)

gs = gridspec.GridSpec(3, 2)

for i in range(2):
    ax = fig.add_subplot(gs[0, i])  
    ax.imshow(H_NKT[i, :, :], cmap="inferno", aspect="auto")
    ax.invert_yaxis()
    ax.set_title(f"Matrice de base H{i}")

for i in range(2):
    ax = fig.add_subplot(gs[1, i])  
    ax.imshow(H_NKT[i+2, :, :], cmap="inferno", aspect="auto")
    ax.invert_yaxis()
    ax.set_title(f"Matrice de base H{i+2}")


ax = fig.add_subplot(gs[2, :])  
ax.imshow(H_NKT[4, :, :], cmap="inferno", aspect="auto")
ax.invert_yaxis()
ax.set_title(f"Matrice de base H{4}")


plt.tight_layout()

fig.tight_layout(pad=2.5)
fig.suptitle("H_NKT_n_basis_" + str(n_basis) + "_n_fft_" + str(L))

if SAVE_FIG:
    fig.savefig(
        save_path
        + "activation/H_NKT_n_basis_"
        + str(n_basis)
        + "_n_fft_"
        + str(L)
        + ".pdf"
    )

### Représentation de W et H

In [None]:
W_old_NFK = np.einsum("nfl, nlk -> nfk", E, W_NFK)

In [None]:
# Correspond à la séparation de source, micro_plot_sep

fig = plt.figure()
fig.set_size_inches(10, 8)

gs = gridspec.GridSpec(3, 2)

for i in range(2):
    ax = fig.add_subplot(gs[0, i])  
    ax.imshow(W_old_NFK[i, :, :], cmap="inferno", aspect="auto")
    ax.invert_yaxis()
    ax.set_title(f"Matrice de base W{i}")

for i in range(2):
    ax = fig.add_subplot(gs[1, i])  
    ax.imshow(W_old_NFK[i+2, :, :], cmap="inferno", aspect="auto")
    ax.invert_yaxis()
    ax.set_title(f"Matrice de base W{i+2}")


ax = fig.add_subplot(gs[2, :])  
ax.imshow(W_old_NFK[4, :, :], cmap="inferno", aspect="auto")
ax.invert_yaxis()
ax.set_title(f"Matrice de base W{4}")


plt.tight_layout()

In [None]:
_, P_NOT = fast_nmf.init_EP_split(n_sources=5,
        n_time_frames=X.shape[0],
        n_FFT=L,
        n_activations=X.shape[0] - 1,
        n_notes=X.shape[1] - 1
        )

H_NKT_old = np.einsum("nko, not -> nkt", H_NKT, P_NOT)

In [None]:
# Correspond à la séparation de source, micro_plot_sep

fig = plt.figure()
fig.set_size_inches(10, 8)

gs = gridspec.GridSpec(3, 2)

for i in range(2):
    ax = fig.add_subplot(gs[0, i])  
    ax.imshow(H_NKT_old[i, :, :], cmap="inferno", aspect="auto")
    ax.invert_yaxis()
    ax.set_title(f"Matrice de base H{i}")

for i in range(2):
    ax = fig.add_subplot(gs[1, i])  
    ax.imshow(H_NKT_old[i+2, :, :], cmap="inferno", aspect="auto")
    ax.invert_yaxis()
    ax.set_title(f"Matrice de base H{i+2}")


ax = fig.add_subplot(gs[2, :])  
ax.imshow(H_NKT_old[4, :, :], cmap="inferno", aspect="auto")
ax.invert_yaxis()
ax.set_title(f"Matrice de base H{4}")


plt.tight_layout()