In [None]:
!pip install -U langchain-nomic langchain_community tiktoken langchainhub chromadb langchain langgraph tavily-python gpt4all firecrawl-py langchain_ollama

In [2]:
import os

# LangSmith tracing settings, written to the process environment in one call.
# NOTE(review): "x" looks like a placeholder API key - supply a real one before
# relying on tracing.
os.environ.update(
    {
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
        "LANGCHAIN_API_KEY": "x",
        "LANGCHAIN_PROJECT": "corrective_rag_ollama3",
    }
)

In [19]:
# Model tag passed as `model=` to every ChatOllama instance created in this notebook.
local_llm = "llama3.2:3b"

In [21]:
###Index
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import GPT4AllEmbeddings
from langchain_community.document_loaders import FireCrawlLoader
from langchain_community.vectorstores.utils import filter_complex_metadata
from langchain.docstore.document import Document

# Pages to scrape and index.
urls = [
    "https://www.ai-jason.com/learning-ai/how-to-reduce-llm-cost",
    "https://www.ai-jason.com/learning-ai/gpt5-llm",
    "https://www.ai-jason.com/learning-ai/how-to-build-ai-agent-tutorial-3",
]

# The FireCrawl key is read from the environment so that no secret lives in the notebook.
# NOTE(review): a literal key used to be embedded in this cell; treat it as exposed and rotate it.
firecrawl_api_key = os.environ.get("FIRECRAWL_API_KEY")
if not firecrawl_api_key:
    raise RuntimeError(
        "FIRECRAWL_API_KEY is not set; export it before running this cell."
    )

# One "scrape" load per URL; the results are flattened just below.
docs = [
    FireCrawlLoader(api_key=firecrawl_api_key, url=url, mode="scrape").load()
    for url in urls
]

#Flatten the per-URL results into a single list of documents
docs_list = [item for sublist in docs for item in sublist]

#Split documents (chunk_size=250 as measured by the tiktoken encoder, no overlap)
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(chunk_size=250, chunk_overlap=0)
doc_splits = text_splitter.split_documents(docs_list)

#Filter out complex metadata and ensure proper document formatting
filtered_docs = []
for doc in doc_splits:
    #Ensure the doc is an instance of Document and has a "metadata" attribute
    if isinstance(doc, Document) and hasattr(doc, 'metadata'):
        # Keep only str/int/float/bool metadata values; anything else is dropped before indexing.
        clean_metadata = {k: v for k, v in doc.metadata.items() if isinstance(v, (str, int, float, bool))}
        filtered_docs.append(Document(page_content=doc.page_content, metadata=clean_metadata))

#Add to vectorDB
vectorstore = Chroma.from_documents(
    documents=filtered_docs,
    embedding=GPT4AllEmbeddings(),
    collection_name="rag-chroma",
)

# Retriever used by the demo cells and by the `retrieve` graph node.
retriever = vectorstore.as_retriever()

In [27]:
##Retrieval Grader

from langchain.prompts import PromptTemplate
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import JsonOutputParser

#LLM - format="json" is requested because the reply is parsed by JsonOutputParser below
llm = ChatOllama(model=local_llm, temperature=0, format="json")

# Grading prompt with {document} and {question} placeholders; it asks the model
# for a JSON object with a single "score" key holding "yes" or "no".
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing relevance of a retrieved document to a user question. If the document contains keywords related to the user question, grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n Give a binary score "yes" or "no" scoret to indicate whether the document is relevant to the question. \n Provide the binary score as JSON with a single key "score" and no preamble or explaination. <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the retrieved document: \n\n {document} \n\n
    Here is the user question: {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["document", "question"]
)

# Chain: prompt -> LLM -> JSON parser. Also used by the grade_documents graph node.
retrieval_grader = prompt | llm | JsonOutputParser()
# Smoke test: grade the second retrieved chunk (index 1) against a sample question.
question = "How to save LLM cost?" 
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({'question': question, 'document': doc_txt}))

{'score': 'yes'}


GENERATE ANSWER

In [36]:
##GENERATE

