### Algorithm Draft

In [1]:
%%writefile Source/fastICA_0.py

import numpy as np
from sklearn import preprocessing

def sym_decorrelation(W):
    """ Symmetric decorrelation """
    K = np.dot(W, W.T)
    s, u = np.linalg.eigh(K) 
    W = (u @ np.diag(1.0/np.sqrt(s)) @ u.T) @ W
    return W

def g_logcosh(wx,alpha):
    """derivatives of logcosh"""
    return np.tanh(alpha * wx)
def gprime_logcosh(wx,alpha):
    """second derivatives of logcosh"""
    return alpha * (1-np.square(np.tanh(alpha*wx)))
# exp
def g_exp(wx,alpha):
    """derivatives of exp"""
    return wx * np.exp(-np.square(wx)/2)
def gprime_exp(wx,alpha):
    """second derivatives of exp"""
    return (1-np.square(wx)) * np.exp(-np.square(wx)/2)

def fastICA_0(X, f,alpha=None, n_comp=None,maxit=200, tol=1e-04):
    """FastICA algorithm for several units"""
    n,p = X.shape
    #check if n_comp is valid
    if n_comp is None:
        n_comp = min(n,p)
    elif n_comp > min(n,p):
        print("n_comp is too large")
        n_comp = min(n,p)
        
    #centering
    #by subtracting the mean of each column of X (array).
    X = preprocessing.scale(X,axis = 0,with_std=False)
    X = X.T

    #whitening
    svd = np.linalg.svd(X @ (X.T) / n)
    k = np.diag(1/np.sqrt(svd[1])) @ (svd[0].T)
    k = k[:n_comp,:] 
    X1 = k @ X

    # initial random weght vector
    w_init = np.random.normal(size=(n_comp, n_comp))
    W = sym_decorrelation(w_init)
    lim = 1
    it = 0
    
    
    # The FastICA algorithm
    if f == "logcosh":
        while lim > tol and it < maxit :
            wx = W @ X1
            gwx = g_logcosh(wx,alpha)
            g_wx = gprime_logcosh(wx,alpha)
            W1 = np.dot(gwx,X1.T)/X1.shape[1] - np.dot(np.diag(g_wx.mean(axis=1)),W)
            W1 = sym_decorrelation(W1)
            it = it +1
            lim = np.max(np.abs(np.abs(np.diag(W1 @ W.T)) - 1.0))
            W = W1

        S = W @ X1
        A = np.linalg.inv(W @ k)
        X_re = A @ S
        return{'X':X1.T,'X_re':X_re.T,'A':A.T,'S':S.T}

    elif f == "exp":
        while lim > tol and it < maxit :
            wx = W @ X1
            gwx = g_exp(wx,alpha)
            g_wx = gprime_exp(wx,alpha)
            W1 = np.dot(gwx,X1.T)/X1.shape[1] - np.dot(np.diag(g_wx.mean(axis=1)),W)
            W1 = sym_decorrelation(W1)
            it = it +1
            lim = np.max(np.abs(np.abs(np.diag(W1 @ W.T)) - 1.0))
            W = W1

        S = W @ X1
        A = np.linalg.inv(W @ k)
        X_re = A @ S
        return{'X':X1.T,'X_re':X_re.T,'A':A.T,'S':S.T}

    else:
        print("doesn't support this approximation negentropy function")

Overwriting Source/fastICA_0.py


### Better Algorithm

In [2]:
%%writefile Source/fastICA_1.py

import numpy as np

def sym_decorrelation(W):
    """ Symmetric decorrelation """
    K = np.dot(W, W.T)
    s, u = np.linalg.eigh(K) 
    W = (u @ np.diag(1.0/np.sqrt(s)) @ u.T) @ W
    return W

