In [None]:
pip install langgraph langchain langchain_openai prometheus-api-client python-dotenv

In [None]:
import os
from typing import List, Dict, Any, TypedDict

from langchain_openai import AzureChatOpenAI
from prometheus_api_client import PrometheusConnect
from langgraph.graph import StateGraph, END
from IPython.display import display, Markdown


In [None]:
# =================================================================
#  请在这里填入您的 Azure OpenAI 凭证
# =================================================================
AZURE_OPENAI_ENDPOINT = ""  # 例如: "https://myhealthagentai.openai.azure.com/"
AZURE_OPENAI_API_KEY = ""
AZURE_OPENAI_DEPLOYMENT_NAME = "gpt-5-prometheus"
# =================================================================


# --- 初始化 Azure OpenAI LLM ---
print("正在初始化 Azure OpenAI LLM...")
try:
    llm = AzureChatOpenAI(
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
        api_key=AZURE_OPENAI_API_KEY,
        api_version="2025-01-01-preview",
        azure_deployment=AZURE_OPENAI_DEPLOYMENT_NAME,
        temperature=1
    )
    # 简单测试一下LLM是否工作正常
    llm.invoke("你好")
    print("Azure OpenAI LLM 初始化并测试成功！")
except Exception as e:
    print(f"LLM 初始化失败，请检查您的凭证和网络连接。错误信息: {e}")


In [None]:
# --- 初始化 Prometheus 连接 ---
PROMETHEUS_URL = "http://xxxxx:9090"
try:
    prom_connect = PrometheusConnect(url=PROMETHEUS_URL, disable_ssl=True)
    # 检查是否能连接到 Prometheus
    if prom_connect.check_prometheus_connection():
        print(f"成功连接到 Prometheus at {PROMETHEUS_URL}")
    else:
        print(f"无法连接到 Prometheus，请确保 'kubectl port-forward' 命令正在运行。")
except Exception as e:
    print(f"连接 Prometheus 时出错: {e}")

In [None]:
class ClusterHealthState(TypedDict):
    """
    定义了在整个工作流中传递的数据结构 (状态)。
    """
    cluster_name: str
    metrics_to_query: List[str]
    prometheus_url: str
    raw_metrics_data: Dict[str, Any]
    analysis_result: str
    assessment_result: str
    final_report: str

print("Agent 状态定义完成。")

In [None]:
def fetch_metrics_node(state: ClusterHealthState) -> Dict[str, Any]:
    print("--- 正在执行: [数据获取代理] ---")
    metrics_to_query = state['metrics_to_query']
    collected_data = {}
    for metric_name in metrics_to_query:
        try:
            data = prom_connect.custom_query(query=metric_name)
            collected_data[metric_name] = data
        except Exception as e:
            collected_data[metric_name] = f"查询失败: {e}"
    print("--- [数据获取代理] 执行完毕 ---")
    return {"raw_metrics_data": collected_data}

def analyze_data_node(state: ClusterHealthState) -> Dict[str, Any]:
    print("--- 正在执行: [数据分析代理] (可能需要一些时间) ---")
    raw_data = state['raw_metrics_data']
    prompt = f"""
    你是一个资深的网站可靠性工程师(SRE)。你的任务是分析以下来自 Prometheus 的原始监控数据。
    请将这些复杂的 JSON 数据转换成简洁、易于理解的摘要。
    对于每个指标，请识别出：
    1.  涉及哪些实例 (instance) 或节点 (node)。
    2.  每个实例的当前值是多少。
    3.  如果数据是一个范围，请指出最大值、最小值和平均值。
    4.  识别任何看起来可能是异常或值得关注的数值（例如，一个节点负载远高于其他节点）。
    原始数据:
    ```json
    {raw_data}
    ```
    请生成你的分析摘要：
    """
    response = llm.invoke(prompt)
    analysis = response.content
    print("--- [数据分析代理] 执行完毕 ---")
    return {"analysis_result": analysis}

