In [None]:
import os
import re
import gc
import time
import numpy as np
import faiss
import fitz  # PyMuPDF
from tqdm import tqdm
import ollama  # 确保已安装ollama Python包

# 配置常量
EMBEDDING_MODEL = "nomic-embed-text"  # 替换为你的嵌入模型
VECTOR_STORE_DIR = "./vector_stores"  # 向量存储保存目录
MODEL_NAME = "llama3.2:latest" 

In [None]:
def build_pdf_vector_store(directory_path, index_name="default_index", max_chars_per_page=50000, batch_size=50):
    """
    端到端构建PDF向量存储系统
    """
    # 创建存储目录
    os.makedirs(VECTOR_STORE_DIR, exist_ok=True)
    index_path = os.path.join(VECTOR_STORE_DIR, f"{index_name}.faiss")
    
    # 步骤1: 加载并处理PDF
    print(f"📂 开始处理目录: {directory_path}")
    all_chunks, metadata = load_pdfs_from_directory(directory_path, max_chars_per_page)
    
    if not all_chunks:
        print("⚠️ 未找到可处理的PDF文件，终止操作")
        return None, None, None
    
    # 步骤2: 创建向量存储
    print(f"\n🛠️ 开始创建向量存储 (共{len(all_chunks)}个文本块)")
    index, all_chunks, metadata = create_vector_store(all_chunks, metadata, batch_size)
    
    if index is None:
        print("❌ 向量存储创建失败")
        return None, None, None
    
    # 步骤3: 保存向量存储
    print(f"\n💾 保存向量存储到: {index_path}")
    faiss.write_index(index, index_path)
    
    # 保存元数据 (可选)
    metadata_path = os.path.join(VECTOR_STORE_DIR, f"{index_name}_metadata.pkl")
    with open(metadata_path, "wb") as f:
        import pickle
        pickle.dump({"chunks": all_chunks, "metadata": metadata}, f)
    
    print(f"✅ 向量存储构建完成! 索引大小: {index.ntotal} 个向量")
    return index, all_chunks, metadata

def load_pdfs_from_directory(directory_path, max_chars_per_page=50000):
    """流式加载PDF并提取文本，优化内存管理"""
    all_chunks = []
    metadata = []
    
    # 验证目录存在
    if not os.path.exists(directory_path):
        print(f"❌ 目录不存在: {directory_path}")
        return [], []
    
    pdf_files = [f for f in os.listdir(directory_path) 
                 if f.lower().endswith('.pdf')]
    
    if not pdf_files:
        print(f"ℹ️ 目录中没有PDF文件: {directory_path}")
        return [], []
    
    print(f"发现 {len(pdf_files)} 个PDF文件，开始处理...")
    
    for filename in tqdm(pdf_files, desc="📄 处理PDF文件"):
        filepath = os.path.join(directory_path, filename)
        try:
            # 使用PyMuPDF打开（内存效率高）
            with fitz.open(filepath) as doc:  # 使用上下文管理器自动关闭
                total_chunks = 0
                
                for page_num in range(len(doc)):
                    page = doc.load_page(page_num)
                    
                    # 核心文本提取（优化版）
                    page_text = page.get_text("text")  # 纯文本模式更快
                    
                    # 高效清理文本
                    clean_text = re.sub(r'\s+', ' ', page_text).strip()
                    if len(clean_text) > max_chars_per_page:
                        clean_text = clean_text[:max_chars_per_page]
                        print(f"⚠️ 截断超大页面: {filename} 第{page_num+1}页")
                    
                    # 分块处理
                    page_chunks = split_text(clean_text)
                    total_chunks += len(page_chunks)
                    
                    # 收集数据
                    for i, chunk in enumerate(page_chunks):
                        all_chunks.append(chunk)
                        metadata.append({
                            'filename': filename,
                            'filepath': filepath,
                            'page': page_num + 1,
                            'chunk_index': i,
                            'total_chunks': len(page_chunks),
                            'timestamp': time.time()
                        })
                    
                    # 主动释放内存
                    del page, page_text, clean_text
                    page = None  # 确保解除引用
                    
                    # 每5页手动GC一次
                    if page_num % 5 == 0:
                        gc.collect()
                
                print(f"✅ 成功加载: {filename} | 页数: {len(doc)} | 块数: {total_chunks}")
                
        except Exception as e:
            print(f"❌ 处理失败 {filename}: {str(e)[:200]}")
            # 失败时强制GC清理
            gc.collect()
    
    return all_chunks, metadata

