diff --git a/providers/elasticsearch/docs/changelog.rst b/providers/elasticsearch/docs/changelog.rst index 01b919c3b09ef..6ee0bfccc12ee 100644 --- a/providers/elasticsearch/docs/changelog.rst +++ b/providers/elasticsearch/docs/changelog.rst @@ -27,6 +27,14 @@ Changelog --------- +``ElasticsearchTaskHandler`` no longer silently registers itself as the remote +task-log reader during ``dictConfig``. The implicit registration still happens +for one more release but now emits an ``AirflowProviderDeprecationWarning`` and +will be removed in a future provider release. If you ship a custom +``[logging] logging_config_class`` module that swaps in +``ElasticsearchTaskHandler``, set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` +at module scope in that module. + 6.5.4 ..... diff --git a/providers/elasticsearch/docs/logging/index.rst b/providers/elasticsearch/docs/logging/index.rst index 5cfae8d6230da..df2a6e6be71f2 100644 --- a/providers/elasticsearch/docs/logging/index.rst +++ b/providers/elasticsearch/docs/logging/index.rst @@ -64,6 +64,99 @@ To output task logs to ElasticSearch, the following config could be used: (set ` write_to_es = True target_index = [name of the index to store logs] +.. _elasticsearch-airflow-3-0-to-3-1-local-settings: + +Enabling the Elasticsearch task handler on Airflow 3.0.0 – 3.1.7 +'''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' + +This section is **only about reading task logs back into the Airflow UI**. Tasks running +on workers will write logs as usual (to local files, stdout, or — with appropriate log +shipping — to Elasticsearch) regardless of the override below. Without the override on +Airflow 3.0.0 – 3.1.7, logs reach Elasticsearch fine but the **UI cannot render them** +because no handler is registered to fetch them back. + +The wiring that registers ``ElasticsearchTaskHandler`` inside the stock +``airflow_local_settings.py`` (the file that builds ``DEFAULT_LOGGING_CONFIG``) landed in +Airflow **3.2.0** (`apache/airflow#62121 +`_) and was backported to Airflow **3.1.8** +(`apache/airflow#62940 `_). On Airflow +**3.0.0 – 3.1.7** installing the provider is not enough: to make the UI's log viewer +fetch logs from Elasticsearch you must ship a custom logging config that swaps the +``task`` handler **and** sets ``REMOTE_TASK_LOG`` at module scope. The override requires +``apache-airflow-providers-elasticsearch`` **6.5.0+** (`apache/airflow#53821 +`_), which is where +``ElasticsearchRemoteLogIO`` was introduced. + +Create a module on the Python path — for example ``config/airflow_local_settings.py`` — +and point Airflow at it via ``[logging] logging_config_class``: + +.. code-block:: python + + from airflow.config_templates.airflow_local_settings import ( + BASE_LOG_FOLDER, + DEFAULT_LOGGING_CONFIG, + ) + from airflow.providers.common.compat.sdk import conf + from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO + + ELASTICSEARCH_HOST = conf.get("elasticsearch", "host", fallback=None) + + REMOTE_TASK_LOG = None + DEFAULT_REMOTE_CONN_ID = None + + if ELASTICSEARCH_HOST: + DEFAULT_LOGGING_CONFIG["handlers"]["task"] = { + "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler", + "formatter": "airflow", + "base_log_folder": str(BASE_LOG_FOLDER), + "end_of_log_mark": conf.get("elasticsearch", "end_of_log_mark", fallback="end_of_log"), + "host": ELASTICSEARCH_HOST, + "frontend": conf.get("elasticsearch", "frontend", fallback=""), + "write_stdout": conf.getboolean("elasticsearch", "write_stdout"), + "write_to_es": conf.getboolean("elasticsearch", "write_to_es", fallback=False), + "json_format": conf.getboolean("elasticsearch", "json_format"), + "json_fields": conf.get("elasticsearch", "json_fields"), + "host_field": conf.get("elasticsearch", "host_field", fallback="host"), + "offset_field": conf.get("elasticsearch", "offset_field", fallback="offset"), + } + REMOTE_TASK_LOG = ElasticsearchRemoteLogIO( + host=ELASTICSEARCH_HOST, + target_index=conf.get("elasticsearch", "target_index", fallback="airflow-logs"), + write_stdout=conf.getboolean("elasticsearch", "write_stdout"), + write_to_es=conf.getboolean("elasticsearch", "write_to_es", fallback=False), + offset_field=conf.get("elasticsearch", "offset_field", fallback="offset"), + host_field=conf.get("elasticsearch", "host_field", fallback="host"), + base_log_folder=str(BASE_LOG_FOLDER), + delete_local_copy=conf.getboolean("logging", "delete_local_logs"), + json_format=conf.getboolean("elasticsearch", "json_format"), + log_id_template=conf.get( + "elasticsearch", + "log_id_template", + fallback="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}", + ), + ) + +Then, in ``airflow.cfg``: + +.. code-block:: ini + + [logging] + remote_logging = True + logging_config_class = config.airflow_local_settings.DEFAULT_LOGGING_CONFIG + +.. note:: + + Earlier versions of this guide relied on ``ElasticsearchTaskHandler`` self-registering + ``REMOTE_TASK_LOG`` from inside ``__init__`` when ``dictConfig`` instantiated it. + That implicit registration is now deprecated (``AirflowProviderDeprecationWarning``) + and will be removed in a future provider release; define ``REMOTE_TASK_LOG`` at + module scope as shown above. See :ref:`write-logs-advanced` for the full + ``logging_config_class`` contract. + +On Airflow **3.1.8+** or **3.2.0+** this override is unnecessary — the stock +``airflow_local_settings.py`` already contains an ``elif ELASTICSEARCH_HOST:`` branch, so +configuring the ``[elasticsearch]`` section in ``airflow.cfg`` is sufficient. + .. _write-logs-elasticsearch-tls: Writing logs to Elasticsearch over TLS diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py index 402d64e5bf112..8aad101b44d86 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py @@ -25,6 +25,7 @@ import shutil import sys import time +import warnings from collections import defaultdict from collections.abc import Callable, Iterable from operator import attrgetter @@ -41,6 +42,7 @@ from elasticsearch.exceptions import NotFoundError import airflow.logging_config as alc +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models.dagrun import DagRun from airflow.providers.common.compat.sdk import conf from airflow.providers.elasticsearch._compat import apply_compat_with @@ -318,8 +320,31 @@ def __init__( from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log if get_remote_task_log() is None: + # stacklevel=1 keeps the warning attributed to this airflow.providers + # module so module-based deprecation filters still match; dictConfig + # is in stdlib and would otherwise hide the warning at stacklevel=2. + warnings.warn( + "Implicit REMOTE_TASK_LOG registration by ElasticsearchTaskHandler " + "during dictConfig is deprecated and will be removed in a future " + "provider release. Set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` " + "at module scope in your ``[logging] logging_config_class`` module. " + "See the Elasticsearch provider logging documentation for the " + "updated override example.", + AirflowProviderDeprecationWarning, + stacklevel=1, + ) _ActiveLoggingConfig.set(self.io, None) elif alc.REMOTE_TASK_LOG is None: # type: ignore[attr-defined] + warnings.warn( + "Implicit REMOTE_TASK_LOG registration by ElasticsearchTaskHandler " + "during dictConfig is deprecated and will be removed in a future " + "provider release. Set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` " + "at module scope in your ``[logging] logging_config_class`` module. " + "See the Elasticsearch provider logging documentation for the " + "updated override example.", + AirflowProviderDeprecationWarning, + stacklevel=1, + ) alc.REMOTE_TASK_LOG = self.io # type: ignore[attr-defined] @staticmethod diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py b/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py new file mode 100644 index 0000000000000..a1746818eed6e --- /dev/null +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS + + +@pytest.fixture(autouse=True) +def _no_implicit_remote_task_log_warning(monkeypatch): + """ + Suppress the deprecated implicit-registration warning during direct handler construction. + + Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module + scope in the ``[logging] logging_config_class`` module, so + ``ElasticsearchTaskHandler.__init__`` does not emit + ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin + treats as a test failure). + + On Airflow 2.x the handler does not run the registration branch at all, so the + fixture is a no-op there (and ``_ActiveLoggingConfig`` does not exist). + On Airflow 3.0/3.1 the handler reads ``airflow.logging_config.REMOTE_TASK_LOG``. + On Airflow 3.2+ the handler reads ``_ActiveLoggingConfig.remote_task_log``. + """ + if not AIRFLOW_V_3_0_PLUS: + return + if AIRFLOW_V_3_2_PLUS: + from airflow.logging_config import _ActiveLoggingConfig + + # raising=False is required because remote_task_log is annotation-only at + # class scope and may not be initialized in isolated test runs. + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) + else: + import airflow.logging_config + + monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG", object(), raising=False)