In [1]:
import anthropic
import os
from dotenv import load_dotenv
import openai

load_dotenv(".env")

False

In [2]:
def guideline_to_flowchart_prompt(
    reformatted_guidelines: str,
) -> list[dict[str, object]]:
    """
    Build the chat messages asking a model to turn guidelines into a flowchart.

    Args:
        reformatted_guidelines (str): Guideline text. It is appended verbatim
            to the instructions, between triple-backtick fences.

    Returns:
        list[dict[str, object]]: A one-element list holding a single "user"
            message whose "content" is a list with one {"type": "text"} part.
    """

    # The backslash-newline continuations sit inside the string literal, so the
    # leading whitespace of every continued line is part of the prompt text.
    # The layout is kept exactly as it was so the text sent to the model does
    # not change.
    instructions = "Create a flowchart in flowchart TD syntax from given medical \
                                guidelines. You need to convert each step of the guideline to question \
                                so that it could be answered 'Yes' or 'No' \
                                Pay close attention to details, since we do not want to omit anything. \
                                Best, go over guidelines and reason step by step. \
                                Make sure everything ends with either: \
                                'Procedure Medically Necessary' or 'Procedure not Medically Necessary' \
                                Also, make sure you encapsulate all nodes into square brackets: '[' \
                                and labels into double quotes. \
                                Please output generated flowchart only without any additional comments. \
                                Guidelines text: \n\n ``` "

    return [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": instructions + reformatted_guidelines + " ```",
                },
            ],
        },
    ]

def extract_necessary_requirements_assertion_prompt(parsed_guideline: str) -> list[dict[str, str]]:
    """
    Build the few-shot chat messages that ask for a medical-necessity flowchart.

    The returned conversation has four messages: a system instruction, one
    worked example (a sample guideline as the "user" turn and its flowchart as
    the "assistant" turn), and a final "user" turn carrying ``parsed_guideline``.

    Args:
        parsed_guideline (str): The parsed guideline, used unchanged as the
            content of the last user message.

    Returns:
        list[dict[str, str]]: The chat messages, each with "role" and "content".
    """
    extract_requirements_prompt = [
        # Instruction to the model.
        # NOTE(review): this text asks for question nodes only ("diamonds"),
        # while the worked example below uses square-bracket nodes and includes
        # outcome nodes - confirm which form is wanted.
        {
            "role": "system",
            "content": """Please create a flowchart diagramming whether the stated procedure is \
                medically necessary given the following information. Ensure that there are no parenthesis \
                in the labels, and omit subgraphs and any nodes other than questions (diamonds in \
                flowchart parlance) that could be answered true or false by reading through a medical \
                pre-auth request. If the input is empty, return None.""",
        },
        # Worked example, input side: a sample guideline. The trailing
        # backslashes join the source lines, so the indentation of each
        # continued line is part of the message text.
        {
            "role": "user",
            "content": """#### Craniocervical junction abnormalities\n\n_Includes atlantoaxial and \
                occipital instability as well as basilar\ninvagination_\n\nAdvanced imaging of the \
                spine is considered medically necessary for diagnosis\nand management following \
                nondiagnostic radiographs in persons with **ANY** of\nthe following high-risk \
                conditions:\n\n-\n\nDown syndrome\n\n-\n\nGrisel syndrome\n\n-\n\nSkeletal \
                dysplasia\n\n-\n\nRheumatoid arthritis\n\n**IMAGING STUDY**\n\n-\n\nCT cervical \
                spine\n\n-\n\nMRI cervical spine\n\n##### Rationale\n\nRheumatoid arthritis is a \
                systemic inflammatory disease that affects the\ncervical spine in up to 80% of \
                cases resulting in craniocervical instability,\nmost commonly from atlantoaxial \
                subluxation. MRI is the most sensitive exam to\nestablish the diagnosis,1 which \
                carries an increased risk of mortality and\nmorbidity in rheumatoid arthritis \
                patients,2 and lifetime radiological follow\nup may be required.\n""",
        },
        # Worked example, output side: the flowchart expected for the sample above.
        {
            "role": "assistant",
            "content": """flowchart TD
                A["Does the patient have craniocervical junction abnormalities?"] \
                    -->|Yes| B["Have nondiagnostic radiographs been performed?"]
                A -->|No| End1["Procedure not medically necessary"]
                B -->|Yes| C["Does the patient have any of the high-risk conditions?"]
                B -->|No| End1
                C -->|Yes| D["Down syndrome?"]
                C -->|No| End1
                D -->|Yes| E["Advanced imaging CT or MRI is medically necessary"]
                D -->|No| F["Grisel syndrome?"]
                F -->|Yes| E
                F -->|No| G["Skeletal dysplasia?"]
                G -->|Yes| E
                G -->|No| H["Rheumatoid arthritis?"]
                H -->|Yes| E
                H -->|No| End1
                E --> I["CT cervical spine"]
                E --> J["MRI cervical spine"]""",
        },
        # The actual request: the caller's guideline text.
        {"role": "user", "content": parsed_guideline},
    ]

    return extract_requirements_prompt

In [3]:
# Anthropic API client. The key is read from the CLAUDE_KEY environment
# variable; os.getenv yields None when that variable is not set.
client = anthropic.Anthropic(
    api_key=os.getenv("CLAUDE_KEY")
)

In [4]:
import json
import hashlib
import diskcache as dc

# diskcache store opened with the path "data"; the `cached` decorator below
# reads and writes its entries here.
cache = dc.Cache("data")

def hash_parameters(*args, **kwargs):
    """Return the MD5 hex digest of the JSON-serialised call arguments.

    A ``response_format`` keyword argument, if present, is left out of the
    digest. Keyword arguments are serialised in the order they were passed.
    """
    hashed_kwargs = {
        name: value for name, value in kwargs.items() if name != "response_format"
    }
    serialised = json.dumps((args, hashed_kwargs))
    digest = hashlib.md5()
    digest.update(serialised.encode())
    return digest.hexdigest()

def cached(func):
    """Memoise ``func`` in the module-level ``cache``.

    The cache key is ``"<func.__name__>:<hash_parameters(...)>"``, built from
    the positional and keyword arguments of the call. A ``client`` keyword
    argument is still passed on to ``func`` but is left out of the key.

    Args:
        func: Callable to wrap. Its arguments must be accepted by
            ``hash_parameters``.

    Returns:
        A wrapper that returns the stored result when the key is already in
        ``cache``, and otherwise calls ``func`` and stores what it returns.
        The wrapper keeps the name and docstring of ``func``.
    """
    import functools  # local import so this definition does not rely on another cell

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Build the key from everything except ``client``.
        key_kwargs = {name: value for name, value in kwargs.items() if name != "client"}
        key = f"{func.__name__}:{hash_parameters(*args, **key_kwargs)}"
        if key in cache:
            return cache[key]
        result = func(*args, **kwargs)
        cache[key] = result
        return result

    return wrapper

In [5]:
@cached
def get_claude_response(messages):
    """Send ``messages`` to Claude and return the response from the API client.

    The call goes through the module-level ``client`` with a fixed model,
    token limit and sampling settings; the result is memoised by ``cached``.
    """
    request_options = {
        "model": "claude-3-5-sonnet-20240620",
        "max_tokens": 1024,
        "messages": messages,
        "temperature": 0.0,
        "top_p": 0.95,
    }
    return client.messages.create(**request_options)

In [6]:
import backoff
import requests


def _retry_after_seconds(response, default=1):
    """Return the wait, in seconds, requested by a ``Retry-After`` header.

    Falls back to ``default`` when the header is missing or is not an integer
    number of seconds, instead of raising.
    """
    try:
        return int(response.headers.get("Retry-After", default))
    except (TypeError, ValueError):
        return default


# ``cached`` must be the outermost decorator so that only the final response is
# stored. When it sat underneath the retry decorator, the first 429 reply was
# cached and every retry got that same stored reply back, so the request was
# never actually re-sent.
@cached
# Retry while the server answers HTTP 429. ``backoff.runtime`` takes each wait
# from ``value`` (it is not exponential); ``jitter=None`` stops the default
# jitter from randomising the wait the server asked for.
@backoff.on_predicate(
    backoff.runtime,
    predicate=lambda r: r.status_code == 429,
    value=_retry_after_seconds,
    jitter=None,
)
def get_gpt_response_api(prompt, model_name, endpoint, key, temperature):
    """POST a chat request to ``endpoint`` and return the HTTP response.

    Args:
        prompt: Chat messages; sent as the ``"messages"`` field of the body.
        model_name: Sent as the ``"model"`` field of the body.
        endpoint: URL the request is posted to (also printed before sending).
        key: Sent in the ``"api-key"`` request header.
        temperature: Sent as the ``"temperature"`` field of the body.

    Returns:
        The object returned by ``requests.post``. A 429 reply is retried by the
        decorator above and is not what ends up in the cache.
    """
    headers = {
        # "authorization": f"Bearer {key}",
        "api-key": key,
        "Content-Type": "application/json"
    }

    # Schema for a structured answer. It is only referenced by the
    # ``response_format`` option that is commented out in ``data`` below, so it
    # currently has no effect on the request.
    modified_schema = {
        "type": "object",
        "required": ["results"],
        "additionalProperties": False,
        "properties": {
            "results": {
                "type": "object",
                "required": ["summary", "discussion", "answers"],
                "additionalProperties": False,
                "properties": {
                    "summary": {
                        "type": "string",
                        "description": "Summary of the patient information focusing on aspects that are relevant to the question(s)"
                    },
                    "discussion": {
                        "type": "string",
                        "description": "How does the provided information help or not help to answer the questions?"
                    },
                    "answers": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "required": ["answer", "supporting evidence"],
                            "additionalProperties": False,
                            "properties": {
                                "answer": {
                                    "type": "string",
                                    "description": "The correct answer choice among the multiple choices"
                                },
                                "supporting evidence": {
                                    "type": "string",
                                    "description": "A direct quote from the patient information that supports the answer if one exists."
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    # The data being sent in the request, including the prompt
    data = {
        "model": model_name,
        "messages": prompt,
        "temperature": temperature,
        # "logprobs": True,
        # "top_logprobs": 5,
        # "response_format": {
        #     "type": "json_schema",
        #     "json_schema": {
        #         "name": "QuestionAnswers",
        #         "strict": True,
        #         "schema": modified_schema
        #     }
        # }
    }

    # Send the request
    print(f"ENDPOINT: {endpoint}")
    response = requests.post(endpoint, headers=headers, json=data)
    return response

@cached
def get_gpt_response(messages, model_name, endpoint, key):
    """Request a chat completion from an Azure OpenAI deployment.

    Args:
        messages: Chat messages to send.
        model_name: Passed as the ``engine`` of the request.
        endpoint: Passed as the ``api_base`` of the request.
        key: Passed as the ``api_key`` of the request.

    Returns:
        The value returned by ``openai.ChatCompletion.create``. When the call
        raises ``openai.InvalidRequestError``, a plain dict with the same
        ``choices[0].message.content`` shape and a fixed explanatory text is
        returned instead. Either result is memoised by ``cached``.
    """
    # Stand-in reply used when the request is rejected as invalid.
    rejected_reply = {"choices": [{"message": {"content": "Question 1: The request was deemed invalid and likely violated the content filters."}}]}

    try:
        completion = openai.ChatCompletion.create(
            api_key=key,
            api_base=endpoint,
            api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
            api_type="azure",
            engine=model_name,
            messages=messages,
            temperature=0.0,
            top_p=0.95,
            frequency_penalty=0,
            presence_penalty=0,
            stop=None,
        )
    except openai.InvalidRequestError:
        return rejected_reply
    return completion


In [7]:

# Sample guideline used to try the generators below.
# NOTE(review): `url` is not referenced by the visible code; presumably it is
# the page the sample text was taken from - confirm.
url = "https://guidelines.carelonmedicalbenefitsmanagement.com/imaging-of-the-chest-2024-04-14/"
# Raw string: the text contains literal "\*" sequences, which are invalid
# escape sequences in a normal string literal and make Python emit a warning.
# The raw form yields exactly the same characters.
markdown = r"#Other thoracic mass lesions Advanced imaging is considered medically necessary for diagnosis and management of **ANY** of the following findings or conditions: - Mediastinal mass (see separate indication for lymphadenopathy) - Pancoast tumor - Pleural mass - Thymoma - Benign tumors (pediatric only) **IMAGING STUDY** \*\*_ADULT_ \*\* - CT chest - MRI chest for evaluation of mediastinal and hilar masses when CT is insufficient for problem solving or for evaluation of chest wall extension in Pancoast tumor \*\*_PEDIATRIC_ \*\* - CT or MRI chest"
topic = "Other thoracic mass lesions"
def get_mermaid_from_gpt(topic, markdown):
    """Return the flowchart text produced by the Azure GPT deployment.

    Args:
        topic: Accepted for symmetry with the caller; not used here.
        markdown: Guideline text handed to ``guideline_to_flowchart_prompt``.

    Returns:
        The ``content`` of the first choice's message in the response.
    """
    prompt_messages = guideline_to_flowchart_prompt(markdown)
    azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT_EAST2")
    azure_key = os.getenv("AZURE_OPENAI_API_KEY_EAST2")
    # Keyword order is kept as-is: the cache key is derived from the keyword
    # arguments in the order they are passed.
    completion = get_gpt_response(
        prompt_messages,
        model_name="gpt-4o-latest",
        endpoint=azure_endpoint,
        key=azure_key,
    )
    first_choice = completion["choices"][0]
    return first_choice["message"]["content"]

def get_mermaid_from_claude(topic, markdown):
    """Return the flowchart text produced by Claude.

    Args:
        topic: Accepted for symmetry with the caller; not used here.
        markdown: Guideline text handed to ``guideline_to_flowchart_prompt``.

    Returns:
        The ``text`` of the first content block of the response.
    """
    claude_reply = get_claude_response(guideline_to_flowchart_prompt(markdown))
    first_block = claude_reply.content[0]
    return first_block.text

import time
# Time a single Claude flowchart generation for the sample guideline. The
# returned flowchart is discarded; only the wall-clock duration is printed.
# NOTE(review): get_claude_response is wrapped by `cached`, so a sub-millisecond
# reading most likely measures a cache hit rather than an API round trip.
start_time = time.time()
get_mermaid_from_claude(topic, markdown)
print(f"Finished in {(time.time()-start_time):.4f} seconds")


Finished in 0.0008 seconds


  markdown = "#Other thoracic mass lesions Advanced imaging is considered medically necessary for diagnosis and management of **ANY** of the following findings or conditions: - Mediastinal mass (see separate indication for lymphadenopathy) - Pancoast tumor - Pleural mass - Thymoma - Benign tumors (pediatric only) **IMAGING STUDY** \*\*_ADULT_ \*\* - CT chest - MRI chest for evaluation of mediastinal and hilar masses when CT is insufficient for problem solving or for evaluation of chest wall extension in Pancoast tumor \*\*_PEDIATRIC_ \*\* - CT or MRI chest"


In [8]:
import scorcery
import scorcery.create_local_overrides

# Path of the local SQLite file that later cells open.
override_path = "overrides.db"
# Only run the migration when the file is not already on disk.
# NOTE(review): judging by its name, migrate_postgres_to_sqlite copies data from
# a Postgres source into this file - confirm what connection it needs.
if not os.path.isfile(override_path):
    scorcery.create_local_overrides.migrate_postgres_to_sqlite(override_path)

In [9]:
import scorcery.final_db
from peewee import SqliteDatabase
import scorcery.final_db.guideline_repo
import scorcery.flows
import graph_tools


# Score a generated flowchart against the stored flow of every guideline in the
# local overrides database.
target_db = SqliteDatabase(override_path)
# count: guidelines visited; match_count: guidelines whose generated flowchart
# was parsed and scored; score_total: sum of those scores.
count = 0
match_count = 0
score_total = 0
# NOTE(review): target_title is not referenced anywhere in the visible code.
target_title = "Penile, Vaginal, and Vulvar Cancers"
for target in scorcery.final_db.guideline_repo.GuidelineRepo.all(target_db):
    count += 1
    # NOTE(review): the second argument fills the `markdown` parameter of
    # get_mermaid_from_gpt, but target.id is what gets passed - confirm whether
    # the guideline text was intended here. The call sits outside the try
    # block, so an error raised by the request stops the whole loop.
    response = get_mermaid_from_gpt(target.title, target.id)
    try:
        # Parse the generated Mermaid text.
        experiment_flow = graph_tools.graph.EMGraph.from_mermaid(response)
        # Result is discarded; presumably called so that an invalid graph
        # raises here - TODO confirm.
        experiment_flow.as_logic_graph()
        score = scorcery.flows.score_flows(target.flow, experiment_flow.get_mermaid_text())
        print(f"{score:.4f} - {target.title}")
        score_total += score
        match_count += 1
        # For low scores, dump the stored flow and the raw model output side by
        # side for manual inspection.
        if score < 0.6:
            print("&"*100)
            print(target.flow)
            print("*"*50)
            print(response)
            print("&"*100)
    
    # Any failure while parsing or scoring is reported and the loop moves on;
    # such guidelines are included in `count` but not in `match_count`.
    except Exception as e:
        print("&"*100)
        print(f"Error: {target.title} failed because: {e}")
        print("*"*50)
        print(target.flow)
        print("*"*50)
        print(response)
        print("&"*100)
    
    # break

# Totals: guidelines visited, then guidelines successfully scored.
print(count)
print(match_count)


0.5535 - Head and Neck Cancer
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
graph TD
    A["Start"] --> B{"Is CT Primary Site and Neck Requested?"}
    A --> C{"Is CT Chest / CT Abdomen and Pelvis Requested?"}
    A --> D{"Is MRI Primary Site and Neck Requested?"}
    A --> E{"Is FDG-PET/CT Requested?"}

    B --> F{"Is there documented head and neck cancer?"}
    F --> G{"Is this for initial diagnostic workup?"}
    F --> H{"Is this to assess response to neoadjuvant treatment or after concurrent chemoradiotherapy?"}
    F --> I{"Is this for surveillance imaging?"}
    G --> J["Procedure Medically Necessary"]
    H --> J
    I --> J

    C --> K{"Is there documented head and neck cancer?"}
    K --> L{"Is this for initial diagnostic workup?"}
    K --> M{"Is this part of the management strategy?"}
    K --> N{"Is this for surveillance imaging?"}
    L --> J
    M --> J
    N --> J

    D --> O{"Is there documented head and neck can

In [10]:

# Summary of the scoring-loop globals: total score, mean over all guidelines
# visited (count) and mean over successfully scored ones (match_count).
# The divisions raise ZeroDivisionError when the corresponding counter is 0.
# NOTE(review): "CLAUDE" is only a printed label. The visible scoring loop calls
# get_mermaid_from_gpt, so confirm which model produced these totals.
print("CLAUDE")
print(score_total)
print(score_total / count)
print(score_total / match_count)

CLAUDE
20.784501185743046
0.4948690758510249
0.6495156620544702


In [11]:

# Summary of the scoring-loop globals: total score, mean over all guidelines
# visited (count) and mean over successfully scored ones (match_count).
# The divisions raise ZeroDivisionError when the corresponding counter is 0.
# NOTE(review): the label is a literal and the values are whatever the last run
# of the scoring loop left behind - re-run that loop before trusting the label.
print("gpt-4o-latest")
print(score_total)
print(score_total / count)
print(score_total / match_count)

gpt-4o-latest
20.784501185743046
0.4948690758510249
0.6495156620544702
