# Presidio test bed

## Installs

In [0]:
%pip install --quiet presidio_analyzer presidio_anonymizer presidio_image_redactor azureml pydicom

In [0]:
# Restart the Python process so the packages installed by the %pip cell above
# are picked up by subsequent cells.
dbutils.library.restartPython()

In [0]:
%sh python -m spacy download en_core_web_lg
# Downloads the spaCy model `en_core_web_lg`.
# NOTE(review): %sh presumably runs on the driver only — confirm the model is
# also available on the workers that execute the Presidio UDF.

In [0]:
import logging
import sys
# Send log records to stderr at WARNING by default, then open up the
# 'dbx.pixels' logger to DEBUG and keep a handle to it as `logger`.
logging.basicConfig(
    format='%(asctime)s [%(name)s][%(levelname)s] %(message)s',
    level=logging.WARNING,
    stream=sys.stderr,
)
logger = logging.getLogger('dbx.pixels')
logger.setLevel(logging.DEBUG)

## Test Setup

In [0]:
# Sample DICOM files used for local testing: the first two live under
# `pseudo_phi_dicom_data`, the third under `pseudo_phi_deid_dicom_data`.
paths = [
    "/Volumes/hls_radiology/pseudo-phi-dicom-data/pseudo-phi-dicom-data/pseudo_phi_dicom_data/manifest-1744203894606/Pseudo-PHI-DICOM-Data/292821506/07-13-2013-NA-XR CHEST AP PORTABLE for Douglas Davidson-46198/1001.000000-NA-37718/1-1.dcm",

    "/Volumes/hls_radiology/pseudo-phi-dicom-data/pseudo-phi-dicom-data/pseudo_phi_dicom_data/manifest-1744203894606/Pseudo-PHI-DICOM-Data/292821506/07-13-2013-NA-XR CHEST AP PORTABLE for Douglas Davidson-46198/1002.000000-NA-53238/1-1.dcm",

    "/Volumes/hls_radiology/pseudo-phi-dicom-data/pseudo-phi-dicom-data/pseudo_phi_deid_dicom_data/manifest-1744157828937/Pseudo-PHI-DICOM-Data/Pseudo-PHI-001/06-26-2003-NA-XR CHEST AP PORTABLE-96544/1001.000000-NA-42825/1-1.dcm"
]

# Download
# Copy each sample to local disk as /local_disk0/tmp/<index>.dcm, where
# <index> is the file's position in `paths`.
import shutil
for i, path in enumerate(paths):
    shutil.copy(path, f"/local_disk0/tmp/{i}.dcm")

## Test Presidio engine
https://microsoft.github.io/presidio/

In [0]:
from typing import Iterator

import pandas as pd
from pyspark.ml.pipeline import Transformer
from pyspark.sql.functions import col, pandas_udf

from presidio_image_redactor import DicomImagePiiVerifyEngine

def safe_slice(arr, index, property):
    """Return ``arr[index][property]``, or None when it is unavailable.

    None is returned when ``index`` is negative, when it is past the end of
    ``arr``, or when the entry's ``.get(property)`` yields None.
    """
    if index < 0 or index >= len(arr):
        return None
    entry = arr[index]
    if entry.get(property) is None:
        return None
    return entry[property]

@pandas_udf("array<struct<label string, is_PII boolean, conf float, analyzer_entity_type string, analyzer_score float, analyzer_is_PII boolean, left int, top int, width int, height int>>")
def presidio_verify_udf(paths: pd.Series) -> pd.Series:
    """Run Presidio's DICOM PII verification on a batch of file paths.

    Each file is read with pydicom and handed to
    ``DicomImagePiiVerifyEngine.verify_dicom_instance``. One struct is emitted
    per OCR result, enriched with the analyzer detection (if any) that has
    the same bounding box.

    Analyzer detections are joined to OCR results by identical
    ``(left, top, width, height)`` rather than by list position: the two
    lists can differ in length, so pairing them by index attaches detections
    to unrelated words.

    Args:
        paths: DICOM file paths. A leading ``dbfs:`` scheme is removed before
            the file is opened.

    Returns:
        A Series holding, per input path, a list of dicts that match the
        UDF's declared ``array<struct<...>>`` schema. ``analyzer_is_PII`` is
        True when an analyzer detection shares the OCR result's bounding box.
    """
    import pydicom

    # Initialize Presidio engine
    analyzer = DicomImagePiiVerifyEngine()

    # Padding width handed to verify_dicom_instance
    padding_width = 25
    scheme = "dbfs:"
    bbox_fields = ("left", "top", "width", "height")

    def bbox_key(box):
        """Bounding box of an OCR/analyzer result as a hashable tuple."""
        return tuple(box.get(field) for field in bbox_fields)

    def detect_phi(dcm_path: str):
        """Return one result dict per OCR box found in the DICOM at dcm_path."""
        logger.info(f"Path: {dcm_path}")
        # Strip the scheme only where it is a prefix; str.replace would also
        # rewrite a "dbfs:" occurring later in the path.
        local_path = dcm_path[len(scheme):] if dcm_path.startswith(scheme) else dcm_path
        instance = pydicom.dcmread(local_path)
        # Get OCR and NER results (the returned verification image is unused)
        _, ocr_results, analyzer_results = analyzer.verify_dicom_instance(instance, padding_width, use_metadata=True)

        # Index analyzer detections by bounding box; when several share a box,
        # keep the highest-scoring one.
        detections = {}
        for detection in analyzer_results:
            key = bbox_key(detection)
            best = detections.get(key)
            if best is None or (detection.get('score') or 0) > (best.get('score') or 0):
                detections[key] = detection

        results = []
        for r in ocr_results:
            match = detections.get(bbox_key(r))
            results.append({
                "label": r['label'],
                "is_PII": bool(r.get('is_PII', False)),
                "conf": float(r['conf']),
                "analyzer_entity_type": match.get('entity_type') if match is not None else None,
                "analyzer_score": match.get('score') if match is not None else None,
                "analyzer_is_PII": match is not None,
                "left": int(r['left']),
                "top": int(r['top']),
                "width": int(r['width']),
                "height": int(r['height'])
            })
        return results

    return paths.apply(detect_phi)

