# Adaptive RAG

<img src="../../data/images/adaptive_rag_cohere.png"  />

### Environment

In [5]:
import os
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())

True

In [33]:
hugging_face_token = os.getenv("HUGGINGFACEHUB_API_TOKEN")
langchain_token = os.getenv("LANGCHAIN_API_KEY")
cohere_api_key = os.getenv("COHERE_API_KEY")
mistral_api_key = os.getenv("MISTRAL_API_KEY")

In [7]:
project_name = "Cohere Adaptive Rag"
os.environ["LANGCHAIN_PROJECT"] = project_name  

### Indexing

Here I will build a vector store locally using Chroma and the Hugging face embeddings from three documents that I grab from the internet

In [9]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
# from langchain_huggingface import HuggingFaceEmbeddings
from langchain_cohere import CohereEmbeddings
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma

# Set embeddings
embeddings = CohereEmbeddings(model="embed-english-v3.0")

# Docs to index
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

# Load
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]

# Split
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=256, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)

* 'allow_population_by_field_name' has been renamed to 'populate_by_name'
* 'smart_union' has been removed


Add the embeddings of the docs to the vector store - Chroma in this case

In [12]:
# # Add to vectorstore
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    embedding=embeddings,
    collection_name = "Test_coll"
)

In [15]:
retriever = vectorstore.as_retriever()
retriever.invoke("agent memory")

