<a href="https://colab.research.google.com/github/afairley19/nlp_colab/blob/master/topic_modeling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
# Install java
! apt-get update -qq
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" 
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"] 
! java -version
# Install pyspark
! pip install --ignore-installed pyspark==2.4.4
# Install Spark NLP
! pip install --ignore-installed spark-nlp==2.6.3

openjdk version "1.8.0_282"
OpenJDK Runtime Environment (build 1.8.0_282-8u282-b08-0ubuntu1~18.04-b08)
OpenJDK 64-Bit Server VM (build 25.282-b08, mixed mode)
Collecting pyspark==2.4.4
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 69kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 51.0MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130389 sha256=da25fa4f14547dcaa97e4bf5768239351cab6663fc0fb980ab4210026812f68d
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d18423005

In [None]:
# Install the pinned Spark NLP release. This is the same command as the last
# line of the setup cell above, so it is a repeat install of the same version.
! pip install --ignore-installed spark-nlp==2.6.3

Collecting spark-nlp==2.6.3
  Using cached https://files.pythonhosted.org/packages/84/84/3f15673db521fbc4e8e0ec3677a019ba1458b2cb70f0f7738c221511ef32/spark_nlp-2.6.3-py2.py3-none-any.whl
Installing collected packages: spark-nlp
Successfully installed spark-nlp-2.6.3


In [None]:

! sudo update-alternatives --config java

There are 2 choices for the alternative java (providing /usr/bin/java).

  Selection    Path                                            Priority   Status
------------------------------------------------------------
  0            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      auto mode
  1            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      manual mode
* 2            /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java   1081      manual mode

Press <enter> to keep the current choice[*], or type selection number: 


In [None]:
# Spark and Spark NLP imports used by the rest of the notebook.
from pyspark.sql import SparkSession 
from pyspark.ml import Pipeline
# Wildcard imports: later cells use names such as DocumentAssembler, Tokenizer,
# Normalizer, StopWordsCleaner, Stemmer and Finisher without a module prefix.
# Keep these two lines in this order; a later wildcard import can rebind a
# name brought in by an earlier one.
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline 
import sparknlp
# sparknlp.start() returns the session object that later cells use as `spark`
# (e.g. spark.read, spark.version).
spark = sparknlp.start()
# Report the library versions actually loaded in this runtime.
print("Spark NLP version: ", sparknlp.version()) 
print("Apache Spark version: ", spark.version)

Spark NLP version:  2.6.3
Apache Spark version:  2.4.4


In [None]:
from google.colab import files
import io

# Open the Colab upload dialog; the result maps each uploaded file name to
# an object whose length is reported below.
uploaded = files.upload()

# Echo the name and size of every uploaded file.
upload_report = 'User uploaded file "{name}" with length {length} bytes'
for fn in uploaded:
    print(upload_report.format(name=fn, length=len(uploaded[fn])))

# Read the expected data file and split it on newlines into `lines`.
data_path = 'coronavirus-text-only-1000.txt'
with open(data_path, 'r') as f:
    raw_text = f.read()
    lines = raw_text.split('\n')

Saving coronavirus-text-only-1000.txt to coronavirus-text-only-1000.txt
User uploaded file "coronavirus-text-only-1000.txt" with length 149569 bytes


In [None]:
# Input location. The active value reads from local storage; to read from
# HDFS instead, swap in a path of the form shown in the commented line.
file_location = r'./coronavirus-text-only-1000.txt'
#file_location = r'hdfs:\\\path\file' 

# Reader settings: the file is parsed as CSV with a header row, a comma
# separator, and schema inference turned on.
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

# Row count as a quick sanity check on the load.
df.count()

999

In [None]:
# First pipeline stage: reads the "text" column and writes a "document"
# column, using the "shrink" cleanup mode.
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
    .setCleanupMode("shrink")
)

In [None]:
# Tokenizer stage: reads the "document" column and writes a "token" column.
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
    .setSplitChars(['-'])
    .setContextChars(['(', ')', '?', '!'])
    .setSplitPattern("'")
    # Token length bounds. The lower bound must go through setMinLength:
    # calling setMaxLength twice just lets the second value replace the first.
    .setMinLength(0)
    .setMaxLength(99999)
    .setCaseSensitiveExceptions(False)
)

In [None]:
# Normalizer stage: reads the "token" column and writes a "normalized" column.
normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normalized")
)

