# Part 1: Foundations - Graph Agent 基础入门

## 🎯 学习目标

通过本教程，您将掌握：
- 🤖 **理解Agent基本概念** - 什么是智能体和Graph Agent
- 🔧 **LLM Provider使用** - 多种LLM的统一调用
- 🕸️ **Graph Agent实现** - 创建简单的图结构智能体
- 📊 **基础实践** - 完成简单的PHM诊断案例

## 📋 预计时长：1.5-2小时

---

## 🛠️ 环境准备

In [None]:
# 导入必要的库
import sys
import os
from pathlib import Path

# 设置项目路径
project_root = Path.cwd().parent.parent
sys.path.insert(0, str(project_root))
sys.path.insert(0, str(project_root / "src"))

# 导入我们的模块
from modules import agent_basics
from modules import llm_providers_unified
from modules import graph_agent_intro

print("✅ 环境准备完成")
print(f"项目根目录: {project_root}")

## 1. 什么是Agent？🤖

### 1.1 Agent的核心概念

**Agent（智能体）** 是一个能够：
- 📥 **感知**（Perceive）：接收和理解环境信息
- 🤔 **思考**（Think）：处理信息并进行决策
- 🎬 **行动**（Act）：执行决策并影响环境

让我们从最简单的Agent开始理解：

In [None]:
# 演示基础Agent概念
print("🤖 Agent基础概念演示")
agent_basics.demonstrate_agent_basics()

### 1.2 简单Agent示例

让我们创建一个基础的Agent来理解其工作原理：

In [None]:
# 创建简单Agent示例
from modules.agent_basics import SimpleAgent

# 演示基本Agent工作流程
simple_agent = SimpleAgent()
test_input = "轴承温度异常，需要诊断"
result = simple_agent.process(test_input)

print(f"输入: {test_input}")
print(f"输出: {result}")

### 1.3 Agent的局限性

传统的规则Agent有以下局限：
- 🔒 **固定规则** - 无法处理新情况
- 📏 **线性流程** - 感知→思考→行动的固定循环
- 🧠 **有限智能** - 缺乏推理和学习能力

这就是为什么我们需要更智能的**Graph Agent**！

---

## 2. LLM Provider集成 - 为Agent添加"大脑" 🧠

现代Agent使用大语言模型(LLM)作为"大脑"。让我们学习如何使用不同的LLM Provider：

In [None]:
# 检查可用的LLM Provider
from modules.llm_providers_unified import UnifiedLLMProvider, create_llm

print("🔍 检查LLM Provider可用性:")
available_providers = UnifiedLLMProvider.list_available_providers()

for provider, info in available_providers.items():
    status = "✅" if info["ready"] else "❌"
    print(f"{status} {provider}: {info['default_model']}")

# 选择最佳Provider
best_provider = UnifiedLLMProvider.get_best_available_provider()
print(f"\n🎯 使用Provider: {best_provider}")

# 测试LLM调用
llm = create_llm(best_provider)
test_query = "什么是预测性健康管理(PHM)？"
response = llm.invoke(test_query)
print(f"\n💬 测试查询: {test_query}")
print(f"📝 LLM回答: {response.content[:100]}...")

In [None]:
import ipywidgets as widgets
from IPython.display import display, clear_output
from modules.llm_providers_unified import create_llm

# 创建Provider选择器
available_providers = UnifiedLLMProvider.list_available_providers()
ready_providers = [p for p, info in available_providers.items() if info["ready"]]

provider_selector = widgets.Dropdown(
    options=ready_providers,
    value=ready_providers[0] if ready_providers else 'mock',
    description='Provider:',
    style={'description_width': 'initial'}
)

query_input = widgets.Textarea(
    value='什么是PHM（预测性健康管理）？请简要解释其核心概念。',
    description='查询内容:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='100%', height='80px')
)

test_button = widgets.Button(
    description='🧪 测试Provider',
    button_style='info',
    layout=widgets.Layout(width='150px')
)

output = widgets.Output()

