# 混合检索（Vilnvsembdedding + BM25）

## 固定权重

In [None]:
from pymilvus import connections, Collection


def get_text_list_from_milvus(
        collection_name: str,
        host: str = "192.168.0.188",
        port: str = "19530",
        expr: str = "",
        limit: int = 1000,
        output_fields: list = ["text"],
) -> list:
    """
    从 Milvus 集合中读取指定字段（默认是 text）并返回列表

    Args:
        collection_name: Milvus 集合名称
        host: Milvus 服务器地址（默认 localhost）
        port: Milvus 端口（默认 19530）
        expr: 过滤条件表达式（默认无过滤）
        limit: 返回数据条数上限（默认 1000）
        output_fields: 要提取的字段列表（默认 ["text"]）

    Returns:
        list: 包含目标字段值的列表
    """
    # 1. 连接 Milvus
    connections.connect(alias="default", host=host, port=port)

    # 2. 加载集合
    collection = Collection(name=collection_name)
    collection.load()

    # 3. 查询数据
    results = collection.query(
            expr=expr,
            output_fields=output_fields,
            limit=limit
        )

    # 4. 提取目标字段为列表
    if not output_fields:
        raise ValueError("output_fields 不能为空")

    field_name = output_fields[0]  # 默认取第一个字段
    data_list = [item[field_name] for item in results]
    return data_list


# 示例调用
if __name__ == "__main__":
    # 示例1：读取默认的 text 字段
    texts = get_text_list_from_milvus(collection_name="Vmaxs")
    print(f"获取 {len(texts)} 条文本，前5条: {texts[:5]}")

In [None]:
from langchain_chroma import Chroma
from langchain_community.vectorstores import Milvus
from langchain_community.embeddings import OllamaEmbeddings
from langchain_ollama import OllamaLLM
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from langchain.retrievers import ContextualCompressionRetriever, EnsembleRetriever
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_community.retrievers import BM25Retriever
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from pymilvus import Collection, connections

# Initialize memory outside the function so it persists across questions
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

# 初始化 Milvus 向量数据库
def get_vectordb():
    emb_bgem3 = OllamaEmbeddings(base_url='http://localhost:11434', model="bge-m3:latest")

    # Milvus 连接参数
    vectordb = Milvus(
        embedding_function=emb_bgem3,
        collection_name="Vmaxs",  # Milvus 集合名称
        connection_args={
            "host": "192.168.0.188",  # Milvus 服务器地址
            "port": "19530",  # Milvus 默认端口
        },
    )
    return vectordb

def get_llm():
    return OllamaLLM(base_url='http://localhost:11434', model='deepseek-r1:1.5b', temperature=0.1, streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()])

def get_text_list_from_milvus(
        collection_name: str,
        host: str = "192.168.0.188",
        port: str = "19530",
        expr: str = "",
        limit: int = 1000,
        output_fields: list = ["text"],
) -> list:
    """
    从 Milvus 集合中读取指定字段（默认是 text）并返回列表
    """
    # 1. 连接 Milvus
    connections.connect(alias="default", host=host, port=port)

    # 2. 加载集合
    collection = Collection(name=collection_name)
    collection.load()

    # 3. 查询数据
    results = collection.query(
            expr=expr,
            output_fields=output_fields,
            limit=limit
        )

    # 4. 提取目标字段为列表
    if not output_fields:
        raise ValueError("output_fields 不能为空")

    field_name = output_fields[0]  # 默认取第一个字段
    data_list = [item[field_name] for item in results]
    return data_list

