In [1]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
import re
import nltk
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
import wikipedia
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.ml.feature import CountVectorizer, IDF, RegexTokenizer, StopWordsRemover, Normalizer
from pyspark.sql import SQLContext
from pyspark.ml.clustering import LDA
from pyspark.ml import Pipeline

import time

#LocalMode
#spark = SparkSession.builder.getOrCreate()

#StandaloneMode
spark = SparkSession.builder.master("spark://169.254.57.70:7077").getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

def get_title(content):
    # Remove any leading or lagging space if present 
    content = content.strip()
    title = ''
    try:
        if(content != ''):
            # Split the content on the basis of new line
            arr = content.split("\n",2)
            # Second line is the title
            title = arr[1]
            # Rest is the actual content
            actual_content = arr[2]
    except:
        title = 'error'
    return title

def get_content(content):
    # Remove any leading or lagging space if present 
    content = content.strip()
    actual_content = ''
    try:
        if(content != ''):
            # Split the content on the basis of new line
            arr = content.split("\n",2)
            # Second line is the title
            title = arr[1]
            # Rest is the actual content
            actual_content = arr[2]
    except:
        actual_content = 'error'
    return actual_content

def clean(article):
    title = article[0]
    document = article[1]
    tokens = RegexpTokenizer(r'\w+').tokenize(document.lower())
    tokens_clean = [token for token in tokens if token not in stopwords.words('english')]
    tokens_stemmed = [PorterStemmer().stem(token) for token in tokens_clean]
    return (title, tokens_stemmed)

def splitByDoc(textfile):
    return list(filter(lambda x: x != "\n", textfile[1].split("</doc>")))

In [2]:
spark

In [7]:
start = time.time()
#data = sc.wholeTextFiles("C:/Users/Alina/Big Data/Wikipedia Exports/cars/*/*")
#data = sc.wholeTextFiles("C:/Users/Alina/Big Data/Wikipedia Exports/42_articles/*/*")
#data = sc.wholeTextFiles("C:/Users/Alina/Big Data/Wikipedia Exports/all_articles_2mb/*/*")
data = sc.wholeTextFiles("C:/Users/Alina/Big Data/Wikipedia Exports/all_articles_100mb/*/*")

pagesRaw = data.flatMap(splitByDoc)
pagesTitleContent = pagesRaw.map(lambda x : (get_title(x),get_content(x))).filter(lambda x: x[0] != "error" and x[0] != "")

#RDD to DataFrame
dfPagesTitleContent = sqlContext.createDataFrame(pagesTitleContent, ["title",'content'])

regexTokenizer = RegexTokenizer(inputCol="content", outputCol="list_of_words_raw", pattern="\\W", minTokenLength=4)
#tokenizedWords = regexTokenizer.transform(dfPagesTitleContent)

stopWordsRemover = StopWordsRemover(inputCol="list_of_words_raw", outputCol="list_of_words")
stopwords = stopWordsRemover.getStopWords()
stopwords.extend(["also", "first", "used"])
stopWordsRemover.setStopWords(stopwords)
#cleanedWords = stopWordsRemover.transform(tokenizedWords)

countVectorizer = CountVectorizer(inputCol="list_of_words", outputCol="features")
#vectorizedWords = countVectorizer.fit(cleanedWords).transform(cleanedWords)

lda = LDA(k=20, maxIter=15)
#model = lda.fit(vectorizedWords)

pipeline = Pipeline(stages=[regexTokenizer, stopWordsRemover, countVectorizer, lda])

model = pipeline.fit(dfPagesTitleContent)
end = time.time()
print(end - start)

82.3042414188385


In [4]:
model.stages

[RegexTokenizer_d259efcaf7e0,
 StopWordsRemover_47566fa96056,
 CountVectorizerModel: uid=CountVectorizer_6e99e00036b6, vocabularySize=8371,
 LocalLDAModel: uid=LDA_181d79937efc, k=3, numFeatures=8371]

In [8]:
cleanedPagesTitles = pagesTitleContent.map(lambda x: x[0])
cleanedPagesTitles.collect()
#cleanedPagesTitles.count()

