## Setup

In [1]:
import os
import json
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from typing_extensions import TypedDict, Literal
from IPython.display import Image, display


import numpy as np
from tqdm import tqdm

from langchain import hub
from langchain.chat_models import init_chat_model
from langchain.schema import StrOutputParser
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_core.runnables import RunnablePassthrough, ConfigurableField
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.document_loaders import PDFPlumberLoader

from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Any

from constants import (
    DATA_DIR,
    EMBEDDINGS_MODEL_NAME,
    INDEX_CACHE_DIR,
    TOGETHER_META_LLAMA_70B_FREE,
    TOGETHER_DEEPSEEK_DISTILL_LLAMA_70B_FREE,
    TOGETHER_META_LLAMA_VISION_FREE,
    GROUND_TRUTH_ACTUAL_JSON,
)

In [2]:
# Load environment variables from a local .env file (presumably the API keys
# needed by init_chat_model — confirm against the project's .env template).
load_dotenv()

True

## Load Docs

In [5]:
# Sub-directories of DATA_DIR to index; each directory name, lower-cased,
# becomes the device label paired with every file found inside it.
dirs_to_explore = ["LIFEPAK 15", "LIFEPAK 20"]

# Two parallel, index-aligned lists: pdf_files_to_explore[i] is the full path
# of the file described by device_with_pdf_file_name[i] == (device, file name).
device_with_pdf_file_name = []
pdf_files_to_explore = []
for pdf_data_dir in dirs_to_explore:
    pdf_files_path = os.path.join(DATA_DIR, pdf_data_dir)
    # List the directory exactly once so both lists are built from the same
    # snapshot, keep only *.pdf entries (stray files such as .DS_Store would
    # otherwise be handed to the PDF loader), and sort because os.listdir()
    # returns entries in arbitrary order.
    pdf_file_names = sorted(
        entry
        for entry in os.listdir(pdf_files_path)
        if entry.lower().endswith(".pdf")
    )
    pdf_files_to_explore += [
        os.path.join(pdf_files_path, pdf_file_name)
        for pdf_file_name in pdf_file_names
    ]
    device_with_pdf_file_name += [
        (pdf_data_dir.lower(), pdf_file_name) for pdf_file_name in pdf_file_names
    ]

In [6]:
def load_docs(pdf_files, device_with_pdf_title_metadata):
    """Load every PDF and tag each loaded document with its device and file name.

    Args:
        pdf_files: Sequence of PDF file paths.
        device_with_pdf_title_metadata: Sequence of ``(device, pdf_title)``
            tuples, index-aligned with ``pdf_files``.

    Returns:
        A flat list with every document returned by ``PDFPlumberLoader.load()``
        for each file, in input order. Each document's metadata is updated
        with the keys ``"device"`` and ``"pdf_title"``.

    Raises:
        ValueError: If the two inputs do not have the same length.
    """
    # zip() would silently stop at the shorter input, dropping files or pairing
    # them with the wrong metadata, so reject mismatched inputs up front.
    if len(pdf_files) != len(device_with_pdf_title_metadata):
        raise ValueError(
            "pdf_files and device_with_pdf_title_metadata must have the same "
            f"length, got {len(pdf_files)} and {len(device_with_pdf_title_metadata)}"
        )

    docs = []

    for file_path, (device, pdf_title) in tqdm(
        zip(pdf_files, device_with_pdf_title_metadata), total=len(pdf_files)
    ):
        loader = PDFPlumberLoader(file_path)
        metadata = {"device": device, "pdf_title": pdf_title}
        for doc in loader.load():
            # Merge into the loader-provided metadata rather than replacing it.
            doc.metadata.update(metadata)
            docs.append(doc)

    print("total docs:", len(docs))
    return docs

## Build Index

In [7]:
# Name of the Chroma collection used for the LIFEPAK documents; it is passed
# to get_vectorstore() as collection_name.
index_name = "LIFEPAK_index"

