In [1]:
import sys
import os
sys.path.append("../src")

import numpy as np
import matplotlib.pyplot as plt
from IPython import display
import pylab as pl

from WSMBSS import *
from general_utils import *
from visualization_utils import * 

import warnings
# Silence every warning for the rest of the session.
# NOTE(review): this blanket filter also hides numerical RuntimeWarnings
# (e.g. divide-by-zero / invalid value) that later cells might raise.
warnings.filterwarnings("ignore")

# Identifier for this notebook; it is not referenced by any of the visible
# cells -- presumably used to label saved artefacts elsewhere (TODO confirm).
notebook_name = 'Nonnegative_Antisparse_Copula'

In [2]:
@njit
def run_neural_dynamics_nnantisparse_jit(x_current, h, y, M_H, M_Y, W_HX, W_YH, D1, D2, beta, zeta,
                                         neural_dynamic_iterations, lr_start, lr_stop,
                                         lr_rule, lr_decay_multiplier = 0.01, hidden_layer_gain = 2,
                                         use_hopfield = True, OUTPUT_COMP_TOL = 1e-7):
    """Run the two-layer neural dynamics for a single input sample.

    The hidden membrane voltage ``v`` and output membrane voltage ``u`` are
    updated by gradient-like steps until both change by less than
    ``OUTPUT_COMP_TOL`` (relative to their own norm) or until
    ``neural_dynamic_iterations`` steps have been taken. After every step the
    hidden activation is clipped to ``[-hidden_layer_gain, hidden_layer_gain]``
    and the output activation is clipped to ``[0, 1]``.

    Args:
        x_current: Input (mixture) vector for the current sample.
        h: Initial hidden-layer activation; seeds the hidden membrane voltage.
        y: Initial output-layer activation; seeds the output membrane voltage.
        M_H: Square lateral weight matrix of the hidden layer. Its diagonal is
            used as the per-neuron self-coupling, the rest as lateral coupling.
        M_Y: Square lateral weight matrix of the output layer (same split).
        W_HX: Feedforward weights from the input to the hidden layer.
        W_YH: Feedforward weights from the hidden to the output layer.
        D1: Hidden-layer gains. Used as a column vector that scales rows
            (the calling notebook builds it with shape ``(h_dim, 1)``).
        D2: Output-layer gains, used the same way (``(s_dim, 1)`` in the
            calling notebook).
        beta: Scalar mixing weight that appears as ``beta`` / ``1 - beta`` in
            the dynamics factors.
        zeta: Scalar; every hidden-layer factor is scaled by ``1 - zeta``.
        neural_dynamic_iterations: Maximum number of update steps.
        lr_start: Initial step size of the dynamics.
        lr_stop: Lower bound on the step size for the decaying rules.
        lr_rule: One of ``"constant"``, ``"divide_by_loop_index"`` or
            ``"divide_by_slow_loop_index"``.
        lr_decay_multiplier: Slows the decay for
            ``"divide_by_slow_loop_index"``.
        hidden_layer_gain: Symmetric clipping bound for the hidden activation.
        use_hopfield: Currently unused; kept so existing callers keep working.
        OUTPUT_COMP_TOL: Relative tolerance of the settling test.

    Returns:
        Tuple ``(h, y, OutputCounter)``: the final hidden activation, the
        final output activation and the number of steps actually performed.

    Raises:
        ValueError: If ``lr_rule`` is not one of the supported names and at
            least one iteration is executed.
    """
    # Split each lateral matrix into its diagonal (self-coupling) and its
    # off-diagonal remainder. This is done inline: the former nested helper
    # referenced an undefined name on its unused branch.
    Gamma_H = np.diag(M_H)
    M_hat_H = M_H - np.diag(Gamma_H)
    Gamma_Y = np.diag(M_Y)
    M_hat_Y = M_Y - np.diag(Gamma_Y)

    # Loop-invariant coefficient matrices of the dynamics.
    mat_factor1 = (1 - zeta) * beta * (D1 * W_HX)
    mat_factor2 = ((1 - zeta) * (1 - beta) * M_hat_H  + (1- zeta) * beta * ((D1 * M_hat_H) * D1.T))
    mat_factor3 = (1 - zeta) * (1 - beta) * (W_YH.T * D2.T)
    mat_factor4 = (1 - zeta) * Gamma_H * ((1 - beta) + beta * (D1.T) ** 2)
    mat_factor5 = M_hat_Y * D2.T
    mat_factor6 = Gamma_Y * D2.T

    # mat_factor4 / mat_factor6 are row vectors; row 0 holds the per-neuron
    # scale that converts between activation and membrane voltage.
    v = mat_factor4[0] * h
    u = mat_factor6[0] * y

    prev_v = np.zeros_like(v)
    prev_u = np.zeros_like(u)
    not_settled = True
    OutputCounter = 0
    while not_settled and (OutputCounter < neural_dynamic_iterations):
        OutputCounter += 1
        if lr_rule == "constant":
            MUV = lr_start
        elif lr_rule == "divide_by_loop_index":
            MUV = max(lr_start/(1+OutputCounter), lr_stop)
        elif lr_rule == "divide_by_slow_loop_index":
            MUV = max(lr_start/(1+OutputCounter*lr_decay_multiplier), lr_stop)
        else:
            # Previously an unknown rule left the step size unassigned.
            raise ValueError("lr_rule must be 'constant', 'divide_by_loop_index' or 'divide_by_slow_loop_index'")

        # Hidden layer: voltage step, then activation with symmetric clipping.
        delv = -v + mat_factor1 @ x_current - mat_factor2 @ h + mat_factor3 @ y
        v = v + MUV * delv
        h = v / mat_factor4[0]
        h = np.clip(h, -hidden_layer_gain, hidden_layer_gain)
        # Output layer: voltage step, then activation clipped to [0, 1].
        delu = -u + W_YH @ h - mat_factor5 @ y
        u = u + MUV * delu
        y = u / mat_factor6[0]
        y = np.clip(y, 0, 1)

        # Relative-change settling test, written as a product so that an
        # all-zero voltage vector cannot trigger a division by zero.
        v_moved = np.linalg.norm(v - prev_v) > OUTPUT_COMP_TOL * np.linalg.norm(v)
        u_moved = np.linalg.norm(u - prev_u) > OUTPUT_COMP_TOL * np.linalg.norm(u)
        not_settled = v_moved or u_moved
        prev_v = v
        prev_u = u

    return h,y, OutputCounter

