In [None]:
from langchain_community.document_loaders import PyMuPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from pymilvus import (
    connections,
    utility,
    FieldSchema, CollectionSchema, DataType,
    Collection,
    AnnSearchRequest, RRFRanker, WeightedRanker,
    db
)
import numpy as np
from FlagEmbedding import BGEM3FlagModel
from transformers import pipeline
import os
from datetime import datetime
import torch
from tqdm import tqdm

# Device strings handed to the embedding model in a later cell.
devices = ["cuda:1"]
# Milvus connection settings (the port is kept as a string).
MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
TARGET_DB = "test_db"  # custom database name
COLLECTION_NAME="hybrid_search_collection"
# Local path of the BGE-M3 embedding model weights.
EMBEDDING_MODEL_PATH = "/disk3/lsp/models/BAAI/bge-m3"

# Connect to Milvus
connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)

# Create the database if it does not exist yet
if TARGET_DB not in db.list_database():
    print(f"数据库 {TARGET_DB} 不存在，创建数据库")
    db.create_database(TARGET_DB)
# Drop the database (kept for manual clean-up)
# db.drop_database(TARGET_DB)

# Switch the active database
db.using_database(TARGET_DB)
print("当前数据库：", TARGET_DB)
# List every collection in the active database
print("当前数据库集合：", utility.list_collections())

# WARNING: destructive. Re-running this cell drops the existing collection,
# so everything ingested previously is lost and must be inserted again.
if utility.has_collection(COLLECTION_NAME):
    utility.drop_collection(COLLECTION_NAME)
    print(f"集合 {COLLECTION_NAME} 已删除")
# Alternative clean-up: drop every collection in the active database.
# for collection_name in utility.list_collections():
#     utility.drop_collection(collection_name)
#     print(f"Collection {collection_name} dropped.")

['hybrid_search_collection']
集合 hybrid_search_collection 已删除


In [None]:

# ====================
# 1. PDF文本解析与处理
# ====================
def extract_text_from_pdf(pdf_dir, loader_cls=None):
    """Load every PDF file located directly inside ``pdf_dir``.

    Files are processed in sorted path order so repeated runs ingest the
    documents in the same sequence (``os.listdir`` gives no ordering
    guarantee). The extension check is case-insensitive, so ``.PDF`` files
    are picked up as well, and sub-directories are skipped.

    Args:
        pdf_dir: Directory to scan (not recursive).
        loader_cls: Optional loader class. It is instantiated with one file
            path and must expose ``load()`` returning a list. Defaults to
            ``PyMuPDFLoader``.

    Returns:
        A single list with the items returned by every loader, concatenated
        in file order.
    """
    if loader_cls is None:
        loader_cls = PyMuPDFLoader

    # Collect the PDF paths in a deterministic order.
    candidates = (os.path.join(pdf_dir, name) for name in os.listdir(pdf_dir))
    pdf_paths = sorted(
        path for path in candidates
        if path.lower().endswith('.pdf') and os.path.isfile(path)
    )

    data = []
    print("=====================================")
    print("PDF 文件数：", len(pdf_paths))
    # Parse the files one by one.
    for pdf_file in pdf_paths:
        print(f"解析 PDF ：{pdf_file}")
        data.extend(loader_cls(pdf_file).load())
    print('数据总数: ', len(data))
    return data

In [4]:
# ====================
# 2. 文本分割
# ====================
def split_documents(docs, chunk_size=1024, chunk_overlap=20):
    """Split documents into chunks with LangChain's recursive character splitter.

    Args:
        docs: Documents to split.
        chunk_size: Value forwarded as the splitter's ``chunk_size``.
        chunk_overlap: Value forwarded as the splitter's ``chunk_overlap``.

    Returns:
        Whatever ``RecursiveCharacterTextSplitter.split_documents`` returns
        for ``docs``.
    """
    # Separators handed to the splitter: paragraph break, line break, then
    # the Chinese full stop, exclamation mark, question mark and semicolon.
    boundary_markers = ["\n\n", "\n", "。", "！", "？", "；"]
    splitter_options = dict(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=boundary_markers,
    )
    return RecursiveCharacterTextSplitter(**splitter_options).split_documents(docs)