from langchain.prompts import PromptTemplate
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser

#LLM - plain-text output here, so no JSON format constraint
llm = ChatOllama(model=local_llm, temperature=0)

# Question-answering prompt with {question} and {context} placeholders.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences at maximum and keep the answer concise <|eot_id|><|start_header_id|>user<|end_header_id|>
    Question: {question}
    Context: {context}
    Answer: <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "context"]
)

#Post-processing
def format_docs(docs):
    """Join the page_content of every document into one string, separated by blank lines."""
    return "\n\n".join(doc.page_content for doc in docs)

#Chain
rag_chain = prompt | llm | StrOutputParser()

#Run
question = "How to save LLM cost?" 
docs = retriever.invoke(question)
# Pass the documents' text as context. Previously the raw list was interpolated,
# which put the repr of each Document (metadata included) into the prompt and
# left format_docs unused.
generation = rag_chain.invoke({'question': question, 'context': format_docs(docs)})
print(generation)

To save LLM cost, you can carefully select the right models for specific tasks, optimize agent memory by managing conversation history stored in memory, and use techniques like LLM Lingua. Additionally, using observability platforms to monitor and log costs can help identify areas for optimization. Implementing these strategies can lead to significant cost savings of up to 78% or more.


WEB SEARCH - TAVILY

In [35]:
# NOTE(review): "x" looks like a placeholder - set a real Tavily key before running a search.
os.environ["TAVILY_API_KEY"] = "x"

from langchain_community.tools.tavily_search import TavilySearchResults
# The result cap is the tool's `max_results` field; the previous `k=3` keyword is
# not a field of TavilySearchResults, so it did not limit the number of results.
web_search_tool = TavilySearchResults(max_results=3)

HALLUCINATION GRADER

In [37]:
###HALLUCINATION GRADER

#LLM - format="json" is requested because the reply is parsed by JsonOutputParser below
llm = ChatOllama(model=local_llm, temperature=0, format="json")

#Prompt - {documents} are the supporting facts, {generation} is the answer being checked;
# the model is asked for a JSON object with a single "score" key ("yes"/"no").
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an answer is grounded in / supported by a set of facts. Give a binary score "yes" or "no" score to indicate whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a single key "score" and no preamble or explaination. <|eot_id|><|start_header_id|>user<|end_header_id|> Here are the facts:
    \n ------- \n
    {documents}
    \n ------- \n
    Here is the answer: {generation} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["documents", "generation"])

# Chain: prompt -> LLM -> JSON parser. The (misspelled) name is what check_hallucination
# calls, so it must stay as is.
hallicination_grader = prompt | llm | JsonOutputParser()
# Smoke test on the `docs` and `generation` produced by the generate cell; the bare
# expression displays the parsed result as the cell output.
hallicination_grader.invoke({"documents": docs, "generation": generation})

{'score': 'yes'}

ANSWER GRADER

In [38]:
###ANSWER GRADER

#LLM - format="json" is requested because the reply is parsed by JsonOutputParser below
llm = ChatOllama(model=local_llm, temperature=0, format="json")

#Prompt - {generation} is the answer being judged, {question} is what it should resolve.
# The question is now labelled as the question (it was labelled "answer"), and
# input_variables lists the placeholders the template actually uses.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an answer is useful to resolve a question. Give a binary score "yes" or "no" score to indicate whether the answer is useful to resolve a question. Provide the binary score as a JSON with a single key "score" and no preamble or explaination. <|eot_id|><|start_header_id|>user<|end_header_id|> Here is the answer:
    \n ------- \n
    {generation}
    \n ------- \n
    Here is the question: {question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "generation"])

# Chain: prompt -> LLM -> JSON parser. Also used by check_hallucination.
answer_grader = prompt | llm | JsonOutputParser()
# Smoke test on the question/generation from the generate cell; the bare expression
# displays the parsed result as the cell output.
answer_grader.invoke({"question": question, "generation": generation})

{'score': 'yes'}

LANGGRAPH - SETUP STATES & NODES

In [39]:
from typing import List
from typing_extensions import TypedDict

#STATE

