In [8]:
# importing some libraries
from pyspark.context import SparkContext
import pandas as pd
import pyspark
from pyspark.sql import SQLContext

In [9]:
import glob

books = []

path = '../../data/texts/English/'

for filepath in glob.glob(os.path.join(path, '*.txt')):
    with open(os.path.join(os.getcwd(), filepath), 'r') as file:
        data = file.read().replace('\n', '')
        filename = filepath.replace(path, "", 1)
        name = filename.replace(".txt", "", 1)
        books.append((name, data))

df = pd.DataFrame(books, columns =[['Name','Text']])

df.to_csv("en_books.csv")

In [10]:
# check if spark context is defined
sc = SparkContext('local').getOrCreate()
print(sc.version)

3.1.1


In [11]:
df.head(5)

Unnamed: 0,Name,Text
0,Captains Courageous - Rudyard Kipling,Rudyard KiplingCAPTAINS COURAGEOUSCHAPTER IThe...
1,Dracula - Bram Stoker,Bram StokerDraculaPrefaceHow these papers have...
2,Three Men in a Boat (to Say Nothing of The Dog...,Jerome K. JeromeTHREE MEN IN A BOAT(TO SAY NOT...
3,"Bush Boys, The - Mayne Reid",Captain Mayne ReidThe Bush BoysChapter One.The...
4,Macbeth - William Shakespeare,"William ShakespeareMacbethAct I, Scene 1A dese..."


In [24]:

sqlContext = SQLContext(sc)
# stuff we'll need for text processing

import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
import re as re
from pyspark.ml.feature import CountVectorizer , IDF

# stuff we'll need for building the model
from pyspark.mllib.linalg import Vector, Vectors

# reading the data
data = sqlContext.read.format("csv") \
   .options(header='true', inferschema='true') \
   .load(os.path.realpath("en_books.csv"))

type(data)

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/Tobias/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


pyspark.sql.dataframe.DataFrame

In [25]:
reviews = data.rdd.map(lambda x : x['Text']).filter(lambda x: x is not None)
StopWords = stopwords.words("english")

type(reviews)


pyspark.rdd.PipelinedRDD

In [26]:
tokens = reviews                                                   \
    .map( lambda document: document.strip().lower())               \
    .map( lambda document: re.split(" ", document))          \
    .map( lambda word: [x for x in word if x.isalpha()])           \
    .map( lambda word: [x for x in word if len(x) > 3] )           \
    .map( lambda word: [x for x in word if x not in StopWords])    \
    .zipWithIndex()

In [27]:
df_txts = sqlContext.createDataFrame(tokens, ["list_of_words",'index'])
# TF
cv = CountVectorizer(inputCol="list_of_words", outputCol="raw_features", vocabSize=5000, minDF=10.0)
cvmodel = cv.fit(df_txts)
result_cv = cvmodel.transform(df_txts)
# IDF
idf = IDF(inputCol="raw_features", outputCol="features")
idfModel = idf.fit(result_cv)
result_tfidf = idfModel.transform(result_cv)

In [28]:
tfidf_df = result_tfidf.toPandas()
tfidf_df.head()

Unnamed: 0,list_of_words,index,raw_features,features
0,"[rudyard, kiplingcaptains, courageouschapter, ...",0,"(87.0, 418.0, 86.0, 11.0, 102.0, 25.0, 103.0, ...","(12.323594984483336, 68.39709914079329, 14.072..."
1,"[stokerdraculaprefacehow, papers, placed, sequ...",1,"(1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, ...","(0.14165051706302684, 0.16362942378180212, 0.1..."
2,"[jerome, jeromethree, nothing, thoughts, idle,...",2,"(348.0, 279.0, 151.0, 93.0, 113.0, 56.0, 113.0...","(49.29437993793334, 45.65260923512279, 24.7080..."
3,"[captain, mayne, reidthe, bush, boyschapter, b...",3,"(478.0, 44.0, 347.0, 426.0, 153.0, 48.0, 87.0,...","(67.70894715612683, 7.199694646399293, 56.7794..."
4,"[william, shakespearemacbethact, scene, desert...",4,"(37.0, 2.0, 12.0, 47.0, 5.0, 30.0, 32.0, 19.0,...","(5.241069131331993, 0.32725884756360424, 1.963..."


In [29]:

from pyspark.ml.clustering import LDA
result_df = result_tfidf['index','features'].toDF('index','features')
lda_model = LDA().fit(result_df)
wordNumbers = 10
topicIndices = lda_model.describeTopics(maxTermsPerTopic = wordNumbers)
vocab = cvmodel.vocabulary

In [30]:
def topic_render(topic):
    terms = topic[1]
    result = []
    for i in range(wordNumbers):
        term = vocab[terms[i]]
        result.append(term)
    return result


In [36]:
topics_final = topicIndices.rdd.map(lambda topic: topic_render(topic)).collect()


for topic in range(len(topics_final)):
    print ("Topic" + str(topic) + " : " + str(topics_final[topic]))

Topic0 : ['traversed', 'board', 'smallest', 'entitled', 'indignant', 'struggled', 'backs', 'breathe', 'professed', 'curtain']
Topic1 : ['thou', 'thee', 'hath', 'hast', 'doth', 'holy', 'paris', 'villain', 'heaven', 'hence']
Topic2 : ['funeral', 'event', 'break', 'want', 'science', 'faith', 'asking', 'nation', 'cable', 'tricks']
Topic3 : ['holmes', 'said', 'upon', 'would', 'could', 'anyone', 'come', 'professor', 'nothing', 'baker']
Topic4 : ['holmes', 'upon', 'wolf', 'said', 'captain', 'come', 'would', 'could', 'dollars', 'police']
Topic5 : ['thou', 'thee', 'hast', 'hath', 'holmes', 'gods', 'self', 'eternal', 'thence', 'angels']
Topic6 : ['holmes', 'said', 'upon', 'police', 'would', 'professor', 'could', 'peter', 'london', 'back']
Topic7 : ['question', 'perfect', 'france', 'belonged', 'things', 'fully', 'selfish', 'smoking', 'fear', 'defend']
Topic8 : ['said', 'would', 'could', 'miss', 'like', 'george', 'never', 'know', 'must', 'come']
Topic9 : ['holmes', 'said', 'upon', 'would', 'police