In [2]:
!rm -rf metastore_db/
from pyspark.sql import SQLContext, Row
sqlCtx = SQLContext(sc)

In [3]:
import re
import string

# Built once instead of on every call: tokenize() runs once per document, so
# re-creating the set and re-parsing the pattern inside the function is
# repeated, loop-invariant work.
_STOPWORDS = frozenset([
    'i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your',
    'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she',
    'her', 'hers', 'herself', 'it', 'its', 'itself', 'they', 'them', 'their',
    'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that',
    'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
    'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an',
    'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of',
    'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into',
    'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from',
    'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again',
    'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how',
    'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some',
    'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too',
    'very', 's', 't', 'can', 'will', 'just', 'don', 'should', 'now',
])

# A token must start and end with a letter; apostrophes and hyphens are only
# allowed in between.
_WORD_PATTERN = re.compile(r"^[a-z][a-z'-]+[a-z]$")


def tokenize(s, strip_punctuation=False):
    """Split a string into lower-case word tokens, dropping noise.

    The text is lower-cased and split on whitespace. A token is kept only if it
    matches ``_WORD_PATTERN``, is longer than three characters and is not a
    stop word.

    Args:
        s: the text to tokenize.
        strip_punctuation: when True, punctuation surrounding a token is
            removed before the token is tested. With the default (False) a
            token that touches punctuation, such as ``'fence.'`` or
            ``'brown!'``, fails the pattern and is discarded entirely.

    Returns:
        The surviving tokens as a list, in their original order (duplicates
        are kept).
    """
    terms = []
    for term in s.lower().split():
        if strip_punctuation:
            term = term.strip(string.punctuation)
        if (_WORD_PATTERN.match(term) is not None
                and len(term) > 3
                and term not in _STOPWORDS):
            terms.append(term)
    return terms


In [4]:
test_strings = ['the quick brown fox jumps over the brown fence.',
              'the boy paints a tall fence brown!',
              'basketball players are tall.',
              'quick basketball players jump high']

In [5]:
tokens = sc.parallelize(test_strings).map(tokenize)

In [6]:
tokens.collect()

[['quick', 'brown', 'jumps', 'brown'],
 ['paints', 'tall', 'fence'],
 ['basketball', 'players'],
 ['quick', 'basketball', 'players', 'jump', 'high']]

In [16]:
vocab = tokens.flatMap(lambda words: words).distinct()
vocab.collect()

['quick',
 'fence',
 'players',
 'jump',
 'high',
 'basketball',
 'tall',
 'jumps',
 'paints',
 'brown']

In [4]:
from collections import Counter
import numpy as np

#sc.broadcast shares an immutable object throughout the cluster
broadcastVocab = sc.broadcast(vocab.collect())

def bow_vectorize(tokens):
    """Turn one tokenized document into a dense bag-of-words count vector.

    Entry i of the result is the number of times the i-th term of
    ``broadcastVocab.value`` occurs in ``tokens``; vocabulary terms that do
    not occur in the document contribute 0.
    """
    counts = Counter(tokens)
    # .get(term, 0) covers both the "present" and the "absent" case
    return np.array([counts.get(term, 0) for term in broadcastVocab.value])

NameError: name 'vocab' is not defined

In [9]:
# create a Bag of Words representation of our document collection
# TODO: a sparse representation would be more efficient than these dense vectors
bow = tokens.map(bow_vectorize).cache()
bow.collect()

[array([1, 0, 0, 0, 0, 0, 0, 1, 0, 2]),
 array([0, 1, 0, 0, 0, 0, 1, 0, 1, 0]),
 array([0, 0, 1, 0, 0, 1, 0, 0, 0, 0]),
 array([1, 0, 1, 1, 1, 1, 0, 0, 0, 0])]

In [10]:
broadcastVocab.value

['quick',
 'fence',
 'players',
 'jump',
 'high',
 'basketball',
 'tall',
 'jumps',
 'paints',
 'brown']

