In [1]:
import os
import sys

# Make PySpark use the interpreter that is running this kernel (sys.executable)
# for both PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON. Both are set before the
# SparkContext is created below.
os.environ.update(
    dict.fromkeys(('PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'), sys.executable)
)

from pyspark import SparkContext

# Reuse an already-running SparkContext if there is one, otherwise start one.
sc = SparkContext.getOrCreate()
print(sc.version)


4.0.1


In [11]:
# Data file at https://www.cse.ust.hk/msbd5003/data
# The data directory can be overridden with the ADJ_NOUN_DATA_DIR environment
# variable so the notebook runs on other machines; the default is the original
# author's local path.
DATA_DIR = os.environ.get(
    'ADJ_NOUN_DATA_DIR',
    r'C:\Users\allen\OneDrive\Documents\Big_Data_Technology\data',
)
NUM_PARTITIONS = 8  # passed to textFile as the minimum number of partitions

lines = sc.textFile(os.path.join(DATA_DIR, 'adj_noun_pairs.txt'), NUM_PARTITIONS)

In [12]:
# Number of raw input lines, before any cleaning.
lines.count()

3162692

In [13]:
# How many partitions textFile produced (8 were requested as the minimum).
lines.getNumPartitions()

8

In [14]:
# Peek at the raw format: each line holds whitespace-separated words, normally an adjective and a noun.
lines.take(5)

['early radical',
 'french revolution',
 'pejorative way',
 'violent means',
 'positive label']

In [17]:
# Turn each line into a tuple of its whitespace-separated words.
# The data is dirty: keep only lines that split into exactly two words
# (adjective, noun); longer and shorter lines are dropped.
pairs = (
    lines
    .map(lambda line: tuple(line.split()))
    .filter(lambda words: len(words) == 2)
)
# Cached because several later cells run actions on `pairs`.
pairs.cache()

PythonRDD[18] at RDD at PythonRDD.scala:56

In [18]:
# The same first lines, now as (adjective, noun) tuples.
pairs.take(5)

[('early', 'radical'),
 ('french', 'revolution'),
 ('pejorative', 'way'),
 ('violent', 'means'),
 ('positive', 'label')]

In [8]:
# Total number of well-formed pairs; pmi_score below uses it as the sample size N.
N = pairs.count()

In [9]:
# Compare with lines.count(): the difference is the number of lines that were not two-word pairs.
N

3162674

In [21]:
# Compute the frequency of each (adjective, noun) pair.
# Pairs seen fewer than MIN_PAIR_FREQ times are not frequent enough to score
# and are dropped.
MIN_PAIR_FREQ = 100
pair_freqs = (
    pairs.map(lambda p: (p, 1))
    .reduceByKey(lambda f1, f2: f1 + f2)
    .filter(lambda pf: pf[1] >= MIN_PAIR_FREQ)
)


In [22]:
# A few ((adjective, noun), count) records; reduceByKey output is in no particular order.
pair_freqs.take(5)

[(('modern', 'era'), 146),
 (('early', 'century'), 1568),
 (('major', 'city'), 572),
 (('human', 'history'), 185),
 (('other', 'branch'), 109)]

In [23]:
# Count how often each adjective (position 0) and each noun (position 1)
# occurs across all pairs.
def _word_counts(pair_rdd, position):
    """Return an RDD of (word, count) for the word at `position` of each pair."""
    return pair_rdd.map(lambda pair: (pair[position], 1)).reduceByKey(lambda x, y: x + y)

a_freqs = _word_counts(pairs, 0)
n_freqs = _word_counts(pairs, 1)

In [24]:
# A few (adjective, count) records.
a_freqs.take(5)

[('differ', 381),
 ('most', 18259),
 ('moral', 2006),
 ('good', 7499),
 ('philosophical', 1804)]

In [22]:
# Sizes of the noun vocabulary, the adjective vocabulary, and the set of
# frequent pairs, one per line.
# NOTE(review): the saved third number (1127051) is stale. With the `>= 100`
# filter there can be at most N // 100 (31626) frequent pairs, so that output
# came from an earlier version of pair_freqs; re-run this cell.
print(*(rdd.count() for rdd in (n_freqs, a_freqs, pair_freqs)), sep='\n')

106333
104304
1127051


In [None]:
# Broadcast the adjective and noun frequency tables (about 10^5 entries each).
# A broadcast variable is a read-only value cached once per machine instead of
# being shipped inside the closure of every task, which is cheaper than
# referencing a plain dict global from pmi_score.
# Each table is collected to the driver exactly once.
n_dict = sc.broadcast(n_freqs.collectAsMap())  # noun -> count
a_dict = sc.broadcast(a_freqs.collectAsMap())  # adjective -> count
a_dict.value['violent']


1191

In [None]:
from math import *

# Computing the PMI for a pair.
def pmi_score(pair_freq):
    """Score one ``((adjective, noun), count)`` record by pointwise mutual information.

    PMI(x, y) = log2( p(x, y) / (p(x) * p(y)) ). When the probabilities are
    estimated as counts over the N pairs, this becomes

        log2( f(x, y) * N / (f(x) * f(y)) )

    where f is a frequency.

    Reads the globals ``N`` (number of pairs), ``a_dict`` (broadcast
    adjective -> count) and ``n_dict`` (broadcast noun -> count).

    Returns ``(pmi, (adjective, noun))``. The score comes first so that
    ``RDD.top`` ranks records by PMI.
    """
    w1, w2 = pair_freq[0]
    f = pair_freq[1]
    # log2 (from the math wildcard import above) is more accurate than
    # log(x, 2), which divides two natural logarithms.
    pmi = log2(float(f) * N / (a_dict.value[w1] * n_dict.value[w2]))
    return pmi, (w1, w2)

In [18]:
# Computing the PMI for all pairs.
# map is lazy: no Spark job runs here; records become (pmi, (adjective, noun))
# when an action such as top() is called.
scored_pairs = pair_freqs.map(pmi_score)

In [19]:
# Show the most strongly associated pairs.
# top() returns the largest (pmi, pair) tuples in descending order, so the
# highest PMI comes first.
scored_pairs.top(10)

[(14.41018838546462, ('magna', 'carta')),
 (13.071365888694997, ('polish-lithuanian', 'Commonwealth')),
 (12.990597616733414, ('nitrous', 'oxide')),
 (12.64972604311254, ('latter-day', 'Saints')),
 (12.50658937509916, ('stainless', 'steel')),
 (12.482331020687814, ('pave', 'runway')),
 (12.19140721768055, ('corporal', 'punishment')),
 (12.183248694293388, ('capital', 'punishment')),
 (12.147015483562537, ('rush', 'yard')),
 (12.109945794428935, ('globular', 'cluster'))]