## Environment Setup

In [1]:
# Set up the environment
%load_ext rpy2.ipython
import rpy2.robjects as robjects
from pyspark.mllib.linalg import Vectors
import numpy as np
import scipy as sp
import time
from scipy import optimize
import os
import pandas as pd
from scipy import io
from rpy2.robjects import pandas2ri
import rpy2

## R Imports for STM

In [2]:
for f in os.listdir("R"):
    if f not in ['.DS_Store', '.Rapp.history', 'box', 'e_step_spark.R']:
        robjects.r.source("R/" + f)

In [3]:
robjects.r('''
    library(Matrix); library(stringr); library(splines)
''')

array(['splines', 'stringr', 'Matrix', 'tools', 'stats', 'graphics',
       'grDevices', 'utils', 'datasets', 'methods', 'base'], 
      dtype='|S9')

## Sample run with R code

In [4]:
# Subsample the corpus to only consider articles mentioning 'monte carlo'
cond_mat_mc = pd.read_csv('cond_mat_mc.csv')
%Rpush cond_mat_mc

In [5]:
# Prep the corpus
robjects.r('''
    processed_corpus_temp = textProcessor(cond_mat_mc$abstract, metadata=cond_mat_mc, lowercase=TRUE)
    processed_corpus = prepDocuments(processed_corpus_temp$documents,
                                 processed_corpus_temp$vocab, 
                                 processed_corpus_temp$meta,
                                 lower.thresh=1)
    rm(processed_corpus_temp); invisible(gc())
''');

Building corpus... 
Converting to Lower Case... 
Removing stopwords... 
Removing numbers... 
Removing punctuation... 
Stemming... 
Creating Output... 
Removing 8267 of 15095 terms (8267 of 363331 tokens) due to frequency 
Your corpus now has 6359 documents, 6828 terms and 355064 tokens.

In [6]:
robjects.r('''
    fit = stm(processed_corpus$documents, 
             processed_corpus$vocab,
             K=20,
             data=processed_corpus$meta,
             init.type = 'Spectral',
             seed=02138)
''');

In [7]:
robjects.r('''
    documents <- fit$documents
    vocab <- fit$vocab
    settings <- fit$settings 
    model <- fit$model
    verbose <- settings$verbose
  ##########
  #Step 1: Initialize Parameters
  ##########
    ngroups <- settings$ngroups
  if(is.null(model)) {
    if(verbose) cat("Beginning Initialization.\n")
    #initialize
    model <- stm.init(documents, settings)
    #if we were using the Lee and Mimno method of setting K, update the settings
    if(settings$dim$K==0) settings$dim$K <- nrow(model$beta[[1]])
    #unpack
    mu <- list(mu=model$mu)
    sigma <- model$sigma
    beta <- list(beta=model$beta)
    if(!is.null(model$kappa)) beta$kappa <- model$kappa
    lambda <- model$lambda
    convergence <- NULL 
    #discard the old object
    rm(model)
  } else {
    if(verbose) cat("Restarting Model...\n")
    #extract from a standard STM object so we can simply continue.
    mu <- model$mu
    beta <- list(beta=lapply(model$beta$logbeta, exp))
    if(!is.null(model$beta$kappa)) beta$kappa <- model$beta$kappa
    sigma <- model$sigma
    lambda <- model$eta
    convergence <- model$convergence
    #manually declare the model not converged or it will stop after the first iteration
    convergence$stopits <- FALSE
    convergence$converged <- FALSE
    #iterate by 1 as that would have happened otherwise
    convergence$its <- convergence$its + 1 
  }    
  
  #Pull out some book keeping elements
  ntokens <- sum(settings$dim$wcounts$x)
  betaindex <- settings$covariates$betaindex
  stopits <- FALSE
  if(ngroups!=1) {
    groups <- cut(1:length(documents), breaks=ngroups, labels=FALSE) 
  }
  suffstats <- vector(mode="list", length=ngroups)
''');

Beginning Initialization.
	 Calculating the gram matrix...
	 Finding anchor words...
 	....................
	 Recovering initialization...
 	....................................................................
Initialization complete.


In [8]:
def rlist_2py(rlist):
    return dict(zip(rlist.names,
               list(rlist)))

In [9]:
fit = dict(zip( robjects.globalenv['fit'].names, 
         list( robjects.globalenv['fit'])))
settings = dict(zip( fit['settings'].names, 
         list(fit['settings'])))

In [10]:
K, A, V, N = [int(settings['dim'][i][0]) for i in range(4)]

In [11]:
# Some setup for EM, retrieving the R objects
stopits = False
ngroups = int(robjects.globalenv['ngroups'][0])
documents = [np.array(x) for x in list(robjects.globalenv['documents'])]
beta_index = np.array(robjects.globalenv['betaindex'])
beta = [np.array(x) for x in robjects.globalenv['beta'][0]]
update_mu = True
Lambda = np.array(robjects.globalenv['lambda'])
mu = np.array(robjects.globalenv['mu'][0])
sigma = np.array(robjects.globalenv['sigma'])
verbose = settings['verbose'][0]

# Run EM
for i in range(2):
    
    # Non-blocked update
    if ngroups==1:
        # TODO
        a = 0

In [12]:
# Function to run all the necessary imports
def run_imports(x):
    import scipy as sp
    import numpy as np
    from scipy import optimize
    return 1

In [13]:
def likelihood(eta, beta, doc_ct, mu, siginv):
    exp_eta = np.exp(np.append(eta, np.array([0])))
    ndoc = np.sum(doc_ct)
    part1 = np.dot(np.log(np.dot(exp_eta, beta)), doc_ct) - ndoc * np.log(np.sum(exp_eta))
    diff = mu.T - eta
    part2 = 0.5 * float(np.dot(np.dot(diff, siginv), diff.T))
    return part2 - part1

