<a href="https://colab.research.google.com/github/ayuti/Topic_Modeling/blob/main/Topic_Modeling_spark_nlp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Import PySpark and Spark NLP



In [2]:
# !pip install -q pyspark==3.3.0 spark-nlp==4.2.4


Starting of Spark Session



In [3]:
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

spark = sparknlp.start()


Upload sample CSV file to Colab session storage and read into Spark dataframe



In [4]:
df = spark.read.format("csv").option("header",True).load("CommentsApril2017.csv")
df.count()


19093

In [5]:
df.printSchema()


root
 |-- approveDate: string (nullable = true)
 |-- commentBody: string (nullable = true)
 |-- commentID: string (nullable = true)
 |-- commentSequence: string (nullable = true)
 |-- commentTitle: string (nullable = true)
 |-- commentType: string (nullable = true)
 |-- createDate: string (nullable = true)
 |-- depth: string (nullable = true)
 |-- editorsSelection: string (nullable = true)
 |-- parentID: string (nullable = true)
 |-- parentUserDisplayName: string (nullable = true)
 |-- permID: string (nullable = true)
 |-- picURL: string (nullable = true)
 |-- recommendations: string (nullable = true)
 |-- recommendedFlag: string (nullable = true)
 |-- replyCount: string (nullable = true)
 |-- reportAbuseFlag: string (nullable = true)
 |-- sharing: string (nullable = true)
 |-- status: string (nullable = true)
 |-- timespeople: string (nullable = true)
 |-- trusted: string (nullable = true)
 |-- updateDate: string (nullable = true)
 |-- userDisplayName: string (nullable = true)
 |-- us

Pre-processing Pipeline using Spark NLP


In [6]:
# Spark NLP requires the input dataframe or column to be converted to document
document_assembler = DocumentAssembler() \
    .setInputCol("commentBody") \
    .setOutputCol("document") \
    .setCleanupMode("shrink")

# Split sentence into tokens
tokenizer = Tokenizer() \
  .setInputCols(["document"]) \
  .setOutputCol("token")

# Clean unwanted characters and garbage
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")

# Remove stopwords
stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

finisher = Finisher() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCols(["tokens"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)

# Build pipeline
nlp_pipeline = Pipeline(
    stages=[document_assembler,
            tokenizer,
            normalizer,
            stopwords_cleaner,
            finisher])

# Train pipeline
nlp_model = nlp_pipeline.fit(df)

# Apply pipeline to dataframe
processed_df  = nlp_model.transform(df)

# Get tokens
tokens_df = processed_df.select('tokens').limit(10000)


Feature Engineering


In [7]:
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=1000, minDF=3.0)

# Train the model
cv_model = cv.fit(tokens_df)

# Transform the data and output features
vectorized_tokens = cv_model.transform(tokens_df)


Build the LDA (Latent Dirichlet Allocation) Model


In [8]:
from pyspark.ml.clustering import LDA

num_topics = 15
lda = LDA(k=num_topics, maxIter=10)
model = lda.fit(vectorized_tokens)


Visualize the topics



In [9]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Extract vocabulary from CountVectorizer
vocab = cv_model.vocabulary
topics = model.describeTopics()
topics_rdd = topics.rdd
topics_words = topics_rdd \
       .map(lambda row: row['termIndices']) \
       .map(lambda idx_list: [vocab[idx] for idx in idx_list]) \
       .collect()

# Add each array of words to a new column
def get_words(idx_list):
    return [vocab[idx] for idx in idx_list]

udf_get_words = udf(get_words, ArrayType(StringType()))
topics = topics.withColumn("words", udf_get_words(topics.termIndices))

# From topics dataframa select topic and words columns
topics_df = topics.select("topic", "words")

topics_df.show(truncate=False)


+-----+-------------------------------------------------------------------------------------+
|topic|words                                                                                |
+-----+-------------------------------------------------------------------------------------+
|0    |[people, news, many, like, women, amp, dont, Fox, one, News]                         |
|1    |[Carrie, maybe, Trump, guy, every, know, minds, people, four, name]                  |
|2    |[election, Trump, Russians, get, every, time, make, God, Spicer, Sean]               |
|3    |[nuclear, Democratic, White, House, think, Party, comments, Thank, book, problem]    |
|4    |[people, country, time, different, One, story, America, North, budget, saying]       |
|5    |[Bill, OReilly, abuse, father, DJ, knew, family, years, believe, old]                |
|6    |[tax, like, Trump, people, money, government, US, American, trade, border]           |
|7    |[Trump, people, one, dont, like, get, see, think, muc