def test_llm_provider(btn):
    with output:
        clear_output(wait=True)
        try:
            provider = provider_selector.value
            print(f"🤖 正在使用 {provider} Provider...")
            
            # 创建LLM实例
            llm = create_llm(provider=provider)
            
            # 发送查询
            import time
            start_time = time.time()
            response = llm.invoke(query_input.value)
            end_time = time.time()
            
            # 提取响应内容
            content = response.content if hasattr(response, 'content') else str(response)
            
            print(f"\n✅ 响应成功 (用时: {end_time - start_time:.2f}秒)")
            print(f"📝 响应长度: {len(content)} 字符")
            print(f"\n💬 {provider} 的回答:")
            print("-" * 50)
            print(content)
            
        except Exception as e:
            print(f"❌ 错误: {e}")
            print("请检查API密钥配置或网络连接")

test_button.on_click(test_llm_provider)

# 显示界面
display(widgets.VBox([
    widgets.HTML("<h3>🧪 LLM Provider 交互式测试</h3>"),
    provider_selector,
    query_input,
    test_button,
    output
]))

In [None]:
# 运行LLM Provider基准测试
print("🏃‍♂️ 开始LLM Provider基准测试...")
results = llm_providers_unified.demo_provider_comparison()

In [None]:
# 演示Graph Agent的基本概念
from modules.graph_agent_intro import SimpleGraphAgent

# 创建简单的Graph Agent
simple_graph = SimpleGraphAgent()

# 运行Graph Agent
test_input = "轴承温度传感器读数异常"
result = simple_graph.run(test_input)

print(f"📊 输入: {test_input}")
print(f"🔄 执行流程: {' → '.join(result['steps'])}")
print(f"✅ 最终结果: {result.get('result', 'N/A')}")

In [None]:
from modules.graph_agent_intro import LLMGraphAgent

# 创建LLM增强的Graph Agent
llm_graph = LLMGraphAgent(llm_provider=best_provider)

# 测试PHM场景
scenario = "设备振动频率突然增加，幅度超过正常值30%"
result = llm_graph.run(scenario)

print(f"🧪 测试场景: {scenario}")
print(f"🔄 执行步骤: {' → '.join(result['steps'])}")
print(f"🎯 LLM决策: {result.get('result', 'N/A')[:150]}...")

### 3.4 条件路由的Graph Agent

Graph Agent的强大之处在于能够根据状态进行条件分支。让我们看一个PHM诊断的例子：

In [None]:
from modules.graph_agent_intro import ConditionalGraphAgent

print("🌿 创建条件路由Graph Agent")

# 创建条件Graph Agent用于PHM诊断
conditional_graph = ConditionalGraphAgent(llm_provider=best_provider)

# 测试不同的传感器数据场景
test_cases = [
    {
        "name": "正常运行状态",
        "data": {"temperature": 0.45, "pressure": 0.52, "vibration": 0.48},
        "expected": "应该走快速检查路径"
    },
    {
        "name": "轻微异常",
        "data": {"temperature": 0.75, "pressure": 0.65, "vibration": 0.58},
        "expected": "可能需要详细分析"
    },
    {
        "name": "严重异常",
        "data": {"temperature": 0.95, "pressure": 0.15, "vibration": 0.88},
        "expected": "需要详细分析和行动规划"
    }
]

for test_case in test_cases:
    print(f"\n{'🔬 ' + test_case['name']:=^60}")
    print(f"📊 传感器数据: {test_case['data']}")
    print(f"📝 预期: {test_case['expected']}")
    
    result = conditional_graph.run(test_case["data"])
    
    print(f"\n✅ 实际执行路径: {' → '.join(result['steps_completed'])}")
    print(f"📈 置信分数: {result.get('confidence_score', 0):.2f}")
    print(f"🏥 诊断结果: {result.get('diagnosis', 'N/A')[:100]}...")
    
    if result.get('action_plan'):
        print(f"📋 行动计划: {result['action_plan'][:100]}...")

---

## 4. Graph Agent架构深度理解 🏗️

### 4.1 Graph Agent的核心组件

让我们深入理解Graph Agent的核心组件：

In [None]:
# 展示Graph Agent的内部结构
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator

