# **Topic Modeling with PySpark and Spark NLP**
Topic modeling is a statistical approach that discovers the underlying topics present in a collection of documents.

In this project, we build an NLP pipeline with Spark NLP and train a topic model with PySpark (see [here](https://github.com/maobedkova/TopicModelling_PySpark_SparkNLP/blob/master/Topic_Modelling_with_PySpark_and_Spark_NLP.ipynb)).

# **Part 0: PySpark Environment Setup**

In [1]:
## update apt-get
!apt-get update

## Install java
import os
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!java -version

## Install pyspark
!pip install -q pyspark==3.3.0

## Install Spark NLP
!pip install -q spark-nlp==4.2.4

## start the Spark session through Spark NLP
import sparknlp
spark = sparknlp.start()


In [None]:
# Older, alternative setup (shown as In [None]); not needed when the cell above has been run.
import os

# Install Spark 3.1.1
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark # used to locate Spark on the system

import findspark
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
findspark.init()

# spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

## Install pyspark
!pip install pyspark==2.4.5

## Install Spark NLP
!pip install spark-nlp==2.4.5

## Install nltk
#!pip install nltk

## start the Spark session through Spark NLP
import sparknlp
spark = sparknlp.start()


# **Part 1: Data Loading**

In this project, I analyzed the following datasets from Kaggle:
- [Amazon Musical Instruments Review](https://www.kaggle.com/eswarchandt/amazon-music-reviews)
- [A Million News Headlines](https://www.kaggle.com/therohk/million-headlines)
- Amazon Watch Review
- YouTube PetChannel Review

## 1.1 Mount Google Drive

In [2]:
from google.colab import drive

drive.mount('ggdrive')

Mounted at ggdrive


## 1.2 Read data from Google Drive

In [3]:
# dataset name -> [file path, text column, number of LDA topics]
datasets = {'amazonMusic': ['ggdrive/MyDrive/Data/SparkNlp/Musical_Instruments_5.json',
                            'reviewText', 6],
            'abcnews': ['ggdrive/MyDrive/Data/SparkNlp/abcnews-date-text.csv',
                        'headline_text', 20],
            'amazonWatch': ['ggdrive/MyDrive/Data/watch_reviews.tsv',
                            'review_body', 6]}

# choose the dataset to analyze (index 0 = amazonMusic)
data_nm = ['amazonMusic', 'abcnews', 'amazonWatch'][0]
data_params = datasets[data_nm]

if data_nm == 'amazonMusic':
  data = spark.read.json(data_params[0])
elif data_nm == 'abcnews':
  data = spark.read.option('header', 'true')\
         .option('mode', 'DROPMALFORMED')\
         .option('inferSchema', True)\
         .csv(data_params[0])
elif data_nm == 'amazonWatch':
  data = spark.read.csv(data_params[0], sep=r'\t', header=True)  # tab-separated file

## 1.3 Exploratory Data Analysis

In [4]:
# spark dataframe schema
data.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [5]:
# what the data looks like
data.show(10)

+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|1384719342|  [0, 0]|    5.0|Not much to write...|02 28, 2014|A2IBPI20UZIR0U|cassandra tu "Yea...|                good|    1393545600|
|1384719342|[13, 14]|    5.0|The product does ...|03 16, 2013|A14VAT5EAX3D9S|                Jake|                Jake|    1363392000|
|1384719342|  [1, 1]|    5.0|The primary job o...|08 28, 2013|A195EZSQDW3E21|Rick Bennette "Ri...|It Does The Job Well|    1377648000|
|1384719342|  [0, 0]|    5.0|Nice windscreen p...|02 14, 2014|A2C00NNG1ZQQG2|RustyBill "Sunday...|GOOD WINDSCREEN F...|    1392336000|
|1384719342|  [0, 0]|    5.0|This pop filter i...|02 21

In [8]:
# the number of rows
print(f'In total, we have {data.count()} rows of data.')

# target column
text_col = data_params[1]
print(f'Column "{text_col}" has {data.select(text_col).distinct().count()} distinct values.')
review_text = data.select(text_col).na.drop()#.dropDuplicates()

# optional down-sampling for faster experiments
#review_text = spark.createDataFrame(review_text.collect()[:1000])
#review_text = review_text.sample(withReplacement=False, fraction=0.01, seed=2020)

print(f'After dropping nulls (and optional sampling), we have {review_text.count()} rows of {text_col} data:')
review_text.show(10, truncate=100)

In total, we have 10261 rows of data.
Column "reviewText" has 10255 distinct values.
After dropping nulls (and optional sampling), we have 10261 rows of reviewText data:
+----------------------------------------------------------------------------------------------------+
|                                                                                          reviewText|
+----------------------------------------------------------------------------------------------------+
|Not much to write about here, but it does exactly what it's supposed to. filters out the pop soun...|
|The product does exactly as it should and is quite affordable.I did not realized it was double sc...|
|The primary job of this device is to block the breath that would otherwise produce a popping soun...|
|Nice windscreen protects my MXL mic and prevents pops. Only thing is that the gooseneck is only m...|
|This pop filter is great. It looks and performs like a studio filter. If you're recording vocals ...|
|So good

In [7]:
import pyspark.sql.functions as F

#review_text.select(F.count(F.when(F.col(text_col).isNull() | F.isnan(text_col), text_col)).alias(text_col)).show()

# **Part 2: Spark NLP pipeline** ([annotators](https://nlp.johnsnowlabs.com/docs/en/annotators))

## 2.1 DocumentAssembler (see [here](https://nlp.johnsnowlabs.com/docs/en/transformers#documentassembler-getting-data-in))

In [9]:
from sparknlp.base import DocumentAssembler

documentAssembler = DocumentAssembler() \
     .setInputCol(text_col) \
     .setOutputCol('document')

## 2.2 Tokenizer (see [here](https://nlp.johnsnowlabs.com/docs/en/annotators#tokenizer))

In [10]:
from sparknlp.annotator import Tokenizer

tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('tokenized')

## 2.3 Normalizer (see [here](https://nlp.johnsnowlabs.com/docs/en/annotators#normalizer))

In [11]:
from sparknlp.annotator import Normalizer

normalizer = Normalizer() \
     .setInputCols(['tokenized']) \
     .setOutputCol('normalized') \
     .setLowercase(True)
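To get a feel for what the Normalizer contributes, here is a rough pure-Python approximation. It assumes the annotator removes every character that is not a letter and then lowercases the token, dropping tokens that end up empty; the exact default cleanup pattern may differ between Spark NLP versions. Tokens such as `affordablei` and `youre` in the outputs of Sections 2.14 and 2.15 are consistent with this behavior.

```python
import re

# Assumption: the Normalizer behaves like "remove non-letters, then lowercase"
NON_LETTERS = re.compile(r'[^A-Za-z]')

def normalize(tokens, lowercase=True):
    """Clean every token and drop the ones that end up empty."""
    cleaned = []
    for token in tokens:
        token = NON_LETTERS.sub('', token)
        if lowercase:
            token = token.lower()
        if token:  # tokens made only of punctuation disappear
            cleaned.append(token)
    return cleaned

print(normalize(["affordable.I", "you're", "!!", "MXL"]))
# ['affordablei', 'youre', 'mxl']
```

This also explains why `affordablei` shows up as a single token: the Tokenizer did not split on the missing space after the period, and the normalizer then glued both words together.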

## 2.4 LemmatizerModel (see [here](https://nlp.johnsnowlabs.com/docs/en/models#english---models))

In [12]:
from sparknlp.annotator import LemmatizerModel

lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['normalized']) \
     .setOutputCol('lemmatized')

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


## 2.5 StopWordsCleaner (see [here](https://nlp.johnsnowlabs.com/docs/en/annotators#stopwordscleaner))

In [13]:
import nltk
nltk.download('stopwords')

from nltk.corpus import stopwords

eng_stopwords = stopwords.words('english')

######
from sparknlp.annotator import StopWordsCleaner

stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['lemmatized']) \
     .setOutputCol('unigrams') \
     .setStopWords(eng_stopwords)

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
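The stop-word step is essentially a set-membership filter over the lemmas. A minimal sketch, using a tiny hypothetical stop-word set in place of NLTK's full English list and comparing in lowercase (the tokens are already lowercased by the Normalizer here). The lemmas are those of the first review, so the result matches the start of its `finished_unigrams` in Section 2.11.

```python
# Hypothetical stand-in for nltk's stopwords.words('english')
STOP_WORDS = {'not', 'to', 'about', 'here', 'but', 'it', 'do', 'what', 'be'}

def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Keep tokens that are not stop words, preserving their order."""
    stop = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in stop]

lemmas = ['not', 'much', 'to', 'write', 'about', 'here', 'but', 'it', 'do', 'exactly']
print(remove_stop_words(lemmas))
# ['much', 'write', 'exactly']
```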


## 2.6 NGramGenerator (see [here](https://nlp.johnsnowlabs.com/docs/en/annotators#ngramgenerator))

In [14]:
from sparknlp.annotator import NGramGenerator

ngrammer = NGramGenerator() \
    .setInputCols(['lemmatized']) \
    .setOutputCol('ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')
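With `setEnableCumulative(True)` the generator emits every n-gram from length 1 up to `setN(3)`, joined with `_`. A minimal pure-Python sketch of that behavior, assuming the grams are ordered by length (all unigrams, then bigrams, then trigrams), which is the ordering the `finished_ngrams` and `filtered_ngrams` outputs suggest. Because the input column is `lemmatized` rather than `unigrams`, stop words such as "not" and "to" are still present in these n-grams.

```python
def cumulative_ngrams(tokens, n=3, delimiter='_'):
    """All 1..n-grams of `tokens`, grouped by length (unigrams, bigrams, ...)."""
    grams = []
    for size in range(1, n + 1):
        for start in range(len(tokens) - size + 1):
            grams.append(delimiter.join(tokens[start:start + size]))
    return grams

print(cumulative_ngrams(['pop', 'filter', 'be', 'great']))
# ['pop', 'filter', 'be', 'great', 'pop_filter', 'filter_be', 'be_great', 'pop_filter_be', 'filter_be_great']
```

A document with `m` tokens yields `m + (m - 1) + (m - 2)` grams for `n = 3`, so the n-gram column grows roughly three times as fast as the token column.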

## 2.7 PerceptronModel (see [here](https://nlp.johnsnowlabs.com/docs/en/annotators#postagger)) for part-of-speech (POS) tagging

In [15]:
from sparknlp.annotator import PerceptronModel

pos_tagger = PerceptronModel.pretrained('pos_anc') \
    .setInputCols(['document', 'lemmatized']) \
    .setOutputCol('pos')

pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[OK!]


## 2.8 Finisher (see [here](https://nlp.johnsnowlabs.com/docs/en/transformers#finisher))

In [17]:
from sparknlp.base import Finisher

finisher = Finisher() \
     .setInputCols(['unigrams', 'ngrams', 'pos'])

## 2.9 Basic NLP Pipeline

In [18]:
from pyspark.ml import Pipeline

pipeline = Pipeline() \
     .setStages([documentAssembler,
                 tokenizer,
                 normalizer,
                 lemmatizer,
                 stopwords_cleaner,
                 ngrammer,
                 pos_tagger,
                 finisher])

## 2.10 Fit and transform review_text

In [19]:
processed_review = pipeline.fit(review_text).transform(review_text)
#processed_review.select(*(F.count(F.when(F.size(F.col(c)) == 0, c)).alias(c)\
#                          for c in processed_review.columns[1:])).show()

processed_review = processed_review.filter(F.size(F.col('finished_unigrams')) > 0)

## 2.11 What processed_review looks like

In [20]:
processed_review.show(10, truncate=30)

+------------------------------+------------------------------+------------------------------+------------------------------+
|                    reviewText|             finished_unigrams|               finished_ngrams|                  finished_pos|
+------------------------------+------------------------------+------------------------------+------------------------------+
|Not much to write about her...|[much, write, exactly, supp...|[not, much, to, write, abou...|[RB, JJ, TO, VB, IN, RB, CC...|
|The product does exactly as...|[product, exactly, quite, a...|[the, product, do, exactly,...|[DT, NN, VBP, RB, IN, PRP, ...|
|The primary job of this dev...|[primary, job, device, bloc...|[the, primary, job, of, thi...|[DT, JJ, NN, IN, DT, NN, VB...|
|Nice windscreen protects my...|[nice, windscreen, protect,...|[nice, windscreen, protect,...|[JJ, NN, NN, NNP, NN, JJ, C...|
|This pop filter is great. I...|[pop, filter, great, look, ...|[this, pop, filter, be, gre...|[DT, NN, NN, VB, JJ, PRP

## 2.12 POS tags of n-grams

### 2.12.1 Join the POS tags of each review into one string

In [21]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

udf_join_arr = F.udf(lambda x: ' '.join(x), T.StringType())
processed_review = processed_review.withColumn('finished_pos', udf_join_arr(F.col('finished_pos')))

### 2.12.2 Transform the joined POS string into annotation format

In [22]:
pos_documentAssembler = DocumentAssembler() \
     .setInputCol('finished_pos') \
     .setOutputCol('pos_document')

### 2.12.3 pos_tokenizer

In [23]:
pos_tokenizer = Tokenizer() \
     .setInputCols(['pos_document']) \
     .setOutputCol('pos')

### 2.12.4 NGramGenerator over the POS tags

In [24]:
pos_ngrammer = NGramGenerator() \
    .setInputCols(['pos']) \
    .setOutputCol('pos_ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')

### 2.12.5 pos_finisher

In [26]:
pos_finisher = Finisher() \
     .setInputCols(['pos', 'pos_ngrams'])

### 2.12.6 NLP pipeline for the POS tags

In [27]:
pos_pipeline = Pipeline() \
     .setStages([pos_documentAssembler,
                 pos_tokenizer,
                 pos_ngrammer,
                 pos_finisher])

### 2.12.7 Pipeline fit and transform

In [28]:
processed_review = pos_pipeline.fit(processed_review).transform(processed_review)
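The point of re-tokenizing the joined POS string and running the same `NGramGenerator` settings over it is that word n-grams and tag n-grams then line up position by position, as long as the lemma list and the tag list have the same length. A small pure-Python check of that alignment, using the lemmas and tags of "This pop filter is great" from Section 2.11; `cumulative_ngrams` is a hypothetical helper that mirrors the cumulative n-gram logic, and the whitespace split only roughly stands in for `pos_tokenizer`.

```python
def cumulative_ngrams(tokens, n=3, delimiter='_'):
    # Hypothetical helper mirroring NGramGenerator(setN(3), setEnableCumulative(True))
    return [delimiter.join(tokens[i:i + size])
            for size in range(1, n + 1)
            for i in range(len(tokens) - size + 1)]

lemmas = ['this', 'pop', 'filter', 'be', 'great']
pos_string = 'DT NN NN VB JJ'   # what udf_join_arr produces for these lemmas
tags = pos_string.split(' ')    # roughly what pos_tokenizer recovers

word_grams = cumulative_ngrams(lemmas)
tag_grams = cumulative_ngrams(tags)
assert len(word_grams) == len(tag_grams)  # equal-length inputs give aligned outputs

for word, tag in zip(word_grams[5:9], tag_grams[5:9]):  # the bigrams
    print(word, tag)
# this_pop DT_NN
# pop_filter NN_NN
# filter_be NN_VB
# be_great VB_JJ
```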

### 2.12.8 New processed review

In [29]:
processed_review.show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|          reviewText|   finished_unigrams|     finished_ngrams|        finished_pos| finished_pos_ngrams|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Not much to write...|[much, write, exa...|[not, much, to, w...|[RB, JJ, TO, VB, ...|[RB, JJ, TO, VB, ...|
|The product does ...|[product, exactly...|[the, product, do...|[DT, NN, VBP, RB,...|[DT, NN, VBP, RB,...|
|The primary job o...|[primary, job, de...|[the, primary, jo...|[DT, JJ, NN, IN, ...|[DT, JJ, NN, IN, ...|
|Nice windscreen p...|[nice, windscreen...|[nice, windscreen...|[JJ, NN, NN, NNP,...|[JJ, NN, NN, NNP,...|
|This pop filter i...|[pop, filter, gre...|[this, pop, filte...|[DT, NN, NN, VB, ...|[DT, NN, NN, VB, ...|
|So good that I bo...|[good, buy, anoth...|[so, good, that, ...|[RB, JJ, IN, NNP,...|[RB, JJ, IN, NNP,...|
|I have used monst...|[use, monster, 

## 2.13 Filter unigrams by POS tag

### 2.13.1 udf function

In [30]:
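def filter_pos(words, pos_tags):
    # NOTE: finished_unigrams has stop words removed while finished_pos tags every lemma,
    # so the two lists are not aligned 1:1 and zip() can pair words with the wrong tags.
    return [word for word, pos in zip(words, pos_tags)
            if pos in ['JJ', 'NN', 'NNS', 'VB', 'VBP']]

udf_filter_pos = F.udf(filter_pos, T.ArrayType(T.StringType()))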
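This is why the unigram filter in 2.13.2 is left commented out: `finished_unigrams` comes from the stop-word cleaner, while `finished_pos` holds one tag per lemma, so zipping the two shifts the pairs. A small illustration with the first few lemmas and tags of the first review, as visible in Section 2.11:

```python
def filter_pos(words, pos_tags):
    # Same rule as the notebook's UDF: keep words tagged JJ/NN/NNS/VB/VBP
    return [word for word, pos in zip(words, pos_tags)
            if pos in ['JJ', 'NN', 'NNS', 'VB', 'VBP']]

lemmas = ['not', 'much', 'to', 'write', 'about', 'here', 'but']
tags = ['RB', 'JJ', 'TO', 'VB', 'IN', 'RB', 'CC']   # one tag per lemma
unigrams = ['much', 'write']                         # stop words already removed

print(list(zip(unigrams, tags)))   # misaligned pairs
# [('much', 'RB'), ('write', 'JJ')]
print(filter_pos(unigrams, tags))  # 'much' is wrongly dropped as RB
# ['write']
print(filter_pos(lemmas, tags))    # aligned input keeps the right words
# ['much', 'write']
```

Filtering the lemmas (which are aligned with the tags) and only then removing stop words would avoid the mismatch.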

In [31]:
# Function to get rows at positions `rownums` (0-based)
def getrows(df, rownums=None):
    rownums = set(rownums or [])
    return df.rdd.zipWithIndex()\
         .filter(lambda x: x[1] in rownums)\
         .map(lambda x: x[0])

# Get the row at position 2 and print the lengths of its columns 1-4
#row02 = getrows(processed_review, rownums=[2]).collect()
#for i in [1, 2, 3, 4]:
#  print(len(row02[0][i]))

### 2.13.2 Filter unigrams

In [32]:
#processed_review = processed_review.withColumn('filtered_unigrams',
#      udf_filter_pos(F.col('finished_unigrams'), F.col('finished_pos')))
#processed_review.select('filtered_unigrams').show(10, truncate=90)

## 2.14 Filter out improper POS combinations of n-grams

### 2.14.1 udf function

In [33]:
# POS tags allowed at the positions of an n-gram
NOUN_ADJ_VERB = ['JJ', 'NN', 'NNS', 'VB', 'VBP']
NOUN_ADJ = ['JJ', 'NN', 'NNS']
NOUN = ['NN', 'NNS']

def filter_pos_combs(words, pos_tags):
    """Keep bigrams tagged (JJ|NN|NNS|VB|VBP)_(JJ|NN|NNS) and trigrams tagged
    (JJ|NN|NNS|VB|VBP)_(JJ|NN|NNS|VB|VBP)_(NN|NNS); drop everything else."""
    kept = []
    for word, pos in zip(words, pos_tags):
        tags = pos.split('_')
        if len(tags) == 2 and tags[0] in NOUN_ADJ_VERB and tags[1] in NOUN_ADJ:
            kept.append(word)
        elif (len(tags) == 3 and tags[0] in NOUN_ADJ_VERB
              and tags[1] in NOUN_ADJ_VERB and tags[2] in NOUN):
            kept.append(word)
    return kept

udf_filter_pos_combs = F.udf(filter_pos_combs, T.ArrayType(T.StringType()))
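The same rule can also be written as data: for each n-gram length, a tuple of the tag sets allowed at each position. The following is a hypothetical, table-driven variant (not part of the original notebook) that makes it easier to add or tweak patterns; the example pairs come from the "This pop filter is great" review, and the kept grams match its `filtered_ngrams` row in Section 2.14.3.

```python
NOUN_ADJ_VERB = {'JJ', 'NN', 'NNS', 'VB', 'VBP'}
NOUN_ADJ = {'JJ', 'NN', 'NNS'}
NOUN = {'NN', 'NNS'}

# Allowed tag sets per position, keyed by n-gram length (unigrams have no entry)
ALLOWED = {
    2: (NOUN_ADJ_VERB, NOUN_ADJ),
    3: (NOUN_ADJ_VERB, NOUN_ADJ_VERB, NOUN),
}

def filter_pos_combs_table(words, pos_tags, allowed=ALLOWED):
    """Keep n-grams whose tag sequence fits the pattern for their length."""
    kept = []
    for word, pos in zip(words, pos_tags):
        tags = pos.split('_')
        pattern = allowed.get(len(tags))
        if pattern and all(tag in ok for tag, ok in zip(tags, pattern)):
            kept.append(word)
    return kept

words = ['pop', 'this_pop', 'pop_filter', 'be_great', 'this_pop_filter']
tags = ['NN', 'DT_NN', 'NN_NN', 'VB_JJ', 'DT_NN_NN']
print(filter_pos_combs_table(words, tags))
# ['pop_filter', 'be_great']
```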

### 2.14.2 Transform

In [34]:
processed_review = processed_review.withColumn('filtered_ngrams',
    udf_filter_pos_combs(F.col('finished_ngrams'), F.col('finished_pos_ngrams')))

### 2.14.3 New processed review

In [35]:
processed_review.select('filtered_ngrams').show(10, truncate=100)

+------------------------------------------------------------------------------------------+
|                                                                           filtered_ngrams|
+------------------------------------------------------------------------------------------+
|            [pop_sound, low_price, price_pop, pop_filter, low_price_pop, price_pop_filter]|
|[be_double, double_screen, add_bonus, small_hint, old_grape, grape_candy, cannot_stop, ...|
|[primary_job, pop_sound, noticeable_reduction, high_frequency, double_cloth, cloth_filt...|
|[nice_windscreen, windscreen_protect, mxl_mic, prevent_pop, require_careful, careful_po...|
|             [pop_filter, be_great, studio_filter, youre_record, record_vocal, get_record]|
|[heavy_cord, gold_connector, connector_bass, bass_sound, sound_great, learn_last, last_...|
|[have_use, use_monster, monster_cable, good_reason, lifetime_warranty, be_worth, simple...|
|[buy_monster, monster_cable, pedal_board, be_use, high_end, end_plane

## 2.15 Combine unigrams and n-grams

In [36]:
from pyspark.sql.functions import concat

processed_review = processed_review.withColumn('final',
      concat(F.col('finished_unigrams'), F.col('filtered_ngrams')))

processed_review.select('final').show(10, truncate=100)

+------------------------------------------------------------------------------------------+
|                                                                                     final|
+------------------------------------------------------------------------------------------+
|[much, write, exactly, suppose, filter, pop, sound, recordings, much, crisp, one, low, ...|
|[product, exactly, quite, affordablei, realize, double, screen, arrive, even, well, exp...|
|[primary, job, device, block, breath, would, otherwise, produce, pop, sound, allow, voi...|
|[nice, windscreen, protect, mxl, mic, prevent, pop, thing, gooseneck, marginally, able,...|
|[pop, filter, great, look, perform, like, studio, filter, youre, record, vocal, elimina...|
|[good, buy, another, one, love, heavy, cord, gold, connector, bass, sound, great, learn...|
|[use, monster, cable, year, good, reason, lifetime, warranty, worth, price, alone, simp...|
|[use, cable, run, output, pedal, chain, input, fender, amp, buy, mons

# **Part 3: Vectorization of text tokens**

## 3.1 CountVectorizer

In [37]:
from pyspark.ml.feature import CountVectorizer

countVectorizer = CountVectorizer(inputCol='final', outputCol='tf_features')
tf_model = countVectorizer.fit(processed_review)
processed_review = tf_model.transform(processed_review)
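`CountVectorizer` builds a vocabulary from the `final` token lists and turns each document into a sparse vector of term counts. A minimal pure-Python sketch, assuming the vocabulary is ordered by total corpus frequency (most frequent term at index 0), as Spark's documentation describes; ties are broken alphabetically here, which Spark does not guarantee, and the `vocabSize`/`minDF` limits are left out. The documents are a hypothetical toy corpus.

```python
from collections import Counter

def fit_vocabulary(docs):
    """Terms ordered by total corpus frequency (ties broken alphabetically)."""
    totals = Counter(token for doc in docs for token in doc)
    ranked = sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))
    return [term for term, _ in ranked]

def to_count_vector(doc, vocabulary):
    """Sparse term-count vector as {term index: count}, sorted by index."""
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[t] for t in doc if t in index)
    return dict(sorted(counts.items()))

docs = [['pop', 'filter', 'pop', 'pop_filter'],
        ['guitar', 'string', 'pop'],
        ['guitar', 'pedal']]
toy_vocab = fit_vocabulary(docs)
print(toy_vocab)
# ['pop', 'guitar', 'filter', 'pedal', 'pop_filter', 'string']
print([to_count_vector(doc, toy_vocab) for doc in docs])
# [{0: 2, 2: 1, 4: 1}, {0: 1, 1: 1, 5: 1}, {1: 1, 3: 1}]
```

Unigrams and n-grams share one vocabulary, so `pop` and `pop_filter` are simply two different features.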

## 3.2 IDF (*inverse document frequency*)

In [38]:
from pyspark.ml.feature import IDF

idfizer = IDF(inputCol='tf_features', outputCol='tf_idf_features')
idf_model = idfizer.fit(processed_review)
processed_review = idf_model.transform(processed_review)
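Spark's `IDF` weights each term by `idf(t) = log((m + 1) / (d(t) + 1))` (natural log), where `m` is the number of documents and `d(t)` the number of documents containing term `t`; the TF-IDF value is the term count times that weight. A small sketch of the same formula on toy count vectors in a `{term index: count}` format:

```python
import math

def fit_idf(count_vectors, num_terms):
    """idf[t] = log((m + 1) / (df[t] + 1)), the smoothed formula Spark's IDF uses."""
    m = len(count_vectors)
    df = [0] * num_terms
    for vec in count_vectors:
        for t in vec:  # every key is a term present in this document
            df[t] += 1
    return [math.log((m + 1) / (df[t] + 1)) for t in range(num_terms)]

def tf_idf(count_vector, idf):
    """Scale each term count by its IDF weight."""
    return {t: count * idf[t] for t, count in count_vector.items()}

counts = [{0: 2, 2: 1}, {0: 1, 1: 1}, {1: 1, 3: 1}]
idf = fit_idf(counts, num_terms=4)
print([round(w, 4) for w in idf])
# [0.2877, 0.2877, 0.6931, 0.6931]
print(tf_idf(counts[0], idf))
```

A term that occurs in every document gets `log(1) = 0`, so it contributes nothing to `tf_idf_features`.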

In [39]:
processed_review.show(10, truncate=30)

+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|                    reviewText|             finished_unigrams|               finished_ngrams|                  finished_pos|           finished_pos_ngrams|               filtered_ngrams|                         final|                   tf_features|               tf_idf_features|
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|Not much to write about her...|[much, write, exactly, supp...|[not, much, to, write, abou...|[RB, JJ, TO, VB, IN, RB, CC...|[RB, JJ, TO, VB, IN, RB, CC...|[

# **Part 4: Latent Dirichlet Allocation algorithm**

## 4.1 Train Topic Model

In [40]:
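from pyspark.ml.clustering import LDA

num_topics = data_params[2]  # number of topics chosen per dataset in 1.2
max_iter = 10

# Spark's LDA documentation describes featuresCol as term-count vectors (here 'tf_features');
# this run feeds it the TF-IDF weights instead.
lda = LDA(k=num_topics, maxIter=max_iter, featuresCol='tf_idf_features')
lda_model = lda.fit(processed_review)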
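For intuition about what `lda.fit` estimates: LDA models each document as a mixture of `k` topics and each topic as a distribution over the vocabulary. The sketch below fits that model with collapsed Gibbs sampling on lists of term ids in pure Python. This is a swapped-in inference method for illustration only: Spark's `LDA` uses an online variational Bayes optimizer by default (`optimizer='online'`, with `'em'` as the alternative), and the `alpha`/`beta` values here are arbitrary choices, not Spark's defaults.

```python
import random

def lda_gibbs(docs, k, alpha=0.1, beta=0.01, iters=200, seed=0):
    """Collapsed Gibbs sampling for LDA.

    docs: list of documents, each a list of integer term ids.
    Returns (doc_topic, topic_term): per-document topic proportions and
    per-topic term probabilities, estimated from the final sample.
    """
    rng = random.Random(seed)
    vocab_size = 1 + max(t for doc in docs for t in doc)
    n_dk = [[0] * k for _ in docs]               # tokens per (document, topic)
    n_kw = [[0] * vocab_size for _ in range(k)]  # tokens per (topic, term)
    n_k = [0] * k                                # tokens per topic

    def update(d, w, t, delta):
        n_dk[d][t] += delta
        n_kw[t][w] += delta
        n_k[t] += delta

    # random initial topic for every token
    z = [[rng.randrange(k) for _ in doc] for doc in docs]
    for d, doc in enumerate(docs):
        for w, t in zip(doc, z[d]):
            update(d, w, t, +1)

    for _ in range(iters):
        for d, doc in enumerate(docs):
            for i, w in enumerate(doc):
                update(d, w, z[d][i], -1)  # take the token out of the counts
                # p(topic j | all other assignments), up to a constant
                weights = [(n_dk[d][j] + alpha) * (n_kw[j][w] + beta)
                           / (n_k[j] + vocab_size * beta) for j in range(k)]
                z[d][i] = rng.choices(range(k), weights=weights)[0]
                update(d, w, z[d][i], +1)

    doc_topic = [[(n_dk[d][j] + alpha) / (len(doc) + k * alpha) for j in range(k)]
                 for d, doc in enumerate(docs)]
    topic_term = [[(n_kw[j][w] + beta) / (n_k[j] + vocab_size * beta)
                   for w in range(vocab_size)] for j in range(k)]
    return doc_topic, topic_term

# Toy corpus: terms 0-2 and 3-5 never co-occur, so two topics typically separate them
docs = [[0, 1, 2, 0, 1], [1, 0, 2, 2], [3, 4, 5, 3], [4, 5, 3, 5]]
doc_topic, topic_term = lda_gibbs(docs, k=2)
print([[round(p, 2) for p in row] for row in doc_topic])
```

`doc_topic` plays the role of the `topicDistribution` column, and sorting a row of `topic_term` gives the equivalent of `describeTopics`.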

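## 4.2 Topic sizes (dominant topic per document)

In [41]:
import numpy as np
def vect_argmax(vect):
  # index of the largest entry in topicDistribution = the document's dominant topic
  arra = vect.toArray()
  max_pos = np.argmax(arra)
  return int(max_pos)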

udf_argmax = F.udf(vect_argmax, T.IntegerType())

processed_review = lda_model.transform(processed_review)
processed_review = processed_review.withColumn('topic#', udf_argmax(F.col('topicDistribution')))
processed_review.show(10, truncate=30)

+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------+
|                    reviewText|             finished_unigrams|               finished_ngrams|                  finished_pos|           finished_pos_ngrams|               filtered_ngrams|                         final|                   tf_features|               tf_idf_features|             topicDistribution|topic#|
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------+
|Not much to write about her...|[much, writ

In [42]:
processed_review.groupBy('topic#').count().show()

+------+-----+
|topic#|count|
+------+-----+
|     1|  231|
|     3|  346|
|     5| 1260|
|     4|  306|
|     2| 1637|
|     0| 6474|
+------+-----+
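The table counts documents by their dominant topic, the argmax of `topicDistribution` computed by `vect_argmax`. The same bookkeeping in pure Python, on hypothetical topic distributions, also reports each topic's share of the corpus:

```python
from collections import Counter

def dominant_topic(distribution):
    """Index of the largest topic weight (the first one wins on ties, like np.argmax)."""
    return max(range(len(distribution)), key=lambda j: distribution[j])

def topic_sizes(distributions):
    """{topic: (number of documents, share of documents)} for dominant topics."""
    counts = Counter(dominant_topic(d) for d in distributions)
    total = sum(counts.values())
    return {topic: (n, n / total) for topic, n in sorted(counts.items())}

dists = [[0.7, 0.2, 0.1], [0.1, 0.8, 0.1], [0.6, 0.3, 0.1], [0.4, 0.4, 0.2]]
print(topic_sizes(dists))
# {0: (3, 0.75), 1: (1, 0.25)}
```

Applied to the counts above, topic 0 is the dominant topic for 6474 of the 10254 reviews (about 63%), so the topics are far from balanced.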



## 4.3 Top words that define topics

In [43]:
vocab = tf_model.vocabulary

def get_words(token_list):
     return [vocab[token_id] for token_id in token_list]

udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [44]:
num_top_words = 10

topics = lda_model.describeTopics(num_top_words).withColumn('topicWords', udf_to_words(F.col('termIndices')))
topics.select('topic', 'topicWords').show(truncate=90)

+-----+-------------------------------------------------------------------------------+
|topic|                                                                     topicWords|
+-----+-------------------------------------------------------------------------------+
|    0|                  [pedal, use, sound, one, get, guitar, like, well, good, work]|
|    1|     [volt, tc, bx, voltage, pick, drmkii, filter, shockmount, power, receiver]|
|    2|[string, pick, guitar, case, acoustic, daddario, finger, sound, mandolin, play]|
|    3|     [keyboard, violin, tuner, snark, case, tune, easy, display, ukulele, vibe]|
|    4|               [pin, hole, tuner, drill, string, pick, sound, use, pedal, gate]|
|    5|              [amp, guitar, strap, pick, string, great, sound, look, one, well]|
+-----+-------------------------------------------------------------------------------+

