From e4689de961dfb9c215bd194af27428f369c9e84f Mon Sep 17 00:00:00 2001 From: luoyuliuyin Date: Thu, 25 Apr 2024 16:20:32 +0800 Subject: [PATCH] Feature: Support using content of kubeconfig to create KubernetesHook --- .../cncf/kubernetes/hooks/kubernetes.py | 11 +++-- .../cncf/kubernetes/operators/job.py | 10 ++++ .../cncf/kubernetes/operators/pod.py | 5 ++ .../cncf/kubernetes/operators/resource.py | 6 ++- .../kubernetes/operators/spark_kubernetes.py | 1 + .../providers/cncf/kubernetes/triggers/job.py | 5 ++ .../providers/cncf/kubernetes/triggers/pod.py | 5 ++ .../kubernetes/decorators/test_kubernetes.py | 2 + .../cncf/kubernetes/hooks/test_kubernetes.py | 49 +++++++++++++++++++ .../cncf/kubernetes/operators/test_job.py | 1 + .../cncf/kubernetes/operators/test_pod.py | 1 + .../cncf/kubernetes/triggers/test_job.py | 2 + .../cncf/kubernetes/triggers/test_pod.py | 1 + 13 files changed, 94 insertions(+), 5 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 1b3c4254e8e0e..ff87c37832b4b 100644 --- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -66,8 +66,8 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol): - use in cluster configuration by using extra field ``in_cluster`` in connection - use custom config by providing path to the file using extra field ``kube_config_path`` in connection - - use custom configuration by providing content of kubeconfig file via - extra field ``kube_config`` in connection + - use custom configuration by providing content of kubeconfig file using extra field ``kube_config`` + or via extra field ``kube_config`` in connection - use default config by providing no extras This hook check for configuration option in the above order. Once an option is present it will @@ -84,6 +84,7 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol): :param cluster_context: Optionally specify a context to use (e.g. if you have multiple in your kubeconfig. :param config_file: Path to kubeconfig file. + :param kube_config: content of kubeconfig file. :param in_cluster: Set to ``True`` if running from within a kubernetes cluster. :param disable_verify_ssl: Set to ``True`` if SSL verification should be disabled. :param disable_tcp_keepalive: Set to ``True`` if you want to disable keepalive logic. @@ -135,6 +136,7 @@ def __init__( client_configuration: client.Configuration | None = None, cluster_context: str | None = None, config_file: str | None = None, + kube_config: str | None = None, in_cluster: bool | None = None, disable_verify_ssl: bool | None = None, disable_tcp_keepalive: bool | None = None, @@ -144,6 +146,7 @@ def __init__( self.client_configuration = client_configuration self.cluster_context = cluster_context self.config_file = config_file + self.kube_config = kube_config self.in_cluster = in_cluster self.disable_verify_ssl = disable_verify_ssl self.disable_tcp_keepalive = disable_tcp_keepalive @@ -203,7 +206,7 @@ def get_conn(self) -> client.ApiClient: in_cluster = self._coalesce_param(self.in_cluster, self._get_field("in_cluster")) cluster_context = self._coalesce_param(self.cluster_context, self._get_field("cluster_context")) kubeconfig_path = self._coalesce_param(self.config_file, self._get_field("kube_config_path")) - kubeconfig = self._get_field("kube_config") + kubeconfig = self._coalesce_param(self.kube_config, self._get_field("kube_config")) num_selected_configuration = sum(1 for o in [in_cluster, kubeconfig, kubeconfig_path] if o) if num_selected_configuration > 1: @@ -645,7 +648,7 @@ async def _load_config(self): in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster")) cluster_context = self._coalesce_param(self.cluster_context, await self._get_field("cluster_context")) kubeconfig_path = self._coalesce_param(self.config_file, await self._get_field("kube_config_path")) - kubeconfig = await self._get_field("kube_config") + kubeconfig = await self._coalesce_param(self.kube_config, self._get_field("kube_config")) num_selected_configuration = sum(1 for o in [in_cluster, kubeconfig, kubeconfig_path] if o) diff --git a/airflow/providers/cncf/kubernetes/operators/job.py b/airflow/providers/cncf/kubernetes/operators/job.py index eb7c64614615f..94cf3ecac3a87 100644 --- a/airflow/providers/cncf/kubernetes/operators/job.py +++ b/airflow/providers/cncf/kubernetes/operators/job.py @@ -130,6 +130,7 @@ def hook(self) -> KubernetesHook: conn_id=self.kubernetes_conn_id, in_cluster=self.in_cluster, config_file=self.config_file, + kube_config=self.kube_config, cluster_context=self.cluster_context, ) return hook @@ -185,6 +186,7 @@ def execute_deferrable(self): kubernetes_conn_id=self.kubernetes_conn_id, cluster_context=self.cluster_context, config_file=self.config_file, + kube_config=self.kube_config, in_cluster=self.in_cluster, poll_interval=self.job_poll_interval, ), @@ -363,6 +365,7 @@ class KubernetesDeleteJobOperator(BaseOperator): for the Kubernetes cluster. :param config_file: The path to the Kubernetes config file. (templated) If not specified, default value is ``~/.kube/config`` + :param kube_config: content of kubeconfig file. :param in_cluster: run kubernetes client with in_cluster configuration. :param cluster_context: context that points to kubernetes cluster. Ignored when in_cluster is True. If None, current-context is used. (templated) @@ -388,6 +391,7 @@ def __init__( namespace: str, kubernetes_conn_id: str | None = KubernetesHook.default_conn_name, config_file: str | None = None, + kube_config: str | None = None, in_cluster: bool | None = None, cluster_context: str | None = None, delete_on_status: str | None = None, @@ -400,6 +404,7 @@ def __init__( self.namespace = namespace self.kubernetes_conn_id = kubernetes_conn_id self.config_file = config_file + self.kube_config = kube_config self.in_cluster = in_cluster self.cluster_context = cluster_context self.delete_on_status = delete_on_status @@ -412,6 +417,7 @@ def hook(self) -> KubernetesHook: conn_id=self.kubernetes_conn_id, in_cluster=self.in_cluster, config_file=self.config_file, + kube_config=self.kube_config, cluster_context=self.cluster_context, ) @@ -473,6 +479,7 @@ class KubernetesPatchJobOperator(BaseOperator): for the Kubernetes cluster. :param config_file: The path to the Kubernetes config file. (templated) If not specified, default value is ``~/.kube/config`` + :param kube_config: content of kubeconfig file. :param in_cluster: run kubernetes client with in_cluster configuration. :param cluster_context: context that points to kubernetes cluster. Ignored when in_cluster is True. If None, current-context is used. (templated) @@ -494,6 +501,7 @@ def __init__( body: object, kubernetes_conn_id: str | None = KubernetesHook.default_conn_name, config_file: str | None = None, + kube_config: str | None = None, in_cluster: bool | None = None, cluster_context: str | None = None, **kwargs, @@ -504,6 +512,7 @@ def __init__( self.body = body self.kubernetes_conn_id = kubernetes_conn_id self.config_file = config_file + self.kube_config = kube_config self.in_cluster = in_cluster self.cluster_context = cluster_context @@ -513,6 +522,7 @@ def hook(self) -> KubernetesHook: conn_id=self.kubernetes_conn_id, in_cluster=self.in_cluster, config_file=self.config_file, + kube_config=self.kube_config, cluster_context=self.cluster_context, ) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 0b387352ae8a6..68ebd6306a918 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -164,6 +164,7 @@ class KubernetesPodOperator(BaseOperator): :param affinity: affinity scheduling rules for the launched pod. :param config_file: The path to the Kubernetes config file. (templated) If not specified, default value is ``~/.kube/config`` + :param kube_config: content of kubeconfig file. :param node_selector: A dict containing a group of scheduling rules. :param image_pull_secrets: Any image pull secrets to be given to the pod. If more than one secret is required, provide a @@ -284,6 +285,7 @@ def __init__( container_resources: k8s.V1ResourceRequirements | None = None, affinity: k8s.V1Affinity | None = None, config_file: str | None = None, + kube_config: str | None = None, node_selector: dict | None = None, image_pull_secrets: list[k8s.V1LocalObjectReference] | None = None, service_account_name: str | None = None, @@ -359,6 +361,7 @@ def __init__( self.affinity = convert_affinity(affinity) if affinity else {} self.container_resources = container_resources self.config_file = config_file + self.kube_config = kube_config self.image_pull_secrets = convert_image_pull_secrets(image_pull_secrets) if image_pull_secrets else [] self.service_account_name = service_account_name self.hostnetwork = hostnetwork @@ -510,6 +513,7 @@ def hook(self) -> PodOperatorHookProtocol: conn_id=self.kubernetes_conn_id, in_cluster=self.in_cluster, config_file=self.config_file, + kube_config=self.kube_config, cluster_context=self.cluster_context, ) return hook @@ -676,6 +680,7 @@ def invoke_defer_method(self, last_log_time: DateTime | None = None): kubernetes_conn_id=self.kubernetes_conn_id, cluster_context=self.cluster_context, config_file=self.config_file, + kube_config=self.kube_config, in_cluster=self.in_cluster, poll_interval=self.poll_interval, get_logs=self.get_logs, diff --git a/airflow/providers/cncf/kubernetes/operators/resource.py b/airflow/providers/cncf/kubernetes/operators/resource.py index 6ecbad6da9ba9..a6acd80412186 100644 --- a/airflow/providers/cncf/kubernetes/operators/resource.py +++ b/airflow/providers/cncf/kubernetes/operators/resource.py @@ -66,6 +66,7 @@ def __init__( custom_resource_definition: bool = False, namespaced: bool = True, config_file: str | None = None, + kube_config: str | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -76,6 +77,7 @@ def __init__( self.custom_resource_definition = custom_resource_definition self.namespaced = namespaced self.config_file = config_file + self.kube_config = kube_config if not any([self.yaml_conf, self.yaml_conf_file]): raise AirflowException("One of `yaml_conf` or `yaml_conf_file` arguments must be provided") @@ -90,7 +92,9 @@ def custom_object_client(self) -> CustomObjectsApi: @cached_property def hook(self) -> KubernetesHook: - hook = KubernetesHook(conn_id=self.kubernetes_conn_id, config_file=self.config_file) + hook = KubernetesHook( + conn_id=self.kubernetes_conn_id, config_file=self.config_file, kube_config=self.kube_config + ) return hook def get_namespace(self) -> str: diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index d9c3425f6e143..3bd3b07da6ba0 100644 --- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -265,6 +265,7 @@ def hook(self) -> KubernetesHook: in_cluster=self.in_cluster or self.template_body.get("kubernetes", {}).get("in_cluster", False), config_file=self.config_file or self.template_body.get("kubernetes", {}).get("kube_config_file", None), + kube_config=self.kube_config, cluster_context=self.cluster_context or self.template_body.get("kubernetes", {}).get("cluster_context", None), ) diff --git a/airflow/providers/cncf/kubernetes/triggers/job.py b/airflow/providers/cncf/kubernetes/triggers/job.py index f229017df1499..ed2aba93be656 100644 --- a/airflow/providers/cncf/kubernetes/triggers/job.py +++ b/airflow/providers/cncf/kubernetes/triggers/job.py @@ -36,6 +36,7 @@ class KubernetesJobTrigger(BaseTrigger): for the Kubernetes cluster. :param cluster_context: Context that points to kubernetes cluster. :param config_file: Path to kubeconfig file. + :param kube_config: content of kubeconfig file. :param poll_interval: Polling period in seconds to check for the status. :param in_cluster: run kubernetes client with in_cluster configuration. """ @@ -48,6 +49,7 @@ def __init__( poll_interval: float = 10.0, cluster_context: str | None = None, config_file: str | None = None, + kube_config: str | None = None, in_cluster: bool | None = None, ): super().__init__() @@ -57,6 +59,7 @@ def __init__( self.poll_interval = poll_interval self.cluster_context = cluster_context self.config_file = config_file + self.kube_config = kube_config self.in_cluster = in_cluster def serialize(self) -> tuple[str, dict[str, Any]]: @@ -70,6 +73,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "poll_interval": self.poll_interval, "cluster_context": self.cluster_context, "config_file": self.config_file, + "kube_config": self.kube_config, "in_cluster": self.in_cluster, }, ) @@ -97,5 +101,6 @@ def hook(self) -> AsyncKubernetesHook: conn_id=self.kubernetes_conn_id, in_cluster=self.in_cluster, config_file=self.config_file, + kube_config=self.kube_config, cluster_context=self.cluster_context, ) diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py index b74e3ef877b1d..22a063b310f7e 100644 --- a/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -62,6 +62,7 @@ class KubernetesPodTrigger(BaseTrigger): for the Kubernetes cluster. :param cluster_context: Context that points to kubernetes cluster. :param config_file: Path to kubeconfig file. + :param kube_config: content of kubeconfig file. :param poll_interval: Polling period in seconds to check for the status. :param trigger_start_time: time in Datetime format when the trigger was started :param in_cluster: run kubernetes client with in_cluster configuration. @@ -90,6 +91,7 @@ def __init__( poll_interval: float = 2, cluster_context: str | None = None, config_file: str | None = None, + kube_config: str | None = None, in_cluster: bool | None = None, get_logs: bool = True, startup_timeout: int = 120, @@ -108,6 +110,7 @@ def __init__( self.poll_interval = poll_interval self.cluster_context = cluster_context self.config_file = config_file + self.kube_config = kube_config self.in_cluster = in_cluster self.get_logs = get_logs self.startup_timeout = startup_timeout @@ -143,6 +146,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "poll_interval": self.poll_interval, "cluster_context": self.cluster_context, "config_file": self.config_file, + "kube_config": self.kube_config, "in_cluster": self.in_cluster, "get_logs": self.get_logs, "startup_timeout": self.startup_timeout, @@ -281,6 +285,7 @@ def _get_async_hook(self) -> AsyncKubernetesHook: conn_id=self.kubernetes_conn_id, in_cluster=self.in_cluster, config_file=self.config_file, + kube_config=self.kube_config, cluster_context=self.cluster_context, ) diff --git a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py index 5ceaf2ad259a6..51e181b2e7e7b 100644 --- a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py +++ b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py @@ -92,6 +92,7 @@ def f(): mock_hook.assert_called_once_with( conn_id="kubernetes_default", in_cluster=False, + kube_config=None, cluster_context="default", config_file="/tmp/fake_file", ) @@ -142,6 +143,7 @@ def f(arg1, arg2, kwarg1=None, kwarg2=None): in_cluster=False, cluster_context="default", config_file="/tmp/fake_file", + kube_config=None, ) assert mock_create_pod.call_count == 1 assert mock_hook.return_value.get_xcom_sidecar_container_image.call_count == 1 diff --git a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py index f02ccd1f3f355..4492fa70a5a04 100644 --- a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py +++ b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py @@ -279,6 +279,55 @@ def test_kube_config_path( mock_kube_config_loader.assert_called_once() assert isinstance(api_conn, kubernetes.client.api_client.ApiClient) + @pytest.mark.parametrize( + "conn_id, kube_config, has_conn_id, has_kube_config", + ( + (None, None, False, False), + (None, "content of kubeconfig file", False, True), + ("kube_config", None, True, False), + ("kube_config", "content of kubeconfig file", True, True), + ), + ) + @patch("kubernetes.config.kube_config.KubeConfigLoader") + @patch("kubernetes.config.kube_config.KubeConfigMerger") + @patch.object(tempfile, "NamedTemporaryFile") + def test_kube_config( + self, + mock_tempfile, + mock_kube_config_merger, + mock_kube_config_loader, + conn_id, + kube_config, + has_conn_id, + has_kube_config + ): + """ + Verifies whether temporary kube config file is created. + """ + mock_tempfile.return_value.__enter__.return_value.name = "fake-temp-file" + mock_kube_config_merger.return_value.config = {"fake_config": "value"} + kubernetes_hook = KubernetesHook(conn_id=conn_id, kube_config=kube_config) + api_conn = kubernetes_hook.get_conn() + if has_conn_id: + if has_kube_config: + mock_tempfile.is_called_once() + mock_kube_config_loader.assert_called_once() + mock_kube_config_merger.assert_called_once_with("fake-temp-file") + else: + mock_tempfile.is_called_once() + mock_kube_config_loader.assert_called_once() + mock_kube_config_merger.assert_called_once_with("fake-temp-file") + else: + if has_kube_config: + mock_tempfile.is_called_once() + mock_kube_config_loader.assert_called_once() + mock_kube_config_merger.assert_called_once_with("fake-temp-file") + else: + mock_tempfile.assert_not_called() + mock_kube_config_loader.assert_called_once() + mock_kube_config_merger.assert_called_once_with(KUBE_CONFIG_PATH) + assert isinstance(api_conn, kubernetes.client.api_client.ApiClient) + @pytest.mark.parametrize( "conn_id, has_config", ( diff --git a/tests/providers/cncf/kubernetes/operators/test_job.py b/tests/providers/cncf/kubernetes/operators/test_job.py index d776da248263e..3e4e500126183 100644 --- a/tests/providers/cncf/kubernetes/operators/test_job.py +++ b/tests/providers/cncf/kubernetes/operators/test_job.py @@ -587,6 +587,7 @@ def test_execute_deferrable(self, mock_trigger, mock_execute_deferrable): cluster_context=mock_cluster_context, config_file=mock_config_file, in_cluster=mock_in_cluster, + kube_config=None, poll_interval=POLL_INTERVAL, ) assert actual_result is None diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index db10bff330b01..39d1342f7d2e2 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -232,6 +232,7 @@ def test_config_path(self, hook_mock): conn_id="kubernetes_default", config_file=file_path, in_cluster=None, + kube_config=None ) @pytest.mark.parametrize( diff --git a/tests/providers/cncf/kubernetes/triggers/test_job.py b/tests/providers/cncf/kubernetes/triggers/test_job.py index 6124f5471c889..ec0ec85f6d845 100644 --- a/tests/providers/cncf/kubernetes/triggers/test_job.py +++ b/tests/providers/cncf/kubernetes/triggers/test_job.py @@ -61,6 +61,7 @@ def test_serialize(self, trigger): "poll_interval": POLL_INTERVAL, "cluster_context": CLUSTER_CONTEXT, "config_file": CONFIG_FILE, + "kube_config": None, "in_cluster": IN_CLUSTER, } @@ -130,6 +131,7 @@ def test_hook(self, mock_hook, trigger): conn_id=CONN_ID, in_cluster=IN_CLUSTER, config_file=CONFIG_FILE, + kube_config=None, cluster_context=CLUSTER_CONTEXT, ) assert hook_actual == hook_expected diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py index 6572f03414d1d..d53a149bbba7f 100644 --- a/tests/providers/cncf/kubernetes/triggers/test_pod.py +++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py @@ -102,6 +102,7 @@ def test_serialize(self, trigger): "poll_interval": POLL_INTERVAL, "cluster_context": CLUSTER_CONTEXT, "config_file": CONFIG_FILE, + "kube_config": None, "in_cluster": IN_CLUSTER, "get_logs": GET_LOGS, "startup_timeout": STARTUP_TIMEOUT_SECS,