In [14]:
def grad(eta, beta, doc_ct, mu, siginv):
    exp_eta = np.exp(np.append(eta, [0]))
    beta_prime = np.apply_along_axis(lambda x: x * exp_eta, 0, beta)
    part1 = np.dot(beta_prime, doc_ct/np.sum(beta_prime, 0).T) - (np.sum(doc_ct)/ np.sum(exp_eta)) * exp_eta
    diff = mu.T - eta
    part2 = np.dot(siginv, diff.T)
    part1 = part1[:len(part1)-1]
    return (part2.T - part1).flatten()

In [15]:
def estep_docloop(doc_item, siginv, sigmaentropy):
    doc_ct = doc_item['doc'][1]
    eta = doc_item['init']
    beta = doc_item['beta_i']
    mu = doc_item['mu_i']
    optim_par = sp.optimize.minimize(likelihood, eta, args=(beta, doc_ct, mu, siginv), 
                            method='BFGS')
    
    def hpb(eta, beta, doc_ct, mu, siginv, sigmaentropy):
        
        # Compute the Hessian
        exp_eta = np.exp(np.append(eta, [0]))
        theta = np.reshape(exp_eta/np.sum(exp_eta), (len(exp_eta), -1)).T
        EB = np.apply_along_axis(lambda x: x * exp_eta, 0, beta)
        EB = np.apply_along_axis(lambda x: x * (np.sqrt(doc_ct).T) / np.sum(EB,0), 1, EB)
        hess = np.dot(EB, EB.T) - np.sum(doc_ct) * np.dot(theta.T, theta)    
        EB = np.apply_along_axis(lambda x: x * np.sqrt(doc_ct).T, 1, EB)
        hess[np.diag_indices_from(hess)] = hess[np.diag_indices_from(hess)] - np.sum(EB, 1) + np.sum(doc_ct) * theta
        hess = hess[:hess.shape[0]-1,:hess.shape[1]-1] + siginv

        # Invert via Cholesky decomposition
        try:
            nu = np.linalg.cholesky(hess)
        except:
            dvec = np.array(np.diag(hess))
            magnitudes = np.sum(np.abs(hess), 1) - abs(dvec)
            Km1 = len(dvec)
            for i in range(Km1):
                if dvec[i] < magnitudes[i]:
                    dvec[i] = magnitudes[i]
            hess[np.diag_indices_from(hess)] = dvec
            nu = np.linalg.cholesky(hess)

        # Finish construction
        det_term = -np.sum(np.log(np.diag(nu)))
        nu = np.linalg.inv(np.triu(nu))
        nu = np.dot(nu, nu.T)
        diff = eta - mu.flatten()

        # Compute the bound
        bound = (np.dot(np.log(np.dot(theta, beta)), doc_ct) + det_term 
                 - 0.5 * np.dot(diff.T, np.dot(siginv, diff)) - sigmaentropy)

        # Construct ouput
        out = {'phis': EB,
               'eta': {'lambda': eta, 'nu': nu},
               'bound': bound}
        return out
    
    return hpb(optim_par.x, beta, doc_ct, mu, siginv, sigmaentropy)

In [20]:
def estep_spark(documents, beta_index, beta, Lambda_old,
                mu, sigma, verbose, sc, update_mu=False):
    
    # Initialize sufficient statistics
    sigma_ss = np.zeros((K-1, K-1))
    beta_ss = [np.zeros((K, V)) for i in range(A)]
    bound = np.array([0] * N)
    Lambda = np.array([0] * N)
    siginv = np.linalg.inv(sigma)
    sigmaentropy = np.log(np.abs(np.linalg.det(sigma))) * 0.5
    
    # Parallelize document collection
    collection = [{'doc':doc, 'aspect': int(aspect), 'init': init} 
                  for (doc, aspect, init) in zip(documents, beta_index, Lambda_old)]
    for item in collection:
        item['beta_i'] = beta[item['aspect']-1][:,[x-1 for x in item['doc'][0]]]
        item['mu_i'] = mu
        
    # Run the imports
    imports_return = sc.parallelize(range(100)).map(run_imports).collect()
        
    # Run estep on just a few samples for testing purposes
    collection_par = sc.parallelize(collection[:5])
    out = collection_par.map(lambda x: estep_docloop(x, siginv, sigmaentropy)).collect()
    
    return out

In [21]:
# A trial run of the E-step
trial_run = estep_spark(documents, beta_index, beta, Lambda, mu, sigma, verbose, sc, False)
print trial_run[0]

{'eta': {'nu': array([[ 12.9133492 ,   0.        ,   0.        ,   0.        ,
          0.        ,   0.        ,   0.        ,   0.        ,
          0.        ,   0.        ,   0.        ,   0.        ,
          0.        ,   0.        ,   0.        ,   0.        ,
          0.        ,   0.        ,   0.        ],
       [  0.        ,  16.98471722,   0.        ,   0.        ,
          0.        ,   0.        ,   0.        ,   0.        ,
          0.        ,   0.        ,   0.        ,   0.        ,
          0.        ,   0.        ,   0.        ,   0.        ,
          0.        ,   0.        ,   0.        ],
       [  0.        ,   0.        ,  14.3728152 ,   0.        ,
          0.        ,   0.        ,   0.        ,   0.        ,
          0.        ,   0.        ,   0.        ,   0.        ,
          0.        ,   0.        ,   0.        ,   0.        ,
          0.        ,   0.        ,   0.        ],
       [  0.        ,   0.        ,   0.        ,   3.40251017,