class GraphState(TypedDict):
    """State dictionary handed to the StateGraph and read/updated by its nodes."""
    # The user's question.
    question: str
    # The answer produced by the `generate` node.
    generation: str
    # "Yes"/"No" flag written by grade_documents and read by decide_to_generate.
    web_search: str
    # Documents gathered so far (vector-store hits and web-search results).
    # NOTE(review): the nodes store Document objects here, not plain strings -
    # the List[str] annotation looks stale; confirm before tightening it.
    documents: List[str]


from langchain.schema import Document

#NODES

#RETRIEVE FROM VECTORSTORE
def retrieve(state):
    """Look up documents for the question with the vector-store retriever.

    Args:
        state: Graph state; must contain "question".

    Returns:
        dict with "documents" (whatever the retriever returned) and the
        unchanged "question".
    """
    print("---RETRIEVE---")
    user_question = state["question"]
    return {
        "documents": retriever.invoke(user_question),
        "question": user_question,
    }

#CHECKS IF RETRIEVED DOCUMENTS ARE RELEVANT TO THE QUESTION
def grade_documents(state):
    """Grade each retrieved document and keep only the relevant ones.

    Every document's page_content is sent to retrieval_grader together with
    the question. A "yes" score (case-insensitive) keeps the document; any
    other score drops it and turns the web-search flag on.

    Args:
        state: Graph state; must contain "documents" and "question".

    Returns:
        dict with the kept "documents", the unchanged "question", and
        "web_search" set to "Yes" if at least one document was dropped,
        otherwise "No".
    """
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    candidates = state["documents"]
    user_question = state["question"]

    relevant_docs = []
    needs_web_search = False
    for candidate in candidates:
        verdict = retrieval_grader.invoke(
            {"question": user_question, "document": candidate.page_content}
        )
        if verdict["score"].lower() != "yes":
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            needs_web_search = True
            continue
        print("---GRADE: DOCUMENT RELEVANT---")
        relevant_docs.append(candidate)

    return {
        "documents": relevant_docs,
        "question": user_question,
        "web_search": "Yes" if needs_web_search else "No",
    }

#GENERATE ANSWER USING RAG ON RETRIEVED DOCUMENTS
def generate(state):
    """Generate an answer to the question from the documents in the state.

    Args:
        state: Graph state; must contain "documents" (objects exposing
            page_content) and "question".

    Returns:
        dict with the unchanged "documents" and "question" plus the new
        "generation" string produced by rag_chain.
    """
    print("---GENERATE---")
    documents = state["documents"]
    question = state["question"]

    #Generate answer
    # Give the prompt the documents' text via format_docs; interpolating the raw
    # list would insert the repr of every Document (metadata included) instead.
    context = format_docs(documents)
    generation = rag_chain.invoke({"question": question, "context": context})
    return {"documents": documents, "question": question, "generation": generation}


#WEB SEARCH
def web_search(state):
    """Run a web search for the question and add the result to the documents.

    The "content" field of every search hit is joined with newlines into a
    single Document, which is placed after the documents already in the state.

    Args:
        state: Graph state; must contain "question". "documents" may be
            missing or None, in which case the result list holds only the
            web-search Document.

    Returns:
        dict with the extended "documents" list and the unchanged "question".
    """
    print("---WEB SEARCH---")
    question = state["question"]
    # .get() makes the "no documents yet" case reachable; indexing raised KeyError first.
    documents = state.get("documents")

    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)
    # Build a new list instead of appending to the list object owned by the
    # incoming state, so the caller's state is not mutated behind its back.
    if documents is not None:
        documents = [*documents, web_results]
    else:
        documents = [web_results]

    return {"documents": documents, "question": question}


In [40]:
#CONDITIONAL EDGES

