安装必要的库

In [2]:
%%capture --no-stderr
%pip install -U langchain langchain_community pypdf sentence_transformers chromadb



In [3]:
# 导入必要的库并打印版本信息
import langchain, langchain_community, pypdf, sentence_transformers, chromadb

for module in (langchain, langchain_community, pypdf, sentence_transformers, chromadb):
    print(f"{module.__name__:<30}{module.__version__}")

[INFO] Note: NumExpr detected 16 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
[INFO] NumExpr defaulting to 8 threads.
[INFO] PyTorch version 2.5.1 available.
[INFO] Polars version 1.27.1 available.


langchain                     0.3.25
langchain_community           0.3.23
pypdf                         5.4.0
sentence_transformers         4.1.0
chromadb                      1.0.8


In [4]:
#导入操作系统和数据处理相关库
import os
import pandas as pd
from langchain_community.vectorstores import Chroma


In [5]:
#嵌入模型选择ollama拉取的"znbang/bge:large-zh-v1.5-q8_0"
EMBEDDING_MODEL_PATH = 'BAAI/bge-large-zh-v1.5'

In [6]:
#五月份第一次尝试的版本，输出物保存在该路径
dt = '2025-05'
version = 'v1'
output_dir = os.path.join('outputs', f'{version}_{dt}')

文档切分

先加载未切分过文档
再把切分以后的文档放在output_dir下
目前已经有了一个切分好的文档，位于"split_data\all_docs_split_400_40.json"

In [7]:
import os
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import MarkdownTextSplitter
from dataclasses import dataclass
from typing import List
import json
import uuid
import logging
from langchain.docstore.document import Document

logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')

@dataclass
class SplitConfig:
    chunk_size: int = 400
    chunk_overlap: int = 40
    separators: List[str] = ('\n\n\n', '\n\n')
    force_split: bool = False
    output_format: str = 'json'
    cache_dir: str = output_dir

def save_chunks_as_json(chunks, filepath):
    data = [
        {
            "uuid": chunk.metadata.get('uuid', str(uuid.uuid4())),
            "content": chunk.page_content,
            "metadata": chunk.metadata
        }
        for chunk in chunks
    ]
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

def load_chunks_from_json(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        data = json.load(f)
    chunks = []
    for item in data:
        doc = Document(
            page_content=item['content'],
            metadata=item['metadata']
        )
        chunks.append(doc)
    logging.info(f"Loaded {len(chunks)} chunks from {filepath}")
    return chunks

def split_docs_with_config(documents, config: SplitConfig, cache_name="all_docs"):
    os.makedirs(config.cache_dir, exist_ok=True)
    filename = f"{cache_name}_split_{config.chunk_size}_{config.chunk_overlap}.{config.output_format}"
    filepath = os.path.join(config.cache_dir, filename)

    if os.path.exists(filepath) and not config.force_split:
        logging.info("Found existing cache. Loading...")
        return load_chunks_from_json(filepath)

    splitter = MarkdownTextSplitter(
        chunk_size=config.chunk_size,
        chunk_overlap=config.chunk_overlap
    )
    chunks = splitter.split_documents(documents)
    for chunk in chunks:
        chunk.metadata['uuid'] = str(uuid.uuid4())

    if config.output_format == 'json':
        save_chunks_as_json(chunks, filepath)

    return chunks

def load_multiple_documents_from_dir(directory: str, encoding='utf-8') -> List[Document]:
    docs = []
    for filename in os.listdir(directory):
        if filename.endswith(".md"):
            file_path = os.path.join(directory, filename)
            loader = TextLoader(file_path, encoding=encoding)
            file_docs = loader.load()
            for doc in file_docs:
                doc.metadata['source_file'] = filename
            docs.extend(file_docs)
    logging.info(f"Loaded {len(docs)} documents from {directory}")
    return docs

input_dir = "E:\RAG\split_data"  #已经处理过的文档的路径
documents = load_multiple_documents_from_dir(input_dir)

split_config = SplitConfig(
    chunk_size=400,
    chunk_overlap=40,
    separators=['\n\n\n', '\n\n'],
    force_split=False,
    output_format='json',
    cache_dir=output_dir
)
# 切分文档并保存到本地
logging.info(f"Splitting {len(documents)} documents...")
split_docs = split_docs_with_config(documents, split_config, cache_name="all_docs")


  input_dir = "E:\RAG\split_data"  #已经处理过的文档的路径
[INFO] Loaded 0 documents from E:\RAG\split_data
[INFO] Splitting 0 documents...
[INFO] Found existing cache. Loading...
[INFO] Loaded 0 chunks from outputs\v1_2025-05\all_docs_split_400_40.json


向量化

此部分代码是开源项目中的，后续会更改


In [8]:
# 初始化嵌入模型
from langchain.embeddings import HuggingFaceBgeEmbeddings
import torch

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'device: {device}')

