### 选择聊天模型

In [None]:
# 导入必要的库
import getpass
import os

# 检查并设置 OpenAI API 密钥
# 如果环境变量中没有设置 API 密钥，则提示用户输入
if not os.environ.get("OPENAI_API_KEY"):
  os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

# 导入 LangChain 的聊天模型初始化函数
from langchain.chat_models import init_chat_model

# 初始化 GPT-4o-mini 聊天模型
# 这个模型将用于生成 RAG 系统的最终答案
llm = init_chat_model("gpt-4o-mini", model_provider="openai")



### 选择嵌入模型

In [None]:
# 再次导入必要的库（如果在新的 cell 中运行）
import getpass
import os

# 确保 OpenAI API 密钥已设置
if not os.environ.get("OPENAI_API_KEY"):
  os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

# 导入 OpenAI 嵌入模型
from langchain_openai import OpenAIEmbeddings

# 初始化嵌入模型
# text-embedding-3-large 是 OpenAI 的高质量嵌入模型，用于将文本转换为向量表示
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

### 向量存储

In [None]:
# 导入内存向量存储
from langchain_core.vectorstores import InMemoryVectorStore

# 创建内存向量存储实例
# InMemoryVectorStore 将文档的向量表示存储在内存中，便于快速检索
# 传入之前创建的嵌入模型，用于将文档转换为向量
vector_store = InMemoryVectorStore(embeddings)

### simple indexing pipeline and RAG

In [None]:
# 导入所需的库
import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langgraph.graph import START, StateGraph
from typing_extensions import List, TypedDict

# 加载并分块博客内容
# 使用 WebBaseLoader 从指定 URL 加载网页内容
loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs=dict(
        # 使用 BeautifulSoup 只解析特定的 CSS 类，提高加载效率
        parse_only=bs4.SoupStrainer(
            class_=("post-content", "post-title", "post-header")
        )
    ),
)
docs = loader.load()

# 创建文本分割器
# chunk_size=1000: 每个文档块的最大字符数
# chunk_overlap=200: 相邻块之间的重叠字符数，确保上下文连续性
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
all_splits = text_splitter.split_documents(docs)

# 将文档块索引到向量存储中
# 这一步将文档转换为向量并存储，用于后续的相似性搜索
_ = vector_store.add_documents(documents=all_splits)

# 定义问答提示模板
# 从 LangChain Hub 拉取预定义的 RAG 提示模板
# 注意：对于非美国的 LangSmith 端点，可能需要指定 api_url
prompt = hub.pull("rlm/rag-prompt")


# 定义应用程序的状态结构
# 使用 TypedDict 确保状态字段的类型安全
class State(TypedDict):
    question: str  # 用户提出的问题
    context: List[Document]  # 检索到的相关文档
    answer: str  # 生成的答案


# 定义应用程序步骤
# 检索函数：根据问题从向量存储中检索相关文档
def retrieve(state: State):
    retrieved_docs = vector_store.similarity_search(state["question"])
    return {"context": retrieved_docs}


# 生成函数：基于检索到的上下文生成答案
def generate(state: State):
    # 将所有检索到的文档内容合并为一个字符串
    docs_content = "\n\n".join(doc.page_content for doc in state["context"])
    # 使用提示模板构建消息，包含问题和上下文
    messages = prompt.invoke({"question": state["question"], "context": docs_content})
    # 调用 LLM 生成答案
    response = llm.invoke(messages)
    return {"answer": response.content}


# 编译应用程序并测试
# 创建状态图，按顺序执行检索和生成步骤
graph_builder = StateGraph(State).add_sequence([retrieve, generate])
graph_builder.add_edge(START, "retrieve")
graph = graph_builder.compile()

In [None]:
# 测试 RAG 系统
# 调用图执行器，传入问题并获取答案
response = graph.invoke({"question": "What is Task Decomposition?"})
print(response["answer"])

### LangGraph 还提供了内置工具，用于可视化应用程序的控制流程：

In [None]:
# 可视化应用程序的控制流程
from IPython.display import Image, display

# 生成并显示 Mermaid 流程图，展示 RAG 系统的执行流程
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
# 详细查看 RAG 系统的输出
result = graph.invoke({"question": "What is Task Decomposition?"})

