![JohnSnowLabs](https://nlp.johnsnowlabs.com/assets/images/logo.png)

# Spark NLP and Spark ML Pipelines

## Simple Topic Modeling

`Spark-NLP`
* DocumentAssembler
* SentenceDetector
* Tokenizer
* Normalizer
* POS tagger
* Chunker
* Finisher

`Spark ML`
* CountVectorizer
* IDF
* LDA

In [None]:
import sys
import time

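from pyspark.sql.functions import col
from pyspark.ml import Pipeline
# Tokenizer is deliberately not imported from pyspark.ml.feature:
# the Spark NLP Tokenizer (imported below) is the one used in this notebook.
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF
from pyspark.ml.clustering import LDA, LDAModel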

#Spark NLP
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.annotator import *
from sparknlp.common import RegexRule
from sparknlp.base import *

### Let's create a Spark Session for our app

In [2]:
spark = sparknlp.start()

Let's download a sample of scientific abstracts from the PubMed dataset:
```
wget -N https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/pubmed/pubmed-sample.csv -P /tmp
```

In [None]:
! wget -N https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/pubmed/pubmed-sample.csv -P /tmp

In [3]:
pubMedDF = spark.read\
                .option("header", "true")\
                .csv("/tmp/pubmed-sample.csv")\
                .filter("AB IS NOT null")\
                .withColumn("text", col("AB"))\
                .drop("TI", "AB")

In [4]:
pubMedDF.printSchema()
pubMedDF.show()

root
 |-- text: string (nullable = true)

+--------------------+
|                text|
+--------------------+
|The human KCNJ9 (...|
|BACKGROUND: At pr...|
|OBJECTIVE: To inv...|
|Combined EEG/fMRI...|
|Kohlschutter synd...|
|Statistical analy...|
|The synthetic DOX...|
|Our objective was...|
|We conducted a ph...|
|"Monomeric sarcos...|
|We presented the ...|
|The literature de...|
|A novel approach ...|
|An HPLC-ESI-MS-MS...|
|The localizing an...|
|OBJECTIVE: To eva...|
|For the construct...|
|We report the res...|
|Intraparenchymal ...|
|It is known that ...|
+--------------------+
only showing top 20 rows



In [5]:
pubMedDF.count()

7537

### Let's create the Spark NLP pipeline

In [7]:
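# Spark NLP Pipeline

# Wrap the raw text column in a "document" annotation
document_assembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

sentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("token")

# Pretrained part-of-speech tagger; its output column is named explicitly
# because the chunker below reads "pos"
posTagger = PerceptronModel.pretrained() \
    .setInputCols(["sentence", "token"]) \
    .setOutputCol("pos")

# Extract chunks whose POS tags match either pattern:
# one or more proper nouns, or an optional determiner + adjectives + a noun
chunker = Chunker() \
    .setInputCols(["sentence", "pos"]) \
    .setOutputCol("chunk") \
    .setRegexParsers(["<NNP>+", "<DT>?<JJ>*<NN>"])

# Convert chunk annotations into a plain array of strings ("finished_chunk")
finisher = Finisher() \
    .setInputCols(["chunk"]) \
    .setIncludeMetadata(False)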

nlpPipeline = Pipeline(stages=[
    document_assembler, 
    sentence_detector, 
    tokenizer,
    posTagger,
    chunker,
    finisher
])

CPU times: user 14 ms, sys: 4.23 ms, total: 18.3 ms
Wall time: 48.3 ms
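
To make the chunking rules more concrete, here is a minimal pure-Python sketch of how a pattern such as `<DT>?<JJ>*<NN>` can be matched against a sequence of POS tags. The helper `chunk_by_pos` and the sample tokens and tags are made up for illustration; this is not how Spark NLP implements `Chunker` internally.

```python
import re


def chunk_by_pos(tokens, tags, pattern):
    """Return the token spans whose POS tags match a Chunker-style pattern.

    Illustrative only: the pattern syntax (<TAG> followed by ?, * or +)
    mirrors the rules passed to setRegexParsers, nothing more.
    """
    # Turn "<DT>?<JJ>*<NN>" into a regex over a tag string like "<DT><JJ><NN>".
    regex = re.compile(re.sub(r"<([^>]+)>", r"(?:<\1>)", pattern))
    tag_string = "".join(f"<{t}>" for t in tags)

    # Character offset at which each token's tag starts inside tag_string.
    starts = []
    offset = 0
    for t in tags:
        starts.append(offset)
        offset += len(t) + 2  # +2 for the angle brackets

    chunks = []
    for m in regex.finditer(tag_string):
        if m.start() == m.end():
            continue  # skip empty matches
        first = starts.index(m.start())
        last = max(i for i, s in enumerate(starts) if s < m.end())
        chunks.append(" ".join(tokens[first:last + 1]))
    return chunks


# Hypothetical sentence with hand-assigned tags.
tokens = ["The", "human", "KCNJ9", "gene", "encodes", "a", "novel", "protein"]
tags = ["DT", "JJ", "NNP", "NN", "VBZ", "DT", "JJ", "NN"]

print(chunk_by_pos(tokens, tags, "<NNP>+"))
print(chunk_by_pos(tokens, tags, "<DT>?<JJ>*<NN>"))
```

Note that `<NN>` does not match an `NNP` tag here, so proper nouns and common nouns are picked up by separate rules.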


In [8]:
nlpPipelineDF = nlpPipeline.fit(pubMedDF).transform(pubMedDF)

CPU times: user 18.1 ms, sys: 6.6 ms, total: 24.7 ms
Wall time: 450 ms


### Let's create the Spark ML pipeline

In [9]:
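# Spark ML Pipeline

# NOTE: minTF=10.0 keeps a term only if it occurs at least 10 times within a
# single document, and minDF=10.0 only if it occurs in at least 10 documents.
cv = CountVectorizer(inputCol="finished_chunk", outputCol="features", vocabSize=1000, minDF=10.0, minTF=10.0)
idf = IDF(inputCol="features", outputCol="idf")
# LDA reads the "features" column by default, i.e. the raw counts, not "idf".
lda = LDA(k=10, maxIter=5)

mlPipeline = Pipeline(stages=[
    cv,
    idf,
    lda
])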

CPU times: user 3.64 ms, sys: 855 µs, total: 4.49 ms
Wall time: 30.1 ms
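
The `minTF` threshold of `CountVectorizer` has a large effect on short documents such as abstracts. The following pure-Python sketch imitates that filter for a single document. The function `count_terms` and the sample chunks are hypothetical, and the threshold semantics (a value of 1 or more is an absolute count, a value below 1 is a fraction of the document's term count) are assumed from the Spark ML documentation.

```python
from collections import Counter


def count_terms(doc_terms, vocabulary, min_tf=1.0):
    """Sparse term counts for one document after a minTF-style filter."""
    counts = Counter(t for t in doc_terms if t in vocabulary)
    # Assumed semantics: >= 1 is an absolute count, < 1 a fraction of the
    # number of terms in the document.
    threshold = min_tf if min_tf >= 1.0 else min_tf * len(doc_terms)
    return {term: n for term, n in counts.items() if n >= threshold}


# Hypothetical chunks extracted from one abstract.
chunks = ["the role", "receptor", "receptor", "mice", "the role", "receptor"]
vocab = {"the role", "receptor", "mice"}

print(count_terms(chunks, vocab, min_tf=1.0))   # every term survives
print(count_terms(chunks, vocab, min_tf=10.0))  # nothing occurs 10 times
```

With a threshold of 10, a document in which no chunk repeats ten times ends up with an empty feature vector, which is worth keeping in mind when reading the `features` column in the output.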


### Let's train the Spark ML pipeline on the output of the Spark NLP pipeline

In [10]:
# Fit the Spark ML pipeline (CountVectorizer, IDF, LDA) on the Spark NLP output
mlModel = mlPipeline.fit(nlpPipelineDF)

CPU times: user 149 ms, sys: 88.2 ms, total: 238 ms
Wall time: 17min 59s


In [11]:
mlPipelineDF = mlModel.transform(nlpPipelineDF)

CPU times: user 23.6 ms, sys: 9.08 ms, total: 32.7 ms
Wall time: 95.5 ms


In [12]:
mlPipelineDF.show()

+--------------------+--------------------+------------+------------+--------------------+
|                text|      finished_chunk|    features|         idf|   topicDistribution|
+--------------------+--------------------+------------+------------+--------------------+
|The human KCNJ9 (...|[KCNJ9, Kir, GIRK...|(1000,[],[])|(1000,[],[])|[0.0,0.0,0.0,0.0,...|
|BACKGROUND: At pr...|[BACKGROUND, the ...|(1000,[],[])|(1000,[],[])|[0.0,0.0,0.0,0.0,...|
|OBJECTIVE: To inv...|[OBJECTIVE, the r...|(1000,[],[])|(1000,[],[])|[0.0,0.0,0.0,0.0,...|
|Combined EEG/fMRI...|[Combined EEG/fMR...|(1000,[],[])|(1000,[],[])|[0.0,0.0,0.0,0.0,...|
|Kohlschutter synd...|[Kohlschutter, sy...|(1000,[],[])|(1000,[],[])|[0.0,0.0,0.0,0.0,...|
|Statistical analy...|[Statistical, ana...|(1000,[],[])|(1000,[],[])|[0.0,0.0,0.0,0.0,...|
|The synthetic DOX...|[DOX-LNA, conjuga...|(1000,[],[])|(1000,[],[])|[0.0,0.0,0.0,0.0,...|
|Our objective was...|[objective, blood...|(1000,[],[])|(1000,[],[])|[0.0,0.0,0.0,0.0,...|

In [13]:
ldaModel = mlModel.stages[2]

In [None]:
ll = ldaModel.logLikelihood(mlPipelineDF)
lp = ldaModel.logPerplexity(mlPipelineDF)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))
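
The two bounds are closely related. As a rough sketch, assuming the perplexity bound is the negated log-likelihood bound divided by the total number of tokens in the corpus (our understanding of Spark's local LDA model, so treat it as an assumption), the relationship looks like this. The function name and the numbers are made up for illustration.

```python
def log_perplexity_bound(log_likelihood, corpus_token_count):
    """Per-token perplexity bound derived from a corpus log-likelihood bound.

    Assumption: bound = -logLikelihood / number of tokens in the corpus.
    """
    return -log_likelihood / corpus_token_count


# Hypothetical values: a log-likelihood bound of -1200 over 400 tokens.
print(log_perplexity_bound(-1200.0, 400))
```

Under this assumption a higher log likelihood and a lower perplexity both indicate a better fit, and the perplexity is comparable across corpora of different sizes because it is normalized per token.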


In [14]:
# Describe topics.
print("The topics described by their top-weighted terms:")
ldaModel.describeTopics(3).show(truncate=False)

The topics described by their top-weighted terms:
+-----+---------------+-------------------------------------------------------------------+
|topic|termIndices    |termWeights                                                        |
+-----+---------------+-------------------------------------------------------------------+
|0    |[14, 548, 186] |[0.01585514997343471, 0.008875496465547342, 0.0013124023225362833] |
|1    |[50, 954, 351] |[0.008349254284205958, 0.00753547628648889, 0.0013099340062148375] |
|2    |[660, 475, 665]|[0.009414105927154907, 0.008065746072952672, 0.007336631079291149] |
|3    |[0, 3, 830]    |[0.14763974789592677, 0.016467518100805777, 0.01128479619300655]   |
|4    |[125, 927, 453]|[0.010021930458005748, 0.008013699542864731, 0.00783221889190857]  |
|5    |[570, 72, 490] |[0.0087714655865149, 0.008257067329311462, 0.0012890162564849917]  |
|6    |[362, 580, 101]|[0.007757053975635789, 0.007476033213583562, 0.007462557297979436] |
|7    |[49, 26, 18]   |[0.0277
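
The values in `termIndices` are positions in the `CountVectorizer` vocabulary, so a row becomes readable once each index is paired with its term and weight. Here is a minimal sketch with a hypothetical vocabulary and a hypothetical row; `label_topic` is an illustrative helper, not part of Spark.

```python
def label_topic(term_indices, term_weights, vocabulary):
    """Pair each term index with its vocabulary entry and its weight."""
    return [(vocabulary[i], w) for i, w in zip(term_indices, term_weights)]


# Hypothetical vocabulary and one hypothetical describeTopics row.
vocabulary = ["disease", "mice", "receptor", "DNA"]
row = {"termIndices": [3, 0], "termWeights": [0.15, 0.02]}

for term, weight in label_topic(row["termIndices"], row["termWeights"], vocabulary):
    print(f"{term}\t{weight:.3f}")
```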

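### Let's look at our topics
NOTE: More cleaning and filtering of the chunks, tuning the `CountVectorizer` parameters, and running more `LDA` iterations should give better topic modeling results. In particular, the `features` column is empty for every row shown above, which suggests that the `CountVectorizer` thresholds are too strict for these abstracts.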
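
As one possible cleaning step, the chunks could be normalized before they reach `CountVectorizer`. The sketch below is plain Python with hypothetical helper names (`clean_chunk`, `clean_chunks`): it strips a leading determiner, lowercases the text, and drops chunks without any letter (such as `+` or `[`). Whether lowercasing is desirable for terms like `IgE` is a judgment call.

```python
import re

# Leading determiners to strip, e.g. "the role" -> "role".
LEADING_DETERMINERS = re.compile(r"^(the|a|an|this|these|those)\s+", re.IGNORECASE)


def clean_chunk(chunk):
    """Normalize one chunk; return None if nothing useful remains."""
    text = LEADING_DETERMINERS.sub("", chunk.strip()).lower()
    # Require at least one letter, which drops chunks such as "+" or "[".
    if not re.search(r"[a-z]", text):
        return None
    return text


def clean_chunks(chunks):
    """Clean a list of chunks and discard the ones that became empty."""
    cleaned = (clean_chunk(c) for c in chunks)
    return [c for c in cleaned if c is not None]


print(clean_chunks(["+", "The method", "the role", "IgE", "["]))
```

In a Spark job, a function like this could be wrapped in a UDF and applied to the `finished_chunk` column before fitting the Spark ML pipeline.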

In [15]:
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
      + " words):")

topics = ldaModel.describeTopics(50)
topics_rdd = topics.rdd

vocab = mlModel.stages[0].vocabulary

topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()

for idx, topic in enumerate(topics_words):
    print("topic: ", idx)
    print("----------")
    for word in topic:
        print(word)
    print("----------")

Learned topics (as distributions over vocab of 1000 words):
topic:  0
----------
+
IgE
sensitivity
muscle
interval
OBJECTIVE
resection
fraction
inhibit
distance
thyroid
diameter
adult
hypertension
status
tothe
the role
surface
mmHg
necrosis
interest
RF
transcription
balance
CD4
the onset
stability
a variety
polymerase
DNA
disease
mortality
trial
presence
absence
form
absorption
problem
chromosome
the degree
the range
Treatment
tool
brain
prevalence
inthe
classification
progression
skin
outcome
----------
topic:  1
----------
mice
GnRH
incidence
receptor
structure
attention
range
CD4
The method
failure
cleavage
the range
phase
mitochondria
susceptibility
enhancement
mucosal
PSA
deficiency
vector
skin
damage
The aim
oxygen
chemical
percentage
technology
CF
plant
oxidase
The expression
delivery
aureus
mature
the present study
this work
point
evidence
adjustment
combination
January
preterm
antigen
blood
IL-6
the field
tool
SS
dependence
[
----------
topic:  2
----------
max
ER
AR
Moreover
