<h2>Install Required Libraries</h2>

In [None]:
import os
import json
import time
import shutil

# Path to the license JSON file; fill this in before running the cell.
license = ""

# Guard clause: stop right away when no JSON license path was supplied.
if not license or "json" not in license:
    raise Exception("License JSON File is not specified")

with open(license, "r") as license_file:
    creds = json.load(license_file)

# Export every entry of the license file as an environment variable.
for env_name, env_value in creds.items():
    os.environ[env_name] = env_value

In [None]:
# Visual NLP (Spark OCR) wheel; the download URL embeds $SPARK_OCR_SECRET
# (presumably exported from the license JSON by the previous cell - confirm).
!pip install --upgrade -q https://pypi.johnsnowlabs.com/$SPARK_OCR_SECRET/spark-ocr/spark_ocr-6.0.0rc2-py3-none-any.whl

# Healthcare NLP wheel; the download URL embeds $SECRET
# (presumably exported from the license JSON by the previous cell - confirm).
!pip install --upgrade -q https://pypi.johnsnowlabs.com/$SECRET/spark-nlp-jsl/spark_nlp_jsl-5.5.3-py3-none-any.whl

# Open-source Spark NLP, pinned to 5.5.3.
!pip install -q spark-nlp==5.5.3

# Helper libraries (installed without a version pin).
!pip install -q pandas

!pip install -q matplotlib

In [None]:
### Restart the session (kernel) now so the packages installed above are picked up, then continue with the next cell.

<h2>Start Spark Session - Visual NLP, Healthcare NLP, Spark-NLP</h2>

In [2]:
from sparkocr import start
import os
import json
import time
import shutil

# Path to the license JSON file; fill this in before running the cell.
license = ""

# Guard clause: stop right away when no JSON license path was supplied.
if not license or "json" not in license:
    raise Exception("License JSON File is not specified")

with open(license, "r") as license_file:
    creds = json.load(license_file)

# Export every entry of the license file as an environment variable.
for env_name, env_value in creds.items():
    os.environ[env_name] = env_value

In [5]:
# Extra Spark settings handed to the session builder.
extra_configurations = {
    # required
    "spark.extraListeners": "com.johnsnowlabs.license.LicenseLifeCycleManager",
    "spark.sql.legacy.allowUntypedScalaUDF": "true",
    # change as per system
    "spark.executor.instances": "7",
    "spark.executor.cores": "16",
    "spark.executor.memory": "130G",
    "spark.driver.memory": "100G",
    "spark.sql.shuffle.partitions": "896",
}

# Not needed for Google Colab
# os.environ['JAVA_HOME'] = '/home/linuxbrew/.linuxbrew/Cellar/openjdk@17/17.0.15'

# Secrets and the public Spark NLP version are read from the environment.
session_options = {
    "secret": os.environ.get("SPARK_OCR_SECRET"),
    "nlp_secret": os.environ.get("SECRET"),
    "nlp_internal": True,
    "nlp_jsl": True,
    "nlp_version": os.environ.get("PUBLIC_VERSION"),
    "extra_conf": extra_configurations,
}
spark = start(**session_options)

spark

Spark version: 3.5.0
Spark NLP version: 5.5.3
Spark NLP for Healthcare version: 5.5.3
Spark OCR version: 6.0.0rc2

:: loading settings :: url = jar:file:/usr/local/lib/python3.11/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e767b472-e262-4ec7-a070-f0419ba48986;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;5.5.3 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-s3;1.12.500 in central
	found com.amazonaws#aws-java-sdk-kms;1.12.500 in central
	found com.amazonaws#aws-java-sdk-core;1.12.500 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found software.amazon.ion#ion-java;1.0.2 in central
	found joda-time#joda-time;2.8.1 in central
	found com.amazonaws#jmespath-java;1.12.500 in central
	found com.g

<h2>Import Visual NLP, Healthcare NLP and Spark-NLP</h2>

In [6]:
import numpy as np
import pandas as pd
import os

#Pyspark Imports
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.sql import functions as F