def get_qa_chain_with_memory(question: str):
    vectordb = get_vectordb()

    # 1. 初始化 BM25 检索器（关键词检索）
    documents = get_text_list_from_milvus(collection_name="Vmaxs")
    bm25_retriever = BM25Retriever.from_texts(documents)
    bm25_retriever.k = 10  # 返回前10个BM25检索结果

    # 2. 初始化向量检索器
    vector_retriever = vectordb.as_retriever(
        search_kwargs={"k": 10},  # 返回前10个向量检索结果
        search_type="mmr",  # 多样性检索
    )

    # 3. 混合检索（EnsembleRetriever）作为最终检索器
    ensemble_retriever = EnsembleRetriever(
        retrievers=[bm25_retriever, vector_retriever],
        weights=[0.5, 0.5],  # 调整BM25和向量检索的权重
    )

    # 4. 定义提示模板
    QA_CHAIN_PROMPT = PromptTemplate(
        input_variables=["chat_history", "question", "context"],
        template="""
    你是一位专业的VMAX-S技术专家助手，负责回答关于VMAX-S产品的技术问题。请根据以下规则回答问题：

    1. 严格基于提供的上下文信息回答，不要编造或假设
    2. 如果上下文不包含答案，明确表示"根据现有资料，我无法回答这个问题"
    3. 回答要专业、准确、简洁
    4. 对于操作类问题，提供分步骤说明
    5. 对于需要比较或列举的问题，使用表格或列表形式
    6. 保持友好专业的语气

    当前对话历史：
    {chat_history}

    相关技术资料：
    {context}

    用户问题：{question}

    请按照以下格式回答：
    [专业回答]
    (你的回答内容)

    [补充说明]
    (如有需要，可添加额外说明或建议)

    感谢您咨询VMAX-S相关问题！
    """
    )

    # 5. 构建对话式检索链
    qa_chain = ConversationalRetrievalChain.from_llm(
        llm=get_llm(),
        retriever=ensemble_retriever,  # 直接使用混合检索器
        memory=memory,
        output_key="answer",
        combine_docs_chain_kwargs={
            "prompt": QA_CHAIN_PROMPT
        },
        verbose=False
    )

    result = qa_chain({"question": question})
    return result

# 测试问题
questions = [
    "什么是VMAX的上网日志业务？",
    "上网日志业务包含哪些功能？",
    "整理成excel表格"
]

for question in questions:
    result = get_qa_chain_with_memory(question)
    print("\n" + "=" * 50 + "\n")

## 动态权重

In [None]:
from langchain_chroma import Chroma
from langchain_community.vectorstores import Milvus
from langchain_community.embeddings import OllamaEmbeddings
from langchain_ollama import OllamaLLM
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from langchain.retrievers import ContextualCompressionRetriever, EnsembleRetriever
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_community.retrievers import BM25Retriever
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from pymilvus import Collection, connections
import re

# Initialize memory outside the function so it persists across questions
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

# 初始化 Milvus 向量数据库
def get_vectordb():
    emb_bgem3 = OllamaEmbeddings(base_url='http://localhost:11434', model="bge-m3:latest")

    # Milvus 连接参数
    vectordb = Milvus(
        embedding_function=emb_bgem3,
        collection_name="Vmaxs",  # Milvus 集合名称
        connection_args={
            "host": "192.168.0.188",  # Milvus 服务器地址
            "port": "19530",  # Milvus 默认端口
        },
    )
    return vectordb

def get_llm():
    return OllamaLLM(base_url='http://localhost:11434', model='deepseek-r1:1.5b', temperature=0.1, streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()])

def get_text_list_from_milvus(
        collection_name: str,
        host: str = "192.168.0.188",
        port: str = "19530",
        expr: str = "",
        limit: int = 1000,
        output_fields: list = ["text"],
) -> list:
    """
    从 Milvus 集合中读取指定字段（默认是 text）并返回列表
    """
    # 1. 连接 Milvus
    connections.connect(alias="default", host=host, port=port)

    # 2. 加载集合
    collection = Collection(name=collection_name)
    collection.load()

    # 3. 查询数据
    results = collection.query(
            expr=expr,
            output_fields=output_fields,
            limit=limit
        )

    # 4. 提取目标字段为列表
    if not output_fields:
        raise ValueError("output_fields 不能为空")

    field_name = output_fields[0]  # 默认取第一个字段
    data_list = [item[field_name] for item in results]
    return data_list

