In [3]:
import ray
ray.init()

Process STDOUT and STDERR is being redirected to /tmp/raylogs/.
Waiting for redis server at 127.0.0.1:23880 to respond...
Waiting for redis server at 127.0.0.1:50040 to respond...
Starting local scheduler with the following resources: {'CPU': 8, 'GPU': 0}.

View the web UI at http://localhost:8889/notebooks/ray_ui58088.ipynb?token=a427b773035bd347b2510f221252e41ccbe962a3304c644d



{'local_scheduler_socket_names': ['/tmp/scheduler54127281'],
 'node_ip_address': '127.0.0.1',
 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store42997752', manager_name='/tmp/plasma_manager45097496', manager_port=20170)],
 'redis_address': '127.0.0.1:23880',
 'webui_url': 'http://localhost:8889/notebooks/ray_ui58088.ipynb?token=a427b773035bd347b2510f221252e41ccbe962a3304c644d'}

In [1]:
import numpy as np
from scipy.special import digamma, polygamma
import time

In [None]:
from VIonLDA import *

In [4]:
np.random.seed(123)
docs, alpha, BETA=simulation_data()

In [7]:
@ray.remote
def aggregate_data(x, y):
    return x + y


In [8]:
def E_step(alpha, BETA, doc, Phi0, gamma0, tol=1e-6):
    """
    Latent Dirichlet Allocation: E-step.
    Do to a specific document.
    ------------------------------------
    Input:
    alpha as a k*1 vector;
    BETA as a k*V matrix;
    doc as a Nd*V matrix;
    Phi0 as a Nd*k matrix;
    gamma0 as a k*1 vector;
    tol as a float: tolerance.
    -------------------------------------
    Output:
    optimal Nd*k matrix Phi;
    optimal k*1 vector gamma."""
    
    
    #Initialization
    Phi = Phi0
    gamma = gamma0
    phi_delta = 1
    gamma_delta = 1
    
    #relative tolerance is for each element in the matrix
    tol=tol**2
 

    while(phi_delta > tol or gamma_delta > tol):
        ##update Phi
        Phi=(doc@BETA.T)*np.exp(digamma(gamma)-digamma(sum(gamma)))
        Phi=Phi/(Phi.sum(axis=1)[:,None]) #row sum to 1        
        ##update gamma
        gamma = alpha + Phi.sum(axis = 0)        
        ##check the convergence
        phi_delta = np.mean((Phi - Phi0) ** 2)
        gamma_delta = np.mean((gamma - gamma0) ** 2)        
        ##refill
        Phi0 = Phi
        gamma0 = gamma
        
    return Phi, gamma

In [9]:
def M_step(docs,k, tol,tol_estep,max_iter=1000,initial_alpha_shape=5,initial_alpha_scale=2):
    """
    Latent Dirichlet Allocation: M-step.
    Do to a list of documnents. -- a list of matrix.
    -------------------------------------------------
    Input:
    docs: a list of one-hot-coding matrix ;
    k: a fixed positive integer indicate the number of topics.
    -------------------------------------------------
    Output:
    optimal Nd*k matrix Phi;
    optimal k*1 vector gamma;
    optimal k*V matrix BETA;
    optimal k*1 vector alpha.
    """
    
    #get basic iteration
    M=len(docs)
    V=docs[1].shape[1]
    N=[doc.shape[0] for doc in docs]

    
    #initialization
    BETA0=np.random.dirichlet(np.ones(V),k)
    alpha0=np.random.gamma(shape=initial_alpha_shape,scale=initial_alpha_scale,size=k)
    PHI=[np.ones((N[d],k))/k for d in range(M)]
    GAMMA=np.array([alpha0+N[d]/k for d in range(M)])       
    

    BETA=BETA0
    alpha=alpha0
    alpha_dis = k
    beta_dis = k*V
    
    #relative tolerance: tolerance for each element
    tol=tol**2
    
    for iteration in range(max_iter):
        #update PHI,GAMMA,BETA
        @ray.remote
        def E_step_(d):
            Phi,gamma=E_step(alpha0, BETA0, docs[d],PHI[d],GAMMA[d,],tol_estep)
            beta=Phi.T@docs[d]
            return beta,gamma
        
        e_step_results = ray.get([E_step_.remote(d) for d in range(M)])
        pre_BETA,GAMMA=map(list, zip(*e_step_results))
        
        while len(pre_BETA) > 1:
            pre_BETA.append(aggregate_data.remote(pre_BETA.pop(0), pre_BETA.pop(0)))
        BETA=ray.get(pre_BETA)[0]
        
        BETA=BETA/(BETA.sum(axis=1)[:,None])   #rowsum=1
        GAMMA=np.array(GAMMA)
        #update alpha
                        
        z = M * polygamma(1, sum(alpha0))
        h=-M*polygamma(1,alpha0)
        g=M*(digamma(sum(alpha0))-digamma(alpha0))+(digamma(GAMMA)-digamma(GAMMA.sum(axis=1))[:,None]).sum(axis=0)
        c = (sum(g / h)) / (1/z + sum(1/h))
        alpha = alpha0 - (g - c)/h
        
        alpha_dis = np.mean((alpha - alpha0) ** 2)
        beta_dis = np.mean((BETA - BETA0) ** 2)
        alpha0 = alpha
        BETA0 = BETA
        if((alpha_dis <= tol) and (beta_dis <= tol)):
            break
        
        
    return alpha, BETA

In [10]:
%%time

a2, B2 = M_step(docs=docs,k=10,tol=1e-3,tol_estep=1e-3,max_iter=100,initial_alpha_shape=100,initial_alpha_scale=0.01)

CPU times: user 40.5 s, sys: 24.2 s, total: 1min 4s
Wall time: 11min 26s
