# Corrective RAG agent using llama 3

In [1]:
! pip install -U langchain-nomic langchainhub chromadb langchain langgraph tavily-python gpt4all firecrawl-py langchain_community tiktoken

Collecting langgraph
  Downloading langgraph-0.0.44-py3-none-any.whl.metadata (43 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.1/43.1 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
Downloading langgraph-0.0.44-py3-none-any.whl (67 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.1/67.1 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: langgraph
  Attempting uninstall: langgraph
    Found existing installation: langgraph 0.0.40
    Uninstalling langgraph-0.0.40:
      Successfully uninstalled langgraph-0.0.40
Successfully installed langgraph-0.0.44


# Setup the environment

In [2]:
import os
from uuid import uuid4

from dotenv import load_dotenv

# Pull variables from a local .env file into the process environment.
load_dotenv()

# Indexing with [] (rather than .get) makes a missing key fail here, not later during scraping.
filecrawl_loader_api_key = os.environ["FILECRAWL_LOADER_API_KEY"]

# The project name needs an f-string; a plain string would keep the literal text "{unique_id}".
unique_id = uuid4().hex[:8]
os.environ['LANGCHAIN_PROJECT'] = f"rag_using_llama3 - {unique_id}"


We will be running the project using our local LLM

In [3]:
# Model name handed to every ChatOllama(model=...) call in this notebook
local_llm = "llama3"

We import all the libraries required to scrape and index the links provided
- Here we are using Firecrawl (through `FireCrawlLoader`) to scrape the listed pages, as Firecrawl allows us to get clean and structured data from a website in markdown or JSON format
- We are using the recursive character text splitter to split the documents into chunks

In [4]:
#index
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import GPT4AllEmbeddings
from langchain_community.document_loaders import FireCrawlLoader
from langchain_community.vectorstores.utils import filter_complex_metadata
from langchain.docstore.document import Document

# Pages that will be scraped and indexed
urls = [
    "https://www.ai-jason.com/learning-ai/how-to-reduce-llm-cost",
    "https://www.ai-jason.com/learning-ai/gpt5-llm",
    "https://www.ai-jason.com/learning-ai/how-to-build-ai-agent-tutorial-3",
]

# One loader call per URL; each call returns its own list, so `docs` is a list of lists
docs = []
for url in urls:
    loader = FireCrawlLoader(api_key=filecrawl_loader_api_key, url=url, mode="scrape")
    docs.append(loader.load())

# Flatten the per-URL lists into one list of documents
docs_list = []
for url_docs in docs:
    docs_list.extend(url_docs)

# Token-based splitter: 256-token chunks with no overlap between neighbouring chunks
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=256,
    chunk_overlap=0,
)

# Split every document and report how many chunks were produced
doc_splits = text_splitter.split_documents(docs_list)
print(len(doc_splits))



39


Get the metadata of each document and clean it so that it only contains string, integer, float and boolean values, not lists or arrays. (In this example the original metadata contains no such complex values, so clean_metadata looks very similar to the original.)

In [5]:
#Rebuild every chunk as a fresh Document whose metadata keeps only scalar values
#(str, int, float, bool); anything that is not a Document is dropped.
filtered_docs = [
    Document(
        page_content=doc.page_content,
        metadata={
            key: value
            for key, value in doc.metadata.items()
            if isinstance(value, (str, int, float, bool))
        },
    )
    for doc in doc_splits
    if isinstance(doc, Document) and hasattr(doc, "metadata")
]

Now we have filtered_docs with clean metadata.
We will now create embeddings for these chunks and store them in our vector store; here we are using ChromaDB.

In [6]:
#Embedding model used to vectorise the chunks (GPT4All embeddings; an API-backed
#embedding model could be swapped in here instead)
embedding_model = GPT4AllEmbeddings()

#Index the cleaned chunks in a Chroma collection: Chroma is used for simplicity and ease of use
vectorstore = Chroma.from_documents(
    documents=filtered_docs,
    embedding=embedding_model,
    collection_name="rag_chroma",
)

#Retriever handle used by the rest of the notebook
retriver = vectorstore.as_retriever()

## RETRIEVER GRADER

This is an LLM-based grader that checks the relevance of our retrieved documents to the question asked.
It outputs a JSON answer indicating
- Yes (the document is relevant)
- No (the document is irrelevant)

It produces this score for each retrieved document.

In [7]:
from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser

#Initializing the LLM; format="json" asks the model to answer with a JSON object
llm = ChatOllama(model=local_llm, format="json", temperature=0)

#PromptTemplate that grades one retrieved document against the question asked.
#The Llama 3 special tokens must be spelled exactly (<|start_header_id|>, <|eot_id|>, ...),
#otherwise the model sees them as ordinary text instead of turn delimiters.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing relevance of a retrieved document to a user question. If the document contains keywords related to the user question, grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n Provide the binary score as a JSON with a single key 'score' and no preamble or explanation. <|eot_id|><|start_header_id|>user<|end_header_id|> Here is the retrieved document: \n\n {document} \n\n
            Here is the user question: {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "document"],
)

