In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, PipelineModel
import pyspark.sql.functions as F
import pandas as pd

import sparknlp
from sparknlp.annotator import * 
from sparknlp.base import * 
from sparknlp.common import *

In [2]:
def start(gpu = False, spark_nlp_version = None):
    """Create (or reuse) the local Spark session used by this notebook.

    Args:
        gpu: When true, request the ``spark-nlp-gpu`` artifact instead of
            ``spark-nlp``.
        spark_nlp_version: Version of the Spark NLP jar to request. Defaults
            to ``sparknlp.version()`` so the jar matches the Python package
            imported by this notebook (a previously hard-coded 2.4.3 did not
            match the 2.5.3 package reported by the version cell).

    Returns:
        The object returned by ``SparkSession.builder.getOrCreate()``.
    """
    if spark_nlp_version is None:
        spark_nlp_version = sparknlp.version()

    artifact = 'spark-nlp-gpu_2.11' if gpu else 'spark-nlp_2.11'
    package = 'com.johnsnowlabs.nlp:{}:{}'.format(artifact, spark_nlp_version)

    # Chain every option so the result does not depend on config() mutating
    # the builder in place.
    builder = (
        SparkSession.builder
        .appName('Spark NLP - Prediction Pipeline')
        .master('local[*]')
        .config('spark.driver.memory', '10G')
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
        .config('spark.kryoserializer.buffer.max', '1000M')
        .config('spark.jars.packages', package)
    )

    return builder.getOrCreate()

In [3]:
import os
import sys

# Point PySpark at the interpreter that is running this notebook instead of a
# machine-specific absolute path, so the cell works on any machine and the
# driver and worker Python versions cannot diverge.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [4]:
# Create (or reuse) the local Spark session; gpu defaults to False, so the
# non-GPU Spark NLP package is requested.
spark = start()

In [5]:
# Report the version of the imported sparknlp package and of the running
# Spark session.
print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

Spark NLP version:  2.5.3
Apache Spark version:  2.4.5


### PREDICTION PIPELINE

In [6]:
# One-row DataFrame holding an empty string in a 'text' column; it is the
# input passed to ner_pipeline.fit() further down.
empty_data = spark.createDataFrame([['']]).toDF('text')

In [8]:
# Prediction pipeline: raw text -> document -> sentences -> tokens ->
# word embeddings -> NER tags -> entity chunks.

# Wraps the raw 'text' column into a 'document' annotation column.
document = (
    DocumentAssembler()
    .setInputCol('text')
    .setOutputCol('document')
)

# Splits each document into sentences.
sentence = (
    SentenceDetector()
    .setInputCols(['document'])
    .setOutputCol('sentence')
)

# Splits each sentence into tokens.
token = (
    Tokenizer()
    .setInputCols(['sentence'])
    .setOutputCol('token')
)

# Word embeddings loaded from a local directory (path is relative to the
# notebook's working directory).
BioVec_embeddings = (
    WordEmbeddingsModel.load('./embeddings/BioVec_model')
    .setInputCols(['sentence', 'token'])
    .setOutputCol('biowordvec')
)

# NER model loaded from a local directory; consumes the embeddings above.
model = (
    NerDLModel.load('./models/NER_DL_no_validationSet')
    .setInputCols(['sentence', 'token', 'biowordvec'])
    .setOutputCol('ner')
)

# Converts the per-token tags in 'ner' into chunks in 'ner_span'.
converter = (
    NerConverter()
    .setInputCols(['sentence', 'token', 'ner'])
    .setOutputCol('ner_span')
)

# Stage order matters: each stage reads columns produced by earlier stages.
ner_pipeline = Pipeline(stages=[
    document,
    sentence,
    token,
    BioVec_embeddings,
    model,
    converter,
])

In [9]:
# Fit the pipeline so it can transform data. The embeddings and NER stages are
# loaded from disk (see the .load(...) calls above), and the frame passed here
# only contains an empty string.
prediction_model = ner_pipeline.fit(empty_data)