[Document(metadata={'description': 'Building agents with LLM (large language model) as its core controller is a cool concept. Several proof-of-concepts demos, such as AutoGPT, GPT-Engineer and BabyAGI, serve as inspiring examples. The potentiality of LLM extends beyond generating well-written copies, stories, essays and programs; it can be framed as a powerful general problem solver.\nAgent System Overview In a LLM-powered autonomous agent system, LLM functions as the agent’s brain, complemented by several key components:', 'language': 'en', 'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/', 'title': "LLM Powered Autonomous Agents | Lil'Log"}, page_content='They also discussed the risks, especially with illicit drugs and bioweapons. They developed a test set containing a list of known chemical weapon agents and asked the agent to synthesize them. 4 out of 11 requests (36%) were accepted to obtain a synthesis solution and the agent attempted to consult documentation to ex

### Routing part

This is the first part, based on the user question the router should choose one of the following routes:
1. web search
2. vector_store search
3. llm call

I will define 2 tools: 
 - web_search - does the web search part
 - vectorstore - does the vectorstore search part 

Here I define the two tools

In [27]:
### Router
from langchain_cohere import ChatCohere
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field

# Data model

class web_search(BaseModel):
    """
    The internet. Use web_search for questions that are related to anything else than agents, prompt engineering, and adversarial attacks.
    """
    
    query: str = Field(description="The query to use when searching the internet.")

class vectorstore(BaseModel):
    """
    A vectorstore containing documents related to agents, prompt engineering, and adversarial attacks. Use the vectorstore for questions on these topics.
    """
    query: str = Field(description="The query to use when searching the vectorstore.")  

##### Bind the tools to the llm

Here I am writing a preamble prompt and I am biding the tools to the llm - in this case the Cohere command-r model

In [29]:
# Preamble
preamble = """You are an expert ar routing a user question to a vectorstore or web search. The vectorstore contains documents related to agens, prompt engineering and adversarial attacks. Use the vectorstore for questions on these topics. Otherwise, use web-search."""

# LLM with tool use and preamble
#Here I bind the tools to the llm
llm = ChatCohere(model="command-r", temperature=0)    
structured_llm_router = llm.bind_tools(
    tools=[web_search, vectorstore], preamble=preamble
)

Prompt - Define the prompt and the than invoke the chain with the questions defined - we can see the roting process takes place

In [30]:
route_prompt = ChatPromptTemplate.from_messages(
    [
        ("human", "{question}")
    ]
)

question_router = route_prompt | structured_llm_router

response = question_router.invoke({"question": "Who will the Bears draft first in the NFL draft"})
print(response.response_metadata["tool_calls"])

response = question_router.invoke({"question": "What are the types of agent memory?"})
print(response.response_metadata["tool_calls"])

response = question_router.invoke({"question": "Hi how are you?"})
print("tool_calls" in response.response_metadata)




[{'id': '220d2ced7e134c0faefcc858fbf1f3ac', 'function': {'name': 'web_search', 'arguments': '{"query": "who will the bears pick in the NFL draft"}'}, 'type': 'function'}]
[{'id': '0f27854aff2340a29d00a338b5f5ea56', 'function': {'name': 'vectorstore', 'arguments': '{"query": "agent memory"}'}, 'type': 'function'}]
False


### LLM

In [34]:
# from langchain_mistralai import ChatMistralAI

# llm = ChatMistralAI(
#     model="mistral-large-latest",
#     temperature=0.3,
#     max_retries=2,
#     api_key=mistral_api_key
# )

### Grader

We are at the stage wehre we deifne if the returned docs are relevant or not for our question

We will do a binary scoring mechanism for the retreived docs to see if they are useful for our question or not. We constrain the output to yes/no

We define a pydantic object for the grades, it wil store yes/no grades for the docs. We bind this datamodel object to the llm model

The system prompt acts like a form of test 

In [40]:
### Retrieval Grader

# Data model
class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""

    binary_score: str = Field(
        description="Documents are relevant to the question, 'yes' or 'no'"
    )

llm = ChatCohere(model="command-r", temperature=0)
# LLM with function call
structured_llm_grader = llm.with_structured_output(GradeDocuments)


# Prompt
system = """You are a grader assessing relevance of a retrieved document to a user question. \n 
    If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""
    
    
grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
    ]
)

retrieval_grader = grade_prompt | structured_llm_grader
question = "agent memory"

docs = retriever.get_relevant_documents(question)
doc_txt = docs[1].page_content

# The test we made on the returned docs
response = retrieval_grader.invoke({"question": question, "document": doc_txt})
print(response)

binary_score='yes'


### Generation part

This is just a  simple RAG

In [44]:
from langchain_core.messages import HumanMessage
from langchain_core.output_parsers import StrOutputParser

# Preamble
preamble = """You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise."""


# LLM
llm = ChatCohere(model_name="command-r", temperature=0).bind(preamble=preamble)

# Prompt
def prompt(x):
    return ChatPromptTemplate.from_messages(
        [
            HumanMessage(
                f"Question: {x['question']} \nAnswer: ",
                additional_kwargs={"documents": x["documents"]},
            )
        ]
    )


# Chain
rag_chain = prompt | llm | StrOutputParser()

# Run
question = "agent memory"
generation = rag_chain.invoke({"documents": docs, "question": question})
print(generation)

The design of generative agents combines LLM with memory, planning and reflection mechanisms to enable agents to behave conditioned on past experience, as well as to interact with other agents.

Memory stream is a long-term memory module (external database) that records a comprehensive list of agents’ experience in natural language.

Short-term memory is used for in-context learning.


### LLM fallback

We use this if the routing doesn't match any tool - just answer based on the internal knowledge of the llm

In [91]:
### LLM fallback

from langchain_core.output_parsers import StrOutputParser

# Preamble
preamble = """You are an assistant for question-answering tasks. Answer the question based upon your knowledge. Use three sentences maximum and keep the answer concise."""

# LLM
llm = ChatCohere(model_name="command-r", temperature=0).bind(preamble=preamble)


# Prompt
def prompt(x):
    return ChatPromptTemplate.from_messages(
        [HumanMessage(f"Question: {x['question']} \nAnswer: ")]
    )


# Chain
llm_chain = prompt | llm | StrOutputParser()

# Run
question = "agent memory"
generation = llm_chain.invoke({"question": question})
print(generation)

Agent memory refers to the storage and retrieval capabilities of an AI model or agent, allowing it to retain and utilize information from previous interactions or training data. This enables the agent to learn and adapt its behavior over time, improving performance on tasks. It is a crucial aspect of building intelligent systems that can engage in complex and contextually aware conversations.


### Hallucinations Grader part

We do another grading mechanism to test if the model is hallucinating - same as first grader just with a different prompt

In [94]:
### Hallucination Grader


# Data model
class GradeHallucinations(BaseModel):
    """Binary score for hallucination present in generation answer."""

    binary_score: str = Field(
        description="Answer is grounded in the facts, 'yes' or 'no'"
    )


# LLM with function call
llm = ChatCohere(model="command-r", temperature=0)
structured_llm_grader = llm.with_structured_output(GradeHallucinations)

# Prompt
system = """You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. \n 
     Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts."""
     
hallucination_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Set of facts: \n\n {documents} \n\n LLM generation: {generation}"),
    ]
)

hallucination_grader = hallucination_prompt | structured_llm_grader
hallucination_grader.invoke({"documents": docs, "generation": generation})

GradeHallucinations(binary_score='no')

### Answer Grader

Here we test if the final answer produced by the rag is usefull as a response for the question

In [93]:
### Answer Grader


# Data model
class GradeAnswer(BaseModel):
    """Binary score to assess answer addresses question."""

    binary_score: str = Field(
        description="Answer addresses the question, 'yes' or 'no'"
    )


# LLM with function call
llm = ChatCohere(model="command-r", temperature=0)
structured_llm_grader = llm.with_structured_output(GradeAnswer)

# Prompt
system = """You are a grader assessing whether an answer addresses or resolves a question \n 
     Give a binary score 'yes' or 'no'. Yes' means that the answer resolves the question."""
     
answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "User question: \n\n {question} \n\n LLM generation: {generation}"),
    ]
)

answer_grader = answer_prompt | structured_llm_grader
answer_grader.invoke({"question": question, "generation": generation})

GradeAnswer(binary_score='yes')

### Web search tool

In [95]:
### Search

from langchain_community.tools.tavily_search import TavilySearchResults

web_search_tool = TavilySearchResults(k=3)

## Graph

I want to link all the flow together as a graph. We want to mimic the schema from the image. Each circle is a node and each arrow is an edge. We define a function for each nodes

