### This is based on: https://github.com/maobedkova/TopicModelling_PySpark_SparkNLP

In [20]:
%config Completer.use_jedi = False
# https://stackoverflow.com/questions/40536560/ipython-and-jupyter-autocomplete-not-working
%load_ext autoreload
%autoreload 1
%aimport lda_pipeline

import sparknlp
import pyspark.sql.functions as F

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [21]:
data_path = "reddit_wsb.csv"

In [22]:
spark = sparknlp.start()

 In order to set multiLine=True, I had to update my system so that Spark would use Java 8, not Java 11. Even then, the `body` column contains commas inside quoted fields that themselves contain quotes, and this confused the CSV parser. Solved following https://stackoverflow.com/questions/40413526/reading-csv-files-with-quoted-fields-containing-embedded-commas. On Arch, it was enough to run "sudo archlinux-java set java-8-openjdk/" after installing jdk8-openjdk.

In [33]:
# Read the CSV with Spark directly. Going through pandas instead
# (df = spark.createDataFrame(df_pd)) produced:
# >> WARN  TaskSetManager:66 - Stage 2 contains a task of very large size
# >> (1473 KB). The maximum recommended task size is 100 KB.

# multiLine plus identical quote/escape characters are needed for quoted
# fields that contain embedded newlines, commas and quotes.
df = spark.read.csv(
    data_path,
    header=True,
    multiLine=True,
    quote='"',
    escape='"',
)

# Work on a reproducible 5% sample of the rows.
df = df.sample(withReplacement=False, fraction=0.05, seed=1)
print(df)
# print(f'{df.where(df["timestamp"].isNull()).count()} null timestamp values.')

# Merge title and body into a single "text" column, then drop the columns
# that are not needed downstream.
text_column = F.concat_ws(". ", df["title"], df["body"])
df = df.withColumn("text", text_column)
df = df.drop("title", "body", "url", "comms_num", "created")

texts = df.select("text")

DataFrame[title: string, score: string, id: string, url: string, comms_num: string, created: string, body: string, timestamp: string]


In [None]:
# Build the preprocessing pipeline from the local lda_pipeline module, fit it
# on the text column and apply the fitted model to the same data.
pipeline = lda_pipeline.build_pipeline()
pipeline_model = pipeline.fit(texts)
processed_texts = pipeline_model.transform(texts)
print(processed_texts)

# for fair comparison with SpaCy below, should build pandas dataframe.
# will throw TaskSetManager:66 - Stage 4 contains a task of very large size
# df_post = processed_texts.toPandas()  
# df_post

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ]

In [32]:
# Show the stop words in sorted order. sorted() builds a new list, so the
# module-level lda_pipeline.STOP_WORDS object is no longer reordered in place
# as a side effect of merely inspecting it.
L = sorted(lda_pipeline.STOP_WORDS)
print(L)