# 打印检索到的上下文和生成的答案
print(f"Context: {result['context']}\n\n")
print(f"Answer: {result['answer']}")

In [None]:
# 流式输出 RAG 系统的执行过程
# stream_mode="updates" 显示每个步骤的状态更新
for step in graph.stream(
    {"question": "What is Task Decomposition?"}, stream_mode="updates"
):
    print(f"{step}\n\n----------------\n")

In [None]:
for step in graph.stream(
    {"question": "What is Task Decomposition?"}, stream_mode="updates"
):
    print(f"{step}\n\n----------------\n")

In [None]:
# 创建自定义的 RAG 提示模板
from langchain_core.prompts import PromptTemplate

# 定义自定义提示模板，包含具体的回答要求
template = """Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Use three sentences maximum and keep the answer as concise as possible.
Always say "thanks for asking!" at the end of the answer.

{context}

Question: {question}

Helpful Answer:"""
# 从模板字符串创建 PromptTemplate 对象
custom_rag_prompt = PromptTemplate.from_template(template)

In [None]:
# 为文档添加元数据标签
# 将文档分为三个部分：开头、中间、结尾
total_documents = len(all_splits)
third = total_documents // 3

# 遍历所有文档块，为每个块添加 section 元数据
for i, document in enumerate(all_splits):
    if i < third:
        document.metadata["section"] = "beginning"
    elif i < 2 * third:
        document.metadata["section"] = "middle"
    else:
        document.metadata["section"] = "end"


# 查看第一个文档的元数据
all_splits[0].metadata

In [None]:
# 重新创建向量存储并添加带有元数据的文档
from langchain_core.vectorstores import InMemoryVectorStore

# 创建新的向量存储实例
vector_store = InMemoryVectorStore(embeddings)
# 将带有 section 元数据的文档添加到向量存储中
_ = vector_store.add_documents(all_splits)

In [None]:
# 定义结构化搜索查询类型
from typing import Literal

from typing_extensions import Annotated


# 定义搜索查询的结构化输出格式
# 这将用于让 LLM 生成结构化的搜索参数
class Search(TypedDict):
    """Search query."""

    query: Annotated[str, ..., "Search query to run."]  # 搜索查询字符串
    section: Annotated[  # 要搜索的文档部分
        Literal["beginning", "middle", "end"],
        ...,
        "Section to query.",
    ]

In [None]:
# 定义增强的状态结构，包含查询分析步骤
class State(TypedDict):
    question: str  # 用户问题
    query: Search  # 结构化的搜索查询
    context: List[Document]  # 检索到的文档
    answer: str  # 生成的答案


# 查询分析函数：将自然语言问题转换为结构化搜索查询
def analyze_query(state: State):
    # 使用结构化输出让 LLM 生成 Search 类型的查询
    structured_llm = llm.with_structured_output(Search)
    query = structured_llm.invoke(state["question"])
    return {"query": query}


# 增强的检索函数：根据结构化查询和元数据过滤进行检索
def retrieve(state: State):
    query = state["query"]
    # 使用相似性搜索，并根据 section 元数据进行过滤
    retrieved_docs = vector_store.similarity_search(
        query["query"],
        filter=lambda doc: doc.metadata.get("section") == query["section"],
    )
    return {"context": retrieved_docs}


# 生成函数保持不变
def generate(state: State):
    docs_content = "\n\n".join(doc.page_content for doc in state["context"])
    messages = prompt.invoke({"question": state["question"], "context": docs_content})
    response = llm.invoke(messages)
    return {"answer": response.content}


# 构建增强的 RAG 图：查询分析 -> 检索 -> 生成
graph_builder = StateGraph(State).add_sequence([analyze_query, retrieve, generate])
graph_builder.add_edge(START, "analyze_query")
graph = graph_builder.compile()

In [None]:
# 可视化增强的 RAG 系统流程图
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
# 测试增强的 RAG 系统
# 这个问题会触发 LLM 分析并选择搜索文档的 "end" 部分
for step in graph.stream(
    {"question": "What does the end of the post say about Task Decomposition?"},
    stream_mode="updates",
):
    print(f"{step}\n\n----------------\n")