diff --git a/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst b/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst index fa34e74f3140b..781662e678010 100644 --- a/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst +++ b/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst @@ -46,13 +46,14 @@ that Python objects log to loggers that follow naming convention of ``. You can read more about standard python logging classes (Loggers, Handlers, Formatters) in the `Python logging documentation <https://docs.python.org/3/library/logging.html>`_. -Create a custom logging class ------------------------------ +Create a custom logging config +------------------------------ Configuring your logging classes can be done via the ``logging_config_class`` option in ``airflow.cfg`` file. -This configuration should specify the import path to a configuration compatible with -:func:`logging.config.dictConfig`. If your file is a standard import location, then you should set a -:envvar:`PYTHONPATH` environment variable. +Despite the option name, the value is a dotted import path to a ``dict`` that satisfies +:func:`logging.config.dictConfig` — not a Python class. The ``_class`` suffix is +historical and kept for backwards compatibility. If your file is not in a standard import +location, then you should set a :envvar:`PYTHONPATH` environment variable. Follow the steps below to enable custom logging config class: @@ -102,6 +103,56 @@ See :doc:`../modules_management` for details on how Python and Airflow manage mo You can override the way both standard logs of the components and "task" logs are handled. 
+Custom logging configs and remote logging +----------------------------------------- + +When ``[logging] remote_logging = True`` and you point ``logging_config_class`` +at your own module, define two module-level attributes alongside your +``LOGGING_CONFIG`` dict: + +* ``REMOTE_TASK_LOG`` — an instance of + :class:`~airflow.logging.remote.RemoteLogIO` (or + :class:`~airflow.logging.remote.RemoteLogStreamIO`) that uploads task logs + and reads them back for the UI. +* ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id used when + ``[logging] remote_log_conn_id`` is unset. + +If ``REMOTE_TASK_LOG`` is missing, Airflow emits one ``WARNING`` at startup +and the UI cannot read task logs back from the remote backend. + + .. code-block:: python + + # ~/airflow/config/log_config.py + from airflow.logging.remote import RemoteLogIO + + + class MyRemoteLogIO: + @property + def processors(self): + return () + + def upload(self, path, ti): ... # upload local log file at ``path`` to your backend + + def read(self, relative_path, ti): ... # return (source_info, log_messages) for the UI + + + REMOTE_TASK_LOG: RemoteLogIO | None = MyRemoteLogIO() + DEFAULT_REMOTE_CONN_ID: str | None = "my_remote_conn" + + LOGGING_CONFIG = { + "version": 1, + "disable_existing_loggers": False, + # ... your formatters / handlers / loggers / root ... + } + +.. note:: + + ``airflow.config_templates.airflow_local_settings`` (and its + ``DEFAULT_LOGGING_CONFIG``) is planned for deprecation. Build + ``LOGGING_CONFIG`` directly rather than deep-copying from it, and define + ``REMOTE_TASK_LOG`` yourself rather than re-exporting from that module. 
+ + Custom logger for Operators, Hooks and Tasks -------------------------------------------- diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index 7072d2dd6ea88..4ad479a8c914c 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -49,7 +49,9 @@ BASE_LOG_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")) -# This isn't used anymore, but kept for compat of people who might have imported it +# Default value for the ``[logging] logging_config_class`` option. Plain +# ``logging.config.dictConfig`` dict; the ``_class`` suffix on the config option +# is historical. DEFAULT_LOGGING_CONFIG: dict[str, Any] = { "version": 1, "disable_existing_loggers": False, @@ -121,6 +123,11 @@ ################## REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging") + +# Side-channel attributes read by ``discover_remote_log_handler`` from whichever +# module ``[logging] logging_config_class`` resolves through. Custom modules that +# override that option should define both at module scope to enable remote +# task-log read-back. REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None DEFAULT_REMOTE_CONN_ID: str | None = None diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 03593ce4ba036..17761527b23a6 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -863,9 +863,12 @@ logging: default: "INFO" logging_config_class: description: | - Logging class - Specify the class that will specify the logging configuration - This class has to be on the python classpath + Dotted import path to a ``logging.config.dictConfig`` dict. 
The + ``_class`` suffix is historical — the target is a dict, not a class. + + Airflow also reads two optional module-level attributes from the + enclosing module: ``REMOTE_TASK_LOG`` and ``DEFAULT_REMOTE_CONN_ID``, + used to drive remote task-log read-back. See :ref:`write-logs-advanced`. version_added: 2.0.0 type: string example: "my.path.default_local_settings.LOGGING_CONFIG" diff --git a/airflow-core/src/airflow/logging_config.py b/airflow-core/src/airflow/logging_config.py index 0da017d9b0826..e9a41814fbe6e 100644 --- a/airflow-core/src/airflow/logging_config.py +++ b/airflow-core/src/airflow/logging_config.py @@ -31,6 +31,9 @@ log = logging.getLogger(__name__) +# Default ``[logging] logging_config_class`` value when the user has not overridden it. +DEFAULT_LOGGING_CONFIG_PATH = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" + class _ActiveLoggingConfig: """Private class to hold active logging config variables.""" @@ -60,14 +63,20 @@ def get_default_remote_conn_id() -> str | None: def load_logging_config() -> tuple[dict[str, Any], str]: - """Configure & Validate Airflow Logging.""" - fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" - logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback) + """ + Resolve ``[logging] logging_config_class`` and the remote-log side channel. + + The option is a dotted path to a ``logging.config.dictConfig`` dict. After + importing the dict, the enclosing module is probed for optional + ``REMOTE_TASK_LOG`` and ``DEFAULT_REMOTE_CONN_ID`` attributes via + :func:`airflow._shared.logging.remote.discover_remote_log_handler`. + """ + logging_class_path = conf.get("logging", "logging_config_class", fallback=DEFAULT_LOGGING_CONFIG_PATH) # Sometimes we end up with `""` as the value! 
- logging_class_path = logging_class_path or fallback + logging_class_path = logging_class_path or DEFAULT_LOGGING_CONFIG_PATH - user_defined = logging_class_path != fallback + user_defined = logging_class_path != DEFAULT_LOGGING_CONFIG_PATH try: logging_config = import_string(logging_class_path) @@ -86,15 +95,45 @@ def load_logging_config() -> tuple[dict[str, Any], str]: f"to: {type(err).__name__}:{err}" ) else: - # Load remote logging configuration using shared discovery logic + # Load remote logging configuration using shared discovery logic. remote_task_log, default_remote_conn_id = discover_remote_log_handler( - logging_class_path, fallback, import_string + logging_class_path, DEFAULT_LOGGING_CONFIG_PATH, import_string ) _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id) return logging_config, logging_class_path +def _warn_if_missing_remote_task_log(logging_class_path: str) -> None: + """ + Warn if ``[logging] remote_logging`` is on but the user module exposes no remote IO. + + Runs *after* ``dictConfig`` has constructed handlers, so deprecated + self-registration in provider task handlers (Elasticsearch, OpenSearch) has + already had its chance to populate ``_ActiveLoggingConfig.remote_task_log``. + Only fires for user-defined ``logging_config_class`` values; the stock + fallback is exempt. + """ + user_defined = bool(logging_class_path) and logging_class_path != DEFAULT_LOGGING_CONFIG_PATH + remote_logging_enabled = conf.getboolean("logging", "remote_logging", fallback=False) + if not (user_defined and remote_logging_enabled): + return + if _ActiveLoggingConfig.remote_task_log is not None: + return + # Strip the trailing ``.<attr>`` component to leave the enclosing module path. + # ``logging_class_path`` should always be dotted since ``import_string`` + # would have raised otherwise, but guard the access defensively. 
+ parts = logging_class_path.rsplit(".", 1) + modpath = parts[0] if len(parts) == 2 else logging_class_path + log.warning( + "[logging] remote_logging is enabled but the user-defined logging module %r " + "does not expose a REMOTE_TASK_LOG attribute, so remote task-log read-back is " + "disabled. Define REMOTE_TASK_LOG (a RemoteLogIO instance) at module scope " + "to enable it.", + modpath, + ) + + def configure_logging(): from airflow._shared.logging import configure_logging, init_log_folder, translate_config_values @@ -146,6 +185,10 @@ def configure_logging(): # otherwise Airflow would silently fall back on the default config raise e + # Runs after dictConfig so deprecated handler self-registration (ES/OS) has + # had its chance to populate _ActiveLoggingConfig.remote_task_log. + _warn_if_missing_remote_task_log(logging_class_path) + validate_logging_config() new_folder_permissions = int( diff --git a/airflow-core/tests/unit/logging/test_logging_config.py b/airflow-core/tests/unit/logging/test_logging_config.py new file mode 100644 index 0000000000000..96b7da633651b --- /dev/null +++ b/airflow-core/tests/unit/logging/test_logging_config.py @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from unittest import mock + +import pytest + +from airflow.logging_config import ( + DEFAULT_LOGGING_CONFIG_PATH, + _ActiveLoggingConfig, + _warn_if_missing_remote_task_log, +) + + +class TestWarnIfMissingRemoteTaskLog: + @pytest.fixture(autouse=True) + def _reset_active_logging_config(self, monkeypatch): + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", None, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + + def test_warns_when_user_module_missing_remote_task_log_and_remote_logging_enabled(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG") + mock_log.warning.assert_called_once() + assert "my_pkg.custom_settings" in mock_log.warning.call_args.args + + def test_no_warning_when_using_fallback_path(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log(DEFAULT_LOGGING_CONFIG_PATH) + mock_log.warning.assert_not_called() + + def test_no_warning_when_remote_logging_disabled(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "False") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG") + mock_log.warning.assert_not_called() + + def test_no_warning_when_remote_task_log_is_set(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG") + mock_log.warning.assert_not_called() + + def 
test_no_warning_when_empty_logging_class_path(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("") + mock_log.warning.assert_not_called() diff --git a/providers/elasticsearch/docs/changelog.rst b/providers/elasticsearch/docs/changelog.rst index 2209b6ff95b1d..103293b13e3b7 100644 --- a/providers/elasticsearch/docs/changelog.rst +++ b/providers/elasticsearch/docs/changelog.rst @@ -27,6 +27,13 @@ Changelog --------- +``ElasticsearchTaskHandler`` no longer silently registers itself as the remote +task-log reader during ``dictConfig``. The implicit registration still happens +for one more release but now emits an ``AirflowProviderDeprecationWarning`` and +will be removed in a future provider release. If you ship a custom +``[logging] logging_config_class`` module that swaps in +``ElasticsearchTaskHandler``, set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` +at module scope in that module. 6.5.3 ..... diff --git a/providers/elasticsearch/docs/logging/index.rst b/providers/elasticsearch/docs/logging/index.rst index 3254bf5406ccf..670c958843a07 100644 --- a/providers/elasticsearch/docs/logging/index.rst +++ b/providers/elasticsearch/docs/logging/index.rst @@ -64,6 +64,99 @@ To output task logs to ElasticSearch, the following config could be used: (set ` write_to_es = True target_index = [name of the index to store logs] +.. _elasticsearch-airflow-3-0-to-3-1-local-settings: + +Enabling the Elasticsearch task handler on Airflow 3.0.0 – 3.1.7 +'''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' + +This section is **only about reading task logs back into the Airflow UI**. Tasks running +on workers will write logs as usual (to local files, stdout, or — with appropriate log +shipping — to Elasticsearch) regardless of the override below. 
Without the override on +Airflow 3.0.0 – 3.1.7, logs reach Elasticsearch fine but the **UI cannot render them** +because no handler is registered to fetch them back. + +The wiring that registers ``ElasticsearchTaskHandler`` inside the stock +``airflow_local_settings.py`` (the file that builds ``DEFAULT_LOGGING_CONFIG``) landed in +Airflow **3.2.0** (`apache/airflow#62121 +<https://github.com/apache/airflow/pull/62121>`_) and was backported to Airflow **3.1.8** +(`apache/airflow#62940 <https://github.com/apache/airflow/pull/62940>`_). On Airflow +**3.0.0 – 3.1.7** installing the provider is not enough: to make the UI's log viewer +fetch logs from Elasticsearch you must ship a custom logging config that swaps the +``task`` handler **and** sets ``REMOTE_TASK_LOG`` at module scope. The override requires +``apache-airflow-providers-elasticsearch`` **6.5.0+** (`apache/airflow#53821 +<https://github.com/apache/airflow/pull/53821>`_), which is where +``ElasticsearchRemoteLogIO`` was introduced. + +Create a module on the Python path — for example ``config/airflow_local_settings.py`` — +and point Airflow at it via ``[logging] logging_config_class``: + +.. 
code-block:: python + + from airflow.config_templates.airflow_local_settings import ( + BASE_LOG_FOLDER, + DEFAULT_LOGGING_CONFIG, + ) + from airflow.providers.common.compat.sdk import conf + from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO + + ELASTICSEARCH_HOST = conf.get("elasticsearch", "host", fallback=None) + + REMOTE_TASK_LOG = None + DEFAULT_REMOTE_CONN_ID = None + + if ELASTICSEARCH_HOST: + DEFAULT_LOGGING_CONFIG["handlers"]["task"] = { + "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler", + "formatter": "airflow", + "base_log_folder": str(BASE_LOG_FOLDER), + "end_of_log_mark": conf.get("elasticsearch", "end_of_log_mark", fallback="end_of_log"), + "host": ELASTICSEARCH_HOST, + "frontend": conf.get("elasticsearch", "frontend", fallback=""), + "write_stdout": conf.getboolean("elasticsearch", "write_stdout"), + "write_to_es": conf.getboolean("elasticsearch", "write_to_es", fallback=False), + "json_format": conf.getboolean("elasticsearch", "json_format"), + "json_fields": conf.get("elasticsearch", "json_fields"), + "host_field": conf.get("elasticsearch", "host_field", fallback="host"), + "offset_field": conf.get("elasticsearch", "offset_field", fallback="offset"), + } + REMOTE_TASK_LOG = ElasticsearchRemoteLogIO( + host=ELASTICSEARCH_HOST, + target_index=conf.get("elasticsearch", "target_index", fallback="airflow-logs"), + write_stdout=conf.getboolean("elasticsearch", "write_stdout"), + write_to_es=conf.getboolean("elasticsearch", "write_to_es", fallback=False), + offset_field=conf.get("elasticsearch", "offset_field", fallback="offset"), + host_field=conf.get("elasticsearch", "host_field", fallback="host"), + base_log_folder=str(BASE_LOG_FOLDER), + delete_local_copy=conf.getboolean("logging", "delete_local_logs"), + json_format=conf.getboolean("elasticsearch", "json_format"), + log_id_template=conf.get( + "elasticsearch", + "log_id_template", + 
fallback="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}", + ), + ) + +Then, in ``airflow.cfg``: + +.. code-block:: ini + + [logging] + remote_logging = True + logging_config_class = config.airflow_local_settings.DEFAULT_LOGGING_CONFIG + +.. note:: + + Earlier versions of this guide relied on ``ElasticsearchTaskHandler`` self-registering + ``REMOTE_TASK_LOG`` from inside ``__init__`` when ``dictConfig`` instantiated it. + That implicit registration is now deprecated (``AirflowProviderDeprecationWarning``) + and will be removed in a future provider release; define ``REMOTE_TASK_LOG`` at + module scope as shown above. See :ref:`write-logs-advanced` for the full + ``logging_config_class`` contract. + +On Airflow **3.1.8+** or **3.2.0+** this override is unnecessary — the stock +``airflow_local_settings.py`` already contains an ``elif ELASTICSEARCH_HOST:`` branch, so +configuring the ``[elasticsearch]`` section in ``airflow.cfg`` is sufficient. + .. _write-logs-elasticsearch-tls: Writing logs to Elasticsearch over TLS diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py index eb531452df824..83e2727f3cc47 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py @@ -25,6 +25,7 @@ import shutil import sys import time +import warnings from collections import defaultdict from collections.abc import Callable, Iterable from operator import attrgetter @@ -41,6 +42,7 @@ from elasticsearch.exceptions import NotFoundError import airflow.logging_config as alc +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models.dagrun import DagRun from airflow.providers.common.compat.sdk import conf from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter @@ -317,8 
+319,31 @@ def __init__( from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log if get_remote_task_log() is None: + # stacklevel=1 keeps the warning attributed to this airflow.providers + # module so module-based deprecation filters still match; dictConfig + # is in stdlib and would otherwise hide the warning at stacklevel=2. + warnings.warn( + "Implicit REMOTE_TASK_LOG registration by ElasticsearchTaskHandler " + "during dictConfig is deprecated and will be removed in a future " + "provider release. Set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` " + "at module scope in your ``[logging] logging_config_class`` module. " + "See the Elasticsearch provider logging documentation for the " + "updated override example.", + AirflowProviderDeprecationWarning, + stacklevel=1, + ) _ActiveLoggingConfig.set(self.io, None) elif alc.REMOTE_TASK_LOG is None: # type: ignore[attr-defined] + warnings.warn( + "Implicit REMOTE_TASK_LOG registration by ElasticsearchTaskHandler " + "during dictConfig is deprecated and will be removed in a future " + "provider release. Set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` " + "at module scope in your ``[logging] logging_config_class`` module. " + "See the Elasticsearch provider logging documentation for the " + "updated override example.", + AirflowProviderDeprecationWarning, + stacklevel=1, + ) alc.REMOTE_TASK_LOG = self.io # type: ignore[attr-defined] @staticmethod diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py b/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py new file mode 100644 index 0000000000000..a1746818eed6e --- /dev/null +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS + + +@pytest.fixture(autouse=True) +def _no_implicit_remote_task_log_warning(monkeypatch): + """ + Suppress the deprecated implicit-registration warning during direct handler construction. + + Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module + scope in the ``[logging] logging_config_class`` module, so + ``ElasticsearchTaskHandler.__init__`` does not emit + ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin + treats as a test failure). + + On Airflow 2.x the handler does not run the registration branch at all, so the + fixture is a no-op there (and ``_ActiveLoggingConfig`` does not exist). + On Airflow 3.0/3.1 the handler reads ``airflow.logging_config.REMOTE_TASK_LOG``. + On Airflow 3.2+ the handler reads ``_ActiveLoggingConfig.remote_task_log``. + """ + if not AIRFLOW_V_3_0_PLUS: + return + if AIRFLOW_V_3_2_PLUS: + from airflow.logging_config import _ActiveLoggingConfig + + # raising=False is required because remote_task_log is annotation-only at + # class scope and may not be initialized in isolated test runs. 
+ monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) + else: + import airflow.logging_config + + monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG", object(), raising=False) diff --git a/providers/opensearch/docs/changelog.rst b/providers/opensearch/docs/changelog.rst index 941ccd7e1a9c8..d7494f51cc122 100644 --- a/providers/opensearch/docs/changelog.rst +++ b/providers/opensearch/docs/changelog.rst @@ -27,6 +27,14 @@ Changelog --------- +``OpensearchTaskHandler`` no longer silently registers itself as the remote +task-log reader during ``dictConfig``. The implicit registration still happens +for one more release but now emits an ``AirflowProviderDeprecationWarning`` and +will be removed in a future provider release. If you ship a custom +``[logging] logging_config_class`` module that swaps in ``OpensearchTaskHandler``, +set ``REMOTE_TASK_LOG = OpensearchRemoteLogIO(...)`` at module scope in that +module. See the OpenSearch provider's logging guide for the updated override +example. 1.9.1 ..... diff --git a/providers/opensearch/docs/logging/index.rst b/providers/opensearch/docs/logging/index.rst index 34f2dc359b9b0..43f849217110c 100644 --- a/providers/opensearch/docs/logging/index.rst +++ b/providers/opensearch/docs/logging/index.rst @@ -88,9 +88,7 @@ The wiring that registers ``OpensearchTaskHandler`` inside the stock ``airflow_local_settings.py`` (the file that builds ``DEFAULT_LOGGING_CONFIG``) only landed in Airflow **3.2.1**. On Airflow **3.0.0 – 3.2.0** installing the provider is not enough: to make the UI's log viewer fetch logs from OpenSearch you must ship a custom logging -config that swaps the ``task`` handler. The handler self-registers as the remote-log -reader on construction (via ``REMOTE_TASK_LOG`` on 3.0/3.1 and ``_ActiveLoggingConfig`` -on 3.2), so swapping the handler class is the only change required. 
+config that swaps the ``task`` handler **and** sets ``REMOTE_TASK_LOG`` at module scope. Create a module on the Python path — for example ``config/airflow_local_settings.py`` — and point Airflow at it via ``[logging] logging_config_class``: @@ -102,9 +100,13 @@ and point Airflow at it via ``[logging] logging_config_class``: DEFAULT_LOGGING_CONFIG, ) from airflow.providers.common.compat.sdk import conf + from airflow.providers.opensearch.log.os_task_handler import OpensearchRemoteLogIO OPENSEARCH_HOST = conf.get("opensearch", "host", fallback=None) + REMOTE_TASK_LOG = None + DEFAULT_REMOTE_CONN_ID = None + if OPENSEARCH_HOST: DEFAULT_LOGGING_CONFIG["handlers"]["task"] = { "class": "airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler", @@ -123,6 +125,25 @@ and point Airflow at it via ``[logging] logging_config_class``: "write_to_opensearch": conf.getboolean("opensearch", "write_to_os", fallback=False), "target_index": conf.get("opensearch", "target_index", fallback="airflow-logs"), } + REMOTE_TASK_LOG = OpensearchRemoteLogIO( + host=OPENSEARCH_HOST, + port=conf.getint("opensearch", "port", fallback=9200), + username=conf.get("opensearch", "username"), + password=conf.get("opensearch", "password"), + target_index=conf.get("opensearch", "target_index", fallback="airflow-logs"), + write_stdout=conf.getboolean("opensearch", "write_stdout"), + write_to_opensearch=conf.getboolean("opensearch", "write_to_os", fallback=False), + offset_field=conf.get("opensearch", "offset_field", fallback="offset"), + host_field=conf.get("opensearch", "host_field", fallback="host"), + base_log_folder=str(BASE_LOG_FOLDER), + delete_local_copy=conf.getboolean("logging", "delete_local_logs"), + json_format=conf.getboolean("opensearch", "json_format"), + log_id_template=conf.get( + "opensearch", + "log_id_template", + fallback="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}", + ), + ) Then, in ``airflow.cfg``: @@ -132,6 +153,15 @@ Then, in ``airflow.cfg``: 
remote_logging = True logging_config_class = config.airflow_local_settings.DEFAULT_LOGGING_CONFIG +.. note:: + + Earlier versions of this guide relied on ``OpensearchTaskHandler`` self-registering + ``REMOTE_TASK_LOG`` from inside ``__init__`` when ``dictConfig`` instantiated it. + That implicit registration is now deprecated (``AirflowProviderDeprecationWarning``) + and will be removed in a future provider release; define ``REMOTE_TASK_LOG`` at + module scope as shown above. See :ref:`write-logs-advanced` for the full + ``logging_config_class`` contract. + On Airflow **3.2.1+** this override is unnecessary — the stock ``airflow_local_settings.py`` already contains an ``elif OPENSEARCH_HOST:`` branch, so configuring the ``[opensearch]`` section in ``airflow.cfg`` is sufficient. diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py index 45e1a3ef17250..728c9c809868d 100644 --- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py +++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py @@ -23,6 +23,7 @@ import os import sys import time +import warnings from collections import defaultdict from collections.abc import Callable from datetime import datetime @@ -38,6 +39,7 @@ from sqlalchemy import select import airflow.logging_config as alc +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models import DagRun from airflow.providers.common.compat.module_loading import import_string from airflow.providers.common.compat.sdk import AirflowException, conf @@ -380,8 +382,31 @@ def __init__( from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log if get_remote_task_log() is None: + # stacklevel=1 keeps the warning attributed to this airflow.providers + # module so module-based deprecation filters still match; dictConfig + # is in stdlib and would otherwise hide 
the warning at stacklevel=2. + warnings.warn( + "Implicit REMOTE_TASK_LOG registration by OpensearchTaskHandler during " + "dictConfig is deprecated and will be removed in a future provider " + "release. Set ``REMOTE_TASK_LOG = OpensearchRemoteLogIO(...)`` at " + "module scope in your ``[logging] logging_config_class`` module. See " + "the OpenSearch provider logging documentation for the updated " + "override example.", + AirflowProviderDeprecationWarning, + stacklevel=1, + ) _ActiveLoggingConfig.set(self.io, None) elif alc.REMOTE_TASK_LOG is None: # type: ignore[attr-defined] + warnings.warn( + "Implicit REMOTE_TASK_LOG registration by OpensearchTaskHandler during " + "dictConfig is deprecated and will be removed in a future provider " + "release. Set ``REMOTE_TASK_LOG = OpensearchRemoteLogIO(...)`` at " + "module scope in your ``[logging] logging_config_class`` module. See " + "the OpenSearch provider logging documentation for the updated " + "override example.", + AirflowProviderDeprecationWarning, + stacklevel=1, + ) alc.REMOTE_TASK_LOG = self.io # type: ignore[attr-defined] def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None: diff --git a/providers/opensearch/tests/unit/opensearch/log/conftest.py b/providers/opensearch/tests/unit/opensearch/log/conftest.py new file mode 100644 index 0000000000000..5f68f22355e68 --- /dev/null +++ b/providers/opensearch/tests/unit/opensearch/log/conftest.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS + + +@pytest.fixture(autouse=True) +def _no_implicit_remote_task_log_warning(monkeypatch): + """ + Suppress the deprecated implicit-registration warning during direct handler construction. + + Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module + scope in the ``[logging] logging_config_class`` module, so + ``OpensearchTaskHandler.__init__`` does not emit + ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin + treats as a test failure). + + On Airflow 2.x the handler does not run the registration branch at all, so the + fixture is a no-op there (and ``_ActiveLoggingConfig`` does not exist). + On Airflow 3.0/3.1 the handler reads ``airflow.logging_config.REMOTE_TASK_LOG``. + On Airflow 3.2+ the handler reads ``_ActiveLoggingConfig.remote_task_log``. + """ + if not AIRFLOW_V_3_0_PLUS: + return + if AIRFLOW_V_3_2_PLUS: + from airflow.logging_config import _ActiveLoggingConfig + + # raising=False is required because remote_task_log is annotation-only at + # class scope and may not be initialized in isolated test runs. 
+ monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) + else: + import airflow.logging_config + + monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG", object(), raising=False) diff --git a/shared/logging/src/airflow_shared/logging/remote.py b/shared/logging/src/airflow_shared/logging/remote.py index d8d76ef6a078c..8ab30a8510303 100644 --- a/shared/logging/src/airflow_shared/logging/remote.py +++ b/shared/logging/src/airflow_shared/logging/remote.py @@ -79,7 +79,25 @@ def discover_remote_log_handler( fallback_path: str, import_string: Callable[[str], Any], ) -> tuple[RemoteLogIO | None, str | None]: - """Discover and load the remote log handler from a logging config module.""" + """ + Look up the optional remote-log handler alongside a logging dictConfig. + + ``[logging] logging_config_class`` is a dotted path to a + ``logging.config.dictConfig`` dict. After importing the dict, this helper + re-imports the enclosing module and reads two optional module-level + attributes via ``getattr``: + + * ``REMOTE_TASK_LOG`` — :class:`RemoteLogIO` / :class:`RemoteLogStreamIO` + instance that uploads and reads task logs. + * ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id for that + backend. + + Either may be ``None`` immediately after this call; provider task handlers + can still populate ``REMOTE_TASK_LOG`` from inside ``__init__`` when + ``dictConfig`` instantiates them (deprecated path). Callers that want to + warn on missing remote-log configuration should re-check + ``_ActiveLoggingConfig.remote_task_log`` *after* ``dictConfig`` has run. + """ # Sometimes we end up with `""` as the value! 
logging_class_path = logging_class_path or fallback_path diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index f93abe1a72240..a5d28361e0085 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -172,7 +172,12 @@ def init_log_file(local_relative_path: str) -> Path: def _load_logging_config() -> None: - """Load and cache the remote logging configuration from SDK config.""" + """ + Load and cache the remote logging configuration from SDK config. + + SDK mirror of :func:`airflow.logging_config.load_logging_config` — see that + function for the ``logging_config_class`` / ``REMOTE_TASK_LOG`` contract. + """ from airflow.sdk._shared.logging.remote import discover_remote_log_handler from airflow.sdk._shared.module_loading import import_string from airflow.sdk.configuration import conf @@ -180,7 +185,7 @@ def _load_logging_config() -> None: fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback) - # Load remote logging configuration using shared discovery logic + # Load remote logging configuration using shared discovery logic. remote_task_log, default_remote_conn_id = discover_remote_log_handler( logging_class_path, fallback, import_string )