# Adaptive RAG with LangGraph

## 1. Introduction

Adaptive RAG is a RAG strategy that combines (1) [query analysis](https://blog.langchain.dev/query-construction/) with (2) [active / self-corrective RAG](https://blog.langchain.dev/agentic-rag-with-langgraph/).

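The adaptive RAG implemented here can answer a question in three ways:

* No retrieval: answer the question directly
* Web search, then answer the question
* Query the vector database, then answer the question

It runs as follows:

* Question routing:
    * vectorstore - answers questions covered by the knowledge base
    * web_search - answers questions the knowledge base does not cover
    * llm_fallback - handles casual conversation such as "Who are you?", and serves as the fallback when neither the knowledge base nor web_search returns relevant documents
* Document check: documents returned by vectorstore and web_search are checked for relevance; if none of them is relevant, the flow falls back
    * vectorstore falls back to web_search
    * web_search falls back to llm_fallback
    * llm_fallback answers only questions the model already knows the answer to (and notes that no supporting documents were found); for everything else it answers "I don't know"
* Answer reliability check: if the answer contradicts the retrieved facts, i.e. the model made something up (hallucination), the answer is regenerated
* Answer effectiveness check: if the generated content does not adequately answer the question, the answer is regenerated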
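
The fallback order in the list above can be sketched in a few lines of plain Python. This is only an illustration of the described behavior, not the project's code: the `FALLBACK` table and the `resolve_route` function are hypothetical names introduced here.

```python
# Hypothetical table encoding the fallback order described above.
FALLBACK = {
    "vectorstore": "web_search",
    "web_search": "llm_fallback",
}


def resolve_route(initial_route: str, has_relevant_docs: dict) -> str:
    """Follow the fallback chain until a route has relevant documents.

    `has_relevant_docs` maps a route name to True/False.
    `llm_fallback` needs no documents, so it always terminates the chain.
    """
    route = initial_route
    while route != "llm_fallback" and not has_relevant_docs.get(route, False):
        route = FALLBACK[route]
    return route


# The vector store finds nothing relevant, but the web search does.
print(resolve_route("vectorstore", {"vectorstore": False, "web_search": True}))
```

In the real graph each step is a node plus a conditional edge; the loop here only compresses that control flow into one function.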

## 2. Preparation

### 2.1 Install Dependencies

In [1]:
with open('./requirements.txt', 'r') as file:
    for line in file:
        print(line.strip())

protobuf
langchain
langchain-openai
tiktoken
langchainhub
chromadb
langgraph
langchain-community
langchain-core
zhipuai
httpx_sse
bs4
lxml
faiss-cpu
streamlit
gradio
fitz
pil-utils
PyPDF2
langserve
pinecone
sse_starlette
uvicorn


In [2]:
# ! pip3 install -r ./requirements.txt --trusted-host pypi.org --trusted-host pypi.python.org --trusted-host files.pythonhosted.org

### 2.2 Prepare API Keys

Obtain or purchase API keys:

* ChatGLM: [https://open.bigmodel.cn/](https://open.bigmodel.cn/)
* OpenAI: [https://api.xty.app/](https://api.xty.app/)
* TAVILY: [https://tavily.com/](https://tavily.com/)
* KIMI: [https://platform.moonshot.cn/](https://platform.moonshot.cn/)
* LangChain (for LangSmith): see the next subsection

Set the API keys:

1. Replace the placeholder values with your own keys, then add the following commands to `~/.bash_profile`

~~~bash
export OPENAI_API_KEY="sk-mt...vjl" 
export ZHIPU_API_KEY="210...w5y"
export TAVILY_API_KEY="tvly...E1R" # free API key with 1000 requests 
export LANGCHAIN_API_KEY="lsv2...599"
export KIMI_API_KEY="sk-...h59"
~~~

2. In the shell, press `Ctrl + C` to stop Jupyter Notebook
3. Load the API keys and restart Jupyter Notebook

~~~bash
source ~/.bash_profile
jupyter notebook 
~~~

For detailed configuration, see [setup.sh](setup.sh).

If you use an IDE, set the environment variables above in the IDE's run configuration instead.
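
Before running the notebook it can help to confirm that the keys are actually visible to Python. The sketch below is a stand-alone approximation of that check; `REQUIRED_KEYS`, `mask`, and `report_env` are hypothetical names, and it only shows the first five characters of each value so that secrets do not end up in saved notebook output.

```python
import os

# Variable names taken from the export commands above.
REQUIRED_KEYS = [
    "ZHIPU_API_KEY",
    "OPENAI_API_KEY",
    "TAVILY_API_KEY",
    "LANGCHAIN_API_KEY",
    "KIMI_API_KEY",
]


def mask(value: str, visible: int = 5) -> str:
    """Show only the first few characters of a secret."""
    return value[:visible] + "..." if value else "<missing>"


def report_env(keys=REQUIRED_KEYS, env=None) -> dict:
    """Return a {name: masked value} mapping for the given variables."""
    env = os.environ if env is None else env
    return {key: mask(env.get(key, "")) for key in keys}


for name, shown in report_env().items():
    print(f"{name}\t: {shown}")
```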

In [3]:
# Check that the API keys are present in the environment (LangSmith settings are not included)
from lib.config.environment import Environment
Environment.print_env_vars()

ZHIPU_API_KEY	: 210ba... 
OPENAI_API_KEY	: sk-mt...
TAVILY_API_KEY	: tvly-...
LANGCHAIN_API_KEY	: lsv2_...
KIMI_API_KEY	: sk-Jb...
LANGCHAIN_TRACING_V2	: false
LANGCHAIN_PROJECT	: 
LANGCHAIN_ENDPOINT	: 


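### 2.3 Set Up LangSmith

Go to [https://smith.langchain.com/](https://smith.langchain.com/)

* Sign up for an account
* Click `Setting`->`API Key` to create an API key, and add it to your environment variables (see the previous subsection)
* Click `Projects`->`Create New Project` to view the code for creating a project; it mainly sets a group of environment variables: LANGCHAIN_TRACING_V2, LANGCHAIN_ENDPOINT, LANGCHAIN_API_KEY, and LANGCHAIN_PROJECT

With this setup, logs are sent to the LangChain website, where you can open the corresponding project to view the tracing data.

If you would rather not send logs there, you can use the official LangSmith Docker image to store them locally. See: [https://docs.smith.langchain.com/self_hosting/installation](https://docs.smith.langchain.com/self_hosting/installation)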
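
For readers without the project's helper library, the same environment variables can be set directly from Python. This is a minimal sketch under assumptions: `configure_langsmith` is a hypothetical function, and the default endpoint is the value shown in this notebook's own output.

```python
import os


def configure_langsmith(enable: bool, project: str,
                        endpoint: str = "https://api.smith.langchain.com") -> None:
    """Set the tracing-related variables; LANGCHAIN_API_KEY is assumed to be exported already."""
    os.environ["LANGCHAIN_TRACING_V2"] = "true" if enable else "false"
    os.environ["LANGCHAIN_PROJECT"] = project
    os.environ["LANGCHAIN_ENDPOINT"] = endpoint


# Tracing stays off here, matching the notebook's configuration.
configure_langsmith(enable=False, project="rag_demo_notebook")
print(os.environ["LANGCHAIN_TRACING_V2"], os.environ["LANGCHAIN_PROJECT"])
```

These variables should be set before any chain is invoked for them to affect that run.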

In [4]:
from lib.config.environment import Environment

Environment.setup_up_env_vars(enable_langsmith=False, langsmith_proj='rag_demo_notebook')
Environment.print_env_vars()

ZHIPU_API_KEY	: 210ba... 
OPENAI_API_KEY	: sk-mt...
TAVILY_API_KEY	: tvly-...
LANGCHAIN_API_KEY	: lsv2_...
KIMI_API_KEY	: sk-Jb...
LANGCHAIN_TRACING_V2	: false
LANGCHAIN_PROJECT	: rag_demo_notebook
LANGCHAIN_ENDPOINT	: https://api.smith.langchain.com


### 2.4 Index the Knowledge Base

This step requires `zhipuai` and `langchain_community`; after installing them, restart the kernel so they can be loaded.

Reference documentation for the code:

* ZhipuAIEmbeddings: [https://api.python.langchain.com/en/latest/embeddings/langchain_community.embeddings.zhipuai.ZhipuAIEmbeddings.html](https://api.python.langchain.com/en/latest/embeddings/langchain_community.embeddings.zhipuai.ZhipuAIEmbeddings.html#langchain_community.embeddings.zhipuai.ZhipuAIEmbeddings)
* Chroma: [https://python.langchain.com/v0.2/docs/integrations/vectorstores/chroma/](https://python.langchain.com/v0.2/docs/integrations/vectorstores/chroma/)
* Load HTML: [https://python.langchain.com/v0.2/docs/how_to/document_loader_html/](https://python.langchain.com/v0.2/docs/how_to/document_loader_html/)

In [5]:
### Build the index
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader
from lib.util.llm_utils import EmbeddingUtil

# Initialize the embedding model
embd = EmbeddingUtil.getDefaultEmbeddingModel()

# Directory containing the files to load
file_dir = "data/file"

txt_files = [
    f"{file_dir}/2023-03-15-prompt-engineering.txt",
    f"{file_dir}/2023-06-23-agent.txt",
    f"{file_dir}/2023-10-25-adv-attack-llm.txt",
]
# Each loader returns a list of Documents, so flatten the nested lists
docs = [TextLoader(text_file).load() for text_file in txt_files]
docs_list = [item for sublist in docs for item in sublist]

# Split into chunks of at most 512 tokens with no overlap
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(chunk_size=512, chunk_overlap=0)
doc_splits = text_splitter.split_documents(docs_list)

# Logging
print(f"\nembedding model: \n{embd}")
print(f"\ndocument chunks: {len(doc_splits)}")


# Alternative ways to load the files
# from langchain_community.document_loaders import WebBaseLoader
# loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
# docs_list = loader.load()

# from langchain_community.document_loaders import BSHTMLLoader
# backlog_file_dir="../backlog/data/file"
# loader = BSHTMLLoader(file_path=f"{backlog_file_dir}/2023-03-15-prompt-engineering.html")
# docs_list = loader.load()

# from langchain_community.document_loaders import MHTMLLoader
# backlog_file_dir="../backlog/data/file"
# loader = MHTMLLoader(file_path=f"{backlog_file_dir}/2023-03-15-prompt-engineering.mhtml")
# docs_list = loader.load()

ZHIPU_API_KEY	: 210ba... 

embedding model: 
client=<zhipuai._client.ZhipuAI object at 0x105b18320> model='embedding-2' api_key='210ba...' dimensions=None

document chunks: 62


In [6]:
from lib.util.vector_store_utils import VectorStoreUtil
from lib.util.llm_utils import EmbeddingUtil
from lib.config.app_config import Config

# Initialize
embd = EmbeddingUtil.getDefaultEmbeddingModel()
vectorstore_wrapper = VectorStoreUtil.create_default_vectorstore_wrapper()

print(f"embedding model: {embd}\n")
print(f"vectorstore wrapper: {vectorstore_wrapper}\n")

ZHIPU_API_KEY	: 210ba... 
ZHIPU_API_KEY	: 210ba... 
init FAISS
load_chunk: 0
init vector store and load document
vector store load complete
embedding model: client=<zhipuai._client.ZhipuAI object at 0x12e1cc950> model='embedding-2' api_key='210ba...' dimensions=None

vectorstore wrapper: <lib.util.vector_store_utils.FAISSWrapper object at 0x12e1ccaa0>



In [7]:
# Embed the documents, add them to the vector database, and back it up to a local dump file
# vectorstore_wrapper.init_from_docs(docs=doc_splits, embedding=embd)
# vectorstore_wrapper.trigger_dump(base_dir=default_vectorstore_dump_base_dir())

In [8]:
# Load the vector database from the dump file
vectorstore_wrapper.init_from_dump(base_dir=Config.vectorstore_dump_dir())

# Return the LangChain object for use by other modules
vectorstore = vectorstore_wrapper.get_vector_store()
retriever = vectorstore.as_retriever()
retriever

load from data/vector_store/faiss_dump
load compete


VectorStoreRetriever(tags=['FAISS', 'ZhipuAIEmbeddings'], vectorstore=<langchain_community.vectorstores.faiss.FAISS object at 0x12e0ad340>)

## 3. RAG Submodules
### 3.1 Query Intent Analysis and Routing with the LLM

A router chooses between tools: the LLM decides which route to take based on the user's question.

Reference documentation:

* Using ChatGLM with the LangChain framework: [https://open.bigmodel.cn/dev/api#langchain_sdk](https://open.bigmodel.cn/dev/api#langchain_sdk)
* LangChain tool calling: [https://python.langchain.com/v0.2/docs/how_to/tool_calling/](https://python.langchain.com/v0.2/docs/how_to/tool_calling/)


In [9]:
from lib.util.code_utils import CodeUtils
from lib.chains.route_question import create_question_route_chain
CodeUtils.print_code(create_question_route_chain)

def create_question_route_chain() -> RunnableSerializable:
    # llm with tools
    llm = ChatModelUtil.getDefaultChatModel()
    structured_llm_router = llm.bind_tools(tools=[web_search, vectorstore])
    # prompt template
    route_prompt = ChatPromptTemplate(
        messages=[
            SystemMessage(
                content=f"""You are an expert at routing a user question to one of below target: {vectorstore} or {web_search} or {llm_fallback}.\n 
                        The vectorstore contains documents related to agents, prompt engineering, and adversarial attacks. Use the vectorstore for questions on these topics.\n
                        The llm_fallback is the LLM used for casual conversation.\n 
                        \n"""),
            HumanMessagePromptTemplate.from_template("{question}")
        ]
    )
    # router chain
    question_router = route_prompt | structured_llm_router
    return question_router



In [10]:
question_router_chain = create_question_route_chain()

In [11]:
response = question_router_chain.invoke({"question": "Who will the Bears draft first in the NFL draft?"})
print(response.additional_kwargs["tool_calls"])

[{'id': 'call_20240901185518e1e3f2efe5194f1e', 'function': {'arguments': '{"query": "Who will the Bears draft first in the NFL draft?"}', 'name': 'web_search'}, 'type': 'function', 'index': 0}]


In [12]:
response = question_router_chain.invoke({"question": "What are the types of agent memory?"})
print(response.additional_kwargs["tool_calls"])

[{'id': 'call_202409011855191b8c82c0cdd04c97', 'function': {'arguments': '{"query": "types of agent memory"}', 'name': 'vectorstore'}, 'type': 'function', 'index': 0}]


In [13]:
response = question_router_chain.invoke({"question": "Hi how are you?"})
print("tool_calls" in response.additional_kwargs)

False


### 3.2 Grading the Relevance of Retrieved Documents to the Question

In [14]:
from lib.util.code_utils import CodeUtils
from lib.chains.check_doc import create_doc_relevance_check_chain
CodeUtils.print_code(create_doc_relevance_check_chain)

def create_doc_relevance_check_chain() -> RunnableSerializable:
    # llm
    llm = ChatModelUtil.getDefaultChatModel()

    # structure output
    structured_llm_grader = llm.with_structured_output(
        schema=YesOrNoUtil.YesOrNo, method="json_mode", include_raw=False)

    # ensemble chain
    grade_prompt = ChatPromptTemplate(
        messages=[
            SystemMessage(content=f"""You are a grader assessing relevance of a retrieved-document to a user-question. 
            If the document contains keyword(s) or semantic meaning related to the user question, 
            grade it as relevant. \n 
            {YesOrNoUtil.json_mode_prompt(yes_means="the document is relevant to the question")}."""),
            HumanMessagePromptTemplate.from_template(
                "retrieved-document: \n\n {document} \n\n User question: {question}")
        ]
    )
    retrieval_grader = grade_prompt | structured_llm_grader
    return retrieval_grader



In [15]:
doc_relevance_check_chain = create_doc_relevance_check_chain()

In [16]:
question = "introduce prompt engineering"
docs = retriever.invoke(question)
doc_txt = docs[0].page_content
f'{doc_txt[:500]}...'

'Prompt Engineering\n\nDate: March 15, 2023 | Estimated Reading Time: 21 min | Author: Lilian Weng\nTable of Contents\nPrompt Engineering, also known as In-Context Prompting, refers to methods for how to communicate with LLM to steer its behavior for desired outcomes without updating the model weights. It is an empirical science and the effect of prompt engineering methods can vary a lot among models, thus requiring heavy experimentation and heuristics.\n\nThis post only focuses on prompt engineering f...'

In [17]:
response = doc_relevance_check_chain.invoke({"question": "introduce prompt engineering", "document": doc_txt})
print(response)

binary_score='yes'


In [18]:
response = doc_relevance_check_chain.invoke({"question": "types of ice cream", "document": doc_txt})
print(response)

binary_score='no'


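### 3.3 Answering with the LLM: (1) RAG Chain

Once we have the Documents from the vector database and the user's question, we can use them together to answer the question.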
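
To make the idea concrete, the sketch below shows one plausible way to combine retrieved documents and a question into a single user message. It is an assumption, not the project's template: `Doc` is a stand-in for a LangChain Document, `build_user_message` is a hypothetical helper, and the `Question: ... Answer:` layout is borrowed from the non-RAG chain shown later in this notebook.

```python
from dataclasses import dataclass


@dataclass
class Doc:
    """Stand-in for a LangChain Document; only page_content is modeled."""
    page_content: str


def build_user_message(question: str, documents: list) -> str:
    """Join the document texts into a context block followed by the question."""
    context = "\n\n".join(doc.page_content for doc in documents)
    return f"Context:\n{context}\n\nQuestion: {question}\nAnswer: "


message = build_user_message(
    "introduce prompt engineering",
    [Doc("Prompt Engineering, also known as In-Context Prompting, refers to "
         "methods for how to communicate with LLM to steer its behavior.")],
)
print(message)
```

The system message (the instructions to answer from the context or say "I don't know") would be sent separately, ahead of this user message.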

In [19]:
from lib.chains.answer_question import create_answer_with_doc_chain
CodeUtils.print_code(create_answer_with_doc_chain)

def create_answer_with_doc_chain(get_by_session_id: Optional[GetSessionHistoryCallable] = None) -> Runnable:
    # llm and background message
    llm = ChatModelUtil.getDefaultChatModel()
    background_msg = """You are an assistant for question-answering tasks. Use the following pieces of 
            retrieved context to answer the question. \n Answer 'I don't know' if you do not have information to 
            answer this question. \n Use three sentences maximum and keep the answer concise if you have information 
            to answer this question."""
    if get_by_session_id is None:
        # rag without chat history, require upstream input has below fields
        # * question  : user question
        # * documents : documents retrieved for answer this question
        non_history_template: Callable[[Any], ChatPromptTemplate] \
                = lambda upstream_input: ChatPromptTemplate.from_messages([
                    SystemMessage(content=background_msg),
                

In [20]:
answer_with_doc_chain = create_answer_with_doc_chain(get_by_session_id = None)

In [21]:
# test: an answer that covered by document
question = "introduce prompt engineering"
generation = answer_with_doc_chain.invoke({"documents": docs, "question": question})
print(generation)

Prompt engineering is the practice of crafting and fine-tuning prompts to guide AI models like language generators to produce desired outputs. It involves understanding the capabilities and limitations of AI to optimize the way users communicate with these systems. This technique is crucial for improving the relevance and quality of the responses generated by AI.


In [22]:
# test: an answer that model does not know
question = "What is the job of the person named RJXACAGEDSG LAMUX?"
generation = answer_with_doc_chain.invoke({"documents": docs, "question": question})
print(generation)

I don't know.


### 3.4 Answering with the LLM: (2) Non-RAG Chain

In [23]:
from lib.chains.answer_question import create_answer_with_llm_chain
CodeUtils.print_code(create_answer_with_llm_chain)

def create_answer_with_llm_chain(get_by_session_id: Optional[GetSessionHistoryCallable] = None) -> Runnable:
    # llm and background message
    llm = ChatModelUtil.getDefaultChatModel()
    background_msg = """You are an assistant for question-answering tasks. Answer the question based upon your 
    knowledge. Use three sentences maximum and keep the answer concise."""
    if get_by_session_id is None:
        # answer question without chat history, require upstream message provides below fields
        # * question  : user question
        non_history_template: Callable[[Any], ChatPromptTemplate] \
            = lambda x: ChatPromptTemplate.from_messages([
                SystemMessage(content=background_msg),
                HumanMessage(f"Question: {x['question']} \nAnswer: ")
            ]
        )
        return non_history_template | llm | StrOutputParser()
    else:
        # https://api.python.langchain.com/en/latest/runnables/langchain_core.runnables.history.RunnableWithMe

In [24]:
answer_with_llm_chain = create_answer_with_llm_chain(get_by_session_id=None)

In [25]:
question = "Who are you?"
generation = answer_with_llm_chain.invoke({"question": question})
print(generation)

I am an AI assistant designed to provide information and answer questions based on the knowledge programmed into me. I don't have personal experiences or consciousness. How can I assist you today?


### 3.5 Hallucination Check

In [26]:
from lib.chains.check_answer import create_trustworthiness_check_chain
CodeUtils.print_code(create_trustworthiness_check_chain)

def create_trustworthiness_check_chain() -> RunnableSerializable:
    # llm
    llm = ChatModelUtil.getDefaultChatModel()
    structured_llm_grader = llm.with_structured_output(YesOrNoUtil.YesOrNo, method='json_mode', include_raw=False)
    # prompt
    hallucination_prompt = ChatPromptTemplate.from_messages(
        [
            SystemMessage(content=f"""You are a grader assessing whether all content in LLM-generation is grounded in the set-of-facts and the information within this set of facts is sufficiently robust to substantiate. \n 
            {YesOrNoUtil.json_mode_prompt(
                yes_means="all content in LLM-generation is grounded in the set-of-facts and the information within this set-of-facts is sufficiently robust to substantiate.")}"""),
            HumanMessagePromptTemplate.from_template(
                "LLM-generation: \n\n {generation} \n\n"
                "set-of-facts: \n\n {documents}"),
        ]
    )
    chain = hallucination_prompt | structured_llm_gr

In [27]:
trustworthiness_check_chain = create_trustworthiness_check_chain()

In [28]:
# test 1: an answer related with the documents
response = trustworthiness_check_chain.invoke({"documents": docs, "generation": "Prompt engineering is the process of crafting and fine-tuning natural language prompts to guide AI models like GPT-3 towards generating desired outputs. It involves understanding the model's capabilities and limitations to elicit more accurate, relevant, and contextually appropriate responses. This technique is crucial for improving the performance and utility of AI in various applications."})

print(f'Reliability: {response}\n')
print(f"Reference documents: \n {docs[0].to_json().get('kwargs')}\n"[:300])

Reliability: binary_score='yes'

Reference documents: 
 {'metadata': {'source': '../data/file/2023-03-15-prompt-engineering.txt'}, 'page_content': "Prompt Engineering\n\nDate: March 15, 2023 | Estimated Reading Time: 21 min | Author: Lilian Weng\nTable of Contents\nPrompt Engineering, also known as In-Context Prompting, refers to m


In [29]:
# test 2: an answer not related with the documents
response = trustworthiness_check_chain.invoke({"documents": docs, "generation": "Prompt engineering is a technique for promoting and motivating subordinates."})

print(f'Reliability: {response}\n')
print(f"Reference documents: \n {docs[0].to_json().get('kwargs')}\n"[:300])

Reliability: binary_score='no'

Reference documents: 
 {'metadata': {'source': '../data/file/2023-03-15-prompt-engineering.txt'}, 'page_content': "Prompt Engineering\n\nDate: March 15, 2023 | Estimated Reading Time: 21 min | Author: Lilian Weng\nTable of Contents\nPrompt Engineering, also known as In-Context Prompting, refers to m


### 3.6 Answer Quality Evaluation

In [30]:
from lib.chains.check_answer import create_effectiveness_check_chain
CodeUtils.print_code(create_effectiveness_check_chain)

def create_effectiveness_check_chain() -> RunnableSerializable:
    # llm with structure output
    llm = ChatModelUtil.getDefaultChatModel()
    structured_llm_grader = llm.with_structured_output(YesOrNoUtil.YesOrNo, method='json_mode', include_raw=False)
    # prompt
    answer_prompt = ChatPromptTemplate.from_messages(
        [
            SystemMessage(
                content=f"""
                You are a grader assessing whether an LLM-generation addresses / resolves the user-question. 
                {YesOrNoUtil.json_mode_prompt(yes_means="the LLM-generation resolves the user-question")}
                """),
            HumanMessagePromptTemplate.from_template(
                "user-question: \n\n {question} \n\n LLM-generation: {generation}")
        ]
    )
    # chain
    effectiveness_grader = answer_prompt | structured_llm_grader
    return effectiveness_grader



In [31]:
effectiveness_check_chain = create_effectiveness_check_chain()

In [32]:
# test
response=effectiveness_check_chain.invoke({"question": "who am i", "generation": "I am an AI assistant designed to provide information and answer questions based on the knowledge programmed into me. I don't have personal experiences or consciousness. How can I assist you today?"})
response

YesOrNo(binary_score='no')

In [33]:
response=effectiveness_check_chain.invoke({"question": "who are u", "generation": "I am an AI assistant designed to provide information and answer questions based on the knowledge programmed into me. I don't have personal experiences or consciousness. How can I assist you today?"})
response

YesOrNo(binary_score='yes')

In [34]:
response=effectiveness_check_chain.invoke({"question": "introduce prompt engineering", "generation": "Prompt engineering is the process of crafting and fine-tuning natural language prompts to guide AI models like GPT-3 towards generating desired outputs. It involves understanding the model's capabilities and limitations to elicit more accurate, relevant, and contextually appropriate responses. This technique is crucial for improving the performance and utility of AI in various applications."})
response

YesOrNo(binary_score='yes')

### 3.7 Web Search Tool

In [35]:
from lib.util.web_search_utils import create_web_search_tool
CodeUtils.print_code(create_web_search_tool)

def create_web_search_tool() -> BaseTool:
    # check API key
    print(os.environ['TAVILY_API_KEY'][:5])
    # sample search
    web_search_tool = TavilySearchResults()
    return web_search_tool



In [36]:
web_search_tool = create_web_search_tool()

tvly-


In [37]:
# search_result=web_search_tool.invoke("Tom and Jerry")
# search_result[0]

## 4. Graph Components

Capture the workflow as a graph.

### 4.1 Define the Graph State

#### (1) graph state

In [38]:
from lib.graph.self_reflection_rag.state.graph_state import RagGraphState
CodeUtils.print_code(RagGraphState)

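class RagGraphState(TypedDict):
    """
    Represents the state of the graph. While the Graph runs, it stores the data produced by each Node and is
    - used as the input and output of Nodes.
    - used as the input of Edges, helping an Edge decide which Node to route to.

    Attributes:
    - question: the user's question
    - generation: the answer generated by the LLM
    - documents: the list of documents retrieved from the vector database
    """

    question: str
    generation: str
    documents: List[str]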
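
Each node returns only the keys it wants to change, and those values are written back into the shared state. The sketch below imitates that behavior in plain Python, assuming the default LangGraph behavior for keys declared without a reducer (a returned value overwrites the old one). `SketchState` and `apply_update` are hypothetical names used only for this illustration.

```python
from typing import List, TypedDict


class SketchState(TypedDict, total=False):
    """Same three keys as the graph state above; all optional for the sketch."""
    question: str
    generation: str
    documents: List[str]


def apply_update(state: SketchState, update: SketchState) -> SketchState:
    """Return a new state in which keys from `update` overwrite old values."""
    merged = dict(state)
    merged.update(update)
    return merged


state: SketchState = {"question": "What are the types of agent memory?"}
# A retrieval node would return the documents it found.
state = apply_update(state, {"documents": ["chunk about agent memory"]})
# A relevance check that rejects everything returns an empty list.
state = apply_update(state, {"documents": []})
print(state)
```

Because updates overwrite rather than append, a node that filters documents simply returns the shorter list.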



### 4.2 Define the Graph Nodes

#### (1) retrieve_node: query the vector store

In [39]:
from lib.graph.self_reflection_rag.node.retrieve_doc import create_retrieve_node
CodeUtils.print_code(create_retrieve_node)

def create_retrieve_node(vector_store_wrapper: VectorStoreWrapper) -> RunnableLike:
    def retrieve(state: RagGraphState, retriever: VectorStoreRetriever):
        """
        Retrieve documents: query the vector database for content related to the user's question.

        Args: state (dict): the current graph state
        Returns: state (dict): state update whose `documents` key holds the retrieved documents
        """
        print("---RETRIEVE---")
        question = state["question"]

        # Retrieve
        documents = retriever.invoke(question)

        # Return the updated graph state
        return {"documents": documents, "question": question}

    print(vector_store_wrapper)
    print(vector_store_wrapper.get_vector_store())
    vector_store = vector_store_wrapper.get_vector_store()
    return lambda state: retrieve(state, vector_store.as_retriever())



In [40]:
retrieve_node = create_retrieve_node(vectorstore_wrapper)
retrieve_node

<lib.util.vector_store_utils.FAISSWrapper object at 0x12e1ccaa0>
<langchain_community.vectorstores.faiss.FAISS object at 0x12e0ad340>


<function lib.graph.self_reflection_rag.node.retrieve_doc.create_retrieve_node.<locals>.<lambda>(state)>

#### (2) answer_with_doc_node: generate an answer with the LLM and the vector store

In [41]:
from lib.graph.self_reflection_rag.node.answer_question import create_answer_with_doc_node
CodeUtils.print_code(create_answer_with_doc_node())

    return lambda state: answer_with_doc(state, answer_with_doc_chain)



In [42]:
answer_with_doc_node = create_answer_with_doc_node()
answer_with_doc_node

<function lib.graph.self_reflection_rag.node.answer_question.create_answer_with_doc_node.<locals>.<lambda>(state)>

#### (3) answer_with_llm_node: generate an answer with the LLM only

In [43]:
from lib.graph.self_reflection_rag.node.answer_question import create_answer_with_llm_node
CodeUtils.print_code(create_answer_with_llm_node)

def create_answer_with_llm_node(get_by_session_id: Optional[GetSessionHistoryCallable] = None) -> RunnableLike:
    def answer_with_llm(state : RagGraphState, chain: Runnable):
        """
        Generate an answer using only the LLM.

        Args: state (dict): the current graph state
        Returns: state (dict): state update whose `generation` key holds the LLM generation
        """
        print("---LLM Fallback---")
        question = state["question"]
        # Mark the answer so the reader knows no supporting document was used
        answer = chain.invoke({"question": question})
        generation = f"{answer} (no support document)"
        print(f"question: {question}")
        print(f"generation: {generation}")
        return {"question": question, "generation": generation}

    answer_with_llm_chain = create_answer_with_llm_chain(get_by_session_id)
    return lambda state: answer_with_llm(state, answer_with_llm_chain)



In [44]:
answer_with_llm_node = create_answer_with_llm_node()
answer_with_llm_node

<function lib.graph.self_reflection_rag.node.answer_question.create_answer_with_llm_node.<locals>.<lambda>(state)>

#### (4) check_doc_relevance_node: check whether the documents are relevant to the question

In [45]:
from lib.graph.self_reflection_rag.node.check_doc import create_doc_relevance_check_node
CodeUtils.print_code(create_doc_relevance_check_node)

def create_doc_relevance_check_node() -> RunnableLike:
    doc_relevance_check_chain = create_doc_relevance_check_chain()
    return lambda state: check_relevance(state, doc_relevance_check_chain)
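
The listing above calls a `check_relevance` helper whose source is not printed. A minimal sketch of what such a node might do follows. It is an assumption rather than the project's implementation: documents are plain strings here, and the `grade` callable (with the toy `keyword_grader`) stands in for the LLM relevance chain from Section 3.2.

```python
from typing import Callable, Dict


def check_relevance(state: Dict, grade: Callable[[str, str], str]) -> Dict:
    """Keep only the documents that the grader marks as 'yes'."""
    question = state["question"]
    kept = [doc for doc in state["documents"] if grade(question, doc) == "yes"]
    return {"documents": kept, "question": question}


def keyword_grader(question: str, document: str) -> str:
    """Toy grader: 'yes' if any word of the question appears in the document."""
    words = question.lower().split()
    return "yes" if any(word in document.lower() for word in words) else "no"


state = {
    "question": "agent memory",
    "documents": ["Agent memory types", "ice cream flavors"],
}
print(check_relevance(state, keyword_grader))
```

An empty `documents` list in the returned state is what lets a downstream router trigger the fallback.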



In [46]:
doc_relevance_check_node = create_doc_relevance_check_node()
doc_relevance_check_node

<function lib.graph.self_reflection_rag.node.check_doc.create_doc_relevance_check_node.<locals>.<lambda>(state)>

#### (5) web_search: web search

In [47]:
from lib.graph.self_reflection_rag.node.web_search import create_web_search_node
CodeUtils.print_code(create_web_search_node)

def create_web_search_node() -> RunnableLike:
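    def web_search(state, tool):
        """
        Run a web search for the user's question.

        Args: state (dict): the current graph state
        Returns: state (dict): state update whose `documents` key holds the web results
        """

        print("---WEB SEARCH---")
        question = state["question"]

        # Web search
        docs = tool.invoke({"query": question})
        print(f"web_results: {docs}"[:300])

        # Merge all hits into one Document and wrap it in a list, so that downstream
        # nodes iterate over documents rather than over the Document's fields
        web_results = "\n".join([d["content"] for d in docs])
        web_results = Document(page_content=web_results)
        return {"documents": [web_results], "question": question}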

    web_search_tool = create_web_search_tool()
    return lambda state: web_search(state, web_search_tool)



In [48]:
web_search_node = create_web_search_node()
web_search_node

tvly-


<function lib.graph.self_reflection_rag.node.web_search.create_web_search_node.<locals>.<lambda>(state)>

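### 4.3 Define the Graph Edge Routers

An Edge takes the Graph State as input and outputs the key under which a Node is registered in the Graph (the Graph itself is built in Section 5.1).

Some Edges route the call to different Nodes depending on a condition, so each of them needs a router.

#### (1) question_router: choose how to answer based on the question type

Determine the type of the user's question:

`web_search`: questions about current events, which need a web search to answer

`vectorstore`: specialized questions covered by the knowledge base, which need the vector database to answer

`llm_fallback`: ordinary questions, which the LLM answers on its own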

In [49]:
from lib.graph.self_reflection_rag.edge_router.route_by_question_type import create_question_router
CodeUtils.print_code(create_question_router)

def create_question_router() -> RunnableLike:
    def route_question(state: RagGraphState, chain: RunnableSerializable):
        """
        Route the question to web search, the vector store, or the LLM fallback.

        Args: state (dict): the current graph state
        Returns: str: the routing decision, one of web_search, vectorstore, llm_fallback
        """

        print("---ROUTE QUESTION---")
        question = state["question"]
        source = chain.invoke({"question": question})

        # No tool call means the LLM answers directly; an empty tool-call list is an error
        if "tool_calls" not in source.additional_kwargs:
            print("---ROUTE QUESTION TO LLM---")
            return "llm_fallback"
        if len(source.additional_kwargs["tool_calls"]) == 0:
            raise ValueError("The router could not determine a data source")

        # Choose the data source
        datasource = source.additional_kwargs["tool_calls"][0]["function"]["name"]
        if datasource == "web_search":
            print("---ROUTE QUESTION TO WEB SEARCH---")
            return "web_search"
        elif datasource == "vectorstore":
            print("---ROUTE QUESTION TO VECTORSTORE---")
            return "vectorstore"
        else:
            print("---ROUTE QUESTION TO LL

In [50]:
question_router = create_question_router()
question_router

<function lib.graph.self_reflection_rag.edge_router.route_by_question_type.create_question_router.<locals>.<lambda>(state)>

#### (2) doc_relevance_check_router: check whether enough relevant documents remain to generate an answer

In [51]:
from lib.graph.self_reflection_rag.edge_router.route_by_doc_relevance import create_doc_relevance_check_router
CodeUtils.print_code(create_doc_relevance_check_router)

def create_doc_relevance_check_router() -> RunnableLike:
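    def decide_to_generate(state: RagGraphState):
        """
        Decide whether to generate an answer or fall back to another source.

        Args: state (dict): the current graph state
        Returns: str: the routing decision, no_relevance (no document is relevant to the question, so fall back) or has_relevance (generate an answer)
        """

        print("---ASSESS GRADED DOCUMENTS---")
        filtered_documents = state["documents"]

        if not filtered_documents:
            # check_relevance filtered out every document,
            # so signal that a fallback is needed
            print("---ALL DOCUMENTS ARE IRRELEVANT TO THE QUESTION---")
            return "no_relevance"
        else:
            # Relevant documents remain, so generate an answer
            print(f"---{len(filtered_documents)} DOCUMENT(S) RELEVANT TO THE QUESTION---")
            return "has_relevance"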

    return lambda state: decide_to_generate(state)



In [52]:
doc_relevance_check_router = create_doc_relevance_check_router()
doc_relevance_check_router

<function lib.graph.self_reflection_rag.edge_router.route_by_doc_relevance.create_doc_relevance_check_router.<locals>.<lambda>(state)>

#### (3) answer_check_router: check the answer against the documents and the question

In [53]:
from lib.graph.self_reflection_rag.edge_router.route_by_answer_trustworthiness_and_effectiveness import create_answer_check_router

CodeUtils.print_code(create_answer_check_router)

def create_answer_check_router() -> RunnableLike:
    def check_answer(state: RagGraphState, trustworthiness_check_chain: RunnableSerializable, effectiveness_check_chain: RunnableSerializable):
        """
        Determine whether the generation is grounded in the documents and answers the question.

        Args: state (dict): the current graph state
        Returns: str: the routing decision, useful (the answer resolves the question), not useful (the answer does not resolve the question), not support (the model lacked the information to answer and hallucinated)
        """
        question = state["question"]
        documents = state.get("documents")
        generation = state["generation"]
        print(f"question: {question}")
        print(f"generation: {generation}"[:200])

        # Check for hallucinations (only possible when there are documents to compare against)
        if documents:
            print("---CHECK HALLUCINATIONS---")
            score = trustworthiness_check_chain.invoke(
                {"documents": documents, "generation": generation}
            )
            print(f"documents: {documents}")
            print(f"generation: {generation}")
            grade = score.binary_score
            

In [54]:
answer_check_router = create_answer_check_router()
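
The printed listing is cut off before the routing decision is made. The sketch below shows one way the three outcomes named in its docstring (`useful`, `not useful`, `not support`) could be derived. It is not the project's code: `is_grounded` and `is_effective` are hypothetical callables standing in for the trustworthiness and effectiveness chains.

```python
from typing import Callable, Sequence


def check_answer(
    question: str,
    generation: str,
    documents: Sequence[str],
    is_grounded: Callable[[Sequence[str], str], bool],
    is_effective: Callable[[str, str], bool],
) -> str:
    """Return 'not support', 'useful', or 'not useful' for a generated answer."""
    # The hallucination check only applies when there are documents to compare against.
    if documents and not is_grounded(documents, generation):
        return "not support"
    # A grounded answer (or one without documents) must still address the question.
    return "useful" if is_effective(question, generation) else "not useful"


decision = check_answer(
    "introduce prompt engineering",
    "Prompt engineering is a technique for promoting and motivating subordinates.",
    ["Prompt Engineering, also known as In-Context Prompting, ..."],
    is_grounded=lambda docs, gen: False,   # pretend the grader rejected the answer
    is_effective=lambda q, gen: True,
)
print(decision)
```

Checking grounding first avoids spending an effectiveness call on an answer that will be regenerated anyway.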

## 5. Build and Invoke the Graph

### 5.1 Build the Graph

In [55]:
from lib.graph.self_reflection_rag.graph import RAGGraphWrapper 

graph_wrapper = RAGGraphWrapper(vectorstore_wrapper)
app = graph_wrapper.get_graph()
app

<langchain_community.vectorstores.faiss.FAISS object at 0x12e0ad340>
tvly-
<lib.util.vector_store_utils.FAISSWrapper object at 0x12e1ccaa0>
<langchain_community.vectorstores.faiss.FAISS object at 0x12e0ad340>


CompiledStateGraph(nodes={'__start__': PregelNode(config={'tags': ['langsmith:hidden'], 'metadata': {}, 'configurable': {}}, channels=['__start__'], triggers=['__start__'], writers=[ChannelWrite<question,generation,documents>(recurse=True, writes=[ChannelWriteEntry(channel='question', value=<object object at 0x1078bb3d0>, skip_none=False, mapper=_get_state_key(recurse=False)), ChannelWriteEntry(channel='generation', value=<object object at 0x1078bb3d0>, skip_none=False, mapper=_get_state_key(recurse=False)), ChannelWriteEntry(channel='documents', value=<object object at 0x1078bb3d0>, skip_none=False, mapper=_get_state_key(recurse=False))], require_at_least_one_of=['question', 'generation', 'documents']), _route(recurse=True, _is_channel_writer=True)]), 'web_search_node': PregelNode(config={'tags': [], 'metadata': {}, 'configurable': {}}, channels={'question': 'question', 'generation': 'generation', 'documents': 'documents'}, triggers=['branch:__start__:condition:web_search_node', 'bran

### 5.2 Testing

#### (1) A question that requires a web search to answer

In [68]:
import pprint

def run_with_web_search(inputs): 
    for output in app.stream(inputs):
        for key, value in output.items():
            # Node
            pprint.pprint(f"Node '{key}':")
            # Optional: print the full state at each node
        pprint.pprint("\n---\n")
    # Final generation
    pprint.pprint(value["generation"])
    
inputs = {
    "question": "introduce the latest game named Black Myth: Wukong"
}
# run_with_web_search(inputs)

---路由用户问题---
---把问题路由到网络搜索---
---网络搜索---
web_results: [{'url': 'https://www.thatsmandarin.com/blog-posts/black-myth-wukong/', 'content': 'Black Myth: Wukong is an action role-playing game that brings Chinese mythology to life. Developed by Game Science, the game takes players on an adventure with Sun Wukong (孙悟空 Sūn Wùkōng), who is a well-
"Node 'web_search_node':"
'\n---\n'
---检查文件与问题的相关性---
Question: introduce the latest game named Black Myth: Wukong
Documents: page_content='Black Myth: Wukong is an action role...
---打分：文档不相关---
('id', None)
---打分：文档不相关---
('metadata', {})
---打分：文档相关---
---打分：文档相关---
---评估已评分文件---
---2个文件与问题相关---
"Node 'web_relevance_check_node':"
'\n---\n'
---GENERATE---
question: introduce the latest game named Black Myth: Wukong
generation: Black Myth: Wukong is an upcoming action role-playing game developed by Game Science Studio, featuring stunning visuals and fast-paced combat inspired by Chinese mythology, with its protag
---检查幻觉---
documents: [('page_content'

#### (2) A question that requires a vector store lookup

In [57]:
# Run
inputs = {"question": "What are the types of agent memory?"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node name
        pprint.pprint(f"Node '{key}':")
        # Optional: print the full state of each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
    pprint.pprint("\n---\n")

# Final generation
pprint.pprint(value["generation"])

---路由用户问题---
---把问题路由到数据库---
---检索---
"Node 'retrieve_node':"
'\n---\n'
---检查文件与问题的相关性---
Question: What are the types of agent memory?
Documents: [Document(metadata={'source': '../data/file/2023-0...
---打分：文档相关---
---打分：文档不相关---
page_content='Finite context length: The restricted context capacity limits the inclusion of historical information, detailed instructions, API call context, and responses. The design of the system has to work with this limited communication bandwidth, while mechanisms like self-reflection to learn from past mistakes would benefit a lot from long or infinite context windows. Although vector stores and retrieval can provide access to a larger knowledge pool, their representation power is not as powerful as full attention.

Challenges in long-term planning and task decomposition: Planning over a lengthy history and effectively exploring the solution space remain challenging. LLMs struggle to adjust plans when faced with unexpected errors, making them less robust

#### (3) A question answered directly by the LLM

In [58]:
# Run
inputs = {"question": "Hello, who I am talking to?"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node name
        pprint.pprint(f"Node '{key}':")
        # Optional: print the full state of each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
    pprint.pprint("\n---\n")

# Final generation
pprint.pprint(value["generation"])

---路由用户问题---
---把问题路由到LLM---
---LLM Fallback---
question: Hello, who I am talking to?
generation: Hello, you're talking to an AI assistant designed to help with question-answering tasks. How can I assist you today? (no support document)
"Node 'answer_with_llm_node':"
'\n---\n'
("Hello, you're talking to an AI assistant designed to help with "
 'question-answering tasks. How can I assist you today? (no support document)')
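
The three test cells above repeat the same streaming loop. A minimal sketch of how it could be factored into one helper is shown below. `run_graph` is a hypothetical name that is not part of the notebook's `lib` package, and the sketch assumes the stream yields `{node_name: state}` dictionaries, as the cells above do. Unlike those cells, it keeps the most recent non-empty `generation` instead of reading the last `value`, so it does not fail when the final node sets no `generation`.

```python
import pprint
from typing import Any, Dict, Iterable, List, Optional, Tuple


def run_graph(stream: Iterable[Dict[str, Any]]) -> Tuple[List[str], Optional[str]]:
    """Consume a stream of {node_name: state} updates (hypothetical helper).

    Returns the visited node names in order and the last non-empty
    "generation" seen, or None if no node produced one.
    """
    visited: List[str] = []
    generation: Optional[str] = None
    for output in stream:
        for node_name, state in output.items():
            pprint.pprint(f"Node '{node_name}':")
            visited.append(node_name)
            # Not every node sets "generation"; keep the latest one that does.
            if isinstance(state, dict) and state.get("generation"):
                generation = state["generation"]
        pprint.pprint("\n---\n")
    return visited, generation
```

In the notebook this would be called as `visited, answer = run_graph(app.stream({"question": "What are the types of agent memory?"}))`.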


#### (4) Response structure when using both chat history and documents

In [59]:
from typing import Optional
from lib.graph.self_reflection_rag.graph import RAGGraphWrapper 
from langchain_core.chat_history import BaseChatMessageHistory, InMemoryChatMessageHistory

g_history = InMemoryChatMessageHistory()
def get_by_session_id(session_id: Optional[str]=None) -> BaseChatMessageHistory:
    return g_history

graph_wrapper_with_history = RAGGraphWrapper(
    vector_store_wrapper = vectorstore_wrapper, get_by_session_id = get_by_session_id)
app_with_history = graph_wrapper_with_history.get_graph()

<langchain_community.vectorstores.faiss.FAISS object at 0x12e0ad340>
tvly-
<lib.util.vector_store_utils.FAISSWrapper object at 0x12e1ccaa0>
<langchain_community.vectorstores.faiss.FAISS object at 0x12e0ad340>
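
Note that `get_by_session_id` above ignores its `session_id` argument and always returns the single global history. If separate conversations need separate histories, one possible approach is a small store keyed by session ID. The sketch below is an assumption and not part of the notebook's library. `make_session_store` is a hypothetical helper, and the factory argument stands in for `InMemoryChatMessageHistory`.

```python
from typing import Callable, Dict, Optional, TypeVar

T = TypeVar("T")


def make_session_store(
    factory: Callable[[], T], default_session: str = "default"
) -> Callable[[Optional[str]], T]:
    """Build a get_by_session_id function backed by a per-session dict.

    `factory` creates a fresh history object the first time a session is seen;
    calls without a session ID share the `default_session` entry.
    """
    store: Dict[str, T] = {}

    def get_by_session_id(session_id: Optional[str] = None) -> T:
        key = session_id or default_session
        if key not in store:
            store[key] = factory()
        return store[key]

    return get_by_session_id
```

With LangChain installed, `get_by_session_id = make_session_store(InMemoryChatMessageHistory)` would keep the same call signature as the function defined above.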


In [60]:
def ask_question_with_history(question : str): 
    r = app_with_history.invoke({"question": question})
    get_by_session_id().add_user_message(question)
    get_by_session_id().add_ai_message(r['generation'])
    return r

In [61]:
resp = ask_question_with_history("What are you?")
resp

---路由用户问题---
---把问题路由到LLM---
---LLM Fallback---
question: What are you?
generation: I am an AI assistant designed to provide information and answer questions based on my programming and available data. I do not have consciousness or emotions. My responses are generated through algorithms and natural language processing. (no support document)


{'question': 'What are you?',
 'generation': 'I am an AI assistant designed to provide information and answer questions based on my programming and available data. I do not have consciousness or emotions. My responses are generated through algorithms and natural language processing. (no support document)'}

In [69]:
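# Only the search snippets are used, so the model receives very little information and hallucinates on harder questions.
# The hallucination check detects this and routes the graph back to retry.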
# resp = ask_question_with_history("Who's Lvcai Xu?")
# resp

---路由用户问题---
---把问题路由到网络搜索---
---网络搜索---
web_results: [{'url': 'https://www.usenix.net/conference/usenixatc11/order-object-centric-deterministic-replay-java', 'content': 'Lvcai Xu, Fudan University. Haibo Chen, Fudan University. Binyu Zang, Fudan University. Open Access Media. USENIX is committed to Open Access to the research presented at
---检查文件与问题的相关性---
Question: Who's Lvcai Xu?
Documents: page_content='Lvcai Xu, Fudan University. Haibo Ch...
---打分：文档不相关---
('id', None)
---打分：文档不相关---
('metadata', {})
---打分：文档相关---
---打分：文档相关---
---评估已评分文件---
---2个文件与问题相关---
---GENERATE---
question: Who's Lvcai Xu?
generation: Lvcai Xu is a Chinese engineer and entrepreneur known for his work in the field of autonomous vehicles and as the co-founder of AutoX, a company focused on developing Level 4 autonomous dr
---检查幻觉---
documents: [('page_content', 'Lvcai Xu, Fudan University. Haibo Chen, Fudan University. Binyu Zang, Fudan University. Open Access Media. USENIX is committed to Open Access to the

GraphRecursionError: Recursion limit of 25 reached without hitting a stop condition. You can increase the limit by setting the `recursion_limit` config key.
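
As the error message suggests, the limit can be set through the `recursion_limit` config key, and the error can be caught so that a retry loop ends in a fallback answer instead of an exception. The sketch below illustrates this under stated assumptions. `invoke_with_limit` is a hypothetical helper, the "I don't know" fallback text is borrowed from the fallback behaviour described in the introduction, and the stand-in exception class exists only so that the sketch runs without `langgraph` installed.

```python
try:
    from langgraph.errors import GraphRecursionError
except ImportError:
    # Stand-in so the sketch runs without langgraph installed (assumption).
    class GraphRecursionError(RecursionError):
        pass


def invoke_with_limit(app, question: str, recursion_limit: int = 10) -> dict:
    """Invoke the graph with an explicit step budget (hypothetical helper).

    If the graph keeps routing back to regenerate and exhausts the budget,
    return a fallback response instead of raising.
    """
    try:
        return app.invoke(
            {"question": question},
            config={"recursion_limit": recursion_limit},
        )
    except GraphRecursionError:
        # Assumed fallback; adapt it to the application's needs.
        return {"question": question, "generation": "I don't know", "documents": []}
```

A lower limit makes a hallucination retry loop fail fast, while a higher one gives the graph more regeneration attempts before giving up.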

In [70]:
resp = ask_question_with_history("Introduce Java programming")
resp

---路由用户问题---
---把问题路由到网络搜索---
---网络搜索---
web_results: [{'url': 'https://introcs.cs.princeton.edu/java/home/', 'content': 'Introduction to Programming in Java. Our textbook Introduction to Programming in Java [ Amazon · Pearson · InformIT] is an interdisciplinary approach to the traditional CS1 curriculum with Java. We teach the classic ele
---检查文件与问题的相关性---
Question: Introduce Java programming
Documents: page_content='Introduction to Programming in Java....
---打分：文档不相关---
('id', None)
---打分：文档不相关---
('metadata', {})
---打分：文档相关---
---打分：文档相关---
---评估已评分文件---
---2个文件与问题相关---
---GENERATE---
question: Introduce Java programming
generation: Java is a widely-used, object-oriented programming language known for its portability and platform independence, enabling developers to write code that runs on any device with a Java Virtu
---检查幻觉---
documents: [('page_content', 'Introduction to Programming in Java. Our textbook Introduction to Programming in Java [ Amazon · Pearson · InformIT] is an int

{'question': 'Introduce Java programming',
 'generation': 'Java is a widely-used, object-oriented programming language known for its portability and platform independence, enabling developers to write code that runs on any device with a Java Virtual Machine (JVM). It is commonly used for developing enterprise-level applications, web applications, and Android apps. Java emphasizes readability and simplicity, with a syntax similar to C++, and provides a robust standard library.',
 'documents': [('page_content',
   'Introduction to Programming in Java. Our textbook Introduction to Programming in Java [ Amazon · Pearson · InformIT] is an interdisciplinary approach to the traditional CS1 curriculum with Java. We teach the classic elements of programming, using an "objects-in-the-middle" approach that emphasizes data abstraction. We motivate each ...\nGet started with Java by learning about the basics of a Java program and variables! 4.6. 1,311 ratings. Start. 35,567 learners enrolled. This 

In [71]:
resp = ask_question_with_history("who is WQXTEXVS")
resp

---路由用户问题---
---把问题路由到网络搜索---
---网络搜索---
web_results: [{'url': 'https://www.whois.com/whois/', 'content': "The Whois database contains details such as the registration date of the domain name, when it expires, ownership and contact information, nameserver information of the domain, the registrar via which the domain was purchased, etc.\n A
---检查文件与问题的相关性---
Question: who is WQXTEXVS
Documents: page_content='The Whois database contains details ...
---打分：文档不相关---
('id', None)
---打分：文档不相关---
('metadata', {})
---打分：文档不相关---
('page_content', "The Whois database contains details such as the registration date of the domain name, when it expires, ownership and contact information, nameserver information of the domain, the registrar via which the domain was purchased, etc.\n Alternatively, if the domain name has already been registered, you can either register similar available domain names that we suggest, or use the contact information provided in order to get in touch with the owner and resp

{'question': 'who is WQXTEXVS',
 'generation': 'WQXTEXVS does not refer to a known individual or recognized entity; it appears to be a random or encrypted sequence of characters. Without additional context, it is impossible to determine who or what WQXTEXVS is. (no support document)',
 'documents': []}

In [72]:
resp = ask_question_with_history("Introduce prompt engineering")
resp

---路由用户问题---
---把问题路由到数据库---
---检索---
---检查文件与问题的相关性---
Question: Introduce prompt engineering
Documents: [Document(metadata={'source': '../data/file/2023-0...
---打分：文档相关---
---打分：文档相关---
---打分：文档相关---
---打分：文档相关---
---评估已评分文件---
---4个文件与问题相关---
---GENERATE---
question: Introduce prompt engineering
generation: Prompt engineering is a technique used in artificial intelligence where a user crafts and fine-tunes prompts to guide the AI's responses, enhancing the quality and relevance of the output.
---检查幻觉---
documents: [Document(metadata={'source': '../data/file/2023-03-15-prompt-engineering.txt'}, page_content="Prompt Engineering\n\nDate: March 15, 2023 | Estimated Reading Time: 21 min | Author: Lilian Weng\nTable of Contents\nPrompt Engineering, also known as In-Context Prompting, refers to methods for how to communicate with LLM to steer its behavior for desired outcomes without updating the model weights. It is an empirical science and the effect of prompt engineering methods can var

{'question': 'Introduce prompt engineering',
 'generation': "Prompt engineering is a technique used in artificial intelligence where a user crafts and fine-tunes prompts to guide the AI's responses, enhancing the quality and relevance of the output. It involves understanding how to elicit the desired information or behavior from an AI model by carefully designing the input. This method is particularly useful in improving the performance of language models on specific tasks.",
 'documents': [Document(metadata={'source': '../data/file/2023-03-15-prompt-engineering.txt'}, page_content="Prompt Engineering\n\nDate: March 15, 2023 | Estimated Reading Time: 21 min | Author: Lilian Weng\nTable of Contents\nPrompt Engineering, also known as In-Context Prompting, refers to methods for how to communicate with LLM to steer its behavior for desired outcomes without updating the model weights. It is an empirical science and the effect of prompt engineering methods can vary a lot among models, thus r

In [73]:
get_by_session_id() 

InMemoryChatMessageHistory(messages=[HumanMessage(content='What are you?'), AIMessage(content='I am an AI assistant designed to provide information and answer questions based on my programming and available data. I do not have consciousness or emotions. My responses are generated through algorithms and natural language processing. (no support document)'), HumanMessage(content='Introduce "tom and jerry"'), AIMessage(content='Tom and Jerry are iconic animated characters created by William Hanna and Joseph Barbera. They star in a series of comedic short films where Tom, the cat, often chases Jerry, the mouse, leading to a variety of slapstick scenarios. The series is known for its clever humor and enduring popularity since its debut in 1940.'), HumanMessage(content='Introduce Java programming'), AIMessage(content='Java is a widely-used, object-oriented programming language known for its portability and platform independence, enabling developers to write code that runs on any device with a

In [74]:
print(get_by_session_id().messages[0].type)
print(get_by_session_id().messages[1].type)

human
ai
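
Building on the `type` attribute printed above, the whole history can be flattened into `(role, content)` pairs, for example to log a conversation. This is a minimal sketch. `to_transcript` is a hypothetical helper that relies only on each message exposing `type` and `content`.

```python
from typing import Any, Iterable, List, Tuple


def to_transcript(messages: Iterable[Any]) -> List[Tuple[str, str]]:
    """Convert message objects into (type, content) pairs, preserving order."""
    return [(m.type, m.content) for m in messages]
```

In the notebook this would be called as `to_transcript(get_by_session_id().messages)`.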
