In [None]:
import barmuscomp.ae_utils as ae_utils
import barmuscomp.ae as ae
import barmuscomp.ae_ntd as ae_ntd
import barmuscomp.scripts.default_path as paths
import barmuscomp.scripts.overall_scripts as scr
import barmuscomp.model.features as features
import barmuscomp.model.pattern_study as pattern_study
import barmuscomp.model.plot_comparison_ae_ntd as plot_aentd
from barmuscomp.model.current_plot import *

import as_seg.CBM_algorithm as CBM
import as_seg.barwise_input as bi
import as_seg.data_manipulation as dm
import as_seg.autosimilarity_computation as as_comp

import math
import numpy as np
import pandas as pd
import mirdata
import os
import tensorly as tl
import librosa
import soundfile as sf
import torch
import IPython.display as ipd

In [None]:
## Important parameters
feature = "stft"                 # feature label, embedded in the persisted NTD file names
beta = 2                         # beta of the beta-divergence; 2 selects the MSE loss just below
ntd_dimensions = [32,12,10]      # core dimensions handed to the NTD and to the AE-NTD models
autosimilarity_type = "Cosine"   # similarity used when computing autosimilarity matrices
n_epochs = 10                    # number of epochs given to every autoencoder optimisation

nb_bars = 16  # SDR is computed on these bars

# Pattern display options.
plot_patterns = True
nb_patterns_to_show = 4

# Reconstruction loss: plain MSE when beta == 2, the project's beta-divergence loss otherwise.
recons_loss = torch.nn.MSELoss() if beta == 2 else ae_utils.BetaDivergenceLoss(beta)

In [None]:
# Standard hyperparameters
subdivision = 96                     # number of frames per bar in the barwise tensors
time = nb_bars * subdivision         # total number of frames covered by the first nb_bars bars
hop_length = 32
# NOTE(review): 44100 Hz is hard-coded here, while the file's actual rate is returned by
# sf.read below as `sampling_rate` — confirm the two agree for the song being loaded.
hop_length_seconds = hop_length/44100
batch_size = None
lr = 1e-3
compute_if_not_persisted = True
verbose = False
n_fft=2048
init_ntd = "tucker"

# Song
song_name = "The Beatles - Come Together"
song_path = "C:/Users/amarmore/this_folder/The Beatles - Come Together.wav"
# always_2d=True guarantees a (frames, channels) array: without it, a mono file comes back
# as a 1-D array and the channel indexing below raises an IndexError.
the_signal, sampling_rate = sf.read(song_path, always_2d=True)

# Get the downbeats (pre-computed and persisted; the commented call is the on-the-fly alternative)
bars = np.load("C:/Users/amarmore/Desktop/data_persisted/bars/The Beatles - Come Together.npy") #dm.get_bars_from_audio(song_path)

median_hop = pattern_study.get_median_hop(bars, subdivision = subdivision)

# Complex STFT of the song, obtained by summing the STFT of every channel.
stft_complex = librosa.core.stft(np.asfortranarray(the_signal[:,0]), n_fft=n_fft, hop_length = hop_length)
for channel_idx in range(1, the_signal.shape[1]):
    stft_complex += librosa.core.stft(np.asfortranarray(the_signal[:,channel_idx]), n_fft=n_fft, hop_length = hop_length)

# Barwise tensor of the complex STFT (presumably Frequency-Time-Bar ordered, per the helper's name).
complex_tensor_stft = bi.tensorize_barwise_FTB(stft_complex, bars, hop_length_seconds, subdivision)

In [None]:
# NTD
# Split the complex barwise tensor into its magnitude (power=1) and its phase.
tensor_spectrogram_ntd, tensor_phase = librosa.magphase(complex_tensor_stft, power=1)

# The branch only selects the persisted-file suffix, the update rule and the beta value;
# the decomposition script is then called once.
if beta == 2:
    persisted_arguments = f"_{song_name}_{feature}_{init_ntd}_{subdivision}"
    ntd_update_rule, ntd_beta = "hals", 2
