In [None]:
import sys
import os

# Walk up one directory and then down into ./src, one hop at a time.
# NOTE(review): presumably done so the project modules imported in the next
# cell resolve from the working directory - confirm against the repo layout.
for hop in ("..", "./src"):
    os.chdir(hop)
# sys.path.append("./src")

In [None]:
from IPython import display
from IPython.display import clear_output
import pylab as pl
import numpy as np
import matplotlib.pyplot as plt
from numba import njit, jit
from time import time

# from helpers import *
from LDMIBSS import *
from scipy import linalg

# np.random.seed(13)
%load_ext autoreload
%autoreload 2

In [None]:
# Problem size: number of sources, number of observed mixtures, sample count.
NumberofSources = 5
NumberofMixtures = 10
N = 100000

# Sources: i.i.d. exponential draws, each column normalised to sum to one, so
# every sample lies on the unit simplex (recipe from the link below).
# https://stackoverflow.com/questions/65154622/sample-uniformly-at-random-from-a-simplex-in-python
# NOTE(review): no seed is set in this cell, so S, A and the noise differ on
# every run unless the global NumPy seed is fixed elsewhere.
S = np.random.exponential(scale=1.0, size=(NumberofSources, N))
S = S / np.sum(S, axis=0)

# Mixing matrix of shape (NumberofMixtures, NumberofSources), i.i.d. N(0, 1).
A = np.random.randn(NumberofMixtures, NumberofSources)
X = np.dot(A, S)

# Show the mixing matrix once (the original cell rendered it twice).
print("The following is the mixture matrix A")
display_matrix(A)

# NOTE(review): addWGN presumably adds white Gaussian noise at `SNR` dB and
# returns the noisy mixtures together with the noise term - confirm in LDMIBSS.
SNR = 30
X, NoisePart = addWGN(X, SNR, return_noise=True)

# Empirical input SNR in dB: power of (X - NoisePart) over power of NoisePart,
# each averaged over samples and summed over mixture channels.
SNRinp = 10 * np.log10(
    np.sum(np.mean((X - NoisePart) ** 2, axis=1))
    / np.sum(np.mean(NoisePart**2, axis=1))
)
print("Input SNR is : {}".format(SNRinp))

# Scatter of two source components against each other.
plt.scatter(S[0, :], S[2, :])
plt.xlabel("Source 0")
plt.ylabel("Source 2")
plt.title("Source 0 vs. Source 2")
plt.show()

In [None]:
# Only the first 100 samples (rows after the transpose) are plotted.
head = slice(0, 100)

subplot_1D_signals(
    S.T[head],
    colorcode=None,
    figsize=(15.2, 9),
    title="Original Signals",
)
subplot_1D_signals(
    X.T[head],
    colorcode=None,
    figsize=(15, 18),
    title="Mixture Signals",
)

In [None]:
# Per-sample sum over the source axis; S was normalised so each should be 1.
np.sum(S, axis=0)

In [None]:
# Dimensions are read off the data: one row per source / per mixture channel.
s_dim, x_dim = S.shape[0], X.shape[0]

# Both factors share the same value (about 0.99).
# NOTE(review): presumably forgetting factors for the output / error
# statistics - confirm against OnlineLDMIBSS.
lambday = lambdae = 1 - 1e-1 / 10

By = 5 * np.eye(s_dim)  # Inverse output covariance
Be = 1000 * np.eye(s_dim)  # Inverse error covariance

# Reused by the training and convergence-plot cells below.
debug_iteration_point = 1000

# Collect the constructor arguments first so they can be inspected in one
# place, then build the model from them.
model_kwargs = dict(
    s_dim=s_dim,
    x_dim=x_dim,
    muW=30 * 1e-3,
    lambday=lambday,
    lambdae=lambdae,
    By=By,
    Be=Be,
    neural_OUTPUT_COMP_TOL=1e-6,
    set_ground_truth=True,
    S=S,
    A=A,
)
model = OnlineLDMIBSS(**model_kwargs)