# Necessary imports from Spark OCR library
import sparkocr
from sparkocr import start
from sparkocr.transformers import *
from sparkocr.enums import *
from sparkocr.utils import *

# import sparknlp packages
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp_jsl
from sparknlp_jsl.annotator import *
from collections import Counter
from sparknlp.pretrained import PretrainedPipeline

In [163]:
def evaluate_predictions(SOURCE_GT_PATH, DF_SAVE_PATH, SAVE_MAPPING_PATH):
    """
    Score saved pipeline predictions against a ground-truth JSON file.

    For every file that appears both in the prediction parquet and in the
    ground truth, the predicted strings (the ``result`` field of each
    ``positions_ner`` item) are compared with the ground-truth strings as
    multisets. Per-file precision/recall are printed, followed by their
    averages over files and the F1 derived from those averages. A per-file
    summary (precision, recall, ground truth, predictions) is written to
    ``SAVE_MAPPING_PATH`` as JSON.

    Args:
        SOURCE_GT_PATH: Path of a JSON file mapping file basename to a list of
            ground-truth strings.
        DF_SAVE_PATH: Parquet location loaded through the global ``spark``
            session; the ``path`` and ``positions_ner`` columns are read.
        SAVE_MAPPING_PATH: Destination path of the JSON summary.

    Returns:
        None. Results are printed and written to ``SAVE_MAPPING_PATH``.

    Raises:
        ValueError: If no predicted file has an entry in the ground truth, so
            no metric can be computed.
    """

    def calculate_metrics(preds, gts):
        """Return ``(precision, recall)`` of multiset ``preds`` against ``gts``."""
        gt_counter = Counter(gts)
        pred_counter = Counter(preds)

        # Multiset intersection: a value counts as matched min(count) times.
        tp = sum((pred_counter & gt_counter).values())
        fp = sum(pred_counter.values()) - tp
        fn = sum(gt_counter.values()) - tp

        precision = tp / (tp + fp) if (tp + fp) else 0
        recall = tp / (tp + fn) if (tp + fn) else 0

        return precision, recall

    with open(SOURCE_GT_PATH, "r") as f:
        ground_truth = json.load(f)

    df_predictions = spark.read.format("parquet").load(DF_SAVE_PATH)

    # Single pass over the two needed columns instead of one Spark job per file.
    predictions_by_path = {}
    for row in df_predictions.select("path", "positions_ner").toLocalIterator():
        record = row.asDict()
        file_path = record["path"]

        if os.path.basename(file_path) not in ground_truth:
            continue

        extracted_results = predictions_by_path.setdefault(file_path, [])
        # A row without detections may carry a null column; treat it as empty.
        for ner in record["positions_ner"] or []:
            extracted_results.append(ner.asDict()["result"])

    predictions_by_file = {
        os.path.basename(file_path): extracted_results
        for file_path, extracted_results in predictions_by_path.items()
    }

    if not predictions_by_file:
        raise ValueError(
            f"No file in {DF_SAVE_PATH} has a ground-truth entry in {SOURCE_GT_PATH}"
        )

    summary = {}
    all_precisions = []
    all_recalls = []

    for filename, predictions in predictions_by_file.items():
        gt_values = ground_truth[filename]
        precision, recall = calculate_metrics(predictions, gt_values)

        all_precisions.append(precision)
        all_recalls.append(recall)

        summary[filename] = {
            "precision": round(precision, 4),
            "recall": round(recall, 4),
            "gt": gt_values,
            "pred": predictions
        }

        print(f"Filename: {filename} | Precision: {precision:.4f} | Recall: {recall:.4f}")

    avg_precision = round(sum(all_precisions) / len(all_precisions), 4)
    avg_recall = round(sum(all_recalls) / len(all_recalls), 4)

    # F1 is undefined when both averages are zero; report 0.0 instead of
    # dividing by zero.
    f1_denominator = avg_precision + avg_recall
    if f1_denominator:
        f1_score = round(2 * (avg_precision * avg_recall) / f1_denominator, 4)
    else:
        f1_score = 0.0

    print(f"\nOverall Precision: {avg_precision}")
    print(f"Overall Recall: {avg_recall}")
    print(f"F1 Score: {f1_score}")

    with open(SAVE_MAPPING_PATH, "w") as f:
        json.dump(summary, f, indent=4)

    print(f"Mapping File Saved To : {SAVE_MAPPING_PATH}")

