# DeID Inference Pipeline

### Install and initialise dependencies

In [None]:
import jsonlines
import json
import glob
import os, tabulate, torch
from openai import AzureOpenAI
import pandas as pd
import bitsandbytes, accelerate
from typing import Union
from evaluate import load
from transformers import AutoTokenizer, pipeline, AutoModelForCausalLM
import torch, gc, json, requests
from tqdm.notebook import tqdm
from medcat.cat import CAT
import re
import os

# Begin from a clean slate: release PyTorch's cached GPU memory, then run the
# Python garbage collector.
torch.cuda.empty_cache()
gc.collect()

# Turn off HuggingFace tokenizers parallelism.
os.environ.update(TOKENIZERS_PARALLELISM="false")

# Let notebook DataFrame displays show up to 70 rows.
pd.options.display.max_rows = 70

In [None]:
import os

from config import settings

# Top-level configuration variables
# Data directory (from .env via settings)
data_dir = str(settings.DATA_ROOT)

# Azure Health DeID service base URL. Taken from settings when it defines
# AZURE_HDS_URL, otherwise from the AZURE_HDS_URL environment variable; None if
# neither is set (edit this line to hard-code a URL instead).
AZURE_HDS_URL = getattr(settings, "AZURE_HDS_URL", None) or os.getenv("AZURE_HDS_URL")

# Generation hyperparameters. NOTE: the experimental-configuration cell further
# down re-assigns `hyperparameters`; whichever cell runs last is the one used.
hyperparameters = {
    "num_shots": [0],
    "max_new_tokens": 3000,
    "temperature": 0.1,
    "top_k": 50,
    "top_p": 0.95,
}

In [None]:
import os

from config import settings

# Top-level configuration variables
# Data directory (from .env via settings)
data_dir = str(settings.DATA_ROOT)

# Azure Health DeID service URL (editable). Taken from settings when it defines
# AZURE_HDS_URL, otherwise from the AZURE_HDS_URL environment variable; None if
# neither is set.
AZURE_HDS_URL = getattr(settings, "AZURE_HDS_URL", None) or os.getenv("AZURE_HDS_URL")

# Hyperparameters (moved to top). NOTE: the experimental-configuration cell
# further down re-assigns `hyperparameters`; whichever cell runs last is used.
hyperparameters = {
    "num_shots": [0],
    "max_new_tokens": 3000,
    "temperature": 0.1,
    "top_k": 50,
    "top_p": 0.95,
}


## Define Keys and Dependencies

In [None]:
# Load project helper modules and secrets.
# Secrets (e.g. the Azure HDS bearer token) are read from settings / the
# environment (.env file) rather than hard-coded; keep .env in .gitignore.
import os, sys

# Make the project's src/ directory importable (must run before the imports below).
sys.path.append("src")
# Inference helpers: model loading, prompt building and per-backend inference calls
import inference_functions as inference

# Dataset/results helpers (e.g. loadall_jsonl_to_df, used to read the .jsonl annotations)
import results_processing as processing

from dotenv import load_dotenv
# Load variables from a local .env file into os.environ, if one is present.
load_dotenv()

from config import settings

# Prefer the value exposed by settings; fall back to the raw environment variable.
AZURE_HDS_BEARER_TOKEN = settings.AZURE_HDS_BEARER_TOKEN or os.getenv("AZURE_HDS_BEARER_TOKEN")
if not AZURE_HDS_BEARER_TOKEN:
    raise RuntimeError("Missing AZURE_HDS_BEARER_TOKEN in environment. Set it in your .env file.")
print("Azure HDS token loaded from environment.")

import importlib
# Reload so edits to src/inference_functions.py are picked up without restarting the kernel.
importlib.reload(inference)

# Used for the back-off sleep when the CogStack ModelServe API returns HTTP 429.
import time

## Load all datasets

In [None]:
import glob

# Map each task name to the directory holding its .jsonl annotation files.
# Each directory is named after its task, directly under data_dir.
task_list = {
    name: f"{data_dir}/{name}"
    for name in ("CT_scaphoid", "MR_scaphoid", "XR_scaphoid", "IORD_hist", "IORD_rad")
}

# data_list maps task name -> DataFrame with one row per (id, text) report and one
# 'labelled_<n>' column per read of that report.
data_list = {}

