In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession,SQLContext, Row
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vector, Vectors

# Build (or reuse) the Spark session for this notebook, using the
# `local[*]` master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('Topic Modelling')
    .getOrCreate()
)

In [2]:
def parse_line(line):
    """Turn one ``(text, identifier)`` pair into a ``Row`` of tokens.

    Args:
        line: A two-element sequence; ``line[0]`` is the document text and
            ``line[1]`` is the document identifier.

    Returns:
        Row with ``idd`` set to ``line[1]`` and ``words`` set to the list
        obtained by splitting the text on single spaces.
    """
    # No per-record debug printing: this function is mapped over every
    # record, so a print here would flood the output.
    text, idd = line[0], line[1]
    # Split on a literal space (not arbitrary whitespace), so consecutive
    # spaces produce empty-string tokens.
    return Row(idd=idd, words=text.split(" "))

In [3]:
# Location of the input text file that is read with textFile() below.
# NOTE(review): absolute, user-specific path — adjust for your environment.
path = "/home/giangvdq/workspaces/pythie/sample.txt"

In [7]:
# Read the file as an RDD of lines, pair each line with its index, and turn
# every (line, index) pair into a Row via parse_line.
data = (
    spark.sparkContext
    .textFile(path)
    .zipWithIndex()
    .map(parse_line)
)
docDF = spark.createDataFrame(data)
# Show up to 20 rows without truncating the cell contents.
docDF.show(20, truncate=False)


+---+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
# Print the column names and types of the parsed-documents DataFrame.
docDF.printSchema()

root
 |-- idd: long (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [5]:
# Fit a CountVectorizer on the "words" column and add a "vectors" column
# with the resulting term-count vector for each document.
# The estimator is deliberately not named `Vector`: that name is imported
# from pyspark.mllib.linalg at the top of the notebook and would be shadowed.
count_vectorizer = CountVectorizer(inputCol="words", outputCol="vectors")
model = count_vectorizer.fit(docDF)
result = model.transform(docDF)

result.printSchema()

root
 |-- idd: long (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- vectors: vector (nullable = true)



In [6]:
# Show up to 20 vectorised rows without truncating the cell contents.
result.show(20, truncate=False)


+---+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
def rowFromML(line):
    """Convert one ``(id, vector)`` record for use with the mllib API.

    Args:
        line: An indexable record whose first element is the document id and
            whose second element is the vector to convert.

    Returns:
        A two-element list ``[id, converted_vector]`` where the vector has
        been passed through ``Vectors.fromML``.
    """
    doc_id, ml_vector = line[0], line[1]
    converted = Vectors.fromML(ml_vector)
    return [doc_id, converted]

In [7]:
# Keep only the id and vector columns, convert each record with rowFromML,
# and cache the resulting RDD because it is reused below.
corpus = (
    result
    .select("idd", "vectors")
    .rdd
    .map(rowFromML)
    .cache()
)

# Bring every record back to the driver and print it.
for doc_record in corpus.collect():
    print(doc_record)

[0, SparseVector(540, {0: 1.0, 2: 1.0, 5: 1.0, 6: 1.0, 10: 1.0, 18: 2.0, 20: 1.0, 73: 1.0, 81: 1.0, 104: 1.0, 131: 1.0, 187: 1.0, 199: 1.0, 200: 1.0, 273: 1.0, 456: 1.0, 498: 1.0})]
[1, SparseVector(540, {0: 7.0, 2: 1.0, 5: 7.0, 6: 1.0, 12: 2.0, 15: 1.0, 18: 2.0, 20: 3.0, 21: 1.0, 24: 1.0, 25: 2.0, 29: 1.0, 41: 1.0, 42: 1.0, 43: 3.0, 49: 1.0, 51: 3.0, 55: 1.0, 62: 1.0, 63: 3.0, 74: 1.0, 78: 2.0, 88: 1.0, 100: 3.0, 101: 3.0, 111: 1.0, 116: 1.0, 119: 1.0, 122: 1.0, 128: 1.0, 137: 1.0, 139: 1.0, 143: 2.0, 166: 1.0, 167: 2.0, 214: 1.0, 216: 1.0, 275: 1.0, 290: 1.0, 295: 1.0, 313: 1.0, 337: 1.0, 341: 1.0, 347: 1.0, 350: 1.0, 396: 1.0, 423: 1.0, 424: 1.0, 426: 1.0, 430: 1.0, 434: 1.0, 444: 1.0, 471: 1.0, 539: 1.0})]
[2, SparseVector(540, {0: 1.0, 1: 1.0, 6: 1.0, 7: 1.0, 13: 1.0, 51: 1.0, 67: 1.0, 79: 1.0, 92: 1.0, 96: 1.0, 109: 1.0, 139: 1.0, 145: 1.0, 229: 1.0, 231: 1.0, 265: 1.0, 335: 1.0, 387: 1.0, 407: 1.0, 461: 1.0, 477: 1.0})]
[3, SparseVector(540, {0: 6.0, 5: 2.0, 6: 1.0, 15: 1.0, 18:

In [None]:
# Cluster the documents into topics using LDA.
numTopics = 3  # number of topics to learn
ldaSeed = 42   # fixed seed so that re-running the notebook is reproducible
ldaModel = LDA.train(
    corpus,
    k=numTopics,
    maxIterations=10000,
    seed=ldaSeed,
    optimizer='online',
)
topics = ldaModel.topicsMatrix()
# Vocabulary learned by the fitted CountVectorizer model; used to map term
# indices back to words.
vocabArray = model.vocabulary

wordNumbers = 10  # number of words per topic
# Wrap the per-topic descriptions in an RDD so they can be rendered with map().
topicIndices = spark.sparkContext.parallelize(
    ldaModel.describeTopics(maxTermsPerTopic=wordNumbers)
)

In [None]:
def topic_render(topic):
    """Translate a topic's term indices into the actual vocabulary words.

    Args:
        topic: An indexable topic description whose first element is the
            sequence of term indices into ``vocabArray``.

    Returns:
        List of words for at most ``wordNumbers`` of the topic's leading
        terms, in the order given.
    """
    terms = topic[0]
    # Slice rather than index with range(wordNumbers): a topic that has fewer
    # than wordNumbers terms would otherwise raise IndexError.
    return [vocabArray[term_index] for term_index in terms[:wordNumbers]]

# Render every topic to its list of words and collect the lists on the driver.
topics_final = topicIndices.map(topic_render).collect()

In [None]:
# Print each topic's number followed by its words, one word per line.
for topic_number, topic_terms in enumerate(topics_final):
    print("Topic" + str(topic_number) + ":")
    for term in topic_terms:
        print(term)
    print('\n')

In [None]:
# Reference: https://stackoverflow.com/questions/42051184/latent-dirichlet-allocation-lda-in-spark