In [154]:
# NER threshold
ner_threshold = 0.90

# OCR output threshold
ocr_threshold = 70

# NER whitelist entities
whitelist = [
    'HOSPITAL', 'NAME', 'DOCTOR', 'PATIENT',
    'AGE', 'ID', 'MEDICALRECORD', 'IDNUM',
    'COUNTRY', 'LOCATION', 'STREET', 'STATE',
    'ZIP', 'CONTACT', 'PHONE', 'DATE',
]

# The matcher is used for regex matching on already detected NER chunks;
# the NER threshold selects which detected chunks take part in the matching.
# Every whitelisted label is mapped to the same threshold.
matcherWhitelist = dict.fromkeys(whitelist, ner_threshold)
matcherWhitelist

{'HOSPITAL': 0.9,
 'NAME': 0.9,
 'DOCTOR': 0.9,
 'PATIENT': 0.9,
 'AGE': 0.9,
 'ID': 0.9,
 'MEDICALRECORD': 0.9,
 'IDNUM': 0.9,
 'COUNTRY': 0.9,
 'LOCATION': 0.9,
 'STREET': 0.9,
 'STATE': 0.9,
 'ZIP': 0.9,
 'CONTACT': 0.9,
 'PHONE': 0.9,
 'DATE': 0.9}

<h2>Define Pipeline</h2>

In [155]:
# PDF stage: reads the "content" column and emits images in "image_raw".
pdf_to_image = (
    PdfToImage()
    .setInputCol("content")
    .setSplitNumBatch(10)
    .setOutputCol("image_raw")
    .setImageType(ImageType.TYPE_3BYTE_BGR)
    .setSplittingStategy(SplittingStrategy.FIXED_NUMBER_OF_PARTITIONS)
)

# OCR stage: reads the images in "image_raw" and writes the recognized text
# to "text". The confidence threshold comes from the `ocr_threshold` constant
# defined in the configuration cell (same value, 70, that was hard-coded
# here), so changing the constant now actually affects the pipeline.
ocr = ImageToText() \
    .setInputCol("image_raw") \
    .setOutputCol("text") \
    .setIgnoreResolution(False) \
    .setPageIteratorLevel(PageIteratorLevel.SYMBOL) \
    .setPageSegMode(PageSegmentationMode.SPARSE_TEXT) \
    .setWithSpaces(True) \
    .setKeepLayout(False) \
    .setConfidenceThreshold(ocr_threshold)

# Turns the OCR "text" column into the "document" annotation column.
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Abbreviations handed to setImpossiblePenultimates on the sentence detector.
abbreviations = [
    'Bros', 'No', 'al', 'vs', 'etc', 'Fig', 'Dr', 'Prof', 'PhD', 'MD',
    'Co', 'Corp', 'Inc', 'bros', 'VS', 'Vs', 'ETC', 'fig', 'dr', 'prof',
    'PHD', 'phd', 'md', 'co', 'corp', 'inc', 'Jan', 'Feb', 'Mar', 'Apr',
    'Jul', 'Aug', 'Sep', 'Sept', 'Oct', 'Nov', 'Dec', 'St', 'st', 'AM',
    'PM', 'am', 'pm', 'e.g', 'f.e', 'i.e',
]

# Pretrained sentence detector: "document" -> "sentence".
sentence_detector = (
    SentenceDetectorDLModel.pretrained("sentence_detector_dl", "en")
    .setInputCols(["document"])
    .setOutputCol("sentence")
    .setImpossiblePenultimates(abbreviations)
    .setUseCustomBoundsOnly(False)
    .setSplitLength(2147483647)
    .setExplodeSentences(False)
)