for task, dirname in task_list.items():
    raw_reads = processing.loadall_jsonl_to_df(dirname + "/*.jsonl")

    # Collapse all reads of the same report into a single row holding a list of labels.
    per_report = (
        raw_reads.sort_values(by="id", ascending=True)
        .groupby(['id', 'text'])['label']
        .apply(list)
        .reset_index()
    )

    # Spread each list of labels across separate labelled_0, labelled_1, ... columns.
    label_columns = per_report['label'].apply(pd.Series).rename(
        columns=lambda x: 'labelled_' + str(x)
    )

    # Put the report id and text back in front of the label columns.
    label_columns.insert(0, 'id', per_report['id'])
    label_columns.insert(1, 'text', per_report['text'])

    data_list[task] = label_columns

## Define Prompts

In [None]:
# De-identification instruction passed to the prompted LLM backends (hf / openai)
# via inference.generate_prompt_threads. The prompt is structured as:
#   Task                - "Please anonymize the following text"
#   Rules               - "Replace all ... word by word", then the numbered list of
#                         identifiers and their [classification] placeholders
#   Examples            - "For example, ..."
#   Output instructions - "This is medical text ... Do not offer any opinion ..."
# NOTE(review): item 23 is numbered "23," rather than "23."; left unchanged so the
# prompt stays identical to the one used in earlier runs.

instruction = """Please anonymize the following text. Replace all of the identifiers with their classification, as specified below. Do this word by word. 

The following is the list of identifiers and the identifier classification, in the format identifier, [classification]. Each tuple is separated by a semi-colon (;): 

1. name-doctor, [doctor];
2. name-patient, [patient];
3. address, [address];
4. age if over 89 years old, [age];
5. dates, [dates];
6. telephone numbers, [phone];
7. fax numbers, [fax];
8. email, [email];
9. social security numbers, [SSN];
10. medical record numbers, [MRN];
11. NHS number, [NHS number];
12. health plan beneficiary numbers, [healthplan];
13. account numbers, [account];
14. license numbers-GMC, [GMC];
15. license numbers-other, [license];
16. specimen numbers, [specimen number];
17. serial numbers, [serial number];
18. vehicle numbers, [vehicle number];
19. web-page universal resource locators (URL), [URL];
20. internet protocol (IP) numbers, [IP];
21. hospital or unit names, [hospital];
22. professional titles (for example, dr, or patient professions such as firefighters), [profession];
23, organisational or company names, [external healthcare organisation].

For example, if you find a doctor's name, such as 'Dr John Doe' replace this with [profession] [doctor] [doctor]. If you find an email address, such as 'jane.doe@nhs.uk', replace this with [email].  

This is medical text, please only redact words if they are identifiers. Information such as bones or diseases should be retained. Do not redact categories other than those specified above.

Do not output any additional or unnecessary text. Do not offer any opinion or summarisation of the text."""


""" In Context learning is a list of tuples, with 'prompt', 'answer' pairs. The prompt generator function will subsequently generate an appropriate thread, e.g. [('raw example 1', 'manually deidentified example 1'), ('raw example 2', 'manually deidentified example 2')]. Please replace these with examples from your own dataset/healthcare context. """
in_context_examples = ['raw example 1', 'manually deidentified example 1'), ('raw example 2', 'manually deidentified example 2')]

## Experimental Configuration - Check before running

In [None]:
import os

# Set True to run on a random sub-sample of each dataset, False to run the full analysis
sub_sample = False
sample_size = 10

# Set True to save results to disk after each dataset is processed (False for test-only runs)
save_during_execution = True

# Whether to use the FULL dataset or, for fine-tuned models, ONLY the evaluation set
# (i.e. after removing the training split that was used for fine-tuning)
use_test_set_only = True  # MUST be set to False if not using fine-tuned models
# f-string so that data_dir is actually substituted into the path
fine_tuned_eval_set_location = f'{data_dir}/MedCAT_TEST_set.csv'

# Output file name
output_folder = "results/"
output_file_name = output_folder + "2025-02-11 FT-Anoncat New Concepts Added 0.00002 10ep.csv"
# Create the output folder up-front so that saving during execution does not fail
# after a (possibly long) run has already completed a dataset.
os.makedirs(output_folder, exist_ok=True)

# Location of the MedCAT model pack, and any fine-tuned versions (absolute paths,
# matching the CogStack ModelServe command in the test-set cell)
medcat_model_pack_location = "/collaborative_development/EHR-DeID-Eval/models/medcat_deid_model_d88707e29606019f.zip"
OUH_fine_tuned_model_location = '/collaborative_development/EHR-DeID-Eval/models/2025-02-07 OUH Anoncat Added Concepts FT 10 epochs 0.00002/model_8056e540f79d9ff6.zip'

