In [1]:
from __future__ import division
from pyspark import SparkConf, SparkContext
from operator import add
import re

In [2]:
# Single local Spark context for the whole notebook. getOrCreate() returns the
# already-running context when this cell is executed again (constructing a
# second SparkContext in one process fails); in that case the conf below is
# not applied. Master "local" runs Spark with a single worker thread.
sc = SparkContext.getOrCreate(
    conf=SparkConf()
    .setAppName("Spark assignment collocations")
    .setMaster("local")
)

In [3]:
path_to_file = '/datasets/stop_words_en.txt'

# Read the whole stop-word list (one word per line) and close the file before
# normalising; entries are stripped and lower-cased so they can be compared
# directly against lower-cased article tokens.
with open(path_to_file) as stop_words_file:
    content = stop_words_file.readlines()

stop_words = {entry.strip().lower() for entry in content}


In [5]:
 def parse_article(line):
    try:
        article_id, text = unicode(line.rstrip()).split('\t', 1)
        text = re.sub("^\W+|\W+$", "", text, flags=re.UNICODE)
        words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
        return [w.lower() for w in words if w.lower() not in stop_words]# if w.lower() not in stop_words
    except ValueError as e:
        return []


In [6]:
# Per-word frequencies over the whole corpus: tokenize every article, emit
# (word, 1) for each token and sum the ones per distinct word.
wiki = (
    sc.textFile("/data/wiki/en_articles_part/articles-part", 16)
    .flatMap(parse_article)
    .map(lambda word: (word, 1))
    .reduceByKey(add)
)


In [10]:


# Bring the (word -> occurrences) table to the driver; pmi() looks words up in it.
words_count = wiki.collectAsMap()
# Total number of counted tokens, i.e. the denominator of P(word).
total_words = sum(occurrences for _word, occurrences in words_count.items())



In [12]:
def get_pairs(line):
    """Return the adjacent-word pairs (collocation candidates) of one article.

    The line is tokenized with ``parse_article`` -- the same function used for
    the per-word counts -- so pair and word statistics come from one token
    stream. Each pair is the two lower-cased words joined by ``'_'``,
    e.g. ``u'united_states'``.

    Args:
        line: raw input line: an article id, a tab, then the article text.

    Returns:
        A list with one ``'left_right'`` string per adjacent pair of non-stop
        words, in article order; ``[]`` when fewer than two words remain or
        the line has no tab separator.
    """
    words = parse_article(line)
    # Zipping the list with its one-step shift yields each neighbour pair once.
    return [left + '_' + right for left, right in zip(words, words[1:])]


In [13]:
# Every adjacent (non-stop) word pair in the corpus, one element per
# occurrence. Cached because it is consumed twice: by count() and by the
# per-pair aggregation that feeds the PMI ranking.
pairs = (
    sc.textFile("/data/wiki/en_articles_part/articles-part", 16)
    .flatMap(get_pairs)
    .cache()
)

In [17]:
# Total number of pair occurrences (with repetition); used as the denominator
# of the pair probability in pmi(). As the first action on `pairs`, this also
# populates its cache.
total_collocations = pairs.count()

In [18]:
def pmi(pair):
    """Score a ``(u'left_right', count)`` pair by its PMI ratio.

    Returns ``(u'left_right', ratio)`` where
    ratio = P(left, right) / (P(left) * P(right)), with
    P(w) = words_count[w] / total_words and
    P(left, right) = count / total_collocations.
    This is the argument of the log in pointwise mutual information, so it
    ranks pairs in the same order as PMI itself.

    NOTE(review): the key is split at its first '_'. Tokens can themselves
    contain '_' (it is a word character for the \\W-based tokenizer), in which
    case the split picks the wrong words -- confirm this cannot occur.
    """
    collocation = pair[0]
    occurrences = pair[1]
    left, right = unicode(collocation).split('_', 1)
    ratio = (occurrences / total_collocations) / (
        (words_count[left] / total_words) * (words_count[right] / total_words)
    )
    return (collocation, ratio)


In [19]:
# Only pairs seen at least this many times are scored (PMI is known to
# overrate rare pairs -- presumably the reason for this cutoff).
MIN_PAIR_COUNT = 500
# Number of highest-scoring collocations to keep.
TOP_N = 39

# Count each distinct pair, drop rare ones, score the rest by PMI and keep the
# TOP_N best, highest score first. takeOrdered keeps only a per-partition
# top-N rather than fully sorting the RDD as sortBy(...).take(...) does;
# the relative order of exactly tied scores may differ from sortBy.
top_39 = (
    pairs.map(lambda collocation: (collocation, 1))
    .reduceByKey(add)
    .filter(lambda pair_count: pair_count[1] >= MIN_PAIR_COUNT)
    .map(pmi)
    .takeOrdered(TOP_N, key=lambda scored: -scored[1])
)


In [20]:
 for p in top_39:
    print p[0]   

los_angeles
san_francisco
prime_minister
et_al
external_links
supreme_court
soviet_union
20th_century
19th_century
roman_catholic
references_reading
baseball_player
references_external
award_best
notes_references
air_force
united_states
catholic_church
new_zealand
north_america
united_kingdom
university_press
new_york
south_africa
roman_empire
took_place
civil_war
united_nations
american_singer-songwriter
war_ii
high_school
world_war
american_actress
american_actor
american_baseball
american_football
york_city
years_later
north_american