def determine_query_type(question: str) -> str:
    """
    根据问题内容判断查询类型，返回权重调整策略

    返回:
        "keyword" - 更适合关键词检索的问题
        "semantic" - 更适合语义检索的问题
        "balanced" - 平衡型问题
    """
    # 关键词型问题特征
    keyword_patterns = [
        r"什么是.*\?",  # 定义类问题
        r".*包括哪些.*",  # 列举类问题
        r".*有哪些.*",  # 列举类问题
        r".*多少种.*",  # 数量类问题
        r".*步骤.*",  # 流程类问题
        r".*如何.*",  # 方法类问题
        r".*怎样.*",  # 方法类问题
        r".*整理.*表格",  # 结构化输出要求
        r".*列出.*",  # 列举要求
        r".*对比.*",  # 比较类问题
    ]

    # 语义型问题特征
    semantic_patterns = [
        r".*解决.*问题",  # 解决方案类
        r".*原因.*",  # 原因分析类
        r".*为什么.*",  # 原因分析类
        r".*建议.*",  # 建议类
        r".*优缺点.*",  # 分析类
        r".*影响.*",  # 影响分析类
        r".*解释.*",  # 解释说明类
        r".*理解.*",  # 理解类
        r".*意味着什么",  # 含义类
    ]

    # 检查是否是关键词型问题
    for pattern in keyword_patterns:
        if re.search(pattern, question):
            return "keyword"

    # 检查是否是语义型问题
    for pattern in semantic_patterns:
        if re.search(pattern, question):
            return "semantic"

    # 默认平衡型
    return "balanced"


def get_dynamic_weights(query_type: str) -> tuple:
    """
    根据查询类型返回动态权重

    返回:
        tuple: (bm25_weight, vector_weight)
    """
    if query_type == "keyword":
        return (0.7, 0.3)  # 更侧重关键词检索
    elif query_type == "semantic":
        return (0.3, 0.7)  # 更侧语义检索
    else:
        return (0.5, 0.5)  # 平衡权重

def get_qa_chain_with_memory(question: str):
    vectordb = get_vectordb()

    # 1. 确定查询类型和动态权重
    query_type = determine_query_type(question)
    bm25_weight, vector_weight = get_dynamic_weights(query_type)
    print(f"问题类型: {query_type}, 权重设置: BM25={bm25_weight}, Vector={vector_weight}")

    # 2. 初始化 BM25 检索器
    documents = get_text_list_from_milvus(collection_name="Vmaxs")
    bm25_retriever = BM25Retriever.from_texts(documents)
    bm25_retriever.k = 10

    # 3. 初始化向量检索器
    vector_retriever = vectordb.as_retriever(
        search_kwargs={"k": 10},
        search_type="mmr",
    )

    # 4. 使用动态权重的混合检索
    ensemble_retriever = EnsembleRetriever(
        retrievers=[bm25_retriever, vector_retriever],
        weights=[bm25_weight, vector_weight],  # 使用动态权重
    )
    # 5. 混合检索（EnsembleRetriever）作为最终检索器
    ensemble_retriever = EnsembleRetriever(
        retrievers=[bm25_retriever, vector_retriever],
        weights=[0.5, 0.5],  # 调整BM25和向量检索的权重
    )

   # 6. 定义提示模板（加入权重信息）
    QA_CHAIN_PROMPT = PromptTemplate(
        input_variables=["chat_history", "question", "context"],
        template=f"""
    你是一位专业的VMAX-S技术专家助手，负责回答关于VMAX-S产品的技术问题。请根据以下规则回答问题：

    1. 严格基于提供的上下文信息回答，不要编造或假设
    2. 如果上下文不包含答案，明确表示"根据现有资料，我无法回答这个问题"
    3. 回答要专业、准确、简洁
    4. 对于操作类问题，提供分步骤说明
    5. 对于需要比较或列举的问题，使用表格或列表形式
    6. 保持友好专业的语气

    当前问题类型: {query_type} (检索权重: BM25={bm25_weight}, Vector={vector_weight})

    当前对话历史：
    {{chat_history}}

    相关技术资料：
    {{context}}

    用户问题：{{question}}

    请按照以下格式回答：
    [专业回答]
    (你的回答内容)

    [补充说明]
    (如有需要，可添加额外说明或建议)

    感谢您咨询VMAX-S相关问题！
    """
    )


    # 7. 构建对话式检索链
    qa_chain = ConversationalRetrievalChain.from_llm(
        llm=get_llm(),
        retriever=ensemble_retriever,  # 直接使用混合检索器
        memory=memory,
        output_key="answer",
        combine_docs_chain_kwargs={
            "prompt": QA_CHAIN_PROMPT
        },
        verbose=False
    )

    result = qa_chain({"question": question})
    return result

# 测试问题
questions = [
    "什么是VMAX的上网日志业务？",
    "上网日志业务包含哪些功能？"
]