In [8]:
def get_vectorstore(embeddings_model_name, collection_name, persist_directory):
    """Return a Chroma vector store, reusing a persisted one when available.

    If ``persist_directory`` exists and contains files, the collection is
    opened from it. Otherwise the documents are loaded via ``load_docs`` from
    the module-level ``pdf_files_to_explore`` / ``device_with_pdf_file_name``
    lists, embedded, and persisted to ``persist_directory``.

    Args:
        embeddings_model_name: Model name passed to ``HuggingFaceEmbeddings``.
        collection_name: Name of the Chroma collection to open or create.
        persist_directory: Directory where the Chroma data is stored.

    Returns:
        The ``Chroma`` vector store instance.
    """
    embeddings = HuggingFaceEmbeddings(model_name=embeddings_model_name)

    # A directory that exists but is empty holds no index; treat it like a
    # missing one instead of reporting success and returning an empty store.
    has_persisted_index = os.path.isdir(persist_directory) and bool(
        os.listdir(persist_directory)
    )

    if has_persisted_index:
        vstore = Chroma(
            collection_name=collection_name,
            embedding_function=embeddings,
            persist_directory=persist_directory,
        )
        print("Successfully loaded vectorstore!")
    else:
        print("Failed to load vectorstore. Creating new one...")
        documents = load_docs(pdf_files_to_explore, device_with_pdf_file_name)
        # Show embedding progress only while the index is being built.
        embeddings.show_progress = True
        try:
            vstore = Chroma.from_documents(
                documents,
                embeddings,
                collection_name=collection_name,
                persist_directory=persist_directory,
            )
        finally:
            # Restore the flag even if indexing fails, so the shared embeddings
            # object is never left in progress-bar mode.
            embeddings.show_progress = False

    return vstore


# Open the index persisted in INDEX_CACHE_DIR, or build it when that directory does not exist yet.
vectorstore = get_vectorstore(EMBEDDINGS_MODEL_NAME, index_name, INDEX_CACHE_DIR)

Successfully loaded vectorstore!


In [9]:
# Smoke test: fetch the single closest chunk for a device-specific question.
# NOTE(review): no metadata filter is passed, so the hit can come from any
# device's documents — the saved output for this LIFEPAK 15 question is a
# LIFEPAK 20e page.
lifepak15_query = "How do I troubleshoot Dampened Waveform on Lifepak 15?"
top_match = vectorstore.similarity_search(query=lifepak15_query, k=1)[0]
res = top_match.page_content
print(res)

LIFEPAK®20e Defibrillator/Monitor
Performance Inspection Procedure (PIP)
PIP - 5-Lead ECG Tests
Note: Perform this test if 5-lead ECG cable is used. Otherwise, skip to the next test.
The 5-Lead ECG tests consist of:
PIP - 5-Lead ECG Leads Off Detection Test
PIP - 5-Lead ECG Gain Test
PIP - 5-Lead ECG Leads Off Detection Test
1. Connect the 5-wire ECG cable between the device and Impulse 7000DP as shown in Figure 1.12.
2. Set the Impulse 7000DP output to a 1-mv, 10-Hz sine wave.
3. Set the device Lead selection to LEAD II.
4. Remove the RL lead from the Impulse 7000DP, and verify the device displays an ECG LEADS
OFF message and a repeating priority 3 tones shall sound when the Lead is removed. Reconnect
the RL lead.
5. Remove the RA lead from the Impulse 7000DP, and verify the device displays an RA LEADS
OFF message and a repeating priority 3 tones shall sound when the Lead is removed. Reconnect
the RA lead.
6. Remove the LL lead from the Impulse 7000DP, and verify the device displays a

## Build RAG

In [8]:
# def format_docs(docs):
#     # return {"context": "\n\n".join(doc.page_content for doc in docs)}
#     return {"context": "\n\n".join(dummy_doc for dummy_doc in ["doc1", "doc2", "doc3"])}
#
#
# with open(GROUND_TRUTH_ACTUAL_JSON, "r", encoding="utf-8") as file:
#     ground_truth_actual_data = json.load(file)

In [9]:
# retriever = vectorstore.as_retriever()
#
# configurable_retriever = retriever.configurable_fields(
#     search_kwargs=ConfigurableField(
#         id="search_kwargs",
#         name="Search Kwargs",
#         description="The search kwargs to use",
#     )
# )

In [10]:
# dummy_config = {
#     "configurable": {"search_kwargs": {"k": 10, "filter": {"device": "lifepak 20"}}}
# }
#
# dummy_res = configurable_retriever.invoke(
#     "How do I troubleshoot Dampened Waveform", config=dummy_config
# )
# dummy_res[0].metadata

