Topic modeling (unsupervised clustering of documents) with Spark MLlib and [Latent Dirichlet Allocation](https://en.wikipedia.org/wiki/Latent_Dirichlet_allocation) (LDA).
The dataset is `mini_newsgroups`, a subset of the 20 Newsgroups collection, available [here](http://kdd.ics.uci.edu/databases/20newsgroups/20newsgroups.html).

In [32]:
from pyspark import SparkContext
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

# Reuse the SparkContext already running in this kernel (e.g. the one a pyspark
# shell provides as `sc`), or create one if none exists, so the notebook also
# works from a fresh plain Python kernel.
sc = SparkContext.getOrCreate()

# Load documents from text files, 1 document per file.
# wholeTextFiles yields (path, content) pairs; keep only the content.
corpus = sc.wholeTextFiles("datasets/mini_newsgroups/*/*").values()

In [42]:
# Standard English stop words, one per line. Entries are stripped and lowercased
# so they can match the lowercased tokens produced during tokenization; blank
# lines are skipped.
with open("datasets/stopwords.txt", "r", encoding="utf-8") as f:
    stopwords = {line.strip().lower() for line in f if line.strip()}

# Extra stop words specific to this dataset.
stopwords.update({"article", "will"})
print(len(stopwords))

667


In [43]:
# Spot-check a few words: each entry is (word, expected membership in stopwords);
# prints whether the actual membership matches for each probe.
probes = [("like", False), ("were", True), ("jesus", False)]
print(*[(word in stopwords) == expected for word, expected in probes])

False True True


In [44]:
def _tokenize(document):
    """Lowercase and whitespace-split a document into terms.

    Keeps only purely alphabetic tokens longer than three characters that are
    not in the global ``stopwords`` set, preserving their original order.
    """
    return [token for token in document.lower().split()
            if len(token) > 3 and token.isalpha() and token not in stopwords]


# Split each document into a sequence of terms (words)
tokenized = corpus.map(_tokenize)

In [45]:
# Build the vocabulary from word frequencies over the whole tokenized corpus.
from collections import Counter

all_words = tokenized.flatMap(lambda tokens: tokens).collect()

# most_common() with no argument already returns every (word, count) pair
# sorted by descending count, so no extra sort is needed.
cnt = Counter(all_words).most_common()

# print top 25 words to check it's not stopwords or garbage
print(cnt[:25])

[('people', 834), ('university', 806), ('good', 536), ('time', 460), ('going', 372), ('find', 354), ('system', 349), ('image', 328), ('computer', 326), ('problem', 313), ('data', 307), ('work', 297), ('well', 284), ('state', 280), ('better', 279), ('news', 279), ('windows', 261), ('number', 255), ('help', 249), ('sure', 249), ('government', 244), ('program', 244), ('power', 240), ('file', 237), ('software', 236)]


In [46]:
# Fix the word <-> index mapping used by the term-count vectors:
# vocabArr[i] is the i-th most frequent word, vocab[word] is its index.
vocabArr = [word for word, _ in cnt]
vocab = {word: index for index, word in enumerate(vocabArr)}

print(len(vocab))

22379


In [47]:
def to_vector(pair):
    """Convert a ``(tokens, doc_id)`` pair into ``[doc_id, term-count vector]``.

    ``pair`` is an element of ``tokenized.zipWithIndex()``: the document's token
    list and its index. The returned vector has length ``len(vocabArr)`` and
    holds, at position ``vocab[word]``, the number of occurrences of ``word``;
    words absent from ``vocab`` are ignored. A sparse vector is used because a
    single document touches only a small fraction of the vocabulary, so a
    dense one would be almost entirely zeros.
    """
    from collections import Counter

    tokens, doc_id = pair
    counts = Counter(vocab[word] for word in tokens if word in vocab)
    return [doc_id, Vectors.sparse(len(vocabArr), dict(counts))]


# Convert documents into term count vectors: an RDD of [doc_id, vector].
documents = tokenized.zipWithIndex().map(to_vector)

In [48]:
# Cluster the documents into 20 topics using LDA.
# A fixed seed makes the learned topics reproducible across runs.
ldaModel = LDA.train(documents, k=20, seed=42)

print("Learned topics (as distributions over vocab of {} words)".format(ldaModel.vocabSize()))

Learned topics (as distributions over vocab of 22379words)


In [49]:
# Print topics, showing the top-weighted 10 terms for each topic, and save the
# same report to Output.txt.
topicIndices = ldaModel.describeTopics(maxTermsPerTopic=10)

# Each topic is a pair of parallel lists (vocabulary indices, weights);
# map indices back to words through vocabArr.
report_lines = []
for terms, termWeights in topicIndices:
    report_lines.append("TOPIC:")
    for term, weight in zip(terms, termWeights):
        report_lines.append("\t{0}: {1}".format(vocabArr[term], weight))
report = "\n".join(report_lines) + "\n"

# Explicit UTF-8: vocabulary words only passed str.isalpha(), which accepts
# non-ASCII letters that the platform default encoding may not represent.
with open("Output.txt", "w", encoding="utf-8") as f:
    f.write(report)

print(report)

TOPIC:
	canada: 0.009955370585666002
	university: 0.009255400787371235
	group: 0.005935391393905406
	printer: 0.005546916335778018
	call: 0.005254214901441397
	germany: 0.004934621676008525
	israel: 0.0048111388901299545
	computer: 0.004630110539019527
	april: 0.004616537800189283
	systems: 0.004450505660681018
TOPIC:
	windows: 0.02863532780051452
	disk: 0.014135990120863223
	window: 0.013717348025091055
	support: 0.013605243202843164
	network: 0.013080793154365129
	system: 0.01079139133589274
	software: 0.01033993403125817
	server: 0.010158972581904534
	graphics: 0.009442766288329116
	running: 0.009203822307712254
TOPIC:
	police: 0.012229667205915826
	university: 0.010913968713088154
	year: 0.008852286576048553
	greek: 0.007123881257355777
	title: 0.006923755860073648
	turkish: 0.006512739093172437
	league: 0.006204597274349862
	insurance: 0.005471488497166305
	francisco: 0.004928853025651885
	author: 0.004809611172028609
TOPIC:
	image: 0.03054051244775677
	jpeg: 0.015293703224877806
