# Build a RAG App: Part 1

[build a RAG app | part 1](https://python.langchain.com/docs/tutorials/rag/)

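**Overview:**<br>
**This section covers the following:**
- A step-by-step walkthrough of building a simple RAG application. The app retrieves over the content of a web page and supports only single-turn conversation with a single retrieval step.
- The implementation uses chains: here `retrieve` is a fixed step in the pipeline rather than a "tool". The chain is `_start_ --> retrieve --> generate`.
- The end of the section adds a `query_analysis` step that rewrites (refines, augments, ...) the user's question so that retrieval becomes more effective, giving `_start_ --> query_analysis --> retrieve --> generate`.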

> This section covers QA over unstructured data only. For QA over structured data, see [Build a QA system over sql data](https://python.langchain.com/docs/tutorials/sql_qa/)

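### A typical RAG application has two main parts:

#### 1. Indexing
Load data from a source and index it. This usually happens offline and involves the following steps:
1. Load: load the data with a document loader.
2. Split: use a text splitter to break the text into smaller chunks, which are easier to index and to pass to a model (model inputs usually have a maximum token limit).
3. Store: the chunks need somewhere to be stored and indexed. This is usually done with a VectorStore and an Embeddings model.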

#### 2. Retrieval and generation
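At run time, the RAG chain takes the user query, retrieves the relevant data from the index, and passes it to the model.<br>
1. Retrieve: given the user input, use a `Retriever` to fetch the relevant chunks from storage.
2. Generate: a ChatModel or LLM produces the answer from a prompt that contains both the user input and the retrieved data.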

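## Loading the key components

Language model, embedding model, and vector store<br>
Here both the language model and the embedding model are Gemini models, and the vector store is FAISS.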

In [2]:
import os
from langchain.chat_models import init_chat_model

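# Fail early if the Gemini API key is not set
assert os.environ.get('GOOGLE_API_KEY'), 'Set GOOGLE_API_KEY before running this notebook'
# Route traffic through a local proxy; adjust or remove these two lines for your own network
os.environ['HTTP_PROXY'] = 'http://127.0.0.1:7890'
os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:7890'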

llm = init_chat_model("gemini-2.5-flash", model_provider='google-genai')

In [4]:
from langchain_google_genai import GoogleGenerativeAIEmbeddings

embeddings = GoogleGenerativeAIEmbeddings(model='models/gemini-embedding-001')

Text --> vectors --> FAISS index --> retrieval --> documents

In [5]:
import faiss
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS

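embedding_dim = len(embeddings.embed_query('hello, world!'))
index = faiss.IndexFlatL2(embedding_dim)  # create a FAISS index that stores and searches vectors; L2 means similarity is measured by Euclidean distance

# Use the FAISS wrapper class to combine FAISS search with document storage
vector_store = FAISS(
    embedding_function=embeddings,
    index=index,
    docstore=InMemoryDocstore(),  # stores the original document content; FAISS holds only vectors, not text, so a docstore is needed to map vectors back to text
    index_to_docstore_id={}  # maps FAISS index positions to document IDs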
)
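
To make the roles of the index, the docstore, and the position-to-ID mapping concrete, here is a minimal pure-Python sketch of what a flat L2 index paired with a docstore does conceptually. It is not how FAISS is implemented; `ToyFlatL2Store`, its methods, and the 2-dimensional vectors are hypothetical stand-ins used only for illustration.

```python
import math
import uuid


class ToyFlatL2Store:
    """Toy stand-in for a flat L2 index plus a docstore (illustration only)."""

    def __init__(self):
        self.vectors = []               # position i holds the i-th vector (the "index")
        self.docstore = {}              # document ID -> original text
        self.index_to_docstore_id = {}  # index position -> document ID

    def add(self, vector, text):
        doc_id = str(uuid.uuid4())
        position = len(self.vectors)
        self.vectors.append(vector)
        self.docstore[doc_id] = text
        self.index_to_docstore_id[position] = doc_id
        return doc_id

    def search(self, query_vector, k=1):
        # Brute-force scan: Euclidean distance from the query to every stored vector.
        distances = [
            (math.dist(query_vector, vec), pos) for pos, vec in enumerate(self.vectors)
        ]
        distances.sort()
        # Map the winning index positions back to the original texts.
        return [self.docstore[self.index_to_docstore_id[pos]] for _, pos in distances[:k]]


store = ToyFlatL2Store()
store.add([0.0, 0.0], "about cats")
store.add([5.0, 5.0], "about planets")
nearest = store.search([4.0, 4.5], k=1)
print(nearest)
```

The point of the sketch is the division of labor: the vector list only knows positions, so returning text requires the two lookups through `index_to_docstore_id` and `docstore`.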

## Preview

In [None]:
import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langgraph.graph import START, StateGraph
from typing_extensions import List, TypedDict

loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs=dict(
        parse_only = bs4.SoupStrainer(
            class_=('post-content', 'post-title', 'post-header')  # keep only the page's title, headers, and content
        )
    ),
)

docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
all_splits = text_splitter.split_documents(docs)

_ = vector_store.add_documents(documents=all_splits)

prompt = hub.pull('rlm/rag-prompt')

class State(TypedDict):
    question: str
    context: List[Document]
    answer: str

def retrieve(state: State):
    retrieved_docs = vector_store.similarity_search(state['question'])
    return {'context': retrieved_docs}

def generate(state: State):
    docs_content = "\n\n".join(doc.page_content for doc in state['context'])
    messages = prompt.invoke({'question': state['question'], 'context': docs_content})
    response = llm.invoke(messages)
    return {'answer': response.content}

graph_builder = StateGraph(State).add_sequence([retrieve, generate])
graph_builder.add_edge(START, 'retrieve')
graph = graph_builder.compile()



In [7]:
response = graph.invoke({'question': 'what is task decomposition?'})
print(response['answer'])

Task decomposition is a technique used to break down complex tasks into smaller, simpler, and more manageable steps. This process enhances model performance on difficult tasks by allowing models to "think step by step." It transforms large problems into multiple, manageable sub-tasks.


## 1. Indexing

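This covers document loaders, embeddings, and vector stores.

### 1）Loading documents

DocumentLoaders: used to load the content of the blog post. They return a list of `Document` objects.

This example uses `WebBaseLoader` to load HTML from the given URL and `BeautifulSoup` to parse it into text, with `bs_kwargs` filtering for the content we want.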

In [9]:
import bs4
from langchain_community.document_loaders import WebBaseLoader

bs4_strainer = bs4.SoupStrainer(class_=('post-title', 'post-header', 'post-content'))
loader = WebBaseLoader(
    web_paths=('https://lilianweng.github.io/posts/2023-06-23-agent/', ), 
    bs_kwargs={'parse_only': bs4_strainer}
)
docs = loader.load()

assert len(docs) == 1
print(f'total characters: {len(docs[0].page_content)}')

total characters: 43047


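### 2）Splitting documents

Split the whole document into multiple chunks for embedding and storage, so that retrieval can return only the most relevant pieces.

Here we use `RecursiveCharacterTextSplitter`, which recursively splits the text on common separators (such as newlines) until each chunk is a suitable size.<br>
This is the recommended text splitter for generic text use cases.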

In [10]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,  # chunk size (characters)
    chunk_overlap=200,  # chunk overlap (characters)
    add_start_index=True,  # track index in original document
)
all_splits = text_splitter.split_documents(docs)

print(f"Split blog post into {len(all_splits)} sub-documents.")

Split blog post into 63 sub-documents.
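
The effect of `chunk_size`, `chunk_overlap`, and the tracked start index can be seen in a much simpler fixed-window splitter. This is only a sketch: it ignores the recursive separator logic that `RecursiveCharacterTextSplitter` uses, and `split_with_overlap` is a hypothetical helper, not a LangChain function.

```python
def split_with_overlap(text, chunk_size, chunk_overlap):
    """Cut text into fixed-size windows that share chunk_overlap characters."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append({"start_index": start, "text": text[start:start + chunk_size]})
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


sample = "abcdefghijklmnopqrstuvwxyz"
chunks = split_with_overlap(sample, chunk_size=10, chunk_overlap=4)
for chunk in chunks:
    print(chunk["start_index"], chunk["text"])
```

Each window repeats the last four characters of the previous one, which is the same idea as the 200-character overlap above: text that straddles a boundary still appears whole in at least one chunk.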


### 3）Storing documents

In [11]:
document_ids = vector_store.add_documents(documents=all_splits)

print(document_ids[:3])

['093e71b9-ed06-405f-b25d-a4b635a2d79e', '8eefee29-e61d-40f8-9c0f-f0441e437a79', '29917a41-3057-46cf-a575-7b0311983a94']


## 2. Retrieval and Generation


### Prompt template
Here we use a commonly used RAG prompt template, which is published in the [LangChain prompt hub](https://smith.langchain.com/hub/rlm/rag-prompt)


In [13]:
from langchain import hub

prompt = hub.pull('rlm/rag-prompt')

example_messages = prompt.invoke(
    {'context': '(context goes here)', 'question': '(questions goes here)'}
).to_messages()

assert len(example_messages) == 1
print(example_messages[0].content)

You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.
Question: (questions goes here) 
Context: (context goes here) 
Answer:
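
For intuition, the template amounts to filling two slots in a fixed string. The sketch below rebuilds it with plain string formatting; `RAG_TEMPLATE` and `build_prompt` are hypothetical names, the wording is copied from the printed output above, and the exact whitespace of the hub prompt may differ.

```python
# Approximate plain-string version of the template text printed above.
RAG_TEMPLATE = (
    "You are an assistant for question-answering tasks. "
    "Use the following pieces of retrieved context to answer the question. "
    "If you don't know the answer, just say that you don't know. "
    "Use three sentences maximum and keep the answer concise.\n"
    "Question: {question}\n"
    "Context: {context}\n"
    "Answer:"
)


def build_prompt(question, chunks):
    """Join the retrieved chunks and fill both slots of the template."""
    context = "\n\n".join(chunks)
    return RAG_TEMPLATE.format(question=question, context=context)


prompt_text = build_prompt(
    "What is task decomposition?",
    ["Chunk one text.", "Chunk two text."],
)
print(prompt_text)
```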


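### Implementation with LangGraph
Next, we use LangGraph to combine the retrieval and generation steps into a single application.

> LangGraph is not strictly required, but it offers many benefits (see the official documentation for the alternatives).

To use LangGraph, we need to define:
1. The application's state
2. The application's nodes
3. The application's control flow

#### 1）State
The state defines the shape of the data that goes into the application, is passed between steps, and comes out of the application.<br>
It is usually a `TypedDict` or a Pydantic `BaseModel`.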

In [14]:
from langchain_core.documents import Document
from typing_extensions import List, TypedDict

class State(TypedDict):
    question: str
    context: List[Document]
    answer: str

#### 2）Nodes（application steps）

In [15]:
def retrieve(state: State):
    retrieved_docs = vector_store.similarity_search(state["question"])
    return {"context": retrieved_docs}


def generate(state: State):
    docs_content = "\n\n".join(doc.page_content for doc in state["context"])
    messages = prompt.invoke({"question": state["question"], "context": docs_content})
    response = llm.invoke(messages)
    return {"answer": response.content}

#### 3）Control flow

`_start_ --> retrieve --> generate`

In [16]:
from langgraph.graph import START, StateGraph

graph_builder = StateGraph(State).add_sequence([retrieve, generate])
graph_builder.add_edge(START, "retrieve")
graph = graph_builder.compile()
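
Conceptually, a linear sequence of nodes runs each function in order and merges the partial update it returns into the shared state. The sketch below shows only that idea in plain Python; LangGraph itself does considerably more, and `run_sequence`, `toy_retrieve`, and `toy_generate` are hypothetical stand-ins with a keyword match in place of vector search and string joining in place of a model call.

```python
def run_sequence(nodes, initial_state):
    """Run node functions in order, merging each returned update into the state."""
    state = dict(initial_state)
    for node in nodes:
        state.update(node(state))  # a node returns only the keys it changes
    return state


def toy_retrieve(state):
    # Keyword match stands in for similarity search.
    corpus = [
        "Task decomposition splits a big task into steps.",
        "FAISS stores vectors.",
    ]
    words = state["question"].lower().split()
    hits = [text for text in corpus if any(word in text.lower() for word in words)]
    return {"context": hits}


def toy_generate(state):
    # Joining the context stands in for calling the model.
    return {"answer": " ".join(state["context"]) or "I don't know."}


result = run_sequence([toy_retrieve, toy_generate], {"question": "task decomposition"})
print(result["answer"])
```

Because every node reads from and writes to the same dictionary, `generate` can rely on the `context` key that `retrieve` produced one step earlier.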

In [None]:
result = graph.invoke({"question": "What is Task Decomposition?"})

print(f"Answer: {result['answer']}")

Task decomposition is the process of breaking down a complex or hard task into smaller, simpler, and more manageable steps or subgoals. This can be achieved by Large Language Models (LLMs) using simple prompts or task-specific instructions, as well as through human inputs. Techniques like Chain of Thought (CoT) instruct models to "think step by step" to transform big tasks into multiple manageable ones.

#### Implementation without LangGraph

In [None]:
question = "..."

retrieved_docs = vector_store.similarity_search(question)
docs_content = "\n\n".join(doc.page_content for doc in retrieved_docs)
# Bind the result to a new name so that `prompt` still refers to the template
messages = prompt.invoke({"question": question, "context": docs_content})
answer = llm.invoke(messages)

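## Extension

So far, retrieval runs on the raw user input. That input may contain irrelevant information or be poorly suited to search, so we would like to rewrite it into a query first.<br>
To do this, add a step to the pipeline above: `query analysis`<br>
The flow becomes ``_start_ --> query analysis --> retrieve --> generate``

Query analysis uses a model to build an optimized search query from the user input. To demonstrate it, first add some metadata to each split.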

In [20]:
total_documents = len(all_splits)
third = total_documents // 3

for i, document in enumerate(all_splits):
    if i < third:
        document.metadata["section"] = "beginning"
    elif i < 2 * third:
        document.metadata["section"] = "middle"
    else:
        document.metadata["section"] = "end"

all_splits[0].metadata

{'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/',
 'start_index': 8,
 'section': 'beginning'}
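
Because the split count is divided with integer division, any remainder ends up in the last section. The sketch below isolates the same labeling rule in a hypothetical helper, `section_for`, and counts the labels for 63 splits (the number produced earlier) and for 10 splits, a made-up count chosen because it does not divide evenly.

```python
from collections import Counter


def section_for(position, total):
    """Label a split by position, using the same thresholds as the loop above."""
    third = total // 3
    if position < third:
        return "beginning"
    if position < 2 * third:
        return "middle"
    return "end"


counts_63 = Counter(section_for(i, 63) for i in range(63))
counts_10 = Counter(section_for(i, 10) for i in range(10))
print(dict(counts_63))
print(dict(counts_10))
```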

Because the splits have changed, the vector store needs to be updated accordingly.

In [21]:
from langchain_core.vectorstores import InMemoryVectorStore

vector_store = InMemoryVectorStore(embeddings)
_ = vector_store.add_documents(all_splits)

Next, define a schema for the search query. We will use structured output for this.

In [22]:
from typing import Literal
from typing_extensions import Annotated

class Search(TypedDict):
    query: Annotated[str, ..., 'search query to run']
    section: Annotated[Literal['beginning', 'middle', 'end'], ..., 'section to query']
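
A `TypedDict` describes the expected shape but does not enforce it at run time, so it can be useful to check a query before it reaches the retriever. The following is a minimal sketch of such a check; `validate_search` and `ALLOWED_SECTIONS` are hypothetical names, and whether a check like this is needed depends on how strictly the chosen model follows the schema.

```python
ALLOWED_SECTIONS = ("beginning", "middle", "end")


def validate_search(candidate):
    """Return a clean search dict, or raise if it does not match the schema."""
    if not isinstance(candidate, dict):
        raise TypeError("search query must be a dict")
    query = candidate.get("query")
    section = candidate.get("section")
    if not isinstance(query, str) or not query.strip():
        raise ValueError("'query' must be a non-empty string")
    if section not in ALLOWED_SECTIONS:
        raise ValueError(f"'section' must be one of {ALLOWED_SECTIONS}")
    return {"query": query, "section": section}


checked = validate_search({"section": "end", "query": "Task Decomposition"})
print(checked)
```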

Set up the control flow

In [23]:
class State(TypedDict):
    question: str
    query: Search
    context: List[Document]
    answer: str


def analyze_query(state: State):
    structured_llm = llm.with_structured_output(Search)
    query = structured_llm.invoke(state["question"])
    return {"query": query}


def retrieve(state: State):
    query = state["query"]
    retrieved_docs = vector_store.similarity_search(
        query["query"],
        filter=lambda doc: doc.metadata.get("section") == query["section"],
    )
    return {"context": retrieved_docs}


def generate(state: State):
    docs_content = "\n\n".join(doc.page_content for doc in state["context"])
    messages = prompt.invoke({"question": state["question"], "context": docs_content})
    response = llm.invoke(messages)
    return {"answer": response.content}


graph_builder = StateGraph(State).add_sequence([analyze_query, retrieve, generate])
graph_builder.add_edge(START, "analyze_query")
graph = graph_builder.compile()

In [24]:
for step in graph.stream(
    {"question": "What does the end of the post say about Task Decomposition?"},
    stream_mode="updates",
):
    print(f"{step}\n\n----------------\n")

{'analyze_query': {'query': {'section': 'end', 'query': 'Task Decomposition'}}}

----------------

{'retrieve': {'context': [Document(id='07c70bd7-5b59-4c27-a867-fde8bf34fc9c', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/', 'start_index': 38621, 'section': 'end'}, page_content='are imported by that file, and so on.\\nFollow a language and framework appropriate best practice file naming convention.\\nMake sure that files contain all imports, types etc. The code should be fully functional. Make sure that code in different files are compatible with each other.\\nBefore you finish, double check that all parts of the architecture is present in the files.\\n"'), Document(id='433da80f-863e-438e-ab9b-548a8d0933aa', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/', 'start_index': 34990, 'section': 'end'}, page_content='Conversatin samples:\n[\n  {\n    "role": "system",'), Document(id='f5cbf61f-516a-4670-a0bd-dbfa374f4f2b', metadata={'sourc