# Self-RAG + LangGraph 例子

In [None]:
! pip install -U langgraph langchain-openai tavily-python langchain-community tiktoken langchainhub chromadb langchain langgraph


In [None]:
import os
from dotenv import load_dotenv

# Load variables from a local .env file first, so that secrets are supplied by
# the environment instead of being written into the notebook.
# SECURITY: API keys must never be committed in source. Any key that was
# previously hard-coded here should be treated as leaked and rotated.
load_dotenv()

# LangSmith tracing configuration (dashboard: https://smith.langchain.com).
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_PROJECT"] = "default"

# LANGCHAIN_API_KEY (LangSmith) and TAVILY_API_KEY (https://app.tavily.com/)
# are expected to come from .env or the process environment. Only warn when
# one is missing so the rest of the notebook can still be run.
for _required_key in ("LANGCHAIN_API_KEY", "TAVILY_API_KEY"):
    if not os.environ.get(_required_key):
        print(f"WARNING: {_required_key} is not set; define it in .env or the environment")

In [None]:
from langchain import hub as langchain_hub
from langchain.schema import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain.schema.runnable import RunnablePassthrough
from langchain.text_splitter import MarkdownHeaderTextSplitter
from langchain_openai import OpenAIEmbeddings
import os
from langchain_community.vectorstores.chroma import Chroma
from langchain_core.prompts import PromptTemplate
from string import Template
import uuid

# Read ./data/data.md as the knowledge base. The location is resolved relative
# to the working directory (and can be overridden with KNOWLEDGE_BASE_PATH) so
# the notebook does not depend on one machine's absolute directory layout.
file_path = os.getenv("KNOWLEDGE_BASE_PATH", os.path.join(".", "data", "data.md"))
with open(file_path, 'r', encoding='utf-8') as f:
    docs_string = f.read()

# Split the document into chunks based on its markdown headers.
headers_to_split_on = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
]
text_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
splits = text_splitter.split_text(docs_string)
print("Length of splits: ", len(splits))
print(splits)

# Vectorise the chunks and store them in a fresh, randomly named directory so
# every run starts from an empty collection.
random_directory = "./" + str(uuid.uuid4())
# embedding = OpenAIEmbeddings(model="text-embedding-3-small",
#                              openai_api_key=os.getenv("OPENAI_API_KEY"), openai_api_base=os.getenv("OPENAI_API_BASE"))

embedding = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(documents=splits, embedding=embedding, persist_directory=random_directory)
# NOTE(review): newer Chroma releases are reported to persist automatically;
# confirm whether this explicit call is still required for the pinned version.
vectorstore.persist()
retriever = vectorstore.as_retriever()

In [None]:
# 评估检索的文档与用户提出的问题是否相关
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI


class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""

    # 'yes' when the retrieved document is relevant to the question, otherwise 'no'.
    binary_score: str = Field(description="Documents are relevant to the question, 'yes' or 'no'")


# Chat model used for grading; temperature 0, with the key and base URL read
# from the environment.
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    openai_api_base=os.getenv("OPENAI_API_BASE"),
)

# Constrain the model's reply to the GradeDocuments schema.
structured_llm_grader = llm.with_structured_output(GradeDocuments)

# System instructions for the relevance grader (sent to the model verbatim).
system = """
您是一名评分员，负责评估检索到的文档与用户问题的相关性。\n
测试不需要很严格，目标是过滤掉错误的检索。\n
如果文档包含与用户问题相关的关键字或语义含义，则将其评为相关。\n
给出二进制分数"yes"或"no"，以指示文档是否与问题相关。
"""
grade_prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("human", "Retrieved document: \n\n {document} \n\n User question: \n\n {question}"),
])

# Pipe the prompt into the structured grader.
retrieval_grader = grade_prompt | structured_llm_grader

# A question the knowledge base is expected to cover.
question = "payment_backend 服务是谁维护的"
# An unrelated question, useful for watching the grader reject every chunk:
# question = "北京天气如何"

# Use the retriever's public Runnable entry point instead of the private
# _get_relevant_documents hook (which also required a hand-supplied
# run_manager argument).
docs = retriever.invoke(question)

# Show the relevance verdict for each retrieved chunk.
for doc in docs:
    print("doc: \n", doc.page_content, "\n")
    print(retrieval_grader.invoke({"question": question, "document": doc.page_content}))
    print("\n")

In [None]:
# 生成回复

from langchain import hub
from langchain_core.output_parsers import StrOutputParser

# Pull the "rlm/rag-prompt" prompt from the hub.
prompt = hub.pull("rlm/rag-prompt")

# Chat model used for answer generation; key and base URL come from the
# environment.
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    openai_api_base=os.getenv("OPENAI_API_BASE"),
)


