In [80]:
import numpy as np
import pandas as pd
import string
import re

from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql import Row

PUNCTUATION = set(string.punctuation)
STOPWORDS = set(stopwords.words('english'))

In [3]:
wiki_df = spark.read.json('s3a://galv-wiki-data/wiki_mini/*/')

In [10]:
wiki_df.count()

128889

In [4]:
wiki_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)



In [8]:
wiki_df.show(5)

+-----+--------------------+--------------------+--------------------+
|   id|                text|               title|                 url|
+-----+--------------------+--------------------+--------------------+
|84786|Meiji period

The...|        Meiji period|https://en.wikipe...|
|84789|Wim Duisenberg

W...|      Wim Duisenberg|https://en.wikipe...|
|84790|Sleepy Hollow

Sl...|       Sleepy Hollow|https://en.wikipe...|
|84791|The Legend of Sle...|The Legend of Sle...|https://en.wikipe...|
|84792|Timeline of Mongo...|Timeline of Mongo...|https://en.wikipe...|
+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [4]:
wiki_df.cache()

DataFrame[id: string, text: string, title: string, url: string]

In [5]:
wiki_samp = wiki_df.sample(fraction=0.01, withReplacement=False)
wiki_samp.cache()

DataFrame[id: string, text: string, title: string, url: string]

In [17]:
wiki_samp.show(5)

+------+--------------------+--------------------+--------------------+
|    id|                text|               title|                 url|
+------+--------------------+--------------------+--------------------+
| 89822|Pez

Pez (tradema...|                 Pez|https://en.wikipe...|
| 89984|Child development...|Child development...|https://en.wikipe...|
|193263|Leonard Chang

Le...|       Leonard Chang|https://en.wikipe...|
| 40977|International Cry...|International Cry...|https://en.wikipe...|
| 41061|Department of Def...|Department of Def...|https://en.wikipe...|
+------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [6]:
def tokenize(text):
    regex = re.compile('<.+?>|[^a-zA-Z]')
    clean_txt = regex.sub(' ', text)
    tokens = clean_txt.split()
    lowercased = [t.lower() for t in tokens]

    no_punctuation = []
    for word in lowercased:
        punct_removed = ''.join([letter for letter in word if not letter in PUNCTUATION])
        no_punctuation.append(punct_removed)
    no_stopwords = [w for w in no_punctuation if not w in STOPWORDS]

    STEMMER = PorterStemmer()
    stemmed = [STEMMER.stem(w) for w in no_stopwords]
    return [w for w in stemmed if w]

In [7]:
tokenize_udf = udf(tokenize, ArrayType(StringType()))

In [8]:
tokenized_df = wiki_samp.withColumn('tokens', tokenize_udf(col('text')))

In [25]:
tokenized_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [49]:
cv = CountVectorizer(minDF=10, vocabSize=5000, inputCol='tokens', outputCol='vectors')

In [60]:
vec_model = cv.fit(tokenized_df)

In [68]:
vocab = vec_model.vocabulary
print(vocab)

['year', 'also', 'state', 'use', 'one', 'citi', 'age', 'includ', 'first', 'new', 'two', 'time', 'popul', 'famili', 'unit', 'household', 'live', 'area', 'name', 'peopl', 'mani', 'american', 'may', 'counti', 'work', 'school', 'part', 'would', 'nation', 'th', 'world', 'made', 'femal', 'gener', 'town', 'known', 'averag', 'north', 'hous', 'govern', 'form', 'later', 'call', 'univers', 'system', 'follow', 'incom', 'three', 'howev', 'median', 'well', 'number', 'war', 'develop', 'becam', 'male', 'per', 'centuri', 'censu', 'origin', 'river', 'sever', 'day', 'found', 'race', 'record', 'sinc', 'play', 'locat', 'high', 'group', 'public', 'land', 'line', 'base', 'present', 'film', 'accord', 'south', 'earli', 'place', 'countri', 'servic', 'church', 'oper', 'everi', 'size', 'game', 'make', 'major', 'larg', 'end', 'like', 'second', 'produc', 'provid', 'appear', 'build', 'member', 'show', 'differ', 'life', 'older', 'set', 'region', 'commun', 'local', 'result', 'forc', 'elect', 'book', 'chang', 'four', '

In [61]:
vectorized_df = vec_model.transform(tokenized_df)

In [52]:
idf = IDF(minDocFreq=10, inputCol='vectors', outputCol='features')

In [53]:
model = idf.fit(vectorized_df)

In [54]:
feature_df = model.transform(vectorized_df)

In [56]:
feature_df.cache()

DataFrame[id: string, text: string, title: string, url: string, tokens: array<string>, vectors: vector, features: vector]

In [57]:
km = KMeans(k=10)

In [58]:
model = km.fit(feature_df)

In [62]:
centers = model.clusterCenters()

In [72]:
centers_sorted = np.flip(np.argsort(centers, axis=1), axis=1)

array([   5,   15,    6, ..., 1658, 4650, 4568])

In [78]:
for j, center in enumerate(centers_sorted):
    print('\n Cluster center #{}'.format(j))
    for i in range(10):
        print(' -',vocab[center[i]])



 Cluster center #0
 - citi
 - household
 - age
 - use
 - school
 - popul
 - new
 - game
 - univers
 - govern

 Cluster center #1
 - potenti
 - sexual
 - action
 - treat
 - scotland
 - rocket
 - cell
 - channel
 - trademark
 - self

 Cluster center #2
 - multipl
 - album
 - song
 - particl
 - alreadi
 - hit
 - manuscript
 - releas
 - disguis
 - isaac

 Cluster center #3
 - print
 - romanc
 - humanitarian
 - son
 - latino
 - robert
 - despit
 - motion
 - activist
 - london

 Cluster center #4
 - farm
 - pope
 - firm
 - di
 - deploy
 - borough
 - golf
 - church
 - reduct
 - detail

 Cluster center #5
 - v
 - engin
 - golden
 - fire
 - genu
 - design
 - bishop
 - shaft
 - mirror
 - adventur

 Cluster center #6
 - entertain
 - capac
 - previous
 - absenc
 - self
 - control
 - insid
 - iso
 - rich
 - grate

 Cluster center #7
 - avoid
 - decad
 - sing
 - donat
 - commit
 - round
 - pen
 - armi
 - corridor
 - soviet

 Cluster center #8
 - electr
 - river
 - allianc
 - missil
 - baltimor
 - c