
# Topic modelling with Apache Spark and SparkNLP

## Installing Java and Spark NLP

In [None]:
import os
# Install java
! apt-get update -qq
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:"+ os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed pyspark==2.4.4

# Install Spark NLP
! pip install --ignore-installed spark-nlp==2.6.3

openjdk version "1.8.0_282"
OpenJDK Runtime Environment (build 1.8.0_282-8u282-b08-0ubuntu1~18.04-b08)
OpenJDK 64-Bit Server VM (build 25.282-b08, mixed mode)
Processing /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471/pyspark-2.4.4-py2.py3-none-any.whl
Collecting py4j==0.10.7
  Using cached https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4
Collecting spark-nlp==2.6.3
  Using cached https://files.pythonhosted.org/packages/84/84/3f15673db521fbc4e8e0ec3677a019ba1458b2cb70f0f7738c221511ef32/spark_nlp-2.6.3-py2.py3-none-any.whl
Installing collected packages: spark-nlp
Successfully installed spark-nlp-2.6.3


## Import the relevant packages

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp

spark = sparknlp.start()

print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

Spark NLP version:  2.6.3
Apache Spark version:  2.4.4


In [None]:
from pathlib import Path
import urllib.request
download_path = "./abcnews-date-text.csv"
if not Path(download_path).is_file():
  print("File not found; downloading it.")
  url = "https://github.com/ravishchawla/topic_modeling/raw/master/data/abcnews-date-text.csv"
  urllib.request.urlretrieve(url, download_path)
else:
  print("File already present.")

File already present.


## Download the news data

In [None]:
# If you are reading the file from local storage
file_location = r'./abcnews-date-text.csv'
# If you are reading the file from HDFS
# file_location = r'hdfs:///user/path/to/abcnews_date_txt.csv'
file_type = "csv"
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
df = spark.read.format(file_type)\
      .option("inferSchema", infer_schema)\
      .option("header", first_row_is_header)\
      .option("sep", delimiter)\
      .load(file_location)
# Verify the count
df.count()

1041793

In [None]:
df.show(10)

+------------+--------------------+
|publish_date|       headline_text|
+------------+--------------------+
|    20030219|aba decides again...|
|    20030219|act fire witnesse...|
|    20030219|a g calls for inf...|
|    20030219|air nz staff in a...|
|    20030219|air nz strike to ...|
|    20030219|ambitious olsson ...|
|    20030219|antic delighted w...|
|    20030219|aussie qualifier ...|
|    20030219|aust addresses un...|
|    20030219|australia is lock...|
+------------+--------------------+
only showing top 10 rows



## Pre-processing Pipeline using Spark NLP (Assignment 1-4)

In [None]:
# 3.6.1 Document assembling
document_assembler = DocumentAssembler() \
                      .setInputCol("headline_text") \
                      .setOutputCol("document") \
                      .setCleanupMode("shrink")

# Split each sentence into tokens (array)
tokenizer = Tokenizer() \
              .setInputCols(["document"]) \
              .setOutputCol("token")
# 3.6.3 Normalizing: clean unwanted characters and garbage
normalizer = Normalizer() \
              .setInputCols(["token"]) \
              .setOutputCol("normalized")
# 3.6.4 Stopwords removal
stopwords_cleaner = StopWordsCleaner()\
                      .setInputCols("normalized")\
                      .setOutputCol("cleanTokens")\
                      .setCaseSensitive(False)
# 3.6.5 Stemming
stemmer = Stemmer() \
            .setInputCols(["cleanTokens"]) \
            .setOutputCol("stem")
# 3.6.6 Finishing: convert annotations into a plain array column
finisher = Finisher() \
              .setInputCols(["stem"]) \
              .setOutputCols(["tokens"]) \
              .setOutputAsArray(True) \
              .setCleanAnnotations(False)

# 3.6.7 Build the ML Pipeline
nlp_pipeline = Pipeline(stages=[document_assembler, tokenizer, normalizer, stopwords_cleaner, stemmer, finisher])

# 3.6.8 Train and apply the ML Pipeline
nlp_model = nlp_pipeline.fit(df)
processed_df = nlp_model.transform(df)
# Limit to 10,000 rows for the steps that follow
tokens_df = processed_df.select('publish_date', 'tokens').limit(10000)
tokens_df.show()



+------------+--------------------+
|publish_date|              tokens|
+------------+--------------------+
|    20030219|[aba, decid, comm...|
|    20030219|[act, fire, wit, ...|
|    20030219|[g, call, infrast...|
|    20030219|[air, nz, staff, ...|
|    20030219|[air, nz, strike,...|
|    20030219|[ambiti, olsson, ...|
|    20030219|[antic, delight, ...|
|    20030219|[aussi, qualifi, ...|
|    20030219|[aust, address, u...|
|    20030219|[australia, lock,...|
|    20030219|[australia, contr...|
|    20030219|[barca, take, rec...|
|    20030219|[bathhous, plan, ...|
|    20030219|[big, hope, launc...|
|    20030219|[big, plan, boost...|
|    20030219|[blizzard, buri, ...|
|    20030219|[brigadi, dismiss...|
|    20030219|[british, combat,...|
|    20030219|[bryant, lead, la...|
|    20030219|[bushfir, victim,...|
+------------+--------------------+
only showing top 20 rows



## 3.7 Feature Engineering

We will use Spark MLlib’s CountVectorizer to turn the token arrays into term-count feature vectors. Latent Dirichlet Allocation (LDA) needs a vocabulary built from the data itself, and CountVectorizer learns that vocabulary when it is fit.

In [None]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=500, minDF=3.0)
# train the model
cv_model = cv.fit(tokens_df)
# transform the data. Output column name will be features.
vectorized_tokens = cv_model.transform(tokens_df)
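
To make the roles of `vocabSize` and `minDF` concrete, here is a minimal pure-Python sketch of the idea behind CountVectorizer. It is not Spark's implementation: the helper names `fit_vocabulary` and `to_counts`, the toy documents, and the alphabetical tie-breaking are all assumptions made for illustration. It treats `min_df` as a minimum number of documents and keeps the `vocab_size` most frequent terms.

```python
from collections import Counter

def fit_vocabulary(docs, vocab_size, min_df):
    """Hypothetical helper: pick the vocabulary from tokenized documents."""
    term_freq = Counter()  # total occurrences of each term in the corpus
    doc_freq = Counter()   # number of documents containing each term
    for tokens in docs:
        term_freq.update(tokens)
        doc_freq.update(set(tokens))
    # Drop terms that appear in fewer than min_df documents.
    kept = [t for t in term_freq if doc_freq[t] >= min_df]
    # Most frequent first; ties broken alphabetically (an assumption,
    # chosen only to make this sketch deterministic).
    kept.sort(key=lambda t: (-term_freq[t], t))
    return kept[:vocab_size]

def to_counts(tokens, vocab):
    """Hypothetical helper: sparse term counts as {vocab_index: count}."""
    index = {term: i for i, term in enumerate(vocab)}
    counts = Counter(index[t] for t in tokens if t in index)
    return dict(sorted(counts.items()))

# Toy documents, not taken from the dataset.
docs = [
    ["air", "nz", "strike"],
    ["air", "nz", "staff"],
    ["air", "fire"],
    ["fire", "plan"],
]
vocab = fit_vocabulary(docs, vocab_size=3, min_df=2)
print(vocab)
print(to_counts(["air", "fire", "air", "plan"], vocab))
```

Terms outside the learned vocabulary (here `plan`) simply contribute nothing to the feature vector, which is why a small `vocabSize` such as 500 discards most rare words before LDA sees them.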

## Build the LDA Model

In [None]:
from pyspark.ml.clustering import LDA
num_topics = 3
lda = LDA(k=num_topics, maxIter=10)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: "+ str(ll))
print("The upper bound on perplexity: "+ str(lp))

The lower bound on the log likelihood of the entire corpus: -179487.63762961264
The upper bound on perplexity: 6.326224363090816
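
The two numbers above are closely related. As far as I understand Spark's LDA, `logPerplexity` is the negated log-likelihood bound divided by the number of tokens in the dataset, so on a fixed dataset the two metrics carry the same information. The sketch below checks that reading against the reported values; the interpretation of `exp(logPerplexity)` as a per-token perplexity assumes natural logarithms.

```python
import math

# Values copied from the cell output above (k=3, maxIter=10).
log_likelihood = -179487.63762961264  # lower bound on the corpus log-likelihood
log_perplexity = 6.326224363090816    # upper bound on the log-perplexity

# If logPerplexity = -logLikelihood / token_count, the ratio recovers the
# number of tokens that were scored.
implied_tokens = -log_likelihood / log_perplexity

# Assumption: natural logs, so exp() gives a per-token perplexity.
perplexity = math.exp(log_perplexity)

print(f"implied token count: {implied_tokens:.1f}")
print(f"per-token perplexity: {perplexity:.1f}")
```

The ratio works out to roughly 28,372, a whole number to within rounding, which is consistent with it being a token count.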


## Visualize the topics

In [None]:
# extract vocabulary from CountVectorizer
vocab = cv_model.vocabulary
topics = model.describeTopics()   
topics_rdd = topics.rdd
topics_words = topics_rdd \
                .map(lambda row: row['termIndices'])\
                .map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()
for idx, topic in enumerate(topics_words):
  print("topic: {}".format(idx))
  print("*"*25)
  for word in topic:
    print(word)
    print("*"*25)

topic: 0
*************************
iraq
*************************
polic
*************************
sai
*************************
council
*************************
new
*************************
win
*************************
crash
*************************
mai
*************************
report
*************************
world
*************************
topic: 1
*************************
war
*************************
plan
*************************
govt
*************************
protest
*************************
iraqi
*************************
anti
*************************
water
*************************
new
*************************
rain
*************************
fire
*************************
topic: 2
*************************
u
*************************
man
*************************
charg
*************************
call
*************************
govt
*************************
get
*************************
court
*************************
lead
*************************
fund
******************
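
The lookup done in the cell above can also be sketched without Spark. `describeTopics()` returns one row per topic with `termIndices` and `termWeights`, and each index points into the CountVectorizer vocabulary. In this sketch the vocabulary, the rows, and the weights are made-up stand-ins rather than model output, and `summarize` is a hypothetical helper.

```python
# Hypothetical stand-ins for cv_model.vocabulary and describeTopics() rows.
vocab = ["iraq", "polic", "war", "plan", "govt"]
topic_rows = [
    {"topic": 0, "termIndices": [0, 1], "termWeights": [0.03, 0.02]},
    {"topic": 1, "termIndices": [2, 3, 4], "termWeights": [0.04, 0.03, 0.02]},
]

def summarize(row, vocab):
    """Render one topic as a single line of 'word (weight)' pairs."""
    pairs = zip(row["termIndices"], row["termWeights"])
    terms = ", ".join(f"{vocab[i]} ({w:.3f})" for i, w in pairs)
    return f"topic {row['topic']}: {terms}"

summaries = [summarize(row, vocab) for row in topic_rows]
for line in summaries:
    print(line)
```

Printing one line per topic, with weights, makes it easier to compare topics side by side than one word per line.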

## Assignment 5
Trying different values of k and maxIter to see which combination best suits our data.

With k=3 and maxIter=10:
The lower bound on the log likelihood of the entire corpus: -179487.63762961264
The upper bound on perplexity: 6.326224363090816
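
The cells that follow repeat the same fit-and-score steps with different parameters. One way to avoid the repetition is a small grid-search helper, sketched below. `grid_search` and `fake_evaluate` are hypothetical names; `fake_evaluate` is a stand-in formula so the sketch runs without Spark, and its numbers are not LDA results.

```python
from itertools import product

def grid_search(ks, max_iters, evaluate):
    """Score every (k, maxIter) pair and return results, best first.

    `evaluate(k, max_iter)` must return (log_likelihood, log_perplexity).
    """
    results = []
    for k, max_iter in product(ks, max_iters):
        ll, lp = evaluate(k, max_iter)
        results.append(
            {"k": k, "maxIter": max_iter, "logLikelihood": ll, "logPerplexity": lp}
        )
    # Lower log-perplexity is better.
    return sorted(results, key=lambda r: r["logPerplexity"])

def fake_evaluate(k, max_iter):
    # Stand-in only: an arbitrary formula, NOT an LDA fit.
    lp = 6.0 + 0.1 * k - 0.001 * max_iter
    return -1000.0 * lp, lp

ranked = grid_search([2, 3, 5], [10, 15], fake_evaluate)
for r in ranked:
    print(r)
```

In the notebook, `evaluate` could wrap `LDA(k=k, maxIter=max_iter).fit(vectorized_tokens)` and return the model's `logLikelihood` and `logPerplexity` on `vectorized_tokens`, as the individual cells do.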

In [None]:
from pyspark.ml.clustering import LDA
num_topics = 2
lda = LDA(k=num_topics, maxIter=10)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: "+ str(ll))
print("The upper bound on perplexity: "+ str(lp))

The lower bound on the log likelihood of the entire corpus: -176015.1953842923
The upper bound on perplexity: 6.203834603986054


In [None]:
from pyspark.ml.clustering import LDA
num_topics = 5
lda = LDA(k=num_topics, maxIter=15)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: "+ str(ll))
print("The upper bound on perplexity: "+ str(lp))

The lower bound on the log likelihood of the entire corpus: -182818.79801061886
The upper bound on perplexity: 6.443634499175908


In [None]:
from pyspark.ml.clustering import LDA
num_topics = 2
lda = LDA(k=num_topics, maxIter=15)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: "+ str(ll))
print("The upper bound on perplexity: "+ str(lp))

The lower bound on the log likelihood of the entire corpus: -175191.71152018418
The upper bound on perplexity: 6.174810077547729


In [None]:
from pyspark.ml.clustering import LDA
num_topics = 3
lda = LDA(k=num_topics, maxIter=50)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: "+ str(ll))
print("The upper bound on perplexity: "+ str(lp))

The lower bound on the log likelihood of the entire corpus: -175926.4439257532
The upper bound on perplexity: 6.2007064685518545


In [None]:
from pyspark.ml.clustering import LDA
num_topics = 7
lda = LDA(k=num_topics, maxIter=15)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: "+ str(ll))
print("The upper bound on perplexity: "+ str(lp))

The lower bound on the log likelihood of the entire corpus: -186713.43285278097
The upper bound on perplexity: 6.580904865810693


## Observation

The model with k = 2 and maxIter = 15 looks best: of the six combinations tried, it has the lowest perplexity bound (6.17) and the highest log-likelihood bound (-175191.71). I also observed that, in these runs, decreasing k or increasing maxIter raised the likelihood and lowered the perplexity.
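
The comparison behind this observation can be checked mechanically. The sketch below ranks the six runs using the values copied from the cell outputs above; only the dictionary layout and variable names are my own.

```python
# (k, maxIter) -> (log-likelihood lower bound, log-perplexity upper bound),
# copied from the cell outputs above.
runs = {
    (3, 10): (-179487.63762961264, 6.326224363090816),
    (2, 10): (-176015.1953842923, 6.203834603986054),
    (5, 15): (-182818.79801061886, 6.443634499175908),
    (2, 15): (-175191.71152018418, 6.174810077547729),
    (3, 50): (-175926.4439257532, 6.2007064685518545),
    (7, 15): (-186713.43285278097, 6.580904865810693),
}

# Higher log-likelihood is better; lower log-perplexity is better.
by_likelihood = sorted(runs, key=lambda p: runs[p][0], reverse=True)
by_perplexity = sorted(runs, key=lambda p: runs[p][1])

best = by_perplexity[0]
print("best (k, maxIter):", best)
print("same ranking under both metrics:", by_likelihood == by_perplexity)
```

Both metrics order the runs identically here, so either one can be used to pick the best combination on this dataset.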

## Assignment 6

Rewrite the code to find topics in the coronavirus tweets dataset. Try different values of k and maxIter to see which combination best suits the data. Show at least five different combinations with their results, and explain why the chosen one is best.

### Read the data

In [None]:
# if you are reading file from local storage 
file_location = r'/content/coronavirus-text-only-1000.txt'
# CSV options
infer_schema = "true"
first_row_is_header = "true"
cv_df = spark.read.format("csv")\
      .option("inferSchema", infer_schema)\
      .option("header", first_row_is_header)\
      .load(file_location)
# Verify the count
cv_df.count()

999

### Preprocessing pipeline

In [None]:
# 3.6.1 Document assembling
document_assembler = DocumentAssembler() \
                      .setInputCol("text") \
                      .setOutputCol("document") \
                      .setCleanupMode("shrink")

# Split each sentence into tokens (array)
tokenizer = Tokenizer() \
              .setInputCols(["document"]) \
              .setOutputCol("token")

# 3.6.3 Normalizing: clean unwanted characters and garbage
normalizer = Normalizer() \
              .setInputCols(["token"]) \
              .setOutputCol("normalized")

# 3.6.4 Stopwords removal
# NOTE: this stage reads "token", not "normalized", so the Normalizer output
# is not used downstream and punctuation tokens survive.
# NOTE: setStopWords replaces the default stopword list, so only
# "coronavirus" is removed here.
stopwords_cleaner = StopWordsCleaner()\
                      .setInputCols("token")\
                      .setOutputCol("cleanTokens")\
                      .setCaseSensitive(False) \
                      .setStopWords(["coronavirus"])

# 3.6.5 Stemming
stemmer = Stemmer() \
            .setInputCols(["cleanTokens"]) \
            .setOutputCol("stem")

# 3.6.6 Finishing: convert annotations into a plain array column
finisher = Finisher() \
              .setInputCols(["stem"]) \
              .setOutputCols(["tokens"]) \
              .setOutputAsArray(True) \
              .setCleanAnnotations(False)

# 3.6.7 Build the ML Pipeline
nlp_pipeline = Pipeline(stages=[document_assembler, tokenizer, normalizer, stopwords_cleaner, stemmer, finisher])

# 3.6.8 Train and apply the ML Pipeline
nlp_model = nlp_pipeline.fit(cv_df)
processed_cvdf = nlp_model.transform(cv_df)
tokens_cvdf = processed_cvdf.select('text', 'tokens').limit(10000)
tokens_cvdf.show()

+--------------------+--------------------+
|                text|              tokens|
+--------------------+--------------------+
|Studies look at t...|[studi, look, at,...|
|RT @EricTopol: Th...|[rt, @erictopol, ...|
|RT @NPR: Working ...|[rt, @npr, :, wor...|
|"RT @Harvey_Walke...|[", rt, @harvey_w...|
|RT @CNNEE: La far...|[rt, @cnnee, :, l...|
|RT @ReutersWorld:...|[rt, @reutersworl...|
|RT @CNN: This Ill...|[rt, @cnn, :, thi...|
|"RT @Censelio: Ar...|[", rt, @censelio...|
|RT @jilevin: Trum...|[rt, @jilevin, :,...|
|RT @propublica: P...|[rt, @propublica,...|
|NSW to close Vict...|[nsw, to, close, ...|
|RT @ASlavitt: Tru...|[rt, @aslavitt, :...|
|RT @ClayTravis: C...|[rt, @claytravi, ...|
|RT @JamesGunn: I'...|[rt, @jamesgunn, ...|
|RT @NatashaFatah:...|[rt, @natashafata...|
|RT @crissles: Y‚Ä...|[rt, @crissl, :, ...|
|"RT @Censelio: Ar...|[", rt, @censelio...|
|RT @Villarruel_cl...|[rt, @villarruel_...|
|RT @JaxAlemany: ‚...|[rt, @jaxalemani,...|
|RT @JamesGunn: I'...|[rt, @jame

### Feature Engineering

In [None]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=500, minDF=3.0)
# train the model
cv_model = cv.fit(tokens_cvdf)
# transform the data. Output column name will be features.
vectorized_tokens = cv_model.transform(tokens_cvdf)

### Build LDA model

In [None]:
from pyspark.ml.clustering import LDA
num_topics = 3
lda = LDA(k=num_topics, maxIter=10)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: "+ str(ll))
print("The upper bound on perplexity: "+ str(lp))

The lower bound on the log likelihood of the entire corpus: -84381.97508253767
The upper bound on perplexity: 5.28311890073489


### Visualize topics

In [None]:
# extract vocabulary from CountVectorizer
vocab = cv_model.vocabulary
topics = model.describeTopics()   
topics_rdd = topics.rdd
topics_words = topics_rdd \
                .map(lambda row: row['termIndices'])\
                .map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()
for idx, topic in enumerate(topics_words):
  print("topic: {}".format(idx))
  print("*"*25)
  for word in topic:
    print(word)
    print("*"*25)

topic: 0
*************************
:
*************************
rt
*************************
,
*************************
.
*************************
the
*************************
de
*************************
to
*************************
a
*************************
la
*************************
and
*************************
topic: 1
*************************
.
*************************
in
*************************
the
*************************
:
*************************
rt
*************************
have
*************************
a
*************************
;
*************************
&amp
*************************
over
*************************
topic: 2
*************************
:
*************************
rt
*************************
a
*************************
the
*************************
.
*************************
,
*************************
she
*************************
than
*************************
trump
*************************
do
*************************
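
The topics above are dominated by punctuation and function words (`:`, `rt`, `the`, `a`). A likely cause is the tweet pipeline's stopword stage, which reads the raw `token` column and uses a stopword list containing only `coronavirus`. The pure-Python sketch below imitates both set-ups on a made-up tweet. The regex tokenizer, the tiny `DEFAULT_STOPWORDS` set, and the choice to also drop `rt` are assumptions for illustration, not Spark NLP's behavior.

```python
import re

# Tiny hypothetical stand-in for a default English stopword list.
DEFAULT_STOPWORDS = {"the", "to", "a", "and", "in", "at"}

def tokenize(text):
    # Words (optionally prefixed with @) and single punctuation marks.
    return re.findall(r"@?\w+|[^\w\s]", text.lower())

def normalize(tokens):
    # Keep letters only and drop tokens that become empty.
    cleaned = (re.sub(r"[^a-z]", "", t) for t in tokens)
    return [t for t in cleaned if t]

def remove_stopwords(tokens, stopwords):
    return [t for t in tokens if t not in stopwords]

text = "RT @user: the coronavirus update, stay at home."  # made-up example
raw = tokenize(text)

# As in the notebook: raw tokens, stopword list of just "coronavirus".
as_in_notebook = remove_stopwords(raw, {"coronavirus"})

# Alternative: normalize first, then extend the default list.
alternative = remove_stopwords(
    normalize(raw), DEFAULT_STOPWORDS | {"coronavirus", "rt"}
)

print(as_in_notebook)
print(alternative)
```

With the first set-up, punctuation and common words reach CountVectorizer and, being the most frequent terms, fill the top of every topic. With the second, only content words remain.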


### Modelling with various k and maxIter values
For k=3 and maxIter=10:
The lower bound on the log likelihood of the entire corpus: -84381.97508253767
The upper bound on perplexity: 5.28311890073489

In [None]:
from pyspark.ml.clustering import LDA
num_topics = 2
lda = LDA(k=num_topics, maxIter=15)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: "+ str(ll))
print("The upper bound on perplexity: "+ str(lp))

The lower bound on the log likelihood of the entire corpus: -84045.42245923339
The upper bound on perplexity: 5.262047486803994


In [None]:
from pyspark.ml.clustering import LDA
num_topics = 2
lda = LDA(k=num_topics, maxIter=50)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: "+ str(ll))
print("The upper bound on perplexity: "+ str(lp))

The lower bound on the log likelihood of the entire corpus: -83015.46169817106
The upper bound on perplexity: 5.197562089792829


In [None]:
from pyspark.ml.clustering import LDA
num_topics = 3
lda = LDA(k=num_topics, maxIter=100)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: "+ str(ll))
print("The upper bound on perplexity: "+ str(lp))

The lower bound on the log likelihood of the entire corpus: -81105.92844496891
The upper bound on perplexity: 5.078007040130786


In [None]:
from pyspark.ml.clustering import LDA
num_topics = 5
lda = LDA(k=num_topics, maxIter=50)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: "+ str(ll))
print("The upper bound on perplexity: "+ str(lp))

The lower bound on the log likelihood of the entire corpus: -80504.5831028166
The upper bound on perplexity: 5.040357068796431


In [None]:
from pyspark.ml.clustering import LDA
num_topics = 7
lda = LDA(k=num_topics, maxIter=30)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: "+ str(ll))
print("The upper bound on perplexity: "+ str(lp))

The lower bound on the log likelihood of the entire corpus: -84764.05701580833
The upper bound on perplexity: 5.307040885036835


## Observation

For this dataset, k=5 and maxIter=50 are the best parameters of the six combinations tried: they give the highest log-likelihood bound (-80504.58) and the lowest perplexity bound (5.04).