In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from pyspark.sql.functions import col
import pyspark.sql.functions as fn


#from pyspark.mllib.util import MLUtils
from pyspark.sql.types import *
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel, Tokenizer, RegexTokenizer, StopWordsRemover

sc = pyspark.SparkContext()


In [2]:
dataSchema = StructType([
        StructField("textID",StringType(),True),
        StructField("ID(seq)",StringType(),True),
        StructField("word",StringType(),True),
        StructField("lemma",StringType(),True),
        StructField("PoS",StringType(),True)])

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
DATA_DIR = '../SampleData/'

In [5]:
dataAll  = spark.read.option('delimiter', '\t').csv(path=DATA_DIR+'us_mini.txt', schema=dataSchema)

In [6]:
dataAll.show()

+--------+----------+----------+--------+------+
|  textID|   ID(seq)|      word|   lemma|   PoS|
+--------+----------+----------+--------+------+
|14637197|4739839025|@@14637197|    null|    fo|
|14637197|4739839026|       <p>|    null|  null|
|14637197|4739839027|       NEW|     new|   np1|
|14637197|4739839028|      YORK|    york|   np1|
|14637197|4739839029|         (|    null|     (|
|14637197|4739839030|        AP|      ap|   np1|
|14637197|4739839031|         )|    null|     )|
|14637197|4739839032|        --|    null|jj_nn1|
|14637197|4739839033|    Donald|  donald|   np1|
|14637197|4739839034|     Trump|   trump|   nn1|
|14637197|4739839035|        's|      's|    ge|
|14637197|4739839036|  five-day|five-day|    jj|
|14637197|4739839037|      feud|    feud|   nn1|
|14637197|4739839038|      with|    with|    iw|
|14637197|4739839039|         a|       a|   at1|
|14637197|4739839040|    former|  former|    da|
|14637197|4739839041|    beauty|  beauty|   nn1|
|14637197|4739839042

In [7]:
#df.select(df.columns[['textID','lemma']])

In [8]:
data_all = dataAll['textID','lemma'].na.drop()

In [9]:
# import pandas as pd
# pll = data_all.toPandas()
# pll.lemma.nunique()

In [10]:
data_all_g = data_all.groupby("textID").agg(fn.collect_list("lemma"))

In [11]:
data_all_g.show()