# Tokenizer: "sentence" -> "token".
tokenizer = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

# Rule-based matcher over "sentence"; each rule is "<regex>;<label>" and the
# ";" delimiter separates the pattern from the label it emits.
# Raw strings keep the backslashes of the patterns (\d, \(, \s) without
# relying on Python's invalid-escape-sequence fallback; the resulting string
# values are unchanged.
regex_matcher = RegexMatcher()\
    .setInputCols("sentence")\
    .setOutputCol("regex_chunk")\
    .setRules([r"(0?[1-9]|[12][0-9]|3[01])/(0?[1-9]|1[0-2])/\d{4};DATE",
               r"\d{3}-\d{2}-\d{4};IDNUM",
               r"\(\d{3}\)\s\d{3}-\d{4};PHONE",
               r"HOSP\d{8};IDNUM",
               r"DR[A-Za-z0-9]{5,6};ID"])\
    .setDelimiter(";")

# Pretrained zero-shot NER model restricted to the listed labels.
ner_docwise_large = PretrainedZeroShotNER().pretrained("zeroshot_ner_deid_subentity_docwise_large", "en", "clinical/models") \
    .setInputCols("sentence", "token") \
    .setOutputCol("ner_docwise_large") \
    .setLabels(["AGE", "CITY", "COUNTRY", "DATE", "DOCTOR", "HOSPITAL", "IDNUM", "ORGANIZATION","PATIENT", "PHONE", "PROFESSION", "STATE", "STREET", "ZIP"])

# Converts the NER tags into chunks. The threshold comes from the
# `ner_threshold` constant defined in the configuration cell (same value,
# 0.90, that was hard-coded here), so the converter and the chunk-matching
# whitelist stay in sync when the constant is tuned.
ner_chunk_docwise_large = NerConverterInternal() \
    .setInputCols("sentence", "token", "ner_docwise_large") \
    .setOutputCol("ner_chunk_docwise_large") \
    .setThreshold(ner_threshold)

# Merges the regex chunks with the NER chunks into one chunk column.
chunk_merger = (
    ChunkMergeApproach()
    .setInputCols('regex_chunk', 'ner_chunk_docwise_large')
    .setOutputCol('merged_ner_chunk')
    .setMergeOverlapping(True)
)

# De-identification in "obfuscate" mode; entity mappings are returned in the
# "aux" column and rows are grouped by "path".
deid_obfuscated = (
    DeIdentification()
    .setInputCols(["sentence", "token", "merged_ner_chunk"])
    .setOutputCol("obfuscated")
    .setMode("obfuscate")
    .setKeepMonth(True)
    .setKeepYear(True)
    .setObfuscateDate(True)
    .setSameEntityThreshold(0.7)
    .setKeepTextSizeForObfuscation(True)
    .setFakerLengthOffset(2)
    .setReturnEntityMappings(True)
    .setDays(2)
    .setMappingsColumn("aux")
    .setIgnoreRegex(True)
    .setGroupByCol("path")
    .setRegion("us")
    .setSeed(40)
    .setConsistentObfuscation(True)
    .setChunkMatching(matcherWhitelist)
)

# Cleans the "aux" mappings and exposes them as "positions_ner".
cleaner = (
    NerOutputCleaner()
    .setInputCol("aux")
    .setOutputCol("new_aux")
    .setOutputNerCol("positions_ner")
)

# Resolves "positions_ner" to page coordinates using the "positions" matrix.
position_finder = (
    PositionFinder()
    .setInputCols("positions_ner")
    .setOutputCol("coordinates")
    .setPageMatrixCol("positions")
)

# Draws filled black rectangles over the found coordinates.
draw_regions = (
    ImageDrawRegions()
    .setInputCol("image_raw")
    .setInputRegionsCol("coordinates")
    .setRectColor(Color.black)
    .setFilledRect(True)
    .setOutputCol("image_with_regions")
)

