In [None]:
# pip install langgraph

# Corrective RAG App

Video available on LinkedIn.

In [138]:
import os

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import GPT4AllEmbeddings
from langchain_community.document_loaders import FireCrawlLoader
from langchain_community.vectorstores.utils import filter_complex_metadata
from langchain.docstore.document import Document


In [None]:
# NOTE(review): `load_env` is neither imported nor defined in any visible cell, so on a
# fresh kernel this call would raise NameError. Presumably `load_dotenv()` from
# python-dotenv was intended, to load the API keys used by later cells — TODO confirm.
load_env()

In [141]:
# Pages that are scraped and indexed
urls = [
    "https://en.wikipedia.org/wiki/Computer_programming",
    "https://en.wikipedia.org/wiki/List_of_programming_languages"
]

# One FireCrawl scrape per URL; each load() result is kept as its own sub-list
docs = []
for url in urls:
    docs.append(FireCrawlLoader(url = url, mode = "scrape").load())

# Split Documents

# Flatten the per-URL sub-lists into a single list before chunking
docs_list = []
for sublist in docs:
    docs_list.extend(sublist)

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size = 250, chunk_overlap = 20
)
docs_splits = text_splitter.split_documents(docs_list)

print(len(docs_splits))

# Filter out complex metadata and ensure proper document formatting:
# rebuild every chunk as a Document whose metadata keeps only str/int/float/bool values
filtered_docs = [
    Document(
        page_content=doc.page_content,
        metadata={k: v for k, v in doc.metadata.items() if isinstance(v, (str, int, float, bool))},
    )
    for doc in docs_splits
    if isinstance(doc, Document) and hasattr(doc, 'metadata')
]

len(filtered_docs)

397


397

### Add to Vectordb

In [142]:
# Embed the cleaned chunks with GPT4AllEmbeddings and store them in a Chroma
# collection named "rag-chroma".
vector_store = Chroma.from_documents(
    documents = filtered_docs,
    collection_name = "rag-chroma",
    embedding = GPT4AllEmbeddings(),
)

# Retriever over the collection; the graph's `retrieve` node and the demo cells call it.
retriever = vector_store.as_retriever()

## Grade Documents

In [143]:
from langchain.prompts import PromptTemplate
from langchain_groq import ChatGroq
from langchain_core.output_parsers import JsonOutputParser

### LLM

In [144]:
# Single chat model (Groq-hosted "llama3-70b-8192", temperature 0) shared by the
# generation chain and all three grader chains below.
llm = ChatGroq(temperature = 0, model = "llama3-70b-8192",)


### Prompt

In [145]:
# Relevance-grader prompt, written with Llama-3-style header tokens. Given one retrieved
# document and the user question, the model is asked to reply with a JSON object holding
# a single key 'score' whose value is 'yes' or 'no'.
# The user-turn header must be closed with `<|end_header_id|>` (with underscore) to match
# the other header tokens in this template.
prompt = PromptTemplate(
    template = """<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing relevance
    of a retrieved document to a user question. If the document contains keywords related to the user question,
    grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
      <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the retrieved document: \n\n {document} \n\n
    Here is the user question: {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables = ["question", "document"],
)

### Relevance Check

In [146]:

# Relevance-grader chain: prompt -> LLM -> JSON parser. It is built from whatever `prompt`
# is bound to when this cell runs; later cells rebind `prompt`, so run the cells in order.
retrieval_grader = prompt | llm | JsonOutputParser()
# Smoke test: grade the second retrieved chunk (index 1) against a sample question.
# `question` and `docs` are reassigned by a later cell.
question = "C++"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))


{'score': 'no'}


## Generate Answer

In [147]:
from langchain.prompts import PromptTemplate
from langchain import hub
from langchain_core.output_parsers import StrOutputParser

### Prompt

In [148]:

# Answer-generation prompt, written with Llama-3-style header tokens. The template's
# placeholders are {question} and {context}, and `input_variables` lists exactly those two.
prompt = PromptTemplate(
    template = """<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an assistant for question-answering tasks.
    Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know.
    Use three sentences maximum and keep the answer concise <|eot_id|><|start_header_id|>user<|end_header_id|>
    Question: {question}
    Context: {context}
    Answer: <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "context"],
)


### Post Processing

In [149]:
def format_docs(docs):
  """Return the `page_content` of every item in `docs`, joined by one blank line."""
  contents = [entry.page_content for entry in docs]
  return "\n\n".join(contents)

### Chain

In [150]:
# Generation chain: prompt -> LLM -> plain string. It is built from whatever `prompt` is
# bound to when this cell runs, so it must run before a later cell rebinds `prompt`.
rag_chain = prompt | llm | StrOutputParser()

### Run

In [151]:
question = "So, how to program."
docs = retriever.invoke(question)
# Feed the prompt the chunks' text (joined by format_docs) instead of the raw Document
# list, whose string form would also embed the object repr and metadata in the context.
generation = rag_chain.invoke({"context": format_docs(docs), "question": question})
print(generation)