In [20]:
# Short sample clinical note.
# NOTE(review): `text` is reassigned by the next cell before anything reads it,
# so this sample is not the one annotated when the cells run top to bottom.
text = ''' The patient was diagnosed with Stage 4 adenocarcinoma of lung with b/l lung nodules and he was prescribed 1 capsule of Advil for 5 days. Molecular testing of tumor demonstrates EGFR mutation. He was seen by the endocrinology service and he was discharged on 40 units of insulin glargine at night , 12 units of insulin lispro with meals , and metformin 1000 mg two times a day. It was determined that all SGLT2 inhibitors should be discontinued indefinitely for 3 months.'''

In [42]:
# Sample clinical case description used as the prediction input below. The
# string starts with a newline, so character offsets in the results count it.
text = '''
A 28-year-old female with a history of gestational diabetes mellitus diagnosed eight years prior to presentation and subsequent type two diabetes mellitus ( T2DM ), one prior episode of HTG-induced pancreatitis three years prior to presentation , associated with an acute hepatitis , and obesity with a body mass index ( BMI ) of 33.5 kg/m2 , presented with a one-week history of polyuria , polydipsia , poor appetite , and vomiting . Two weeks prior to presentation , she was treated with a five-day course of amoxicillin for a respiratory tract infection . She was on metformin , glipizide , and dapagliflozin for T2DM and atorvastatin and gemfibrozil for HTG . She had been on dapagliflozin for six months at the time of presentation . Physical examination on presentation was significant for dry oral mucosa ; significantly , her abdominal examination was benign with no tenderness , guarding , or rigidity . Pertinent laboratory findings on admission were : serum glucose 111 mg/dl , bicarbonate 18 mmol/l , anion gap 20 , creatinine 0.4 mg/dL , triglycerides 508 mg/dL , total cholesterol 122 mg/dL , glycated hemoglobin ( HbA1c ) 10% , and venous pH 7.27 . Serum lipase was normal at 43 U/L . Serum acetone levels could not be assessed as blood samples kept hemolyzing due to significant lipemia .

'''

In [43]:
# Wrap the sample text in a one-row DataFrame whose single column is named
# 'text', the input column configured on the DocumentAssembler.
prediction_data = spark.createDataFrame([[text]]).toDF('text')

In [44]:
# Run the fitted pipeline on the sample; the result carries one output column
# per stage (document, sentence, token, biowordvec, ner, ner_span).
prediction = prediction_model.transform(prediction_data)

