# AGraph 快速入门指南

本教程展示如何使用AGraph构建和操作知识图谱。

## 目录
1. [基本设置](#基本设置)
2. [创建知识图谱](#创建知识图谱)
3. [添加实体](#添加实体)
4. [建立关系](#建立关系)
5. [查询图谱](#查询图谱)
6. [文档处理](#文档处理)
7. [实体位置追踪](#实体位置追踪) ⭐ 新增
8. [位置可视化](#位置可视化) ⭐ 新增
9. [性能优化](#性能优化)
10. [实际应用示例](#实际应用示例)

## 基本设置

首先导入必要的模块并配置环境：

In [1]:
import os
from agraph import AGraph, KnowledgeGraphBuilder
from agraph.base.graphs.optimized import OptimizedKnowledgeGraph
from agraph.base.models.entities import Entity
from agraph.base.models.relations import Relation
from agraph.base.models.text import TextChunk
from agraph.base.core.types import EntityType, RelationType
from agraph.config import get_settings

# 设置OpenAI API Key（如果使用LLM功能）
# os.environ["OPENAI_API_KEY"] = "your-api-key-here"

print("AGraph模块导入成功！")

AGraph模块导入成功！


## 创建知识图谱

使用优化的知识图谱实现，获得最佳性能：

In [2]:
# 创建优化的知识图谱实例
kg = OptimizedKnowledgeGraph()

# 查看图谱初始状态
print(f"图谱初始化完成")
print(f"实体数量: {len(kg.entities)}")
print(f"关系数量: {len(kg.relations)}")
print(f"缓存状态: {'启用' if kg.cache_manager else '禁用'}")

图谱初始化完成
实体数量: 0
关系数量: 0
缓存状态: 启用


## 添加实体

创建不同类型的实体并添加到图谱中：

In [3]:
# 创建人物实体
person1 = Entity(
    name="张三",
    entity_type=EntityType.PERSON,
    description="软件工程师",
    properties={"职位": "高级开发工程师", "技能": ["Python", "机器学习"]}
)

person2 = Entity(
    name="李四",
    entity_type=EntityType.PERSON,
    description="数据科学家",
    properties={"职位": "数据科学家", "专长": ["统计学", "深度学习"]}
)

# 创建组织实体
company = Entity(
    name="AI科技公司",
    entity_type=EntityType.ORGANIZATION,
    description="专注于人工智能技术的科技公司",
    properties={"行业": "人工智能", "成立年份": 2020}
)

# 创建概念实体
concept = Entity(
    name="知识图谱",
    entity_type=EntityType.CONCEPT,
    description="用于表示知识的图形化数据结构",
    properties={"领域": "人工智能", "应用场景": ["搜索", "推荐", "问答"]}
)

# 添加实体到图谱
entities = [person1, person2, company, concept]
for entity in entities:
    kg.add_entity(entity)

print(f"已添加 {len(entities)} 个实体")
print(f"当前图谱包含 {len(kg.entities)} 个实体")

# 显示添加的实体 - 修复：正确迭代实体字典的值
for entity in kg.entities.values():  # 使用 .values() 获取实体对象
    print(f"- {entity.name} ({entity.entity_type})")

已添加 4 个实体
当前图谱包含 4 个实体
- 张三 (person)
- 李四 (person)
- AI科技公司 (organization)
- 知识图谱 (concept)


## 建立关系

在实体之间创建有意义的关系：

In [5]:
# 创建工作关系
works_for_1 = Relation(
    head_entity=person1,
    tail_entity=company,
    relation_type=RelationType.WORKS_FOR,
    description="张三在AI科技公司工作",
    properties={"部门": "研发部", "入职时间": "2021-03-01"}
)

works_for_2 = Relation(
    head_entity=person2,
    tail_entity=company,
    relation_type=RelationType.WORKS_FOR,
    description="李四在AI科技公司工作",
    properties={"部门": "数据部", "入职时间": "2021-06-15"}
)

# 创建概念关系
works_with = Relation(
    head_entity=person1,
    tail_entity=concept,
    relation_type=custom_relation_type4.WORKS_WITH,
    description="张三从事知识图谱相关工作"
)

specializes_in = Relation(
    head_entity=person2,
    tail_entity=concept,
    relation_type=custom_relation_type4.SPECIALIZES_IN,
    description="李四专门研究知识图谱"
)

# 创建合作关系
collaborates = Relation(
    head_entity=person1,
    tail_entity=person2,
    relation_type=custom_relation_type4.COLLABORATES_WITH,
    description="张三和李四经常合作",
    properties={"项目": ["智能问答系统", "推荐引擎"]}
)

# 添加关系到图谱
relations = [works_for_1, works_for_2, works_with, specializes_in, collaborates]
for relation in relations:
    kg.add_relation(relation)

print(f"已添加 {len(relations)} 个关系")
print(f"当前图谱包含 {len(kg.relations)} 个关系")

# 显示关系
for relation in kg.relations:
    head_name = relation.head_entity.name if relation.head_entity else "未知"
    tail_name = relation.tail_entity.name if relation.tail_entity else "未知"
    print(f"- {head_name} --[{relation.relation_type}]--> {tail_name}")

TypeError: <enum 'WORKS_FOR'> cannot extend <enum 'RelationType'>

## 查询图谱

使用不同的查询方法检索信息：

In [None]:
# 1. 按名称查找实体
print("=== 按名称查找实体 ===")
zhang_san = kg.get_entity_by_name("张三")
if zhang_san:
    print(f"找到实体: {zhang_san.name} - {zhang_san.description}")
    print(f"属性: {zhang_san.properties}")

# 2. 按类型查找实体
print("\n=== 按类型查找实体 ===")
persons = kg.get_entities_by_type(EntityType.PERSON)
print(f"找到 {len(persons)} 个人物实体:")
for person in persons:
    print(f"  - {person.name}: {person.description}")

# 3. 查找实体的关系
print("\n=== 查找实体关系 ===")
if zhang_san:
    relations = kg.get_entity_relations(zhang_san.id)
    print(f"{zhang_san.name} 的关系:")
    for rel in relations:
        if rel.head_entity and rel.tail_entity:
            other_entity = rel.tail_entity if rel.head_entity.id == zhang_san.id else rel.head_entity
            print(f"  - 与 {other_entity.name} 的关系: {rel.relation_type}")

# 4. 按关系类型查找
print("\n=== 按关系类型查找 ===")
work_relations = kg.get_relations_by_type(RelationType.WORKS_FOR)
print(f"找到 {len(work_relations)} 个工作关系:")
for rel in work_relations:
    if rel.head_entity and rel.tail_entity:
        print(f"  - {rel.head_entity.name} 在 {rel.tail_entity.name} 工作")

## 文档处理

使用AGraph自动从文档中提取实体和关系：

In [None]:
# 准备示例文档
sample_documents = [
    """苹果公司是一家美国跨国科技公司，总部位于加利福尼亚州库比蒂诺。
    蒂姆·库克是苹果公司的首席执行官，他在2011年接替史蒂夫·乔布斯成为CEO。
    苹果公司主要设计、开发和销售消费电子产品、计算机软件和在线服务。""",
    
    """微软公司成立于1975年，由比尔·盖茨和保罗·艾伦创立。
    萨蒂亚·纳德拉现任微软公司首席执行官，于2014年上任。
    微软专注于开发操作系统、办公软件和云计算服务。""",
    
    """人工智能是计算机科学的一个分支，旨在创建能够执行通常需要人类智能的任务的系统。
    机器学习是人工智能的一个子集，专注于算法和统计模型的应用。
    深度学习又是机器学习的一个子集，使用神经网络来模拟人脑的学习过程。"""
]

# 创建文本块
text_chunks = []
for i, doc in enumerate(sample_documents):
    chunk = TextChunk(
        content=doc,
        source=f"document_{i+1}",
        metadata={"document_type": "sample", "language": "zh"}
    )
    text_chunks.append(chunk)

print(f"准备了 {len(text_chunks)} 个文档块")
for i, chunk in enumerate(text_chunks):
    print(f"文档 {i+1}: {chunk.content[:50]}...")

In [None]:
# 位置追踪的实用工具函数
print("=== 实用位置工具演示 ===")

def find_entities_near_position(entities, target_pos, radius=50):
    """查找指定位置附近的实体"""
    nearby_entities = []
    for entity in entities:
        char_pos = entity.get_char_position()
        if char_pos:
            start_pos, end_pos = char_pos
            center_pos = (start_pos + end_pos) // 2
            if abs(center_pos - target_pos) <= radius:
                distance = abs(center_pos - target_pos)
                nearby_entities.append((entity, distance))
    
    # 按距离排序
    nearby_entities.sort(key=lambda x: x[1])
    return nearby_entities

def create_entity_timeline(entities, text):
    """创建实体在文档中的时间线视图"""
    positioned = [(e, e.get_char_position()) for e in entities if e.has_position()]
    positioned.sort(key=lambda x: x[1][0])  # 按开始位置排序
    
    timeline = []
    for entity, (start_pos, end_pos) in positioned:
        # 获取上下文
        context_start = max(0, start_pos - 20)
        context_end = min(len(text), end_pos + 20)
        context = text[context_start:context_end].replace('\n', ' ')
        
        timeline.append({
            'entity': entity.name,
            'type': entity.entity_type.value,
            'position': (start_pos, end_pos),
            'context': context,
            'confidence': entity.get_position_confidence()
        })
    
    return timeline

# 1. 查找特定位置附近的实体
print("1. 位置附近实体查询:")
target_position = 50  # 第50个字符附近
nearby = find_entities_near_position(positioned_entities, target_position, radius=30)

print(f"   位置 {target_position} 附近30个字符内的实体:")
for entity, distance in nearby:
    print(f"     - {entity.name}: 距离 {distance} 字符")

if not nearby:
    print(f"     (无实体在指定范围内)")

# 2. 创建实体时间线
print(f"\n2. 实体文档时间线:")
timeline = create_entity_timeline(positioned_entities, test_document)

for i, item in enumerate(timeline, 1):
    print(f"   {i}. 【{item['type']}】{item['entity']}")
    print(f"      位置: {item['position']}")
    print(f"      置信度: {item['confidence']:.2f}")
    print(f"      上下文: ...{item['context']}...")
    print()

# 3. 位置质量评估
print("3. 位置质量评估:")
alignment_counts = {}
total_confidence = 0
confidence_count = 0

for entity in positioned_entities:
    if entity.has_position():
        status = entity.get_alignment_status()
        alignment_counts[status] = alignment_counts.get(status, 0) + 1
        
        confidence = entity.get_position_confidence()
        total_confidence += confidence
        confidence_count += 1

print(f"   对齐状态分布:")
for status, count in alignment_counts.items():
    print(f"     - {status.value}: {count} 个实体")

if confidence_count > 0:
    avg_confidence = total_confidence / confidence_count
    print(f"   平均置信度: {avg_confidence:.3f}")
    
print(f"\n✅ 实体位置追踪功能演示完成！")

In [None]:
# 使用AGraph自动提取和定位 (如果LLM可用)
print("=== AGraph自动提取和定位演示 ===")

try:
    # 创建AGraph实例进行自动提取
    agraph = AGraph(
        collection_name="positioning_demo",
        persist_directory="./temp_agraph",
        enable_knowledge_graph=True
    )
    
    await agraph.initialize()
    
    print("1. 使用AGraph自动从文档提取实体:")
    print(f"   文档: 测试文档 ({len(test_document)} 字符)")
    
    # 从文档构建知识图谱（这会自动提取实体并设置位置）
    auto_kg = await agraph.build_from_texts(
        texts=[test_document],
        graph_name="自动定位演示",
        use_cache=False,
        save_to_vector_store=False
    )
    
    print(f"   自动提取结果:")
    print(f"     实体数: {len(auto_kg.entities)}")
    print(f"     关系数: {len(auto_kg.relations)}")
    print(f"     文本块数: {len(auto_kg.text_chunks)}")
    
    # 检查自动提取的实体是否有位置信息
    auto_positioned_entities = [e for e in auto_kg.entities.values() if e.has_position()]
    print(f"     有位置信息的实体: {len(auto_positioned_entities)}")
    
    if auto_positioned_entities:
        print(f"\n2. 自动定位的实体详情:")
        for i, entity in enumerate(auto_positioned_entities[:5], 1):
            char_pos = entity.get_char_position()
            if char_pos:
                start_pos, end_pos = char_pos
                actual_text = test_document[start_pos:end_pos]
                print(f"   {i}. {entity.name} ({entity.entity_type.value})")
                print(f"      位置: [{start_pos}-{end_pos}]")
                print(f"      文档文本: '{actual_text}'")
                print(f"      对齐状态: {entity.get_alignment_status().value}")
                print()
    
    await agraph.close()
    
except Exception as e:
    print(f"自动提取功能暂不可用: {e}")
    print("这通常是因为未配置LLM API密钥")
    print("您可以设置环境变量 OPENAI_API_KEY 来启用自动提取功能")
    print("\n继续使用手动定位的演示结果...")

In [None]:
# 基于位置的高级分析
print("=== 基于位置的高级分析 ===")

# 1. 位置接近度分析
def calculate_proximity_score(entity1, entity2, max_distance=100):
    """计算两个实体的位置接近度分数"""
    pos1 = entity1.get_char_position()
    pos2 = entity2.get_char_position()
    
    if not (pos1 and pos2):
        return 0.0
    
    # 计算实体间的最小距离
    start1, end1 = pos1
    start2, end2 = pos2
    
    # 如果重叠，距离为0
    if start1 < end2 and start2 < end1:
        distance = 0
    else:
        # 计算非重叠实体间的距离
        distance = min(abs(start1 - end2), abs(start2 - end1))
    
    # 转换为接近度分数 (距离越近分数越高)
    proximity = max(0, (max_distance - distance) / max_distance)
    return proximity

print("1. 实体接近度分析:")
proximity_pairs = []

for i in range(len(positioned_entities)):
    for j in range(i+1, len(positioned_entities)):
        entity1, entity2 = positioned_entities[i], positioned_entities[j]
        proximity = calculate_proximity_score(entity1, entity2)
        
        if proximity > 0.1:  # 只显示接近度 > 0.1 的实体对
            proximity_pairs.append((entity1, entity2, proximity))

# 按接近度排序
proximity_pairs.sort(key=lambda x: x[2], reverse=True)

for entity1, entity2, proximity in proximity_pairs[:5]:  # 显示前5个最接近的
    pos1 = entity1.get_char_position()
    pos2 = entity2.get_char_position()
    print(f"  • {entity1.name} ↔ {entity2.name}")
    print(f"    位置: {pos1} ↔ {pos2}")
    print(f"    接近度: {proximity:.3f}")
    print()

# 2. 文档段落实体密度分析  
print("2. 文档段落实体密度:")
paragraphs = test_document.split('\n\n')
current_pos = 0

for i, paragraph in enumerate(paragraphs):
    if paragraph.strip():  # 忽略空段落
        para_start = current_pos
        para_end = current_pos + len(paragraph)
        
        # 统计该段落中的实体
        entities_in_para = []
        for entity in positioned_entities:
            char_pos = entity.get_char_position()
            if char_pos:
                start_pos, end_pos = char_pos
                # 检查实体是否在当前段落范围内
                if start_pos >= para_start and end_pos <= para_end:
                    entities_in_para.append(entity)
        
        entity_density = len(entities_in_para) / len(paragraph) if paragraph else 0
        
        print(f"  段落 {i+1}: 位置 [{para_start}-{para_end}]")
        print(f"    长度: {len(paragraph)} 字符")
        print(f"    实体数: {len(entities_in_para)}")
        print(f"    实体密度: {entity_density:.4f} (实体/字符)")
        
        if entities_in_para:
            entity_names = [e.name for e in entities_in_para]
            print(f"    包含实体: {', '.join(entity_names)}")
        print()
        
        current_pos += len(paragraph) + 2  # +2 for '\n\n'

In [None]:
# 交互式位置导航
print("\n=== 交互式位置导航演示 ===")

def show_entity_context(text, entity, context_chars=50):
    """显示实体及其上下文"""
    if not entity.has_position():
        return f"实体 '{entity.name}' 无位置信息"
    
    char_pos = entity.get_char_position()
    if not char_pos:
        return f"实体 '{entity.name}' 位置信息无效"
    
    start_pos, end_pos = char_pos
    
    # 计算上下文范围
    context_start = max(0, start_pos - context_chars)
    context_end = min(len(text), end_pos + context_chars)
    
    # 提取上下文
    before_text = text[context_start:start_pos]
    entity_text = text[start_pos:end_pos]
    after_text = text[end_pos:context_end]
    
    return {
        'before': before_text,
        'entity': entity_text,
        'after': after_text,
        'position': char_pos,
        'alignment': entity.get_alignment_status().value,
        'confidence': entity.get_position_confidence()
    }

# 为每个定位实体显示上下文
print("2. 实体上下文导航:")
for i, entity in enumerate(positioned_entities[:5], 1):  # 显示前5个
    context = show_entity_context(test_document, entity, context_chars=30)
    
    if isinstance(context, dict):
        print(f"\n  {i}. 📍 {entity.name} ({entity.entity_type.value})")
        print(f"     位置: {context['position']}")
        print(f"     对齐: {context['alignment']} (置信度: {context['confidence']:.2f})")
        print(f"     上下文: ...{context['before']}【{context['entity']}】{context['after']}...")
    else:
        print(f"\n  {i}. ❌ {entity.name}: {context}")

# 3. 位置分布可视化
print(f"\n3. 实体位置分布:")
print("   文档结构映射 (每10个字符一个单位):")

# 创建简单的位置分布图
doc_length = len(test_document)
units = doc_length // 10 + 1
position_map = [' ' for _ in range(units)]

for entity in positioned_entities:
    char_pos = entity.get_char_position()
    if char_pos:
        start_pos, end_pos = char_pos
        unit_pos = start_pos // 10
        if unit_pos < len(position_map):
            # 使用实体类型的首字母标记
            type_markers = {
                EntityType.PERSON: 'P',
                EntityType.PRODUCT: 'R', 
                EntityType.SOFTWARE: 'S',
                EntityType.CONCEPT: 'C'
            }
            marker = type_markers.get(entity.entity_type, 'X')
            position_map[unit_pos] = marker

# 显示位置分布
print("   " + "".join(position_map))
print("   " + "".join([str(i) if i % 5 == 0 else '·' for i in range(units)]))
print("   图例: P=人物, R=产品, S=软件, C=概念")

In [None]:
# 文本高亮可视化
print("=== 文本高亮可视化 ===")

def create_highlighted_text(text, entities, show_html=False):
    """创建带有实体高亮的文本显示"""
    # 按实体类型分配颜色
    entity_colors = {
        EntityType.PERSON: "🟦",      # 蓝色代表人物
        EntityType.PRODUCT: "🟩",     # 绿色代表产品
        EntityType.SOFTWARE: "🟨",    # 黄色代表软件
        EntityType.CONCEPT: "🟪",     # 紫色代表概念
        EntityType.ORGANIZATION: "🟧" # 橙色代表组织
    }
    
    # 创建位置点列表
    position_points = []
    for i, entity in enumerate(entities):
        if entity.has_position():
            char_pos = entity.get_char_position()
            if char_pos:
                start_pos, end_pos = char_pos
                position_points.append({
                    'pos': start_pos,
                    'type': 'start',
                    'entity': entity,
                    'index': i
                })
                position_points.append({
                    'pos': end_pos,
                    'type': 'end',
                    'entity': entity,
                    'index': i
                })
    
    # 按位置排序
    position_points.sort(key=lambda x: (x['pos'], x['type'] == 'start'))
    
    # 生成高亮文本
    highlighted_parts = []
    last_pos = 0
    
    for point in position_points:
        pos = point['pos']
        
        # 添加点之前的文本
        if pos > last_pos:
            highlighted_parts.append(text[last_pos:pos])
        
        # 添加标记
        if point['type'] == 'start':
            entity = point['entity']
            color = entity_colors.get(entity.entity_type, "⬜")
            if show_html:
                highlighted_parts.append(f'<mark style="background-color: {color};">')
            else:
                highlighted_parts.append(f'{color}[')
        else:
            if show_html:
                highlighted_parts.append('</mark>')
            else:
                highlighted_parts.append(']')
        
        last_pos = pos
    
    # 添加剩余文本
    if last_pos < len(text):
        highlighted_parts.append(text[last_pos:])
    
    return ''.join(highlighted_parts)

# 创建高亮文本
highlighted_text = create_highlighted_text(test_document, positioned_entities)

print("1. 文本高亮展示 (使用表情符号标记):")
print("   图例: 🟦人物 🟩产品 🟨软件 🟪概念")
print("-" * 60)
print(highlighted_text)
print("-" * 60)

## 位置可视化

基于实体的精确位置信息，我们可以创建富有洞察力的可视化展示。

In [None]:
# 位置信息的序列化和持久化
print("=== 位置序列化演示 ===")

# 选择一个实体进行详细演示
if positioned_entities:
    demo_entity = positioned_entities[0]
    print(f"演示实体: {demo_entity.name}")
    
    # 1. 序列化实体（包含位置信息）
    entity_dict = demo_entity.to_dict()
    print(f"\n1. 实体序列化:")
    print(f"   包含位置信息: {'position' in entity_dict}")
    
    if 'position' in entity_dict:
        position_data = entity_dict['position']
        print(f"   位置数据: {position_data}")
        
        # 2. 反序列化验证
        restored_entity = Entity.from_dict(entity_dict)
        print(f"\n2. 反序列化验证:")
        print(f"   位置恢复成功: {restored_entity.has_position()}")
        print(f"   位置数据一致: {restored_entity.get_char_position() == demo_entity.get_char_position()}")
        
        # 3. JSON格式展示
        import json
        position_json = json.dumps(position_data, ensure_ascii=False, indent=2)
        print(f"\n3. JSON格式位置数据:")
        print(position_json)

# 4. 批量位置统计
print(f"\n=== 位置统计信息 ===")
total_positioned = sum(1 for e in positioned_entities if e.has_position())
precise_positioned = sum(1 for e in positioned_entities if e.has_precise_position())

print(f"总实体数: {len(positioned_entities)}")
print(f"有位置信息的实体: {total_positioned}")
print(f"精确定位的实体: {precise_positioned}")
print(f"定位覆盖率: {total_positioned/len(positioned_entities)*100:.1f}%")
print(f"精确定位率: {precise_positioned/total_positioned*100:.1f}%" if total_positioned > 0 else "精确定位率: N/A")

In [None]:
# 位置信息分析和查询
print("=== 位置信息分析 ===")

# 1. 按位置排序显示实体
print("\n1. 按文档位置顺序显示实体:")
sorted_entities = sorted(positioned_entities, key=lambda e: e.get_char_position()[0] if e.get_char_position() else 0)

for i, entity in enumerate(sorted_entities, 1):
    char_pos = entity.get_char_position()
    alignment_status = entity.get_alignment_status()
    confidence = entity.get_position_confidence()
    
    print(f"  {i}. {entity.name} ({entity.entity_type.value})")
    print(f"     位置: {char_pos}")
    print(f"     对齐状态: {alignment_status.value}")
    print(f"     置信度: {confidence:.2f}")
    print()

# 2. 查找特定范围内的实体
print("2. 范围查询 - 前100个字符内的实体:")
entities_in_range = []
for entity in positioned_entities:
    char_pos = entity.get_char_position()
    if char_pos and char_pos[0] < 100:  # 实体开始位置在前100字符内
        entities_in_range.append(entity)

for entity in entities_in_range:
    char_pos = entity.get_char_position()
    print(f"  - {entity.name}: 位置 {char_pos}")

# 3. 重叠检测
print(f"\n3. 实体重叠检测:")
overlap_count = 0
for i in range(len(positioned_entities)):
    for j in range(i+1, len(positioned_entities)):
        if positioned_entities[i].overlaps_with(positioned_entities[j]):
            overlap_count += 1
            pos1 = positioned_entities[i].get_char_position()
            pos2 = positioned_entities[j].get_char_position()
            print(f"  重叠: {positioned_entities[i].name} {pos1} ↔ {positioned_entities[j].name} {pos2}")

if overlap_count == 0:
    print("  无实体重叠")

In [None]:
# 手动创建带位置信息的实体
print("=== 手动实体定位演示 ===")

# 在文档中查找实体位置
entities_to_locate = [
    ("智能推荐系统", EntityType.PRODUCT),
    ("张三博士", EntityType.PERSON), 
    ("李四", EntityType.PERSON),
    ("王五", EntityType.PERSON),
    ("Python", EntityType.SOFTWARE),
    ("TensorFlow", EntityType.SOFTWARE),
    ("深度学习", EntityType.CONCEPT)
]

positioned_entities = []

print("在文档中定位实体:")
for entity_name, entity_type in entities_to_locate:
    # 查找实体在文档中的位置
    start_pos = test_document.find(entity_name)
    if start_pos != -1:
        end_pos = start_pos + len(entity_name)
        
        # 创建带位置信息的实体
        entity = Entity(
            name=entity_name,
            entity_type=entity_type,
            description=f"从文档中提取的{entity_type.value}"
        )
        
        # 设置精确位置
        entity.set_char_position(
            start_pos, 
            end_pos, 
            AlignmentStatus.MATCH_EXACT, 
            confidence=1.0
        )
        
        positioned_entities.append(entity)
        
        # 验证位置准确性
        actual_text = test_document[start_pos:end_pos]
        print(f"  ✓ {entity_name}: 位置 [{start_pos}-{end_pos}], 文本 '{actual_text}'")
    else:
        print(f"  ✗ {entity_name}: 未在文档中找到")

print(f"\n成功定位 {len(positioned_entities)} 个实体")

In [None]:
# 演示实体位置追踪功能
from agraph.base.models.positioning import Position, CharInterval, AlignmentStatus
from agraph.chunker import TokenChunker

print("=== 实体位置追踪演示 ===")

# 创建包含明确实体的测试文档
test_document = """
数据科学项目报告

项目名称：智能推荐系统
负责人：张三博士
团队成员：李四（机器学习工程师）、王五（数据工程师）
项目状态：进行中
技术栈：Python、TensorFlow、Redis

项目描述：
本项目旨在为电商平台开发智能推荐算法。系统使用深度学习模型分析用户行为数据，
提供个性化商品推荐。目前已完成数据预处理模块和模型训练框架。

下一步计划：
1. 完善推荐算法优化
2. 部署到生产环境
3. 进行A/B测试验证效果
"""

print(f"测试文档长度: {len(test_document)} 字符")
print(f"文档内容预览: {test_document[:100]}...")

# 使用分块器获取位置信息
chunker = TokenChunker(chunk_size=200, chunk_overlap=50)
chunks_with_positions = chunker.split_text_with_positions(test_document)

print(f"\n文档分块结果: {len(chunks_with_positions)} 个块")
for i, (chunk_text, start_pos, end_pos) in enumerate(chunks_with_positions):
    print(f"  块 {i+1}: 位置 [{start_pos}-{end_pos}], 长度 {end_pos-start_pos}")
    print(f"    内容预览: {chunk_text[:60]}...")
    print()

## 实体位置追踪

AGraph 支持精确追踪每个实体在源文档中的位置，这对于可视化、验证和交互式分析非常有用。

In [None]:
# 使用KnowledgeGraphBuilder自动构建图谱
# 注意：这需要配置OpenAI API密钥
try:
    builder = KnowledgeGraphBuilder()
    
    # 检查是否配置了API密钥
    settings = get_settings()
    if not settings.openai_api_key:
        print("⚠️  未配置OpenAI API密钥，跳过自动抽取演示")
        print("要启用自动抽取功能，请设置环境变量 OPENAI_API_KEY")
    else:
        print("开始自动实体和关系抽取...")
        
        # 构建知识图谱
        built_kg = await builder.build_from_chunks(text_chunks)
        
        print(f"自动抽取完成！")
        print(f"抽取的实体数量: {len(built_kg.entities)}")
        print(f"抽取的关系数量: {len(built_kg.relations)}")
        
        # 显示抽取的实体
        print("\n抽取的实体:")
        for entity in built_kg.entities[:10]:  # 显示前10个
            print(f"  - {entity.name} ({entity.entity_type})")
        
        # 显示抽取的关系
        print("\n抽取的关系:")
        for relation in built_kg.relations[:5]:  # 显示前5个
            if relation.head_entity and relation.tail_entity:
                print(f"  - {relation.head_entity.name} --[{relation.relation_type}]--> {relation.tail_entity.name}")
        
        # 合并到现有图谱
        kg.merge_graph(built_kg)
        print(f"\n合并后图谱统计:")
        print(f"总实体数: {len(kg.entities)}")
        print(f"总关系数: {len(kg.relations)}")
        
except Exception as e:
    print(f"自动抽取过程中出现错误: {e}")
    print("继续使用手动创建的实体和关系...")

## 高级查询和分析

演示图谱的高级查询功能：

In [None]:
# 1. 邻居查询
print("=== 邻居查询 ===")
if zhang_san:
    neighbors = kg.get_neighbors(zhang_san.id)
    print(f"{zhang_san.name} 的邻居实体:")
    for neighbor in neighbors:
        print(f"  - {neighbor.name} ({neighbor.entity_type})")

# 2. 路径查询
print("\n=== 路径查询 ===")
if zhang_san and person2:
    paths = kg.find_shortest_path(zhang_san.id, person2.id)
    if paths:
        print(f"从 {zhang_san.name} 到 {person2.name} 的最短路径:")
        for i, entity_id in enumerate(paths):
            entity = kg.get_entity(entity_id)
            if entity:
                print(f"  {i+1}. {entity.name}")
    else:
        print(f"未找到从 {zhang_san.name} 到 {person2.name} 的路径")

# 3. 属性过滤查询
print("\n=== 属性过滤查询 ===")
def find_entities_by_property(kg, property_key, property_value):
    """根据属性查找实体"""
    matching_entities = []
    for entity in kg.entities:
        if property_key in entity.properties:
            prop_value = entity.properties[property_key]
            if (isinstance(prop_value, list) and property_value in prop_value) or prop_value == property_value:
                matching_entities.append(entity)
    return matching_entities

# 查找具有Python技能的实体
python_experts = find_entities_by_property(kg, "技能", "Python")
print(f"Python专家:")
for expert in python_experts:
    print(f"  - {expert.name}")

# 4. 图谱统计
print("\n=== 图谱统计信息 ===")
print(f"实体统计:")
entity_types = {}
for entity in kg.entities:
    entity_type = entity.entity_type
    entity_types[entity_type] = entity_types.get(entity_type, 0) + 1

for entity_type, count in entity_types.items():
    print(f"  - {entity_type}: {count}个")

print(f"\n关系统计:")
relation_types = {}
for relation in kg.relations:
    relation_type = relation.relation_type
    relation_types[relation_type] = relation_types.get(relation_type, 0) + 1

for relation_type, count in relation_types.items():
    print(f"  - {relation_type}: {count}个")

## 性能优化

展示AGraph的性能优化特性：

In [None]:
import time
from agraph.base.infrastructure.indexes import IndexManager

# 1. 索引性能测试
print("=== 索引性能测试 ===")

# 创建大量测试实体
print("创建测试实体...")
test_entities = []
for i in range(1000):
    entity = Entity(
        name=f"测试实体_{i}",
        entity_type=EntityType.CONCEPT,
        properties={"测试ID": i, "分类": f"类别_{i % 10}"}
    )
    test_entities.append(entity)
    kg.add_entity(entity)

print(f"已添加 {len(test_entities)} 个测试实体")
print(f"图谱现在包含 {len(kg.entities)} 个实体")

# 测试查询性能
print("\n测试查询性能...")
start_time = time.time()

# 执行多次查询
for i in range(100):
    entity = kg.get_entity_by_name(f"测试实体_{i * 10}")
    
end_time = time.time()
query_time = end_time - start_time

print(f"100次查询耗时: {query_time:.4f}秒")
print(f"平均每次查询: {query_time/100*1000:.2f}毫秒")

# 2. 缓存效果测试
print("\n=== 缓存效果测试 ===")
if kg.cache_manager:
    cache_stats = kg.cache_manager.get_cache_stats()
    print(f"缓存命中率: {cache_stats.get('hit_rate', 0):.1%}")
    print(f"缓存大小: {cache_stats.get('size', 0)}")
    print(f"缓存命中次数: {cache_stats.get('hits', 0)}")
    print(f"缓存未命中次数: {cache_stats.get('misses', 0)}")
else:
    print("缓存管理器未启用")

## 图谱序列化和持久化

演示如何保存和加载知识图谱：

In [None]:
import json
import tempfile
import os

# 1. 序列化图谱
print("=== 图谱序列化 ===")
graph_data = kg.to_dict()

print(f"序列化数据包含:")
print(f"  - 实体: {len(graph_data.get('entities', []))}")
print(f"  - 关系: {len(graph_data.get('relations', []))}")
print(f"  - 元数据: {list(graph_data.get('metadata', {}).keys())}")

# 2. 保存到文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
    json.dump(graph_data, f, ensure_ascii=False, indent=2)
    temp_file = f.name

print(f"\n图谱已保存到: {temp_file}")
print(f"文件大小: {os.path.getsize(temp_file) / 1024:.1f} KB")

# 3. 从文件加载新图谱
print("\n=== 图谱加载 ===")
new_kg = OptimizedKnowledgeGraph()

with open(temp_file, 'r', encoding='utf-8') as f:
    loaded_data = json.load(f)

new_kg.from_dict(loaded_data)

print(f"从文件加载的图谱:")
print(f"  - 实体数量: {len(new_kg.entities)}")
print(f"  - 关系数量: {len(new_kg.relations)}")

# 验证数据完整性
original_entity_names = {e.name for e in kg.entities}
loaded_entity_names = {e.name for e in new_kg.entities}
print(f"  - 数据完整性: {'✓' if original_entity_names == loaded_entity_names else '✗'}")

# 清理临时文件
os.unlink(temp_file)
print(f"已清理临时文件")

## 实际应用示例：构建企业知识图谱

展示一个完整的企业知识图谱构建流程：

In [None]:
# 创建企业知识图谱
enterprise_kg = OptimizedKnowledgeGraph()

# 定义企业结构
company_data = {
    "departments": [
        {"name": "技术部", "type": "部门", "负责人": "王经理"},
        {"name": "产品部", "type": "部门", "负责人": "林经理"},
        {"name": "市场部", "type": "部门", "负责人": "陈经理"}
    ],
    "employees": [
        {"name": "王经理", "部门": "技术部", "职位": "技术总监", "技能": ["架构设计", "团队管理"]},
        {"name": "林经理", "部门": "产品部", "职位": "产品总监", "技能": ["产品规划", "用户研究"]},
        {"name": "陈经理", "部门": "市场部", "职位": "市场总监", "技能": ["市场分析", "品牌推广"]},
        {"name": "小张", "部门": "技术部", "职位": "后端工程师", "技能": ["Python", "数据库"]},
        {"name": "小李", "部门": "技术部", "职位": "前端工程师", "技能": ["React", "Vue.js"]}
    ],
    "projects": [
        {"name": "智能推荐系统", "状态": "进行中", "负责部门": "技术部"},
        {"name": "用户体验优化", "状态": "规划中", "负责部门": "产品部"},
        {"name": "品牌重塑", "状态": "已完成", "负责部门": "市场部"}
    ]
}

# 创建实体
entities_map = {}

# 创建部门实体
for dept_info in company_data["departments"]:
    dept = Entity(
        name=dept_info["name"],
        entity_type=EntityType.ORGANIZATION,
        description=f"企业{dept_info['type']}",
        properties={"类型": dept_info["type"], "负责人": dept_info["负责人"]}
    )
    enterprise_kg.add_entity(dept)
    entities_map[dept.name] = dept

# 创建员工实体
for emp_info in company_data["employees"]:
    emp = Entity(
        name=emp_info["name"],
        entity_type=EntityType.PERSON,
        description=f"{emp_info['职位']}",
        properties={
            "部门": emp_info["部门"],
            "职位": emp_info["职位"],
            "技能": emp_info["技能"]
        }
    )
    enterprise_kg.add_entity(emp)
    entities_map[emp.name] = emp

# 创建项目实体
for proj_info in company_data["projects"]:
    proj = Entity(
        name=proj_info["name"],
        entity_type=EntityType.PROJECT,
        description=f"企业项目",
        properties={
            "状态": proj_info["状态"],
            "负责部门": proj_info["负责部门"]
        }
    )
    enterprise_kg.add_entity(proj)
    entities_map[proj.name] = proj

print(f"创建了 {len(entities_map)} 个企业实体")

In [None]:
# 建立企业关系
print("=== 建立企业关系 ===")

# 员工与部门的关系
for emp_info in company_data["employees"]:
    emp_entity = entities_map[emp_info["name"]]
    dept_entity = entities_map[emp_info["部门"]]
    
    belongs_to = Relation(
        head_entity=emp_entity,
        tail_entity=dept_entity,
        relation_type=RelationType.BELONGS_TO,
        description=f"{emp_info['name']}属于{emp_info['部门']}",
        properties={"职位": emp_info["职位"]}
    )
    enterprise_kg.add_relation(belongs_to)

# 项目与部门的关系
for proj_info in company_data["projects"]:
    proj_entity = entities_map[proj_info["name"]]
    dept_entity = entities_map[proj_info["负责部门"]]
    
    managed_by = Relation(
        head_entity=proj_entity,
        tail_entity=dept_entity,
        relation_type=RelationType.MANAGED_BY,
        description=f"{proj_info['name']}由{proj_info['负责部门']}负责",
        properties={"状态": proj_info["状态"]}
    )
    enterprise_kg.add_relation(managed_by)

# 管理关系
manager_relations = [
    ("王经理", "小张", "管理"),
    ("王经理", "小李", "管理")
]

for manager_name, employee_name, relation_desc in manager_relations:
    if manager_name in entities_map and employee_name in entities_map:
        manages = Relation(
            head_entity=entities_map[manager_name],
            tail_entity=entities_map[employee_name],
            relation_type=RelationType.MANAGES,
            description=f"{manager_name}{relation_desc}{employee_name}"
        )
        enterprise_kg.add_relation(manages)

print(f"企业知识图谱构建完成！")
print(f"总实体数: {len(enterprise_kg.entities)}")
print(f"总关系数: {len(enterprise_kg.relations)}")

## 企业图谱分析

对构建的企业知识图谱进行分析：

In [None]:
# 1. 组织结构分析
print("=== 组织结构分析 ===")

# 查找所有部门
departments = [e for e in enterprise_kg.entities if e.entity_type == EntityType.ORGANIZATION and "部门" in e.name]
print(f"部门列表:")
for dept in departments:
    print(f"  - {dept.name}")
    
    # 查找部门员工
    dept_relations = enterprise_kg.get_entity_relations(dept.id)
    employees_in_dept = []
    for rel in dept_relations:
        if rel.relation_type == RelationType.BELONGS_TO and rel.head_entity:
            if rel.head_entity.entity_type == EntityType.PERSON:
                employees_in_dept.append(rel.head_entity)
    
    print(f"    员工数量: {len(employees_in_dept)}")
    for emp in employees_in_dept:
        position = emp.properties.get("职位", "未知")
        print(f"      - {emp.name} ({position})")

# 2. 项目分析
print("\n=== 项目分析 ===")
projects = [e for e in enterprise_kg.entities if e.entity_type == EntityType.PROJECT]
print(f"项目统计:")

project_status = {}
for proj in projects:
    status = proj.properties.get("状态", "未知")
    project_status[status] = project_status.get(status, 0) + 1
    print(f"  - {proj.name}: {status}")

print(f"\n项目状态统计:")
for status, count in project_status.items():
    print(f"  - {status}: {count}个项目")

# 3. 技能分析
print("\n=== 技能分析 ===")
all_skills = {}
for entity in enterprise_kg.entities:
    if entity.entity_type == EntityType.PERSON and "技能" in entity.properties:
        skills = entity.properties["技能"]
        if isinstance(skills, list):
            for skill in skills:
                if skill not in all_skills:
                    all_skills[skill] = []
                all_skills[skill].append(entity.name)

print(f"技能分布:")
for skill, people in all_skills.items():
    print(f"  - {skill}: {', '.join(people)}")

## 图谱可视化

使用简单的文本格式可视化图谱结构：

In [None]:
def visualize_graph_structure(kg, max_entities=20):
    """简单的图谱结构可视化"""
    print("=== 图谱结构可视化 ===")
    
    entities_to_show = list(kg.entities)[:max_entities]
    
    for entity in entities_to_show:
        print(f"\n📍 {entity.name} ({entity.entity_type})")
        
        # 查找相关关系
        entity_relations = kg.get_entity_relations(entity.id)
        
        # 分类显示关系
        outgoing = []
        incoming = []
        
        for rel in entity_relations:
            if rel.head_entity and rel.tail_entity:
                if rel.head_entity.id == entity.id:
                    outgoing.append((rel.relation_type, rel.tail_entity.name))
                else:
                    incoming.append((rel.relation_type, rel.head_entity.name))
        
        # 显示出边
        if outgoing:
            print(f"  出边:")
            for rel_type, target_name in outgoing[:3]:  # 限制显示数量
                print(f"    └─[{rel_type}]→ {target_name}")
        
        # 显示入边
        if incoming:
            print(f"  入边:")
            for rel_type, source_name in incoming[:3]:  # 限制显示数量
                print(f"    ←─[{rel_type}]─┘ {source_name}")
        
        if not outgoing and not incoming:
            print(f"  (无关系)")

# 可视化企业图谱
visualize_graph_structure(enterprise_kg, max_entities=10)

# 图谱度量统计
print(f"\n=== 图谱度量 ===")
total_entities = len(enterprise_kg.entities)
total_relations = len(enterprise_kg.relations)
avg_degree = (2 * total_relations) / total_entities if total_entities > 0 else 0

print(f"节点数: {total_entities}")
print(f"边数: {total_relations}")
print(f"平均度数: {avg_degree:.2f}")
print(f"图密度: {2 * total_relations / (total_entities * (total_entities - 1)):.4f}" if total_entities > 1 else "图密度: N/A")

## 总结

本教程展示了AGraph的核心功能：

✅ **基本操作**：创建实体、建立关系、查询图谱  
✅ **高级查询**：邻居查询、路径查询、属性过滤  
✅ **实体定位**：精确追踪实体在源文档中的位置 ⭐ 新增  
✅ **位置可视化**：文本高亮、位置分布、上下文导航 ⭐ 新增  
✅ **性能优化**：索引加速、缓存机制  
✅ **数据持久化**：序列化和反序列化（含位置信息）  
✅ **实际应用**：企业知识图谱构建和分析  

### 实体定位功能亮点

🎯 **精确定位**：支持字符级和Token级双重定位  
🎯 **对齐状态**：精确匹配、模糊匹配、部分匹配等状态追踪  
🎯 **位置查询**：按位置范围查找实体、接近度分析  
🎯 **可视化支持**：文本高亮、实体分布图、上下文导航  
🎯 **持久化**：位置信息完整的序列化和反序列化  

### 下一步

- 探索更多高级功能：聚类分析、向量检索、对话系统
- 查看 `AGraph_Advanced_Features.ipynb` 了解高级特性
- 参考 API 文档了解更多配置选项
- 在实际项目中应用 AGraph 构建专业知识图谱

### 性能提示

1. **使用 OptimizedKnowledgeGraph**：获得最佳查询性能
2. **启用缓存**：频繁查询时显著提升性能
3. **批量操作**：一次添加多个实体比逐个添加更高效
4. **合理索引**：充分利用内置的7种索引类型
5. **位置优化**：使用位置信息加速范围查询和相关性分析 ⭐ 新增

In [None]:
# 最终的图谱状态
print("=== 最终图谱状态 ===")
print(f"主图谱实体数: {len(kg.entities)}")
print(f"主图谱关系数: {len(kg.relations)}")
print(f"企业图谱实体数: {len(enterprise_kg.entities)}")
print(f"企业图谱关系数: {len(enterprise_kg.relations)}")
print("\n🎉 AGraph快速入门教程完成！")