diff --git a/DESIGN.md b/DESIGN.md
index f52de78..94d2a6f 100644
--- a/DESIGN.md
+++ b/DESIGN.md
@@ -154,16 +154,125 @@ KUBECONFIG_PATH = xxx (Optional参数,只有当KUBECONFIG_MODE = LOCAL 时生
 
 ### 访问可观测数据
 
-优先访问ACK集群对应的阿里云Prometheus服务数据,如没有对应服务,通过env参数寻找可观测数据的访问地址。
-通过配置可指定[Prometheus Read HTTP API](https://prometheus.io/docs/prometheus/latest/querying/api/)。
+#### Prometheus 端点解析策略
+
+ack-mcp-server 支持三种 Prometheus 端点解析模式,通过 `prometheus_endpoint_mode` 参数配置:
+
+**1. ARMS_PUBLIC(默认模式)**
+
+通过阿里云 ARMS API 自动获取集群对应的 Prometheus 实例公网端点,失败时回退到本地配置:
+
+```bash
+# 命令行参数
+--prometheus-endpoint-mode ARMS_PUBLIC
+
+# 环境变量
+export PROMETHEUS_ENDPOINT_MODE=ARMS_PUBLIC
+```
+
+- 调用 ARMS GetPrometheusInstance API 获取 `http_api_inter_url`(公网访问地址)
+- 适用于 ack-mcp-server 部署在集群外部的场景
+- ARMS API 失败时自动回退到本地配置
+
+**2. ARMS_PRIVATE(内网模式)**
+
+通过阿里云 ARMS API 自动获取集群对应的 Prometheus 实例内网端点:
+
+```bash
+# 命令行参数
+--prometheus-endpoint-mode ARMS_PRIVATE
+
+# 环境变量
+export PROMETHEUS_ENDPOINT_MODE=ARMS_PRIVATE
+```
+
+- 调用 ARMS GetPrometheusInstance API 获取 `http_api_intra_url`(内网访问地址)
+- 适用于 ack-mcp-server 部署在集群内部或与阿里云 VPC 内网打通的场景
+- **要求**:ack-mcp-server 所在部署环境需与对应 region 阿里云 VPC 内网网域打通
+- ARMS API 失败时自动回退到本地配置
+
+**3. LOCAL(本地配置模式)**
+
+仅使用本地静态配置或环境变量,不调用 ARMS API:
+
+```bash
+# 命令行参数
+--prometheus-endpoint-mode LOCAL
+
+# 环境变量
+export PROMETHEUS_ENDPOINT_MODE=LOCAL
+```
+
+- 不调用任何 ARMS API
+- 适用于使用自建 Prometheus 或开发测试环境
+- 必须通过环境变量或静态配置指定 Prometheus 端点
+
+#### Prometheus 端点配置
+
+当使用 `LOCAL` 模式或 ARMS API 回退时,按如下优先级查找 Prometheus HTTP API 端点:
 
-当该集群没有阿里云Prometheus对应实例数据,ack-mcp-server将按按如下优先级寻找={prometheus_http_api_url}访问可观测数据。
 ```shell
-env参数配置:
+# 1. 集群特定配置(优先级最高)
 PROMETHEUS_HTTP_API_{cluster_id}={prometheus_http_api_url}
+
+# 2. 全局默认配置
 PROMETHEUS_HTTP_API={prometheus_http_api_url}
+
+# 示例
+export PROMETHEUS_HTTP_API_c1234567890="https://prometheus-cluster1.example.com"
+export PROMETHEUS_HTTP_API="https://prometheus-default.example.com"
 ```
+
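+下面给出按上述优先级解析本地端点的一个示意实现(仅为草图:`STATIC_PROMETHEUS_ENDPOINTS` 与函数名均为文档示例中的假设,并非既定实现):
+
+```python
+import os
+from typing import Optional
+
+# 示意用静态配置表(假设项,实际字段以具体实现为准)
+STATIC_PROMETHEUS_ENDPOINTS: dict[str, str] = {}
+
+def resolve_local_prometheus_endpoint(cluster_id: str) -> Optional[str]:
+    """按优先级解析端点:集群特定 env > 全局 env > 静态配置。"""
+    # 1. 集群特定环境变量(优先级最高)
+    endpoint = os.environ.get(f"PROMETHEUS_HTTP_API_{cluster_id}")
+    if endpoint:
+        return endpoint
+    # 2. 全局默认环境变量
+    endpoint = os.environ.get("PROMETHEUS_HTTP_API")
+    if endpoint:
+        return endpoint
+    # 3. 静态配置(如有)
+    return STATIC_PROMETHEUS_ENDPOINTS.get(cluster_id)
+```
+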
+#### ExecutionLog 可观测性
+
+所有 Prometheus 端点解析过程都记录在 `ExecutionLog` 中,包括:
+
+- **mode**: 使用的解析模式(`ARMS_PUBLIC`、`ARMS_PRIVATE` 或 `LOCAL`)
+- **source**: 端点来源(`arms_api`、`static_config` 或 `env_var:XXX`)
+- **endpoint_type**: 端点类型(`public` 或 `private`,仅 ARMS 模式)
+- **request_id**: ARMS API 调用的请求 ID(如适用)
+- **duration_ms**: API 调用耗时(如适用)
+- **endpoint**: 最终解析的端点地址
+
+示例 ExecutionLog(ARMS_PUBLIC 模式):
+
+```json
+{
+  "api_calls": [
+    {
+      "api": "GetPrometheusInstance",
+      "source": "arms_api",
+      "mode": "ARMS_PUBLIC",
+      "cluster_id": "c1234567890",
+      "region_id": "cn-hangzhou",
+      "request_id": "B8A0D7C3-...",
+      "duration_ms": 245,
+      "status": "success",
+      "endpoint_type": "public"
+    }
+  ]
+}
+```
+
+示例 ExecutionLog(LOCAL 模式):
+
+```json
+{
+  "api_calls": [
+    {
+      "api": "GetPrometheusEndpoint",
+      "source": "env_var:PROMETHEUS_HTTP_API_c1234567890",
+      "mode": "LOCAL",
+      "cluster_id": "c1234567890",
+      "endpoint": "https://prometheus-cluster1.example.com",
+      "status": "success"
+    }
+  ]
+}
+```
+
+通过配置可指定[Prometheus Read HTTP API](https://prometheus.io/docs/prometheus/latest/querying/api/)。
+
 ## 包命名和版本管理
 
 ### 项目命名
@@ -404,6 +513,253 @@ async def query_prometheus_tool(
 
 MCP 服务器实现两种主要类型的端点:
 
+### 执行日志追踪 (ExecutionLog)
+
+#### 设计目标
+
+所有 ack-mcp-server 工具调用都实现完整的执行日志追踪,记录工具执行的全生命周期,包括:
+- 工具调用的起止时间和总耗时
+- 所有外部 API 调用(ACK、ARMS、SLS 等)的详细信息
+- 执行过程中的警告信息
+- 错误信息和异常元数据
+
+这些日志用于审计、性能监控、问题诊断和系统可观测性。
+
+#### ExecutionLog 数据结构
+
+```python
+class ExecutionLog(BaseModel):
+    """执行日志模型"""
+    tool_call_id: str = Field(..., description="工具调用的唯一标识符")
+    start_time: str = Field(..., description="执行开始时间(ISO 8601格式)")
+    end_time: Optional[str] = Field(None, description="执行结束时间(ISO 8601格式)")
+    duration_ms: Optional[int] = Field(None, description="总执行时长(毫秒)")
+    messages: List[str] = Field(default_factory=list, description="执行过程中的消息")
+    api_calls: List[Dict[str, Any]] = Field(default_factory=list, description="API 调用记录列表")
+    warnings: List[str] = Field(default_factory=list, description="警告信息列表")
+    error: Optional[str] = Field(None, description="错误信息")
+    metadata: Optional[Dict[str, Any]] = Field(None, description="额外的元数据信息")
+```
+
+#### 实现原则
+
+**1. 成功场景 - 精简日志**
+
+正常成功的执行保持日志精简,仅记录关键信息:
+- API 调用名称、请求 ID、耗时、状态
+- 避免冗余的描述性消息
+- 不填充 metadata 字段
+
+```python
+execution_log.api_calls.append({
+    "api": "DescribeClusterDetail",
+    "cluster_id": cluster_id,
+    "request_id": "B8A0D7C3-...",
+    "duration_ms": 234,
+    "status": "success"
+})
+```
+
+**2. 错误场景 - 详细日志**
+
+错误场景记录完整的诊断信息:
+- 错误类型、错误码、失败阶段
+- 详细的错误消息和堆栈信息
+- 上下文元数据(请求参数、状态等)
+
+```python
+execution_log.error = "Cluster endpoint not available"
+execution_log.metadata = {
+    "error_type": "ValueError",
+    "error_code": "EndpointNotFound",
+    "failure_stage": "kubeconfig_acquisition",
+    "cluster_id": cluster_id,
+    "kubeconfig_mode": "ACK_PRIVATE"
+}
+```
+
+**3. 外部调用追踪**
+
+所有外部 API 调用都必须记录:
+- **阿里云 OpenAPI**:记录 request_id、duration_ms、http_status
+- **Prometheus HTTP API**:记录 response_size_bytes、endpoint
+- **Kubectl 命令**:记录 command、exit_code、type (normal/streaming)
+- **Kubeconfig 获取**:记录 source (cache/ack_api/local_file/incluster)
+
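+为避免在每个工具中重复计时与记录逻辑,可抽取一个统一的记录辅助函数。以下为示意草图(`record_api_call` 为假设的辅助函数名,并非既定实现):
+
+```python
+import time
+from typing import Any, Awaitable, Callable, Dict
+
+async def record_api_call(
+    execution_log: "ExecutionLog",
+    api_name: str,
+    call: Callable[[], Awaitable[Any]],
+    **extra: Any,
+) -> Any:
+    """执行一次外部调用,并将耗时与状态统一记入 execution_log.api_calls。"""
+    entry: Dict[str, Any] = {"api": api_name, **extra}
+    start = int(time.time() * 1000)
+    try:
+        result = await call()
+        entry.update(duration_ms=int(time.time() * 1000) - start, status="success")
+        return result
+    except Exception as e:
+        entry.update(duration_ms=int(time.time() * 1000) - start, status="failed", error=str(e))
+        raise
+    finally:
+        execution_log.api_calls.append(entry)
+```
+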
+#### 使用示例
+
+**工具初始化执行日志**
+
+```python
+@mcp.tool(name='query_prometheus')
+async def query_prometheus(
+    ctx: Context,
+    query: str = Field(..., description="PromQL 查询语句"),
+    cluster_id: str = Field(..., description="集群 ID"),
+) -> QueryPrometheusOutput:
+    # 初始化执行日志
+    start_ms = int(time.time() * 1000)
+    execution_log = ExecutionLog(
+        tool_call_id=f"query_prometheus_{cluster_id}_{start_ms}",
+        start_time=datetime.utcnow().isoformat() + "Z"
+    )
+
+    try:
+        # ... 执行业务逻辑 ...
+
+        # 记录结束时间
+        execution_log.end_time = datetime.utcnow().isoformat() + "Z"
+        execution_log.duration_ms = int(time.time() * 1000) - start_ms
+
+        return QueryPrometheusOutput(
+            resultType="matrix",
+            result=results,
+            execution_log=execution_log
+        )
+    except Exception as e:
+        execution_log.error = str(e)
+        execution_log.end_time = datetime.utcnow().isoformat() + "Z"
+        execution_log.duration_ms = int(time.time() * 1000) - start_ms
+        execution_log.metadata = {
+            "error_type": type(e).__name__,
+            "failure_stage": "prometheus_query"
+        }
+        return {
+            "error": ErrorModel(error_code="QueryFailed", error_message=str(e)).model_dump(),
+            "execution_log": execution_log
+        }
+```
+
+**API 调用追踪**
+
+```python
+# ACK OpenAPI 调用
+api_start = int(time.time() * 1000)
+response = await cs_client.describe_cluster_detail_with_options_async(
+    cluster_id, request, headers, runtime
+)
+api_duration = int(time.time() * 1000) - api_start
+
+# 提取 request_id
+request_id = None
+if hasattr(response, 'headers') and response.headers:
+    request_id = response.headers.get('x-acs-request-id', 'N/A')
+
+execution_log.api_calls.append({
+    "api": "DescribeClusterDetail",
+    "cluster_id": cluster_id,
+    "request_id": request_id,
+    "duration_ms": api_duration,
+    "status": "success"
+})
+```
+
+**轮询场景 - 合并中间日志**
+
+对于需要轮询的异步操作(如诊断任务、巡检任务),需要合并中间轮询调用的执行日志:
+
+```python
+# 提取轮询调用的 ExecutionLog
+poll_execution_log = None
+if isinstance(result, dict) and "execution_log" in result:
+    poll_execution_log = result.get("execution_log")
+elif hasattr(result, 'execution_log'):
+    poll_execution_log = result.execution_log
+
+# 合并到主执行日志
+if poll_execution_log:
+    if hasattr(poll_execution_log, 'api_calls'):
+        execution_log.api_calls.extend(poll_execution_log.api_calls)
+    if hasattr(poll_execution_log, 'warnings') and poll_execution_log.warnings:
+        execution_log.warnings.extend(poll_execution_log.warnings)
+```
+
+#### 输出模型标准
+
+所有工具的输出模型必须继承 `BaseOutputModel` 以包含 `execution_log` 字段:
+
+```python
+class BaseOutputModel(BaseModel):
+    """所有输出模型的基类"""
+    execution_log: ExecutionLog = Field(
+        default_factory=lambda: ExecutionLog(
+            tool_call_id="",
+            start_time=datetime.utcnow().isoformat() + "Z"
+        ),
+        description="执行日志"
+    )
+
+class QueryPrometheusOutput(BaseOutputModel):
+    """Prometheus 查询输出"""
+    resultType: str = Field(..., description="结果类型")
+    result: List[QueryPrometheusSeriesPoint] = Field(..., description="查询结果")
+    # execution_log 自动继承
+```
+
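+输出模型还与 models.py 中的每请求开关 `enable_execution_log_ctx` 配合使用:各 handler 在工具入口处调用 `enable_execution_log_ctx.set(self.enable_execution_log)`,决定是否向调用方输出执行日志。以下为最小示意(contextvar 定义与收尾辅助函数均为草图,具体序列化行为以实现为准):
+
+```python
+import time
+from contextvars import ContextVar
+from datetime import datetime
+
+# 每请求的 ExecutionLog 输出开关,handler 在每次工具调用开始时 set()
+enable_execution_log_ctx: ContextVar[bool] = ContextVar("enable_execution_log", default=False)
+
+def finalize_execution_log(execution_log: "ExecutionLog", start_ms: int) -> None:
+    """统一补全 end_time 与 duration_ms,减少各分支中的重复样板代码(假设的辅助函数)。"""
+    execution_log.end_time = datetime.utcnow().isoformat() + "Z"
+    execution_log.duration_ms = int(time.time() * 1000) - start_ms
+```
+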
"failed", + "error": "No intranet endpoint" + } + ], + "warnings": [], + "error": "Cluster c1234567890 does not have intranet endpoint access", + "metadata": { + "error_type": "ValueError", + "failure_stage": "kubeconfig_acquisition", + "kubeconfig_mode": "ACK_PRIVATE", + "cluster_id": "c1234567890" + } +} +``` + ### 资源定义 MCP协议中,“资源”为定制化地请求和访问本地的资源 (Resources allow servers to share data that provides context to language models, such as files, database schemas, or application-specific information. Each resource is uniquely identified by a URI.) diff --git a/pyproject.toml b/pyproject.toml index e469a59..b534bf1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "alibabacloud-ack-mcp-server" -version = "1.0.0" +version = "1.0.1" description = "AlibabaCloud Container Service MCP Server (ack-mcp-server)" readme = "README.md" requires-python = ">=3.12" diff --git a/src/ack_audit_log_handler.py b/src/ack_audit_log_handler.py index 8b7cbef..24d68d6 100644 --- a/src/ack_audit_log_handler.py +++ b/src/ack_audit_log_handler.py @@ -4,7 +4,8 @@ from loguru import logger from pydantic import Field import json -from datetime import datetime, timedelta +import time +from datetime import datetime, timedelta, timezone from alibabacloud_tea_util import models as util_models try: @@ -14,12 +15,15 @@ AuditLogEntry, ErrorModel, AuditLogErrorCodes, - GetCurrentTimeOutput + GetCurrentTimeOutput, + ExecutionLog ) except ImportError: from models import ( ErrorModel, - GetCurrentTimeOutput + GetCurrentTimeOutput, + ExecutionLog, + enable_execution_log_ctx ) @@ -89,6 +93,8 @@ def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None): "sts": "statefulsets", "ing": "ingresses", } + # Per-handler toggle + self.enable_execution_log = self.settings.get("enable_execution_log", False) if server is None: return self.server = server @@ -195,7 +201,10 @@ async def query_audit_logs(self, """Query Kubernetes audit logs.""" if not cluster_id: raise ValueError("cluster_id is required") - + + # Set per-request context from handler setting + enable_execution_log_ctx.set(self.enable_execution_log) + # 预处理参数:将JSON字符串转换为列表 processed_verbs = self._parse_list_param(verbs) processed_resource_types = self._parse_list_param(resource_types) @@ -247,9 +256,18 @@ async def get_current_time(self) -> GetCurrentTimeOutput: Returns: GetCurrentTimeOutput: 包含当前时间的 ISO 8601 格式和 Unix 时间戳格式 """ + # Initialize execution log + enable_execution_log_ctx.set(self.enable_execution_log) + execution_log = ExecutionLog( + tool_call_id=f"get_current_time_{int(time.time() * 1000)}", + start_time=datetime.utcnow().isoformat() + "Z" + ) + start_ms = int(time.time() * 1000) + try: - from datetime import datetime, timezone + # Using module-level datetime and timezone imports + execution_log.messages.append("Fetching current time") # 获取当前 UTC 时间 current_time = datetime.now(timezone.utc) @@ -258,14 +276,27 @@ async def get_current_time(self) -> GetCurrentTimeOutput: # 转换为 Unix 时间戳(秒级) current_time_unix = int(current_time.timestamp()) + + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms return GetCurrentTimeOutput( current_time_iso=current_time_iso, current_time_unix=current_time_unix, - timezone="UTC" + timezone="UTC", + execution_log=execution_log ) except Exception as e: + execution_log.error = str(e) + execution_log.messages.append(f"Failed to get current time: {str(e)}") + execution_log.end_time = 
datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_code": "TIME_FETCH_ERROR", + "failure_stage": "get_current_time_operation" + } + return GetCurrentTimeOutput( current_time_iso="", current_time_unix=0, @@ -273,7 +304,8 @@ async def get_current_time(self) -> GetCurrentTimeOutput: error=ErrorModel( error_code="TIME_FETCH_ERROR", error_message=f"Failed to get current time: {str(e)}" - ) + ), + execution_log=execution_log ) def _normalize_params(self, params: Dict[str, Any]) -> Dict[str, Any]: @@ -307,7 +339,7 @@ def _normalize_params(self, params: Dict[str, Any]) -> Dict[str, Any]: return params - def _get_cluster_region(self, cs_client, cluster_id: str) -> str: + def _get_cluster_region(self, cs_client, cluster_id: str) -> tuple[Optional[str], Optional[str], Optional[str]]: """通过DescribeClusterDetail获取集群的region信息 Args: @@ -315,28 +347,40 @@ def _get_cluster_region(self, cs_client, cluster_id: str) -> str: cluster_id: 集群ID Returns: - 集群所在的region + tuple: (region_id, request_id, error_message) + - region_id: 集群所在的region,失败时为None + - request_id: API请求的request_id,用于追踪 + - error_message: 错误信息,成功时为None """ + request_id = None try: - # 调用DescribeClusterDetail API获取集群详情 detail_response = cs_client.describe_cluster_detail(cluster_id) + + # 提取request_id + if hasattr(detail_response, 'headers') and detail_response.headers: + request_id = detail_response.headers.get('x-acs-request-id', 'N/A') if not detail_response or not detail_response.body: - raise ValueError(f"Failed to get cluster details for {cluster_id}") + error_msg = f"Failed to get cluster details for {cluster_id}" + logger.error(error_msg) + return None, request_id, error_msg cluster_info = detail_response.body # 获取集群的region信息 region = getattr(cluster_info, 'region_id', '') if not region: - raise ValueError(f"Could not determine region for cluster {cluster_id}") + error_msg = f"Could not determine region for cluster {cluster_id}" + logger.error(error_msg) + return None, request_id, error_msg - return region + return region, request_id, None except Exception as e: - logger.error(f"Failed to get cluster region for {cluster_id}: {e}") - raise ValueError(f"Failed to get cluster region for {cluster_id}: {e}") + error_msg = f"Failed to get cluster region for {cluster_id}: {str(e)}" + logger.error(error_msg) + return None, request_id, error_msg def _parse_list_param(self, param: Optional[str]) -> Optional[List[str]]: """解析列表参数,将JSON字符串转换为Python列表 @@ -376,6 +420,13 @@ def query_audit_log_sync( cluster_id: Optional[str] = None ) -> Dict[str, Any]: """Query Kubernetes audit logs (synchronous version).""" + # Initialize execution log + execution_log = ExecutionLog( + tool_call_id=f"query_audit_log_{int(time.time() * 1000)}", + start_time=datetime.utcnow().isoformat() + "Z" + ) + start_ms = int(time.time() * 1000) + # Collect parameters into a dict params = { "namespace": namespace, @@ -388,57 +439,132 @@ def query_audit_log_sync( "limit": limit, "cluster_id": cluster_id } - if self.cs_client is None: - cs_client = _get_cs_client(ctx, "CENTER") - self.cs_client = cs_client - region_id = self._get_cluster_region(self.cs_client, cluster_id) - self.sls_client = _get_sls_client(ctx, region_id) + + try: + if self.cs_client is None: + cs_client = _get_cs_client(ctx, "CENTER") + self.cs_client = cs_client + + # Get cluster region + execution_log.messages.append("Fetching cluster region") + api_start = int(time.time() * 1000) + region_id, request_id, error = 
self._get_cluster_region(self.cs_client, cluster_id) + api_duration = int(time.time() * 1000) - api_start + + if error: + # Failed to get cluster region + execution_log.api_calls.append({ + "api": "DescribeClusterDetail", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": error + }) + execution_log.messages.append(f"Failed to get cluster region: {error}") + raise ValueError(error) + else: + # Successfully got cluster region + execution_log.api_calls.append({ + "api": "DescribeClusterDetail", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "success", + "region": region_id + }) + execution_log.messages.append(f"Cluster region retrieved: {region_id} in {api_duration}ms, requestId: {request_id}") + + self.sls_client = _get_sls_client(ctx, region_id) - # Normalize parameters - normalized_params = self._normalize_params(params) + # Normalize parameters + normalized_params = self._normalize_params(params) - try: # Query the audit logs using the provider (sync version) query = self._build_query(normalized_params) + execution_log.messages.append(f"Building SLS query, query: {query}") # Parse time parameters - start_time, end_time = self._parse_time_params(normalized_params) - + start_time_ts, end_time_ts = self._parse_time_params(normalized_params) + execution_log.messages.append(f"Query time range: {start_time_ts} to {end_time_ts}") + + if not self.sls_client: + execution_log.warnings.append(f"sls client is None") + raise RuntimeError("SLS client not properly initialized") + + # Get audit SLS project and logstore + api_start = int(time.time() * 1000) + audit_sls_project, audit_sls_logstore, request_id, error = self._get_audit_sls_project_and_logstore(cluster_id) + api_duration = int(time.time() * 1000) - api_start + if error: + execution_log.api_calls.append({ + "api": "GetClusterAuditProject", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": error + }) + execution_log.messages.append(f"Failed to get audit project: {error}") + raise ValueError(error) + else: + execution_log.api_calls.append({ + "api": "GetClusterAuditProject", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "success", + "project": audit_sls_project, + "logstore": audit_sls_logstore + }) + execution_log.messages.append(f"Audit project: {audit_sls_project}, logstore: {audit_sls_logstore}, requestId: {request_id}") + + # Use real SLS client - 直接调用同步方法 + execution_log.messages.append(f"Querying SLS logs with query: {query}") try: - if not self.sls_client: - raise RuntimeError("SLS client not properly initialized") - - # 一个集群的审计日志 sls project、logstore 需要从OpenAPI中调用后获取 or 能通过配置自定义 - # https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/developer-reference/api-cs-2015-12-15-getclusterauditproject?spm=a2c4g.11186623.0.i5#undefined - audit_sls_project, audit_sls_logstore = self._get_audit_sls_project_and_logstore(cluster_id) - - # Use real SLS client - 直接调用同步方法 - return self._query_logs(audit_sls_project, audit_sls_logstore, query, start_time, end_time, - normalized_params) - + result = self._query_logs(audit_sls_project, audit_sls_logstore, query, start_time_ts, end_time_ts, + normalized_params, execution_log) + + execution_log.messages.append(f"Retrieved {result.get('total', 0)} log entries") + + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = 
int(time.time() * 1000) - start_ms
+
+                # Add execution_log to result
+                result["execution_log"] = execution_log
+                return result
+
             except Exception as e:
                 logger.error(f"Failed to query audit logs: {e}")
-                return {
-                    "provider_query": query,
-                    "entries": [],
-                    "total": 0,
-                    "params": normalized_params,
-                    "error": str(e)
-                }
+                raise
 
         except Exception as e:
             # Return error message in the expected format
+            execution_log.error = str(e)
+            execution_log.messages.append(f"Operation failed: {str(e)}")
+            execution_log.end_time = datetime.utcnow().isoformat() + "Z"
+            execution_log.duration_ms = int(time.time() * 1000) - start_ms
+            execution_log.metadata = {
+                "error_type": type(e).__name__,
+                "failure_stage": "query_audit_log_operation"
+            }
+
             return {
                 "error": str(e),
-                "params": normalized_params
+                "params": params,
+                "execution_log": execution_log
             }
 
     def _parse_single_time(self, time_str: str, default_hours: int = 24) -> datetime:
         """解析时间字符串,支持相对时间和ISO 8601格式"""
-        from datetime import datetime
+        # Using module-level datetime import
         if not time_str:
             return datetime.now() - timedelta(hours=default_hours)
+
+        # Handle "now" alias
+        if time_str.lower() == "now":
+            return datetime.now()
 
         # 相对时间格式
         if time_str.endswith('h'):
@@ -472,7 +598,7 @@ def _parse_time_params(self, params: Dict[str, Any]) -> tuple[int, int]:
         return int(start_time.timestamp()), int(end_time.timestamp())
 
     def _query_logs(self, project: str, logstore: str, query: str, start_time: int, end_time: int,
-                    params: Dict[str, Any]) -> Dict[str, Any]:
+                    params: Dict[str, Any], execution_log: Optional[ExecutionLog] = None) -> Dict[str, Any]:
         """Query using real SLS client with get_logs API."""
         try:
             from alibabacloud_sls20201230 import models as sls_models
@@ -488,7 +614,25 @@ def _query_logs(self, project: str, logstore: str, query: str, start_time: int,
             )
 
             # 调用SLS API - 使用get_logs方法
+            api_start = int(time.time() * 1000)
             response = self.sls_client.get_logs(project, logstore, request)
+            api_duration = int(time.time() * 1000) - api_start
+
+            # 提取 request_id
+            request_id = None
+            if hasattr(response, 'headers') and response.headers:
+                request_id = response.headers.get('x-log-requestid', 'N/A')
+
+            # 记录到 ExecutionLog
+            if execution_log is not None:
+                execution_log.api_calls.append({
+                    "api": "SLS.GetLogs",
+                    "project": project,
+                    "logstore": logstore,
+                    "request_id": request_id,
+                    "duration_ms": api_duration,
+                    "status": "success"
+                })
 
             # Parse response - get_logs 返回的是 GetLogsResponse 对象
             entries = []
@@ -568,16 +712,27 @@
                         entries.append(log_data)
                 except Exception as e:
-                    logger.warning(f"Failed to parse response body: {e}")
+                    if execution_log is not None:
+                        execution_log.messages.append(f"Failed to parse response body, response: {response}, error: {e}")
+                    logger.warning(f"Failed to parse response body, response: {response}, error: {e}")
 
             return {
                 "provider_query": query,
                 "entries": entries,
                 "total": len(entries),
-                "params": params
+                "params": params,
+                "request_id": request_id
             }
 
         except Exception as e:
+            # 记录到 ExecutionLog(失败)
+            if execution_log is not None:
+                execution_log.api_calls.append({
+                    "api": "SLS.GetLogs",
+                    "project": project,
+                    "logstore": logstore,
+                    "status": "failed",
+                    "error": str(e)
+                })
             logger.error(f"SLS query failed: {e}")
             raise
 
@@ -611,20 +766,23 @@ def _build_query(self, params: Dict[str, Any]) -> str:
 
         return query
 
-    def _get_audit_sls_project_and_logstore(self, cluster_id):
+    def _get_audit_sls_project_and_logstore(self, cluster_id) -> tuple[Optional[str], Optional[str], Optional[str],
Optional[str]]: runtime = util_models.RuntimeOptions() headers = {} + request_id = None try: response = self.cs_client.get_cluster_audit_project_with_options(cluster_id, headers, runtime) logger.info(f"_get_audit_sls_project_and_logstore response type: {type(response)}") + # 提取 request_id + if hasattr(response, 'headers') and response.headers: + request_id = response.headers.get('x-acs-request-id', 'N/A') if hasattr(response, 'body'): if hasattr(response.body, 'sls_project_name'): if response.body.audit_enabled: # get and return - return response.body.sls_project_name, f"audit-{cluster_id}" + return response.body.sls_project_name, f"audit-{cluster_id}", request_id, None # 此集群没有开启审计日志功能 - raise Exception("此集群没有开启审计日志功能") + return None, None, request_id, "Audit logging is not enabled for this cluster. Please enable it in the cluster console by navigating to Security → Audit on the left sidebar." except Exception as error: logger.error(error) - raise - + return None, None, request_id, str(error) diff --git a/src/ack_cluster_handler.py b/src/ack_cluster_handler.py index 487b492..1f7eda0 100644 --- a/src/ack_cluster_handler.py +++ b/src/ack_cluster_handler.py @@ -7,11 +7,15 @@ from alibabacloud_tea_util import models as util_models from pydantic import Field import json +import time +from datetime import datetime from models import ( ListClustersOutput, ClusterInfo, ErrorModel, - ClusterErrorCodes + ClusterErrorCodes, + ExecutionLog, + enable_execution_log_ctx ) @@ -106,13 +110,17 @@ def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None): settings: Configuration settings """ self.settings = settings or {} - if server is None: - return - self.server = server + # Per-handler ExecutionLog output toggle + # 是否可写变更配置 self.allow_write = self.settings.get("allow_write", False) - self.settings = settings or {} + # Per-handler toggle + self.enable_execution_log = self.settings.get("enable_execution_log", False) + + if server is None: + return + self.server = server # Register tools self.server.tool( name="list_clusters", @@ -139,26 +147,76 @@ async def list_clusters( Returns: ListClustersOutput: 包含集群列表和错误信息的输出 """ + # Set per-request context from handler setting + enable_execution_log_ctx.set(self.enable_execution_log) + + # Initialize execution log + execution_log = ExecutionLog( + tool_call_id=f"list_clusters_{int(time.time() * 1000)}", + start_time=datetime.utcnow().isoformat() + "Z" + ) + start_ms = int(time.time() * 1000) try: # 获取 CS 客户端 cs_client = _get_cs_client(ctx, "CENTER") # 构建请求 + actual_page_size = min(page_size or 10, 500) + actual_page_num = page_num or 1 + request = cs20151215_models.DescribeClustersV1Request( - page_size=min(page_size or 10, 500), - page_number=page_num or 1, + page_size=actual_page_size, + page_number=actual_page_num, ) runtime = util_models.RuntimeOptions() headers = {} # 调用 API - response = await cs_client.describe_clusters_v1with_options_async(request, headers, runtime) + api_start = int(time.time() * 1000) + execution_log.messages.append("Calling DescribeClusters API") + + try: + response = await cs_client.describe_clusters_v1with_options_async(request, headers, runtime) + api_duration = int(time.time() * 1000) - api_start + + execution_log.api_calls.append({ + "api": "DescribeClustersV1", + "region": "CENTER", + "request_params": { + "page_size": actual_page_size, + "page_number": actual_page_num + }, + "duration_ms": api_duration, + "status": "success" + }) + execution_log.messages.append(f"API call succeeded in {api_duration}ms") + 
except Exception as api_error: + api_duration = int(time.time() * 1000) - api_start + execution_log.api_calls.append({ + "api": "DescribeClustersV1", + "region": "CENTER", + "request_params": { + "page_size": actual_page_size, + "page_number": actual_page_num + }, + "duration_ms": api_duration, + "status": "failed", + "error": str(api_error) + }) + execution_log.messages.append(f"API call failed after {api_duration}ms: {str(api_error)}") + raise api_error # 处理响应 + request_id = response.headers.get("x-acs-request-id", "N/A") if hasattr(response, 'headers') and response.headers else "N/A" + execution_log.messages.append(f"Processing API response, requestId: {request_id}") clusters_data = _serialize_sdk_object( response.body.clusters) if response.body and response.body.clusters else [] + execution_log.messages.append(f"Retrieved {len(clusters_data)} raw cluster records") + clusters = [] + skipped_count = 0 + parse_errors = 0 for cluster_data in clusters_data: try: @@ -172,6 +230,7 @@ async def list_clusters( # 如果必填字段为空,跳过这个集群 if not cluster_name or not cluster_id or not state or not cluster_type: logger.warning(f"Skipping cluster with missing required fields: {cluster_data}") + skipped_count += 1 continue cluster_info = ClusterInfo( @@ -195,11 +254,23 @@ async def list_clusters( clusters.append(cluster_info) except Exception as e: logger.warning(f"Failed to parse cluster data: {e}") + execution_log.warnings.append(f"Failed to parse cluster data: {cluster_data}, error: {e}") + parse_errors += 1 continue + if skipped_count > 0: + execution_log.warnings.append(f"Skipped {skipped_count} clusters with missing required fields") + if parse_errors > 0: + execution_log.warnings.append(f"Failed to parse {parse_errors} cluster records") + + execution_log.messages.append(f"Successfully list {len(clusters)} clusters") + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + return ListClustersOutput( count=len(clusters), - clusters=clusters + clusters=clusters, + execution_log=execution_log ) except Exception as e: @@ -211,11 +282,21 @@ async def list_clusters( if "region" in error_message.lower() or "region_id" in error_message.lower(): error_code = ClusterErrorCodes.MISS_REGION_ID + execution_log.error = error_message + execution_log.messages.append(f"Operation failed: {error_message}") + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_code": error_code, + "failure_stage": "list_clusters_operation" + } + return ListClustersOutput( count=0, error=ErrorModel( error_code=error_code, error_message=error_message ), - clusters=[] + clusters=[], + execution_log=execution_log ) diff --git a/src/ack_controlplane_log_handler.py b/src/ack_controlplane_log_handler.py index bde6931..e75ddb6 100644 --- a/src/ack_controlplane_log_handler.py +++ b/src/ack_controlplane_log_handler.py @@ -6,6 +6,7 @@ from pydantic import Field import json import re +import time from datetime import datetime, timedelta from unittest.mock import Mock from alibabacloud_tea_util import models as util_models @@ -14,7 +15,9 @@ ControlPlaneLogEntry, ErrorModel, ControlPlaneLogErrorCodes, - ControlPlaneLogConfig + ControlPlaneLogConfig, + ExecutionLog, + enable_execution_log_ctx ) @@ -49,12 +52,17 @@ def _parse_single_time(time_str: Optional[str], default_hours: int = 24) -> date 支持相对时间后缀(s/m/h/d/w)与 ISO 8601(允许 Z),返回 datetime。 兼容纯数字的 unix 秒/毫秒。 """ - from 
datetime import datetime
+    # Using module-level datetime import
     if not time_str:
         return datetime.now() - timedelta(hours=default_hours)
 
     ts = str(time_str).strip()
+
+    # Handle "now" alias
+    if ts.lower() == "now":
+        return datetime.now()
+
     if ts.isdigit():
         iv = int(ts)
         if iv > 1e12:  # 毫秒
@@ -161,38 +169,44 @@ def parse_json_field(field_value, default=None):
 1. 配置中明确指定
 2. 通过OpenAPI CheckControlPlaneLogEnable 检查控制面日志功能是否开启,并获取project等配置
 """
-def _get_controlplane_log_config(ctx: Context, cluster_id: str, region_id: str) -> ControlPlaneLogConfig:
-    """获取控制面日志配置信息。"""
-
-    # try get from openAPI
+def _get_controlplane_log_config(ctx: Context, cluster_id: str, region_id: str) -> tuple[Optional[ControlPlaneLogConfig], Optional[str], Optional[str]]:
+    """获取控制面日志配置信息。
+
+    Returns:
+        tuple: (config, request_id, error_message)
+    """
+    request_id = None
     try:
         cs_client = _get_cs_client(ctx, region_id)
-        # 调用 CheckControlPlaneLogEnable API 获取控制面日志配置 https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/developer-reference/api-cs-2015-12-15-checkcontrolplanelogenable
-        # 注意:这里需要根据实际的 API 调用方式调整
-        # 根据文档,API 路径是 GET /clusters/{ClusterId}/controlplanelog
-        logger.info(f"Getting control plane log config for cluster {cluster_id}")
         runtime = util_models.RuntimeOptions()
         headers = {}
         response = cs_client.check_control_plane_log_enable_with_options(cluster_id, headers, runtime)
+
+        # 提取 request_id
+        if hasattr(response, 'headers') and response.headers:
+            request_id = response.headers.get('x-acs-request-id', 'N/A')
+
         # 提取project
         components = getattr(response.body, 'components', []) if response.body else []
         if not components:
-            raise Exception(f"This cluster not enable controlplane log function, please enable it. Failed to get control plane log config components from OpenAPI. response: {str(response)}")
+            return None, request_id, "Control plane logging is not enabled for this cluster; please enable it in the Log Center's control plane log tab. Failed to get control plane log config components from OpenAPI."
 
         controlplane_project = getattr(response.body, 'log_project', None) if response.body else None
         if not controlplane_project:
-            raise Exception(f"Failed to get control plane log config from OpenAPI. response: {str(response)}")
-        return ControlPlaneLogConfig(
+            return None, request_id, "Failed to get control plane log config from OpenAPI."
+ + config = ControlPlaneLogConfig( log_project=controlplane_project, log_ttl="30", components=components ) + return config, request_id, None except Exception as e: logger.error(f"Failed to get control plane log config for cluster {cluster_id}: {e}") - raise + return None, request_id, str(e) class ACKControlPlaneLogHandler: @@ -206,10 +220,14 @@ def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None): settings: Configuration settings """ self.settings = settings or {} + self.allow_write = settings.get("allow_write", True) if settings else True + + # Per-handler toggle + self.enable_execution_log = self.settings.get("enable_execution_log", False) + if server is None: return self.server = server - self.allow_write = settings.get("allow_write", True) if settings else True # Register tools self.server.tool( @@ -231,7 +249,7 @@ def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None): logger.info("ACK Control Plane Log Handler initialized") - def _get_cluster_region(self, cs_client, cluster_id: str) -> str: + def _get_cluster_region(self, cs_client, cluster_id: str) -> tuple[Optional[str], Optional[str], Optional[str]]: """通过DescribeClusterDetail获取集群的region信息 Args: @@ -239,28 +257,32 @@ def _get_cluster_region(self, cs_client, cluster_id: str) -> str: cluster_id: 集群ID Returns: - 集群所在的region + tuple: (region_id, request_id, error_message) """ + request_id = None try: - # 调用DescribeClusterDetail API获取集群详情 detail_response = cs_client.describe_cluster_detail(cluster_id) + + # 提取 request_id + if hasattr(detail_response, 'headers') and detail_response.headers: + request_id = detail_response.headers.get('x-acs-request-id', 'N/A') if not detail_response or not detail_response.body: - raise ValueError(f"Failed to get cluster details for {cluster_id}") + return None, request_id, f"Failed to get cluster details for {cluster_id}" cluster_info = detail_response.body # 获取集群的region信息 region = getattr(cluster_info, 'region_id', '') if not region: - raise ValueError(f"Could not determine region for cluster {cluster_id}") + return None, request_id, f"Could not determine region for cluster {cluster_id}" - return region + return region, request_id, None except Exception as e: logger.error(f"Failed to get cluster region for {cluster_id}: {e}") - raise ValueError(f"Failed to get cluster region for {cluster_id}: {e}") + return None, request_id, str(e) async def query_controlplane_logs( self, @@ -283,6 +305,7 @@ async def query_controlplane_logs( Formats: - ISO 8601: "2024-01-01T10:00:00Z" - Relative: "30m", "1h", "24h", "7d" + - Current time: "now" Defaults to 24h.""" ), end_time: Optional[str] = Field( @@ -291,6 +314,7 @@ async def query_controlplane_logs( Formats: - ISO 8601: "2024-01-01T10:00:00Z" - Relative: "30m", "1h", "24h", "7d" + - Current time: "now" Defaults to current time.""" ), limit: int = Field( @@ -320,6 +344,16 @@ async def query_controlplane_logs( Returns: QueryControlPlaneLogsOutput: 包含控制面日志条目和错误信息的输出 """ + # Set per-request context from handler setting + enable_execution_log_ctx.set(self.enable_execution_log) + + # Initialize execution log + start_ms = int(time.time() * 1000) + execution_log = ExecutionLog( + tool_call_id=f"query_controlplane_logs_{cluster_id}_{component_name}_{start_ms}", + start_time=datetime.utcnow().isoformat() + "Z" + ) + try: # 验证参数 if not cluster_id: @@ -350,59 +384,136 @@ async def query_controlplane_logs( region_id = config.get("region_id", "cn-hangzhou") # 步骤1: 先查询控制面日志配置信息 + execution_log.messages.append(f"Getting control 
plane log config for cluster {cluster_id}") logger.info(f"Step 1: Getting control plane log config for cluster {cluster_id}") - try: - # cluster's region_id - cs_client = _get_cs_client(ctx, "CENTER") - region_id = self._get_cluster_region(cs_client, cluster_id) - - controlplane_config = _get_controlplane_log_config(ctx, cluster_id, region_id) - - # 检查控制面日志功能是否启用 - if not controlplane_config.components: - error_message = f"Control plane logging is not enabled for cluster {cluster_id}" - logger.warning(error_message) - return QueryControlPlaneLogsOutput( - error=ErrorModel( - error_code=ControlPlaneLogErrorCodes.CONTROLPLANE_LOG_NOT_ENABLED, - error_message=error_message - ) - ) - - logger.info( - f"Control plane logging enabled for cluster {cluster_id}, available components: {controlplane_config.components}") - logger.info(f"SLS project: {controlplane_config.log_project}, TTL: {controlplane_config.log_ttl} days") - - except Exception as e: - logger.error(f"Failed to get control plane log config for cluster {cluster_id}: {e}") - error_message = str(e) + + # cluster's region_id + cs_client = _get_cs_client(ctx, "CENTER") + + # Get cluster region with execution logging + api_start_region = int(time.time() * 1000) + region_id, region_request_id, region_error = self._get_cluster_region(cs_client, cluster_id) + api_duration_region = int(time.time() * 1000) - api_start_region + + if region_error: + execution_log.api_calls.append({ + "api": "DescribeClusterDetail", + "cluster_id": cluster_id, + "request_id": region_request_id, + "duration_ms": api_duration_region, + "status": "failed", + "error": region_error + }) + execution_log.messages.append(f"Failed to get cluster region: {region_error}") + execution_log.error = region_error + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": "ClusterRegionError", + "failure_stage": "get_cluster_region" + } + return QueryControlPlaneLogsOutput( + error=ErrorModel( + error_code=ControlPlaneLogErrorCodes.CLUSTER_NOT_FOUND, + error_message=region_error + ), + execution_log=execution_log + ) + + execution_log.api_calls.append({ + "api": "DescribeClusterDetail", + "cluster_id": cluster_id, + "request_id": region_request_id, + "duration_ms": api_duration_region, + "status": "success", + "region_id": region_id + }) + execution_log.messages.append(f"Cluster region: {region_id}, requestId: {region_request_id}") + + api_start = int(time.time() * 1000) + controlplane_config, request_id, error = _get_controlplane_log_config(ctx, cluster_id, region_id) + api_duration = int(time.time() * 1000) - api_start + + if error: + execution_log.api_calls.append({ + "api": "CheckControlPlaneLogEnable", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": error + }) + execution_log.messages.append(f"Failed to get control plane log config: {error}") + execution_log.error = error + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": "ControlPlaneLogConfigError", + "failure_stage": "get_controlplane_log_config" + } + + # Determine error code error_code = ControlPlaneLogErrorCodes.CLUSTER_NOT_FOUND - - # 根据错误信息判断具体的错误码 - if "not found" in error_message.lower() or "does not exist" in error_message.lower(): + if "not found" in error.lower() or "does not exist" in error.lower(): error_code 
= ControlPlaneLogErrorCodes.CLUSTER_NOT_FOUND - elif ("not enable" in error_message.lower() or - "control plane" in error_message.lower() and "disabled" in error_message.lower() or - "controlplane log function" in error_message.lower()): + elif ("not enable" in error.lower() or + "control plane" in error.lower() and "disabled" in error.lower() or + "controlplane log function" in error.lower()): error_code = ControlPlaneLogErrorCodes.CONTROLPLANE_LOG_NOT_ENABLED - + return QueryControlPlaneLogsOutput( error=ErrorModel( error_code=error_code, + error_message=error + ), + execution_log=execution_log + ) + + execution_log.api_calls.append({ + "api": "CheckControlPlaneLogEnable", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "success", + "components": controlplane_config.components if controlplane_config else [] + }) + execution_log.messages.append(f"Control plane logging enabled, components: {controlplane_config.components}, requestId: {request_id}") + + # 检查控制面日志功能是否启用 + if not controlplane_config or not controlplane_config.components: + error_message = f"Control plane logging is not enabled for cluster {cluster_id}" + logger.warning(error_message) + execution_log.error = error_message + execution_log.messages.append(error_message) + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + return QueryControlPlaneLogsOutput( + error=ErrorModel( + error_code=ControlPlaneLogErrorCodes.CONTROLPLANE_LOG_NOT_ENABLED, error_message=error_message - ) + ), + execution_log=execution_log ) + logger.info( + f"Control plane logging enabled for cluster {cluster_id}, available components: {controlplane_config.components}") + logger.info(f"SLS project: {controlplane_config.log_project}, TTL: {controlplane_config.log_ttl} days") + # 步骤2: 检查请求的组件是否在启用的组件列表中 logger.info(f"Step 2: Validating component {component_name} against enabled components") if component_name not in controlplane_config.components: error_message = f"Component '{component_name}' is not enabled for control plane logging. 
Available components: {controlplane_config.components}" logger.warning(error_message) + execution_log.error = error_message + execution_log.messages.append(error_message) + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms return QueryControlPlaneLogsOutput( error=ErrorModel( error_code=ControlPlaneLogErrorCodes.INVALID_COMPONENT, error_message=error_message - ) + ), + execution_log=execution_log ) # 步骤3: 获取 SLS 项目名称和构建 logstore 名称 @@ -410,15 +521,21 @@ async def query_controlplane_logs( if not sls_project_name: error_message = f"SLS project name not found for cluster {cluster_id}" logger.warning(error_message) + execution_log.error = error_message + execution_log.messages.append(error_message) + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms return QueryControlPlaneLogsOutput( error=ErrorModel( error_code=ControlPlaneLogErrorCodes.LOGSTORE_NOT_FOUND, error_message=error_message - ) + ), + execution_log=execution_log ) # 构建 logstore 名称: {component_name}-{cluster_id} logstore_name = f"{component_name}-{cluster_id}" + execution_log.messages.append(f"Using SLS project '{sls_project_name}', logstore '{logstore_name}'") logger.info(f"Step 3: Using SLS project '{sls_project_name}' and logstore '{logstore_name}'") # 获取 SLS 客户端 @@ -436,11 +553,21 @@ async def query_controlplane_logs( "sls_client_factory not available" in error_message.lower()): error_code = ControlPlaneLogErrorCodes.SLS_CLIENT_INIT_AK_ERROR + execution_log.error = error_message + execution_log.messages.append(f"Failed to get SLS client: {error_message}") + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": type(e).__name__, + "failure_stage": "get_sls_client" + } + return QueryControlPlaneLogsOutput( error=ErrorModel( error_code=error_code, error_message=error_message - ) + ), + execution_log=execution_log ) # 解析时间 @@ -450,30 +577,51 @@ async def query_controlplane_logs( # SLS API 需要秒级时间戳(与审计日志时间解析策略对齐) start_timestamp_s, end_timestamp_s = _parse_time_params(start_time_str, end_time_str) + execution_log.messages.append(f"Query time range: {start_timestamp_s} to {end_timestamp_s}") # 步骤4: 构建SLS查询语句 logger.info(f"Step 4: Building SLS query for component {component_name}") query = _build_controlplane_log_query( filter_pattern=filter_pattern ) + execution_log.messages.append(f"SLS query: {query}") logger.info(f"SLS query: {query}") # 步骤5: 调用 SLS API 查询日志 + execution_log.messages.append(f"Querying SLS logs from project '{sls_project_name}', logstore '{logstore_name}'") logger.info(f"Step 5: Querying SLS logs from project '{sls_project_name}', logstore '{logstore_name}'") - try: - from alibabacloud_sls20201230 import models as sls_models - - request = sls_models.GetLogsRequest( - from_=start_timestamp_s, - to=end_timestamp_s, - query=query, - offset=0, - line=limit_value, - reverse=False - ) + + from alibabacloud_sls20201230 import models as sls_models + + request = sls_models.GetLogsRequest( + from_=start_timestamp_s, + to=end_timestamp_s, + query=query, + offset=0, + line=limit_value, + reverse=False + ) - # 尝试不同的 API 调用方式 + # Call SLS API with execution logging + api_start = int(time.time() * 1000) + request_id = None + try: response = sls_client.get_logs(sls_project_name, logstore_name, request) + api_duration = int(time.time() * 1000) - api_start + + # Extract 
request_id + if hasattr(response, 'headers') and response.headers: + request_id = response.headers.get('x-log-requestid', 'N/A') + + execution_log.api_calls.append({ + "api": "SLS.GetLogs", + "project": sls_project_name, + "logstore": logstore_name, + "request_id": request_id, + "duration_ms": api_duration, + "status": "success" + }) + logger.info(f"SLS API response type: {type(response)}") if hasattr(response, 'body'): logger.info(f"Response body type: {type(response.body)}") @@ -484,6 +632,16 @@ async def query_controlplane_logs( else: logger.info(f"Response attributes: {dir(response)}") except Exception as api_error: + api_duration = int(time.time() * 1000) - api_start + execution_log.api_calls.append({ + "api": "SLS.GetLogs", + "project": sls_project_name, + "logstore": logstore_name, + "duration_ms": api_duration, + "status": "failed", + "error": str(api_error) + }) + # 如果 SLS API 调用失败,返回模拟数据用于测试 logger.warning(f"SLS API call failed, using mock data: {api_error}") # 在测试环境中,尝试从 sls_client 获取模拟数据 @@ -523,10 +681,15 @@ async def query_controlplane_logs( logger.warning(f"Failed to parse control plane log entry: {e}") continue + execution_log.messages.append(f"Retrieved {len(entries)} log entries") + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + return QueryControlPlaneLogsOutput( query=query, entries=entries, - total=len(entries) + total=len(entries), + execution_log=execution_log ) except Exception as e: @@ -538,9 +701,19 @@ async def query_controlplane_logs( if "client initialization" in error_message.lower() or "access key" in error_message.lower(): error_code = ControlPlaneLogErrorCodes.SLS_CLIENT_INIT_AK_ERROR + execution_log.error = error_message + execution_log.messages.append(f"Operation failed: {error_message}") + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": type(e).__name__, + "failure_stage": "query_controlplane_logs" + } + return QueryControlPlaneLogsOutput( error=ErrorModel( error_code=error_code, error_message=error_message - ) + ), + execution_log=execution_log ) diff --git a/src/ack_diagnose_handler.py b/src/ack_diagnose_handler.py index 1b3efcd..f47ed35 100644 --- a/src/ack_diagnose_handler.py +++ b/src/ack_diagnose_handler.py @@ -3,13 +3,18 @@ from loguru import logger from pydantic import Field import json +import time +from datetime import datetime from alibabacloud_cs20151215 import models as cs20151215_models from alibabacloud_tea_util import models as util_models +from typing import Dict, Any, Optional from models import ( ErrorModel, GetDiagnoseResourceResultOutput, DiagnosisStatusEnum, - DiagnosisCodeEnum + DiagnosisCodeEnum, + ExecutionLog, + enable_execution_log_ctx, ) @@ -55,10 +60,11 @@ class DiagnoseHandler: def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None): self.settings = settings or {} + self.allow_write = self.settings.get("allow_write", True) + self.enable_execution_log = self.settings.get("enable_execution_log", False) if server is None: return self.server = server - self.allow_write = self.settings.get("allow_write", True) self.server.tool( name="diagnose_resource", description="""对ACK集群的Kubernetes资源进行诊断,当遇到问题难以定位时,使用该工具进行深度诊断。支持诊断的资源包括: @@ -93,17 +99,33 @@ async def diagnose_resource( """), ) -> GetDiagnoseResourceResultOutput | Dict[str, Any]: """发起ACK集群资源诊断任务""" - # if not self.allow_write: - # return {"error": 
ErrorModel(error_code="WriteDisabled", - # error_message="Write operations are disabled").model_dump()} - + # Set per-request context from handler setting + enable_execution_log_ctx.set(self.enable_execution_log) + + # Initialize execution log + start_ms = int(time.time() * 1000) + execution_log = ExecutionLog( + tool_call_id=f"diagnose_resource_{cluster_id}_{resource_type}_{start_ms}", + start_time=datetime.utcnow().isoformat() + "Z" + ) + try: # 解析 resource_target JSON try: target_dict = json.loads(resource_target) except json.JSONDecodeError as e: - return {"error": ErrorModel(error_code="InvalidTarget", - error_message=f"Invalid JSON in resource_target: {e}").model_dump()} + error_msg = f"Invalid JSON in resource_target: {e}" + execution_log.error = error_msg + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": "JSONDecodeError", + "failure_stage": "parse_resource_target" + } + return { + "error": ErrorModel(error_code="InvalidTarget", error_message=error_msg).model_dump(), + "execution_log": execution_log + } # 获取 CS 客户端 cs_client = _get_cs_client(ctx, "CENTER") @@ -116,20 +138,65 @@ async def diagnose_resource( runtime = util_models.RuntimeOptions() headers = {} + # Call API with execution logging + api_start = int(time.time() * 1000) + request_id = None response = await cs_client.create_cluster_diagnosis_with_options_async( cluster_id, request, headers, runtime ) + api_duration = int(time.time() * 1000) - api_start + + # Extract request_id + if hasattr(response, 'headers') and response.headers: + request_id = response.headers.get('x-acs-request-id', 'N/A') # 提取诊断任务ID diagnose_task_id = getattr(response.body, 'diagnosis_id', None) if response.body else None if not diagnose_task_id: - return {"error": ErrorModel(error_code="NoTaskId", - error_message="Failed to get diagnosis task ID from response").model_dump()} + error_msg = "Failed to get diagnosis task ID from response" + execution_log.api_calls.append({ + "api": "CreateClusterDiagnosis", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": error_msg + }) + execution_log.error = error_msg + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": "NoTaskId", + "failure_stage": "create_diagnosis" + } + return { + "error": ErrorModel(error_code="NoTaskId", error_message=error_msg).model_dump(), + "execution_log": execution_log + } + + # Concise logging for success + execution_log.api_calls.append({ + "api": "CreateClusterDiagnosis", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "success", + "diagnosis_id": diagnose_task_id + }) # 使用循环等待诊断完成 result = await self.wait_for_diagnosis_completion( - ctx, cluster_id, "CENTER", diagnose_task_id + ctx, cluster_id, "CENTER", diagnose_task_id, execution_log ) + + # Add execution_log to result + if isinstance(result, GetDiagnoseResourceResultOutput): + result.execution_log = execution_log + elif isinstance(result, dict): + result["execution_log"] = execution_log + + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms return result except Exception as e: @@ -142,7 +209,19 @@ async def diagnose_resource( elif "NO_RAM_POLICY_AUTH" in str(e): error_code = "NO_RAM_POLICY_AUTH" - 
return {"error": ErrorModel(error_code=error_code, error_message=str(e)).model_dump()} + execution_log.error = str(e) + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": type(e).__name__, + "error_code": error_code, + "failure_stage": "diagnose_resource" + } + + return { + "error": ErrorModel(error_code=error_code, error_message=str(e)).model_dump(), + "execution_log": execution_log + } async def get_diagnose_resource_result( self, @@ -152,6 +231,13 @@ async def get_diagnose_resource_result( diagnose_task_id: str = Field(..., description="生成的异步诊断任务id"), ) -> GetDiagnoseResourceResultOutput | Dict[str, Any]: """获取集群资源诊断任务的结果""" + # Initialize execution log + start_ms = int(time.time() * 1000) + execution_log = ExecutionLog( + tool_call_id=f"get_diagnose_resource_result_{cluster_id}_{diagnose_task_id}_{start_ms}", + start_time=datetime.utcnow().isoformat() + "Z" + ) + try: # 获取 CS 客户端 cs_client = _get_cs_client(ctx, region_id) @@ -161,13 +247,42 @@ async def get_diagnose_resource_result( runtime = util_models.RuntimeOptions() headers = {} + # Call API with execution logging + api_start = int(time.time() * 1000) + request_id = None + response = await cs_client.get_cluster_diagnosis_result_with_options_async( cluster_id, diagnose_task_id, request, headers, runtime ) + + api_duration = int(time.time() * 1000) - api_start + + # Extract request_id + if hasattr(response, 'headers') and response.headers: + request_id = response.headers.get('x-acs-request-id', 'N/A') if not response.body: - return {"error": ErrorModel(error_code="NoResponse", - error_message="No response body from diagnosis result query").model_dump()} + error_msg = "No response body from diagnosis result query" + execution_log.api_calls.append({ + "api": "GetClusterDiagnosisResult", + "cluster_id": cluster_id, + "diagnosis_id": diagnose_task_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": error_msg + }) + execution_log.error = error_msg + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": "NoResponse", + "failure_stage": "api_response" + } + return { + "error": ErrorModel(error_code="NoResponse", error_message=error_msg).model_dump(), + "execution_log": execution_log + } # 提取结果信息 result = getattr(response.body, 'result', None) @@ -176,6 +291,19 @@ async def get_diagnose_resource_result( finished_time = getattr(response.body, 'finished', None) resource_type = getattr(response.body, 'type', None) resource_target = getattr(response.body, 'target', None) + + # Concise logging for success + execution_log.api_calls.append({ + "api": "GetClusterDiagnosisResult", + "cluster_id": cluster_id, + "diagnosis_id": diagnose_task_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "success" + }) + + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms return GetDiagnoseResourceResultOutput( result=result, @@ -184,7 +312,8 @@ async def get_diagnose_resource_result( finished_time=finished_time, resource_type=resource_type, resource_target=json.dumps(resource_target) if resource_target else None, - error=None + error=None, + execution_log=execution_log ) except Exception as e: @@ -196,8 +325,20 @@ async def get_diagnose_resource_result( error_code = "CLUSTER_NOT_FOUND" 
elif "NO_RAM_POLICY_AUTH" in str(e): error_code = "NO_RAM_POLICY_AUTH" - - return {"error": ErrorModel(error_code=error_code, error_message=str(e)).model_dump()} + + execution_log.error = str(e) + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": type(e).__name__, + "error_code": error_code, + "failure_stage": "get_diagnosis_result" + } + + return { + "error": ErrorModel(error_code=error_code, error_message=str(e)).model_dump(), + "execution_log": execution_log + } async def wait_for_diagnosis_completion( self, @@ -205,6 +346,7 @@ async def wait_for_diagnosis_completion( cluster_id: str, region_id: str, diagnose_task_id: str, + execution_log: ExecutionLog, max_wait_time: int = 300, # 最大等待时间(秒) poll_interval: int = 10 # 轮询间隔(秒) ) -> GetDiagnoseResourceResultOutput | Dict[str, Any]: @@ -215,6 +357,7 @@ async def wait_for_diagnosis_completion( cluster_id: 集群ID region_id: 区域ID diagnose_task_id: 诊断任务ID + execution_log: ExecutionLog for tracking max_wait_time: 最大等待时间(秒),默认5分钟 poll_interval: 轮询间隔(秒),默认10秒 @@ -222,23 +365,44 @@ async def wait_for_diagnosis_completion( 诊断结果,当状态为COMPLETED或FAILED时返回最终结果 """ import asyncio - from datetime import datetime, timedelta + from datetime import timedelta start_time = datetime.now() max_end_time = start_time + timedelta(seconds=max_wait_time) + poll_count = 0 logger.info(f"开始等待诊断任务 {diagnose_task_id} 完成,最大等待时间: {max_wait_time}秒") while datetime.now() < max_end_time: try: + poll_count += 1 # 获取当前诊断状态 result = await self.get_diagnose_resource_result( ctx, cluster_id, region_id, diagnose_task_id ) + # Extract and merge ExecutionLog from polling call + poll_execution_log = None + if isinstance(result, dict) and "execution_log" in result: + poll_execution_log = result.get("execution_log") + elif hasattr(result, 'execution_log'): + poll_execution_log = result.execution_log + + # Merge polling execution log into main execution log + if poll_execution_log: + # Add API calls from this poll + if hasattr(poll_execution_log, 'api_calls'): + execution_log.api_calls.extend(poll_execution_log.api_calls) + # Add warnings from this poll + if hasattr(poll_execution_log, 'warnings') and poll_execution_log.warnings: + execution_log.warnings.extend(poll_execution_log.warnings) + # 检查是否有错误 if isinstance(result, dict) and "error" in result: logger.error(f"获取诊断结果时出错: {result['error']}") + # Merge error info from poll if available + if poll_execution_log and hasattr(poll_execution_log, 'error') and poll_execution_log.error: + execution_log.error = poll_execution_log.error return result # 检查状态 @@ -258,7 +422,7 @@ async def wait_for_diagnosis_completion( # 继续等待 elapsed_time = (datetime.now() - start_time).total_seconds() remaining_time = max_wait_time - elapsed_time - logger.info(f"诊断任务 {diagnose_task_id} 仍在进行中,已等待 {elapsed_time:.1f}秒,剩余时间 {remaining_time:.1f}秒") + logger.info(f"论断任务 {diagnose_task_id} 仍在进行中,已等待 {elapsed_time:.1f}秒,剩余时间 {remaining_time:.1f}秒") # 等待指定间隔后继续轮询 await asyncio.sleep(poll_interval) @@ -271,6 +435,7 @@ async def wait_for_diagnosis_completion( except Exception as e: logger.error(f"轮询诊断结果时出错: {e}") + execution_log.warnings.append(f"Poll #{poll_count} error: {str(e)}") # 如果是网络错误等临时问题,继续重试 await asyncio.sleep(poll_interval) continue @@ -279,6 +444,7 @@ async def wait_for_diagnosis_completion( elapsed_time = (datetime.now() - start_time).total_seconds() error_message = f"诊断任务 {diagnose_task_id} 在 {elapsed_time:.1f}秒内未完成,已超时" 
        logger.warning(error_message)
+        execution_log.warnings.append(error_message)

        return {
            "error": ErrorModel(
diff --git a/src/ack_inspect_handler.py b/src/ack_inspect_handler.py
index c4c7fb8..3778050 100644
--- a/src/ack_inspect_handler.py
+++ b/src/ack_inspect_handler.py
@@ -5,11 +5,16 @@
 from alibabacloud_cs20151215 import models as cs20151215_models
 from alibabacloud_tea_util import models as util_models
 import asyncio
+import time
+from datetime import datetime, timedelta
+from typing import Dict, Any, Optional, List
 from models import (
     ErrorModel,
     QueryInspectReportOutput,
     InspectSummary,
     CheckItemResult,
+    ExecutionLog,
+    enable_execution_log_ctx,
 )
@@ -59,6 +64,7 @@ def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None):
            return
        self.server = server
        self.allow_write = self.settings.get("allow_write", True)
+        self.enable_execution_log = self.settings.get("enable_execution_log", False)
        self.server.tool(
            name="query_inspect_report", description="即刻生成并查询一个ACK集群最近的健康巡检报告"
        )
@@ -73,6 +79,16 @@ async def query_inspect_report(
        is_result_exception: bool = Field(True, description="是否只返回异常的结果,默认为true"),
    ) -> QueryInspectReportOutput | Dict[str, Any]:
        """查询一个ACK集群最近的巡检报告"""
+        # Set per-request context from handler setting
+        enable_execution_log_ctx.set(self.enable_execution_log)
+
+        # Initialize execution log
+        start_ms = int(time.time() * 1000)
+        execution_log = ExecutionLog(
+            tool_call_id=f"query_inspect_report_{cluster_id}_{start_ms}",
+            start_time=datetime.utcnow().isoformat() + "Z"
+        )
+
        try:
            # 获取 CS 客户端
            cs_client = _get_cs_client(ctx, region_id)
@@ -81,12 +97,49 @@ async def query_inspect_report(

            # 1. 即刻创建集群巡检报告
            create_request = cs20151215_models.RunClusterInspectRequest()
+
+            api_start = int(time.time() * 1000)
+            request_id = None
            create_response = await cs_client.run_cluster_inspect_with_options_async(
                cluster_id, create_request, headers, runtime
            )
+            api_duration = int(time.time() * 1000) - api_start
+
+            # Extract request_id
+            if hasattr(create_response, 'headers') and create_response.headers:
+                request_id = create_response.headers.get('x-acs-request-id', 'N/A')
+
            if not create_response.body or not create_response.body.report_id:
-                return {"error": ErrorModel(error_code="ERROR_CREATE_INSPECT_REPORT",
-                                            error_message="创建巡检报告失败").model_dump()}
+                error_msg = "创建巡检报告失败"
+                execution_log.api_calls.append({
+                    "api": "RunClusterInspect",
+                    "cluster_id": cluster_id,
+                    "request_id": request_id,
+                    "duration_ms": api_duration,
+                    "status": "failed",
+                    "error": error_msg
+                })
+                execution_log.error = error_msg
+                execution_log.end_time = datetime.utcnow().isoformat() + "Z"
+                execution_log.duration_ms = int(time.time() * 1000) - start_ms
+                execution_log.metadata = {
+                    "error_type": "NoReportId",
+                    "failure_stage": "create_inspect"
+                }
+                return {
+                    "error": ErrorModel(error_code="ERROR_CREATE_INSPECT_REPORT", error_message=error_msg).model_dump(),
+                    "execution_log": execution_log
+                }
+
+            created_report_id = create_response.body.report_id
+            execution_log.api_calls.append({
+                "api": "RunClusterInspect",
+                "cluster_id": cluster_id,
+                "request_id": request_id,
+                "duration_ms": api_duration,
+                "status": "success",
+                "report_id": created_report_id
+            })

            # 等待1秒钟让报告开始生成
            await asyncio.sleep(1)
@@ -96,24 +149,86 @@ async def query_inspect_report(
                max_results=1  # 只获取最新的一个报告
            )

+            api_start = int(time.time() * 1000)
+            request_id = None
            list_response = await cs_client.list_cluster_inspect_reports_with_options_async(
                cluster_id, list_request, headers, runtime
            )
+            api_duration = int(time.time() *
1000) - api_start + + # Extract request_id + if hasattr(list_response, 'headers') and list_response.headers: + request_id = list_response.headers.get('x-acs-request-id', 'N/A') if not list_response.body or not list_response.body.reports: - return {"error": ErrorModel(error_code="NO_INSPECT_REPORT", - error_message="当前没有已生成的巡检报告").model_dump()} + error_msg = "当前没有已生成的巡检报告" + execution_log.api_calls.append({ + "api": "ListClusterInspectReports", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": error_msg + }) + execution_log.error = error_msg + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": "NoReports", + "failure_stage": "list_reports" + } + return { + "error": ErrorModel(error_code="NO_INSPECT_REPORT", error_message=error_msg).model_dump(), + "execution_log": execution_log + } # 获取最新的报告ID latest_report = list_response.body.reports[0] report_id = getattr(latest_report, 'report_id', None) if not report_id: - return {"error": ErrorModel(error_code="NO_REPORT_ID", error_message="无法获取巡检报告ID").model_dump()} + error_msg = "无法获取巡检报告ID" + execution_log.api_calls.append({ + "api": "ListClusterInspectReports", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": error_msg + }) + execution_log.error = error_msg + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": "NoReportId", + "failure_stage": "extract_report_id" + } + return { + "error": ErrorModel(error_code="NO_REPORT_ID", error_message=error_msg).model_dump(), + "execution_log": execution_log + } + + execution_log.api_calls.append({ + "api": "ListClusterInspectReports", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "success", + "report_id": report_id + }) # 3. 
等待巡检报告完成 result = await self.wait_for_inspect_completion( - ctx, cluster_id, region_id, report_id, is_result_exception + ctx, cluster_id, region_id, report_id, is_result_exception, execution_log ) + + # Add execution_log to result + if isinstance(result, QueryInspectReportOutput): + result.execution_log = execution_log + elif isinstance(result, dict): + result["execution_log"] = execution_log + + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms return result except Exception as e: @@ -125,8 +240,20 @@ async def query_inspect_report( error_code = "NO_RAM_POLICY_AUTH" elif "NO_INSPECT_REPORT" in str(e): error_code = "NO_INSPECT_REPORT" - - return {"error": ErrorModel(error_code=error_code, error_message=str(e)).model_dump()} + + execution_log.error = str(e) + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": type(e).__name__, + "error_code": error_code, + "failure_stage": "query_inspect_report" + } + + return { + "error": ErrorModel(error_code=error_code, error_message=str(e)).model_dump(), + "execution_log": execution_log + } async def wait_for_inspect_completion( self, @@ -135,6 +262,7 @@ async def wait_for_inspect_completion( region_id: str, report_id: str, is_result_exception: bool, + execution_log: ExecutionLog, max_wait_time: int = 300, # 最大等待时间(秒) poll_interval: int = 5 # 轮询间隔(秒) ) -> QueryInspectReportOutput | Dict[str, Any]: @@ -152,9 +280,9 @@ async def wait_for_inspect_completion( Returns: 巡检报告结果,当状态为completed或failed时返回最终结果 """ - from datetime import datetime, timedelta start_time = datetime.now() max_end_time = start_time + timedelta(seconds=max_wait_time) + poll_count = 0 # 如果是测试环境,使用更短的轮询间隔 if self.settings.get("test_mode", False): @@ -165,14 +293,31 @@ async def wait_for_inspect_completion( while datetime.now() < max_end_time: try: + poll_count += 1 # 获取当前巡检状态 result = await self.get_inspect_report_detail( ctx, cluster_id, region_id, report_id, is_result_exception ) + # Extract and merge ExecutionLog from polling call + poll_execution_log = None + if isinstance(result, dict) and "execution_log" in result: + poll_execution_log = result.get("execution_log") + elif hasattr(result, 'execution_log'): + poll_execution_log = result.execution_log + + # Merge polling execution log into main execution log + if poll_execution_log: + if hasattr(poll_execution_log, 'api_calls'): + execution_log.api_calls.extend(poll_execution_log.api_calls) + if hasattr(poll_execution_log, 'warnings') and poll_execution_log.warnings: + execution_log.warnings.extend(poll_execution_log.warnings) + # 检查是否有错误 if isinstance(result, dict) and "error" in result: logger.error(f"获取巡检报告结果时出错: {result['error']}") + if poll_execution_log and hasattr(poll_execution_log, 'error') and poll_execution_log.error: + execution_log.error = poll_execution_log.error return result # 检查状态 @@ -204,6 +349,7 @@ async def wait_for_inspect_completion( except Exception as e: logger.error(f"轮询巡检报告结果时出错: {e}") + execution_log.warnings.append(f"Poll #{poll_count} error: {str(e)}") # 如果是网络错误等临时问题,继续重试 await asyncio.sleep(poll_interval) continue @@ -212,6 +358,7 @@ async def wait_for_inspect_completion( elapsed_time = (datetime.now() - start_time).total_seconds() error_message = f"巡检报告 {report_id} 在 {elapsed_time:.1f}秒内未完成,已超时" logger.warning(error_message) + execution_log.warnings.append(error_message) return { "error": ErrorModel( @@ -229,6 
+376,13 @@ async def get_inspect_report_detail( is_result_exception: bool ) -> QueryInspectReportOutput | Dict[str, Any]: """获取巡检报告详情""" + # Initialize execution log + start_ms = int(time.time() * 1000) + execution_log = ExecutionLog( + tool_call_id=f"get_inspect_report_detail_{report_id}_{start_ms}", + start_time=datetime.utcnow().isoformat() + "Z" + ) + try: # 获取 CS 客户端 cs_client = _get_cs_client(ctx, region_id) @@ -240,16 +394,43 @@ async def get_inspect_report_detail( enable_filter=is_result_exception # 根据参数决定是否只返回异常结果 ) + api_start = int(time.time() * 1000) + request_id = None detail_response = await cs_client.get_cluster_inspect_report_detail_with_options_async( cluster_id, report_id, detail_request, headers, runtime ) + api_duration = int(time.time() * 1000) - api_start + + # Extract request_id + if hasattr(detail_response, 'headers') and detail_response.headers: + request_id = detail_response.headers.get('x-acs-request-id', 'N/A') if not detail_response.body: - return {"error": ErrorModel(error_code="NO_DETAIL_RESPONSE", - error_message="无法获取巡检报告详情").model_dump()} + error_msg = "无法获取巡检报告详情" + execution_log.api_calls.append({ + "api": "GetClusterInspectReportDetail", + "cluster_id": cluster_id, + "report_id": report_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": error_msg + }) + execution_log.error = error_msg + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": "NoDetailResponse", + "failure_stage": "get_report_detail" + } + return { + "error": ErrorModel(error_code="NO_DETAIL_RESPONSE", error_message=error_msg).model_dump(), + "execution_log": execution_log + } # 解析响应数据 body = detail_response.body + status = getattr(body, 'status', None) # 构建 summary summary_data = getattr(body, 'summary', {}) @@ -273,13 +454,28 @@ async def get_inspect_report_detail( description=getattr(item, 'description', ''), fix=getattr(item, 'fix', ''), )) + + # Log successful API call (concise) + execution_log.api_calls.append({ + "api": "GetClusterInspectReportDetail", + "cluster_id": cluster_id, + "report_id": report_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "success", + "report_status": status + }) + + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms return QueryInspectReportOutput( - report_status=getattr(body, 'status', None), + report_status=status, report_finish_time=getattr(body, 'endTime', None), summary=summary, checkItemResults=check_items, - error=None + error=None, + execution_log=execution_log ) except Exception as e: @@ -289,5 +485,17 @@ async def get_inspect_report_detail( error_code = "CLUSTER_NOT_FOUND" elif "NO_RAM_POLICY_AUTH" in str(e): error_code = "NO_RAM_POLICY_AUTH" - - return {"error": ErrorModel(error_code=error_code, error_message=str(e)).model_dump()} + + execution_log.error = str(e) + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": type(e).__name__, + "error_code": error_code, + "failure_stage": "get_inspect_report_detail" + } + + return { + "error": ErrorModel(error_code=error_code, error_message=str(e)).model_dump(), + "execution_log": execution_log + } diff --git a/src/ack_prometheus_handler.py b/src/ack_prometheus_handler.py index 391e742..49b87a6 100644 --- 
a/src/ack_prometheus_handler.py +++ b/src/ack_prometheus_handler.py @@ -4,7 +4,9 @@ from pydantic import Field import httpx import os +import time from datetime import datetime +from typing import Dict, Any, Optional, List from models import ( ErrorModel, QueryPrometheusSeriesPoint, @@ -12,6 +14,8 @@ QueryPrometheusMetricGuidanceOutput, MetricDefinition, PromQLSample, + ExecutionLog, + enable_execution_log_ctx, ) @@ -20,10 +24,17 @@ class PrometheusHandler: def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None): self.settings = settings or {} + # prometheus_endpoint_mode: "ARMS_PUBLIC" (default), "ARMS_PRIVATE", or "LOCAL" + self.prometheus_endpoint_mode = self.settings.get("prometheus_endpoint_mode", "ARMS_PUBLIC") + + self.allow_write = self.settings.get("allow_write", True) + + # Per-handler toggle + self.enable_execution_log = self.settings.get("enable_execution_log", False) + if server is None: return self.server = server - self.allow_write = self.settings.get("allow_write", True) self.server.tool(name="query_prometheus", description="查询一个ACK集群的阿里云Prometheus数据")( self.query_prometheus) @@ -32,7 +43,7 @@ def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None): self.query_prometheus_metric_guidance) logger.info("Prometheus Handler initialized") - def _get_cluster_region(self, cs_client, cluster_id: str) -> str: + def _get_cluster_region(self, cs_client, cluster_id: str, execution_log: ExecutionLog) -> str: """通过DescribeClusterDetail获取集群的region信息 Args: @@ -43,11 +54,25 @@ def _get_cluster_region(self, cs_client, cluster_id: str) -> str: 集群所在的region """ try: - # 调用DescribeClusterDetail API获取集群详情 + api_start = int(time.time() * 1000) detail_response = cs_client.describe_cluster_detail(cluster_id) + api_duration = int(time.time() * 1000) - api_start + + # Extract request_id + request_id = None + if hasattr(detail_response, 'headers') and detail_response.headers: + request_id = detail_response.headers.get('x-acs-request-id', 'N/A') if not detail_response or not detail_response.body: + execution_log.api_calls.append({ + "api": "DescribeClusterDetail", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": "No response body" + }) raise ValueError(f"Failed to get cluster details for {cluster_id}") cluster_info = detail_response.body @@ -55,7 +80,25 @@ def _get_cluster_region(self, cs_client, cluster_id: str) -> str: region = getattr(cluster_info, 'region_id', '') if not region: + execution_log.api_calls.append({ + "api": "DescribeClusterDetail", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": "No region_id in response" + }) raise ValueError(f"Could not determine region for cluster {cluster_id}") + + # Log successful API call + execution_log.api_calls.append({ + "api": "DescribeClusterDetail", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "success", + "region_id": region + }) return region @@ -64,15 +107,72 @@ def _get_cluster_region(self, cs_client, cluster_id: str) -> str: raise ValueError(f"Failed to get cluster region for {cluster_id}: {e}") - def _resolve_prometheus_endpoint(self, ctx: Context, cluster_id: str) -> Optional[str]: - # 1) 优先参考 alibabacloud-o11y-prometheus-mcp-server 中的方法: - # 从 providers 里取 ARMS client,调用 GetPrometheusInstance,获取 http_api_inter_url(公网) + def _resolve_prometheus_endpoint(self, ctx: Context, cluster_id: str, execution_log: 
ExecutionLog) -> Optional[str]: lifespan = getattr(ctx.request_context, "lifespan_context", {}) or {} providers = lifespan.get("providers", {}) if isinstance(lifespan, dict) else {} + + # Check prometheus_endpoint_mode setting + if self.prometheus_endpoint_mode == "LOCAL": + # Mode: LOCAL - Use static config or environment variables only + return self._resolve_from_local(providers, cluster_id, execution_log) + elif self.prometheus_endpoint_mode == "ARMS_PRIVATE": + # Mode: ARMS_PRIVATE - Use ARMS API to get private endpoint, fallback to local + return self._resolve_from_arms(ctx, providers, cluster_id, execution_log, use_private=True) + else: + # Mode: ARMS_PUBLIC (default) - Use ARMS API to get public endpoint, fallback to local + return self._resolve_from_arms(ctx, providers, cluster_id, execution_log, use_private=False) + + def _resolve_from_local(self, providers: dict, cluster_id: str, execution_log: ExecutionLog) -> Optional[str]: + """Resolve Prometheus endpoint from local config (static config or env vars)""" + # 1) providers 中的静态映射 + endpoints = providers.get("prometheus_endpoints", {}) if isinstance(providers, dict) else {} + if isinstance(endpoints, dict): + ep = endpoints.get(cluster_id) or endpoints.get("default") + if ep: + execution_log.api_calls.append({ + "api": "GetPrometheusEndpoint", + "source": "static_config", + "mode": "LOCAL", + "cluster_id": cluster_id, + "endpoint": ep.rstrip("/"), + "status": "success" + }) + return ep.rstrip("/") + + # 2) 环境变量:PROMETHEUS_HTTP_API_{cluster_id} 或 PROMETHEUS_HTTP_API + env_key_specific = f"PROMETHEUS_HTTP_API_{cluster_id}" + env_key_global = "PROMETHEUS_HTTP_API" + ep = os.getenv(env_key_specific) or os.getenv(env_key_global) + if ep: + source = env_key_specific if os.getenv(env_key_specific) else env_key_global + execution_log.api_calls.append({ + "api": "GetPrometheusEndpoint", + "source": f"env_var:{source}", + "mode": "LOCAL", + "cluster_id": cluster_id, + "endpoint": ep.rstrip("/"), + "status": "success" + }) + return ep.rstrip("/") + return None + + def _resolve_from_arms(self, ctx: Context, providers: dict, cluster_id: str, execution_log: ExecutionLog, use_private: bool = False) -> Optional[str]: + """Resolve Prometheus endpoint from ARMS API, with fallback to local + + Args: + ctx: FastMCP context + providers: Runtime providers + cluster_id: ACK cluster ID + execution_log: Execution log for tracking + use_private: If True, use http_api_intra_url (private); if False, use http_api_inter_url (public) + """ + # 1) 优先参考 alibabacloud-o11y-prometheus-mcp-server 中的方法: + # 从 providers 里取 ARMS client,调用 GetPrometheusInstance + mode = "ARMS_PRIVATE" if use_private else "ARMS_PUBLIC" try: cs_client = _get_cs_client(ctx, "CENTER") - region_id = self._get_cluster_region(cs_client, cluster_id) - config = (lifespan.get("config", {}) or {}) if isinstance(lifespan, dict) else {} + region_id = self._get_cluster_region(cs_client, cluster_id, execution_log) + config = (ctx.request_context.lifespan_context.get("config", {}) or {}) if hasattr(ctx.request_context, "lifespan_context") and isinstance(ctx.request_context.lifespan_context, dict) else {} arms_client_factory = providers.get("arms_client_factory") if isinstance(providers, dict) else None if arms_client_factory and region_id: arms_client = arms_client_factory(region_id, config) @@ -80,29 +180,49 @@ def _resolve_prometheus_endpoint(self, ctx: Context, cluster_id: str) -> Optiona from alibabacloud_tea_util import models as util_models req = 
arms_models.GetPrometheusInstanceRequest(region_id=region_id, cluster_id=cluster_id) runtime = util_models.RuntimeOptions() + + # Call ARMS API with execution logging + api_start = int(time.time() * 1000) resp = arms_client.get_prometheus_instance_with_options(req, runtime) + api_duration = int(time.time() * 1000) - api_start + + # Extract request_id + request_id = None + if hasattr(resp, 'headers') and resp.headers: + request_id = resp.headers.get('x-acs-request-id', 'N/A') + data = getattr(resp.body, 'data', None) if data: - ep = getattr(data, 'http_api_inter_url', None) or getattr(data, 'http_api_intra_url', None) + # Select endpoint based on mode + if use_private: + # ARMS_PRIVATE: prefer intra_url (private network) + ep = getattr(data, 'http_api_intra_url', None) or getattr(data, 'http_api_inter_url', None) + else: + # ARMS_PUBLIC: prefer inter_url (public network) + ep = getattr(data, 'http_api_inter_url', None) or getattr(data, 'http_api_intra_url', None) + if ep: + # Log successful ARMS API call + execution_log.api_calls.append({ + "api": "GetPrometheusInstance", + "source": "arms_api", + "mode": mode, + "cluster_id": cluster_id, + "region_id": region_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "success", + "endpoint_type": "private" if use_private else "public" + }) return str(ep).rstrip('/') + else: + execution_log.warnings.append(f"ARMS API returned no data for cluster {cluster_id}") except Exception as e: logger.debug(f"resolve endpoint via ARMS failed: {e}") + execution_log.warnings.append(f"Failed to resolve endpoint via ARMS: {str(e)}") - # 2) providers 中的静态映射 - endpoints = providers.get("prometheus_endpoints", {}) if isinstance(providers, dict) else {} - if isinstance(endpoints, dict): - ep = endpoints.get(cluster_id) or endpoints.get("default") - if ep: - return ep.rstrip("/") - - # 2) 其次尝试环境变量:PROMETHEUS_HTTP_API_{cluster_id} 或 PROMETHEUS_HTTP_API - env_key_specific = f"PROMETHEUS_HTTP_API_{cluster_id}" - env_key_global = "PROMETHEUS_HTTP_API" - ep = os.getenv(env_key_specific) or os.getenv(env_key_global) - if ep: - return ep.rstrip("/") - return None + # 2) Fallback to local resolution + return self._resolve_from_local(providers, cluster_id, execution_log) def _parse_time(self, value: Optional[str]) -> Optional[str]: if not value: @@ -125,51 +245,158 @@ async def query_prometheus( end_time: Optional[str] = Field(None, description="RFC3339或unix时间;与start_time同时提供为range查询;可能需要调用tool get_current_time获取当前时间"), step: Optional[str] = Field(None, description="range查询步长,如30s"), ) -> QueryPrometheusOutput | Dict[str, Any]: - endpoint = self._resolve_prometheus_endpoint(ctx, cluster_id) - if not endpoint: - return {"error": ErrorModel(error_code="MissingEndpoint", - error_message="无法获取 Prometheus HTTP API,请确定此集群是否已经正常部署阿里云Prometheus 或 环境变量 PROMETHEUS_HTTP_API[_]").model_dump()} - - has_range = bool(start_time and end_time) - params: Dict[str, Any] = {"query": promql} - url = endpoint + ("/api/v1/query_range" if has_range else "/api/v1/query") - if has_range: - params["start"] = self._parse_time(start_time) - params["end"] = self._parse_time(end_time) - if step: - params["step"] = step - - async with httpx.AsyncClient(timeout=60.0) as client: - resp = await client.get(url, params=params) - resp.raise_for_status() - data = resp.json() - - # 直接透传 Prometheus 的 status/data,但补充兼容所需的 resultType/result 展示 - result = data.get("data", {}) if isinstance(data, dict) else {} - result_type = result.get("resultType") - raw_result = result.get("result", []) - - # 
兼容输出:resultType + result 列表;对 instant query 将 value 适配为 values 列表 - normalized = [] - if isinstance(raw_result, list): - for item in raw_result: - if not isinstance(item, dict): - continue - metric = item.get("metric", {}) - if has_range: - values = item.get("values", []) - else: - v = item.get("value") - values = [v] if v else [] - normalized.append({ - "metric": metric, - "values": values, - }) - - return QueryPrometheusOutput( - resultType=result_type or ("matrix" if has_range else "vector"), - result=[QueryPrometheusSeriesPoint(**item) for item in normalized], + # Set per-request context from handler setting + enable_execution_log_ctx.set(self.enable_execution_log) + + # Initialize execution log + start_ms = int(time.time() * 1000) + execution_log = ExecutionLog( + tool_call_id=f"query_prometheus_{cluster_id}_{start_ms}", + start_time=datetime.utcnow().isoformat() + "Z" ) + + try: + endpoint = self._resolve_prometheus_endpoint(ctx, cluster_id, execution_log) + if not endpoint: + error_msg = "无法获取 Prometheus HTTP API,请确定此集群是否已经正常部署阿里云Prometheus 或 环境变量 PROMETHEUS_HTTP_API[_]" + execution_log.error = error_msg + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": "MissingEndpoint", + "failure_stage": "resolve_endpoint" + } + return { + "error": ErrorModel(error_code="MissingEndpoint", error_message=error_msg).model_dump(), + "execution_log": execution_log + } + + has_range = bool(start_time and end_time) + params: Dict[str, Any] = {"query": promql} + url = endpoint + ("/api/v1/query_range" if has_range else "/api/v1/query") + if has_range: + params["start"] = self._parse_time(start_time) + params["end"] = self._parse_time(end_time) + if step: + params["step"] = step + + execution_log.messages.append(f"Calling Prometheus API: {url} with params: {params}") + + # Call Prometheus API with execution logging + api_start = int(time.time() * 1000) + try: + async with httpx.AsyncClient(timeout=60.0) as client: + resp = await client.get(url, params=params) + resp.raise_for_status() + data = resp.json() + + api_duration = int(time.time() * 1000) - api_start + + # Calculate response content size + response_size = len(resp.content) if resp.content else 0 + + # Concise logging for success + execution_log.api_calls.append({ + "api": "PrometheusQuery", + "endpoint": url, + "cluster_id": cluster_id, + "duration_ms": api_duration, + "status": "success", + "http_status": resp.status_code, + "response_size_bytes": response_size + }) + + except httpx.HTTPStatusError as e: + api_duration = int(time.time() * 1000) - api_start + error_msg = f"Prometheus API error: {e.response.status_code} - {e.response.text}" + execution_log.api_calls.append({ + "api": "PrometheusQuery", + "endpoint": url, + "cluster_id": cluster_id, + "duration_ms": api_duration, + "status": "failed", + "http_status": e.response.status_code, + "error": error_msg + }) + execution_log.error = error_msg + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": "HTTPStatusError", + "failure_stage": "prometheus_query", + "http_status": e.response.status_code + } + return { + "error": ErrorModel(error_code="PrometheusAPIError", error_message=error_msg).model_dump(), + "execution_log": execution_log + } + except Exception as e: + api_duration = int(time.time() * 1000) - api_start + error_msg = f"Failed to query 
Prometheus: {str(e)}" + execution_log.api_calls.append({ + "api": "PrometheusQuery", + "endpoint": url, + "cluster_id": cluster_id, + "duration_ms": api_duration, + "status": "failed", + "error": error_msg + }) + execution_log.error = error_msg + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": type(e).__name__, + "failure_stage": "prometheus_query" + } + return { + "error": ErrorModel(error_code="QueryFailed", error_message=error_msg).model_dump(), + "execution_log": execution_log + } + + # 直接透传 Prometheus 的 status/data,但补充兼容所需的 resultType/result 展示 + result = data.get("data", {}) if isinstance(data, dict) else {} + result_type = result.get("resultType") + raw_result = result.get("result", []) + + # 兼容输出:resultType + result 列表;对 instant query 将 value 适配为 values 列表 + normalized = [] + if isinstance(raw_result, list): + for item in raw_result: + if not isinstance(item, dict): + continue + metric = item.get("metric", {}) + if has_range: + values = item.get("values", []) + else: + v = item.get("value") + values = [v] if v else [] + normalized.append({ + "metric": metric, + "values": values, + }) + + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + + return QueryPrometheusOutput( + resultType=result_type or ("matrix" if has_range else "vector"), + result=[QueryPrometheusSeriesPoint(**item) for item in normalized], + execution_log=execution_log + ) + + except Exception as e: + logger.error(f"Failed to query prometheus: {e}") + execution_log.error = str(e) + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": type(e).__name__, + "failure_stage": "query_prometheus" + } + return { + "error": ErrorModel(error_code="UnknownError", error_message=str(e)).model_dump(), + "execution_log": execution_log + } async def query_prometheus_metric_guidance( self, @@ -178,19 +405,39 @@ async def query_prometheus_metric_guidance( description="资源维度label:node/pod/container/deployment/daemonset/job/coredns/ingress/hpa/persistentvolume/mountpoint 等"), metric_category: str = Field(..., description="指标分类:cpu/memory/network/disk/state"), ) -> QueryPrometheusMetricGuidanceOutput | Dict[str, Any]: - # 从 runtime context 获取 Prometheus 指标指引数据 - lifespan = getattr(ctx.request_context, "lifespan_context", {}) or {} - providers = lifespan.get("providers", {}) if isinstance(lifespan, dict) else {} - prometheus_guidance = providers.get("prometheus_guidance", {}) if isinstance(providers, dict) else {} - - if not prometheus_guidance or not prometheus_guidance.get("initialized"): - return {"error": ErrorModel(error_code="GuidanceNotInitialized", - error_message="Prometheus guidance not initialized").model_dump()} - - metrics: List[MetricDefinition] = [] - promql_samples: List[PromQLSample] = [] - + # Set per-request context from handler setting + enable_execution_log_ctx.set(self.enable_execution_log) + + # Initialize execution log + start_ms = int(time.time() * 1000) + execution_log = ExecutionLog( + tool_call_id=f"query_prometheus_metric_guidance_{resource_label}_{metric_category}_{start_ms}", + start_time=datetime.utcnow().isoformat() + "Z" + ) + try: + # 从 runtime context 获取 Prometheus 指标指引数据 + lifespan = getattr(ctx.request_context, "lifespan_context", {}) or {} + providers = lifespan.get("providers", {}) if 
isinstance(lifespan, dict) else {} + prometheus_guidance = providers.get("prometheus_guidance", {}) if isinstance(providers, dict) else {} + + if not prometheus_guidance or not prometheus_guidance.get("initialized"): + error_msg = "Prometheus guidance not initialized" + execution_log.error = error_msg + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": "GuidanceNotInitialized", + "failure_stage": "check_initialization" + } + return { + "error": ErrorModel(error_code="GuidanceNotInitialized", error_message=error_msg).model_dump(), + "execution_log": execution_log + } + + metrics: List[MetricDefinition] = [] + promql_samples: List[PromQLSample] = [] + # 查询指标定义 metrics_dict = prometheus_guidance.get("metrics_dictionary", {}) for file_key, file_data in metrics_dict.items(): @@ -252,17 +499,31 @@ async def query_prometheus_metric_guidance( category=rule.get("category", ""), labels=rule.get("labels", []) )) + + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + + # 构建返回结果,包含指标定义和 PromQL 最佳实践 + return QueryPrometheusMetricGuidanceOutput( + metrics=metrics, + promql_samples=promql_samples, + error=None, + execution_log=execution_log + ) except Exception as e: - return {"error": ErrorModel(error_code="GuidanceQueryError", - error_message=f"Error querying guidance data: {str(e)}").model_dump()} - - # 构建返回结果,包含指标定义和 PromQL 最佳实践 - return QueryPrometheusMetricGuidanceOutput( - metrics=metrics, - promql_samples=promql_samples, - error=None - ) + logger.error(f"Error querying guidance data: {e}") + execution_log.error = str(e) + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": type(e).__name__, + "failure_stage": "query_guidance" + } + return { + "error": ErrorModel(error_code="GuidanceQueryError", error_message=f"Error querying guidance data: {str(e)}").model_dump(), + "execution_log": execution_log + } def _get_cs_client(ctx: Context, region_id: str): """从 lifespan providers 中获取指定区域的 CS 客户端。""" diff --git a/src/kubectl_handler.py b/src/kubectl_handler.py index f305eea..ea41068 100644 --- a/src/kubectl_handler.py +++ b/src/kubectl_handler.py @@ -7,7 +7,9 @@ from cachetools import TTLCache from loguru import logger from ack_cluster_handler import parse_master_url -from models import KubectlOutput +from models import KubectlOutput, ExecutionLog, enable_execution_log_ctx +import time +from datetime import datetime class KubectlContextManager(TTLCache): """基于 TTL+LRU 缓存的 kubeconfig 文件管理器""" @@ -78,13 +80,14 @@ def cleanup_all_mcp_files(self): except Exception: pass - def _get_or_create_kubeconfig_file(self, cluster_id: str, kubeconfig_mode: str, kubeconfig_path: str) -> str: + def _get_or_create_kubeconfig_file(self, cluster_id: str, kubeconfig_mode: str, kubeconfig_path: str, execution_log: ExecutionLog) -> str: """获取或创建集群的 kubeconfig 文件 Args: cluster_id: 集群ID kubeconfig_mode: 获取kubeconfig的模式,支持 "ACK_PUBLIC", "ACK_PRIVATE", "LOCAL" kubeconfig_path: 本地kubeconfig文件路径(仅在模式为LOCAL时使用) + execution_log: 执行日志 Returns: kubeconfig 文件路径 @@ -92,12 +95,24 @@ def _get_or_create_kubeconfig_file(self, cluster_id: str, kubeconfig_mode: str, # 检查缓存中是否已存在 if cluster_id in self: logger.debug(f"Found cached kubeconfig for cluster {cluster_id}") + execution_log.api_calls.append({ + "api": "GetKubeconfig", + 
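+                # source=cache:命中 TTL 缓存,本次未发起 ACK OpenAPI 调用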
"source": "cache", + "cluster_id": cluster_id, + "status": "success" + }) return self[cluster_id] if kubeconfig_mode == "INCLUSTER": # 使用集群内配置 logger.debug(f"Using in-cluster kubeconfig for cluster {cluster_id}") kubeconfig_path = self._construct_incluster_kubeconfig() + execution_log.api_calls.append({ + "api": "GetKubeconfig", + "source": "incluster", + "cluster_id": cluster_id, + "status": "success" + }) self[cluster_id] = kubeconfig_path return kubeconfig_path @@ -111,6 +126,13 @@ def _get_or_create_kubeconfig_file(self, cluster_id: str, kubeconfig_mode: str, raise ValueError(f"File {kubeconfig_path} does not exist") self.do_not_cleanup_file = kubeconfig_path logger.debug(f"Using local kubeconfig for cluster {cluster_id} from {kubeconfig_path}") + execution_log.api_calls.append({ + "api": "GetKubeconfig", + "source": "local_file", + "cluster_id": cluster_id, + "path": kubeconfig_path, + "status": "success" + }) self[cluster_id] = kubeconfig_path return kubeconfig_path @@ -118,7 +140,7 @@ def _get_or_create_kubeconfig_file(self, cluster_id: str, kubeconfig_mode: str, private_ip_address = kubeconfig_mode == "ACK_PRIVATE" # 创建新的 kubeconfig 文件 - kubeconfig_content = self._get_kubeconfig_from_ack(cluster_id, private_ip_address, int(self.ttl / 60)) # 转换为分钟 + kubeconfig_content = self._get_kubeconfig_from_ack(cluster_id, private_ip_address, int(self.ttl / 60), execution_log) # 转换为分钟 if not kubeconfig_content: raise ValueError(f"Failed to get kubeconfig for cluster {cluster_id}") @@ -181,13 +203,14 @@ def _get_cs_client(self): raise ValueError("CS client not set") return self._cs_client - def _get_kubeconfig_from_ack(self, cluster_id: str, private_ip_address: bool = False, ttl_minutes: int = 60) -> Optional[str]: + def _get_kubeconfig_from_ack(self, cluster_id: str, private_ip_address: bool = False, ttl_minutes: int = 60, execution_log: ExecutionLog = None) -> Optional[str]: """通过ACK API获取kubeconfig配置 Args: cluster_id: 集群ID private_ip_address: 是否获取内网连接配置 ttl_minutes: kubeconfig有效期(分钟),默认60分钟 + execution_log: 执行日志 """ try: # 获取CS客户端 @@ -195,9 +218,25 @@ def _get_kubeconfig_from_ack(self, cluster_id: str, private_ip_address: bool = F from alibabacloud_cs20151215 import models as cs_models # 先检查集群详情,确认是否有公网端点 + api_start = int(time.time() * 1000) detail_response = cs_client.describe_cluster_detail(cluster_id) + api_duration = int(time.time() * 1000) - api_start + + # Extract request_id + request_id = None + if hasattr(detail_response, 'headers') and detail_response.headers: + request_id = detail_response.headers.get('x-acs-request-id', 'N/A') if not detail_response or not detail_response.body: + if execution_log: + execution_log.api_calls.append({ + "api": "DescribeClusterDetail", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": "No response body" + }) raise ValueError(f"Failed to get cluster details for {cluster_id}") cluster_info = detail_response.body @@ -206,16 +245,44 @@ def _get_kubeconfig_from_ack(self, cluster_id: str, private_ip_address: bool = F master_url = parse_master_url(master_url_str) if private_ip_address: if not master_url["intranet_api_server_endpoint"]: + if execution_log: + execution_log.api_calls.append({ + "api": "DescribeClusterDetail", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": "No intranet endpoint" + }) raise ValueError( f"Cluster {cluster_id} does not have intranet endpoint access, " f"Please enable intranet endpoint access 
setting first." ) else: if not master_url["api_server_endpoint"]: + if execution_log: + execution_log.api_calls.append({ + "api": "DescribeClusterDetail", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": "No public endpoint" + }) raise ValueError( f"Cluster {cluster_id} does not have public endpoint access, " f"Please enable public endpoint access setting first." ) + + # Log successful cluster detail check + if execution_log: + execution_log.api_calls.append({ + "api": "DescribeClusterDetail", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "success" + }) # 调用DescribeClusterUserKubeconfig API request = cs_models.DescribeClusterUserKubeconfigRequest( @@ -223,13 +290,39 @@ def _get_kubeconfig_from_ack(self, cluster_id: str, private_ip_address: bool = F temporary_duration_minutes=ttl_minutes, # 使用传入的TTL ) + api_start = int(time.time() * 1000) response = cs_client.describe_cluster_user_kubeconfig(cluster_id, request) + api_duration = int(time.time() * 1000) - api_start + + # Extract request_id + request_id = None + if hasattr(response, 'headers') and response.headers: + request_id = response.headers.get('x-acs-request-id', 'N/A') if response and response.body and response.body.config: logger.info(f"Successfully fetched kubeconfig for cluster {cluster_id} (TTL: {ttl_minutes} minutes)") + if execution_log: + execution_log.api_calls.append({ + "api": "DescribeClusterUserKubeconfig", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "success", + "mode": "private" if private_ip_address else "public", + "ttl_minutes": ttl_minutes + }) return response.body.config else: logger.warning(f"No kubeconfig found for cluster {cluster_id}") + if execution_log: + execution_log.api_calls.append({ + "api": "DescribeClusterUserKubeconfig", + "cluster_id": cluster_id, + "request_id": request_id, + "duration_ms": api_duration, + "status": "failed", + "error": "No kubeconfig in response" + }) return None except Exception as e: @@ -270,18 +363,19 @@ def _construct_incluster_kubeconfig(self) -> str: """) return kubeconfig_path - def get_kubeconfig_path(self, cluster_id: str, kubeconfig_mode: str, kubeconfig_path: str) -> str: + def get_kubeconfig_path(self, cluster_id: str, kubeconfig_mode: str, kubeconfig_path: str, execution_log: ExecutionLog) -> str: """获取集群的 kubeconfig 文件路径 Args: cluster_id: 集群ID kubeconfig_mode: 获取kubeconfig的模式,支持 "ACK_PUBLIC", "ACK_PRIVATE", "LOCAL" kubeconfig_path: 本地kubeconfig文件路径(仅在模式为LOCAL时使用) + execution_log: 执行日志 Returns: kubeconfig 文件路径 """ - return self._get_or_create_kubeconfig_file(cluster_id, kubeconfig_mode, kubeconfig_path) + return self._get_or_create_kubeconfig_file(cluster_id, kubeconfig_mode, kubeconfig_path, execution_log) # 全局上下文管理器实例 @@ -317,9 +411,6 @@ def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None): settings: Optional settings dictionary """ self.settings = settings or {} - if server is None: - return - self.server = server # 超时配置 self.kubectl_timeout = self.settings.get("kubectl_timeout", 30) @@ -327,6 +418,13 @@ def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None): # 是否可写变更配置 self.allow_write = self.settings.get("allow_write", False) + # Per-handler toggle + self.enable_execution_log = self.settings.get("enable_execution_log", False) + + if server is None: + return + self.server = server + self._register_tools() def _setup_cs_client(self, ctx: 
Context): @@ -441,10 +539,12 @@ def is_streaming_command(self, command: str) -> tuple[bool, Optional[str]]: return False, None - def run_streaming_command(self, command: str, kubeconfig_path: str, timeout: int = 10) -> Dict[str, Any]: + def run_streaming_command(self, command: str, kubeconfig_path: str, timeout: int, execution_log: ExecutionLog) -> Dict[str, Any]: """运行流式命令,支持超时控制""" try: full_command = f"kubectl --kubeconfig {kubeconfig_path} {command}" + + cmd_start = int(time.time() * 1000) process = subprocess.Popen( full_command, shell=True, @@ -496,9 +596,21 @@ def read_stderr(): stdout_thread.join(timeout=1) stderr_thread.join(timeout=1) + cmd_duration = int(time.time() * 1000) - cmd_start exit_code = process.returncode if process_terminated and exit_code is None: exit_code = 124 + + # Log kubectl execution + execution_log.api_calls.append({ + "api": "KubectlCommand", + "command": command, + "type": "streaming", + "duration_ms": cmd_duration, + "exit_code": exit_code or 0, + "status": "success" if exit_code == 0 else "failed", + "timeout": timeout + }) return { "exit_code": exit_code or 0, @@ -507,16 +619,25 @@ def read_stderr(): } except Exception as e: + execution_log.api_calls.append({ + "api": "KubectlCommand", + "command": command, + "type": "streaming", + "status": "failed", + "error": str(e) + }) return { "exit_code": 1, "stdout": "", "stderr": str(e) } - def run_command(self, command: str, kubeconfig_path: str, timeout: int = 10) -> Dict[str, Any]: + def run_command(self, command: str, kubeconfig_path: str, timeout: int, execution_log: ExecutionLog) -> Dict[str, Any]: """Run a kubectl command and return structured result.""" try: full_command = f"kubectl --kubeconfig {kubeconfig_path} {command}" + + cmd_start = int(time.time() * 1000) result = subprocess.run( full_command, shell=True, @@ -525,18 +646,50 @@ def run_command(self, command: str, kubeconfig_path: str, timeout: int = 10) -> check=True, timeout=timeout ) + cmd_duration = int(time.time() * 1000) - cmd_start + + # Log kubectl execution + execution_log.api_calls.append({ + "api": "KubectlCommand", + "command": command, + "type": "normal", + "duration_ms": cmd_duration, + "exit_code": result.returncode, + "status": "success", + "timeout": timeout + }) + return { "exit_code": result.returncode, "stdout": result.stdout.strip() if result.stdout else "", "stderr": result.stderr.strip() if result.stderr else "", } except subprocess.TimeoutExpired: + cmd_duration = int(time.time() * 1000) - cmd_start + execution_log.api_calls.append({ + "api": "KubectlCommand", + "command": command, + "type": "normal", + "duration_ms": cmd_duration, + "exit_code": 124, + "status": "timeout", + "timeout": timeout + }) return { "exit_code": 124, "stdout": "", "stderr": f"Command timed out after {timeout} seconds", } except subprocess.CalledProcessError as e: + cmd_duration = int(time.time() * 1000) - cmd_start + execution_log.api_calls.append({ + "api": "KubectlCommand", + "command": command, + "type": "normal", + "duration_ms": cmd_duration, + "exit_code": e.returncode, + "status": "failed" + }) return { "exit_code": e.returncode, "stdout": e.stdout.strip() if e.stdout else "", @@ -594,6 +747,16 @@ async def ack_kubectl( ), ) -> KubectlOutput: + # Set per-request context from handler setting + enable_execution_log_ctx.set(self.enable_execution_log) + + # Initialize execution log + start_ms = int(time.time() * 1000) + execution_log = ExecutionLog( + tool_call_id=f"ack_kubectl_{cluster_id}_{start_ms}", + start_time=datetime.utcnow().isoformat() 
+ "Z" + ) + try: # 设置CS客户端 self._setup_cs_client(ctx) @@ -602,47 +765,77 @@ async def ack_kubectl( if not self.allow_write: is_write_command, not_allow_write_error = self.is_write_command(command) if is_write_command: + execution_log.error = not_allow_write_error + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": "WriteCommandNotAllowed", + "command": command, + "allow_write": False + } return KubectlOutput( command=command, stdout="", stderr=not_allow_write_error, - exit_code=1 + exit_code=1, + execution_log=execution_log ) # 检查是否为交互式命令 is_interactive, interactive_error = self.is_interactive_command(command) if is_interactive: + execution_log.error = interactive_error + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": "InteractiveCommandNotSupported", + "command": command + } return KubectlOutput( command=command, stdout="", stderr=interactive_error, - exit_code=1 + exit_code=1, + execution_log=execution_log ) # 获取 kubeconfig 文件路径 context_manager = get_context_manager() - kubeconfig_path = context_manager.get_kubeconfig_path(cluster_id, self.settings.get("kubeconfig_mode"), self.settings.get("kubeconfig_path")) + kubeconfig_path = context_manager.get_kubeconfig_path(cluster_id, self.settings.get("kubeconfig_mode"), self.settings.get("kubeconfig_path"), execution_log) # 检查是否为流式命令 is_streaming, stream_type = self.is_streaming_command(command) if is_streaming: - result = self.run_streaming_command(command, kubeconfig_path, self.kubectl_timeout) + result = self.run_streaming_command(command, kubeconfig_path, self.kubectl_timeout, execution_log) else: - result = self.run_command(command, kubeconfig_path, self.kubectl_timeout) + result = self.run_command(command, kubeconfig_path, self.kubectl_timeout, execution_log) + + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms return KubectlOutput( command=command, stdout=result["stdout"], stderr=result["stderr"], - exit_code=result["exit_code"] + exit_code=result["exit_code"], + execution_log=execution_log ) except Exception as e: logger.error(f"kubectl tool execution error: {e}") + execution_log.error = str(e) + execution_log.end_time = datetime.utcnow().isoformat() + "Z" + execution_log.duration_ms = int(time.time() * 1000) - start_ms + execution_log.metadata = { + "error_type": type(e).__name__, + "failure_stage": "kubectl_execution", + "command": command + } return KubectlOutput( command=command, stdout="", stderr=str(e), - exit_code=1 + exit_code=1, + execution_log=execution_log ) diff --git a/src/main_server.py b/src/main_server.py index fc4eebb..b76e770 100644 --- a/src/main_server.py +++ b/src/main_server.py @@ -27,6 +27,8 @@ from loguru import logger from fastmcp import FastMCP +from models import ExecutionLog + from ack_audit_log_handler import ACKAuditLogHandler from ack_controlplane_log_handler import ACKControlPlaneLogHandler @@ -233,6 +235,18 @@ def main(): type=str, help="Path to local kubeconfig file when KUBECONFIG_MODE is LOCAL (default: from env KUBECONFIG_PATH)" ) + parser.add_argument( + "--prometheus-endpoint-mode", + type=str, + choices=["ARMS_PUBLIC", "ARMS_PRIVATE", "LOCAL"], + help="Prometheus endpoint resolution mode: 'ARMS_PUBLIC' (use ARMS API to get public endpoint, default), 'ARMS_PRIVATE' (use ARMS API to 
get private endpoint), or 'LOCAL' (use static config/env vars) (default: from env PROMETHEUS_ENDPOINT_MODE or ARMS_PUBLIC)" + ) + parser.add_argument( + "--enable-execution-log", + action=argparse.BooleanOptionalAction, + default=False, + help="Enable ExecutionLog in tool responses for detailed execution tracking (default: false)" + ) parser.add_argument( "--audit-config", "-c", @@ -260,6 +274,9 @@ def main(): "host": args.host, "port": args.port, + # ExecutionLog 配置 + "enable_execution_log": args.enable_execution_log or os.getenv("ENABLE_EXECUTION_LOG", "false").lower() == "true", + # 阿里云认证配置 "region_id": args.region or os.getenv("REGION_ID", "cn-hangzhou"), "access_key_id": args.access_key_id or os.getenv("ACCESS_KEY_ID"), @@ -288,6 +305,9 @@ def main(): # ACK kubectl 配置 "kubeconfig_mode": args.kubeconfig_mode or os.getenv("KUBECONFIG_MODE", "ACK_PUBLIC"), "kubeconfig_path": args.kubeconfig_path or os.getenv("KUBECONFIG_PATH", "~/.kube/config"), + + # Prometheus 配置 + "prometheus_endpoint_mode": args.prometheus_endpoint_mode or os.getenv("PROMETHEUS_ENDPOINT_MODE", "ARMS_PUBLIC"), } # 验证必要的配置 diff --git a/src/models.py b/src/models.py index e45f11d..a27dd80 100644 --- a/src/models.py +++ b/src/models.py @@ -1,6 +1,76 @@ from typing import Any, Dict, List, Optional -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_serializer from enum import Enum +from typing import Optional, List, Dict, Any +from loguru import logger +import contextvars + +# Context variable to control execution_log output per request/handler +enable_execution_log_ctx = contextvars.ContextVar('enable_execution_log', default=False) + + +class ExecutionLog(BaseModel): + """ + ExecutionLog records the detailed execution process of tool invocations. + Provides comprehensive tracking of request flow, timing, and operational details. 
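+    Whether the log is embedded in serialized tool output is gated per request by enable_execution_log_ctx.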
+ """ + tool_call_id: Optional[str] = Field(None, description="Unique identifier for tracking this tool call execution") + start_time: Optional[str] = Field(None, description="Execution start time in ISO 8601 format") + end_time: Optional[str] = Field(None, description="Execution end time in ISO 8601 format") + duration_ms: Optional[int] = Field(None, description="Total execution duration in milliseconds") + messages: List[str] = Field(default_factory=list, description="Step-by-step execution log messages") + api_calls: List[Dict[str, Any]] = Field(default_factory=list, description="Record of external API calls made during execution") + warnings: List[str] = Field(default_factory=list, description="Non-fatal warnings encountered during execution") + error: Optional[str] = Field(None, description="Error message if execution failed") + metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional context-specific metadata") + + def log_to_logger(self): + """Log ExecutionLog details to logger for monitoring and debugging""" + # Build complete log data with all fields + log_data = { + "tool_call_id": self.tool_call_id, + "start_time": self.start_time, + "end_time": self.end_time, + "duration_ms": self.duration_ms, + "messages": self.messages, + "api_calls": self.api_calls, + "warnings": self.warnings, + "error": self.error, + "metadata": self.metadata + } + + # Choose log level based on execution status + if self.error: + logger.error(f"ExecutionLog [ERROR]: {log_data}") + elif self.warnings: + logger.warning(f"ExecutionLog [WARNING]: {log_data}") + else: + logger.info(f"ExecutionLog [SUCCESS]: {log_data}") + + +class BaseOutputModel(BaseModel): + """ + Base class for all Output models. + Automatically includes execution_log field for tracking execution process. 
+ """ + execution_log: ExecutionLog = Field(default_factory=ExecutionLog, description="Execution process log") + + @model_serializer(mode='wrap', when_used='always') + def _serialize_model(self, serializer, info): + """Custom serializer that conditionally excludes execution_log""" + # Log the execution log + if hasattr(self, 'execution_log') and self.execution_log: + self.execution_log.log_to_logger() + + # Get the standard serialization + data = serializer(self) + + # Check if execution_log should be excluded + enable_execution_log = enable_execution_log_ctx.get() + if not enable_execution_log and isinstance(data, dict) and 'execution_log' in data: + del data['execution_log'] + + return data class ErrorModel(BaseModel): @@ -21,7 +91,7 @@ class QueryPrometheusSeriesPoint(BaseModel): values: List[Any] = Field(default_factory=list) -class QueryPrometheusOutput(BaseModel): +class QueryPrometheusOutput(BaseOutputModel): resultType: str result: List[QueryPrometheusSeriesPoint] @@ -50,7 +120,7 @@ class PromQLSample(BaseModel): labels: List[str] = Field(default_factory=list, description="观测查询规则的关联资源labels列表") -class QueryPrometheusMetricGuidanceOutput(BaseModel): +class QueryPrometheusMetricGuidanceOutput(BaseOutputModel): metrics: List[MetricDefinition] = Field(default_factory=list, description="指标定义列表") promql_samples: List[PromQLSample] = Field(default_factory=list, description="PromQL最佳实践样例列表") error: Optional[ErrorModel] = None @@ -81,7 +151,7 @@ class DiagnosisCodeEnum(Enum): FAILED = 1 -class GetDiagnoseResourceResultOutput(BaseModel): +class GetDiagnoseResourceResultOutput(BaseOutputModel): result: Optional[str] = None status: Optional[str] = Field(..., description="诊断状态:诊断已创建, 诊断运行中, 诊断已完成") code: Optional[str] = Field(..., description="诊断结果:诊断完成, 诊断失败") @@ -119,7 +189,7 @@ class CheckItemResult(BaseModel): level: str = Field(..., description="巡检项所属级别。取值:/advice:建议/warning:低危/error:中危/critical:高危") -class QueryInspectReportOutput(BaseModel): +class QueryInspectReportOutput(BaseOutputModel): report_status: Optional[str] = None report_finish_time: Optional[str] = None summary: Optional[InspectSummary] = None @@ -152,7 +222,7 @@ class ClusterInfo(BaseModel): api_server_endpoints: dict[str, str] = Field(default_factory=list, description="集群API Server 访问地址") -class ListClustersOutput(BaseModel): +class ListClustersOutput(BaseOutputModel): count: int = Field(..., description="返回的集群数") error: Optional[ErrorModel] = Field(None, description="错误信息") clusters: List[ClusterInfo] = Field(default_factory=list, description="集群列表") @@ -194,7 +264,7 @@ class AuditLogEntry(BaseModel): raw_log: Optional[str] = Field(None, description="原始日志内容") -class QueryAuditLogsOutput(BaseModel): +class QueryAuditLogsOutput(BaseOutputModel): query: Optional[str] = Field(None, description="查询语句") entries: List[AuditLogEntry] = Field(default_factory=list, description="返回的日志条目") total: int = Field(0, description="总数") @@ -222,7 +292,7 @@ class ClusterAuditProjectInfo(BaseModel): audit_enabled: bool = Field(False, description="当前集群是否已启用 API Server 审计功能") -class GetClusterAuditProjectOutput(BaseModel): +class GetClusterAuditProjectOutput(BaseOutputModel): """获取集群审计项目信息输出结果""" cluster_id: str = Field(..., description="集群 ID") audit_info: Optional[ClusterAuditProjectInfo] = Field(None, description="审计项目信息") @@ -237,7 +307,7 @@ class KubectlInput(BaseModel): cluster_id: Optional[str] = Field(None, description="可选的集群 ID,如果提供则通过 ACK API 获取 kubeconfig") -class KubectlOutput(BaseModel): +class KubectlOutput(BaseOutputModel): 
"""Kubectl 命令输出结果""" command: str = Field(..., description="kubectl 命令参数,例如 'get pods -A'") stdout: str = Field("", description="kubectl 命令执行结果") @@ -245,7 +315,7 @@ class KubectlOutput(BaseModel): exit_code: int = Field(0, description="kubectl 命令执行结果码") -class GetClusterKubeConfigOutput(BaseModel): +class GetClusterKubeConfigOutput(BaseOutputModel): """get_cluster_kubeconfig 命令输出结果""" error: Optional[ErrorModel] = Field(None, description="错误信息") kubeconfig: Optional[str] = Field(None, description="KUBECONFIG file path for an ACK cluster") @@ -259,7 +329,7 @@ class KubectlErrorCodes: KUBECTL_COMMAND_FAILED = "KUBECTL_COMMAND_FAILED" -class GetCurrentTimeOutput(BaseModel): +class GetCurrentTimeOutput(BaseOutputModel): """获取当前时间的输出模型""" current_time_iso: str = Field(..., description="当前时间,ISO 8601 格式 (UTC)") current_time_unix: int = Field(..., description="当前时间,Unix 时间戳(秒级)") @@ -289,7 +359,7 @@ class ControlPlaneLogEntry(BaseModel): raw_log: Optional[str] = Field(None, description="原始日志内容") -class QueryControlPlaneLogsOutput(BaseModel): +class QueryControlPlaneLogsOutput(BaseOutputModel): """查询控制面日志输出结果""" query: Optional[str] = Field(None, description="查询语句") entries: List[ControlPlaneLogEntry] = Field(default_factory=list, description="返回的日志条目") @@ -316,7 +386,7 @@ class ControlPlaneLogConfig(BaseModel): components: List[str] = Field(default_factory=list, description="当前开启控制面日志的组件列表") -class GetControlPlaneLogConfigOutput(BaseModel): +class GetControlPlaneLogConfigOutput(BaseOutputModel): """获取控制面日志配置输出结果""" cluster_id: str = Field(..., description="集群 ID") config: Optional[ControlPlaneLogConfig] = Field(None, description="控制面日志配置信息")