In [1]:
import os
from dotenv import load_dotenv

load_dotenv()

True

In [2]:
LOCAL_LLM = 'llama3'
LLAMA3_70B = 'llama3-70b-8192'
LLAMA3_8B = 'llama3-8b-8192'
VECTOR_DB_URL = 'http://localhost:6333'
EMBEDDING_MODEL = 'NeuML/pubmedbert-base-embeddings'

In [3]:
positive_query = """
Given the following subjective and objective assessment, provide a well informed and researched differential diagnosis

Subjective assessment:
The patient, Mr. Smith, is a 45-year-old male who presents to the clinic with complaints of lower back pain that has been bothering him for the past two weeks. He describes the pain as dull and achy, located in the lumbar region, with occasional radiation down his left leg. He notes that the pain worsens with prolonged sitting or standing and is relieved by lying down. He denies any recent trauma or injury but mentions that he has a history of occasional low back pain, especially after heavy lifting or prolonged periods of inactivity. He rates the pain as a 6 out of 10 on the pain scale.

Objective assessment:
On physical examination, Mr. Smith appears uncomfortable but is able to walk into the examination room without assistance. Vital signs are within normal limits. Inspection of the lumbar spine reveals no obvious deformities or asymmetry. Palpation elicits tenderness over the paraspinal muscles of the lumbar spine, particularly on the left side. Range of motion of the lumbar spine is mildly restricted, with pain on forward flexion and left lateral bending. Straight leg raise test is positive on the left side at 45 degrees, reproducing his symptoms of radiating pain down the left leg. Neurological examination reveals intact sensation and strength in the lower extremities, with no signs of motor weakness or sensory deficits.
"""

In [4]:
from langchain_qdrant import Qdrant
from qdrant_client import QdrantClient
from langchain_huggingface import HuggingFaceEmbeddings

def get_vector_embeddings(embedding_model: str):
    embeddings = HuggingFaceEmbeddings(
        model_name=embedding_model
    )
    return embeddings

print("Loading vector embeddings and creating Qdrant client...")
embeddings = get_vector_embeddings(EMBEDDING_MODEL)
client = QdrantClient(VECTOR_DB_URL)
db = Qdrant(
    client=client,
    embeddings=embeddings,
    collection_name="physio-textbooks",
)

Loading vector embeddings and creating Qdrant client...


  from tqdm.autonotebook import tqdm, trange


In [5]:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers.json import JsonOutputParser
from langchain.prompts import PromptTemplate
from langchain.output_parsers import PydanticOutputParser
from langchain_groq import ChatGroq
from pydantic import BaseModel, Field
from typing_extensions import TypedDict
from typing import List
from langchain.schema import Document
from pprint import pprint
from langgraph.graph import StateGraph, END
import asyncio

In [6]:
### Query translator

# Create a pydatnic object for the translation result
class QueryTranslatorAgentOutput(BaseModel):
    """Outputs a list of translated subqueries based on the input query"""
    subqueries: List[str] = Field(
        ...,
        description="List of translated subqueries"
    )
  
parser = PydanticOutputParser(pydantic_object=QueryTranslatorAgentOutput)