def fastICA_1(X, f,alpha=None, n_comp=None,maxit=200, tol=1e-04):
    """FastICA algorithm for several units"""
    n,p = X.shape
    #check if n_comp is valid
    if n_comp is None:
        n_comp = min(n,p)
    elif n_comp > min(n,p):
        print("n_comp is too large")
        n_comp = min(n,p)
        
    #centering
    #by subtracting the mean of each column of X (array).
    X = X - X.mean(axis=0)[None,:]
    X = X.T

    #whitening
    svd = np.linalg.svd(X @ (X.T) / n)
    k = np.diag(1/np.sqrt(svd[1])) @ (svd[0].T)
    k = k[:n_comp,:] 
    X1 = k @ X
    del X
    
    # approximation negentropy function
    if f == "logcosh":
        def g(wx,alpha):
            return np.tanh(alpha * wx)
        def gprime(wx,alpha):
            return alpha * (1-np.square(np.tanh(alpha*wx)))
    elif f == "exp":
        def g(wx,alpha):
            return wx * np.exp(-np.square(wx)/2)
        def gprime(wx,alpha):
            return (1-np.square(wx)) * np.exp(-np.square(wx)/2)
    else:
        print("doesn't support this approximation negentropy function")
               
    # initial random weght vector
    w_init = np.random.normal(size=(n_comp, n_comp))
    W = sym_decorrelation(w_init)

    lim = 1
    it = 0
    
    # The FastICA algorithm
    while lim > tol and it < maxit :
        wx = W @ X1
        gwx = g(wx,alpha)
        g_wx = gprime(wx,alpha)
        W1 = np.dot(gwx,X1.T)/X1.shape[1] - np.dot(np.diag(g_wx.mean(axis=1)),W)
        W1 = sym_decorrelation(W1)
        it = it +1
        lim = np.max(np.abs(np.abs(np.diag(W1 @ W.T)) - 1.0))
        W = W1

    S = W @ X1
    A = np.linalg.inv(W @ k)
    X_re = A @ S
    return{'X':X1.T,'X_re':X_re.T,'A':A.T,'S':S.T}

Overwriting Source/fastICA_1.py


In [3]:
%%writefile Source/fastICA_3.py
import scipy.linalg
import numpy as np


def sym_decorrelation(W):
    """ Symmetric decorrelation """
    K = np.dot(W, W.T)
    s, u = np.linalg.eigh(K) 
    W = (u @ np.diag(1.0/np.sqrt(s)) @ u.T) @ W
    return W

def g_logcosh(wx,alpha):
    """derivatives of logcosh"""
    return np.tanh(alpha * wx)
def gprime_logcosh(wx,alpha):
    """second derivatives of logcosh"""
    return alpha * (1-np.square(np.tanh(alpha*wx)))
# exp
def g_exp(wx,alpha):
    """derivatives of exp"""
    return wx * np.exp(-np.square(wx)/2)
def gprime_exp(wx,alpha):
    """second derivatives of exp"""
    return (1-np.square(wx)) * np.exp(-np.square(wx)/2)


def fastICA_3(X, f,alpha=None,n_comp=None,maxit=200, tol=1e-04):
    """FastICA algorithm for several units"""
    n,p = X.shape
    #check if n_comp is valid
    if n_comp is None:
        n_comp = min(n,p)
    elif n_comp > min(n,p):
        print("n_comp is too large")
        n_comp = min(n,p)
       
    #centering
    #by subtracting the mean of each column of X (array).
    X = X - X.mean(axis=0)[None,:]
    X = X.T
 
    #whitening
    s = np.linalg.svd(X @ (X.T) / n)
    D = np.diag(1/np.sqrt(s[1]))
    k = D @ (s[0].T)
    k = k[:n_comp,:]
    X1 = k @ X
   
    # initial random weght vector
    w_init = np.random.normal(size=(n_comp, n_comp))
    W = sym_decorrelation(w_init)
 
    lim = 1
    it = 0
   
    # The FastICA algorithm
    while lim > tol and it < maxit :
        wx = W @ X1
        if f =="logcosh":
            gwx = g_logcosh(wx,alpha)
            g_wx = gprime_logcosh(wx,alpha)
        elif f =="exp":
            gwx = g_exp(wx,alpha)
            g_wx = gprimeg_exp(wx,alpha)
        else:
            print("doesn't support this approximation negentropy function")
        W1 = np.dot(gwx,X1.T)/X1.shape[1] - np.dot(np.diag(g_wx.mean(axis=1)),W)
        W1 = sym_decorrelation(W1)
        it = it +1
        lim = np.max(np.abs(np.abs(np.diag(W1 @ W.T))) - 1.0)
        W = W1
 
    S = W @ X1
    A = scipy.linalg.pinv2(W @ k)   
    return{'X':X1.T,'A':A.T,'S':S.T}

Overwriting Source/fastICA_3.py


