#### Importing Necessary Config + Packages

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col, udf

from textblob import TextBlob

import sparknlp
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import Tokenizer, Normalizer, LemmatizerModel, StopWordsCleaner, ViveknSentimentApproach, SentimentDetector, SentenceDetector, Stemmer, Lemmatizer
from pyspark.ml import Pipeline

In [2]:
import os
os.environ["SPARK_JARS"] = "../../Urban-Research/venv-urban/lib/python3.7/site-packages/pyspark/jars"
# os.environ["SPARK_HOME"] = "../../Urban-Research/venv-urban/lib/python3.7/site-packages/pyspark"
os.environ["PYSPARK_PYTHON"]="/usr/local/bin/python3"
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/"
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
# os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
spark = sparknlp.start()

In [3]:
# os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk"
# os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

In [4]:
conf = SparkConf()
conf.setAppName("LDA_App")
conf.setMaster("local[4]")

<pyspark.conf.SparkConf at 0x12913aa50>

In [5]:
# os.environ["PATH"]

#### Building Spark Context

In [6]:
# sc = SparkContext(conf = conf)
sqlCtx = SQLContext(spark)

#### Reading in the Data

In [7]:
# input_file = "Tweets/streamed_05_06_2020_20_46_18.json.gz" #Single File
input_file = "TweetsSample" #Directory
twitter = sqlCtx.read.json(input_file)
twitter.registerTempTable("tweets") 

In [8]:
twitter = twitter.where(~col("id").isNull())

In [9]:
print("Number of Tweets:", twitter.count())

Number of Tweets: 53933


#### Line to show schema: twitter.printSchema()

In [10]:
# twitter.printSchema()

#### To see how Spark is configured and what tasks have been executed, visit the localhost port (here 4040) 

In [11]:
#Persist in Memory
# sqlCtx.cacheTable("tweets")
#Uncache in Memory
# sqlCtx.uncacheTable("tweets")

#### Culling to root-n Retweet Representation in the Dataset

In [12]:
import pyspark.sql.functions as f
from pyspark.sql.functions import mean as _mean, stddev as _stddev, to_date as _to_date, sqrt as _sqrt, col, lit, monotonically_increasing_id
from datetime import date

retweets = twitter.groupBy("retweeted_status.id").count().orderBy(col("count").desc())

from pyspark.sql import Window
w = Window.partitionBy('retweeted_status.id').orderBy('retweeted_status.id')

rt_count = twitter.withColumn('mono_id', f.row_number().over(w))
root_filt = rt_count.withColumn('rt_count', f.max('mono_id').over(w)).where((f.col('mono_id') <= f.ceil(f.sqrt('rt_count'))) | (f.col('retweeted_status.id').isNull()))

In [13]:
root_filt.select("retweeted_status.id", "mono_id").orderBy(col("retweeted_status.id").desc()).show(10)

+-------------------+-------+
|                 id|mono_id|
+-------------------+-------+
|1268754856501706752|      1|
|1268754776474488834|      1|
|1268754750612201472|      1|
|1268754724615974912|      1|
|1268754663983075329|      1|
|1268754654176968704|      1|
|1268754653392576513|      1|
|1268754644567719937|      1|
|1268754627123728391|      1|
|1268754618613260289|      1|
+-------------------+-------+
only showing top 10 rows



In [14]:
root_filt = root_filt.drop("rt_count").drop("mono_id")

In [15]:
root_filt.cache()
print("Number of Tweets Remaining: ", root_filt.count())

Number of Tweets Remaining:  29380


#### Removing Suspicious Users from Analysis

In [16]:
#Building Bounds
stats = twitter.select(
                f.mean(f.log10(col('user.followers_count') + 1)).alias('followers_mean'),
                f.stddev(f.log10(col('user.followers_count') + 1)).alias('followers_std'),
                f.mean(f.log10(col('user.friends_count') + 1)).alias('friends_mean'),
                f.stddev(f.log10(col('user.friends_count') + 1)).alias('friends_std'),
                f.mean(f.log10(col('user.statuses_count') + 1)).alias('statuses_mean'),
                f.stddev(f.log10(col('user.statuses_count') + 1)).alias('statuses_std')
                ).collect()

