[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/NirDiamant/RAG_Techniques/blob/main/all_rag_techniques/crag.ipynb)

# 纠正性 RAG 流程：具有动态纠正功能的检索增强生成

## 概述

纠正性 RAG（检索增强生成）流程是一种先进的信息检索和响应生成系统。它通过动态评估和纠正检索过程来扩展标准 RAG 方法，结合了向量数据库、网络搜索和语言模型的强大功能，为用户查询提供准确且具有上下文感知能力的响应。

## 动机

虽然传统的 RAG 系统改进了信息检索和响应生成，但当检索到的信息不相关或过时时，它们仍然可能存在不足。纠正性 RAG 流程通过以下方式解决了这些限制：

1. 利用预先存在的知识库
2. 评估检索信息的相关性
3. 必要时动态搜索网络
4. 提炼和组合来自多个来源的知识
5. 基于最合适的知识生成类似人类的响应

## 关键组件

1. **FAISS 索引**：用于对预先存在的知识进行高效相似性搜索的向量数据库。
2. **检索评估器**：评估检索到的文档与查询的相关性。
3. **知识提炼**：必要时从文档中提取关键信息。
4. **网络搜索查询重写器**：当本地知识不足时，优化网络搜索的查询。
5. **响应生成器**：基于累积的知识创建类似人类的响应。

## 方法详情

1. **文档检索**：
   - 在 FAISS 索引中执行相似性搜索以查找相关文档。
   - 检索前 k 个文档（默认为 k=3）。

2. **文档评估**：
   - 计算每个检索到的文档的相关性得分。
   - 根据最高相关性得分确定最佳行动方案。

3. **纠正性知识获取**：
   - 如果相关性高（得分 > 0.7）：按原样使用最相关的文档。
   - 如果相关性低（得分 < 0.3）：通过使用重写的查询执行网络搜索进行纠正。
   - 如果模棱两可（0.3 ≤ 得分 ≤ 0.7）：通过将最相关的文档与网络搜索结果相结合进行纠正。

4. **自适应知识处理**：
   - 对于网络搜索结果：提炼知识以提取要点。
   - 对于模棱两可的情况：将原始文档内容与提炼后的网络搜索结果相结合。

5. **响应生成**：
   - 使用语言模型根据查询和获取的知识生成类似人类的响应。
   - 在响应中包含源信息以实现透明度。

## 纠正性 RAG 方法的优点

1. **动态纠正**：适应检索信息的质量，确保相关性和准确性。
2. **灵活性**：根据需要利用预先存在的知识和网络搜索。
3. **准确性**：在使用信息之前评估其相关性，确保高质量的响应。
4. **透明度**：提供源信息，允许用户验证信息的来源。
5. **效率**：使用向量搜索从大型知识库中快速检索。
6. **上下文理解**：必要时组合多个信息源以提供全面的响应。
7. **最新信息**：可以用当前的网络信息补充或替换过时的本地知识。

## 结论

纠正性 RAG 流程代表了标准 RAG 方法的复杂演变。通过智能地评估和纠正检索过程，它克服了传统 RAG 系统的常见限制。这种动态方法确保响应基于最相关和最新的可用信息，无论是来自本地知识库还是网络。该系统能够根据相关性得分调整其信息来源策略，使其特别适用于需要高精度和当前信息的应用，例如研究辅助、动态知识库和高级问答系统。

<div style="text-align: center;">

<img src="../images/crag.svg" alt="Corrective RAG" style="width:80%; height:auto;">
</div>

# 包安装和导入

下面的单元格安装了运行此笔记本所需的所有必要软件包。


In [None]:
# 安装所需的包
!pip install langchain langchain-openai python-dotenv

In [None]:
# 克隆存储库以访问辅助函数和评估模块
!git clone https://github.com/NirDiamant/RAG_TECHNIQUES.git
import sys
sys.path.append('RAG_TECHNIQUES')
# 如果您需要使用最新数据运行
# !cp -r RAG_TECHNIQUES/data .

In [15]:
import os
import sys
from dotenv import load_dotenv
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.pydantic_v1 import BaseModel, Field


# 为 Colab 兼容性替换了原始路径附加
from helper_functions import *
from evaluation.evalute_rag import *

# 从 .env 文件加载环境变量
load_dotenv()