# Stage order matters: each stage consumes columns produced by earlier ones.
stages = [
    pdf_to_image, ocr,
    document_assembler, sentence_detector, tokenizer,
    regex_matcher,
    ner_docwise_large, ner_chunk_docwise_large,
    chunk_merger,
    deid_obfuscated,
    cleaner,
    position_finder,
    draw_regions,
]

pipe = Pipeline(stages=stages)

sentence_detector_dl download started this may take some time.


25/05/06 10:37:22 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.


Approximate size to download 354.6 KB
[OK!]
zeroshot_ner_deid_subentity_docwise_large download started this may take some time.
[OK!]


In [166]:
# Display the ordered list of pipeline stages for inspection.
stages

[PdfToImage_8dc5743ecb14,
 ImageToText_c39499abab3e,
 DocumentAssembler_d6e2031954fe,
 SentenceDetectorDLModel_c83c27f46b97,
 Tokenizer_d776e2e1fb9b,
 RegexMatcher_53482bca004d,
 PretrainedZeroShotNER_ca8c4dfe310f,
 NerConverterInternal_29475eb63ff9,
 ChunkMergeApproach_1c81c028c04e,
 DeIdentification_647e75912e36,
 NerOutputCleaner_d504cff86a0d,
 PositionFinder_2c179a047455,
 ImageDrawRegions_0550feeca735]

<h2>Easy Dataset</h2>
<h4>Total Files : 30</h4>

In [164]:
# Paths for the "easy" dataset run.
SOURCE_PDF_PATH = "/workspace/PDF_FILES_EASY/*"  # input PDFs, loaded as binary files
DF_SAVE_PATH = "/workspace/easy/"  # parquet output of the pipeline
SOURCE_GT_PATH = "/workspace/pdf_deid_gts_easy.json"  # ground truth read by evaluate_predictions
SAVE_MAPPING_PATH = "/workspace/easy_result_mapping.json"  # per-file summary written by evaluate_predictions
SAVE_OUTPUT_PDF = "/workspace/easy_pdf_output/"  # directory the output PDFs are saved to

# Create the PDF output directory up front; no error if it already exists.
os.makedirs(SAVE_OUTPUT_PDF, exist_ok=True)

In [157]:
# Load the PDFs as binary files, run the whole pipeline on them and persist
# the transformed DataFrame as parquet (overwriting any previous run).
df = spark.read.format("binaryFile").load(SOURCE_PDF_PATH)
fitted_pipe = pipe.fit(df)
result = fitted_pipe.transform(df)
(
    result.write
    .format('parquet')
    .mode('overwrite')
    .save(DF_SAVE_PATH)
)

25/05/06 10:37:51 WARN PDType1Font: Using fallback font LiberationSans for Helvetica-Bold
25/05/06 10:37:51 WARN PDType1Font: Using fallback font LiberationSans for Times-Roman
25/05/06 10:37:51 WARN PDType1Font: Using fallback font LiberationSans for Times-Bold
25/05/06 10:37:51 WARN PDType1Font: Using fallback font LiberationSans for Helvetica-Bold
25/05/06 10:37:51 WARN PDType1Font: Using fallback font LiberationSans for Times-Roman
25/05/06 10:37:51 WARN PDType1Font: Using fallback font LiberationSans for Times-Bold
25/05/06 10:37:51 WARN PDType1Font: Using fallback font LiberationSans for Helvetica-Bold
25/05/06 10:37:51 WARN PDType1Font: Using fallback font LiberationSans for Times-Roman
25/05/06 10:37:51 WARN PDType1Font: Using fallback font LiberationSans for Times-Bold
25/05/06 10:37:51 WARN PDType1Font: Using fallback font LiberationSans for Helvetica-Bold
25/05/06 10:37:51 WARN PDType1Font: Using fallback font LiberationSans for Times-Roman
25/05/06 10:37:51 WARN PDType1Font

In [158]:
evaluate_predictions(SOURCE_GT_PATH=SOURCE_GT_PATH, DF_SAVE_PATH=DF_SAVE_PATH, SAVE_MAPPING_PATH=SAVE_MAPPING_PATH)

