In [1]:
import numpy as np
import string
import re

from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql import Row
from pyspark import SparkContext, SparkConf

from collections import Counter

PUNCTUATION = set(string.punctuation)
STOPWORDS = set(stopwords.words('english'))

## Read in the Data

In [2]:
wiki_rdd = sc.textFile('s3a://jyt109/wiki_articles')

wiki_rdd.cache()
wiki_rdd.setName('wiki_full')

wiki_full MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

### Get Total Number of Articles in the Dataset

In [3]:
count = wiki_rdd.count()


In [4]:
count

15269957

### Take 1,000 articles and parallelize them to a new RDD
#### The second parameter in sc.parallelize( ) is the number of partitions to cut the data into.
#### Typically you want 2-4 partitions for each CPU in your cluster.

In [5]:
wiki_samples = sc.parallelize(wiki_rdd.take(10000), 60)

### Get an idea of how the data looks like

In [6]:
wiki_samples.take(1)


[u'#REDIRECT [[Computer accessibility]]  {{Redr|move|from CamelCase|up}}']

In [7]:
# Theres some lines that begin with #Redirect that aren't articles.
# Maybe it redirects us to another article which we don't need right now so we'll drop them.
wiki_samples.take(4)


[u'#REDIRECT [[Computer accessibility]]  {{Redr|move|from CamelCase|up}}',
 u'{{Redirect2|Anarchist|Anarchists|the fictional character|Anarchist (comics)|other uses|Anarchists (disambiguation)}} {{Use British English|date=January 2014}} {{pp-move-indef}} {{Anarchism sidebar}}  \'\'\'Anarchism\'\'\' is a collection of movements and ideologies that hold the [[state (polity)|state]] to be undesirable, unnecessary, or harmful.<ref>"Anarchism." The Shorter Routledge Encyclopedia of Philosophy. 2005. p. 14 "Anarchism is the view that a society without the state, or government, is both possible and desirable."</ref> These movements advocate some form of [[stateless society]] instead, often based on [[self-governance|self-governed]] voluntary institutions or non-[[Hierarchy|hierarchical]] [[Free association (communism and anarchism)|free associations]].<ref>"In a society developed on these lines, the voluntary associations which already now begin to cover all the fields of human activity would

### Remove lines starting with #REDIRECT

In [8]:
wiki_articles_only = wiki_samples.filter(lambda line: '#REDIRECT' not in line)

In [9]:
wiki_articles_only.take(4)

[u'{{Redirect2|Anarchist|Anarchists|the fictional character|Anarchist (comics)|other uses|Anarchists (disambiguation)}} {{Use British English|date=January 2014}} {{pp-move-indef}} {{Anarchism sidebar}}  \'\'\'Anarchism\'\'\' is a collection of movements and ideologies that hold the [[state (polity)|state]] to be undesirable, unnecessary, or harmful.<ref>"Anarchism." The Shorter Routledge Encyclopedia of Philosophy. 2005. p. 14 "Anarchism is the view that a society without the state, or government, is both possible and desirable."</ref> These movements advocate some form of [[stateless society]] instead, often based on [[self-governance|self-governed]] voluntary institutions or non-[[Hierarchy|hierarchical]] [[Free association (communism and anarchism)|free associations]].<ref>"In a society developed on these lines, the voluntary associations which already now begin to cover all the fields of human activity would take a still greater extension so as to substitute themselves for the stat

### Convert articles to a DataFrame, we'll need this to store article features.

In [10]:
df = wiki_articles_only.map(lambda x: Row(text=x)).toDF()

### Perform text cleaning, normalization, tokenization

In [11]:
def clean_tokenize(text):
    regex = re.compile('<.+?>|[^a-zA-Z]')
    clean_txt = regex.sub(' ', text)
    tokens = clean_txt.split()
    lowercased = [t.lower() for t in tokens]

    no_punctuation = []
    for word in lowercased:
        punct_removed = ''.join([letter for letter in word if not letter in PUNCTUATION])
        no_punctuation.append(punct_removed)
    no_stopwords = [w for w in no_punctuation if not w in STOPWORDS]
    
    STEMMER = PorterStemmer()
    stemmed = [STEMMER.stem(w) for w in no_stopwords]
    return [w for w in stemmed if w]

In [12]:
# Create a user defined function to map over all entries in the DataFrame 
udf_tokenize_text = udf(clean_tokenize,ArrayType(StringType()))


In [13]:
# Add a column of word tokens to our DataFrame 
df = df.withColumn("words",udf_tokenize_text("text"))

In [14]:
df.show()

+--------------------+--------------------+
|                text|               words|
+--------------------+--------------------+
|{{Redirect2|Anarc...|[redirect, anarch...|
|{{Hatnote|This ar...|[hatnot, articl, ...|
|{{Other uses}} {{...|[use, use, dmi, d...|
|{{ref improve|dat...|[ref, improv, dat...|
|{{about|the U.S. ...|[u, state, alabam...|
|{{Redirect|Achill...|[redirect, achill...|
|{{About|the Ameri...|[american, presid...|
|{{other uses}} {{...|[use, use, dmi, d...|
|{{About|the 1928 ...|[georg, gershwin,...|
|{{Use mdy dates|d...|[use, mdi, date, ...|
|{{Redirect2|Oscar...|[redirect, oscar,...|
|{{Use dmy dates|d...|[use, dmi, date, ...|
|{{Use dmy dates|d...|[use, dmi, date, ...|
|{{Refimprove|date...|[refimprov, date,...|
|{{Use dmy dates|d...|[use, dmi, date, ...|
|{{Use mdy dates|d...|[use, mdi, date, ...|
|{{Use dmy dates|d...|[use, dmi, date, ...|
|{{Use dmy dates|d...|[use, dmi, date, ...|
|{{pp-move|small=y...|[pp, move, small,...|
|{{DISPLAYTITLE:Li...|[displayti

### Begin Word Vectorization

In [25]:
# First compute TF
cv = CountVectorizer(inputCol="words", outputCol="tf_vectors",vocabSize=5000,minDF=10)
model = cv.fit(df)
df_tf = model.transform(df)

In [39]:
# Check out the vocabulary our CountVectorizer learned
vocab = model.vocabulary
vocab

[u'http',
 u'titl',
 u'cite',
 u'first',
 u'www',
 u'publish',
 u'url',
 u'com',
 u'date',
 u'year',
 u'last',
 u'book',
 u'web',
 u'accessd',
 u'use',
 u'state',
 u'p',
 u'also',
 u'new',
 u'journal',
 u'page',
 u'author',
 u'one',
 u'categori',
 u'org',
 u'c',
 u'work',
 u'time',
 u'ndash',
 u'right',
 u'html',
 u'nbsp',
 u'world',
 u'american',
 u'isbn',
 u'unit',
 u'may',
 u'b',
 u'news',
 u'file',
 u'f',
 u'align',
 u'univers',
 u'languag',
 u'e',
 u'name',
 u'histori',
 u'includ',
 u'nation',
 u'two',
 u'jpg',
 u'n',
 u'th',
 u'style',
 u'de',
 u'thumb',
 u'system',
 u'gener',
 u'id',
 u'war',
 u'citi',
 u'film',
 u'locat',
 u'issu',
 u'volum',
 u'x',
 u'peopl',
 u'number',
 u'press',
 u'mani',
 u'uk',
 u'r',
 u'j',
 u'group',
 u'centuri',
 u'center',
 u'countri',
 u'would',
 u'intern',
 u'english',
 u'form',
 u'g',
 u'left',
 u'text',
 u'call',
 u'develop',
 u'main',
 u'h',
 u'see',
 u'pp',
 u'refer',
 u'part',
 u'pdf',
 u'govern',
 u'list',
 u'john',
 u'day',
 u'v',
 u'game',
 

In [27]:
# Now compute IDF using the TF column
idf = IDF(inputCol="tf_vectors", outputCol="idf_features")
idf_model = idf.fit(df_tf)
df_features = idf_model.transform(df_tf)

In [28]:
# Look at our final DataFrame
df_features.show()

+--------------------+--------------------+--------------------+--------------------+
|                text|               words|          tf_vectors|        idf_features|
+--------------------+--------------------+--------------------+--------------------+
|{{Redirect2|Anarc...|[redirect, anarch...|(5000,[0,1,2,3,4,...|(5000,[0,1,2,3,4,...|
|{{Hatnote|This ar...|[hatnot, articl, ...|(5000,[0,1,2,3,4,...|(5000,[0,1,2,3,4,...|
|{{Other uses}} {{...|[use, use, dmi, d...|(5000,[0,1,2,3,4,...|(5000,[0,1,2,3,4,...|
|{{ref improve|dat...|[ref, improv, dat...|(5000,[0,1,2,3,4,...|(5000,[0,1,2,3,4,...|
|{{about|the U.S. ...|[u, state, alabam...|(5000,[0,1,2,3,4,...|(5000,[0,1,2,3,4,...|
|{{Redirect|Achill...|[redirect, achill...|(5000,[0,1,2,3,4,...|(5000,[0,1,2,3,4,...|
|{{About|the Ameri...|[american, presid...|(5000,[0,1,2,3,4,...|(5000,[0,1,2,3,4,...|
|{{other uses}} {{...|[use, use, dmi, d...|(5000,[0,1,2,3,4,...|(5000,[0,1,2,3,4,...|
|{{About|the 1928 ...|[georg, gershwin,...|(5000,[0,1,

In [29]:
# Cache our DataFrame so following steps run faster
df_features.cache()

DataFrame[text: string, words: array<string>, tf_vectors: vector, idf_features: vector]

### Train KMeans Model

In [81]:
# k represents the number of clusters
# featuresCol selects which column to use as the feature from our DataFrame
kMeans = KMeans(k=7, featuresCol="idf_features", seed=1)
km_model = kMeans.fit(df_features)


In [82]:
centroids = km_model.clusterCenters()
centroids

[array([ 4.2540551 ,  4.88330583,  5.62313436, ...,  0.25648084,
         0.23674576,  0.22451131]),
 array([ 14.73452861,  26.34284448,  35.28729338, ...,   0.        ,
          0.        ,   0.        ]),
 array([ 33.37821786,  45.06585791,  55.82334116, ...,   0.        ,
          0.        ,   0.        ]),
 array([ 0.34020401,  0.07564481,  0.04657256, ...,  0.0290217 ,
         1.43380885,  1.16996821]),
 array([ 11.87783428,  23.29491206,  30.65945162, ...,   0.        ,
          0.        ,   9.86116065]),
 array([ 25.10883956,  12.62714859,  13.01580493, ...,   0.        ,
          0.        ,   0.        ]),
 array([  0.15035233,   2.83022296,   0.        , ...,   0.        ,
          0.        ,  19.7223213 ])]

### Inspect our clusters

In [83]:
# Create a mapping from our vocabulary to the centroids they were assigned.
import operator
word_clusters = []
for centroid in centroids:
    word_centroid_map = zip(vocab,centroid)
    word_clusters.append(sorted(word_centroid_map,key=lambda x: x[1],reverse=True)[:10])



In [84]:
# Print out top 10 words for each cluster
for i,cluster in enumerate(word_clusters):
    print 'Cluster: ', i 
    print [x[0] for x in cluster]


Cluster:  0
[u'align', u'journal', u'cite', u'sfn', u'film', u'web', u'url', u'accessd', u'titl', u'book']
Cluster:  1
[u'choctaw', u'mississippi', u'indian', u'oklahoma', u'tribe', u'treati', u'tribal', u'alabama', u'creek', u'jackson']
Cluster:  2
[u'cardiff', u'wale', u'welsh', u'citi', u'stadium', u'uk', u'centr', u'sport', u'castl', u'bbc']
Cluster:  3
[u'ndash', u'american', u'singer', u'actor', u'actress', u'player', u'politician', u'footbal', u'songwrit', u'b']
Cluster:  4
[u'chaplin', u'robinson', u'film', u'charli', u'pp', u'p', u'comedi', u'tag', u'kid', u'award']
Cluster:  5
[u'bronx', u'borough', u'background', u'counti', u'york', u'style', u'manhattan', u'citi', u'censu', u'align']
Cluster:  6
[u'sortnam', u'flagicon', u'champion', u'rank', u'doubl', u'championship', u'world', u'singl', u'women', u'tenni']
