# Redact PHI using Google Cloud DLP

## Installation and imports

In [None]:
!pip install google-api-python-client google-auth google-auth-oauthlib google-auth-httplib2 --quiet
!pip install google-cloud-dlp google-api-core --quiet
!pip install jdc

In [8]:
import csv
import json
import logging
import os
import sys
import time
from collections.abc import Sequence, Mapping
from pathlib import Path
from typing import Final, Optional, Union

import jdc
import numpy as np
import pandas as pd
from google.cloud import dlp_v2

## CSV to JSONL

In [None]:
# Locations of the labelled CSV export and of the JSONL file derived from it.
directory: Path = Path('/content/drive/MyDrive')
input_csv_path: Path = directory / 'rx_refill_or_change_request_evals.csv'
output_jsonl_path: Path = directory / 'rx_refill_or_change_request_evals.jsonl'

# Convert every CSV row whose LLM_LABEL is "true"/"false" into one JSONL record
# of the form {"prompt": <MESSAGE_TEXT>, "completion": 1 | 0}. Rows with any
# other label are skipped.
with open(input_csv_path, newline='', encoding='utf-8') as csvfile, \
        open(output_jsonl_path, 'w', encoding='utf-8') as jsonlfile:
    # Annotated with the plain class: module-level annotations are evaluated,
    # and subscripting csv.DictReader is not supported on older interpreters.
    reader: csv.DictReader = csv.DictReader(csvfile)

    for row in reader:
        # DictReader fills missing trailing fields with None, so guard the
        # .strip() call; a missing LLM_LABEL column still raises KeyError.
        llm_label: str = (row['LLM_LABEL'] or '').strip().lower()
        if llm_label not in {'true', 'false'}:
            continue

        json_line: Mapping[str, Union[str, int]] = {
            "prompt": row['MESSAGE_TEXT'],
            "completion": int(llm_label == "true"),
        }
        jsonlfile.write(json.dumps(json_line) + '\n')

## Setup Google API

In [2]:
#drive.mount('/content/drive')
# os.environ only accepts str values, so the Path built from `directory`
# has to be converted explicitly (assigning a Path raises TypeError).
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = str(directory / 'upbeat-arch-463922-n3-8a21012ce142.json')
PROJECT_ID: Final[str] = "upbeat-arch-463922-n3"
LOCATION: Final[str] = "global"

In [3]:
# Only surface errors from the Google client libraries' loggers.
for _noisy_logger_name in ('google', 'googleapiclient'):
    logging.getLogger(_noisy_logger_name).setLevel(logging.ERROR)

# Everything else logs at INFO; `logger` is this notebook's own logger.
logging.basicConfig(level=logging.INFO)
logger: logging.Logger = logging.getLogger(__name__)

## Redactor class

In [4]:
class GoogleDLPPHIRedactor:
    """Holds the DLP client and the redaction configuration for PHI removal.

    The constructor only stores configuration; the methods that use it
    (``_rate_limit``, ``redact_phi``, ``process_jsonl_file``) are attached to
    this class in later notebook cells via ``%%add_to``.
    """

    def __init__(self, project_id: str, location: str = "global"):
        """
        Args:
            project_id: The ID of the Google Cloud project to use.
            location: Location segment of the DLP parent resource path.
        """
        self.project_id: str = project_id
        self.dlp_client: dlp_v2.DlpServiceClient = dlp_v2.DlpServiceClient()
        self.parent: str = self._build_parent(project_id, location)

        # Rate limiting: max 10 requests per second
        # .12s between calls = ~8 calls/second (safe margin)
        self.min_delay: float = 0.12
        # 0.0 means "never called", so the first request is not delayed.
        self.last_call_time: float = 0.0

        # Built-in DLP detectors to run on every request.
        self.builtin_info_types: Sequence[Mapping[str, str]] = [
            {"name": "PERSON_NAME"},
            {"name": "PHONE_NUMBER"},
            {"name": "EMAIL_ADDRESS"},
            {"name": "DATE"},
            {"name": "US_SOCIAL_SECURITY_NUMBER"},
            {"name": "LOCATION"},
        ]

        # Regex-based detector for "MRN" followed by 6-10 digits.
        self.custom_info_types: Sequence[Mapping[str, Mapping[str, str]]] = [
            {
                "info_type": {"name": "CUSTOM_MEDICAL_RECORD"},
                "regex": {"pattern": r"MRN[:\s]?\d{6,10}"},
            },
        ]

        # Placeholder written in place of each detected info type.
        self.default_replacements: Mapping[str, str] = {
            "PERSON_NAME": "[NAME]",
            "PHONE_NUMBER": "[PHONE]",
            "EMAIL_ADDRESS": "[EMAIL]",
            "DATE": "[DATE]",
            "US_SOCIAL_SECURITY_NUMBER": "[SSN]",
            "LOCATION": "[LOCATION]",
            "CUSTOM_MEDICAL_RECORD": "[MRN]",
        }

    @staticmethod
    def _build_parent(project_id: str, location: str) -> str:
        """Return the parent resource path sent with each DLP request."""
        return f"projects/{project_id}/locations/{location}"


