<a href="https://colab.research.google.com/github/Mayo-Radiology-Informatics-Lab/MIDeL/blob/dev/chapters/17B.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# From Chaos to Clarity
## Ensuring Consistent LLM Outputs with Pydantic Models
**A Step-By-Step Guide**

This notebook is a step-by-step guide to getting consistent, structured output from language model applications. We will use Pydantic models and the Instructor package to validate the JSON responses of a Large Language Model (LLM) against a schema that we define.

### Introduction

`instructor` is a lightweight Python library that wraps the client of an OpenAI-compatible server and adds validation, so that you get valid JSON responses from a Large Language Model (LLM). It builds on Pydantic: you describe the expected JSON schema as a Pydantic model, and `instructor` checks that the LLM response adheres to that schema.


#### Key Features
- **Easy integration**: Seamlessly integrate with various LLM providers beyond OpenAI. See:
    - Working with different providers: https://jxnl.github.io/instructor/hub/
    - Examples: https://jxnl.github.io/instructor/examples/
    - In case of any question, you can also use [ChatGPT-instructor](https://chatgpt.com/g/g-EvZweRWrE-instructor-gpt/) to get a code snippet.
- **Data validation**: Ensure the JSON response from a LLM meets the specified schema. See:
    - https://docs.pydantic.dev/latest/
- **Retry Management**: Retries with error guidance if the LLM returns invalid responses. You can set the maximum number of retries.
- **Streaming Support**: Work with Lists and Partial responses effortlessly.
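
To make the retry idea from the list above concrete, here is a minimal sketch of a validate-and-retry loop. It is not how `instructor` is implemented internally; `Finding`, `ask_with_retries`, and `fake_llm` are hypothetical names invented for this illustration, and the "LLM" is a stand-in that returns canned strings.

```python
from typing import Callable, Literal

from pydantic import BaseModel, ValidationError


class Finding(BaseModel):
    """Hypothetical schema used only for this illustration."""
    organ: str
    complications: Literal["Yes", "No", "Not Mentioned"]


def ask_with_retries(llm: Callable[[str], str], prompt: str, max_retries: int = 2) -> Finding:
    """Call `llm`, validate its JSON reply, and feed validation errors back on failure."""
    last_error = None
    for _ in range(max_retries + 1):
        # On a retry, append the previous validation error as guidance
        full_prompt = prompt if last_error is None else f"{prompt}\nFix these errors:\n{last_error}"
        reply = llm(full_prompt)
        try:
            return Finding.model_validate_json(reply)
        except ValidationError as err:
            last_error = str(err)
    raise RuntimeError(f"No valid response after {max_retries + 1} attempts: {last_error}")


# Stand-in for a real LLM: the first reply violates the schema, the second is valid
canned_replies = iter([
    '{"organ": "Liver", "complications": "maybe"}',
    '{"organ": "Liver", "complications": "No"}',
])


def fake_llm(prompt: str) -> str:
    return next(canned_replies)


result = ask_with_retries(fake_llm, "Extract organ and complications.")
print(result)
```

The first canned reply is rejected because `"maybe"` is not one of the allowed options, so the loop asks again and accepts the second reply.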

`instructor` can be your starting point for building agents by yourself: you keep full control over agent flows without relying on a complex agent framework.

#### Concept
<img src="https://raw.githubusercontent.com/lennartpollvogt/ollama-instructor/main/Concept.png" alt="Concept Image" width="60%">



By using the Instructor package, you can have full control over agent flows without relying on complex agent frameworks. It serves as a starting point for building your own agents and ensures that the responses from LLMs are consistent and conform to the defined schema.

In the next sections, we will walk through the steps involved in enhancing data consistency using Pydantic models and the Instructor package: starting an LLM server, installation, creating the client, defining the response model, prompting, and batch processing.

Let's dive in and explore the power of Pydantic models and the Instructor package in achieving data consistency in language model applications!

## Step 1: Create an LLM server with LLaMa.cpp

To run this notebook, we need an OpenAI-compatible server. You can connect your own OpenAI account, the Hugging Face CLI, or local models. If you don't have any of those resources, you can run the next code block to create an LLM server:

In [None]:
import os
import llama_cpp
import instructor
from llama_cpp.llama_speculative import LlamaPromptLookupDecoding
from huggingface_hub import hf_hub_download
import shutil

# Define model details
model_name = "lmstudio-community/Meta-Llama-3.1-8B-Instruct-GGUF"
model_file = "Meta-Llama-3.1-8B-Instruct-IQ4_XS.gguf"
model_path = os.path.join("./models", model_name.split("/")[-1], model_file)

# Function to download the model
def download_model(repo_id, filename, local_dir):
    os.makedirs(local_dir, exist_ok=True)
    print(f"Downloading {filename} from {repo_id}...")
    # Download straight into local_dir; moving a file out of the Hugging Face
    # cache can leave a broken symlink behind.
    destination = hf_hub_download(repo_id=repo_id, filename=filename, local_dir=local_dir)
    print(f"Model downloaded to {destination}")
    return destination

# Check if model exists, download if not
if not os.path.exists(model_path):
    model_path = download_model(model_name, model_file, os.path.dirname(model_path))

# Initialize the Llama model
llama = llama_cpp.Llama(
    model_path=model_path,
    n_gpu_layers=-1,
    chat_format="chatml",
    n_ctx=2048,
    draft_model=LlamaPromptLookupDecoding(num_pred_tokens=2),
    logits_all=True,
    verbose=False,
)

# Patch the model with instructor for enhanced functionality
create = instructor.patch(
    create=llama.create_chat_completion_openai_v1,
    mode=instructor.Mode.JSON,
)

# Test the model with a simple prompt
response = create(
    model="local-model",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is the capital of France?"}
    ],
    temperature=0.7,
    max_tokens=100
)

print(response.choices[0].message.content)

## Step 2: Installation and Creating the Client

#### Installation
To install `instructor` and the other packages used below, uncomment and run the following cell:

In [None]:
# ! pip install instructor pydantic rich

In [None]:
# Importing the libraries
import yaml
import json
import csv
import os
import time
import logging
from pydantic import BaseModel, Field, create_model
from pydantic.config import ConfigDict
from typing import List, Literal, Optional, Any, Dict
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

import instructor
from openai import OpenAI

import rich

#### Adding YAML Configuration
In the next code block, we're introducing a YAML file to manage our configuration settings. YAML (YAML Ain't Markup Language) is a human-readable data serialization format that's particularly useful for configuration files.

In the upcoming code, we'll load this YAML file and extract various configuration elements such as general settings, messages, variables, and examples. This approach will make our project more modular and easier to manage as it grows in complexity.
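
The configuration file itself is not shown in this notebook. As a rough orientation, the sketch below shows the shape of the dictionary that loading it is assumed to produce, based only on the keys the later cells read. Every value here is a made-up placeholder, and `loaded` and `required_sections` are hypothetical names.

```python
import json

# Hypothetical result of loading the YAML configuration; all values are placeholders.
loaded = {
    "config": {
        "host": "http://localhost:8000",
        "api_key": "not-needed",
        "model": "local-model",
        "seed": 42,
        "temperature": 0.0,
        "max_retries": 3,
        "concurrency": 8,
    },
    "messages": {
        "system_prompt": "You are an expert in extracting data from radiology reports.",
        "user_prompt": "Extract data elements from the report in the <report> tag.",
    },
    "variables": [
        {"name": "organ", "type": "str", "hint": "Organ where the ablation was performed."},
        {"name": "complications", "type": "str",
         "options": ["Yes", "No", "Not Mentioned"],
         "hint": "Whether complications occurred."},
    ],
    # The examples are assumed to be stored as a JSON string
    "examples": '[{"organ": "Liver", "complications": "No"}]',
    "sample_report": "Placeholder report text.",
}

# A quick sanity check that every section the notebook reads is present
required_sections = {"config", "messages", "variables", "examples", "sample_report"}
missing = required_sections - loaded.keys()
parsed_examples = json.loads(loaded["examples"])

print("Missing sections:", missing or "none")
print("First example:", parsed_examples[0])
```

A check like this at load time surfaces a misspelled or missing section immediately instead of as a `KeyError` several cells later.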

In [None]:
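# Loading the yaml file which includes the configuration.
# Note: this rebinds the name `yaml` from the module to the loaded dictionary,
# so run `import yaml` again before re-running this cell.
with open("ExtractionConfig.yaml") as config_file:
    yaml = yaml.safe_load(config_file)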

config, messages, variables, examples = yaml['config'], yaml['messages'], yaml['variables'], yaml['examples']

In [None]:
# Setting main variables based on the yaml file
HOST = config['host']
API_KEY = config['api_key']
MODEL = config['model'] # Using llama3-70b-instruct

SEED = config['seed'] # random seed
TEMP = config['temperature'] # temperature
MAX_RETRIES = config['max_retries'] # Number of retries that the instructor tries to validate the response

Finally, it is time to create the client. The `client` sends our requests to the LLM server and returns its responses. In this notebook we use an OpenAI-compatible server and client. If you are using `ollama`, the process is the same.

In [None]:
# Create the client
client = instructor.from_openai(
    OpenAI(
        base_url=f"{HOST}/v1",
        api_key=API_KEY,  # required, but unused
    ),
    mode=instructor.Mode.JSON, # for more information: https://jxnl.github.io/instructor/concepts/patching/
)

## Step 3: Define the Response Model
In this notebook, we are focusing on a simple text extraction task. For additional use cases, you can refer to [The cookbooks](https://jxnl.github.io/instructor/examples/), or try [Instructor GPT](https://chatgpt.com/g/g-EvZweRWrE-instructor-gpt). These resources provide various examples demonstrating how to use Instructor in different scenarios.

Before moving forward, let's prompt the LLM directly to extract the information we want, so that we can compare this response with the one we get in the next steps.
In the next code block, we write a test `system prompt` (roughly, the persona or context given to the LLM) and a test `user prompt` (the main request to the LLM plus guidelines for choosing between options). We will test these prompts on a sample report stored in the yaml file.


In [None]:
# Test prompts
test1_system_prompt = "You are an expert in extracting data from radiology reports with 20 years of experience."
test1_user_prompt = """
Extract data elements from the MR guided ablation report in the <report> tag:

    Guidelines:
    - Focus on findings at the time of scan, not previous ones.
    - If information is not mentioned, use 'Not Mentioned'.
    - Ignore irrelevant information.
    - Use only the provided output format.
    - Expand abbreviations: sv (seminal vesicle), uvj (urethro vesicular junction), vuj (vesico urethral junction), VM (vascular malformation), US (ultrasound), LN (lymph node), CT (computed tomography), MRI (magnetic resonance imaging).

    only return your answer in this json form and always include the <json></json> tag with your answer. Include these tags in your response:
    organ: Extract the organ where the ablation was performed. Indicate 'Not Mentioned' if not specified. Use the provided dictionary to expand abbreviations.
    location: the exact anatomical location of the tissue
    tissueType: Specify the tissue type ablated: 'Muscle', 'Nerve', 'Fat', 'Ligament', 'Tendon', 'Cartilage', 'Bone', or 'Not Mentioned'. You can choose multiple tissues.
    complications: Specify whether complications occurred: 'Yes', 'No', or 'Not Mentioned'
    """

# Loading a sample report from the yaml file
sample_report = yaml['sample_report']

In [None]:
# Let's ask the model:

# Creating the conversation for the model to pass report and instructions
messages = [
        {"role": "system", "content": test1_system_prompt},
        {"role": "user", "content": f"{test1_user_prompt} \n <report> {sample_report} </report>"}
    ]


# Asking the model to extract the requested information
resp = client.chat.completions.create(
        model=MODEL,
        response_model=str,
        messages=messages,
        temperature=TEMP,
        seed=SEED,
        max_retries=MAX_RETRIES,
        ) #type: ignore


rich.print('Output:', resp)

We clearly asked the LLM to give us an answer in JSON format, but it didn't! And every time you run your query, there is no guarantee that you will get the same response structure. Therefore, we need another tool to constrain the LLM and make sure we always get the same response structure.
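
To see why relying on the prompt alone is fragile, consider what hand-written parsing of such a reply would have to look like. The sketch below is only an illustration of that manual approach; `extract_json` and the three sample replies are hypothetical and are not taken from an actual model run.

```python
import json
import re
from typing import Optional


def extract_json(text: str) -> Optional[dict]:
    """Return the JSON object found between <json> tags, or None if parsing fails."""
    match = re.search(r"<json>(.*?)</json>", text, flags=re.DOTALL)
    if match is None:
        return None  # the model ignored the requested tags
    try:
        parsed = json.loads(match.group(1))
    except json.JSONDecodeError:
        return None  # the tags are present but the content is not valid JSON
    return parsed if isinstance(parsed, dict) else None


# Three plausible reply styles (made up for this illustration)
well_formed = 'Sure! <json>{"organ": "Liver", "complications": "No"}</json>'
missing_tags = 'Here is the result: {"organ": "Liver", "complications": "No"}'
invalid_json = "<json>{organ: Liver, complications: No}</json>"

for reply in (well_formed, missing_tags, invalid_json):
    print(extract_json(reply))
```

Only the first reply can be parsed. Even then, nothing checks that the keys or values are the ones we asked for, which is the gap the response model closes.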

#### Pydantic Models
Pydantic models are classes that inherit from pydantic.BaseModel. They offer several key benefits:

- **Data Validation**: Models automatically validate input data, ensuring that it conforms to the defined field types and constraints.
- **Type Hinting**: Models leverage Python's type annotations, providing clear type information for fields.
- **Serialization**: Models can easily convert to and from JSON, making them ideal for API development.
- **Schema Generation**: Pydantic can automatically generate JSON schemas from models, useful for documentation and API specifications.


To create a Pydantic model, simply define a class that inherits from `BaseModel`. In the next code block, fields are customized using the `Field` function. We also use the `typing` package. With the combination of these two packages, we can force the LLM to respond only in the desired format:
- `str`: Free-form response. There is no restriction on what the model can return, although we can use `max_length` to limit the length of the field.
- `Literal`: Similar to a multiple-choice question. The LLM can only choose one of the listed options.
- `List`: The LLM returns multiple objects in a list. We use `List` in tandem with `Literal` to force the LLM to answer with a specific terminology, like ticking checkboxes.
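
The three field types above can be tried out without any LLM. The following sketch uses a hypothetical `Demo` model and an `is_valid` helper (both invented for illustration) to show which inputs Pydantic accepts and which it rejects.

```python
from typing import List, Literal

from pydantic import BaseModel, Field, ValidationError


class Demo(BaseModel):
    """Hypothetical model with one field of each kind described above."""
    note: str = Field(..., max_length=20)                      # free text, length-limited
    complications: Literal["Yes", "No", "Not Mentioned"]       # exactly one option
    tissue: List[Literal["Muscle", "Bone", "Not Mentioned"]]   # any number of options


def is_valid(**kwargs) -> bool:
    """Return True if the keyword arguments form a valid Demo instance."""
    try:
        Demo(**kwargs)
        return True
    except ValidationError:
        return False


accepted = is_valid(note="dome of liver", complications="No", tissue=["Muscle", "Bone"])
bad_choice = is_valid(note="dome of liver", complications="Maybe", tissue=["Bone"])
bad_list_item = is_valid(note="dome of liver", complications="No", tissue=["Skin"])
too_long = is_valid(note="x" * 50, complications="No", tissue=["Bone"])

print(accepted, bad_choice, bad_list_item, too_long)
```

Only the first call validates; each of the others breaks exactly one constraint.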

In [None]:
# Defining a "Test" response model to understand the pydantic models
class TestModel(BaseModel):
    # Each attribute has a description that will be used by the model to generate the response
    organ: str = Field(...,
        description="Extract the organ where the ablation was performed. Indicate 'Not Mentioned' if not specified. Use the provided dictionary to expand abbreviations."
    )
    location: str = Field(...,
        description="Extract the specific anatomical location within the organ where the ablation was performed. Indicate 'Not Mentioned' if not specified. Use the provided dictionary to expand abbreviations."
    )
    tissueType: List[Literal['Muscle', 'Nerve', 'Fat', 'Ligament', 'Tendon', 'Cartilage', 'Bone', 'Not Mentioned']] = Field(...,
        description="Specify the tissue type ablated: 'Muscle', 'Nerve', 'Fat', 'Ligament', 'Tendon', 'Cartilage', 'Bone', or 'Not Mentioned'. You can choose multiple tissues"
    )
    complications: Literal['Yes', 'No', 'Not Mentioned'] = Field(...,
        description="Specify whether complications occurred: 'Yes', 'No', or 'Not Mentioned'."
    )

    # We can include an example in the pydantic model, so the LLM behaves as in a few-shot classification task.
    model_config = ConfigDict(
        json_schema_extra={
        'examples':
            [
                {
                    "organ": "Liver",
                    "location": "Dome",
                    "tissueType": ["Bone"],  # a list, matching the List[Literal[...]] field type
                    "complications": "No",
                }
            ]
        }
    )

Let's print the response model to take a closer look at it:

In [None]:
# Printing the model fields and examples
rich.print('Test Response Model Fields:', TestModel.model_fields)
rich.print('Test Response Model Example:', TestModel.model_config)
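
Besides the field objects, it can help to look at the JSON schema Pydantic generates, since a schema of this kind is what a library like `instructor` can hand to the LLM as the target format. The sketch below uses a smaller hypothetical model, `MiniReport`, so that it runs on its own.

```python
import json
from typing import List, Literal

from pydantic import BaseModel, Field


class MiniReport(BaseModel):
    """Hypothetical, shortened version of a response model."""
    organ: str = Field(..., description="Organ where the ablation was performed.")
    tissueType: List[Literal["Muscle", "Bone", "Not Mentioned"]] = Field(
        ..., description="Tissue types ablated."
    )
    complications: Literal["Yes", "No", "Not Mentioned"] = Field(
        ..., description="Whether complications occurred."
    )


# Pydantic v2: generate the JSON schema as a plain dictionary
schema = MiniReport.model_json_schema()
print(json.dumps(schema, indent=2))

# Literal options show up as an `enum`, and the descriptions are carried along
allowed_complications = schema["properties"]["complications"]["enum"]
allowed_tissues = schema["properties"]["tissueType"]["items"]["enum"]
```

The `description` strings travel with the schema, which is why writing them carefully matters as much as writing the prompt.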

Now that we have a test response model, let's ask the LLM again. This time we also remove the response structure from the `user_prompt`, because the response model already describes it.

In [None]:
# Test prompts
test2_system_prompt = "You are an expert in extracting data from radiology reports with 20 years of experience."
test2_user_prompt = """
Extract data elements from the MR guided ablation report in the <report> tag:

    Guidelines:
    - Focus on findings at the time of scan, not previous ones.
    - If information is not mentioned, use 'Not Mentioned'.
    - Ignore irrelevant information.
    - Use only the provided output format.
    - Expand abbreviations: sv (seminal vesicle), uvj (urethro vesicular junction), vuj (vesico urethral junction), VM (vascular malformation), US (ultrasound), LN (lymph node), CT (computed tomography), MRI (magnetic resonance imaging).
    """

# Loading a sample report from the yaml file
sample_report = yaml['sample_report']

In [None]:
# Let's ask the model:

# Creating the conversation for the model to pass report and instructions
messages = [
        {"role": "system", "content": test2_system_prompt},
        {"role": "user", "content": f"{test2_user_prompt} \n <report> {sample_report} </report>"}
    ]


# Asking the model to extract the requested information
resp = client.chat.completions.create(
        model=MODEL,
        response_model=TestModel,
        messages=messages,
        temperature=TEMP,
        seed=SEED,
        max_retries=MAX_RETRIES,
        ) #type: ignore


rich.print('Output:', resp.model_dump_json(indent=4))

Do you see the differences? With this validated response structure, we can extract the requested information from the text and present it in a structured format. Our own user prompt is also shorter, because the output-format instructions now live in the response model instead of the prompt text.

#### Creating a Pydantic model based on the YAML file

Depending on the task, we can hardcode the response model or we can define a function that can create the response model based on the yaml file. I prefer to use yaml file since it is more human readable and I can share it with my clinical colleagues to get their feedback, too!

In [None]:
# Creating a helper function to generate the response model from the yaml file
def create_pydantic_model_from_yaml(variables: list[dict], examples: str):
    """
    Create a Pydantic model from the variables and examples in the yaml file.
    `examples` is a JSON string, which is parsed with json.loads below.
    """
    field_definitions = {}
    examples = json.loads(examples)

    for var in variables:
        name = var['name']
        var_type = var['type']
        options = var.get('options')
        description = var['hint']

        if options:
            if var_type == "list":
                field_type = List[Literal[tuple(options)]]
            else:
                field_type = Literal[tuple(options)]
        else:
            if var_type == "str":
                field_type = str
            else:
                # Handle other types as needed
                field_type = Any

        # Create the field definition
        field_definitions[name] = (field_type, Field(description=description))

    # Create a config with json_schema_extra
    model_config = ConfigDict(json_schema_extra={"examples": examples})

    # Create the model using create_model
    ResponseClass = create_model(
        'ResponseClass',
        **field_definitions,
        __config__=model_config
    )
    return ResponseClass
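
The core of the helper above is `create_model` combined with `Literal[tuple(options)]`. The sketch below isolates that pattern with a small, hypothetical list of variable specs (`specs`) so it can be run without the YAML file; the names `DemoExtraction` and `specs` are illustrative only.

```python
from typing import List, Literal

from pydantic import Field, ValidationError, create_model

# Hypothetical variable specs, in the same shape the helper above expects
specs = [
    {"name": "organ", "type": "str", "hint": "Organ that was treated."},
    {"name": "complications", "type": "str",
     "options": ["Yes", "No", "Not Mentioned"], "hint": "Whether complications occurred."},
    {"name": "tissueType", "type": "list",
     "options": ["Muscle", "Bone", "Not Mentioned"], "hint": "Tissue types ablated."},
]

fields = {}
for spec in specs:
    options = spec.get("options")
    if options and spec["type"] == "list":
        annotation = List[Literal[tuple(options)]]  # several choices allowed
    elif options:
        annotation = Literal[tuple(options)]        # exactly one choice
    else:
        annotation = str                            # free text
    fields[spec["name"]] = (annotation, Field(..., description=spec["hint"]))

# Build the model class at runtime
DemoExtraction = create_model("DemoExtraction", **fields)

record = DemoExtraction(organ="Liver", complications="No", tissueType=["Bone"])
print(record.model_dump())

# A value outside the configured options is rejected
try:
    DemoExtraction(organ="Liver", complications="Unknown", tissueType=["Bone"])
    rejected = False
except ValidationError:
    rejected = True
print("Out-of-list value rejected:", rejected)
```

Because the options come from data rather than code, changing the allowed vocabulary only requires editing the configuration.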

Now, we can create the "Extraction" response model based on the yaml file and print it.

In [None]:
# Creating the response model
Extraction = create_pydantic_model_from_yaml(variables, examples)

# Printing the model fields and examples
rich.print('Extraction Response Model Fields:', Extraction.model_fields)
rich.print('Extraction Response Model Example:', Extraction.model_config)

## Step 4: Prompting
We are almost ready to prompt the model and get the response in desired format. Let's load the `system prompt`  and `user prompt` from the YAML file that our imaginary clinical-scientist colleague wrote for us.

In [None]:
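# Loading variables from the yaml file
# (read from the loaded dictionary directly: the name `messages` was reused
# above for the chat conversation list)
SYSTEM_PROMPT = yaml['messages']['system_prompt']
USER_PROMPT = yaml['messages']['user_prompt']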

To check if we stored the correct variables:

In [None]:
rich.print('System prompt:', SYSTEM_PROMPT)
rich.print('User prompt:', USER_PROMPT)

It is time to prompt the LLM with the same real sample report and see what happens:

In [None]:
sample_report = yaml['sample_report']

In [None]:
# Creating the conversation for the model to pass report and instructions
messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"{USER_PROMPT} \n <report> {sample_report} </report>"}
    ]

In [None]:
# Asking the model to extract the requested information
resp = client.chat.completions.create(
        model=MODEL,
        response_model=Extraction,
        messages=messages,
        temperature=TEMP,
        seed=SEED,
        max_retries=MAX_RETRIES,
        )


rich.print(resp.model_dump_json(indent=4))


## Step 5: Batch Processing

We have successfully tested our model and established a response structure. Often, we need to extract information from multiple reports and save it in a CSV file for further analysis. In this step, we will develop an engine that processes these reports and stores the results in an `output.csv` file.

In [None]:
# Creating a helper function to process a single report
def process_report(client, response_model, report_data, model, temperature, seed, max_retries, system_prompt, user_prompt):
    """
    Process a single report and return the combined data.
    """
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"{user_prompt} \n <report> {report_data['report']} </report>"}
    ]
    try:
        resp = client.chat.completions.create(
            model=model,
            response_model=response_model,
            messages=messages,
            temperature=temperature,
            seed=seed,
            max_retries=max_retries,
        )
        resp_dict = json.loads(resp.model_dump_json())
        return {**resp_dict, **report_data}
    except Exception as e:
        logging.error(f"An error occurred while processing report: {e}")
        return None

In [None]:
# Creating the main engine function to process multiple reports
def engine(
        input: List[Dict],
        output: str,
        log_file: str,
        response_model: BaseModel = Extraction,
        model: str = MODEL,
        temperature: float = TEMP,
        seed: int = SEED,
        max_retries: int = MAX_RETRIES,
        host: str = HOST,
        api_key: str = API_KEY,
        system_prompt: str = SYSTEM_PROMPT,
        user_prompt: str = USER_PROMPT,
        concurrency: int = config['concurrency']
) -> str:
    """
    Extracts information from the provided reports and stores the results in a CSV file.
    Processes up to `concurrency` reports simultaneously and shows progress.
    Logs the run parameters, total time, and average time per report.
    """
    # Configure logging
    logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logging.info("Starting the engine function")
    # The API key is deliberately left out of the log to avoid leaking credentials
    logging.info(f"Parameters: model={model}, temperature={temperature}, seed={seed}, max_retries={max_retries}, host={host}, concurrency={concurrency}, \n system_prompt={system_prompt}, \n user_prompt={user_prompt}")

    start_time = time.time()

    # Initialize the OpenAI client
    client = instructor.from_openai(
        OpenAI(
            base_url=f"{host}/v1",
            api_key=api_key,
        ),
        mode=instructor.Mode.JSON,
    )

    # Check if the output exists
    if os.path.exists(output):
        logging.warning(f"Output file {output} already exists. The file content will be REPLACED.")

    # Open the CSV file for writing
    with open(output, 'w', newline='', encoding='utf-8') as csvfile:
        csvwriter = None

        # Create a thread pool sized by `concurrency`, capped at the number of reports
        max_workers = max(1, min(concurrency, len(input)))  # at least one worker, even for an empty input list
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Initialize a progress bar
            with tqdm(total=len(input), desc="Processing Reports", unit="report") as pbar:
                # Submit tasks for each report
                future_to_report = {executor.submit(process_report, client, response_model, report_data, model, temperature, seed, max_retries, system_prompt, user_prompt): report_data for report_data in input}

                for future in as_completed(future_to_report):
                    report_data = future_to_report[future]
                    try:
                        combined_data = future.result()
                        if combined_data:
                            # Initialize the CSV writer and write the header if it's the first row
                            if csvwriter is None:
                                csvwriter = csv.DictWriter(csvfile, fieldnames=combined_data.keys())
                                csvwriter.writeheader()

                            # Write the combined data to the CSV file
                            csvwriter.writerow(combined_data)

                        # Update the progress bar
                        pbar.update(1)
                    except Exception as e:
                        logging.error(f"An error occurred while processing report: {e}")

    end_time = time.time()
    total_time = end_time - start_time
    average_time_per_report = total_time / len(input) if input else 0

    # Log the total and average time
    logging.info(f"Total processing time: {total_time:.2f} seconds")
    logging.info(f"Average time per report: {average_time_per_report:.2f} seconds")

    return f"Data has been successfully written to {output}"
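
The concurrency-and-CSV pattern used by the engine can be tried in isolation, without an LLM server. In the sketch below, `fake_extract` is a hypothetical stand-in for the model call, the reports are made up, and the CSV is written to an in-memory buffer instead of a file. Unlike the engine above, it also collects the identifiers of failed reports, which is an optional addition rather than part of the original design.

```python
import csv
import io
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional


def fake_extract(report: dict) -> Optional[dict]:
    """Stand-in for the LLM call; returns None to mimic a report that failed."""
    if not report["report"].strip():
        return None
    # Pretend the first word of the report is the extracted organ
    return {"organ": report["report"].split()[0], **report}


# Made-up input in the same shape the engine expects
reports = [
    {"report": "Liver lesion ablated.", "Accession Number": "A1"},
    {"report": "", "Accession Number": "A2"},
    {"report": "Kidney mass ablated.", "Accession Number": "A3"},
]

buffer = io.StringIO()
writer = None
failed = []

with ThreadPoolExecutor(max_workers=max(1, min(4, len(reports)))) as pool:
    futures = {pool.submit(fake_extract, r): r for r in reports}
    for future in as_completed(futures):
        row = future.result()
        if row is None:
            failed.append(futures[future]["Accession Number"])
            continue
        if writer is None:
            # The header is taken from the first successful row
            writer = csv.DictWriter(buffer, fieldnames=list(row.keys()))
            writer.writeheader()
        writer.writerow(row)

rows = list(csv.DictReader(io.StringIO(buffer.getvalue())))
print(len(rows), "rows written; failed:", failed)
```

Rows are written in completion order, not input order, so an identifier column such as the accession number is needed to join the results back to the source data.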

Now that we have the engine, we can move on to loading our data:

In [None]:
import pandas as pd
# Load the Excel file with the reports
df = pd.read_excel("reports.xlsx")

# Extract the 'Radiology Report' and 'Accession Number' columns
reports = [{"report": row['Radiology Report'], "Accession Number": row['Accession Number']} for index, row in df.iterrows()]

# Process the reports using the engine function and save the output to a CSV file
output_file = "output.csv"
log_file = "log.log"

In [None]:
if __name__ == "__main__":
    engine(
        input=reports,  # the list of report dicts built above, not the single sample report
        output=output_file,
        log_file=log_file,
    )

Authors:
Ali Ganjizadeh, Bradley J. Erickson

This notebook is a part of [MIDeL.org](http://midel.org/). `MIDeL` is a website to help healthcare professionals and medical imaging scientists learn to apply deep learning methods to medical images.