From 313bc16c49f27481a14de3c6ed87c43377b88c7d Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Thu, 7 May 2026 23:57:27 +0200 Subject: [PATCH 1/2] Surface remote-log upload failures via structured warnings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `upload_to_remote()` silently returned in three failure paths (handler load failure, log-path resolution exception, handler.upload exception) with no signal to the operator that logs were not reaching the remote system. The supervisor's `_upload_logs` already logs the outermost exception, but the inner paths inside `upload_to_remote()` were silent — so a misconfigured remote handler or a transient remote-system outage would degrade silently while local-only logs continued. Add a dedicated `airflow.logging.remote` structlog logger and emit a `log.warning` at each of the three failure paths with the TI id and the underlying error string. No behaviour change otherwise — failures still fall through softly so a bad remote handler doesn't abort the task lifecycle. Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-004). --- task-sdk/src/airflow/sdk/log.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index f93abe1a72240..f9c73e19ce3b3 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -226,20 +226,42 @@ def relative_path_from_logger(logger) -> Path | None: def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI): raw_logger = getattr(logger, "_logger") + # Dedicated logger for remote-upload visibility — operators relying on + # remote log handlers need a way to see when those handlers fail to load + # or fail to upload. The default behaviour was to silently fall through. + upload_log = structlog.get_logger("airflow.logging.remote") handler = load_remote_log_handler() if not handler: + upload_log.warning( + "remote_log_handler_unavailable", + ti_id=str(getattr(ti, "id", None)), + note="Remote log handler could not be loaded; logs will be available locally only.", + ) return try: relative_path = relative_path_from_logger(raw_logger) - except Exception: + except Exception as exc: + upload_log.warning( + "remote_log_path_resolution_failed", + ti_id=str(getattr(ti, "id", None)), + error=str(exc), + ) return if not relative_path: return log_relative_path = relative_path.as_posix() - handler.upload(log_relative_path, ti) + try: + handler.upload(log_relative_path, ti) + except Exception as exc: + upload_log.warning( + "remote_log_upload_failed", + ti_id=str(getattr(ti, "id", None)), + log_relative_path=log_relative_path, + error=str(exc), + ) def mask_secret(secret: JsonValue, name: str | None = None) -> None: From 9c399930debe5f79454c400be6ef46a80e5675a7 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Tue, 19 May 2026 10:11:42 +0200 Subject: [PATCH 2/2] Address review feedback on remote-log upload warnings - Use ti.id directly (available since 3.0) instead of getattr fallback. - Pass exc_info=exc to structlog so the traceback is preserved on path-resolution and upload failures. - Trim past-tense from the docstring comment. - Add tests covering the three failure paths and the silent success / no-path cases. --- task-sdk/src/airflow/sdk/log.py | 12 +-- task-sdk/tests/task_sdk/test_log.py | 120 ++++++++++++++++++++++++++++ 2 files changed, 126 insertions(+), 6 deletions(-) create mode 100644 task-sdk/tests/task_sdk/test_log.py diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index f9c73e19ce3b3..9542baa404602 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -228,14 +228,14 @@ def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI): raw_logger = getattr(logger, "_logger") # Dedicated logger for remote-upload visibility — operators relying on # remote log handlers need a way to see when those handlers fail to load - # or fail to upload. The default behaviour was to silently fall through. + # or fail to upload. upload_log = structlog.get_logger("airflow.logging.remote") handler = load_remote_log_handler() if not handler: upload_log.warning( "remote_log_handler_unavailable", - ti_id=str(getattr(ti, "id", None)), + ti_id=str(ti.id), note="Remote log handler could not be loaded; logs will be available locally only.", ) return @@ -245,8 +245,8 @@ def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI): except Exception as exc: upload_log.warning( "remote_log_path_resolution_failed", - ti_id=str(getattr(ti, "id", None)), - error=str(exc), + ti_id=str(ti.id), + exc_info=exc, ) return if not relative_path: @@ -258,9 +258,9 @@ def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI): except Exception as exc: upload_log.warning( "remote_log_upload_failed", - ti_id=str(getattr(ti, "id", None)), + ti_id=str(ti.id), log_relative_path=log_relative_path, - error=str(exc), + exc_info=exc, ) diff --git a/task-sdk/tests/task_sdk/test_log.py b/task-sdk/tests/task_sdk/test_log.py new file mode 100644 index 0000000000000..6b7b22a4cb639 --- /dev/null +++ b/task-sdk/tests/task_sdk/test_log.py @@ -0,0 +1,120 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +import structlog +import structlog.testing +from uuid6 import uuid7 + +from airflow.sdk import log as sdk_log + + +def _make_ti(): + ti = mock.MagicMock() + ti.id = uuid7() + return ti + + +def _make_logger(): + """Build a FilteringBoundLogger-like object exposing ``_logger``.""" + logger = mock.MagicMock() + logger._logger = mock.MagicMock() + return logger + + +class TestUploadToRemote: + def test_warns_when_handler_unavailable(self): + ti = _make_ti() + with ( + mock.patch.object(sdk_log, "load_remote_log_handler", return_value=None), + structlog.testing.capture_logs() as captured, + ): + sdk_log.upload_to_remote(_make_logger(), ti) + + events = [e for e in captured if e["event"] == "remote_log_handler_unavailable"] + assert len(events) == 1 + assert events[0]["log_level"] == "warning" + assert events[0]["ti_id"] == str(ti.id) + + def test_warns_when_path_resolution_fails(self): + ti = _make_ti() + handler = mock.MagicMock() + boom = RuntimeError("cannot resolve path") + with ( + mock.patch.object(sdk_log, "load_remote_log_handler", return_value=handler), + mock.patch.object(sdk_log, "relative_path_from_logger", side_effect=boom), + structlog.testing.capture_logs() as captured, + ): + sdk_log.upload_to_remote(_make_logger(), ti) + + events = [e for e in captured if e["event"] == "remote_log_path_resolution_failed"] + assert len(events) == 1 + assert events[0]["log_level"] == "warning" + assert events[0]["ti_id"] == str(ti.id) + assert events[0]["exc_info"] is boom + handler.upload.assert_not_called() + + def test_warns_when_upload_fails(self, tmp_path): + ti = _make_ti() + handler = mock.MagicMock() + boom = RuntimeError("s3 unreachable") + handler.upload.side_effect = boom + relative = tmp_path / "dag_id" / "run_id" / "task.log" + with ( + mock.patch.object(sdk_log, "load_remote_log_handler", return_value=handler), + mock.patch.object(sdk_log, "relative_path_from_logger", return_value=relative), + structlog.testing.capture_logs() as captured, + ): + sdk_log.upload_to_remote(_make_logger(), ti) + + events = [e for e in captured if e["event"] == "remote_log_upload_failed"] + assert len(events) == 1 + assert events[0]["log_level"] == "warning" + assert events[0]["ti_id"] == str(ti.id) + assert events[0]["log_relative_path"] == relative.as_posix() + assert events[0]["exc_info"] is boom + handler.upload.assert_called_once_with(relative.as_posix(), ti) + + def test_silent_when_relative_path_is_none(self): + ti = _make_ti() + handler = mock.MagicMock() + with ( + mock.patch.object(sdk_log, "load_remote_log_handler", return_value=handler), + mock.patch.object(sdk_log, "relative_path_from_logger", return_value=None), + structlog.testing.capture_logs() as captured, + ): + sdk_log.upload_to_remote(_make_logger(), ti) + + assert captured == [] + handler.upload.assert_not_called() + + def test_silent_on_success(self, tmp_path): + ti = _make_ti() + handler = mock.MagicMock() + relative = tmp_path / "dag_id" / "run_id" / "task.log" + with ( + mock.patch.object(sdk_log, "load_remote_log_handler", return_value=handler), + mock.patch.object(sdk_log, "relative_path_from_logger", return_value=relative), + structlog.testing.capture_logs() as captured, + ): + sdk_log.upload_to_remote(_make_logger(), ti) + + assert captured == [] + handler.upload.assert_called_once_with(relative.as_posix(), ti)