QUERY_TRANSLATOR_PROMPT = """<|begin_of_text|><|start_header_id|>system<|end_header_id|>You are a Query Translator Agent for a retrieval-augmented generation system,
your job is to translate a complex query into numerous concise and targetted queries
This sub-queries will be used to conduct a similarity search retrieval on a vector database containing medical information related to physiotherapy and musculoskeletal conditions.

## Objective:
Convert subjective and objective patient assessments into multiple, specific, and focused queries. 
These queries will be used for retrieval-augmented generation from a vector database to aid physiotherapists in identifying potential differential diagnoses.

## Audience: 
Physiotherapists requiring precise queries derived from patient assessments to facilitate differential diagnosis using retrieval-augmented generation.

## Instructions:
1. Synthesize Patient Assessments into Queries
- Combine key elements from both subjective and objective assessments into a comprehensive set of queries. Each query should be focused, addressing specific aspects of the patient's symptoms, findings, or history.
2. Extract Key Elements:
- Subjective Assessment: Include patient demographics, pain characteristics, symptom descriptions, aggravating/relieving factors, pain score, and relevant history.
- Objective Assessment: Incorporate physical examination findings, test results, clinical observations, and neurological signs.
3. Formulate Multiple Queries:
- Create a set of specific queries that cover different aspects of the assessment.
- Ensure the queries facilitate effective information retrieval to support differential diagnosis.
- Query Structure: Should address pain characteristics, potential underlying causes, associated symptoms, and specific physical findings.
4. Use Medical Terminology:
- Apply accurate medical terminology that reflects the clinical assessments.
- Example Terms: “lumbar region,” “radiating pain,” “positive straight leg raise,” etc.
5. Maintain Professional Tone:
- Ensure the queries are precise, clinically relevant, and professional.
- Avoid speculative language and focus on synthesizing provided data.
6. Ensure Query Completeness
- Make sure the set of queries comprehensively covers all relevant aspects of the assessment for effective retrieval.
- Completeness Check: Confirm the queries include all necessary details without omitting critical findings.

## Example main input query:
Given the following subjective and objective assessment, provide a well informed and researched differential diagnosis

Subjective assessment:
The patient, Mr. Smith, is a 45-year-old male who presents to the clinic with complaints of lower back pain that has been bothering him for the past two weeks. He describes the pain as dull and achy, located in the lumbar region, with occasional radiation down his left leg. He notes that the pain worsens with prolonged sitting or standing and is relieved by lying down. He denies any recent trauma or injury but mentions that he has a history of occasional low back pain, especially after heavy lifting or prolonged periods of inactivity. He rates the pain as a 6 out of 10 on the pain scale.

Objective assessment:
On physical examination, Mr. Smith appears uncomfortable but is able to walk into the examination room without assistance. Vital signs are within normal limits. Inspection of the lumbar spine reveals no obvious deformities or asymmetry. Palpation elicits tenderness over the paraspinal muscles of the lumbar spine, particularly on the left side. Range of motion of the lumbar spine is mildly restricted, with pain on forward flexion and left lateral bending. Straight leg raise test is positive on the left side at 45 degrees, reproducing his symptoms of radiating pain down the left leg. Neurological examination reveals intact sensation and strength in the lower extremities, with no signs of motor weakness or sensory deficits.

## Example output subqueries:
1. **Lumbar Region Pain:** What are the common causes of dull and achy lower back pain in a 45-year-old male, particularly with symptoms that radiate down the left leg?
2. **Leg Pain Radiation with Lower Back Pain:** What differential diagnoses should be considered for a patient who experiences radiating pain down the left leg with lower back pain, aggravated by prolonged sitting or standing?
3. **Positive Straight Leg Raise Test:** How does a positive straight leg raise test at 45 degrees on the left side correlate with specific lumbar spine pathologies?
4. ...
...

## Output instructions:
{format_instructions}<|eot_id|>

<|start_header_id|>user<|end_header_id|>
{question}<|eot_id|>

<|start_header_id|>assistant<|end_header_id|>
"""

llm = ChatGroq(model=LLAMA3_70B, temperature=1, stop_sequences=["<|eot_id|>"])
prompt = PromptTemplate(
    template=QUERY_TRANSLATOR_PROMPT,
    input_variables=["question"],
    partial_variables={
        "format_instructions": parser.get_format_instructions(),
    }
)

query_translator = prompt | llm | JsonOutputParser(pydantic_object=QueryTranslatorAgentOutput)
query_translator = query_translator.with_retry()

translator_result = query_translator.invoke({
    "question": positive_query
})
subqueries = translator_result["subqueries"]
print(subqueries)