To program, you need to design and implement algorithms, which are step-by-step specifications of procedures, by writing code in one or more programming languages. Programmers typically use high-level programming languages that are more easily intelligible to humans than machine code.



## Web Search via Tavily

In [152]:
from langchain_community.tools.tavily_search import TavilySearchResults

In [153]:
# Cap the search at three results through the tool's `max_results` field
# (TavilySearchResults has no `k` option).
web_search_tool = TavilySearchResults(max_results = 3)

## Hallucination Checker

In [154]:
# Hallucination-grader prompt, written with Llama-3-style header tokens. Given the
# supporting documents and a generated answer, the model is asked to reply with a JSON
# object holding a single key 'score' whose value is 'yes' or 'no'.
prompt = PromptTemplate(
    template = """ <|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether
    an answer is grounded in / supported by a set of facts. Give a binary score 'yes' or 'no' score to indicate
    whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a
    single key 'score' and no preamble or explanation. <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here are the facts:
    \n ------- \n
    {documents}
    \n ------- \n
    Here is the answer: {generation} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "documents"],
)

# Grader chain: prompt -> LLM -> JSON parser. The invoke below is a smoke test on the
# `docs` and `generation` produced by the earlier run cell.
hallucination_grader = prompt | llm | JsonOutputParser()
hallucination_grader.invoke({"documents": docs, "generation": generation})

{'score': 'yes'}

## Answer Question

In [155]:
# Answer-grader prompt: given a generated answer and the question, the model is asked to
# reply with a JSON object holding a single key 'score' ('yes' or 'no') that says whether
# the answer is useful for resolving the question.
prompt = PromptTemplate(
    template = """<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an
    answer is useful to resolve a question. Give a binary score 'yes' or 'no' to indicate whether the answer is
    useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
     <|eot_id|><|start_header_id|>user<|end_header_id|> Here is the answer:
    \n ------- \n
    {generation}
    \n ------- \n
    Here is the question: {question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "question"],
)

# Grader chain: prompt -> LLM -> JSON parser. The invoke below is a smoke test on the
# `question` and `generation` left over from the earlier run cell.
answer_grader = prompt | llm | JsonOutputParser()
answer_grader.invoke({"question": question, "generation": generation})

{'score': 'yes'}

## LangGraph - Set up state and nodes

In [156]:
from typing_extensions import TypedDict
from typing import List

### State

In [157]:
class GraphState(TypedDict):
  """
  Represents the state of our graph

  Attributes:
    question - your question
    generation - llm generation
    web_search - whether to add search ("Yes" or "No", as set by grade_documents)
    documents - list of Document objects (retrieved chunks and/or web-search results)
  """

  question : str
  generation : str
  web_search : str
  # The nodes read `.page_content` from these items and append `Document(...)` instances,
  # so the element type is Document, not str. It is written as a forward reference so the
  # class body does not need the name at definition time.
  documents : List["Document"]

### Nodes

Retrieve Node

In [158]:
def retrieve(state):
  """
  Look up documents for the current question through the module-level `retriever`.

  Args:
    state (dict): The current graph state; its "question" entry is read

  Returns:
    dict: State update holding the retrieved "documents" and the unchanged "question"
  """

  print("---RETRIEVE---")
  user_question = state["question"]
  return {"documents": retriever.invoke(user_question), "question": user_question}

Grade Document Node

In [159]:

def grade_documents(state):
  """
  Determines whether the retrieved documents are relevant to the question
  If any document is not relevant (or nothing was retrieved), we set a flag to run web search

  Args:
    state (dict): The current graph state; "question" and "documents" are read

  Returns:
    state (dict): Filtered out irrelevant documents and updated web-search state
      ("web_search" is "Yes" or "No")
  """

  print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
  question = state["question"]
  documents = state["documents"]

  filtered_docs = []
  # With nothing retrieved there is no context to answer from, so fall back to web search.
  web_search = "No" if documents else "Yes"

  # Score each doc
  for d in documents:
    score = retrieval_grader.invoke({"question": question, "document": d.page_content})
    # Normalise the grade so "Yes", "yes " etc. are all accepted.
    grade = str(score["score"]).strip().lower()

    if grade == "yes":
      # Doc is relevant: keep it
      print("---GRADE: DOCUMENT RELEVANT---")
      filtered_docs.append(d)
    else:
      # Doc is irrelevant: drop it and request a web search to compensate
      print("---GRADE: DOCUMENT NOT RELEVANT---")
      web_search = "Yes"

  return {"documents": filtered_docs, "question": question, "web_search": web_search}


Generate Node

In [160]:
def generate(state):
  """
  Generate answer using RAG on retrieved documents

  Args:
    state (dict): The current graph state; "question" and "documents" are read

  Returns:
    state (dict): New key added to state, generation, that contains LLM generation
  """

  print("---GENERATE---")

  question = state["question"]
  documents = state["documents"]

  # Give the prompt the documents' text (joined by format_docs) rather than the raw
  # Document list, whose string form would also embed the object repr and metadata.
  generation = rag_chain.invoke({"context": format_docs(documents), "question": question})

  return {"documents": documents, "question": question, "generation": generation}