@njit
def update_weights_jit(x_current, h, y, zeta, beta, W_HX, W_YH, M_H, M_Y, D1, D2, 
                       MU_MH, MU_MY, MU_WHX, MU_WYH, muD, LayerMinimumGains, 
                       LayerMaximumGains, clip_gain_gradients = False):
    """Apply one synaptic and gain update from a single settled sample.

    Every weight matrix is replaced by a convex combination of its old value
    and an outer product of the relevant activations. The layer gains then
    take one step along the negative of a derivative built from the updated
    weights, and are clamped to their configured range.

    Args:
        x_current: Input vector of the current sample.
        h: Hidden-layer activation for the sample.
        y: Output-layer activation for the sample.
        zeta: Scalar weighting the ``1 / D`` term of the gain derivatives.
        beta: Scalar mixing weight (``beta`` for layer 1, ``1 - beta`` for
            layer 2).
        W_HX: Input-to-hidden feedforward weights.
        W_YH: Hidden-to-output feedforward weights.
        M_H: Hidden-layer lateral weights.
        M_Y: Output-layer lateral weights.
        D1: Hidden-layer gains, used as a column vector.
        D2: Output-layer gains, used as a column vector.
        MU_MH: Update rate for ``M_H``.
        MU_MY: Update rate for ``M_Y``.
        MU_WHX: Update rate for ``W_HX``.
        MU_WYH: Update rate for ``W_YH``.
        muD: Indexable pair of gain step sizes (``[0]`` for ``D1``, ``[1]``
            for ``D2``).
        LayerMinimumGains: Indexable pair of lower bounds for ``D1`` / ``D2``.
        LayerMaximumGains: Indexable pair of upper bounds for ``D1`` / ``D2``.
        clip_gain_gradients: When true, each gain step is limited in
            magnitude to the current gain value before it is applied.

    Returns:
        Tuple ``(W_HX, W_YH, M_H, M_Y, D1, D2)`` with the updated values.
    """
    def saturate(values, limit):
        # Element-wise limit of `values` to the band [-limit, +limit].
        kept = values * (np.abs(values) <= limit)
        capped_high = limit * (values > limit)
        capped_low = limit * (values < -limit)
        return kept + capped_high - capped_low

    # Lateral weights follow the activation auto-correlations.
    M_H = (1 - MU_MH) * M_H + MU_MH * np.outer(h, h)
    M_Y = (1 - MU_MY) * M_Y + MU_MY * np.outer(y, y)

    # Feedforward weights follow the cross-correlations between layers.
    W_HX = (1 - MU_WHX) * W_HX + MU_WHX * np.outer(h, x_current)
    W_YH = (1 - MU_WYH) * W_YH + MU_WYH * np.outer(y, h)

    # Hidden-layer gains.
    lateral_sq_H = np.sum((np.abs(M_H) ** 2) * D1.T, axis=1)
    forward_sq_H = np.sum(np.abs(W_HX) ** 2, axis=1)
    grad_D1 = (1 - zeta) * beta * (lateral_sq_H - forward_sq_H).reshape(-1, 1) + zeta * (1 / D1)
    step_D1 = muD[0] * grad_D1
    if clip_gain_gradients:
        step_D1 = saturate(step_D1, D1 * 1)
    D1 = np.clip(D1 - step_D1, LayerMinimumGains[0], LayerMaximumGains[0])

    # Output-layer gains.
    lateral_sq_Y = np.sum((np.abs(M_Y) ** 2) * D2.T, axis=1)
    forward_sq_Y = np.sum(np.abs(W_YH) ** 2, axis=1)
    grad_D2 = (1 - zeta) * (1 - beta) * (lateral_sq_Y - forward_sq_Y).reshape(-1, 1) + zeta * (1 / D2)
    step_D2 = muD[1] * grad_D2
    if clip_gain_gradients:
        step_D2 = saturate(step_D2, D2 * 1)
    D2 = np.clip(D2 - step_D2, LayerMinimumGains[1], LayerMaximumGains[1])

    return W_HX, W_YH, M_H, M_Y, D1, D2