['What are the common causes of dull and achy lower back pain in a 45-year-old male, particularly with symptoms that radiate down the left leg?', 'What differential diagnoses should be considered for a patient who experiences radiating pain down the left leg with lower back pain, aggravated by prolonged sitting or standing?', 'How does a positive straight leg raise test at 45 degrees on the left side correlate with specific lumbar spine pathologies?', 'What role does paraspinal muscle tenderness play in the diagnosis of lumbar spine disorders, particularly on the left side?', 'What are the possible causes of mildly restricted range of motion of the lumbar spine, with pain on forward flexion and left lateral bending?', 'What is the significance of a history of occasional low back pain, especially after heavy lifting or prolonged periods of inactivity, in the differential diagnosis of lumbar spine disorders?', 'What are the implications of intact sensation and strength in the lower extre

In [7]:
### Retrieval Grader

# Create pydantic object for the grader result
class GraderResult(BaseModel):
    score: str = Field(
        "", 
        description="A binary score 'yes' or 'no' to indicate whether the document is relevant to the question, 'yes' if relevant and 'no' if not relevant"
    )
    # reason: str = Field(
    #     "", 
    #     description="A short explanation of the reason for the score given, indicating why the document is relevant or not relevant to the question"
    # )

parser = PydanticOutputParser(pydantic_object=GraderResult)

RETRIEVAL_GRADER_PROMPT = """
<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a grader assessing relevance of a retrieved document to a user question. 
If the document contains keywords related to the user question or provides relevant information to answer the question,
grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals.

{format_instructions}<|eot_id|>
<|start_header_id|>user<|end_header_id|>

Here is the retrieved document:

{document}

Here is the user question: {question}<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>
"""

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
# llm = ChatGroq(model=LLAMA3_80B, temperature=1)
# llm = ChatOllama(model=LOCAL_LLM, format="json", temperature=0)

prompt = PromptTemplate(
    template=RETRIEVAL_GRADER_PROMPT, 
    input_variables=["document", "question"], 
    partial_variables={"format_instructions": parser.get_format_instructions()}
)

# Create a retrieval grader by chaining the prompt, llm model, and the json output parser
retrieval_grader = prompt | llm | JsonOutputParser(pydantic_object=GraderResult)
retrieval_grader = retrieval_grader.with_retry()

default_query = "When did sir Standford Raffles founded singapore?"

def retrieval_grader_test(query: str = default_query):
    docs = db.similarity_search_with_score(query=query, k=3)

    for i , (doc, score) in enumerate(docs, start=1):
        result = retrieval_grader.invoke({
            "document": doc,
            "question": query
        })
        print(f"<<\n{doc.page_content}\n>>")
        print(f"{i} - {result}", sep="\n\n", end="\n\n\n" + "-" * 50 + "\n\n\n")

query = input("Enter a query (press enter if you want to use the default query): ")
retrieval_grader_test(query=query or default_query)

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


<<
is raised with ‘ peaking’  centrally ). (B) T he l ate ral  radi og r aph s how s  i nc re ase d de nsi ty  ante ri or to the oblique f ssure (arrows).
>>
1 - {'score': 'no'}


--------------------------------------------------


<<
. 
J Aging
Res.
 2012;2012: :695854.
27 
Andersen-Ranberg K., Schroll M., Jeune B.
Healthy centenarians do not exist, but
autonomous centenarians do: a population-
>>
2 - {'score': 'no'}


--------------------------------------------------


<<
Accessed May 9, 2018.
98 
Kitwood T.M. 
Dementia Reconsidered: The
Person Comes First
.
 Bukingham, England:
Open University Press; 1997.
>>
3 - {'score': 'no'}


--------------------------------------------------




In [8]:
### Async retrieval grader test

# Use each subquery to perform similarity search on vector database
documents = []
for i, subquery in enumerate(subqueries, start=1):
    print(f"{i} - Performing similarity search for subquery: {subquery}")
    results = db.similarity_search_with_score(subquery, k=3)
    documents.append(
        {
            "question": subquery,
            "documents": [doc.page_content for doc, score in results],
        }
    )


async def grade_query_document_pair(query, document, query_index, document_index):
    llm_result = await retrieval_grader.ainvoke(
        {"document": document, "question": query}
    )

    return {
        "query_index": query_index,
        "document_index": document_index,
        "is_relevant": llm_result["score"] == "yes",
    }


async def filter_subquery_documents(documents):
    # Create a list of coroutines for grading each query-document pair
    invocations = []
    for i, item in enumerate(documents):
        query = item["question"]
        docs = item["documents"]

        for j, doc in enumerate(docs):
            invocations.append(grade_query_document_pair(query, doc, i, j))

    grading_results = await asyncio.gather(*invocations)

    # Create a list of filtered documents based on the grading results
    filtered_documents = [
        {"question": item["question"], "documents": []} for item in documents
    ]
    for result in grading_results:
        query_index = result["query_index"]
        document_index = result["document_index"]
        is_relevant = result["is_relevant"]

        if is_relevant:
            filtered_documents[query_index]["documents"].append(
                documents[query_index]["documents"][document_index]
            )

    return filtered_documents