# 1. 状态定义（TypedDict）
class PHMDiagnosisState(TypedDict):
    """PHM诊断状态定义"""
    sensor_readings: dict  # 传感器读数
    analysis_results: Annotated[list, operator.add]  # 分析结果（累加）
    current_step: str  # 当前步骤
    confidence_level: float  # 置信度
    recommendations: list  # 建议列表

print("🏗️ Graph Agent核心组件解析")
print("\n1️⃣ 状态定义（TypedDict）:")
print("   - 定义Agent在执行过程中维护的数据结构")
print("   - 类型安全，IDE友好")
print("   - 支持Annotated类型，如自动累加列表")

print("\n2️⃣ 节点函数（Node Functions）:")
print("   - 每个节点是一个纯函数")
print("   - 接收状态，返回状态更新")
print("   - 无副作用，易于测试")

print("\n3️⃣ 边和路由（Edges & Routing）:")
print("   - 固定边：add_edge(from, to)")
print("   - 条件边：add_conditional_edges(from, condition_func, mapping)")
print("   - 支持循环和复杂控制流")

print("\n4️⃣ 编译和执行（Compile & Invoke）:")
print("   - workflow.compile() 创建可执行的图")
print("   - graph.invoke(initial_state) 执行工作流")
print("   - 返回最终状态")

In [None]:
from typing import TypedDict, Annotated, Dict, Any
from langgraph.graph import StateGraph, END
import operator
import time

# 定义专用的PHM状态
class PHMDiagnosisState(TypedDict):
    sensor_data: Dict[str, float]
    analysis_history: Annotated[list, operator.add]
    current_diagnosis: str
    severity_level: str
    recommended_actions: list
    processing_time: float

