In [34]:
import os
import sys

# Where Spark is installed and which cluster to connect to. Both can be
# overridden through the environment; the fallbacks are the values this
# notebook was originally run with on the author's machine.
spark_home = os.environ.get(
    'SPARK_HOME', '/Users/arj/Make/bin/spark-2.1.0-bin-hadoop2.7')
sys.path.insert(0, os.path.join(spark_home, 'python'))

from pyspark import SparkConf, SparkContext

urlMaster = os.environ.get(
    'SPARK_MASTER_URL', 'spark://Arjuns-MacBook-Pro.local:7077')

conf = (
    SparkConf()
        .setAppName('spark.app')
        .setMaster(urlMaster)
)
# getOrCreate (instead of SparkContext(conf=conf)) makes this cell safe to
# re-run: a second plain SparkContext in the same kernel raises an error.
sc = SparkContext.getOrCreate(conf=conf)

In [103]:
# Build the corpus: the first NUM_SENTENCES sentences of the Brown corpus,
# each re-joined from its token list into one space-separated string.
from nltk.corpus import brown

NUM_SENTENCES = 1000  # corpus size; raise for a fuller (slower) run
sentences = brown.sents()[:NUM_SENTENCES]
corpus = sc.parallelize(sentences).map(lambda tokens: ' '.join(tokens))

In [108]:
from itertools import chain
from pattern.text.en import parsetree

def get_chunks(sentence):
    """Shallow-parse a raw sentence string and return its chunks as one flat list.

    ``parsetree`` yields parsed sentence objects, each exposing ``.chunks``;
    the chunks of all of them are concatenated in order.
    """
    parsed_sentences = parsetree(sentence)
    return [
        chunk
        for parsed in parsed_sentences
        for chunk in parsed.chunks
    ]

# One list of chunks per document (sentence) in the corpus.
chunks = corpus.map(get_chunks)

print(chunks.take(2))

[[Chunk('The Fulton County Grand Jury/NP'), Chunk('said/VP'), Chunk('Friday an investigation/NP'), Chunk('of/PP'), Chunk('Atlanta/NP'), Chunk('recent primary election/NP'), Chunk('produced/VP'), Chunk('no evidence/NP'), Chunk('that any irregularities/NP'), Chunk('took/VP'), Chunk('place/NP')], [Chunk('The jury/NP'), Chunk('further said/VP'), Chunk('in/PP'), Chunk('term-end presentments/NP'), Chunk('that/PP'), Chunk('the City Executive Committee/NP'), Chunk('had/VP'), Chunk('over-all charge/NP'), Chunk('of/PP'), Chunk('the election/NP'), Chunk('deserves/VP'), Chunk('the praise/NP'), Chunk('thanks/NP'), Chunk('of/PP'), Chunk('the City/NP'), Chunk('of/PP'), Chunk('Atlanta/NP'), Chunk('for/PP'), Chunk('the manner/NP'), Chunk('in/PP'), Chunk('which the election/NP'), Chunk('was conducted/VP')]]


#### Noun word counts

In [163]:
def match_noun_like_pos(pos):
    """Return True if a part-of-speech tag is noun-like, i.e. starts with 'N'.

    Equivalent to matching the regex ``^N.*`` (e.g. NN, NNS, NNP), but
    without importing ``re`` and recompiling a pattern on every call.
    A missing or empty tag (``None`` / ``''``) is reported as not noun-like
    instead of raising.

    :param pos: part-of-speech tag string (e.g. 'NN', 'VBD'), or None
    :return: bool
    """
    return bool(pos) and pos.startswith('N')

# Every noun-tagged word that sits inside a noun-phrase chunk, lower-cased.
noun_like = (
    chunks
    .flatMap(lambda doc_chunks: doc_chunks)                   # one record per chunk
    .filter(lambda ch: ch.part_of_speech == 'NP')             # noun phrases only
    .flatMap(lambda ch: ch.words)                             # one record per word
    .filter(lambda w: match_noun_like_pos(w.part_of_speech))  # noun-tagged words
    .map(lambda w: w.string.lower())
)

print(noun_like.take(10))

[u'fulton', u'county', u'grand', u'jury', u'friday', u'investigation', u'atlanta', u'primary', u'election', u'evidence']


In [171]:
# (noun, frequency) pairs, most frequent first.
noun_word_count = (
    noun_like
    .map(lambda noun: (noun, 1))
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: pair[1], ascending=False)
)

print(noun_word_count.take(10))

[(u'state', 85), (u'city', 58), (u'administration', 52), (u'president', 52), (u'mr.', 52), (u'year', 46), (u'committee', 39), (u'bill', 39), (u'states', 37), (u'county', 35)]