In [12]:
# TF is the number of occurrences of each term in each document
term_freq = tokens.map(lambda terms: Counter(terms))
print(term_freq.collect())

[Counter({'brown': 2, 'quick': 1, 'jumps': 1}), Counter({'fence': 1, 'paints': 1, 'tall': 1}), Counter({'players': 1, 'basketball': 1}), Counter({'quick': 1, 'players': 1, 'high': 1, 'basketball': 1, 'jump': 1})]


In [13]:
# DF: for every term, the number of documents in which it occurs at least once.
# Each per-document Counter contributes every distinct term exactly once.
doc_freq = (
    term_freq
    .flatMap(lambda counts: counts.keys())
    .map(lambda term: (term, 1))
    .reduceByKey(lambda left, right: left + right)
)
doc_freq.collect()

[('quick', 2),
 ('fence', 1),
 ('players', 2),
 ('jump', 1),
 ('high', 1),
 ('basketball', 2),
 ('tall', 1),
 ('jumps', 1),
 ('paints', 1),
 ('brown', 1)]

In [14]:
total_docs = term_freq.count()
total_docs

4

In [17]:
# IDF (inverse document frequency) weights each term by how rare it is in the corpus:
# words that are in every document aren't useful; words that are rare are more interesting

# IDF(term) = log(number of docs / (1 + number of docs with term))
# Because of the +1 in the denominator, a term that occurs in every document
# gets a slightly negative weight: log(N / (N + 1)) < 0.

import math

# collect() brings the (term, idf) pairs back to the driver as a plain list
idf = doc_freq.map(lambda tup: (tup[0], math.log(float(total_docs)/ (1 + tup[1])))).collect()
idf

[('quick', 0.28768207245178085),
 ('fence', 0.6931471805599453),
 ('players', 0.28768207245178085),
 ('jump', 0.6931471805599453),
 ('high', 0.6931471805599453),
 ('basketball', 0.28768207245178085),
 ('tall', 0.6931471805599453),
 ('jumps', 0.6931471805599453),
 ('paints', 0.6931471805599453),
 ('brown', 0.6931471805599453)]

In [18]:
broadcast_idf = sc.broadcast(idf)

In [5]:
# the TFIDF weight of a term t in a doc d is the product of the frequency of the term in the document,
# and the inverse document frequency (idf) of the term in the corpus

# tfidf(d, t) = tf(d, t) * idf(t)

def tfidf_vectorize(tokens):
    """Turn one tokenized document into a dense TF-IDF vector.

    For every ``(term, idf)`` pair in ``broadcast_idf.value`` the result holds
    ``count(term) / len(tokens) * idf``, in the same order as the pairs.

    Args:
        tokens: the tokens of a single document.

    Returns:
        A numpy array with one entry per pair in ``broadcast_idf.value``.
        A document without tokens yields an all-zero vector instead of
        raising ZeroDivisionError.
    """
    word_counts = Counter(tokens)
    doc_length = sum(word_counts.values())
    idf_pairs = broadcast_idf.value

    # no tokens -> every term frequency is 0; avoid dividing by zero below
    if doc_length == 0:
        return np.zeros(len(idf_pairs))

    vector = [word_counts.get(term, 0) * weight / float(doc_length) for term, weight in idf_pairs]
    return np.array(vector)

In [20]:
# compute tfidf for all terms and all documents

tfidf = tokens.map(tfidf_vectorize)
tfidf.collect()

[array([ 0.07192052,  0.        ,  0.        ,  0.        ,  0.        ,
         0.        ,  0.        ,  0.1732868 ,  0.        ,  0.34657359]),
 array([ 0.        ,  0.23104906,  0.        ,  0.        ,  0.        ,
         0.        ,  0.23104906,  0.        ,  0.23104906,  0.        ]),
 array([ 0.        ,  0.        ,  0.14384104,  0.        ,  0.        ,
         0.14384104,  0.        ,  0.        ,  0.        ,  0.        ]),
 array([ 0.05753641,  0.        ,  0.05753641,  0.13862944,  0.13862944,
         0.05753641,  0.        ,  0.        ,  0.        ,  0.        ])]

