Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions providers/opensearch/docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
Changelog
---------

``OpensearchTaskHandler`` no longer silently registers itself as the remote
task-log reader during ``dictConfig``. The implicit registration still happens
for one more release but now emits an ``AirflowProviderDeprecationWarning`` and
will be removed in a future provider release. If you ship a custom
``[logging] logging_config_class`` module that swaps in ``OpensearchTaskHandler``,
set ``REMOTE_TASK_LOG = OpensearchRemoteLogIO(...)`` at module scope in that
module. See the OpenSearch provider's logging guide for the updated override
example.

1.9.1
.....
Expand Down
36 changes: 33 additions & 3 deletions providers/opensearch/docs/logging/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ The wiring that registers ``OpensearchTaskHandler`` inside the stock
``airflow_local_settings.py`` (the file that builds ``DEFAULT_LOGGING_CONFIG``) only landed
in Airflow **3.2.1**. On Airflow **3.0.0 – 3.2.0** installing the provider is not enough:
to make the UI's log viewer fetch logs from OpenSearch you must ship a custom logging
config that swaps the ``task`` handler. The handler self-registers as the remote-log
reader on construction (via ``REMOTE_TASK_LOG`` on 3.0/3.1 and ``_ActiveLoggingConfig``
on 3.2), so swapping the handler class is the only change required.
config that swaps the ``task`` handler **and** sets ``REMOTE_TASK_LOG`` at module scope.

Create a module on the Python path — for example ``config/airflow_local_settings.py`` —
and point Airflow at it via ``[logging] logging_config_class``:
Expand All @@ -102,9 +100,13 @@ and point Airflow at it via ``[logging] logging_config_class``:
DEFAULT_LOGGING_CONFIG,
)
from airflow.providers.common.compat.sdk import conf
from airflow.providers.opensearch.log.os_task_handler import OpensearchRemoteLogIO

OPENSEARCH_HOST = conf.get("opensearch", "host", fallback=None)

REMOTE_TASK_LOG = None
DEFAULT_REMOTE_CONN_ID = None

if OPENSEARCH_HOST:
DEFAULT_LOGGING_CONFIG["handlers"]["task"] = {
"class": "airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler",
Expand All @@ -123,6 +125,25 @@ and point Airflow at it via ``[logging] logging_config_class``:
"write_to_opensearch": conf.getboolean("opensearch", "write_to_os", fallback=False),
"target_index": conf.get("opensearch", "target_index", fallback="airflow-logs"),
}
REMOTE_TASK_LOG = OpensearchRemoteLogIO(
host=OPENSEARCH_HOST,
port=conf.getint("opensearch", "port", fallback=9200),
username=conf.get("opensearch", "username"),
password=conf.get("opensearch", "password"),
target_index=conf.get("opensearch", "target_index", fallback="airflow-logs"),
write_stdout=conf.getboolean("opensearch", "write_stdout"),
write_to_opensearch=conf.getboolean("opensearch", "write_to_os", fallback=False),
offset_field=conf.get("opensearch", "offset_field", fallback="offset"),
host_field=conf.get("opensearch", "host_field", fallback="host"),
base_log_folder=str(BASE_LOG_FOLDER),
delete_local_copy=conf.getboolean("logging", "delete_local_logs"),
json_format=conf.getboolean("opensearch", "json_format"),
log_id_template=conf.get(
"opensearch",
"log_id_template",
fallback="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
),
)

Then, in ``airflow.cfg``:

Expand All @@ -132,6 +153,15 @@ Then, in ``airflow.cfg``:
remote_logging = True
logging_config_class = config.airflow_local_settings.DEFAULT_LOGGING_CONFIG

.. note::

Earlier versions of this guide relied on ``OpensearchTaskHandler`` self-registering
``REMOTE_TASK_LOG`` from inside ``__init__`` when ``dictConfig`` instantiated it.
That implicit registration is now deprecated (``AirflowProviderDeprecationWarning``)
and will be removed in a future provider release; define ``REMOTE_TASK_LOG`` at
module scope as shown above. See :ref:`write-logs-advanced` for the full
``logging_config_class`` contract.

On Airflow **3.2.1+** this override is unnecessary — the stock ``airflow_local_settings.py``
already contains an ``elif OPENSEARCH_HOST:`` branch, so configuring the ``[opensearch]``
section in ``airflow.cfg`` is sufficient.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import os
import sys
import time
import warnings
from collections import defaultdict
from collections.abc import Callable
from datetime import datetime
Expand All @@ -38,6 +39,7 @@
from sqlalchemy import select

import airflow.logging_config as alc
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import DagRun
from airflow.providers.common.compat.module_loading import import_string
from airflow.providers.common.compat.sdk import AirflowException, conf
Expand Down Expand Up @@ -380,8 +382,31 @@ def __init__(
from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log

if get_remote_task_log() is None:
# stacklevel=1 keeps the warning attributed to this airflow.providers
# module so module-based deprecation filters still match; dictConfig
# is in stdlib and would otherwise hide the warning at stacklevel=2.
warnings.warn(
"Implicit REMOTE_TASK_LOG registration by OpensearchTaskHandler during "
"dictConfig is deprecated and will be removed in a future provider "
"release. Set ``REMOTE_TASK_LOG = OpensearchRemoteLogIO(...)`` at "
"module scope in your ``[logging] logging_config_class`` module. See "
"the OpenSearch provider logging documentation for the updated "
"override example.",
AirflowProviderDeprecationWarning,
stacklevel=1,
)
_ActiveLoggingConfig.set(self.io, None)
elif alc.REMOTE_TASK_LOG is None: # type: ignore[attr-defined]
warnings.warn(
"Implicit REMOTE_TASK_LOG registration by OpensearchTaskHandler during "
"dictConfig is deprecated and will be removed in a future provider "
"release. Set ``REMOTE_TASK_LOG = OpensearchRemoteLogIO(...)`` at "
"module scope in your ``[logging] logging_config_class`` module. See "
"the OpenSearch provider logging documentation for the updated "
"override example.",
AirflowProviderDeprecationWarning,
stacklevel=1,
)
alc.REMOTE_TASK_LOG = self.io # type: ignore[attr-defined]

def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None:
Expand Down
52 changes: 52 additions & 0 deletions providers/opensearch/tests/unit/opensearch/log/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import pytest

from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS


@pytest.fixture(autouse=True)
def _no_implicit_remote_task_log_warning(monkeypatch):
"""
Suppress the deprecated implicit-registration warning during direct handler construction.

Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module
scope in the ``[logging] logging_config_class`` module, so
``OpensearchTaskHandler.__init__`` does not emit
``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin
treats as a test failure).

On Airflow 2.x the handler does not run the registration branch at all, so the
fixture is a no-op there (and ``_ActiveLoggingConfig`` does not exist).
On Airflow 3.0/3.1 the handler reads ``airflow.logging_config.REMOTE_TASK_LOG``.
On Airflow 3.2+ the handler reads ``_ActiveLoggingConfig.remote_task_log``.
"""
if not AIRFLOW_V_3_0_PLUS:
return
if AIRFLOW_V_3_2_PLUS:
from airflow.logging_config import _ActiveLoggingConfig

# raising=False is required because remote_task_log is annotation-only at
# class scope and may not be initialized in isolated test runs.
monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False)
monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False)
else:
import airflow.logging_config

monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG", object(), raising=False)
Loading