filtered_documents = await filter_subquery_documents(documents)

# check for query with no relevant documents
for i, item in enumerate(filtered_documents, start=1):
    if not item["documents"]:
        print(f"No relevant documents found for query {i} - {item['question']}")
        

1 - Performing similarity search for subquery: What are the common causes of dull and achy lower back pain in a 45-year-old male, particularly with symptoms that radiate down the left leg?
2 - Performing similarity search for subquery: What differential diagnoses should be considered for a patient who experiences radiating pain down the left leg with lower back pain, aggravated by prolonged sitting or standing?
3 - Performing similarity search for subquery: How does a positive straight leg raise test at 45 degrees on the left side correlate with specific lumbar spine pathologies?
4 - Performing similarity search for subquery: What role does paraspinal muscle tenderness play in the diagnosis of lumbar spine disorders, particularly on the left side?
5 - Performing similarity search for subquery: What are the possible causes of mildly restricted range of motion of the lumbar spine, with pain on forward flexion and left lateral bending?
6 - Performing similarity search for subquery: What i

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


No relevant documents found for query 1 - What are the common causes of dull and achy lower back pain in a 45-year-old male, particularly with symptoms that radiate down the left leg?
No relevant documents found for query 2 - What differential diagnoses should be considered for a patient who experiences radiating pain down the left leg with lower back pain, aggravated by prolonged sitting or standing?


In [9]:
### Generation Agent

from langchain_core.output_parsers.string import StrOutputParser

PROMPT = """
<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a expert in physiotherapy, you will be presented with a set of subjective and object assessment and your job is to come up with a well informed and researched differential diagnosis of the medical scenerio.
Here are examples of the subjective and objective assessments and the expected differential diagnosis:

Subjective Assessment:
The patient, Mr. Smith, is a 45-year-old male who presents to the clinic with complaints of lower back pain that has been bothering him for the past two weeks. He describes the pain as dull and achy, located in the lumbar region, with occasional radiation down his left leg. He notes that the pain worsens with prolonged sitting or standing and is relieved by lying down. He denies any recent trauma or injury but mentions that he has a history of occasional low back pain, especially after heavy lifting or prolonged periods of inactivity. He rates the pain as a 6 out of 10 on the pain scale. On physical examination, Mr. Smith appears uncomfortable but is able to walk into the examination room without assistance.

Objective Assessment:
Vital signs are within normal limits. Inspection of the lumbar spine reveals no obvious deformities or asymmetry. Palpation elicits tenderness over the paraspinal muscles of the lumbar spine, particularly on the left side. Range of motion of the lumbar spine is mildly restricted, with pain on forward flexion and left lateral bending. Straight leg raise test is positive on the left side at 45 degrees, reproducing his symptoms of radiating pain down the left leg. Neurological examination reveals intact sensation and strength in the lower extremities, with no signs of motor weakness or sensory deficits. On physical examination, Mr. Smith appears uncomfortable but is able to walk into the examination room without assistance. Vital signs are within normal limits. Inspection of the lumbar spine reveals no obvious deformities or asymmetry. Palpation elicits tenderness over the paraspinal muscles of the lumbar spine, particularly on the left side. Range of motion of the lumbar spine is mildly restricted, with pain on forward flexion and left lateral bending. Straight leg raise test is positive on the left side at 45 degrees, reproducing his symptoms of radiating pain down the left leg. Neurological examination reveals intact sensation and strength in the lower extremities, with no signs of motor weakness or sensory deficits.

Differential Diagnosis:
Based on the assessments provided, my differential diagnosis for Mr. Smith would include:
1. ...
2. ...
...

IMPORTANT:
- Only use the retrieved context as the source of ground truth to answer the question.
- If the query is not related to physiotherapy or unrelated from the retrieved context, please answer with "I am sorry, I am not able to answer this question."
- Always include citations and references to support your answer. <|eot_id|>
- If context is not enough to answer the question, you can use parametric data to generate the answer, but make sure to label it as such.
<|start_header_id|>user<|end_header_id|> 

Answer the question based only on the following context, this context should be used as source of ground truth to answer the question.
The retrieved context is a set of documents that are retrieved based on the subqueries derived from the patient assessments.

{context}

---

Answer the question based on the above context: 
{question}<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>
"""

