# 从头构建一个 RAG 系统，适用于中文

原始教程是对一段英文材料做 RAG Pipeline。使用的方法和工具链并不完全适用于中文 RAG。根据原教程提供的整体思路，这里从头实现了一个中文 RAG 系统，并将其封装成了几乎满足生成环境的类。

## 整体架构及工具链

1. 使用 `lease.txt` 的中文翻译 `lease-zh.txt` 作为语料。

2. 使用 GTE Embedding 的 `Autotokenizer` 功能实现精确地 chunk 分割。
3. 使用 `thenlper/gte-large-zh` 实现嵌入（embedding）。
4. 使用 lancedb 作为矢量存储。
5. `create_fts_index` 和 `create_index` 来加速全文搜索和矢量搜索。
6. 使用 lancedb 的 `search` 实现全文搜索和矢量搜索（retriever）。
7. 使用 `BAAI/bge-reranker-base` 实现重排（Rerank）。
8. 使用火山引擎部署的 DeepSeek V1 实现答案生成（Answer Generator）。
9. 使用 TEI 部署 `gte-large-zh` 和 `bge-reranker-base`。


In [6]:
from typing import List, Dict, Optional
from transformers import AutoTokenizer
import lancedb
import requests
import uuid
from tenacity import retry, stop_after_attempt, wait_exponential
from lancedb.pydantic import LanceModel, Vector


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def tei_embed(texts: List[str], url: str) -> List[List[float]]:
    """调用 TEI 服务获取文本嵌入向量,添加重试机制"""
    response = requests.post(url, json={"inputs": texts}, timeout=10)
    response.raise_for_status()
    return response.json()


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def tei_rerank(query: str, passages: List[str], url: str) -> List[Dict]:
    """调用 TEI 服务对文本进行重排序,添加重试机制"""
    response = requests.post(url, json={"query": query, "texts": passages}, timeout=10)
    response.raise_for_status()
    return [r["score"] for r in response.json()]


class Document(LanceModel):
    doc_id: str
    chunk_id: str
    text: str
    vector: Vector(1024)  # type: ignore
    # metadata: dict


class GTEChunkedRAGPipeline:
    """使用 GTE 分块的 RAG Pipeline,调用 TEI 服务进行 embedding 和 rerank"""

    def __init__(
        self,
        embedding_model: str = "http://localhost:8081/embed",
        reranker_model: str = "http://localhost:8082/rerank",
        lancedb_uri: str = "./rag_lancedb",
        table_name: str = "documents",
        chunk_token_limit: int = 256,
        chunk_overlap: int = 50,
        max_batch_tokens: int = 8192,
    ):
        """
        初始化 RAG Pipeline

        Args:
            embedding_model: embedding 服务地址
            reranker_model: reranker 服务地址
            lancedb_uri: lancedb 数据库地址
            table_name: 表名
            chunk_token_limit: 每个文本块的最大 token 数
            chunk_overlap: 文本块之间的重叠 token 数
            max_batch_tokens: 批处理的最大 token 数
        """
        self.embedding_url = embedding_model
        self.reranker_url = reranker_model
        self.tokenizer = AutoTokenizer.from_pretrained("thenlper/gte-large-zh")

        self.db = lancedb.connect(lancedb_uri)
        self.table = (
            self.db.open_table(table_name)
            if table_name in self.db.table_names()
            else self.db.create_table(table_name, schema=Document)
        )

        self.chunk_token_limit = chunk_token_limit
        self.chunk_overlap = chunk_overlap
        self.max_batch_tokens = max_batch_tokens

    def chunk_text(self, text: str) -> List[str]:
        """将文本分割成块"""
        tokens = self.tokenizer.encode(text, add_special_tokens=False)
        chunks = []
        start = 0

        while start < len(tokens):
            end = start + self.chunk_token_limit
            chunk_tokens = tokens[start:end]
            chunk_text = self.tokenizer.decode(chunk_tokens, skip_special_tokens=True)
            chunks.append(chunk_text.strip())  # 去除首尾空白

            if end >= len(tokens):
                break
            start += self.chunk_token_limit - self.chunk_overlap

        return [c for c in chunks if c]  # 过滤空字符串

    def embed_chunks(self, chunks: List[str]) -> List[List[float]]:
        """对文本块进行向量化"""
        if not chunks:
            return []

        # 分批控制总 token 数量
        batches = []
        batch, token_count = [], 0

        for chunk in chunks:
            tokens = self.tokenizer.encode(chunk, add_special_tokens=False)
            if token_count + len(tokens) > self.max_batch_tokens and batch:
                batches.append(batch)
                batch, token_count = [], 0
            batch.append(chunk)
            token_count += len(tokens)

        if batch:
            batches.append(batch)

        embeddings = []
        for batch in batches:
            batch_embeddings = tei_embed(batch, self.embedding_url)
            embeddings.extend(batch_embeddings)

        return embeddings

    def add_texts(self, texts: List[str], metadatas: Optional[List[dict]] = None):
        """添加文本到数据库"""
        if not texts:
            return

        metadatas = metadatas or [{}] * len(texts)
        to_insert = []

        for i, (text, metadata) in enumerate(zip(texts, metadatas)):
            doc_id = str(uuid.uuid4())
            chunks = self.chunk_text(text)
            embeddings = self.embed_chunks(chunks)

            for idx, (chunk, emb) in enumerate(zip(chunks, embeddings)):
                to_insert.append(
                    {
                        "doc_id": doc_id,
                        "chunk_id": f"{doc_id}_{idx}",
                        "text": chunk,
                        "vector": emb,
                        # **metadata,
                    }
                )

        if to_insert:
            self.table.add(to_insert)

    def search(self, query: str, top_k: int = 5) -> List[Dict]:
        """搜索相似文本"""
        # 获取查询向量
        query_embedding = tei_embed([query], self.embedding_url)[0]

        # 向量搜索
        results = self.table.search(query_embedding).limit(top_k * 3).to_pandas()
        if results.empty:
            return []

        # 重排序
        texts = results["text"].tolist()
        rerank_scores = tei_rerank(query, texts, self.reranker_url)

        # 合并结果并排序
        sorted_results = sorted(
            zip(results.to_dict("records"), rerank_scores),
            key=lambda x: x[1]["score"] if isinstance(x[1], dict) else x[1],
            reverse=True,
        )

        return [
            {
                "text": r["text"],
                "score": s["score"] if isinstance(s, dict) else s,
                "doc_id": r.get("doc_id"),
                "chunk_id": r.get("chunk_id"),
            }
            for r, s in sorted_results[:top_k]
        ]