In [None]:
### Now let's try to cluster these vectors

In [6]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from math import sqrt

In [39]:
# two clusters
# NOTE(review): initializationMode="random" is used and no seed is passed, so
# presumably the starting centers (and the resulting clusters) can differ
# between runs - confirm, and pass a seed if reproducible output is wanted.
clusters = KMeans.train(tfidf, 2, maxIterations=10, initializationMode="random") #runs=10 was deprecated

In [7]:
def error(point):
    """Return the Euclidean distance between ``point`` and its cluster center.

    The center is the entry of ``clusters.centers`` selected by
    ``clusters.predict(point)``. The returned distance is NOT squared.
    """
    assigned = clusters.predict(point)
    offset = point - clusters.centers[assigned]
    squared_total = sum([component ** 2 for component in offset])
    return sqrt(squared_total)

In [41]:
# Add up error(point) over every document vector.
# NOTE(review): error() returns the unsquared distance to the cluster center,
# so this is a sum of distances although the label says "Squared" - confirm
# which of the two is intended.
WSSSE = tfidf.map(error).reduce(lambda total, dist: total + dist)
print("Within Set Sum of Squared Error = " + str(WSSSE))

Within Set Sum of Squared Error = 0.8385497277129637


In [42]:
clusters.centers

[array([ 0.02397351,  0.07701635,  0.04794701,  0.        ,  0.        ,
         0.04794701,  0.07701635,  0.05776227,  0.07701635,  0.11552453]),
 array([ 0.05753641,  0.        ,  0.05753641,  0.13862944,  0.13862944,
         0.05753641,  0.        ,  0.        ,  0.        ,  0.        ])]

In [43]:
# Map the clusters back onto text: print the top_n highest-weighted terms of
# the first two cluster centers. The vector columns follow the order of `idf`,
# so idf[idx][0] is the term that belongs to column idx.
# Do these clusters make sense?
top_n = 3
print([idf[idx][0] for idx in np.argsort(clusters.centers[0])[::-1][:top_n]])
print([idf[idx][0] for idx in np.argsort(clusters.centers[1])[::-1][:top_n]])

['brown', 'paints', 'tall']
['high', 'jump', 'basketball']


## Now repeat this with a real dataset

In [44]:
# read as dataframe 
!wget http://jsonstudio.com/wp-content/uploads/2014/02/enron.zip
!7z x enron.zip

--2017-01-25 09:59:40--  http://jsonstudio.com/wp-content/uploads/2014/02/enron.zip
Resolving jsonstudio.com (jsonstudio.com)... 104.238.96.73
Connecting to jsonstudio.com (jsonstudio.com)|104.238.96.73|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4133135 (3.9M) [application/zip]
Saving to: ‘enron.zip’


2017-01-25 09:59:47 (655 KB/s) - ‘enron.zip’ saved [4133135/4133135]


7-Zip [64] 9.20  Copyright (c) 1999-2010 Igor Pavlov  2010-11-18
p7zip Version 9.20 (locale=en_US.UTF-8,Utf16=on,HugeFiles=on,4 CPUs)

Processing archive: enron.zip

file enron.json
already exists. Overwrite with 
enron.json?
(Y)es / (N)o / (A)lways / (S)kip all / A(u)to rename all / (Q)uit? ^C
(Y)es / (N)o / (A)lways / (S)kip all / A(u)to rename all / (Q)uit? 

In [8]:
email = sqlCtx.read.json('enron.json')

In [9]:
#email.take(1)

In [10]:
# Tokenize the body of every e-mail and drop messages that yield no tokens.
# It would probably improve results if we included the subject line.
tokenized_rdd = (
    email.select('text').rdd
    .map(lambda row: row.text)
    .map(lambda body: body.replace('\n', ' ').replace('\r', ' '))
    .map(tokenize)
    .filter(lambda terms: len(terms) > 0)
)