for question in questions:
    print(f"\n问题: {question}")
    result = get_qa_chain_with_memory(question)
    print(f"\n回答: {result['answer']}")
    print("=" * 50)


问题: 什么是VMAX的上网日志业务？
问题类型: balanced, 权重设置: BM25=0.5, Vector=0.5


  result = qa_chain({"question": question})


<think>
嗯，用户的问题是关于ZXVMAX-S的上网日志业务。我得先理解什么是上网日志业务。根据产品描述，ZXVMAX-S主要用于网络运维和运营分析，特别是多维价值分析。上网日志可能是指设备包装箱中存储的数据记录。

首先，我需要明确“上网”在这里

# 混合检索 + rerank

In [None]:
from langchain_chroma import Chroma
from langchain_community.vectorstores import Milvus
from langchain_community.embeddings import OllamaEmbeddings
from langchain_ollama import OllamaLLM
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from langchain.retrievers import ContextualCompressionRetriever, EnsembleRetriever
from langchain.retrievers.document_compressors import CohereRerank
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_community.retrievers import BM25Retriever
import cohere

# Initialize memory outside the function so it persists across questions
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)


# 初始化 Milvus 向量数据库
def get_vectordb():
    emb_bgem3 = OllamaEmbeddings(base_url='http://localhost:11434', model="bge-m3:latest")

    # Milvus 连接参数
    vectordb = Milvus(
        embedding_function=emb_bgem3,
        collection_name="Vmaxs",  # Milvus 集合名称
        connection_args={
            "host": "192.168.0.188",  # Milvus 服务器地址
            "port": "19530",  # Milvus 默认端口
        },
    )
    return vectordb

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
def get_llm():
    return OllamaLLM(base_url='http://localhost:11434', model='deepseek-r1:1.5b', temperature=0.1, streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()])


from pymilvus import Collection


# 从milvus获取完整文档
# def get_documents_from_milvus():
#     # 连接 Milvus 集合
#     collection = Collection("Vmaxs")  # 替换为你的集合名
#     collection.load()
#
#     # 获取所有文档的 text 字段
#     documents = []
#     res = collection.query(
#         expr="",  # 空表达式表示获取所有
#         output_fields=["text"],  # 假设你的文本存储在 "text" 字段
#         limit=10000  # 限制最大数量（根据实际情况调整）
#     )
#     documents = [item["text"] for item in res]
#     return documents


def get_qa_chain_with_memory(question: str):
    vectordb = get_vectordb()

    # 1. 初始化 BM25 检索器（关键词检索）
    # 假设你已经有一个文档列表 `documents`（如果没有，可以从 vectordb 获取）
    # 示例：documents = vectordb.get_all_documents()
    # 这里仅作演示，实际使用时需要替换成你的文档数据
    documents = ["doc1", "doc2", "doc3"]  # 替换成你的文档
    # 从 Milvus 获取所有原始文本
    # documents = get_documents_from_milvus()
    bm25_retriever = BM25Retriever.from_texts(documents)
    bm25_retriever.k = 10  # 返回前10个BM25检索结果

    # 2. 初始化向量检索器
    vector_retriever = vectordb.as_retriever(
        search_kwargs={"k": 10},  # 返回前10个向量检索结果
        search_type="mmr",  # 多样性检索
    )

    # 3. 混合检索（EnsembleRetriever）
    ensemble_retriever = EnsembleRetriever(
        retrievers=[bm25_retriever, vector_retriever],
        weights=[0.5, 0.5],  # 调整BM25和向量检索的权重
    )

    # 4. 使用 Jina Rerank 优化结果
    # Jina Rerank配置
    JINA_API_KEY = "jina_63bb115e2d5f42d581f42643294792b5CE4nrEINMDcT4vJZJaSLcr5tkbIB"  # 替换为你的Jina API密钥
    
    compressor = JinaRerank(
        jina_api_key=JINA_API_KEY,
        top_n=3,
        model="jina-reranker-v2-base-multilingual"  # Jina的多语言rerank模型[5](@ref)
    )


    compression_retriever = ContextualCompressionRetriever(
        base_compressor=compressor,
        base_retriever=ensemble_retriever  # 使用混合检索作为基础检索器
    )

    # 5. 定义提示模板
    QA_CHAIN_PROMPT = PromptTemplate(
        input_variables=["chat_history", "question", "context"],
        template="""
    你是一位专业的VMAX-S技术专家助手，负责回答关于VMAX-S产品的技术问题。请根据以下规则回答问题：

    1. 严格基于提供的上下文信息回答，不要编造或假设
    2. 如果上下文不包含答案，明确表示"根据现有资料，我无法回答这个问题"
    3. 回答要专业、准确、简洁
    4. 对于操作类问题，提供分步骤说明
    5. 对于需要比较或列举的问题，使用表格或列表形式
    6. 保持友好专业的语气

    当前对话历史：
    {chat_history}

    相关技术资料：
    {context}

    用户问题：{question}

    请按照以下格式回答：
    [专业回答]
    (你的回答内容)

    [补充说明]
    (如有需要，可添加额外说明或建议)

    感谢您咨询VMAX-S相关问题！
    """
    )

    # 6. 构建对话式检索链
    qa_chain = ConversationalRetrievalChain.from_llm(
        llm=get_llm(),
        retriever=compression_retriever,  # 使用混合检索 + Cohere Rerank
        memory=memory,
        output_key="answer",
        combine_docs_chain_kwargs={
            "prompt": QA_CHAIN_PROMPT
        },
        # verbose=True, # 这个参数会开启调试模式，输出链的详细执行过程，包括传递给模型的完整提示词
        verbose=False
    )

    result = qa_chain({"question": question})
    return result