In [11]:
# rag_system_prompt = """You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise. Also return response with provided structure, including used context and answer.
# """
# rag_user_prompt = """Question: {question}
# Context: {context}
# """
#
#
# class ContextUnit(BaseModel):
#     title: str = Field(description="The title of pdf document.")
#     pages: list[int] = Field(description="The pages in pdf document.")
#
#
# class RetrievalOutput(BaseModel):
#     context: list[ContextUnit] = Field(description="Used context documents and pages.")
#     answer: str = Field(
#         description="The answer to the question.",
#     )
#
#
# class VectorStoreInputKwargs(BaseModel):
#     k: int = 5
#     filter: dict[str, str] = None
#
#
# class State(BaseModel):
#     question: str = Field(description="The question text.")
#     context: list[Document] = Field(default=None, description="The context Documents.")
#     retriever_out: RetrievalOutput = Field(
#         default=None, description="The retriever formatted output."
#     )
#     vectorstore_input_kwargs: VectorStoreInputKwargs = Field(
#         default=None, description="The vectorstore input kwargs."
#     )
#
#
# llm = init_chat_model(model=TOGETHER_META_LLAMA_70B_FREE, temperature=0)
# rag_structured = llm.with_structured_output(RetrievalOutput)
#
#
# # Define application steps
# def retrieve(state: State):
#     retrieved_docs = vectorstore.similarity_search(
#         state.question, **state.vectorstore_input_kwargs.model_dump()
#     )
#     return {"context": retrieved_docs}
#
#
# def generate(state: State):
#     docs_content = "\n\n".join(
#         f"Source Doc: {doc.metadata['pdf_title']}\n"
#         # f"Page: {doc.metadata['page']}\n"
#         f"{doc.page_content}"
#         for doc in state.context
#     )
#     response = rag_structured.invoke(
#         [
#             {"role": "system", "content": rag_system_prompt},
#             {
#                 "role": "user",
#                 "content": rag_user_prompt.format(
#                     question=state.question, context=docs_content
#                 ),
#             },
#         ]
#     )
#     return {"retriever_out": response}
#
#
# # input_variables = {"context": configurable_retriever | format_docs, "question": RunnablePassthrough()}
# # prompt = hub.pull("rlm/rag-prompt")
# # llm = init_chat_model(model=TOGETHER_META_LLAMA_90B_FREE, temperature=0)
# #
# # rag_chain = input_variables | prompt | llm

In [34]:
# graph_builder = StateGraph(State).add_sequence([retrieve, generate])
# graph_builder.add_edge(START, "retrieve")
# graph = graph_builder.compile()

In [35]:
# initial_state = State(
#     question="How do I troubleshoot low volume on Lifepak 20?",
#     vectorstore_input_kwargs=VectorStoreInputKwargs(
#         filter={"device": "lifepak 20"}, k=5
#     ),
# )
#
#
# result = graph.invoke(initial_state)

In [12]:
# generate.__name__

In [13]:
# rag_system_prompt = """You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.
# """
#
# ChatPromptTemplate(
#     [
#         (
#             "user",
#             "You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.\nQuestion: {question} \nContext: {context} \nAnswer:",
#         ),
#     ]
# )

In [14]:
# a = (
#     configurable_retriever
#     | format_docs
#     | RunnablePassthrough().assign(copy_context=lambda x: x["context"])
#     | RunnablePassthrough()
# )
# a.invoke("hi")

In [15]:
# example_idx = 1
#
# print("Q:", ground_truth_actual_data[example_idx]["question"])
# print("Expected:", ground_truth_actual_data[example_idx]["ground_truth"])
# print(
#     "\nRAG Response:",
#     rag_chain.invoke(ground_truth_actual_data[example_idx]["question"]).content,
# )

In [81]:
# prompt_args = {
#     "instructions": {"devices": ",".join(dummy_full_list_of_devices_lower)},
#     "rules": {
#         "continue_rule": "1) device name is clear; \n2) it is popular abbreviation, like LP - LIFEPAK.",
#         "specify_rule": "1) there is no series number specification.",
#     },
# }
#
# triage_system_prompt = """
# < Role >
# You are technical devices medical assistant.
# </ Role >
#
# < Instructions >
#
# You have access to list of devices:
# {devices}
#
# Clients ask you a question about instructions about devices. Your first goal is specify exact title of device, to make filtering search in the future.
#
# Here you have two options:
# 1. CONTINUE - if question contains clear title of device, including series number, and this device is from your accessible list.
# 2. SPECIFY - if device name is unclear.
#
# Classify the below question into one of these categories. And specify exact device name from list.
#
# </ Instructions >
#
# < Rules >
# You can continue with device name if:
# {continue_rule}
#
# Ask user to specify device name if:
# {specify_rule}
# </ Rules >
#
# < Few shot examples >
# {examples}
# </ Few shot examples >
# """
# var: str = "hey"
# dev = "lifepak 15"
#
#
# class DeviceClassificationRouter(BaseModel):
#     """Analyze the question and extract device name."""
#
#     reasoning: str = Field(
#         description="Step-by-step reasoning behind the classification."
#     )
#     device: Literal[*dummy_full_list_of_devices_lower, "none"] = Field(
#         description="Exact available device names. Use 'none' if device name is unclear."
#     )
#     classification: Literal["continue", "specify"] = Field(
#         description="The classification of device: 'continue' if device is specified, 'specify' if device unclear or no device in question."
#     )