def create_vector_store(all_chunks, metadata, batch_size=50):
    """批量创建向量存储，修复嵌入请求格式问题"""
    if not all_chunks:
        print("⚠️ 无文本块可用于创建向量存储")
        return None, [], []
    
    embeddings = []
    failed_indices = []
    print(f"生成嵌入向量 (共{len(all_chunks)}个块)...")
    
    # 分批处理避免内存溢出
    for i in tqdm(range(0, len(all_chunks), batch_size), desc="🧠 生成嵌入"):
        batch = all_chunks[i:i+batch_size]
        batch_embeddings = []  # 存储当前批次的嵌入
        
        try:
            # 处理批处理中的每个文本块（逐个请求）
            for j, text_chunk in enumerate(batch):
                chunk_index = i + j  # 全局索引
                
                for attempt in range(3):  # 重试机制
                    try:
                        # 关键修复：prompt必须是单个字符串
                        response = ollama.embeddings(
                            model=EMBEDDING_MODEL,
                            prompt=text_chunk  # 单个字符串而不是列表
                        )
                        
                        # 确保我们获取到嵌入向量
                        if 'embedding' in response:
                            batch_embeddings.append(response['embedding'])
                            break  # 成功则跳出重试循环
                        else:
                            raise ValueError("响应中缺少 'embedding' 字段")
                            
                    except Exception as e:
                        if attempt < 2:
                            print(f"🔄 重试 {attempt+1}/3: 块 {chunk_index} 嵌入请求失败 ({str(e)[:100]})")
                            time.sleep(2 ** attempt)  # 指数退避
                        else:
                            print(f"❌ 块 {chunk_index} 嵌入请求最终失败: {str(e)[:200]}")
                            failed_indices.append(chunk_index)
                            batch_embeddings.append(None)  # 占位符
                            break
            
            # 添加到总嵌入列表
            embeddings.extend(batch_embeddings)
            
        except Exception as e:
            print(f"❌ 处理批处理时发生意外错误: {str(e)[:200]}")
            # 标记整个批次为失败
            failed_indices.extend(range(i, min(i+batch_size, len(all_chunks))))
            embeddings.extend([None] * len(batch))
    
    # 移除失败的嵌入
    if failed_indices:
        print(f"⚠️ 移除了 {len(failed_indices)} 个失败的嵌入")
        # 反向遍历避免索引错位
        for idx in sorted(failed_indices, reverse=True):
            del all_chunks[idx]
            del metadata[idx]
            if idx < len(embeddings):
                del embeddings[idx]
    
    # 过滤掉None值
    valid_embeddings = [e for e in embeddings if e is not None]
    if not valid_embeddings:
        print("❌ 无有效嵌入可用于创建索引")
        return None, [], []
    
    # 确保所有向量维度一致
    dimension = len(valid_embeddings[0])
    for emb in valid_embeddings:
        if len(emb) != dimension:
            print(f"⚠️ 维度不一致: {len(emb)} vs {dimension}")
            # 处理不一致情况（这里简单跳过）
            valid_embeddings.remove(emb)
    
    # 创建FAISS索引
    try:
        index = faiss.IndexFlatL2(dimension)
        index.add(np.array(valid_embeddings, dtype=np.float32))
        print(f"索引创建成功! 维度: {dimension} | 向量数: {index.ntotal}")
        return index, all_chunks, metadata
    except Exception as e:
        print(f"❌ 创建FAISS索引失败: {str(e)[:200]}")
        return None, [], []

def split_text(text, chunk_size=2000, overlap=200):
    """将文本分割为重叠的块，增强边界处理"""
    chunks = []
    start = 0
    
    # 处理空文本
    if not text.strip():
        return []
    
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunk = text[start:end].strip()
        
        # 只在有内容时添加
        if chunk:
            chunks.append(chunk)
        
        # 检查是否到达文本末尾
        if end == len(text):
            break
            
        # 移动起始位置（考虑重叠）
        start = end - overlap
        if start < 0:
            start = 0
    
    return chunks

