From 4075edec47e813097ff5c438e8130e265309aa72 Mon Sep 17 00:00:00 2001 From: romsharon98 Date: Wed, 10 Apr 2024 13:31:41 +0300 Subject: [PATCH 1/6] add log for running callback --- airflow/models/taskinstance.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 8f9d71cfe7f43..999e65592c65e 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1231,9 +1231,10 @@ def _run_finished_callback( callbacks = callbacks if isinstance(callbacks, list) else [callbacks] for callback in callbacks: try: + callback_name = qualname(callback).split(".")[-1] + log.info("Executing %s callback", callback_name) callback(context) except Exception: - callback_name = qualname(callback).split(".")[-1] log.exception("Error when executing %s callback", callback_name) # type: ignore[attr-defined] From 3f13517dc5c044b2de8c073eba1cac0bc0a487ea Mon Sep 17 00:00:00 2001 From: rom sharon <33751805+romsharon98@users.noreply.github.com> Date: Wed, 10 Apr 2024 13:41:17 +0300 Subject: [PATCH 2/6] get callback name before try statement Co-authored-by: Andrey Anshin --- airflow/models/taskinstance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 999e65592c65e..cce3f7adcfec6 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1230,8 +1230,8 @@ def _run_finished_callback( if callbacks: callbacks = callbacks if isinstance(callbacks, list) else [callbacks] for callback in callbacks: + callback_name = qualname(callback).split(".")[-1] try: - callback_name = qualname(callback).split(".")[-1] log.info("Executing %s callback", callback_name) callback(context) except Exception: From 01ccd5df16f82f620481276e94889b6fc8e12f09 Mon Sep 17 00:00:00 2001 From: romsharon98 Date: Wed, 10 Apr 2024 14:30:09 +0300 Subject: [PATCH 3/6] add tests --- tests/models/test_taskinstance.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index e4c9e17b21541..2fe0e4585848f 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2858,8 +2858,9 @@ def on_execute_callable(context): ], ) @patch("logging.Logger.exception") + @patch("logging.Logger.info") def test_finished_callbacks_handle_and_log_exception( - self, mock_log, finished_state, create_task_instance + self, mock_log_info, mock_log_exception, finished_state, create_task_instance ): called = completed = False @@ -2877,7 +2878,8 @@ def on_finish_callable(context): callback_name = callback_input[0] if isinstance(callback_input, list) else callback_input callback_name = qualname(callback_name).split(".")[-1] expected_message = "Error when executing %s callback" - mock_log.assert_called_with(expected_message, callback_name) + mock_log_info.assert_called_with("Executing %s callback", callback_name) + mock_log_exception.assert_called_with(expected_message, callback_name) @provide_session def test_handle_failure(self, create_dummy_dag, session=None): From a15b32c7d2428aa70aebf0bc0b40dc614224e0ad Mon Sep 17 00:00:00 2001 From: romsharon98 Date: Wed, 10 Apr 2024 17:14:55 +0300 Subject: [PATCH 4/6] fix test --- tests/models/test_taskinstance.py | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 2fe0e4585848f..37e655277c916 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2849,21 +2849,7 @@ def on_execute_callable(context): ti.refresh_from_db() assert ti.state == State.SUCCESS - @pytest.mark.parametrize( - "finished_state", - [ - State.SUCCESS, - State.UP_FOR_RETRY, - State.FAILED, - ], - ) - @patch("logging.Logger.exception") - @patch("logging.Logger.info") - def test_finished_callbacks_handle_and_log_exception( - self, mock_log_info, mock_log_exception, finished_state, create_task_instance - ): - called = completed = False - + def test_finished_callbacks_handle_and_log_exception(self, caplog): def on_finish_callable(context): nonlocal called, completed called = True @@ -2871,15 +2857,16 @@ def on_finish_callable(context): completed = True for callback_input in [[on_finish_callable], on_finish_callable]: + called = completed = False + caplog.clear() _run_finished_callback(callbacks=callback_input, context={}) assert called assert not completed callback_name = callback_input[0] if isinstance(callback_input, list) else callback_input callback_name = qualname(callback_name).split(".")[-1] - expected_message = "Error when executing %s callback" - mock_log_info.assert_called_with("Executing %s callback", callback_name) - mock_log_exception.assert_called_with(expected_message, callback_name) + assert "Executing on_finish_callable callback" in caplog.text + assert "Error when executing on_finish_callable callback" in caplog.text @provide_session def test_handle_failure(self, create_dummy_dag, session=None): From 365c126822621e03f39dba67a303986020d0ac3c Mon Sep 17 00:00:00 2001 From: romsharon98 Date: Mon, 15 Apr 2024 19:48:38 +0300 Subject: [PATCH 5/6] change logging --- airflow/models/taskinstance.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index cce3f7adcfec6..b4d5d5d65a1b3 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -117,7 +117,6 @@ from airflow.utils.email import send_email from airflow.utils.helpers import prune_dict, render_template_to_string from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.module_loading import qualname from airflow.utils.net import get_hostname from airflow.utils.operator_helpers import ExecutionCallableRunner, context_to_airflow_vars from airflow.utils.platform import getuser @@ -1230,12 +1229,11 @@ def _run_finished_callback( if callbacks: callbacks = callbacks if isinstance(callbacks, list) else [callbacks] for callback in callbacks: - callback_name = qualname(callback).split(".")[-1] + log.info("Executing %s callback", callback.__name__) try: - log.info("Executing %s callback", callback_name) callback(context) except Exception: - log.exception("Error when executing %s callback", callback_name) # type: ignore[attr-defined] + log.exception("Error when executing %s callback", callback.__name__) # type: ignore[attr-defined] def _log_state(*, task_instance: TaskInstance | TaskInstancePydantic, lead_msg: str = "") -> None: From 10c7da965d29bd657024c8a3558aa2ba39342636 Mon Sep 17 00:00:00 2001 From: romsharon98 Date: Tue, 16 Apr 2024 15:52:31 +0300 Subject: [PATCH 6/6] fix tests --- tests/models/test_taskinstance.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 37e655277c916..247afb012b94d 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2879,7 +2879,9 @@ def test_handle_failure(self, create_dummy_dag, session=None): get_listener_manager().pm.hook.on_task_instance_failed = listener_callback_on_error mock_on_failure_1 = mock.MagicMock() + mock_on_failure_1.__name__ = "mock_on_failure_1" mock_on_retry_1 = mock.MagicMock() + mock_on_retry_1.__name__ = "mock_on_retry_1" dag, task1 = create_dummy_dag( dag_id="test_handle_failure", schedule=None, @@ -2916,7 +2918,9 @@ def test_handle_failure(self, create_dummy_dag, session=None): mock_on_retry_1.assert_not_called() mock_on_failure_2 = mock.MagicMock() + mock_on_failure_2.__name__ = "mock_on_failure_2" mock_on_retry_2 = mock.MagicMock() + mock_on_retry_2.__name__ = "mock_on_retry_2" task2 = EmptyOperator( task_id="test_handle_failure_on_retry", on_failure_callback=mock_on_failure_2, @@ -2938,7 +2942,9 @@ def test_handle_failure(self, create_dummy_dag, session=None): # test the scenario where normally we would retry but have been asked to fail mock_on_failure_3 = mock.MagicMock() + mock_on_failure_3.__name__ = "mock_on_failure_3" mock_on_retry_3 = mock.MagicMock() + mock_on_retry_3.__name__ = "mock_on_retry_3" task3 = EmptyOperator( task_id="test_handle_failure_on_force_fail", on_failure_callback=mock_on_failure_3, @@ -3454,6 +3460,7 @@ def raise_skip_exception(): raise AirflowSkipException callback_function = mock.MagicMock() + callback_function.__name__ = "callback_function" with dag_maker(dag_id="test_skipped_task"): task = PythonOperator( @@ -3549,6 +3556,7 @@ def timeout(): raise AirflowSensorTimeout mock_on_failure = mock.MagicMock() + mock_on_failure.__name__ = "mock_on_failure" with dag_maker(dag_id=f"test_sensor_timeout_{mode}_{retries}"): PythonSensor( task_id="test_raise_sensor_timeout", @@ -3577,6 +3585,7 @@ def timeout(): raise AirflowSensorTimeout mock_on_failure = mock.MagicMock() + mock_on_failure.__name__ = "mock_on_failure" with dag_maker(dag_id=f"test_sensor_timeout_{mode}_{retries}"): PythonSensor.partial( task_id="test_raise_sensor_timeout",