In [0]:
from johnsnowlabs import nlp, visual, medical

# Start a session via johnsnowlabs with the visual (OCR) components enabled.
# NOTE(review): this cell sits above the install cells and repeats the
# "Start Visual NLP" cell further down — confirm it is intentional (e.g. for an
# environment where the library is already installed); on a fresh environment
# it would run before johnsnowlabs has been installed.
spark = nlp.start(visual=True)

![JohnSnowLabs](https://nlp.johnsnowlabs.com/assets/images/logo.png)

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/JohnSnowLabs/spark-nlp-workshop/blob/master/visual-nlp/3.3.Pdf_Deidentification.ipynb)

## Blogposts and videos

- [Text Detection in Spark OCR](https://medium.com/spark-nlp/text-detection-in-spark-ocr-dcd8002bdc97)

- [Table Detection & Extraction in Spark OCR](https://medium.com/spark-nlp/table-detection-extraction-in-spark-ocr-50765c6cedc9)

- [Extract Tabular Data from PDF in Spark OCR](https://medium.com/spark-nlp/extract-tabular-data-from-pdf-in-spark-ocr-b02136bc0fcb)

- [Signature Detection in Spark OCR](https://medium.com/spark-nlp/signature-detection-in-spark-ocr-32f9e6f91e3c)

- [GPU image pre-processing in Spark OCR](https://medium.com/spark-nlp/gpu-image-pre-processing-in-spark-ocr-3-1-0-6fc27560a9bb)

- [How to Setup Spark OCR on UBUNTU - Video](https://www.youtube.com/watch?v=cmt4WIcL0nI)


**More examples here**

https://github.com/JohnSnowLabs/spark-ocr-workshop

To get a trial license, please go to:

https://www.johnsnowlabs.com/install/

### Colab Setup

In [0]:
# Install the johnsnowlabs library to access Spark-OCR and Spark-NLP for Healthcare, Finance, and Legal.
!pip install -q johnsnowlabs

In [0]:
# NOTE(review): no visible cell imports `google.colab`, and the license path in
# the next cell points at /dbfs — confirm this install is still required.
%pip install google-colab

In [0]:
# NOTE(review): the message refers to an upload button, but this cell does not
# invoke any upload widget — the license location is hardcoded below instead.
print('Please Upload your John Snow Labs License using the button below')
# Location of the John Snow Labs license JSON.
# NOTE(review): `license_keys` is not referenced by any later visible cell —
# confirm how nlp.install()/nlp.start() locate the license, and consider moving
# this absolute, user-specific path into configuration.
license_keys = "/dbfs/Volumes/yash_gupta_hackerspace/ingestorinator/archimedes/spark_nlp_for_healthcare_spark_ocr_9379.json"

In [0]:
from johnsnowlabs import nlp, visual

# Once the license is available, run this to install all licensed Python wheels
# and pre-download the jars for the Spark session JVM.
# refresh_install=True and visual=True are passed explicitly to the installer.
nlp.install(refresh_install=True, visual=True)

## Start Visual NLP

In [0]:
from johnsnowlabs import nlp, visual, medical

# Automatically load the license data and start a session with all the jars the
# user has access to. The resulting `spark` session is used by the cells below.
spark = nlp.start(visual=True)

In [0]:
from pyspark.ml import PipelineModel, Pipeline
from pyspark.sql import functions as F
from pyspark.sql.types import *

## Define de-identification NLP pipeline

In [0]:
def deidentification_nlp_pipeline(input_column, prefix = "", model="ner_deid_large"):
    """Build and fit the NER pipeline used to locate entities for de-identification.

    Stages, in order: document assembler, document normalizer, sentence
    detector, tokenizer, clinical word embeddings, clinical NER model and an
    NER converter restricted to a whitelist of entity labels.

    Args:
        input_column: Name of the column that holds the text to analyse.
        prefix: String prepended to the name of every column the stages create.
        model: Name of the pretrained NER model loaded from "clinical/models".

    Returns:
        The pipeline model obtained by fitting the stages on a single
        empty-string row.

    Relies on the notebook-level `spark` session, the `nlp` / `medical`
    modules and `Pipeline` being in scope.
    """
    def col(name):
        # Every column produced by this pipeline carries the caller's prefix.
        return prefix + name

    assembler = (
        nlp.DocumentAssembler()
        .setInputCol(input_column)
        .setOutputCol(col("document_raw"))
    )

    # NOTE(review): this pattern matches exactly one character between angle
    # brackets; if whole markup tags were meant, "<[^>]*>" would be required —
    # confirm before changing, since it alters the normalized text.
    markup_patterns = ["<[^>]>"]
    normalizer = (
        nlp.DocumentNormalizer()
        .setInputCols(col("document_raw"))
        .setOutputCol(col("document"))
        .setAction("clean")
        .setPatterns(markup_patterns)
        .setReplacement(" ")
        .setPolicy("pretty_all")
    )

    # Splits each normalized document into sentences.
    sentence_splitter = (
        nlp.SentenceDetector()
        .setInputCols([col("document")])
        .setOutputCol(col("sentence"))
    )

    token_splitter = (
        nlp.Tokenizer()
        .setInputCols([col("sentence")])
        .setOutputCol(col("token"))
    )

    # Pretrained clinical word embeddings consumed by the NER model.
    embeddings = (
        nlp.WordEmbeddingsModel.pretrained("embeddings_clinical", "en", "clinical/models")
        .setInputCols([col("sentence"), col("token")])
        .setOutputCol(col("embeddings"))
        .setEnableInMemoryStorage(True)
    )

    ner_tagger = (
        medical.NerModel.pretrained(model, "en", "clinical/models")
        .setInputCols([col("sentence"), col("token"), col("embeddings")])
        .setOutputCol(col("ner"))
    )

    # Only chunks carrying one of these labels are kept in the output column.
    kept_labels = ['NAME', 'AGE', 'CONTACT', 'ID',
                   'LOCATION', 'PROFESSION', 'PERSON', 'DATE', 'DOCTOR']
    chunk_converter = (
        nlp.NerConverter()
        .setInputCols([col("sentence"), col("token"), col("ner")])
        .setOutputCol(col("ner_chunk"))
        .setWhiteList(kept_labels)
    )

    ner_pipeline = Pipeline(stages=[
        assembler,
        normalizer,
        sentence_splitter,
        token_splitter,
        embeddings,
        ner_tagger,
        chunk_converter,
    ])

    # Fit on a one-row frame with an empty string in the input column.
    blank_frame = spark.createDataFrame([[""]]).toDF(input_column)
    return ner_pipeline.fit(blank_frame)

In [0]:
# Parameters of the standalone de-identification model.
input_column = "text"
prefix = ""
model = "ner_deid_generic_augmented"

# Build the fitted NLP model through deidentification_nlp_pipeline() instead of
# repeating its body stage by stage: the helper assembles exactly the same
# stages (assembler, normalizer, sentence detector, tokenizer, embeddings, NER,
# NER converter) and fits them on an empty frame, so keeping a second copy here
# only invites the two definitions drifting apart.
nlp_model = deidentification_nlp_pipeline(
    input_column=input_column,
    prefix=prefix,
    model=model,
)

## Define OCR transformers and pipeline for image deidentification

In [0]:
from sparkocr.transformers import *

# Convert the binary PDF content into page images.
pdf_to_image = visual.PdfToImage() \
    .setInputCol("content") \
    .setOutputCol("image_raw") \
    .setPartitionNum(16)\
    .setSplitNumBatch(2)\
    .setPartitionNumAfterSplit(2) \
    .setSplittingStategy(visual.SplittingStrategy.FIXED_NUMBER_OF_PARTITIONS) \
    .setKeepInput(False)

# Run OCR on the page images; the recognized text lands in the "text" column,
# which is the input column of the de-identification NLP model below.
ocr = visual.ImageToText() \
    .setInputCol("image_raw") \
    .setOutputCol("text") \
    .setIgnoreResolution(False) \
    .setPageIteratorLevel(visual.PageIteratorLevel.SYMBOL) \
    .setPageSegMode(visual.PageSegmentationMode.SPARSE_TEXT) \
    .setConfidenceThreshold(70)

# Find the coordinates of sensitive data: maps the "ner_chunk" annotations to
# page regions using the "positions" column
# (presumably emitted by the OCR stage — confirm).
position_finder = visual.PositionFinder() \
    .setInputCols(["ner_chunk"]) \
    .setOutputCol("regions") \
    .setPageMatrixCol("positions") \
    .setIgnoreSchema(True) \
    .setOcrScaleFactor(1.0)

# Draw filled rectangles over the found regions to hide the sensitive data.
draw_regions = visual.ImageDrawRegions() \
    .setInputCol("image_raw") \
    .setInputRegionsCol("regions") \
    .setOutputCol("cleaned_images") \
    .setFilledRect(True) \
    .setRotated(False)

# Turns the masked page images back into a PDF. Kept outside `pipeline` and
# applied as a separate step by the cell that writes the output.
image_to_pdf = visual.ImageToPdf() \
    .setInputCol("cleaned_images") \
    .setOutputCol("pdf")

# Build the fitted NLP model once and keep the result: previously the return
# value of this call was discarded.
deid_nlp_model = deidentification_nlp_pipeline(input_column="text", prefix="", model="ner_deid_generic_augmented")

# OCR pipeline. This definition must be live: the "Run de-id pipeline" cell
# calls `pipeline.transform(...)` and fails with a NameError on a fresh kernel
# when `pipeline` is not defined.
pipeline = PipelineModel(stages=[
    pdf_to_image,
    ocr,
    deid_nlp_model,
    position_finder,
    draw_regions
])

## Read PDF file and display it

In [0]:
# Resolve the path of the sample PDF shipped inside the `sparkocr` package resources.
pdf_path = visual.pkg_resources.resource_filename('sparkocr', 'resources/ocr/pdfs/test_document.pdf')

# Load the file with Spark's binaryFile reader and cache it, since it is both
# displayed here and transformed by the pipeline later.
pdf_df = spark.read.format("binaryFile").load(pdf_path).cache()

# Render the original (not yet de-identified) document in the notebook.
visual.display_pdf(pdf_df)

## Run de-id pipeline

In [0]:
%%time

# Destination passed to .save() when the de-identified PDFs are written at the
# end of this cell.
OUTPUT_PATH = "./de-id/"

def get_name(path, keep_subfolder_level=0):
    """Return the file name of `path` without its extension.

    Args:
        path: "/"-separated path or URI of the source file.
        keep_subfolder_level: Number of parent folders to keep in front of the
            file name (0 keeps only the file name itself).

    Returns:
        The last `keep_subfolder_level + 1` path components joined by "/",
        with the extension removed from the final component. Only the text
        after the last "." is treated as the extension.
    """
    parts = path.split("/")
    stem, dot, _extension = parts[-1].rpartition(".")
    # Strip the extension only when there is one AND something is left in front
    # of it. Names without a dot ("README") or consisting solely of a leading
    # dot plus text (".hidden") used to collapse to an empty string, which
    # would produce an unusable output file name.
    if dot and stem:
        parts[-1] = stem
    return "/".join(parts[-keep_subfolder_level - 1:])


# Run the OCR + de-identification pipeline on the loaded PDF, cache the result
# and order the rows by page number.
pages = (
    pipeline.transform(pdf_df)
    .cache()
    .orderBy("pagenum")
)

# UDF that derives the output file name (source name without extension) from
# the "path" column.
file_name_udf = F.udf(get_name, StringType())

# Rebuild a PDF from the masked page images and write it under OUTPUT_PATH,
# naming each file after the "fileName" column.
(
    image_to_pdf.transform(pages)
    .withColumn("fileName", file_name_udf(F.col("path")))
    .write
    .format("binaryFormat")
    .option("type", "pdf")
    .option("field", "pdf")
    .option("nameField", "fileName")
    .option("extension", "pdf")
    .option("prefix", "")
    .mode("append")
    .save(OUTPUT_PATH)
)

## Check results

In [0]:
%%bash
# List the files produced in the de-identification output directory.
ls ./de-id

In [0]:
# Load the de-identified PDF back as a binary file and render it for a visual check.
# NOTE(review): the path repeats the "./de-id/" output directory literally
# rather than reusing OUTPUT_PATH — keep the two in sync if either changes.
result_pdf_df = spark.read.format("binaryFile").load("./de-id/test_document.pdf")
visual.display_pdf(result_pdf_df)