# 测试问题
questions = [
    "什么是VMAX的上网日志业务？",
    "上网日志业务包含哪些功能？",
    "整理成excel表格"
]

for question in questions:
    result = get_qa_chain_with_memory(question)
    # print(f"问题：{question}")
    # print(f"回答：{result['answer']}")
    # print("对话历史：", memory.load_memory_variables({}))
    print("\n" + "=" * 50 + "\n")


获取milvus数据库中数据text字段

In [None]:
from pymilvus import connections, Collection


def get_text_list_from_milvus(
        collection_name: str,
        host: str = "129.201.70.35",
        port: str = "19530",
        expr: str = "",
        limit: int = 1000,
        output_fields: list = ["text"],
) -> list:
    """
    从 Milvus 集合中读取指定字段（默认是 text）并返回列表

    Args:
        collection_name: Milvus 集合名称
        host: Milvus 服务器地址（默认 localhost）
        port: Milvus 端口（默认 19530）
        expr: 过滤条件表达式（默认无过滤）
        limit: 返回数据条数上限（默认 1000）
        output_fields: 要提取的字段列表（默认 ["text"]）

    Returns:
        list: 包含目标字段值的列表
    """
    # 1. 连接 Milvus
    connections.connect(alias="default", host=host, port=port)

    # 2. 加载集合
    collection = Collection(name=collection_name)
    collection.load()

    # 3. 查询数据
    results = collection.query(
            expr=expr,
            output_fields=output_fields,
            limit=limit
        )

    # 4. 提取目标字段为列表
    if not output_fields:
        raise ValueError("output_fields 不能为空")

    field_name = output_fields[0]  # 默认取第一个字段
    data_list = [item[field_name] for item in results]
    return data_list


# 示例调用
if __name__ == "__main__":
    # 示例1：读取默认的 text 字段
    texts = get_text_list_from_milvus(collection_name="Vmaxs")
    print(f"获取 {len(texts)} 条文本，前5条: {texts[:5]}")

将本地文件替换

In [None]:
from langchain_chroma import Chroma
from langchain_community.vectorstores import Milvus
from langchain_community.embeddings import OllamaEmbeddings
from langchain_ollama import OllamaLLM
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from langchain.retrievers import ContextualCompressionRetriever, EnsembleRetriever
from langchain.retrievers.document_compressors import CohereRerank
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_community.retrievers import BM25Retriever
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
import cohere

# Initialize memory outside the function so it persists across questions
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)


# 初始化 Milvus 向量数据库
def get_vectordb():
    emb_bgem3 = OllamaEmbeddings(base_url='http://localhost:11434', model="bge-m3:latest")

    # Milvus 连接参数
    vectordb = Milvus(
        embedding_function=emb_bgem3,
        collection_name="Vmaxs",  # Milvus 集合名称
        connection_args={
            "host": "192.168.0.188",  # Milvus 服务器地址
            "port": "19530",  # Milvus 默认端口
        },
    )
    return vectordb


