From 99a22a2ca10d0fddadbd3507edb0816ab8c0e55f Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 15 May 2026 15:04:43 +0800 Subject: [PATCH 1/7] Clarify the purpose of logging_class_path --- .../advanced-logging-configuration.rst | 64 +++++++++++++-- .../airflow_local_settings.py | 9 ++- .../src/airflow/config_templates/config.yml | 9 ++- airflow-core/src/airflow/logging_config.py | 18 ++++- providers/opensearch/docs/logging/index.rst | 4 + .../src/airflow_shared/logging/remote.py | 31 +++++++- shared/logging/tests/logging/test_remote.py | 78 +++++++++++++++++++ task-sdk/src/airflow/sdk/log.py | 16 +++- 8 files changed, 213 insertions(+), 16 deletions(-) diff --git a/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst b/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst index fa34e74f3140b..a5a3fbd3fce5c 100644 --- a/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst +++ b/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst @@ -46,13 +46,14 @@ that Python objects log to loggers that follow naming convention of ``. You can read more about standard python logging classes (Loggers, Handlers, Formatters) in the `Python logging documentation `_. -Create a custom logging class ------------------------------ +Create a custom logging config +------------------------------ Configuring your logging classes can be done via the ``logging_config_class`` option in ``airflow.cfg`` file. -This configuration should specify the import path to a configuration compatible with -:func:`logging.config.dictConfig`. If your file is a standard import location, then you should set a -:envvar:`PYTHONPATH` environment variable. +Despite the option name the value is a dotted import path to a ``dict`` that satisfies +:func:`logging.config.dictConfig` — not a Python class. The ``_class`` suffix is +historical and kept for backwards compatibility. If your file is a standard import +location, then you should set a :envvar:`PYTHONPATH` environment variable. Follow the steps below to enable custom logging config class: @@ -102,6 +103,59 @@ See :doc:`../modules_management` for details on how Python and Airflow manage mo You can override the way both standard logs of the components and "task" logs are handled. +Custom logging configs and remote logging +----------------------------------------- + +When ``[logging] remote_logging = True``, Airflow does **not** read the remote-log +handler out of the dict you point ``logging_config_class`` at. After importing +the dict it re-imports the *enclosing module* and looks up two optional +module-level attributes via ``getattr``: + +* ``REMOTE_TASK_LOG`` — an instance of + :class:`~airflow.logging.remote.RemoteLogIO` (or + :class:`~airflow.logging.remote.RemoteLogStreamIO`) that uploads task logs + and reads them back for the UI. +* ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id used when + ``[logging] remote_log_conn_id`` is unset. + +If your module does not expose ``REMOTE_TASK_LOG``, remote-log read-back is +silently disabled and Airflow emits one ``WARNING`` at startup. + +Define both attributes alongside your ``LOGGING_CONFIG``: + + .. code-block:: python + + # ~/airflow/config/log_config.py + from airflow.logging.remote import RemoteLogIO + + + class MyRemoteLogIO: + @property + def processors(self): + return () + + def upload(self, path, ti): ... # upload local log file at ``path`` to your backend + + def read(self, relative_path, ti): ... # return (source_info, log_messages) for the UI + + + REMOTE_TASK_LOG: RemoteLogIO | None = MyRemoteLogIO() + DEFAULT_REMOTE_CONN_ID: str | None = "my_remote_conn" + + LOGGING_CONFIG = { + "version": 1, + "disable_existing_loggers": False, + # ... your formatters / handlers / loggers / root ... + } + +.. note:: + + ``airflow.config_templates.airflow_local_settings`` (and its + ``DEFAULT_LOGGING_CONFIG``) is planned for deprecation. Build + ``LOGGING_CONFIG`` directly rather than deep-copying from it, and implement + ``REMOTE_TASK_LOG`` yourself rather than re-exporting from that module. + + Custom logger for Operators, Hooks and Tasks -------------------------------------------- diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index 7072d2dd6ea88..4ad479a8c914c 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -49,7 +49,9 @@ BASE_LOG_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")) -# This isn't used anymore, but kept for compat of people who might have imported it +# Default value for the ``[logging] logging_config_class`` option. Plain +# ``logging.config.dictConfig`` dict; the ``_class`` suffix on the config option +# is historical. DEFAULT_LOGGING_CONFIG: dict[str, Any] = { "version": 1, "disable_existing_loggers": False, @@ -121,6 +123,11 @@ ################## REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging") + +# Side-channel attributes read by ``discover_remote_log_handler`` from whichever +# module ``[logging] logging_config_class`` resolves through. Custom modules that +# override that option should define both at module scope to enable remote +# task-log read-back. REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None DEFAULT_REMOTE_CONN_ID: str | None = None diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 03593ce4ba036..17761527b23a6 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -863,9 +863,12 @@ logging: default: "INFO" logging_config_class: description: | - Logging class - Specify the class that will specify the logging configuration - This class has to be on the python classpath + Dotted import path to a ``logging.config.dictConfig`` dict. The + ``_class`` suffix is historical — the target is a dict, not a class. + + Airflow also reads two optional module-level attributes from the + enclosing module: ``REMOTE_TASK_LOG`` and ``DEFAULT_REMOTE_CONN_ID``, + used to drive remote task-log read-back. See :ref:`write-logs-advanced`. version_added: 2.0.0 type: string example: "my.path.default_local_settings.LOGGING_CONFIG" diff --git a/airflow-core/src/airflow/logging_config.py b/airflow-core/src/airflow/logging_config.py index 0da017d9b0826..198038f4e522e 100644 --- a/airflow-core/src/airflow/logging_config.py +++ b/airflow-core/src/airflow/logging_config.py @@ -60,7 +60,14 @@ def get_default_remote_conn_id() -> str | None: def load_logging_config() -> tuple[dict[str, Any], str]: - """Configure & Validate Airflow Logging.""" + """ + Resolve ``[logging] logging_config_class`` and the remote-log side channel. + + The option is a dotted path to a ``logging.config.dictConfig`` dict. After + importing the dict, the enclosing module is probed for optional + ``REMOTE_TASK_LOG`` and ``DEFAULT_REMOTE_CONN_ID`` attributes via + :func:`airflow._shared.logging.remote.discover_remote_log_handler`. + """ fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback) @@ -86,9 +93,14 @@ def load_logging_config() -> tuple[dict[str, Any], str]: f"to: {type(err).__name__}:{err}" ) else: - # Load remote logging configuration using shared discovery logic + # Load remote logging configuration using shared discovery logic. + # Passing remote_logging_enabled lets the helper warn when a user-defined + # module forgot to expose REMOTE_TASK_LOG while remote logging is on. remote_task_log, default_remote_conn_id = discover_remote_log_handler( - logging_class_path, fallback, import_string + logging_class_path, + fallback, + import_string, + remote_logging_enabled=conf.getboolean("logging", "remote_logging", fallback=False), ) _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id) diff --git a/providers/opensearch/docs/logging/index.rst b/providers/opensearch/docs/logging/index.rst index 34f2dc359b9b0..e3f58e113554d 100644 --- a/providers/opensearch/docs/logging/index.rst +++ b/providers/opensearch/docs/logging/index.rst @@ -132,6 +132,10 @@ Then, in ``airflow.cfg``: remote_logging = True logging_config_class = config.airflow_local_settings.DEFAULT_LOGGING_CONFIG +See :ref:`write-logs-advanced` for the general ``logging_config_class`` contract, +including the ``REMOTE_TASK_LOG`` / ``DEFAULT_REMOTE_CONN_ID`` attributes a +hand-written module needs when the bundled handler does not self-register. + On Airflow **3.2.1+** this override is unnecessary — the stock ``airflow_local_settings.py`` already contains an ``elif OPENSEARCH_HOST:`` branch, so configuring the ``[opensearch]`` section in ``airflow.cfg`` is sufficient. diff --git a/shared/logging/src/airflow_shared/logging/remote.py b/shared/logging/src/airflow_shared/logging/remote.py index d8d76ef6a078c..d4b0b533a1cb0 100644 --- a/shared/logging/src/airflow_shared/logging/remote.py +++ b/shared/logging/src/airflow_shared/logging/remote.py @@ -78,9 +78,29 @@ def discover_remote_log_handler( logging_class_path: str, fallback_path: str, import_string: Callable[[str], Any], + *, + remote_logging_enabled: bool = False, ) -> tuple[RemoteLogIO | None, str | None]: - """Discover and load the remote log handler from a logging config module.""" + """ + Look up the optional remote-log handler alongside a logging dictConfig. + + ``[logging] logging_config_class`` is a dotted path to a + ``logging.config.dictConfig`` dict. After importing the dict, this helper + re-imports the enclosing module and reads two optional module-level + attributes via ``getattr``: + + * ``REMOTE_TASK_LOG`` — :class:`RemoteLogIO` / :class:`RemoteLogStreamIO` + instance that uploads and reads task logs. + * ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id for that + backend. + + Missing attributes are not fatal; they disable remote-log read-back. When + ``remote_logging_enabled=True`` and a user-defined module exposes no + ``REMOTE_TASK_LOG``, a single warning is emitted so the misconfiguration + is visible. + """ # Sometimes we end up with `""` as the value! + user_defined = bool(logging_class_path) and logging_class_path != fallback_path logging_class_path = logging_class_path or fallback_path try: @@ -97,6 +117,15 @@ def discover_remote_log_handler( remote_task_log = getattr(mod, "REMOTE_TASK_LOG", None) default_remote_conn_id = getattr(mod, "DEFAULT_REMOTE_CONN_ID", None) + if remote_task_log is None and user_defined and remote_logging_enabled: + log.warning( + "[logging] remote_logging is enabled but the user-defined logging module %r " + "does not expose a REMOTE_TASK_LOG attribute, so remote task-log read-back is " + "disabled. Define REMOTE_TASK_LOG (a RemoteLogIO instance) at module scope " + "to enable it.", + modpath, + ) + return remote_task_log, default_remote_conn_id except Exception as err: diff --git a/shared/logging/tests/logging/test_remote.py b/shared/logging/tests/logging/test_remote.py index a8f1d7ab29091..5bd36eb23c252 100644 --- a/shared/logging/tests/logging/test_remote.py +++ b/shared/logging/tests/logging/test_remote.py @@ -134,6 +134,84 @@ def test_handles_module_loading_error(self): assert result_handler is None assert result_conn is None + def test_warns_when_user_module_missing_remote_task_log_with_remote_logging(self): + config = {"version": 1} + mock_module = mock.MagicMock(spec=[]) # no REMOTE_TASK_LOG, no DEFAULT_REMOTE_CONN_ID + + with ( + mock.patch("airflow_shared.logging.remote.import_module", return_value=mock_module), + mock.patch("airflow_shared.logging.remote.log") as mock_log, + ): + result_handler, result_conn = discover_remote_log_handler( + "my_pkg.custom_settings.LOGGING_CONFIG", + "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG", + lambda path: config, + remote_logging_enabled=True, + ) + + assert result_handler is None + assert result_conn is None + mock_log.warning.assert_called_once() + warning_args = mock_log.warning.call_args + # The module path (parent of the dotted path) should appear in the formatted message. + assert "my_pkg.custom_settings" in warning_args.args + + def test_no_warning_when_using_fallback_path(self): + config = {"version": 1} + mock_module = mock.MagicMock(spec=[]) # no REMOTE_TASK_LOG + + fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" + with ( + mock.patch("airflow_shared.logging.remote.import_module", return_value=mock_module), + mock.patch("airflow_shared.logging.remote.log") as mock_log, + ): + discover_remote_log_handler( + fallback, + fallback, + lambda path: config, + remote_logging_enabled=True, + ) + + mock_log.warning.assert_not_called() + + def test_no_warning_when_remote_logging_disabled(self): + config = {"version": 1} + mock_module = mock.MagicMock(spec=[]) # no REMOTE_TASK_LOG + + with ( + mock.patch("airflow_shared.logging.remote.import_module", return_value=mock_module), + mock.patch("airflow_shared.logging.remote.log") as mock_log, + ): + discover_remote_log_handler( + "my_pkg.custom_settings.LOGGING_CONFIG", + "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG", + lambda path: config, + remote_logging_enabled=False, + ) + + mock_log.warning.assert_not_called() + + def test_no_warning_when_remote_task_log_present(self): + handler = DummyRemoteLogIO() + config = {"version": 1} + mock_module = mock.MagicMock() + mock_module.REMOTE_TASK_LOG = handler + mock_module.DEFAULT_REMOTE_CONN_ID = None + + with ( + mock.patch("airflow_shared.logging.remote.import_module", return_value=mock_module), + mock.patch("airflow_shared.logging.remote.log") as mock_log, + ): + result_handler, _ = discover_remote_log_handler( + "my_pkg.custom_settings.LOGGING_CONFIG", + "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG", + lambda path: config, + remote_logging_enabled=True, + ) + + assert result_handler is handler + mock_log.warning.assert_not_called() + class TestRemoteLogIOProtocol: def test_dummy_implements_protocol(self): diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index f93abe1a72240..aa040a3e7cc29 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -172,7 +172,12 @@ def init_log_file(local_relative_path: str) -> Path: def _load_logging_config() -> None: - """Load and cache the remote logging configuration from SDK config.""" + """ + Load and cache the remote logging configuration from SDK config. + + SDK mirror of :func:`airflow.logging_config.load_logging_config` — see that + function for the ``logging_config_class`` / ``REMOTE_TASK_LOG`` contract. + """ from airflow.sdk._shared.logging.remote import discover_remote_log_handler from airflow.sdk._shared.module_loading import import_string from airflow.sdk.configuration import conf @@ -180,9 +185,14 @@ def _load_logging_config() -> None: fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback) - # Load remote logging configuration using shared discovery logic + # Load remote logging configuration using shared discovery logic. + # Passing remote_logging_enabled lets the helper warn when a user-defined + # module forgot to expose REMOTE_TASK_LOG while remote logging is on. remote_task_log, default_remote_conn_id = discover_remote_log_handler( - logging_class_path, fallback, import_string + logging_class_path, + fallback, + import_string, + remote_logging_enabled=conf.getboolean("logging", "remote_logging", fallback=False), ) _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id) From a2ad476942339d54e8e43bfb57ff9b96dd7c7c89 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 15 May 2026 18:53:24 +0800 Subject: [PATCH 2/7] Deprecate implicit registration of REMOTE_TASK_LOG in Elasticsearch and Opensearch task handlers --- .../advanced-logging-configuration.rst | 15 +++----- providers/elasticsearch/docs/changelog.rst | 7 ++++ .../elasticsearch/log/es_task_handler.py | 22 +++++++++++ .../elasticsearch/log/test_es_task_handler.py | 10 ++++- providers/opensearch/docs/changelog.rst | 8 ++++ providers/opensearch/docs/logging/index.rst | 38 ++++++++++++++++--- .../opensearch/log/os_task_handler.py | 22 +++++++++++ .../opensearch/log/test_os_task_handler.py | 10 ++++- 8 files changed, 115 insertions(+), 17 deletions(-) diff --git a/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst b/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst index a5a3fbd3fce5c..781662e678010 100644 --- a/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst +++ b/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst @@ -106,10 +106,9 @@ See :doc:`../modules_management` for details on how Python and Airflow manage mo Custom logging configs and remote logging ----------------------------------------- -When ``[logging] remote_logging = True``, Airflow does **not** read the remote-log -handler out of the dict you point ``logging_config_class`` at. After importing -the dict it re-imports the *enclosing module* and looks up two optional -module-level attributes via ``getattr``: +When ``[logging] remote_logging = True`` and you point ``logging_config_class`` +at your own module, define two module-level attributes alongside your +``LOGGING_CONFIG`` dict: * ``REMOTE_TASK_LOG`` — an instance of :class:`~airflow.logging.remote.RemoteLogIO` (or @@ -118,10 +117,8 @@ module-level attributes via ``getattr``: * ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id used when ``[logging] remote_log_conn_id`` is unset. -If your module does not expose ``REMOTE_TASK_LOG``, remote-log read-back is -silently disabled and Airflow emits one ``WARNING`` at startup. - -Define both attributes alongside your ``LOGGING_CONFIG``: +If ``REMOTE_TASK_LOG`` is missing, Airflow emits one ``WARNING`` at startup +and the UI cannot read task logs back from the remote backend. .. code-block:: python @@ -152,7 +149,7 @@ Define both attributes alongside your ``LOGGING_CONFIG``: ``airflow.config_templates.airflow_local_settings`` (and its ``DEFAULT_LOGGING_CONFIG``) is planned for deprecation. Build - ``LOGGING_CONFIG`` directly rather than deep-copying from it, and implement + ``LOGGING_CONFIG`` directly rather than deep-copying from it, and define ``REMOTE_TASK_LOG`` yourself rather than re-exporting from that module. diff --git a/providers/elasticsearch/docs/changelog.rst b/providers/elasticsearch/docs/changelog.rst index 2209b6ff95b1d..103293b13e3b7 100644 --- a/providers/elasticsearch/docs/changelog.rst +++ b/providers/elasticsearch/docs/changelog.rst @@ -27,6 +27,13 @@ Changelog --------- +``ElasticsearchTaskHandler`` no longer silently registers itself as the remote +task-log reader during ``dictConfig``. The implicit registration still happens +for one more release but now emits an ``AirflowProviderDeprecationWarning`` and +will be removed in a future provider release. If you ship a custom +``[logging] logging_config_class`` module that swaps in +``ElasticsearchTaskHandler``, set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` +at module scope in that module. 6.5.3 ..... diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py index eb531452df824..fd495a3a33990 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py @@ -25,6 +25,7 @@ import shutil import sys import time +import warnings from collections import defaultdict from collections.abc import Callable, Iterable from operator import attrgetter @@ -41,6 +42,7 @@ from elasticsearch.exceptions import NotFoundError import airflow.logging_config as alc +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models.dagrun import DagRun from airflow.providers.common.compat.sdk import conf from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter @@ -317,8 +319,28 @@ def __init__( from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log if get_remote_task_log() is None: + warnings.warn( + "Implicit REMOTE_TASK_LOG registration by ElasticsearchTaskHandler " + "during dictConfig is deprecated and will be removed in a future " + "provider release. Set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` " + "at module scope in your ``[logging] logging_config_class`` module. " + "See the Elasticsearch provider logging documentation for the " + "updated override example.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) _ActiveLoggingConfig.set(self.io, None) elif alc.REMOTE_TASK_LOG is None: # type: ignore[attr-defined] + warnings.warn( + "Implicit REMOTE_TASK_LOG registration by ElasticsearchTaskHandler " + "during dictConfig is deprecated and will be removed in a future " + "provider release. Set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` " + "at module scope in your ``[logging] logging_config_class`` module. " + "See the Elasticsearch provider logging documentation for the " + "updated override example.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) alc.REMOTE_TASK_LOG = self.io # type: ignore[attr-defined] @staticmethod diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py index 3ac143fa35bb0..7b0ef35d75927 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py @@ -168,7 +168,15 @@ def _use_historical_filename_templates(self): yield @pytest.fixture(autouse=True) - def _setup_handler(self, tmp_path): + def _setup_handler(self, tmp_path, monkeypatch): + # Mirror the recommended user pattern (REMOTE_TASK_LOG defined at module + # scope in the logging_config_class module) so ElasticsearchTaskHandler.__init__ + # does not emit the deprecated implicit-registration warning during tests. + from airflow.logging_config import _ActiveLoggingConfig + + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True) + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object()) + self.local_log_location = str(tmp_path / "logs") self.end_of_log_mark = "end_of_log\n" self.write_stdout = False diff --git a/providers/opensearch/docs/changelog.rst b/providers/opensearch/docs/changelog.rst index 941ccd7e1a9c8..d7494f51cc122 100644 --- a/providers/opensearch/docs/changelog.rst +++ b/providers/opensearch/docs/changelog.rst @@ -27,6 +27,14 @@ Changelog --------- +``OpensearchTaskHandler`` no longer silently registers itself as the remote +task-log reader during ``dictConfig``. The implicit registration still happens +for one more release but now emits an ``AirflowProviderDeprecationWarning`` and +will be removed in a future provider release. If you ship a custom +``[logging] logging_config_class`` module that swaps in ``OpensearchTaskHandler``, +set ``REMOTE_TASK_LOG = OpensearchRemoteLogIO(...)`` at module scope in that +module. See the OpenSearch provider's logging guide for the updated override +example. 1.9.1 ..... diff --git a/providers/opensearch/docs/logging/index.rst b/providers/opensearch/docs/logging/index.rst index e3f58e113554d..43f849217110c 100644 --- a/providers/opensearch/docs/logging/index.rst +++ b/providers/opensearch/docs/logging/index.rst @@ -88,9 +88,7 @@ The wiring that registers ``OpensearchTaskHandler`` inside the stock ``airflow_local_settings.py`` (the file that builds ``DEFAULT_LOGGING_CONFIG``) only landed in Airflow **3.2.1**. On Airflow **3.0.0 – 3.2.0** installing the provider is not enough: to make the UI's log viewer fetch logs from OpenSearch you must ship a custom logging -config that swaps the ``task`` handler. The handler self-registers as the remote-log -reader on construction (via ``REMOTE_TASK_LOG`` on 3.0/3.1 and ``_ActiveLoggingConfig`` -on 3.2), so swapping the handler class is the only change required. +config that swaps the ``task`` handler **and** sets ``REMOTE_TASK_LOG`` at module scope. Create a module on the Python path — for example ``config/airflow_local_settings.py`` — and point Airflow at it via ``[logging] logging_config_class``: @@ -102,9 +100,13 @@ and point Airflow at it via ``[logging] logging_config_class``: DEFAULT_LOGGING_CONFIG, ) from airflow.providers.common.compat.sdk import conf + from airflow.providers.opensearch.log.os_task_handler import OpensearchRemoteLogIO OPENSEARCH_HOST = conf.get("opensearch", "host", fallback=None) + REMOTE_TASK_LOG = None + DEFAULT_REMOTE_CONN_ID = None + if OPENSEARCH_HOST: DEFAULT_LOGGING_CONFIG["handlers"]["task"] = { "class": "airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler", @@ -123,6 +125,25 @@ and point Airflow at it via ``[logging] logging_config_class``: "write_to_opensearch": conf.getboolean("opensearch", "write_to_os", fallback=False), "target_index": conf.get("opensearch", "target_index", fallback="airflow-logs"), } + REMOTE_TASK_LOG = OpensearchRemoteLogIO( + host=OPENSEARCH_HOST, + port=conf.getint("opensearch", "port", fallback=9200), + username=conf.get("opensearch", "username"), + password=conf.get("opensearch", "password"), + target_index=conf.get("opensearch", "target_index", fallback="airflow-logs"), + write_stdout=conf.getboolean("opensearch", "write_stdout"), + write_to_opensearch=conf.getboolean("opensearch", "write_to_os", fallback=False), + offset_field=conf.get("opensearch", "offset_field", fallback="offset"), + host_field=conf.get("opensearch", "host_field", fallback="host"), + base_log_folder=str(BASE_LOG_FOLDER), + delete_local_copy=conf.getboolean("logging", "delete_local_logs"), + json_format=conf.getboolean("opensearch", "json_format"), + log_id_template=conf.get( + "opensearch", + "log_id_template", + fallback="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}", + ), + ) Then, in ``airflow.cfg``: @@ -132,9 +153,14 @@ Then, in ``airflow.cfg``: remote_logging = True logging_config_class = config.airflow_local_settings.DEFAULT_LOGGING_CONFIG -See :ref:`write-logs-advanced` for the general ``logging_config_class`` contract, -including the ``REMOTE_TASK_LOG`` / ``DEFAULT_REMOTE_CONN_ID`` attributes a -hand-written module needs when the bundled handler does not self-register. +.. note:: + + Earlier versions of this guide relied on ``OpensearchTaskHandler`` self-registering + ``REMOTE_TASK_LOG`` from inside ``__init__`` when ``dictConfig`` instantiated it. + That implicit registration is now deprecated (``AirflowProviderDeprecationWarning``) + and will be removed in a future provider release; define ``REMOTE_TASK_LOG`` at + module scope as shown above. See :ref:`write-logs-advanced` for the full + ``logging_config_class`` contract. On Airflow **3.2.1+** this override is unnecessary — the stock ``airflow_local_settings.py`` already contains an ``elif OPENSEARCH_HOST:`` branch, so configuring the ``[opensearch]`` diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py index 45e1a3ef17250..0ebdfcca8a2bf 100644 --- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py +++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py @@ -23,6 +23,7 @@ import os import sys import time +import warnings from collections import defaultdict from collections.abc import Callable from datetime import datetime @@ -38,6 +39,7 @@ from sqlalchemy import select import airflow.logging_config as alc +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models import DagRun from airflow.providers.common.compat.module_loading import import_string from airflow.providers.common.compat.sdk import AirflowException, conf @@ -380,8 +382,28 @@ def __init__( from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log if get_remote_task_log() is None: + warnings.warn( + "Implicit REMOTE_TASK_LOG registration by OpensearchTaskHandler during " + "dictConfig is deprecated and will be removed in a future provider " + "release. Set ``REMOTE_TASK_LOG = OpensearchRemoteLogIO(...)`` at " + "module scope in your ``[logging] logging_config_class`` module. See " + "the OpenSearch provider logging documentation for the updated " + "override example.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) _ActiveLoggingConfig.set(self.io, None) elif alc.REMOTE_TASK_LOG is None: # type: ignore[attr-defined] + warnings.warn( + "Implicit REMOTE_TASK_LOG registration by OpensearchTaskHandler during " + "dictConfig is deprecated and will be removed in a future provider " + "release. Set ``REMOTE_TASK_LOG = OpensearchRemoteLogIO(...)`` at " + "module scope in your ``[logging] logging_config_class`` module. See " + "the OpenSearch provider logging documentation for the updated " + "override example.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) alc.REMOTE_TASK_LOG = self.io # type: ignore[attr-defined] def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None: diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py index 53fd285de6863..edd83814c673f 100644 --- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py +++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py @@ -159,7 +159,15 @@ def _use_historical_filename_templates(self): yield @pytest.fixture(autouse=True) - def _setup_handler(self, tmp_path): + def _setup_handler(self, tmp_path, monkeypatch): + # Mirror the recommended user pattern (REMOTE_TASK_LOG defined at module + # scope in the logging_config_class module) so OpensearchTaskHandler.__init__ + # does not emit the deprecated implicit-registration warning during tests. + from airflow.logging_config import _ActiveLoggingConfig + + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True) + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object()) + self.local_log_location = str(tmp_path / "logs") self.end_of_log_mark = "end_of_log\n" self.write_stdout = False From 1a460983f0397c8b407b01459e210b1afe4ad9bc Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 15 May 2026 19:05:43 +0800 Subject: [PATCH 3/7] CI: Fix OS and ES tests --- .../tests/unit/elasticsearch/log/test_es_task_handler.py | 7 +++++-- .../tests/unit/opensearch/log/test_os_task_handler.py | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py index 7b0ef35d75927..a4aabd6cdc6e8 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py @@ -172,10 +172,13 @@ def _setup_handler(self, tmp_path, monkeypatch): # Mirror the recommended user pattern (REMOTE_TASK_LOG defined at module # scope in the logging_config_class module) so ElasticsearchTaskHandler.__init__ # does not emit the deprecated implicit-registration warning during tests. + # ``remote_task_log`` is annotation-only on _ActiveLoggingConfig (no default + # at class scope), so raising=False is required for isolated runs where the + # attribute has not yet been initialized by load_logging_config(). from airflow.logging_config import _ActiveLoggingConfig - monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True) - monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object()) + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) self.local_log_location = str(tmp_path / "logs") self.end_of_log_mark = "end_of_log\n" diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py index edd83814c673f..0a7f5106967b5 100644 --- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py +++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py @@ -163,10 +163,13 @@ def _setup_handler(self, tmp_path, monkeypatch): # Mirror the recommended user pattern (REMOTE_TASK_LOG defined at module # scope in the logging_config_class module) so OpensearchTaskHandler.__init__ # does not emit the deprecated implicit-registration warning during tests. + # ``remote_task_log`` is annotation-only on _ActiveLoggingConfig (no default + # at class scope), so raising=False is required for isolated runs where the + # attribute has not yet been initialized by load_logging_config(). from airflow.logging_config import _ActiveLoggingConfig - monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True) - monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object()) + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) self.local_log_location = str(tmp_path / "logs") self.end_of_log_mark = "end_of_log\n" From bf826fcf06504929672b3836a978a0fe1d30372c Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 15 May 2026 20:27:36 +0800 Subject: [PATCH 4/7] Move missing-REMOTE_TASK_LOG warning to after dictConfig The warning previously fired inside ``discover_remote_log_handler`` during ``load_logging_config``, before ``dictConfig`` had constructed handlers. That misfired for the deprecated self-registration path in the Elasticsearch and OpenSearch task handlers, which populate ``_ActiveLoggingConfig.remote_task_log`` from inside ``__init__``. Drop the ``remote_logging_enabled`` parameter from ``discover_remote_log_handler`` and emit the warning from ``configure_logging`` after ``dictConfig`` runs, so the check sees the final state. Extract the duplicated test fixture into provider ``conftest.py`` files and switch the deprecation warnings to ``stacklevel=1`` so module-based deprecation filters still match. --- airflow-core/src/airflow/logging_config.py | 51 +++++++++--- .../tests/unit/logging/test_logging_config.py | 66 ++++++++++++++++ .../elasticsearch/log/es_task_handler.py | 7 +- .../tests/unit/elasticsearch/log/conftest.py | 40 ++++++++++ .../elasticsearch/log/test_es_task_handler.py | 13 +--- .../opensearch/log/os_task_handler.py | 7 +- .../tests/unit/opensearch/log/conftest.py | 40 ++++++++++ .../opensearch/log/test_os_task_handler.py | 13 +--- .../src/airflow_shared/logging/remote.py | 21 ++--- shared/logging/tests/logging/test_remote.py | 78 ------------------- task-sdk/src/airflow/sdk/log.py | 7 +- 11 files changed, 205 insertions(+), 138 deletions(-) create mode 100644 airflow-core/tests/unit/logging/test_logging_config.py create mode 100644 providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py create mode 100644 providers/opensearch/tests/unit/opensearch/log/conftest.py diff --git a/airflow-core/src/airflow/logging_config.py b/airflow-core/src/airflow/logging_config.py index 198038f4e522e..e9a41814fbe6e 100644 --- a/airflow-core/src/airflow/logging_config.py +++ b/airflow-core/src/airflow/logging_config.py @@ -31,6 +31,9 @@ log = logging.getLogger(__name__) +# Default ``[logging] logging_config_class`` value when the user has not overridden it. +DEFAULT_LOGGING_CONFIG_PATH = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" + class _ActiveLoggingConfig: """Private class to hold active logging config variables.""" @@ -68,13 +71,12 @@ def load_logging_config() -> tuple[dict[str, Any], str]: ``REMOTE_TASK_LOG`` and ``DEFAULT_REMOTE_CONN_ID`` attributes via :func:`airflow._shared.logging.remote.discover_remote_log_handler`. """ - fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" - logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback) + logging_class_path = conf.get("logging", "logging_config_class", fallback=DEFAULT_LOGGING_CONFIG_PATH) # Sometimes we end up with `""` as the value! - logging_class_path = logging_class_path or fallback + logging_class_path = logging_class_path or DEFAULT_LOGGING_CONFIG_PATH - user_defined = logging_class_path != fallback + user_defined = logging_class_path != DEFAULT_LOGGING_CONFIG_PATH try: logging_config = import_string(logging_class_path) @@ -94,19 +96,44 @@ def load_logging_config() -> tuple[dict[str, Any], str]: ) else: # Load remote logging configuration using shared discovery logic. - # Passing remote_logging_enabled lets the helper warn when a user-defined - # module forgot to expose REMOTE_TASK_LOG while remote logging is on. remote_task_log, default_remote_conn_id = discover_remote_log_handler( - logging_class_path, - fallback, - import_string, - remote_logging_enabled=conf.getboolean("logging", "remote_logging", fallback=False), + logging_class_path, DEFAULT_LOGGING_CONFIG_PATH, import_string ) _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id) return logging_config, logging_class_path +def _warn_if_missing_remote_task_log(logging_class_path: str) -> None: + """ + Warn if ``[logging] remote_logging`` is on but the user module exposes no remote IO. + + Runs *after* ``dictConfig`` has constructed handlers, so deprecated + self-registration in provider task handlers (Elasticsearch, OpenSearch) has + already had its chance to populate ``_ActiveLoggingConfig.remote_task_log``. + Only fires for user-defined ``logging_config_class`` values; the stock + fallback is exempt. + """ + user_defined = bool(logging_class_path) and logging_class_path != DEFAULT_LOGGING_CONFIG_PATH + remote_logging_enabled = conf.getboolean("logging", "remote_logging", fallback=False) + if not (user_defined and remote_logging_enabled): + return + if _ActiveLoggingConfig.remote_task_log is not None: + return + # Strip the trailing ``.`` to leave the enclosing module path. + # ``logging_class_path`` should always be dotted since ``import_string`` + # would have raised otherwise, but guard the access defensively. + parts = logging_class_path.rsplit(".", 1) + modpath = parts[0] if len(parts) == 2 else logging_class_path + log.warning( + "[logging] remote_logging is enabled but the user-defined logging module %r " + "does not expose a REMOTE_TASK_LOG attribute, so remote task-log read-back is " + "disabled. Define REMOTE_TASK_LOG (a RemoteLogIO instance) at module scope " + "to enable it.", + modpath, + ) + + def configure_logging(): from airflow._shared.logging import configure_logging, init_log_folder, translate_config_values @@ -158,6 +185,10 @@ def configure_logging(): # otherwise Airflow would silently fall back on the default config raise e + # Runs after dictConfig so deprecated handler self-registration (ES/OS) has + # had its chance to populate _ActiveLoggingConfig.remote_task_log. + _warn_if_missing_remote_task_log(logging_class_path) + validate_logging_config() new_folder_permissions = int( diff --git a/airflow-core/tests/unit/logging/test_logging_config.py b/airflow-core/tests/unit/logging/test_logging_config.py new file mode 100644 index 0000000000000..96b7da633651b --- /dev/null +++ b/airflow-core/tests/unit/logging/test_logging_config.py @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +import pytest + +from airflow.logging_config import ( + DEFAULT_LOGGING_CONFIG_PATH, + _ActiveLoggingConfig, + _warn_if_missing_remote_task_log, +) + + +class TestWarnIfMissingRemoteTaskLog: + @pytest.fixture(autouse=True) + def _reset_active_logging_config(self, monkeypatch): + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", None, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + + def test_warns_when_user_module_missing_remote_task_log_and_remote_logging_enabled(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG") + mock_log.warning.assert_called_once() + assert "my_pkg.custom_settings" in mock_log.warning.call_args.args + + def test_no_warning_when_using_fallback_path(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log(DEFAULT_LOGGING_CONFIG_PATH) + mock_log.warning.assert_not_called() + + def test_no_warning_when_remote_logging_disabled(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "False") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG") + mock_log.warning.assert_not_called() + + def test_no_warning_when_remote_task_log_is_set(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG") + mock_log.warning.assert_not_called() + + def test_no_warning_when_empty_logging_class_path(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("") + mock_log.warning.assert_not_called() diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py index fd495a3a33990..83e2727f3cc47 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py @@ -319,6 +319,9 @@ def __init__( from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log if get_remote_task_log() is None: + # stacklevel=1 keeps the warning attributed to this airflow.providers + # module so module-based deprecation filters still match; dictConfig + # is in stdlib and would otherwise hide the warning at stacklevel=2. warnings.warn( "Implicit REMOTE_TASK_LOG registration by ElasticsearchTaskHandler " "during dictConfig is deprecated and will be removed in a future " @@ -327,7 +330,7 @@ def __init__( "See the Elasticsearch provider logging documentation for the " "updated override example.", AirflowProviderDeprecationWarning, - stacklevel=2, + stacklevel=1, ) _ActiveLoggingConfig.set(self.io, None) elif alc.REMOTE_TASK_LOG is None: # type: ignore[attr-defined] @@ -339,7 +342,7 @@ def __init__( "See the Elasticsearch provider logging documentation for the " "updated override example.", AirflowProviderDeprecationWarning, - stacklevel=2, + stacklevel=1, ) alc.REMOTE_TASK_LOG = self.io # type: ignore[attr-defined] diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py b/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py new file mode 100644 index 0000000000000..a3af9a9fe0562 --- /dev/null +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + + +@pytest.fixture(autouse=True) +def _no_implicit_remote_task_log_warning(monkeypatch): + """ + Suppress the deprecated implicit-registration warning during direct handler construction. + + Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module + scope in the ``[logging] logging_config_class`` module, so + ``ElasticsearchTaskHandler.__init__`` does not emit + ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin + treats as a test failure). + + ``raising=False`` is required because ``_ActiveLoggingConfig.remote_task_log`` + is annotation-only at class scope and may not be initialized in isolated test + runs. + """ + from airflow.logging_config import _ActiveLoggingConfig + + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py index a4aabd6cdc6e8..3ac143fa35bb0 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py @@ -168,18 +168,7 @@ def _use_historical_filename_templates(self): yield @pytest.fixture(autouse=True) - def _setup_handler(self, tmp_path, monkeypatch): - # Mirror the recommended user pattern (REMOTE_TASK_LOG defined at module - # scope in the logging_config_class module) so ElasticsearchTaskHandler.__init__ - # does not emit the deprecated implicit-registration warning during tests. - # ``remote_task_log`` is annotation-only on _ActiveLoggingConfig (no default - # at class scope), so raising=False is required for isolated runs where the - # attribute has not yet been initialized by load_logging_config(). - from airflow.logging_config import _ActiveLoggingConfig - - monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) - monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) - + def _setup_handler(self, tmp_path): self.local_log_location = str(tmp_path / "logs") self.end_of_log_mark = "end_of_log\n" self.write_stdout = False diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py index 0ebdfcca8a2bf..728c9c809868d 100644 --- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py +++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py @@ -382,6 +382,9 @@ def __init__( from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log if get_remote_task_log() is None: + # stacklevel=1 keeps the warning attributed to this airflow.providers + # module so module-based deprecation filters still match; dictConfig + # is in stdlib and would otherwise hide the warning at stacklevel=2. warnings.warn( "Implicit REMOTE_TASK_LOG registration by OpensearchTaskHandler during " "dictConfig is deprecated and will be removed in a future provider " @@ -390,7 +393,7 @@ def __init__( "the OpenSearch provider logging documentation for the updated " "override example.", AirflowProviderDeprecationWarning, - stacklevel=2, + stacklevel=1, ) _ActiveLoggingConfig.set(self.io, None) elif alc.REMOTE_TASK_LOG is None: # type: ignore[attr-defined] @@ -402,7 +405,7 @@ def __init__( "the OpenSearch provider logging documentation for the updated " "override example.", AirflowProviderDeprecationWarning, - stacklevel=2, + stacklevel=1, ) alc.REMOTE_TASK_LOG = self.io # type: ignore[attr-defined] diff --git a/providers/opensearch/tests/unit/opensearch/log/conftest.py b/providers/opensearch/tests/unit/opensearch/log/conftest.py new file mode 100644 index 0000000000000..6ca13619b6b46 --- /dev/null +++ b/providers/opensearch/tests/unit/opensearch/log/conftest.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + + +@pytest.fixture(autouse=True) +def _no_implicit_remote_task_log_warning(monkeypatch): + """ + Suppress the deprecated implicit-registration warning during direct handler construction. + + Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module + scope in the ``[logging] logging_config_class`` module, so + ``OpensearchTaskHandler.__init__`` does not emit + ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin + treats as a test failure). + + ``raising=False`` is required because ``_ActiveLoggingConfig.remote_task_log`` + is annotation-only at class scope and may not be initialized in isolated test + runs. + """ + from airflow.logging_config import _ActiveLoggingConfig + + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py index 0a7f5106967b5..53fd285de6863 100644 --- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py +++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py @@ -159,18 +159,7 @@ def _use_historical_filename_templates(self): yield @pytest.fixture(autouse=True) - def _setup_handler(self, tmp_path, monkeypatch): - # Mirror the recommended user pattern (REMOTE_TASK_LOG defined at module - # scope in the logging_config_class module) so OpensearchTaskHandler.__init__ - # does not emit the deprecated implicit-registration warning during tests. - # ``remote_task_log`` is annotation-only on _ActiveLoggingConfig (no default - # at class scope), so raising=False is required for isolated runs where the - # attribute has not yet been initialized by load_logging_config(). - from airflow.logging_config import _ActiveLoggingConfig - - monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) - monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) - + def _setup_handler(self, tmp_path): self.local_log_location = str(tmp_path / "logs") self.end_of_log_mark = "end_of_log\n" self.write_stdout = False diff --git a/shared/logging/src/airflow_shared/logging/remote.py b/shared/logging/src/airflow_shared/logging/remote.py index d4b0b533a1cb0..8ab30a8510303 100644 --- a/shared/logging/src/airflow_shared/logging/remote.py +++ b/shared/logging/src/airflow_shared/logging/remote.py @@ -78,8 +78,6 @@ def discover_remote_log_handler( logging_class_path: str, fallback_path: str, import_string: Callable[[str], Any], - *, - remote_logging_enabled: bool = False, ) -> tuple[RemoteLogIO | None, str | None]: """ Look up the optional remote-log handler alongside a logging dictConfig. @@ -94,13 +92,13 @@ def discover_remote_log_handler( * ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id for that backend. - Missing attributes are not fatal; they disable remote-log read-back. When - ``remote_logging_enabled=True`` and a user-defined module exposes no - ``REMOTE_TASK_LOG``, a single warning is emitted so the misconfiguration - is visible. + Either may be ``None`` immediately after this call; provider task handlers + can still populate ``REMOTE_TASK_LOG`` from inside ``__init__`` when + ``dictConfig`` instantiates them (deprecated path). Callers that want to + warn on missing remote-log configuration should re-check + ``_ActiveLoggingConfig.remote_task_log`` *after* ``dictConfig`` has run. """ # Sometimes we end up with `""` as the value! - user_defined = bool(logging_class_path) and logging_class_path != fallback_path logging_class_path = logging_class_path or fallback_path try: @@ -117,15 +115,6 @@ def discover_remote_log_handler( remote_task_log = getattr(mod, "REMOTE_TASK_LOG", None) default_remote_conn_id = getattr(mod, "DEFAULT_REMOTE_CONN_ID", None) - if remote_task_log is None and user_defined and remote_logging_enabled: - log.warning( - "[logging] remote_logging is enabled but the user-defined logging module %r " - "does not expose a REMOTE_TASK_LOG attribute, so remote task-log read-back is " - "disabled. Define REMOTE_TASK_LOG (a RemoteLogIO instance) at module scope " - "to enable it.", - modpath, - ) - return remote_task_log, default_remote_conn_id except Exception as err: diff --git a/shared/logging/tests/logging/test_remote.py b/shared/logging/tests/logging/test_remote.py index 5bd36eb23c252..a8f1d7ab29091 100644 --- a/shared/logging/tests/logging/test_remote.py +++ b/shared/logging/tests/logging/test_remote.py @@ -134,84 +134,6 @@ def test_handles_module_loading_error(self): assert result_handler is None assert result_conn is None - def test_warns_when_user_module_missing_remote_task_log_with_remote_logging(self): - config = {"version": 1} - mock_module = mock.MagicMock(spec=[]) # no REMOTE_TASK_LOG, no DEFAULT_REMOTE_CONN_ID - - with ( - mock.patch("airflow_shared.logging.remote.import_module", return_value=mock_module), - mock.patch("airflow_shared.logging.remote.log") as mock_log, - ): - result_handler, result_conn = discover_remote_log_handler( - "my_pkg.custom_settings.LOGGING_CONFIG", - "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG", - lambda path: config, - remote_logging_enabled=True, - ) - - assert result_handler is None - assert result_conn is None - mock_log.warning.assert_called_once() - warning_args = mock_log.warning.call_args - # The module path (parent of the dotted path) should appear in the formatted message. - assert "my_pkg.custom_settings" in warning_args.args - - def test_no_warning_when_using_fallback_path(self): - config = {"version": 1} - mock_module = mock.MagicMock(spec=[]) # no REMOTE_TASK_LOG - - fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" - with ( - mock.patch("airflow_shared.logging.remote.import_module", return_value=mock_module), - mock.patch("airflow_shared.logging.remote.log") as mock_log, - ): - discover_remote_log_handler( - fallback, - fallback, - lambda path: config, - remote_logging_enabled=True, - ) - - mock_log.warning.assert_not_called() - - def test_no_warning_when_remote_logging_disabled(self): - config = {"version": 1} - mock_module = mock.MagicMock(spec=[]) # no REMOTE_TASK_LOG - - with ( - mock.patch("airflow_shared.logging.remote.import_module", return_value=mock_module), - mock.patch("airflow_shared.logging.remote.log") as mock_log, - ): - discover_remote_log_handler( - "my_pkg.custom_settings.LOGGING_CONFIG", - "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG", - lambda path: config, - remote_logging_enabled=False, - ) - - mock_log.warning.assert_not_called() - - def test_no_warning_when_remote_task_log_present(self): - handler = DummyRemoteLogIO() - config = {"version": 1} - mock_module = mock.MagicMock() - mock_module.REMOTE_TASK_LOG = handler - mock_module.DEFAULT_REMOTE_CONN_ID = None - - with ( - mock.patch("airflow_shared.logging.remote.import_module", return_value=mock_module), - mock.patch("airflow_shared.logging.remote.log") as mock_log, - ): - result_handler, _ = discover_remote_log_handler( - "my_pkg.custom_settings.LOGGING_CONFIG", - "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG", - lambda path: config, - remote_logging_enabled=True, - ) - - assert result_handler is handler - mock_log.warning.assert_not_called() - class TestRemoteLogIOProtocol: def test_dummy_implements_protocol(self): diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index aa040a3e7cc29..a5d28361e0085 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -186,13 +186,8 @@ def _load_logging_config() -> None: logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback) # Load remote logging configuration using shared discovery logic. - # Passing remote_logging_enabled lets the helper warn when a user-defined - # module forgot to expose REMOTE_TASK_LOG while remote logging is on. remote_task_log, default_remote_conn_id = discover_remote_log_handler( - logging_class_path, - fallback, - import_string, - remote_logging_enabled=conf.getboolean("logging", "remote_logging", fallback=False), + logging_class_path, fallback, import_string ) _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id) From dfb504963d4700682c381c404c9e33e28a308ed0 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 15 May 2026 21:27:31 +0800 Subject: [PATCH 5/7] Add ES docs and move the confest into test modules --- .../elasticsearch/docs/logging/index.rst | 87 +++++++++++++++++++ .../tests/unit/elasticsearch/log/conftest.py | 40 --------- .../elasticsearch/log/test_es_task_handler.py | 20 +++++ .../tests/unit/opensearch/log/conftest.py | 40 --------- .../opensearch/log/test_os_task_handler.py | 20 +++++ 5 files changed, 127 insertions(+), 80 deletions(-) delete mode 100644 providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py delete mode 100644 providers/opensearch/tests/unit/opensearch/log/conftest.py diff --git a/providers/elasticsearch/docs/logging/index.rst b/providers/elasticsearch/docs/logging/index.rst index 3254bf5406ccf..de23b3b72b7bf 100644 --- a/providers/elasticsearch/docs/logging/index.rst +++ b/providers/elasticsearch/docs/logging/index.rst @@ -64,6 +64,93 @@ To output task logs to ElasticSearch, the following config could be used: (set ` write_to_es = True target_index = [name of the index to store logs] +.. _elasticsearch-airflow-3-0-to-3-1-local-settings: + +Enabling the Elasticsearch task handler on Airflow 3.0.0 – 3.1.x +'''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' + +This section is **only about reading task logs back into the Airflow UI**. Tasks running +on workers will write logs as usual (to local files, stdout, or — with appropriate log +shipping — to Elasticsearch) regardless of the override below. Without the override on +Airflow 3.0.0 – 3.1.x, logs reach Elasticsearch fine but the **UI cannot render them** +because no handler is registered to fetch them back. + +The wiring that registers ``ElasticsearchTaskHandler`` inside the stock +``airflow_local_settings.py`` (the file that builds ``DEFAULT_LOGGING_CONFIG``) only landed +in Airflow **3.2.0**. On Airflow **3.0.0 – 3.1.x** installing the provider is not enough: +to make the UI's log viewer fetch logs from Elasticsearch you must ship a custom logging +config that swaps the ``task`` handler **and** sets ``REMOTE_TASK_LOG`` at module scope. + +Create a module on the Python path — for example ``config/airflow_local_settings.py`` — +and point Airflow at it via ``[logging] logging_config_class``: + +.. code-block:: python + + from airflow.config_templates.airflow_local_settings import ( + BASE_LOG_FOLDER, + DEFAULT_LOGGING_CONFIG, + ) + from airflow.providers.common.compat.sdk import conf + from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO + + ELASTICSEARCH_HOST = conf.get("elasticsearch", "host", fallback=None) + + REMOTE_TASK_LOG = None + DEFAULT_REMOTE_CONN_ID = None + + if ELASTICSEARCH_HOST: + DEFAULT_LOGGING_CONFIG["handlers"]["task"] = { + "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler", + "formatter": "airflow", + "base_log_folder": str(BASE_LOG_FOLDER), + "end_of_log_mark": conf.get("elasticsearch", "end_of_log_mark", fallback="end_of_log"), + "host": ELASTICSEARCH_HOST, + "frontend": conf.get("elasticsearch", "frontend", fallback=""), + "write_stdout": conf.getboolean("elasticsearch", "write_stdout"), + "write_to_es": conf.getboolean("elasticsearch", "write_to_es", fallback=False), + "json_format": conf.getboolean("elasticsearch", "json_format"), + "json_fields": conf.get("elasticsearch", "json_fields"), + "host_field": conf.get("elasticsearch", "host_field", fallback="host"), + "offset_field": conf.get("elasticsearch", "offset_field", fallback="offset"), + } + REMOTE_TASK_LOG = ElasticsearchRemoteLogIO( + host=ELASTICSEARCH_HOST, + target_index=conf.get("elasticsearch", "target_index", fallback="airflow-logs"), + write_stdout=conf.getboolean("elasticsearch", "write_stdout"), + write_to_es=conf.getboolean("elasticsearch", "write_to_es", fallback=False), + offset_field=conf.get("elasticsearch", "offset_field", fallback="offset"), + host_field=conf.get("elasticsearch", "host_field", fallback="host"), + base_log_folder=str(BASE_LOG_FOLDER), + delete_local_copy=conf.getboolean("logging", "delete_local_logs"), + json_format=conf.getboolean("elasticsearch", "json_format"), + log_id_template=conf.get( + "elasticsearch", + "log_id_template", + fallback="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}", + ), + ) + +Then, in ``airflow.cfg``: + +.. code-block:: ini + + [logging] + remote_logging = True + logging_config_class = config.airflow_local_settings.DEFAULT_LOGGING_CONFIG + +.. note:: + + Earlier versions of this guide relied on ``ElasticsearchTaskHandler`` self-registering + ``REMOTE_TASK_LOG`` from inside ``__init__`` when ``dictConfig`` instantiated it. + That implicit registration is now deprecated (``AirflowProviderDeprecationWarning``) + and will be removed in a future provider release; define ``REMOTE_TASK_LOG`` at + module scope as shown above. See :ref:`write-logs-advanced` for the full + ``logging_config_class`` contract. + +On Airflow **3.2.0+** this override is unnecessary — the stock ``airflow_local_settings.py`` +already contains an ``elif ELASTICSEARCH_HOST:`` branch, so configuring the ``[elasticsearch]`` +section in ``airflow.cfg`` is sufficient. + .. _write-logs-elasticsearch-tls: Writing logs to Elasticsearch over TLS diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py b/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py deleted file mode 100644 index a3af9a9fe0562..0000000000000 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py +++ /dev/null @@ -1,40 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import pytest - - -@pytest.fixture(autouse=True) -def _no_implicit_remote_task_log_warning(monkeypatch): - """ - Suppress the deprecated implicit-registration warning during direct handler construction. - - Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module - scope in the ``[logging] logging_config_class`` module, so - ``ElasticsearchTaskHandler.__init__`` does not emit - ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin - treats as a test failure). - - ``raising=False`` is required because ``_ActiveLoggingConfig.remote_task_log`` - is annotation-only at class scope and may not be initialized in isolated test - runs. - """ - from airflow.logging_config import _ActiveLoggingConfig - - monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) - monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py index 3ac143fa35bb0..34de97d6d44c6 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py @@ -167,6 +167,26 @@ def _use_historical_filename_templates(self): with conf_vars({("core", "use_historical_filename_templates"): "True"}): yield + @pytest.fixture(autouse=True) + def _no_implicit_remote_task_log_warning(self, monkeypatch): + """ + Suppress the deprecated implicit-registration warning during direct handler construction. + + Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module + scope in the ``[logging] logging_config_class`` module, so + ``ElasticsearchTaskHandler.__init__`` does not emit + ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin + treats as a test failure). + + ``raising=False`` is required because ``_ActiveLoggingConfig.remote_task_log`` + is annotation-only at class scope and may not be initialized in isolated test + runs. + """ + from airflow.logging_config import _ActiveLoggingConfig + + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) + @pytest.fixture(autouse=True) def _setup_handler(self, tmp_path): self.local_log_location = str(tmp_path / "logs") diff --git a/providers/opensearch/tests/unit/opensearch/log/conftest.py b/providers/opensearch/tests/unit/opensearch/log/conftest.py deleted file mode 100644 index 6ca13619b6b46..0000000000000 --- a/providers/opensearch/tests/unit/opensearch/log/conftest.py +++ /dev/null @@ -1,40 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import pytest - - -@pytest.fixture(autouse=True) -def _no_implicit_remote_task_log_warning(monkeypatch): - """ - Suppress the deprecated implicit-registration warning during direct handler construction. - - Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module - scope in the ``[logging] logging_config_class`` module, so - ``OpensearchTaskHandler.__init__`` does not emit - ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin - treats as a test failure). - - ``raising=False`` is required because ``_ActiveLoggingConfig.remote_task_log`` - is annotation-only at class scope and may not be initialized in isolated test - runs. - """ - from airflow.logging_config import _ActiveLoggingConfig - - monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) - monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py index 53fd285de6863..5a19b708db5ea 100644 --- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py +++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py @@ -158,6 +158,26 @@ def _use_historical_filename_templates(self): with conf_vars({("core", "use_historical_filename_templates"): "True"}): yield + @pytest.fixture(autouse=True) + def _no_implicit_remote_task_log_warning(self, monkeypatch): + """ + Suppress the deprecated implicit-registration warning during direct handler construction. + + Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module + scope in the ``[logging] logging_config_class`` module, so + ``OpensearchTaskHandler.__init__`` does not emit + ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin + treats as a test failure). + + ``raising=False`` is required because ``_ActiveLoggingConfig.remote_task_log`` + is annotation-only at class scope and may not be initialized in isolated test + runs. + """ + from airflow.logging_config import _ActiveLoggingConfig + + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) + @pytest.fixture(autouse=True) def _setup_handler(self, tmp_path): self.local_log_location = str(tmp_path / "logs") From 43f228c7ab26f8f659f1775ac0c34a4efa2e232a Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 15 May 2026 21:46:53 +0800 Subject: [PATCH 6/7] Clarify the ES provider matrix in Docs --- .../elasticsearch/docs/logging/index.rst | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/providers/elasticsearch/docs/logging/index.rst b/providers/elasticsearch/docs/logging/index.rst index de23b3b72b7bf..670c958843a07 100644 --- a/providers/elasticsearch/docs/logging/index.rst +++ b/providers/elasticsearch/docs/logging/index.rst @@ -66,20 +66,26 @@ To output task logs to ElasticSearch, the following config could be used: (set ` .. _elasticsearch-airflow-3-0-to-3-1-local-settings: -Enabling the Elasticsearch task handler on Airflow 3.0.0 – 3.1.x +Enabling the Elasticsearch task handler on Airflow 3.0.0 – 3.1.7 '''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' This section is **only about reading task logs back into the Airflow UI**. Tasks running on workers will write logs as usual (to local files, stdout, or — with appropriate log shipping — to Elasticsearch) regardless of the override below. Without the override on -Airflow 3.0.0 – 3.1.x, logs reach Elasticsearch fine but the **UI cannot render them** +Airflow 3.0.0 – 3.1.7, logs reach Elasticsearch fine but the **UI cannot render them** because no handler is registered to fetch them back. The wiring that registers ``ElasticsearchTaskHandler`` inside the stock -``airflow_local_settings.py`` (the file that builds ``DEFAULT_LOGGING_CONFIG``) only landed -in Airflow **3.2.0**. On Airflow **3.0.0 – 3.1.x** installing the provider is not enough: -to make the UI's log viewer fetch logs from Elasticsearch you must ship a custom logging -config that swaps the ``task`` handler **and** sets ``REMOTE_TASK_LOG`` at module scope. +``airflow_local_settings.py`` (the file that builds ``DEFAULT_LOGGING_CONFIG``) landed in +Airflow **3.2.0** (`apache/airflow#62121 +`_) and was backported to Airflow **3.1.8** +(`apache/airflow#62940 `_). On Airflow +**3.0.0 – 3.1.7** installing the provider is not enough: to make the UI's log viewer +fetch logs from Elasticsearch you must ship a custom logging config that swaps the +``task`` handler **and** sets ``REMOTE_TASK_LOG`` at module scope. The override requires +``apache-airflow-providers-elasticsearch`` **6.5.0+** (`apache/airflow#53821 +`_), which is where +``ElasticsearchRemoteLogIO`` was introduced. Create a module on the Python path — for example ``config/airflow_local_settings.py`` — and point Airflow at it via ``[logging] logging_config_class``: @@ -147,9 +153,9 @@ Then, in ``airflow.cfg``: module scope as shown above. See :ref:`write-logs-advanced` for the full ``logging_config_class`` contract. -On Airflow **3.2.0+** this override is unnecessary — the stock ``airflow_local_settings.py`` -already contains an ``elif ELASTICSEARCH_HOST:`` branch, so configuring the ``[elasticsearch]`` -section in ``airflow.cfg`` is sufficient. +On Airflow **3.1.8+** or **3.2.0+** this override is unnecessary — the stock +``airflow_local_settings.py`` already contains an ``elif ELASTICSEARCH_HOST:`` branch, so +configuring the ``[elasticsearch]`` section in ``airflow.cfg`` is sufficient. .. _write-logs-elasticsearch-tls: From ef217fc75a02c9feb9d1c6f5559097a531dd422e Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Sat, 16 May 2026 11:29:08 +0800 Subject: [PATCH 7/7] fixup: Fix ES and OS compat tests --- .../tests/unit/elasticsearch/log/conftest.py | 52 +++++++++++++++++++ .../elasticsearch/log/test_es_task_handler.py | 20 ------- .../tests/unit/opensearch/log/conftest.py | 52 +++++++++++++++++++ .../opensearch/log/test_os_task_handler.py | 20 ------- 4 files changed, 104 insertions(+), 40 deletions(-) create mode 100644 providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py create mode 100644 providers/opensearch/tests/unit/opensearch/log/conftest.py diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py b/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py new file mode 100644 index 0000000000000..a1746818eed6e --- /dev/null +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS + + +@pytest.fixture(autouse=True) +def _no_implicit_remote_task_log_warning(monkeypatch): + """ + Suppress the deprecated implicit-registration warning during direct handler construction. + + Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module + scope in the ``[logging] logging_config_class`` module, so + ``ElasticsearchTaskHandler.__init__`` does not emit + ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin + treats as a test failure). + + On Airflow 2.x the handler does not run the registration branch at all, so the + fixture is a no-op there (and ``_ActiveLoggingConfig`` does not exist). + On Airflow 3.0/3.1 the handler reads ``airflow.logging_config.REMOTE_TASK_LOG``. + On Airflow 3.2+ the handler reads ``_ActiveLoggingConfig.remote_task_log``. + """ + if not AIRFLOW_V_3_0_PLUS: + return + if AIRFLOW_V_3_2_PLUS: + from airflow.logging_config import _ActiveLoggingConfig + + # raising=False is required because remote_task_log is annotation-only at + # class scope and may not be initialized in isolated test runs. + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) + else: + import airflow.logging_config + + monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG", object(), raising=False) diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py index 34de97d6d44c6..3ac143fa35bb0 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py @@ -167,26 +167,6 @@ def _use_historical_filename_templates(self): with conf_vars({("core", "use_historical_filename_templates"): "True"}): yield - @pytest.fixture(autouse=True) - def _no_implicit_remote_task_log_warning(self, monkeypatch): - """ - Suppress the deprecated implicit-registration warning during direct handler construction. - - Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module - scope in the ``[logging] logging_config_class`` module, so - ``ElasticsearchTaskHandler.__init__`` does not emit - ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin - treats as a test failure). - - ``raising=False`` is required because ``_ActiveLoggingConfig.remote_task_log`` - is annotation-only at class scope and may not be initialized in isolated test - runs. - """ - from airflow.logging_config import _ActiveLoggingConfig - - monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) - monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) - @pytest.fixture(autouse=True) def _setup_handler(self, tmp_path): self.local_log_location = str(tmp_path / "logs") diff --git a/providers/opensearch/tests/unit/opensearch/log/conftest.py b/providers/opensearch/tests/unit/opensearch/log/conftest.py new file mode 100644 index 0000000000000..5f68f22355e68 --- /dev/null +++ b/providers/opensearch/tests/unit/opensearch/log/conftest.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS + + +@pytest.fixture(autouse=True) +def _no_implicit_remote_task_log_warning(monkeypatch): + """ + Suppress the deprecated implicit-registration warning during direct handler construction. + + Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module + scope in the ``[logging] logging_config_class`` module, so + ``OpensearchTaskHandler.__init__`` does not emit + ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin + treats as a test failure). + + On Airflow 2.x the handler does not run the registration branch at all, so the + fixture is a no-op there (and ``_ActiveLoggingConfig`` does not exist). + On Airflow 3.0/3.1 the handler reads ``airflow.logging_config.REMOTE_TASK_LOG``. + On Airflow 3.2+ the handler reads ``_ActiveLoggingConfig.remote_task_log``. + """ + if not AIRFLOW_V_3_0_PLUS: + return + if AIRFLOW_V_3_2_PLUS: + from airflow.logging_config import _ActiveLoggingConfig + + # raising=False is required because remote_task_log is annotation-only at + # class scope and may not be initialized in isolated test runs. + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) + else: + import airflow.logging_config + + monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG", object(), raising=False) diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py index 5a19b708db5ea..53fd285de6863 100644 --- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py +++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py @@ -158,26 +158,6 @@ def _use_historical_filename_templates(self): with conf_vars({("core", "use_historical_filename_templates"): "True"}): yield - @pytest.fixture(autouse=True) - def _no_implicit_remote_task_log_warning(self, monkeypatch): - """ - Suppress the deprecated implicit-registration warning during direct handler construction. - - Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module - scope in the ``[logging] logging_config_class`` module, so - ``OpensearchTaskHandler.__init__`` does not emit - ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin - treats as a test failure). - - ``raising=False`` is required because ``_ActiveLoggingConfig.remote_task_log`` - is annotation-only at class scope and may not be initialized in isolated test - runs. - """ - from airflow.logging_config import _ActiveLoggingConfig - - monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) - monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) - @pytest.fixture(autouse=True) def _setup_handler(self, tmp_path): self.local_log_location = str(tmp_path / "logs")