In [1]:
from pyspark import SparkConf, SparkContext, SQLContext  
from pyspark.sql import SparkSession   
from pyspark.ml.feature import Word2Vec,CountVectorizer  
from pyspark.ml.clustering import LDA, LDAModel  
from pyspark.sql.functions import col, udf  
from pyspark.sql.types import IntegerType,ArrayType,StringType  
import pylab as pl  

In [2]:
def to_word(termIndices):
  words = []  
  for termID in termIndices:
    words.append(vocab_broadcast.value[termID])      
  return words

In [3]:
#Load your document dataframe here
#================your code here==================

spark = SparkSession.builder \
    .appName('CSV_Handler').getOrCreate()

spark_df = spark.read.options(header=True, inferSchema=True) \
    .csv(' ') # your stream_data.csv file directory, can be the gs path

spark_df = spark_df.dropna(subset=['sentence'])

#==================================================
spark_df.show(5)

+--------------------+
|            sentence|
+--------------------+
|   iPhone  #      # |
|                   "|
|RT @WandasAttorne...|
|     RT @T_Az38: AI |
|     RT @T_Az38: AI |
+--------------------+
only showing top 5 rows



In [4]:
#CountVectorizer
#================your code here==================

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(spark_df)

wordsData.show(5)

cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=100)
cvModel = cv.fit(wordsData)

cvResult = cvModel.transform(wordsData)
cvResult.show(5, truncate=False)

#==================================================

+--------------------+--------------------+
|            sentence|               words|
+--------------------+--------------------+
|   iPhone  #      # |[, iphone, , #, ,...|
|                   "|               [, "]|
|RT @WandasAttorne...|[rt, @wandasattor...|
|     RT @T_Az38: AI |  [rt, @t_az38:, ai]|
|     RT @T_Az38: AI |  [rt, @t_az38:, ai]|
+--------------------+--------------------+
only showing top 5 rows

+----------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------+----------------------------------------+
|sentence                                                                                                        |words                                                                                                                            |features                                |
+------

In [5]:
#train LDA model, cluster the documents into 10 topics 
#================your code here==================

lda = LDA(k = 10, seed = 1, optimizer = "em")
lda.setMaxIter(100)
ldaModel = lda.fit(cvResult)

#==================================================

In [6]:
transformed = ldaModel.transform(cvResult).select("topicDistribution")  
#show the weight of every topic Distribution 
transformed.show(5, truncate=False) 

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|topicDistribution                                                                                                                                                                                       |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[0.08695708608228897,0.08695675244901402,0.12324203388850045,0.1326341745492554,0.08695675667400174,0.13488126970778308,0.08746744017308353,0.0869628761072495,0.08695913908263181,0.08698247128619148] |
|[0.09677501473212528,0.09677442940493965,0.11029027233693207,0.10398842590378818,0.09677438843408909,0.1042594207363663,0.10075042490494877,0.09679009181753573,0.09678212772917134,0.09681

In [7]:
#The higher ll is, the lower lp is, the better model is.
ll = ldaModel.logLikelihood(cvResult)  
lp = ldaModel.logPerplexity(cvResult)
print("ll: ", ll)
print("lp: ", lp)

ll:  -219892.38051280688
lp:  3.6965400348452895


In [8]:
wordNumbers = 10
vocabArray = cvModel.vocabulary

sc = SparkContext.getOrCreate()

topicIndices = ldaModel.describeTopics(maxTermsPerTopic = wordNumbers)
vocab_broadcast = sc.broadcast(vocabArray)
udf_to_word = udf(to_word, ArrayType(StringType()))
 
topics = topicIndices.withColumn("words", udf_to_word(topicIndices.termIndices))
topics.show(5, truncate=False)

+-----+--------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------+
|topic|termIndices                           |termWeights                                                                                                                                                                                                                |words                                           |
+-----+--------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------+
|0    |[5, 11, 17, 19, 4, 37, 38, 42, 58, 56]|[0.260

In [9]:
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())+ " words):")
topics = ldaModel.topicsMatrix()
print(topics)

Learned topics (as distributions over vocab of 100 words):
DenseMatrix([[4.44025656e-01, 3.38664462e-01, 9.37252313e+02, 6.29260387e+03,
              3.39404730e-01, 6.48613679e+03, 9.21858314e+01, 1.53735391e+00,
              9.02863222e-01, 5.25888432e+00],
             [1.66685708e+00, 1.99447506e+00, 7.26802061e+01, 4.26428005e-01,
              5.35555200e-01, 7.40506718e-01, 2.46618364e+03, 4.31182788e+02,
              2.60024868e+02, 7.76564677e+02],
             [5.65331394e-01, 3.54350114e-01, 2.64056392e+03, 8.10896183e-01,
              3.27200611e-01, 7.65152514e+00, 8.92237684e+02, 3.92480873e+00,
              2.33686135e+00, 9.22742563e+00],
             [3.51578754e-01, 2.75492801e-01, 5.47527391e+00, 3.87443189e-01,
              2.68974569e-01, 5.01502347e-01, 1.72050083e+01, 4.52451874e+02,
              7.61680788e-01, 2.12032117e+03],
             [2.72496692e+02, 1.13373992e+03, 3.84032310e-01, 2.28626432e-01,
              8.08515319e+02, 2.29134979e-01, 5.585