#  Algorithme de séparation de sources basé sur les statistiques du second ordre

* *Correlation Matrix*

In [1]:
# Compute an estimation of the inter-correlation
def correlation(x, y):
    """
    Function to estimate the inter-correlation matrix
    
    inputs:
        x (Numpy array)
        y (Numpy array) 
    
    return:
        r (Numpy array)
    """
    # Vectors dimensions
    N, m = x.shape[0], x.shape[1]
    
    # Inter-correlation matrix initialization
    r = np.zeros((m, m))
    
    # Compute the intercorrelation
    for i in range(N):
        r += np.outer(x[i,:], y[i,:])  # Update the intercorrelation matrix
    r = r / N  # Normlization according to the number of windows
    
    return r

* *Off function*

In [2]:
# Function Off
def off(M):
    """
    Function that implements the off function
    
    inputs:
        M (Numpy Array)
        
    returns:
        Off (float)
    """
    
    # Number of windows
    N = M.shape[0]
    
    # Off value
    Off = (np.sum(M) - np.trace(M)) / (N * (N - 1))
    
    return Off

* *Trace Function*

In [3]:
# Function Trace
def tr(M):
    """
    Function to compute normalized trace
    
    inputs:
        M (Numpy array)
        
    returns:
        trace (float)
    """
    
    # Normalized trace
    trace = np.trace(M) / M.shape[0]
    
    return trace

* *EQMN function*

In [4]:
def quadratic_error(s, ref):
    """
    Function to compute the normalized quadratic mean error.
    
    inputs:
        s    (Numpy array)
        ref  (Numpy array)
        
    returns:
        eqmn (float)
    """
    
    # EQMN
    eqmn = 1 - (np.inner(ref,s) / (np.linalg.norm(ref) * np.linalg.norm(s)))**2
    
    return eqmn

* *SOBI Algorithm*

In [10]:
def sobi(x, window_size):
    """
    Function that implements SOBI algorithm
    
    inputs:
        x           (Numpy array) --  observations vector
        window_size (float)       --  parameter to compute the intercorrelation
    
    returns:
        s_sobi (Numpy array)      --  sources vector
    """
    
    # Retrieve observations
    x1, x2 = x[0,:], x[1,:]
    
    # Modify x dimensions
    m = x1.shape[0]
    N = int(m/window_size)
    x1_r, x2_r = x1.reshape((N,window_size)), x2.reshape((N,window_size))

    # Compute intercorrelation matrices
    r11, r22, r12 = correlation(x1_r,x1_r), correlation(x2_r,x2_r), correlation(x1_r,x2_r)

    # Compute SOBI parameters
    T1, T2, T12 = tr(r11), tr(r22), tr(r12)
    F1, F2, F12 = off(r11), off(r22), off(r12)
    
    alpha = 2 * F12 * T12 - F1 * T2 - F2 * T1
    beta = 2 * (T12**2 - T1 * T2)
    gamma = np.sqrt((F1*T2-F2*T1)**2+4*(F12*T2-T12*F2)*(F12*T1-T12*F1))
    d1 = alpha - gamma
    d2 = alpha + gamma

    # Estimate mixing matrix elements
    a11 = beta*off(r11)-tr(r11)*d1
    a12 = beta*off(r12)-tr(r12)*d2
    a21 = beta*off(r12)-tr(r12)*d1
    a22 = beta*off(r22)-tr(r22)*d2
    A = np.array([[a11,a12],[a21,a22]])
    
    # Estimate sources
    w_sobi = np.linalg.inv(A)
    w_sobi = w_sobi / np.linalg.norm(w_sobi)
    s_sobi = w_sobi @ x

    return s_sobi

* *SOBI Algorithm*

In [11]:
# *************************
#    Main
# *************************

# Numpy
import numpy as np

# Importer les signaux observés et les signaux de référence
x_1  = np.fromfile("files/In_1.txt", dtype=np.float64)
x_2  = np.fromfile("files/In_2.txt", dtype=np.float64)
ref1 = np.fromfile("files/Ref_1.txt", dtype=np.float64)
ref2 = np.fromfile("files/Ref_2.txt", dtype=np.float64)

