In [2]:
!pip install presidio_analyzer presidio_anonymizer pycryptodome

Collecting presidio_analyzer
  Downloading presidio_analyzer-2.2.358-py3-none-any.whl.metadata (3.2 kB)
Collecting presidio_anonymizer
  Downloading presidio_anonymizer-2.2.358-py3-none-any.whl.metadata (8.1 kB)
Collecting pycryptodome
  Downloading pycryptodome-3.23.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Collecting phonenumbers<9.0.0,>=8.12 (from presidio_analyzer)
  Downloading phonenumbers-8.13.55-py2.py3-none-any.whl.metadata (11 kB)
Collecting tldextract (from presidio_analyzer)
  Downloading tldextract-5.3.0-py3-none-any.whl.metadata (11 kB)
Collecting requests-file>=1.4 (from tldextract->presidio_analyzer)
  Downloading requests_file-2.1.0-py2.py3-none-any.whl.metadata (1.7 kB)
Downloading presidio_analyzer-2.2.358-py3-none-any.whl (114 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m114.9/114.9 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading presidio_anonymizer-2.2.358-py3-none-any.whl (31 kB)
Downloading p

In [4]:
import base64
import json
import os
import re

from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

# Initialize Presidio
# Both engines are built once at module level with default settings, so every
# later call in this notebook shares the same instances.
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

# Mock language model function
def mock_language_model(anonymized_text):
    """Stand-in for a real model call.

    Echoes ``anonymized_text`` back, prefixed with ``Processed:`` and followed
    by a fixed, canned diagnosis string. No model is actually invoked.
    """
    template = "Processed: {} -> Diagnosis: High risk of condition X"
    return template.format(anonymized_text)

# Step 1: Generate encryption key and nonce
# NOTE(review): the key is generated fresh on every run and nothing visible in
# this notebook persists it, so a mapping file written in one kernel session
# presumably cannot be decrypted after a restart -- confirm whether key storage
# is handled elsewhere.
key = get_random_bytes(32) # AES-256 key
# WARNING: this nonce is generated once, at cell execution time. Any caller that
# passes it to encrypt_mapping more than once reuses a (key, nonce) pair, and
# AES-GCM requires the nonce to be unique for every encryption under a given
# key. Prefer generating a fresh 12-byte nonce per encryption.
nonce = get_random_bytes(12) # GCM nonce

# Step 2: Encrypt and decrypt helper functions
def encrypt_mapping(mapping, key, nonce=None):
    """Serialize ``mapping`` to JSON and encrypt it with AES in GCM mode.

    Args:
        mapping: JSON-serializable object (in this notebook, a dict of
            original PHI string -> placeholder token).
        key: AES key bytes, passed straight to ``AES.new``.
        nonce: Optional GCM nonce of exactly 12 bytes. When omitted (or
            ``None``) a fresh random nonce is generated for this call, which
            is the safe default: a nonce must not be reused with the same key.

    Returns:
        str: base64 text encoding ``nonce || ciphertext || tag``.

    Raises:
        ValueError: if an explicit ``nonce`` is not 12 bytes long.
    """
    if nonce is None:
        nonce = get_random_bytes(12)
    elif len(nonce) != 12:
        # decrypt_mapping always takes the first 12 bytes of the blob as the
        # nonce, so any other length would produce an undecryptable blob.
        raise ValueError(f"nonce must be exactly 12 bytes, got {len(nonce)}")

    cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
    ciphertext, tag = cipher.encrypt_and_digest(json.dumps(mapping).encode('utf-8'))
    return base64.b64encode(nonce + ciphertext + tag).decode('utf-8')

def decrypt_mapping(encrypted_mapping, key):
    """Decode, authenticate and decrypt a blob produced by ``encrypt_mapping``.

    Args:
        encrypted_mapping: base64 text whose decoded bytes are laid out as
            12-byte nonce, then ciphertext, then 16-byte tag.
        key: AES key bytes, passed straight to ``AES.new``.

    Returns:
        The object obtained by JSON-decoding the decrypted UTF-8 plaintext.
    """
    blob = base64.b64decode(encrypted_mapping)

    # Split the blob into its three fixed-position parts.
    iv = blob[:12]
    mac = blob[-16:]
    body = blob[12:-16]

    plaintext = AES.new(key, AES.MODE_GCM, nonce=iv).decrypt_and_verify(body, mac)
    return json.loads(plaintext.decode('utf-8'))

# Step 3: Anonymize EHR data
def anonymize_ehr(text):
    """Replace detected PHI in ``text`` with numbered placeholder tokens.

    Detection is done by the module-level ``analyzer`` for the entity types
    PERSON, DATE_TIME and PHONE_NUMBER. Substitution is done here, span by
    span, instead of through an ``operators`` dict keyed by entity type: such
    a dict can hold only one replacement value per type, so two different
    people in one record would both receive the same token and the mapping
    would no longer match the text.

    Tokens have the form ``<ENTITY_TYPE>_<NNN>`` and are numbered in order of
    first appearance in the text. Identical original strings share one token,
    so the returned mapping is one-to-one.

    Args:
        text: The raw record text.

    Returns:
        tuple: ``(anonymized_text, mapping)`` where ``mapping`` maps each
        original PHI string to the token that replaced it.
    """
    # Detect PHI
    results = analyzer.analyze(text=text, entities=["PERSON", "DATE_TIME", "PHONE_NUMBER"], language="en")

    # Order detections by position (longest first on ties) and merge any that
    # overlap into a single span covering their union, so that no part of a
    # detected region is left in clear text. A merged span keeps the entity
    # type of its earliest-starting detection.
    spans = []  # each item: [start, end, entity_type]
    for result in sorted(results, key=lambda r: (r.start, -r.end)):
        if result.end <= result.start:
            continue  # empty span: nothing to replace
        if spans and result.start < spans[-1][1]:
            spans[-1][1] = max(spans[-1][1], result.end)
        else:
            spans.append([result.start, result.end, result.entity_type])

    # Build the anonymized text left to right, creating the mapping as we go.
    mapping = {}
    pieces = []
    cursor = 0
    counter = 1
    for start, end, entity_type in spans:
        original = text[start:end]
        token = mapping.get(original)
        if token is None:
            token = f"{entity_type}_{counter:03d}"
            mapping[original] = token
            counter += 1
        pieces.append(text[cursor:start])
        pieces.append(token)
        cursor = end
    pieces.append(text[cursor:])

    return "".join(pieces), mapping

# Step 4: Deanonymize model output
def deanonymize_output(anonymized_text, mapping):
    """Substitute placeholder tokens in ``anonymized_text`` with their originals.

    All tokens are replaced in a single left-to-right pass, trying longer
    tokens first. This avoids two problems of replacing tokens one after
    another: a token that is a prefix of another (``PERSON_100`` inside
    ``PERSON_1000``) cannot clobber the longer one, and text inserted by one
    substitution is never scanned again by a later one.

    Args:
        anonymized_text: Text that may contain placeholder tokens.
        mapping: Dict of original string -> token, as returned by
            ``anonymize_ehr``.

    Returns:
        str: The text with every known token replaced by its original string.
        Returned unchanged when ``mapping`` contains no usable tokens.
    """
    # Invert to token -> original. If several originals share a token, keep
    # the first one in mapping order. Empty tokens are skipped because they
    # would match at every position.
    token_to_original = {}
    for original, token in mapping.items():
        if token:
            token_to_original.setdefault(token, original)
    if not token_to_original:
        return anonymized_text

    longest_first = sorted(token_to_original, key=len, reverse=True)
    pattern = re.compile("|".join(re.escape(token) for token in longest_first))
    # A function replacement inserts the original verbatim, so backslashes in
    # it are not interpreted as regex escapes.
    return pattern.sub(lambda match: token_to_original[match.group(0)], anonymized_text)

# Step 5: Main pipeline
def process_ehr_pipeline(ehr_text, mapping_path="mapping.enc"):
    """Run the anonymize -> model -> de-anonymize round trip for one record.

    Steps: anonymize ``ehr_text``, write the encrypted token mapping to
    ``mapping_path``, send the anonymized text to the mock model, read the
    mapping back, decrypt it, and restore the original values in the model
    output. Each intermediate result is printed.

    Args:
        ehr_text: The raw record text.
        mapping_path: File that receives the encrypted mapping. Defaults to
            ``"mapping.enc"`` in the current working directory; an existing
            file is overwritten.

    Returns:
        str: The model output with placeholder tokens replaced by the
        original values.
    """
    # Anonymize
    anonymized_text, mapping = anonymize_ehr(ehr_text)
    print(f"Anonymized: {anonymized_text}")

    # Save encrypted mapping. A fresh nonce is drawn for every encryption:
    # the module-level key is shared across calls, and AES-GCM must never see
    # the same (key, nonce) pair twice.
    fresh_nonce = get_random_bytes(12)
    encrypted_mapping = encrypt_mapping(mapping, key, fresh_nonce)
    with open(mapping_path, "w", encoding="utf-8") as f:
        f.write(encrypted_mapping)

    # Feed to language model (mock)
    model_output = mock_language_model(anonymized_text)
    print(f"Model Output: {model_output}")

    # Decrypt mapping
    with open(mapping_path, "r", encoding="utf-8") as f:
        encrypted_mapping = f.read()
    decrypted_mapping = decrypt_mapping(encrypted_mapping, key)

    # Deanonymize
    deanonymized_output = deanonymize_output(model_output, decrypted_mapping)
    print(f"Deanonymized Output: {deanonymized_output}")

    return deanonymized_output

# Example usage
# Runs the full pipeline once on a made-up sample record. When this cell is
# executed in a notebook, __name__ is "__main__", so the guard is true and the
# three results are printed below the cell.
if __name__ == "__main__":
    sample_ehr = "Patient John Doe was admitted on 2025-07-12 with phone 555-123-4567."
    process_ehr_pipeline(sample_ehr)



Anonymized: Patient PERSON_001 was admitted on DATE_TIME_002 with phone PHONE_NUMBER_003.
Model Output: Processed: Patient PERSON_001 was admitted on DATE_TIME_002 with phone PHONE_NUMBER_003. -> Diagnosis: High risk of condition X
Deanonymized Output: Processed: Patient John Doe was admitted on 2025-07-12 with phone 555-123-4567. -> Diagnosis: High risk of condition X