class CustomPHMAgent:
    """自定义PHM诊断Graph Agent"""
    
    def __init__(self, llm_provider: str = "mock"):
        self.llm = create_llm(llm_provider, temperature=0.3)
        self.graph = self._build_phm_graph()
    
    def _build_phm_graph(self):
        """构建PHM诊断图"""
        
        def data_validation_node(state: PHMDiagnosisState) -> Dict[str, Any]:
            """数据验证节点"""
            sensor_data = state["sensor_data"]
            
            # 检查数据完整性
            required_sensors = ["temperature", "vibration", "pressure"]
            missing_sensors = [s for s in required_sensors if s not in sensor_data]
            
            if missing_sensors:
                validation_result = f"⚠️ 缺少传感器数据: {', '.join(missing_sensors)}"
            else:
                validation_result = "✅ 数据验证通过"
            
            return {
                "analysis_history": [f"数据验证: {validation_result}"],
                "processing_time": time.time()
            }
        
        def anomaly_detection_node(state: PHMDiagnosisState) -> Dict[str, Any]:
            """异常检测节点"""
            sensor_data = state["sensor_data"]
            
            # 简单的阈值检测
            anomalies = []
            thresholds = {
                "temperature": (0.2, 0.8),
                "vibration": (0.15, 0.85),  
                "pressure": (0.25, 0.75)
            }
            
            for sensor, value in sensor_data.items():
                if sensor in thresholds:
                    low, high = thresholds[sensor]
                    if value < low or value > high:
                        anomalies.append(f"{sensor}: {value:.2f}")
            
            if anomalies:
                severity = "high" if len(anomalies) >= 2 else "medium"
                diagnosis = f"检测到异常: {', '.join(anomalies)}"
            else:
                severity = "low"
                diagnosis = "系统运行正常"
            
            return {
                "analysis_history": [f"异常检测: {diagnosis}"],
                "current_diagnosis": diagnosis,
                "severity_level": severity
            }
        
        def llm_analysis_node(state: PHMDiagnosisState) -> Dict[str, Any]:
            """LLM深度分析节点"""
            sensor_data = state["sensor_data"]
            current_diagnosis = state["current_diagnosis"]
            
            prompt = f"""
            作为PHM专家，请分析以下诊断信息：
            
            传感器数据: {sensor_data}
            初步诊断: {current_diagnosis}
            
            请提供：
            1. 详细的根因分析
            2. 风险评估
            3. 具体的维护建议
            
            请用简洁明了的中文回答。
            """
            
            try:
                response = self.llm.invoke(prompt)
                expert_analysis = response.content if hasattr(response, 'content') else str(response)
            except Exception as e:
                expert_analysis = f"LLM分析失败: {e}"
            
            return {
                "analysis_history": [f"专家分析: {expert_analysis[:100]}..."],
                "current_diagnosis": expert_analysis
            }
        
        def action_planning_node(state: PHMDiagnosisState) -> Dict[str, Any]:
            """行动规划节点"""
            severity = state["severity_level"]
            diagnosis = state["current_diagnosis"]
            
            # 基于严重程度制定行动计划
            if severity == "high":
                actions = [
                    "立即停机检查",
                    "联系维护团队",
                    "准备备用设备"
                ]
            elif severity == "medium":
                actions = [
                    "增加监控频率",
                    "安排预防性维护",
                    "准备维护计划"
                ]
            else:
                actions = [
                    "继续正常监控",
                    "记录当前状态"
                ]
            
            return {
                "analysis_history": [f"制定行动计划: {len(actions)}项行动"],
                "recommended_actions": actions
            }
        
        # 条件路由函数
        def should_use_llm(state: PHMDiagnosisState) -> str:
            """决定是否需要LLM深度分析"""
            severity = state.get("severity_level", "low")
            return "llm_analysis" if severity in ["medium", "high"] else "action_planning"
        
        # 构建图结构
        workflow = StateGraph(PHMDiagnosisState)
        
        # 添加节点
        workflow.add_node("validate", data_validation_node)
        workflow.add_node("detect", anomaly_detection_node)
        workflow.add_node("llm_analysis", llm_analysis_node)
        workflow.add_node("plan", action_planning_node)
        
        # 定义流程
        workflow.set_entry_point("validate")
        workflow.add_edge("validate", "detect")
        
        # 条件路由：根据严重程度决定是否需要LLM分析
        workflow.add_conditional_edges(
            "detect",
            should_use_llm,
            {
                "llm_analysis": "llm_analysis",
                "action_planning": "plan"
            }
        )
        
        workflow.add_edge("llm_analysis", "plan")
        workflow.add_edge("plan", END)
        
        return workflow.compile()
    
    def diagnose(self, sensor_data: Dict[str, float]) -> PHMDiagnosisState:
        """执行PHM诊断"""
        initial_state = {
            "sensor_data": sensor_data,
            "analysis_history": [],
            "current_diagnosis": "",
            "severity_level": "unknown",
            "recommended_actions": [],
            "processing_time": 0.0
        }
        
        result = self.graph.invoke(initial_state)
        return result

print("🏭 创建自定义PHM Graph Agent")
custom_phm_agent = CustomPHMAgent(llm_provider=best_provider)
print("✅ 自定义PHM Agent创建完成")

In [None]:
# 测试不同复杂度的PHM场景
test_scenarios = [
    {
        "name": "正常运行",
        "data": {"temperature": 0.45, "vibration": 0.50, "pressure": 0.48},
        "description": "所有参数在正常范围内"
    },
    {
        "name": "单点异常", 
        "data": {"temperature": 0.85, "vibration": 0.45, "pressure": 0.50},
        "description": "温度偏高，其他正常"
    },
    {
        "name": "多点异常",
        "data": {"temperature": 0.92, "vibration": 0.88, "pressure": 0.15},
        "description": "多个参数超出正常范围"
    },
    {
        "name": "数据不完整",
        "data": {"temperature": 0.65, "pressure": 0.40},
        "description": "缺少振动数据"
    }
]

print("🧪 测试自定义PHM Graph Agent")

