From 7d52adaf65282a9bd7444da8e8cf7db2b778cee7 Mon Sep 17 00:00:00 2001 From: Yusheng Guo Date: Thu, 30 Oct 2025 13:55:02 +0800 Subject: [PATCH 01/13] feat(args): support multiple kubeconfig modes --- src/kubectl_handler.py | 48 +++++++++++++++++++++++++++++++----------- src/main_server.py | 15 +++++++++++++ 2 files changed, 51 insertions(+), 12 deletions(-) diff --git a/src/kubectl_handler.py b/src/kubectl_handler.py index 09b457c..1e3b744 100644 --- a/src/kubectl_handler.py +++ b/src/kubectl_handler.py @@ -78,12 +78,14 @@ def cleanup_all_mcp_files(self): except Exception: pass - def _get_or_create_kubeconfig_file(self, cluster_id: str) -> str: + def _get_or_create_kubeconfig_file(self, cluster_id: str, kubeconfig_mode: str, kubeconfig_path: str) -> str: """获取或创建集群的 kubeconfig 文件 Args: cluster_id: 集群ID - + kubeconfig_mode: 获取kubeconfig的模式,支持 "ACK_PUBLIC", "ACK_PRIVATE", "LOCAL" + kubeconfig_path: 本地kubeconfig文件路径(仅在模式为LOCAL时使用) + Returns: kubeconfig 文件路径 """ @@ -91,9 +93,19 @@ def _get_or_create_kubeconfig_file(self, cluster_id: str) -> str: if cluster_id in self: logger.debug(f"Found cached kubeconfig for cluster {cluster_id}") return self[cluster_id] + if kubeconfig_mode == "LOCAL": + # 使用本地 kubeconfig 文件 + if not kubeconfig_path or not os.path.exists(kubeconfig_path): + raise ValueError("Local kubeconfig path is not set or does not exist") + logger.debug(f"Using local kubeconfig for cluster {cluster_id} from {kubeconfig_path}") + self[cluster_id] = kubeconfig_path + return kubeconfig_path + + # 从 ACK 获取 kubeconfig + private_ip_address = kubeconfig_mode == "ACK_PRIVATE" # 创建新的 kubeconfig 文件 - kubeconfig_content = self._get_kubeconfig_from_ack(cluster_id, int(self.ttl / 60)) # 转换为分钟 + kubeconfig_content = self._get_kubeconfig_from_ack(cluster_id, private_ip_address, int(self.ttl / 60)) # 转换为分钟 if not kubeconfig_content: raise ValueError(f"Failed to get kubeconfig for cluster {cluster_id}") @@ -150,11 +162,12 @@ def _get_cs_client(self): raise ValueError("CS client not set") return self._cs_client - def _get_kubeconfig_from_ack(self, cluster_id: str, ttl_minutes: int = 60) -> Optional[str]: + def _get_kubeconfig_from_ack(self, cluster_id: str, private_ip_address: bool = False, ttl_minutes: int = 60) -> Optional[str]: """通过ACK API获取kubeconfig配置 Args: cluster_id: 集群ID + private_ip_address: 是否获取内网连接配置 ttl_minutes: kubeconfig有效期(分钟),默认60分钟 """ try: @@ -172,13 +185,22 @@ def _get_kubeconfig_from_ack(self, cluster_id: str, ttl_minutes: int = 60) -> Op # 检查是否有公网API Server端点 master_url_str = getattr(cluster_info, 'master_url', '') master_url = parse_master_url(master_url_str) - if not master_url["api_server_endpoint"]: - raise ValueError(f"Cluster {cluster_id} does not have public endpoint access, " - f"Please enable public endpoint access setting first.") + if private_ip_address: + if not master_url["intranet_api_server_endpoint"]: + raise ValueError( + f"Cluster {cluster_id} does not have intranet endpoint access, " + f"Please enable intranet endpoint access setting first." + ) + else: + if not master_url["api_server_endpoint"]: + raise ValueError( + f"Cluster {cluster_id} does not have public endpoint access, " + f"Please enable public endpoint access setting first." + ) # 调用DescribeClusterUserKubeconfig API request = cs_models.DescribeClusterUserKubeconfigRequest( - private_ip_address=False, # 获取公网连接配置 + private_ip_address=private_ip_address, # 获取公网连接配置 temporary_duration_minutes=ttl_minutes, # 使用传入的TTL ) @@ -195,16 +217,18 @@ def _get_kubeconfig_from_ack(self, cluster_id: str, ttl_minutes: int = 60) -> Op logger.error(f"Failed to fetch kubeconfig for cluster {cluster_id}: {e}") raise e - def get_kubeconfig_path(self, cluster_id: str) -> str: + def get_kubeconfig_path(self, cluster_id: str, kubeconfig_mode: str, kubeconfig_path: str) -> str: """获取集群的 kubeconfig 文件路径 Args: cluster_id: 集群ID - + kubeconfig_mode: 获取kubeconfig的模式,支持 "ACK_PUBLIC", "ACK_PRIVATE", "LOCAL" + kubeconfig_path: 本地kubeconfig文件路径(仅在模式为LOCAL时使用) + Returns: kubeconfig 文件路径 """ - return self._get_or_create_kubeconfig_file(cluster_id) + return self._get_or_create_kubeconfig_file(cluster_id, kubeconfig_mode, kubeconfig_path) # 全局上下文管理器实例 @@ -538,7 +562,7 @@ async def ack_kubectl( # 获取 kubeconfig 文件路径 context_manager = get_context_manager() - kubeconfig_path = context_manager.get_kubeconfig_path(cluster_id) + kubeconfig_path = context_manager.get_kubeconfig_path(cluster_id, self.settings.get("kubeconfig_mode"), self.settings.get("kubeconfig_path")) # 检查是否为流式命令 is_streaming, stream_type = self.is_streaming_command(command) diff --git a/src/main_server.py b/src/main_server.py index b278300..8f5194a 100644 --- a/src/main_server.py +++ b/src/main_server.py @@ -222,6 +222,17 @@ def main(): type=str, help="AlibabaCloud Access Key Secret (default: from env ACCESS_KEY_SECRET)" ) + parser.add_argument( + "--kubeconfig-mode", + type=str, + choices=["ACK_PUBLIC", "ACK_PRIVATE", "LOCAL"], + help="Mode to obtain kubeconfig for ACK clusters (default: from env KUBECONFIG_MODE)" + ) + parser.add_argument( + "--kubeconfig-path", + type=str, + help="Path to local kubeconfig file when KUBECONFIG_MODE is LOCAL (default: from env KUBECONFIG_PATH)" + ) parser.add_argument( "--audit-config", "-c", @@ -273,6 +284,10 @@ def main(): # 兼容性配置 "access_secret_key": args.access_key_secret or os.getenv("ACCESS_KEY_SECRET"), # 兼容旧字段名 "original_settings": Configs(vars(args)), + + # ACK kubectl 配置 + "kubeconfig_mode": args.kubeconfig_mode or os.getenv("KUBECONFIG_MODE", "ACK_PUBLIC"), + "kubeconfig_path": args.kubeconfig_path or os.getenv("KUBECONFIG_PATH"), } # 验证必要的配置 From 88f47a8d069e8a0d9e7abba4344c3ca5bf3ce4c1 Mon Sep 17 00:00:00 2001 From: Yusheng Guo Date: Thu, 30 Oct 2025 14:06:42 +0800 Subject: [PATCH 02/13] fix: remove an incorrect comment --- src/kubectl_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kubectl_handler.py b/src/kubectl_handler.py index 1e3b744..444c600 100644 --- a/src/kubectl_handler.py +++ b/src/kubectl_handler.py @@ -200,7 +200,7 @@ def _get_kubeconfig_from_ack(self, cluster_id: str, private_ip_address: bool = F # 调用DescribeClusterUserKubeconfig API request = cs_models.DescribeClusterUserKubeconfigRequest( - private_ip_address=private_ip_address, # 获取公网连接配置 + private_ip_address=private_ip_address, temporary_duration_minutes=ttl_minutes, # 使用传入的TTL ) From 4a40093886a574d14ec03d75b8730de4b5f212a6 Mon Sep 17 00:00:00 2001 From: Yusheng Guo Date: Thu, 30 Oct 2025 14:38:16 +0800 Subject: [PATCH 03/13] fix: format error --- src/kubectl_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kubectl_handler.py b/src/kubectl_handler.py index 444c600..d925943 100644 --- a/src/kubectl_handler.py +++ b/src/kubectl_handler.py @@ -96,7 +96,7 @@ def _get_or_create_kubeconfig_file(self, cluster_id: str, kubeconfig_mode: str, if kubeconfig_mode == "LOCAL": # 使用本地 kubeconfig 文件 if not kubeconfig_path or not os.path.exists(kubeconfig_path): - raise ValueError("Local kubeconfig path is not set or does not exist") + raise ValueError(f"Local kubeconfig path is not set or file {kubeconfig_path} does not exist") logger.debug(f"Using local kubeconfig for cluster {cluster_id} from {kubeconfig_path}") self[cluster_id] = kubeconfig_path return kubeconfig_path From 1be4ffe46e348c5433582e77a761c93e5fbce81f Mon Sep 17 00:00:00 2001 From: Yusheng Guo Date: Thu, 30 Oct 2025 16:20:21 +0800 Subject: [PATCH 04/13] fix(kubeconfig): donot remove file specified by env KUBECONFIG_PATH --- src/kubectl_handler.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/kubectl_handler.py b/src/kubectl_handler.py index d925943..f206685 100644 --- a/src/kubectl_handler.py +++ b/src/kubectl_handler.py @@ -95,6 +95,7 @@ def _get_or_create_kubeconfig_file(self, cluster_id: str, kubeconfig_mode: str, return self[cluster_id] if kubeconfig_mode == "LOCAL": # 使用本地 kubeconfig 文件 + self.donot_cleanup_file = kubeconfig_path if not kubeconfig_path or not os.path.exists(kubeconfig_path): raise ValueError(f"Local kubeconfig path is not set or file {kubeconfig_path} does not exist") logger.debug(f"Using local kubeconfig for cluster {cluster_id} from {kubeconfig_path}") @@ -126,7 +127,7 @@ def popitem(self): """重写 popitem 方法,在驱逐缓存项时清理 kubeconfig 文件""" key, path = super().popitem() # 删除 kubeconfig 文件 - if path and os.path.exists(path): + if path and os.path.exists(path) and os.path.abspath(path) != os.path.abspath(self.donot_cleanup_file): try: os.remove(path) logger.debug(f"Removed cached kubeconfig file: {path}") @@ -140,6 +141,8 @@ def cleanup(self): removed_count = 0 for key, path in list(self.items()): if path and os.path.exists(path): + if os.path.abspath(path) == os.path.abspath(self.donot_cleanup_file): + continue try: os.remove(path) removed_count += 1 From 405dc446c8b7f09963496202b639270a67145352 Mon Sep 17 00:00:00 2001 From: Yusheng Guo Date: Thu, 30 Oct 2025 18:32:23 +0800 Subject: [PATCH 05/13] fix: support relative path for flag --kubeconfig-path --- src/kubectl_handler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/kubectl_handler.py b/src/kubectl_handler.py index f206685..d3aab66 100644 --- a/src/kubectl_handler.py +++ b/src/kubectl_handler.py @@ -95,6 +95,7 @@ def _get_or_create_kubeconfig_file(self, cluster_id: str, kubeconfig_mode: str, return self[cluster_id] if kubeconfig_mode == "LOCAL": # 使用本地 kubeconfig 文件 + kubeconfig_path = os.path.abspath(kubeconfig_path) self.donot_cleanup_file = kubeconfig_path if not kubeconfig_path or not os.path.exists(kubeconfig_path): raise ValueError(f"Local kubeconfig path is not set or file {kubeconfig_path} does not exist") From ac28f0f6a793d85adcf3fdc394f82f9ce6ffe7a7 Mon Sep 17 00:00:00 2001 From: "Shichun Feng (Sean)" Date: Mon, 3 Nov 2025 16:06:17 +0800 Subject: [PATCH 06/13] 1. kubeconfig management 2. refine README and DESIGN --- DESIGN.md | 38 ++++++++++++++++++++++++++++++++++++++ README.md | 8 ++++---- src/kubectl_handler.py | 9 +++++++-- 3 files changed, 49 insertions(+), 6 deletions(-) diff --git a/DESIGN.md b/DESIGN.md index 7fe25c1..83b5bc7 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -124,6 +124,44 @@ ack-mcp-server 采用分层架构,遵循以下设计原则: 每个 Handler 实现特定领域的 MCP 工具和资源: +## 鉴权方案策略 / 集群Kubeconfig证书管理 + +ack-mcp-server中tools所需权限分为: +- 访问Kubernetes集群rbac权限,通过集群证书访问 +- 访问阿里云服务权限,通过阿里云OpenAPI访问,通过阿里云Ram鉴权体系鉴权 +- 访问可观测数据,如Prometheus指标、日志系统数据 + +### Kubernetes集群访问策略 + +通过配置ack-mcp-server参数: +```shell +KUBECONFIG_MODE = ACK_PUBLIC(默认,通过ACK OpenAPI获取公网kubeconfig访问) / ACK_PRIVATE (通过ACK OpenAPI获取内网kubeconfig访问) / LOCAL(本地kubeconfig) + +KUBECONFIG_PATH = xxx (Optional参数,只有当KUBECONFIG_MODE = LOCAL 时生效,指定本地kubeconfig文件路径) +``` + +注意:本地测试使用公网访问集群kubeconfig需在[对应ACK开启公网访问kubeconfig](https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/user-guide/obtain-the-kubeconfig-file-of-a-cluster-and-use-kubectl-to-connect-to-the-cluster#a4bbf3452azq5)。 + +推荐生产使用时,打通集群网络内网访问后,推荐使用KUBECONFIG_MODE = ACK_PRIVATE,通过阿里云OpenAPI获取内网kubeconfig访问,避免公网暴露kubeconfig。 + +### 访问阿里云服务权限 + +通过[阿里云Ram鉴权体系](https://help.aliyun.com/zh/sdk/developer-reference/v2-manage-python-access-credentials)。 + +推荐生产使用,推荐通过子账号控制授权策略,满足安全最小使用权限范围最佳实践。 + +### 访问可观测数据 + +优先访问ACK集群对应的阿里云Prometheus服务数据,如没有对应服务,通过env参数寻找可观测数据的访问地址。 +通过配置可指定[Prometheus Read HTTP API](https://prometheus.io/docs/prometheus/latest/querying/api/)。 + +当该集群没有阿里云Prometheus对应实例数据,ack-mcp-server将按按如下优先级寻找={prometheus_http_api_url}访问可观测数据。 +```shell +env参数配置: +PROMETHEUS_HTTP_API_{cluster_id}={prometheus_http_api_url} +PROMETHEUS_HTTP_API={prometheus_http_api_url} +``` + ## 包命名和版本管理 ### 项目命名 diff --git a/README.md b/README.md index 50cad67..43ed150 100644 --- a/README.md +++ b/README.md @@ -131,7 +131,7 @@ https://github.com/user-attachments/assets/9e48cac3-0af1-424c-9f16-3862d047cc68 ### 💻 2.2 (可选)创建ACK集群 - 阿里云账号中已创建的 ACK 集群 -- ACK集群开启公网访问的kubeconfig or ack-mcp-server本地网络可访问的kubeconfig配置(置于.kube/config中) +- 需要生成的集群网络可访问的情况下,配置对应的Kubernetes集群访问凭证,参考[配置方式](./DESIGN.md#kubernetes集群访问策略),在生产环境建议打通集群网络后,通过配置KUBECONFIG_MODE = ACK_PRIVATE,通过内网访问集群。 ### 📍 2.3 部署运行ack-mcp-server @@ -196,8 +196,8 @@ make build-binary - Python 3.12+ - 阿里云账号及 AccessKey、AccessSecretKey,所需权限集 - 阿里云账号中已创建的 ACK 集群 -- ACK集群开启公网访问的kubeconfig or ack-mcp-server本地网络可访问的kubeconfig配置(置于.kube/config中) - +- 配置ACK集群可被ack-mcp-server本地网络可访问的kubeconfig配置,参考[配置方式](./DESIGN.md#kubernetes集群访问策略)。 + - 注:推荐在生产环境建议打通集群网络后,通过配置KUBECONFIG_MODE = ACK_PRIVATE,通过内网访问集群。本地测试使用公网访问集群kubeconfig需在[对应ACK开启公网访问kubeconfig](https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/user-guide/obtain-the-kubeconfig-file-of-a-cluster-and-use-kubectl-to-connect-to-the-cluster#a4bbf3452azq5)。 ### 📋 3.2 开发环境搭建 @@ -374,7 +374,7 @@ cd benchmarks ## 7. 常见问题 - **未配置 AK**: 请检查 ACCESS_KEY_ID/ACCESS_KEY_SECRET 环境变量 -- **ACK集群未开公网kubeconfig**: ack-mcp-server无法执行kubectl tool,需要ACK集群开启公网访问的kubeconfig 或者 ack-mcp-server本地网络可访问的kubeconfig配置(置于.kube/config中) +- **ACK集群网络不可访问**: 当ack-mcp-server使用 KUBECONFIG_MODE = ACK_PUBLIC 公网方式访问集群kubeconfig,需要ACK集群开启公网访问的kubeconfig,在生产环境中推荐打通集群网络,并使用 ACK_PRIVATE 私网方式访问集群kubeconfig,以遵守生产安全最佳实践。 ## 8. 安全 diff --git a/src/kubectl_handler.py b/src/kubectl_handler.py index d3aab66..840cf68 100644 --- a/src/kubectl_handler.py +++ b/src/kubectl_handler.py @@ -9,7 +9,6 @@ from ack_cluster_handler import parse_master_url from models import KubectlOutput - class KubectlContextManager(TTLCache): """基于 TTL+LRU 缓存的 kubeconfig 文件管理器""" @@ -252,7 +251,13 @@ def get_context_manager(ttl_minutes: int = 60) -> KubectlContextManager: class KubectlHandler: - """Handler for running kubectl commands via a FastMCP tool.""" + """ + Handler for running kubectl commands via a FastMCP tool. + + Design: + kubeconfig management policy: https://github.com/aliyun/alibabacloud-ack-mcp-server/issues/1 + + """ def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None): """Initialize the kubectl handler. From 0cb2114823472dd2c7a91ea3cd82e1a4be81e745 Mon Sep 17 00:00:00 2001 From: "Shichun Feng (Sean)" Date: Mon, 3 Nov 2025 16:21:54 +0800 Subject: [PATCH 07/13] =?UTF-8?q?=E5=8A=A0=E4=B8=8A=E9=BB=98=E8=AE=A4kubec?= =?UTF-8?q?onfig=E7=AD=96=E7=95=A5=E8=AF=B4=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DESIGN.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/DESIGN.md b/DESIGN.md index 83b5bc7..ba198b5 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -142,6 +142,8 @@ KUBECONFIG_PATH = xxx (Optional参数,只有当KUBECONFIG_MODE = LOCAL 时生 注意:本地测试使用公网访问集群kubeconfig需在[对应ACK开启公网访问kubeconfig](https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/user-guide/obtain-the-kubeconfig-file-of-a-cluster-and-use-kubectl-to-connect-to-the-cluster#a4bbf3452azq5)。 +默认配置为通过阿里云OpenAPI获取公网kubeconfig访问,默认ttl=1h。 + 推荐生产使用时,打通集群网络内网访问后,推荐使用KUBECONFIG_MODE = ACK_PRIVATE,通过阿里云OpenAPI获取内网kubeconfig访问,避免公网暴露kubeconfig。 ### 访问阿里云服务权限 From 47709fcc27289116eea45c4f1c838b5a7b70aa7c Mon Sep 17 00:00:00 2001 From: "Shichun Feng (Sean)" Date: Mon, 3 Nov 2025 16:24:13 +0800 Subject: [PATCH 08/13] refine doc --- DESIGN.md | 2 +- README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/DESIGN.md b/DESIGN.md index ba198b5..f52de78 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -140,7 +140,7 @@ KUBECONFIG_MODE = ACK_PUBLIC(默认,通过ACK OpenAPI获取公网kubeconfig访 KUBECONFIG_PATH = xxx (Optional参数,只有当KUBECONFIG_MODE = LOCAL 时生效,指定本地kubeconfig文件路径) ``` -注意:本地测试使用公网访问集群kubeconfig需在[对应ACK开启公网访问kubeconfig](https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/user-guide/obtain-the-kubeconfig-file-of-a-cluster-and-use-kubectl-to-connect-to-the-cluster#a4bbf3452azq5)。 +注意:本地测试使用公网访问集群kubeconfig需在[对应ACK开启公网访问kubeconfig](https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/user-guide/control-public-access-to-the-api-server-of-a-cluster)。 默认配置为通过阿里云OpenAPI获取公网kubeconfig访问,默认ttl=1h。 diff --git a/README.md b/README.md index 43ed150..1bda567 100644 --- a/README.md +++ b/README.md @@ -197,7 +197,7 @@ make build-binary - 阿里云账号及 AccessKey、AccessSecretKey,所需权限集 - 阿里云账号中已创建的 ACK 集群 - 配置ACK集群可被ack-mcp-server本地网络可访问的kubeconfig配置,参考[配置方式](./DESIGN.md#kubernetes集群访问策略)。 - - 注:推荐在生产环境建议打通集群网络后,通过配置KUBECONFIG_MODE = ACK_PRIVATE,通过内网访问集群。本地测试使用公网访问集群kubeconfig需在[对应ACK开启公网访问kubeconfig](https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/user-guide/obtain-the-kubeconfig-file-of-a-cluster-and-use-kubectl-to-connect-to-the-cluster#a4bbf3452azq5)。 + - 注:推荐在生产环境建议打通集群网络后,通过配置KUBECONFIG_MODE = ACK_PRIVATE,通过内网访问集群。本地测试使用公网访问集群kubeconfig需在[对应ACK开启公网访问kubeconfig](https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/user-guide/control-public-access-to-the-api-server-of-a-cluster)。 ### 📋 3.2 开发环境搭建 From a042fc2a0dcf18536b6b8ce2e41a08f2e7ba43f5 Mon Sep 17 00:00:00 2001 From: Yusheng Guo Date: Mon, 3 Nov 2025 16:25:41 +0800 Subject: [PATCH 09/13] fix: typo --- src/kubectl_handler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/kubectl_handler.py b/src/kubectl_handler.py index 840cf68..3f3409c 100644 --- a/src/kubectl_handler.py +++ b/src/kubectl_handler.py @@ -95,7 +95,7 @@ def _get_or_create_kubeconfig_file(self, cluster_id: str, kubeconfig_mode: str, if kubeconfig_mode == "LOCAL": # 使用本地 kubeconfig 文件 kubeconfig_path = os.path.abspath(kubeconfig_path) - self.donot_cleanup_file = kubeconfig_path + self.do_not_cleanup_file = kubeconfig_path if not kubeconfig_path or not os.path.exists(kubeconfig_path): raise ValueError(f"Local kubeconfig path is not set or file {kubeconfig_path} does not exist") logger.debug(f"Using local kubeconfig for cluster {cluster_id} from {kubeconfig_path}") @@ -127,7 +127,7 @@ def popitem(self): """重写 popitem 方法,在驱逐缓存项时清理 kubeconfig 文件""" key, path = super().popitem() # 删除 kubeconfig 文件 - if path and os.path.exists(path) and os.path.abspath(path) != os.path.abspath(self.donot_cleanup_file): + if path and os.path.exists(path) and os.path.abspath(path) != os.path.abspath(self.do_not_cleanup_file): try: os.remove(path) logger.debug(f"Removed cached kubeconfig file: {path}") @@ -141,7 +141,7 @@ def cleanup(self): removed_count = 0 for key, path in list(self.items()): if path and os.path.exists(path): - if os.path.abspath(path) == os.path.abspath(self.donot_cleanup_file): + if os.path.abspath(path) == os.path.abspath(self.do_not_cleanup_file): continue try: os.remove(path) From ca31beba1812cd059c077f64cdc9518253129d42 Mon Sep 17 00:00:00 2001 From: Yusheng Guo Date: Tue, 4 Nov 2025 11:13:59 +0800 Subject: [PATCH 10/13] fix: add default kubeconfig path --- src/kubectl_handler.py | 17 ++++++++++++----- src/main_server.py | 2 +- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/kubectl_handler.py b/src/kubectl_handler.py index 3f3409c..23657c4 100644 --- a/src/kubectl_handler.py +++ b/src/kubectl_handler.py @@ -22,6 +22,7 @@ def __init__(self, ttl_minutes: int = 60): super().__init__(maxsize=50, ttl=ttl_minutes * 60) # TTL 以秒为单位,提前5min self._cs_client = None # CS客户端实例 + self.do_not_cleanup_file = None # 本地kubeconfig文件路径,不需要清理 # 使用 .kube 目录存储 kubeconfig 文件 self._kube_dir = os.path.expanduser("~/.kube") @@ -94,10 +95,13 @@ def _get_or_create_kubeconfig_file(self, cluster_id: str, kubeconfig_mode: str, return self[cluster_id] if kubeconfig_mode == "LOCAL": # 使用本地 kubeconfig 文件 - kubeconfig_path = os.path.abspath(kubeconfig_path) + # 检查路径是否为空 + if not kubeconfig_path: + raise ValueError(f"Local kubeconfig path is not set") + kubeconfig_path = os.path.abspath(os.path.expanduser(kubeconfig_path)) + if not os.path.exists(kubeconfig_path): + raise ValueError(f"File {kubeconfig_path} does not exist") self.do_not_cleanup_file = kubeconfig_path - if not kubeconfig_path or not os.path.exists(kubeconfig_path): - raise ValueError(f"Local kubeconfig path is not set or file {kubeconfig_path} does not exist") logger.debug(f"Using local kubeconfig for cluster {cluster_id} from {kubeconfig_path}") self[cluster_id] = kubeconfig_path return kubeconfig_path @@ -127,7 +131,9 @@ def popitem(self): """重写 popitem 方法,在驱逐缓存项时清理 kubeconfig 文件""" key, path = super().popitem() # 删除 kubeconfig 文件 - if path and os.path.exists(path) and os.path.abspath(path) != os.path.abspath(self.do_not_cleanup_file): + if (path and os.path.exists(path) and + self.do_not_cleanup_file and + os.path.abspath(path) != os.path.abspath(self.do_not_cleanup_file)): try: os.remove(path) logger.debug(f"Removed cached kubeconfig file: {path}") @@ -141,7 +147,8 @@ def cleanup(self): removed_count = 0 for key, path in list(self.items()): if path and os.path.exists(path): - if os.path.abspath(path) == os.path.abspath(self.do_not_cleanup_file): + # 只有当do_not_cleanup_file存在且路径不同时才清理 + if self.do_not_cleanup_file and os.path.abspath(path) == os.path.abspath(self.do_not_cleanup_file): continue try: os.remove(path) diff --git a/src/main_server.py b/src/main_server.py index 8f5194a..a39f6fa 100644 --- a/src/main_server.py +++ b/src/main_server.py @@ -287,7 +287,7 @@ def main(): # ACK kubectl 配置 "kubeconfig_mode": args.kubeconfig_mode or os.getenv("KUBECONFIG_MODE", "ACK_PUBLIC"), - "kubeconfig_path": args.kubeconfig_path or os.getenv("KUBECONFIG_PATH"), + "kubeconfig_path": args.kubeconfig_path or os.getenv("KUBECONFIG_PATH", "~/.kube/config"), } # 验证必要的配置 From 31fedf9167adfcc3166642015728dcb56143079f Mon Sep 17 00:00:00 2001 From: Yusheng Guo Date: Tue, 4 Nov 2025 14:12:32 +0800 Subject: [PATCH 11/13] feat(kubeconfig): support incluster mode --- src/kubectl_handler.py | 42 ++++++++++++++++++++++++++++++++++++++++++ src/main_server.py | 2 +- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/src/kubectl_handler.py b/src/kubectl_handler.py index 23657c4..f668d5d 100644 --- a/src/kubectl_handler.py +++ b/src/kubectl_handler.py @@ -93,6 +93,14 @@ def _get_or_create_kubeconfig_file(self, cluster_id: str, kubeconfig_mode: str, if cluster_id in self: logger.debug(f"Found cached kubeconfig for cluster {cluster_id}") return self[cluster_id] + + if kubeconfig_mode == "INCLUSTER": + # 使用集群内配置 + logger.debug(f"Using in-cluster kubeconfig for cluster {cluster_id}") + kubeconfig_path = self._construct_incluster_kubeconfig() + self[cluster_id] = kubeconfig_path + return kubeconfig_path + if kubeconfig_mode == "LOCAL": # 使用本地 kubeconfig 文件 # 检查路径是否为空 @@ -227,6 +235,40 @@ def _get_kubeconfig_from_ack(self, cluster_id: str, private_ip_address: bool = F logger.error(f"Failed to fetch kubeconfig for cluster {cluster_id}: {e}") raise e + def _construct_incluster_kubeconfig(self) -> str: + """构造集群内 kubeconfig 文件路径 + + Returns: + kubeconfig 文件路径 + """ + tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token" + rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + host, port = os.getenv("KUBERNETES_SERVICE_HOST"), os.getenv("KUBERNETES_SERVICE_PORT") + if not host or not port: + raise ValueError("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined") + + kubeconfig_path = os.path.join(self._kube_dir, "config.incluster") + with open(kubeconfig_path, 'w') as f: + f.write(f"""apiVersion: v1 +clusters: +- cluster: + certificate-authority: {rootCAFile} + server: https://{host}:{port} + name: in-cluster +contexts: +- context: + cluster: in-cluster + user: in-cluster + name: in-cluster +current-context: in-cluster +kind: Config +users: +- name: in-cluster + user: + tokenFile: {tokenFile} +""") + return kubeconfig_path + def get_kubeconfig_path(self, cluster_id: str, kubeconfig_mode: str, kubeconfig_path: str) -> str: """获取集群的 kubeconfig 文件路径 diff --git a/src/main_server.py b/src/main_server.py index a39f6fa..fc4eebb 100644 --- a/src/main_server.py +++ b/src/main_server.py @@ -225,7 +225,7 @@ def main(): parser.add_argument( "--kubeconfig-mode", type=str, - choices=["ACK_PUBLIC", "ACK_PRIVATE", "LOCAL"], + choices=["ACK_PUBLIC", "ACK_PRIVATE", "INCLUSTER", "LOCAL"], help="Mode to obtain kubeconfig for ACK clusters (default: from env KUBECONFIG_MODE)" ) parser.add_argument( From 49ce32cba9aab3df645d5fbf0448deca160dfb86 Mon Sep 17 00:00:00 2001 From: Yusheng Guo Date: Tue, 4 Nov 2025 15:35:14 +0800 Subject: [PATCH 12/13] feat(test): add test cases for kubeconfig mode --- src/tests/test_kubeconfig_mode.py | 570 ++++++++++++++++++++++++++++++ 1 file changed, 570 insertions(+) create mode 100644 src/tests/test_kubeconfig_mode.py diff --git a/src/tests/test_kubeconfig_mode.py b/src/tests/test_kubeconfig_mode.py new file mode 100644 index 0000000..eff2aa0 --- /dev/null +++ b/src/tests/test_kubeconfig_mode.py @@ -0,0 +1,570 @@ +import os +import sys +import tempfile +import pytest +from unittest.mock import patch, MagicMock, mock_open + +# 添加父目录到路径以导入模块 +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +import kubectl_handler as module_under_test + + +class FakeServer: + def __init__(self): + self.tools = {} + + def tool(self, name: str = None, description: str = None): + def decorator(func): + key = name or getattr(func, "__name__", "unnamed") + self.tools[key] = func + return func + return decorator + + +class FakeRequestContext: + def __init__(self, lifespan_context=None): + self.lifespan_context = lifespan_context or {} + + +class FakeContext: + def __init__(self, lifespan_context=None): + self.request_context = FakeRequestContext(lifespan_context) + + +class FakeCSClient: + def describe_cluster_detail(self, cluster_id): + class FakeResponse: + class FakeBody: + def __init__(self): + self.master_url = '{"api_server_endpoint": "https://test.example.com:6443", "intranet_api_server_endpoint": "https://internal.test.com:6443"}' + body = FakeBody() + return FakeResponse() + + def describe_cluster_user_kubeconfig(self, cluster_id, request): + class FakeResponse: + class FakeBody: + config = "apiVersion: v1\nclusters:\n- cluster:\n server: https://test.example.com:6443\nusers:\n- name: test-user\n user:\n token: test-token" + body = FakeBody() + return FakeResponse() + + +class FakeCSClientFactory: + def __call__(self, region_id, config=None): + return FakeCSClient() + + +class FakeLifespanContext: + def __init__(self): + self.providers = {"cs_client_factory": FakeCSClientFactory()} + self.config = {"region_id": "cn-hangzhou"} + + +@pytest.fixture +def context_manager(): + """创建一个新的上下文管理器实例用于测试""" + # 清理全局缓存 + module_under_test._context_manager = None + cm = module_under_test.get_context_manager(ttl_minutes=1) # 使用1分钟TTL便于测试 + yield cm + # 清理 + cm.cleanup() + module_under_test._context_manager = None + + +@pytest.fixture +def temp_kubeconfig_file(): + """创建一个临时kubeconfig文件用于测试""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + f.write("apiVersion: v1\nclusters:\n- cluster:\n server: https://test.example.com:6443\nusers:\n- name: test-user\n user:\n token: test-token") + temp_path = f.name + yield temp_path + # 清理 + if os.path.exists(temp_path): + os.unlink(temp_path) + + +def test_local_kubeconfig_mode_success(context_manager, temp_kubeconfig_file): + """测试 LOCAL 模式成功使用本地 kubeconfig 文件""" + cluster_id = "test-cluster" + + # 使用 LOCAL 模式获取 kubeconfig 路径 + kubeconfig_path = context_manager.get_kubeconfig_path( + cluster_id=cluster_id, + kubeconfig_mode="LOCAL", + kubeconfig_path=temp_kubeconfig_file + ) + + # 验证返回的路径是本地文件路径 + assert kubeconfig_path == temp_kubeconfig_file + assert os.path.exists(kubeconfig_path) + + # 验证缓存中已存储 + assert cluster_id in context_manager + assert context_manager[cluster_id] == temp_kubeconfig_file + + +def test_local_kubeconfig_mode_file_not_exists(context_manager): + """测试 LOCAL 模式但文件不存在的情况""" + cluster_id = "test-cluster" + non_existent_path = "/tmp/non-existent-kubeconfig.yaml" + + # 应该抛出 ValueError + with pytest.raises(ValueError, match="File .* does not exist"): + context_manager.get_kubeconfig_path( + cluster_id=cluster_id, + kubeconfig_mode="LOCAL", + kubeconfig_path=non_existent_path + ) + + +def test_local_kubeconfig_mode_empty_path(context_manager): + """测试 LOCAL 模式但路径为空的情况""" + cluster_id = "test-cluster" + + # 应该抛出 ValueError + with pytest.raises(ValueError, match="Local kubeconfig path is not set"): + context_manager.get_kubeconfig_path( + cluster_id=cluster_id, + kubeconfig_mode="LOCAL", + kubeconfig_path="" + ) + + +def test_ack_public_kubeconfig_mode_success(context_manager): + """测试 ACK_PUBLIC 模式成功获取 kubeconfig""" + cluster_id = "test-cluster" + + # Mock CS 客户端 + with patch.object(context_manager, '_get_cs_client') as mock_get_cs_client: + mock_get_cs_client.return_value = FakeCSClient() + + # 使用 ACK_PUBLIC 模式获取 kubeconfig 路径 + kubeconfig_path = context_manager.get_kubeconfig_path( + cluster_id=cluster_id, + kubeconfig_mode="ACK_PUBLIC", + kubeconfig_path="" + ) + + # 验证返回的路径是生成的文件路径 + assert kubeconfig_path.startswith(os.path.expanduser("~/.kube/mcp-kubeconfig-")) + assert kubeconfig_path.endswith(".yaml") + assert os.path.exists(kubeconfig_path) + + # 验证缓存中已存储 + assert cluster_id in context_manager + assert context_manager[cluster_id] == kubeconfig_path + + # 验证文件内容 + with open(kubeconfig_path, 'r') as f: + content = f.read() + assert "apiVersion: v1" in content + assert "server: https://test.example.com:6443" in content + + +def test_ack_private_kubeconfig_mode_success(context_manager): + """测试 ACK_PRIVATE 模式成功获取 kubeconfig""" + cluster_id = "test-cluster" + + # Mock CS 客户端 + with patch.object(context_manager, '_get_cs_client') as mock_get_cs_client: + mock_get_cs_client.return_value = FakeCSClient() + + # 使用 ACK_PRIVATE 模式获取 kubeconfig 路径 + kubeconfig_path = context_manager.get_kubeconfig_path( + cluster_id=cluster_id, + kubeconfig_mode="ACK_PRIVATE", + kubeconfig_path="" + ) + + # 验证返回的路径是生成的文件路径 + assert kubeconfig_path.startswith(os.path.expanduser("~/.kube/mcp-kubeconfig-")) + assert kubeconfig_path.endswith(".yaml") + assert os.path.exists(kubeconfig_path) + + # 验证缓存中已存储 + assert cluster_id in context_manager + assert context_manager[cluster_id] == kubeconfig_path + + +def test_incluster_kubeconfig_mode_success(context_manager): + """测试 INCLUSTER 模式成功构造 kubeconfig""" + cluster_id = "test-cluster" + + # Mock 环境变量 + with patch.dict(os.environ, { + "KUBERNETES_SERVICE_HOST": "kubernetes.default.svc", + "KUBERNETES_SERVICE_PORT": "443" + }): + # Mock 文件操作 + mock_file = mock_open() + with patch('kubectl_handler.os.open', mock_file), \ + patch('kubectl_handler.os.O_RDWR', 0), \ + patch('kubectl_handler.os.O_CREAT', 0): + # 使用 INCLUSTER 模式获取 kubeconfig 路径 + kubeconfig_path = context_manager.get_kubeconfig_path( + cluster_id=cluster_id, + kubeconfig_mode="INCLUSTER", + kubeconfig_path="" + ) + + # 验证返回的路径是集群内配置文件路径 + expected_path = os.path.expanduser("~/.kube/config.incluster") + assert kubeconfig_path == expected_path + + # 验证缓存中已存储 + assert cluster_id in context_manager + assert context_manager[cluster_id] == kubeconfig_path + + +def test_ack_public_kubeconfig_mode_api_failure(context_manager): + """测试 ACK_PUBLIC 模式但 API 调用失败的情况""" + cluster_id = "test-cluster" + + # Mock CS 客户端返回空配置 + class FakeCSClientFailure: + def describe_cluster_detail(self, cluster_id): + class FakeResponse: + class FakeBody: + def __init__(self): + self.master_url = '{"api_server_endpoint": "https://test.example.com:6443", "intranet_api_server_endpoint": "https://internal.test.com:6443"}' + body = FakeBody() + return FakeResponse() + + def describe_cluster_user_kubeconfig(self, cluster_id, request): + class FakeResponse: + class FakeBody: + config = None # 返回空配置 + body = FakeBody() + return FakeResponse() + + # Mock CS 客户端 + with patch.object(context_manager, '_get_cs_client') as mock_get_cs_client: + mock_get_cs_client.return_value = FakeCSClientFailure() + + # 应该抛出 ValueError + with pytest.raises(ValueError, match="Failed to get kubeconfig for cluster"): + context_manager.get_kubeconfig_path( + cluster_id=cluster_id, + kubeconfig_mode="ACK_PUBLIC", + kubeconfig_path="" + ) + + +def test_ack_private_kubeconfig_mode_no_intranet_endpoint(context_manager): + """测试 ACK_PRIVATE 模式但集群没有内网端点的情况""" + cluster_id = "test-cluster" + + # Mock CS 客户端返回没有内网端点的集群详情 + class FakeCSClientNoIntranet: + def describe_cluster_detail(self, cluster_id): + class FakeResponse: + class FakeBody: + def __init__(self): + # 没有 intranet_api_server_endpoint + self.master_url = '{"api_server_endpoint": "https://test.example.com:6443"}' + body = FakeBody() + return FakeResponse() + + def describe_cluster_user_kubeconfig(self, cluster_id, request): + class FakeResponse: + class FakeBody: + config = "apiVersion: v1\nclusters:\n- cluster:\n server: https://test.example.com:6443" + body = FakeBody() + return FakeResponse() + + # Mock CS 客户端 + with patch.object(context_manager, '_get_cs_client') as mock_get_cs_client: + mock_get_cs_client.return_value = FakeCSClientNoIntranet() + + # 应该抛出 ValueError + with pytest.raises(ValueError, match="does not have intranet endpoint access"): + context_manager.get_kubeconfig_path( + cluster_id=cluster_id, + kubeconfig_mode="ACK_PRIVATE", + kubeconfig_path="" + ) + + +def test_ack_public_kubeconfig_mode_no_public_endpoint(context_manager): + """测试 ACK_PUBLIC 模式但集群没有公网端点的情况""" + cluster_id = "test-cluster" + + # Mock CS 客户端返回没有公网端点的集群详情 + class FakeCSClientNoPublic: + def describe_cluster_detail(self, cluster_id): + class FakeResponse: + class FakeBody: + def __init__(self): + # 没有 api_server_endpoint + self.master_url = '{"intranet_api_server_endpoint": "https://internal.test.com:6443"}' + body = FakeBody() + return FakeResponse() + + def describe_cluster_user_kubeconfig(self, cluster_id, request): + class FakeResponse: + class FakeBody: + config = "apiVersion: v1\nclusters:\n- cluster:\n server: https://test.example.com:6443" + body = FakeBody() + return FakeResponse() + + # Mock CS 客户端 + with patch.object(context_manager, '_get_cs_client') as mock_get_cs_client: + mock_get_cs_client.return_value = FakeCSClientNoPublic() + + # 应该抛出 ValueError + with pytest.raises(ValueError, match="does not have public endpoint access"): + context_manager.get_kubeconfig_path( + cluster_id=cluster_id, + kubeconfig_mode="ACK_PUBLIC", + kubeconfig_path="" + ) + + +def test_cached_kubeconfig_reuse(context_manager, temp_kubeconfig_file): + """测试缓存的 kubeconfig 被正确重用""" + cluster_id = "test-cluster" + + # 第一次获取 kubeconfig + kubeconfig_path1 = context_manager.get_kubeconfig_path( + cluster_id=cluster_id, + kubeconfig_mode="LOCAL", + kubeconfig_path=temp_kubeconfig_file + ) + + # 第二次获取同一个集群的 kubeconfig + kubeconfig_path2 = context_manager.get_kubeconfig_path( + cluster_id=cluster_id, + kubeconfig_mode="LOCAL", + kubeconfig_path=temp_kubeconfig_file + ) + + # 应该返回相同的路径 + assert kubeconfig_path1 == kubeconfig_path2 + assert kubeconfig_path1 == temp_kubeconfig_file + + +def test_kubeconfig_cleanup_on_cache_eviction(context_manager): + """测试缓存驱逐时 kubeconfig 文件被正确清理""" + cluster_id = "test-cluster" + + # Mock CS 客户端 + with patch.object(context_manager, '_get_cs_client') as mock_get_cs_client: + mock_get_cs_client.return_value = FakeCSClient() + + # 获取 ACK_PUBLIC 模式的 kubeconfig (会创建临时文件) + kubeconfig_path = context_manager.get_kubeconfig_path( + cluster_id=cluster_id, + kubeconfig_mode="ACK_PUBLIC", + kubeconfig_path="" + ) + + # 验证文件存在 + assert os.path.exists(kubeconfig_path) + + # 验证缓存中有项目 + assert len(context_manager) == 1 + + # 手动驱逐缓存项,应该会清理文件 + key, path = context_manager.popitem() + assert key == cluster_id + assert path == kubeconfig_path + + # 文件应该已被删除 + # 注意:由于 popitem 中的清理逻辑,文件应该已被删除 + + +def test_local_kubeconfig_not_cleaned_up(context_manager, temp_kubeconfig_file): + """测试 LOCAL 模式的 kubeconfig 文件不会被清理""" + cluster_id = "test-cluster" + + # 使用 LOCAL 模式 + kubeconfig_path = context_manager.get_kubeconfig_path( + cluster_id=cluster_id, + kubeconfig_mode="LOCAL", + kubeconfig_path=temp_kubeconfig_file + ) + + # 手动调用清理方法 + context_manager.cleanup() + + # 本地文件应该仍然存在 + assert os.path.exists(temp_kubeconfig_file) + + +@pytest.mark.asyncio +async def test_kubectl_tool_with_local_kubeconfig_mode(): + """测试 kubectl 工具使用 LOCAL 模式""" + # 创建临时 kubeconfig 文件 + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + f.write("apiVersion: v1\nclusters:\n- cluster:\n server: https://test.example.com:6443\nusers:\n- name: test-user\n user:\n token: test-token") + temp_kubeconfig_path = f.name + + try: + # 创建带有 LOCAL 模式设置的 handler + server = FakeServer() + handler = module_under_test.KubectlHandler( + server, + { + "allow_write": True, + "kubeconfig_mode": "LOCAL", + "kubeconfig_path": temp_kubeconfig_path + } + ) + tool = server.tools["ack_kubectl"] + + # Mock subprocess.run + with patch('kubectl_handler.subprocess.run') as mock_run: + mock_run.return_value = MagicMock(returncode=0, stdout="pods found", stderr="") + + # 创建上下文 + ctx = FakeContext(FakeLifespanContext()) + + # 执行命令 + result = await tool(ctx, command="get pods", cluster_id="test-cluster") + + # 验证结果 + assert result.exit_code == 0 + assert result.stdout == "pods found" + + # 验证 subprocess.run 被调用 + mock_run.assert_called_once() + call_args = mock_run.call_args[0][0] # 获取第一个位置参数 + assert f"--kubeconfig {temp_kubeconfig_path}" in call_args + finally: + # 清理临时文件 + if os.path.exists(temp_kubeconfig_path): + os.unlink(temp_kubeconfig_path) + + # 清理全局缓存 + module_under_test._context_manager = None + + +@pytest.mark.asyncio +async def test_kubectl_tool_with_ack_public_kubeconfig_mode(): + """测试 kubectl 工具使用 ACK_PUBLIC 模式""" + # 创建带有 ACK_PUBLIC 模式设置的 handler + server = FakeServer() + handler = module_under_test.KubectlHandler( + server, + { + "allow_write": True, + "kubeconfig_mode": "ACK_PUBLIC", + "kubeconfig_path": "" + } + ) + tool = server.tools["ack_kubectl"] + + # Mock subprocess.run + with patch('kubectl_handler.subprocess.run') as mock_run: + mock_run.return_value = MagicMock(returncode=0, stdout="pods found", stderr="") + + # Mock CS 客户端 + with patch('kubectl_handler.get_context_manager') as mock_get_context_manager: + # 创建 mock context manager + mock_context_manager = MagicMock() + mock_context_manager.get_kubeconfig_path.return_value = "/tmp/test-kubeconfig.yaml" + mock_get_context_manager.return_value = mock_context_manager + + # 创建上下文 + ctx = FakeContext(FakeLifespanContext()) + + # 执行命令 + result = await tool(ctx, command="get pods", cluster_id="test-cluster") + + # 验证结果 + assert result.exit_code == 0 + assert result.stdout == "pods found" + + # 验证 subprocess.run 被调用 + mock_run.assert_called_once() + call_args = mock_run.call_args[0][0] # 获取第一个位置参数 + assert "--kubeconfig /tmp/test-kubeconfig.yaml" in call_args + + +@pytest.mark.asyncio +async def test_kubectl_tool_with_ack_private_kubeconfig_mode(): + """测试 kubectl 工具使用 ACK_PRIVATE 模式""" + # 创建带有 ACK_PRIVATE 模式设置的 handler + server = FakeServer() + handler = module_under_test.KubectlHandler( + server, + { + "allow_write": True, + "kubeconfig_mode": "ACK_PRIVATE", + "kubeconfig_path": "" + } + ) + tool = server.tools["ack_kubectl"] + + # Mock subprocess.run + with patch('kubectl_handler.subprocess.run') as mock_run: + mock_run.return_value = MagicMock(returncode=0, stdout="pods found", stderr="") + + # Mock CS 客户端 + with patch('kubectl_handler.get_context_manager') as mock_get_context_manager: + # 创建 mock context manager + mock_context_manager = MagicMock() + mock_context_manager.get_kubeconfig_path.return_value = "/tmp/test-kubeconfig.yaml" + mock_get_context_manager.return_value = mock_context_manager + + # 创建上下文 + ctx = FakeContext(FakeLifespanContext()) + + # 执行命令 + result = await tool(ctx, command="get pods", cluster_id="test-cluster") + + # 验证结果 + assert result.exit_code == 0 + assert result.stdout == "pods found" + + # 验证 subprocess.run 被调用 + mock_run.assert_called_once() + call_args = mock_run.call_args[0][0] # 获取第一个位置参数 + assert "--kubeconfig /tmp/test-kubeconfig.yaml" in call_args + + +@pytest.mark.asyncio +async def test_kubectl_tool_with_incluster_kubeconfig_mode(): + """测试 kubectl 工具使用 INCLUSTER 模式""" + # 创建带有 INCLUSTER 模式设置的 handler + server = FakeServer() + handler = module_under_test.KubectlHandler( + server, + { + "allow_write": True, + "kubeconfig_mode": "INCLUSTER", + "kubeconfig_path": "" + } + ) + tool = server.tools["ack_kubectl"] + + # Mock subprocess.run + with patch('kubectl_handler.subprocess.run') as mock_run: + mock_run.return_value = MagicMock(returncode=0, stdout="pods found", stderr="") + + # Mock CS 客户端 + with patch('kubectl_handler.get_context_manager') as mock_get_context_manager: + # 创建 mock context manager + mock_context_manager = MagicMock() + mock_context_manager.get_kubeconfig_path.return_value = "/tmp/.kube/config.incluster" + mock_get_context_manager.return_value = mock_context_manager + + # 创建上下文 + ctx = FakeContext(FakeLifespanContext()) + + # 执行命令 + result = await tool(ctx, command="get pods", cluster_id="test-cluster") + + # 验证结果 + assert result.exit_code == 0 + assert result.stdout == "pods found" + + # 验证 subprocess.run 被调用 + mock_run.assert_called_once() + call_args = mock_run.call_args[0][0] # 获取第一个位置参数 + assert "--kubeconfig /tmp/.kube/config.incluster" in call_args + + +if __name__ == "__main__": + pytest.main([__file__]) \ No newline at end of file From 34f1c452faed91cff290ff3a2fd8b2d8b6369c2b Mon Sep 17 00:00:00 2001 From: Yusheng Guo Date: Tue, 4 Nov 2025 16:40:12 +0800 Subject: [PATCH 13/13] fix: kubeconfig clean up --- src/kubectl_handler.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/kubectl_handler.py b/src/kubectl_handler.py index f668d5d..f305eea 100644 --- a/src/kubectl_handler.py +++ b/src/kubectl_handler.py @@ -139,9 +139,10 @@ def popitem(self): """重写 popitem 方法,在驱逐缓存项时清理 kubeconfig 文件""" key, path = super().popitem() # 删除 kubeconfig 文件 - if (path and os.path.exists(path) and - self.do_not_cleanup_file and - os.path.abspath(path) != os.path.abspath(self.do_not_cleanup_file)): + if path and os.path.exists(path): + if self.do_not_cleanup_file and os.path.samefile(path, self.do_not_cleanup_file): + logger.debug(f"Skipped removal of protected kubeconfig file: {path}") + return try: os.remove(path) logger.debug(f"Removed cached kubeconfig file: {path}") @@ -156,7 +157,7 @@ def cleanup(self): for key, path in list(self.items()): if path and os.path.exists(path): # 只有当do_not_cleanup_file存在且路径不同时才清理 - if self.do_not_cleanup_file and os.path.abspath(path) == os.path.abspath(self.do_not_cleanup_file): + if self.do_not_cleanup_file and os.path.samefile(path, self.do_not_cleanup_file): continue try: os.remove(path)