In [1]:
! pip install -U langchain-nomic langchain_community tiktoken langchainhub chromadb langchain langgraph tavily-python gpt4all firecrawl-py python-dotenv

Collecting langgraph
  Downloading langgraph-0.2.27-py3-none-any.whl.metadata (13 kB)
Downloading langgraph-0.2.27-py3-none-any.whl (107 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m107.7/107.7 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: langgraph
  Attempting uninstall: langgraph
    Found existing installation: langgraph 0.2.23
    Uninstalling langgraph-0.2.23:
      Successfully uninstalled langgraph-0.2.23
Successfully installed langgraph-0.2.27


In [11]:
import os
from dotenv import load_dotenv
# Load variables from a local .env file (if any) into the process environment.
load_dotenv()

# LangSmith tracing configuration.
os.environ['LANGCHAIN_TRACING_V2'] = 'true'
os.environ['LANGCHAIN_ENDPOINT'] = 'https://api.smith.langchain.com'

# The API keys are expected to already be in the environment (shell or .env).
# Re-assigning os.getenv(...) into os.environ adds nothing and fails with an
# opaque TypeError when a key is absent, so check explicitly instead.
_missing_keys = [name for name in ('LANGCHAIN_API_KEY', 'TAVILY_API_KEY') if not os.getenv(name)]
if _missing_keys:
    raise EnvironmentError(
        "Missing required environment variable(s): " + ", ".join(_missing_keys)
        + ". Define them in your shell or in a .env file."
    )

In [3]:
# Model name passed as `model=` to every ChatOllama instance created below.
local_llm = 'llama3.1'

LOAD BLOGPOSTS FROM INTERNET

RETRIEVER

In [21]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import GPT4AllEmbeddings
from langchain_community.document_loaders import FireCrawlLoader
from langchain_community.vectorstores.utils import  filter_complex_metadata
from langchain.docstore.document import Document

load_dotenv()

# Public URLs of the pages to index
urls = [
    "https://elsys-bg.org/priem/den-na-otvorenite-vrati",
    "https://tuesfest.bg/",
    "https://hacktues.bg/"
]


firecrawl_api_key = os.getenv('FIRECRAWL_API_KEY')

# Load the documents: one loader call per URL, each returning a list
docs = [FireCrawlLoader(api_key=firecrawl_api_key, url=url, mode="scrape").load() for url in urls]

# Flatten the list of lists
docs_list = [item for sublist in docs for item in sublist]

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(chunk_size=512, chunk_overlap=50)

doc_splits = text_splitter.split_documents(docs_list)

# Keep only scalar metadata values (str / int / float / bool) before indexing.
# Uses the helper already imported for this purpose instead of a hand-written
# loop; note that it updates the metadata of the split documents in place.
filtered_docs = filter_complex_metadata(doc_splits, allowed_types=(str, int, float, bool))

# Add to vector DB
vector_store = Chroma.from_documents(
    documents=filtered_docs,
    collection_name="rag-chroma",
    embedding=GPT4AllEmbeddings(),
)

retriever = vector_store.as_retriever()

I will use a Retrieval Grader to check whether the retrieved documents are relevant to the question.

RETRIEVAL GRADER

In [22]:
from langchain.prompts import  PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser

# LLM: format="json" requests JSON output, which JsonOutputParser parses below
llm = ChatOllama(model=local_llm, format="json", temperature=0)

# The placeholders used in the template are {document} and {question};
# input_variables must name exactly those.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>You are a grader assessing relevance of a retrieved document to a user question. 
    If the document contains keywords related to the user question, grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals.\n
    Give a binary 'yes' or 'no' score to indicate whether the document is relevant to the user question. \n
    Provide the binary score as a JSON string with a single key 'score' and no preamble or explanation.
    <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the retrieved document: \n \n {document} \n\n
    Here is the user question: \n \n {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables=["question", "document"],
)

# Define the grader by piping prompt -> LLM -> JSON parser
retrieval_grader = prompt | llm | JsonOutputParser()

# Smoke test: grade the second retrieved chunk against a sample question
user_question = "When was the first Hack TUES?"
docs = retriever.invoke(user_question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": user_question, "document": doc_txt}))

{'score': 'yes'}


GENERATE ANSWER

In [23]:
from langchain.prompts import  PromptTemplate
from langchain import hub
from langchain_core.output_parsers import StrOutputParser


# Prompt for answer generation. The template placeholders are {question} and
# {context}; input_variables must name exactly those.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the questions.
    If you don't know the answer, respond with 'I don't know'. Use three sentences maximum and keep the answers concise <|eot_id|><|start_header_id|>user<|end_header_id|>
    Question: {question}
    Context: {context}
    Answer: <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables=["question", "context"],
)

# Plain-text LLM (no JSON format) because the answer is free-form prose
llm = ChatOllama(model=local_llm, temperature=0)

# Post processing
def format_docs(docs):
    """Join the ``page_content`` of every document into one string, separated by blank lines."""
    separator = "\n\n"
    contents = [entry.page_content for entry in docs]
    return separator.join(contents)

# Chain: prompt -> LLM -> plain string.
# NOTE(review): the name `reg_chain` looks like a typo for `rag_chain`, but it is
# referenced under this name by `generate` further down, so it is left unchanged.
reg_chain = prompt | llm | StrOutputParser()

# Smoke test: retrieve context for a sample question and generate an answer.
# `docs` and `generation` are reused by the grader cells that follow.
user_question = "When was the first Hack TUES?"
docs = retriever.invoke(user_question)
generation = reg_chain.invoke({"question": user_question, "context": format_docs(docs)})
print(generation)

The first Hack TUES was in 2014. I don't have the exact date, but it's mentioned as a "10-ТО ЮБИЛЕЙНО ИЗДАНИЕ" which means 10th anniversary edition.


HALLUCINATION GRADER

In [24]:
# JSON-formatted LLM so the grade can be parsed by JsonOutputParser
llm = ChatOllama(model=local_llm, temperature=0, format="json")

# Prompt. The template placeholders are {documents} and {generation};
# input_variables must name exactly those.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an answer is grounded in / supported by a set of facts. Give a binary score 
    'yes' or 'no' to indicate whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
    <|eot_id|><|start_header_id|>user<|end_header_id|> Here are the facts:
    \n -------- \n 
    {documents}
    \n -------- \n
    Here is the answer: {generation} <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables=["documents", "generation"],
)