#Creating a CHAIN: prompt -> LLM -> parsed JSON dict. We will use this chain later
retrieval_grader = prompt | llm | JsonOutputParser()


#Testing: the chain on one retrieved document
question = "how to save llm cost?"
docs = retriver.invoke(question)
doc_text = docs[1].page_content
print(retrieval_grader.invoke({"question":question, "document": doc_text}))

{'score': 'yes'}


## ANSWER GENERATOR

Creating a chain that will generate an answer to the question using the context (retrieved documents or web search results).
The result will be a string.

In [8]:
from langchain.prompts import PromptTemplate
from langchain import hub
from langchain_core.output_parsers import StrOutputParser

#prompt: answers {question} from {context}. The Llama 3 special tokens are spelled out
#exactly, and input_variables lists the placeholders the template actually uses.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know.
Use three sentences maximum and keep the answer concise <|eot_id|><|start_header_id|>user<|end_header_id|>
Question: {question}
Context: {context}
Answer: <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "context"],
)

#Initializing the LLM (no JSON format here: the answer is free text)
llm = ChatOllama(model=local_llm, temperature=0)

#post-processing: merge the retrieved documents into a single context string
def format_docs(docs):
    """Return the page_content of every document in docs, separated by a blank line."""
    page_texts = [doc.page_content for doc in docs]
    return "\n\n".join(page_texts)

#RAG_CHAIN: prompt -> LLM -> plain string; this is the answer generator used later
rag_chain = prompt | llm | StrOutputParser()

#Testing: retrieve context for a sample question and generate an answer from it
question = "How to save LLM cost?"
docs = retriver.invoke(question)
context = format_docs(docs)
rag_inputs = {"question": question, "context": context}
generation = rag_chain.invoke(rag_inputs)
print("--Generation--", generation)

--Generation-- To save LLM cost, you can try the following methods:

* Carefully selecting the right models for specific tasks
* Optimizing agent memory
* Using techniques like LLM Lingua to remove unnecessary tokens and words from input

These strategies can help reduce LLM costs without compromising performance.


## WEB SEARCH
Initializing the web search tool. We are using Tavily here; it returns search results for a query.

In [9]:
from langchain_community.tools.tavily_search import TavilySearchResults
web_search_tool = TavilySearchResults(max_results=3) #Retrieving only the top 3 results (the tool's limit field is max_results, not k)

## HALLUCINATION GRADER 
Using an LLM to check whether the answer is hallucinated or grounded in the context we provided
- If Yes: the answer is grounded in our context
- If No: the answer is not grounded in our context

Outputs the result in JSON format

In [10]:
#Hallucination Grader

#Initializing the LLM; format="json" asks the model to answer with a JSON object
llm = ChatOllama(model=local_llm, format="json", temperature=0)

#The template uses {documents} and {generation}; input_variables must name those same
#placeholders, and the Llama 3 special tokens must be spelled exactly.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an answer is grounded in / supported by a set of facts. Give a binary score 'yes' or 'no' score to indicate whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation. <|eot_id|><|start_header_id|>user<|end_header_id|>
Here are the facts:
\n --- \n
{documents}
\n --- \n
Here is the answer: {generation} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "documents"],
)

