In [1]:
from dotenv import load_dotenv
import os
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from data_processing import text_splitter
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain.schema.runnable import RunnablePassthrough

In [2]:
load_dotenv()
openai_api_key = os.environ['OPENAI_API_KEY']

### Chroma

In [3]:
texts = text_splitter('./data/news.json')

chunk_embedd = OpenAIEmbeddings(openai_api_key=openai_api_key)
vector_storage = Chroma.from_texts(texts=texts, embedding=chunk_embedd)
retriever = vector_storage.as_retriever()

In [4]:
texts[0]

'Báo cáo của Savill về tình hình diễn biến giá chung cư trong quý 3/2024 cho thấy, giá nhà để bán tại Hà Nội vẫn trên đà tiếp tục tăng. Giá bán sơ cấp ở mức 69 triệu đồng/m2, tăng 6% so với quý trước và 28% so với cùng kỳ năm ngoái. Trên thị trường thứ cấp, giá trung bình của căn hộ trong quý 3 là 51 triệu đồng/m2, tăng 10% so với quý trước và 41% so với năm ngoái. Giá thứ cấp trung bình tăng 17% mỗi năm kể từ năm 2020, với Hạng C tăng mạnh nhất đạt 20%/năm, tiếp đến là Hạng A tăng 16%/năm và Hạng B tăng 15%/năm. Nhiều người mua chuyển hướng từ thị trường sơ cấp sang thứ cấp do thị trường này có nhiều sự lựa chọn hơn, cũng như vấn đề pháp lý được đảm bảo hơn. Anh Nguyễn Hưng - hiện đang sinh sống, làm việc ở Hà Nội chia sẻ, hơn 10 năm sinh sống ở Hà Nội chưa từng thấy giá chung cư tăng mạnh'

### Call LLM

In [5]:
model = 'gpt-3.5-turbo'
llm = ChatOpenAI(model=model, temperature=0)

### Router

In [6]:
import time
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.output_parsers import StrOutputParser

In [7]:
prompt = PromptTemplate(
  template="""
    You are an expert at routing a user question to a vectorstore or web search. Use the vectorstore for questions on LLM  agents, prompt engineering, and adversarial attacks. You do not need to be stringent with the keywords in the question related to these topics. Otherwise, use web-search. Give a binary choice 'web_search' or 'vectorstore' based on the question. Return the a JSON with a single key 'datasource' and no premable or explaination. Question to route: {question}
  """,
  input_variables=["question"]
)

start = time.time()
question_router = prompt | llm | JsonOutputParser()

question = "llm agent memory"
print(question_router.invoke({"question": question}))
end = time.time()

print(f"Time for router question: {end - start}")

{'datasource': 'vectorstore'}
Time for router question: 1.0560798645019531


### Generate answer

In [8]:
prompt = PromptTemplate(
        input_variables=["question", "context"],
        template="""You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. 
        Use the following pieces of retrieved context to answer the question.
        If you don't know the answer, just say that you don't know. You have to answer in Vietnamese language. 
        Question: {question}
        Context: {context} 
        Answer:"""
    )

# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

context = format_docs(retriever.invoke(question))

start = time.time()
rag_chain = prompt | llm | StrOutputParser()
# rag_chain = RunnablePassthrough() | retrieve_and_format | prompt | llm
try:
    generation = rag_chain.invoke({"question": question, "context": context})
    print(f"Answer for question {question} is: {generation}")
except Exception as e:
    print(f"Error during LLM invocation: {e}")

end = time.time()
print(f"Time for generate answer is: {end-start}")

Answer for question llm agent memory is: Tôi không biết.
Time for generate answer is: 0.7998270988464355


### Retrieval Grader

In [9]:
prompt = PromptTemplate(
  template="""
  You are a grader assessing relevance 
    of a retrieved document to a user question. If the document contains keywords related to the user question, 
    grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no premable or explaination.
  Here is the retrieved document: \n\n {context} \n\n
  Here is the user question: {question}
  """,
  input_variables=["question", "context"]
)

