# Task 4: 增强版RAG管道演示

本Notebook演示了增强版RAG管道的完整使用流程，包括技术优化和性能提升。

## 主要优化特性

1. **查询扩展**: 自动生成相关查询词，提高召回率
2. **混合重排序**: 结合TF-IDF和向量相似度进行文档重排序
3. **自适应提示词**: 根据问题类型选择最优提示词模板
4. **动态上下文构建**: 智能选择最相关的文档片段
5. **性能监控**: 实时跟踪各步骤的执行时间和效果

## 1. 环境准备

In [None]:
import os
import sys
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

# 检查必要的环境变量
required_vars = ['LOCAL_API_KEY', 'LOCAL_BASE_URL', 'LOCAL_TEXT_MODEL', 'LOCAL_EMBEDDING_MODEL']
missing_vars = [var for var in required_vars if not os.getenv(var)]

if missing_vars:
    print(f"警告: 缺少环境变量: {missing_vars}")
    print("请确保.env文件包含所有必要的配置")
else:
    print("✓ 环境变量配置完成")

## 2. 导入增强版RAG管道

In [None]:
from enhanced_rag_pipeline import EnhancedRAGPipeline

# 初始化增强版RAG管道
print("正在初始化增强版RAG管道...")

enhanced_rag = EnhancedRAGPipeline(
    index_path="output/knowledge_base.index",
    metadata_path="output/chunk_metadata.pkl",
    k=10,  # 初始检索更多文档
    final_k=5,  # 最终返回5个最相关文档
    enable_reranking=True,  # 启用重排序
    enable_query_expansion=True  # 启用查询扩展
)

print("✓ 增强版RAG管道初始化完成")

## 3. 测试不同类型的问题

我们将测试四种不同类型的问题，展示自适应提示词的效果：

In [None]:
# 定义测试问题集
test_questions = {
    '事实性问题': [
        "公司在2023年的营业收入是多少？",
        "公司的主要产品有哪些？",
        "公司成立于何时？"
    ],
    '分析性问题': [
        "公司业绩增长的主要原因是什么？",
        "如何评价公司的盈利能力？",
        "公司面临的主要风险因素有哪些？"
    ],
    '比较性问题': [
        "与同行业竞争对手相比，公司有哪些优势？",
        "公司各业务板块的盈利能力如何比较？",
        "公司产品与竞品的差异在哪里？"
    ],
    '预测性问题': [
        "公司未来的发展前景如何？",
        "行业发展趋势对公司有何影响？",
        "公司的战略规划是什么？"
    ]
}

print("测试问题集准备完成")

## 4. 执行测试并分析结果

In [None]:
import time

def test_question_category(category, questions):
    """测试特定类别的问题"""
    print(f"\n{'='*80}")
    print(f"测试类别: {category}")
    print(f"{'='*80}")
    
    results = []
    
    for i, question in enumerate(questions, 1):
        print(f"\n[{i}/{len(questions)}] 问题: {question}")
        print("-" * 60)
        
        try:
            # 执行查询
            start_time = time.time()
            result = enhanced_rag.answer_question(question)
            end_time = time.time()
            
            # 显示结果
            print(f"答案: {result['answer']}")
            print(f"\n主要来源: {result['filename']} (第{result['page']}页)")
            
            # 显示性能指标
            metrics = result.get('performance_metrics', {})
            print(f"\n性能指标:")
            print(f"  - 总耗时: {metrics.get('total_time', end_time - start_time):.2f}s")
            print(f"  - 检索耗时: {metrics.get('retrieval_time', 0):.2f}s")
            print(f"  - 生成耗时: {metrics.get('generation_time', 0):.2f}s")
            
            # 显示检索到的文档数量
            sources = result.get('sources', [])
            print(f"  - 检索文档数: {len(sources)}")
            
            results.append({
                'question': question,
                'success': True,
                'time': metrics.get('total_time', end_time - start_time),
                'sources_count': len(sources)
            })
            
        except Exception as e:
            print(f"错误: {e}")
            results.append({
                'question': question,
                'success': False,
                'error': str(e)
            })
    
    return results