hallucination_grader = prompt | llm | JsonOutputParser()

# Smoke test on the documents and answer produced by the previous cell
hallucination_grader.invoke({"documents": docs, "generation": generation})

{'score': 'no'}

ANSWER GRADER

In [25]:
# JSON-formatted LLM so the grade can be parsed by JsonOutputParser
llm = ChatOllama(model=local_llm, temperature=0, format="json")

# Prompt. The template placeholders are {generation} and {question};
# input_variables must name exactly those.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an answer is useful to resolve a question. Give a binary score 'yes' or 'no'
    to indicate whether the answer is useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
    <|eot_id|><|start_header_id|>user<|end_header_id|> Here is the answer:
    \n -------- \n
    {generation}
    \n -------- \n
    Here is the question: {question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables=["generation", "question"],
)

answer_grader = prompt | llm | JsonOutputParser()

# Smoke test on the answer produced by the generation cell
answer_grader.invoke({"generation": generation, "question": user_question})

{'score': 'yes'}

In [41]:
from typing_extensions import TypedDict
from typing import List
from langchain.schema import Document

from langchain_community.tools.tavily_search import TavilySearchResults

# Limit the search to three results. The tool's documented parameter is
# `max_results`; `k` is not a field of TavilySearchResults, so `k=3` appears
# to be ignored, leaving the tool's default result count in effect.
web_search_tool = TavilySearchResults(max_results=3)

# Define the state which will be passed between the steps
class GraphState(TypedDict):
    """State dictionary handed from node to node in the graph."""

    # Retrieved (and possibly web-searched) documents; the nodes below store
    # Document objects here and read their `page_content`.
    documents: List[Document]
    # The user question being answered
    question: str
    # The answer produced by the `generate` node
    generation: str
    # Flag set by `grade_documents`: True when a web search should be run
    web_search : bool


# Define the functions that will be executed in the graph
def retrieve(state):
    """
    Look up documents for the current question in the vector store retriever.

    Args:
        state (dict): Current graph state; its "question" entry is read

    Returns:
        dict: State update holding the retrieved "documents" and the "question"
    """
    print("------Retrieving documents------")

    query = state["question"]
    return {"documents": retriever.invoke(query), "question": query}

def generate(state):
    """ 
    Generate answer based on the retrieved documents
    Args:
        state (dict): Current graph state; reads "question" and "documents"
    Returns:
        state (dict): Updated graph state with generated answer
    """
    
    print("------Generating answer------")
    question = state["question"]
    documents = state["documents"]
    
    # Pass the documents' text (not the Document objects themselves) as context,
    # the same way the stand-alone chain test does with format_docs.
    generation = reg_chain.invoke({"question": question, "context": format_docs(documents)})
    return {"generation": generation, "question": question, "documents": documents}    

def grade_documents(state):
    """ 
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, or no relevant document remains, we set
    a flag to run web search
    Args:
        state (dict): Current graph state; reads "question" and "documents"
    Returns:
        state (dict): Filtered out documents and updated web_search state
    """
    
    print("------Grading documents------")
    question = state["question"]
    documents = state["documents"]
    
    # Score each document
    filtered_docs = []
    needs_web_search = False
    for doc in documents:
        score = retrieval_grader.invoke({"question": question, "document": doc.page_content})
        # A missing or non-string 'score' is treated as "not relevant"
        # instead of raising KeyError / AttributeError.
        grade = str(score.get("score", "")).strip().lower()
        if grade == "yes":
            print("Document is relevant")
            filtered_docs.append(doc)
        # Document is not relevant, set flag to run web search
        else:
            print("Document is not relevant")
            needs_web_search = True

    # Nothing relevant to answer from (including the case where nothing was
    # retrieved at all): fall back to web search.
    if not filtered_docs:
        needs_web_search = True
    return {"documents": filtered_docs, "question": question, "web_search": needs_web_search}