In [82]:
# # TOGETHER_META_LLAMA_VISION_FREE
# # TOGETHER_META_LLAMA_90B_FREE
# device_extractor_llm = init_chat_model(
#     model=TOGETHER_META_LLAMA_70B_FREE, temperature=0
# )
# device_extractor_llm_router = device_extractor_llm.with_structured_output(
#     DeviceClassificationRouter
# )
#
# system_prompt = triage_system_prompt.format(
#     devices=prompt_args["instructions"]["devices"],
#     continue_rule=prompt_args["rules"]["continue_rule"],
#     specify_rule=prompt_args["rules"]["specify_rule"],
#     examples=None,
# )

In [83]:
# result = device_extractor_llm_router.invoke(
#     [
#         {"role": "system", "content": system_prompt},
#         {"role": "user", "content": "lifepak: how do I troubleshoot Dampened Waveform?"},
#     ]
# )
#
# result

In [84]:
# class State(TypedDict):
#     k: int
#     question: str
#     device: str
#     config: dict
#     response: str

In [85]:
# def medical_device_retriever_router(state: State) -> Command[Literal["__end__"]]:
#     question = state["question"]
#
#     res = device_extractor_llm_router.invoke(
#         [
#             {"role": "system", "content": system_prompt},
#             {"role": "user", "content": question},
#         ]
#     )
#
#     if (res.classification == "continue") and (
#         res.device in prompt_args["instructions"]["devices"]
#     ):
#         print("Continue with retrieval agent.")
#         config = {
#             "configurable": {
#                 "search_kwargs": {"k": state["k"], "filter": {"device": res.device}}
#             }
#         }
#         response = rag_chain.invoke(input=question, config=config)
#         update = {"device": res.device, "config": config, "response": response}
#
#     elif (res.classification == "specify") and (res.device == "none"):
#         print("🚫 Device is unclear.")
#         update = None
#     else:
#         raise ValueError(f"Invalid classification: {res.classification}")
#
#     goto = END
#     return Command(goto=goto, update=update)

In [16]:
# med_agent = StateGraph(State).add_node(medical_device_retriever_router)
# med_agent = med_agent.add_edge(START, "medical_device_retriever_router")
# med_agent = med_agent.compile()
#
# display(Image(med_agent.get_graph(xray=True).draw_mermaid_png()))

In [17]:
# dummy_res = med_agent.invoke(
#     {"question": "Mizuho: how do I troubleshoot Dampened Waveform?", "k": 5}
# )

## Evaluation

In [13]:
from trulens.core import TruSession, Feedback
from trulens.providers.langchain import Langchain
from trulens.apps.langchain import TruChain
from trulens.dashboard.run import run_dashboard

In [14]:
# Start a TruLens session (the saved output shows it using sqlite:///default.sqlite).
session = TruSession()
# Reset the session database before evaluating — presumably so records from
# earlier runs do not mix into this one; confirm this wipe is intended.
session.reset_database()

🦑 Initialized with db url sqlite:///default.sqlite .
🛑 Secret keys may be written to the database. See the `database_redact_keys` option of `TruSession` to prevent this.


Updating app_name and app_version in apps table: 0it [00:00, ?it/s]
Updating app_id in records table: 0it [00:00, ?it/s]
Updating app_json in apps table: 0it [00:00, ?it/s]


In [28]:
# Judge model used by the feedback functions; temperature 0.0 is presumably
# chosen to keep its scoring as repeatable as possible.
eval_llm = init_chat_model(model=TOGETHER_META_LLAMA_70B_FREE, temperature=0.0)
# Wrap the chat model as a TruLens provider; its methods are used as feedback implementations.
provider = Langchain(chain=eval_llm)

In [29]:
# NOTE(review): `rag_chain` is only assigned in commented-out code in the part
# of the notebook visible here, so on a fresh kernel this cell would raise
# NameError unless the chain is defined elsewhere — confirm and restore it.
# Selector for the retrieved context inside the chain's execution record.
context = TruChain.select_context(rag_chain)

# Answer relevance: evaluated on the record's main input and main output.
f_answer_relevance = Feedback(
    provider.relevance, name="Answer Relevance"
).on_input_output()

# Context relevance: evaluated on the main input against the selected context
# items, with the resulting scores averaged via np.mean.
f_context_relevance = (
    Feedback(provider.context_relevance, name="Context Relevance")
    .on_input()
    .on(context)
    .aggregate(np.mean)
)