+--------+--------------------+
|  textID| collect_list(lemma)|
+--------+--------------------+
|14637197|[new, york, ap, d...|
|14637202|[in, this, sept, ...|
|14637201|[another, hotel, ...|
|14637200|[here, be, all, t...|
+--------+--------------------+



In [12]:
cv = CountVectorizer(inputCol="collect_list(lemma)", outputCol="vectors")
cv_model = cv.fit(data_all_g)


In [13]:
data_all_v = cv_model.transform(data_all_g)
data_all_v.show()

+--------+--------------------+--------------------+
|  textID| collect_list(lemma)|             vectors|
+--------+--------------------+--------------------+
|14637197|[new, york, ap, d...|(438,[0,1,2,3,4,5...|
|14637202|[in, this, sept, ...|(438,[0,1,2,3,4,5...|
|14637201|[another, hotel, ...|(438,[0,1,2,3,4,5...|
|14637200|[here, be, all, t...|(438,[0,1,2,3,4,5...|
+--------+--------------------+--------------------+



In [18]:
top20 = list(cv_model.vocabulary[0:20])
more_then_3_charachters = [word for word in cv_model.vocabulary if len(word) <= 3]
contains_digits = [word for word in cv_model.vocabulary if any(char.isdigit() for char in word)]


In [19]:
stopwords = []  #Add additional stopwords in this list
default_stop = StopWordsRemover.loadDefaultStopWords('english')
#Combine the three stopwords
stopwords = stopwords + top20  + more_then_3_charachters + contains_digits + default_stop

In [20]:
#Remove stopwords from the tokenized list
remover = StopWordsRemover(inputCol="collect_list(lemma)", outputCol="filtered", stopWords = stopwords)
data_all_filtered = remover.transform(data_all_g)

In [21]:
data_all_filtered.show()

+--------+--------------------+--------------------+
|  textID| collect_list(lemma)|            filtered|
+--------+--------------------+--------------------+
|14637197|[new, york, ap, d...|[york, donald, tr...|
|14637202|[in, this, sept, ...|[sept, photo, ric...|
|14637201|[another, hotel, ...|[another, hotel, ...|
|14637200|[here, be, all, t...|[crazy, stuff, ha...|
+--------+--------------------+--------------------+



In [22]:
#Create a new CountVectorizer model without the stopwords
cv = CountVectorizer(inputCol="filtered", outputCol="vectors")
cvmodel = cv.fit(data_all_filtered)
df_vect = cvmodel.transform(data_all_filtered)

In [23]:
df_vect.show()

+--------+--------------------+--------------------+--------------------+
|  textID| collect_list(lemma)|            filtered|             vectors|
+--------+--------------------+--------------------+--------------------+
|14637197|[new, york, ap, d...|[york, donald, tr...|(315,[5,8,11,12,1...|
|14637202|[in, this, sept, ...|[sept, photo, ric...|(315,[2,4,7,9,13,...|
|14637201|[another, hotel, ...|[another, hotel, ...|(315,[3,6,17,18,1...|
|14637200|[here, be, all, t...|[crazy, stuff, ha...|(315,[0,1,8,10,13...|
+--------+--------------------+--------------------+--------------------+



In [24]:
#transform the dataframe to a format that can be used as input for LDA.train. LDA train expects a RDD with lists,
#where the list consists of a uid and (sparse) Vector
def parseVectors(line):
    return [int(line[2]), line[0]]

In [25]:
df_vect.show()

+--------+--------------------+--------------------+--------------------+
|  textID| collect_list(lemma)|            filtered|             vectors|
+--------+--------------------+--------------------+--------------------+
|14637197|[new, york, ap, d...|[york, donald, tr...|(315,[5,8,11,12,1...|
|14637202|[in, this, sept, ...|[sept, photo, ric...|(315,[2,4,7,9,13,...|
|14637201|[another, hotel, ...|[another, hotel, ...|(315,[3,6,17,18,1...|
|14637200|[here, be, all, t...|[crazy, stuff, ha...|(315,[0,1,8,10,13...|
+--------+--------------------+--------------------+--------------------+



In [35]:
from pyspark.mllib.util import MLUtils
from pyspark.sql.types import *
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel, Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors, SparseVector

In [46]:
parseData = df_vect.select('textID','vectors').rdd.map(lambda x: [int(x[0]), Vectors.sparse(x[1])] ) #.toDF()

In [49]:
parseData.show(5)

AttributeError: 'PipelinedRDD' object has no attribute 'show'

In [41]:
from pyspark.ml.clustering import LDA

In [42]:
lda = LDA(k=10, maxIter=10)

In [50]:
lda.fit(parseData)

AttributeError: 'PipelinedRDD' object has no attribute '_jdf'

In [40]:
#parseData.registerTempTable('parseData')

In [33]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark import sql

sqlContext = sql.SQLContext(sc)

In [None]:
type(df_vect)

In [26]:
from pyspark.mllib.clustering import LDA, LDAModel

In [27]:
sparsevector = df_vect.select('vectors', 'filtered', 'textID') #.map(parseVectors)

In [28]:
from pyspark.mllib.linalg import Vectors, SparseVector
# data = [[1, Vectors.dense([0.0, 1.0])],[2, SparseVector(2, {0: 1.0})],]
# rdd =  sc.parallelize(data)

In [29]:
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

# Load and parse the data
data = sc.textFile("data/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()

# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)

# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distribu;tions over vocab of " + str(ldaModel.vocabSize())
      + " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
    print("Topic " + str(topic) + ":")
    for word in range(0, ldaModel.vocabSize()):
        print(" " + str(topics[word][topic]))

# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
    .load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")

Learned topics (as distribu;tions over vocab of 11 words):
Topic 0:
 8.15582005442717
 10.656943075611748
 3.855385888247155
 6.040492015211564
 5.650997868296005
 9.44768518707235
 21.145627148673356
 2.4724284854156613
 2.601081411159921
 10.144900085534445
 6.875519127498014
Topic 1:
 9.441854988623765
 6.350068100696127
 1.5408204958934313
 9.778830398980592
 10.950042790661328
 8.675691428171943
 5.864832561565598
 5.287038760035064
 3.2741848108201212
 6.9649099138112
 18.086982260006085
Topic 2:
 8.402324956949066
 11.992988823692123
 6.603793615859415
 24.180677585807846
 8.398959341042666
 3.8766233847557072
 3.9895402897610457
 2.240532754549275
 2.124733778019957
 6.890190000654354
 8.037498612495902


Py4JJavaError: An error occurred while calling o238.save.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/home/ezgi/Desktop/ADA/NewsCorpus/repo/Scripts/target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel/metadata already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:283)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1493)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1472)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1472)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1472)
	at org.apache.spark.mllib.clustering.DistributedLDAModel$SaveLoadV1_0$.save(LDAModel.scala:876)
	at org.apache.spark.mllib.clustering.DistributedLDAModel.save(LDAModel.scala:819)
	at org.apache.spark.mllib.api.python.LDAModelWrapper.save(LDAModelWrapper.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [32]:
corpus.take(15)

[[0, DenseVector([1.0, 2.0, 6.0, 0.0, 2.0, 3.0, 1.0, 1.0, 0.0, 0.0, 3.0])],
 [1, DenseVector([1.0, 3.0, 0.0, 1.0, 3.0, 0.0, 0.0, 2.0, 0.0, 0.0, 1.0])],
 [2, DenseVector([1.0, 4.0, 1.0, 0.0, 0.0, 4.0, 9.0, 0.0, 1.0, 2.0, 0.0])],
 [3, DenseVector([2.0, 1.0, 0.0, 3.0, 0.0, 0.0, 5.0, 0.0, 2.0, 3.0, 9.0])],
 [4, DenseVector([3.0, 1.0, 1.0, 9.0, 3.0, 0.0, 2.0, 0.0, 0.0, 1.0, 3.0])],
 [5, DenseVector([4.0, 2.0, 0.0, 3.0, 4.0, 5.0, 1.0, 1.0, 1.0, 4.0, 0.0])],
 [6, DenseVector([2.0, 1.0, 0.0, 3.0, 0.0, 0.0, 5.0, 0.0, 2.0, 2.0, 9.0])],
 [7, DenseVector([1.0, 1.0, 1.0, 9.0, 2.0, 1.0, 2.0, 0.0, 0.0, 1.0, 3.0])],
 [8, DenseVector([4.0, 4.0, 0.0, 3.0, 4.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0])],
 [9, DenseVector([2.0, 8.0, 2.0, 0.0, 3.0, 0.0, 2.0, 0.0, 2.0, 7.0, 2.0])],
 [10, DenseVector([1.0, 1.0, 1.0, 9.0, 0.0, 2.0, 2.0, 0.0, 0.0, 3.0, 3.0])],
 [11, DenseVector([4.0, 1.0, 0.0, 0.0, 4.0, 5.0, 1.0, 3.0, 0.0, 1.0, 0.0])]]

In [33]:
df_vect.show()

+--------+--------------------+--------------------+--------------------+
|  textID| collect_list(lemma)|            filtered|             vectors|
+--------+--------------------+--------------------+--------------------+
|14637197|[new, york, ap, d...|[york, donald, tr...|(314,[4,5,8,11,19...|
|14637202|[in, this, sept, ...|[sept, photo, ric...|(314,[1,3,7,10,15...|
|14637201|[another, hotel, ...|[another, hotel, ...|(314,[2,14,16,18,...|
|14637200|[here, be, all, t...|[crazy, stuff, ha...|(314,[0,5,6,9,10,...|
+--------+--------------------+--------------------+--------------------+



In [34]:
type(df_vect)

pyspark.sql.dataframe.DataFrame

In [35]:
parseData = df_vect.select('textID','vectors').rdd.map(lambda x: [int(x[0]), Vectors.dense(x[1])] )

In [36]:
ldaModel = LDA.train(parseData, k=2)

In [37]:
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distribu;tions over vocab of " + str(ldaModel.vocabSize())
      + " words):")
topics = ldaModel.topicsMatrix()
for topic in range(2):
    print("Topic " + str(topic) + ":")
    for word in range(0, ldaModel.vocabSize()):
        print(" " + str(topics[word][topic]))

Learned topics (as distribu;tions over vocab of 314 words):
Topic 0:
 3.9704595682608885
 0.02312037932725725
 3.90595140113783
 0.023120452870414258
 0.01972778996220793
 2.0369844612066887
 2.9707684611354175
 0.02288816952648734
 0.019535498897296895
 2.9707795102163557
 2.0995037488636528
 0.019535498881077598
 2.970785959735581
 2.9707690597089096
 2.9075029688966096
 0.02288850041655209
 2.9074694284330835
 2.970775091173252
 2.907454359187454
 1.4837181865838622
 0.022435483137319155
 0.9363735375394422
 1.9713957555515416
 0.49595815146467526
 1.9104723505664165
 1.9104741406398098
 1.9104654669090033
 1.9713965223819496
 1.971395731497023
 1.9713941787585625
 0.8971043371498983
 0.019160297196939177
 0.019160296989536488
 1.9713980272233411
 1.971396388294141
 0.022435220779828877
 0.02243488671002538
 1.9104750418658183
 0.02243487944807963
 1.9713985407064687
 0.019160297040379106
 0.02243499045743865
 1.9713978430322157
 1.9713961502785355
 1.971393122241633
 1.971394981190

In [38]:
parseData.take(4)

[[14637197,
  DenseVector([0.0, 0.0, 0.0, 0.0, 4.0, 1.0, 0.0, 0.0, 3.0, 0.0, 0.0, 3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0

In [None]:
sparsevector = df_vect.select('vectors', 'text', 'textID').map(parseVectors)

#Train the LDA model
model = LDA.train(sparsevector, k=5, seed=1)

#Print the topics in the model
topics = model.describeTopics(maxTermsPerTopic = 15)
for x, topic in enumerate(topics):
    print ('topic nr: ' + str(x))
    words = topic[0]
    weights = topic[1]
    for n in range(len(words)):
        print (cvmodel.vocabulary[words[n]] + ' ' + str(weights[n]))

In [None]:
with open ('topic_result.txt', 'w') as f:
    #Print the topics in the model
    topics = ldaModel.describeTopics(maxTermsPerTopic = 10)
    for x, topic in enumerate(topics):
        f.write('topic nr: ' + str(x)+ '\n')
        words = topic[0]
        weights = topic[1]
        for n in range(len(words)):
            f.write(cvmodel.vocabulary[words[n]] + ' ' + str(weights[n])+ '\n')

In [None]:
lda