From 5fd3fafff84e90e2d82397e5a1a2b6dc471fe420 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 15 Apr 2026 12:07:26 +0100 Subject: [PATCH 1/3] Handle supervisor remote log upload failures gracefully Prevent remote log upload errors from crashing task supervisor shutdown. Catch upload exceptions in the supervisor, log the failure with task context, and keep task completion behavior intact. --- .../airflow/sdk/execution_time/supervisor.py | 7 ++-- .../execution_time/test_supervisor.py | 32 +++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 87e171d8cbccd..c72ed23fa02bd 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1097,8 +1097,11 @@ def _upload_logs(self): """ from airflow.sdk.log import upload_to_remote - with _remote_logging_conn(self.client): - upload_to_remote(self.process_log, self.ti) + try: + with _remote_logging_conn(self.client): + upload_to_remote(self.process_log, self.ti) + except Exception: + log.exception("Failed to upload remote logs", ti_id=self.id, pid=self.pid) def _monitor_subprocess(self): """ diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 39540913f454a..05e1c9aa8255b 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2915,6 +2915,38 @@ def mock_upload_to_remote(process_log, ti): assert connection_available["conn_uri"] is not None, "Connection URI was None during upload" +@pytest.mark.parametrize("captured_logs", [logging.ERROR], indirect=True) +def test_log_upload_failures_are_non_fatal(captured_logs, mocker): + proc = ActivitySubprocess( + process_log=mocker.MagicMock(), + id=TI_ID, + pid=12345, + stdin=mocker.MagicMock(), + client=mocker.MagicMock(), + process=mocker.MagicMock(), + ) + proc.ti = mocker.MagicMock() + + mocker.patch("airflow.sdk.execution_time.supervisor._remote_logging_conn", return_value=nullcontext()) + upload_to_remote = mocker.patch( + "airflow.sdk.log.upload_to_remote", side_effect=RuntimeError("upload failed") + ) + + proc._upload_logs() + + upload_to_remote.assert_called_once_with(proc.process_log, proc.ti) + assert { + "event": "Failed to upload remote logs", + "level": "error", + "logger": "supervisor", + "ti_id": TI_ID, + "pid": 12345, + "timestamp": mocker.ANY, + "loc": mocker.ANY, + "exc_info": mocker.ANY, + } in captured_logs + + def test_remote_logging_conn_sets_process_context(monkeypatch, mocker): """ Test that _remote_logging_conn sets _AIRFLOW_PROCESS_CONTEXT=client. From a332cbe5b5d1ba2933187f022e726146a0adf71b Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 15 Apr 2026 13:17:03 +0100 Subject: [PATCH 2/3] Update task-sdk/src/airflow/sdk/execution_time/supervisor.py Co-authored-by: Ash Berlin-Taylor --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index c72ed23fa02bd..4b27314e0eb3d 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1101,7 +1101,7 @@ def _upload_logs(self): with _remote_logging_conn(self.client): upload_to_remote(self.process_log, self.ti) except Exception: - log.exception("Failed to upload remote logs", ti_id=self.id, pid=self.pid) + self.process_log.exception("Failed to upload remote logs", ti_id=self.id, pid=self.pid) def _monitor_subprocess(self): """ From 80e52dcfe4a4cd5df8dcaaefc0f745187663e50a Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 15 Apr 2026 13:21:42 +0100 Subject: [PATCH 3/3] fixup! Update task-sdk/src/airflow/sdk/execution_time/supervisor.py --- .../execution_time/test_supervisor.py | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 05e1c9aa8255b..3ab95e0810e09 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2915,8 +2915,7 @@ def mock_upload_to_remote(process_log, ti): assert connection_available["conn_uri"] is not None, "Connection URI was None during upload" -@pytest.mark.parametrize("captured_logs", [logging.ERROR], indirect=True) -def test_log_upload_failures_are_non_fatal(captured_logs, mocker): +def test_log_upload_failures_are_non_fatal(mocker): proc = ActivitySubprocess( process_log=mocker.MagicMock(), id=TI_ID, @@ -2927,24 +2926,18 @@ def test_log_upload_failures_are_non_fatal(captured_logs, mocker): ) proc.ti = mocker.MagicMock() - mocker.patch("airflow.sdk.execution_time.supervisor._remote_logging_conn", return_value=nullcontext()) - upload_to_remote = mocker.patch( - "airflow.sdk.log.upload_to_remote", side_effect=RuntimeError("upload failed") + mocker.patch( + "airflow.sdk.execution_time.supervisor._remote_logging_conn", + side_effect=RuntimeError("upload failed"), ) proc._upload_logs() - upload_to_remote.assert_called_once_with(proc.process_log, proc.ti) - assert { - "event": "Failed to upload remote logs", - "level": "error", - "logger": "supervisor", - "ti_id": TI_ID, - "pid": 12345, - "timestamp": mocker.ANY, - "loc": mocker.ANY, - "exc_info": mocker.ANY, - } in captured_logs + proc.process_log.exception.assert_called_once_with( + "Failed to upload remote logs", + ti_id=TI_ID, + pid=12345, + ) def test_remote_logging_conn_sets_process_context(monkeypatch, mocker):