embeddings = HuggingFaceBgeEmbeddings(
    model_name=EMBEDDING_MODEL_PATH,
    model_kwargs={'device': device},
    encode_kwargs={'normalize_embeddings': True}
)

# 确保模型只加载一次
if 'embeddings' not in globals():
    embeddings = HuggingFaceBgeEmbeddings(
        model_name=EMBEDDING_MODEL_PATH,
        model_kwargs={'device': device},
        encode_kwargs={'normalize_embeddings': True}
    )
    print("嵌入模型加载成功")

  embeddings = HuggingFaceBgeEmbeddings(
[INFO] Load pretrained SentenceTransformer: BAAI/bge-large-zh-v1.5


device: cpu


In [9]:
# 定义向量数据库生成函数
from tqdm.auto import tqdm

def get_vector_db(docs, store_path, force_rebuild=False):
    if not os.path.exists(store_path):
        force_rebuild = True

    if force_rebuild:
        vector_db = Chroma.from_documents(
            docs,
            embedding=embeddings,
            persist_directory=store_path
        )
    else:
        vector_db = Chroma(
            persist_directory=store_path,
            embedding_function=embeddings
        )
    return vector_db

创建向量数据库

区别：

1.开源的数据库使用的是chromab
2.实践中使用的是FAISS

此处只是简单地修改了一下开源项目中的代码


In [10]:
# 创建向量数据库
vector_db = get_vector_db(split_docs, store_path=os.path.join(output_dir, '向量数据库', 'bge_large_v1.5'))

  vector_db = Chroma(
[INFO] Anonymized telemetry enabled. See                     https://docs.trychroma.com/telemetry for more information.


In [None]:
# 加载已经准备好的向量数据库
from langchain.vectorstores import FAISS
import pickle

faiss_path = os.path.join('outputs', 'v1_2025-05', 'faiss_metadata.pkl')
with open(faiss_path, 'rb') as f:
    vector_db = pickle.load(f)
print("向量数据库加载成功")

检索

检索部分进行了增强
添加了混合检索器并且对问题进行了更改

In [11]:
# 替换混合检索器的实现
from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter

# 定义检索器，as_retriever方法用于创建检索器,k参数指定返回的文档数量
retriever = vector_db.as_retriever(search_kwargs={'k': 5})

# 自定义排序函数
def rank_results(results, query):
    # 示例：按相似性分数排序
    return sorted(results, key=lambda x: x.metadata.get('score', 0), reverse=True)

# 检索并排序
query ="如果购买了医疗保险，得重病时是否可以报销医疗费用？"
# 检索与query相关文档
results = retriever.get_relevant_documents(query)
# 对检索结果进行排序
ranked_results = rank_results(results, query)

# 格式化输出排序后的检索结果
print("\n=== 检索结果 ===")
for idx, result in enumerate(ranked_results, start=1):
    print(f"\n===========结果{idx}==========:")
    print(f"内容: {result.page_content}")
    print(f"分数: {result.metadata.get('score', 0)}")

  results = retriever.get_relevant_documents(query)



=== 检索结果 ===


In [12]:
# 定义混合检索器类
class HybridRetriever:
    def __init__(self, retriever, keyword_weight=0.5, vector_weight=0.5):
        self.retriever = retriever
        self.keyword_weight = keyword_weight
        self.vector_weight = vector_weight

    def get_relevant_documents(self, query):
        # 调用原始检索器获取文档
        results = self.retriever.get_relevant_documents(query)
        # 混合排序逻辑（示例：按权重调整分数）
        for result in results:
            result.metadata['hybrid_score'] = (
                self.keyword_weight * result.metadata.get('keyword_score', 0) +
                self.vector_weight * result.metadata.get('vector_score', 0)
            )
        return sorted(results, key=lambda x: x.metadata['hybrid_score'], reverse=True)

# 初始化混合检索器
hybrid_retriever = HybridRetriever(retriever, keyword_weight=0.6, vector_weight=0.4)

# 示例：使用混合检索器
query = "如果购买了医疗保险，得重病时是否可以报销医疗费用？"
hybrid_results = hybrid_retriever.get_relevant_documents(query)
for idx, result in enumerate(hybrid_results[:5], start=1):
    print(f"\n=== 混合检索结果 {idx} ===")
    print(f"内容: {result.page_content}")
    print(f"混合分数: {result.metadata['hybrid_score']}")

计算命中率

In [13]:
# 初始化测试数据集
import pandas as pd

test_data = [
    {"question": "问题1", "uuid": "uuid1"},
    {"question": "问题2", "uuid": "uuid2"},
    {"question": "问题3", "uuid": "uuid3"}
]
test_df = pd.DataFrame(test_data)

# 在混合检索器基础上计算命中率
from tqdm.auto import tqdm

# 初始化统计数据
top_k_arr = list(range(1, 9))
hit_stat_data = []

# 遍历测试集并计算命中率
for idx, row in tqdm(test_df.iterrows(), total=len(test_df)):
    question = row['question']
    true_uuid = row['uuid']
    chunks = hybrid_retriever.get_relevant_documents(question)
    retrieved_uuids = [doc.metadata.get('uuid', '') for doc in chunks]

    for k in top_k_arr:
        hit_stat_data.append({
            'question': question,
            'top_k': k,
            'hit': int(true_uuid in retrieved_uuids[:k])
        })

# 转换命中率数据为DataFrame
hit_stat_df = pd.DataFrame(hit_stat_data)

# 计算平均命中率
average_hit_rate = hit_stat_df.groupby('top_k')['hit'].mean().reset_index()
print(average_hit_rate)

  0%|          | 0/3 [00:00<?, ?it/s]

   top_k  hit
0      1  0.0
1      2  0.0
2      3  0.0
3      4  0.0
4      5  0.0
5      6  0.0
6      7  0.0
7      8  0.0


以下是拓展检索器类以调整权重的板块，由于用户选择结果未定，所以暂时不跑通
# 扩展检索器类以支持权重调整
class CustomRetriever:
    def __init__(self, retriever):
        self.retriever = retriever
        self.keyword_weight = 1.0
        self.vector_weight = 1.0

    def get_relevant_documents(self, query):
        # 调用原始检索器的检索方法
        return self.retriever.get_relevant_documents(query)

    def adjust_weights(self, feedback):
        # 根据反馈调整权重
        results = self.get_relevant_documents(feedback["query"])
        if any(result.metadata.get('uuid') == feedback["preferred_result"] for result in results):
            self.keyword_weight += 0.1
            self.vector_weight -= 0.1
        print(f"权重已调整: keyword_weight={self.keyword_weight}, vector_weight={self.vector_weight}")

# 包装原始检索器
custom_retriever = CustomRetriever(retriever)

# 模拟用户反馈
user_feedback = {
    "preferred_result": "文档A",  # 用户选择的结果
    "query": "2023年10月美国ISM制造业PMI指数较上月有何变化？"
}

# 调整检索器权重
custom_retriever.adjust_weights(user_feedback)



以下是多轮检索的部分，由于出现内存不足的问题，所以暂时不跑通

from langchain.chains import ConversationalRetrievalChain
# 修复LLM初始化问题
from langchain.llms import Ollama

# 确保服务运行并检查 base_url
try:
    llm1 = Ollama(
        model='qwen:7b',
        base_url="http://localhost:11434"
    )
    print("LLM1 加载成功")
except Exception as e:
    print(f"LLM1 加载失败: {e}")

# 定义多轮检索链
retrieval_chain = ConversationalRetrievalChain.from_llm(
    retriever=vector_db.as_retriever(search_kwargs={'k': 5}),
    llm=llm1,
    return_source_documents=True
)

# 初始化对话历史
chat_history = []

# 模拟多轮对话
queries = [
    "2023年10月美国ISM制造业PMI指数是多少？",
    "与上月相比有什么变化？",
    "这种变化的原因是什么？"
]

print("\n=== 多轮对话 ===")
for query in queries:
    try:
        response = retrieval_chain({"question": query, "chat_history": chat_history})
        print(f"\n用户问题: {query}")
        print(f"模型回答: {response['answer']}")
        chat_history.append((query, response['answer']))
    except Exception as e:
        print(f"对话失败: {e}")

问答

流式输出

In [14]:
# 定义流式问答处理流程
from langchain.llms import Ollama
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import PromptTemplate

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

llm = Ollama(
    model='qwen:7b',
    base_url="http://localhost:11434"
)

prompt_tmpl = """
你是一名金融保险领域的专家，擅长根据所获取的信息片段，对问题进行分析和推理。
你的任务是根据所获取的信息片段（<<<<context>>><<<</context>>>之间的内容）回答问题。
回答保持简洁，不必重复问题，不要添加与答案无关的任何内容。
已知信息：
<<<<context>>>
{context}
<<<</context>>>

问题：{question}
请回答：
"""
prompt = PromptTemplate.from_template(prompt_tmpl)
retriever = vector_db.as_retriever(search_kwargs={'k': 4})

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

for chunk in rag_chain.stream("购买医疗保险后，重病是否可以报销医疗费用？"):
    print(chunk, end="", flush=True)

  llm = Ollama(


是的，购买医疗保险后，通常会覆盖因重病引起的医疗费用，但具体保障范围和报销比例需要参照保险合同条款。

In [15]:
# 非流式问答输出
print(rag_chain.invoke('购买医疗保险后，重病是否可以报销医疗费用？'))

是的，购买医疗保险后，通常会覆盖因重大疾病发生的医疗费用，但具体情况可能根据保险条款有所不同。
