In [1]:
print CLUSTER_URL

spark://ec2-54-198-93-146.compute-1.amazonaws.com:7077


In [3]:
import string
import json 
import pickle as pkl
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import pyspark as ps
from collections import Counter
import numpy as np

In [4]:
sc = ps.SparkContext(CLUSTER_URL, 'nlp-spark')

In [5]:
essay_rdd = sc.textFile('s3n://galvanize-example-data/donors_choose/essays.json')
essay_rdd.first()

u'{"essay":"\\"One of my classes this year has been an 11th grade  English class.  These students will be taking the SAT next year and I know that they are not ready to do well.  \\r\\\\n\\r\\\\nUnlike students in more well-financed school districts, they have not had access to special SAT classes or tutors.  I have been so focused on completing the curriculum and helping them to pass the Regents that I have not had time to do SAT preparation. They do not come from families with extra cash to pay for classes on their own. \\r\\\\n\\r\\\\nI would love to be able to get them a good SAT preparation book before the end of the school term.  In this way, I can get them started so that they can review the book on their own over the summer.\\r\\\\n\\""}'

In [7]:
import json

row_rdd = essay_rdd.map(lambda x: json.loads(x))

## Bag of Words

In [8]:
row_rdd.getNumPartitions()

2

In [9]:
essay_rdd_repartition = row_rdd.repartition(38)

In [10]:
import nltk, string

def tokenize(text):
    tokens = [] 
    
    for word in nltk.word_tokenize(text):
        if word \
            not in nltk.corpus.stopwords.words('english') \
            and word not in string.punctuation \
            and word != '``':    
                tokens.append(word)
    
    return tokens

In [11]:
tokenized_rdd = essay_rdd_repartition.filter(lambda row: row['essay'] and row['essay'] != '') \
                       .map(lambda row: row['essay']) \
                       .map(lambda text: text.replace('\\n', '').replace('\r', '')) \
                       .map(lambda text: tokenize(text))

In [12]:
tokenized_rdd.cache()

PythonRDD[9] at RDD at PythonRDD.scala:43

In [13]:
term_frequency = tokenized_rdd.map(lambda terms: Counter(terms))

In [14]:
document_freqency = term_frequency.flatMap(lambda counts: counts.keys()).map(lambda keys: (keys, 1)).reduceByKey(lambda a, b: a + b)

In [15]:
num_features = 40000

top_terms = document_freqency.top(num_features, key=lambda a: a[1])

In [17]:
total_docs = tokenized_rdd.count()

In [18]:
import math

idf = map(lambda tup: (tup[0], math.log(float(total_docs) / (1 + tup[1]))), top_terms)

In [19]:
broadcast_idf = sc.broadcast(idf)

def vectorize(tokens):
    word_counts = Counter(tokens)
    doc_length = sum(word_counts.values())
    
    vector = [word_counts.get(word[0], 0) * word[1] / float(doc_length) for word in broadcast_idf.value]
    return np.array(vector)

In [20]:
bag_of_words = tokenized_rdd.filter(lambda x: len(x) > 0).map(vectorize)

In [21]:
bag_of_words.persist()

PythonRDD[16] at RDD at PythonRDD.scala:43

## Kmeans

In [23]:
sample_data = bag_of_words.sample(False, 0.1)

In [24]:
sample_data.persist()

PythonRDD[17] at RDD at PythonRDD.scala:43

In [25]:
# initialize centroids
k = 200

centroids = sample_data.takeSample(False, k)