else:
    persisted_arguments = f"mu_slow_{song_name}_beta{beta}_{feature}_{init_ntd}_{subdivision}_n_iter_max1000"
    ntd_update_rule, ntd_beta = "mu", beta

core, factors = scr.NTD_decomp_as_script(
    paths.path_data_persisted_come_together,
    persisted_arguments,
    tensor_spectrogram_ntd,
    ntd_dimensions,
    init = init_ntd,
    update_rule = ntd_update_rule,
    beta = ntd_beta,
    compute_if_not_persisted = compute_if_not_persisted,
)

# Per-column standard deviation and mean of the third factor; they are later handed to the
# NTD-initialised autoencoder as bn_latent_init_stds / bn_latent_init_avgs.
bn_latent_init_stds = [np.std(q_column) for q_column in factors[2].T]
bn_latent_init_avgs = [np.mean(q_column) for q_column in factors[2].T]

# Tensor rebuilt from the core and the factor matrices.
ntd_reconstruction = tl.tenalg.multi_mode_dot(core, factors)

# Sizes read off the magnitude tensor: first mode -> freq_size, third mode -> nb_bars_song.
freq_size = tensor_spectrogram_ntd.shape[0]
nb_bars_song = tensor_spectrogram_ntd.shape[2]

In [None]:
# AE
# Magnitude of the complex STFT, then its barwise tensor
# (presumably Bar-Frequency-Time ordered, per the helper's name — TODO confirm).
stft_mag = np.absolute(stft_complex)
tensor_spectrogram_ae = bi.tensorize_barwise_BFT(
    stft_mag,
    bars,
    hop_length_seconds,
    subdivision,
)

# Two loaders built from the same tensor: a flattened one (used by the fully-connected
# models in this notebook) and a non-flattened one.
fc_data_loader = ae_utils.generate_flatten_dataloader(
    tensor_spectrogram_ae,
    batch_size = batch_size,
)
conv_data_loader = ae_utils.generate_dataloader(
    tensor_spectrogram_ae,
    batch_size = batch_size,
)

In [None]:
#signal_stft_istft = librosa.istft(np.reshape((tensor_spectrogram_ntd*tensor_phase)[:,:,:nb_bars], (1025, time), order = 'F'), hop_length = median_hop)
#ipd.Audio(signal_stft_istft, rate=sampling_rate)

# Résultats NTD

In [None]:
# Autosimilarity of the third NTD factor (named Q in the plot titles).
autosimilarity_ntd = as_comp.switch_autosimilarity(factors[2], similarity_type = autosimilarity_type, normalise = True)
plot_me_this_spectrogram(autosimilarity_ntd, title = "Autosimilarity of Q")

# Reconstruction error between the magnitude tensor and the NTD reconstruction, measured
# with the same loss object as the one selected in the parameters cell.
# NOTE(review): the argument order matters for a non-symmetric beta-divergence — confirm
# it matches what ae_utils.BetaDivergenceLoss expects.
original_as_torch = torch.tensor(tensor_spectrogram_ntd).float()
reconstruction_as_torch = torch.tensor(ntd_reconstruction).float()
print(f"NTD reconstruction error: {recons_loss(original_as_torch, reconstruction_as_torch)}")

# The three factor matrices; H and Q are displayed transposed.
for factor_to_plot, factor_title in (
    (factors[0], "W matrix"),
    (factors[1].T, "H matrix"),
    (factors[2].T, "Q matrix"),
):
    plot_me_this_spectrogram(factor_to_plot, title = factor_title)

In [None]:
# Patterns Griffin-Lim
# SDR at the song scale and at the pattern scale, plus the audio of each pattern, with
# Griffin-Lim phase retrieval for both the song and the patterns.
song_sdr_gl, patterns_sdr_gl, audio_patterns_ntd_gl = pattern_study.sdr_songscale_patternscale_encpasulation(core, factors, median_hop,
                                         tensor_mag_original = tensor_spectrogram_ntd, tensor_phase_original = tensor_phase,
                                         nb_bars = nb_bars, phase_retrieval_song = "griffin_lim", phase_retrieval_patterns = "griffin_lim", subdivision = subdivision)