Filename: PDF_Deid_Deidentification_21.pdf | Precision: 1.0000 | Recall: 1.0000
Filename: PDF_Deid_Deidentification_5.pdf | Precision: 0.9302 | Recall: 0.9756
Filename: PDF_Deid_Deidentification_9.pdf | Precision: 0.9091 | Recall: 0.9756
Filename: PDF_Deid_Deidentification_16.pdf | Precision: 0.9111 | Recall: 1.0000
Filename: PDF_Deid_Deidentification_11.pdf | Precision: 1.0000 | Recall: 1.0000
Filename: PDF_Deid_Deidentification_4.pdf | Precision: 0.9756 | Recall: 0.9756
Filename: PDF_Deid_Deidentification_6.pdf | Precision: 0.9756 | Recall: 0.9756
Filename: PDF_Deid_Deidentification_2.pdf | Precision: 0.9302 | Recall: 0.9756
Filename: PDF_Deid_Deidentification_24.pdf | Precision: 0.9524 | Recall: 0.9756
Filename: PDF_Deid_Deidentification_15.pdf | Precision: 0.9286 | Recall: 0.9512
Filename: PDF_Deid_Deidentification_12.pdf | Precision: 0.8222 | Recall: 0.9487
Filename: PDF_Deid_Deidentification_10.pdf | Precision: 0.8125 | Recall: 0.9512
Filename: PDF_Deid_Deidentification_29.pdf | 

In [165]:
# Column holding the page images with the detected regions drawn over.
OBFUSCATED_IMAGE_COL = "image_with_regions"

# Re-assembles the page images of each source file ("path") into one PDF.
img_to_pdf = ImageToPdf() \
    .setPageNumCol("pagenum") \
    .setOriginCol("path") \
    .setOutputCol("pdf") \
    .setInputCol(OBFUSCATED_IMAGE_COL) \
    .setAggregatePages(True)

source = spark.read.format("parquet").load(DF_SAVE_PATH)
result_pdf = img_to_pdf.transform(source)

# Write every generated PDF under SAVE_OUTPUT_PDF, keeping the source file name.
for row in result_pdf.select("path", "pdf").toLocalIterator():
    record = row.asDict()  # build the dict once per row
    filename = record["path"]
    basename = os.path.basename(filename)

    savename = os.path.join(SAVE_OUTPUT_PDF, basename)

    # The context manager closes the handle even if the write fails.
    with open(savename, "wb") as pdf_file:
        pdf_file.write(record["pdf"])

[Stage 4236:>                                                       (0 + 1) / 1]

<h2>Medium Dataset</h2>
<h4>Total Files : 40 [ 30 Easy + 10 Medium ]</h4>

In [159]:
# Paths for the "medium" dataset run.
SOURCE_PDF_PATH = "/workspace/PDF_FILES_MEDIUM/*"  # input PDFs, loaded as binary files
DF_SAVE_PATH = "/workspace/medium/"  # parquet output of the pipeline
SOURCE_GT_PATH = "/workspace/pdf_deid_gts_medium.json"  # ground truth read by evaluate_predictions
SAVE_MAPPING_PATH = "/workspace/medium_result_mapping.json"  # per-file summary written by evaluate_predictions
SAVE_OUTPUT_PDF = "/workspace/medium_pdf_output/"  # directory the output PDFs are saved to

# Create the PDF output directory up front; no error if it already exists.
os.makedirs(SAVE_OUTPUT_PDF, exist_ok=True)

In [160]:
# Load the PDFs as binary files, run the whole pipeline on them and persist
# the transformed DataFrame as parquet (overwriting any previous run).
df = spark.read.format("binaryFile").load(SOURCE_PDF_PATH)
fitted_pipe = pipe.fit(df)
result = fitted_pipe.transform(df)
(
    result.write
    .format('parquet')
    .mode('overwrite')
    .save(DF_SAVE_PATH)
)

