In [1]:
from pyspark import SparkConf, SparkContext
sc = SparkContext(conf=SparkConf().setAppName("MyApp").setMaster("local"))

import re

def parse_article(line):
    try:
        article_id, text = unicode(line.rstrip()).split('\t', 1)
        text = re.sub("^\W+|\W+$", "", text, flags=re.UNICODE)
        words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
        return words
    except ValueError as e:
        return []

#### Read the word file

In [2]:
wiki = sc.textFile("/data/wiki/en_articles_part/articles-part", 16).map(parse_article)
result = wiki.take(1)[0]
for word in result[:10]:
    print word

Anarchism
Anarchism
is
often
defined
as
a
political
philosophy
which


#### count the words

In [3]:
#count the words
num_words = float(wiki.count())
print(num_words)

4100.0


#### transform them to lowercase

In [4]:
wiki_flat = wiki.flatMap(lambda x: [(x[i].lower(),1) for i in range(0,len(x))])
result = wiki_flat.take(10)
for word in result[:10]:
    print word

(u'anarchism', 1)
(u'anarchism', 1)
(u'is', 1)
(u'often', 1)
(u'defined', 1)
(u'as', 1)
(u'a', 1)
(u'political', 1)
(u'philosophy', 1)
(u'which', 1)


#### Filter out stopwords & reduce the words

In [5]:
stop_words = sc.textFile("/datasets/stop_words_en.txt", 16)
stop_words_data = stop_words.collect()
broadcast_var = sc.broadcast(stop_words_data)
wiki_filtered = wiki_flat.filter(lambda (x,y): x not in broadcast_var.value).reduceByKey(lambda x,y: x+y)
result = wiki_filtered.take(10)
for word in result[:10]:
    print word

(u'biennials', 10)
(u'underlyingly', 1)
(u'ancyra', 43)
(u'tripolitan', 2)
(u'tilton', 4)
(u'nordland', 1)
(u'squealer', 8)
(u'regularize', 2)
(u'skylights.passive', 1)
(u'thesis"(kleene', 1)


#### compute words probability

In [6]:
wiki_filtered_p = wiki_filtered.map(lambda (x,y): (x,y/num_words))
result = wiki_filtered_p.take(10)
for word in result[:10]:
    print word

(u'biennials', 0.0024390243902439024)
(u'underlyingly', 0.00024390243902439024)
(u'ancyra', 0.010487804878048781)
(u'tripolitan', 0.0004878048780487805)
(u'tilton', 0.000975609756097561)
(u'nordland', 0.00024390243902439024)
(u'squealer', 0.001951219512195122)
(u'regularize', 0.0004878048780487805)
(u'skylights.passive', 0.00024390243902439024)
(u'thesis"(kleene', 0.00024390243902439024)


#### Compute the bigrams

In [7]:
bigrams = wiki.flatMap(lambda x: [((x[i].lower(),x[i+1].lower()),1) for i in range(0,len(x)-1)])
result = bigrams.take(10)
for word in result[:10]:
    print word

((u'anarchism', u'anarchism'), 1)
((u'anarchism', u'is'), 1)
((u'is', u'often'), 1)
((u'often', u'defined'), 1)
((u'defined', u'as'), 1)
((u'as', u'a'), 1)
((u'a', u'political'), 1)
((u'political', u'philosophy'), 1)
((u'philosophy', u'which'), 1)
((u'which', u'holds'), 1)


#### Count the bigrams

In [None]:
num_bigrams = float(bigrams.count())
print(num_bigrams)

11933217.0


#### Filter out stopwords & reduce the bigrams

In [None]:
bigrams_filtered = bigrams.filter(lambda (x,y): x not in broadcast_var.value and y not in broadcast_var.value) \
.reduceByKey(lambda x,y: x+y) \
.filter(lambda x,y: y>=500)
result = bigrams_filtered.take(10)
for word in result[:10]:
    print word

#### compute bigrams probability

In [None]:
bigrams_filtered_p = bigrams_filtered.map(lambda (x,y): (x,y/num_bigrams))
result = bigrams_filtered-_p.take(10)
for word in result[:10]:
    print word

#### Join the RDDs

In [None]:
join_bi_words = bigrams_filtered_p.map(lambda ((a,b),p_ab) : (a,(b,p_ab))) \
.cogroup(wiki_filtered_p) \
.map(lambda (a, ((b,p_ab),p_a)): (b,(a,p_a,p_ab))) \
.cogroup(wiki_filtered_p) \
.map(lambda (b,((a,p_a,p_ab),p_b)): ((a,b),(p_a,p_b,p_ab)))

#### compute NPMI

In [None]:
import numpy as np
#PMI(a, b) = ln (P(ab) / (P(a) * P(b)
pmi_rdd = join_bi_words.map(lambda ((a,b),(p_a,p_b,p_ab)) : ((a,b),(np.log((p_ab)/(p_a * p_b)),p_ab)))
#NPMI(a, b) = PMI(a, b) / -ln P(ab)
npmi_rdd = pmi_rdd.map(lambda ((a,b),(pmi,p_ab)): ((a,b),(pmi/-np.log(p_ab)))) \ 
            .sortBy(lambda (x,y): y, ascending=False)

#### Print the results

In [None]:
result = bigrams_filtered_p.take(39)
for bigram in results:
    print (('%s_%s') % (bigram[0][0],bigram[0][1]))