diff --git a/airflow/providers/apache/flink/sensors/flink_kubernetes.py b/airflow/providers/apache/flink/sensors/flink_kubernetes.py index 2b5ea99678deb..fcca14a8fb5c1 100644 --- a/airflow/providers/apache/flink/sensors/flink_kubernetes.py +++ b/airflow/providers/apache/flink/sensors/flink_kubernetes.py @@ -21,7 +21,7 @@ from kubernetes import client -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowSkipException from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook from airflow.sensors.base import BaseSensorOperator @@ -125,7 +125,11 @@ def poke(self, context: Context) -> bool: if self.attach_log and application_state in self.FAILURE_STATES + self.SUCCESS_STATES: self._log_driver(application_state, response) if application_state in self.FAILURE_STATES: - raise AirflowException(f"Flink application failed with state: {application_state}") + # TODO: remove this if check when min_airflow_version is set to higher than 2.7.1 + message = f"Flink application failed with state: {application_state}" + if self.soft_fail: + raise AirflowSkipException(message) + raise AirflowException(message) elif application_state in self.SUCCESS_STATES: self.log.info("Flink application ended successfully") return True diff --git a/tests/providers/apache/flink/sensors/test_flink_kubernetes.py b/tests/providers/apache/flink/sensors/test_flink_kubernetes.py index 6196083e3977b..cc1b0f88180ad 100644 --- a/tests/providers/apache/flink/sensors/test_flink_kubernetes.py +++ b/tests/providers/apache/flink/sensors/test_flink_kubernetes.py @@ -27,7 +27,7 @@ from kubernetes.client.rest import ApiException from airflow import DAG -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowSkipException from airflow.models import Connection from airflow.providers.apache.flink.sensors.flink_kubernetes import FlinkKubernetesSensor from airflow.utils import db, timezone @@ -900,15 +900,20 @@ def test_cluster_ready_state(self, mock_get_namespaced_crd, mock_kubernetes_hook version="v1beta1", ) + @pytest.mark.parametrize( + "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) + ) @patch( "kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object", return_value=TEST_ERROR_CLUSTER, ) - def test_cluster_error_state(self, mock_get_namespaced_crd, mock_kubernetes_hook): + def test_cluster_error_state( + self, mock_get_namespaced_crd, mock_kubernetes_hook, soft_fail, expected_exception + ): sensor = FlinkKubernetesSensor( - application_name="flink-stream-example", dag=self.dag, task_id="test_task_id" + application_name="flink-stream-example", dag=self.dag, task_id="test_task_id", soft_fail=soft_fail ) - with pytest.raises(AirflowException): + with pytest.raises(expected_exception): sensor.poke(None) mock_kubernetes_hook.assert_called_once_with() mock_get_namespaced_crd.assert_called_once_with(