start = time.time()
retrieval_grader = prompt | llm | JsonOutputParser()
# question = "prompt memory"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "context": doc_txt}))
end = time.time()
print(f"The time required to generate response by the retrieval grader in seconds:{end - start}")


{'score': 'no'}
The time required to generate response by the retrieval grader in seconds:1.2384169101715088


### Hallucination Grader

In [10]:
prompt = PromptTemplate(
    template=""" You are a grader assessing whether 
    an answer is grounded in / supported by a set of facts. Give a binary 'yes' or 'no' score to indicate 
    whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a 
    single key 'score' and no preamble or explanation. 
    Here are the facts:
    \n ------- \n
    {documents} 
    \n ------- \n
    Here is the answer: {generation}
    """,
    input_variables=["generation", "documents"],
)
start = time.time()
hallucination_grader = prompt | llm | JsonOutputParser()
hallucination_grader_response = hallucination_grader.invoke({"documents": docs, "generation": generation})
end = time.time()
print(f"The time required to generate response by the generation chain in seconds:{end - start}")
print(hallucination_grader_response)

The time required to generate response by the generation chain in seconds:3.4121055603027344
{'score': 'no'}


### Answer grader

In [11]:
prompt = PromptTemplate(
    template="""You are a grader assessing whether an 
    answer is useful to resolve a question. Give a binary score 'yes' or 'no' to indicate whether the answer is 
    useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
    Here is the answer:
    \n ------- \n
    {generation} 
    \n ------- \n
    Here is the question: {question}""",
    input_variables=["generation", "question"],
)
start = time.time()
answer_grader = prompt | llm | JsonOutputParser()
answer_grader_response = answer_grader.invoke({"question": question,"generation": generation})
end = time.time()
print(f"The time required to generate response by the answer grader in seconds:{end - start}")
print(answer_grader_response)


The time required to generate response by the answer grader in seconds:0.7782087326049805
{'score': 'no'}


### Tavily search

In [12]:
from langchain_community.tools.tavily_search import TavilySearchResults
os.environ['TAVILY_API_KEY'] = "TAVILY_API_KEY"
web_search_tool = TavilySearchResults(k=3)  

### LangGraph

In [13]:
from typing_extensions import TypedDict
from typing import List

### State

class GraphState(TypedDict):
    question : str
    generation : str
    web_search : str
    documents : List[str]

In [14]:
from langchain.schema import Document
def retrieve(state):
    """
    Retrieve documents from vectorstore

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    print(f"State before web search: {state}")
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = retriever.invoke(question)
    context = format_docs(documents)
    return {"documents": documents, "context": context ,"question": question}
#
def generate(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    print(f"State before web search: {state}")
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]
    
    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}
#
def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, we will set a flag to run web search

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
    """
    print("---GRADE---")
    print(f"State before web search: {state}")
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each document
    filtered_docs = []
    web_search = "No"
    for doc in documents:
        score = retrieval_grader.invoke({"question": question, "context": doc.page_content})
        grade = score["score"]
        # Document relevant
        if grade.lower() == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(doc)
        # Document not relevant
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # Flag for web search
            web_search = "Yes"
            continue
    # Update the state
    return {"documents": filtered_docs, "context": format_docs(filtered_docs), "question": question, "web_search": web_search}


