Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions providers/elasticsearch/docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
Changelog
---------

``ElasticsearchTaskHandler`` no longer silently registers itself as the remote
task-log reader during ``dictConfig``. The implicit registration still happens
for one more release but now emits an ``AirflowProviderDeprecationWarning`` and
will be removed in a future provider release. If you ship a custom
``[logging] logging_config_class`` module that swaps in
``ElasticsearchTaskHandler``, set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)``
at module scope in that module.

6.5.4
.....

Expand Down
93 changes: 93 additions & 0 deletions providers/elasticsearch/docs/logging/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,99 @@ To output task logs to ElasticSearch, the following config could be used: (set `
write_to_es = True
target_index = [name of the index to store logs]

.. _elasticsearch-airflow-3-0-to-3-1-local-settings:

Enabling the Elasticsearch task handler on Airflow 3.0.0 – 3.1.7
''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''

This section is **only about reading task logs back into the Airflow UI**. Tasks running
on workers will write logs as usual (to local files, stdout, or — with appropriate log
shipping — to Elasticsearch) regardless of the override below. Without the override on
Airflow 3.0.0 – 3.1.7, logs reach Elasticsearch fine but the **UI cannot render them**
because no handler is registered to fetch them back.

The wiring that registers ``ElasticsearchTaskHandler`` inside the stock
``airflow_local_settings.py`` (the file that builds ``DEFAULT_LOGGING_CONFIG``) landed in
Airflow **3.2.0** (`apache/airflow#62121
<https://github.com/apache/airflow/pull/62121>`_) and was backported to Airflow **3.1.8**
(`apache/airflow#62940 <https://github.com/apache/airflow/pull/62940>`_). On Airflow
**3.0.0 – 3.1.7** installing the provider is not enough: to make the UI's log viewer
fetch logs from Elasticsearch you must ship a custom logging config that swaps the
``task`` handler **and** sets ``REMOTE_TASK_LOG`` at module scope. The override requires
``apache-airflow-providers-elasticsearch`` **6.5.0+** (`apache/airflow#53821
<https://github.com/apache/airflow/pull/53821>`_), which is where
``ElasticsearchRemoteLogIO`` was introduced.

Create a module on the Python path — for example ``config/airflow_local_settings.py`` —
and point Airflow at it via ``[logging] logging_config_class``:

.. code-block:: python

from airflow.config_templates.airflow_local_settings import (
BASE_LOG_FOLDER,
DEFAULT_LOGGING_CONFIG,
)
from airflow.providers.common.compat.sdk import conf
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO

ELASTICSEARCH_HOST = conf.get("elasticsearch", "host", fallback=None)

REMOTE_TASK_LOG = None
DEFAULT_REMOTE_CONN_ID = None

if ELASTICSEARCH_HOST:
DEFAULT_LOGGING_CONFIG["handlers"]["task"] = {
"class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
"formatter": "airflow",
"base_log_folder": str(BASE_LOG_FOLDER),
"end_of_log_mark": conf.get("elasticsearch", "end_of_log_mark", fallback="end_of_log"),
"host": ELASTICSEARCH_HOST,
"frontend": conf.get("elasticsearch", "frontend", fallback=""),
"write_stdout": conf.getboolean("elasticsearch", "write_stdout"),
"write_to_es": conf.getboolean("elasticsearch", "write_to_es", fallback=False),
"json_format": conf.getboolean("elasticsearch", "json_format"),
"json_fields": conf.get("elasticsearch", "json_fields"),
"host_field": conf.get("elasticsearch", "host_field", fallback="host"),
"offset_field": conf.get("elasticsearch", "offset_field", fallback="offset"),
}
REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(
host=ELASTICSEARCH_HOST,
target_index=conf.get("elasticsearch", "target_index", fallback="airflow-logs"),
write_stdout=conf.getboolean("elasticsearch", "write_stdout"),
write_to_es=conf.getboolean("elasticsearch", "write_to_es", fallback=False),
offset_field=conf.get("elasticsearch", "offset_field", fallback="offset"),
host_field=conf.get("elasticsearch", "host_field", fallback="host"),
base_log_folder=str(BASE_LOG_FOLDER),
delete_local_copy=conf.getboolean("logging", "delete_local_logs"),
json_format=conf.getboolean("elasticsearch", "json_format"),
log_id_template=conf.get(
"elasticsearch",
"log_id_template",
fallback="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
),
)

Then, in ``airflow.cfg``:

.. code-block:: ini

[logging]
remote_logging = True
logging_config_class = config.airflow_local_settings.DEFAULT_LOGGING_CONFIG

.. note::

Earlier versions of this guide relied on ``ElasticsearchTaskHandler`` self-registering
``REMOTE_TASK_LOG`` from inside ``__init__`` when ``dictConfig`` instantiated it.
That implicit registration is now deprecated (``AirflowProviderDeprecationWarning``)
and will be removed in a future provider release; define ``REMOTE_TASK_LOG`` at
module scope as shown above. See :ref:`write-logs-advanced` for the full
``logging_config_class`` contract.

On Airflow **3.1.8+** or **3.2.0+** this override is unnecessary — the stock
``airflow_local_settings.py`` already contains an ``elif ELASTICSEARCH_HOST:`` branch, so
configuring the ``[elasticsearch]`` section in ``airflow.cfg`` is sufficient.

.. _write-logs-elasticsearch-tls:

Writing logs to Elasticsearch over TLS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import shutil
import sys
import time
import warnings
from collections import defaultdict
from collections.abc import Callable, Iterable
from operator import attrgetter
Expand All @@ -41,6 +42,7 @@
from elasticsearch.exceptions import NotFoundError

import airflow.logging_config as alc
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models.dagrun import DagRun
from airflow.providers.common.compat.sdk import conf
from airflow.providers.elasticsearch._compat import apply_compat_with
Expand Down Expand Up @@ -318,8 +320,31 @@ def __init__(
from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log

if get_remote_task_log() is None:
# stacklevel=1 keeps the warning attributed to this airflow.providers
# module so module-based deprecation filters still match; dictConfig
# is in stdlib and would otherwise hide the warning at stacklevel=2.
warnings.warn(
"Implicit REMOTE_TASK_LOG registration by ElasticsearchTaskHandler "
"during dictConfig is deprecated and will be removed in a future "
"provider release. Set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` "
"at module scope in your ``[logging] logging_config_class`` module. "
"See the Elasticsearch provider logging documentation for the "
"updated override example.",
AirflowProviderDeprecationWarning,
stacklevel=1,
)
_ActiveLoggingConfig.set(self.io, None)
elif alc.REMOTE_TASK_LOG is None: # type: ignore[attr-defined]
warnings.warn(
"Implicit REMOTE_TASK_LOG registration by ElasticsearchTaskHandler "
"during dictConfig is deprecated and will be removed in a future "
"provider release. Set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` "
"at module scope in your ``[logging] logging_config_class`` module. "
"See the Elasticsearch provider logging documentation for the "
"updated override example.",
AirflowProviderDeprecationWarning,
stacklevel=1,
)
alc.REMOTE_TASK_LOG = self.io # type: ignore[attr-defined]

@staticmethod
Expand Down
52 changes: 52 additions & 0 deletions providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import pytest

from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS


@pytest.fixture(autouse=True)
def _no_implicit_remote_task_log_warning(monkeypatch):
"""
Suppress the deprecated implicit-registration warning during direct handler construction.

Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module
scope in the ``[logging] logging_config_class`` module, so
``ElasticsearchTaskHandler.__init__`` does not emit
``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin
treats as a test failure).

On Airflow 2.x the handler does not run the registration branch at all, so the
fixture is a no-op there (and ``_ActiveLoggingConfig`` does not exist).
On Airflow 3.0/3.1 the handler reads ``airflow.logging_config.REMOTE_TASK_LOG``.
On Airflow 3.2+ the handler reads ``_ActiveLoggingConfig.remote_task_log``.
"""
if not AIRFLOW_V_3_0_PLUS:
return
if AIRFLOW_V_3_2_PLUS:
from airflow.logging_config import _ActiveLoggingConfig

# raising=False is required because remote_task_log is annotation-only at
# class scope and may not be initialized in isolated test runs.
monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False)
monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False)
else:
import airflow.logging_config

monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG", object(), raising=False)
Loading