diff --git a/DESIGN.md b/DESIGN.md
index 7fe25c1..f52de78 100644
--- a/DESIGN.md
+++ b/DESIGN.md
@@ -124,6 +124,46 @@ ack-mcp-server adopts a layered architecture, following these design principles:
 
 Each Handler implements MCP tools and resources for a specific domain:
 
+## Authentication Strategy / Cluster Kubeconfig Credential Management
+
+The permissions required by the tools in ack-mcp-server fall into three categories:
+- Kubernetes cluster RBAC permissions, exercised through cluster credentials (kubeconfig)
+- Alibaba Cloud service permissions, exercised through the Alibaba Cloud OpenAPI and authenticated by the Alibaba Cloud RAM system
+- Access to observability data, such as Prometheus metrics and logging-system data
+
+### Kubernetes Cluster Access Strategy
+
+Configured via ack-mcp-server parameters:
+```shell
+KUBECONFIG_MODE = ACK_PUBLIC (default; fetch a public-network kubeconfig via the ACK OpenAPI) / ACK_PRIVATE (fetch an intranet kubeconfig via the ACK OpenAPI) / INCLUSTER (use the in-cluster service account when running inside the cluster) / LOCAL (use a local kubeconfig)
+
+KUBECONFIG_PATH = xxx (optional; only takes effect when KUBECONFIG_MODE = LOCAL, and specifies the local kubeconfig file path)
+```
+
+Note: to use a public-network kubeconfig for local testing, you must first [enable public access to the API server of the corresponding ACK cluster](https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/user-guide/control-public-access-to-the-api-server-of-a-cluster).
+
+The default configuration fetches a public-network kubeconfig via the Alibaba Cloud OpenAPI, with a default TTL of 1 hour.
+
+For production use, once intranet connectivity to the cluster is in place, set KUBECONFIG_MODE = ACK_PRIVATE to fetch an intranet kubeconfig via the Alibaba Cloud OpenAPI and avoid exposing the kubeconfig to the public network.
+
+### Alibaba Cloud Service Permissions
+
+Handled through the [Alibaba Cloud RAM credential system](https://help.aliyun.com/zh/sdk/developer-reference/v2-manage-python-access-credentials).
+
+For production use, grant authorization policies through RAM sub-accounts to follow the least-privilege security best practice.
+
+### Observability Data Access
+
+Data from the Alibaba Cloud Prometheus service bound to the ACK cluster is preferred; if no such service exists, environment parameters supply the observability endpoint.
+Any [Prometheus Read HTTP API](https://prometheus.io/docs/prometheus/latest/querying/api/) can be specified through configuration.
+
+When the cluster has no corresponding Alibaba Cloud Prometheus instance, ack-mcp-server resolves {prometheus_http_api_url} for observability data in the following order of precedence:
+```shell
+# environment parameters, checked in order:
+PROMETHEUS_HTTP_API_{cluster_id}={prometheus_http_api_url}
+PROMETHEUS_HTTP_API={prometheus_http_api_url}
+```
+
 ## Package Naming and Version Management
 
 ### Project Naming
diff --git a/README.md b/README.md
index 50cad67..1bda567 100644
--- a/README.md
+++ b/README.md
@@ -131,7 +131,7 @@
 https://github.com/user-attachments/assets/9e48cac3-0af1-424c-9f16-3862d047cc68
 
 ### 💻 2.2 (Optional) Create an ACK Cluster
 
 - An ACK cluster already created under your Alibaba Cloud account
-- A kubeconfig with public network access enabled on the ACK cluster, or a kubeconfig configuration reachable from the ack-mcp-server local network (placed in .kube/config)
+- With network connectivity to the target cluster in place, configure the corresponding Kubernetes cluster access credentials; see the [configuration guide](./DESIGN.md#kubernetes-cluster-access-strategy). In production, establish intranet connectivity to the cluster first and set KUBECONFIG_MODE = ACK_PRIVATE to access it over the private network.
 
 ### 📍 2.3 Deploy and Run ack-mcp-server
 
@@ -196,8 +196,8 @@ make build-binary
 
 - Python 3.12+
 - An Alibaba Cloud account with an AccessKey/AccessKeySecret and the required permission set
 - An ACK cluster already created under your Alibaba Cloud account
-- A kubeconfig with public network access enabled on the ACK cluster, or a kubeconfig configuration reachable from the ack-mcp-server local network (placed in .kube/config)
-
+- A kubeconfig configuration for the ACK cluster that is reachable from the ack-mcp-server local network; see the [configuration guide](./DESIGN.md#kubernetes-cluster-access-strategy).
+  - Note: in production, establish intranet connectivity to the cluster first and set KUBECONFIG_MODE = ACK_PRIVATE to access it over the private network. For local testing with a public-network kubeconfig, first [enable public access to the API server of the corresponding ACK cluster](https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/user-guide/control-public-access-to-the-api-server-of-a-cluster).
 
 ### 📋 3.2 Development Environment Setup
 
@@ -374,7 +374,7 @@ cd benchmarks
 
 ## 7. FAQ
 
 - **AK not configured**: check the ACCESS_KEY_ID/ACCESS_KEY_SECRET environment variables
-- **Public kubeconfig not enabled on the ACK cluster**: ack-mcp-server cannot run the kubectl tool; the ACK cluster must either enable public kubeconfig access or provide a kubeconfig configuration reachable from the ack-mcp-server local network (placed in .kube/config)
+- **Cluster network unreachable**: when ack-mcp-server accesses the cluster kubeconfig over the public network with KUBECONFIG_MODE = ACK_PUBLIC, the ACK cluster must have public kubeconfig access enabled. In production, establish intranet connectivity and use the ACK_PRIVATE mode to fetch the kubeconfig over the private network, in line with production security best practices.
 
 ## 8. Security
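For reference, a minimal launch sketch using the modes above (illustrative only: the entrypoint `src/main_server.py` and the placeholder credential values are assumptions, while the variable and flag names come from the `main_server.py` hunk further below):

```shell
# Production-style setup: private-network kubeconfig fetched via the ACK OpenAPI
export ACCESS_KEY_ID=<your-access-key-id>
export ACCESS_KEY_SECRET=<your-access-key-secret>
export KUBECONFIG_MODE=ACK_PRIVATE
python src/main_server.py

# Local development: point the server at an existing kubeconfig instead
python src/main_server.py --kubeconfig-mode LOCAL --kubeconfig-path ~/.kube/config
```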
diff --git a/src/kubectl_handler.py b/src/kubectl_handler.py
index 09b457c..f305eea 100644
--- a/src/kubectl_handler.py
+++ b/src/kubectl_handler.py
@@ -9,7 +9,6 @@
 from ack_cluster_handler import parse_master_url
 from models import KubectlOutput
 
-
 class KubectlContextManager(TTLCache):
     """Kubeconfig file manager backed by a TTL+LRU cache"""
 
@@ -23,6 +22,7 @@ def __init__(self, ttl_minutes: int = 60):
         super().__init__(maxsize=50, ttl=ttl_minutes * 60)  # TTL in seconds, refreshed 5 minutes early
         self._cs_client = None  # CS client instance
+        self.do_not_cleanup_file = None  # path of a local kubeconfig file that must never be cleaned up
 
         # store kubeconfig files under the .kube directory
         self._kube_dir = os.path.expanduser("~/.kube")
@@ -78,12 +78,14 @@ def cleanup_all_mcp_files(self):
             except Exception:
                 pass
 
-    def _get_or_create_kubeconfig_file(self, cluster_id: str) -> str:
+    def _get_or_create_kubeconfig_file(self, cluster_id: str, kubeconfig_mode: str, kubeconfig_path: str) -> str:
         """Get or create the kubeconfig file for a cluster
 
         Args:
             cluster_id: cluster ID
-
+            kubeconfig_mode: mode used to obtain the kubeconfig; one of "ACK_PUBLIC", "ACK_PRIVATE", "INCLUSTER", "LOCAL"
+            kubeconfig_path: path to a local kubeconfig file (only used when the mode is LOCAL)
+
         Returns:
             path of the kubeconfig file
         """
@@ -92,8 +94,31 @@ def _get_or_create_kubeconfig_file(self, cluster_id: str) -> str:
             logger.debug(f"Found cached kubeconfig for cluster {cluster_id}")
             return self[cluster_id]
 
+        if kubeconfig_mode == "INCLUSTER":
+            # use the in-cluster configuration
+            logger.debug(f"Using in-cluster kubeconfig for cluster {cluster_id}")
+            kubeconfig_path = self._construct_incluster_kubeconfig()
+            self[cluster_id] = kubeconfig_path
+            return kubeconfig_path
+
+        if kubeconfig_mode == "LOCAL":
+            # use a local kubeconfig file
+            # the path must be non-empty
+            if not kubeconfig_path:
+                raise ValueError("Local kubeconfig path is not set")
+            kubeconfig_path = os.path.abspath(os.path.expanduser(kubeconfig_path))
+            if not os.path.exists(kubeconfig_path):
+                raise ValueError(f"File {kubeconfig_path} does not exist")
+            self.do_not_cleanup_file = kubeconfig_path
+            logger.debug(f"Using local kubeconfig for cluster {cluster_id} from {kubeconfig_path}")
+            self[cluster_id] = kubeconfig_path
+            return kubeconfig_path
+
+        # otherwise fetch the kubeconfig from ACK
+        private_ip_address = kubeconfig_mode == "ACK_PRIVATE"
+
         # create a new kubeconfig file
-        kubeconfig_content = self._get_kubeconfig_from_ack(cluster_id, int(self.ttl / 60))  # convert to minutes
+        kubeconfig_content = self._get_kubeconfig_from_ack(cluster_id, private_ip_address, int(self.ttl / 60))  # convert to minutes
         if not kubeconfig_content:
             raise ValueError(f"Failed to get kubeconfig for cluster {cluster_id}")
 
@@ -115,6 +140,9 @@ def popitem(self):
         key, path = super().popitem()
         # delete the kubeconfig file
         if path and os.path.exists(path):
+            if self.do_not_cleanup_file and os.path.samefile(path, self.do_not_cleanup_file):
+                logger.debug(f"Skipped removal of protected kubeconfig file: {path}")
+                return key, path  # keep the (key, path) return contract even when skipping removal
             try:
                 os.remove(path)
                 logger.debug(f"Removed cached kubeconfig file: {path}")
@@ -128,6 +156,9 @@ def cleanup(self):
         removed_count = 0
         for key, path in list(self.items()):
             if path and os.path.exists(path):
+                # never remove the protected local kubeconfig file
+                if self.do_not_cleanup_file and os.path.samefile(path, self.do_not_cleanup_file):
+                    continue
                 try:
                     os.remove(path)
                     removed_count += 1
@@ -150,11 +181,12 @@ def _get_cs_client(self):
             raise ValueError("CS client not set")
         return self._cs_client
 
-    def _get_kubeconfig_from_ack(self, cluster_id: str, ttl_minutes: int = 60) -> Optional[str]:
+    def _get_kubeconfig_from_ack(self, cluster_id: str, private_ip_address: bool = False, ttl_minutes: int = 60) -> Optional[str]:
         """Fetch the kubeconfig configuration through the ACK API
 
         Args:
             cluster_id: cluster ID
+            private_ip_address: whether to fetch the intranet (private IP) connection configuration
            ttl_minutes: kubeconfig validity in minutes, 60 by default
         """
         try:
@@ -172,13 +204,22 @@ def _get_kubeconfig_from_ack(self, cluster_id: str, ttl_minutes: int = 60) -> Optional[str]:
             # check that the required API server endpoint exists
             master_url_str = getattr(cluster_info, 'master_url', '')
             master_url = parse_master_url(master_url_str)
-            if not master_url["api_server_endpoint"]:
-                raise ValueError(f"Cluster {cluster_id} does not have public endpoint access, "
-                                 f"Please enable public endpoint access setting first.")
+            if private_ip_address:
+                if not master_url["intranet_api_server_endpoint"]:
+                    raise ValueError(
+                        f"Cluster {cluster_id} does not have intranet endpoint access. "
+                        f"Please enable intranet endpoint access first."
+                    )
+            else:
+                if not master_url["api_server_endpoint"]:
+                    raise ValueError(
+                        f"Cluster {cluster_id} does not have public endpoint access. "
+                        f"Please enable public endpoint access first."
+                    )
 
             # call the DescribeClusterUserKubeconfig API
             request = cs_models.DescribeClusterUserKubeconfigRequest(
-                private_ip_address=False,  # fetch the public connection configuration
+                private_ip_address=private_ip_address,
                 temporary_duration_minutes=ttl_minutes,  # use the TTL passed in
             )
 
@@ -195,16 +236,52 @@
             logger.error(f"Failed to fetch kubeconfig for cluster {cluster_id}: {e}")
             raise e
 
-    def get_kubeconfig_path(self, cluster_id: str) -> str:
+    def _construct_incluster_kubeconfig(self) -> str:
+        """Construct the in-cluster kubeconfig file
+
+        Returns:
+            path of the kubeconfig file
+        """
+        token_file = "/var/run/secrets/kubernetes.io/serviceaccount/token"
+        root_ca_file = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
+        host, port = os.getenv("KUBERNETES_SERVICE_HOST"), os.getenv("KUBERNETES_SERVICE_PORT")
+        if not host or not port:
+            raise ValueError("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined")
+
+        kubeconfig_path = os.path.join(self._kube_dir, "config.incluster")
+        with open(kubeconfig_path, 'w') as f:
+            f.write(f"""apiVersion: v1
+clusters:
+- cluster:
+    certificate-authority: {root_ca_file}
+    server: https://{host}:{port}
+  name: in-cluster
+contexts:
+- context:
+    cluster: in-cluster
+    user: in-cluster
+  name: in-cluster
+current-context: in-cluster
+kind: Config
+users:
+- name: in-cluster
+  user:
+    tokenFile: {token_file}
+""")
+        return kubeconfig_path
+
+    def get_kubeconfig_path(self, cluster_id: str, kubeconfig_mode: str, kubeconfig_path: str) -> str:
         """Get the kubeconfig file path for a cluster
 
         Args:
             cluster_id: cluster ID
-
+            kubeconfig_mode: mode used to obtain the kubeconfig; one of "ACK_PUBLIC", "ACK_PRIVATE", "INCLUSTER", "LOCAL"
+            kubeconfig_path: path to a local kubeconfig file (only used when the mode is LOCAL)
+
         Returns:
             path of the kubeconfig file
         """
-        return self._get_or_create_kubeconfig_file(cluster_id)
+        return self._get_or_create_kubeconfig_file(cluster_id, kubeconfig_mode, kubeconfig_path)
 
 
 # global context manager instance
@@ -224,7 +301,13 @@ def get_context_manager(ttl_minutes: int = 60) -> KubectlContextManager:
 
 
 class KubectlHandler:
-    """Handler for running kubectl commands via a FastMCP tool."""
+    """
+    Handler for running kubectl commands via a FastMCP tool.
+
+    Design:
+        kubeconfig management policy: https://github.com/aliyun/alibabacloud-ack-mcp-server/issues/1
+    """
 
     def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None):
         """Initialize the kubectl handler.
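Taken together, the hunks above give `KubectlContextManager` a mode-aware lookup. A minimal usage sketch follows (illustrative only, not part of the patch: the cluster ID is a placeholder, and for the ACK_* modes a CS client must already have been injected into the manager, as the tests below do by patching `_get_cs_client`):

```python
# Sketch of the API surface introduced in this patch (hypothetical values).
from kubectl_handler import get_context_manager

cm = get_context_manager(ttl_minutes=60)

# LOCAL: reuse an existing file; it is registered as protected, so cache
# eviction (popitem) and cleanup() will never delete it.
path = cm.get_kubeconfig_path("c0123456789abcdef", "LOCAL", "~/.kube/config")

# ACK_PRIVATE: fetch an intranet kubeconfig via the CS OpenAPI; the file is
# written under ~/.kube/ and removed when the TTL+LRU cache evicts the entry.
path = cm.get_kubeconfig_path("c0123456789abcdef", "ACK_PRIVATE", "")
```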
@@ -538,7 +621,7 @@ async def ack_kubectl(
 
             # resolve the kubeconfig file path
             context_manager = get_context_manager()
-            kubeconfig_path = context_manager.get_kubeconfig_path(cluster_id)
+            kubeconfig_path = context_manager.get_kubeconfig_path(cluster_id, self.settings.get("kubeconfig_mode"), self.settings.get("kubeconfig_path"))
 
             # check whether this is a streaming command
             is_streaming, stream_type = self.is_streaming_command(command)
diff --git a/src/main_server.py b/src/main_server.py
index b278300..fc4eebb 100644
--- a/src/main_server.py
+++ b/src/main_server.py
@@ -222,6 +222,17 @@ def main():
         type=str,
         help="AlibabaCloud Access Key Secret (default: from env ACCESS_KEY_SECRET)"
     )
+    parser.add_argument(
+        "--kubeconfig-mode",
+        type=str,
+        choices=["ACK_PUBLIC", "ACK_PRIVATE", "INCLUSTER", "LOCAL"],
+        help="Mode to obtain kubeconfig for ACK clusters (default: from env KUBECONFIG_MODE)"
+    )
+    parser.add_argument(
+        "--kubeconfig-path",
+        type=str,
+        help="Path to local kubeconfig file when KUBECONFIG_MODE is LOCAL (default: from env KUBECONFIG_PATH)"
+    )
     parser.add_argument(
         "--audit-config",
         "-c",
@@ -273,6 +284,10 @@ def main():
         # compatibility settings
         "access_secret_key": args.access_key_secret or os.getenv("ACCESS_KEY_SECRET"),  # keep the legacy field name
         "original_settings": Configs(vars(args)),
+
+        # ACK kubectl settings
+        "kubeconfig_mode": args.kubeconfig_mode or os.getenv("KUBECONFIG_MODE", "ACK_PUBLIC"),
+        "kubeconfig_path": args.kubeconfig_path or os.getenv("KUBECONFIG_PATH", "~/.kube/config"),
     }
 
     # validate required configuration
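As the settings merge above implies, a CLI flag wins over the environment variable, which wins over the built-in default. A self-contained illustration (hypothetical values, mirroring the `or`-chain in the hunk):

```python
# Illustrative only: mirrors `args.kubeconfig_mode or os.getenv("KUBECONFIG_MODE", "ACK_PUBLIC")`.
import os

os.environ["KUBECONFIG_MODE"] = "ACK_PRIVATE"

args_kubeconfig_mode = None  # --kubeconfig-mode not passed on the CLI
mode = args_kubeconfig_mode or os.getenv("KUBECONFIG_MODE", "ACK_PUBLIC")
assert mode == "ACK_PRIVATE"  # the env var overrides the built-in default

args_kubeconfig_mode = "LOCAL"  # --kubeconfig-mode LOCAL passed on the CLI
mode = args_kubeconfig_mode or os.getenv("KUBECONFIG_MODE", "ACK_PUBLIC")
assert mode == "LOCAL"  # the flag wins over the env var
```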
diff --git a/src/tests/test_kubeconfig_mode.py b/src/tests/test_kubeconfig_mode.py
new file mode 100644
index 0000000..eff2aa0
--- /dev/null
+++ b/src/tests/test_kubeconfig_mode.py
@@ -0,0 +1,570 @@
+import os
+import sys
+import tempfile
+import pytest
+from unittest.mock import patch, MagicMock, mock_open
+
+# add the parent directory to sys.path so the module under test can be imported
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
+
+import kubectl_handler as module_under_test
+
+
+class FakeServer:
+    def __init__(self):
+        self.tools = {}
+
+    def tool(self, name: str = None, description: str = None):
+        def decorator(func):
+            key = name or getattr(func, "__name__", "unnamed")
+            self.tools[key] = func
+            return func
+        return decorator
+
+
+class FakeRequestContext:
+    def __init__(self, lifespan_context=None):
+        self.lifespan_context = lifespan_context or {}
+
+
+class FakeContext:
+    def __init__(self, lifespan_context=None):
+        self.request_context = FakeRequestContext(lifespan_context)
+
+
+class FakeCSClient:
+    def describe_cluster_detail(self, cluster_id):
+        class FakeResponse:
+            class FakeBody:
+                def __init__(self):
+                    self.master_url = '{"api_server_endpoint": "https://test.example.com:6443", "intranet_api_server_endpoint": "https://internal.test.com:6443"}'
+            body = FakeBody()
+        return FakeResponse()
+
+    def describe_cluster_user_kubeconfig(self, cluster_id, request):
+        class FakeResponse:
+            class FakeBody:
+                config = "apiVersion: v1\nclusters:\n- cluster:\n    server: https://test.example.com:6443\nusers:\n- name: test-user\n  user:\n    token: test-token"
+            body = FakeBody()
+        return FakeResponse()
+
+
+class FakeCSClientFactory:
+    def __call__(self, region_id, config=None):
+        return FakeCSClient()
+
+
+class FakeLifespanContext:
+    def __init__(self):
+        self.providers = {"cs_client_factory": FakeCSClientFactory()}
+        self.config = {"region_id": "cn-hangzhou"}
+
+
+@pytest.fixture
+def context_manager():
+    """Create a fresh context manager instance for each test"""
+    # reset the global cache
+    module_under_test._context_manager = None
+    cm = module_under_test.get_context_manager(ttl_minutes=1)  # a 1-minute TTL keeps tests fast
+    yield cm
+    # clean up
+    cm.cleanup()
+    module_under_test._context_manager = None
+
+
+@pytest.fixture
+def temp_kubeconfig_file():
+    """Create a temporary kubeconfig file for testing"""
+    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
+        f.write("apiVersion: v1\nclusters:\n- cluster:\n    server: https://test.example.com:6443\nusers:\n- name: test-user\n  user:\n    token: test-token")
+        temp_path = f.name
+    yield temp_path
+    # clean up
+    if os.path.exists(temp_path):
+        os.unlink(temp_path)
+
+
+def test_local_kubeconfig_mode_success(context_manager, temp_kubeconfig_file):
+    """LOCAL mode succeeds with a local kubeconfig file"""
+    cluster_id = "test-cluster"
+
+    # resolve the kubeconfig path in LOCAL mode
+    kubeconfig_path = context_manager.get_kubeconfig_path(
+        cluster_id=cluster_id,
+        kubeconfig_mode="LOCAL",
+        kubeconfig_path=temp_kubeconfig_file
+    )
+
+    # the returned path is the local file path
+    assert kubeconfig_path == temp_kubeconfig_file
+    assert os.path.exists(kubeconfig_path)
+
+    # the entry is stored in the cache
+    assert cluster_id in context_manager
+    assert context_manager[cluster_id] == temp_kubeconfig_file
+
+
+def test_local_kubeconfig_mode_file_not_exists(context_manager):
+    """LOCAL mode fails when the file does not exist"""
+    cluster_id = "test-cluster"
+    non_existent_path = "/tmp/non-existent-kubeconfig.yaml"
+
+    # a ValueError is expected
+    with pytest.raises(ValueError, match="File .* does not exist"):
+        context_manager.get_kubeconfig_path(
+            cluster_id=cluster_id,
+            kubeconfig_mode="LOCAL",
+            kubeconfig_path=non_existent_path
+        )
+
+
+def test_local_kubeconfig_mode_empty_path(context_manager):
+    """LOCAL mode fails when the path is empty"""
+    cluster_id = "test-cluster"
+
+    # a ValueError is expected
+    with pytest.raises(ValueError, match="Local kubeconfig path is not set"):
+        context_manager.get_kubeconfig_path(
+            cluster_id=cluster_id,
+            kubeconfig_mode="LOCAL",
+            kubeconfig_path=""
+        )
+
+
+def test_ack_public_kubeconfig_mode_success(context_manager):
+    """ACK_PUBLIC mode successfully fetches a kubeconfig"""
+    cluster_id = "test-cluster"
+
+    # mock the CS client
+    with patch.object(context_manager, '_get_cs_client') as mock_get_cs_client:
+        mock_get_cs_client.return_value = FakeCSClient()
+
+        # resolve the kubeconfig path in ACK_PUBLIC mode
+        kubeconfig_path = context_manager.get_kubeconfig_path(
+            cluster_id=cluster_id,
+            kubeconfig_mode="ACK_PUBLIC",
+            kubeconfig_path=""
+        )
+
+        # the returned path is a generated file path
+        assert kubeconfig_path.startswith(os.path.expanduser("~/.kube/mcp-kubeconfig-"))
+        assert kubeconfig_path.endswith(".yaml")
+        assert os.path.exists(kubeconfig_path)
+
+        # the entry is stored in the cache
+        assert cluster_id in context_manager
+        assert context_manager[cluster_id] == kubeconfig_path
+
+        # verify the file content
+        with open(kubeconfig_path, 'r') as f:
+            content = f.read()
+            assert "apiVersion: v1" in content
+            assert "server: https://test.example.com:6443" in content
+
+
+def test_ack_private_kubeconfig_mode_success(context_manager):
+    """ACK_PRIVATE mode successfully fetches a kubeconfig"""
+    cluster_id = "test-cluster"
+
+    # mock the CS client
+    with patch.object(context_manager, '_get_cs_client') as mock_get_cs_client:
+        mock_get_cs_client.return_value = FakeCSClient()
+
+        # resolve the kubeconfig path in ACK_PRIVATE mode
+        kubeconfig_path = context_manager.get_kubeconfig_path(
+            cluster_id=cluster_id,
+            kubeconfig_mode="ACK_PRIVATE",
+            kubeconfig_path=""
+        )
+
+        # the returned path is a generated file path
+        assert kubeconfig_path.startswith(os.path.expanduser("~/.kube/mcp-kubeconfig-"))
+        assert kubeconfig_path.endswith(".yaml")
+        assert os.path.exists(kubeconfig_path)
+
+        # the entry is stored in the cache
+        assert cluster_id in context_manager
+        assert context_manager[cluster_id] == kubeconfig_path
+
+
+def test_incluster_kubeconfig_mode_success(context_manager):
+    """INCLUSTER mode successfully constructs a kubeconfig"""
+    cluster_id = "test-cluster"
+
+    # mock the environment variables
+    with patch.dict(os.environ, {
+        "KUBERNETES_SERVICE_HOST": "kubernetes.default.svc",
+        "KUBERNETES_SERVICE_PORT": "443"
+    }):
+        # mock file writes so no real file is created; the handler uses the
+        # built-in open(), so that is what must be patched
+        mock_file = mock_open()
+        with patch('builtins.open', mock_file):
+            # resolve the kubeconfig path in INCLUSTER mode
+            kubeconfig_path = context_manager.get_kubeconfig_path(
+                cluster_id=cluster_id,
+                kubeconfig_mode="INCLUSTER",
+                kubeconfig_path=""
+            )
+
+            # the returned path is the in-cluster config file path
+            expected_path = os.path.expanduser("~/.kube/config.incluster")
+            assert kubeconfig_path == expected_path
+
+            # the entry is stored in the cache
+            assert cluster_id in context_manager
+            assert context_manager[cluster_id] == kubeconfig_path
+
+
+def test_ack_public_kubeconfig_mode_api_failure(context_manager):
+    """ACK_PUBLIC mode fails when the API call returns no config"""
+    cluster_id = "test-cluster"
+
+    # mock CS client that returns an empty config
+    class FakeCSClientFailure:
+        def describe_cluster_detail(self, cluster_id):
+            class FakeResponse:
+                class FakeBody:
+                    def __init__(self):
+                        self.master_url = '{"api_server_endpoint": "https://test.example.com:6443", "intranet_api_server_endpoint": "https://internal.test.com:6443"}'
+                body = FakeBody()
+            return FakeResponse()
+
+        def describe_cluster_user_kubeconfig(self, cluster_id, request):
+            class FakeResponse:
+                class FakeBody:
+                    config = None  # return an empty config
+                body = FakeBody()
+            return FakeResponse()
+
+    # mock the CS client
+    with patch.object(context_manager, '_get_cs_client') as mock_get_cs_client:
+        mock_get_cs_client.return_value = FakeCSClientFailure()
+
+        # a ValueError is expected
+        with pytest.raises(ValueError, match="Failed to get kubeconfig for cluster"):
+            context_manager.get_kubeconfig_path(
+                cluster_id=cluster_id,
+                kubeconfig_mode="ACK_PUBLIC",
+                kubeconfig_path=""
+            )
+
+
+def test_ack_private_kubeconfig_mode_no_intranet_endpoint(context_manager):
+    """ACK_PRIVATE mode fails when the cluster has no intranet endpoint"""
+    cluster_id = "test-cluster"
+
+    # mock CS client whose cluster detail has no intranet endpoint
+    class FakeCSClientNoIntranet:
+        def describe_cluster_detail(self, cluster_id):
+            class FakeResponse:
+                class FakeBody:
+                    def __init__(self):
+                        # no intranet_api_server_endpoint
+                        self.master_url = '{"api_server_endpoint": "https://test.example.com:6443"}'
+                body = FakeBody()
+            return FakeResponse()
+
+        def describe_cluster_user_kubeconfig(self, cluster_id, request):
+            class FakeResponse:
+                class FakeBody:
+                    config = "apiVersion: v1\nclusters:\n- cluster:\n    server: https://test.example.com:6443"
+                body = FakeBody()
+            return FakeResponse()
+
+    # mock the CS client
+    with patch.object(context_manager, '_get_cs_client') as mock_get_cs_client:
+        mock_get_cs_client.return_value = FakeCSClientNoIntranet()
+
+        # a ValueError is expected
+        with pytest.raises(ValueError, match="does not have intranet endpoint access"):
+            context_manager.get_kubeconfig_path(
+                cluster_id=cluster_id,
+                kubeconfig_mode="ACK_PRIVATE",
+                kubeconfig_path=""
+            )
+
+
+def test_ack_public_kubeconfig_mode_no_public_endpoint(context_manager):
+    """ACK_PUBLIC mode fails when the cluster has no public endpoint"""
+    cluster_id = "test-cluster"
+
+    # mock CS client whose cluster detail has no public endpoint
+    class FakeCSClientNoPublic:
+        def describe_cluster_detail(self, cluster_id):
+            class FakeResponse:
+                class FakeBody:
+                    def __init__(self):
+                        # no api_server_endpoint
+                        self.master_url = '{"intranet_api_server_endpoint": "https://internal.test.com:6443"}'
+                body = FakeBody()
+            return FakeResponse()
+
+        def describe_cluster_user_kubeconfig(self, cluster_id, request):
+            class FakeResponse:
+                class FakeBody:
+                    config = "apiVersion: v1\nclusters:\n- cluster:\n    server: https://test.example.com:6443"
+                body = FakeBody()
+            return FakeResponse()
+
+    # mock the CS client
+    with patch.object(context_manager, '_get_cs_client') as mock_get_cs_client:
+        mock_get_cs_client.return_value = FakeCSClientNoPublic()
+
+        # a ValueError is expected
+        with pytest.raises(ValueError, match="does not have public endpoint access"):
+            context_manager.get_kubeconfig_path(
+                cluster_id=cluster_id,
+                kubeconfig_mode="ACK_PUBLIC",
+                kubeconfig_path=""
+            )
+
+
+def test_cached_kubeconfig_reuse(context_manager, temp_kubeconfig_file):
+    """A cached kubeconfig is correctly reused"""
+    cluster_id = "test-cluster"
+
+    # first lookup
+    kubeconfig_path1 = context_manager.get_kubeconfig_path(
+        cluster_id=cluster_id,
+        kubeconfig_mode="LOCAL",
+        kubeconfig_path=temp_kubeconfig_file
+    )
+
+    # second lookup for the same cluster
+    kubeconfig_path2 = context_manager.get_kubeconfig_path(
+        cluster_id=cluster_id,
+        kubeconfig_mode="LOCAL",
+        kubeconfig_path=temp_kubeconfig_file
+    )
+
+    # both lookups return the same path
+    assert kubeconfig_path1 == kubeconfig_path2
+    assert kubeconfig_path1 == temp_kubeconfig_file
+
+
+def test_kubeconfig_cleanup_on_cache_eviction(context_manager):
+    """The kubeconfig file is cleaned up when evicted from the cache"""
+    cluster_id = "test-cluster"
+
+    # mock the CS client
+    with patch.object(context_manager, '_get_cs_client') as mock_get_cs_client:
+        mock_get_cs_client.return_value = FakeCSClient()
+
+        # ACK_PUBLIC mode creates a temporary kubeconfig file
+        kubeconfig_path = context_manager.get_kubeconfig_path(
+            cluster_id=cluster_id,
+            kubeconfig_mode="ACK_PUBLIC",
+            kubeconfig_path=""
+        )
+
+        # the file exists
+        assert os.path.exists(kubeconfig_path)
+
+        # the cache holds one entry
+        assert len(context_manager) == 1
+
+        # manually evict the cache entry, which should clean up the file
+        key, path = context_manager.popitem()
+        assert key == cluster_id
+        assert path == kubeconfig_path
+
+        # the cleanup logic in popitem should have removed the file
+        assert not os.path.exists(kubeconfig_path)
+
+
+def test_local_kubeconfig_not_cleaned_up(context_manager, temp_kubeconfig_file):
+    """A LOCAL-mode kubeconfig file is never cleaned up"""
+    cluster_id = "test-cluster"
+
+    # use LOCAL mode
+    kubeconfig_path = context_manager.get_kubeconfig_path(
+        cluster_id=cluster_id,
+        kubeconfig_mode="LOCAL",
+        kubeconfig_path=temp_kubeconfig_file
+    )
+
+    # invoke the cleanup method manually
+    context_manager.cleanup()
+
+    # the local file still exists
+    assert os.path.exists(temp_kubeconfig_file)
+
+
+@pytest.mark.asyncio
+async def test_kubectl_tool_with_local_kubeconfig_mode():
+    """The kubectl tool works in LOCAL mode"""
+    # create a temporary kubeconfig file
+    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
+        f.write("apiVersion: v1\nclusters:\n- cluster:\n    server: https://test.example.com:6443\nusers:\n- name: test-user\n  user:\n    token: test-token")
+        temp_kubeconfig_path = f.name
+
+    try:
+        # create a handler configured for LOCAL mode
+        server = FakeServer()
+        handler = module_under_test.KubectlHandler(
+            server,
+            {
+                "allow_write": True,
+                "kubeconfig_mode": "LOCAL",
+                "kubeconfig_path": temp_kubeconfig_path
+            }
+        )
+        tool = server.tools["ack_kubectl"]
+
+        # mock subprocess.run
+        with patch('kubectl_handler.subprocess.run') as mock_run:
+            mock_run.return_value = MagicMock(returncode=0, stdout="pods found", stderr="")
+
+            # build the context
+            ctx = FakeContext(FakeLifespanContext())
+
+            # run the command
+            result = await tool(ctx, command="get pods", cluster_id="test-cluster")
+
+            # verify the result
+            assert result.exit_code == 0
+            assert result.stdout == "pods found"
+
+            # subprocess.run was invoked
+            mock_run.assert_called_once()
+            call_args = mock_run.call_args[0][0]  # the first positional argument
+            assert f"--kubeconfig {temp_kubeconfig_path}" in call_args
+    finally:
+        # remove the temporary file
+        if os.path.exists(temp_kubeconfig_path):
+            os.unlink(temp_kubeconfig_path)
+
+        # reset the global cache
+        module_under_test._context_manager = None
+
+
+@pytest.mark.asyncio
+async def test_kubectl_tool_with_ack_public_kubeconfig_mode():
+    """The kubectl tool works in ACK_PUBLIC mode"""
+    # create a handler configured for ACK_PUBLIC mode
+    server = FakeServer()
+    handler = module_under_test.KubectlHandler(
+        server,
+        {
+            "allow_write": True,
+            "kubeconfig_mode": "ACK_PUBLIC",
+            "kubeconfig_path": ""
+        }
+    )
+    tool = server.tools["ack_kubectl"]
+
+    # mock subprocess.run
+    with patch('kubectl_handler.subprocess.run') as mock_run:
+        mock_run.return_value = MagicMock(returncode=0, stdout="pods found", stderr="")
+
+        # mock the context manager
+        with patch('kubectl_handler.get_context_manager') as mock_get_context_manager:
+            # build a mock context manager
+            mock_context_manager = MagicMock()
+            mock_context_manager.get_kubeconfig_path.return_value = "/tmp/test-kubeconfig.yaml"
+            mock_get_context_manager.return_value = mock_context_manager
+
+            # build the context
+            ctx = FakeContext(FakeLifespanContext())
+
+            # run the command
+            result = await tool(ctx, command="get pods", cluster_id="test-cluster")
+
+            # verify the result
+            assert result.exit_code == 0
+            assert result.stdout == "pods found"
+
+            # subprocess.run was invoked
+            mock_run.assert_called_once()
+            call_args = mock_run.call_args[0][0]  # the first positional argument
+            assert "--kubeconfig /tmp/test-kubeconfig.yaml" in call_args
+
+
+@pytest.mark.asyncio
+async def test_kubectl_tool_with_ack_private_kubeconfig_mode():
+    """The kubectl tool works in ACK_PRIVATE mode"""
+    # create a handler configured for ACK_PRIVATE mode
+    server = FakeServer()
+    handler = module_under_test.KubectlHandler(
+        server,
+        {
+            "allow_write": True,
+            "kubeconfig_mode": "ACK_PRIVATE",
+            "kubeconfig_path": ""
+        }
+    )
+    tool = server.tools["ack_kubectl"]
+
+    # mock subprocess.run
+    with patch('kubectl_handler.subprocess.run') as mock_run:
+        mock_run.return_value = MagicMock(returncode=0, stdout="pods found", stderr="")
+
+        # mock the context manager
+        with patch('kubectl_handler.get_context_manager') as mock_get_context_manager:
+            # build a mock context manager
+            mock_context_manager = MagicMock()
+            mock_context_manager.get_kubeconfig_path.return_value = "/tmp/test-kubeconfig.yaml"
+            mock_get_context_manager.return_value = mock_context_manager
+
+            # build the context
+            ctx = FakeContext(FakeLifespanContext())
+
+            # run the command
+            result = await tool(ctx, command="get pods", cluster_id="test-cluster")
+
+            # verify the result
+            assert result.exit_code == 0
+            assert result.stdout == "pods found"
+
+            # subprocess.run was invoked
+            mock_run.assert_called_once()
+            call_args = mock_run.call_args[0][0]  # the first positional argument
+            assert "--kubeconfig /tmp/test-kubeconfig.yaml" in call_args
+
+
+@pytest.mark.asyncio
+async def test_kubectl_tool_with_incluster_kubeconfig_mode():
+    """The kubectl tool works in INCLUSTER mode"""
+    # create a handler configured for INCLUSTER mode
+    server = FakeServer()
+    handler = module_under_test.KubectlHandler(
+        server,
+        {
+            "allow_write": True,
+            "kubeconfig_mode": "INCLUSTER",
+            "kubeconfig_path": ""
+        }
+    )
+    tool = server.tools["ack_kubectl"]
+
+    # mock subprocess.run
+    with patch('kubectl_handler.subprocess.run') as mock_run:
+        mock_run.return_value = MagicMock(returncode=0, stdout="pods found", stderr="")
+
+        # mock the context manager
+        with patch('kubectl_handler.get_context_manager') as mock_get_context_manager:
+            # build a mock context manager
+            mock_context_manager = MagicMock()
+            mock_context_manager.get_kubeconfig_path.return_value = "/tmp/.kube/config.incluster"
+            mock_get_context_manager.return_value = mock_context_manager
+
+            # build the context
+            ctx = FakeContext(FakeLifespanContext())
+
+            # run the command
+            result = await tool(ctx, command="get pods", cluster_id="test-cluster")
+
+            # verify the result
+            assert result.exit_code == 0
+            assert result.stdout == "pods found"
+
+            # subprocess.run was invoked
+            mock_run.assert_called_once()
+            call_args = mock_run.call_args[0][0]  # the first positional argument
+            assert "--kubeconfig /tmp/.kube/config.incluster" in call_args
+
+
+if __name__ == "__main__":
+    pytest.main([__file__])
\ No newline at end of file
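To exercise the new tests locally, something like the following should work (assuming `pytest` and `pytest-asyncio` are installed, since the tool tests are marked `@pytest.mark.asyncio`):

```shell
# run only the new kubeconfig-mode tests, with verbose output
pytest src/tests/test_kubeconfig_mode.py -v
```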