Web Search Node


In [161]:


def web_search(state):
  """
  Web Search based on the question

  Args:
    state (dict): The current graph state; "question" is read, "documents" is optional

  Returns:
    state (dict): "question" plus a new "documents" list with the web results appended
  """

  print("---Web Search---")
  question = state["question"]

  # Merge the content of all hits into a single Document
  results = web_search_tool.invoke({"query": question})
  web_results = Document(page_content="\n".join(d["content"] for d in results))

  # Build a new list instead of appending in place, so the list object held by the
  # incoming state is left untouched; a missing or None "documents" starts from empty.
  documents = list(state.get("documents") or []) + [web_results]

  return {"question": question, "documents": documents}


Conditional Edge: Relevant document or not

In [162]:
def decide_to_generate(state):
  """
  Decides whether to generate an answer or add web_search

  Args:
    state (dict): The current graph state; only "web_search" is read

  Returns:
    str: Binary decision for next node to call ("websearch" or "generate")
  """

  print("---ASSESS GRADED DOCUMENTS---")

  # grade_documents writes "Yes" / "No"; compare case-insensitively so that other
  # spellings of the flag route the same way.
  needs_web_search = str(state["web_search"]).strip().lower() == "yes"

  if needs_web_search:
    # The flag is raised as soon as one document is graded irrelevant.
    print("---DECISION: NOT ALL DOCUMENTS ARE RELEVANT TO QUESTION, INCLUDE WEB SEARCH---")
    return "websearch"

  print("---DECISION: GENERATE---")
  return "generate"

Conditional Edge: Answer is Hallucinating or Not

In [163]:
def check_hallucination(state):
  """
  Decides whether to generate answer again or not

  First grades whether the generation is grounded in the documents, then (only if it
  is) whether it addresses the question.

  Args:
    state (dict): The current graph state; "question", "documents" and "generation" are read

  Returns:
    str: Decision for next node to call: "useful", "not useful" or "not supported"
  """

  question = state["question"]
  documents = state["documents"]
  generation = state["generation"]

  # Check Hallucination
  score = hallucination_grader.invoke({"documents": documents, "generation": generation})
  # Grades are normalised the same way grade_documents does, so "Yes" also counts.
  grounded = str(score["score"]).strip().lower() == "yes"

  if not grounded:
    print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
    return "not supported"

  print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")

  # Check question-answering
  print("---GRADE GENERATION VS QUESTION---")
  score = answer_grader.invoke({"question": question, "generation": generation})
  addresses_question = str(score["score"]).strip().lower() == "yes"

  if addresses_question:
    print("---DECISION: GENERATION ADDRESSES QUESTION---")
    return "useful"

  print("---DECISION: GENERATION DOES NOT ADDRESSES QUESTION---")
  return "not useful"


## Connect Nodes and Build Graph

In [164]:
from langgraph.graph import StateGraph, END

# Graph builder whose state schema is the GraphState TypedDict defined above.
workflow = StateGraph(GraphState)

### Define the nodes

In [165]:
# Register each node function under the name the edges refer to
for node_name, node_fn in (
    ("websearch", web_search),
    ("retrieve", retrieve),
    ("grade_documents", grade_documents),
    ("generate", generate),
):
    workflow.add_node(node_name, node_fn)

### Build Graph

In [166]:
# Flow: retrieve -> grade_documents -> (websearch ->) generate -> self-check.
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")
# After grading, decide_to_generate returns "websearch" or "generate"; the mapping
# translates that return value into the next node.
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch":"websearch",
        "generate":"generate"
    },
)
workflow.add_edge("websearch", "generate")
# After generating, check_hallucination returns one of three labels:
#   "not supported" -> run generate again
#   "useful"        -> finish
#   "not useful"    -> fetch web results, then generate again
# NOTE(review): "not supported" re-enters generate with unchanged state; with the LLM at
# temperature 0 this may repeat the same answer. Presumably the loop is bounded only by
# LangGraph's recursion limit — TODO confirm.
workflow.add_conditional_edges(
    "generate",
    check_hallucination,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch"
    },
)

## Compile

In [167]:
# Compile the wired graph into the runnable object that the test cell streams from.
app = workflow.compile()

## Test

In [None]:
from pprint import pprint

# Only "question" is supplied; the remaining state keys are filled in by the nodes.
inputs = {"question": "How many hours of study needed to master programming."}

# Each streamed item is iterated as {node name: that node's output}; log the node name
# as it finishes.
for output in app.stream(inputs):
  for key, value in output.items():
    pprint(f"Finished Running: {key}:")

print()
# `value` still holds the output of the last node streamed; its "generation" entry is
# printed as the final answer.
print(value["generation"])