In [None]:
# load libraries, declaration of global variables
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import DataFrameWriter
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import StringIndexer
import pandas as pd

# name of the database (schema) that the Spark SQL query on twitter_translations reads from
db = "sm"

In [None]:
# read the tweet ids and their translated text from the configured database
# into a Spark DataFrame and preview the first rows
translations_query = "SELECT status_id, translatedText AS text FROM " + db + ".twitter_translations"
df = spark.sql(translations_query)
df.show()

In [None]:
# tokenize the "text" column into a "words" column:
# lowercase the text, use the \W pattern for splitting, keep tokens of length >= 4
regexTokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="words",
    pattern="\\W",
    minTokenLength=4,
    toLowercase=True
)
df = regexTokenizer.transform(df)

In [None]:
# remove stop words from the tokenized text, then discard rows whose text is empty
#
# The list is built from two parts that are concatenated in their original order:
#   * generic English function words
#   * a handful of extra terms (they look corpus-specific — TODO confirm why each was added)
# NOTE(review): the tokenizer is configured with minTokenLength=4, so entries shorter
# than four characters are presumably redundant here — confirm before pruning.
english_stopwords = [
    "i", "me", "my", "myself", "we", "our", "ours", "ourselves",
    "you", "your", "yours", "yourself", "yourselves",
    "he", "him", "his", "himself", "she", "her", "hers", "herself",
    "it", "its", "itself", "they", "them", "their", "theirs", "themselves",
    "what", "which", "who", "whom", "this", "that", "these", "those",
    "am", "is", "are", "was", "were", "be", "been", "being",
    "have", "has", "had", "having", "do", "does", "did", "doing",
    "a", "an", "the", "and", "but", "if", "or", "because", "as", "until", "while",
    "of", "at", "by", "for", "with", "about", "against", "between", "into",
    "through", "during", "before", "after", "above", "below",
    "to", "from", "up", "down", "in", "out", "on", "off", "over", "under",
    "again", "further", "then", "once", "here", "there", "when", "where", "why", "how",
    "all", "any", "both", "each", "few", "more", "most", "other", "some", "such",
    "no", "nor", "not", "only", "own", "same", "so", "than", "too", "very",
    "s", "t", "can", "will", "just", "don", "should", "now",
]
extra_stopwords = ["like", "would", "find", "part", "href", "bock", "poland", "people"]
stopwordList = english_stopwords + extra_stopwords

remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=stopwordList)
df = remover.transform(df).filter("text <> ''")
df.show()

In [None]:
# turn the filtered tokens of every tweet/comment into a term-count vector
# (vocabulary limited to 10000 terms, minDF threshold of 5)
cv = CountVectorizer(
    inputCol="filtered",
    outputCol="features",
    vocabSize=10000,
    minDF=5.0
)
model = cv.fit(df)

# keep the fitted vocabulary locally in pandas; its row index is used by the
# LDA results cell to translate term indices back into words
vocabulary_local = pd.DataFrame(model.vocabulary, columns = ["word"])
vocabulary_local.head()

# attach the count vectors and keep only the columns still needed afterwards
df = model.transform(df).drop('text', 'words', 'filtered')
df.show()

In [None]:
# discover topics with LDA (EM optimizer, fixed seed) and score every document
# NOTE(review): the original author observed different results between runs even
# though the seed is set; the cause is not established in this notebook — TODO investigate.
num_topics = 7
lda = LDA(
    k=num_topics,
    seed=1,
    optimizer="em"
)
model = lda.fit(df)

# per-document topic distribution, plus a preview of topics and scored rows
modelres = model.transform(df)
model.describeTopics().show()
modelres.show()

In [None]:
# LDA results: replace the numeric term indices of every topic by the actual words
pd_topics = model.describeTopics().toPandas()

# lookup table: term index (row position in the vocabulary frame) -> word
df_terms = vocabulary_local.reset_index()
translate = df_terms.set_index('index')['word'].to_dict()

# one list of words per topic, in the same order as the topic's term indices
topic_words = [
    [translate[term_index] for term_index in term_indices]
    for term_indices in pd_topics['termIndices']
]

# work on a real copy: a plain assignment would only alias pd_topics, so writing
# the words into the "backup" frame would silently overwrite the numeric indices
# in pd_topics as well
pd_topics_bu = pd_topics.copy()
pd_topics_bu['termIndices'] = topic_words
pd_topics_bu

In [None]:
# top ten influential words for each topic, one list per topic
for i in range(num_topics):
    print(pd_topics_bu['termIndices'][i])
    # blank separator line; a bare `print` is only an expression in Python 3 and
    # prints nothing, while print("") emits an empty line on Python 2 and 3 alike
    print("")

In [None]:
# re-score data (assign the topic with the highest probability to each tweet/comment)
modelres = model.transform(df).toPandas()

# index of the largest entry of each topic distribution (first one wins on ties)
max_index = [int(dist.toArray().argmax()) for dist in modelres['topicDistribution']]
modelres['topicindex'] = max_index

# human-readable topic labels
# NOTE(review): the LDA cell sets num_topics = 7 but only indices 0-4 are labelled here,
# so documents assigned to topics 5 and 6 keep their numeric index — confirm whether
# labels are missing or num_topics should be 5.
topicname = {0:'poland_anti_ukraine',1:'forrest_needs',2:'polish_government_support',3:'poland_jews',4:'forrest_support'}

# assign the result back instead of calling replace(..., inplace=True) on the column
# selection: the in-place call acts on an intermediate object and is not guaranteed to
# update modelres (it never does under pandas copy-on-write)
modelres['topicindex'] = modelres['topicindex'].replace(topicname)

In [None]:
# combine the topic assignments with the sentiment/reaction data and
# compute the mean of each reaction column per topic
# (the database name comes from `db`, like the translations query, instead of being hard-coded)
df_sentiments = spark.sql("SELECT * FROM " + db + ".twitter_sentiment_fb").toPandas()

# inner join: only status_ids present in both frames are kept
trada = modelres.merge(df_sentiments, how='inner', on='status_id')

# select the columns with a list; indexing a groupby with a bare tuple of
# column names was deprecated and no longer works in pandas 2.x
trada.groupby("topicindex")[['love', 'angry', 'haha', 'wow', 'sad']].mean()