llm = ChatGroq(model=LLAMA3_70B, temperature=0.5, stop_sequences=["<|eot_id|>"])
# llm = ChatOpenAI(model="gpt-4o", temperature=0.5)

prompt = PromptTemplate(template=PROMPT, input_variables=["context", "question"])
rag_chain = prompt | llm | StrOutputParser()

docs = db.similarity_search_with_score(query=positive_query, k=3)

def format_retrieved_docs(docs):
    return "\n\n---\n\n".join([f"metadata: {doc.metadata}\nscore: {score}\ncontent: {doc.page_content}" for doc, score in docs])

# Test the generation agent
print(prompt.format(context=format_retrieved_docs(docs), question=positive_query))
print("Waiting for LLM response...")
stream = rag_chain.stream({
  "context": format_retrieved_docs(docs),
  "question": positive_query
})
generation = ""
for response in stream:
    generation += response
    print(response, flush=True, end="")




<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a expert in physiotherapy, you will be presented with a set of subjective and object assessment and your job is to come up with a well informed and researched differential diagnosis of the medical scenerio.
Here are examples of the subjective and objective assessments and the expected differential diagnosis:

Subjective Assessment:
The patient, Mr. Smith, is a 45-year-old male who presents to the clinic with complaints of lower back pain that has been bothering him for the past two weeks. He describes the pain as dull and achy, located in the lumbar region, with occasional radiation down his left leg. He notes that the pain worsens with prolonged sitting or standing and is relieved by lying down. He denies any recent trauma or injury but mentions that he has a history of occasional low back pain, especially after heavy lifting or prolonged periods of inactivity. He rates the pain as a 6 out of 10 on the pain scale. On

In [10]:
class HallucinationGraderResult(BaseModel):
    score: str = Field(
        "", 
        description="A binary score 'yes' or 'no' to indicate whether the answer is grounded in / supported by the facts provided."
    )
    reason: str = Field(
        "",
        description="A reason for the score given, identifying the specific part of the answer that is not grounded in the facts."
    )

parser = PydanticOutputParser(pydantic_object=HallucinationGraderResult)

PROMPT = """
<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether
an answer is grounded in / supported by as set of facts. 
Your will be grading based on a context of a medical scenerio and the differential diagnosis provided by an expert in physiotherapy.
The retrieved facts are sourced from medical sources such as medical journals, textbooks, and other reliable sources.
This sources might contain examples of different musculoskeletal conditions, their symptoms, and the differential diagnosis for each condition.
The facts might contain example conditions of arbitrary patients, and the differential diagnosis provided by an expert in physiotherapy, hence the facts might not be directly related to the user question.
Therefore You should not focus on the names of the patients or the specific conditions mentioned in the facts, but rather on the general medical knowledge and the differential diagnosis provided by the expert in physiotherapy.
For example if the facts contains conditions for a patient called Mr X, and patient in the user question is called Mr Y, you should not grade the answer as not grounded in the facts just because the names of the patients are different.
Even if the answer contains a condition that is not mentioned in the facts, grade it as grounded if there is valid reference to the condition in the medical literature provided.

{format_instructions}<|eot_id|>
<|start_header_id|>user<|end_header_id|>

Here are the facts:

----------------------------------------
{facts}
----------------------------------------

Here is the answer: {answer}<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>
"""

llm = ChatGroq(model=LLAMA3_70B, temperature=1, stop_sequences=["<|eot_id|>"])
# llm = ChatOpenAI(model="gpt-4o", temperature=1, max_tokens=500)


prompt = PromptTemplate(
    template=PROMPT, 
    input_variables=["facts", "answer"], 
    partial_variables={"format_instructions": parser.get_format_instructions()}
)

hallucination_grader = prompt | llm | JsonOutputParser(pydantic_object=HallucinationGraderResult)
hallucination_grader = hallucination_grader.with_retry()

hallucination_result = hallucination_grader.invoke({
    "facts": format_retrieved_docs(docs),
    "answer": generation
})
print(hallucination_result)

