# Graph RAG 构建指南：参数配置详解

在构建基于图结构的检索增强生成（Graph RAG）系统时，合理的参数配置是确保系统性能的关键。下面将详细解析Graph RAG构建中的核心配置参数，并提供一个完整的Python配置类实现示例。

## 核心配置参数解析

### 1. 大语言模型（LLM）配置
LLM作为Graph RAG的"大脑"，负责生成回答和推理。以下是关键参数：

```python
class PolicyConfig:
    def __init__(self):
        # LLM 配置
        self.llm_model_name = "qwen-plus"  # 模型名称，示例使用通义千问增强版
        self.llm_api_key = "sk-"          # API访问密钥（需替换为实际值）
        self.llm_base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"  # 服务端点
```

**参数说明**：
- `llm_model_name`：指定使用的LLM模型版本，不同模型在理解能力、响应速度和上下文窗口上有差异
- `llm_api_key`：安全凭证，用于验证API访问权限（生产环境建议使用环境变量存储）
- `llm_base_url`：模型服务地址，云服务通常提供多个区域端点

### 2. 嵌入模型配置
嵌入模型负责将文本转换为向量表示，是图结构构建的基础：

```python
        # 嵌入模型配置
        self.embedding_api_key = ""       # 嵌入服务API密钥（可选）
        self.embedding_base_url = ""     # 嵌入服务端点（可选）
        self.embedding_model_name = "bge-m3"  # 预训练嵌入模型名称
        self.huggingface_model = "bge-small-zh-v1.5"  # 本地嵌入模型路径或名称
```

**参数说明**：
- 嵌入服务配置（`embedding_api_key`/`base_url`）：当使用云服务提供的嵌入API时需要配置
- `embedding_model_name`：预训练嵌入模型标识，影响向量维度和语义表示能力
- `huggingface_model`：支持本地部署的HuggingFace模型，示例使用中文优化的bge-small模型

**部署建议**：
- 生产环境建议同时配置云服务和本地模型作为容灾方案
- 中文场景推荐使用`bge-small-zh-v1.5`等中文优化模型

### 3. 工作目录与数据处理
```python
        # 工作目录
        self.working_dir = "./work_dir"  # 所有中间文件存储位置
        
        # 数据文件配置
        self.input_file = "政策法规.parquet"  # 输入数据文件（Parquet格式）
        self.process_limit = 5000  # 单次处理最大文档数
```

**参数说明**：
- `working_dir`：建议使用绝对路径避免路径问题，存放图数据库、向量索引等
- `input_file`：支持Parquet等列式存储格式，适合大规模文档处理
- `process_limit`：内存优化参数，防止处理超大文档集时内存溢出

## 完整的构建代码如下

In [None]:
"""
PolicyGraphRAG 数据处理模块
用于处理政策数据并构建知识图谱索引
"""
import asyncio
import traceback
from typing import Dict, List, Tuple, Any
import pandas as pd
from tqdm import tqdm

from policy_graphrag.data_model.node import Node
from policy_graphrag.data_model.edge import Edge
from policy_graphrag.embeddings.hugging_face import HuggingFaceEmbedding
from policy_graphrag.embeddings import OpenaiEmbedding
from policy_graphrag.llms import OpenAIProvider
from policy_graphrag import PolicyGraphRAG


class PolicyConfig:
    """配置管理类"""
    
    def __init__(self):
        # LLM 配置
        self.llm_model_name = "qwen-plus"
        self.llm_api_key = "sk-"
        self.llm_base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
        
        # 嵌入模型配置
        self.embedding_api_key = ""
        self.embedding_base_url = ""
        self.embedding_model_name = "bge-m3"
        self.huggingface_model = "bge-small-zh-v1.5"
        
        # 工作目录
        self.working_dir = "./work_dir"
        
        # 数据文件配置
        self.input_file = "政策法规.parquet"
        self.process_limit = 5000


