From f68dac3d4f1e3801e84c6dcf645e5adfa416c629 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 18 May 2026 18:42:33 +0800 Subject: [PATCH] Clarify ``logging_config_class`` contract and document REMOTE_TASK_LOG ``[logging] logging_config_class`` is documented as a "Logging class" but actually resolves to a ``logging.config.dictConfig`` dict, and the ``REMOTE_TASK_LOG`` / ``DEFAULT_REMOTE_CONN_ID`` side channel that powers remote log read-back was undocumented. Custom configs silently lost UI log read-back as a result. - Document the real contract for ``logging_config_class`` (dict, not class) and the ``REMOTE_TASK_LOG`` / ``DEFAULT_REMOTE_CONN_ID`` module-level attributes in the config option help, ``advanced-logging-configuration.rst``, and the ``discover_remote_log_handler`` docstring. - Add a startup ``WARNING`` when ``remote_logging`` is on but the user's logging module is missing ``REMOTE_TASK_LOG``, emitted from ``configure_logging`` after ``dictConfig`` runs so it sees the final state. --- .../advanced-logging-configuration.rst | 61 +++++++++++++++-- .../airflow_local_settings.py | 9 ++- .../src/airflow/config_templates/config.yml | 14 ++-- airflow-core/src/airflow/logging_config.py | 57 ++++++++++++++-- .../tests/unit/logging/test_logging_config.py | 66 +++++++++++++++++++ .../src/airflow_shared/logging/remote.py | 20 +++++- task-sdk/src/airflow/sdk/log.py | 9 ++- 7 files changed, 213 insertions(+), 23 deletions(-) create mode 100644 airflow-core/tests/unit/logging/test_logging_config.py diff --git a/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst b/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst index fa34e74f3140b..781662e678010 100644 --- a/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst +++ b/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst @@ -46,13 +46,14 @@ that Python objects log to loggers that follow naming convention of ``. You can read more about standard python logging classes (Loggers, Handlers, Formatters) in the `Python logging documentation `_. -Create a custom logging class ------------------------------ +Create a custom logging config +------------------------------ Configuring your logging classes can be done via the ``logging_config_class`` option in ``airflow.cfg`` file. -This configuration should specify the import path to a configuration compatible with -:func:`logging.config.dictConfig`. If your file is a standard import location, then you should set a -:envvar:`PYTHONPATH` environment variable. +Despite the option name the value is a dotted import path to a ``dict`` that satisfies +:func:`logging.config.dictConfig` — not a Python class. The ``_class`` suffix is +historical and kept for backwards compatibility. If your file is a standard import +location, then you should set a :envvar:`PYTHONPATH` environment variable. Follow the steps below to enable custom logging config class: @@ -102,6 +103,56 @@ See :doc:`../modules_management` for details on how Python and Airflow manage mo You can override the way both standard logs of the components and "task" logs are handled. +Custom logging configs and remote logging +----------------------------------------- + +When ``[logging] remote_logging = True`` and you point ``logging_config_class`` +at your own module, define two module-level attributes alongside your +``LOGGING_CONFIG`` dict: + +* ``REMOTE_TASK_LOG`` — an instance of + :class:`~airflow.logging.remote.RemoteLogIO` (or + :class:`~airflow.logging.remote.RemoteLogStreamIO`) that uploads task logs + and reads them back for the UI. +* ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id used when + ``[logging] remote_log_conn_id`` is unset. + +If ``REMOTE_TASK_LOG`` is missing, Airflow emits one ``WARNING`` at startup +and the UI cannot read task logs back from the remote backend. + + .. code-block:: python + + # ~/airflow/config/log_config.py + from airflow.logging.remote import RemoteLogIO + + + class MyRemoteLogIO: + @property + def processors(self): + return () + + def upload(self, path, ti): ... # upload local log file at ``path`` to your backend + + def read(self, relative_path, ti): ... # return (source_info, log_messages) for the UI + + + REMOTE_TASK_LOG: RemoteLogIO | None = MyRemoteLogIO() + DEFAULT_REMOTE_CONN_ID: str | None = "my_remote_conn" + + LOGGING_CONFIG = { + "version": 1, + "disable_existing_loggers": False, + # ... your formatters / handlers / loggers / root ... + } + +.. note:: + + ``airflow.config_templates.airflow_local_settings`` (and its + ``DEFAULT_LOGGING_CONFIG``) is planned for deprecation. Build + ``LOGGING_CONFIG`` directly rather than deep-copying from it, and define + ``REMOTE_TASK_LOG`` yourself rather than re-exporting from that module. + + Custom logger for Operators, Hooks and Tasks -------------------------------------------- diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index 7072d2dd6ea88..4ad479a8c914c 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -49,7 +49,9 @@ BASE_LOG_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")) -# This isn't used anymore, but kept for compat of people who might have imported it +# Default value for the ``[logging] logging_config_class`` option. Plain +# ``logging.config.dictConfig`` dict; the ``_class`` suffix on the config option +# is historical. DEFAULT_LOGGING_CONFIG: dict[str, Any] = { "version": 1, "disable_existing_loggers": False, @@ -121,6 +123,11 @@ ################## REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging") + +# Side-channel attributes read by ``discover_remote_log_handler`` from whichever +# module ``[logging] logging_config_class`` resolves through. Custom modules that +# override that option should define both at module scope to enable remote +# task-log read-back. REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None DEFAULT_REMOTE_CONN_ID: str | None = None diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 57ddd14a6cb2c..17761527b23a6 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -261,10 +261,7 @@ core: allowed_deserialization_classes_regexp: description: | Space-separated list of classes that may be imported during deserialization. Items are processed - as regex expressions and matched against the full classname (``re.fullmatch`` semantics), so a - pattern such as ``airflow\.models\.Variable`` does not also admit ``airflow.models.VariableXYZ``. - Use ``.*`` (e.g. ``airflow\.models\..*``) to allow a prefix and any suffix. Python built-in - classes (like dict) are always allowed. + as regex expressions. Python built-in classes (like dict) are always allowed. This is a secondary option to ``[core] allowed_deserialization_classes``. version_added: 2.8.2 type: string @@ -866,9 +863,12 @@ logging: default: "INFO" logging_config_class: description: | - Logging class - Specify the class that will specify the logging configuration - This class has to be on the python classpath + Dotted import path to a ``logging.config.dictConfig`` dict. The + ``_class`` suffix is historical — the target is a dict, not a class. + + Airflow also reads two optional module-level attributes from the + enclosing module: ``REMOTE_TASK_LOG`` and ``DEFAULT_REMOTE_CONN_ID``, + used to drive remote task-log read-back. See :ref:`write-logs-advanced`. version_added: 2.0.0 type: string example: "my.path.default_local_settings.LOGGING_CONFIG" diff --git a/airflow-core/src/airflow/logging_config.py b/airflow-core/src/airflow/logging_config.py index 0da017d9b0826..e9a41814fbe6e 100644 --- a/airflow-core/src/airflow/logging_config.py +++ b/airflow-core/src/airflow/logging_config.py @@ -31,6 +31,9 @@ log = logging.getLogger(__name__) +# Default ``[logging] logging_config_class`` value when the user has not overridden it. +DEFAULT_LOGGING_CONFIG_PATH = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" + class _ActiveLoggingConfig: """Private class to hold active logging config variables.""" @@ -60,14 +63,20 @@ def get_default_remote_conn_id() -> str | None: def load_logging_config() -> tuple[dict[str, Any], str]: - """Configure & Validate Airflow Logging.""" - fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" - logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback) + """ + Resolve ``[logging] logging_config_class`` and the remote-log side channel. + + The option is a dotted path to a ``logging.config.dictConfig`` dict. After + importing the dict, the enclosing module is probed for optional + ``REMOTE_TASK_LOG`` and ``DEFAULT_REMOTE_CONN_ID`` attributes via + :func:`airflow._shared.logging.remote.discover_remote_log_handler`. + """ + logging_class_path = conf.get("logging", "logging_config_class", fallback=DEFAULT_LOGGING_CONFIG_PATH) # Sometimes we end up with `""` as the value! - logging_class_path = logging_class_path or fallback + logging_class_path = logging_class_path or DEFAULT_LOGGING_CONFIG_PATH - user_defined = logging_class_path != fallback + user_defined = logging_class_path != DEFAULT_LOGGING_CONFIG_PATH try: logging_config = import_string(logging_class_path) @@ -86,15 +95,45 @@ def load_logging_config() -> tuple[dict[str, Any], str]: f"to: {type(err).__name__}:{err}" ) else: - # Load remote logging configuration using shared discovery logic + # Load remote logging configuration using shared discovery logic. remote_task_log, default_remote_conn_id = discover_remote_log_handler( - logging_class_path, fallback, import_string + logging_class_path, DEFAULT_LOGGING_CONFIG_PATH, import_string ) _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id) return logging_config, logging_class_path +def _warn_if_missing_remote_task_log(logging_class_path: str) -> None: + """ + Warn if ``[logging] remote_logging`` is on but the user module exposes no remote IO. + + Runs *after* ``dictConfig`` has constructed handlers, so deprecated + self-registration in provider task handlers (Elasticsearch, OpenSearch) has + already had its chance to populate ``_ActiveLoggingConfig.remote_task_log``. + Only fires for user-defined ``logging_config_class`` values; the stock + fallback is exempt. + """ + user_defined = bool(logging_class_path) and logging_class_path != DEFAULT_LOGGING_CONFIG_PATH + remote_logging_enabled = conf.getboolean("logging", "remote_logging", fallback=False) + if not (user_defined and remote_logging_enabled): + return + if _ActiveLoggingConfig.remote_task_log is not None: + return + # Strip the trailing ``.`` to leave the enclosing module path. + # ``logging_class_path`` should always be dotted since ``import_string`` + # would have raised otherwise, but guard the access defensively. + parts = logging_class_path.rsplit(".", 1) + modpath = parts[0] if len(parts) == 2 else logging_class_path + log.warning( + "[logging] remote_logging is enabled but the user-defined logging module %r " + "does not expose a REMOTE_TASK_LOG attribute, so remote task-log read-back is " + "disabled. Define REMOTE_TASK_LOG (a RemoteLogIO instance) at module scope " + "to enable it.", + modpath, + ) + + def configure_logging(): from airflow._shared.logging import configure_logging, init_log_folder, translate_config_values @@ -146,6 +185,10 @@ def configure_logging(): # otherwise Airflow would silently fall back on the default config raise e + # Runs after dictConfig so deprecated handler self-registration (ES/OS) has + # had its chance to populate _ActiveLoggingConfig.remote_task_log. + _warn_if_missing_remote_task_log(logging_class_path) + validate_logging_config() new_folder_permissions = int( diff --git a/airflow-core/tests/unit/logging/test_logging_config.py b/airflow-core/tests/unit/logging/test_logging_config.py new file mode 100644 index 0000000000000..96b7da633651b --- /dev/null +++ b/airflow-core/tests/unit/logging/test_logging_config.py @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +import pytest + +from airflow.logging_config import ( + DEFAULT_LOGGING_CONFIG_PATH, + _ActiveLoggingConfig, + _warn_if_missing_remote_task_log, +) + + +class TestWarnIfMissingRemoteTaskLog: + @pytest.fixture(autouse=True) + def _reset_active_logging_config(self, monkeypatch): + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", None, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + + def test_warns_when_user_module_missing_remote_task_log_and_remote_logging_enabled(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG") + mock_log.warning.assert_called_once() + assert "my_pkg.custom_settings" in mock_log.warning.call_args.args + + def test_no_warning_when_using_fallback_path(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log(DEFAULT_LOGGING_CONFIG_PATH) + mock_log.warning.assert_not_called() + + def test_no_warning_when_remote_logging_disabled(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "False") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG") + mock_log.warning.assert_not_called() + + def test_no_warning_when_remote_task_log_is_set(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG") + mock_log.warning.assert_not_called() + + def test_no_warning_when_empty_logging_class_path(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("") + mock_log.warning.assert_not_called() diff --git a/shared/logging/src/airflow_shared/logging/remote.py b/shared/logging/src/airflow_shared/logging/remote.py index d8d76ef6a078c..8ab30a8510303 100644 --- a/shared/logging/src/airflow_shared/logging/remote.py +++ b/shared/logging/src/airflow_shared/logging/remote.py @@ -79,7 +79,25 @@ def discover_remote_log_handler( fallback_path: str, import_string: Callable[[str], Any], ) -> tuple[RemoteLogIO | None, str | None]: - """Discover and load the remote log handler from a logging config module.""" + """ + Look up the optional remote-log handler alongside a logging dictConfig. + + ``[logging] logging_config_class`` is a dotted path to a + ``logging.config.dictConfig`` dict. After importing the dict, this helper + re-imports the enclosing module and reads two optional module-level + attributes via ``getattr``: + + * ``REMOTE_TASK_LOG`` — :class:`RemoteLogIO` / :class:`RemoteLogStreamIO` + instance that uploads and reads task logs. + * ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id for that + backend. + + Either may be ``None`` immediately after this call; provider task handlers + can still populate ``REMOTE_TASK_LOG`` from inside ``__init__`` when + ``dictConfig`` instantiates them (deprecated path). Callers that want to + warn on missing remote-log configuration should re-check + ``_ActiveLoggingConfig.remote_task_log`` *after* ``dictConfig`` has run. + """ # Sometimes we end up with `""` as the value! logging_class_path = logging_class_path or fallback_path diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index f93abe1a72240..a5d28361e0085 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -172,7 +172,12 @@ def init_log_file(local_relative_path: str) -> Path: def _load_logging_config() -> None: - """Load and cache the remote logging configuration from SDK config.""" + """ + Load and cache the remote logging configuration from SDK config. + + SDK mirror of :func:`airflow.logging_config.load_logging_config` — see that + function for the ``logging_config_class`` / ``REMOTE_TASK_LOG`` contract. + """ from airflow.sdk._shared.logging.remote import discover_remote_log_handler from airflow.sdk._shared.module_loading import import_string from airflow.sdk.configuration import conf @@ -180,7 +185,7 @@ def _load_logging_config() -> None: fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback) - # Load remote logging configuration using shared discovery logic + # Load remote logging configuration using shared discovery logic. remote_task_log, default_remote_conn_id = discover_remote_log_handler( logging_class_path, fallback, import_string )