In [3]:
# Seed NumPy's global RNG before any data is generated so the cell is repeatable.
np.random.seed(100)
# Correlation setting handed to the source generator; it is also used by a
# later cell to choose the synaptic learning-rate range.
rho = 0.5
# Number of samples requested from the generator (passed as size_sources).
N = 500000
NumberofSources = 5
NumberofMixtures = 10

# NoiseAmp = (10 ** (-SNR/20))# * np.sqrt(NumberofSources)

# Draw the source signals. The meaning of `df` and `decreasing_correlation`
# is defined by the project helper -- TODO confirm against its docstring.
S = generate_correlated_copula_sources(rho = rho, df = 4, n_sources = NumberofSources, size_sources = N , 
                                       decreasing_correlation = True)

INPUT_STD = 0.28
# Mix the sources; A is printed below as "the mixture matrix" and X is what
# the network receives as input.
A, X = WSM_Mixing_Scenario(S, NumberofMixtures, INPUT_STD)

# Requested SNR passed to addWGN (reported in dB below).
SNR=30
# NOTE: X is overwritten here with its noisy version; re-running only this
# line would add noise a second time.
X, NoisePart = addWGN(X, SNR, return_noise = True)

# Measured SNR in dB: power of (X - NoisePart) over power of NoisePart,
# each averaged over samples and summed over rows
# (assumes NoisePart is exactly the noise that was added to X).
SNRinp = 10 * np.log10(np.sum(np.mean((X - NoisePart)**2, axis = 1)) / np.sum(np.mean(NoisePart**2, axis = 1)))
print("The following is the mixture matrix A")
display_matrix(A)
print("Input SNR is : {}".format(SNRinp))

The following is the mixture matrix A


<IPython.core.display.Math object>

Input SNR is : 29.99847169729263


In [10]:
# Synaptic learning-rate range: a smaller start/floor is used when the
# correlation setting rho is above 0.4.
if rho > 0.4:
    gamma_start = 0.05
    gamma_stop = 5*1e-4
else:
    gamma_start = 0.1
    gamma_stop = 1e-3
    
# Per-layer [hidden, output] start values and floors; the update cell reads
# index 0 for the M_H / W_HX rates and index 1 for the M_Y / W_YH rates.
gammaM_start = [gamma_start, gamma_start]
gammaM_stop = [gamma_stop, gamma_stop]
gammaW_start = [gamma_start, gamma_start]
gammaW_stop = [gamma_stop, gamma_stop]

# Relative tolerance of the settling test inside the neural dynamics.
OUTPUT_COMP_TOL = 1e-7
# Initial value of the gain vectors D1 / D2 (index 0 / 1).
LayerGains = np.array([1,1])
# Clamp range applied to D1 / D2 after every gain update.
LayerMinimumGains = np.array([1e-3,1e-3])
LayerMaximumGains = np.array([1e6,20])
# Not referenced by any of the visible cells.
WScalings = [0.0033,0.0033]
# Diagonal value of the initial lateral matrices M_H / M_Y.
GamScalings = [2,1]
zeta = 1*1e-5
beta = 0.5
# Step sizes of the gain updates for D1 and D2.
muD = np.array([1, 1e-2])