In [45]:
# Preview up to 10 rows of the annotated frame; long cell values are shortened
# in the printed table.
prediction.show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|            document|            sentence|               token|          biowordvec|                 ner|            ner_span|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|
A 28-year-old fe...|[[document, 0, 13...|[[document, 1, 43...|[[token, 1, 1, A,...|[[word_embeddings...|[[named_entity, 1...|[[chunk, 40, 68, ...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+



In [46]:
# Pair every extracted chunk with its metadata (one row per entity), then keep
# only the chunk text and its entity label and print the first 20 rows in full.
(
    prediction
    .select(
        F.explode(
            F.arrays_zip('ner_span.result', 'ner_span.metadata')
        ).alias('entities')
    )
    .select(
        F.expr('entities["0"]').alias('chunk'),
        F.expr('entities["1"].entity').alias('entity'),
    )
    .show(20, truncate=False)
)

+-----------------------------+---------+
|chunk                        |entity   |
+-----------------------------+---------+
|gestational diabetes mellitus|problem  |
|type two diabetes mellitus   |problem  |
|T2DM                         |problem  |
|HTG-induced pancreatitis     |problem  |
|an acute hepatitis           |problem  |
|obesity                      |problem  |
|a body mass index            |test     |
|BMI                          |test     |
|polyuria                     |problem  |
|polydipsia                   |problem  |
|poor appetite                |problem  |
|vomiting                     |problem  |
|amoxicillin                  |treatment|
|a respiratory tract infection|problem  |
|metformin                    |treatment|
|glipizide                    |treatment|
|dapagliflozin                |treatment|
|T2DM                         |problem  |
|atorvastatin                 |treatment|
|gemfibrozil                  |treatment|
+-----------------------------+---

### LIGHTPIPELINE RESULTS

In [47]:
# Wrap the fitted pipeline in a LightPipeline; the next cell uses it to
# annotate a plain Python string rather than a Spark DataFrame.
lp = LightPipeline(prediction_model)

In [48]:
# Annotate the sample text. Later cells read result[0] by output column name
# ('document', 'token', 'ner', 'ner_span').
result = lp.fullAnnotate(text)

In [49]:
# One row per token: sentence index, token text, begin/end offsets and the tag
# predicted for that token (tokens and tags are paired positionally).
ner_df = pd.DataFrame(
    [
        (int(tok.metadata['sentence']), tok.result, tok.begin, tok.end, tag.result)
        for tok, tag in zip(result[0]["token"], result[0]["ner"])
    ],
    columns=['sent_id', 'token', 'start', 'end', 'ner'],
)
ner_df.head(50)

Unnamed: 0,sent_id,token,start,end,ner
0,0,A,1,1,O
1,0,28-year-old,3,13,O
2,0,female,15,20,O
3,0,with,22,25,O
4,0,a,27,27,O
5,0,history,29,35,O
6,0,of,37,38,O
7,0,gestational,40,50,B-problem
8,0,diabetes,52,59,I-problem
9,0,mellitus,61,68,I-problem


In [83]:
import random
from IPython.core.display import display, HTML


def get_color():
    """Return a random colour as an uppercase ``#RRGGBB`` hex string.

    Each of the three channels is drawn with ``random.randint(10, 255)``.
    """
    channels = tuple(random.randint(10, 255) for _ in range(3))
    return '#%02X%02X%02X' % channels

In [128]:
import spacy
from spacy import displacy

def show_html_spacy(annotated_text, filter_labels=True):
    """Render the NER chunks of one annotated document with spaCy's displaCy.

    Args:
        annotated_text: Mapping with a ``'document'`` entry (a sequence whose
            first item exposes ``.result``, the text to render) and a
            ``'ner_span'`` entry (an iterable of annotations exposing
            ``.begin``, ``.end`` and ``.metadata['entity']``).
        filter_labels: Not read by this function; kept so existing calls that
            pass it keep working.

    Returns:
        Whatever ``displacy.render`` returns for these arguments.
        NOTE(review): with ``jupyter=True`` spaCy is expected to display the
        markup itself and return ``None`` - confirm against the installed
        spaCy version.
    """
    # The +1 converts the annotation's end offset into the exclusive bound
    # used for the 'end' key of displaCy's manual entity format.
    entities = [
        {
            'start': chunk.begin,
            'end': chunk.end + 1,
            'label': chunk.metadata['entity'].upper(),
        }
        for chunk in annotated_text['ner_span']
    ]

    document_text = [{
        'text': annotated_text['document'][0].result,
        'ents': entities,
        'title': None,
    }]

    # Sorted so the label order passed to displaCy is deterministic.
    label_list = sorted({entity['label'] for entity in entities})

    # Fixed palette for the three labels this notebook's model emits. For
    # random colours use: colors = {label: get_color() for label in label_list}
    colors = {'TEST': '#51B7F2', 'TREATMENT': '#20D16C', 'PROBLEM': '#E03630'}

    html_text = displacy.render(
        document_text,
        style='ent',
        jupyter=True,
        manual=True,
        options={'ents': label_list, 'colors': colors},
    )

    return html_text

In [129]:
# Highlight the entities found in the first annotated result.
show_html_spacy(result[0])

In [130]:
# Stop the Spark session created by start(). Run this last: earlier cells that
# use `spark` need a new session once it has been stopped.
spark.stop()