# 创建 RAG 实例
rag = GTEChunkedRAGPipeline()

# 读取租赁合同文本
with open("lease-zh.txt", "r", encoding="utf-8") as f:
    lease_text = f.read()

# 添加文档到向量数据库
rag.add_texts([lease_text], [{"source": "lease-zh.txt"}])

# 提问
query = "合同是什么时候签署的？"
results = rag.search(query)

print(f"问题: {query}\n")
print("相关内容:")
for r in results:
    print(f"- {r['text']}\n  (相关度: {r['score']:.3f})")

问题: 合同是什么时候签署的？

相关内容:
- 年 12 月 1 日 。 第 三 条 ： 续 期 本 协 议 双 方 可 选 择 续 签 ， 并 就 续 签 的 条 款 和 条 件 另 行 书 面 达 成 一 致 并 签 署 文 件 。 第 四 条 ： 租 金 确 定 第 1 节 ： 月 租 金 承 租 方 同 意 在 租 赁 期 内 按 月 向 出 租 方 支 付 租 金 ， 每 月 金 额 为 40, 000 美 元 ， 付 款 地 点 由 出 租 方 书 面 通 知 确 定 。 第 2 节 ： 逾 期 费 用 若 月 租 金 未 在 每 月 第 十 日 之 前 （ 含 ） 邮 寄 或 被 出 租 方 收 到 ， 将 收 取 5 % 的 滞 纳 金 。 第 五 条 ： 保 证 金 承 租 方 已 向 出 租 方 支 付 20, 000 美 元 作 为 履 行 本 协 议 条 款 的 保 证 金 。 如 承 租 方 在 租 期 满 后 完 全 履 行 其 义 务 ， 则 该 金 额 将 全 额 退 还 。 若 租 赁 物 业 被 真 实 出 售 ， 出 租 方 有 权 将 该 保 证 金 转 交 买 方 ， 并 解 除 其 归 还 义 务 。 第 六 条 ： 税
  (相关度: 0.230)
- 年 12 月 1 日 。 第 三 条 ： 续 期 本 协 议 双 方 可 选 择 续 签 ， 并 就 续 签 的 条 款 和 条 件 另 行 书 面 达 成 一 致 并 签 署 文 件 。 第 四 条 ： 租 金 确 定 第 1 节 ： 月 租 金 承 租 方 同 意 在 租 赁 期 内 按 月 向 出 租 方 支 付 租 金 ， 每 月 金 额 为 40, 000 美 元 ， 付 款 地 点 由 出 租 方 书 面 通 知 确 定 。 第 2 节 ： 逾 期 费 用 若 月 租 金 未 在 每 月 第 十 日 之 前 （ 含 ） 邮 寄 或 被 出 租 方 收 到 ， 将 收 取 5 % 的 滞 纳 金 。 第 五 条 ： 保 证 金 承 租 方 已 向 出 租 方 支 付 20, 000 美 元 作 为 履 行 本 协 议 条 款 的 保 证 金 。 如 承 租 方 在 租 期 满 后 完 全 履 行 其 义 务 ， 则 该 金 额 将 全 额 退 还 。 若 租 

In [7]:
%env ARK_API_KEY=d04594ba-8fd8-4849-9e71-8734e4bf45f3

import os
from openai import OpenAI

# gets API Key from environment variable OPENAI_API_KEY
client = OpenAI(
    api_key=os.environ.get("ARK_API_KEY"),
    base_url="https://ark.cn-beijing.volces.com/api/v3",
)

# Context Prompt

base_prompt = """You are an AI assistant. Your task is to understand the user question, and provide an answer using the provided contexts. Every answer you generate should have citations in this pattern  "Answer [position].", for example: "Earth is round [1][2].," if it's relevant.

Your answers are correct, high-quality, and written by an domain expert. If the provided context does not contain the answer, simply state, "The provided context does not have the answer."

User question: {}

Contexts:
{}
"""

# Your prompt
prompt = f"{base_prompt.format(query, [r['text'] for r in results])}"

# 替换成 Doubao
# response = openai.ChatCompletion.create(
#     model="gpt-4o",
#     temperature=0,
#     messages=[
#         {"role": "system", "content": "You are a helpful assistant."},
#         {"role": "user", "content": prompt},
#     ],
# )

response = client.chat.completions.create(
    model="deepseek-v3-250324",
    temperature=0,
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt},
    ],
)

print(response.choices[0].message.content)

env: ARK_API_KEY=d04594ba-8fd8-4849-9e71-8734e4bf45f3


ImportError: cannot import name 'OpenAI' from 'openai' (/home/rlee/.miniconda3/envs/vdb/lib/python3.11/site-packages/openai/__init__.py)