# Experiment configurations
# One row per model: where it comes from ("source"), its name, an optional local cache
# folder, the kwargs passed when loading it, whether to use a system prompt, and whether
# to spread it over multiple GPUs. Every row must supply all six fields.
models = pd.DataFrame(
    columns=["source", "model_name", "cache_folder", "model_kwargs", "use_system_prompt", "multi_GPU"],
    data=[
        ["medcat", "anoncat", "", {}, False, False],
        ["cogstack-modelserve", "11 Feb 2025 OUH Fine Tuned AnonCAT 0.00002 10ep new concepts", "", {}, False, False],
        ["hf", "google/gemma-7b-it", "", {"torch_dtype": torch.bfloat16, "repetition_penalty": 2.0}, False, False],
        ["hf-bert", "obi/deid_bert_i2b2", "", {}, False, False],
        ["hf-bert", "obi/deid_roberta_i2b2", "", {}, False, False],
        ["hf", "microsoft/Phi-3-mini-128k-instruct", "", {}, False, False],
        ["hf", "meta-llama/Llama-2-7b-chat-hf", "", {"torch_dtype": torch.bfloat16}, False, False],
        ["hf", "meta-llama/Llama-2-13b-chat-hf", "", {"torch_dtype": torch.bfloat16}, False, True],
        ["hf", "meta-llama/Llama-2-70b-chat-hf", "", {"torch_dtype": torch.bfloat16}, False, True],
        ["hf", "meta-llama/Meta-Llama-3-8B-Instruct", "", {"torch_dtype": torch.bfloat16}, False, False],
        #["vllm", "meta-llama/Meta-Llama-3-70B-Instruct", "/home/andrewsoltan/.cache/huggingface/hub/models--meta-llama--Meta-Llama-3-70B-Instruct/snapshots/5fcb2901844dde3111159f24205b71c25900ffbd", {"torch_dtype": torch.bfloat16}, False, True],
        ["openai", "GPT35-turbo-base", "", {}, True, False],
        ["openai", "GPT-4-0125", "", {}, True, False],
        ["microsoft_azure_hds", "Microsoft Azure De-Identification", "", {}, False, False],
    ],
)

# Define the task name
task_name = "De-Identification"

# Set hyperparameters
hyperparameters = {
    "num_shots": [0],  # A list of the number of shots to include in the prompts/to test
    "max_new_tokens": 3000,
    "temperature": 0.1,
    "top_k": 50,
    "top_p": 0.95,
}

# Define a list of output metrics (evaluation is currently disabled in the inference loop)
metrics_list = ["rouge_l", "bleu"]

# Set up the results dataframe: one row per (dataset item, model, num_shots) output
task_results = pd.DataFrame(
    columns=[
        "dataset",
        "id_number_in_dataset",
        "task_name",
        "model_name",
        "model_kwargs",
        "num_shots",
        "original_report",
        "model_output",
    ]
    #+ metrics_list
)

# A flag to prevent the script outputting all outputs dynamically
extremely_verbose = False

# Set an ID (to be the pseudonymised ID in future iterations).
# NOTE(review): `id` shadows the builtin and neither `id` nor `counter` is used by
# the inference loop; kept for compatibility.
id = 1
counter = 1

# GC and clear caches prior to starting experiments
torch.cuda.empty_cache()
gc.collect()

In [None]:
""" More often than not, the error JSONDecodeError: Expecting value: line 1 column 1 (char 0) means that the Azure DeID bearer token has expired. Check this! """

In [None]:
#Reconstruct the data_list, which is a dictionary of task-specific data, using the test-set if the TEST SET flag is activated
# For fine-tuned models, evaluate only on the held-out test split: rebuild data_list
# as {value of the 'dataset' column -> the test-set rows for that dataset}.
if use_test_set_only:
    test_set = pd.read_csv(fine_tuned_eval_set_location)
    grouped_by_dataset = test_set.groupby('dataset')
    # Iterating a GroupBy yields (group name, sub-DataFrame) pairs.
    data_list = dict(iter(grouped_by_dataset))

# When using CogStack ModelServe, start the service before running inference, e.g. with
# this terminal command (the model path below points at the fine-tuned model):
# python3 /collaborative_development/EHR-DeID-Eval/models/cogstack-modelserve/CogStack-ModelServe-master/app/cli/cli.py serve --model-type medcat_deid --model-path '/collaborative_development/EHR-DeID-Eval/models/2024_06_29 OUH Fine Tuned Anoncat/model_704dddef1fb26e21.zip'  --host 127.0.0.1 --port 8000

In [None]:
# Reload the inference functions file so edits in src/ are picked up
import importlib
importlib.reload(inference)
import re

