diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 1b3c4254e8e0..8441b42472a0 100644 --- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Any, Generator import aiofiles +import tenacity from asgiref.sync import sync_to_async from kubernetes import client, config, watch from kubernetes.config import ConfigException @@ -35,6 +36,7 @@ from airflow.hooks.base import BaseHook from airflow.models import Connection from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive +from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation from airflow.providers.cncf.kubernetes.utils.pod_manager import PodOperatorHookProtocol from airflow.utils import yaml @@ -486,6 +488,12 @@ def get_deployment_status( except Exception as exc: raise exc + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_random_exponential(), + reraise=True, + retry=tenacity.retry_if_exception(should_retry_creation), + ) def create_job( self, job: V1Job, diff --git a/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py b/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py index c2e52b1a07ed..352ed59459e2 100644 --- a/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py +++ b/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py @@ -23,6 +23,7 @@ import pendulum from deprecated import deprecated +from kubernetes.client.rest import ApiException from slugify import slugify from airflow.compat.functools import cache @@ -181,3 +182,18 @@ def annotations_for_logging_task_metadata(annotation_set): else: annotations_for_logging = "" return annotations_for_logging + + +def should_retry_creation(exception: BaseException) -> bool: + """ + Check if an Exception indicates a transient error and warrants retrying. + + This function is needed for preventing 'No agent available' error. The error appears time to time + when users try to create a Resource or Job. This issue is inside kubernetes and in the current moment + has no solution. Like a temporary solution we decided to retry Job or Resource creation request each + time when this error appears. + More about this issue here: https://github.com/cert-manager/cert-manager/issues/6457 + """ + if isinstance(exception, ApiException): + return str(exception.status) == "500" + return False diff --git a/airflow/providers/cncf/kubernetes/operators/resource.py b/airflow/providers/cncf/kubernetes/operators/resource.py index 6ecbad6da9ba..417f75464619 100644 --- a/airflow/providers/cncf/kubernetes/operators/resource.py +++ b/airflow/providers/cncf/kubernetes/operators/resource.py @@ -22,12 +22,14 @@ from functools import cached_property from typing import TYPE_CHECKING, Sequence +import tenacity import yaml from kubernetes.utils import create_from_yaml from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook +from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import k8s_resource_iterator @@ -126,7 +128,14 @@ def create_custom_from_yaml_object(self, body: dict): else: self.custom_object_client.create_cluster_custom_object(group, version, plural, body) + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_random_exponential(), + reraise=True, + retry=tenacity.retry_if_exception(should_retry_creation), + ) def _create_objects(self, objects): + self.log.info("Starting resource creation") if not self.custom_resource_definition: create_from_yaml( k8s_client=self.client, @@ -144,6 +153,7 @@ def execute(self, context) -> None: self._create_objects(yaml.safe_load_all(stream)) else: raise AirflowException("File %s not found", self.yaml_conf_file) + self.log.info("Resource was created") class KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator): diff --git a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py index f02ccd1f3f35..c5aa62e1439e 100644 --- a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py +++ b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py @@ -26,6 +26,7 @@ import kubernetes import pytest +from kubernetes.client.rest import ApiException from kubernetes.config import ConfigException from sqlalchemy.orm import make_transient @@ -624,6 +625,44 @@ def test_wait_until_job_complete(self, mock_job_status, mock_kube_config_merger, mock_sleep.assert_has_calls([mock.call(POLL_INTERVAL)] * 4) assert job_actual == job_expected + @patch(f"{HOOK_MODULE}.json.dumps") + @patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client") + def test_create_job_retries_on_500_error(self, mock_client, mock_json_dumps): + mock_client.create_namespaced_job.side_effect = [ + ApiException(status=500), + MagicMock(), + ] + + hook = KubernetesHook() + hook.create_job(job=mock.MagicMock()) + + assert mock_client.create_namespaced_job.call_count == 2 + + @patch(f"{HOOK_MODULE}.json.dumps") + @patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client") + def test_create_job_fails_on_other_exception(self, mock_client, mock_json_dumps): + mock_client.create_namespaced_job.side_effect = [ApiException(status=404)] + + hook = KubernetesHook() + with pytest.raises(ApiException): + hook.create_job(job=mock.MagicMock()) + + @patch(f"{HOOK_MODULE}.json.dumps") + @patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client") + def test_create_job_retries_three_times(self, mock_client, mock_json_dumps): + mock_client.create_namespaced_job.side_effect = [ + ApiException(status=500), + ApiException(status=500), + ApiException(status=500), + ApiException(status=500), + ] + + hook = KubernetesHook() + with pytest.raises(ApiException): + hook.create_job(job=mock.MagicMock()) + + assert mock_client.create_namespaced_job.call_count == 3 + class TestKubernetesHookIncorrectConfiguration: @pytest.mark.parametrize( diff --git a/tests/providers/cncf/kubernetes/operators/test_resource.py b/tests/providers/cncf/kubernetes/operators/test_resource.py index d2cef33d842f..ec7a8c8f7a0c 100644 --- a/tests/providers/cncf/kubernetes/operators/test_resource.py +++ b/tests/providers/cncf/kubernetes/operators/test_resource.py @@ -16,10 +16,11 @@ # under the License. from __future__ import annotations -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest import yaml +from kubernetes.client.rest import ApiException from airflow import DAG from airflow.providers.cncf.kubernetes.operators.resource import ( @@ -237,3 +238,61 @@ def test_delete_not_namespaced_custom_app_from_yaml(self, mock_delete_cluster_cu "resourceflavors", "default-flavor-test", ) + + @patch("kubernetes.config.load_kube_config") + @patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml") + def test_create_objects_retries_on_500_error(self, mock_create_from_yaml, mock_load_kube_config, context): + mock_create_from_yaml.side_effect = [ + ApiException(status=500), + MagicMock(), + ] + + op = KubernetesCreateResourceOperator( + yaml_conf=TEST_VALID_RESOURCE_YAML, + dag=self.dag, + kubernetes_conn_id="kubernetes_default", + task_id="test_task_id", + config_file="/foo/bar", + ) + op.execute(context) + + assert mock_create_from_yaml.call_count == 2 + + @patch("kubernetes.config.load_kube_config") + @patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml") + def test_create_objects_fails_on_other_exception( + self, mock_create_from_yaml, mock_load_kube_config, context + ): + mock_create_from_yaml.side_effect = [ApiException(status=404)] + + op = KubernetesCreateResourceOperator( + yaml_conf=TEST_VALID_RESOURCE_YAML, + dag=self.dag, + kubernetes_conn_id="kubernetes_default", + task_id="test_task_id", + config_file="/foo/bar", + ) + with pytest.raises(ApiException): + op.execute(context) + + @patch("kubernetes.config.load_kube_config") + @patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml") + def test_create_objects_retries_three_times(self, mock_create_from_yaml, mock_load_kube_config, context): + mock_create_from_yaml.side_effect = [ + ApiException(status=500), + ApiException(status=500), + ApiException(status=500), + ApiException(status=500), + ] + + op = KubernetesCreateResourceOperator( + yaml_conf=TEST_VALID_RESOURCE_YAML, + dag=self.dag, + kubernetes_conn_id="kubernetes_default", + task_id="test_task_id", + config_file="/foo/bar", + ) + with pytest.raises(ApiException): + op.execute(context) + + assert mock_create_from_yaml.call_count == 3