def get_llm():
    return OllamaLLM(base_url='http://localhost:11434', model='deepseek-r1:1.5b', temperature=0.1, streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()])


from pymilvus import Collection

def get_text_list_from_milvus(
        collection_name: str,
        host: str = "192.168.0.188",
        port: str = "19530",
        expr: str = "",
        limit: int = 1000,
        output_fields: list = ["text"],
) -> list:
    """
    从 Milvus 集合中读取指定字段（默认是 text）并返回列表

    Args:
        collection_name: Milvus 集合名称
        host: Milvus 服务器地址（默认 localhost）
        port: Milvus 端口（默认 19530）
        expr: 过滤条件表达式（默认无过滤）
        limit: 返回数据条数上限（默认 1000）
        output_fields: 要提取的字段列表（默认 ["text"]）

    Returns:
        list: 包含目标字段值的列表
    """
    # 1. 连接 Milvus
    connections.connect(alias="default", host=host, port=port)

    # 2. 加载集合
    collection = Collection(name=collection_name)
    collection.load()

    # 3. 查询数据
    results = collection.query(
            expr=expr,
            output_fields=output_fields,
            limit=limit
        )

    # 4. 提取目标字段为列表
    if not output_fields:
        raise ValueError("output_fields 不能为空")

    field_name = output_fields[0]  # 默认取第一个字段
    data_list = [item[field_name] for item in results]
    return data_list


def get_qa_chain_with_memory(question: str):
    vectordb = get_vectordb()

    # 1. 初始化 BM25 检索器（关键词检索）
    # 假设你已经有一个文档列表 `documents`（如果没有，可以从 vectordb 获取）
    # 示例：documents = vectordb.get_all_documents()
    # 这里仅作演示，实际使用时需要替换成你的文档数据
    # documents = ["doc1", "doc2", "doc3"]  # 替换成你的文档
    # 从 Milvus 获取所有原始文本
    documents = get_text_list_from_milvus(collection_name="Vmaxs")
    bm25_retriever = BM25Retriever.from_texts(documents)
    bm25_retriever.k = 10  # 返回前10个BM25检索结果

    # 2. 初始化向量检索器
    vector_retriever = vectordb.as_retriever(
        search_kwargs={"k": 10},  # 返回前10个向量检索结果
        search_type="mmr",  # 多样性检索
    )

    # 3. 混合检索（EnsembleRetriever）
    ensemble_retriever = EnsembleRetriever(
        retrievers=[bm25_retriever, vector_retriever],
        weights=[0.5, 0.5],  # 调整BM25和向量检索的权重
    )

    # 4. 使用 Cohere Rerank 优化结果
    cohere_client = cohere.Client(api_key="Tahx1eySFbKvu9sTyTXrRLf59la3ZUG9vy02stRZ")
    compressor = CohereRerank(
        client=cohere_client,
        top_n=5,  # 最终保留5个最相关的文档
        model="rerank-multilingual-v3.0"
    )

    compression_retriever = ContextualCompressionRetriever(
        base_retriever=ensemble_retriever,  # 使用混合检索作为基础检索器
        base_compressor=compressor # 进行rerank

    )

    # 5. 定义提示模板
    QA_CHAIN_PROMPT = PromptTemplate(
        input_variables=["chat_history", "question", "context"],
        template="""
    你是一位专业的VMAX-S技术专家助手，负责回答关于VMAX-S产品的技术问题。请根据以下规则回答问题：

    1. 严格基于提供的上下文信息回答，不要编造或假设
    2. 如果上下文不包含答案，明确表示"根据现有资料，我无法回答这个问题"
    3. 回答要专业、准确、简洁
    4. 对于操作类问题，提供分步骤说明
    5. 对于需要比较或列举的问题，使用表格或列表形式
    6. 保持友好专业的语气

    当前对话历史：
    {chat_history}

    相关技术资料：
    {context}

    用户问题：{question}

    请按照以下格式回答：
    [专业回答]
    (你的回答内容)

    [补充说明]
    (如有需要，可添加额外说明或建议)

    感谢您咨询VMAX-S相关问题！
    """
    )

    # 6. 构建对话式检索链
    qa_chain = ConversationalRetrievalChain.from_llm(
        llm=get_llm(),
        retriever=compression_retriever,  # 使用混合检索 + Cohere Rerank
        memory=memory,
        output_key="answer",
        combine_docs_chain_kwargs={
            "prompt": QA_CHAIN_PROMPT
        },
        # verbose=True, # 这个参数会开启调试模式，输出链的详细执行过程，包括传递给模型的完整提示词
        verbose=False
    )

    result = qa_chain({"question": question})
    return result