# Cycle through every configured model and run it over every task dataset.
for i, model in models.iterrows():
    # To run for multiple examples, add in a second nested loop among training examples.
    # Suggest writing to a JSONL file and then cycling through for each model.
    print(model["model_name"])

    # Drop references to the previous model's pipeline / MedCAT pack BEFORE clearing
    # caches: while they are still referenced, their GPU memory cannot be released and
    # loading the next model may run out of memory.
    pipe = None
    cat = None
    gc.collect()
    torch.cuda.empty_cache()

    # For HF models, load the model once before looping through items, for speed
    if model["source"] == "hf":
        pipe = inference.hf_load_model(
            model["model_name"],
            model["model_kwargs"],
            multi_gpu=model["multi_GPU"],
        )

    if model["source"] == "hf-bert":
        pipe = inference.hf_load_bert_model(
            model["model_name"],
            model["model_kwargs"],
        )

    # For MedCAT models, load the model pack once before looping, for speed
    if model["source"] == "medcat":
        cat = CAT.load_model_pack(medcat_model_pack_location)

    # Models served through CogStack ModelServe must already be running
    if model["source"] == "cogstack-modelserve":
        print ("*** PLEASE ENSURE THE COGSTACK MODEL-SERVE SERVICE IS RUNNING ON PORT 8000, AND USING THE CORRECT MODEL VERSION *** If this is not done correctly, an error will follow")

    # Fail fast rather than erroring on the first item if the HDS URL was never configured
    if model["source"] == "microsoft_azure_hds" and not AZURE_HDS_URL:
        raise RuntimeError("AZURE_HDS_URL is not set; configure it before running the Azure HDS model.")

    # Cycle through the list of tasks and datasets
    for task, data in data_list.items():
        print (task)

        # Sub-sample for test runs. Cap at the dataset size, because DataFrame.sample
        # raises ValueError when asked for more rows than exist (without replacement).
        if sub_sample:
            data = data.sample(min(sample_size, len(data)))
            print("A limit of "+str(sample_size)+" is being applied as subsampling is active.")

        # Cycle through each item in the task DataFrame and pass it through the model
        for index, item in tqdm(data.iterrows(), total=data.shape[0]):
            item_for_task = item.text

            # Keep the dataset's own id so each output can be matched back to its input
            id_of_item_in_dataset = item.id

            if model["source"] == "hf":
                # Build one prompt thread per entry in hyperparameters["num_shots"]
                prompt_list = inference.generate_prompt_threads(
                    instruction,
                    in_context_examples,
                    item_for_task,
                    use_system_prompt=model["use_system_prompt"],
                    number_shots=hyperparameters["num_shots"],
                )

                formatted_prompt_list = inference.hf_generate_prompts(
                    pipe,
                    prompt_list,
                )

                outputs = inference.hf_run_model_using_prompts(
                    pipe,
                    formatted_prompt_list,
                    max_new_tokens=hyperparameters["max_new_tokens"],
                    temperature=hyperparameters["temperature"],
                    top_k=hyperparameters["top_k"],
                    top_p=hyperparameters["top_p"],
                )

            elif model["source"] == "hf-bert":
                outputs = [inference.inference_with_bert(pipe, item_for_task)]

            elif model["source"] == "openai":
                # Run OpenAI (Azure) models with the same prompt threads as the HF models
                prompt_list = inference.generate_prompt_threads(
                    instruction,
                    in_context_examples,
                    item_for_task,
                    use_system_prompt=model["use_system_prompt"],
                    number_shots=hyperparameters["num_shots"],
                )

                outputs = inference.openai_load_client_and_perform_inference(
                    os.getenv("AZURE_OPENAI_KEY"),
                    os.getenv("AZURE_OPENAI_ENDPOINT"),
                    prompt_list,
                    deployment_name=model["model_name"],
                    max_new_tokens=hyperparameters["max_new_tokens"],
                    temperature=hyperparameters["temperature"],
                    top_k=hyperparameters["top_k"],
                    top_p=hyperparameters["top_p"],
                )

            elif model["source"] == "microsoft_azure_hds":
                # The Azure Health DeID service cannot be prompted with examples, so only the
                # item is sent and a single (0-shot) output is produced. Use the token loaded
                # in the keys cell (which may come from settings, not only os.environ).
                URL = AZURE_HDS_URL
                target = "/api/v1/deid"
                outputs = [inference.make_request_to_microsoft_deid(URL, target, item_for_task, bearer_token=AZURE_HDS_BEARER_TOKEN)]

            elif model["source"] == "medcat":
                # Get entities within the source text
                text = item_for_task
                entities = cat.get_entities(text)
                # Replace entities from last to first so earlier character offsets stay valid.
                # NOTE(review): assumes entities are returned in ascending 'start' order.
                for key, value in reversed(entities['entities'].items()):
                    text = inference.reconstruct_anoncat_text(text, value['pretty_name'], value['start'], value['end'])

                outputs = [text]

            elif model["source"] == "cogstack-modelserve":
                text = item_for_task
                URL = "http://127.0.0.1:8000/"
                target = "redact/"
                headers = {}

                # Automatically re-run if the service answers 429 (overwhelmed)
                while True:
                    returned_result = requests.post(URL + target, headers=headers, data=json.dumps(text))
                    if returned_result.status_code == 429:
                        print("Received 429 Too Many Requests, waiting 2 seconds before retrying...")
                        time.sleep(2)
                    else:
                        outputs = [returned_result.text]
                        break

            else:
                # Without this, `outputs` from the previous item/model would be silently reused
                raise ValueError(f"Unknown model source: {model['source']!r}")

            # Record one row per output, i.e. one per num_shots value (a single output for
            # backends that are not prompted with examples)
            for num_shot_iter, output in enumerate(outputs):
                results_dict = {
                    "dataset": task,
                    "id_number_in_dataset": id_of_item_in_dataset,
                    "task_name": task_name,
                    "model_name": model["model_name"],
                    "model_kwargs": model["model_kwargs"],
                    "num_shots": hyperparameters["num_shots"][num_shot_iter],
                    "original_report": item_for_task,
                    "model_output": output,
                }

                task_results.loc[len(task_results)] = results_dict

                # Set extremely_verbose to False to spare your screen
                if extremely_verbose:
                    print(output)
                    print("\n")

        # If saving during execution, write all results gathered so far to file
        if (save_during_execution):
            print ("Saving output")
            task_results.to_csv(output_file_name)

