# Using SparkNLP, Spark with Tensorflow in Google CoLab

The below code was taken from this talk:
- [Natural Language Understanding at Scale with Spark Native NLP, Spark ML &TensorFlow](https://www.youtube.com/watch?v=k5X12mdEvb8) by Alex Thomas

- The original source of the code lives in [Alex Thomas' repo on GitHub](https://github.com/alexander-n-thomas/sparksummiteunlp)

In [1]:
# https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/jupyter/quick_start_google_colab.ipynb
import os

# Install java
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed pyspark==2.4.3

# Install Spark NLP
! pip install --ignore-installed spark-nlp==2.2.2

openjdk version "1.8.0_222"
OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)
OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)
Collecting pyspark==2.4.3
[?25l  Downloading https://files.pythonhosted.org/packages/37/98/244399c0daa7894cdf387e7007d5e8b3710a79b67f3fd991c0b0b644822d/pyspark-2.4.3.tar.gz (215.6MB)
[K     |████████████████████████████████| 215.6MB 48kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 35.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.3-py2.py3-none-any.whl size=215964963 sha256=d3e3cff114f6a2daa538955b71e0d2a8ad85772f9638b5ab8b3aa1630c8427ee
  Stored in directory: /root/.cache/pip/wheels/8d/20/f0/b30e2024

In [2]:
import sparknlp
spark = sparknlp.start(include_ocr=True)

print("Spark NLP version")
sparknlp.version()
print("Apache Spark version")
spark.version

Spark NLP version
2.2.2
Apache Spark version


'2.4.3'

In [3]:
!pip install BioPython

Collecting BioPython
[?25l  Downloading https://files.pythonhosted.org/packages/96/01/7e5858a1e54bd0bd0d179cd74654740f07e86fb921a43dd20fb8beabe69d/biopython-1.75-cp36-cp36m-manylinux1_x86_64.whl (2.3MB)
[K     |████████████████████████████████| 2.3MB 5.0MB/s 
Installing collected packages: BioPython
Successfully installed BioPython-1.75


In [4]:
from Bio import Entrez, Medline
import numpy as np
import pandas as pd
from sklearn.utils import resample
from sklearn.metrics import classification_report

import pyspark
from pyspark.ml import Pipeline, feature as spark_ft
from pyspark.sql.functions import udf, col
from pyspark.sql.types import *

from sparknlp.annotator import *
from sparknlp.base import DocumentAssembler

import tensorflow as tf

In [0]:
def query(terms, num_docs=1000):
    search_term = '+'.join(terms)
    print('Searching PubMed abstracts for documents containing term: ',search_term)
    handle = Entrez.esearch(db="pubmed", term=search_term, retmax=num_docs)
    record = Entrez.read(handle)
    handle.close()
    idlist = record["IdList"]
    
    handle = Entrez.efetch(db="pubmed", id=idlist, rettype="medline",retmode="text")
    records = Medline.parse(handle)
    data = []
    for record in records:
        data.append((record.get("TI", "?"),record.get("AU", "?"),record.get("SO", "?"),record.get("AB","?")))

    df = pd.DataFrame(data=data, columns=['title','authors','source','text'])
    df.head(10)

    df.replace(r'^\?$', np.nan, regex=True, inplace=True)
    df['authors'] = df['authors'].apply(lambda x: x if isinstance(x, list) else [])
    df.fillna('', inplace=True)
    df['topic'] = search_term
    
    return spark.createDataFrame(df)

In [0]:
topics = [
    ['type', '1', 'diabetes'], 
    ['creutzfeldt', 'jakob', 'disease'], 
    ['post', 'traumatic', 'stress', 'disorder'],
    ['heart', 'disease'],
    ['AIDS'],
    ['breast', 'cancer']
]

In [8]:
texts = None

np.random.seed(123)
for terms in topics:
    num_docs = np.random.randint(200, 1000)
    print('terms', terms, 'num_docs', num_docs)
    if texts is None:
        texts = query(terms, num_docs)
    else:
        texts = texts.union(query(terms, num_docs))

terms ['type', '1', 'diabetes'] num_docs 710
Searching PubMed abstracts for documents containing term:  type+1+diabetes


Email address is not specified.

To make use of NCBI's E-utilities, NCBI requires you to specify your
email address with each request.  As an example, if your email address
is A.N.Other@example.com, you can specify it as follows:
   from Bio import Entrez
   Entrez.email = 'A.N.Other@example.com'
In case of excessive usage of the E-utilities, NCBI will attempt to contact
a user at the email address provided before blocking access to the
E-utilities.


terms ['creutzfeldt', 'jakob', 'disease'] num_docs 565
Searching PubMed abstracts for documents containing term:  creutzfeldt+jakob+disease
terms ['post', 'traumatic', 'stress', 'disorder'] num_docs 582
Searching PubMed abstracts for documents containing term:  post+traumatic+stress+disorder
terms ['heart', 'disease'] num_docs 522
Searching PubMed abstracts for documents containing term:  heart+disease
terms ['AIDS'] num_docs 298
Searching PubMed abstracts for documents containing term:  AIDS
terms ['breast', 'cancer'] num_docs 942
Searching PubMed abstracts for documents containing term:  breast+cancer


In [9]:
texts.show()

+--------------------+--------------------+--------------------+--------------------+---------------+
|               title|             authors|              source|                text|          topic|
+--------------------+--------------------+--------------------+--------------------+---------------+
|Correction to: Da...|  [Paik J, Blair HA]|Drugs. 2019 Nov 2...|The article Dapag...|type+1+diabetes|
|Insulin-Independe...|[Gunawardana SC, ...|Transplant Direct...|As our previous p...|type+1+diabetes|
|Interleukin-6 and...|[Siewko K, Maciul...|Biomed Res Int. 2...|Aim: The aim of o...|type+1+diabetes|
|The Effect of Com...|[Hendrijantini N,...|Contemp Clin Dent...|Background: Prolo...|type+1+diabetes|
|Cytotoxic T-lymph...|[Alshareef SA, Om...|BMC Res Notes. 20...|OBJECTIVES: This ...|type+1+diabetes|
|Do-It-Yourself (D...|            [Hng TM]|J Diabetes Sci Te...|Do-It-Yourself cl...|type+1+diabetes|
|The Endocannabino...|[Argenziano M, To...|Int J Mol Sci. 20...|Endocannabinoid s.

In [10]:
texts.first()

Row(title='Correction to: Dapagliflozin: A Review in Type 1 Diabetes.', authors=['Paik J', 'Blair HA'], source='Drugs. 2019 Nov 26. pii: 10.1007/s40265-019-01238-2. doi: 10.1007/s40265-019-01238-2.', text='The article Dapagliflozin: A Review in Type 1 Diabetes, written by Julia Paik and Hannah A. Blair, was originally published Online First without Open Access.', topic='type+1+diabetes')

In [11]:
texts = texts.filter('text != ""').persist()
texts.count()

3330

In [0]:
train, test = texts.randomSplit(weights=[0.8, 0.2], seed=123)

In [0]:
vocab_size=500

In [0]:
#import pyspark
#from pyspark.ml.feature import RegexTokenizer

from sparknlp.annotator import Tokenizer

tokenizer = Tokenizer()         \
    .setInputCols("sentence")   \
    .setOutputCol("token")

# tokenizer = RegexTokenizer( inputCol="sentence", outputCol="token" )


In [0]:
from sparknlp.annotator import SentenceDetector

sentence_detector = SentenceDetector( ) \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")



In [0]:
from sparknlp.base import DocumentAssembler

document_assembler = DocumentAssembler().setInputCol("text")

In [0]:
from sparknlp.base import Finisher

finisher = Finisher( )

finisher = finisher.setInputCols('token')
finisher = finisher.setOutputCols(['tokens'])
finisher = finisher.setOutputAsArray(True)
#finisher.setIncludeKeys(True)

In [0]:
sw_remover = spark_ft.StopWordsRemover()\
    .setInputCol('tokens')\
    .setOutputCol('cleantokens')\
    .setStopWords(spark_ft.StopWordsRemover.loadDefaultStopWords('english'))

In [0]:
hashingtf = spark_ft.HashingTF()\
    .setInputCol('cleantokens')\
    .setOutputCol('tf')\
    .setNumFeatures(vocab_size)

In [0]:
idf = spark_ft.IDF()\
    .setInputCol('tf')\
    .setOutputCol('tfidf')

In [0]:
label_indexer = spark_ft.StringIndexer(inputCol='topic', outputCol='label')

#model = label_indexer.fit(texts)
#df = model.transform(texts)
#
#df.show()

In [0]:
pipeline = Pipeline(stages=[document_assembler, sentence_detector, tokenizer, finisher, sw_remover, hashingtf,idf, label_indexer])

In [0]:
pipeline_model = pipeline.fit(train)

In [0]:
tx_train = pipeline_model.transform(train)

In [25]:
tx_train.show()

+--------------------+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+-----+
|               title|             authors|              source|                text|          topic|              tokens|         cleantokens|                  tf|               tfidf|label|
+--------------------+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+-----+
|"Out of the box" ...|[Paret M, Barash ...|Acta Diabetol. 20...|BACKGROUND: Use o...|type+1+diabetes|[BACKGROUND, :, U...|[BACKGROUND, :, U...|(500,[1,2,4,24,25...|(500,[1,2,4,24,25...|  1.0|
|A mHealth Support...|[Ng AH, Crowe TC,...|Digit Health. 201...|Aims and Objectiv...|type+1+diabetes|[Aims, and, Objec...|[Aims, Objectives...|(500,[1,9,19,20,2...|(500,[1,9,19,20,2...|  1.0|
|A model-based app...|[Jiang T, Lu Y, D.

In [0]:
tx_test = pipeline_model.transform(test)

In [0]:
train_df = tx_train.select('title', 'label', 'tfidf').toPandas()

In [28]:
train_df.head()

Unnamed: 0,title,label,tfidf
0,"""Out of the box"" solution for skin problems du...",1.0,"(0.0, 1.9988735601520602, 1.7457491635792217, ..."
1,A mHealth Support Program for Australian Young...,1.0,"(0.0, 2.99831034022809, 0.0, 0.0, 0.0, 0.0, 0...."
2,A model-based approach for clustering of multi...,1.0,"(0.0, 1.9988735601520602, 0.0, 0.0, 0.0, 0.0, ..."
3,"A physician-initiated double-blind, randomised...",1.0,"(0.0, 0.9994367800760301, 0.0, 0.0, 0.0, 0.0, ..."
4,A pilot study of preproinsulin peptides reacti...,1.0,"(2.7606661832067827, 4.997183900380151, 0.0, 1..."


In [29]:
test_df = tx_test.select('title', 'label', 'tfidf').toPandas()
test_df.head()

Unnamed: 0,title,label,tfidf
0,A Multiple Hypothesis Approach to Estimating M...,1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.5677194588668..."
1,A Systematic Review of Case-Identification Alg...,1.0,"(0.0, 0.9994367800760301, 0.0, 1.5018377541827..."
2,A composite immune signature parallels disease...,1.0,"(0.0, 0.9994367800760301, 3.4914983271584434, ..."
3,A preclinical assessment to repurpose drugs to...,1.0,"(2.7606661832067827, 0.0, 5.237247490737666, 0..."
4,A review of the NG17 recommendations for the u...,1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 4.7031583766004..."


In [0]:
train_X = tf.convert_to_tensor(np.vstack(train_df['tfidf'].apply(lambda sv: sv.toArray()).tolist()), dtype=tf.float32)
train_Y = tf.convert_to_tensor(pd.get_dummies(train_df['label']).values, dtype=tf.float32)

In [0]:
test_X = tf.convert_to_tensor(np.vstack(test_df['tfidf'].apply(lambda sv: sv.toArray()).tolist()), dtype=tf.float32)
test_Y = tf.convert_to_tensor(pd.get_dummies(test_df['label']).values, dtype=tf.float32)

In [0]:
# Parameters
learning_rate = 0.1
num_steps = 300
batch_size = 128
display_step = 10

# Network Parameters
num_input = vocab_size
n_hidden_1 = int(vocab_size / 3) # 1st layer number of neurons
n_hidden_2 = int(vocab_size / 3) # 2nd layer number of neurons
num_classes = len(topics)

# tf Graph input
X = tf.placeholder("float", [None, num_input])
Y = tf.placeholder("float", [None, num_classes])

In [0]:
# Store layers weight & bias
weights = {
    'h1': tf.Variable(tf.random_normal([num_input, n_hidden_1])),
    'h2': tf.Variable(tf.random_normal([n_hidden_1, n_hidden_2])),
    'out': tf.Variable(tf.random_normal([n_hidden_2, num_classes]))
}
biases = {
    'b1': tf.Variable(tf.random_normal([n_hidden_1])),
    'b2': tf.Variable(tf.random_normal([n_hidden_2])),
    'out': tf.Variable(tf.random_normal([num_classes]))
}

In [0]:
# Create model
def neural_net(x):
    # Hidden fully connected layer with 256 neurons
    layer_1 = tf.add(tf.matmul(x, weights['h1']), biases['b1'])
    # Hidden fully connected layer with 256 neurons
    layer_2 = tf.add(tf.matmul(layer_1, weights['h2']), biases['b2'])
    # Output fully connected layer with a neuron for each class
    out_layer = tf.matmul(layer_2, weights['out']) + biases['out']
    return out_layer

In [35]:
# Construct model
logits = neural_net(train_X)

# Define loss and optimizer
loss_op = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(
    logits=logits, labels=train_Y))
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
train_op = optimizer.minimize(loss_op)

# Evaluate model (with test logits, for dropout to be disabled)
y_pred_op = tf.argmax(logits, 1)
y_true_op = tf.argmax(train_Y, 1)
correct_pred_op = tf.equal(y_pred_op, y_true_op)
accuracy_op = tf.reduce_mean(tf.cast(correct_pred_op, tf.float32))

# Initialize the variables (i.e. assign their default value)
init = tf.global_variables_initializer()

Instructions for updating:

Future major versions of TensorFlow will allow gradients to flow
into the labels input on backprop by default.

See `tf.nn.softmax_cross_entropy_with_logits_v2`.



In [36]:
# Start training
with tf.Session() as sess:
    # Run the initializer
    sess.run(init)
    tf.train.start_queue_runners(sess)
    train_X_eval = train_X.eval()
    train_Y_eval = train_Y.eval()
    test_X_eval = test_X.eval()
    test_Y_eval = test_Y.eval()

Instructions for updating:
To construct input pipelines, use the `tf.data` module.


In [37]:
# Start training
with tf.Session() as sess:
    # Run the initializer
    sess.run(init)
    tf.train.start_queue_runners(sess)
    for step in range(1, num_steps+1):
        x, y = resample(train_X_eval, train_Y_eval, n_samples=100)
        # Run optimization op (backprop)
        sess.run(train_op, feed_dict={X: x, Y: y})
        if step % display_step == 0 or step == 1:
            # Calculate batch loss and accuracy
            loss, acc = sess.run([loss_op, accuracy_op], feed_dict={X: x, Y: y})
            print("Step " + str(step) + ", Minibatch Loss= " + "{:.4f}".format(loss) + ", Training Accuracy= " + "{:.3f}".format(acc))

    print("Optimization Finished!")

    accuracy, y_pred, y_true = sess.run([accuracy_op, y_pred_op, y_true_op], feed_dict={X: test_X.eval(), Y: test_Y.eval()})
    print("Testing Accuracy:", accuracy)

Step 1, Minibatch Loss= 12936.9717, Training Accuracy= 0.393
Step 10, Minibatch Loss= 2401.2583, Training Accuracy= 0.700
Step 20, Minibatch Loss= 408.4059, Training Accuracy= 0.898
Step 30, Minibatch Loss= 105.8308, Training Accuracy= 0.948
Step 40, Minibatch Loss= 33.2053, Training Accuracy= 0.976
Step 50, Minibatch Loss= 7.4082, Training Accuracy= 0.989
Step 60, Minibatch Loss= 4.4690, Training Accuracy= 0.992
Step 70, Minibatch Loss= 2.5371, Training Accuracy= 0.994
Step 80, Minibatch Loss= 2.9678, Training Accuracy= 0.993
Step 90, Minibatch Loss= 2.1925, Training Accuracy= 0.994
Step 100, Minibatch Loss= 3.1888, Training Accuracy= 0.994
Step 110, Minibatch Loss= 2.5573, Training Accuracy= 0.994
Step 120, Minibatch Loss= 2.8129, Training Accuracy= 0.995
Step 130, Minibatch Loss= 3.6936, Training Accuracy= 0.995
Step 140, Minibatch Loss= 3.8285, Training Accuracy= 0.995
Step 150, Minibatch Loss= 2.8254, Training Accuracy= 0.994
Step 160, Minibatch Loss= 2.9666, Training Accuracy= 0.

In [38]:
print(classification_report(y_true, y_pred, target_names=pipeline_model.stages[-1].labels))

                                precision    recall  f1-score   support

                 breast+cancer       0.99      1.00      0.99       719
               type+1+diabetes       1.00      1.00      1.00       521
post+traumatic+stress+disorder       1.00      1.00      1.00       430
                 heart+disease       0.99      0.98      0.99       394
     creutzfeldt+jakob+disease       1.00      1.00      1.00       388
                          AIDS       0.99      0.99      0.99       223

                      accuracy                           1.00      2675
                     macro avg       1.00      0.99      0.99      2675
                  weighted avg       1.00      1.00      1.00      2675

