In [1]:
import gensim

In [11]:
import numpy as np
from gensim import corpora
import pyspark.mllib.clustering
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
import numpy as np
import operator
import os
import string

In [8]:
#This block of code is only necessary if you are running locally
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 

class SCSingleton(object):
    """Wrapper around SparkContext that prevents creating more than one context.

    The first call to ``SCSingleton()`` creates a ``SparkContext`` (stored as ``.sc``)
    and a ``SQLContext`` wrapping it (stored as ``.sqlCtx``). Every later call prints
    a message and returns that same instance instead of building a second context,
    so re-running the notebook cell is safe.
    """

    # Class-level cache of the single instance (name-mangled to _SCSingleton__instance).
    __instance = None

    def __new__(cls):
        if SCSingleton.__instance is None:
            instance = object.__new__(cls)
            instance.sc = SparkContext()
            instance.sqlCtx = SQLContext(instance.sc)
            # Publish only after both contexts exist, so a failing SparkContext()
            # does not leave a half-initialised instance cached for later calls.
            SCSingleton.__instance = instance
        else:
            # Report the context that is being reused (no ``conf`` attribute is ever
            # stored on the instance, so describe the SparkContext itself).
            print("ERROR: An instance of a SparkContext is already created. "
                  " That instance is " + str(SCSingleton.__instance.sc))
        return SCSingleton.__instance

# Create (or reuse) the shared contexts and expose them under the short names used below.
singleton = SCSingleton()
sc, sqlCtx = singleton.sc, singleton.sqlCtx

In [3]:
def get_lda_corpus(data, no_below=1, no_above=0.90):
    """Build the MLlib LDA input corpus from tokenised documents.

    Parameters
    ----------
    data : list of token sequences
        One entry per document, each an iterable of word strings,
        e.g. ``('apple', 'orange')``.
    no_below, no_above :
        Forwarded to gensim ``Dictionary.filter_extremes`` to drop rare / very
        frequent words from the vocabulary. The defaults are the values this
        notebook has always used.

    Returns
    -------
    (lda_corpus, sorted_names)
        ``lda_corpus`` is a cached RDD of ``[doc_id, term-count vector]`` pairs
        (built with ``vectorize_by_dict``). ``sorted_names`` lists the vocabulary
        so that ``sorted_names[i]`` is the word at vector index ``i``.
    """
    documents_rdd = sc.parallelize(data)

    # Choose the vocabulary with gensim, then drop overly common terms
    # (an explicit exclusion list would also work here).
    dictionary = corpora.Dictionary(data)
    dictionary.filter_extremes(no_below=no_below, no_above=no_above, keep_n=None)

    num_terms = len(dictionary.keys())
    tokens = dictionary.token2id

    # Words ordered by their id. LDA results only carry word indices, so this
    # list is what maps an index back to its word.
    sorted_names = sorted(tokens, key=tokens.__getitem__)
    print('Using dictionary:')
    print(sorted_names)

    # Pair each count vector with a document id: [id, vector].
    lda_corpus = (documents_rdd
                  .map(lambda row: vectorize_by_dict(row, num_terms, tokens))
                  .zipWithIndex()
                  .map(lambda pair: [pair[1], pair[0]])
                  .cache())

    return lda_corpus, sorted_names

In [4]:
def look_at_topics(lda_corpus, sorted_names, num_topics, num_words,
                   maxIterations=20, docConcentration=-1.0, topicConcentration=-1.0,
                   seed=None, checkpointInterval=10, optimizer='em',
                   print_all_words=False):
    """Train an MLlib LDA model and print the top words of each topic.

    Parameters
    ----------
    lda_corpus : RDD of [doc_id, term-count vector]
        As produced by ``get_lda_corpus``.
    sorted_names : list of str
        Vocabulary; ``sorted_names[i]`` is the word at vector index ``i``.
    num_topics : int
        Number of topics ``k`` to learn.
    num_words : int
        How many words to print per topic.
    maxIterations, docConcentration, topicConcentration, seed,
    checkpointInterval, optimizer :
        Forwarded unchanged to ``LDA.train``. With ``seed=None`` the model is
        seeded from the clock; pass an int to make runs repeatable.
    print_all_words : bool, optional
        If True, additionally print the raw weight of every vocabulary word for
        each topic.

    Returns
    -------
    The trained LDA model.

    Words are ranked by their raw weight in each topic's column. The printed
    score is the word's share of its total weight across all topics, i.e. the
    row-normalised topics matrix.
    """
    ldaModel = LDA.train(lda_corpus, k=num_topics, maxIterations=maxIterations,
                         docConcentration=docConcentration,
                         topicConcentration=topicConcentration, seed=seed,
                         checkpointInterval=checkpointInterval, optimizer=optimizer)

    # Rows are words and columns are topics: entry [w, t] is the weight of word w in
    # topic t. Fetch the matrix once instead of once per topic.
    raw_weights = ldaModel.topicsMatrix()

    # Row-normalise so each word's weights over all topics sum to 1. The division
    # builds a new array, so the (possibly read-only) matrix is never written and its
    # WRITEABLE flag does not need to be touched.
    word_sums = np.sum(raw_weights, axis=1)
    topics = raw_weights / word_sums[:, None]

    print("Top %i words by Topic:" % num_words)
    for topic in range(num_topics):
        # Stable ascending sort of this topic's raw weights (ties stay in index
        # order); keep the last num_words and list them heaviest first.
        ranked = np.argsort(raw_weights[:, topic], kind='mergesort')
        word_index = ranked[-num_words:][::-1]
        print("Topic " + str(topic) + ":")
        for idx in word_index:
            print(" " + sorted_names[idx] + " " + str(topics[idx][topic]))

    if print_all_words:
        vocab_size = raw_weights.shape[0]
        print("Raw topic weights for all " + str(vocab_size) + " vocabulary words:")
        for topic in range(num_topics):
            print("Topic " + str(topic) + ":")
            for word in range(vocab_size):
                print(" " + str(sorted_names[word]) + " " + str(raw_weights[word][topic]))

    return ldaModel