In [17]:
user_filt = twitter.where((f.log10(f.col('user.followers_count')) <= stats[0]['followers_mean'] + 3*stats[0]['followers_std']) & \
                        (f.log10(f.col('user.friends_count')) <= stats[0]['friends_mean'] + 3*stats[0]['friends_std']) & \
                        (f.log10(f.col('user.statuses_count')) <= stats[0]['statuses_mean'] + 3*stats[0]['statuses_std']) & \
                        (f.log10(f.col('user.followers_count')) >= stats[0]['followers_mean'] - 3*stats[0]['followers_std']) & \
                        (f.log10(f.col('user.friends_count')) >= stats[0]['friends_mean'] - 3*stats[0]['friends_std']) & \
                        (f.log10(f.col('user.statuses_count')) >= stats[0]['statuses_mean'] - 3*stats[0]['statuses_std']))

In [18]:
user_filt.cache()
print("Number of Tweets Remaining: ", user_filt.count())

Number of Tweets Remaining:  52723


#### Intersect of Both Datasets

In [19]:
#This is a very costly operation on large datasets (should only be used as neccesarry)
# urt_root = user_filt.intersect(root_filt) 

In [20]:
#In fact, you can just find the lesser of the two, and keep only values in column in present in the next column
if user_filt.count() > root_filt.count():
    urt_root = user_filt.join(root_filt,["id"], how='leftsemi')
else:
    urt_root = root_filt.join(user_filt,["id"], how='leftsemi')

In [21]:
print("Number of Tweets Remaining: ", urt_root.count())

Number of Tweets Remaining:  28474


In [22]:
#Clear Cache
user_filt.unpersist()
root_filt.unpersist()
print("unpersist")
#Persist New Data
urt_root.cache()
print("persist")

unpersist
persist


#### Beginning Natural Language Processing Task

In [23]:
from sparknlp.annotator import ViveknSentimentApproach, SentimentDetector, SentenceDetector, Stemmer, Lemmatizer, NGramGenerator, TextMatcher
from textblob import TextBlob
from pyspark.sql.functions import udf
import pyspark.sql.functions as f

In [24]:
#We need to get full_text or text or the original RT Text depending on the type
text_ready = urt_root.withColumn('full_tweet_text', f.when(~f.col('retweeted_status.extended_tweet.full_text').isNull(), f.col('retweeted_status.extended_tweet.full_text')) \
                                                .when(~f.col('retweeted_status.text').isNull(), f.col('retweeted_status.text')) \
                                                .when(~f.col('extended_tweet.full_text').isNull(), f.col('extended_tweet.full_text')) \
                                                .otherwise(f.col('text')))

In [25]:
# !wget https://raw.githubusercontent.com/mahavivo/vocabulary/master/lemmas/AntBNC_lemmas_ver_001.txt

In [26]:
documentAssembler = DocumentAssembler() \
     .setInputCol('full_tweet_text') \
     .setOutputCol('document') \
     .setCleanupMode('shrink')

sentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("token")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)
     #.setStopWords(["no", "without"]) (e.g. read a list of words from a txt)

normalizer = Normalizer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("normalized")\
    .setLowercase(True)\
    .setCleanupPatterns(["[^\w\d\s]"]) # remove punctuations (keep alphanumeric chars)
    # if we don't set CleanupPatterns, it will only keep alphabet letters ([^A-Za-z])
    
stemmer = Stemmer() \
    .setInputCols(["normalized"]) \
    .setOutputCol("stem")

lemmatizer = Lemmatizer() \
    .setInputCols(["normalized"]) \
    .setOutputCol("lemma") \
    .setDictionary("AntBNC_lemmas_ver_001.txt", value_delimiter ="\t", key_delimiter = "->")

