# Multi-Agent with LangGraph and Amazon Bedrock Knowledge Bases

This notebook shows how to use LangGraph, LangChain, the Amazon Bedrock runtime, and Amazon Bedrock Knowledge Bases as a RAG source to retrieve relevant documents. The knowledge base uses Amazon OpenSearch Serverless as its vector database and an S3 bucket to store all the source/reference files. A master (supervisor) agent routes each question, and further LLM-based agents handle these tasks:

1. checking the relevance of each document retrieved from the Amazon Bedrock knowledge base,
2. checking the LLM's answer for hallucinations,
3. checking whether the LLM's generated answer is relevant to the question, and
4. calling a live internet search (currently a dummy placeholder) when no relevant document is found in the Amazon Bedrock knowledge base.

In [1]:
import boto3
from botocore.config import Config
from langchain_community.retrievers import AmazonKnowledgeBasesRetriever
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts.chat import ChatPromptTemplate
from langchain_community.embeddings import  BedrockEmbeddings
from langchain_aws import ChatBedrock
from langgraph.graph import END, StateGraph
from typing import Dict, TypedDict
import os

question = "what is required to confirm your identity?"
# setup boto3 config to allow for retrying
my_region = "us-east-1"
my_config = Config(
    region_name = my_region,
    signature_version = 'v4',
    retries = {
        'max_attempts': 3,
        'mode': 'standard'
    }
)
knowledge_base_id="8VKMIAOUCJ"
numberOfResults = 4
# setup bedrock runtime client. This doesn't need any existing bedrock agent 
bedrock_rt = boto3.client("bedrock-runtime", config = my_config)

# setup bedrock agent runtime client to call an existing bedrock agent
#import uuid  # required for session_id below
#agent_id = 'TSZAZFJEQQ'  # The unique identifier for your Bedrock agent
#agent_alias_id = 'TW3YH9W6XT'  # Replace with the actual agent alias ID
#session_id = str(uuid.uuid4())  # Replace with a unique session ID
#bedrock_agent_rt = boto3.client("bedrock-agent-runtime", config = my_config)

# setup S3 client
#s3 = boto3.client("s3", config = my_config)

# Initialize Bedrock  
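In this notebook, we use Anthropic's Claude 3 Sonnet model and the Amazon Titan embeddings model. If you would like to use a different model, all the model IDs are available [here](https://docs.aws.amazon.com/bedrock/latest/userguide/model-ids.html). Note that the prompt templates below use Llama 3 chat-format markers such as `<|begin_of_text|>` and `<|eot_id|>`; Claude does not treat these as special tokens and simply sees them as text.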


In [2]:
sonnet_model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
model_kwargs =  { 
    "max_tokens": 2048,
    "temperature": 0.0,
    "top_k": 250,
    "top_p": 1,
    "stop_sequences": ["Human"],
}

sonnet_llm = ChatBedrock(
    client=bedrock_rt,
    model_id=sonnet_model_id,
    model_kwargs=model_kwargs,
)

embeddings_model_id = "amazon.titan-embed-text-v1"
embedding_llm = BedrockEmbeddings(client = bedrock_rt, model_id = embeddings_model_id)

### Create the knowledge base retriever (set `knowledge_base_id` in the first cell)

In [3]:
bedrock_retriever = AmazonKnowledgeBasesRetriever(
    knowledge_base_id=knowledge_base_id,
    region_name = my_region,
    retrieval_config={"vectorSearchConfiguration": {"numberOfResults": numberOfResults}},
)

In [4]:
### Retrieval Relevancy Grader Agent
from langchain_core.documents import Document
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate

prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing relevance 
    of a retrieved document to a user question. If the document contains keywords related to the user question, 
    grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
     <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the retrieved document: \n\n {document} \n\n
    Here is the user question: {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables=["question", "document"],
)
retrieval_grader = prompt | sonnet_llm | JsonOutputParser()

docs = []
try:
    docs = bedrock_retriever.invoke(question)
except Exception:
    # Use a placeholder Document (not a bare string) so .page_content works below
    docs = [Document(page_content="DUMMY")]

# Concatenate all retrieved passages into one string for the grader
doc_txt = ""
for doc in docs:
    doc_txt = doc_txt + "\n" + doc.page_content
print("input text: \n"+doc_txt)
print("retrieval_grader: asking llm to grade if vector_retrieval result is relevant to question")

try:
    graded_answer = retrieval_grader.invoke({"question": question, "document": doc_txt})
except Exception:
    # Fail closed: treat an LLM or JSON-parsing error as "not relevant"
    graded_answer = {'score': 'no'}

print(graded_answer)

input text: 

Driver’s License (DriverLicense)  Your valid driver’s license is required to confirm your identity and driving privileges. Make sure to provide a copy of both sides of your driver’s license when filling a claim
Vehicle Registration (VehicleRegistration)    Proof of vehicle registration is essential to confirm the vehicle’s ownership. This document should contain details such as the vehicle’s make, model, year, and VIN (Vehicle Identification Number)
Accident Report (AccidentReport)  An accident report is a vital document that outlines the details of the incident. This report typically filled out by law enforcement officers and provided to you on the local of the accident. Please make sure to include:  · Date, time, and location of the accident  · Contact information of all parts involved  · Description of the accident and how it occurred  · Police report, if available    Alternatively, both parties involved in the accident can also fill up our “Agreed Statement of Facts o
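
All three graders in this notebook ask the model for a JSON verdict and fall back to `'no'` when the call or parsing fails. The helper below is a minimal, stdlib-only sketch of that fail-closed parsing; `parse_binary_score` is a hypothetical name (not a LangChain API), and the brace-extraction step assumes the reply contains at most one flat JSON object.

```python
import json
import re


def parse_binary_score(raw: str, default: str = "no") -> str:
    """Extract a 'yes'/'no' verdict from a grader reply such as '{"score": "yes"}'.

    Returns `default` when the reply holds no usable JSON score, mirroring the
    notebook's fallback of {'score': 'no'}.
    """
    # Models sometimes wrap the JSON in prose; take the first flat {...} span.
    match = re.search(r"\{.*?\}", raw, flags=re.DOTALL)
    if match is None:
        return default
    try:
        payload = json.loads(match.group(0))
    except json.JSONDecodeError:
        return default
    score = str(payload.get("score", "")).strip().lower()
    # Anything other than a clean yes/no is treated as the default verdict.
    return score if score in ("yes", "no") else default


print(parse_binary_score('{"score": "yes"}'))
print(parse_binary_score('Here is my grade: {"score": "No"}'))
print(parse_binary_score("relevant"))
```

Failing closed keeps a malformed reply from letting an irrelevant document through; the cost is that a transient error can also discard a relevant one.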

In [5]:
### LLM Generate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate

# Prompt
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an assistant for question-answering tasks. 
    Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. 
    Use three sentences maximum and keep the answer concise <|eot_id|><|start_header_id|>user<|end_header_id|>
    Question: {question} 
    Context: {context} 
    Answer: <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "context"],
)


# Chain
rag_chain = prompt | sonnet_llm | StrOutputParser()

# Run

generation = rag_chain.invoke({"context": docs, "question": question})
print(generation)

According to the provided context, to confirm your identity when filing an insurance claim, you are required to provide a copy of both sides of your valid driver's license. The driver's license document is necessary to verify your identity and driving privileges.
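
The cell above passes the list of `Document` objects straight into `{context}`, so the prompt receives each document's Python representation (metadata included) rather than just its text. A common alternative is to join the `page_content` fields first. The sketch below uses a hypothetical `SimpleDoc` dataclass as a stand-in for `langchain_core.documents.Document`, since only the `page_content` attribute is read:

```python
from dataclasses import dataclass, field
from typing import Iterable


@dataclass
class SimpleDoc:
    """Hypothetical stand-in for langchain_core.documents.Document (only the fields used here)."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def format_docs(docs: Iterable[SimpleDoc], separator: str = "\n\n") -> str:
    """Join the documents' text into one context string, skipping empty documents."""
    return separator.join(
        d.page_content.strip() for d in docs if d.page_content.strip()
    )


sample_docs = [SimpleDoc("doc A text"), SimpleDoc("   "), SimpleDoc("doc B text")]
print(format_docs(sample_docs))
```

With real retriever output this would be called as `rag_chain.invoke({"context": format_docs(docs), "question": question})`.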




In [6]:
### LLM Hallucination Grader Agent

# Prompt
prompt = PromptTemplate(
    template=""" <|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether 
    an answer is grounded in / supported by a set of facts. Give a binary 'yes' or 'no' score to indicate 
    whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a 
    single key 'score' and no preamble or explanation. <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here are the facts:
    \n ------- \n
    {documents} 
    \n ------- \n
    Here is the answer: {generation}  <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "documents"],
)

hallucination_grader = prompt | sonnet_llm | JsonOutputParser()
try:
    hal_grader = hallucination_grader.invoke({"documents": docs, "generation": generation})
except Exception:
    # Fail closed: treat an LLM or JSON-parsing error as "not grounded"
    hal_grader = {'score': 'no'}
    
print(hal_grader)

{'score': 'yes'}


In [7]:
### LLM Answer Relevancy Grader Agent

# Prompt
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an 
    answer is useful to resolve a question. Give a binary score 'yes' or 'no' to indicate whether the answer is 
    useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
     <|eot_id|><|start_header_id|>user<|end_header_id|> Here is the answer:
    \n ------- \n
    {generation} 
    \n ------- \n
    Here is the question: {question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "question"],
)

answer_grader = prompt | sonnet_llm | JsonOutputParser()
try:
    ans_grader = answer_grader.invoke({"question": question, "generation": generation})
except Exception:
    # Fail closed: treat an LLM or JSON-parsing error as "not useful"
    ans_grader = {'score': 'no'}

print(ans_grader)

{'score': 'yes'}
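
The hallucination and answer-relevancy verdicts are later combined into the three outcomes that drive the `generate` node's conditional edge (`useful`, `not useful`, `not supported`). As a plain-Python sketch of that decision table, with `decide_after_generation` as a hypothetical helper name:

```python
def decide_after_generation(grounded: str, answers_question: str) -> str:
    """Map the two binary grader verdicts to the graph's edge labels.

    grounded: hallucination grader score ('yes' means no hallucination).
    answers_question: answer grader score ('yes' means the answer resolves the question).
    """
    if grounded.strip().lower() != "yes":
        return "not supported"  # regenerate from the same documents
    if answers_question.strip().lower() == "yes":
        return "useful"         # done
    return "not useful"         # fetch more context (web search) and retry


print(decide_after_generation("yes", "yes"))
print(decide_after_generation("no", "yes"))
```

In the graph itself the answer grader is only invoked when the generation is grounded; this pure function simply ignores the second verdict in that case.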


## Create Master/Supervisor Agent
We will now define the router that decides which data source (a RAG source or a function) to use for each question. This example includes only one RAG source, but feel free to extend this section with your own tools.
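
Because the router's reply is free text that is expected to be JSON, it helps to validate the `datasource` value and fall back to the knowledge base on anything unexpected, not only on hard failures. A minimal stdlib sketch, with `parse_route` and `to_edge_label` as hypothetical names:

```python
import json

ROUTES = {"vectorstore", "web_search"}


def parse_route(raw: str, default: str = "vectorstore") -> str:
    """Return the router's 'datasource', or `default` if the reply is unusable."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return default
    if not isinstance(payload, dict):
        return default
    choice = str(payload.get("datasource", "")).strip().lower()
    return choice if choice in ROUTES else default


def to_edge_label(datasource: str) -> str:
    """Map the router's value to the label route_question returns ('web_search' -> 'websearch')."""
    return "websearch" if datasource == "web_search" else "vectorstore"


print(to_edge_label(parse_route('{"datasource": "web_search"}')))
print(to_edge_label(parse_route("I think the web is best")))
```

Defaulting to `vectorstore` matches the notebook's instruction to try the knowledge base first.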

In [8]:
### Master/Supervisor Agent

from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate

prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an expert at delegating a 
    user question to a vectorstore or web search. Use the vectorstore first for all the questions. You do not need to be stringent with the keywords 
    in the question related to these topics. Use web search only if the vectorstore cannot answer the user's question. Give a binary choice 'vectorstore' or 'web_search' 
    based on the question. Return a JSON with a single key 'datasource' and 
    no preamble or explanation. Question to route: {question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question"],
)

question_router = prompt | sonnet_llm | JsonOutputParser()

try:
    q_router = question_router.invoke({"question": question})
except Exception:
    # Default to the knowledge base if the router call or JSON parsing fails
    q_router = {'datasource': 'vectorstore'}

print(q_router)


{'datasource': 'vectorstore'}


In [9]:
### Search
# Placeholder for a live web search. To use a real provider (for example Tavily or
# Google Serper), replace FixedResults() with a call to that tool, e.g.:
#from langchain_community.tools.tavily_search import TavilySearchResults
#TAVILY_API_KEY = os.environ.get('TAVILY_API_KEY')
def FixedResults():
    """Return a fixed, dummy search result so the graph can run without a search API."""
    docs = []
    docs.append("DUMMY")
    return docs

# Note: this holds the result list itself (not a callable tool); web_search() reads web_search_tool[0]
web_search_tool = FixedResults()
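
`web_search_tool` is a fixed list rather than a callable tool. One way to keep a seam for a real provider is a small interface with the dummy as its default implementation. This is a hypothetical sketch (`SearchTool`, `DummySearch` and `search_as_context` are not from the notebook or any library); it reproduces the same newline-prefixed `DUMMY` text that the `web_search` node wraps in a `Document`:

```python
from typing import List, Protocol


class SearchTool(Protocol):
    """Hypothetical interface: anything with search(query) returning text snippets."""

    def search(self, query: str) -> List[str]: ...


class DummySearch:
    """Behaves like FixedResults(): always returns the single placeholder 'DUMMY'."""

    def search(self, query: str) -> List[str]:
        return ["DUMMY"]


def search_as_context(tool: SearchTool, query: str) -> str:
    """Run the tool and join its snippets line by line, prefixed with a newline
    to match the text the notebook's web_search node builds."""
    return "\n" + "\n".join(tool.search(query))


print(repr(search_as_context(DummySearch(), "What are the top news today?")))
```

A real provider would implement `search` by calling its API and returning the text of each hit.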

## Graph Assembly

This graph routes between two different data sources and a generation step, picking the data source based on the input question. The test queries below exercise both paths: one is answered from the Amazon Bedrock knowledge base (which, judging from the retrieved documents, contains insurance-claim documentation), and the others end up at the placeholder web search, which always returns the fixed string `DUMMY`.
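
To make the control flow concrete, here is a framework-free sketch that traces which nodes run for scripted grader outcomes. It is a simplified model of the graph assembled below, not LangGraph itself; `run_graph` is a hypothetical helper, and its `max_steps` argument only loosely models LangGraph's recursion limit.

```python
from typing import List

END = "__end__"  # sentinel standing in for langgraph's END


def run_graph(route: str, any_doc_relevant: bool, generation_verdicts: List[str],
              max_steps: int = 25) -> List[str]:
    """Return the sequence of nodes visited for the given scripted outcomes.

    route: router decision, 'vectorstore' or 'websearch'.
    any_doc_relevant: whether grade_documents keeps at least one document.
    generation_verdicts: successive post-generation outcomes ('useful',
        'not useful', 'not supported'); the last one repeats once exhausted.
    """
    node = "retrieve" if route == "vectorstore" else "websearch"
    trace: List[str] = []
    gen_calls = 0
    while node != END:
        if len(trace) >= max_steps:
            raise RuntimeError(f"step limit of {max_steps} reached; last nodes: {trace[-4:]}")
        trace.append(node)
        if node == "retrieve":
            node = "grade_documents"
        elif node == "grade_documents":
            node = "generate" if any_doc_relevant else "websearch"
        elif node == "websearch":
            node = "generate"
        else:  # "generate"
            verdict = generation_verdicts[min(gen_calls, len(generation_verdicts) - 1)]
            gen_calls += 1
            node = {"useful": END, "not useful": "websearch", "not supported": "generate"}[verdict]
    return trace


print(run_graph("vectorstore", True, ["useful"]))
print(run_graph("vectorstore", False, ["useful"]))
```

If the verdict stays `not useful` forever (as can happen when the only context is the `DUMMY` placeholder), the trace alternates between `websearch` and `generate` until the step limit is hit.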

In [10]:
from pprint import pprint
from typing import List

from langchain_core.documents import Document
from typing_extensions import TypedDict

from langgraph.graph import END, StateGraph

### State


class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        web_search: "Yes" if web search should be run, otherwise "No"
        documents: list of retrieved (or web search) documents
    """

    question: str
    generation: str
    web_search: str
    documents: List[Document]


### Nodes


def retrieve(state):
    """
    Retrieve documents from vectorstore

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval; fall back to a placeholder Document so downstream nodes can read .page_content
    try:
        documents = bedrock_retriever.invoke(question)
    except Exception:
        documents = [Document(page_content="DUMMY")]

    return {"documents": documents, "question": question}


def generate(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATING LLM RESPONSE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}


def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question.
    Irrelevant documents are dropped; if none are relevant, we set a flag to run web search.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc; web_search stays "Yes" unless at least one document is relevant
    filtered_docs = []
    web_search = "Yes"
    for d in documents:

        print("input text: \n"+d.page_content)
        print("retrieval_grader: asking llm to grade if vector_retrieval result is relevant to question")

        try:
            score = retrieval_grader.invoke({"question": question, "document": d.page_content})

            print("\nllm graded: \n")
            print(score)

            grade = str(score["score"])
        except Exception:
            # Treat a failed LLM call or an unparseable reply as "not relevant"
            grade = "no"

        if grade.lower() == "yes":
            # Document relevant: keep it and skip web search
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
            web_search = "No"
        else:
            # Document not relevant: leave it out of filtered_docs
            print("---GRADE: DOCUMENT NOT RELEVANT---")

    return {"documents": filtered_docs, "question": question, "web_search": web_search}


def web_search(state):
    """
    Run a web search based on the question

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Appended web results to documents
    """

    print("---WEB SEARCH---")
    question = state["question"]
    # "documents" is not populated when the router sends the question straight here
    documents = state.get("documents")

    # Web search (currently the fixed DUMMY result from FixedResults())
    docs = web_search_tool
    web_results = Document(page_content="\n" + docs[0])
    if documents is not None:
        documents.append(web_results)
    else:
        documents = [web_results]

    print("---QUESTION---\n")
    print(question)
    print("---SEARCH_RESULT---\n")
    print(documents)

    return {"documents": documents, "question": question}


### Conditional edge


def route_question(state):
    """
    Route question to web search or RAG.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """

    print("---ROUTE QUESTION---")
    question = state["question"]
    print(question)
    try:
        source = question_router.invoke({"question": question})
    except Exception:
        # Same fallback as the router test cell: default to the knowledge base
        source = {"datasource": "vectorstore"}
    print(source)
    print(source.get("datasource"))
    if source.get("datasource") == "web_search":
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "websearch"
    else:
        # "vectorstore" or any unexpected value goes to the knowledge base
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"


def decide_to_generate(state):
    """
    Determines whether to generate an answer, or add web search

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS RELEVANCE---")
    web_search = state["web_search"]

    if web_search == "Yes":
        # No retrieved document was relevant, so fall back to web search
        print(
            "---DECISION: NONE OF THE DOCUMENTS ARE RELEVANT TO QUESTION, INCLUDE WEB SEARCH---"
        )
        return "websearch"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: DOCUMENTS RELEVANT. CALL LLM TO GENERATE ANSWER---")
        return "generate"


### Conditional edge


def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS. SCORE yes MEANS NO HALLUCINATION---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    try:
        score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    except Exception:
        score = {'score': 'no'}

    print(score)

    grade = score["score"]

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        try:
            score = answer_grader.invoke({"question": question, "generation": generation})
        except Exception:
            score = {'score': 'no'}
        grade = score["score"]
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"


workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generate
workflow.add_node("websearch", web_search)  # web search
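
The relevance filter in `grade_documents` keeps only documents graded `yes` and requests web search only when nothing survives: the flag starts at `"Yes"` and flips to `"No"` on the first relevant document. The same rule as a pure function, with the hypothetical name `filter_by_grades` and plain strings in place of `Document` objects:

```python
from typing import List, Tuple


def filter_by_grades(docs: List[str], grades: List[str]) -> Tuple[List[str], str]:
    """Keep documents graded 'yes'; return the web_search flag alongside them.

    A document without a grade (grades shorter than docs) counts as 'no',
    just as a failed grader call does in grade_documents.
    """
    kept = [d for d, g in zip(docs, grades) if str(g).strip().lower() == "yes"]
    # Web search is requested only when no document survived the filter.
    return kept, ("No" if kept else "Yes")


print(filter_by_grades(["license", "registration", "report"], ["yes", "no", "no"]))
print(filter_by_grades(["license", "registration"], ["no", "no"]))
```

One relevant document is enough to skip web search, so the filter leans toward answering from the knowledge base.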

In [11]:
# Build graph
workflow.set_conditional_entry_point(
    route_question,
    {
        "vectorstore": "retrieve",
        "websearch": "websearch",
    },
)

workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)
workflow.add_edge("websearch", "generate")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
    },
)


In [12]:
# Compile
app = workflow.compile()

# Test

inputs = {"question": question}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
pprint(value["generation"])

---ROUTE QUESTION---
what is required to confirm your identity?
{'datasource': 'vectorstore'}
vectorstore
---ROUTE QUESTION TO RAG---
---RETRIEVE---
'Finished running: retrieve:'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
input text: 
Driver’s License (DriverLicense)  Your valid driver’s license is required to confirm your identity and driving privileges. Make sure to provide a copy of both sides of your driver’s license when filling a claim
retrieval_grader: asking llm to grade if vector_retrieval result is relevant to question

llm graded: 

{'score': 'yes'}
---GRADE: DOCUMENT RELEVANT---
input text: 
Vehicle Registration (VehicleRegistration)    Proof of vehicle registration is essential to confirm the vehicle’s ownership. This document should contain details such as the vehicle’s make, model, year, and VIN (Vehicle Identification Number)
retrieval_grader: asking llm to grade if vector_retrieval result is relevant to question
---GRADE: DOCUMENT NOT RELEVANT---
input text: 
Accident 

In [13]:
from pprint import pprint

# Compile
#app = workflow.compile()
inputs = {"question": "What are the top news today?"}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
pprint(value["generation"])

---ROUTE QUESTION---
What are the top news today?
{'datasource': 'web_search'}
web_search
---ROUTE QUESTION TO WEB SEARCH---
---WEB SEARCH---
---QUESTION---

What are the top news today?
---SEARCH_RESULT---

[Document(page_content='\nDUMMY')]
'Finished running: websearch:'
---GENERATING LLM RESPONSE---
---CHECK HALLUCINATIONS. SCORE yes MEANS NO HALLUCINATION---
{'score': 'yes'}
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION DOES NOT ADDRESS QUESTION---
'Finished running: generate:'
---WEB SEARCH---
---QUESTION---

What are the top news today?
---SEARCH_RESULT---

[Document(page_content='\nDUMMY'), Document(page_content='\nDUMMY')]
'Finished running: websearch:'
---GENERATING LLM RESPONSE---
---CHECK HALLUCINATIONS. SCORE yes MEANS NO HALLUCINATION---
{'score': 'yes'}
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION DOES NOT ADDRESS QUESTION---
'Finished running
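
In this run the dummy web result can never answer the question, so the graph appears to alternate between `websearch` and `generate`, appending another `DUMMY` document each time, until LangGraph's recursion limit stops it. One way to bound this, sketched below in plain Python, assumes a hypothetical `attempts` counter added to `GraphState` and incremented by `generate`; `route_after_generation` and its `"give up"` label are illustrative, not part of the notebook:

```python
VERDICTS = {"useful", "not useful", "not supported"}


def route_after_generation(verdict: str, attempts: int, max_attempts: int = 3) -> str:
    """Edge label for the 'generate' node with a retry budget.

    verdict: outcome of the hallucination/answer graders.
    attempts: number of generations so far (hypothetical GraphState field).
    """
    if verdict not in VERDICTS:
        raise ValueError(f"unexpected verdict: {verdict!r}")
    if verdict == "useful":
        return "useful"
    if attempts >= max_attempts:
        return "give up"  # would be mapped to END, keeping the best-effort answer
    return verdict        # "not useful" -> websearch, "not supported" -> generate


print(route_after_generation("not useful", 1))
print(route_after_generation("not useful", 3))
```

The conditional-edge mapping for `generate` would then gain a `"give up": END` entry alongside the existing three.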

In [14]:
from pprint import pprint

# Compile
#app = workflow.compile()
inputs = {"question": "What is harry potter?"}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
pprint(value["generation"])

---ROUTE QUESTION---
What is harry potter?
{'datasource': 'vectorstore'}
vectorstore
---ROUTE QUESTION TO RAG---
---RETRIEVE---
'Finished running: retrieve:'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
input text: 
Driver’s License (DriverLicense)  Your valid driver’s license is required to confirm your identity and driving privileges. Make sure to provide a copy of both sides of your driver’s license when filling a claim
retrieval_grader: asking llm to grade if vector_retrieval result is relevant to question

llm graded: 

{'score': 'no'}
---GRADE: DOCUMENT NOT RELEVANT---
input text: 
Vehicle Registration (VehicleRegistration)    Proof of vehicle registration is essential to confirm the vehicle’s ownership. This document should contain details such as the vehicle’s make, model, year, and VIN (Vehicle Identification Number)
retrieval_grader: asking llm to grade if vector_retrieval result is relevant to question
---GRADE: DOCUMENT NOT RELEVANT---
input text: 
Accident Report (AccidentRe