In [5]:
def doc_to_topic(docs, ldaModel, sorted_names):
    """Estimate each document's topic mix from the model's per-word topic weights.

    Every in-vocabulary word of a document contributes its normalised topic
    weights (its row of the topics matrix divided by the row total). These are
    averaged over the document's known words, so a document's weights across all
    topics sum to 1. Repeated words count once per occurrence.

    Parameters
    ----------
    docs : iterable of token sequences
        Documents to score, e.g. the same token lists given to ``get_lda_corpus``.
    ldaModel : trained LDA model
        Only ``topicsMatrix()`` is used (rows = words, columns = topics).
    sorted_names : list of str
        Vocabulary; ``sorted_names[i]`` names row ``i`` of the topics matrix.

    Returns
    -------
    list of lists
        One list of ``num_topics`` weights per document, in input order. Words not
        in ``sorted_names`` are ignored. A document with no known words (for example
        an empty line) gets all-zero weights rather than raising ZeroDivisionError.
    """
    topicMatrix = ldaModel.topicsMatrix()
    num_topics = topicMatrix.shape[1]
    # New array from the division, so the source matrix may stay read-only.
    word_sums = np.sum(topicMatrix, axis=1)
    topics = topicMatrix / word_sums[:, None]

    # O(1) word -> row lookup instead of list.index() per word and per topic;
    # setdefault keeps the first position, exactly like list.index would.
    word_to_idx = {}
    for i, name in enumerate(sorted_names):
        word_to_idx.setdefault(name, i)

    doc_topics = []
    for doc in docs:
        known = [word_to_idx[word] for word in doc if word in word_to_idx]
        if not known:
            doc_topics.append([0.0] * num_topics)
            continue
        # Mean of the normalised rows of this document's known words.
        doc_topics.append(list(topics[known].sum(axis=0) / len(known)))
    return doc_topics
            
def vectorize_by_dict(row, term_count, tokens):
    """Turn one tokenised document into a dense bag-of-words count vector.

    Parameters
    ----------
    row : iterable of str
        The document's words.
    term_count : int
        Vocabulary size, i.e. the length of the returned vector.
    tokens : dict
        Maps word -> vector index (e.g. gensim ``Dictionary.token2id``).

    Returns
    -------
    ``Vectors.dense`` whose entry ``i`` counts the occurrences of the word with
    index ``i``. Words missing from ``tokens`` (e.g. filtered out of the
    vocabulary) are skipped.
    """
    counts = np.zeros(term_count)
    for word in row:
        idx = tokens.get(word)
        # Skip only out-of-vocabulary words. Genuine errors (such as an index
        # >= term_count) are no longer swallowed by a bare except.
        if idx is not None:
            counts[idx] += 1
    return Vectors.dense(counts)

In [6]:
# Six two-word documents forming two clear themes: fruit and animals.
toy_data = [
    ('apple', 'orange'),
    ('apple', 'banana'),
    ('banana', 'orange'),
    ('tiger', 'cat'),
    ('cat', 'dog'),
    ('dog', 'tiger'),
]

In [9]:
# Build the [doc_id, count-vector] RDD and the index -> word list for the toy documents.
lda_corpus, sorted_names = get_lda_corpus(toy_data)

Using dictionary:
[u'apple', u'dog', u'cat', u'tiger', u'orange', u'banana']