### Continue on your own

In [11]:
# TF is the number of occurrences of each term in each document
term_freq = tokenized_rdd.map(lambda terms: Counter(terms))
#term_freq.collect()

In [12]:
# DF: for every term, the number of e-mails in which it occurs at least once.
# Each per-document Counter contributes every distinct term exactly once.
doc_freq = (
    term_freq
    .flatMap(lambda counts: counts.keys())
    .map(lambda term: (term, 1))
    .reduceByKey(lambda left, right: left + right)
)
#doc_freq.collect()

In [13]:
total_docs = term_freq.count()
total_docs

5866

In [14]:
# IDF (inverse document frequency) is the importance of each term in the corpus
# words that are in every document aren't useful; words that are rare are more interesting

# IDF(term) = log(number of docs / (1 + number of docs with term))

import math

idf = doc_freq.map(lambda tup: (tup[0], math.log(float(total_docs)/ (1 + tup[1])))).collect()

In [15]:
broadcast_idf = sc.broadcast(idf)

In [16]:
# compute tfidf for all terms and all documents
tfidf = tokenized_rdd.map(tfidf_vectorize)

In [None]:
# ten clusters
clusters = KMeans.train(tfidf, 10, maxIterations=10, initializationMode="random") #runs=10 was deprecated

--- Logging error ---
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36050)
Traceback (most recent call last):
  File "/home/dsi-student/anaconda3/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2881, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-21-94da58e8523f>", line 2, in <module>
    clusters = KMeans.train(tfidf, 10, maxIterations=10, initializationMode="random") #runs=10 was deprecated
  File "/home/dsi-student/spark/python/pyspark/mllib/clustering.py", line 356, in train
    clusterInitialModel)
  File "/home/dsi-student/spark/python/pyspark/mllib/common.py", line 130, in callMLlibFunc
    return callJavaFunc(sc, api, *args)
  File "/home/dsi-student/spark/python/pyspark/mllib/common.py", line 123, in callJavaFunc
    return _java2py(sc, func(*args))
  File "/home/dsi-student/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
    answer,

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 51362)
----------------------------------------


--- Logging error ---
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36050)
Traceback (most recent call last):
  File "/home/dsi-student/anaconda3/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2881, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-21-94da58e8523f>", line 2, in <module>
    clusters = KMeans.train(tfidf, 10, maxIterations=10, initializationMode="random") #runs=10 was deprecated
  File "/home/dsi-student/spark/python/pyspark/mllib/clustering.py", line 356, in train
    clusterInitialModel)
  File "/home/dsi-student/spark/python/pyspark/mllib/common.py", line 130, in callMLlibFunc
    return callJavaFunc(sc, api, *args)
  File "/home/dsi-student/spark/python/pyspark/mllib/common.py", line 123, in callJavaFunc
    return _java2py(sc, func(*args))
  File "/home/dsi-student/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
    answer,

In [18]:
# Add up error(point) over every e-mail vector.
# NOTE(review): error() returns the unsquared distance to the cluster center,
# so this is a sum of distances although the label says "Squared" - confirm
# which of the two is intended.
WSSSE = tfidf.map(error).reduce(lambda total, dist: total + dist)
print("Within Set Sum of Squared Error = " + str(WSSSE))

Within Set Sum of Squared Error = 3940.8481519906754


In [20]:
# Map the clusters back onto text: print the top_n highest-weighted terms of
# the first two cluster centers only. The vector columns follow the order of
# `idf`, so idf[idx][0] is the term that belongs to column idx.
# Do these clusters make sense?
top_n = 5
print([idf[idx][0] for idx in np.argsort(clusters.centers[0])[::-1][:top_n]])
print([idf[idx][0] for idx in np.argsort(clusters.centers[1])[::-1][:top_n]])

['busy', 'sorry', "it's", "cfo's", 'zeidman']
['enron', 'attached', 'energy', 'consumers', 'made']