In [None]:
# Train on the noisy mixtures: one epoch with shuffling enabled.
# NOTE(review): the neural_* arguments presumably control the inner
# output-computation loop (iteration cap and learning-rate range) and
# debug_iteration_point the logging/plotting interval - confirm in
# OnlineLDMIBSS.fit_batch_simplex.
model.fit_batch_simplex(
    X=X,
    n_epochs=1,
    shuffle=True,
    neural_dynamic_iterations=500,
    neural_lr_start=0.1,
    neural_lr_stop=0.001,
    debug_iteration_point=debug_iteration_point,
    plot_in_jupyter=True,
)

In [None]:
# Larger tick labels for the convergence figure.
# `plt.rcParams` is the same settings object as `matplotlib.rcParams`, and
# `plt` is imported explicitly in the imports cell, so this does not depend on
# an `mpl` alias that is never imported in this notebook.
plt.rcParams["xtick.labelsize"] = 18
plt.rcParams["ytick.labelsize"] = 18

# SINR history recorded by the model; the x-axis label states that one tick
# corresponds to `debug_iteration_point` iterations.
plot_convergence_plot(
    model.SINR_list,
    xlabel="Number of Iterations / {}".format(debug_iteration_point),
    ylabel="SINR (dB)",
    title="SINR Convergence Plot",
    colorcode=None,
    linewidth=1.8,
)

print("Final SINR: {}".format(np.array(model.SINR_list[-1])))

In [None]:
# Apply the learned overall mapping to the mixtures.
Wf = model.compute_overall_mapping(return_mapping=True)
Y = Wf @ X

# Align the outputs with the true sources. The helper is fed the transposed
# arrays, and its result is transposed back before being stored in Y_.
aligned = signed_and_permutation_corrected_sources(S.T, Y.T)
Y_ = aligned.T

# Shape check of the aligned outputs, the mixtures and the sources.
(Y_.shape, X.shape, S.shape)

In [None]:
# Compare the first 100 samples of the corrected outputs with the sources.
head = slice(0, 100)

subplot_1D_signals(
    X=Y_.T[head],
    colorcode=None,
    figsize=(15.2, 9),
    title="Extracted Signals (Sign and Permutation Corrected)",
)
subplot_1D_signals(
    X=S.T[head],
    colorcode=None,
    figsize=(15.2, 9),
    title="Original Signals",
)

In [None]:
# Remove the per-row mean from the true sources.
Szeromean = S - S.mean(axis=1, keepdims=True)

# Recompute the separator output and centre it the same way.
Wf = model.compute_overall_mapping(return_mapping=True)
Y = Wf @ X
Yzeromean = Y - Y.mean(axis=1, keepdims=True)

# Align outputs with sources. Note: Y_ is NOT transposed back here, so it
# keeps the orientation of Szeromean.T.
Y_ = model.signed_and_permutation_corrected_sources(Szeromean.T, Yzeromean.T)

# Per-component least-squares gain that best maps each column of Y_ onto the
# matching centred source, then rescale.
cross_energy = (Y_ * Szeromean.T).sum(axis=0)
output_energy = (Y_ * Y_).sum(axis=0)
coef_ = cross_energy / output_energy
Y_ = coef_ * Y_

component_snr = snr(Szeromean.T, Y_)
print("Component SNR Values : {}\n".format(component_snr))

# First element returned by CalculateSINR, converted to dB.
sinr_linear = model.CalculateSINR(Y_.T, Szeromean)[0]
SINR = 10 * np.log10(sinr_linear)

print("Overall SINR : {}".format(SINR))

In [None]:
import mir_eval

In [None]:
# Shapes of the sources and of the transposed, rescaled outputs.
tuple(arr.shape for arr in (S, Y_.T))

In [None]:
# Score the rescaled outputs against the centred sources with mir_eval's
# BSS-eval, letting it search over source permutations.
bss_scores = mir_eval.separation.bss_eval_sources(
    Szeromean,
    Y_.T,
    compute_permutation=True,
)
bss_scores