def assess_health_node(state: ClusterHealthState) -> Dict[str, Any]:
    print("--- 正在执行: [健康评估代理] ---")
    analysis = state['analysis_result']
    health_rules = """
    - 1分钟平均负载 (node_load1):
        - 0-2: 健康 (Healthy)
        - 2-5: 警告 (Warning) - 负载较高，需要关注。
        - >5: 严重 (Critical) - 负载过高，可能影响性能。
    - 节点可用内存百分比 (node_memory_available_percent):
        - >20%: 健康 (Healthy)
        - 10%-20%: 警告 (Warning) - 内存资源开始紧张。
        - <10%: 严重 (Critical) - 内存即将耗尽，有 OOM (Out of Memory) 风险。
    """
    prompt = f"""
    你是一位经验丰富的 DevOps 专家。根据以下数据分析摘要和健康评估规则，对集群的健康状况进行评估。
    你的任务是:
    1.  对每个节点和每个关键指标，根据规则判断其状态（健康、警告、严重）。
    2.  给出一个整体的集群健康评估（例如：整体健康，但有潜在风险）。
    3.  明确指出哪些节点或指标需要特别关注。
    
    健康评估规则:
    {health_rules}
    
    数据分析摘要:
    {analysis}
    
    请生成你的健康评估结论：
    """
    response = llm.invoke(prompt)
    assessment = response.content
    print("--- [健康评估代理] 执行完毕 ---")
    return {"assessment_result": assessment}

def generate_report_node(state: ClusterHealthState) -> Dict[str, Any]:
    print("--- 正在执行: [报告生成代理] ---")
    cluster_name = state['cluster_name']
    analysis = state['analysis_result']
    assessment = state['assessment_result']
    prompt = f"""
    你是一个AI运维助手，你的任务是为集群 "{cluster_name}" 生成一份清晰、专业的健康评估报告。
    请使用 Markdown 格式。报告应包含以下部分：
    1.  **集群健康评估报告**:
        -   **1. 集群健康状况**: 给出整体评估。
        -   **2. 异常点分析**: 详细描述异常点和可能原因。
        -   **3. 风险识别**: 指出潜在风险。
        -   **4. 优化建议**: 给出具体、可操作的建议。
    2.  **总结**: 对报告进行简要总结。
    请严格按照这个结构，并基于以下信息来撰写报告：
    [数据分析摘要]
    {analysis}
    [健康评估结论]
    {assessment}
    现在，请生成最终的 Markdown 格式报告：
    """
    response = llm.invoke(prompt)
    report = response.content
    print("--- [报告生成代理] 执行完毕 ---")
    return {"final_report": report}

print("所有 Agent 功能节点定义完成。")

In [None]:
workflow = StateGraph(ClusterHealthState)

# 添加节点
workflow.add_node("fetch_metrics", fetch_metrics_node)
workflow.add_node("analyze_data", analyze_data_node)
workflow.add_node("assess_health", assess_health_node)
workflow.add_node("generate_report", generate_report_node)

# 定义工作流的执行顺序
workflow.set_entry_point("fetch_metrics")
workflow.add_edge("fetch_metrics", "analyze_data")
workflow.add_edge("analyze_data", "assess_health")
workflow.add_edge("assess_health", "generate_report")
workflow.add_edge("generate_report", END)

# 编译成可执行的应用
app = workflow.compile()

print("LangGraph 工作流构建并编译完成！")

In [None]:
# 定义我们要分析的指标
node_memory_available_percent = '(node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100'

# 设置初始输入
inputs = {
    "cluster_name": "MyHealthAgentCluster",
    "prometheus_url": PROMETHEUS_URL,
    "metrics_to_query": [
        "node_load1",  # 1分钟平均负载
        node_memory_available_percent, # 节点可用内存百分比
    ]
}

# 运行工作流！
# 这个过程会依次调用我们定义的每个节点，并打印执行信息
print("开始执行工作流...")
final_state = app.invoke(inputs)
print("\n工作流执行完毕！最终报告已生成。")

In [None]:
# 从最终状态中获取报告内容
final_report_markdown = final_state['final_report']

# 使用 IPython.display.Markdown 在 Notebook 中漂亮地展示报告
print("========= 集群健康评估报告 (预览) =========")
display(Markdown(final_report_markdown))

# 将报告保存到文件中
report_filename = "cluster_health_report.md"
with open(report_filename, "w", encoding="utf-8") as f:
    f.write(final_report_markdown)

print(f"\n报告已成功保存到文件: {report_filename}")