### Scipy

In [4]:
%%writefile Source/fastICA_scipy.py

import scipy
import scipy.linalg
import numpy as np

def sym_decorrelation_scipy(W):
    """ Symmetric decorrelation """
    K = scipy.dot(W, W.T)
    s, u = scipy.linalg.eigh(K) 
    W = scipy.dot(scipy.dot(scipy.dot(u,np.diag(1.0/np.sqrt(s)) ),u.T),W)
    return W

def fastICA_scipy(X, f,alpha=None, n_comp=None,maxit=200, tol=1e-04):
    """FastICA algorithm for several units"""
    n,p = X.shape
    #check if n_comp is valid
    if n_comp is None:
        n_comp = min(n,p)
    elif n_comp > min(n,p):
        print("n_comp is too large")
        n_comp = min(n,p)
        
    #centering
    #by subtracting the mean of each column of X (array).
    X = X - X.mean(axis=0)[None,:]
    X = X.T

    #whitening
    svd = scipy.linalg.svd(scipy.dot(X,X.T) / n)
    k = scipy.dot(np.diag(1/np.sqrt(svd[1])),svd[0].T)
    k = k[:n_comp,:] 
    X1 = scipy.dot(k,X)
    del X
    
    # approximation negentropy function
    if f == "logcosh":
        def g(wx,alpha):
            return scipy.tanh(alpha * wx)
        def gprime(wx,alpha):
            return alpha * (1-np.square(scipy.tanh(alpha*wx)))
    elif f == "exp":
        def g(wx,alpha):
            return wx * np.exp(-np.square(wx)/2)
        def gprime(wx,alpha):
            return (1-np.square(wx)) * np.exp(-np.square(wx)/2)
    else:
        print("doesn't support this approximation negentropy function")
               
    # initial random weght vector
    w_init = np.random.normal(size=(n_comp, n_comp))
    W = sym_decorrelation_scipy(w_init)

    lim = 1
    it = 0
    
    # The FastICA algorithm
    while lim > tol and it < maxit :
        wx = scipy.dot(W,X1)
        gwx = g(wx,alpha)
        g_wx = gprime(wx,alpha)
        W1 = scipy.dot(gwx,X1.T)/X1.shape[1] - scipy.dot(np.diag(g_wx.mean(axis=1)),W)
        W1 = sym_decorrelation_scipy(W1)
        it = it +1
        lim = np.max(np.abs(np.abs(np.diag(scipy.dot(W1,W.T))) - 1.0))
        W = W1

    S = scipy.dot(W,X1)
    A = scipy.linalg.pinv2(scipy.dot(W,k))
    X_re = scipy.dot(A,S)
    return{'X':X1.T,'X_re':X_re.T,'A':A.T,'S':S.T}

Overwriting Source/fastICA_scipy.py


### Jit

In [5]:
%%writefile Source/fastICA_jit.py

import scipy.linalg
import numpy as np
from numba import jit

@jit
def sym_decorrelation_jit(W):
    """ Symmetric decorrelation """
    K = np.dot(W, W.T)
    s, u = np.linalg.eigh(K) 
    W = (u @ np.diag(1.0/np.sqrt(s)) @ u.T) @ W
    return W

def g_logcosh_jit(wx,alpha):
    """derivatives of logcosh"""
    return np.tanh(alpha * wx)
def gprime_logcosh_jit(wx,alpha):
    """second derivatives of logcosh"""
    return alpha * (1-np.square(np.tanh(alpha*wx)))
# exp
def g_exp_jit(wx,alpha):
    """derivatives of exp"""
    return wx * np.exp(-np.square(wx)/2)
def gprime_exp_jit(wx,alpha):
    """second derivatives of exp"""
    return (1-np.square(wx)) * np.exp(-np.square(wx)/2)