### Graph State

It will have the following things:
1. question
2. generation
3. documents

Each node will return the state after they do their thing

In [96]:
from typing import List

from typing_extensions import TypedDict


class GraphState(TypedDict):
    """|
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        documents: list of documents
    """

    question: str
    generation: str
    documents: List[str]

### Graph Flow 

We define all the nodes and edges

##### Nodes

Retrieval node

In [110]:
from langchain.schema import Document


def retrieve(state):
    """
    Retrieve documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}

Fallback node

In [111]:
def llm_fallback(state):
    """
    Generate answer using the LLM without vectorstore

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---LLM Fallback---")
    question = state["question"]
    generation = llm_chain.invoke({"question": question})
    return {"question": question, "generation": generation}


Generation node

In [112]:
def generate(state):
    """
    Generate answer using the vectorstore

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]
    if not isinstance(documents, list):
        documents = [documents]

    # RAG generation
    generation = rag_chain.invoke({"documents": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}

#### Grade docs

We use the grader defined above - grader node

In [113]:
def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with only filtered relevant documents
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc by using the retrieval_grader defined above
    filtered_docs = []
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score.binary_score
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue
    return {"documents": filtered_docs, "question": question}

Web search node - we use the web search tool from tavily_search

In [114]:
def web_search(state):
    """
    Web search based on the re-phrased question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with appended web results
    """

    print("---WEB SEARCH---")
    question = state["question"]

    # Web search
    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)

    return {"documents": web_results, "question": question}

#### Edges

Route question edge - the first thing that happens - we use the question router defined above to see what path to take

In [115]:
def route_question(state):
    """
    Route question to web search or RAG.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """

    print("---ROUTE QUESTION---")
    question = state["question"]
    source = question_router.invoke({"question": question})

    # Fallback to LLM or raise error if no decision
    if "tool_calls" not in source.additional_kwargs:
        print("---ROUTE QUESTION TO LLM---")
        return "llm_fallback"
    if len(source.additional_kwargs["tool_calls"]) == 0:
        raise "Router could not decide source"

    # Choose datasource
    datasource = source.additional_kwargs["tool_calls"][0]["function"]["name"]
    if datasource == "web_search":
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "web_search"
    elif datasource == "vectorstore":
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"
    else:
        print("---ROUTE QUESTION TO LLM---")
        return "vectorstore"

Decide to generate edge. Here if the docs are relevant for the question we genrate, else we do a web_search

In [116]:
def decide_to_generate(state):
    """
    Determines whether to generate an answer, or re-generate a question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    state["question"]
    filtered_documents = state["documents"]

    if not filtered_documents:
        # All documents have been filtered check_relevance
        # We will re-generate a new query
        print("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, WEB SEARCH---")
        return "web_search"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"

Hallucination and answer question testing edges

In [117]:
def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke(
        {"documents": documents, "generation": generation}
    )
    grade = score.binary_score # yes = grounder | no = hallucination

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score.binary_score
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

## Build Graph

In [125]:
import pprint

from langgraph.graph import END, StateGraph, START

workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("web_search", web_search)  # web search
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # rag
workflow.add_node("llm_fallback", llm_fallback)  # llm


# Build graph
workflow.add_conditional_edges(
    START,
    route_question,
    {
        "web_search": "web_search",
        "vectorstore": "retrieve",
        "llm_fallback": "llm_fallback",
    },
)

workflow.add_edge("web_search", "generate")
workflow.add_edge("retrieve", "grade_documents")

workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "web_search": "web_search",
        "generate": "generate"
    },
)

workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",  # Hallucinations: re-generate
        "not useful": "web_search",  # Fails to answer question: fall-back to web-search
        "useful": END,
    },
)

workflow.add_edge("llm_fallback", END)

# Compile
app = workflow.compile()


### Run the graph

Web Search

In [126]:
# Run
inputs = {
    "question": "What player are the Bears expected to draft first in the 2024 NFL draft?"
}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint.pprint(f"Node '{key}':")
        # Optional: print full state at each node
    pprint.pprint("\n---\n")

# Final generation
pprint.pprint(value["generation"])

---ROUTE QUESTION---
---ROUTE QUESTION TO WEB SEARCH---
---WEB SEARCH---
"Node 'web_search':"
'\n---\n'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
"Node 'generate':"
'\n---\n'
('The Chicago Bears are expected to draft USC quarterback Caleb Williams first '
 'in the 2024 NFL draft. Williams is likely to be the starter from Day 1, '
 'replacing Justin Fields, who was traded to the Pittsburgh Steelers in the '
 'offseason.')


### Vector store retrieve

In [None]:
# Run
inputs = {"question": "What are the types of agent memory?"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint.pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
    pprint.pprint("\n---\n")

# Final generation
pprint.pprint(value["generation"])

LLM fallback

In [None]:
# Run
inputs = {"question": "Hello, how are you today?"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint.pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
    pprint.pprint("\n---\n")

# Final generation
pprint.pprint(value["generation"])