# Groundedness: the collected context is the source and the main output is
# the statement being checked against it.
f_groundedness = (
    Feedback(provider.groundedness_measure_with_cot_reasons, name="Groundedness")
    .on(context.collect())
    .on_output()
    # .aggregate()
)

✅ In Answer Relevance, input prompt will be set to __record__.main_input or `Select.RecordInput` .
✅ In Answer Relevance, input response will be set to __record__.main_output or `Select.RecordOutput` .
✅ In Context Relevance, input question will be set to __record__.main_input or `Select.RecordInput` .
✅ In Context Relevance, input context will be set to __record__.app.first.steps__.context.first.invoke.rets[:].page_content .
✅ In Groundedness, input source will be set to __record__.app.first.steps__.context.first.invoke.rets[:].page_content.collect() .
✅ In Groundedness, input statement will be set to __record__.main_output or `Select.RecordOutput` .


In [30]:
# Recorder settings: the app name/version under which runs are logged, plus
# the feedback functions to attach to the recorder.
recorder_kwargs = dict(
    app_name="RAG App",
    app_version="0.1.0",
    feedbacks=[f_answer_relevance, f_context_relevance, f_groundedness],
)

# Wrap the chain in a TruChain recorder configured with the settings above.
tru_recorder = TruChain(rag_chain, **recorder_kwargs)

instrumenting <class 'langchain_core.runnables.base.RunnableParallel'> for base <class 'langchain_core.runnables.base.RunnableParallel'>
	instrumenting invoke
	instrumenting ainvoke
	instrumenting stream
	instrumenting astream
instrumenting <class 'langchain_core.runnables.base.RunnableParallel'> for base <class 'langchain_core.runnables.base.RunnableSerializable[-Input, dict[str, Any]]'>
	instrumenting invoke
	instrumenting ainvoke
	instrumenting stream
	instrumenting astream
instrumenting <class 'langchain_core.runnables.base.RunnableParallel'> for base <class 'langchain_core.runnables.base.RunnableSerializable'>
	instrumenting invoke
	instrumenting ainvoke
	instrumenting stream
	instrumenting astream
instrumenting <class 'langchain_core.runnables.base.RunnableParallel'> for base <class 'langchain_core.load.serializable.Serializable'>
instrumenting <class 'langchain_core.vectorstores.base.VectorStoreRetriever'> for base <class 'langchain_core.vectorstores.base.VectorStoreRetriever'>


In [31]:
from time import sleep

# The saved run of this cell failed on its second sample with a 429
# RateLimitError: the free Together model allows 6 queries per minute.
# Pausing between samples spreads the requests out; raise the value if 429s
# still occur, since the attached feedback functions also call an LLM.
SECONDS_BETWEEN_SAMPLES = 30

# NOTE(review): `ground_truth_actual_data` is only loaded in commented-out
# code in the part of the notebook visible here — make sure it is defined
# before running this cell on a fresh kernel.
eval_samples = ground_truth_actual_data[-4:]  # evaluate only the last four samples

for sample_idx, sample in enumerate(tqdm(eval_samples)):
    if sample_idx:
        # No wait before the first sample; throttle every following one.
        sleep(SECONDS_BETWEEN_SAMPLES)
    # Invoke the chain inside the recorder context so TruLens records the call.
    with tru_recorder as recording:
        rag_chain.invoke(sample["question"])

 25%|██▌       | 1/4 [00:10<00:32, 10.92s/it]


RateLimitError: Error code: 429 - {'id': 'nnbtVPr-4Yz4kd-9269804ea97a3bcb', 'error': {'message': 'You have reached the rate limit specific to this model meta-llama/Llama-3.3-70B-Instruct-Turbo-Free. The maximum rate limit for this model is 6.0 queries per minute. This limit differs from the general rate limits published at Together AI rate limits documentation (https://docs.together.ai/docs/rate-limits). For inquiries about increasing your model-specific rate limit, please contact our sales team (https://www.together.ai/forms/contact-sales)', 'type': 'model_rate_limit', 'param': None, 'code': None}}

In [19]:
# Launch the TruLens dashboard for this session (the saved output shows a
# local Streamlit process being started and the URL it is served on).
run_dashboard(session)

Starting dashboard ...


Accordion(children=(VBox(children=(VBox(children=(Label(value='STDOUT'), Output())), VBox(children=(Label(valu…

Dashboard started at http://localhost:53645 .


<Popen: returncode: None args: ['streamlit', 'run', '--server.headless=True'...>