diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 87e171d8cbccd..4b27314e0eb3d 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1097,8 +1097,11 @@ def _upload_logs(self): """ from airflow.sdk.log import upload_to_remote - with _remote_logging_conn(self.client): - upload_to_remote(self.process_log, self.ti) + try: + with _remote_logging_conn(self.client): + upload_to_remote(self.process_log, self.ti) + except Exception: + self.process_log.exception("Failed to upload remote logs", ti_id=self.id, pid=self.pid) def _monitor_subprocess(self): """ diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 39540913f454a..3ab95e0810e09 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2915,6 +2915,31 @@ def mock_upload_to_remote(process_log, ti): assert connection_available["conn_uri"] is not None, "Connection URI was None during upload" +def test_log_upload_failures_are_non_fatal(mocker): + proc = ActivitySubprocess( + process_log=mocker.MagicMock(), + id=TI_ID, + pid=12345, + stdin=mocker.MagicMock(), + client=mocker.MagicMock(), + process=mocker.MagicMock(), + ) + proc.ti = mocker.MagicMock() + + mocker.patch( + "airflow.sdk.execution_time.supervisor._remote_logging_conn", + side_effect=RuntimeError("upload failed"), + ) + + proc._upload_logs() + + proc.process_log.exception.assert_called_once_with( + "Failed to upload remote logs", + ti_id=TI_ID, + pid=12345, + ) + + def test_remote_logging_conn_sets_process_context(monkeypatch, mocker): """ Test that _remote_logging_conn sets _AIRFLOW_PROCESS_CONTEXT=client.