diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index b309340e7528a..9797361a7a698 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -21,7 +21,7 @@ For more information on how the KubernetesExecutor works, take a look at the guide: :ref:`executor:KubernetesExecutor` """ -import base64 + import functools import json import multiprocessing @@ -486,35 +486,6 @@ def clear_not_launched_queued_tasks(self, session=None) -> None: TaskInstance.execution_date == task.execution_date, ).update({TaskInstance.state: State.NONE}) - def _inject_secrets(self) -> None: - def _create_or_update_secret(secret_name, secret_path): - try: - return self.kube_client.create_namespaced_secret( - self.kube_config.executor_namespace, - kubernetes.client.V1Secret( - data={'key.json': base64.b64encode(open(secret_path).read())}, - metadata=kubernetes.client.V1ObjectMeta(name=secret_name), - ), - **self.kube_config.kube_client_request_args, - ) - except ApiException as e: - if e.status == 409: - return self.kube_client.replace_namespaced_secret( - secret_name, - self.kube_config.executor_namespace, - kubernetes.client.V1Secret( - data={'key.json': base64.b64encode(open(secret_path).read())}, - metadata=kubernetes.client.V1ObjectMeta(name=secret_name), - ), - **self.kube_config.kube_client_request_args, - ) - self.log.exception( - 'Exception while trying to inject secret. Secret name: %s, error details: %s', - secret_name, - e, - ) - raise - def start(self) -> None: """Starts the executor""" self.log.info('Start Kubernetes executor') @@ -526,7 +497,6 @@ def start(self) -> None: self.kube_scheduler = AirflowKubernetesScheduler( self.kube_config, self.task_queue, self.result_queue, self.kube_client, self.scheduler_job_id ) - self._inject_secrets() self.clear_not_launched_queued_tasks() def execute_async(