# Spectrogram of the i-th pattern: W @ G[:, :, i] @ H^T.
# Built unconditionally: the comparison cells further down pass spec_patterns_ntd whatever
# the value of plot_patterns, so defining it only inside the `if` raised a NameError
# whenever plot_patterns was False.
spec_patterns_ntd = [factors[0]@core[:,:,i]@factors[1].T for i in range(nb_patterns_to_show)]

if plot_patterns:
    for i, pattern in enumerate(spec_patterns_ntd):
        plot_me_this_spectrogram(pattern, title = f"{i}-th pattern")
        ipd.display(audio_patterns_ntd_gl[i])

In [None]:
# Patterns Masking
# Same study as the Griffin-Lim cell, but the song is resynthesised with its original phase
# and the patterns with the "masking" strategy.
song_sdr_mask, patterns_sdr_mask, audio_patterns_ntd_mask = pattern_study.sdr_songscale_patternscale_encpasulation(
    core,
    factors,
    median_hop,
    tensor_mag_original = tensor_spectrogram_ntd,
    tensor_phase_original = tensor_phase,
    nb_bars = nb_bars,
    phase_retrieval_song = "original_phase",
    phase_retrieval_patterns = "masking",
    subdivision = subdivision,
)

if plot_patterns:
    # Show the spectrogram W @ G[:, :, i] @ H^T of each pattern next to its audio.
    for i in range(nb_patterns_to_show):
        pattern = factors[0] @ core[:, :, i] @ factors[1].T
        plot_me_this_spectrogram(pattern, title = f"{i}-th pattern")
        ipd.display(audio_patterns_ntd_mask[i])

# Décodeur Aléatoire

## FC

In [None]:
# Fully-connected AE-NTD built without any NTD-derived initialisation: unfolded_G, W, H and
# the latent batch-norm statistics are all None (the model seed is fixed to 42).
fc_random_decoder = ae_ntd.FullyConnectedAutoencoderNTD(input_size_x = subdivision, input_size_y = freq_size, 
                                                        ntd_dimensions = ntd_dimensions, unfolded_G = None, W = None, H = None,
                                                        bn_latent_init_stds = None, bn_latent_init_avgs = None,
                                                        beta = beta, seed = 42)
# Optimise for n_epochs on the flattened loader, with ten times the base learning rate.
# The call returns the model together with the sequence of losses plotted below.
fc_random_decoder, losses = fc_random_decoder.my_optim_method(n_epochs, fc_data_loader, verbose = verbose, lr = lr*10)
print(f"Final reconstruction error: {losses[-1]}")
# Loss curve, on a logarithmic y axis.
plt.plot(losses)
plt.title("Reconstruction error with iterations")
plt.legend(["Reconstruction error"])
plt.yscale('log')
plt.show()

# Latent projection of the data held by the loader, reused by the comparison cells.
projection_fc_random = fc_random_decoder.get_latent_projection(fc_data_loader)

In [None]:
# Compare the random-initialised FC AE-NTD with the NTD, using Griffin-Lim phase retrieval at
# both the song and the pattern scale. spec_patterns_ntd and audio_patterns_ntd_gl come from
# the "Patterns Griffin-Lim" cell. plot_patterns is hard-coded to True here, independently of
# the notebook-level plot_patterns flag.
plot_aentd.plot_comparison_this_ae_ntd(fc_random_decoder, projection_fc_random, median_hop, factors, tensor_spectrogram_ntd, tensor_phase, 
                                       nb_bars, phase_retrieval_song = "griffin_lim", phase_retrieval_patterns= "griffin_lim",
                            autosimilarity_type = autosimilarity_type, plot_patterns = True, nb_patterns_to_show = nb_patterns_to_show,
                            spec_patterns_ntd = spec_patterns_ntd, signal_patterns_ntd = audio_patterns_ntd_gl)