def fastICA_jit(X, f,alpha=None,n_comp=None,maxit=200, tol=1e-04):
    """FastICA algorithm for several units"""
    n,p = X.shape
    #check if n_comp is valid
    if n_comp is None:
        n_comp = min(n,p)
    elif n_comp > min(n,p):
        print("n_comp is too large")
        n_comp = min(n,p)
       
    #centering
    #by subtracting the mean of each column of X (array).
    X = X - X.mean(axis=0)[None,:]
    X = X.T
 
    #whitening
    s = np.linalg.svd(X @ (X.T) / n)
    D = np.diag(1/np.sqrt(s[1]))
    k = D @ (s[0].T)
    k = k[:n_comp,:]
    X1 = k @ X
   
    # initial random weght vector
    w_init = np.random.normal(size=(n_comp, n_comp))
    W = sym_decorrelation_jit(w_init)
 
    lim = 1
    it = 0
   
    # The FastICA algorithm
    while lim > tol and it < maxit :
        wx = W @ X1
        if f =="logcosh":
            gwx = g_logcosh_jit(wx,alpha)
            g_wx = gprime_logcosh_jit(wx,alpha)
        elif f =="exp":
            gwx = g_exp_jit(wx,alpha)
            g_wx = gprimeg_exp_jit(wx,alpha)
        else:
            print("doesn't support this approximation negentropy function")
        W1 = np.dot(gwx,X1.T)/X1.shape[1] - np.dot(np.diag(g_wx.mean(axis=1)),W)
        W1 = sym_decorrelation_jit(W1)
        it = it +1
        lim = np.max(np.abs(np.abs(np.diag(W1 @ W.T))) - 1.0)
        W = W1
 
    S = W @ X1
    A = scipy.linalg.pinv2(W @ k)   
    return{'X':X1.T,'A':A.T,'S':S.T}

Overwriting Source/fastICA_jit.py


### Numexpr

note: numexpr can used only for element-wise operations

In [6]:
%%writefile Source/fastICA_ne.py

import scipy.linalg
import numpy as np
import numexpr as ne

def sym_decorrelation_ne(W):
    """ Symmetric decorrelation """
    K = np.dot(W, W.T)
    s, u = np.linalg.eigh(K) 
    return (u @ np.diag(1.0/np.sqrt(s)) @ u.T) @ W
# logcosh
def g_logcosh_ne(wx,alpha):
    """derivatives of logcosh"""
    return ne.evaluate('tanh(alpha * wx)')
def gprime_logcosh_ne(wx,alpha):
    """second derivatives of logcosh"""
    return alpha * (1-ne.evaluate('tanh(alpha*wx)**2'))
# exp
def g_exp_ne(wx,alpha):
    """derivatives of exp"""
    return ne.evaluate('wx * exp(-wx**2/2)')
def gprime_exp_ne(wx,alpha):
    """second derivatives of exp"""
    return (1-np.square(wx)) * ne.evaluate('exp(-wx**2/2)')


def fastICA_ne(X, f,alpha=None,n_comp=None,maxit=200, tol=1e-04):
    n,p = X.shape
    #check if n_comp is valid
    if n_comp is None:
        n_comp = min(n,p)
    elif n_comp > min(n,p):
        print("n_comp is too large")
        n_comp = min(n,p)
       
    #centering
    #by subtracting the mean of each column of X (array).
    X = X - X.mean(axis=0)[None,:]
    X = X.T
 
    #whitening
    s = np.linalg.svd(X @ (X.T) / n)
    D = np.diag(1/np.sqrt(s[1]))
    k = D @ (s[0].T)
    k = k[:n_comp,:]
    X1 = k @ X
   
    # initial random weght vector
    w_init = np.random.normal(size=(n_comp, n_comp))
    W = sym_decorrelation_ne(w_init)
 
    lim = 1
    it = 0
   
    # The FastICA algorithm
    while lim > tol and it < maxit :
        wx = W @ X1
        if f =="logcosh":
            gwx = g_logcosh_ne(wx,alpha)
            g_wx = gprime_logcosh_ne(wx,alpha)
        elif f =="exp":
            gwx = g_exp_ne(wx,alpha)
            g_wx = gprimeg_exp_ne(wx,alpha)
        else:
            print("doesn't support this approximation negentropy function")
        W1 = np.dot(gwx,X1.T)/X1.shape[1] - np.dot(np.diag(g_wx.mean(axis=1)),W)
        W1 = sym_decorrelation_ne(W1)
        it = it +1
        lim = np.max(np.abs(np.abs(np.diag(W1 @ W.T))) - 1.0)
        W = W1
 
    S = W @ X1
    A = scipy.linalg.pinv2(W @ k)   
    return{'X':X1.T,'A':A.T,'S':S.T}

Overwriting Source/fastICA_ne.py