def web_search(state):
    """ 
    Run web search to retrieve additional information
    Args:
        state (dict): Current graph state; reads "question" and "documents"
    Returns:
        state (dict): Updated graph state with additional information from web search
    """
    
    print("------Running web search------")
    question = state["question"]
    # Work on a copy so the list held by the incoming state is never mutated;
    # a missing or None "documents" entry is treated as an empty list.
    documents = list(state.get("documents") or [])
    
    # Run web search
    results = web_search_tool.invoke({"query": question})
    # Only dict results carrying a 'content' entry are used; anything else
    # (for example a plain string returned instead of a result list) is
    # skipped rather than raising.
    snippets = [item["content"] for item in results if isinstance(item, dict) and "content" in item]
    if snippets:
        documents.append(Document(page_content="\n".join(snippets)))
    return {"documents": documents, "question": question}

CONDITIONAL CHECKS FOR GRADING

In [42]:
def decide_to_generate(state):
    """
    Determine whether to generate an answer or add web search
    
    Args:
        state (dict): Current graph state; only its "web_search" flag is read
    
    Returns:
        str: Binary decision for next node to call ("web_search" or "generate")
    """
    print("------Deciding to generate answer or run web search------")

    if state["web_search"]:
        # Add web search results to the documents before generating the answer
        return "web_search"
    # We have all the relevant documents, generate the answer
    return "generate"

def check_answer(state):
    """ 
    Determines whether the generated answer is grounded in the retrieved documents
    and, if it is, whether it is useful for the question.

    Args:
        state (dict): Current graph state; reads "question", "documents" and "generation"

    Returns:
        str: "useful", "not_useful" or "not_supported"
    """
    
    print("------Checking HALLUCINATIONS------")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]
    
    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    # A missing or non-string 'score' counts as "no" instead of raising
    # KeyError / AttributeError on malformed grader output.
    grounded = str(score.get("score", "")).strip().lower() == "yes"
    
    # Check whether is hallucination
    if not grounded:
        print("Answer is not grounded in the documents")
        return "not_supported"

    print("Answer is grounded in the documents")
    # Check question answering
    print("-----Grade Generation vs Question-----")
    score = answer_grader.invoke({"generation": generation, "question": question})
    useful = str(score.get("score", "")).strip().lower() == "yes"
    if useful:
        print("Answer is useful")
        return "useful"
    print("Answer is not useful")
    return "not_useful"
            

Link all the above components and run the code


In [46]:
from langgraph.graph import END, StateGraph

workflow = StateGraph(GraphState)

# Register the nodes. The web search function is registered as
# "run_web_search" rather than "web_search" to avoid a name conflict
# (presumably with the `web_search` key of GraphState — confirm).
workflow.add_node(
    "retrieve",
    retrieve,
)
workflow.add_node(
    "grade_documents",
    grade_documents,
)
workflow.add_node(
    "run_web_search",
    web_search,
)
workflow.add_node(
    "generate",
    generate,
)

# Wire the graph: retrieve -> grade_documents -> (generate | run_web_search)
workflow.set_entry_point("retrieve")
workflow.add_edge(
    "retrieve",
    "grade_documents",
)
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "generate": "generate",
        "web_search": "run_web_search",
    },
)
workflow.add_edge(
    "run_web_search",
    "generate",
)
# After generation, check_answer picks: finish, search the web, or regenerate
workflow.add_conditional_edges(
    "generate",
    check_answer,
    {
        "useful": END,
        "not_useful": "run_web_search",
        "not_supported": "generate",
    },
)

# Compile the graph into a runnable app
app = workflow.compile()

# Test
from pprint import pprint
inputs = {"question": "When was the first Hack TUES?"}

# Track the most recent node output explicitly instead of relying on the loop
# variable still being bound after the loop (NameError if nothing is streamed).
final_state = None
for output in app.stream(inputs):
    for key, value in output.items():
        print(f"Finished running: {key}")
        final_state = value

if final_state is not None:
    print(final_state['generation'])

------Retrieving documents------
Finished running: retrieve
------Grading documents------
Document is relevant
Document is relevant
Document is relevant
Document is relevant
------Deciding to generate answer or run web search------
Finished running: grade_documents
------Generating answer------
------Checking HALLUCINATIONS------
Answer is grounded in the documents
-----Grade Generation vs Question-----
Answer is useful
Finished running: generate
The first Hack TUES was held on March 10-13, 2022. It had an online format and the theme "Space for everybody".