['Anarchism\r',
 'Autism\r',
 'Albedo\r',
 'A\r',
 'Alabama\r',
 'Achilles\r',
 'Abraham Lincoln\r',
 'Aristotle\r',
 'An American in Paris\r',
 'Academy Award for Best Production Design\r',
 'Academy Awards\r',
 'Actrius\r',
 'Animalia (book)\r',
 'International Atomic Time\r',
 'Altruism\r',
 'Ayn Rand\r',
 'Alain Connes\r',
 'Allan Dwan\r',
 'Algeria\r',
 'List of Atlas Shrugged characters\r',
 'Anthropology\r',
 'Agricultural science\r',
 'Alchemy\r',
 'Alien\r',
 'Astronomer\r',
 'ASCII\r',
 'Austin (disambiguation)\r',
 'Animation\r',
 'Apollo\r',
 'Andre Agassi\r',
 'Austroasiatic languages\r',
 'Afroasiatic languages\r',
 'Andorra\r',
 'Arithmetic mean\r',
 'American Football Conference\r',
 'Animal Farm\r',
 'Amphibian\r',
 'Alaska\r',
 'Agriculture\r',
 'Aldous Huxley\r',
 'Ada\r',
 'Aberdeen (disambiguation)\r',
 'Algae\r',
 'Analysis of variance\r',
 'ANOVA\r',
 'Alkane\r',
 'Appellate procedure in the United States\r',
 'Answer (law)\r',
 'Appellate court\r',
 'Arraignment

In [8]:
vocab = model.stages[2].vocabulary
topics = model.stages[3].describeTopics()
topicsRdd = topics.rdd
topicsRaw = topicsRdd.map(lambda row: row['termIndices']).collect()
result = map(lambda entry: [vocab[idx] for idx in entry], topicsRaw)
print(list(result))

[['mountbatten', 'californium', 'augustine', 'curium', 'alcohols', 'berkelium', 'dysprosium', 'dance', 'alkenes', 'form'], ['apollo', 'lunar', 'crew', 'spacecraft', 'mission', 'module', 'moon', 'nasa', 'orbit', 'astronauts'], ['system', 'tree', 'bermuda', 'jenner', 'canal', 'number', 'node', 'called', 'fortaleza', 'many'], ['corbett', 'lancaster', 'monet', 'european', 'party', 'booth', 'parliament', 'national', 'club', 'eurosceptic'], ['party', 'european', 'earnhardt', 'parties', 'world', 'season', 'chess', 'finland', 'game', 'national'], ['many', 'time', 'world', 'years', 'city', 'known', 'states', 'however', 'later', 'century'], ['candide', 'canal', 'voltaire', 'many', 'wright', 'walsh', 'world', 'anaximenes', 'three', 'work'], ['matter', 'dark', 'ecliptic', 'comet', 'fiorentina', 'system', 'equinox', 'comets', 'energy', 'many'], ['work', 'time', 'book', 'later', 'many', 'economics', 'german', 'smith', 'world', 'people'], ['form', 'field', 'dipole', 'vector', 'curl', '1040', 'conserv

In [8]:
article_title = "Lamborghini"
#article_title = "Car"
article_content_test = clean([article_title, wikipedia.page(article_title).content])[1]

In [9]:
article_content_test_rdd = sc.parallelize([article_content_test]).zipWithIndex()
df_txts_test  = sqlContext.createDataFrame(article_content_test_rdd, ["list_of_words",'index'])

cv_test = CountVectorizer(inputCol="list_of_words", outputCol="features")
cvmodel_test = cv_test.fit(df_txts_test)
result_cv_test = cvmodel_test.transform(df_txts_test)

In [10]:
result_cv_test.show()

+--------------------+-----+--------------------+
|       list_of_words|index|            features|
+--------------------+-----+--------------------+
|[automobili, lamb...|    0|(1075,[0,1,2,3,4,...|
+--------------------+-----+--------------------+



In [11]:
ll = model.stages[3].logLikelihood(result_cv_test)
lp = model.stages[3].logPerplexity(result_cv_test)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -32270.573814533127
The upper bound on perplexity: 11.051566374840112
