In [3]:
!rm -rf metastore_db/
import pyspark

It would be bit cleaner to use nltk to do the tokenization, but we don't have nltk installed in our cluster.

In [4]:
def tokenize(s):
    import re
    stopwords = set(['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', 'her', 'hers', 'herself', 'it', 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', 'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', 'should', 'now'])
    word_regex = '^[a-z][a-z\'-]+[a-z]$'
    s = s.lower()
    arr = s.split()
    terms = []
    for term in arr:
        if re.match(word_regex, term) != None and len(term) > 3 and term not in stopwords:
            terms.append(term)
    return terms


In [5]:
test_strings = ['the quick brown fox jumps over the brown fence.',
              'the boy paints a tall fence brown!',
              'basketball players are tall.',
              'quick basketball players jump high']

In [6]:
tokens = sc.parallelize(test_strings).map(tokenize)

In [7]:
tokens.collect()

[['quick', 'brown', 'jumps', 'brown'],
 ['paints', 'tall', 'fence'],
 ['basketball', 'players'],
 ['quick', 'basketball', 'players', 'jump', 'high']]

In [8]:
vocab = tokens.flatMap(lambda words: words).distinct()
vocab.collect()

['quick',
 'jump',
 'high',
 'brown',
 'tall',
 'players',
 'paints',
 'jumps',
 'fence',
 'basketball']

In [9]:
from collections import Counter
import numpy as np

#sc.broadcast shares an immutable object throughout the cluster
broadcastVocab = sc.broadcast(vocab.collect())

def bow_vectorize(tokens):
    word_counts = Counter(tokens)
    vector = [word_counts[v] if v in word_counts else 0 for v in broadcastVocab.value]
    return np.array(vector)

In [10]:
tokens.map(bow_vectorize).collect()

[array([1, 0, 0, 2, 0, 0, 0, 1, 0, 0]),
 array([0, 0, 0, 0, 1, 0, 1, 0, 1, 0]),
 array([0, 0, 0, 0, 0, 1, 0, 0, 0, 1]),
 array([1, 1, 1, 0, 0, 1, 0, 0, 0, 1])]

In [11]:
broadcastVocab.value

['quick',
 'jump',
 'high',
 'brown',
 'tall',
 'players',
 'paints',
 'jumps',
 'fence',
 'basketball']

In [12]:
term_freq = tokens.map(lambda terms: Counter(terms))

In [13]:
doc_freq = term_freq.flatMap(lambda counts: counts.keys()).map(lambda keys: (keys, 1)).reduceByKey(lambda a, b: a + b)
doc_freq.collect()

[('quick', 2),
 ('jump', 1),
 ('high', 1),
 ('brown', 1),
 ('tall', 1),
 ('players', 2),
 ('paints', 1),
 ('jumps', 1),
 ('fence', 1),
 ('basketball', 2)]

In [14]:
total_docs = term_freq.count()
total_docs

4

In [15]:
import math

idf = doc_freq.map(lambda tup: (tup[0], math.log(float(total_docs)/ (1 + tup[1])))).collect()
idf

[('quick', 0.28768207245178085),
 ('jump', 0.6931471805599453),
 ('high', 0.6931471805599453),
 ('brown', 0.6931471805599453),
 ('tall', 0.6931471805599453),
 ('players', 0.28768207245178085),
 ('paints', 0.6931471805599453),
 ('jumps', 0.6931471805599453),
 ('fence', 0.6931471805599453),
 ('basketball', 0.28768207245178085)]

In [16]:
broadcast_idf = sc.broadcast(idf)

In [17]:
def tfidf_vectorize(tokens):
    word_counts = Counter(tokens)
    doc_length = sum(word_counts.values())
    
    vector = [ word_counts.get(word[0], 0) * word[1] / float(doc_length) for word in broadcast_idf.value ]
    return np.array(vector)

In [18]:
tfidf = tokens.map(tfidf_vectorize)
tfidf.collect()

[array([ 0.07192052,  0.        ,  0.        ,  0.34657359,  0.        ,
         0.        ,  0.        ,  0.1732868 ,  0.        ,  0.        ]),
 array([ 0.        ,  0.        ,  0.        ,  0.        ,  0.23104906,
         0.        ,  0.23104906,  0.        ,  0.23104906,  0.        ]),
 array([ 0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
         0.14384104,  0.        ,  0.        ,  0.        ,  0.14384104]),
 array([ 0.05753641,  0.13862944,  0.13862944,  0.        ,  0.        ,
         0.05753641,  0.        ,  0.        ,  0.        ,  0.05753641])]

In [19]:
bow = tokens.map(bow_vectorize).cache()
bow.collect()

[array([1, 0, 0, 2, 0, 0, 0, 1, 0, 0]),
 array([0, 0, 0, 0, 1, 0, 1, 0, 1, 0]),
 array([0, 0, 0, 0, 0, 1, 0, 0, 0, 1]),
 array([1, 1, 1, 0, 0, 1, 0, 0, 0, 1])]

In [20]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from math import sqrt

In [21]:
clusters = KMeans.train(tfidf, 2, maxIterations=10, runs=10, initializationMode="random")



In [22]:
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

In [23]:
WSSSE = tfidf.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

Within Set Sum of Squared Error = 0.667330168412


In [24]:
clusters.centers

[array([ 0.0191788 ,  0.04620981,  0.04620981,  0.        ,  0.07701635,
         0.06712582,  0.07701635,  0.        ,  0.07701635,  0.06712582]),
 array([ 0.07192052,  0.        ,  0.        ,  0.34657359,  0.        ,
         0.        ,  0.        ,  0.1732868 ,  0.        ,  0.        ])]

In [25]:
top_n = 3
print([idf[idx][0] for idx in [np.argsort(x)[::-1][:top_n] for x in clusters.centers][0]])
print([idf[idx][0] for idx in [np.argsort(x)[::-1][:top_n] for x in clusters.centers][1]])

['fence', 'paints', 'tall']
['brown', 'jumps', 'quick']


In [26]:
from pyspark.sql import SQLContext, Row
sqlCtx = SQLContext(sc)

In [None]:
!wget https://dsr-data.s3.amazonaws.com/enron/enron.json

In [27]:
# read as dataframe 
email = sqlCtx.read.json('enron.json')

In [28]:
email.take(1)

[Row(_id=Row($oid=u'52af48b5d55148fa0c199643'), bcc=[], cc=[], ctype=u'text/plain; charset=us-ascii', date=u'2000-01-12 08:24:00-08:00', fname=u'1.', folder=u'_sent', fpath=u'enron_mail_20110402/maildir/lay-k/_sent/1.', mid=u'18133935.1075840283210.JavaMail.evans@thyme', recipients=[u'sherri.reinartz@enron.com'], replyto=None, sender=u'rosalee.fleming@enron.com', subject=u'Re: EXECUTIVE COMMITTEE MEETINGS - MONDAY, JANUARY 17', text=u'Ken will attend both meetings.\n\nRosie\n\n\n\nSherri Reinartz\n01/12/2000 03:30 PM\n\n\nTo: James M Bannantine/ENRON_DEVELOPMENT@ENRON_DEVELOPMENT, Cliff \nBaxter/HOU/ECT@ECT, Sanjay Bhatnagar/ENRON_DEVELOPMENT@ENRON_DEVELOPMENT, \nRick Buy/HOU/ECT@ECT, Richard Causey/Corp/Enron@ENRON, Diomedes \nChristodoulou/ENRON_DEVELOPMENT@ENRON_DEVELOPMENT, James V Derrick@Enron, \nAndrew S Fastow/HOU/ECT@ECT, Peggy Fowler/Enron@Gateway, Mark \nFrevert/LON/ECT@ECT, Kevin P Hannon/HOU/ECT@ECT, Ken Harrison/Enron@Gateway, \nDavid Haug/ENRON_DEVELOPMENT@ENRON_DEVELOPM

In [29]:
# tokenize documents
tokenized_rdd = email.select('text').rdd \
  .map(lambda row: row.text) \
  .map(lambda text: text.replace('\n', ' ').replace('\r', ' ')) \
  .map(lambda text: tokenize(text)) \
  .filter(lambda text: len(text) > 0)

# compute term and document frequencies
term_frequency = tokenized_rdd.map(lambda terms: Counter(terms))

doc_frequency = term_frequency.flatMap(lambda counts: counts.keys()) \
                             .map(lambda keys: (keys, 1)) \
                             .reduceByKey(lambda a, b: a + b)

In [30]:
tokenized_rdd.take(1)

[[u'attend',
  u'rosie',
  u'sherri',
  u'reinartz',
  u'james',
  u'cliff',
  u'sanjay',
  u'rick',
  u'richard',
  u'diomedes',
  u'james',
  u'andrew',
  u'peggy',
  u'mark',
  u'kevin',
  u'david',
  u'stanley',
  u'kurt',
  u'larry',
  u'steven',
  u'mark',
  u'kenneth',
  u'rebecca',
  u'mike',
  u'rebecca',
  u'jeffrey',
  u'mark',
  u'cindy',
  u'kenneth',
  u'jeffrey',
  u'john',
  u'jeff',
  u'joseph',
  u'greg',
  u'thomas',
  u'brenda',
  u'marcia',
  u'susan',
  u'stacy',
  u'beena',
  u'karen',
  u'sharron',
  u'molly',
  u'rosane',
  u'stephanie',
  u'bridget',
  u'shelby',
  u'shelby',
  u'mary',
  u'nicki',
  u'carol',
  u'dolly',
  u'elaine',
  u'nancy',
  u'cindy',
  u'sherryl',
  u'mary',
  u'maureen',
  u'joannie',
  u'rosalee',
  u'vanessa',
  u'marsha',
  u'cathy',
  u'loretta',
  u'dolores',
  u'karen',
  u'dorothy',
  u'christina',
  u'lauren',
  u'sherri',
  u'katherine',
  u'judy',
  u'bobbie',
  u'rodney',
  u'demonica',
  u'vanessa',
  u'suzanne',
  u'keith

In [31]:
print term_frequency.count()
term_frequency.take(1)

5866


[Counter({u'andrew': 1,
          u'attend': 7,
          u'attendance': 1,
          u'beena': 1,
          u'boardroom': 1,
          u'bobbie': 1,
          u'breakfast': 1,
          u'brenda': 1,
          u'bridge': 1,
          u'bridget': 1,
          u'brown': 1,
          u'call': 2,
          u'carol': 1,
          u'cathy': 1,
          u'christina': 1,
          u'cindy': 2,
          u'cliff': 1,
          u'committee': 3,
          u'conf': 1,
          u'conference': 3,
          u'connections': 1,
          u'contact': 1,
          u'david': 1,
          u'demonica': 1,
          u'dial': 1,
          u'diomedes': 1,
          u'dolly': 1,
          u'dolores': 1,
          u'dorothy': 1,
          u'e-mail': 1,
          u'elaine': 1,
          u'established': 1,
          u'executive': 3,
          u'floor': 1,
          u'following': 1,
          u'greg': 1,
          u'indicate': 2,
          u'james': 2,
          u'january': 4,
          u'jeff': 1,
          u'j

In [32]:
doc_frequency.take(5)

[(u'aided', 1),
 (u'four', 190),
 (u'prices', 111),
 (u'cyprus', 1),
 (u'looking', 234)]

In [33]:
total_emails = term_frequency.count()

In [34]:
import math

enron_idf = doc_frequency.map(lambda tup: (tup[0], math.log(float(total_emails)/ (1 + tup[1])))).collect()
enron_idf[:3]

[(u'aided', 7.983781068977451),
 (u'four', 3.4246548214907664),
 (u'prices', 3.9584293782423017)]

In [35]:
broadcast_idf = sc.broadcast(enron_idf)

In [36]:
enron_tfidf = tokenized_rdd.map(tfidf_vectorize)
enron_tfidf.take(2)

[array([ 0.,  0.,  0., ...,  0.,  0.,  0.]),
 array([ 0.,  0.,  0., ...,  0.,  0.,  0.])]

In [37]:
clusters = KMeans.train(enron_tfidf, 10, maxIterations=10, runs=10, initializationMode="random")
WSSSE = enron_tfidf.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

Within Set Sum of Squared Error = 3908.54865772


In [38]:
clusters.centers

[array([ 0.,  0.,  0., ...,  0.,  0.,  0.]),
 array([ 0.,  0.,  0., ...,  0.,  0.,  0.]),
 array([  1.34998902e-05,   1.31342596e-03,   7.49086345e-04, ...,
          1.43436333e-05,   4.48880579e-05,   2.32181552e-04]),
 array([ 0.,  0.,  0., ...,  0.,  0.,  0.]),
 array([ 0.,  0.,  0., ...,  0.,  0.,  0.]),
 array([ 0.,  0.,  0., ...,  0.,  0.,  0.]),
 array([ 0.,  0.,  0., ...,  0.,  0.,  0.]),
 array([ 0.,  0.,  0., ...,  0.,  0.,  0.]),
 array([ 0.,  0.,  0., ...,  0.,  0.,  0.]),
 array([ 0.,  0.,  0., ...,  0.,  0.,  0.])]

In [39]:
top_n = 3
for i in range(0,10):
    print([enron_idf[idx][0] for idx in [np.argsort(x)[::-1][:top_n] for x in clusters.centers][i]])

[u'number', u'special', u'ordering']
[u'nemtzow', u'alliance', u'save']
[u'enron', u'attached', u'energy']
[u'games', u'generousity', u'objecting']
[u'expense', u'approved', u'report']
[u'gracie', u'enjoyable', u'holidays']
[u'holiday', u'launch', u'link']
[u'unit', u'leavers', u'eeos']
[u'high-quality', u'magazine', u'listed']
[u'chairman', u'corey', u'cairo']