bigram = NGramGenerator() \
    .setInputCols(['lemma']) \
    .setOutputCol("ngram_2") \
    .setN(2) \
    .setEnableCumulative(False) \
    .setDelimiter(" ")

trigram = NGramGenerator() \
    .setInputCols(['lemma']) \
    .setOutputCol("ngram_3") \
    .setN(3) \
    .setEnableCumulative(False) \
    .setDelimiter(" ")

quadgram = NGramGenerator() \
    .setInputCols(['lemma']) \
    .setOutputCol("ngram_4") \
    .setN(4) \
    .setEnableCumulative(False) \
    .setDelimiter(" ")
    
nlpPipeline = Pipeline(stages=[
 documentAssembler, 
 tokenizer,
 stopwords_cleaner,
 normalizer,
 lemmatizer,
 bigram,
 trigram,
 quadgram
 ])

In [27]:
doc_df = documentAssembler.transform(text_ready)
# token_df = tokenizer.fit(doc_df).transform(doc_df)
# token_df.select('token.result').show(3, truncate=False)

In [28]:
# empty_df = spark.createDataFrame([['']]).toDF("full_tweet_text")

pipelineModel = nlpPipeline.fit(doc_df)

In [29]:
result = pipelineModel.transform(doc_df)

In [30]:
result = result.withColumn("bigrams", f.col("ngram_2.result"))
result = result.withColumn("trigrams", f.col("ngram_3.result"))
result = result.withColumn("quadgrams", f.col("ngram_4.result"))

##### We need to write code for the n-gram detection
##### I say we calculate the co-occurence matrix between terms, and take the top 20% of those terms as the co-occurence dictionary

In [31]:
from pyspark.ml.feature import CountVectorizer

In [32]:
#Step 1 is to (in one pass-through), create a vector 
# (Count Vectorizer of N-grams, taking n-grams that only occur above a certain threshold)
bigram_cv = CountVectorizer(inputCol="bigrams", outputCol="bigram_feature", minDF = 0.0008, binary = True)
trigram_cv = CountVectorizer(inputCol="trigrams", outputCol="trigram_feature", minDF = 0.0008, binary = True)
quadgram_cv = CountVectorizer(inputCol="quadgrams", outputCol="quadgram_feature", minDF = 0.0008, binary = True)

In [33]:
#Step 2 is to count co-occurence + populate the matrix
bigram_cv_mod = bigram_cv.fit(result)
trigram_cv_mod = trigram_cv.fit(result)
quadgram_cv_mod = quadgram_cv.fit(result)

In [34]:
bigram_vectors = bigram_cv_mod.transform(result)
trigram_vectors = trigram_cv_mod.transform(result)
quadgram_vectors = quadgram_cv_mod.transform(result)

In [35]:
#Hyperparameters
# print("Vocab Size: ", ngram_cv_mod.getVocabSize())
# print("Max Ngram Count: ", ngram_cv_mod.getMaxDF())
# print("Ngram: ", ngram_cv_mod.getMinDF())

In [36]:
#These are great functions to call if you're confused about the functionality of certain parameters
# ngram_cv_mod.explainParams()
# ngram_cv_mod.extractParamMap()
print(bigram_cv_mod.vocabulary[-30:])
print(trigram_cv_mod.vocabulary[0:30])
print(quadgram_cv_mod.vocabulary[0:30])
print("Number of Bigrams: " ,len(bigram_cv_mod.vocabulary))
print("Number of Trigrams: " ,len(trigram_cv_mod.vocabulary))
print("Number of Quadgrams: " ,len(quadgram_cv_mod.vocabulary))