In [12]:
# Fit a 2-topic model on the toy corpus and print the 3 highest-ranked words per topic.
# NOTE: seed is left at look_at_topics' default of None, so the topics (and the output
# below) may differ between runs; pass seed=<int> to make them repeatable.
ldaModel = look_at_topics(lda_corpus, sorted_names, num_topics=2, num_words=3)

Top 3 words by Topic:
Topic 0:
 orange 0.564960882549
 apple 0.537878401362
 tiger 0.50677397809
Topic 1:
 dog 0.559068651179
 cat 0.540407844128
 banana 0.510141565969


In [14]:
# Per-document topic weights: one row per toy document, one column per topic.
doc_to_topic(toy_data, ldaModel, sorted_names)

[[0.55141964195520521, 0.44858035804479479],
 [0.51386841769648883, 0.48613158230351128],
 [0.52740965828997044, 0.47259034171002962],
 [0.4831830669811345, 0.51681693301886555],
 [0.4502617523465467, 0.54973824765345336],
 [0.47385266345568866, 0.52614733654431134]]

In [15]:
# describeTopics() returns, for each topic, the word indices (positions in sorted_names)
# together with their weights, listed from heaviest to lightest.
ldaModel.describeTopics()

[([4, 0, 3, 5, 2, 1],
  [0.1883205954503354,
   0.1792930872795163,
   0.1689249296024874,
   0.16328640589587137,
   0.15319763036984826,
   0.14697735140194132]),
 ([1, 2, 5, 3, 0, 4],
  [0.1863559189352009,
   0.18013565986914823,
   0.1700469166222787,
   0.16440841095618422,
   0.15404028645219528,
   0.14501280716499268])]

In [1]:
### Now that it works on a small example, we can run the same pipeline over every file in a directory.

In [16]:
# Directory (relative to the notebook's working directory) whose files are loaded below;
# each line of each file becomes one document.
corpus_directory = 'test_docs/'

In [17]:
# Punctuation characters stripped from every line (built once, not on every call).
_PUNCTUATION = frozenset(string.punctuation)


def doc_splitter(row):
    """Tokenise one line of text: drop ASCII punctuation, lowercase, split on whitespace.

    ``row`` is normally a line from ``sc.textFile``. Text (``str`` or ``unicode``) is
    used as-is rather than passed through ``str()``. On Python 2, textFile yields
    ``unicode`` lines, and ``str()`` would raise UnicodeEncodeError on any non-ASCII
    character. Non-text values are still converted with ``str()``.

    Returns a list of lowercase word strings (empty for a blank line).
    """
    text_type = type(u'')  # ``unicode`` on Python 2, ``str`` on Python 3
    if not isinstance(row, (str, text_type)):
        row = str(row)
    cleaned = ''.join(ch for ch in row if ch not in _PUNCTUATION)
    return cleaned.lower().split()

In [18]:
# Load every entry of corpus_directory in sorted (deterministic) order and tokenise
# it line by line: each line of each file becomes one document (a list of words).
# NOTE(review): if one document per *file* is intended, sc.wholeTextFiles would be
# the tool; per-line splitting is kept here to preserve the existing behaviour.
file_rdds = [sc.textFile(os.path.join(corpus_directory, name)).map(doc_splitter)
             for name in sorted(os.listdir(corpus_directory))]
# One union instead of an isEmpty() Spark job per file. Fall back to an empty RDD
# for an empty directory, because sc.union cannot take an empty list.
all_docs = sc.union(file_rdds) if file_rdds else sc.emptyRDD()
docs = all_docs.collect()

In [19]:
# Same pipeline on the loaded documents: build the corpus, then fit and show 2 topics.
# NOTE: this reassigns ldaModel, replacing the toy model trained earlier.
lda_corpus_docs, sorted_names_docs = get_lda_corpus(docs)
ldaModel = look_at_topics(lda_corpus_docs, sorted_names_docs, num_topics=2, num_words=3)

Using dictionary:
[u'brown', u'lazy', u'slow', u'clouds', u'sloth', u'very', u'jumped', u'over', u'tree', u'dog', u'yellow', u'gently', u'fox', u'overhead', u'pretty', u'quick', u'climed', u'roll']
Top 3 words by Topic:
Topic 0:
 over 0.535308496426
 dog 0.534304742768
 lazy 0.52703040387
Topic 1:
 slow 0.530950123118
 sloth 0.527294647625
 roll 0.52548928296


In [20]:
# Topic weights for each loaded document, using the model trained on docs just above.
doc_to_topic(docs, ldaModel, sorted_names_docs)

[[0.52664540662037762, 0.47335459337962238],
 [0.47793136062292624, 0.52206863937707371]]