In [None]:
# Same comparison for the random-initialised FC AE-NTD, with the original phase at the song
# scale and "masking" at the pattern scale. The NTD pattern audio passed here is the one
# produced by the "Patterns Masking" cell (audio_patterns_ntd_mask).
plot_aentd.plot_comparison_this_ae_ntd(fc_random_decoder, projection_fc_random, median_hop, factors,
                            tensor_spectrogram_ntd, tensor_phase,
                            nb_bars, phase_retrieval_song = "original_phase", phase_retrieval_patterns= "masking",
                            autosimilarity_type = autosimilarity_type, plot_patterns = True, nb_patterns_to_show = nb_patterns_to_show,
                            spec_patterns_ntd = spec_patterns_ntd, signal_patterns_ntd = audio_patterns_ntd_mask)

# Décodeur General Init

In [None]:
# K-means implementation used to cluster the W and H factors below.
import sklearn.cluster
# Folder referenced by the (currently commented-out) np.save / np.load calls on the cluster centroids.
path_to_save_cluster = "C:/Users/amarmore/Desktop/data_persisted/cluster_matrices"

In [None]:
# Gather every column of the persisted W and H factors of songs 1..100 into two flat lists,
# to be clustered by the k-means cells.
big_h = []
big_w = []
ntd_dimensions_load_cluster = [32, 16, 16]

# The folder only depends on beta and on the dimensions, so it is resolved once, outside the loop.
if beta == 2:
    path_ntd = f"{paths.path_data_persisted_rwc}/ntd/{ntd_dimensions_load_cluster[0]}_{ntd_dimensions_load_cluster[1]}_{ntd_dimensions_load_cluster[2]}"
else:
    path_ntd = f"{paths.path_data_persisted_rwc}/ntd_mu/{ntd_dimensions_load_cluster[0]}_{ntd_dimensions_load_cluster[1]}_{ntd_dimensions_load_cluster[2]}"

# Loop-local names are deliberately distinct from the notebook-level `song_name` and
# `persisted_arguments`: reusing them replaced the song title with an integer, which
# corrupted any later re-run of the NTD cell.
for rwc_song_idx in range(1,101):
    if beta == 2:
        persisted_arguments_rwc = f"_{rwc_song_idx}_nn_log_mel_grill_{init_ntd}_{subdivision}"
    else:
        persisted_arguments_rwc = f"mu_slow_{rwc_song_idx}_beta{beta}_nn_log_mel_grill_{init_ntd}_{subdivision}_n_iter_max1000"

    # NOTE(review): allow_pickle=True executes pickled content; only load files you produced yourself.
    factors_tmp = np.load(f"{path_ntd}/factors{persisted_arguments_rwc}.npy", allow_pickle = True)

    # Iterating over the transposed matrices yields one column of the factor per item.
    big_h.extend(factors_tmp[1].T)
    big_w.extend(factors_tmp[0].T)

In [None]:
# K-means over all the gathered H columns, with as many clusters as the second NTD dimension.
kmeansh = sklearn.cluster.KMeans(n_clusters = ntd_dimensions[1], random_state = 42)
kmeansh.fit(big_h)

# cluster_centers_ holds one centroid per row; transpose so that each centroid is a column.
k_means_h = kmeansh.cluster_centers_.T

# Optional persistence of the centroids (kept disabled):
#np.save(f"{path_to_save_cluster}/kmeans_H_rwcpop_dimensions{ntd_dimensions}_beta{beta}", k_means_h)
#k_means_h = np.load(f"{path_to_save_cluster}/kmeans_H_rwcpop_dimensions{ntd_dimensions}_beta{beta}.npy")

# Column ordering returned by permutate_factor, applied for display only.
perm_cluster_h = permutate_factor(k_means_h)
plot_me_this_spectrogram(
    k_means_h[:,perm_cluster_h].T,
    x_axis = "Time, at the barscale",
    y_axis = "Rhythmic patterns",
    title = None,  # "Cluster centroids for H"
)