['racist white', 'pay attention', 'crack skull', 'one black', '8 pm', 'protestor one', 'serious stable', 'say defund', 'day george', 'medic help', 'animal cross', 'many police', 'keep go', 'come back', 'head hit', 'fun fact', 'cop riot', 'floyd sign', 'fuck happen', 'stop kill', 'plan parenthood', 'take time', 'stay home', '8 year', 'trump campaign', 'bill de', 'first place', 'cop buffalo', 'past curfew', 'completely peaceful']
['black life matter', 'buffalo police department', 'police department fire', 'department fire police', 'office aaron torgalski', 'police office aaron', 'fire police office', 'aaron torgalski sign', 'torgalski sign petition', 'suspend without pay', 'new york city', 'year old girl', 'protest police brutality', 'shove elderly man', 'death george floyd', '10 year old', 'name jared campbell', 'officer suspend without', 'badge number 8470', 'campbell badge number', 'maced 10 year', 'jared campbell badge', 'report name jared', 'number 8470 spread', '8470 spread info', 

In [37]:
large_list = quadgram_cv_mod.vocabulary + trigram_cv_mod.vocabulary + bigram_cv_mod.vocabulary

In [38]:
#Outputting bigrams, trigrams, and quadrgams
# with open('bigrams.txt', 'w') as fbi:
#     for item in bigram_cv_mod.vocabulary:
#         fbi.write("%s\n" % item)

# with open('trigrams.txt', 'w') as ftri:
#     for item in trigram_cv_mod.vocabulary:
#         ftri.write("%s\n" % item)
        
# with open('quadgrams.txt', 'w') as fquad:
#     for item in quadgram_cv_mod.vocabulary:
#         fquad.write("%s\n" % item)

In [39]:
# Write function to compare join vocabulary
# First build list of n-grams from the n-gram to compare with others
# Sample Data Structure
# [((0), ['police_act']), ((1), ['justice_emerald']), ..., ((10), ['dc_protest'])]
# [((0,1), ['police_act', 'justice_emerald']), ((1, 2), ['justice_emerald', 'life_take']), ...]
# in order to build this, you need to remember that 
# Orrrr....you just go backwards - you replace backwards...s.t. the replacements will be done without worry...
# Clearly the superior solution ^

In [None]:
with open('ngrams.txt', 'w') as fout:
    for item in pre_ngrams:
        fout.write("%s\n" % item)

In [None]:
from NgramReplacementDuhhhhhhhhh import NGramReplacer
from sparknlp.base import TokenAssembler

In [40]:
documentAssembler = DocumentAssembler() \
     .setInputCol('full_tweet_text') \
     .setOutputCol('document') \
     .setCleanupMode('shrink')

sentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("token")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)
     #.setStopWords(["no", "without"]) (e.g. read a list of words from a txt)

normalizer = Normalizer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("normalized")\
    .setLowercase(True)\
    .setCleanupPatterns(["[^\w\d\s]"]) # remove punctuations (keep alphanumeric chars)
    # if we don't set CleanupPatterns, it will only keep alphabet letters ([^A-Za-z])
    
stemmer = Stemmer() \
    .setInputCols(["normalized"]) \
    .setOutputCol("stem")

lemmatizer = Lemmatizer() \
    .setInputCols(["normalized"]) \
    .setOutputCol("lemma") \
    .setDictionary("AntBNC_lemmas_ver_001.txt", value_delimiter ="\t", key_delimiter = "->")

finisher = Finisher() \
    .setInputCols(["lemma"]) \
    .setOutputCols("finished") \
    .setCleanAnnotations(True) \
    .setIncludeMetadata(False)
    
nlpPipeline = Pipeline(stages=[
 documentAssembler, 
 tokenizer,
 stopwords_cleaner,
 normalizer,
 lemmatizer,
 finisher
 ])

In [41]:
doc_df = documentAssembler.transform(text_ready)

In [42]:
pipelineModel = nlpPipeline.fit(doc_df)

In [43]:
result = pipelineModel.transform(doc_df)

In [44]:
result_fin = result.withColumn("final", f.concat_ws(" ", "finished"))

In [117]:
documentAssembler = DocumentAssembler() \
     .setInputCol('final') \
     .setOutputCol('document') \
     .setCleanupMode('shrink')

tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token") \
    .setExceptions(large_list)

# finisher = Finisher() \
#     .setInputCols(["token"]) \
#     .setOutputCols("finished") \
#     .setCleanAnnotations(True) \
#     .setIncludeMetadata(False)