In [None]:
# 执行所有测试
all_results = {}

for category, questions in test_questions.items():
    results = test_question_category(category, questions)
    all_results[category] = results

## 5. 性能分析和总结

In [None]:
# 分析测试结果
def analyze_results(all_results):
    """分析测试结果"""
    print(f"\n{'='*80}")
    print("测试结果分析")
    print(f"{'='*80}")
    
    total_questions = 0
    total_success = 0
    total_time = 0
    
    for category, results in all_results.items():
        success_count = sum(1 for r in results if r['success'])
        avg_time = sum(r.get('time', 0) for r in results if r['success']) / max(success_count, 1)
        
        print(f"\n{category}:")
        print(f"  - 成功率: {success_count}/{len(results)} ({success_count/len(results)*100:.1f}%)")
        print(f"  - 平均响应时间: {avg_time:.2f}s")
        
        total_questions += len(results)
        total_success += success_count
        total_time += sum(r.get('time', 0) for r in results if r['success'])
    
    print(f"\n总体统计:")
    print(f"  - 总成功率: {total_success}/{total_questions} ({total_success/total_questions*100:.1f}%)")
    print(f"  - 平均响应时间: {total_time/max(total_success, 1):.2f}s")

# 执行分析
analyze_results(all_results)

In [None]:
# 获取增强版RAG管道的性能报告
print(f"\n{'='*80}")
print("增强版RAG管道性能报告")
print(f"{'='*80}")

try:
    performance_report = enhanced_rag.get_performance_report()
    
    for metric, value in performance_report.items():
        if isinstance(value, float):
            print(f"{metric}: {value:.3f}")
        else:
            print(f"{metric}: {value}")
            
except Exception as e:
    print(f"获取性能报告失败: {e}")

## 6. 优化效果对比

### 增强版RAG vs 基础版RAG

| 特性 | 基础版RAG | 增强版RAG | 改进效果 |
| --- | --- | --- | --- |
| 检索策略 | 单一向量检索 | 混合检索+重排序 | 提高相关性 |
| 查询处理 | 原始查询 | 查询扩展 | 提高召回率 |
| 提示词 | 固定模板 | 自适应提示词 | 提高回答质量 |
| 上下文构建 | 固定长度 | 动态选择 | 优化信息密度 |
| 性能监控 | 无 | 全面监控 | 便于优化调试 |

### 技术优化要点

1. **混合检索**: 结合向量相似度和TF-IDF，平衡语义理解和关键词匹配
2. **查询扩展**: 自动生成相关查询词，扩大检索范围
3. **智能重排序**: 综合多种相似度指标，提升最终结果质量
4. **自适应提示词**: 根据问题类型选择最优提示词策略
5. **动态上下文**: 智能选择最相关文档片段，避免信息冗余

## 7. 使用建议

### 参数调优建议

```python
# 针对不同场景的参数配置

# 高精度场景（如法律、医疗）
enhanced_rag_precision = EnhancedRAGPipeline(
    k=15,  # 增加初始检索数量
    final_k=3,  # 减少最终文档数，提高精度
    enable_reranking=True,
    enable_query_expansion=False  # 关闭查询扩展，避免噪声
)

# 高召回场景（如客服、FAQ）
enhanced_rag_recall = EnhancedRAGPipeline(
    k=20,  # 大幅增加检索数量
    final_k=8,  # 增加最终文档数
    enable_reranking=True,
    enable_query_expansion=True  # 启用查询扩展
)

# 平衡场景（通用应用）
enhanced_rag_balanced = EnhancedRAGPipeline(
    k=10,
    final_k=5,
    enable_reranking=True,
    enable_query_expansion=True
)
```

### 性能优化建议

1. **批量处理**: 对于大量查询，考虑批量向量化以提高效率
2. **缓存机制**: 对常见查询结果进行缓存
3. **异步处理**: 对于实时性要求不高的场景，使用异步处理
4. **模型选择**: 根据精度和速度需求选择合适的嵌入模型
5. **硬件优化**: 使用GPU加速向量计算和模型推理