#DETERMINES WHETHER TO GENERATE OR WEB SEARCH
def decide_to_generate(state):
    """Route after document grading: run a web search first, or generate directly.

    Args:
        state: Graph state; must contain "web_search" ("Yes" or "No"), as
            written by grade_documents.

    Returns:
        "websearch" when state["web_search"] == "Yes", otherwise "generate".
        Both strings are keys of the conditional-edge map attached to the
        "grade_documents" node; the earlier "web_search" spelling was not a
        key of that map.
    """
    print("---ASSESS GRADED DOCUMENTS---")

    # grade_documents sets the flag to "Yes" as soon as one document is graded not relevant.
    if state["web_search"] == "Yes":
        print("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---")
        return "websearch"

    print("---DECISION: GENERATE---")
    return "generate"
    
#DETERMINES WHETHER THE GENERATION IS HALLUCINATED AND WHETHER IT ANSWERS THE QUESTION
def check_hallucination(state):
    """Route after generation based on groundedness and usefulness.

    The generation is first checked against the documents with
    hallicination_grader; if it is grounded, answer_grader checks whether it
    addresses the question. Scores are compared case-insensitively, as in
    grade_documents.

    Args:
        state: Graph state; must contain "question", "documents" and
            "generation".

    Returns:
        "useful" if the generation is grounded and addresses the question,
        "not useful" if it is grounded but does not address the question,
        "not supported" if it is not grounded in the documents.
        These are the keys of the conditional-edge map attached to the
        "generate" node; the earlier "not_useful" spelling was not a key of
        that map.
    """
    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallicination_grader.invoke({"documents": documents, "generation": generation})
    # str() keeps a non-string score (e.g. a JSON boolean) from raising; it just won't equal "yes".
    grounded = str(score["score"]).lower() == "yes"

    if not grounded:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS. RE-TRY---")
        return "not supported"

    print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
    #CHECK Q&A
    print("---GRADE GENERATION vs QUESTION---")
    score = answer_grader.invoke({"question": question, "generation": generation})
    if str(score["score"]).lower() == "yes":
        print("---DECISION: GENERATION ADDRESSES QUESTION---")
        return "useful"

    print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
    return "not useful"

In [49]:
from langgraph.graph import END, StateGraph

# Graph whose shared state follows the GraphState schema.
workflow = StateGraph(GraphState)

#DEFINE THE NODES
# Each node name (first argument) is the string the edges refer to; note that the
# web_search function is registered under the name "websearch" (no underscore).
workflow.add_node("websearch", web_search)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)

<langgraph.graph.state.StateGraph at 0x127256860>

In [50]:
#BUILD GRAPH
# Flow: retrieve -> grade_documents -> (websearch ->) generate -> END or retry.
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")
# After grading, decide_to_generate picks the next node. The dict maps the router's
# return value (key) to a node name (value).
# NOTE(review): the router must return these keys verbatim - verify its return
# strings are spelled exactly like the keys below.
workflow.add_conditional_edges(
    "grade_documents", 
    decide_to_generate,
    {
        "websearch": "websearch", 
        "generate": "generate"
    }
)

# A web search is always followed by generation.
workflow.add_edge("websearch", "generate")
# After generation, check_hallucination picks the next step: finish, regenerate,
# or fall back to a web search.
# NOTE(review): same as above - the router's return strings must match these keys.
workflow.add_conditional_edges(
    "generate", 
    check_hallucination,
    {
        "useful": END,
        "not supported": "generate",
        "not useful": "websearch"
    }
)

<langgraph.graph.state.StateGraph at 0x127256860>

In [51]:
#COMPILE - turn the graph definition into a runnable app
app = workflow.compile()

#TEST - stream one question through the graph, announcing each node as it finishes
from pprint import pprint

inputs = dict(question="how to save llm cost?")
for output in app.stream(inputs):
    for k in output:
        v = output[k]
        pprint(f"Finished running: {k}:")

# After the loop, v still holds the payload of the last node that reported.
print(v["generation"])

---RETRIEVE---
'Finished running: retrieve:'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
'Finished running: grade_documents:'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
'Finished running: generate:'
To save LLM cost, you can carefully select the right models for specific tasks, optimize agent memory by managing conversation history stored in memory, and use techniques like LLM Lingua. Additionally, using observability platforms to monitor and log costs can help identify areas for optimization. Implementing these strategies can lead to significant cost savings of up to 78% or more.