class PolicyProcessor:
    """政策数据处理器"""
    
    def __init__(self, config: PolicyConfig):
        self.config = config
        self.llm = self._init_llm()
        self.embed = self._init_embedding()
        self.pgr = self._init_policy_graphrag()
        
        # 定义字段映射关系
        self.field_mapping = {
            "issuing_authority": {"entity_type": "组织机构", "description": "制定机关"},
            "document_number": {"entity_type": "发文字号", "description": "发文字号"},
            "publish_date": {"entity_type": "时间节点", "description": "发布日期"},
            "effective_date": {"entity_type": "时间节点", "description": "实施日期"},
            "timeliness": {"entity_type": "时效性", "description": "时效性"},
            "effectiveness_level": {"entity_type": "效力位阶", "description": "效力位阶"},
            "category": {"entity_type": "法规类别", "description": "法规类别"},
        }
    
    def _init_llm(self) -> OpenAIProvider:
        """初始化LLM提供者"""
        return OpenAIProvider(
            config={
                "api_key": self.config.llm_api_key,
                "base_url": self.config.llm_base_url,
                "model_name": self.config.llm_model_name,
                "temperature": 0.3
            }
        )
    
    def _init_embedding(self) -> Any:
        """初始化嵌入提供者"""
        # 使用 HuggingFace 嵌入模型
        return HuggingFaceEmbedding(
            config={
                "embedding_model": self.config.huggingface_model,
            }
        )
        
        # 或者使用 OpenAI 嵌入模型（取消注释使用）
        # return OpenaiEmbedding(
        #     config={
        #         "api_key": self.config.embedding_api_key,
        #         "base_url": self.config.embedding_base_url,
        #         "model_name": self.config.embedding_model_name
        #     }
        # )
    
    def _init_policy_graphrag(self) -> PolicyGraphRAG:
        """初始化PolicyGraphRAG"""
        return PolicyGraphRAG(
            working_dir=self.config.working_dir,
            llm_provider=self.llm,
            embed_provider=self.embed
        )
    
    def parse_policy(self, item: Dict[str, Any]) -> Tuple[List[Node], List[Edge]]:
        """
        解析政策数据项，生成节点和边
        
        Args:
            item: 政策数据项
            
        Returns:
            tuple: (节点列表, 边列表)
        """
        source_nodes = []
        source_edges = []
        
        # 添加政策标题节点
        policy_title = item.get("title", "")
        if policy_title:
            title_node = Node(
                name=policy_title,
                entity_type="政策",
                description="政策标题"
            )
            source_nodes.append(title_node)
        
        for field, mapping in self.field_mapping.items():
            value = item.get(field)
            
            if not value:
                continue
                
            entity_type = mapping["entity_type"]
            description = mapping["description"]
            
            # 处理日期字段
            if field in ["publish_date", "effective_date"]:
                if hasattr(value, 'strftime'):
                    date_str = value.strftime("%Y-%m-%d")
                    node_name = date_str
                else:
                    date_str = str(value)
                    node_name = date_str
            else:
                node_name = str(value)
            
            # 创建节点
            node = Node(
                name=node_name,
                entity_type=entity_type,
                description=""
            )
            source_nodes.append(node)
            
            # 创建边（从政策标题指向该实体）
            edge = Edge(
                source=policy_title,
                target=node_name,
                description=description
            )
            source_edges.append(edge)
        
        return source_nodes, source_edges
    
    async def process_policy_item(self, item: Dict[str, Any]) -> bool:
        """
        处理单个政策数据项
        
        Args:
            item: 政策数据项
            
        Returns:
            bool: 处理是否成功
        """
        try:
            source_nodes, source_edges = self.parse_policy(item)
            content = item.get("content_markdown", "")
            policy_name = item.get("title", "")
            
            if not content or not policy_name:
                print(f"警告: 缺少内容或标题，跳过 {policy_name}")
                return False
            
            publish_date = item.get("publish_date")
            print(f"处理中 - 发布日期: {publish_date}, 政策名称: {policy_name}")
            
            # 构建索引
            await self.pgr.index(
                policy_name=policy_name, 
                content=content, 
                source_nodes=source_nodes, 
                source_edges=source_edges
            )
            await self.pgr.index_naive(policy_name=policy_name, content=content)
            
            return True
            
        except Exception as e:
            print(f"处理政策项失败 {item.get('title', 'Unknown')}: {str(e)}")
            traceback.print_exc()
            return False

In [None]:
async def main():
    """主函数"""
    print("开始处理政策数据...")
    
    # 初始化配置和处理器
    config = PolicyConfig()
    processor = PolicyProcessor(config)
    
    # 加载数据
    print(f"加载数据文件: {config.input_file}")
    try:
        df = pd.read_parquet(config.input_file)
        print(f"数据加载成功，总记录数: {len(df)}")
    except Exception as e:
        print(f"数据加载失败: {str(e)}")
        return
    
    # 限制处理数量
    to_process_df = df[:config.process_limit]
    print(f"计划处理记录数: {len(to_process_df)}")
    
    # 处理数据
    success_count = 0
    error_count = 0
    errors = []
    
    for idx, item in enumerate(tqdm(
        to_process_df.to_dict(orient="records"), 
        total=len(to_process_df), 
        desc="构建索引"
    )):
        try:
            success = await processor.process_policy_item(item)
            if success:
                success_count += 1
            else:
                error_count += 1
                errors.append(item.get("title", f"记录_{idx}"))
        except Exception as e:
            error_count += 1
            errors.append(f"{item.get('title', f'记录_{idx}')} - {str(e)}")
            print(f"处理记录 {idx} 时发生异常: {str(e)}")
    
    print("开始进行社区划分...")
    await processor.pgr.update_graph_community_report()
    
    # 输出统计结果
    print("\n处理完成!")
    print(f"成功处理: {success_count} 条")
    print(f"处理失败: {error_count} 条")
    print(f"成功率: {success_count / (success_count + error_count) * 100:.2f}%" if (success_count + error_count) > 0 else "0%")
    
    if errors:
        print(f"\n失败的记录: {errors[:10]}")  # 只显示前10个错误
        if len(errors) > 10:
            print(f"... 还有 {len(errors) - 10} 条错误记录")

In [None]:
await main()