In [None]:
%%add_to GoogleDLPPHIRedactor
def _rate_limit(self):
    """Sleep as needed so consecutive calls are at least ``self.min_delay`` seconds apart.

    Reads ``self.last_call_time`` and ``self.min_delay``; updates
    ``self.last_call_time`` to the moment the call is allowed to proceed.
    """
    # A monotonic clock is used for the interval so that wall-clock
    # adjustments cannot produce negative or inflated elapsed times.
    elapsed: float = time.monotonic() - self.last_call_time
    if elapsed < self.min_delay:
        time.sleep(self.min_delay - elapsed)
    self.last_call_time = time.monotonic()

def redact_phi(self, text: str, exclude_categories: Optional[Sequence[str]] = None) -> str:
    """
    Redact PHI with selective category exclusion

    Args:
        text: The text to redact
        exclude_categories: List of category names to NOT redact (leave original)
                            e.g., ['DATE', 'PERSON_NAME'] will leave dates and names unredacted

    Returns:
        Redacted text with excluded categories left untouched. The input is
        returned unchanged when it is empty, when every category is excluded,
        or when the DLP request fails (the error is written to stderr).
    """
    excluded = set(exclude_categories or ())
    replacements: dict[str, str] = {
        category: placeholder
        for category, placeholder in self.default_replacements.items()
        if category not in excluded
    }

    # Nothing to redact: skip the API call (and the rate-limit wait) entirely.
    if not text or not replacements:
        return text

    self._rate_limit()

    # One replace-with-placeholder transformation per category still in scope.
    transformations = [
        {
            "info_types": [{"name": info_type}],
            "primitive_transformation": {
                "replace_config": {"new_value": {"string_value": placeholder}}
            },
        }
        for info_type, placeholder in replacements.items()
    ]

    deidentify_config = {
        "info_type_transformations": {"transformations": transformations}
    }

    inspect_config = {
        "info_types": self.builtin_info_types,
        "custom_info_types": self.custom_info_types,
        "min_likelihood": dlp_v2.Likelihood.POSSIBLE,
    }

    try:
        response = self.dlp_client.deidentify_content(
            request={
                "parent": self.parent,
                "deidentify_config": deidentify_config,
                "inspect_config": inspect_config,
                "item": {"value": text},
            }
        )
        return response.item.value
    except Exception as e:
        # NOTE(review): best-effort fallback kept for backward compatibility.
        # On failure the ORIGINAL, unredacted text is returned, so callers
        # that persist the result may write PHI to "redacted" output.
        sys.stderr.write(f"Error redacting text: {e}\n")
        return text

In [12]:
%%add_to GoogleDLPPHIRedactor
def process_jsonl_file(self, input_file: Union[str, Path], output_file: Union[str, Path],
                       exclude_categories: Optional[Sequence[str]] = None) -> None:
    """
    Process JSONL with selective redaction

    Each non-blank input line is parsed as a JSON object; a non-empty
    "prompt" value is replaced with the result of ``self.redact_phi`` and the
    object is written to the output file, one JSON object per line. Objects
    without a prompt are copied through unchanged.

    Args:
        input_file: Path to input JSONL file
        output_file: Path to output JSONL file
        exclude_categories: List of categories to leave unredacted
    """
    # Explicit UTF-8 so reading does not depend on the platform default.
    with open(input_file, 'r', encoding='utf-8') as fin, \
            open(output_file, 'w', encoding='utf-8') as fout:
        for i, line in enumerate(fin, 1):
            line = line.strip()
            # Blank lines (e.g. a trailing newline) are not JSON; skip them
            # rather than letting json.loads raise.
            if not line:
                continue

            sample = json.loads(line)
            prompt = sample.get("prompt", "")

            if prompt:
                sample["prompt"] = self.redact_phi(prompt, exclude_categories=exclude_categories)

            fout.write(json.dumps(sample) + '\n')

            # Progress report every 200 input lines.
            if not i % 200:
                print(f"Processed {i} lines")
                if exclude_categories:
                    print(f"Excluding from redaction: {exclude_categories}")