#Creating a Chain: This will be used to invoke this grader later
hallucination_grader = prompt | llm | JsonOutputParser()#chain

#Testing if this chain works correctly (reuses docs and generation from the answer-generator test)
hallucination_grader.invoke({"documents": docs, "generation": generation})

{'score': 'yes'}

## Answer Grader 

After checking whether the answer is hallucinated, we use this grader to check whether the generated answer actually answers the question.
- If yes: the generated answer answers the question
- If no: the generated answer does not answer the question

Outputs the result in JSON format

In [11]:
#Initialize LLM; format="json" asks the model to answer with a JSON object
llm = ChatOllama(model=local_llm, format="json", temperature=0)

#The template uses {generation} and {question}; input_variables must name those same
#placeholders, and the Llama 3 special tokens must be spelled exactly.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an answer is useful to resolve a question. Give a binary score 'yes' or 'no' score to indicate whether the answer is useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation. <|eot_id|><|start_header_id|>user<|end_header_id|>
Here is the answer:
\n --- \n
{generation}
\n --- \n
Here is the question: {question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "question"],
)

#Creating a Chain: This will be used to invoke this grader later
answer_grader = prompt | llm | JsonOutputParser()#chain

#Testing if this chain works correctly
answer_grader.invoke({"question":question, "generation": generation})

{'score': 'yes'}

# LangGraph Setup

Here we will create a graph using LangGraph to give agentic behaviour to all the chains that we have created.
The graph defines the logic and routing of the query and gives the final structure to the Corrective RAG agent that we are building.

In [12]:
from typing_extensions import TypedDict
from typing import List