["'d", "'ll", "'m", "'re", "'s", "'ve", 'a', 'about', 'above', 'across', 'after', 'afterwards', 'again', 'against', 'all', 'almost', 'alone', 'along', 'already', 'also', 'although', 'always', 'am', 'among', 'amongst', 'amount', 'an', 'and', 'another', 'any', 'anyhow', 'anyone', 'anything', 'anyway', 'anywhere', 'are', 'around', 'as', 'at', 'back', 'be', 'became', 'because', 'become', 'becomes', 'becoming', 'been', 'before', 'beforehand', 'behind', 'being', 'below', 'beside', 'besides', 'between', 'beyond', 'both', 'bottom', 'but', 'by', 'ca', 'call', 'can', 'cannot', 'could', 'did', 'do', 'does', 'doing', 'done', 'down', 'due', 'during', 'each', 'eight', 'either', 'eleven', 'else', 'elsewhere', 'empty', 'enough', 'even', 'ever', 'every', 'everyone', 'everything', 'everywhere', 'except', 'few', 'fifteen', 'fifty', 'first', 'five', 'for', 'former', 'formerly', 'forty', 'four', 'from', 'front', 'full', 'further', 'get', 'give', 'go', 'had', 'has', 'have', 'he', 'hence', 'her', 'here', 'he

Some things to deal with:
   - ✓ long urls
   - repeated characters as in "holdddddd" and "woooooo"

In [29]:
processed_texts.select(["finished_unigrams"]).show(100, truncate=150)

+------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                     finished_unigrams|
+------------------------------------------------------------------------------------------------------------------------------------------------------+
|[exit, system, ceo, nasdaq, push, halt, trade, investor, chance, recalibrate, position, sec, investigate, broker, disallow, buy, institution, flat,...|
|                                                                                                                                    [wasnt, meme, gme]|
|                                                                                                                            [yall, break, fix, advice]|
|                                                                                 

# Topic modelling

In [13]:
%%time

from pyspark.ml.feature import CountVectorizer

# Term-frequency step: learn a vocabulary from the cleaned unigrams and turn
# every document into a count vector stored in "tf_features".
# tf_model is kept because its vocabulary is needed later to decode topics.
tfizer = CountVectorizer(
    inputCol='finished_unigrams',
    outputCol='tf_features',
)
tf_model = tfizer.fit(processed_texts)
tf_result = tf_model.transform(processed_texts)

CPU times: user 10.5 ms, sys: 3.46 ms, total: 14 ms
Wall time: 2.77 s


In [14]:
%%time
from pyspark.ml.feature import IDF

# Inverse-document-frequency step: fit on the count vectors and write the
# re-weighted vectors to "tf_idf_features".
idfizer = IDF(
    inputCol='tf_features',
    outputCol='tf_idf_features',
)
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

CPU times: user 10.6 ms, sys: 2.28 ms, total: 12.8 ms
Wall time: 2.66 s


In [15]:
%%time
from pyspark.ml.clustering import LDA

# LDA hyper-parameters.
num_topics = 5
max_iter = 10
# Fixed seed so that re-running the notebook yields the same topics; without
# it every fit starts from a different random initialisation.
lda_seed = 1

# NOTE(review): the model is fed the TF-IDF vectors rather than the raw count
# vectors in "tf_features" - confirm this is intended.
lda = LDA(k=num_topics,
          maxIter=max_iter,
          seed=lda_seed,
          featuresCol="tf_idf_features")

CPU times: user 2.46 ms, sys: 431 µs, total: 2.89 ms
Wall time: 20.7 ms


In [16]:
%%time
lda_model = lda.fit(tfidf_result)

CPU times: user 9.28 ms, sys: 9.54 ms, total: 18.8 ms
Wall time: 18.4 s


In [17]:
from pyspark.sql import types as T
vocab = tf_model.vocabulary
def get_words(token_list):
    """Translate vocabulary indices into the corresponding vocabulary terms.

    Args:
        token_list: iterable of integer positions into the module-level
            ``vocab`` sequence.

    Returns:
        A list with ``vocab[token_id]`` for every id, in input order.
    """
    words = []
    for token_id in token_list:
        words.append(vocab[token_id])
    return words
udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [18]:
num_top_words = 5

# describeTopics returns term indices per topic; add a column that maps those
# indices back to words through the vocabulary-lookup UDF, then display it.
topic_terms = lda_model.describeTopics(num_top_words)
topic_words = udf_to_words(F.col('termIndices'))
topics = topic_terms.withColumn('topicWords', topic_words)
topics.select('topic', 'topicWords').show(truncate=150)

+-----+--------------------------------------------------------+
|topic|                                              topicWords|
+-----+--------------------------------------------------------+
|    0|                             [buy, gme, short, hold, xb]|
|    1|                           [', stock, market, it's, gon]|
|    2|[clearinghouse, webull, portfolio, schwab, organization]|
|    3|                      [security, margin, gon, uh, cheap]|
|    4|                         [fund, hedge, money, let, play]|
+-----+--------------------------------------------------------+



## Compare pipeline time usage with spaCy

In [1]:
%config Completer.use_jedi = False
data_path = "reddit_wsb.csv"

from typing import List, Dict, Union
from spacy.tokens import Doc, Token
from spacy.matcher import Matcher

class FilterTextPreprocessing:
    """Pipeline component that stores a filtered bag of lemmas on each doc.

    Tokens matched by the stop-word or punctuation pattern get their custom
    ``_.keep`` flag set to False; the lemmas of all remaining tokens are
    written to the custom ``_.bow`` attribute of the doc.
    """

    def __init__(self, nlp):
        """Register the custom attributes and set up the matcher.

        Args:
            nlp: language object whose ``vocab`` is handed to ``Matcher``.
        """
        # force=True is passed so that re-registering the same extension
        # name (e.g. when the cell is executed again) is accepted.
        Doc.set_extension('bow', default=[], force=True)
        Token.set_extension('keep', default=True, force=True)

        self.matcher = Matcher(nlp.vocab)

        # (rule name, list of token patterns) pairs; each pattern matches a
        # single token.
        rules = (
            ("stop_word", [[{"IS_STOP": True}]]),
            ("punctuation", [[{"IS_PUNCT": True}]]),
        )
        for rule_name, token_patterns in rules:
            self.matcher.add(rule_name, token_patterns, on_match=self.on_match)

    def on_match(self, matcher, doc, i, matches):
        """Matcher callback: flag every token of match ``i`` as not kept.

        Args:
            matcher: the matcher that fired (unused).
            doc: the document being matched.
            i: index of the current match inside ``matches``.
            matches: sequence of ``(match_id, start, end)`` triples.
        """
        _match_id, first, last = matches[i]
        for position in range(first, last):
            doc[position]._.keep = False

    def __call__(self, doc):
        """Run the matcher on ``doc``, fill ``doc._.bow`` and return ``doc``."""
        self.matcher(doc)
        kept_lemmas = []
        for token in doc:
            if token._.keep:
                kept_lemmas.append(token.lemma_)
        doc._.bow = kept_lemmas
        return doc
      
#     @classmethod
#     def from_pattern_file(cls, nlp, path) :
#         patterns = read_json(path)
#         return cls(nlp, patterns)

import spacy
from spacy.lang.en import English

nlp = spacy.load("en_core_web_sm")

@English.factory("preprocessor")
def create_preprocessor(nlp, name):
    """Factory registered as "preprocessor" on ``English``.

    Args:
        nlp: language object forwarded to ``FilterTextPreprocessing``.
        name: component name supplied by the caller (unused here).

    Returns:
        A new ``FilterTextPreprocessing`` instance.
    """
    component = FilterTextPreprocessing(nlp)
    return component

# nlp.select_pipes(enable=["tagger", "attribute_ruler", "lemmatizer"])
nlp.add_pipe("preprocessor", last=True)
nlp.pipeline

[('tok2vec', <spacy.pipeline.tok2vec.Tok2Vec at 0x7f58d2553d10>),
 ('tagger', <spacy.pipeline.tagger.Tagger at 0x7f58d2569590>),
 ('parser', <spacy.pipeline.dep_parser.DependencyParser at 0x7f58d2830c20>),
 ('ner', <spacy.pipeline.ner.EntityRecognizer at 0x7f58d2830d70>),
 ('attribute_ruler',
  <spacy.pipeline.attributeruler.AttributeRuler at 0x7f58d24b7cd0>),
 ('lemmatizer',
  <spacy.lang.en.lemmatizer.EnglishLemmatizer at 0x7f58d24c6e60>),
 ('preprocessor', <__main__.FilterTextPreprocessing at 0x7f58d27acdd0>)]

In [27]:
%%time
import csv
import pandas as pd

def process(filename, encoding="utf-8"):
    """Lazily run the module-level ``nlp`` pipeline over every row of a CSV.

    Args:
        filename: path of a CSV file with a header row containing at least
            the columns "title" and "body".
        encoding: text encoding of the file. Set explicitly so that decoding
            does not depend on the platform's locale default.

    Yields:
        ``nlp(text)`` for each row, where ``text`` is the title and the body
        joined by a single space.
    """
    # newline="" is what the csv module documentation asks for: it lets the
    # reader handle line endings itself, so CR/LF sequences embedded in quoted
    # fields are passed through unchanged.
    with open(filename, "r", newline="", encoding=encoding) as fobj:
        datareader = csv.DictReader(fobj)
        for row in datareader:
            # DictReader fills missing trailing fields with None; treat those
            # as empty strings instead of letting str.join raise.
            title = row.get("title") or ""
            body = row.get("body") or ""
            text = " ".join([title, body])
            yield nlp(text)
            
gen = process(data_path)

# Drain the generator, collecting the bag of words of every processed doc and
# reporting progress after every 1000 documents.
words = []
i = 0
for i, doc in enumerate(gen, start=1):
    words.append(doc._.bow)
    if i % 1000 == 0:
        print(f"i = {i}")

words = pd.Series(words)

i = 1000
i = 2000
i = 3000
i = 4000
i = 5000
i = 6000
i = 7000
i = 8000
i = 9000
i = 10000
i = 11000
i = 12000
i = 13000
i = 14000
i = 15000
i = 16000
i = 17000
i = 18000
i = 19000
i = 20000
i = 21000
i = 22000
i = 23000
i = 24000
i = 25000
CPU times: user 6min 7s, sys: 2.59 s, total: 6min 10s
Wall time: 6min 11s


In [28]:
words

0                          [money, send, message, 🚀, 💎, 🙌]
1        [Math, Professor, Scott, Steiner, say, number,...
2        [exit, system, CEO, NASDAQ, push, halt, tradin...
3             [new, SEC, filing, GME, retarded, interpret]
4              [distract, GME, think, AMC, brother, aware]
                               ...                        
25642                                               [sign]
25643                                 [hold, GME, 🚀, 🚀, 🚀]
25644                    [AMC, Yolo, Update, Feb, 3, 2021]
25645                                         [loss, sell]
25646     [post, curiosity, teem, know, store, 👀, 💎, 🖐, 🚀]
Length: 25647, dtype: object

In [42]:
# NOTE(review): `df_post` is not assigned in any active cell visible here; the
# only assignment is the commented-out `df_post = processed_texts.toPandas()`
# in the pipeline cell, so this cell fails on a fresh kernel - confirm.
df_post.finished_unigrams

0                                   [money, send, message]
1        [math, professor, scott, steiner, number, spel...
2        [exit, system, ceo, nasdaq, push, halt, trade,...
3               [new, sec, file, gme, retarded, interpret]
4              [distract, gme, think, amc, brother, aware]
                               ...                        
25642                                               [sign]
25643                                          [hold, gme]
25644                             [amc, yolo, update, feb]
25645                                         [loss, sell]
25646           [dont, post, curiosity, teem, know, store]
Name: finished_unigrams, Length: 25647, dtype: object

In [5]:
371/18.6

19.946236559139784

Speed comparison: The sparknlp pipeline took 18.6 seconds, while spaCy took 371 seconds (20x as long).

## Playing around with Stanza and SpaCy

In [7]:
# Load the posts with pandas: the timestamp column becomes a parsed datetime
# index, and empty fields are kept as empty strings rather than NaN.
# Alternative that was tried for the timestamps:
# df_pd = df_pd.assign(timestamp=pd.to_datetime(df_pd.timestamp))
kept_columns = ["id", "title", "body"]
df_pd = pd.read_csv(
    data_path,
    index_col="timestamp",
    parse_dates=True,
    keep_default_na=False,
)[kept_columns]
df_pd.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 25647 entries, 2021-01-28 21:37:41 to 2021-02-04 07:54:27
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   id      25647 non-null  object
 1   title   25647 non-null  object
 2   body    25647 non-null  object
dtypes: object(3)
memory usage: 801.5+ KB


In [12]:
# Cut `df` into `ddf.shape[0]` consecutive slices of `bin_size` rows each;
# rows beyond bin_size * ddf.shape[0] are not part of any slice.
# NOTE(review): `ddf` is not defined in any visible cell, and the only visible
# `df` is the Spark DataFrame from the loading cell (the pandas frame above is
# named `df_pd`). This cell presumably depends on cells that were deleted -
# confirm and restore the missing definitions.
bin_size = df.shape[0]//ddf.shape[0]
dfs = [df.iloc[bin_size*i : bin_size*(i+1)] for i in range(ddf.shape[0])]

In [13]:
# Inspect the title and body of the third row of the first slice.
df0 = dfs[0]
X = df0.iloc[2]
X.title, X.body

('I got in late on GME but I believe in the cause and am willing to lose it all.',
 "You guys are amazing. Thank you for sending GME to the moon! I know I'm going to lose most of my money here because I'll hold the line until the end. Let's send a clear message to wall street with GME, BB, AMC, and any others. I've never day traded before but I'm in it now. 🚀")

In [14]:
import stanza
# stanza.download("en")
# NOTE(review): this rebinds `nlp`, which earlier cells bound to the spaCy
# pipeline; cells below use whichever assignment ran last.
nlp = stanza.Pipeline("en")
# NOTE(review): `df` is not defined as a pandas frame in any visible cell
# (the pandas frame loaded above is `df_pd`) - confirm which frame is meant.
text = df.iloc[2].body
doc = nlp(text)
# Display symbol for each of the integer sentiment values 0, 1 and 2.
d_sent = {0:"-", 1:"Ⓝ", 2:"+"}
# Print one line per sentence: sentiment symbol followed by the sentence text.
for sent in doc.sentences:
    print(d_sent[sent.sentiment], sent.text)

2021-02-14 15:42:21 INFO: Loading these models for language: en (English):
| Processor | Package   |
-------------------------
| tokenize  | combined  |
| pos       | combined  |
| lemma     | combined  |
| depparse  | combined  |
| sentiment | sstplus   |
| ner       | ontonotes |

2021-02-14 15:42:21 INFO: Use device: cpu
2021-02-14 15:42:21 INFO: Loading: tokenize
2021-02-14 15:42:21 INFO: Loading: pos
2021-02-14 15:42:21 INFO: Loading: lemma
2021-02-14 15:42:21 INFO: Loading: depparse
2021-02-14 15:42:21 INFO: Loading: sentiment
2021-02-14 15:42:22 INFO: Loading: ner
2021-02-14 15:42:22 INFO: Done loading processors!


+ You guys are amazing.
+ Thank you for sending GME to the moon!
- I know I'm going to lose most of my money here because I'll hold the line until the end.
Ⓝ Let's send a clear message to wall street with GME, BB, AMC, and any others.
Ⓝ I've never day traded before but I'm in it now.
Ⓝ 🚀


In [15]:
%%time
# Probe the sentiment model with a few hand-written sentences and print the
# symbol for the sentiment of the first (only) sentence of each.
probe_sentences = [
    "I just love it when the regulators step in.",
    "That was amazingly boring.",
    "That was amazingly tolerable.",
    "At least it wasn't boring.",
]
for s in probe_sentences:
    first_sentence = nlp(s).sentences[0]
    print(d_sent[first_sentence.sentiment], s)

+ I just love it when the regulators step in.
- That was amazingly boring.
+ That was amazingly tolerable.
- At least it wasn't boring.
CPU times: user 1.43 s, sys: 21.1 ms, total: 1.45 s
Wall time: 731 ms


In [16]:
# Print the lemma of every word in the first sentence of a sample text.
doc = nlp("I knew you were trouble when you walked in!")
first_sentence = doc.sentences[0]
for stanza_word in first_sentence.words:
    print(stanza_word.lemma)

I
know
you
be
trouble
when
you
walk
in
!


In [35]:
import spacy
nlp = spacy.load("en_core_web_sm")

In [129]:
import spacy
# NOTE(review): `df` is not defined as a pandas frame in any visible cell
# (the pandas frame loaded above is `df_pd`) - confirm which frame is meant.
text = df.iloc[2].body
# NOTE(review): d_sent is assigned here but not used in this cell.
d_sent = {0:"-", 1:"Ⓝ", 2:"+"}
doc = nlp(text)
# Print each sentence with its `sentiment` attribute; the recorded output of
# this cell shows 0.0 for every sentence.
for sent in doc.sents:
    print(sent.sentiment, sent.text)

0.0 You guys are amazing.
0.0 Thank you for sending GME to the moon!
0.0 I know I'm going to lose most of my money here because I'll hold the line until the end.
0.0 Let's send a clear message to wall street with GME, BB, AMC, and any others.
0.0 I've never day traded before
0.0 but I'm in it now.
0.0 🚀


In [47]:
# Print the lemma of every token of a sample sentence, one per line.
doc = nlp("I knew you were trouble when you walked in!")
lemmas = [token.lemma_ for token in doc]
for lemma in lemmas:
    print(lemma)

I
know
you
be
trouble
when
you
walk
in
!