## Run redactor

In [None]:
# Redact the converted evaluation set with every category masked
# (no exclude_categories passed).
redactor: GoogleDLPPHIRedactor = GoogleDLPPHIRedactor(project_id=PROJECT_ID)
redactor.process_jsonl_file(
    input_file=directory / 'rx_refill_or_change_request_evals.jsonl',
    output_file=directory / 'rx_refill_or_change_request_evals_redacted.jsonl',
)

## Custom data

In [None]:
# Sample messages used to try out the redactor: each record carries only a
# "prompt" key. They are written to custom_data.jsonl and redacted in the
# statements that follow this literal.
sample_data : Sequence[Mapping[str, str]] = [
    {"prompt": "Zepbound 7.5 mg. The date of my last dose is 5/26. No side effects. My preferred pharmacy is 208 S Akard St # PC08 Dallas, TX 75202 0 0 Phone (214) 741-4912 Fax (401) 770-7108"},
    {"prompt": "Hi! I have been trying to put in a request for a refill via Lilly direct and haven't heard back. Just following up as I have no more medication. Thank you!"},
    {"prompt": "Hello Dr Russell! I hope you had some good time off! I am requesting, please, Zepbound refills, with the possible option of going up in dosage? I still have a pen injectable remaining, so I'm good to discuss as you get back into office the first week of June. I went on that cruise and feel I've lost momentum and am having cravings for sweets, especially at night. No other side effects. Thanks so much!"},
    {"prompt": "Hi, Franky! My insurance recently changed (again). I have one more dose of my medication before I need to refill, but I know PAs can take some time to go through. Would you please ask Dr. Gause to submit when she has a moment?"},
    {"prompt": "Hi Dr. Smith, I am on my last pen of Zepbound 7.5 and require a refill. I am requesting to go up to the next dosage."},
    {"prompt": "Hi Ms Harris, I hope you are doing well, CVS sent me letter to tell me that Zepbound as of July 1st my insurance not going to cover these medication and told me that either ORLISTAT, QSYMIA, SAXENDA, Wagovy , anyone of this will be covered by my insurance , please if you can send the refill to CVS one more time and after the July 1st we have to choose which one, do you think!"},
    {"prompt": "Hi Dr. Madriaga. Walgreens isn't showing that my Rx has any refills. Could you take a look? (If you wanted me to get more blood tests first I can do that. I was thinking I should get the next blood tests right before leaving for Paris.)"},
    {"prompt": "The pharmacy asked either up the dose or override since the insurance company is saying waiting your approval"},
    {"prompt": "Good afternoon, will you please refill the prescription for the 12.5mg. I took the last dose on Sunday"}
]

# Dump the sample records as JSONL, one object per line.
with open('custom_data.jsonl', 'w') as f:
    f.writelines(json.dumps(item) + '\n' for item in sample_data)

print("Medical data JSONL file created: custom_data.jsonl")

# Redact the freshly written file with every category masked.
redactor: GoogleDLPPHIRedactor = GoogleDLPPHIRedactor(project_id=PROJECT_ID)
redactor.process_jsonl_file(
    input_file='custom_data.jsonl',
    output_file="custom_data_redacted.jsonl",
)

## Unmask selected categories

