![JohnSnowLabs](https://sparknlp.org/assets/images/logo.png)

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/JohnSnowLabs/spark-nlp-workshop/blob/master/healthcare-nlp/31.0.Structured_Streaming_with_SparkNLP_for_Healthcare.ipynb)

# Structured Streaming with Spark NLP for Healthcare

This notebook demonstrates the integration of Spark NLP for Healthcare with Spark Structured Streaming. We'll walk through a straightforward example that counts how many times each clinical entity occurs and updates those counts in real time as new notes arrive.

## Start Spark Session

In [None]:
! pip install -q johnsnowlabs

In [None]:
from google.colab import files
print('Please upload your John Snow Labs license using the button below')
license_keys = files.upload()

In [None]:
from johnsnowlabs import nlp, medical

# After uploading your license, run this to install all licensed Python wheels and pre-download the JARs for the Spark session JVM
nlp.install()

In [None]:
from johnsnowlabs import nlp, medical

# Automatically load the license data and start a session with all JARs the user has access to
spark = nlp.start()

In [5]:
spark

## Healthcare NLP for Data Scientists Course

If you are not familiar with the components in this notebook, you can check the [Healthcare NLP for Data Scientists Udemy Course](https://www.udemy.com/course/healthcare-nlp-for-data-scientists/) and the [MOOC Notebooks](https://github.com/JohnSnowLabs/spark-nlp-workshop/tree/master/Spark_NLP_Udemy_MOOC/Healthcare_NLP) for each component.

## Read Streaming

First, we create the directory where the files for streaming will reside.

In [6]:
!mkdir -p oncology_notes

In [7]:
# Download the first sample oncology note.
! wget -q https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/Certification_Trainings/Healthcare/data/oncology_notes/mt_oncology_0.txt -P oncology_notes/

The core syntax for reading streaming data in Apache Spark:

```
(spark.readStream
     .format("csv")            # the raw format you are reading from
     .option("key", "value")   # source-specific options
     .schema(schema)           # file sources require an explicit schema
     .load(path))
```

The syntax for reading static and streaming data is very similar; there are two main differences:

- Static reads use `spark.read`, while streaming reads use `spark.readStream`.
- By default, Structured Streaming from file-based sources requires you to specify the schema rather than relying on Spark to infer it automatically. This restriction ensures that a consistent schema is used for the streaming query, even in the case of failures. For ad-hoc use cases, you can re-enable schema inference by setting `spark.sql.streaming.schemaInference` to `true`.
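As a sketch of the second difference, the hypothetical helper below opens a streaming CSV reader either with an explicit schema or, for ad-hoc exploration, with `spark.sql.streaming.schemaInference` switched on. The function name `read_notes_stream` and its parameters are illustrative assumptions; only the `spark.conf.set`, `readStream`, `option`, `schema`, and `csv` calls are standard PySpark.

```python
def read_notes_stream(spark, path, schema=None):
    """Hypothetical helper: open a streaming CSV reader on `path`.

    With `schema`, the schema is applied explicitly (what file sources
    require by default). Without it, streaming schema inference is enabled,
    which is only advisable for ad-hoc exploration.
    """
    reader = spark.readStream.option("header", "true")
    if schema is not None:
        # Explicit schema: the same schema is used across restarts and failures
        reader = reader.schema(schema)
    else:
        # Ad-hoc only: let Spark infer the schema from the files already in `path`
        spark.conf.set("spark.sql.streaming.schemaInference", "true")
    # multiLine=True lets quoted text fields span several lines
    return reader.csv(path, multiLine=True)
```

With the `userSchema` defined in the cell below, `read_notes_stream(spark, "oncology_notes/", userSchema)` would build the same reader as `lines`; the explicit `sep` option is omitted because `","` is already the CSV default.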



In [8]:
# Create DataFrame representing the stream of input lines

from pyspark.sql.types import StructType

userSchema = StructType()\
    .add("index", "string")\
    .add("text", "string")

lines = spark \
    .readStream \
    .option("sep", ",") \
    .option("header", "true") \
    .schema(userSchema) \
    .csv("oncology_notes/", multiLine=True)

In [9]:
lines.printSchema()

root
 |-- index: string (nullable = true)
 |-- text: string (nullable = true)



In [10]:
# Keep only the text column; it is used below to fit the pipeline
text_df = lines.select(lines.text)

## NER Pipeline

In [11]:
# Annotator that transforms a text column from dataframe into an Annotation ready for NLP
documentAssembler = nlp.DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

sentenceDetector = nlp.SentenceDetectorDLModel.pretrained("sentence_detector_dl_healthcare","en","clinical/models")\
    .setInputCols(["document"])\
    .setOutputCol("sentence")

# Tokenizer splits words in a relevant format for NLP
tokenizer = nlp.Tokenizer()\
    .setInputCols(["sentence"])\
    .setOutputCol("token")

# Clinical word embeddings trained on the PubMed dataset
word_embeddings = nlp.WordEmbeddingsModel.pretrained("embeddings_clinical","en","clinical/models")\
    .setInputCols(["sentence","token"])\
    .setOutputCol("embeddings")

# NER model trained on i2b2 (sampled from MIMIC) dataset
jsl_ner = medical.NerModel.pretrained("ner_jsl","en","clinical/models")\
    .setInputCols(["sentence","token","embeddings"])\
    .setOutputCol("ner")

# Merge the token-level NER tags into entity chunks
ner_converter = medical.NerConverterInternal()\
    .setInputCols(["sentence","token","ner"])\
    .setOutputCol("ner_chunk")

# Assemble the pipeline
nlpPipeline = nlp.Pipeline(
    stages=[
        documentAssembler,
        sentenceDetector,
        tokenizer,
        word_embeddings,
        jsl_ner,
        ner_converter
        ])

# Fit the pipeline to get a PipelineModel; the models are pretrained, so this step does not train them
model = nlpPipeline.fit(text_df)

sentence_detector_dl_healthcare download started this may take some time.
Approximate size to download 367.3 KB
[OK!]
embeddings_clinical download started this may take some time.
Approximate size to download 1.6 GB
[OK!]
ner_jsl download started this may take some time.
Approximate size to download 14.5 MB
[OK!]


In [12]:
# Transform the data
result_df = model.transform(lines)

In [13]:
# Explode the entities and their labels
import pyspark.sql.functions as F
entities_df = result_df.select(F.explode(F.arrays_zip(result_df.ner_chunk.result,
                                                      result_df.ner_chunk.metadata)).alias("cols"))\
                  .select(F.expr("cols['0']").alias("entity"),
                          F.expr("cols['1']['entity']").alias("ner_label"))

entities_df.printSchema()

root
 |-- entity: string (nullable = true)
 |-- ner_label: string (nullable = true)
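To make the `arrays_zip` + `explode` step concrete, here is a plain-Python sketch of what it does to a single row. The row is made up for illustration and is not real `ner_jsl` output: `result` stands for `ner_chunk.result` (the chunk texts) and `metadata` for `ner_chunk.metadata` (one dict per chunk; only its `entity` key is shown). Pairing the two arrays by position and emitting one output row per pair yields the `(entity, ner_label)` rows above.

```python
# Hypothetical single row: parallel arrays like ner_chunk.result / ner_chunk.metadata
row = {
    "result": ["pleural effusion", "2007"],
    "metadata": [{"entity": "Disease_Syndrome_Disorder"}, {"entity": "Date"}],
}

def explode_chunks(row):
    """Mimic F.explode(F.arrays_zip(result, metadata)) plus the two selects."""
    out = []
    # arrays_zip pairs elements by position (the arrays have equal length here)
    for chunk_text, meta in zip(row["result"], row["metadata"]):
        # explode turns each pair into its own row: (entity, ner_label)
        out.append((chunk_text, meta["entity"]))
    return out

print(explode_chunks(row))
# [('pleural effusion', 'Disease_Syndrome_Disorder'), ('2007', 'Date')]
```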



## Write Streaming
The core syntax for writing streaming data in Apache Spark:

```
(df.writeStream
   .outputMode("append")                            # the default; see the modes below
   .format("parquet")                               # optional, parquet is the default
   .option("checkpointLocation", checkpoint_path)   # file sinks need a checkpoint location
   .start(path))
```

PySpark's `outputMode()` method specifies the output mode. The “output” is what gets written out to the external storage, and it can be produced in one of the following modes:

- **Complete Mode** - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

- **Append Mode** - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

- **Update Mode** - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.
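The difference between these modes is easiest to see on a running aggregation like the entity counts built below. The following is a plain-Python simulation, not Spark code, and its micro-batch contents are made up: it keeps a result table of counts across triggers and records what complete and update mode would emit after each one. Append mode is left out because Spark only accepts it for streaming aggregations that define a watermark.

```python
from collections import Counter

def simulate(batches):
    """Return, per trigger, the rows that complete and update mode would emit."""
    table = Counter()      # the Result Table: entity -> running count
    emitted = []
    for batch in batches:
        table.update(batch)
        complete = dict(table)                              # the whole table, every trigger
        update = {k: table[k] for k in sorted(set(batch))}  # only rows changed by this batch
        emitted.append({"complete": complete, "update": update})
    return emitted

batches = [["2007", "2007", "CVA"], ["2007", "endoscopy"]]
for i, out in enumerate(simulate(batches), start=1):
    print(f"trigger {i}: complete={out['complete']} update={out['update']}")
```

On the second trigger, complete mode re-emits `CVA` even though its count did not change, while update mode emits only `2007` and `endoscopy`.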

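The examples below use the `memory` sink (`format("memory")`), which stores the results of the streaming query as an in-memory table named after the query (`queryName`). It is intended for debugging, testing, or interactive analysis, because the entire output is collected in the driver's memory.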



We will create a streaming query that counts the entity occurrences in our dataset. Later, we will add new files and check how the streaming results update.

In [14]:
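# Group by 'entity' and 'ner_label', count the occurrences, and keep the running counts in an in-memory table
# Note: `entity_counts_df` is a StreamingQuery handle, not a DataFrame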
entity_counts_df = entities_df.groupBy("entity", "ner_label").count() \
        .writeStream \
        .queryName("entity_counts_table") \
        .outputMode("complete") \
        .format("memory") \
        .start()

In [15]:
import threading
threading.Event().wait(45)  # Pause for 45 seconds to give the streaming query time to process the data

False
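A fixed wait may be too short on a slow cluster or needlessly long on a fast one. PySpark's `StreamingQuery.processAllAvailable()` instead blocks until all data currently available in the source has been processed. The helper below is a hypothetical wrapper (its name and return value are assumptions) that checks the query is still running, waits this way, and returns `lastProgress`, the progress report of the most recent micro-batch. The Spark documentation intends `processAllAvailable()` for testing, and it may not return while new data keeps arriving.

```python
def wait_for_stream(query):
    """Hypothetical helper: block until `query` has processed all available input."""
    if not query.isActive:
        raise RuntimeError(f"query {query.name!r} is not running")
    # Blocks until every file currently in the source directory has been processed
    query.processAllAvailable()
    # Progress report of the most recent micro-batch (None if there was none)
    return query.lastProgress
```

In this notebook it could replace the fixed pause, e.g. `wait_for_stream(entity_counts_df)` before querying `entity_counts_table`.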

In [16]:
spark.sql("select * from entity_counts_table").show(truncate=False)   # interactively query in-memory table

+-------------------+-------------------------+-----+
|entity             |ner_label                |count|
+-------------------+-------------------------+-----+
|Phenergan          |Drug_BrandName           |1    |
|August 14          |Date                     |1    |
|hospital           |Clinical_Dept            |1    |
|illicit drug       |Substance                |1    |
|pleural effusion   |Disease_Syndrome_Disorder|2    |
|heart rate 83      |Pulse                    |1    |
|alcohol            |Alcohol                  |1    |
|urgent care center |Clinical_Dept            |1    |
|CVA                |Cerebrovascular_Disease  |1    |
|pericardial window |Procedure                |1    |
|clubbing           |Symptom                  |1    |
|nondistended       |Symptom                  |1    |
|atrial fibrillation|Heart_Disease            |1    |
|2007               |Date                     |3    |
|Chest x-ray        |Test                     |1    |
|right-sided        |Directi

In [17]:
# Number of rows, i.e. distinct (entity, ner_label) pairs, in the table
spark.sql("select * from entity_counts_table").count()

16

Let's check the entities that occur more than once in the table.

In [18]:
# Check the entities that occurred more than once
spark.sql("SELECT * FROM entity_counts_table WHERE count > 1 ORDER BY count DESC").show()

+----------------+--------------------+-----+
|          entity|           ner_label|count|
+----------------+--------------------+-----+
|            2007|                Date|    3|
|pleural effusion|Disease_Syndrome_...|    2|
+----------------+--------------------+-----+



Add another file for streaming.

In [19]:
# Download another sample oncology note.
! wget -q https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/Certification_Trainings/Healthcare/data/oncology_notes/mt_oncology_5.txt -P oncology_notes/

In [20]:
threading.Event().wait(30)  # Pause for 30 seconds to give the streaming query time to process the new file

False

Let's check how the entity count table changed after adding the new data.

In [21]:
spark.sql("select * from entity_counts_table").count()

38

In [22]:
spark.sql("SELECT * FROM entity_counts_table WHERE count > 1 ORDER BY count DESC").show(truncate=False)

+----------------+-------------------------+-----+
|entity          |ner_label                |count|
+----------------+-------------------------+-----+
|him             |Gender                   |5    |
|he              |Gender                   |4    |
|2007            |Date                     |3    |
|his             |Gender                   |2    |
|rectal bleeding |Symptom                  |2    |
|endoscopy       |Procedure                |2    |
|pleural effusion|Disease_Syndrome_Disorder|2    |
+----------------+-------------------------+-----+



Let's add another file.

In [23]:
# add another file
! wget -q https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/Certification_Trainings/Healthcare/data/oncology_notes/mt_oncology_9.txt -P oncology_notes/

In [24]:
threading.Event().wait(30)  # Pause for 30 seconds to give the streaming query time to process the new file

False

In [25]:
spark.sql("select * from entity_counts_table").count()

52

In [26]:
spark.sql("SELECT * FROM entity_counts_table WHERE count > 1 ORDER BY count DESC").show(truncate=False)

+----------------+----------------------------+-----+
|entity          |ner_label                   |count|
+----------------+----------------------------+-----+
|he              |Gender                      |5    |
|him             |Gender                      |5    |
|2007            |Date                        |3    |
|acute           |Modifier                    |2    |
|rectal bleeding |Symptom                     |2    |
|his             |Gender                      |2    |
|endoscopy       |Procedure                   |2    |
|right           |Direction                   |2    |
|pleural effusion|Disease_Syndrome_Disorder   |2    |
|lower extremity |External_body_part_or_region|2    |
|hemoglobin      |Test                        |2    |
+----------------+----------------------------+-----+



## Stop and Restart Streaming

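You may need to re-run the query cells to see the updated results. Because this `memory` sink query has no `checkpointLocation`, the restarted query reprocesses every file in `oncology_notes/` from scratch, so the final counts match the ones before the restart.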

In [27]:
entity_counts_df.stop()  # Stop the current query

In [28]:
# Optionally, wait for the query to terminate before restarting it
entity_counts_df.awaitTermination()

In [29]:
# Restart the query under the same name
new_query = entities_df.groupBy("entity", "ner_label").count() \
        .writeStream \
        .queryName("entity_counts_table") \
        .outputMode("complete") \
        .format("memory") \
        .start()

In [30]:
threading.Event().wait(30)  # Pause for 30 seconds to give the restarted query time to reprocess the files

False

In [31]:
spark.sql("select * from entity_counts_table").count()

52

In [32]:
spark.sql("SELECT * FROM entity_counts_table WHERE count > 1 ORDER BY count DESC").show(truncate=False)

+----------------+----------------------------+-----+
|entity          |ner_label                   |count|
+----------------+----------------------------+-----+
|he              |Gender                      |5    |
|him             |Gender                      |5    |
|2007            |Date                        |3    |
|acute           |Modifier                    |2    |
|rectal bleeding |Symptom                     |2    |
|his             |Gender                      |2    |
|endoscopy       |Procedure                   |2    |
|right           |Direction                   |2    |
|pleural effusion|Disease_Syndrome_Disorder   |2    |
|lower extremity |External_body_part_or_region|2    |
|hemoglobin      |Test                        |2    |
+----------------+----------------------------+-----+
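The stop-and-restart steps above can be wrapped in a small helper. The sketch below is hypothetical (`restart_query` and `start_fn` are illustrative names): it looks through `spark.streams.active` for a running query with the given name, stops it, waits for it to terminate, and then starts a fresh one. Stopping first matters because Spark refuses to start a query whose name is already used by an active query in the same session.

```python
def restart_query(spark, name, start_fn):
    """Hypothetical helper: stop any active query called `name`, then start a new one.

    `start_fn` is a zero-argument callable that builds and starts the query,
    for example the writeStream chain used in the cells above.
    """
    for query in spark.streams.active:  # currently running StreamingQuery objects
        if query.name == name:
            query.stop()
            query.awaitTermination()    # returns once the query has fully stopped
    return start_fn()
```

For example, `new_query = restart_query(spark, "entity_counts_table", start_fn)`, where `start_fn` is a `lambda` wrapping the `entities_df.groupBy(...).count().writeStream...start()` chain.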

