In [1]:
 import numpy as np
 import string
 import re

 from nltk.tokenize import word_tokenize
 from nltk.corpus import stopwords
 from nltk.stem.porter import PorterStemmer

 from pyspark.ml.clustering import KMeans
 from pyspark.ml.feature import CountVectorizer, IDF
 from pyspark.sql.functions import udf
 from pyspark.sql.types import ArrayType, StringType
 from pyspark.sql import Row

 # Sets of ASCII punctuation characters (string.punctuation) and of NLTK's
 # English stopword list, built once so membership tests are cheap.
 PUNCTUATION = set(string.punctuation)
 STOPWORDS = set(stopwords.words('english'))
    

In [2]:
# Read the wiki sample from S3 into an RDD of text records.
# NOTE(review): `sc` is not defined anywhere in this notebook; presumably the
# SparkContext injected by the PySpark kernel -- confirm.
wiki = sc.textFile('s3a://galvanize-ds-bak/wiki_sample_1')

In [3]:
# Check what kind of object textFile returned.
type(wiki)

pyspark.rdd.RDD

In [4]:
# Peek at the first record to see what the raw text looks like.
wiki.first()


'#REDIRECT [[Computer accessibility]]  {{Redr|move|from CamelCase|up}}'

In [5]:
# Mark the RDD for caching, since several actions are run against it below.
wiki.cache()

s3a://galvanize-ds-bak/wiki_sample_1 MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [None]:
# Give the RDD a readable name; it appears in the RDD's repr.
wiki.setName('wiki')

wiki MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [None]:
# Count the records in the full sample.
wiki.count()

In [None]:
# Pull the first 1000 records of the RDD back to the driver.
thousand = wiki.take(1000)

In [None]:
# Build a small RDD for fast iteration from the 1000 records already held in
# `thousand`, rather than running a second take() job for the same rows.
t = sc.parallelize(thousand)

In [None]:
# Sanity check: number of records in the small RDD.
t.count()

In [None]:
# Wrap each record in a single-field Row and convert to a DataFrame.
# The field is unnamed here; later cells refer to the resulting column as `_1`.
df = t.map(lambda x: Row(x)).toDF()

In [None]:
# Check what kind of object toDF() returned.
type(df)

In [None]:
# Show a single row to check the column layout.
df.show(1)


In [None]:
# Drop redirect stubs. MediaWiki accepts the redirect keyword in any letter
# case ('#REDIRECT', '#redirect', '#Redirect'), so match it case-insensitively
# at the start of the text instead of with an exact-case prefix test.
df = df.filter(~df._1.rlike('(?i)^#REDIRECT'))

In [None]:
# Inspect the rows that remain after the redirect filter.
df.show()

In [None]:
def tokenize(text):
   """Turn a raw wiki-markup string into a list of stemmed word tokens.

   Steps: replace ``<...>`` tags and every character that is not an ASCII
   letter with a space, lowercase, split on whitespace, drop English
   stopwords (``STOPWORDS``), then Porter-stem what remains.

   Parameters
   ----------
   text : str or None
       Raw document text. ``None`` (what a Spark UDF receives for a null
       column value) and the empty string both yield an empty list.

   Returns
   -------
   list of str
       Lowercased, stemmed tokens in document order.
   """
   # A null column value arrives as None; regex.sub would raise TypeError.
   if not text:
       return []

   regex = re.compile('<.+?>|[^a-zA-Z]')
   # After the substitution only ASCII letters and spaces remain, so there is
   # no punctuation left to strip character by character.
   words = regex.sub(' ', text).lower().split()

   stemmer = PorterStemmer()
   stemmed = (stemmer.stem(w) for w in words if w not in STOPWORDS)
   # Drop any token that ends up empty.
   return [w for w in stemmed if w]

In [None]:
# Expose tokenize as a Spark UDF returning array<string>. The function is
# passed directly; a lambda that only forwards its argument adds nothing.
tokenize_udf = udf(tokenize, ArrayType(StringType()))

In [None]:
# Add a 'tokens' column by running the tokenizer UDF over the raw text in `_1`.
df = df.withColumn('tokens', tokenize_udf(df._1))

In [None]:
# Inspect the new 'tokens' column.
df.show()

In [None]:
# Copy the raw text into a column with a descriptive name, 'text'.
df = df.withColumn('text', df._1)

In [None]:
# Inspect the DataFrame with the added 'text' column.
df.show()

In [None]:
# Remove the `_1` column now that its contents also live in 'text'.
df = df.drop('_1')

In [None]:
# Inspect the DataFrame after dropping `_1`.
df.show()

In [None]:
# Configure a CountVectorizer that reads the 'tokens' column and writes its
# output to a new 'vectors' column.
cv = CountVectorizer(inputCol='tokens', outputCol='vectors')

In [None]:
# Fit the vectorizer on the tokenized documents; the fitted model exposes the
# learned vocabulary.
model = cv.fit(df)

In [None]:
# Apply the fitted vectorizer, producing a DataFrame with the 'vectors' column.
count_df = model.transform(df)

In [None]:
# Keep the vocabulary as a NumPy array so it can later be indexed with an
# array of term indices.
vocab = np.array(model.vocabulary)

In [None]:
# Inspect the vectorized DataFrame.
count_df.show()

In [None]:
# Configure an IDF stage that reads the 'vectors' column and writes its output
# to a new 'tfidf' column.
idf = IDF(inputCol='vectors', outputCol='tfidf')

In [None]:
# Fit the IDF stage on the count vectors.
idf_model = idf.fit(count_df)

In [None]:
# Apply the fitted IDF model, producing a DataFrame with the 'tfidf' column.
tfidf_df = idf_model.transform(count_df)

In [None]:
# Inspect the TF-IDF DataFrame.
tfidf_df.show()

In [None]:
# K-means estimator: 3 clusters over the 'tfidf' features, with a fixed seed so
# repeated runs start from the same initialization.
kmean = KMeans(
    k=3,
    seed=123,
    featuresCol='tfidf',
    predictionCol='prediction',
)

In [None]:
# Fit the k-means model on the TF-IDF features.
km_model = kmean.fit(tfidf_df)

In [None]:
# Apply the fitted model, adding the 'prediction' column to each document.
kmeans_df = km_model.transform(tfidf_df)

In [None]:
# Inspect the documents together with their predictions.
kmeans_df.show()

In [None]:
# Retrieve the fitted cluster centers from the model.
centers = km_model.clusterCenters()

In [None]:
# Display the cluster centers.
centers

In [None]:
# Indices of the 10 largest components of the first cluster center: negating
# before argsort orders them from largest to smallest.
top_10 = np.argsort(-centers[0])[:10]

In [None]:
# Look up the vocabulary terms at those indices.
vocab[top_10]