{'score': 'yes', 'reason': "The answer is grounded in the facts. The provided differential diagnosis for Mr. Smith includes Lumbar Disc Herniation, Lumbar Facet Joint Dysfunction, and Lumbar Strain, which are all musculoskeletal conditions. The facts provided contain examples of musculoskeletal conditions, their symptoms, and the differential diagnosis for each condition. Although the names of the patients in the facts are different (Jenny and Mary), the general medical knowledge and the differential diagnosis provided by the expert in physiotherapy are applicable to Mr. Smith's case. The references provided in the answer are from reliable medical sources, supporting the differential diagnosis."}


In [11]:
### Answer Grader Agent

class AnswerGraderResult(BaseModel):
    score: str = Field("", description="A binary score 'yes' or 'no' to indicate whether the answer is useful to resolve the question")
    reason: str = Field("", description="A reason for the score given, explaining why the answer is useful or not useful to resolve the question.")

parser = PydanticOutputParser(pydantic_object=AnswerGraderResult)

PROMPT = """
<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether
an answer is useful to resolve a question. Give a binary score of 'yes' or 'no' to indicate whether the answer is useful to resolve the question.
{format_instructions}<|eot_id|>
<|start_header_id|>user<|end_header_id|> 

Here is the answer:
----------------------------------------
{answer}
----------------------------------------

Here is the question: 
{question}<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>
"""

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
# llm = ChatGroq(model=LLAMA3_70B, temperature=0)

prompt = PromptTemplate(
    template=PROMPT, 
    input_variables=["answer", "question"], 
    partial_variables={"format_instructions": parser.get_format_instructions()}
)
answer_grader = prompt | llm | JsonOutputParser(pydantic_object=AnswerGraderResult)
answer_grader = answer_grader.with_retry()

answer_result = answer_grader.invoke({
    "answer": generation,
    "question": query
})
print(answer_result)

{'score': 'yes', 'reason': 'The answer provides a detailed and well-structured list of possible differential diagnoses for Mr. Smith based on the symptoms described. It also includes references to support the diagnoses and emphasizes the importance of further examinations and imaging studies to confirm the diagnosis. Overall, the answer is useful in addressing the question and guiding further evaluation and management.'}


In [12]:
### Router Agent

class RouterResult(BaseModel):
    datasource: str = Field(
        default="",
        description="A binary answer 'vectorstore' or 'websearch' to indicate whether the question should be routed to the vectorstore or web search."
    )
parser = PydanticOutputParser(pydantic_object=RouterResult)


prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an expert at routing a 
    user question to a vectorstore or web search. 
    Use the vectorstore for questions on Muscle skeletal conditions, phyisotherapy domain and clinic evaluations.
    You do not need to be stringent with the keywords 
    in the question related to these topics. Otherwise, use web-search.
    {format_instructions}<|eot_id|>
    <|start_header_id|>user<|end_header_id|> 
    
    Question to route: {question} <|eot_id|>
    <|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question"],
    partial_variables={"format_instructions": parser.get_format_instructions()},
)

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
# llm = ChatGroq(model=LLAMA3_70B, temperature=0)

negative_question = "llm agent memory"
positive_question = positive_query

question_router = prompt | llm | JsonOutputParser(pydantic_object=RouterResult)

router_result = question_router.invoke({"question": positive_question})
print(router_result) 

{'datasource': 'vectorstore'}


In [13]:
### Web Search Tool
from langchain_community.tools.tavily_search import TavilySearchResults
web_search_tool = TavilySearchResults(max_results=3)