# Settings forwarded to run_neural_dynamics_nnantisparse_jit.
neural_dynamic_iterations = 500
neural_lr_start = 0.75
neural_lr_stop = 0.05
neural_lr_rule = "divide_by_slow_loop_index"
# Integer divisor applied to the sample index in the synaptic-rate schedule.
synaptic_lr_decay_divider = 1
lr_decay_multiplier = 0.005
# Symmetric clipping bound for the hidden activation.
hidden_layer_gain = 200
# Passed to the dynamics function, which does not read it.
use_hopfield = True
clip_gain_gradients = True
# Dimensions: the hidden layer is given the same size as the source vector.
s_dim = S.shape[0]
x_dim = X.shape[0]
h_dim = s_dim
samples = S.shape[1]
# Initial weights: rectangular/square identities for the feedforward paths,
# scaled identities for the lateral paths, constant column vectors for gains.
W_HX = np.eye(h_dim, x_dim)
W_YH = np.eye(s_dim, h_dim)
M_H = GamScalings[0] * np.eye(h_dim)
M_Y = GamScalings[1] * np.eye(s_dim)
D1 = LayerGains[0] * np.ones((h_dim, 1))
D2 = LayerGains[1] * np.ones((s_dim, 1))

# Per-sample activation buffers, one column per sample, initialised to zero.
H = np.zeros((h_dim,samples))
Y = np.zeros((s_dim,samples))

In [11]:
# Run the neural dynamics on the first sample only, starting from the
# (all-zero) activations stored in column 0 of H and Y.
i_sample = 0
x_current = X[:, i_sample]
h = H[:, i_sample]
y = Y[:, i_sample]

# oc receives the number of dynamics iterations that were actually executed.
h,y, oc = run_neural_dynamics_nnantisparse_jit(x_current, h, y, M_H, M_Y, W_HX, W_YH, D1, D2, beta, zeta, 
                                         neural_dynamic_iterations, neural_lr_start, neural_lr_stop, 
                                         neural_lr_rule, lr_decay_multiplier, hidden_layer_gain, 
                                         use_hopfield, OUTPUT_COMP_TOL)
# Display the settled hidden activation, output activation and iteration count.
h,y, oc

(array([ 0.07172723, -0.12295662, -0.22121069, -0.05889156, -0.14912889]),
 array([0.07172721, 0.        , 0.        , 0.        , 0.        ]),
 25)

In [12]:
# Synaptic learning rates for the current sample. Each rate starts at its
# gamma*_start value, shrinks with the logarithm of the (divided) sample
# index, and never drops below its gamma*_stop floor.
MU_MH, MU_MY, MU_WHX, MU_WYH = (
    np.max([rate_start / (1 + np.log(1 + (i_sample // synaptic_lr_decay_divider)) / 10), rate_floor])
    for rate_start, rate_floor in (
        (gammaM_start[0], gammaM_stop[0]),
        (gammaM_start[1], gammaM_stop[1]),
        (gammaW_start[0], gammaW_stop[0]),
        (gammaW_start[1], gammaW_stop[1]),
    )
)

# One weight/gain update from the activations settled for this sample.
# NOTE: the weights are rebound in place, so re-running this cell applies
# the update again.
W_HX, W_YH, M_H, M_Y, D1, D2 = update_weights_jit(
    x_current, h, y, zeta, beta, W_HX, W_YH, M_H, M_Y, D1, D2,
    MU_MH, MU_MY, MU_WHX, MU_WYH, muD, LayerMinimumGains,
    LayerMaximumGains, clip_gain_gradients,
)

In [17]:
# Inspect the current input-to-hidden weight matrix.
W_HX

array([[ 9.50771720e-01, -1.76386744e-03, -3.17336589e-03,
        -8.44825635e-04, -2.13932029e-03,  3.72749260e-03,
         1.96563602e-03, -2.34090290e-03,  4.21589966e-04,
        -3.79112546e-03],
       [-1.32290104e-03,  9.53023666e-01,  5.43986370e-03,
         1.44822137e-03,  3.66727670e-03, -6.38976167e-03,
        -3.36954275e-03,  4.01283468e-03, -7.22700134e-04,
         6.49884271e-03],
       [-2.38002525e-03,  5.43986370e-03,  9.59786834e-01,
         2.60548849e-03,  6.59778084e-03, -1.14957912e-02,
        -6.06212904e-03,  7.21947263e-03, -1.30020653e-03,
         1.16920384e-02],
       [-6.33619448e-04,  1.44822137e-03,  2.60548849e-03,
         9.50693643e-01,  1.75648652e-03, -3.06045361e-03,
        -1.61388324e-03,  1.92199569e-03, -3.46145967e-04,
         3.11269929e-03],
       [-1.60449078e-03,  3.66727670e-03,  6.59778084e-03,
         1.75648652e-03,  9.54447885e-01, -7.74987195e-03,
        -4.08677603e-03,  4.86699764e-03, -8.76532458e-04,
         7.

In [23]:
# Smallest gap between the hidden-layer gains and their lower clamp bound;
# a result of 0 means at least one gain sits exactly on LayerMinimumGains[0].
np.min(D1 - LayerMinimumGains[0])

0.0