In [0]:
# Root directory loaded by the Spark binaryFile reader further down, which
# searches it recursively for *.dcm files.
source_path = "/Volumes/hls_radiology/pseudo-phi-dicom-data/pseudo-phi-dicom-data/pseudo_phi_dicom_data/manifest-1744203894606/Pseudo-PHI-DICOM-Data"

### Presidio as a Spark UDF

In [0]:
%sql
-- Empty the results table before the next run appends to it.
truncate table douglas_moore.pixels.pixels_deid_presidio;

In [0]:
# Create a DataFrame with the DICOM file paths
# df = spark.createDataFrame(paths, schema='string')

## Read all the DICOM path names
# The binaryFile reader is pointed at `source_path`, limited to *.dcm files and
# told to recurse; the `content` column is dropped, leaving the remaining
# columns (including `path`) for the UDF.
df = (spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.dcm")
  .option("recursiveFileLookup", "true")
  .load(source_path)
  .drop("content")
)

# Apply the UDF to extract and analyze PII
result_df = df.withColumn("pii_analysis", presidio_verify_udf(col("path")))

# Optional preview of the results (disabled)
#display(result_df.limit(20))
# Stamp the rows with the current timestamp as `insert_dt` and append them to
# the results table, allowing schema merge.
from pyspark.sql.functions import current_timestamp
res = (result_df
       .withColumn('insert_dt', current_timestamp())
       .write.mode('append')
       .option("mergeSchema", "true")
       .saveAsTable('douglas_moore.pixels.pixels_deid_presidio'))
# NOTE(review): saveAsTable presumably returns None, so `res` displays nothing — confirm it is still needed.
res

In [0]:
%sql select insert_dt, count(1) 
-- Row count per insert_dt value in the results table.
from douglas_moore.pixels.pixels_deid_presidio
group by insert_dt

In [0]:
%sql
-- Inspect every row currently stored in the results table.
select * from douglas_moore.pixels.pixels_deid_presidio;

## presidio timing

| files - frames   | run time | worker-core-seconds / frame | config |
| -------- | -------- | --------------- | ----|
|1,693  | 8m 13s  | 4.659 | Single GPU A10 cluster |
|1,693  | 6m 13s  | 3.525 | Single GPU A10 cluster |
|1,693  | 5m 42s  | 12.929  | 4 x `c6gd.4xlarge` CPU cluster (config 3) |
| 1,693 | 4m 56s  | 6.464 | 2 x `c6gd.4xlarge` CPU cluster (config 3) |

Config 3: 
```
{
    "cluster_name": "DM - Pixels",
    "spark_version": "16.4.x-scala2.13",
    "aws_attributes": {
        "first_on_demand": 1,
        "zone_id": "auto",
        "spot_bid_price_percent": 100
    },
    "node_type_id": "c6gd.4xlarge",
    "custom_tags": {
        "solacc": "pixels",
        "owner": "douglas.moore@databricks.com",
        "removeAfter": "20250531"
    },
    "autotermination_minutes": 60,
    "enable_elastic_disk": true,
    "single_user_name": "douglas.moore@databricks.com",
    "enable_local_disk_encryption": false,
    "data_security_mode": "DATA_SECURITY_MODE_DEDICATED",
    "runtime_engine": "STANDARD",
    "kind": "CLASSIC_PREVIEW",
    "use_ml_runtime": true,
    "is_single_node": false,
    "num_workers": 4,
    "apply_policy_default_values": false
}
```

In [0]:
# Worker-core-seconds per frame: 32 cores, a 5m 42s run, 1,693 files.
# NOTE(review): the timing table lists 5m 42s for the 4-worker run, while this
# result (6.464) appears on the 2-worker row whose run time is 4m 56s —
# confirm which run time belongs with 32 cores.
32 * (5*60+42) / 1693 # cores * seconds / frames [assume one frame per file]
# = core-seconds per frame