In [None]:
# K-means over all the gathered W columns, with as many clusters as the first NTD dimension.
kmeansw = sklearn.cluster.KMeans(n_clusters = ntd_dimensions[0], random_state = 42).fit(big_w)
# cluster_centers_ holds one centroid per row; transpose so that each centroid is a column.
k_means_w = kmeansw.cluster_centers_.T

# Bring the centroids back to a mel power scale (dB -> power, minus one), then to the STFT scale.
# NOTE(review): presumably this undoes the transform behind the "nn_log_mel_grill" features —
# confirm, in particular that the result cannot become negative.
k_means_w_mel = librosa.db_to_power(k_means_w) - 1
# n_fft follows the notebook-level setting instead of a literal 2048: the row count of the
# result (1 + n_fft/2) has to match freq_size, which the autoencoder uses as input_size_y.
k_means_w_stft = librosa.feature.inverse.mel_to_stft(k_means_w_mel, sr=44100, n_fft=n_fft, power=2.0, fmin=80.0, fmax=16000)

# Optional persistence of the centroids (kept disabled):
#np.save(f"{path_to_save_cluster}/kmeans_W_rwcpop_dimensions{ntd_dimensions}_beta{beta}", k_means_w)
#k_means_w = np.load(f"{path_to_save_cluster}/kmeans_W_rwcpop_dimensions{ntd_dimensions}_beta{beta}.npy")

# Column ordering returned by permutate_factor, applied for display only.
perm_cluster_w = permutate_factor(k_means_w_stft)
# NOTE(review): the plotted matrix is the output of mel_to_stft, so the "Mel scale" label may be stale.
plot_me_this_spectrogram(k_means_w_stft[:,perm_cluster_w], x_axis = "Frequential patterns", y_axis = "Mel scale",
                         title = None)#"Cluster centroids for W"

In [None]:
# Fully-connected AE-NTD whose W and H are initialised with the k-means centroids
# (k_means_w_stft, k_means_h); unfolded_G and the latent batch-norm statistics stay None.
fc_init_centroid = ae_ntd.FullyConnectedAutoencoderNTD(input_size_x = subdivision, input_size_y = freq_size, 
                                                        ntd_dimensions = ntd_dimensions, unfolded_G = None, W = k_means_w_stft, H = k_means_h,
                                                        bn_latent_init_stds = None, bn_latent_init_avgs = None,
                                                        beta = beta, seed = 42)
# Optimise for n_epochs on the flattened loader with the base learning rate.
# The call returns the model together with the sequence of losses plotted below.
fc_init_centroid, losses = fc_init_centroid.my_optim_method(n_epochs, fc_data_loader, verbose = verbose, lr = lr)
print(f"Final reconstruction error: {losses[-1]}")
# Loss curve, on a logarithmic y axis.
plt.plot(losses)
plt.title("Reconstruction error with iterations")
plt.legend(["Reconstruction error"])
plt.yscale('log')
plt.show()

# Latent projection of the data held by the loader, reused by the comparison cells.
projection_fc_init_centroid = fc_init_centroid.get_latent_projection(fc_data_loader)

In [None]:
# Compare the centroid-initialised FC AE-NTD with the NTD, using Griffin-Lim phase retrieval at
# both the song and the pattern scale. spec_patterns_ntd and audio_patterns_ntd_gl come from
# the "Patterns Griffin-Lim" cell. plot_patterns is hard-coded to True here.
plot_aentd.plot_comparison_this_ae_ntd(fc_init_centroid, projection_fc_init_centroid, median_hop, factors, tensor_spectrogram_ntd, tensor_phase, 
                                       nb_bars, phase_retrieval_song = "griffin_lim", phase_retrieval_patterns= "griffin_lim",
                            autosimilarity_type = autosimilarity_type, plot_patterns = True, nb_patterns_to_show = nb_patterns_to_show,
                            spec_patterns_ntd = spec_patterns_ntd, signal_patterns_ntd = audio_patterns_ntd_gl)