nlpPipeline = Pipeline(stages = [
    documentAssembler,
    tokenizer
#     finisher
])

In [118]:
pipelineModelNgram = nlpPipeline.fit(result_fin)

In [119]:
result_ngram = pipelineModelNgram.transform(result_fin)

In [120]:
# result_ngram.select("matched_ngrams.result", "token.result").show(100, truncate = 50)
lda_df = result_ngram.withColumn("lda_input", f.col("token.result"))
lda_df.select("lda_input").show(20, truncate = 50)

+--------------------------------------------------+
|                                         lda_input|
+--------------------------------------------------+
|[juggernautbg, funny, part, clip, every single,...|
|[seattle, lift, curfew, cop havent, tear gass, ...|
|                   [twitter, unite, hurt, elderly]|
|[fact, month ago, many, centrist, dem, push, gu...|
|                                     [hear, story]|
|[cop dont need well, pay, teacher, healthcare, ...|
|[even, individual, one, make, extremely, racist...|
|[usually, save, fuck, guy, segment, podcast, of...|
|[it, personal, mission, white, minnesotans, ame...|
|[shock, video, lapd officer see, strike, protes...|
|[281, nyc, notice, tactic, kettle, protestor, 8...|
|[president, aka, chicken, little, literally, wa...|
|[many video police, violently, attack, peaceful...|
|[white house, look like, prison, httpstcofyoc4t...|
|[see, speechless, get, everyone know right poli...|
|[protestor, yell, police, you, ancestor, prou

In [121]:
# result_fin = result_ngram.withColumn("final", f.concat_ws(" ", "token.result"))

### Code for Sentiment Analysis

In [122]:
# import pyspark.sql.types as Types

# def get_sent(x):
#     import textblob
#     res = textblob.TextBlob(x).sentiment[0]
#     return res

# sentiment = f.udf(lambda x : get_sent(x), Types.DoubleType())

# tweets = result_fin.withColumn('sentiment',sentiment(f.col("final")))

# TextBlob("My name is alexander and I'm very hungry").sentiment[0]

In [123]:
#Try with small sample
# result_sample = result_fin.sample(False, 0.01)
# tweets = result_sample.withColumn('sentiment',sentiment(f.col("final")))

In [124]:
##Function to get dtype
# def get_dtype(df,colname):
#     return [dtype for name, dtype in df.dtypes if name == colname][0]

# get_dtype(result_fin,'final')

#### Code for Topic Modeling

In [125]:
#What's left? Attach LDA to the end (There is some preprocessing work left but for now, this will work to write)
#Operating on subset of actual data
lda_sample = lda_df.sample(False, 0.25)

In [126]:
print("Sample Size: ", lda_sample.count())

Sample Size:  7074


In [127]:
#Persist
lda_sample.cache()
print()




In [136]:
#Getting Vector Representation of the dataset
from pyspark.ml.feature import CountVectorizer
# Optional Parameter:  vocabSize = 10000
lda_vect = CountVectorizer(inputCol="lda_input", outputCol="features", maxDF = 0.5, minDF = 0.0008, binary = True)

In [143]:
# lda_vect_mod = lda_vect.fit(lda_sample)

In [138]:
print("Vocab Size: ", len(lda_vect_mod.vocabulary))

Vocab Size:  2655


In [145]:
# lda_vect_mod.save("lda_vectorizer_model")
type(lda_vect_mod)

pyspark.ml.feature.CountVectorizerModel

In [149]:
# lda_vect_mod = CountVectorizer.load("lda_vectorizer_model")
lda_vect_mod = lda_vect_mod.setInputCol("lda_input")
lda_vect_mod = lda_vect_mod.setOutputCol("features")

In [150]:
lda_features = lda_vect_mod.transform(lda_sample)

In [152]:
# lda_features.printSchema()

In [153]:
from pyspark.ml.clustering import LDA

In [154]:
lda = LDA(k = 40, optimizer = "online")
lda.setMaxIter(100)

LDA_177d0501b9ef

In [155]:
lda_model = lda.fit(lda_features)

In [156]:
ll = lda_model.logLikelihood(lda_features)
lp = lda_model.logPerplexity(lda_features)

In [157]:
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -541590.1134235631
The upper bound on perplexity: 8.521329097087072


In [158]:
topics = lda_model.describeTopics(3)

In [160]:
topics.show(truncate = False)

+-----+------------------+-------------------------------------------------------------------+
|topic|termIndices       |termWeights                                                        |
+-----+------------------+-------------------------------------------------------------------+
|0    |[1571, 1890, 1112]|[0.007253373679139064, 5.028304972047716E-4, 4.7570577365756443E-4]|
|1    |[744, 2441, 2550] |[0.03221213654352879, 0.0224194382642429, 0.019918551127648416]    |
|2    |[3, 42, 11]       |[0.0344381894702296, 0.011558164734669207, 0.010580260573407052]   |
|3    |[483, 147, 951]   |[0.029878000482313927, 0.027850941489825736, 0.025493516560262445] |
|4    |[89, 357, 1012]   |[0.0334416545991758, 0.03146388405278239, 0.02693706636790752]     |
|5    |[113, 73, 276]    |[0.039375281987895465, 0.0369442358746812, 0.022985886225573945]   |
|6    |[1455, 826, 52]   |[0.028197715863123397, 0.025665659429518744, 0.023303554586465786] |
|7    |[2216, 159, 484]  |[6.238828173157206E-4, 5

In [161]:
transformed = lda_model.transform(lda_features)

In [175]:
# transformed.select("topicDistribution").show(truncate=True)
# transformed.show()

In [172]:
print("Distributed: ", lda_model.isDistributed())
print("Vocab Size: ", lda_model.vocabSize())

Distributed:  False
Vocab Size:  2655


In [174]:
lda_model.describeTopics().show()

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[1571, 1890, 1112...|[0.00725337367913...|
|    1|[744, 2441, 2550,...|[0.03221213654352...|
|    2|[3, 42, 11, 7, 0,...|[0.03443818947022...|
|    3|[483, 147, 951, 1...|[0.02987800048231...|
|    4|[89, 357, 1012, 1...|[0.03344165459917...|
|    5|[113, 73, 276, 18...|[0.03937528198789...|
|    6|[1455, 826, 52, 6...|[0.02819771586312...|
|    7|[2216, 159, 484, ...|[6.23882817315720...|
|    8|[468, 707, 519, 4...|[0.03660616378153...|
|    9|[907, 890, 493, 1...|[0.04733648548689...|
|   10|[315, 1385, 1493,...|[0.05992097395831...|
|   11|[247, 94, 1099, 1...|[0.02948385005780...|
|   12|[141, 976, 49, 19...|[9.08997182958109...|
|   13|[1635, 875, 637, ...|[0.03387205293652...|
|   14|[2598, 1709, 733,...|[0.02413715365103...|
|   15|[685, 374, 2636, ...|[0.01873666704853...|
|   16|[887, 205, 170, 1...|[0.05123778656875...|


In [199]:
wordNumbers = 5  
topicIndices = lda_model.describeTopics(maxTermsPerTopic = wordNumbers)
vocabArray = lda_vect_mod.vocabulary

In [200]:
topicIndices = topicIndices.select("termIndices").collect()

In [204]:
print(topic_render(topicIndices[0]))

delhi
cop bad
cost
btw
office
['delhi', 'cop bad', 'cost', 'btw', 'office']


In [209]:
def topic_render(topic):
    terms = topic[0]
    result = []
    for i in range(wordNumbers):
        term = vocabArray[terms[i]]
        result.append(term)
    return result

In [214]:
def print_topics(topicIndices):
    for topic in range(len(topicIndices)):
        print ("Topic " + str(topic) + ":")
        print(topic_render(topicIndices[topic]))
        print()

In [216]:
# print_topics(topicIndices)

In [None]:
#Done with First Attempt at Apache Spark LDA Pipeline
#Let's make a work schedule