# Mettre les signaux x1 et x2 dans un seul vecteur
x = np.array([x_1, x_2])

# Définir la taille de fenêtre
window_size = 12

# Estimer les signaux de référence
s_sobi = sobi(x, window_size)

# Evaluation de l'algorithme
err_1 = quadratic_error(s_sobi[0,:], ref1)
err_2 = quadratic_error(s_sobi[1,:], ref2)
err_1_db = 10*np.log(err_1)/np.log(10)
err_2_db = 10*np.log(err_2)/np.log(10)
print("L'erreur quadratique moyenne pour l'estimation de s1: %f  db" %err_1_db)
print("L'erreur quadratique moyenne pour l'estimation de s2: %f  db" %err_2_db)

[[ 1.33060902e-06  4.35468224e-06]
 [ 2.37714099e-06 -3.89148471e-06]]
L'erreur quadratique moyenne pour l'estimation de s1: -84.068552  db
L'erreur quadratique moyenne pour l'estimation de s2: -84.068551  db


* *Window size Optimization*

In [7]:
# Compute EQMN for different values of window sizes
# -------------------------------------------------

window_sizes = [2, 3, 4, 5, 6, 7, 9, 10, 12, 14, 20]
errors_1 = []
errors_2 = []
for window_size in window_sizes:
    # Estimer les signaux de référence
    s_sobi = sobi(x, window_size)

    # Evaluation de l'algorithme
    errors_1.append(quadratic_error(s_sobi[0,:], ref1))
    errors_2.append(quadratic_error(s_sobi[1,:], ref2))

In [8]:
# Plot EQMN as function of window_size
# ------------------------------------

# import bokeh package
from bokeh.plotting import figure, show, output_notebook

# initialize figure
p = figure(plot_width=1300, plot_height=600)

# Scatter random realisations
p.line(window_sizes, 10*np.log(errors_1)/np.log(10), line_width=2)
p.line(window_sizes, 10*np.log(errors_2)/np.log(10), line_width=2)

# Définir le style de p
p.xaxis.axis_label = "Window Sizes"
p.yaxis.axis_label = "EQMN (dB)"

# Afficher le graphique p
show(p)

In [9]:
# Plot sources and their estimation for optimal value of window_size
# ------------------------------------------------------------------

# import bokeh package
from bokeh.plotting import figure, show, output_notebook
from bokeh.layouts import column, row

# initialize figure
sobi_1 = figure(plot_width=600, plot_height=300, toolbar_location=None)
sobi_1.line(np.linspace(0, 5, s_sobi.shape[1]), s_sobi[0,:], line_width=2)
sobi_2 = figure(plot_width=600, plot_height=300, toolbar_location = None)
sobi_2.line(np.linspace(0, 5, s_sobi.shape[1]), s_sobi[1,:], line_width=2)

pref_1 = figure(plot_width=600, plot_height=300, toolbar_location = None)
pref_1.line(np.linspace(0, 5, s_sobi.shape[1]), ref1, line_width=2)
pref_2 = figure(plot_width=600, plot_height=300, toolbar_location = None)
pref_2.line(np.linspace(0, 5, s_sobi.shape[1]), ref2, line_width=2)


# Définir le style de p
sobi_1.xaxis.axis_label = "Time (s)"
sobi_1.yaxis.axis_label = "Amp"
sobi_1.title.text = "Signal S1 Estimé"
sobi_1.title.align = "center"

sobi_2.xaxis.axis_label = "Time (s)"
sobi_2.yaxis.axis_label = "Amp"
sobi_2.title.text = "Signal S2 Estimé"
sobi_2.title.align = "center"

pref_1.xaxis.axis_label = "Time (s)"
pref_1.yaxis.axis_label = "Amp"
pref_1.title.text = "Signal S1 de Référence"
pref_1.title.align = "center"

pref_2.xaxis.axis_label = "Time (s)"
pref_2.yaxis.axis_label = "Amp"
pref_2.title.text = "Signal S2 de Référence"
pref_2.title.align = "center"


# Afficher le graphique p
show(row(column(sobi_1, sobi_2), column(pref_1, pref_2)))