25/05/06 10:42:54 WARN PDType1Font: Using fallback font LiberationSans for Helvetica-Bold
25/05/06 10:42:54 WARN PDType1Font: Using fallback font LiberationSans for Times-Roman
25/05/06 10:42:54 WARN PDType1Font: Using fallback font LiberationSans for Times-Bold
25/05/06 10:42:54 WARN PDType1Font: Using fallback font LiberationSans for Helvetica-Bold
25/05/06 10:42:54 WARN PDType1Font: Using fallback font LiberationSans for Helvetica-Bold
25/05/06 10:42:54 WARN PDType1Font: Using fallback font LiberationSans for Times-Roman
25/05/06 10:42:54 WARN PDType1Font: Using fallback font LiberationSans for Times-Roman
25/05/06 10:42:54 WARN PDType1Font: Using fallback font LiberationSans for Times-Bold
25/05/06 10:42:54 WARN PDType1Font: Using fallback font LiberationSans for Helvetica-Bold
25/05/06 10:42:54 WARN PDType1Font: Using fallback font LiberationSans for Times-Bold
25/05/06 10:42:54 WARN PDType1Font: Using fallback font LiberationSans for Times-Roman
25/05/06 10:42:54 WARN PDType1Font

In [161]:
evaluate_predictions(SOURCE_GT_PATH=SOURCE_GT_PATH, DF_SAVE_PATH=DF_SAVE_PATH, SAVE_MAPPING_PATH=SAVE_MAPPING_PATH)

Filename: PDF_Deid_Deidentification_Medium_4.pdf | Precision: 0.9038 | Recall: 0.9038
Filename: PDF_Deid_Deidentification_Medium_5.pdf | Precision: 0.8519 | Recall: 0.8846
Filename: PDF_Deid_Deidentification_Medium_1.pdf | Precision: 0.8113 | Recall: 0.8269
Filename: PDF_Deid_Deidentification_Medium_2.pdf | Precision: 0.9318 | Recall: 0.7885
Filename: PDF_Deid_Deidentification_Medium_9.pdf | Precision: 0.9200 | Recall: 0.8846
Filename: PDF_Deid_Deidentification_Medium_8.pdf | Precision: 0.8302 | Recall: 0.8462
Filename: PDF_Deid_Deidentification_Medium_3.pdf | Precision: 0.9216 | Recall: 0.9038
Filename: PDF_Deid_Deidentification_Medium_7.pdf | Precision: 0.7778 | Recall: 0.8077
Filename: PDF_Deid_Deidentification_Medium_6.pdf | Precision: 0.6949 | Recall: 0.7885
Filename: PDF_Deid_Deidentification_Medium_0.pdf | Precision: 0.8431 | Recall: 0.8269
Filename: PDF_Deid_Deidentification_6.pdf | Precision: 0.9756 | Recall: 0.9756
Filename: PDF_Deid_Deidentification_26.pdf | Precision: 1.000

In [162]:
# Column holding the page images with the detected regions drawn over.
OBFUSCATED_IMAGE_COL = "image_with_regions"

# Re-assembles the page images of each source file ("path") into one PDF.
img_to_pdf = ImageToPdf() \
    .setPageNumCol("pagenum") \
    .setOriginCol("path") \
    .setOutputCol("pdf") \
    .setInputCol(OBFUSCATED_IMAGE_COL) \
    .setAggregatePages(True)

source = spark.read.format("parquet").load(DF_SAVE_PATH)
result_pdf = img_to_pdf.transform(source)

# Save only the files whose name contains "Medium".
for row in result_pdf.select("path", "pdf").toLocalIterator():
    record = row.asDict()  # build the dict once per row
    filename = record["path"]
    basename = os.path.basename(filename)

    # Test the file name, not the full path, so a directory that happens to
    # contain "Medium" cannot make every file match.
    if "Medium" not in basename:
        continue

    savename = os.path.join(SAVE_OUTPUT_PDF, basename)

    # The context manager closes the handle even if the write fails.
    with open(savename, "wb") as pdf_file:
        pdf_file.write(record["pdf"])

[Stage 4204:>                                                       (0 + 1) / 1]