In [5]:
# ====================
# 3. 向量嵌入模型初始化
# ====================
def init_embedding_model(model_dir="/disk3/lsp/models/BAAI/bge-m3", devices=["cuda:1"]):
    """Instantiate the BGE-M3 embedding model.

    Args:
        model_dir: Model location passed to ``BGEM3FlagModel``.
        devices: Device list passed through as the ``devices`` argument.

    Returns:
        The constructed ``BGEM3FlagModel`` (created with ``use_fp16=False``).
    """
    model_options = {"use_fp16": False, "devices": devices}
    return BGEM3FlagModel(model_dir, **model_options)

In [6]:
# ====================
# 4. 读取 PDF 文档并分割
# ====================
def ingest_document(pdf_dir, batch_size=32):
    """Load the PDFs in ``pdf_dir`` and split them into text chunks.

    Args:
        pdf_dir: Directory passed to ``extract_text_from_pdf``.
        batch_size: Number of loaded documents handed to ``split_documents``
            per call. Must be at least 1.

    Returns:
        ``(texts, metadata)``: two parallel lists holding each chunk's
        ``page_content`` and a copy of its metadata dict in which
        ``datetime`` values are replaced by ISO-8601 strings.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1. A negative step
            would otherwise make the loop silently produce no chunks.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    # Extract the text from the PDFs.
    docs = extract_text_from_pdf(pdf_dir)

    print('batch_size: ', batch_size)
    chunks = []
    for start in tqdm(range(0, len(docs), batch_size), desc="Splitting PDF into chunks"):
        chunks.extend(split_documents(docs[start:start + batch_size]))
    print('chunks: ',len(chunks))

    def convert_metadata(metadata):
        # Build a new dict instead of editing the chunk's metadata in place;
        # datetime values become ISO strings, everything else is kept as is.
        return {
            key: value.isoformat() if isinstance(value, datetime) else value
            for key, value in metadata.items()
        }

    texts = [doc.page_content for doc in chunks]
    metadata = [convert_metadata(doc.metadata) for doc in chunks]
    return texts, metadata
# ====================
# 5. 编码 文档，生成稠密和稀疏向量
# ====================
def embed_documents(texts, embedder):
    """Encode texts into dense and sparse vectors.

    Args:
        texts: Non-empty sequence of strings to encode.
        embedder: Object whose ``encode(texts, return_dense=...,
            return_sparse=..., return_colbert_vecs=...)`` returns a mapping
            with ``"dense_vecs"`` and ``"lexical_weights"`` entries.

    Returns:
        ``(dense_vecs, sparse_vecs, dim)`` where ``dim`` is the length of the
        first dense vector.

    Raises:
        ValueError: If ``texts`` is empty. There would be no vector to read
            the dimension from, and the caller has nothing to index.
    """
    if len(texts) == 0:
        raise ValueError("texts is empty: nothing to embed")

    embeddings = embedder.encode(
        texts,
        return_dense=True,
        return_sparse=True,
        return_colbert_vecs=False
    )
    dense_vecs = embeddings["dense_vecs"]
    sparse_vecs = embeddings["lexical_weights"]

    print('Embeddings length: ', len(dense_vecs))
    dim = len(dense_vecs[0])
    print('dim: ',dim)

    return dense_vecs, sparse_vecs, dim

In [None]:
# Directory that holds the PDF files to ingest.
pdf_dir = './pdf'
# Initialise the encoder
embedder = init_embedding_model(model_dir=EMBEDDING_MODEL_PATH, devices=devices)
# Load + split the PDFs, then encode every chunk.
texts, metadata = ingest_document(pdf_dir, batch_size=32)
# NOTE(review): `dence_vecs` is a misspelling of "dense"; the insert cell
# further down refers to it by this exact name, so rename both together.
dence_vecs, sparse_vecs, dim = embed_documents(texts, embedder)

./02 高考填志愿-大学各专业介绍.pdf
data:  118
batch_size:  32


Splitting PDF into chunks: 100%|██████████| 4/4 [00:00<00:00, 699.37it/s]

chunks:  118



You're using a XLMRobertaTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


Embeddings length:  118
dim:  1024


In [None]:
# ====================
# 6. Milvus向量数据库配置
# ====================
class VectorDB:
    """Wrapper around one Milvus collection storing text chunks with dense and sparse vectors."""

    def __init__(self, collection_name="hybrid_search_collection", dim=1024, index_params={"index_type": "FLAT", "metric_type": "IP"}, sparse_index_params={"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP"}):
        """Open the collection, creating it (with indexes) when it is missing.

        Args:
            collection_name: Name of the Milvus collection.
            dim: Dimension of the dense vector field; adjust to the model output.
            index_params: Index parameters for the ``dense_vectors`` field.
            sparse_index_params: Index parameters for the ``sparse_vectors`` field.
        """
        self.collection_name = collection_name
        self.embedding_dim = dim
        # Shallow copies: the instance must not alias the shared default
        # dicts (or the caller's dicts), otherwise a later edit to one of
        # them would leak into every other instance.
        self.index_params = dict(index_params)
        self.sparse_index_params = dict(sparse_index_params)

        if not self._check_collection():
            self._create_collection()

        self.collection = Collection(self.collection_name)

    def _check_collection(self):
        """Return whether the collection already exists."""
        return utility.has_collection(self.collection_name)

    def _create_collection(self):
        """Create the collection schema and the sparse and dense vector indexes."""
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
            FieldSchema(name="sparse_vectors", dtype=DataType.SPARSE_FLOAT_VECTOR),
            FieldSchema(name="dense_vectors", dtype=DataType.FLOAT_VECTOR, dim=self.embedding_dim),
            FieldSchema(name="metadata", dtype=DataType.JSON)
        ]

        schema = CollectionSchema(fields, description="Document chunks with embeddings")
        self.collection = Collection(self.collection_name, schema, consistency_level="Strong")

        self.collection.create_index("sparse_vectors", self.sparse_index_params)
        self.collection.create_index("dense_vectors", self.index_params)

    def insert(self, text, sparse_vectors, dense_vectors, metadata):
        """Store text chunks together with their embeddings and metadata.

        The four arguments are parallel columns, listed in schema order
        without the auto-generated ``id`` field. The collection is flushed
        after the insert.

        Args:
            text: Values for the ``text`` field.
            sparse_vectors: Values for the ``sparse_vectors`` field.
            dense_vectors: Values for the ``dense_vectors`` field.
            metadata: Values for the ``metadata`` field.
        """
        entities = [
            text,
            sparse_vectors,
            dense_vectors,
            metadata
        ]
        self.collection.insert(entities)
        self.collection.flush()

    def search(self, query_embedding, top_k=10, search_params={"metric_type": "IP"}, sparse_search_params={"metric_type": "IP"}):
        """Run a hybrid (sparse + dense) similarity search.

        Args:
            query_embedding: Mapping with a ``"sparse"`` and a ``"dense"``
                entry holding the query vectors for the respective field.
            top_k: Limit applied to each sub-request and to the fused result.
            search_params: Search parameters for the dense request.
            sparse_search_params: Search parameters for the sparse request.

        Returns:
            A list of dicts with the keys ``"text"``, ``"metadata"`` and
            ``"distance"`` (the value of ``hit.distance``), one per hit of
            the first query.
        """
        self.collection.load()
        # Remember the parameters used last; copy so the shared default
        # dicts are never stored on the instance.
        self.search_params = dict(search_params)
        self.sparse_search_params = dict(sparse_search_params)

        sparse_req = AnnSearchRequest(query_embedding["sparse"],
                              "sparse_vectors", self.sparse_search_params, limit=top_k)
        dense_req = AnnSearchRequest(query_embedding["dense"],
                             "dense_vectors", self.search_params, limit=top_k)

        # The two result lists are fused with the RRF ranker.
        results = self.collection.hybrid_search(
            [sparse_req, dense_req],
            rerank=RRFRanker(),
            limit=top_k,
            output_fields=["text", "metadata"]
        )

        return [
            {
                "text": hit.entity.get('text'),
                "metadata": hit.entity.get('metadata'),
                "distance": hit.distance
            }
            for hit in results[0]
        ]

In [9]:
# Index / search configuration for the collection. A FLAT dense index is
# used; the commented-out branch below is an IVF_FLAT alternative
# (the disabled condition suggests it was meant for 1e4 or more texts).
# if len(texts) < 1e4:
index_params = {
    "index_type": "FLAT",  # no extra parameters required
    "metric_type": "IP"    # choose L2/IP/COSINE as needed
    }
sparse_index_params = {
    "index_type": "SPARSE_INVERTED_INDEX", 
    "metric_type": "IP"
    }
search_params = {"metric_type": "IP"}
sparse_search_params = {"metric_type": "IP"}
# else:
#     index_params = {
#         "index_type": "IVF_FLAT",
#         "metric_type": "IP",
#         "params": {
#             "nlist": min(256, int(np.sqrt(len(texts)))),  # N = total number of rows
#         }
#     }
#     sparse_index_params = {
#         "index_type": "SPARSE_INVERTED_INDEX", 
#         "metric_type": "IP"
#     }
#     search_params = {
#         "metric_type": "IP",
#         "params": {"nprobe": index_params["params"]["nlist"] // 10}
#     }
#     sparse_search_params = {"metric_type": "IP"}
# Open (or create) the collection, then insert all chunks with their vectors.
# Re-running this cell inserts the same rows again.
vector_db = VectorDB(collection_name=COLLECTION_NAME,dim=dim, index_params=index_params, sparse_index_params=sparse_index_params)
vector_db.insert(texts, sparse_vecs, dence_vecs, metadata)

In [10]:

def search_context(question, embedder, vector_db, top_k=10, search_params={"metric_type": "IP"}, sparse_search_params={"metric_type": "IP"}):
    """Retrieve context for a question.

    The question is encoded into a dense and a sparse representation, which
    are then handed to ``vector_db.search``.

    Args:
        question: Query text.
        embedder: Object exposing ``encode`` as used by the embedding model.
        vector_db: Object exposing ``search(query_embedding, top_k=...,
            search_params=..., sparse_search_params=...)``.
        top_k: Forwarded to ``vector_db.search``.
        search_params: Forwarded to ``vector_db.search``.
        sparse_search_params: Forwarded to ``vector_db.search``.

    Returns:
        Whatever ``vector_db.search`` returns.
    """
    # Encode the single question as a one-element batch.
    encoded = embedder.encode(
        [question],
        return_dense=True,
        return_sparse=True,
        return_colbert_vecs=False,
    )

    query_embedding = {
        "dense": encoded["dense_vecs"],
        # Plain-dict copy of the first (only) item's lexical weights.
        "sparse": [dict(encoded["lexical_weights"][0])],
    }

    # Look the query up in the vector database.
    return vector_db.search(
        query_embedding,
        top_k=top_k,
        search_params=search_params,
        sparse_search_params=sparse_search_params,
    )

In [11]:
from openai import OpenAI

# Settings of the OpenAI-compatible endpoint. They can be overridden through
# the LLM_API_KEY / LLM_API_BASE environment variables, so real credentials
# never have to be written into the notebook; the fallbacks are the original
# local-server values.
openai_api_key = os.environ.get("LLM_API_KEY", "token-abc123")
openai_api_base = os.environ.get("LLM_API_BASE", "http://localhost:8000/v1")
client = OpenAI(
    api_key=openai_api_key,
    base_url=openai_api_base,
)

# Interactive question answering; enter "q" to leave the loop.
while True:
    query = input("\n请输入问题（输入q退出）: ").strip()
    if query.lower() == 'q':
        break
    if not query:
        # Blank input: ask again instead of running retrieval and the LLM.
        continue

    results = search_context(query, embedder, vector_db,  top_k=10, search_params=search_params, sparse_search_params=sparse_search_params)
    context = "\n".join(result['text'] for result in results)
    # Generate the answer: the instruction goes into the system message,
    # the retrieved context and the question into the user message.
    completion = client.chat.completions.create(
        model="/llm/Qwen2.5-7B-Instruct",
        messages=[
            {"role": "system", "content": "请仅根据给定上下文回答问题。"},
            {"role": "user", "content": f"上下文：\n{context}\n问题：{query}\n答案："},
        ],
    )
    answer = completion.choices[0].message.content
    print("=====================================")
    print("问题：", query)
    print("答案：", answer)

问题： 导演的主要从事工作是？
答案： 导演的主要从事工作包括：
- MTV或广告导演
- 电视剧导演
- 电影导演
- 舞台导演
- 相应特定栏目后期工作人员


In [None]:

# 问答流程
def ask(query, generator, results):
    """Answer a question from retrieved context with a text-generation callable.

    Args:
        query: The user's question.
        generator: Callable invoked as ``generator(prompt, **options)`` and
            returning a list whose first item has a ``"generated_text"``
            entry (for example a transformers text-generation pipeline).
        results: Retrieved hits; each item must provide a ``"text"`` entry.

    Returns:
        The ``"generated_text"`` of the first generation.
    """
    context = "\n".join(result['text'] for result in results)

    # Build the prompt: instruction, retrieved context, then the question.
    prompt = f"""请仅根据给定上下文回答问题。\n上下文：\n{context}\n问题：{query}\n答案："""
    response = generator(
        prompt,
        max_new_tokens=256,
        temperature=0.7,
        # Return only the newly generated text; by default the pipeline
        # echoes the whole prompt (including all context) before the answer.
        return_full_text=False,
    )

    return response[0]['generated_text']

In [None]:
# Initialise the local text-generation pipeline.
generator = pipeline(
    "text-generation",
    model="/disk3/lsp/HippoRAG/src/meta-llama/Meta-Llama-3-8B-Instruct",
    # Only `devices` (a list) is defined in this notebook; the former
    # `device` name did not exist and raised a NameError.
    device=devices[0],
    torch_dtype=torch.bfloat16,  # bfloat16 for faster inference
)

# Interactive question answering; enter "q" to leave the loop.
while True:
    question = input("\n请输入问题（输入q退出）: ")
    if question.lower() == 'q':
        break

    # The search parameters must be passed by keyword: the fourth positional
    # parameter of search_context is top_k, not search_params.
    results = search_context(
        question,
        embedder,
        vector_db,
        search_params=search_params,
        sparse_search_params=sparse_search_params,
    )
    answer = ask(question, generator, results)
    print(f"\n{answer}")

In [None]:
# ====================
# 5. 问答系统实现
# ====================
class QASystem:
    """End-to-end question answering: embedding model, vector store and generator.

    The class builds on the notebook-level helpers ``init_embedding_model``,
    ``extract_text_from_pdf``, ``split_documents``, ``embed_documents``,
    ``search_context`` and the ``VectorDB`` class.
    """

    def __init__(self):
        """Create the embedder, the vector store and the generation pipeline with their defaults."""
        self.embedder = init_embedding_model()
        self.vector_db = VectorDB()
        self.generator = pipeline(
            "text-generation",
            model="/disk3/lsp/HippoRAG/src/meta-llama/Meta-Llama-3-8B-Instruct",
            device=1  # CUDA device index 1; use 0 for the first GPU or -1 for CPU
        )

    def ingest_document(self, pdf_dir):
        """Document pipeline: load PDFs, split, embed and store.

        Args:
            pdf_dir: Directory passed to ``extract_text_from_pdf``.
        """
        docs = extract_text_from_pdf(pdf_dir)
        chunks = split_documents(docs)
        print('chunks: ',len(chunks))

        def convert_metadata(metadata):
            # datetime values become ISO strings; a new dict is returned so
            # the chunk's own metadata is left untouched.
            return {
                key: value.isoformat() if isinstance(value, datetime) else value
                for key, value in metadata.items()
            }

        texts = [doc.page_content for doc in chunks]
        metadata = [convert_metadata(doc.metadata) for doc in chunks]
        # Use the notebook's embed_documents helper, which calls the model's
        # encode(); embed_documents also prints the vector count and dimension.
        dense_vecs, sparse_vecs, _dim = embed_documents(texts, self.embedder)
        # VectorDB.insert takes the columns as (text, sparse, dense, metadata).
        self.vector_db.insert(texts, sparse_vecs, dense_vecs, metadata)

    def ask(self, question):
        """Answer ``question`` from the stored documents.

        Args:
            question: The user's question.

        Returns:
            The ``generated_text`` of the first generation.
        """
        # Encode the question and run the hybrid search; search_context
        # builds the {"dense", "sparse"} mapping VectorDB.search expects.
        results = search_context(question, self.embedder, self.vector_db)
        context_chunks = [result['text'] for result in results]
        context = "\n".join(context_chunks)

        # Generate the answer.
        prompt = f"""请仅根据给定上下文回答问题，仅输出问题的答案：\n上下文：\n{context}\n问题：{question}\n答案："""
        response = self.generator(
            prompt,
            max_new_tokens=100,
            temperature=1.0,
            # Return only the newly generated text instead of prompt + answer.
            return_full_text=False,
        )

        return response[0]['generated_text']

In [None]:
# Initialise the system (embedder, vector store and generator)
qa_system = QASystem()

# Document ingestion (only needs to run once); "./" is the directory that is
# scanned for PDF files.
qa_system.ingest_document("./")

In [None]:

# Interactive question answering; enter "q" to leave the loop.
while True:
    question = input("\n请输入问题（输入q退出）: ")
    if question.lower() == 'q':
        break

    # Retrieve context and generate the answer through the QASystem instance.
    answer = qa_system.ask(question)
    print(f"\n{answer}")