In [14]:
### Langgraph State

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        web_search: whether to add search
        documents: list of documents
    """

    question: str
    subqueries: List[str]
    generation: str
    web_search: str
    documents: List[str]

In [15]:
### Conditional entry point
def route_question(state):
    """
    Route question to web search or RAG.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """
    
    print("---ROUTE QUESTION---")
    question = state["question"]
    print(question)
    
    source = question_router.invoke({"question": question})
    
    print(source["datasource"])
    if source["datasource"] == "web_search":
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "websearch"
    elif source["datasource"] == "vectorstore":
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"
    return "websearch"

In [16]:
### Node

def retrieve_subqueries(state):
    """
    Retrieve documents from the vector store using subqueries.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): The updated graph state with documents for each subquery
    """
    
    print("---RETRIEVAL WITH SUBQUERIES---")
    subqueries = state["subqueries"]
    
    documents = []
    for subquery in subqueries:
        query_result = {
            "question": subquery,
            "documents": []
        }
        results = db.similarity_search_with_score(subquery, k=3)
        for doc, score in results:
            query_result["documents"].append(doc.page_content)
        
        documents.append(query_result)
    
    return {
        **state,
        "documents": documents,
    }
    
    

In [17]:
### Node

def retrieve(state):
    """
    Retrieve documents from vectorstore

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = db.similarity_search_with_score(query=question, k=5)
    documents = [doc.page_content for doc, score in documents]
    return {"documents": documents, "question": question}

In [18]:
### Node

async def agrade_subquery_documents(state):
    """
    Determines whether the retrieved documents are relevant to the subqueries.
    If any of the documents are not relevant, the state is updated to reflect this.
    
    Args:
        state (dict): The current graph state
        
    Returns:
        state (dict): The updated graph state
    """
    
    print("---CHECK RELEVANCE OF SUBQUERY DOCUMENTS---")
    documents = state["documents"]
    web_search = "No"
    
    # Check relevance of each document to the subquery
    filtered_documents = []
    
    async def grade_query_document_pair(query, document, query_index, document_index):
        llm_result = await retrieval_grader.ainvoke(
            {"document": document, "question": query}
        )

        return {
            "query_index": query_index,
            "document_index": document_index,
            "is_relevant": llm_result["score"] == "yes",
        }

    # Create a list of coroutines for grading each query-document pair
    invocations = []
    for i, item in enumerate(documents):
        query = item["question"]
        docs = item["documents"]

        for j, doc in enumerate(docs):
            invocations.append(grade_query_document_pair(query, doc, i, j))

    grading_results = await asyncio.gather(*invocations)

    # Create a list of filtered documents based on the grading results
    filtered_documents = [
        {"question": item["question"], "documents": []} for item in documents
    ]
    for result in grading_results:
        query_index = result["query_index"]
        document_index = result["document_index"]
        is_relevant = result["is_relevant"]

        if is_relevant:
            filtered_documents[query_index]["documents"].append(
                documents[query_index]["documents"][document_index]
            )
            
    # check for query with no relevant documents
    for item in filtered_documents:
        if not item["documents"]:
            web_search = "Yes"
            break
            
    return {
        **state,
        "documents": filtered_documents,
        "web_search": web_search
    }
    
graph_state = GraphState(
    question=positive_query,
    subqueries=subqueries,
    generation="",
    web_search="No",
    documents=documents
)
graph_state = await agrade_subquery_documents(state=graph_state)


---CHECK RELEVANCE OF SUBQUERY DOCUMENTS---


In [19]:
### Node

def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, we will set a flag to run web search

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    web_search = "No"
    
    for doc in documents:
        score = retrieval_grader.invoke(
            {"document": doc ,"question": question}
        )
        grade = score["score"]
        
        if grade.lower() == "yes": # Document relevant
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(doc)
        else: # Document not relevant
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # We do not include the document in filtered_docs
            # We set a flag to indicate that we want to run web search
            web_search = "No"
            
    return {"documents": filtered_docs, "question": question, "web_search": web_search}


### Conditional Edge