In [8]:
def create_efficient_redacted_dataset(input_file: Union[str, Path], output_file: Union[str, Path],
                                    total_samples: int = 13000,
                                    negative_ratio: float = 0.75,
                                    exclude_categories: Optional[Sequence[str]] = None,
                                    redactor: Optional["GoogleDLPPHIRedactor"] = None):
    """Sample a class-balanced subset of a JSONL file and write it out redacted.

    Rows are drawn with a fixed seed, shuffled, passed through
    ``redactor.redact_phi`` and written as
    ``{"prompt": <redacted>, "completion": {"refill request": <label>}}``.

    Args:
        input_file: JSONL file with "prompt" and "completion" (0/1) fields.
        output_file: Destination JSONL file.
        total_samples: Number of rows to write.
        negative_ratio: Fraction of rows with completion == 0, within [0, 1].
        exclude_categories: Categories to leave unredacted.
        redactor: Object providing ``redact_phi``; when omitted, a
            ``GoogleDLPPHIRedactor`` is created for ``PROJECT_ID``.

    Returns:
        ``output_file``, as passed in.

    Raises:
        ValueError: If ``negative_ratio`` is outside [0, 1], ``total_samples``
            is negative, or the input has too few rows of either class.
    """
    if total_samples < 0:
        raise ValueError(f"total_samples must be non-negative, got {total_samples}")
    if not 0 <= negative_ratio <= 1:
        raise ValueError(f"negative_ratio must be within [0, 1], got {negative_ratio}")

    df: pd.DataFrame = pd.read_json(input_file, lines=True)

    # Positives are the remainder, so the two counts always add up to
    # total_samples; int(total * (1 - ratio)) can lose a row to float rounding.
    n_negatives: int = int(total_samples * negative_ratio)
    n_positives: int = total_samples - n_negatives

    negatives: pd.DataFrame = df[df['completion'] == 0]
    positives: pd.DataFrame = df[df['completion'] == 1]
    for label, pool, needed in ((0, negatives, n_negatives), (1, positives, n_positives)):
        if len(pool) < needed:
            raise ValueError(
                f"Requested {needed} samples with completion == {label}, "
                f"but the input only has {len(pool)}"
            )

    sample_0: pd.DataFrame = negatives.sample(n=n_negatives, random_state=42)
    sample_1: pd.DataFrame = positives.sample(n=n_positives, random_state=42)
    sampled_df: pd.DataFrame = pd.concat([sample_0, sample_1]).sample(frac=1, random_state=42).reset_index(drop=True)

    if redactor is None:
        redactor = GoogleDLPPHIRedactor(project_id=PROJECT_ID)
    print(f"Excluding categories: {exclude_categories if exclude_categories else 'None (redacting all)'}")

    # .tolist() yields native Python values, which json.dumps can serialize.
    prompts = sampled_df['prompt'].tolist()
    labels = sampled_df['completion'].tolist()

    redacted_samples: list = []
    for i, (original_prompt, label) in enumerate(zip(prompts, labels), 1):
        redacted_prompt: str = redactor.redact_phi(original_prompt, exclude_categories=exclude_categories)
        redacted_samples.append({
            'prompt': redacted_prompt,
            'completion': {"refill request": label},
        })

        if not i % 500:
            print(f"Processed {i}/{len(sampled_df)} samples.")

    print(f"Saving to {output_file}.")
    with open(output_file, 'w', encoding='utf-8') as outfile:
        for sample in redacted_samples:
            outfile.write(json.dumps(sample) + '\n')
    return output_file

In [None]:
# Each category listed here gets its own dataset in which that single
# category is left unredacted and all the others are masked.
categories_to_unmask: Sequence[str] = [
    'PERSON_NAME', 'PHONE_NUMBER',
    'EMAIL_ADDRESS',
    'DATE', 'US_SOCIAL_SECURITY_NUMBER', 'LOCATION',
    'CUSTOM_MEDICAL_RECORD'
]

# Sampling parameters are defined once so the output file names always match
# the arguments actually passed to the sampler.
TOTAL_SAMPLES: int = 13000
NEGATIVE_RATIO: float = 0.75

# Built from `directory`, like the output paths, instead of a separate
# hard-coded string for the same folder.
source_jsonl: Path = directory / 'rx_refill_or_change_request_evals.jsonl'

for category in categories_to_unmask:
    output_file: Path = directory / (
        f"efficient_unmasked_{category.lower()}_{TOTAL_SAMPLES}_{round(NEGATIVE_RATIO * 100)}.jsonl"
    )

    create_efficient_redacted_dataset(
        input_file=source_jsonl,
        output_file=output_file,
        total_samples=TOTAL_SAMPLES,
        negative_ratio=NEGATIVE_RATIO,
        exclude_categories=[category],
    )