# 设置 OpenAI API 密钥环境变量
os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY')
from langchain.tools import DuckDuckGoSearchResults


### 定义文件路径

In [None]:
# 下载所需的数据文件
import os
os.makedirs('data', exist_ok=True)

# 下载本笔记本中使用的 PDF 文档
!wget -O data/Understanding_Climate_Change.pdf https://raw.githubusercontent.com/NirDiamant/RAG_TECHNIQUES/main/data/Understanding_Climate_Change.pdf
!wget -O data/Understanding_Climate_Change.pdf https://raw.githubusercontent.com/NirDiamant/RAG_TECHNIQUES/main/data/Understanding_Climate_Change.pdf


In [2]:
path = "data/Understanding_Climate_Change.pdf"

### 创建向量存储

In [3]:
vectorstore = encode_pdf(path)

### 初始化 OpenAI 语言模型


In [4]:
llm = ChatOpenAI(model="gpt-4o-mini", max_tokens=1000, temperature=0)

### 初始化搜索工具

In [16]:
search = DuckDuckGoSearchResults()

### 定义检索评估器、知识提炼和查询重写器的 LLM 链

In [7]:
# 检索评估器
class RetrievalEvaluatorInput(BaseModel):
    relevance_score: float = Field(..., description="文档与查询的相关性得分。得分应该在 0 和 1 之间。")
def retrieval_evaluator(query: str, document: str) -> float:
    prompt = PromptTemplate(
        input_variables=["query", "document"],
        template="On a scale from 0 to 1, how relevant is the following document to the query? Query: {query}\nDocument: {document}\nRelevance score:"
    )
    chain = prompt | llm.with_structured_output(RetrievalEvaluatorInput)
    input_variables = {"query": query, "document": document}
    result = chain.invoke(input_variables).relevance_score
    return result

# 知识提炼
class KnowledgeRefinementInput(BaseModel):
    key_points: str = Field(..., description="要从中提取关键信息的文档。")
def knowledge_refinement(document: str) -> List[str]:
    prompt = PromptTemplate(
        input_variables=["document"],
        template="Extract the key information from the following document in bullet points:\n{document}\nKey points:"
    )
    chain = prompt | llm.with_structured_output(KnowledgeRefinementInput)
    input_variables = {"document": document}
    result = chain.invoke(input_variables).key_points
    return [point.strip() for point in result.split('\n') if point.strip()]

# 网络搜索查询重写器
class QueryRewriterInput(BaseModel):
    query: str = Field(..., description="要重写的查询。")
def rewrite_query(query: str) -> str:
    prompt = PromptTemplate(
        input_variables=["query"],
        template="Rewrite the following query to make it more suitable for a web search:\n{query}\nRewritten query:"
    )
    chain = prompt | llm.with_structured_output(QueryRewriterInput)
    input_variables = {"query": query}
    return chain.invoke(input_variables).query.strip()

### 解析搜索结果的辅助函数


In [22]:
def parse_search_results(results_string: str) -> List[Tuple[str, str]]:
    """
    将搜索结果的 JSON 字符串解析为标题-链接元组的列表。

    参数：
        results_string (str): 包含搜索结果的 JSON 格式字符串。

    返回：
        List[Tuple[str, str]]: 元组列表，每个元组包含搜索结果的标题和链接。
                               如果解析失败，则返回空列表。
    """
    try:
        # 尝试解析 JSON 字符串
        results = json.loads(results_string)
        # 从每个结果中提取并返回标题和链接
        return [(result.get('title', 'Untitled'), result.get('link', '')) for result in results]
    except json.JSONDecodeError:
        # 通过返回空列表处理 JSON 解码错误
        print("Error parsing search results. Returning empty list.")
        return []

### 定义 CRAG 流程的子函数

In [26]:
def retrieve_documents(query: str, faiss_index: FAISS, k: int = 3) -> List[str]:
    """
    使用 FAISS 索引根据查询检索文档。

    参数：
        query (str): 要搜索的查询字符串。
        faiss_index (FAISS): 用于相似性搜索的 FAISS 索引。
        k (int): 要检索的顶部文档数量。默认为 3。

    返回：
        List[str]: 检索到的文档内容列表。
    """
    docs = faiss_index.similarity_search(query, k=k)
    return [doc.page_content for doc in docs]

