In [24]:
# logger
import logging
logging.basicConfig(filename='logs.txt',
                    filemode='a',
                    format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                    datefmt='%H:%M:%S',
                    level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(level=logging.DEBUG)

# imports
import os
import ast
from fhirclient.client import FHIRClient
import configparser
import gradio_client
import fhirclient.models.observation as o
import fhirclient.models.annotation as a
from linkml_runtime import SchemaView
import pickle

# get the config
config = configparser.ConfigParser()
config.read('../config.ini')

# environment var
os.environ['OPENAI_API_KEY'] = config['AZURE']['AZURE_API_KEY']

# connection to FHIR server
settings = {
    'app_id': 'my_web_app',
    'api_base': 'http://localhost:8080/fhir/'
}
fhir_client = FHIRClient(settings=settings)

# connection to Gradlio server
HOST_URL = "http://localhost:7860"
llm_client = gradio_client.Client(HOST_URL)

# get observations
search = o.Observation.where(struct={'status': 'final'})
observations = search.perform_resources(fhir_client.server)

# load LinkML template
path_to_template = 'test.yaml'
sv = SchemaView(path_to_template)

# llm call function
def run_llm(prompt):

    # string of dict for input
    kwargs = dict(instruction_nochat=prompt)
    res = llm_client.predict(str(dict(kwargs)), api_name='/submit_nochat_api')

    # string of dict for output
    return ast.literal_eval(res)['response']

def get_completion_prompt(cls_name, sv, text):
    """Get the prompt for the given template."""

    # system prompt
    prompt = (
        "From the text below, extract the following entities in the following format (if available):\n\n"
    )

    # schema prompt is concatinated from prompts for each entity group
    for slot in sv.class_induced_slots(cls_name):
        
        # description of the entity group
        slot_prompt = slot.description

        # custom instruction for categorical entities
        if slot.range in sv.all_enums():
            enum_def = sv.get_enum(slot.range)
            pvs = [str(k) for k in enum_def.permissible_values.keys()]
            slot_prompt += f"Must be one of: {', '.join(pvs)}"

        # entity group prompt
        prompt += f"{slot.name}: <{slot_prompt}>\n"
    
    # concatinate the prompt with doctor's note
    prompt = f"{prompt}\n\nText:\n{text}\n\n===\n\n"
    return prompt

def annotate_observation(observation):
    
    # generate the prompt
    prompt = get_completion_prompt(
        cls_name='ClinicalNote',
        sv=sv,
        text = observation.code.text
    )
    
    # LLM call to extract entities 
    result = run_llm(prompt)
    
    # update observation
    observation.note = [a.Annotation({
        "authorString" : "Annotated by LLama2",
        "text": result
    })]
    observation.update(fhir_client.server)

Loaded as API: http://localhost:7860/ ✔


In [25]:
# annotate all observations
for observation in observations:
    annotate_observation(observation)


In [26]:
observations

[<fhirclient.models.observation.Observation at 0x10edf52d0>,
 <fhirclient.models.observation.Observation at 0x110289150>,
 <fhirclient.models.observation.Observation at 0x110289550>,
 <fhirclient.models.observation.Observation at 0x1102898d0>,
 <fhirclient.models.observation.Observation at 0x110298e10>,
 <fhirclient.models.observation.Observation at 0x1102993d0>,
 <fhirclient.models.observation.Observation at 0x11029ac90>,
 <fhirclient.models.observation.Observation at 0x110298510>,
 <fhirclient.models.observation.Observation at 0x110298110>,
 <fhirclient.models.observation.Observation at 0x110298a10>,
 <fhirclient.models.observation.Observation at 0x110299a90>,
 <fhirclient.models.observation.Observation at 0x11029b350>,
 <fhirclient.models.observation.Observation at 0x11029ad10>,
 <fhirclient.models.observation.Observation at 0x110298e90>,
 <fhirclient.models.observation.Observation at 0x110237510>,
 <fhirclient.models.observation.Observation at 0x110235c50>,
 <fhirclient.models.obse

In [30]:
with open('../data/observations_with_annotations.pkl', 'wb') as file:
    pickle.dump(observations_as_json, file)

In [29]:
observations_as_json = [a.as_json() for a in observations]

In [1]:
with open('../data/observations_with_annotations.pkl', 'rb') as file:
    test = pickle.load(file)
test

NameError: name 'pickle' is not defined

In [29]:
##################
# FROM ACCENTURE #
##################

# logger
import logging
logging.basicConfig(filename='logs.txt',
                    filemode='a',
                    format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                    datefmt='%H:%M:%S',
                    level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(level=logging.DEBUG)

# imports
import pickle
import fhirclient.models.observation as o
from linkml_runtime import SchemaView

# load data
with open('../data/observations_with_annotations.pkl', 'rb') as file:
    observations_as_json = pickle.load(file)
observations = [o.Observation(a) for a in observations_as_json]

# load LinkML template
path_to_template = 'test.yaml'
sv = SchemaView(path_to_template)

# select test case
test_observation = observations[0]
ann_text = test_observation.note[0].text

# delete intro
ann_text = ann_text[ann_text.find('age'):]

In [30]:
def _parse_line_to_dict(line, sv, cls_name):
    logging.info(f"PARSING LINE: {line}")
    field, val = line.split(":", 1)
    # Field nornalization:
    # The LLML may mutate the output format somewhat,
    # randomly pluralizing or replacing spaces with underscores
    field = field.lower().replace(" ", "_")
    cls_slots = sv.class_slots(cls_name)
    slot = None
    if field in cls_slots:
        slot = sv.induced_slot(field, cls_name)
    else:
        if field.endswith("s"):
            field = field[:-1]
        if field in cls_slots:
            slot = sv.induced_slot(field, cls_name)
    if not slot:
        logging.error(f"Cannot find slot for {field} in {line}")
        # raise ValueError(f"Cannot find slot for {field} in {line}")
        return
    if not val:
        msg = f"Empty value in key-value line: {line}"
        if slot.required:
            raise ValueError(msg)
        if slot.recommended:
            logging.warning(msg)
        return
    inlined = slot.inlined
    slot_range = sv.get_class(slot.range)
    if not inlined:
        if slot.range in sv.all_classes():
            inlined = sv.get_identifier_slot(slot_range.name) is None
    val = val.strip()
    if slot.multivalued:
        vals = [v.strip() for v in val.split(";")]
    else:
        vals = [val]
    vals = [val for val in vals if val]
    logging.debug(f"SLOT: {slot.name} INL: {inlined} VALS: {vals}")
    if inlined:
        transformed = False
        slots_of_range = sv.class_slots(slot_range.name)
        if len(slots_of_range) > 2:
            vals = [_extract_from_text_to_dict(v, slot_range) for v in vals]
        else:
            for sep in [" - ", ":", "/", "*", "-"]:
                if all([sep in v for v in vals]):
                    vals = [dict(zip(slots_of_range, v.split(sep, 1))) for v in vals]
                    for v in vals:
                        for k in v.keys():
                            v[k] = v[k].strip()
                    transformed = True
                    break
            if not transformed:
                logging.warning(f"Did not find separator in {vals} for line {line}")
                return
    # transform back from list to single value if not multivalued
    if slot.multivalued:
        final_val = vals
    else:
        if len(vals) != 1:
            logging.error(f"Expected 1 value for {slot.name} in '{line}' but got {vals}")
        final_val = vals[0]
    return field, final_val

def parse_response_to_dict(result, cls_name, sv):
    lines = result.splitlines()
    ann = {}
    promptable_slots = sv.class_induced_slots(cls_name)
    for line in lines:
        line = line.strip()
        if not line:
            continue
        if ":" not in line:
            if len(promptable_slots) == 1:
                slot = promptable_slots[0]
                logging.warning(
                    f"Coercing to YAML-like with key {slot.name}: Original line: {line}"
                )
                line = f"{slot.name}: {line}"
            else:
                logging.error(f"Line '{line}' does not contain a colon; ignoring")
                return
        r = _parse_line_to_dict(line, sv, cls_name)
        if r is not None:
            field, val = r
            ann[field] = val
    return ann

def _extract_from_text_to_dict(raw_text, cls_name):
    return parse_response_to_dict(raw_text, cls_name)

ann = parse_response_to_dict(
    result = ann_text, 
    cls_name = 'ClinicalNote', 
    sv = sv
)

ann

INFO:root:PARSING LINE: age: Not available (as it is not mentioned in the text)
DEBUG:root:SLOT: age INL: None VALS: ['Not available (as it is not mentioned in the text)']
INFO:root:PARSING LINE: gender: Male (as the patient's name is "Mr. John Doe")
DEBUG:root:SLOT: gender INL: None VALS: ['Male (as the patient\'s name is "Mr. John Doe")']
INFO:root:PARSING LINE: conditions:
ERROR:root:Line '* Diabetes management' does not contain a colon; ignoring


In [26]:
with open('../data/test_results.pkl', 'rb') as file:
    result = pickle.load(file)



NameError: name 'self' is not defined