![JohnSnowLabs](https://nlp.johnsnowlabs.com/assets/images/logo.png)

## Starting the Spark session

In [2]:
import os
import sparknlp
import sparknlp_jsl
import sparkocr
# Create or get Spark Session
from pyspark.sql import SparkSession

# Licence secret and library versions are injected through the environment;
# a missing variable fails fast with KeyError on the corresponding line.
SECRET = os.environ["SECRET"]  # NOTE(review): not referenced again in the visible cells
JSL_VERSION = os.environ["JSL_VERSION"]
OCR_BASE_VERSION = os.environ["OCR_BASE_VERSION"]
OCR_SPARK_VERSION = os.environ["OCR_SPARK_VERSION"]
OCR_VERSION = f"{OCR_BASE_VERSION}-{OCR_SPARK_VERSION}"

# Session settings, applied to the builder in exactly this order.
# NOTE(review): JSL_VERSION is also used for the open-source spark-nlp Maven
# coordinate; this presumably relies on both releases sharing a version number
# - confirm before bumping either one independently.
spark_conf = {
    "spark.driver.memory": "12G",
    "spark.driver.maxResultSize": "2G",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryoserializer.buffer.max": "800M",
    "spark.sql.legacy.allowUntypedScalaUDF": "true",
    "spark.jars.packages": f"com.johnsnowlabs.nlp:spark-nlp_2.12:{JSL_VERSION}",
    "spark.jars": f"file:///jars/spark-nlp-jsl-{JSL_VERSION}.jar,file:///jars/spark-ocr-assembly-{OCR_VERSION}.jar",
}

builder = SparkSession.builder.appName("Spark NLP").master("local[*]")
for conf_key, conf_value in spark_conf.items():
    builder = builder.config(conf_key, conf_value)

# Reuses an already running session if there is one, otherwise starts a new one.
spark = builder.getOrCreate()

# Report the versions of every library involved in this notebook.
print("spark.version", spark.version)
print("sparknlp.version()", sparknlp.version())
print("sparkocr.version()", sparkocr.version())
print("sparknlp_jsl.version()", sparknlp_jsl.version())

spark.version 3.1.1
sparknlp.version() 3.0.3
sparkocr.version() 3.2.0
sparknlp_jsl.version() 3.0.3


# Spark NLP

In [3]:
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import *
from sparknlp.annotator import *

In [4]:
# Load the pretrained English "recognize_entities_dl" pipeline and annotate a
# single sentence; the resulting dict is displayed as the cell output.
pipeline = PretrainedPipeline(name="recognize_entities_dl", lang="en")
result = pipeline.annotate("Harry Potter is a great movie")
result

recognize_entities_dl download started this may take some time.
Approx size to download 160.1 MB
[OK!]


{'entities': ['Harry Potter'],
 'document': ['Harry Potter is a great movie'],
 'token': ['Harry', 'Potter', 'is', 'a', 'great', 'movie'],
 'ner': ['B-PER', 'I-PER', 'O', 'O', 'O', 'O'],
 'embeddings': ['Harry', 'Potter', 'is', 'a', 'great', 'movie'],
 'sentence': ['Harry Potter is a great movie']}

# Spark NLP for Healthcare

In [5]:
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp_jsl.annotator import *

In [30]:
# Stage 1: turn the raw "text" column into document annotations.
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Stage 2: split each document into sentences using the pretrained
# healthcare sentence-detector model.
sentenceDetector = (
    SentenceDetectorDLModel
    .pretrained("sentence_detector_dl_healthcare", "en", "clinical/models")
    .setInputCols(["document"])
    .setOutputCol("sentence")
)

# Stage 3: break sentences into tokens.
tokenizer = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

# Stage 4: clinical word embeddings (trained on the PubMED dataset).
word_embeddings = (
    WordEmbeddingsModel
    .pretrained("embeddings_clinical", "en", "clinical/models")
    .setInputCols(["sentence", "token"])
    .setOutputCol("embeddings")
)

# Stage 5: NER model trained on the i2b2 (sampled from MIMIC) dataset.
clinical_ner = (
    MedicalNerModel
    .pretrained("ner_clinical_large", "en", "clinical/models")
    .setInputCols(["sentence", "token", "embeddings"])
    .setOutputCol("ner")
)

# Stage 6: merge the per-token NER tags into entity chunks.
ner_converter = (
    NerConverter()
    .setInputCols(["sentence", "token", "ner"])
    .setOutputCol("ner_chunk")
)

nlp_stages = [
    documentAssembler,
    sentenceDetector,
    tokenizer,
    word_embeddings,
    clinical_ner,
    ner_converter,
]
nlpPipeline = Pipeline(stages=nlp_stages)

# Single-row sample DataFrame with one "text" column.
# NOTE(review): the abstract contains fused words ("genomicorganization",
# "forType", "byapproximately", ...) and ends with a stray quote plus newline.
# It is kept verbatim so the recorded output of the next cell still matches.
sample_rows = [["""The human KCNJ9 (Kir 3.3, GIRK3) is a member of the G-protein-activated inwardly rectifying potassium (GIRK) channel family. Here we describe the genomicorganization of the KCNJ9 locus on chromosome 1q21-23 as a candidate gene forType II diabetes mellitus in the Pima Indian population. The gene spansapproximately 7.6 kb and contains one noncoding and two coding exons separated byapproximately 2.2 and approximately 2.6 kb introns, respectively. We identified14 single nucleotide polymorphisms (SNPs), including one that predicts aVal366Ala substitution, and an 8 base-pair (bp) insertion/deletion. Ourexpression studies revealed the presence of the transcript in various humantissues including pancreas, and two major insulin-responsive tissues: fat andskeletal muscle. The characterization of the KCNJ9 gene should facilitate furtherstudies on the function of the KCNJ9 protein and allow evaluation of thepotential role of the locus in Type II diabetes."
"""]]
data = spark.createDataFrame(sample_rows).toDF("text")

# Fit the pipeline on the sample data, then annotate that same data.
model = nlpPipeline.fit(data)
result = model.transform(data)

sentence_detector_dl_healthcare download started this may take some time.
Approximate size to download 363.9 KB
[OK!]
embeddings_clinical download started this may take some time.
Approximate size to download 1.6 GB
[OK!]
ner_clinical_large download started this may take some time.
Approximate size to download 13.9 MB
[OK!]


In [31]:
# Explode the ner_chunk array into one row per chunk, then print the chunk
# text and its metadata map (first 5 rows, cells truncated to 50 characters).
(
    result
    .selectExpr("explode(ner_chunk) as result")
    .select("result.result", "result.metadata")
    .show(n=5, truncate=50)
)

+--------------------------------------------------+--------------------------------------------------+
|                                            result|                                          metadata|
+--------------------------------------------------+--------------------------------------------------+
|the G-protein-activated inwardly rectifying pot...|{entity -> TREATMENT, sentence -> 0, chunk -> 0...|
|                           the genomicorganization|{entity -> TREATMENT, sentence -> 1, chunk -> 1...|
|     a candidate gene forType II diabetes mellitus|{entity -> PROBLEM, sentence -> 1, chunk -> 2, ...|
|                                   byapproximately|{entity -> TREATMENT, sentence -> 2, chunk -> 3...|
|                   single nucleotide polymorphisms|{entity -> TREATMENT, sentence -> 3, chunk -> 4...|
+--------------------------------------------------+--------------------------------------------------+
only showing top 5 rows



# Spark OCR

In [33]:
!wget -L "http://www.asx.com.au/asxpdf/20171103/pdf/43nyyw9r820c6r.pdf" -O sample_doc.pdf 

--2021-06-18 16:30:28--  http://www.asx.com.au/asxpdf/20171103/pdf/43nyyw9r820c6r.pdf
Resolving www.asx.com.au (www.asx.com.au)... 203.15.147.66
Connecting to www.asx.com.au (www.asx.com.au)|203.15.147.66|:80... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://www.asx.com.au/asxpdf/20171103/pdf/43nyyw9r820c6r.pdf [following]
--2021-06-18 16:30:28--  https://www.asx.com.au/asxpdf/20171103/pdf/43nyyw9r820c6r.pdf
Connecting to www.asx.com.au (www.asx.com.au)|203.15.147.66|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 212973 (208K) [application/pdf]
Saving to: 'sample_doc.pdf'


2021-06-18 16:30:31 (213 KB/s) - 'sample_doc.pdf' saved [212973/212973]



In [34]:
from sparkocr.transformers import *
from sparkocr.transformers import *
from pyspark.ml import PipelineModel
from sparkocr.utils import display_image
from sparkocr.metrics import score
def pipeline(confidence_threshold=65):
    """Build the two-stage Spark OCR pipeline (PDF -> image -> text).

    The first stage (``PdfToImage``) reads the ``content`` column and writes
    ``image``; the second stage (``ImageToText``) reads ``image`` and writes
    ``text``.

    Args:
        confidence_threshold: Value passed to
            ``ImageToText.setConfidenceThreshold``. Defaults to 65, the value
            that used to be hard-coded, so existing ``pipeline()`` calls
            behave as before.

    Returns:
        A ``PipelineModel`` made of the two stages, ready for ``transform``.
    """
    # Transform PDF document to images per page
    pdf_to_image = (
        PdfToImage()
        .setInputCol("content")
        .setOutputCol("image")
    )

    # Run OCR on every page image
    ocr = (
        ImageToText()
        .setInputCol("image")
        .setOutputCol("text")
        .setConfidenceThreshold(confidence_threshold)
    )

    # Returned directly: the original bound it to a local named `pipeline`,
    # which shadowed this function's own name.
    return PipelineModel(stages=[pdf_to_image, ocr])

ModuleNotFoundError: No module named 'skimage'

In [None]:
# Read the downloaded PDF through Spark's "binaryFile" source and cache the
# resulting DataFrame for the OCR cells that follow.
pdf = "sample_doc.pdf"
pdf_example_df = (
    spark.read
    .format("binaryFile")
    .load(pdf)
    .cache()
)

In [None]:
# Build the OCR pipeline, apply it to the PDF DataFrame and cache the output
# because the following cells query it more than once.
result = (
    pipeline()
    .transform(pdf_example_df)
    .cache()
)

In [None]:
# Print the page number, recognised text and confidence columns.
(
    result
    .select(
        "pagenum",
        "text",
        "confidence",
    )
    .show()
)

In [None]:
# Bring the recognised text back to the driver; the collected rows are the
# cell's displayed value.
(
    result
    .select("text")
    .collect()
)