In [27]:
def Kmeans(features, centers, num_iter):
    
    # track convergence
    error = []
    
    # compute closest centroid for given data point
    def closest_centroid(point, centroids):
        distances = [ np.sqrt(np.sum((point - c) ** 2)) for c in centroids ]
        return (np.argmin(distances), np.min(distances))
    
    # format return value appropriately for RDD
    def compute_assignments(point, centroids):
            closest = closest_centroid(point, centroids)
            return (closest[0], (point, 1, closest[1]))

    # iterate until convergence or total num_iter
    for i in range(num_iter):
        
        # assign each point to closest centroid
        assignments = features.map(lambda x: compute_assignments(x, centers))
        
        # update centroids to mean of assigned points
        means = assignments.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]))
        centroids = means.map(lambda cent: (cent[0], cent[1][0]/cent[1][1])).collect()
        
        # map each new update to the appropriate position in centroid array
        for i, p in centroids:
            centers[i] = p
            
        # compute within-cluster error
        WSSE = means.map(lambda x: x[1][2]).sum()
        error.append(WSSE)
        
        # reached convergence?
        if len(error) > 1 and error[-2] == error[-1]:
            break
            
    return (assignments, centroids, error)

In [28]:
text_results = Kmeans(sample_data, centroids, 3)
text_results

(PythonRDD[44] at RDD at PythonRDD.scala:43,
 [(0, array([ 0.00097824,  0.0026617 ,  0.00147913, ...,  0.        ,
           0.        ,  0.        ])),
  (38, array([ 0.00087889,  0.00478275,  0.00265781, ...,  0.        ,
           0.        ,  0.        ])),
  (76, array([ 0.00064375,  0.00747691,  0.00153589, ...,  0.        ,
           0.        ,  0.        ])),
  (114, array([ 0.00080029,  0.00124429,  0.00276585, ...,  0.        ,
           0.        ,  0.        ])),
  (152, array([ 0.00053513,  0.0032256 ,  0.00174298, ...,  0.        ,
           0.        ,  0.        ])),
  (190, array([ 0.00067683,  0.00289997,  0.00220891, ...,  0.        ,
           0.        ,  0.        ])),
  (1, array([ 0.00111858,  0.00086959,  0.0038659 , ...,  0.        ,
           0.        ,  0.        ])),
  (39, array([ 0.00069883,  0.00314628,  0.00262498, ...,  0.        ,
           0.        ,  0.        ])),
  (77, array([ 0.00058022,  0.00370053,  0.00340184, ...,  0.        ,
   

In [31]:
filtered_res = text_results[0].reduceByKey(lambda x, y:  (x[0] ,x[1] + y[1]))
common = filtered_res.map(lambda tup: (tup[0], tup[1][1])).filter(lambda x: x[1] > 1).collect()
topics = np.array(common)
top_topics = np.argsort(topics[:,1])[::-1][:10]
topics[top_topics]

array([[  136, 17597],
       [   51,  8969],
       [   68,  5162],
       [   18,  4692],
       [   44,  3750],
       [  122,  2282],
       [   25,  2271],
       [   91,  2037],
       [   77,  1869],
       [   49,  1726]])

In [33]:
import pandas as pd
top_n = 20
df = pd.DataFrame(text_results[1])

df.columns = ['a', 'b']
df_index = df.set_index('a')

print "Top Words for each Cluster:\n"

for i , v in df_index.loc[topics[top_topics][:,0]].iterrows():
    print "%d: %s" % (i, ", ".join(top_terms[j][0] for j in v.b.argsort()[::-1][:top_n]))
    print "\n"

Top Words for each Cluster:

136: '', would, reading, work, We, children, world, 's, learning, class, They, year, student, technology, classroom, Our, want, community, able, make


51: books, reading, read, book, library, readers, listening, '', love, level, center, literature, stories, children, listen, home, grade, want, comprehension, fluency


68: science, Science, hands-on, '', lab, materials, plants, water, scientific, animals, life, experience, world, concepts, explore, see, scientists, learning, activities, labs


18: games, play, fun, materials, skills, children, activities, game, learning, '', kindergarten, learn, practice, recess, equipment, center, These, language, time, math


44: technology, iPad, apps, access, use, tablet, learning, math, iPod, classroom, world, skills, using, would, Technology, research, computer, able, computers, educational


122: camera, document, digital, pictures, cameras, yearbook, technology, see, photos, would, video, share, '', take, capture, p