# 使用示例
if __name__ == "__main__":
    PDF_DIR = "/Users/stephenzhang/Desktop/AB_test"
    INDEX_NAME = "my_documents"
    
    index, chunks, meta = build_pdf_vector_store(
        directory_path=PDF_DIR,
        index_name=INDEX_NAME,
        max_chars_per_page=100000,  # 大页面限制
        batch_size=30               # 小批量避免OOM
    )
    
    if index:
        # 示例：检索相似内容
        query = "什么是机器学习?"
        query_embed = ollama.embeddings(model=EMBEDDING_MODEL, prompt=query)['embedding']

📂 开始处理目录: /Users/stephenzhang/Desktop/AB_test
发现 1 个PDF文件，开始处理...


📄 处理PDF文件: 100%|██████████| 1/1 [00:00<00:00,  5.82it/s]


✅ 成功加载: Energy statistics- A class of statistics based on distances.pdf | 页数: 25 | 块数: 60

🛠️ 开始创建向量存储 (共60个文本块)
生成嵌入向量 (共60个块)...


🧠 生成嵌入: 100%|██████████| 2/2 [00:05<00:00,  3.00s/it]

索引创建成功! 维度: 768 | 向量数: 60

💾 保存向量存储到: ./vector_stores/my_documents.faiss
✅ 向量存储构建完成! 索引大小: 60 个向量





In [None]:
def query_documents(question, index, chunks, metadata, model_name=MODEL_NAME, k=5):
    """
    使用RAG（检索增强生成）技术回答问题
    :param question: 用户问题
    :param index: FAISS索引
    :param chunks: 文本块列表
    :param metadata: 元数据列表
    :param model_name: 使用的Ollama模型名称
    :param k: 返回的相似文本块数量
    :return: 模型生成的答案
    """
    try:
        # 1. 将问题转换为嵌入向量
        response = ollama.embeddings(
            model=EMBEDDING_MODEL,
            prompt=question
        )
        query_embedding = response['embedding']
        
        # 2. 在FAISS索引中搜索相似内容
        query_embedding = np.array([query_embedding], dtype=np.float32)
        distances, indices = index.search(query_embedding, k)
        
        # 3. 检索相关文本块
        context_chunks = [chunks[i] for i in indices[0] if i < len(chunks)]
        context_metadata = [metadata[i] for i in indices[0] if i < len(metadata)]
        
        # 4. 构造上下文提示
        context = "\n\n".join([
            f"来源: {meta['filename']} 第{meta['page']}页\n内容: {chunk}"
            for chunk, meta in zip(context_chunks, context_metadata)
        ])
        
        # 5. 构造完整提示
        prompt = f"""
        你是一个专业的文档助手，请基于以下上下文信息回答问题。
        如果信息不在上下文中，请如实回答你不知道。
        
        上下文:
        {context}
        
        问题: {question}
        回答:
        """
        
        # 6. 调用模型生成回答
        response = ollama.chat(
            model=model_name,
            messages=[{'role': 'user', 'content': prompt}]
        )
        
        return response['message']['content'], context_metadata
        
    except Exception as e:
        return f"查询失败: {str(e)}", []

def interactive_chat(index, chunks, metadata, model_name="llama3"):
    """与文档进行交互式对话"""
    print("🚀 文档助手已启动! 输入 'exit' 退出")
    
    while True:
        try:
            # 获取用户输入
            question = input("\n👤 您的问题: ")
            if question.lower() in ['exit', 'quit']:
                break
                
            if not question.strip():
                continue
                
            # 查询文档
            start_time = time.time()
            answer, sources = query_documents(
                question, index, chunks, metadata, model_name
            )
            elapsed = time.time() - start_time
            
            # 显示结果
            print(f"\n🤖 助手 (响应时间: {elapsed:.2f}s):")
            print(answer)
            
            # 显示来源
            if sources:
                print("\n📚 来源:")
                for i, meta in enumerate(sources):
                    print(f"{i+1}. {meta['filename']} - 第{meta['page']}页")
        
        except KeyboardInterrupt:
            print("\n再见!")
            break
        except Exception as e:
            print(f"❌ 错误: {str(e)}")