In [1]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
import re
import nltk
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
import wikipedia
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.ml.feature import CountVectorizer, IDF, RegexTokenizer, StopWordsRemover, Normalizer
from pyspark.sql import SQLContext
from pyspark.ml.clustering import LDA
from pyspark.ml import Pipeline

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

def get_title(content):
    # Remove any leading or lagging space if present 
    content = content.strip()
    title = ''
    try:
        if(content != ''):
            # Split the content on the basis of new line
            arr = content.split("\n",2)
            # Second line is the title
            title = arr[1]
            # Rest is the actual content
            actual_content = arr[2]
    except:
        title = 'error'
    return title

def get_content(content):
    # Remove any leading or lagging space if present 
    content = content.strip()
    actual_content = ''
    try:
        if(content != ''):
            # Split the content on the basis of new line
            arr = content.split("\n",2)
            # Second line is the title
            title = arr[1]
            # Rest is the actual content
            actual_content = arr[2]
    except:
        actual_content = 'error'
    return actual_content

def clean(article):
    title = article[0]
    document = article[1]
    tokens = RegexpTokenizer(r'\w+').tokenize(document.lower())
    tokens_clean = [token for token in tokens if token not in stopwords.words('english')]
    tokens_stemmed = [PorterStemmer().stem(token) for token in tokens_clean]
    return (title, tokens_stemmed)

def splitByDoc(textfile):
    return list(filter(lambda x: x != "\n", textfile[1].split("</doc>")))

In [2]:
spark

In [3]:
data = sc.wholeTextFiles("C:/Users/Alina/Big Data/Wikipedia Exports/cars/*/*")
#data = sc.wholeTextFiles("C:/Users/Alina/Big Data/Wikipedia Exports/42_articles/*/*")
#data = sc.wholeTextFiles("C:/Users/Alina/Big Data/Wikipedia Exports/metadaten_test1/*/*")

pagesRaw = data.flatMap(splitByDoc)
pagesTitleContent = pagesRaw.map(lambda x : (get_title(x),get_content(x)))

#RDD to DataFrame
dfPagesTitleContent = sqlContext.createDataFrame(pagesTitleContent, ["title",'content'])

regexTokenizer = RegexTokenizer(inputCol="content", outputCol="list_of_words_raw", pattern="\\W")
#tokenizedWords = regexTokenizer.transform(dfPagesTitleContent)

stopWordsRemover = StopWordsRemover(inputCol="list_of_words_raw", outputCol="list_of_words")
#cleanedWords = stopWordsRemover.transform(tokenizedWords)

countVectorizer = CountVectorizer(inputCol="list_of_words", outputCol="features")
#vectorizedWords = countVectorizer.fit(cleanedWords).transform(cleanedWords)

lda = LDA(k=3, maxIter=15)
#model = lda.fit(vectorizedWords)

pipeline = Pipeline(stages=[regexTokenizer, stopWordsRemover, countVectorizer, lda])

model = pipeline.fit(dfPagesTitleContent)

In [4]:
model.stages

[RegexTokenizer_d259efcaf7e0,
 StopWordsRemover_47566fa96056,
 CountVectorizerModel: uid=CountVectorizer_6e99e00036b6, vocabularySize=8371,
 LocalLDAModel: uid=LDA_181d79937efc, k=3, numFeatures=8371]

In [5]:
cleanedPagesTitles = pagesTitleContent.map(lambda x: x[0])
cleanedPagesTitles.collect()
#cleanedPagesTitles.count()

['Audi',
 'BMW',
 'Opel',
 'Mercedes-Benz',
 'Human',
 'Nature',
 'Chair',
 'Table',
 'Plant',
 'Window',
 'Notebook',
 'BYD',
 'Volvo',
 'Škoda Auto']

In [6]:
#verbessert das Model

# IDF
#idf = IDF(inputCol="raw_features", outputCol="features")
#idfModel = idf.fit(result_cv)
#result_tfidf = idfModel.transform(result_cv) 
#result_tfidf.select("features").take(1)

In [7]:
vocab = model.stages[2].vocabulary
topics = model.stages[3].describeTopics()
topicsRdd = topics.rdd
topics_words = topicsRdd.map(lambda row: row['termIndices']).map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()
topics_words

[['koda',
  'window',
  'windows',
  'glass',
  'used',
  'humans',
  'human',
  'company',
  'new',
  'production'],
 ['humans',
  'human',
  'opel',
  'koda',
  'years',
  'also',
  '000',
  'first',
  'species',
  'volvo'],
 ['tag',
  'hydrocarbon',
  'herbaceous',
  'cheaper',
  'tropical',
  'popemobile',
  'tax',
  'benefit',
  'gtc',
  'honours']]

In [8]:
article_title = "Lamborghini"
#article_title = "Car"
article_content_test = clean([article_title, wikipedia.page(article_title).content])[1]

In [9]:
article_content_test_rdd = sc.parallelize([article_content_test]).zipWithIndex()
df_txts_test  = sqlContext.createDataFrame(article_content_test_rdd, ["list_of_words",'index'])

cv_test = CountVectorizer(inputCol="list_of_words", outputCol="features")
cvmodel_test = cv_test.fit(df_txts_test)
result_cv_test = cvmodel_test.transform(df_txts_test)

In [10]:
result_cv_test.show()

+--------------------+-----+--------------------+
|       list_of_words|index|            features|
+--------------------+-----+--------------------+
|[automobili, lamb...|    0|(1075,[0,1,2,3,4,...|
+--------------------+-----+--------------------+



In [11]:
ll = model.stages[3].logLikelihood(result_cv_test)
lp = model.stages[3].logPerplexity(result_cv_test)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -32270.573814533127
The upper bound on perplexity: 11.051566374840112
