<a href="https://colab.research.google.com/github/carissa406/CSC533/blob/main/Topic_Modeling_COVID_tweets.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
# Install java
! apt-get update -qq
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version
# Install pyspark
! pip install --ignore-installed pyspark==2.4.4
# Install Spark NLP
! pip install --ignore-installed spark-nlp==2.6.3

openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-8u312-b07-0ubuntu1~18.04-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)
Collecting pyspark==2.4.4
  Downloading pyspark-2.4.4.tar.gz (215.7 MB)
[K     |████████████████████████████████| 215.7 MB 58 kB/s 
[?25hCollecting py4j==0.10.7
  Downloading py4j-0.10.7-py2.py3-none-any.whl (197 kB)
[K     |████████████████████████████████| 197 kB 19.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130392 sha256=18a7c6206f0598ba875d48f94f78ba451f0a10dbaca0965ad9e88ac9f651d179
  Stored in directory: /root/.cache/pip/wheels/11/48/19/c3b6b66e4575c164407a83bc065179904ddc33c9d6500846f0
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4
Collecting spark-nlp==2.6.3
  Downloading spark_nlp-2.6.3-

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
spark = sparknlp.start()
print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

Spark NLP version:  2.6.3
Apache Spark version:  2.4.4


In [None]:
 # if you are reading file from local storage
file_location = '/content/coronavirus-text-only-1000.txt'
file_type = "csv"
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
df = spark.read.format(file_type) \
 .option("inferSchema", infer_schema) \
 .option("header", first_row_is_header) \
 .option("sep", delimiter) \
 .load(file_location)
# Verify the count
df.count()

999

In [None]:
df.show(20, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                                 |
+-----------------------------------------------------------------------------------------------------------------------------------------------------+
|Studies look at the potential of natural remedies for treating coronavirus https://t.co/UQMDXZCDTE                                                   |
|RT @EricTopol: These rapid home tests, especially if accurate for transmissibility and cheap, can be transformative and are likely to hit t‚Ä¶       |
|RT @NPR: Working moms now spend 15 more hours than working dads on childcare and housework, a recent survey finds. But fewer work hours is‚Ä¶        |
|"RT @Harvey_Walker96: To Al Jazeera,   Malaysia didnt lock these Illegal Immigrants bec

In [None]:
#convert to document
document_assembler = DocumentAssembler() \
 .setInputCol("text") \
 .setOutputCol("document") \
 .setCleanupMode("shrink")

In [None]:
#split sentence to tokens(array)
tokenizer = Tokenizer() \
 .setInputCols(["document"]) \
 .setOutputCol("token")

In [None]:
 #normalizing
 normalizer = Normalizer() \
 .setInputCols(["token"]) \
 .setOutputCol("normalized")

In [None]:
 #stopword removal + removing coronavirus as a stopword
 stopwords_cleaner = StopWordsCleaner()\
 .setInputCols("normalized")\
 .setOutputCol("cleanTokens")\
 .setStopWords(["coronavirus"])\
 .setCaseSensitive(False)

In [None]:
 #stemming
 stemmer = Stemmer() \
 .setInputCols(["cleanTokens"]) \
 .setOutputCol("stem")

In [None]:
 #finishing back to array of tokens
 finisher = Finisher() \
 .setInputCols(["stem"]) \
 .setOutputCols(["tokens"]) \
 .setOutputAsArray(True) \
 .setCleanAnnotations(False)

In [None]:
 #build ML pipeline
 nlp_pipeline = Pipeline(
 stages=[document_assembler,
 tokenizer,
 normalizer,
 stopwords_cleaner,
 stemmer,
 finisher])

In [None]:
 #train and apply the ML pipeline
 nlp_model = nlp_pipeline.fit(df)
processed_df = nlp_model.transform(df)
tokens_df = processed_df.select('tokens').limit(10000)
tokens_df.show()

+--------------------+
|              tokens|
+--------------------+
|[studi, look, at,...|
|[rt, erictopol, t...|
|[rt, npr, work, m...|
|[rt, harveywalk, ...|
|[rt, cnnee, la, f...|
|[rt, reutersworld...|
|[rt, cnn, thi, il...|
|[rt, censelio, ar...|
|[rt, jilevin, tru...|
|[rt, propublica, ...|
|[nsw, to, close, ...|
|[rt, aslavitt, tr...|
|[rt, claytravi, d...|
|[rt, jamesgunn, i...|
|[rt, natashafatah...|
|[rt, crissl, yäôa...|
|[rt, censelio, ar...|
|[rt, villarruelcl...|
|[rt, jaxalemani, ...|
|[rt, jamesgunn, i...|
+--------------------+
only showing top 20 rows



In [None]:
tokens_df.show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|tokens                                                                                                                                                 |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|[studi, look, at, the, potenti, of, natur, remedi, for, treat, httpstcouqmdxzcdt]                                                                      |
|[rt, erictopol, these, rapid, home, test, especi, if, accur, for, transmiss, and, cheap, can, be, transform, and, ar, like, to, hit, tä]               |
|[rt, npr, work, mom, now, spend, more, hour, than, work, dad, on, childcar, and, housework, a, recent, survei, find, but, fewer, work, hour, isä]      |
|[rt, harveywalk, to, al, jazeera, malaysia, didnt, lock, these, illeg, immi

In [None]:

#countvectorizer to generate features from text data
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="tokens", outputCol="features",
vocabSize=500, minDF=3.0)
# train the model
cv_model = cv.fit(tokens_df)
# transform the data. Output column name will be features.
vectorized_tokens = cv_model.transform(tokens_df)

In [None]:
#build Latent Dirichlet Allocation model
from pyspark.ml.clustering import LDA
num_topics = 3
lda = LDA(k=num_topics, maxIter=10)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " +
str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -70365.88841696964
The upper bound on perplexity: 5.293056146906096


In [None]:
#visualize the topics
# extract vocabulary from CountVectorizer
vocab = cv_model.vocabulary
topics = model.describeTopics()
topics_rdd = topics.rdd
topics_words = topics_rdd\
 .map(lambda row: row['termIndices'])\
 .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
 .collect()
for idx, topic in enumerate(topics_words):
 print("topic: {}".format(idx))
 print("*"*25)
 for word in topic:
  print(word)
 print("*"*25)
 

topic: 0
*************************
the
rt
a
of
and
to
in
i
that
on
*************************
topic: 1
*************************
de
rt
la
en
el
a
que
y
por
lo
*************************
topic: 2
*************************
rt
the
a
in
she
trump
to
than

job
*************************


In [None]:
from pyspark.ml.clustering import LDA
num_topics = 10
lda = LDA(k=num_topics, maxIter=100)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " +
str(ll))
print("The upper bound on perplexity: " + str(lp))

vocab = cv_model.vocabulary
topics = model.describeTopics()
topics_rdd = topics.rdd
topics_words = topics_rdd\
 .map(lambda row: row['termIndices'])\
 .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
 .collect()
for idx, topic in enumerate(topics_words):
  print("topic: {}".format(idx))
  print("*"*25)
  for word in topic:
    print(word)
  print("*"*25)

The lower bound on the log likelihood of the entire corpus: -65934.66507330502
The upper bound on perplexity: 4.959731087205132
topic: 0
*************************
work
hour
now
isä
on
recent
but
more
rt
find
*************************
topic: 1
*************************
semana
la
o
rt
por
aä
e
est
una
el
*************************
topic: 2
*************************
she
said
could
even
job
chines
rt
donald
trump
the
*************************
topic: 3
*************************
de
rt
la
en
el
que
a
por
y
lo
*************************
topic: 4
*************************
rt
in
th
mai
the
to
plant
death
trump
april
*************************
topic: 5
*************************
mai
todo
made
care
ha
their
refus
hydroxychloroquin
fda
should
*************************
topic: 6
*************************
florida
sai
rt
msnbc
spent
servic
ha
gov
of
blame
*************************
topic: 7
*************************
the
rt
in
a
to
of
i
have
and
ar
*************************
topic: 8
*************************

In [None]:
from pyspark.ml.clustering import LDA
num_topics = 15
lda = LDA(k=num_topics, maxIter=500)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " +
str(ll))
print("The upper bound on perplexity: " + str(lp))


The lower bound on the log likelihood of the entire corpus: -65097.44071699343
The upper bound on perplexity: 4.896753476530272