##### Topic modelling with LDA (Latent Dirichlet Allocation)

In [172]:
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

In [176]:
# Distinct nouns, ordered by descending frequency; a word's position in this
# list is the term index used when documents are turned into vectors.
vocabulary = noun_word_count.keys().collect()
vocabulary[:5]

[u'state', u'city', u'administration', u'president', u'mr.']

In [205]:
def _doc_noun_words(doc_chunks):
    """Return the lower-cased noun-like words inside a document's NP chunks.

    Only words in chunks tagged 'NP' whose own POS tag passes
    ``match_noun_like_pos`` are kept, in document order. The result is a
    real list (possibly empty), so ``len()`` works on it under Python 2 and 3.
    """
    return [
        word.string.lower()
        for chunk in doc_chunks
        if chunk.part_of_speech == 'NP'
        for word in chunk.words
        if match_noun_like_pos(word.part_of_speech)
    ]

# One list of nouns per document; documents without any nouns are dropped.
# (The previous filter()/map() chain relied on Python 2 returning lists;
# under Python 3 those are lazy iterators and len() on them fails.)
doc_nouns = (
    chunks
    .map(_doc_noun_words)
    .filter(lambda nouns: len(nouns) > 0)
)

doc_nouns.take(1)

[[u'fulton',
  u'county',
  u'grand',
  u'jury',
  u'friday',
  u'investigation',
  u'atlanta',
  u'primary',
  u'election',
  u'evidence',
  u'irregularities',
  u'place']]

In [207]:
def get_vector_representation(nouns, vocab):
    """Encode a document as a dense 0/1 presence vector over ``vocab``.

    :param nouns: the document's words; any iterable (turned into a set here
        so each membership test is O(1) even if a list is passed)
    :param vocab: ordered vocabulary; entry i of the result is 1.0 if
        vocab[i] occurs in ``nouns``, else 0.0
    :return: a ``Vectors.dense`` vector of length len(vocab)

    NOTE(review): MLlib LDA treats vector entries as term counts; a 0/1
    presence encoding ignores how often a word repeats in a document —
    confirm that is intended.
    """
    present = set(nouns)
    # A real list, not map(): under Python 3 map() is lazy and Vectors.dense
    # would receive a map object instead of a sequence of floats.
    return Vectors.dense([1.0 if word in present else 0.0 for word in vocab])

# Pair every document vector with an integer document id, giving the
# [doc_id, vector] records that LDA.train consumes.
doc_vecs = (
    doc_nouns
    .map(lambda nouns: get_vector_representation(set(nouns), vocabulary))
    .zipWithIndex()
    .map(lambda vec_and_idx: [vec_and_idx[1], vec_and_idx[0]])
)

In [209]:
# k = number of topics. LDA is randomly initialised, so a fixed seed is passed
# to make the learned topics reproducible across runs.
ldaModel = LDA.train(doc_vecs, k=3, seed=42)

In [253]:
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
      + " words):")
# topicsMatrix() is (vocabSize x k): column t holds topic t's weight for each
# vocabulary word, in the same order as `vocabulary`.
topics = ldaModel.topicsMatrix()
num_topics = topics.shape[1]  # derive k from the model instead of hard-coding 3
for topic in range(num_topics):
    print("Topic " + str(topic) + ":")
    # (weight, word) pairs, heaviest first; equal weights fall back to
    # reverse word order, exactly as sorting the tuples did before.
    topic_words = sorted(
        ((topics[idx][topic], word) for idx, word in enumerate(vocabulary)),
        reverse=True,
    )
    for weight, word in topic_words[:10]:
        print("{}: {}".format(word, weight))
    # Function-call form works in both Python 2 and 3, like the prints above.
    print('-----------')

Learned topics (as distributions over vocab of 2279 words):
Topic 0:
state: 27.6350480347
city: 18.9516713343
mr.: 17.5439356649
president: 16.8568307883
year: 15.4074257761
committee: 14.0324129502
administration: 13.9553862346
bill: 12.960995307
election: 11.6073234867
house: 11.5578186886
-----------
Topic 1:
state: 26.203168362
administration: 18.269679186
year: 16.2404114273
president: 16.0424256301
city: 14.5047677994
bill: 13.728963992
committee: 13.6235523038
mr.: 13.6074814177
tax: 12.0525070432
states: 11.6004234735
-----------
Topic 2:
state: 27.1617836034
city: 17.5435608663
president: 16.1007435816
mr.: 15.8485829174
administration: 15.7749345795
states: 14.7379675751
year: 13.3521627966
house: 12.5135762168
election: 12.140906292
united: 11.8705878794
-----------
