# Topic Modelling on store reviews

NLP turns unstructured text into something that can be analyzed and helps resolve ambiguity in language, which makes it especially useful in marketing and advertising. Marketers and advertisers are our primary audience.

Businesses are heavily utilizing NLP techniques to analyze posts and understand their customers' profiles and requirements. NLP has also made it easier for companies to understand customer pain points.  

Topic modelling is an unsupervised machine learning technique that uses clustering to discover latent variables, or hidden structures, in the data. In other words, it is a method for finding topics in large amounts of text.

We're going to build an NLP pipeline with Spark NLP and train a topic model with PySpark.

## Installation

The first task is to install the PySpark and Spark NLP libraries in Google Colaboratory and set up the data. We also install the `nltk` package because we will need it in one of the steps of our pipeline.

In [1]:
import os

# Install java
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed pyspark

# Install Spark NLP
! pip install --ignore-installed spark-nlp

# Install nltk
! pip install nltk

openjdk version "1.8.0_352"
OpenJDK Runtime Environment (build 1.8.0_352-8u352-ga-1~20.04-b08)
OpenJDK 64-Bit Server VM (build 25.352-b08, mixed mode)
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Using cached pyspark-3.3.1-py2.py3-none-any.whl
Collecting py4j==0.10.9.5
  Using cached py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting spark-nlp
  Using cached spark_nlp-4.2.7-py2.py3-none-any.whl (453 kB)
Installing collected packages: spark-nlp
Successfully installed spark-nlp-4.2.7
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
from google.colab import drive
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


The data is ready to use. Let's start the Spark session through Spark NLP. If you need special Spark session settings, you can start the session with `SparkSession.builder` from PySpark instead.

In [3]:
import sparknlp

spark = sparknlp.start()

## Data

We are going to use data from Kaggle for our topic modelling analysis.

First, we access the downloaded data and read it into a Spark dataframe.

In [4]:
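import pandas as pd
from pyspark.sql import functions as F

data_path = "/content/drive/MyDrive/Reviews.csv"

# Read the reviews into a Spark dataframe; this is what the pipeline uses
data = spark.read.csv(data_path, header=True)

# Preview the first 10,000 rows with pandas
df = pd.read_csv(data_path)
df = df.head(10000)
print(df)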

         Id   ProductId          UserId                      ProfileName  \
0         1  B001E4KFG0  A3SGXH7AUHU8GW                       delmartian   
1         2  B00813GRG4  A1D87F6ZCVE5NK                           dll pa   
2         3  B000LQOCH0   ABXLMWJIXXAIN  Natalia Corres "Natalia Corres"   
3         4  B000UA0QIQ  A395BORC6FGVXV                             Karl   
4         5  B006K2ZZ7K  A1UQRSCLF8GW1T    Michael D. Bigham "M. Wassir"   
...     ...         ...             ...                              ...   
9995   9996  B000P41A28  A3A63RACXR1XIL            A. Boodhoo "deaddodo"   
9996   9997  B000P41A28    A5VVRGL8JA7R                             Adam   
9997   9998  B000P41A28  A2TGDTJ8YCU6PD                          geena77   
9998   9999  B000P41A28   AUV4GIZZE693O              Susan Coe "sueysis"   
9999  10000  B000P41A28   A82WIMR4RSVLI                       Emrose mom   

      HelpfulnessNumerator  HelpfulnessDenominator  Score        Time  \
0             

Let's check out what kind of information is stored in our data.

In [5]:
data.columns

['Id',
 'ProductId',
 'UserId',
 'ProfileName',
 'HelpfulnessNumerator',
 'HelpfulnessDenominator',
 'Score',
 'Time',
 'Summary',
 'Text']

In [6]:
# let's check the size of our dataset

data.count()

568454

In [7]:
# trim the dataset to the first 10,000 rows to save resources

data = data.limit(10000)
data.count()

10000

For topic modelling we only need the textual data, so we create a new dataframe with just the column of interest.

In [8]:
text_col = "Text"
review_text = data.select(text_col).filter(F.col(text_col).isNotNull())

The data that we will use further for the analysis looks as follows:

In [9]:
review_text.limit(5).show(truncate=100)

+----------------------------------------------------------------------------------------------------+
|                                                                                                Text|
+----------------------------------------------------------------------------------------------------+
|I have bought several of the Vitality canned dog food products and have found them all to be of g...|
|"Product arrived labeled as Jumbo Salted Peanuts...the peanuts were actually small sized unsalted...|
|"This is a confection that has been around a few centuries.  It is a light, pillowy citrus gelati...|
|If you are looking for the secret ingredient in Robitussin I believe I have found it.  I got this...|
|Great taffy at a great price.  There was a wide assortment of yummy taffy.  Delivery was very qui...|
+----------------------------------------------------------------------------------------------------+



## Spark NLP pipeline

Here we start our NLP pipeline for the task of topic modelling.

### Basic NLP pipeline

Let's start with a basic NLP pipeline that cleans the data and produces lemmatized unigrams.

We will start with **DocumentAssembler**, which converts data into the Spark NLP annotation format that can be used by Spark NLP annotators.

In [10]:
from sparknlp.base import DocumentAssembler

documentAssembler = DocumentAssembler() \
     .setInputCol(text_col) \
     .setOutputCol("document")

The next step is to tokenize the data with **Tokenizer**.

In [11]:
from sparknlp.annotator import Tokenizer

tokenizer = Tokenizer() \
     .setInputCols(["document"]) \
     .setOutputCol("tokenized")

Next, we clean our data and lowercase it with **Normalizer**.

In [12]:
from sparknlp.annotator import Normalizer

normalizer = Normalizer() \
     .setInputCols(["tokenized"]) \
     .setOutputCol("normalized") \
     .setLowercase(True)
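
To make the cleaning step concrete, here is a rough pure-Python sketch of what normalization does to a list of tokens. It assumes the cleanup strips every non-letter character (simplified here to ASCII letters) and drops tokens that end up empty; the helper name `normalize` and the sample tokens are made up for illustration and are not part of Spark NLP.

```python
import re

def normalize(tokens):
    """Strip non-letter characters, lowercase, and drop empty tokens."""
    # Assumption: only ASCII letters are kept; the real annotator is configurable.
    cleaned = (re.sub(r"[^A-Za-z]", "", token).lower() for token in tokens)
    return [token for token in cleaned if token]

sample_tokens = ["Jumbo", "Salted", "Peanuts...the", "5", "!"]
normalized = normalize(sample_tokens)
print(normalized)
# ['jumbo', 'salted', 'peanutsthe']
```

If a token such as `Peanuts...the` reaches this step unsplit, removing the dots glues the two words together, which may explain tokens like `peanutsthe` in the outputs further down.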

We are going to lemmatize our text with the pretrained lemmatization model provided by Spark NLP. We can access this model with **LemmatizerModel**.

In [13]:
from sparknlp.annotator import LemmatizerModel

lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(["normalized"]) \
     .setOutputCol("lemmatized")

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


For the stop word list, we will use the `nltk` package to download stop words for English.

In [14]:
import nltk
nltk.download("stopwords")

from nltk.corpus import stopwords

eng_stopwords = stopwords.words("english")

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


We pass the downloaded list of stop words to **StopWordsCleaner**, which removes all such words from our lemmatized text.

In [15]:
from sparknlp.annotator import StopWordsCleaner

stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(["lemmatized"]) \
     .setOutputCol("unigrams") \
     .setStopWords(eng_stopwords)

In addition to unigrams, it is a good idea to use n-grams for topic modelling as well, since they help to refine topics. We can get n-grams with **NGramGenerator** in Spark NLP.

In [16]:
from sparknlp.annotator import NGramGenerator

ngrammer = NGramGenerator() \
    .setInputCols(["lemmatized"]) \
    .setOutputCol("ngrams") \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter("_")
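
As a rough illustration of what the cumulative setting means, the pure-Python sketch below builds all 1-grams, 2-grams, and 3-grams from a token list and joins multi-word n-grams with `_`. The function name `cumulative_ngrams` is hypothetical, and the ordering shown (all unigrams, then bigrams, then trigrams) is an assumption based on the outputs shown later in this notebook.

```python
def cumulative_ngrams(tokens, n=3, delimiter="_"):
    """Return every n-gram of size 1..n, shortest sizes first."""
    ngrams = []
    for size in range(1, n + 1):
        # Slide a window of `size` tokens across the list
        for start in range(len(tokens) - size + 1):
            ngrams.append(delimiter.join(tokens[start:start + size]))
    return ngrams

example = cumulative_ngrams(["great", "taffy", "price"])
print(example)
# ['great', 'taffy', 'price', 'great_taffy', 'taffy_price', 'great_taffy_price']
```

Note that the n-grams are built from the lemmatized tokens, so they still contain stop words at this point.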

We already have our basic NLP pipeline for topic modelling with all the necessary steps. However, let's also add a POS tagger so that we can later use the POS tags to improve our processed data even further. For this, we are going to use a pretrained POS tagging model provided by Spark NLP.

In [17]:
from sparknlp.annotator import PerceptronModel

pos_tagger = PerceptronModel.pretrained("pos_anc") \
    .setInputCols(["document", "lemmatized"]) \
    .setOutputCol("pos")

pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[OK!]


Now we have everything in the Spark NLP annotation format. To be able to process the data further, we need to transform it into plain arrays with **Finisher**.

In [18]:
from sparknlp.base import Finisher

finisher = Finisher().setInputCols(["unigrams", "ngrams", "pos"])

Now we are ready to put everything into a pipeline. The **Pipeline** functionality is provided by PySpark.

In [19]:
from pyspark.ml import Pipeline

pipeline = Pipeline() \
     .setStages([documentAssembler,                  
                 tokenizer,
                 normalizer,                  
                 lemmatizer,                  
                 stopwords_cleaner, 
                 pos_tagger,
                 ngrammer,  
                 finisher])

First, we will fit all our estimators and then, transform the data with trained models and transformers.

In [20]:
processed_review = pipeline.fit(review_text).transform(review_text)

Let's look at the data we finally get.

In [21]:
processed_review.limit(5).show()

+--------------------+--------------------+--------------------+--------------------+
|                Text|   finished_unigrams|     finished_ngrams|        finished_pos|
+--------------------+--------------------+--------------------+--------------------+
|I have bought sev...|[buy, several, vi...|[i, have, buy, se...|[NNP, VBP, VBN, J...|
|"Product arrived ...|[product, arrive,...|[product, arrive,...|[NN, JJ, NN, IN, ...|
|"This is a confec...|[confection, arou...|[this, be, a, con...|[DT, VB, DT, NN, ...|
|If you are lookin...|[look, secret, in...|[if, you, be, loo...|[IN, PRP, VB, VB,...|
|Great taffy at a ...|[great, taffy, gr...|[great, taffy, at...|[JJ, NN, IN, DT, ...|
+--------------------+--------------------+--------------------+--------------------+



### Extended NLP pipeline

We have our data in the form of lemmatized unigrams with the stop words removed. It is a good idea to incorporate n-grams into our NLP pipeline. We obtained n-grams as one step of our pipeline, but at this point they are messy and contain a lot of questionable combinations. To tackle this problem, let's filter out strange combinations of words in n-grams based on their POS tags. We can imagine a list of viable combinations, like ADJ + NOUN, so let's restrict the POS combinations in our n-grams to this list. We can also exclude some POS tags from our unigrams to ensure that we don't use function words for topic modelling (they are partially covered by stop words, but probably not fully).

This POS-based filtering will significantly reduce the vocabulary size for topic modelling, which will speed up the whole processing.

For this processing, we first need to join all the POS tags obtained previously into a single string per review.

In [22]:
from pyspark.sql import types as T

udf_join_arr = F.udf(lambda x: " ".join(x), T.StringType())
processed_review  = processed_review.withColumn("finished_pos", udf_join_arr(F.col("finished_pos")))

Then we start another Spark NLP pipeline in order to get POS tag n-grams that correspond to the word n-grams. We start with the conversion into the Spark NLP annotation format.

In [23]:
pos_documentAssembler = DocumentAssembler() \
     .setInputCol("finished_pos") \
     .setOutputCol("pos_document")

Then, we tokenize our POS tags.

In [24]:
pos_tokenizer = Tokenizer() \
     .setInputCols(["pos_document"]) \
     .setOutputCol("pos")

And we generate n-grams from them in the same way we did for words.

In [25]:
pos_ngrammer = NGramGenerator() \
    .setInputCols(["pos"]) \
    .setOutputCol("pos_ngrams") \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter("_")

Lastly, we are ready to get the POS tag n-grams with **Finisher**.

In [26]:
pos_finisher = Finisher().setInputCols(["pos", "pos_ngrams"])

We create this new Spark NLP pipeline...

In [27]:
pos_pipeline = Pipeline() \
     .setStages([pos_documentAssembler,                  
                 pos_tokenizer,
                 pos_ngrammer,  
                 pos_finisher])

... and again fit it and transform the data.

In [28]:
processed_review = pos_pipeline.fit(processed_review).transform(processed_review)

Let's look at what kind of data we have to work with.

In [29]:
processed_review.columns

['Text',
 'finished_unigrams',
 'finished_ngrams',
 'finished_pos',
 'finished_pos_ngrams']

And these are our word n-grams with their corresponding POS n-grams.

In [30]:
processed_review.select("finished_ngrams", "finished_pos_ngrams").limit(5).show()

+--------------------+--------------------+
|     finished_ngrams| finished_pos_ngrams|
+--------------------+--------------------+
|[i, have, buy, se...|[NNP, VBP, VBN, J...|
|[product, arrive,...|[NN, JJ, NN, IN, ...|
|[this, be, a, con...|[DT, VB, DT, NN, ...|
|[if, you, be, loo...|[IN, PRP, VB, VB,...|
|[great, taffy, at...|[JJ, NN, IN, DT, ...|
+--------------------+--------------------+



Now we are ready to filter out the POS tags that are not useful for topic modelling. Let's first create the function that does this for unigrams. We write a custom Python function and then convert it to a PySpark UDF so that it can be used on a Spark dataframe.

In [31]:
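def filter_pos(words, pos_tags):
    # Keep a word only if the tag at the same position is in the allowed list.
    # zip pairs items by position, so both lists must be aligned token for token.
    return [word for word, pos in zip(words, pos_tags) 
            if pos in ["JJ", "NN", "NNS", "VB", "VBP"]]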

udf_filter_pos = F.udf(filter_pos, T.ArrayType(T.StringType()))
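
The function can be tried on plain Python lists before wrapping it in a UDF. The sketch below repeats the same logic on made-up tokens and tags (the sample data is hypothetical). It also shows why alignment matters: `zip` pairs items purely by position, so if one list has had items removed and the other has not, words get matched with the wrong tags.

```python
KEPT_TAGS = {"JJ", "NN", "NNS", "VB", "VBP"}

def filter_pos(words, pos_tags):
    """Keep words whose tag at the same position is in KEPT_TAGS."""
    return [word for word, pos in zip(words, pos_tags) if pos in KEPT_TAGS]

# Aligned input: one tag per word
words = ["great", "taffy", "at", "a", "great", "price"]
tags = ["JJ", "NN", "IN", "DT", "JJ", "NN"]
aligned = filter_pos(words, tags)
print(aligned)
# ['great', 'taffy', 'great', 'price']

# Misaligned input: stop words removed from the words but not from the tags
shorter_words = ["great", "taffy", "great", "price"]
misaligned = filter_pos(shorter_words, tags)
print(misaligned)
# ['great', 'taffy']
```

In the second call the last two words are dropped only because they were paired with the tags of `at` and `a`, so it is worth checking that the word and tag columns have the same length per row.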

Then, we apply this function to the columns with unigrams and their POS tags to get filtered unigrams in a separate dataframe column.

In [32]:
processed_review = processed_review.withColumn("filtered_unigrams",
                                               udf_filter_pos(F.col("finished_unigrams"), 
                                                              F.col("finished_pos")))

This is what our filtered unigrams look like.

In [33]:
processed_review.select("filtered_unigrams").limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                         filtered_unigrams|
+------------------------------------------------------------------------------------------+
|[several, dog, find, quality, product, look, stew, process, labrador, appreciate, product]|
|[product, arrive, label, salt, peanutsthe, peanut, actually, small, unsalted, sure, err...|
|[around, light, citrus, gelatin, filbert, cut, square, coat, powder, sugar, tiny, heave...|
|             [ingredient, robitussin, get, addition, beer, order, make, cherry, medicinal]|
|                               [great, taffy, wide, assortment, taffy, quick, taffy, deal]|
+------------------------------------------------------------------------------------------+



It is time to filter out improper POS combinations of n-grams. We create the custom function in the same manner as before. Since we deal with bi- and trigrams, we need to restrict tags for both.

In [34]:
def filter_pos_combs(words, pos_tags):
    return [word for word, pos in zip(words, pos_tags) 
            if (len(pos.split("_")) == 2 and \
                pos.split("_")[0] in ["JJ", "NN", "NNS", "VB", "VBP"] and \
                 pos.split("_")[1] in ["JJ", "NN", "NNS"]) \
            or (len(pos.split("_")) == 3 and \
                pos.split("_")[0] in ["JJ", "NN", "NNS", "VB", "VBP"] and \
                 pos.split("_")[1] in ["JJ", "NN", "NNS", "VB", "VBP"] and \
                  pos.split("_")[2] in ["NN", "NNS"])]
    
udf_filter_pos_combs = F.udf(filter_pos_combs, T.ArrayType(T.StringType()))
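
The same rule can also be read as a small table of allowed tags per position. The sketch below is an equivalent pure-Python restatement for experimenting outside Spark; the names `PATTERNS` and `keep_pos_ngram` and the sample n-grams with their tags are hypothetical.

```python
INNER_TAGS = {"JJ", "NN", "NNS", "VB", "VBP"}

# Allowed tags for each position, keyed by n-gram length
PATTERNS = {
    2: [INNER_TAGS, {"JJ", "NN", "NNS"}],
    3: [INNER_TAGS, INNER_TAGS, {"NN", "NNS"}],
}

def keep_pos_ngram(pos_ngram):
    """Return True if every tag of a bi- or trigram is allowed in its position."""
    tags = pos_ngram.split("_")
    slots = PATTERNS.get(len(tags))  # None for unigrams and longer n-grams
    return slots is not None and all(tag in allowed for tag, allowed in zip(tags, slots))

word_ngrams = ["great", "great_taffy", "taffy_at", "great_taffy_at",
               "secret_ingredient", "root_beer_extract"]
pos_ngrams = ["JJ", "JJ_NN", "NN_IN", "JJ_NN_IN", "JJ_NN", "NN_NN_NN"]

kept = [word for word, pos in zip(word_ngrams, pos_ngrams) if keep_pos_ngram(pos)]
print(kept)
# ['great_taffy', 'secret_ingredient', 'root_beer_extract']
```

Unigrams are dropped here on purpose, since they are handled separately by the unigram filter.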

And we call the function on word and POS n-grams.

In [35]:
processed_review = processed_review.withColumn("filtered_ngrams",
                                               udf_filter_pos_combs(F.col("finished_ngrams"),
                                                                    F.col("finished_pos_ngrams")))

Below is what we get after filtering for n-grams.

In [36]:
processed_review.select("filtered_ngrams").limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                           filtered_ngrams|
+------------------------------------------------------------------------------------------+
|[dog_food, food_product, good_quality, product_look, process_meat, be_finicky, dog_food...|
|[product_arrive, arrive_label, jumbo_salt, salt_peanutsthe, peanutsthe_peanut, small_si...|
|[few_century, light_pillowy, pillowy_citrus, citrus_gelatin, case_filbert, tiny_square,...|
|     [secret_ingredient, root_beer, beer_extract, be_good, cherry_soda, root_beer_extract]|
|[great_taffy, great_price, wide_assortment, yummy_taffy, taffy_delivery, taffy_lover, y...|
+------------------------------------------------------------------------------------------+



Now we have unigrams and n-grams stored in different columns in the dataframe. Let's combine them together.

In [37]:
from pyspark.sql.functions import concat

processed_review = processed_review.withColumn("final", 
                                               concat(F.col("filtered_unigrams"), 
                                                      F.col("filtered_ngrams")))

And this is the final look of our data.

In [38]:
processed_review.select("final").limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                                     final|
+------------------------------------------------------------------------------------------+
|[several, dog, find, quality, product, look, stew, process, labrador, appreciate, produ...|
|[product, arrive, label, salt, peanutsthe, peanut, actually, small, unsalted, sure, err...|
|[around, light, citrus, gelatin, filbert, cut, square, coat, powder, sugar, tiny, heave...|
|[ingredient, robitussin, get, addition, beer, order, make, cherry, medicinal, secret_in...|
|[great, taffy, wide, assortment, taffy, quick, taffy, deal, great_taffy, great_price, w...|
+------------------------------------------------------------------------------------------+



## Vectorization 

Now we are ready to vectorize our data. First, we will proceed with **TF** (*term frequency*) vectorization using **CountVectorizer** in PySpark. We fit the TF vocabulary and then transform the data into vectors of counts.

In [39]:
from pyspark.ml.feature import CountVectorizer

tfizer = CountVectorizer(inputCol="final", outputCol="tf_features")
tf_model = tfizer.fit(processed_review)
tf_result = tf_model.transform(processed_review)
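
To show what "vectors of counts" means, here is a small pure-Python sketch on two toy documents. It builds a vocabulary ordered by how often each term occurs in the corpus and then counts terms per document. The toy documents, the alphabetical tie-breaking, and the helper `to_count_vector` are assumptions made for illustration; PySpark stores the result as sparse vectors rather than plain lists.

```python
from collections import Counter

toy_docs = [
    ["great", "taffy", "great_taffy"],
    ["dog", "food", "dog_food", "great"],
]

# Vocabulary: most frequent terms first (ties broken alphabetically here)
corpus_counts = Counter(term for doc in toy_docs for term in doc)
toy_vocab = sorted(corpus_counts, key=lambda term: (-corpus_counts[term], term))
term_index = {term: i for i, term in enumerate(toy_vocab)}

def to_count_vector(doc):
    """Count how often each vocabulary term occurs in one document."""
    vector = [0] * len(toy_vocab)
    for term in doc:
        vector[term_index[term]] += 1
    return vector

tf_vectors = [to_count_vector(doc) for doc in toy_docs]
print(toy_vocab)
# ['great', 'dog', 'dog_food', 'food', 'great_taffy', 'taffy']
print(tf_vectors)
# [[1, 0, 0, 0, 1, 1], [1, 1, 1, 1, 0, 0]]
```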

Once we have the TF results, we can account for words that are frequent across all the documents. We can use **IDF** (*inverse document frequency*) to lower the score of such words.

In [40]:
from pyspark.ml.feature import IDF

idfizer = IDF(inputCol="tf_features", outputCol="tf_idf_features")
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)
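
The weighting can be sketched by hand on a pair of toy count vectors. The example uses the smoothed formula `log((m + 1) / (df + 1))`, where `m` is the number of documents and `df` is the number of documents containing the term; the toy vectors and variable names are made up for illustration.

```python
import math

# Toy count vectors: the first term occurs in both documents
toy_tf = [
    [1, 0, 0, 0, 1, 1],
    [1, 1, 1, 1, 0, 0],
]

num_docs = len(toy_tf)
num_terms = len(toy_tf[0])

# Document frequency: in how many documents does each term appear?
doc_freq = [sum(1 for vec in toy_tf if vec[i] > 0) for i in range(num_terms)]

# Smoothed inverse document frequency
idf = [math.log((num_docs + 1) / (df + 1)) for df in doc_freq]

# TF-IDF: scale each count by the weight of its term
toy_tfidf = [[tf * weight for tf, weight in zip(vec, idf)] for vec in toy_tf]

print(doc_freq)
# [2, 1, 1, 1, 1, 1]
print(idf[0])
# 0.0
```

A term that occurs in every document gets a weight of zero, so it no longer contributes to the features, while rarer terms keep a positive weight.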

## LDA

Finally, we are ready to model topics in our data with **LDA** (*Latent Dirichlet Allocation*). To use the algorithm, we have to provide the number of topics we presume our data contains and the number of iterations for the LDA algorithm. Then, we initialize the model and train it.

In [41]:
from pyspark.ml.clustering import LDA

num_topics = 3
max_iter = 100

lda = LDA(k=num_topics, maxIter=max_iter, featuresCol="tf_idf_features")
lda_model = lda.fit(tfidf_result)

To see the words that characterize the discovered topics, we need to convert word ids into actual words with a custom function. This function will again be converted to a PySpark UDF so that it can be used on our topic dataframe.

In [42]:
vocab = tf_model.vocabulary

def get_words(token_list):
     return [vocab[token_id] for token_id in token_list]
       
udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))
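
The lookup itself is just list indexing: each term index is a position in the vocabulary that was learned during the TF step. A minimal sketch with a made-up vocabulary and made-up indices:

```python
# Hypothetical vocabulary; position in the list is the term index
toy_vocab = ["coffee", "tea", "cup", "dog", "food"]

def get_words(token_list, vocabulary=toy_vocab):
    """Map a list of term indices to the corresponding vocabulary words."""
    return [vocabulary[token_id] for token_id in token_list]

topic_words = get_words([3, 4, 0])
print(topic_words)
# ['dog', 'food', 'coffee']
```

This only works if the indices come from a model trained on vectors built with that same vocabulary, which is why the vocabulary is taken from the fitted TF model.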

Let's define the number of top words per topic we would like to see and extract the words with our function.

In [43]:
num_top_words = 10

topics = lda_model.describeTopics(num_top_words).withColumn("topicWords", udf_to_words(F.col("termIndices")))
topics.select("topic", "topicWords").show(truncate=90)

+-----+---------------------------------------------------------------+
|topic|                                                     topicWords|
+-----+---------------------------------------------------------------+
|    0|[taste, good, like, chip, flavor, br, product, great, use, try]|
|    1| [br, food, dog, use, make, dog_food, product, good, like, one]|
|    2|    [coffee, tea, cup, like, taste, good, try, flavor, one, br]|
+-----+---------------------------------------------------------------+

