# Spark assignment 2: Collocations
In the second part of the assignment, your task is to extract collocations, that is, combinations of words that frequently occur together, for example, “high school” or “roman empire”.

To find collocations, you will use the NPMI (normalized pointwise mutual information) metric.

The PMI of two words $a$ and $b$ is defined as $\mathrm{PMI}(a, b) = \ln \frac{P(ab)}{P(a)\,P(b)}$, where $P(ab)$ is the probability of the two words appearing one right after the other, and $P(a)$ and $P(b)$ are the probabilities of words $a$ and $b$, respectively.

You will estimate the probabilities from occurrence counts: $P(a) = \frac{\text{number of occurrences of word } a}{\text{total number of words}}$ and $P(ab) = \frac{\text{number of occurrences of the pair } a\ b}{\text{total number of word pairs}}$.
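As a small illustration of these estimates, the sketch below counts unigrams and bigrams in a plain Python token list with `collections.Counter` and turns the counts into probabilities. The helper name `estimate_probabilities` and the toy sentence are hypothetical; the notebook computes the same kind of counts with Spark's `reduceByKey`.

```python
from collections import Counter

def estimate_probabilities(tokens):
    """Estimate P(a) and P(ab) from occurrence counts (hypothetical helper)."""
    word_counts = Counter(tokens)
    pair_counts = Counter(zip(tokens, tokens[1:]))  # consecutive word pairs
    total_words = len(tokens)
    total_pairs = max(len(tokens) - 1, 0)
    p_word = {w: c / total_words for w, c in word_counts.items()}
    p_pair = {p: c / total_pairs for p, c in pair_counts.items()}
    return p_word, p_pair

tokens = "new york is bigger than new jersey".split()
p_word, p_pair = estimate_probabilities(tokens)
print(p_word["new"])            # 2 / 7 ≈ 0.286
print(p_pair[("new", "york")])  # 1 / 6 ≈ 0.167
```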

To build intuition for this definition, see the Reading material.

Note that PMI is biased toward rare events: rare words that almost always occur together get a large PMI.
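A quick numeric check of this claim, with made-up counts: two pairs whose words only ever occur together, one seen once and one seen 1,000 times, in an assumed corpus of one million words. For simplicity the sketch treats the number of word pairs as equal to the number of words.

```python
import math

def pmi(pair_count, count_a, count_b, total_words, total_pairs):
    """PMI from raw counts using the count-based probability estimates."""
    p_ab = pair_count / total_pairs
    p_a = count_a / total_words
    p_b = count_b / total_words
    return math.log(p_ab / (p_a * p_b))

N = 1_000_000  # assumed corpus size; word and pair totals taken as equal
rare = pmi(1, 1, 1, N, N)               # ln(10**6) ≈ 13.82
frequent = pmi(1000, 1000, 1000, N, N)  # ln(10**3) ≈ 6.91
print(rare, frequent)
```

Both pairs are perfectly coupled, but the rare one scores twice as high; dividing by $-\ln P(ab)$, as the normalization introduced next does, gives both pairs an NPMI of 1.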

NPMI is computed as $\mathrm{NPMI}(a, b) = \frac{\mathrm{PMI}(a, b)}{-\ln P(ab)}$, which normalizes the value to the range $[-1, 1]$.
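Why the range is $[-1, 1]$: a short derivation, assuming true probabilities. With the count-based estimates, where the word and pair totals differ slightly, the value can slightly exceed 1 on small corpora.

```latex
\begin{aligned}
&P(ab) \le \min\bigl(P(a), P(b)\bigr) \;\Rightarrow\; P(a)\,P(b) \ge P(ab)^2
  \;\Rightarrow\; \mathrm{PMI}(a, b) \le \ln\frac{P(ab)}{P(ab)^2} = -\ln P(ab)
  \;\Rightarrow\; \mathrm{NPMI}(a, b) \le 1,\\
&P(ab) = P(a)\,P(b) \ \text{(independence)} \;\Rightarrow\; \mathrm{PMI}(a, b) = 0
  \;\Rightarrow\; \mathrm{NPMI}(a, b) = 0,\\
&\mathrm{NPMI}(a, b) = \frac{\ln P(ab) - \ln\bigl(P(a)\,P(b)\bigr)}{-\ln P(ab)}
  = -1 + \frac{\ln\bigl(P(a)\,P(b)\bigr)}{\ln P(ab)} \;\longrightarrow\; -1
  \quad \text{as } P(ab) \to 0.
\end{aligned}
```

The upper bound is reached when $P(a) = P(b) = P(ab)$, i.e. the two words never occur apart.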

Your task is a bit more complicated now:

* Extract all the words, as in the previous task
* Filter out stop words using the dictionary `/datasets/stop_words_en.txt` (do not forget to convert the words to lowercase!)
* Compute all bigrams (that is, pairs of consecutive words)
* Keep only the bigrams with at least 500 occurrences
* Compute NPMI for every remaining bigram (note: when computing the probabilities, use the unpruned counts, i.e. the counts taken before the 500-occurrence filter!)
* Sort the word pairs by NPMI in descending order
* Print the top 39 word pairs, with the words joined by an underscore “_”

For example:

roman_empire

south_africa
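To check the expected behaviour on a small input before running Spark, the steps above can be mirrored in plain Python. This is a hypothetical single-machine sketch: the function `top_collocations`, its parameters and the toy articles are illustrative, not part of the assignment. It keeps the unpruned counts for the probabilities and applies the minimum-count filter only when choosing which bigrams to score.

```python
import math
from collections import Counter

def top_collocations(articles, stop_words, min_occurrences, top_n):
    """Rank bigrams by NPMI on in-memory data (hypothetical local sketch)."""
    word_counts, pair_counts = Counter(), Counter()
    for words in articles:
        # Lowercase first, then drop stop words, as in the Spark pipeline
        words = [w.lower() for w in words]
        words = [w for w in words if w not in stop_words]
        word_counts.update(words)
        pair_counts.update(zip(words, words[1:]))  # consecutive words only

    # Probabilities use the unpruned totals
    total_words = sum(word_counts.values())
    total_pairs = sum(pair_counts.values())

    scored = []
    for (a, b), count in pair_counts.items():
        if count < min_occurrences:  # keep pairs with at least min_occurrences
            continue
        p_ab = count / total_pairs
        p_a = word_counts[a] / total_words
        p_b = word_counts[b] / total_words
        npmi = math.log(p_ab / (p_a * p_b)) / -math.log(p_ab)
        scored.append((a + "_" + b, npmi))

    scored.sort(key=lambda item: item[1], reverse=True)  # descending NPMI
    return [name for name, _ in scored[:top_n]]

articles = [
    "new york is a big city".split(),
    "New York and New Jersey".split(),
    "big city life in new york".split(),
]
stop_words = {"is", "a", "and", "in"}
result = top_collocations(articles, stop_words, min_occurrences=2, top_n=39)
print(result)  # ['big_city', 'new_york']
```

On a corpus this tiny the count-based NPMI values exceed 1 (the word and pair totals differ noticeably), so only the ranking is meaningful here.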

Dataset location: /data/wiki/en_articles_part

Part of the result on the sample dataset:

...

references_reading

notes_references

award_best

north_america

new_zealand

...

If you want to set up the environment on your own machine, please use the bigdatateam/all-spark Docker container: https://hub.docker.com/r/bigdatateam/all-spark

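```python
import re
import math

from pyspark import SparkConf, SparkContext

minOccurrences = 500
stopWordsAddress = "/datasets/stop_words_en.txt"
wikiArticlesAddress = "/data/wiki/en_articles_part/articles-part"

# Reuse the notebook's SparkContext if one is already running, instead of
# hiding every possible error behind a bare `except: pass`.
# Note: master "local" runs Spark with a single worker thread; "local[*]" uses all local cores.
conf = SparkConf().setAppName("MyApp").setMaster("local").set("spark.cores.max", "16")
sc = SparkContext.getOrCreate(conf=conf)
```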

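```python
def readStopWords(fileAddress):
    # Strip, lowercase and skip blank lines; a set makes each lookup O(1) on average
    with open(fileAddress) as f:
        return {line.strip().lower() for line in f if line.strip()}

# Ship the stop-word set to every executor once
stopWords = sc.broadcast(readStopWords(stopWordsAddress))
```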
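Stop-word filtering performs one lookup per word of every article, so the container matters: membership in a Python `list` scans the list, while a `set` uses hashing. The sketch below shows the normalization (strip, lowercase, skip blank lines) on an in-memory file; `io.StringIO` stands in for the real `/datasets/stop_words_en.txt`, and its contents and the helper name `load_stop_words` are made up.

```python
import io

def load_stop_words(lines):
    """Build a lowercase stop-word set from an iterable of lines (hypothetical helper)."""
    return {line.strip().lower() for line in lines if line.strip()}

fake_file = io.StringIO("The\nand\n\n  of \n")  # stand-in for the real file
stop_words = load_stop_words(fake_file)
words = ["the", "history", "of", "rome"]
print([w for w in words if w not in stop_words])  # ['history', 'rome']
```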

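```python
def parse_article(line):
    """Split a tab-separated "<id><TAB><text>" line into lowercase words."""
    try:
        article_id, text = line.rstrip().split('\t', 1)
    except ValueError:
        # Malformed line without a tab: treat it as an article with no words
        return []
    # Trim non-word characters at both ends, then split on whitespace
    # together with any punctuation around it (raw strings for the regexes)
    text = re.sub(r"^\W+|\W+$", "", text, flags=re.UNICODE)
    words = re.split(r"\W*\s+\W*", text, flags=re.UNICODE)
    return [word.lower() for word in words]
```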
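To see what this tokenizer produces, here are the same two regular expressions applied to made-up input outside Spark (the sample sentences and the name `tokenize` are hypothetical). Note that `\W` does not match `_`, so underscores stay inside words.

```python
import re

def tokenize(text):
    # Same two steps as parse_article: trim non-word characters at both ends,
    # then split on whitespace together with any punctuation around it
    text = re.sub(r"^\W+|\W+$", "", text, flags=re.UNICODE)
    return [w.lower() for w in re.split(r"\W*\s+\W*", text, flags=re.UNICODE)]

print(tokenize("«The Roman Empire» (27 BC – AD 476), ruled from Rome."))
# ['the', 'roman', 'empire', '27', 'bc', 'ad', '476', 'ruled', 'from', 'rome']
print(tokenize("snake_case is kept"))
# ['snake_case', 'is', 'kept']
```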

```python
def removeStopWords(words):
    return [word for word in words if word not in stopWords.value]

def pairs(words):
    # Emit ((w1, w2), 1) for every pair of consecutive words. Words are already
    # lowercase, and tuple keys avoid re-splitting a "w1_w2" string later,
    # which breaks when a word itself contains "_".
    return [((w1, w2), 1) for w1, w2 in zip(words, words[1:])]
```

```python
def calculateNPMI(pair):
    (word1, word2), count = pair
    # Unpruned counts of the individual words
    w1_count = words_count_map.value[word1]
    w2_count = words_count_map.value[word2]

    pairProbability = float(count) / totalNumberPairs.value
    w1Probability = float(w1_count) / totalNumberOfWords.value
    w2Probability = float(w2_count) / totalNumberOfWords.value

    PMI = math.log(pairProbability / (w1Probability * w2Probability))
    NPMI = PMI / -math.log(pairProbability)
    # Join the words with "_" only for the final output
    return (word1 + "_" + word2, NPMI)
```
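Because the tokenizer keeps underscores inside words, a bigram key built as `w1 + "_" + w2` cannot always be split back into its two words, which is a reason to keep bigram keys as tuples until printing. A small illustration with a made-up word (`join_pair` is a hypothetical helper):

```python
def join_pair(w1, w2):
    return w1 + "_" + w2

key = join_pair("snake_case", "naming")
print(key.split("_"))  # ['snake', 'case', 'naming'] -> unpacking into two words fails

pair = ("snake_case", "naming")  # a tuple key keeps the word boundary
print("_".join(pair))  # snake_case_naming, built only for output
```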

```python
# One list of lowercase, stop-word-free words per article
wordsByArticle = (sc.textFile(wikiArticlesAddress, 16)
                    .map(parse_article)
                    .map(removeStopWords)
                 ).cache()
```

```python
# Unpruned bigram counts (before the minOccurrences filter)
allPairs = (wordsByArticle.flatMap(pairs)
                          .reduceByKey(lambda a, b: a + b)
           ).cache()
```

```python
# Total number of word pairs, the denominator of P(ab)
totalNumberPairs = allPairs.map(lambda pair: pair[1]).sum()
totalNumberPairs = sc.broadcast(totalNumberPairs)
```

```python
# Unpruned word counts: (word, count)
totalWords = (wordsByArticle.flatMap(lambda article: [(word, 1) for word in article])
                            .reduceByKey(lambda a, b: a + b)
             ).cache()
```

```python
# Total number of words, the denominator of P(a) and P(b)
totalNumberOfWords = totalWords.map(lambda value: value[1]).sum()
totalNumberOfWords = sc.broadcast(totalNumberOfWords)
```

```python
# Word counts as a dict on the driver, broadcast for lookups in calculateNPMI
words_count_map = totalWords.collectAsMap()
words_count_map = sc.broadcast(words_count_map)
```

```python
# Keep bigrams with at least minOccurrences occurrences, then rank by NPMI
allPairsNPMI = (allPairs.filter(lambda pair: pair[1] >= minOccurrences)
                        .map(calculateNPMI)
                        .sortBy(lambda value: value[1], ascending=False)
               ).cache()
```
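Only the top 39 pairs are printed, so a full `sortBy` over every bigram does more work than necessary. PySpark's `RDD.takeOrdered(num, key=None)` returns the first `num` elements in ascending key order, so something like `allPairs.filter(...).map(calculateNPMI).takeOrdered(39, key=lambda value: -value[1])` should give the same top pairs, up to the order of ties (a suggested variant, not the assignment's reference solution). The same idea in plain Python is `heapq.nlargest`, shown here on made-up scores:

```python
import heapq

# Made-up (pair, NPMI) tuples standing in for the output of calculateNPMI
scores = [("pair_1", 0.91), ("pair_2", 0.55), ("pair_3", 0.98), ("pair_4", 0.40)]

top2 = heapq.nlargest(2, scores, key=lambda value: value[1])
print([pair for pair, _ in top2])  # ['pair_3', 'pair_1']

# Same result as sorting everything in descending order and slicing
assert top2 == sorted(scores, key=lambda value: value[1], reverse=True)[:2]
```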

```python
for pair, value in allPairsNPMI.take(39):
    print(pair)
```

```
los_angeles
external_links
united_states
prime_minister
san_francisco
et_al
new_york
supreme_court
19th_century
20th_century
references_external
soviet_union
air_force
baseball_player
university_press
roman_catholic
united_kingdom
references_reading
notes_references
award_best
north_america
new_zealand
civil_war
catholic_church
world_war
war_ii
south_africa
took_place
roman_empire
united_nations
american_singer-songwriter
high_school
american_actor
american_actress
american_baseball
york_city
american_football
years_later
north_american
```
