A brute force implementation of the multiplicative method for NMF in dask: i.e. converting all array operations to `dask.array` operations. This version does not include the regularization.

In [None]:
import numpy as np
from scipy import linalg
import matplotlib.pyplot as plt
import os
%matplotlib inline

For testing we will use the face dataset used in scikit-learn, which comes from the “Labeled Faces in the Wild” dataset, also known as LFW:

[http://vis-www.cs.umass.edu/lfw/lfw-funneled.tgz](http://vis-www.cs.umass.edu/lfw/lfw-funneled.tgz)


In [None]:
from dask.distributed import Client, progress
c = Client()
c.restart()

In [None]:
# read the faces dataset
path = 'lfw_funneled'
from dask.array.image import imread
import dask.array as da
faces = imread(os.path.join(path,'*','*.jpg'))
N,m,n,d = faces[:,::4,::4,:].shape


In [None]:
data = faces[:,::4,::4,0].reshape((faces.shape[0],-1))

In [None]:
X_da = c.persist(data[:100,:])

In [None]:
# compute for sklearn
X = data[:100,:].compute()

In [None]:
import time as time

In [None]:
# to avoid deviding by zero
EPSILON = np.finfo(np.float32).eps*1000

Numpy Updates:

In [None]:
def update_H(M,H,W):
    denominator = (np.dot(W.T,np.dot(W,H)))
    denominator[np.abs(denominator) < EPSILON] = EPSILON
    H_new = H*np.dot(W.T,M)/denominator
    return(H_new)

In [None]:
def update_W(M,H,W):
    denominator = (np.dot(W,np.dot(H,H.T)))
    denominator[np.abs(denominator) < EPSILON] = EPSILON
    W_new = W*np.dot(M,H.T)/denominator
    return(W_new)

Dask Updates:

In [None]:
def update_H_da(M,H,W):
    denominator = da.dot(W.T,da.dot(W,H))
    denominator_new = da.where(da.fabs(denominator) < EPSILON,EPSILON,denominator) 
    H_new = H*da.dot(W.T,M)/denominator_new
    return(H_new)

In [None]:
def update_W_da(M,H,W):
    denominator = da.dot(W,da.dot(H,H.T))
    denominator_new = da.where(da.fabs(denominator) < EPSILON,EPSILON,denominator) 
    W_new = W*da.dot(M,H.T)/denominator_new
    return(W_new)

In [None]:
#print(update_W(M,H,W).shape)
#print(update_H(M,H,W).shape)

In [None]:
# loss
def Frobenius_loss(M,H,W):
    return(linalg(M - np.dot(W,H)))

In [None]:
# test da.where
ones = da.zeros((3,3),chunks = (3,3))
ones = da.where(ones==0,EPSILON,ones)
ones.compute()

In [None]:
def initialize_da(X, k, init='random'):
    n_components = k
    n_samples, n_features = X.shape
    if init == 'random':
        avg = da.sqrt(X.mean() / n_components)
        da.random.seed(42)
        H = avg * da.random.normal(0,1,size=(n_components, n_features),chunks=(n_components,X.chunks[1][0]))
        W = avg * da.random.normal(0,1,size=(n_samples, n_components),chunks=(n_samples,n_components))
        
        H = da.fabs(H)
        W = da.fabs(W)
        return W, H

In [None]:
# NNDSVD/A initialization from sklearn
def initialize(X,k,init):

    from scipy.linalg import svd
    n_components = k
    U, S, V = svd(X, full_matrices = False)
    W, H = np.zeros(U.shape), np.zeros(V.shape)

    # The leading singular triplet is non-negative
    # so it can be used as is for initialization.
    W[:, 0] = np.sqrt(S[0]) * np.abs(U[:, 0])
    H[0, :] = np.sqrt(S[0]) * np.abs(V[0, :])
    
    def norm(x):
        x = x.ravel()   
        return(np.dot(x,x))

    for j in range(1, n_components):
        x, y = U[:, j], V[j, :]

        # extract positive and negative parts of column vectors
        x_p, y_p = np.maximum(x, 0), np.maximum(y, 0)
        x_n, y_n = np.abs(np.minimum(x, 0)), np.abs(np.minimum(y, 0))

        # and their norms
        x_p_nrm, y_p_nrm = norm(x_p), norm(y_p)
        x_n_nrm, y_n_nrm = norm(x_n), norm(y_n)

        m_p, m_n = x_p_nrm * y_p_nrm, x_n_nrm * y_n_nrm

        # choose update
        if m_p > m_n:
            u = x_p / x_p_nrm
            v = y_p / y_p_nrm
            sigma = m_p
        else:
            u = x_n / x_n_nrm
            v = y_n / y_n_nrm
            sigma = m_n

        lbd = np.sqrt(S[j] * sigma)
        W[:, j] = lbd * u
        H[j, :] = lbd * v
        
    eps=1e-6

    if init == 'nndsvd':
        W[W < eps] = 0
        H[H < eps] = 0
    
    if init == 'nndsvda':
        avg = X.mean()
        W[W == 0] = avg
        H[H == 0] = avg
    
    return(W,H)


In [None]:

def initialize_random(X,k,init):
    n_components = k
    n_samples, n_features = X.shape
    if init == 'random':
        avg = np.sqrt(X.mean() / n_components)
        np.random.seed(42)
        H = avg * np.random.normal(0,1,size=(n_components, n_features))
        W = avg * np.random.normal(0,1,size=(n_samples, n_components))
        
        np.fabs(H, H)
        np.fabs(W, W)
        return W, H
    

In [None]:
k = 100

In [None]:
W_init, H_init = initialize_da(X_da,k,init='random')

In [None]:
# fitting function
EPSILON = np.finfo(np.float32).eps
def fit(M,k,nofit):
    
    # initialize H and W
    #u,s,vt = linalg.svd(M)
    #W = u[:,:k]
    #H = np.dot(np.diag(s[:k]),vt[:k,:])
    #print(s[:k])
    
    #W, H = initialize(M, k, init='nndsvda')
    
    # use outside initialization
    W = W_init.compute()
    H = H_init.compute()

    #W, H = initialize_random(M,k,init='random')
    
    err = []
    for it in range(nofit):
        W = update_W(M,H,W)
        #print(np.sum(np.isnan(W)))
        H = update_H(M,H,W)
        err.append(linalg.norm(M - np.dot(W,H)))
        print('Iteration '+str(it)+': error = '+ str(err[it]))
    return(W, H, err)

In [None]:
# fitting function
def fit_da(M,k,nofit):
    
    from dask import compute
    
    # initialize H and W
    #u,s,vt = da.linalg.svd(M)
    #uk,sk,vtk = compute(u[:,k],s[:k],vt[:k])
    #W = uk
    #H = da.dot(np.diag(sk),vtk[:k,:])
    
    W, H = initialize_da(M,k,init='random')
    
    W_init = W
    H_init = H
    
    err = []
    for it in range(nofit):
        W = update_W_da(M,H,W)
        H = update_H_da(M,H,W)
        
        err.append(da.linalg.norm(M - da.dot(W,H)))  
        
    return(W,H,err,W_init,H_init)

In [None]:
X_da = X_da.rechunk((100,441))
X_da

In [None]:
nofit = 1

In [None]:
W, H, err = fit(X,100,100)

In [None]:
#W

In [None]:
#H

In [None]:
#### plt.plot(err)

In [None]:
data_fitted = np.dot(W,H).reshape(-1,m,n)

In [None]:
# plot the results
plt.figure(figsize = (10,5))
plt.subplot(1,2,1)
plt.imshow(faces[1,::4,::4,0],cmap = 'gray')
plt.title('Raw')
    
plt.subplot(1,2,2)
plt.imshow(data_fitted[1,:,:],cmap = 'gray')
plt.title('Fitted')

In [None]:
# compare to sklearn multiplicative method 
# only latest versions have the multiplicative method
# performance seems worse than the coordinate descent method
 
from sklearn.decomposition import NMF
nmf = NMF(n_components = 100,init = 'nndsvda',solver = 'mu',max_iter = 200)
W = nmf.fit_transform(X)
H = nmf.components_
data_fitted_sk = np.dot(W,H)

#plot the results
plt.figure(figsize = (10,5))
plt.subplot(1,2,1)
plt.imshow(faces[1,::4,::4,0],cmap = 'gray')
plt.title('Raw')
    
plt.subplot(1,2,2)
plt.imshow(data_fitted_sk.reshape(-1,m,n)[1,:,:],cmap = 'gray')
plt.title('Fitted')

In [None]:
# plot the results
u = H.reshape((-1,m,n))
plt.figure(figsize = (10,5))
for i in range(10):
    plt.subplot(2,5,i+1)
    # we are rescaling between 0 and 1 before plotting
    plt.imshow(u[i,:,:],cmap = 'gray')
    plt.title('Mode '+str(i+1))

In [None]:
W, H, err, W_init, H_init = fit_da(X_da,100,100)

In [None]:
#H.compute()
#W.compute()

In [None]:
# plot the results
u = H.compute().reshape((-1,m,n))
plt.figure(figsize = (10,5))
for i in range(10):
    plt.subplot(2,5,i+1)
    # we are rescaling between 0 and 1 before plotting
    plt.imshow(u[i,:,:],cmap = 'gray')
    plt.title('Mode '+str(i+1))

In [None]:
#plt.plot(err)

In [None]:
# random states in dask anf numpy

In [None]:
da.random.seed(42)
da.random.RandomState(42).normal(0,1,(1,),(1,)).compute()

In [None]:
np.random.seed(42)
np.random.RandomState(42).normal(0,1,(1,))

In [None]:
# how do I get the same streams???
help(da.random.RandomState(42).normal)