In [None]:
# View the results!
# Save a copy of all results first, then show them in true order via a multi-level
# sort on ["dataset", "id_number_in_dataset"]. The sorted view must be the cell's last
# expression for Jupyter to render it.
task_results.to_csv("output.csv")
task_results.sort_values(by=["dataset", "id_number_in_dataset"]).head(50)

In [None]:
def make_request_to_microsoft_deid(URL, target, text, bearer_token=None, *, timeout=60, verbose=False):
    """Redact ``text`` using the Azure Health Data Services de-identification API.

    Parameters
    ----------
    URL : str
        Base URL of the service.
    target : str
        API path appended to ``URL``, e.g. "/api/v1/deid".
    text : str
        Plain text to redact.
    bearer_token : str, optional
        Bearer token for the service. If omitted, it is read from the
        AZURE_HDS_BEARER_TOKEN environment variable at call time. It is not read at
        definition time, because tokens expire and a missing variable would break
        the cell.
    timeout : float, keyword-only
        Seconds to wait for the service before ``requests`` gives up.
    verbose : bool, keyword-only
        If True, print diagnostic details of the HTTP response.

    Returns
    -------
    str
        The redacted text from the response's "outputText" field. If the response is
        not a JSON object with that field, the string
        "Error, is your bearer token valid?" is returned instead. An expired or
        invalid token is the usual cause.

    Raises
    ------
    RuntimeError
        If no bearer token is passed and none is found in the environment.
    """
    if bearer_token is None:
        bearer_token = os.environ.get("AZURE_HDS_BEARER_TOKEN")
    if not bearer_token:
        raise RuntimeError("Missing AZURE_HDS_BEARER_TOKEN; pass bearer_token or set it in your .env file.")

    headers = {
        "Authorization": "Bearer " + bearer_token,
        "Content-Type": "application/json",
    }

    payload = {
        "DataType": "Plaintext",
        "Operation": "Redact",
        "InputText": text,
    }

    # Make the request to the Microsoft DeID service
    response = requests.post(URL + target, headers=headers, json=payload, timeout=timeout)

    if verbose:
        print("Status Code:", response.status_code)
        print("Reason:", response.reason)
        print("Final URL:", response.url)
        print("Elapsed Time:", response.elapsed)
        print("Encoding:", response.encoding)
        print("Headers:", response.headers)
        print("Response Text:", response.text)

    # A non-JSON body (JSONDecodeError is a ValueError) usually means the token has expired
    try:
        anonreturn = response.json()
    except ValueError:
        anonreturn = None

    if isinstance(anonreturn, dict) and "outputText" in anonreturn:
        return anonreturn["outputText"]

    print(f"Azure DeID request failed (HTTP {response.status_code}); is your bearer token valid?")
    return "Error, is your bearer token valid?"