def decide_to_generate(state):
    """
    Determines whether to generate an answer, or add web search

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    web_search = state["web_search"]

    if web_search == "Yes":
        print("---DECISION: NOT ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---")
        return "websearch"
    else:
        print("---DECISION: GENERATE---")
        return "generate"

In [20]:
def generate(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}


In [21]:
def generate_with_subqueries(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]
    
    formatted_documents = []
    for item in documents:
        docs = item["documents"]
        question = item["question"]
        docs_str = "\n\n---\n\n".join(docs)
        text = f"Subquery:\n{question}\n\nDocuments:\n{docs_str}"
        formatted_documents.append(text)
        
        
    formatted_context = "\n\n***\n\n".join(formatted_documents)

    # RAG generation
    generation = rag_chain.invoke({"context": formatted_context, "question": question})
    return {"documents": documents, "question": question, "generation": generation}

print(generate_with_subqueries(graph_state)["generation"])

---GENERATE---
Based on the provided context, the implications of intact sensation and strength in the lower extremities, with no signs of motor weakness or sensory deficits, for the diagnosis of lumbar spine disorders are that it reduces the likelihood of neurological involvement, such as nerve root compression or peripheral neuropathy, as a cause of the patient's symptoms.

The intact sensation and strength in the lower extremities suggest that the nerve roots and peripheral nerves are functioning normally, which is supported by the document on Lower Extremity Movement, Active Muscles, and Contributing Nerve Roots. This document outlines the nerve roots involved in various lower extremity movements, and the absence of any deficits in these movements suggests that the nerve roots are not compressed or damaged.

Therefore, the focus of the diagnosis should shift towards musculoskeletal or mechanical causes of the patient's symptoms, such as muscle strain, ligamentous sprain, or joint d

In [22]:
async def web_search(state):
    """
    Web search based based on the question

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Appended web results to documents
    """

    print("---WEB SEARCH---")
    documents = state["documents"]
    
    async def query_web_search(query, query_index):
        web_results = await web_search_tool.ainvoke(query)
        return {"query_index": query_index, "documents": web_results}

    # Web search for each query that did not have relevant documents
    invocations = []
    for i, item in enumerate(documents):
        # Skip web search for queries that had relevant documents
        if item["documents"]:
            continue

        # Perform web search for the query with no relevant documents
        query = item["question"]
        invocations.append(query_web_search(query, i))

    # Gather the results of the web search
    results = await asyncio.gather(*invocations)
    
    for result in results:
        query_index = result["query_index"]
        docs = result["documents"]
        
        documents[query_index]["documents"] = [doc["content"] for doc in docs]
            
    return {
        **state,
        "documents": documents
    }
    
await web_search(graph_state)

# Confirm that all queries have documents
for item in documents:
    if not item["documents"]:
        raise ValueError("Web search failed to retrieve documents for all queries")
print("Web search successful")


---WEB SEARCH---
Web search successful


In [23]:
### Conditional edge

def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    # Run agent to check hallucinations
    result = hallucination_grader.invoke({"facts": documents,"answer": generation})
    score = result["score"]
    reason = result["reason"]

    if score == "yes": # If no hallucinations, check if generation answers question
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question, "answer": generation})
        grade = score["score"]
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
        
    else: # Hallucinations in generation, re-try
        pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        pprint(reason)
        return "not supported"

In [24]:
### Create Graph and add nodes

workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("websearch", web_search)  # web search
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generate

In [25]:
### Build graph

# Set the entry point to start with a router agent that decides whether to route to web search or RAG based on the question
workflow.set_conditional_entry_point(
    route_question,
    {
        "websearch": "websearch",
        "vectorstore": "retrieve",
    },
)

# Connects the retrieve node to the grade_documents node
workflow.add_edge("retrieve", "grade_documents")

# Connects the grade_documents node to the generate node or web search node based on the decision
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)

# Connects the websearch results from the websearch node back to the generate node for generation
workflow.add_edge("websearch", "generate")

# Connects the generated results from the generate node to the grade_generation_v_documents_and_question node to check for hallucinations and relevance to the question
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
    },
)


In [29]:
# Compile and run the graph
app = workflow.compile()

with open("../test_prompts/query_3.txt", "r") as f:
    query = f.read()
    
inputs = {"question": query}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
pprint(value["generation"])

---ROUTE QUESTION---
Given the following subjective and objective assessment, provide a well informed and researched differential diagnosis

Subjective assessment:
Mr. Davis, a 60-year-old male, complains of chronic neck pain that he has experienced for the past six months. He describes the pain as constant, with a stiffness that is worse in the morning and improves slightly with movement throughout the day. He reports occasional headaches at the base of the skull and some tingling in the right arm. He has a history of cervical spondylosis diagnosed 5 years ago. The pain is rated at 4 out of 10 and does not radiate beyond his shoulder blades.

Objective assessment:
On examination, Mr. Davis has a decreased cervical range of motion, particularly in rotation and lateral flexion to the right. Palpation reveals tenderness over the cervical paraspinal muscles and reduced mobility in the lower cervical spine. Spurling's test reproduces tingling in the right arm. Reflexes are normal, and ther