In [None]:
# Same comparison for the centroid-initialised FC AE-NTD, with the original phase at the song
# scale and "masking" at the pattern scale. The NTD pattern audio passed here is the one
# produced by the "Patterns Masking" cell (audio_patterns_ntd_mask).
plot_aentd.plot_comparison_this_ae_ntd(fc_init_centroid, projection_fc_init_centroid, median_hop, factors,
                            tensor_spectrogram_ntd, tensor_phase,
                            nb_bars, phase_retrieval_song = "original_phase", phase_retrieval_patterns= "masking",
                            autosimilarity_type = autosimilarity_type, plot_patterns = True, nb_patterns_to_show = nb_patterns_to_show,
                            spec_patterns_ntd = spec_patterns_ntd, signal_patterns_ntd = audio_patterns_ntd_mask)

# Décodeur initialisé avec NTD

## FC

In [None]:
# Fully-connected AE-NTD initialised from the NTD of this song: the core unfolded along
# mode 2 (tl.unfold(core, 2)), W = factors[0], H = factors[1], and the per-column
# std / mean of factors[2] as the latent batch-norm initial statistics.
fc_init_ntd = ae_ntd.FullyConnectedAutoencoderNTD(input_size_x = subdivision, input_size_y = freq_size, 
                                                        ntd_dimensions = ntd_dimensions, unfolded_G = tl.unfold(core, 2),
                                                        W = factors[0], H = factors[1],
                                                        bn_latent_init_stds = bn_latent_init_stds, bn_latent_init_avgs = bn_latent_init_avgs,
                                                        beta = beta, seed = 42)
# Optimise for n_epochs on the flattened loader with the base learning rate.
# The call returns the model together with the sequence of losses plotted below.
fc_init_ntd, losses = fc_init_ntd.my_optim_method(n_epochs, fc_data_loader, verbose = verbose, lr = lr)
print(f"Final reconstruction error: {losses[-1]}")
# Loss curve, on a logarithmic y axis.
plt.plot(losses)
plt.title("Reconstruction error with iterations")
plt.legend(["Reconstruction error"])
plt.yscale('log')
plt.show()

# Latent projection of the data held by the loader, reused by the comparison cells.
projection_fc_init_ntd = fc_init_ntd.get_latent_projection(fc_data_loader)

In [None]:
# Compare the NTD-initialised FC AE-NTD with the NTD, using Griffin-Lim phase retrieval at
# both the song and the pattern scale. spec_patterns_ntd and audio_patterns_ntd_gl come from
# the "Patterns Griffin-Lim" cell. plot_patterns is hard-coded to True here.
plot_aentd.plot_comparison_this_ae_ntd(fc_init_ntd, projection_fc_init_ntd, median_hop, factors, tensor_spectrogram_ntd, tensor_phase, 
                                       nb_bars, phase_retrieval_song = "griffin_lim", phase_retrieval_patterns= "griffin_lim",
                            autosimilarity_type = autosimilarity_type, plot_patterns = True, nb_patterns_to_show = nb_patterns_to_show,
                            spec_patterns_ntd = spec_patterns_ntd, signal_patterns_ntd = audio_patterns_ntd_gl)

In [None]:
# Same comparison for the NTD-initialised FC AE-NTD, with the original phase at the song
# scale and "masking" at the pattern scale. The NTD pattern audio passed here is the one
# produced by the "Patterns Masking" cell (audio_patterns_ntd_mask).
plot_aentd.plot_comparison_this_ae_ntd(fc_init_ntd, projection_fc_init_ntd, median_hop, factors,
                            tensor_spectrogram_ntd, tensor_phase,
                            nb_bars, phase_retrieval_song = "original_phase", phase_retrieval_patterns= "masking",
                            autosimilarity_type = autosimilarity_type, plot_patterns = True, nb_patterns_to_show = nb_patterns_to_show,
                            spec_patterns_ntd = spec_patterns_ntd, signal_patterns_ntd = audio_patterns_ntd_mask)