def evaluate_documents(query: str, documents: List[str]) -> List[float]:
    """
    根据查询评估文档的相关性。

    参数：
        query (str): 查询字符串。
        documents (List[str]): 要评估的文档内容列表。

    返回：
        List[float]: 每个文档的相关性得分列表。
    """
    return [retrieval_evaluator(query, doc) for doc in documents]

def perform_web_search(query: str) -> Tuple[List[str], List[Tuple[str, str]]]:
    """
    根据查询执行网络搜索。

    参数：
        query (str): 要搜索的查询字符串。

    返回：
        Tuple[List[str], List[Tuple[str, str]]]: 
            - 从网络搜索获得的提炼知识列表。
            - 包含来源标题和链接的元组列表。
    """
    rewritten_query = rewrite_query(query)
    web_results = search.run(rewritten_query)
    web_knowledge = knowledge_refinement(web_results)
    sources = parse_search_results(web_results)
    return web_knowledge, sources

def generate_response(query: str, knowledge: str, sources: List[Tuple[str, str]]) -> str:
    """
    使用知识和来源生成对查询的响应。

    参数：
        query (str): 查询字符串。
        knowledge (str): 用于响应的提炼知识。
        sources (List[Tuple[str, str]]): 包含来源标题和链接的元组列表。

    返回：
        str: 生成的响应。
    """
    response_prompt = PromptTemplate(
        input_variables=["query", "knowledge", "sources"],
        template="Based on the following knowledge, answer the query. Include the sources with their links (if available) at the end of your answer:\nQuery: {query}\nKnowledge: {knowledge}\nSources: {sources}\nAnswer:"
    )
    input_variables = {
        "query": query,
        "knowledge": knowledge,
        "sources": "\n".join([f"{title}: {link}" if link else title for title, link in sources])
    }
    response_chain = response_prompt | llm
    return response_chain.invoke(input_variables).content


### CRAG 流程


In [29]:
def crag_process(query: str, faiss_index: FAISS) -> str:
    """
    通过检索、评估和使用文档或执行网络搜索来处理查询，以生成响应。

    参数：
        query (str): 要处理的查询字符串。
        faiss_index (FAISS): 用于文档检索的 FAISS 索引。

    返回：
        str: 基于查询生成的响应。
    """
    print(f"\nProcessing query: {query}")

    # 检索和评估文档
    retrieved_docs = retrieve_documents(query, faiss_index)
    eval_scores = evaluate_documents(query, retrieved_docs)
    
    print(f"\nRetrieved {len(retrieved_docs)} documents")
    print(f"Evaluation scores: {eval_scores}")

    # 根据评估分数确定操作
    max_score = max(eval_scores)
    sources = []
    
    if max_score > 0.7:
        print("\nAction: Correct - Using retrieved document")
        best_doc = retrieved_docs[eval_scores.index(max_score)]
        final_knowledge = best_doc
        sources.append(("Retrieved document", ""))
    elif max_score < 0.3:
        print("\nAction: Incorrect - Performing web search")
        final_knowledge, sources = perform_web_search(query)
    else:
        print("\nAction: Ambiguous - Combining retrieved document and web search")
        best_doc = retrieved_docs[eval_scores.index(max_score)]
        # 提炼检索到的知识
        retrieved_knowledge = knowledge_refinement(best_doc)
        web_knowledge, web_sources = perform_web_search(query)
        final_knowledge = "\n".join(retrieved_knowledge + web_knowledge)
        sources = [("Retrieved document", "")] + web_sources

    print("\nFinal knowledge:")
    print(final_knowledge)
    
    print("\nSources:")
    for title, link in sources:
        print(f"{title}: {link}" if link else title)

    # 生成响应
    print("\nGenerating response...")
    response = generate_response(query, final_knowledge, sources)

    print("\nResponse generated")
    return response

### 与文档高度相关的示例查询


In [None]:
query = "What are the main causes of climate change?"
result = crag_process(query, vectorstore)
print(f"Query: {query}")
print(f"Answer: {result}")

### 与文档低相关的示例查询


In [None]:
query = "how did harry beat quirrell?"
result = crag_process(query, vectorstore)
print(f"Query: {query}")
print(f"Answer: {result}")