#State: This is where you store all the global variables that you will use through all the nodes and edges
class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        web_search: whether to add search (the nodes in this notebook use "Yes" / "No")
        documents: list of documents
    """

    question: str
    generation: str
    web_search:str
    # NOTE(review): annotated as List[str], but the nodes in this notebook store objects
    # with a `page_content` attribute here (see grade_docs and web_search) - confirm and
    # tighten the annotation if desired.
    documents: List[str]

from langchain.schema import Document

## Creating Nodes
    # Nodes: These are the tasks that the multi-actor LLM workflow will use

#Retrieve node - The job of this node is to get the retrieved documents for the question asked
def retrieve(state):
    """
    Fetch documents for the current question from the retriever.

    Args:
        state(dict): the current graph state; must contain "question"

    Returns:
        dict: state update with "documents" (the retriever's results) and the
        unchanged "question"
    """
    print("---RETRIEVING DOCS---")

    user_question = state["question"]

    #Ask the retriever built on the vector store for documents matching the question
    retrieved = retriver.invoke(user_question)

    #Returned keys overwrite the corresponding keys of the graph state
    return {"documents": retrieved, "question": user_question}

## Document Grader Node - The job of this node is to grade the retrieved documents against the question (both are expected to have been populated by the retrieve node)
# if a document is not relevant, turn on the web search FLAG
# if a document is relevant, keep it and move on to the next document
def grade_docs(state):
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant to the question, we will set a flag to run the websearch

    Args:
        state(dict): the current graph state; must contain "question" and "documents"

    Returns:
        dict: state update with "documents" reduced to the relevant ones, the unchanged
        "question", and "web_search" set to "Yes" if at least one document was rejected
        (otherwise "No")
    """
    print("---CHECK Documents Relevance ---")
    question = state["question"]
    documents = state["documents"]

    #Documents graded as relevant are collected here
    filtered_doc = []
    #By default web search is a NO
    web_search = "No"

    for d in documents:
        #Invoke the grader chain on this document
        score = retrieval_grader.invoke({"question":question, "document":d.page_content})

        #The grader is an LLM, so its JSON may lack the 'score' key or hold a non-string
        #value; anything that is not a clear "yes" is treated as not relevant instead of
        #crashing the node.
        raw_grade = score.get("score", "") if isinstance(score, dict) else ""
        grade = str(raw_grade).strip().lower()

        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT ---")
            filtered_doc.append(d)
        else:
            #Not relevant: drop the document and request a web search
            web_search = "Yes"
            print("---GRADE: IRRELEVANT DOC, DO WEB SEARCH FLAG ACTIVATED---")

    #Update the state with the filtered docs and the web search flag
    return {"documents": filtered_doc, "question": question, "web_search": web_search}

#ANSWER GENERATION: This node generates an answer based on the documents and the question
def generate(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state(dict): the current graph state; must contain "question" and "documents"

    Returns:
        dict: state update with the unchanged "documents" and "question" plus
        "generation", the string produced by rag_chain
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    #Give the prompt the documents' text (joined by format_docs), not the repr of a list
    #of Document objects, so the context matches what the chain was tested with.
    context = format_docs(documents)
    generation = rag_chain.invoke({"context": context, "question": question})
    return {"documents": documents, "question": question, "generation": generation}

#WEB SEARCH CONTEXT GENERATION: This node builds extra context from the web for a question whose retrieved documents were not all relevant.
def web_search(state):
    """
    Run the web search on the question

    Args:
        state(dict): the current graph state; must contain "question", may contain "documents"

    Returns:
        dict: state update with "documents" (the previous documents followed by one new
        Document holding the joined search results) and the unchanged "question"
    """
    print("---DOING WEB SEARCH---")
    question = state["question"]
    #.get() so a state without a "documents" key reaches the None branch below instead
    #of raising KeyError
    documents = state.get("documents")

    #Query the web search tool and join the "content" field of every result with newlines
    web_docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join(d["content"] for d in web_docs)
    #Wrap the combined text in a single Document
    web_results = Document(page_content=web_results)

    #Build a new list rather than appending in place, so the list object held by the
    #incoming state is not mutated as a side effect
    if documents is not None:
        documents = list(documents) + [web_results]
    else:
        documents = [web_results]

    return {"documents": documents, "question": question}




#Conditional Edges - This is where the routing logic lives: based on the YES / NO stored in the state, the workflow is routed to the correct node.
    #Conditional edges do not add anything to the state; they only pick the next node to execute, hence their return values are always node names

##DECIDE WHETHER TO GENERATE OR DO A WEB SEARCH
#if the document-grading node raised the web search flag, route to the websearch node
#if the document-grading node did not raise the flag, send the filtered docs from that node forward to generate the answer
def decide_to_generate(state):
    """
    Choose the node that runs after the retrieved documents have been graded.

    Args:
        state(dict): the current graph state; must contain "web_search"

    Returns:
        str: "websearch" when state["web_search"] equals "Yes", otherwise "generate"
    """
    print("---CONDITIONAL EDGE: WEBSEARCH v/s GENERATE---")
    needs_web_search = state["web_search"] == "Yes"

    if not needs_web_search:
        print("---CONIDITIONAL DECISION: YES RELEVANT DOCUMENT(S) FOUND ---")
        return "generate"

    print("---CONDITIONAL DECISION: NO RELEVANT DOCUMENTS FOUND, DO WEB SEARCH")
    return "websearch"


##AFTER GENERATING THE ANSWER, CHECK WHETHER IT IS HALLUCINATED AND WHETHER IT IS USEFUL

#This conditional edge checks whether the answer is hallucinated or not
#If the answer is not hallucinated (meaning it is grounded), it checks whether the answer is useful
    #If the answer is useful, the answer is good and the workflow ends
    #If the answer is not useful, do a web search
#If the answer is hallucinated, go to the generate node again

def hallucination_edge(state):
    """
    Determine whether the generated answer is grounded in the documents and, if it is,
    whether it is useful for the question.

    Args:
        state(dict): the current graph state; must contain "question", "documents"
            and "generation"

    Returns:
        str: "useful" (grounded and answers the question), "not useful" (grounded but
        does not answer the question) or "not supported" (not grounded)
    """
    print("---CHECK HALLUCINATION")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    #Ask the hallucination grader whether the generation is supported by the documents
    hallucination_score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    #Normalise the grade the same way grade_docs does: an LLM may answer "Yes" / " yes"
    #or omit the key; anything that is not a clear "yes" counts as "no".
    raw_grade = hallucination_score.get("score", "") if isinstance(hallucination_score, dict) else ""
    grade = str(raw_grade).strip().lower()

    if grade != "yes":
        print("---DECISION: YES THE GENERATION IS HALLUCINATING IN DOCUMENTS ---")
        return "not supported"

    print("---DECISION: THE GENERATION IS GROUNDED IN DOCUMENTS")
    print("---IS THE ANSWER USEFUL OR NOT? ---")
    #Grounded: now check whether the answer actually addresses the question
    score = answer_grader.invoke({"question": question, "generation": generation})
    raw_grade = score.get("score", "") if isinstance(score, dict) else ""
    grade = str(raw_grade).strip().lower()

    if grade == "yes":
        print("---DECISION: YES THE ANSWER IS USEFUL ---")
        return "useful"

    print("---DECISION: NO THE ANSWER IS NOT USEFUL ---")
    return "not useful"

## Creating the LangGraph workflow using all the nodes and conditional edges created before

In [13]:
from langgraph.graph import END, StateGraph

#Graph whose state schema is GraphState
workflow = StateGraph(GraphState)

#Register the nodes: graph node name -> function that runs for it
node_functions = {
    "retrieve": retrieve,
    "websearch": web_search,
    "generate": generate,
    "grade_documents": grade_docs,
}
for node_name, node_fn in node_functions.items():
    workflow.add_node(node_name, node_fn)

#Wire the graph: retrieve is the entry point and always feeds grade_documents
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")

#After grading, decide_to_generate returns a key that is mapped to the next node
after_grading_routes = {
    "generate": "generate",
    "websearch": "websearch",
}
workflow.add_conditional_edges("grade_documents", decide_to_generate, after_grading_routes)

#Web search results always flow into generate
workflow.add_edge("websearch", "generate")

#After generating, hallucination_edge returns one of three keys: regenerate, finish, or search the web
after_generation_routes = {
    "not supported": "generate",
    "useful": END,
    "not useful": "websearch",
}
workflow.add_conditional_edges("generate", hallucination_edge, after_generation_routes)

#Compile the workflow into a runnable app
app = workflow.compile()

#test
from pprint import pprint
inputs = {"question": "How to save LLM cost?"}

#Remember the most recent generation explicitly instead of relying on the loop
#variable after the loop (which would be undefined if the stream yields nothing, or
#lack a "generation" key if the last node that ran was not generate).
final_generation = None
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
        if isinstance(value, dict) and "generation" in value:
            final_generation = value["generation"]

if final_generation is not None:
    print(final_generation)
else:
    print("No generation was produced.")

---RETRIEVING DOCS---
'Finished running: retrieve:'
---CHECK Documents Relevance ---
---GRADE: DOCUMENT RELEVANT ---
---GRADE: DOCUMENT RELEVANT ---
---GRADE: DOCUMENT RELEVANT ---
---GRADE: DOCUMENT RELEVANT ---
---CONDITIONAL EDGE: WEBSEARCH v/s GENERATE---
---CONIDITIONAL DECISION: YES RELEVANT DOCUMENT(S) FOUND ---
'Finished running: grade_documents:'
---GENERATE---
---CHECK HALLUCINATION
---DECISION: THE GENERATION IS GROUNDED IN DOCUMENTS
---IS THE ANSWER USEFUL OR NOT? ---
---DECISION: YES THE ANSWER IS USEFUL ---
'Finished running: generate:'
To save Large Language Model (LLM) costs, you can try the following strategies:

* Carefully select the right models for specific tasks.
* Optimize agent memory to minimize the number of tokens required for each interaction.
* Use techniques like LLM Lingua to remove unnecessary tokens and words from the input.

These methods can help achieve cost savings while maintaining high performance and user experience.