# 测试问题
questions = [
    "什么是VMAX的上网日志业务？",
    "上网日志业务包含哪些功能？",
    "整理成excel表格"
]

for question in questions:
    result = get_qa_chain_with_memory(question)
    # print(f"问题：{question}")
    # print(f"回答：{result['answer']}")
    # print("对话历史：", memory.load_memory_variables({}))
    print("\n" + "=" * 50 + "\n")


# 混合检索动态权重 + rerank

In [None]:
from langchain_chroma import Chroma
from langchain_community.vectorstores import Milvus
from langchain_community.embeddings import OllamaEmbeddings
from langchain_ollama import OllamaLLM
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from langchain.retrievers import ContextualCompressionRetriever, EnsembleRetriever
from langchain.retrievers.document_compressors import CohereRerank
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_community.retrievers import BM25Retriever
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
import cohere
from pymilvus import connections, Collection
import re

# Initialize memory outside the function so it persists across questions
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)


# 初始化 Milvus 向量数据库
def get_vectordb():
    emb_bgem3 = OllamaEmbeddings(base_url='http://localhost:11434', model="bge-m3:latest")

    # Milvus 连接参数
    vectordb = Milvus(
        embedding_function=emb_bgem3,
        collection_name="Vmaxs",  # Milvus 集合名称
        connection_args={
            "host": "192.168.0.188",  # Milvus 服务器地址
            "port": "19530",  # Milvus 默认端口
        },
    )
    return vectordb


def get_llm():
    return OllamaLLM(base_url='http://localhost:11434', model='deepseek-r1:1.5b', temperature=0.1, streaming=True,
                     callbacks=[StreamingStdOutCallbackHandler()])


def get_text_list_from_milvus(
        collection_name: str,
        host: str = "192.168.0.188",
        port: str = "19530",
        expr: str = "",
        limit: int = 1000,
        output_fields: list = ["text"],
) -> list:
    """
    从 Milvus 集合中读取指定字段（默认是 text）并返回列表
    """
    connections.connect(alias="default", host=host, port=port)
    collection = Collection(name=collection_name)
    collection.load()
    results = collection.query(
        expr=expr,
        output_fields=output_fields,
        limit=limit
    )
    field_name = output_fields[0]
    data_list = [item[field_name] for item in results]
    return data_list


def determine_query_type(question: str) -> str:
    """
    根据问题内容判断查询类型，返回权重调整策略

    返回:
        "keyword" - 更适合关键词检索的问题
        "semantic" - 更适合语义检索的问题
        "balanced" - 平衡型问题
    """
    # 关键词型问题特征
    keyword_patterns = [
        r"什么是.*\?",  # 定义类问题
        r".*包括哪些.*",  # 列举类问题
        r".*有哪些.*",  # 列举类问题
        r".*多少种.*",  # 数量类问题
        r".*步骤.*",  # 流程类问题
        r".*如何.*",  # 方法类问题
        r".*怎样.*",  # 方法类问题
        r".*整理.*表格",  # 结构化输出要求
        r".*列出.*",  # 列举要求
        r".*对比.*",  # 比较类问题
    ]

    # 语义型问题特征
    semantic_patterns = [
        r".*解决.*问题",  # 解决方案类
        r".*原因.*",  # 原因分析类
        r".*为什么.*",  # 原因分析类
        r".*建议.*",  # 建议类
        r".*优缺点.*",  # 分析类
        r".*影响.*",  # 影响分析类
        r".*解释.*",  # 解释说明类
        r".*理解.*",  # 理解类
        r".*意味着什么",  # 含义类
    ]

    # 检查是否是关键词型问题
    for pattern in keyword_patterns:
        if re.search(pattern, question):
            return "keyword"

    # 检查是否是语义型问题
    for pattern in semantic_patterns:
        if re.search(pattern, question):
            return "semantic"

    # 默认平衡型
    return "balanced"