def web_search(state):
    """
    Web search based on the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Appended web results to documents
    """
    print("---WEB SEARCH---")
    print(f"State before web search: {state}")
    print("---WEB SEARCH---")
    question = state["question"]
    documents = state.get("documents", [])

    # Perform web search
    docs = web_search_tool.invoke({"query": question})

    # Debugging output
    print("---WEB SEARCH RESPONSE---")
    print(docs)

    # Check response type and handle accordingly
    if isinstance(docs, str):
        # If `docs` is a plain string, wrap it into a single Document
        web_results = Document(page_content=docs)
        documents.append(web_results)
    elif isinstance(docs, list):
        # If `docs` is a list, process each element
        try:
            web_results = "\n".join(d["content"] for d in docs if isinstance(d, dict) and "content" in d)
            documents.append(Document(page_content=web_results))
        except Exception as e:
            print(f"Error processing web search results: {e}")
    else:
        print("Unexpected response type from web_search_tool.invoke().")

    return {"documents": documents, "question": question}


In [15]:
# #Debug
# print(f"State before grading: {state}")

# if "context" not in state:
#     raise ValueError("Key 'context' is missing in the state. Ensure retrieve function sets it properly.")


In [16]:
def route_question(state):
    """
    Route question to web search or RAG.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """

    print("---ROUTE QUESTION---")
    question = state["question"]
    print(question)
    source = question_router.invoke({"question": question})  
    print(source)
    print(source['datasource'])
    if source['datasource'] == 'web_search':
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "websearch"
    elif source['datasource'] == 'vectorstore':
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"

In [17]:
def decide_to_generate(state):
    """
    Determines whether to generate an answer, or add web search

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    question = state["question"]
    web_search = state["web_search"]
    filtered_documents = state["documents"]

    if web_search == "Yes":
        # All documents have been filtered check_relevance
        # We will re-generate a new query
        print("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---")
        return "websearch"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"

In [18]:
def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    grade = score['score']

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question,"generation": generation})
        grade = score['score']
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

In [19]:
from langgraph.graph import END, StateGraph
workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("websearch", web_search) # web search
workflow.add_node("retrieve", retrieve) # retrieve
workflow.add_node("grade_documents", grade_documents) # grade documents
workflow.add_node("generate", generate) # generatae

<langgraph.graph.state.StateGraph at 0x164e40dab40>

In [20]:
workflow.set_conditional_entry_point(
    route_question,
    {
        "websearch": "websearch",
        "vectorstore": "retrieve",
    },
)

workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)
workflow.add_edge("websearch", "generate")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
    },
)

<langgraph.graph.state.StateGraph at 0x164e40dab40>

In [21]:
app = workflow.compile()

In [None]:
from IPython.display import Image, display

try:
  display(Image(app.get_graph().draw_mermaid_png()))
except Exception:
  print("wrong config")

### TestTest

In [22]:
from pprint import pprint
inputs = {"question": "Luật bẩt động sản 2024?"}
for output in app.stream(inputs, {"recursion_limit": 50}):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
pprint(value["generation"])


---ROUTE QUESTION---
Luật bẩt động sản 2024?
{'datasource': 'web_search'}
web_search
---ROUTE QUESTION TO WEB SEARCH---
---WEB SEARCH---
State before web search: {'question': 'Luật bẩt động sản 2024?'}
---WEB SEARCH---
---WEB SEARCH RESPONSE---
HTTPError('401 Client Error: Unauthorized for url: https://api.tavily.com/search')
'Finished running: websearch:'
---GENERATE---
State before web search: {'question': 'Luật bẩt động sản 2024?', 'documents': [Document(metadata={}, page_content="HTTPError('401 Client Error: Unauthorized for url: https://api.tavily.com/search')")]}
---GENERATE---
---CHECK HALLUCINATIONS---
'---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---'
'Finished running: generate:'
---GENERATE---
State before web search: {'question': 'Luật bẩt động sản 2024?', 'generation': 'Tôi không biết.', 'documents': [Document(metadata={}, page_content="HTTPError('401 Client Error: Unauthorized for url: https://api.tavily.com/search')")]}
---GENERATE---
---CHECK HALLUCINATIO

GraphRecursionError: Recursion limit of 50 reached without hitting a stop condition. You can increase the limit by setting the `recursion_limit` config key.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/GRAPH_RECURSION_LIMIT