for i, scenario in enumerate(test_scenarios, 1):
    print(f"\n{'='*80}")
    print(f"🔬 测试场景 {i}: {scenario['name']}")
    print(f"📝 描述: {scenario['description']}")
    print(f"📊 数据: {scenario['data']}")
    print(f"{'='*80}")
    
    # 执行诊断
    result = custom_phm_agent.diagnose(scenario['data'])
    
    # 显示结果
    print(f"\n🔄 执行历史:")
    for j, step in enumerate(result['analysis_history'], 1):
        print(f"  {j}. {step}")
    
    print(f"\n📋 诊断结果:")
    print(f"  🎯 诊断: {result['current_diagnosis']}")
    print(f"  ⚡ 严重等级: {result['severity_level']}")
    
    print(f"\n📝 推荐行动:")
    for j, action in enumerate(result['recommended_actions'], 1):
        print(f"  {j}. {action}")
    
    print(f"\n⏱️ 处理时间: {result.get('processing_time', 0):.3f}s")

In [None]:
# 练习：完善这个简单的诊断Agent
class MyDiagnosticAgent:
    """我的诊断Agent - 请完善实现"""
    
    def __init__(self, llm_provider="mock"):
        self.llm = create_llm(llm_provider)
    
    def diagnose(self, sensor_data: dict) -> str:
        """诊断设备状态 - 请实现此方法"""
        # TODO: 添加您的诊断逻辑
        # 提示：可以检查sensor_data中的温度、振动等参数
        
        # 示例逻辑（请改进）:
        if sensor_data.get('temperature', 0) > 80:
            return "高温警告：建议检查散热系统"
        elif sensor_data.get('vibration', 0) > 5:
            return "振动异常：建议检查轴承"
        else:
            return "设备状态正常"

# 测试您的Agent
my_agent = MyDiagnosticAgent(best_provider)
test_data = {"temperature": 85, "vibration": 3.2, "pressure": 2.1}
diagnosis = my_agent.diagnose(test_data)

print(f"传感器数据: {test_data}")
print(f"诊断结果: {diagnosis}")
print("\\n💡 挑战：尝试添加更复杂的诊断逻辑！")

In [None]:
# 练习2：比较不同Provider的回答
question = "在工业设备预测性维护中，如何平衡预警的敏感性和误报率？"

# 获取可用的Provider
available_providers = UnifiedLLMProvider.list_available_providers()
ready_providers = [p for p, info in available_providers.items() if info["ready"]]

print(f"🤔 问题: {question}")
print(f"\n🤖 测试 {len(ready_providers)} 个Provider的回答:")
print("="*80)

responses = {}

for provider in ready_providers:
    try:
        print(f"\n🚀 {provider.upper()} Provider:")
        llm = create_llm(provider, temperature=0.7)
        
        import time
        start = time.time()
        response = llm.invoke(question)
        end = time.time()
        
        content = response.content if hasattr(response, 'content') else str(response)
        responses[provider] = {
            "content": content,
            "time": end - start,
            "length": len(content)
        }
        
        print(f"⏱️ 响应时间: {end - start:.2f}秒")
        print(f"📏 回答长度: {len(content)}字符")
        print(f"💬 回答预览: {content[:150]}...")
        
    except Exception as e:
        print(f"❌ {provider} 出现错误: {e}")
        responses[provider] = {"error": str(e)}

print(f"\n📊 比较总结:")
for provider, resp in responses.items():
    if "error" not in resp:
        print(f"  {provider}: {resp['time']:.2f}s, {resp['length']}字符")
    else:
        print(f"  {provider}: 错误")

print("\n🎯 思考: 哪个Provider的回答最符合您的需求？为什么？")

In [None]:
# 最终演示：Graph Agent的优势
graph_agent_intro.compare_traditional_vs_graph()

---

## 📚 补充资源

### 相关文档
- [LangGraph官方文档](https://langchain-ai.github.io/langgraph/)
- [LangChain Provider集成](https://python.langchain.com/docs/integrations/)
- [PHM基础概念](https://en.wikipedia.org/wiki/Prognostics_and_health_management)

### 常见问题
1. **Q: 为什么选择Graph Agent而不是传统Agent？**
   A: Graph Agent提供更好的可扩展性、可维护性和复杂决策能力。

2. **Q: 如何选择合适的LLM Provider？**
   A: 考虑响应速度、成本、语言支持和API稳定性。

3. **Q: Graph Agent适合什么样的应用场景？**
   A: 复杂工作流、多步决策、条件分支、并行处理等场景。