def get_dynamic_weights(query_type: str) -> tuple:
    """
    根据查询类型返回动态权重

    返回:
        tuple: (bm25_weight, vector_weight)
    """
    if query_type == "keyword":
        return (0.7, 0.3)  # 更侧重关键词检索
    elif query_type == "semantic":
        return (0.3, 0.7)  # 更侧语义检索
    else:
        return (0.5, 0.5)  # 平衡权重


def get_qa_chain_with_memory(question: str):
    vectordb = get_vectordb()

    # 1. 确定查询类型和动态权重
    query_type = determine_query_type(question)
    bm25_weight, vector_weight = get_dynamic_weights(query_type)
    print(f"问题类型: {query_type}, 权重设置: BM25={bm25_weight}, Vector={vector_weight}")

    # 2. 初始化 BM25 检索器
    documents = get_text_list_from_milvus(collection_name="Vmaxs")
    bm25_retriever = BM25Retriever.from_texts(documents)
    bm25_retriever.k = 10

    # 3. 初始化向量检索器
    vector_retriever = vectordb.as_retriever(
        search_kwargs={"k": 10},
        search_type="mmr",
    )

    # 4. 使用动态权重的混合检索
    ensemble_retriever = EnsembleRetriever(
        retrievers=[bm25_retriever, vector_retriever],
        weights=[bm25_weight, vector_weight],  # 使用动态权重
    )

    # 5. 使用 Cohere Rerank 优化结果
    cohere_client = cohere.Client(api_key="Tahx1eySFbKvu9sTyTXrRLf59la3ZUG9vy02stRZ")
    compressor = CohereRerank(
        client=cohere_client,
        top_n=5,
        model="rerank-multilingual-v3.0"
    )

    compression_retriever = ContextualCompressionRetriever(
        base_compressor=compressor,
        base_retriever=ensemble_retriever
    )

    # 6. 定义提示模板（加入权重信息）
    QA_CHAIN_PROMPT = PromptTemplate(
        input_variables=["chat_history", "question", "context"],
        template=f"""
    你是一位专业的VMAX-S技术专家助手，负责回答关于VMAX-S产品的技术问题。请根据以下规则回答问题：

    1. 严格基于提供的上下文信息回答，不要编造或假设
    2. 如果上下文不包含答案，明确表示"根据现有资料，我无法回答这个问题"
    3. 回答要专业、准确、简洁
    4. 对于操作类问题，提供分步骤说明
    5. 对于需要比较或列举的问题，使用表格或列表形式
    6. 保持友好专业的语气

    当前问题类型: {query_type} (检索权重: BM25={bm25_weight}, Vector={vector_weight})

    当前对话历史：
    {{chat_history}}

    相关技术资料：
    {{context}}

    用户问题：{{question}}

    请按照以下格式回答：
    [专业回答]
    (你的回答内容)

    [补充说明]
    (如有需要，可添加额外说明或建议)

    感谢您咨询VMAX-S相关问题！
    """
    )

    # 7. 构建对话式检索链
    qa_chain = ConversationalRetrievalChain.from_llm(
        llm=get_llm(),
        retriever=compression_retriever,
        memory=memory,
        output_key="answer",
        combine_docs_chain_kwargs={
            "prompt": QA_CHAIN_PROMPT
        },
        verbose=False
    )

    result = qa_chain({"question": question})
    return result


# 测试问题
questions = [
    "什么是VMAX的上网日志业务？",  # 定义类问题，更适合关键词检索
    "上网日志业务包含哪些功能？",  # 列举类问题，更适合关键词检索
    "整理成excel表格",  # 结构化输出要求，更适合关键词检索
    "为什么我的VMAX设备会出现日志丢失问题？",  # 原因分析类，更适合语义检索
    "如何解决VMAX日志存储空间不足的问题？",  # 解决方案类，更适合语义检索
    "VMAX-S与其他型号的主要区别是什么？"  # 平衡型问题
]

for question in questions:
    print(f"\n问题: {question}")
    result = get_qa_chain_with_memory(question)
    print(f"\n回答: {result['answer']}")
    print("=" * 50)