# Post-processing
def format_docs(docs):
    """Return the `page_content` of every document, joined by blank lines."""
    contents = [entry.page_content for entry in docs]
    return "\n\n".join(contents)


# Chain: prompt -> chat model -> plain-string output parser.
rag_chain = prompt | llm | StrOutputParser()

# The retrieved `docs` are passed as the "context" value as-is; format_docs is
# not applied to them here.
generation = rag_chain.invoke({"context": docs, "question": question})
print(generation)

In [None]:
# 评估 LLM 的回复是否基于事实，有没有产生幻觉

class GraderHallucinations(BaseModel):
    """Binary score for hallucinations present in generation answer."""

    # 'yes' means the generated answer is supported by the retrieved facts.
    # The description is part of the schema handed to the model, so it has to
    # describe groundedness rather than document relevance.
    binary_score: str = Field(description="Answer is grounded in the facts, 'yes' or 'no'")


# Chat model for the hallucination check; key and base URL come from the
# environment.
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    openai_api_base=os.getenv("OPENAI_API_BASE"),
)
# Constrain the model's reply to the GraderHallucinations schema.
structured_llm_grader = llm.with_structured_output(GraderHallucinations)

# System instructions for the hallucination grader (sent to the model verbatim).
system = """
您是一名评分员，正在评估 LLM 生成是否基于一组检索的事实/由一组检索到的事实支持。\n
给出二进制分数"yes"或"no"，"yes"表示答案基于一组事实/由一组事实支持。
"""

hallucination_prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("human", "Set of facts: \n\n {documents} \n\n LLM generations: {generation}"),
])

hallucination_grader = hallucination_prompt | structured_llm_grader

# Try the grader on the documents and the answer produced earlier.
hallucination_grader.invoke({"documents": docs, "generation": generation})

In [None]:
# 评估 LLM 回答是否解决了用户的问题

class GraderAnswer(BaseModel):
    """Binary score to assess whether the answer addresses the question."""

    # 'yes' means the generated answer resolves the user's question.
    # The description is part of the schema handed to the model, so it has to
    # describe answer usefulness rather than document relevance.
    binary_score: str = Field(description="Answer addresses the question, 'yes' or 'no'")


# Chat model for the answer check; key and base URL come from the environment.
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    openai_api_base=os.getenv("OPENAI_API_BASE"),
)
# Constrain the model's reply to the GraderAnswer schema.
structured_llm_grader = llm.with_structured_output(GraderAnswer)

# System instructions for the answer grader (sent to the model verbatim).
system = """
您是一名评分员，评估答案是否解决了某个问题。\n
给出二进制分数"yes"或"no"，"yes"表示答案解决了问题。
"""

answer_prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("human", "User question: \n\n {question} \n\n LLM generations: {generation}"),
])

answer_grader = answer_prompt | structured_llm_grader

# Try the grader on the sample question and the answer produced earlier.
answer_grader.invoke({"question": question, "generation": generation})

In [None]:
# Question rewriting

# Chat model for rewriting; key and base URL come from the environment.
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    openai_api_base=os.getenv("OPENAI_API_BASE"),
)

# System instructions for the rewriter (sent to the model verbatim).
system = """
您有一个问题重写器，可将输入问题转换为针对 vectorstore 检索进行了优化的更好版本 \n
查看输入并尝试推断底层语义意图/含义，使用用户语言回复。
"""
re_write_prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("human", "Here is the initial question: \n\n {question} \n Formulate an improved question"),
])

# Prompt -> chat model -> plain-string output parser.
question_rewriter = re_write_prompt | llm | StrOutputParser()

# Try the rewriter on the sample question.
question_rewriter.invoke({"question": question})

In [None]:
# langGraph 构造 Agent

from typing import List
from typing_extensions import TypedDict