In [None]:
# Stop-word stage: reads the "normalized" column and writes "cleanTokens",
# matching stop words case-insensitively.
# NOTE(review): the topics printed at the end of the notebook still contain
# common words (e.g. "have", "on", "who"), so this one-word list appears to be
# the only filter applied. Confirm whether it should extend a default
# English stop-word list instead.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols(["normalized"])
    .setOutputCol("cleanTokens")
    .setStopWords(["coronavirus"])
    .setCaseSensitive(False)
)

In [None]:
# Stemmer stage: reads the "cleanTokens" column and writes a "stem" column.
stemmer = (
    Stemmer()
    .setInputCols(["cleanTokens"])
    .setOutputCol("stem")
)

In [None]:
# Finisher stage: reads the "stem" column and writes a "tokens" column as an
# array. Annotation cleanup is switched off.
finisher = (
    Finisher()
    .setInputCols(["stem"])
    .setOutputCols(["tokens"])
    .setOutputAsArray(True)
    .setCleanAnnotations(False)
)

In [None]:
# Assemble the stages in the order their input/output columns feed each other:
# text -> document -> token -> normalized -> cleanTokens -> stem -> tokens.
pipeline_stages = [
    document_assembler,
    tokenizer,
    normalizer,
    stopwords_cleaner,
    stemmer,
    finisher,
]
nlp_pipeline = Pipeline(stages=pipeline_stages)

In [None]:
# Fit the pipeline on the loaded DataFrame, then apply it to the same data.
nlp_model = nlp_pipeline.fit(df)
processed_df = nlp_model.transform(df)

# Keep only the finished "tokens" column, capped at 10000 rows.
tokens_df = (
    processed_df
    .select("tokens")
    .limit(10000)
)

# Preview the token arrays.
tokens_df.show()

+--------------------+
|              tokens|
+--------------------+
|[studi, look, at,...|
|[erictopol, these...|
|[npr, work, mom, ...|
|[harveywalk, al, ...|
|[cnnee, farmacuti...|
|[reutersworld, hu...|
|[cnn, thi, illino...|
|[censelio, argent...|
|[jilevin, trump, ...|
|[propublica, pres...|
|[nsw, close, vict...|
|[aslavitt, trumpä...|
|[claytravi, death...|
|[jamesgunn, ve, k...|
|[natashafatah, sc...|
|[crissl, yäôall, ...|
|[censelio, argent...|
|[villarruelclau, ...|
|[jaxalemani, äúal...|
|[jamesgunn, ve, k...|
+--------------------+
only showing top 20 rows



In [None]:
from pyspark.ml.feature import CountVectorizer

# Bag-of-words vectorizer over the "tokens" column, writing to "features".
# vocabSize and minDF limit which terms enter the vocabulary.
cv = CountVectorizer(
    inputCol="tokens",
    outputCol="features",
    vocabSize=500,
    minDF=3.0,
)

# Learn the vocabulary from the token arrays.
cv_model = cv.fit(tokens_df)

# Add the "features" column to every row.
vectorized_tokens = cv_model.transform(tokens_df)

In [None]:
from pyspark.ml.clustering import LDA

# Number of topics to fit.
num_topics = 5

# Fix the random seed so that the fitted topics and the metrics printed below
# are reproducible from one kernel session to the next.
lda = LDA(k=num_topics, maxIter=500, seed=42)
model = lda.fit(vectorized_tokens)

# Evaluate the fitted model on the same vectors it was trained on.
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

The lower bound on the log likelihood of the entire corpus: -49900.972540208335
The upper bound on perplexity: 5.2990307465443705


In [None]:
# Vocabulary learned by the CountVectorizer; term indices index into it.
vocab = cv_model.vocabulary
topics = model.describeTopics()
topics_rdd = topics.rdd


def _terms_for(row):
    """Translate one topic row's term indices into vocabulary words."""
    return [vocab[term_idx] for term_idx in row['termIndices']]


# One list of words per topic, collected to the driver.
topics_words = topics_rdd.map(_terms_for).collect()

# Print each topic's words between two rows of asterisks.
separator = "*"*25
for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print(separator)
    for word in topic:
        print(word)
    print(separator)

topic: 0
*************************

have
ar
peopl
case
new
death
on
who
their
*************************
topic: 1
*************************
work
hour
on
npr
more
than
but
recent
now
isä
*************************
topic: 2
*************************
i
thi
for
with
be
ha
will
covid
from
get
*************************
topic: 3
*************************
lo
caso
covid
la
con
del
m
da
mil
se
*************************
topic: 4
*************************
she
trump
even
donald
job
could
chines
than
said
t
*************************
