# Topic Modelling with PySpark and Spark NLP

This is a tutorial on topic modelling with the [PySpark](https://spark.apache.org/docs/latest/api/python/index.html) and [Spark NLP](https://www.johnsnowlabs.com/) libraries. The code complements the blog post Topic Modelling with PySpark and Spark NLP on Medium. Refer to that post for a more detailed explanation of what topic modelling is, how to use Spark NLP for NLP pipelines, and how to perform topic modelling with PySpark.

The code shows how to install the PySpark and Spark NLP libraries and download a Kaggle dataset to Google Colaboratory. It also illustrates how to build an NLP pipeline with Spark NLP and train a topic model with PySpark.

## Installation

The first task is to install the PySpark and Spark NLP libraries in our Google Colaboratory environment and set up the data. We use the latest versions to date of PySpark (2.4.5) and Spark NLP (2.4.5). We also install the `nltk` package because one step of our pipeline needs it.

We are going to use data from [Kaggle](https://www.kaggle.com//). I have chosen the [Amazon Musical Instruments Review](https://www.kaggle.com/eswarchandt/amazon-music-reviews) dataset for our topic modelling analysis. The dataset is too small to show the benefit of using Spark, but it works well for tutorial purposes.

The data is ready to use. Let's start the Spark session through Spark NLP. If you need special Spark session settings, you can start the session with `SparkSession.builder` from PySpark instead.

My machine has the following configuration:
- 6 cores (12 vCores)
- 32 GB RAM

Spark Standalone server:
```
cd /opt/softwares/spark-2.4.7-bin-hadoop2.7/

export PYSPARK_PYTHON=/opt/envs/ai4e/bin/python
export PYSPARK_DRIVER_PYTHON=/opt/envs/ai4e/bin/python

# Start the master and the workers before running the notebook
sbin/start-all.sh
# Stop them once you are done
sbin/stop-all.sh
```
Spark UI: [http://localhost:8080](http://localhost:8080)   
Spark Master URL : spark://IMCHLT276:7077

In [1]:
from pyspark.sql import SparkSession
import sparknlp
from pyspark.sql import functions as F

In [2]:
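# Spark NLP's JAR must be on the classpath; without it, annotators fail with
# "'JavaPackage' object is not callable". The Maven coordinate below is an
# assumption: Spark NLP 2.4.5 built for Scala 2.11.
spark = SparkSession.builder \
    .master("spark://IMCHLT276:7077") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.cores.max", "12") \
    .config("spark.local.dir", "/opt/tmp/spark-temp/") \
    .appName("SparkNLP-LDA") \
    .getOrCreate()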
# spark = sparknlp.start()
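
As a rough sanity check on the session settings, the arithmetic below estimates how many executors the standalone cluster could launch. This is only a sketch: it assumes the master grants `spark.cores.max / spark.executor.cores` executors when the workers have enough free memory, it ignores memory overhead, and the variable names are illustrative.

```python
# Settings copied from the SparkSession builder in this tutorial.
cores_max = 12          # spark.cores.max
executor_cores = 2      # spark.executor.cores
executor_memory_gb = 2  # spark.executor.memory ("2g")

# Assumption: each executor takes exactly `executor_cores` cores,
# so the core cap bounds the number of executors.
max_executors = cores_max // executor_cores

# Heap memory only; per-executor overhead is not counted here.
total_executor_memory_gb = max_executors * executor_memory_gb

print(max_executors)             # 6
print(total_executor_memory_gb)  # 12
```

Under these assumptions the executors would claim about 12 GB of heap in total, which fits comfortably within the 32 GB of RAM on this machine.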


## Data

Here we start our topic modelling tutorial. First, we access the downloaded data and read it into a Spark DataFrame.

In [None]:
data_path = '../data/Musical_instruments_reviews.csv'
data = spark.read.csv(data_path, header=True)

Let's check what kind of information is stored in our data.

In [None]:
data.show()

In [None]:
data.columns

For topic modelling, we need only textual data, thus, we create a new dataframe only with the column of interest.

In [6]:
text_col = 'reviewText'
review_text = data.select(text_col).filter(F.col(text_col).isNotNull())

The data that we will use further for the analysis looks as follows:

In [7]:
review_text.limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                                reviewText|
+------------------------------------------------------------------------------------------+
|                                                                          that's just like|
|The product does exactly as it should and is quite affordable.I did not realized it was...|
|The primary job of this device is to block the breath that would otherwise produce a po...|
|Nice windscreen protects my MXL mic and prevents pops. Only thing is that the gooseneck...|
|This pop filter is great. It looks and performs like a studio filter. If you're recordi...|
+------------------------------------------------------------------------------------------+



## Spark NLP pipeline

Here we start our NLP pipeline for the task of topic modelling.

### Basic NLP pipeline

Let's start with a basic NLP pipeline that cleans the data and produces lemmatized unigrams. To understand how to use Spark NLP annotators (estimators and transformers) in an NLP pipeline, refer to the [Spark NLP documentation](https://nlp.johnsnowlabs.com/) or the corresponding blog post on Topic Modelling with PySpark and Spark NLP.

We will start with [**DocumentAssembler**](https://nlp.johnsnowlabs.com/docs/en/transformers#documentassembler-getting-data-in), which converts the data into the Spark NLP annotation format that the other Spark NLP annotators consume.

In [8]:
from sparknlp.base import DocumentAssembler

documentAssembler = DocumentAssembler() \
     .setInputCol(text_col) \
     .setOutputCol('document')

The next step is to tokenize the data with [**Tokenizer**](https://nlp.johnsnowlabs.com/docs/en/annotators#tokenizer).

In [9]:
from sparknlp.annotator import Tokenizer

tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('tokenized')

Further, we clean our data and lowercase it with [**Normalizer**](https://nlp.johnsnowlabs.com/docs/en/annotators#normalizer).

In [None]:
from sparknlp.annotator import Normalizer

normalizer = Normalizer() \
     .setInputCols(['tokenized']) \
     .setOutputCol('normalized') \
     .setLowercase(True)
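
To make the effect of these two steps concrete, here is a plain-Python sketch of tokenizing and normalizing one review. It is an approximation rather than Spark NLP's implementation: it assumes whitespace tokenization and a cleanup rule that drops every non-letter character, and the helper names `tokenize` and `normalize` are hypothetical.

```python
import re

def tokenize(text):
    # Whitespace split; the real Tokenizer applies richer punctuation rules.
    return text.split()

def normalize(tokens):
    # Drop every character that is not an ASCII letter, lowercase the rest,
    # and discard tokens that end up empty.
    cleaned = (re.sub(r'[^A-Za-z]', '', token).lower() for token in tokens)
    return [token for token in cleaned if token]

sample = "If you're recording vocals, this pop filter is GREAT!"
print(normalize(tokenize(sample)))
# ['if', 'youre', 'recording', 'vocals', 'this', 'pop', 'filter', 'is', 'great']
```

Note how `you're` becomes `youre`, the same form that shows up in the processed reviews later in this tutorial.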

We are going to lemmatize our text with the pretrained lemmatization model provided by Spark NLP. We can access this model with [**LemmatizerModel**](https://nlp.johnsnowlabs.com/docs/en/models#english---models).

In [14]:
from sparknlp.annotator import LemmatizerModel

lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['normalized']) \
     .setOutputCol('lemmatized')

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


Spark NLP doesn't provide a stop word list, so we will use the `nltk` package to download English stop words.

In [15]:
import nltk
nltk.download('stopwords')

from nltk.corpus import stopwords

eng_stopwords = stopwords.words('english')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


We pass the downloaded stop word list to [**StopWordsCleaner**](https://nlp.johnsnowlabs.com/docs/en/annotators#stopwordscleaner), which removes all such words from our lemmatized text.

In [None]:
from sparknlp.annotator import StopWordsCleaner

stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['lemmatized']) \
     .setOutputCol('unigrams') \
     .setStopWords(eng_stopwords)
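
The idea behind this step can be sketched in plain Python. The stop list below is a hypothetical miniature (the tutorial uses NLTK's full English list), and `remove_stopwords` is an illustrative helper, not a Spark NLP function.

```python
# Hypothetical mini stop list; the tutorial uses NLTK's full English list.
sample_stopwords = {'this', 'is', 'be', 'a', 'the', 'and', 'it'}

def remove_stopwords(tokens, stopwords):
    # Keep only the tokens that do not appear in the stop list.
    return [token for token in tokens if token not in stopwords]

lemmas = ['this', 'pop', 'filter', 'be', 'great']
print(remove_stopwords(lemmas, sample_stopwords))
# ['pop', 'filter', 'great']
```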

In addition to unigrams, it is good to use n-grams for topic modelling as well since they help to better refine topics. We can get n-grams with [**NGramGenerator**](https://nlp.johnsnowlabs.com/docs/en/annotators#ngramgenerator) in Spark NLP.

In [None]:
from sparknlp.annotator import NGramGenerator

ngrammer = NGramGenerator() \
    .setInputCols(['lemmatized']) \
    .setOutputCol('ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')
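
To see what cumulative n-grams with `n = 3` and the `_` delimiter look like, here is a small plain-Python sketch. The function `cumulative_ngrams` is a hypothetical stand-in, and the output order (all unigrams, then bigrams, then trigrams) is an assumption based on the results shown later in this tutorial.

```python
def cumulative_ngrams(tokens, n, delimiter='_'):
    # For each size from 1 to n, slide a window over the tokens
    # and join the window contents with the delimiter.
    result = []
    for size in range(1, n + 1):
        for start in range(len(tokens) - size + 1):
            result.append(delimiter.join(tokens[start:start + size]))
    return result

print(cumulative_ngrams(['that', 'just', 'like'], 3))
# ['that', 'just', 'like', 'that_just', 'just_like', 'that_just_like']
```

Because cumulative mode keeps the shorter n-grams too, the output column contains unigrams alongside bigrams and trigrams.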

Our basic NLP pipeline for topic modelling now has all the necessary steps. However, let's also add a POS tagger so that we can later use POS tags to refine the processed data further. For this, we are going to use a pretrained POS tagging model provided by Spark NLP. We can access the model with [**PerceptronModel**](https://nlp.johnsnowlabs.com/docs/en/annotators#postagger).

In [18]:
from sparknlp.annotator import PerceptronModel

pos_tagger = PerceptronModel.pretrained('pos_anc') \
    .setInputCols(['document', 'lemmatized']) \
    .setOutputCol('pos')

pos_anc download started this may take some time.
Approximate size to download 4.3 MB
[OK!]


Now we have everything in the Spark NLP annotation format. To process the data further, we need to transform it into plain arrays with [**Finisher**](https://nlp.johnsnowlabs.com/docs/en/transformers#finisher).

In [None]:
from sparknlp.base import Finisher

finisher = Finisher() \
     .setInputCols(['unigrams', 'ngrams', 'pos'])

Now we are ready to put everything into a pipeline. The **Pipeline** class comes from PySpark.

In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline() \
     .setStages([documentAssembler,                  
                 tokenizer,
                 normalizer,                  
                 lemmatizer,                  
                 stopwords_cleaner, 
                 pos_tagger,
                 ngrammer,  
                 finisher])

First, we will fit all our estimators and then, transform the data with trained models and transformers.

In [None]:
processed_review = pipeline.fit(review_text).transform(review_text)

Let's look at the data we finally get.

In [22]:
processed_review.limit(5).show()

+--------------------+--------------------+--------------------+--------------------+
|          reviewText|   finished_unigrams|     finished_ngrams|        finished_pos|
+--------------------+--------------------+--------------------+--------------------+
|    that's just like|              [like]|[that, just, like...|        [IN, RB, IN]|
|The product does ...|[product, exactly...|[the, product, do...|[DT, NN, VBP, RB,...|
|The primary job o...|[primary, job, de...|[the, primary, jo...|[DT, JJ, NN, IN, ...|
|Nice windscreen p...|[nice, windscreen...|[nice, windscreen...|[JJ, NN, NN, NNP,...|
|This pop filter i...|[pop, filter, gre...|[this, pop, filte...|[DT, NN, NN, VB, ...|
+--------------------+--------------------+--------------------+--------------------+



### Extended NLP pipeline

So far, we have our data as lemmatized unigrams with the stop words removed. I think it is a good idea to incorporate n-grams into our NLP pipeline as well. We obtained n-grams as one step of the pipeline, but at this point they are messy and contain many questionable combinations. To tackle this problem, let's filter out odd word combinations based on their POS tags. We can come up with a list of viable combinations, such as ADJ + NOUN, and restrict the POS combinations in our n-grams to this list. We can also exclude some POS tags from our unigrams to make sure we don't use function words for topic modelling (the stop word list covers them partially, but probably not fully).

Doing this POS-based filtering will significantly reduce the vocabulary size for topic modelling which will speed up the whole processing.

Let's start this processing. First, we need to join all the POS tags obtained previously into a single string per review.

In [None]:
from pyspark.sql import types as T

udf_join_arr = F.udf(lambda x: ' '.join(x), T.StringType())
processed_review  = processed_review.withColumn('finished_pos', udf_join_arr(F.col('finished_pos')))

Then we start another Spark NLP pipeline in order to get POS tag n-grams that correspond to the word n-grams. We begin with the conversion into the Spark NLP annotation format.

In [None]:
pos_documentAssembler = DocumentAssembler() \
     .setInputCol('finished_pos') \
     .setOutputCol('pos_document')

Then, we tokenize our POS tags.

In [None]:
pos_tokenizer = Tokenizer() \
     .setInputCols(['pos_document']) \
     .setOutputCol('pos')

And generate n-grams from them in the same way we did that for words.

In [None]:
pos_ngrammer = NGramGenerator() \
    .setInputCols(['pos']) \
    .setOutputCol('pos_ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')

Lastly, we extract the POS tag n-grams with **Finisher**.

In [None]:
pos_finisher = Finisher() \
     .setInputCols(['pos', 'pos_ngrams'])

We create this new Spark NLP pipeline...

In [None]:
pos_pipeline = Pipeline() \
     .setStages([pos_documentAssembler,                  
                 pos_tokenizer,
                 pos_ngrammer,  
                 pos_finisher])

... and again fit it and transform the data.

In [None]:
processed_review = pos_pipeline.fit(processed_review).transform(processed_review)

Let's look at what kind of data we have to work with.

In [30]:
processed_review.columns

['reviewText',
 'finished_unigrams',
 'finished_ngrams',
 'finished_pos',
 'finished_pos_ngrams']

And these are our word n-grams with their corresponding pos n-grams.

In [31]:
processed_review.select('finished_ngrams', 'finished_pos_ngrams').limit(5).show()

+--------------------+--------------------+
|     finished_ngrams| finished_pos_ngrams|
+--------------------+--------------------+
|[that, just, like...|[IN, RB, IN, IN_R...|
|[the, product, do...|[DT, NN, VBP, RB,...|
|[the, primary, jo...|[DT, JJ, NN, IN, ...|
|[nice, windscreen...|[JJ, NN, NN, NNP,...|
|[this, pop, filte...|[DT, NN, NN, VB, ...|
+--------------------+--------------------+



Now we are ready to filter out the POS tags that are not useful for topic modelling. Let's first create a function that does this for unigrams. We write a custom Python function and then wrap it as a PySpark UDF so that it can be applied to a Spark DataFrame.

In [None]:
def filter_pos(words, pos_tags):
    return [word for word, pos in zip(words, pos_tags) 
            if pos in ['JJ', 'NN', 'NNS', 'VB', 'VBP']]

udf_filter_pos = F.udf(filter_pos, T.ArrayType(T.StringType()))
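
The same filter can be tried on plain Python lists before it runs on a DataFrame. The sample words and tags below are made up for illustration, and `ALLOWED_TAGS` is a hypothetical name for the tag list used in the function.

```python
ALLOWED_TAGS = {'JJ', 'NN', 'NNS', 'VB', 'VBP'}

def filter_pos(words, pos_tags):
    # zip pairs items by position, so words[i] must be tagged by pos_tags[i].
    return [word for word, pos in zip(words, pos_tags) if pos in ALLOWED_TAGS]

words = ['this', 'pop', 'filter', 'be', 'great']
tags = ['DT', 'NN', 'NN', 'VB', 'JJ']
print(filter_pos(words, tags))
# ['pop', 'filter', 'be', 'great']
```

Because `zip` matches items purely by position and stops at the shorter list, the filter is only meaningful when both arrays come from the same token sequence. If tokens were removed from one array but not the other, the pairs would shift, so it is worth checking the alignment of the two columns.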

Then, we apply this function on columns with unigrams and their POS tags to get filtered unigrams in a separate dataframe column.

In [None]:
processed_review = processed_review.withColumn('filtered_unigrams',
                                               udf_filter_pos(F.col('finished_unigrams'), 
                                                              F.col('finished_pos')))

This is what our filtered unigrams look like.

In [34]:
processed_review.select('filtered_unigrams').limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                         filtered_unigrams|
+------------------------------------------------------------------------------------------+
|                                                                                        []|
|[exactly, quite, even, expectedas, add, one, carry, small, hint, grape, buy, put, next,...|
|[job, device, would, otherwise, pop, allow, reduction, high, frequency, cloth, block, l...|
|[nice, windscreen, protect, mic, prevent, thing, gooseneck, able, hold, require, carefu...|
|               [filter, great, look, perform, studio, youre, eliminate, pop, record, sing]|
+------------------------------------------------------------------------------------------+



It is time to filter out improper POS combinations of n-grams. We create the custom function in the same manner as before. Since we deal with bi- and trigrams, we need to restrict tags for both.

In [None]:
def filter_pos_combs(words, pos_tags):
    return [word for word, pos in zip(words, pos_tags) 
            if (len(pos.split('_')) == 2 and \
                pos.split('_')[0] in ['JJ', 'NN', 'NNS', 'VB', 'VBP'] and \
                 pos.split('_')[1] in ['JJ', 'NN', 'NNS']) \
            or (len(pos.split('_')) == 3 and \
                pos.split('_')[0] in ['JJ', 'NN', 'NNS', 'VB', 'VBP'] and \
                 pos.split('_')[1] in ['JJ', 'NN', 'NNS', 'VB', 'VBP'] and \
                  pos.split('_')[2] in ['NN', 'NNS'])]
    
udf_filter_pos_combs = F.udf(filter_pos_combs, T.ArrayType(T.StringType()))
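
The pattern rules can be checked on a toy example in plain Python. This sketch restates the same conditions with a hypothetical helper `is_valid_pattern`; the constant names and the sample n-grams are illustrative only.

```python
HEAD_TAGS = {'JJ', 'NN', 'NNS', 'VB', 'VBP'}
BIGRAM_LAST = {'JJ', 'NN', 'NNS'}
TRIGRAM_LAST = {'NN', 'NNS'}

def is_valid_pattern(pos_ngram):
    tags = pos_ngram.split('_')
    if len(tags) == 2:
        return tags[0] in HEAD_TAGS and tags[1] in BIGRAM_LAST
    if len(tags) == 3:
        return (tags[0] in HEAD_TAGS and tags[1] in HEAD_TAGS
                and tags[2] in TRIGRAM_LAST)
    return False  # unigrams and anything longer than a trigram are dropped

word_ngrams = ['this', 'pop', 'filter', 'this_pop', 'pop_filter', 'this_pop_filter']
pos_ngrams = ['DT', 'NN', 'NN', 'DT_NN', 'NN_NN', 'DT_NN_NN']

kept = [w for w, p in zip(word_ngrams, pos_ngrams) if is_valid_pattern(p)]
print(kept)  # ['pop_filter']
```

Only `pop_filter` survives: the unigrams are dropped by design, and every n-gram that starts with the determiner tag `DT` fails the first-position check.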

And we call the function on word and POS n-grams.

In [None]:
processed_review = processed_review.withColumn('filtered_ngrams',
                                               udf_filter_pos_combs(F.col('finished_ngrams'),
                                                                    F.col('finished_pos_ngrams')))

Below is what we get after filtering for n-grams.

In [37]:
processed_review.select('filtered_ngrams').limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                           filtered_ngrams|
+------------------------------------------------------------------------------------------+
|                                                                                        []|
|[be_double, double_screen, add_bonus, small_hint, old_grape, grape_candy, cannot_stop, ...|
|[primary_job, pop_sound, noticeable_reduction, high_frequency, double_cloth, cloth_filt...|
|[nice_windscreen, windscreen_protect, mxl_mic, prevent_pop, require_careful, careful_po...|
|             [pop_filter, be_great, studio_filter, youre_record, record_vocal, get_record]|
+------------------------------------------------------------------------------------------+



Now we have unigrams and n-grams stored in different columns of the dataframe. Let's combine them into a single column.

In [None]:
from pyspark.sql.functions import concat

processed_review = processed_review.withColumn('final', 
                                               concat(F.col('filtered_unigrams'), 
                                                      F.col('filtered_ngrams')))

And this is the final form of our data.

In [39]:
processed_review.select('final').limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                                     final|
+------------------------------------------------------------------------------------------+
|                                                                                        []|
|[exactly, quite, even, expectedas, add, one, carry, small, hint, grape, buy, put, next,...|
|[job, device, would, otherwise, pop, allow, reduction, high, frequency, cloth, block, l...|
|[nice, windscreen, protect, mic, prevent, thing, gooseneck, able, hold, require, carefu...|
|[filter, great, look, perform, studio, youre, eliminate, pop, record, sing, pop_filter,...|
+------------------------------------------------------------------------------------------+



There are additional ways to provide cleaner data for topic modelling with Spark NLP. For example:

* You might want to incorporate [**SentenceDetector**](https://nlp.johnsnowlabs.com/docs/en/annotators#sentencedetector) in order to ignore n-grams that span sentence borders, since tokenization in Spark NLP does not account for sentence borders.

* [**DependencyParser**](https://nlp.johnsnowlabs.com/docs/en/annotators#dependency-parsers) could also be used to provide more meaningful n-grams, namely the ones whose words are linked by a dependency relation.

* A spell checker could also be incorporated in the early steps of the NLP pipeline for less noisy results. Spark NLP offers several options, such as [**NorvigSpellChecker**](https://nlp.johnsnowlabs.com/docs/en/annotators#norvig-spellchecker) and [**SymmetricSpellChecker**](https://nlp.johnsnowlabs.com/docs/en/annotators#symmetric-spellchecker).

However, in this tutorial we will omit these options since they probably will not bring significant improvements for topic modelling.

## Vectorization 

Now we are ready to vectorize our data. First, we apply **TF** (*term frequency*) vectorization with **CountVectorizer** in PySpark. We fit the TF vocabulary and then transform the data into vectors of counts.

In [None]:
from pyspark.ml.feature import CountVectorizer

tfizer = CountVectorizer(inputCol='final', outputCol='tf_features')
tf_model = tfizer.fit(processed_review)
tf_result = tf_model.transform(processed_review)
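
The following plain-Python sketch shows the idea behind count vectorization on a toy corpus. It is not PySpark's implementation: the documents are made up, the names `vocab`, `index`, and `to_counts` are illustrative, and ordering the vocabulary by corpus frequency with alphabetical tie-breaking is an assumption made here so that the result is deterministic.

```python
from collections import Counter

docs = [
    ['pop', 'filter', 'great', 'pop_filter'],
    ['guitar', 'string', 'great'],
    ['guitar', 'pick', 'guitar'],
]

# Vocabulary: terms ordered by total corpus frequency
# (ties broken alphabetically to keep the order deterministic).
totals = Counter(term for doc in docs for term in doc)
vocab = sorted(totals, key=lambda term: (-totals[term], term))
index = {term: i for i, term in enumerate(vocab)}

def to_counts(doc):
    # Sparse representation: {term index: count in this document}.
    return dict(Counter(index[term] for term in doc))

print(vocab)
# ['guitar', 'great', 'filter', 'pick', 'pop', 'pop_filter', 'string']
print(to_counts(docs[2]))
# {0: 2, 3: 1}
```

Each document becomes a sparse mapping from vocabulary index to count, which is the kind of information the `tf_features` column carries.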

Once we have the TF results, we can account for words that are frequent across all documents. We use **IDF** (*inverse document frequency*) to lower the scores of such words.

In [None]:
from pyspark.ml.feature import IDF

idfizer = IDF(inputCol='tf_features', outputCol='tf_idf_features')
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)
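
As a worked example of the weighting, the sketch below computes IDF values by hand. It assumes the smoothed formula given in the Spark MLlib documentation, `log((m + 1) / (df + 1))`, where `m` is the number of documents and `df` is the number of documents containing the term; the function name `spark_idf` is hypothetical.

```python
import math

def spark_idf(num_docs, doc_freq):
    # Smoothed inverse document frequency: log((m + 1) / (df + 1)).
    return math.log((num_docs + 1) / (doc_freq + 1))

num_docs = 3
idf_common = spark_idf(num_docs, 3)  # term appears in every document
idf_rare = spark_idf(num_docs, 1)    # term appears in a single document

print(idf_common)  # 0.0
print(idf_rare)    # log(2), roughly 0.693

# TF-IDF is the raw count multiplied by the IDF weight.
tf = 2
print(tf * idf_rare)  # roughly 1.386
```

A term that occurs in every document gets a weight of zero, so it no longer contributes to the feature vector, while rarer terms keep a positive weight.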

## LDA

Finally, we are ready to model topics in our data with [**LDA**](https://spark.apache.org/docs/2.2.0/ml-clustering.html#latent-dirichlet-allocation-lda) (*Latent Dirichlet Allocation*). To use the algorithm, we have to provide the number of topics we presume our data contains and the number of iterations for the LDA algorithm. Then, we initialize the model and train it.

In [None]:
from pyspark.ml.clustering import LDA

num_topics = 6
max_iter = 10

lda = LDA(k=num_topics, maxIter=max_iter, featuresCol='tf_idf_features')
lda_model = lda.fit(tfidf_result)

To be able to see words that characterize the defined topics, we need to convert word ids into actual words with the custom function. This function will again be converted to PySpark UDF to be used on our topic dataframe.

In [None]:
vocab = tf_model.vocabulary

def get_words(token_list):
     return [vocab[token_id] for token_id in token_list]
       
udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))
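
The lookup itself is easy to try outside Spark. In the sketch below the vocabulary and the topic weights are invented for illustration, `top_term_indices` is a hypothetical helper that mimics picking the highest-weighted terms of a topic, and `get_words` takes the vocabulary as an explicit argument to keep the example self-contained.

```python
sample_vocab = ['guitar', 'great', 'filter', 'pick', 'pop']

# Hypothetical term weights for one topic, indexed like the vocabulary.
topic_weights = [0.10, 0.05, 0.30, 0.15, 0.40]

def top_term_indices(weights, k):
    # Indices of the k largest weights, highest first.
    ranked = sorted(range(len(weights)), key=lambda i: weights[i], reverse=True)
    return ranked[:k]

def get_words(token_list, vocab):
    # Translate vocabulary indices back into the terms they stand for.
    return [vocab[token_id] for token_id in token_list]

indices = top_term_indices(topic_weights, 3)
print(indices)                           # [4, 2, 3]
print(get_words(indices, sample_vocab))  # ['pop', 'filter', 'pick']
```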

Let's define the number of top words per topic we would like to see and extract the words with our function.

In [44]:
num_top_words = 10

topics = lda_model.describeTopics(num_top_words).withColumn('topicWords', udf_to_words(F.col('termIndices')))
topics.select('topic', 'topicWords').show(truncate=90)

+-----+------------------------------------------------------------------------------------+
|topic|                                                                          topicWords|
+-----+------------------------------------------------------------------------------------+
|    0|             [pick, bow, shockmount, store, pm, use, mic, tortex_pick, nice, guitar]|
|    1|                  [pedal, amp, great, sound, tuner, good, cable, use, effect, price]|
|    2|                     [string, use, guitar, sound, one, strap, get, well, work, good]|
|    3|       [mic, use, guitar, good, word, great, quality, word_word, work, good_quality]|
|    4|                    [use, string, pick, pedal, one, good, buy, guitar, sound, great]|
|    5|[guitar, harmonica, yamaha, use, volume_pedal, string, record, tuner, battery, work]|
+-----+------------------------------------------------------------------------------------+



And that's it! We are done with the topic modelling pipeline on review data. I hope this tutorial was helpful.

Good luck with your own experiments!

