![JohnSnowLabs](https://nlp.johnsnowlabs.com/assets/images/logo.png)

# Spark NLP and Spark ML Pipelines

## Simple Topic Modeling

`Spark-NLP`
* DocumentAssembler
* SentenceDetector
* Tokenizer
* POS tagger
* Chunker
* Finisher

`Spark ML`
* CountVectorizer
* TF-IDF
* LDA

In [1]:
! pip install -q pyspark==3.1.2 spark-nlp



In [2]:
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.clustering import LDA

# Spark NLP (provides DocumentAssembler, SentenceDetector, Tokenizer, PerceptronModel, Chunker, Finisher)
import sparknlp
from sparknlp.annotator import *
from sparknlp.base import *

### Let's create a Spark Session for our app

In [3]:
spark = sparknlp.start()

print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

Spark NLP version:  3.3.4
Apache Spark version:  3.1.2


Let's download a sample of scientific abstracts from the PubMed dataset:
```
wget -N https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/pubmed/pubmed-sample.csv -P /tmp
```

In [4]:
! wget -N https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/pubmed/pubmed-sample.csv -P /tmp

--2021-12-02 08:14:47--  https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/pubmed/pubmed-sample.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.104.110
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.104.110|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10484510 (10.0M) [text/csv]
Saving to: ‘/tmp/pubmed-sample.csv’


2021-12-02 08:14:48 (28.5 MB/s) - ‘/tmp/pubmed-sample.csv’ saved [10484510/10484510]



In [5]:
pubMedDF = spark.read\
                .option("header", "true")\
                .csv("/tmp/pubmed-sample.csv")\
                .filter("AB IS NOT null")\
                .withColumn("text", col("AB"))\
                .drop("TI", "AB")

In [6]:
pubMedDF.printSchema()
pubMedDF.show()
print('rows', pubMedDF.count())
pubMedDF = pubMedDF.limit(200)  # keep only 200 abstracts to reduce runtime if you are not running on a cluster

root
 |-- text: string (nullable = true)

+--------------------+
|                text|
+--------------------+
|The human KCNJ9 (...|
|BACKGROUND: At pr...|
|OBJECTIVE: To inv...|
|Combined EEG/fMRI...|
|Kohlschutter synd...|
|Statistical analy...|
|The synthetic DOX...|
|Our objective was...|
|We conducted a ph...|
|"Monomeric sarcos...|
|We presented the ...|
|The literature de...|
|A novel approach ...|
|An HPLC-ESI-MS-MS...|
|The localizing an...|
|OBJECTIVE: To eva...|
|For the construct...|
|We report the res...|
|Intraparenchymal ...|
|It is known that ...|
+--------------------+
only showing top 20 rows

rows 7537


### Let's create Spark-NLP Pipeline

In [7]:
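# Spark NLP Pipeline

document_assembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

sentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("token")

# Downloads the default English POS model (pos_anc); the output column is named
# explicitly because the Chunker below reads from "pos"
posTagger = PerceptronModel.pretrained() \
    .setInputCols(["sentence", "token"]) \
    .setOutputCol("pos")

# Chunk on POS tags: runs of proper nouns, or an optional determiner + adjectives + a single noun
chunker = Chunker() \
    .setInputCols(["sentence", "pos"]) \
    .setOutputCol("chunk") \
    .setRegexParsers(["<NNP>+", "<DT>?<JJ>*<NN>"])

# Convert the chunk annotations into a plain array<string> column named "finished_chunk"
finisher = Finisher() \
    .setInputCols(["chunk"]) \
    .setIncludeMetadata(False)

nlpPipeline = Pipeline(stages=[
    document_assembler,
    sentence_detector,
    tokenizer,
    posTagger,
    chunker,
    finisher
])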

pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[OK!]
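To see what the two `setRegexParsers` patterns select, here is a plain-Python approximation (no Spark) of tag-level chunking: each POS tag is rendered as `<TAG>`, every `<TAG>` in a pattern is wrapped in a non-capturing group so that `?` and `*` apply to the whole tag, and the pattern is matched as an ordinary regular expression. This is a sketch of the idea under that assumption, not Spark NLP's `Chunker` implementation; the tagged sentence and the helpers `to_regex` and `chunk` are hypothetical.

```python
import re
from typing import List, Tuple

def to_regex(parser: str) -> str:
    # Wrap every <TAG> in a non-capturing group so quantifiers such as ? and *
    # apply to the whole tag rather than only to the closing ">" character.
    return parser.replace("<", "(?:<").replace(">", ">)")

def chunk(tagged: List[Tuple[str, str]], parsers: List[str]) -> List[str]:
    """Return the token spans whose tag sequence matches one of the parsers.

    Assumes every parser starts and ends with a complete <TAG>, so each match
    begins and ends exactly on a tag boundary.
    """
    tag_string = "".join(f"<{tag}>" for _, tag in tagged)
    # Map the character offset of each tag boundary to its token index.
    boundary_to_index = {}
    offset = 0
    for i, (_, tag) in enumerate(tagged):
        boundary_to_index[offset] = i
        offset += len(tag) + 2
    boundary_to_index[offset] = len(tagged)

    chunks = []
    for parser in parsers:
        for m in re.finditer(to_regex(parser), tag_string):
            start, end = boundary_to_index[m.start()], boundary_to_index[m.end()]
            chunks.append(" ".join(token for token, _ in tagged[start:end]))
    return chunks

# Hypothetical POS-tagged sentence, not taken from the PubMed sample.
tagged = [("The", "DT"), ("human", "JJ"), ("KCNJ9", "NNP"), ("gene", "NN"),
          ("encodes", "VBZ"), ("a", "DT"), ("potassium", "NN"), ("channel", "NN")]

print(chunk(tagged, ["<NNP>+", "<DT>?<JJ>*<NN>"]))
# ['KCNJ9', 'gene', 'a potassium', 'channel']
```

Because `<NN>` has no `+`, compound nouns such as "potassium channel" are split into separate chunks; a parser like `<DT>?<JJ>*<NN>+` would keep them together.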


In [8]:
nlpPipelineDF = nlpPipeline.fit(pubMedDF).transform(pubMedDF)

### Let's create Spark ML Pipeline

In [9]:
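# Spark ML Pipeline

# minDF=10.0: a term enters the vocabulary only if it appears in at least 10 documents.
# minTF=10.0: within each document, terms occurring fewer than 10 times are dropped.
#   On short abstracts this is very aggressive, which is why most vectors below are empty.
cv = CountVectorizer(inputCol="finished_chunk", outputCol="features", vocabSize=1000, minDF=10.0, minTF=10.0)
idf = IDF(inputCol="features", outputCol="idf")
# LDA reads its default featuresCol ("features"), i.e. the raw term counts, not the "idf" column.
lda = LDA(k=10, maxIter=5)

mlPipeline = Pipeline(stages=[
    cv,
    idf,
    lda
])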
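A plain-Python sketch (no Spark) of the filtering that the `CountVectorizer` parameters above are documented to perform: `minDF` decides which terms enter the vocabulary, and `minTF` then drops low counts inside each document. Values >= 1.0 are treated as absolute counts and values in [0, 1) as fractions. The toy documents and the helpers `fit_vocabulary` and `count_vector` are hypothetical, and breaking frequency ties alphabetically is an assumption made only for determinism.

```python
from collections import Counter
from typing import Dict, List

def fit_vocabulary(docs: List[List[str]], min_df: float, vocab_size: int) -> List[str]:
    """Keep terms whose document frequency passes min_df, most frequent first."""
    # Document frequency: in how many documents each term occurs at least once.
    doc_freq = Counter(term for doc in docs for term in set(doc))
    # >= 1.0 is an absolute number of documents; [0, 1) is a fraction of them.
    threshold = min_df if min_df >= 1.0 else min_df * len(docs)
    total_count = Counter(term for doc in docs for term in doc)
    kept = [term for term, df in doc_freq.items() if df >= threshold]
    # Order by corpus-wide count; ties broken alphabetically (assumption).
    kept.sort(key=lambda term: (-total_count[term], term))
    return kept[:vocab_size]

def count_vector(doc: List[str], vocab: List[str], min_tf: float) -> Dict[int, float]:
    """Sparse {vocabulary index: count} for one document, dropping counts below min_tf."""
    counts = Counter(doc)
    # >= 1.0 is an absolute count; [0, 1) is a fraction of the document's tokens.
    threshold = min_tf if min_tf >= 1.0 else min_tf * len(doc)
    index = {term: i for i, term in enumerate(vocab)}
    return {index[t]: float(c) for t, c in counts.items() if t in index and c >= threshold}

# Hypothetical toy "finished_chunk" values, not taken from the PubMed sample.
docs = [
    ["cell", "protein", "cell", "gene"],
    ["cell", "cancer", "protein"],
    ["gene", "cell", "therapy"],
]

vocab = fit_vocabulary(docs, min_df=2.0, vocab_size=1000)
print(vocab)                                      # ['cell', 'gene', 'protein']
print(count_vector(docs[0], vocab, min_tf=1.0))   # {0: 2.0, 2: 1.0, 1: 1.0}
print(count_vector(docs[0], vocab, min_tf=10.0))  # {}
```

The last line mirrors what `minTF=10.0` does to a typical abstract: the vocabulary is still built, but a document's vector keeps a term only if that term occurs at least 10 times in it.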

### Let's train the Spark ML Pipeline on the output of the Spark-NLP Pipeline

In [10]:
# Fit the Spark ML Pipeline on the DataFrame produced by the Spark NLP Pipeline
mlModel = mlPipeline.fit(nlpPipelineDF)

In [11]:
mlPipelineDF = mlModel.transform(nlpPipelineDF)
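The `idf` column added by this transform comes from Spark ML's `IDF` stage, whose documentation gives the weight of term t over m documents as log((m + 1) / (df(t) + 1)). Below is a plain-Python sketch of that formula on hypothetical count vectors (the helpers `fit_idf` and `apply_idf` are illustrative, not Spark APIs). It also shows that an empty count vector, like the `(39,[],[])` rows in the next cell's output, stays empty after weighting.

```python
import math
from typing import Dict, List

def fit_idf(vectors: List[Dict[int, float]], num_features: int) -> List[float]:
    """Smoothed inverse document frequency for each feature index."""
    m = len(vectors)  # every document counts, including empty ones
    doc_freq = [0] * num_features
    for vec in vectors:
        for idx, value in vec.items():
            if value != 0.0:
                doc_freq[idx] += 1
    # Formula from the Spark ML documentation: log((m + 1) / (df(t) + 1)).
    return [math.log((m + 1) / (df + 1)) for df in doc_freq]

def apply_idf(vec: Dict[int, float], idf: List[float]) -> Dict[int, float]:
    """Scale each stored count by its IDF weight; empty vectors stay empty."""
    return {idx: value * idf[idx] for idx, value in vec.items()}

# Hypothetical count vectors over a 3-term vocabulary, plus one empty vector.
vectors = [{0: 2.0, 1: 1.0, 2: 1.0}, {0: 1.0, 2: 1.0}, {0: 1.0, 1: 1.0}, {}]
idf = fit_idf(vectors, num_features=3)
print([round(w, 4) for w in idf])  # [0.2231, 0.5108, 0.5108]
print(apply_idf(vectors[3], idf))  # {}
```

In this notebook the weights are computed but not used for modeling, since `LDA` reads the raw counts in `features`.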

In [12]:
mlPipelineDF.show()

+--------------------+--------------------+----------+----------+--------------------+
|                text|      finished_chunk|  features|       idf|   topicDistribution|
+--------------------+--------------------+----------+----------+--------------------+
|The human KCNJ9 (...|[KCNJ9, Kir, GIRK...|(39,[],[])|(39,[],[])|[0.0,0.0,0.0,0.0,...|
|BACKGROUND: At pr...|[BACKGROUND, the ...|(39,[],[])|(39,[],[])|[0.0,0.0,0.0,0.0,...|
|OBJECTIVE: To inv...|[OBJECTIVE, =9796...|(39,[],[])|(39,[],[])|[0.0,0.0,0.0,0.0,...|
|Combined EEG/fMRI...|[Combined EEG/fMR...|(39,[],[])|(39,[],[])|[0.0,0.0,0.0,0.0,...|
|Kohlschutter synd...|[Kohlschutter, sy...|(39,[],[])|(39,[],[])|[0.0,0.0,0.0,0.0,...|
|Statistical analy...|[Statistical, ana...|(39,[],[])|(39,[],[])|[0.0,0.0,0.0,0.0,...|
|The synthetic DOX...|[DOX-LNA, conjuga...|(39,[],[])|(39,[],[])|[0.0,0.0,0.0,0.0,...|
|Our objective was...|[objective, blood...|(39,[],[])|(39,[],[])|[0.0,0.0,0.0,0.0,...|
|We conducted a ph...|[II, a phase, stu...|

In [13]:
ldaModel = mlModel.stages[2]

In [14]:
ll = ldaModel.logLikelihood(mlPipelineDF)
lp = ldaModel.logPerplexity(mlPipelineDF)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))


The lower bound on the log likelihood of the entire corpus: -507.62578437116224
The upper bound on perplexity: 22.070686277007052
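As a sanity check on these two numbers: in Spark MLlib's `LocalLDAModel`, log perplexity is computed as the negative log-likelihood bound divided by the total number of tokens in the corpus. Assuming the `LDAModel` used here delegates to that computation, the ratio of the printed values recovers how many term occurrences actually reached LDA:

```python
import math

# Values copied from the output above.
log_likelihood = -507.62578437116224
log_perplexity = 22.070686277007052

# Assumption: log_perplexity = -log_likelihood / corpus_token_count.
corpus_token_count = -log_likelihood / log_perplexity
print(round(corpus_token_count, 6))  # 23.0

# Perplexity itself is exp(log_perplexity), a very large number here.
print(f"{math.exp(log_perplexity):.3e}")
```

If that relation holds, only about 23 term occurrences across the 200 abstracts survived `CountVectorizer`, which is consistent with `minTF=10.0` emptying almost every feature vector.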


In [15]:
# Describe topics.
print("The topics described by their top-weighted terms:")
ldaModel.describeTopics(3).show(truncate=False)

The topics described by their top-weighted terms:
+-----+------------+------------------------------------------------------------------+
|topic|termIndices |termWeights                                                       |
+-----+------------+------------------------------------------------------------------+
|0    |[33, 16, 14]|[0.03018628309769907, 0.029681217516439682, 0.02869041483345357]  |
|1    |[8, 12, 30] |[0.03091504468994952, 0.030564905055513247, 0.030347406718960728] |
|2    |[38, 23, 34]|[0.030800605263050097, 0.0303272771337697, 0.030183291985159597]  |
|3    |[35, 10, 1] |[0.03224629748995109, 0.031950817237563725, 0.030392057367293862] |
|4    |[23, 3, 37] |[0.03324215054134379, 0.029263134932537502, 0.02901417457626244]  |
|5    |[13, 30, 25]|[0.03229819762533601, 0.03228638681256154, 0.0304560373241258]    |
|6    |[13, 37, 5] |[0.030097751872073652, 0.029355989145604066, 0.029005240587263746]|
|7    |[5, 24, 4]  |[0.0318190860429291, 0.03168316394538527, 0.029143

### Let's look at our topics
NOTE: More text cleaning and filtering, tuning `CountVectorizer` (for example, its `minDF` and `minTF` thresholds), and more `LDA` iterations (`maxIter`) should give better topic modeling results.

In [16]:
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
      + " words):")

topics = ldaModel.describeTopics(20)
topics_rdd = topics.rdd

vocab = mlModel.stages[0].vocabulary

topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()

for idx, topic in enumerate(topics_words):
    print("topic: ", idx)
    print("----------")
    for word in topic:
        print(word)
    print("----------")

Learned topics (as distributions over vocab of 39 words):
topic:  0
----------
level
activity
BACKGROUND
addition
contrast
CONCLUSIONS
response
function
study
rate
therapy
analysis
protein
OBJECTIVE
P
PURPOSE
CONCLUSION
method
cell
vivo
----------
topic:  1
----------
analysis
DNA
response
expression
age
function
P
BACKGROUND
therapy
family
cancer
<
contrast
addition
protein
risk
OBJECTIVE
METHODS
CONCLUSION
),
----------
topic:  2
----------
factor
disease
contrast
cancer
therapy
CONCLUSION
protein
this study
<
METHODS
gene
level
study
activity
vitro
age
P
treatment
response
method
----------
topic:  3
----------
family
CONCLUSIONS
),
contrast
expression
function
response
vitro
this study
serum
gene
cell
DNA
OBJECTIVE
CONCLUSION
activity
).
BACKGROUND
study
treatment
----------
topic:  4
----------
disease
METHODS
vivo
group
<
cancer
study
function
DNA
risk
analysis
family
CONCLUSIONS
factor
serum
vitro
response
CONCLUSION
expression
).
----------
topic:  5
----------
rate
response
me