class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        documents: list of documents
    """
    question: str
    generation: str
    # NOTE(review): annotated as List[str], but the nodes in this file store the
    # retriever's results here and read `.page_content` from each entry —
    # confirm whether the annotation should be a document type instead.
    documents: List[str]


def retrieve(state):
    """
    Retrieve documents for the current question.

    Args:
        state (dict): The current graph state; must contain "question"

    Returns:
        state (dict): "documents" set to the retriever's results, with
            "question" passed through unchanged
    """
    question = state["question"]
    # invoke() is the retriever's public Runnable entry point and supersedes
    # the deprecated get_relevant_documents().
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}


def generate(state):
    """
    Produce an answer from the documents held in the state.

    Args:
        state (dict): The current graph state; reads "question" and "documents"

    Returns:
        state (dict): The same question and documents plus a "generation" key
            holding the rag_chain output
    """
    query, context_docs = state["question"], state["documents"]
    answer = rag_chain.invoke({"context": context_docs, "question": query})
    return {
        "documents": context_docs,
        "question": query,
        "generation": answer,
    }


# Decide which retrieved documents are relevant to the question
def grade_document(state):
    """
    Keep only the documents that the retrieval grader scores "yes".

    Args:
        state (dict): The current graph state; reads "question" and "documents"

    Returns:
        state (dict): "documents" reduced to the relevant ones, with
            "question" passed through unchanged
    """
    query = state["question"]
    candidates = state["documents"]
    relevant = []
    for candidate in candidates:
        verdict = retrieval_grader.invoke(
            {"question": query, "document": candidate.page_content}
        )
        if verdict.binary_score != "yes":
            print("文档和用户问题不相关")
            continue
        print("文档和用户问题相关")
        relevant.append(candidate)
    return {"documents": relevant, "question": query}


def transform_query(state):
    """
    Rewrite the current question into a better one.

    Args:
        state (dict): The current graph state; reads "question" and "documents"

    Returns:
        state (dict): "question" replaced by the rewritten question, with
            "documents" passed through unchanged
    """
    print("改写问题\n")
    original_question, kept_documents = state["question"], state["documents"]

    # Ask the rewriter chain for an improved phrasing and report it.
    rewritten = question_rewriter.invoke({"question": original_question})
    print("LLM 改写优化后更好的问题:", rewritten)
    return {"documents": kept_documents, "question": rewritten}


# Edge
def decide_to_generate(state):
    """
    Choose the next node from the graded documents in the state.

    Args:
        state (dict): The current graph state; reads "documents"

    Returns:
        str: "generate" when at least one document remains, otherwise
            "transform_query"
    """
    print("访问检索到的相关知识库\n")
    if state["documents"]:
        print("文档跟问题相关，生成回答")
        return "generate"
    print("所有的文档都不相关，重写问题")
    return "transform_query"


# Check that the generated answer is grounded in the knowledge base
def grade_generation_v_documents_and_question(state):
    """
    Grade the generation against the documents, then against the question.

    Args:
        state (dict): The current graph state; reads "question", "documents"
            and "generation"

    Returns:
        str: "not supported" when the hallucination grader does not answer
            "yes"; otherwise "useful" or "not useful" depending on whether the
            answer grader answers "yes"
    """
    print("评估生成的回答是否基于知识库事实(是否产生了幻觉)")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    grounded = hallucination_grader.invoke(
        {"documents": documents, "generation": generation}
    ).binary_score
    if grounded != "yes":
        print("生成的回复不是基于知识库，继续重试...\n")
        return "not supported"

    print("生成的回复基于知识库，没有幻觉\n")
    # Only a grounded answer is checked against the user's question.
    resolved = answer_grader.invoke(
        {"question": question, "generation": generation}
    ).binary_score
    if resolved != "yes":
        print("问题没有得到解决\n")
        return "not useful"

    print("问题得到解决\n")
    return "useful"

In [None]:
from langgraph.graph import END, StateGraph, START
from IPython.display import Image, display

# Build the state graph over GraphState.
workflow = StateGraph(GraphState)

# Register the four nodes. Note the node is named "grade_documents" while the
# function behind it is grade_document.
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_document)
workflow.add_node("generate", generate)
workflow.add_node("transform_query", transform_query)

# Graph wiring: START -> retrieve -> grade_documents, then branch on
# decide_to_generate's return value.
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    }
)

# A rewritten question goes back to retrieval. After generation, the grading
# function's return value selects the next step: "not supported" regenerates,
# "useful" ends the run, "not useful" rewrites the question.
workflow.add_edge("transform_query", "retrieve")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "transform_query",
    }
)
# Compile the graph and display its diagram as a PNG image.
app = workflow.compile()
display(Image(app.get_graph().draw_mermaid_png()))

In [63]:
from pprint import pprint

# The question the compiled graph is asked to answer.
inputs = {"question": "谁管理的服务最多"}

# Stream the run; the keys of each streamed item are printed as node names.
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Node '{key}': ")

    pprint("\n--\n")

# `value` is whatever the loop above bound last; its "generation" entry is
# printed as the final answer.
pprint(value["generation"])       

"Node 'retrieve': "
'\n--\n'
文档和用户问题相关
文档和用户问题不相关
文档和用户问题不相关
文档和用户问题不相关
文档跟问题相关，生成回答
"Node 'grade_documents': "
'\n--\n'
评估生成的回答是否基于知识库事实(是否产生了幻觉)
生成的回复基于知识库，没有幻觉
问题得到解决

"Node 'generate': "
'\n--\n'
'小王管理的服务最多，共管理两个系统：payment 和 payment-1。其他人管理的服务数量较少。小李和小张各自只管理一个系统。'
