Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ that Python objects log to loggers that follow naming convention of ``<package>.
You can read more about standard python logging classes (Loggers, Handlers, Formatters) in the
`Python logging documentation <https://docs.python.org/library/logging.html>`_.

Create a custom logging class
-----------------------------
Create a custom logging config
------------------------------

Configuring your logging classes can be done via the ``logging_config_class`` option in ``airflow.cfg`` file.
This configuration should specify the import path to a configuration compatible with
:func:`logging.config.dictConfig`. If your file is a standard import location, then you should set a
:envvar:`PYTHONPATH` environment variable.
Despite the option name the value is a dotted import path to a ``dict`` that satisfies
:func:`logging.config.dictConfig` — not a Python class. The ``_class`` suffix is
historical and kept for backwards compatibility. If your file is a standard import
location, then you should set a :envvar:`PYTHONPATH` environment variable.

Follow the steps below to enable custom logging config class:

Expand Down Expand Up @@ -102,6 +103,56 @@ See :doc:`../modules_management` for details on how Python and Airflow manage mo
You can override the way both standard logs of the components and "task" logs are handled.


Custom logging configs and remote logging
-----------------------------------------

When ``[logging] remote_logging = True`` and you point ``logging_config_class``
at your own module, define two module-level attributes alongside your
``LOGGING_CONFIG`` dict:

* ``REMOTE_TASK_LOG`` — an instance of
:class:`~airflow.logging.remote.RemoteLogIO` (or
:class:`~airflow.logging.remote.RemoteLogStreamIO`) that uploads task logs
and reads them back for the UI.
* ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id used when
``[logging] remote_log_conn_id`` is unset.

If ``REMOTE_TASK_LOG`` is missing, Airflow emits one ``WARNING`` at startup
and the UI cannot read task logs back from the remote backend.

.. code-block:: python

# ~/airflow/config/log_config.py
from airflow.logging.remote import RemoteLogIO


class MyRemoteLogIO:
@property
def processors(self):
return ()

def upload(self, path, ti): ... # upload local log file at ``path`` to your backend

def read(self, relative_path, ti): ... # return (source_info, log_messages) for the UI


REMOTE_TASK_LOG: RemoteLogIO | None = MyRemoteLogIO()
DEFAULT_REMOTE_CONN_ID: str | None = "my_remote_conn"

LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
# ... your formatters / handlers / loggers / root ...
}

.. note::

``airflow.config_templates.airflow_local_settings`` (and its
``DEFAULT_LOGGING_CONFIG``) is planned for deprecation. Build
``LOGGING_CONFIG`` directly rather than deep-copying from it, and define
``REMOTE_TASK_LOG`` yourself rather than re-exporting from that module.


Custom logger for Operators, Hooks and Tasks
--------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@

BASE_LOG_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("logging", "BASE_LOG_FOLDER"))

# This isn't used anymore, but kept for compat of people who might have imported it
# Default value for the ``[logging] logging_config_class`` option. Plain
# ``logging.config.dictConfig`` dict; the ``_class`` suffix on the config option
# is historical.
DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
"version": 1,
"disable_existing_loggers": False,
Expand Down Expand Up @@ -121,6 +123,11 @@
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")

# Side-channel attributes read by ``discover_remote_log_handler`` from whichever
# module ``[logging] logging_config_class`` resolves through. Custom modules that
# override that option should define both at module scope to enable remote
# task-log read-back.
REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None
DEFAULT_REMOTE_CONN_ID: str | None = None

Expand Down
9 changes: 6 additions & 3 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -863,9 +863,12 @@ logging:
default: "INFO"
logging_config_class:
description: |
Logging class
Specify the class that will specify the logging configuration
This class has to be on the python classpath
Dotted import path to a ``logging.config.dictConfig`` dict. The
``_class`` suffix is historical — the target is a dict, not a class.

Airflow also reads two optional module-level attributes from the
enclosing module: ``REMOTE_TASK_LOG`` and ``DEFAULT_REMOTE_CONN_ID``,
used to drive remote task-log read-back. See :ref:`write-logs-advanced`.
version_added: 2.0.0
type: string
example: "my.path.default_local_settings.LOGGING_CONFIG"
Expand Down
57 changes: 50 additions & 7 deletions airflow-core/src/airflow/logging_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@

log = logging.getLogger(__name__)

# Default ``[logging] logging_config_class`` value when the user has not overridden it.
DEFAULT_LOGGING_CONFIG_PATH = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"


class _ActiveLoggingConfig:
"""Private class to hold active logging config variables."""
Expand Down Expand Up @@ -60,14 +63,20 @@ def get_default_remote_conn_id() -> str | None:


def load_logging_config() -> tuple[dict[str, Any], str]:
"""Configure & Validate Airflow Logging."""
fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback)
"""
Resolve ``[logging] logging_config_class`` and the remote-log side channel.

The option is a dotted path to a ``logging.config.dictConfig`` dict. After
importing the dict, the enclosing module is probed for optional
``REMOTE_TASK_LOG`` and ``DEFAULT_REMOTE_CONN_ID`` attributes via
:func:`airflow._shared.logging.remote.discover_remote_log_handler`.
"""
logging_class_path = conf.get("logging", "logging_config_class", fallback=DEFAULT_LOGGING_CONFIG_PATH)

# Sometimes we end up with `""` as the value!
logging_class_path = logging_class_path or fallback
logging_class_path = logging_class_path or DEFAULT_LOGGING_CONFIG_PATH

user_defined = logging_class_path != fallback
user_defined = logging_class_path != DEFAULT_LOGGING_CONFIG_PATH

try:
logging_config = import_string(logging_class_path)
Expand All @@ -86,15 +95,45 @@ def load_logging_config() -> tuple[dict[str, Any], str]:
f"to: {type(err).__name__}:{err}"
)
else:
# Load remote logging configuration using shared discovery logic
# Load remote logging configuration using shared discovery logic.
remote_task_log, default_remote_conn_id = discover_remote_log_handler(
logging_class_path, fallback, import_string
logging_class_path, DEFAULT_LOGGING_CONFIG_PATH, import_string
)
_ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id)

return logging_config, logging_class_path


def _warn_if_missing_remote_task_log(logging_class_path: str) -> None:
"""
Warn if ``[logging] remote_logging`` is on but the user module exposes no remote IO.

Runs *after* ``dictConfig`` has constructed handlers, so deprecated
self-registration in provider task handlers (Elasticsearch, OpenSearch) has
already had its chance to populate ``_ActiveLoggingConfig.remote_task_log``.
Only fires for user-defined ``logging_config_class`` values; the stock
fallback is exempt.
"""
user_defined = bool(logging_class_path) and logging_class_path != DEFAULT_LOGGING_CONFIG_PATH
remote_logging_enabled = conf.getboolean("logging", "remote_logging", fallback=False)
if not (user_defined and remote_logging_enabled):
return
if _ActiveLoggingConfig.remote_task_log is not None:
return
# Strip the trailing ``.<config_attr>`` to leave the enclosing module path.
# ``logging_class_path`` should always be dotted since ``import_string``
# would have raised otherwise, but guard the access defensively.
parts = logging_class_path.rsplit(".", 1)
modpath = parts[0] if len(parts) == 2 else logging_class_path
log.warning(
"[logging] remote_logging is enabled but the user-defined logging module %r "
"does not expose a REMOTE_TASK_LOG attribute, so remote task-log read-back is "
"disabled. Define REMOTE_TASK_LOG (a RemoteLogIO instance) at module scope "
"to enable it.",
modpath,
)


def configure_logging():
from airflow._shared.logging import configure_logging, init_log_folder, translate_config_values

Expand Down Expand Up @@ -146,6 +185,10 @@ def configure_logging():
# otherwise Airflow would silently fall back on the default config
raise e

# Runs after dictConfig so deprecated handler self-registration (ES/OS) has
# had its chance to populate _ActiveLoggingConfig.remote_task_log.
_warn_if_missing_remote_task_log(logging_class_path)

validate_logging_config()

new_folder_permissions = int(
Expand Down
66 changes: 66 additions & 0 deletions airflow-core/tests/unit/logging/test_logging_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from unittest import mock

import pytest

from airflow.logging_config import (
DEFAULT_LOGGING_CONFIG_PATH,
_ActiveLoggingConfig,
_warn_if_missing_remote_task_log,
)


class TestWarnIfMissingRemoteTaskLog:
@pytest.fixture(autouse=True)
def _reset_active_logging_config(self, monkeypatch):
monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", None, raising=False)
monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False)

def test_warns_when_user_module_missing_remote_task_log_and_remote_logging_enabled(self, monkeypatch):
monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True")
with mock.patch("airflow.logging_config.log") as mock_log:
_warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG")
mock_log.warning.assert_called_once()
assert "my_pkg.custom_settings" in mock_log.warning.call_args.args

def test_no_warning_when_using_fallback_path(self, monkeypatch):
monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True")
with mock.patch("airflow.logging_config.log") as mock_log:
_warn_if_missing_remote_task_log(DEFAULT_LOGGING_CONFIG_PATH)
mock_log.warning.assert_not_called()

def test_no_warning_when_remote_logging_disabled(self, monkeypatch):
monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "False")
with mock.patch("airflow.logging_config.log") as mock_log:
_warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG")
mock_log.warning.assert_not_called()

def test_no_warning_when_remote_task_log_is_set(self, monkeypatch):
monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True")
monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False)
with mock.patch("airflow.logging_config.log") as mock_log:
_warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG")
mock_log.warning.assert_not_called()

def test_no_warning_when_empty_logging_class_path(self, monkeypatch):
monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True")
with mock.patch("airflow.logging_config.log") as mock_log:
_warn_if_missing_remote_task_log("")
mock_log.warning.assert_not_called()
7 changes: 7 additions & 0 deletions providers/elasticsearch/docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@
Changelog
---------

``ElasticsearchTaskHandler`` no longer silently registers itself as the remote
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure do I need to update the Changelog myself at this stage?

task-log reader during ``dictConfig``. The implicit registration still happens
for one more release but now emits an ``AirflowProviderDeprecationWarning`` and
will be removed in a future provider release. If you ship a custom
``[logging] logging_config_class`` module that swaps in
``ElasticsearchTaskHandler``, set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)``
at module scope in that module.

6.5.3
.....
Expand Down
93 changes: 93 additions & 0 deletions providers/elasticsearch/docs/logging/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,99 @@ To output task logs to ElasticSearch, the following config could be used: (set `
write_to_es = True
target_index = [name of the index to store logs]

.. _elasticsearch-airflow-3-0-to-3-1-local-settings:

Enabling the Elasticsearch task handler on Airflow 3.0.0 – 3.1.7
''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''

This section is **only about reading task logs back into the Airflow UI**. Tasks running
on workers will write logs as usual (to local files, stdout, or — with appropriate log
shipping — to Elasticsearch) regardless of the override below. Without the override on
Airflow 3.0.0 – 3.1.7, logs reach Elasticsearch fine but the **UI cannot render them**
because no handler is registered to fetch them back.

The wiring that registers ``ElasticsearchTaskHandler`` inside the stock
``airflow_local_settings.py`` (the file that builds ``DEFAULT_LOGGING_CONFIG``) landed in
Airflow **3.2.0** (`apache/airflow#62121
<https://github.com/apache/airflow/pull/62121>`_) and was backported to Airflow **3.1.8**
(`apache/airflow#62940 <https://github.com/apache/airflow/pull/62940>`_). On Airflow
**3.0.0 – 3.1.7** installing the provider is not enough: to make the UI's log viewer
fetch logs from Elasticsearch you must ship a custom logging config that swaps the
``task`` handler **and** sets ``REMOTE_TASK_LOG`` at module scope. The override requires
``apache-airflow-providers-elasticsearch`` **6.5.0+** (`apache/airflow#53821
<https://github.com/apache/airflow/pull/53821>`_), which is where
``ElasticsearchRemoteLogIO`` was introduced.

Create a module on the Python path — for example ``config/airflow_local_settings.py`` —
and point Airflow at it via ``[logging] logging_config_class``:

.. code-block:: python

from airflow.config_templates.airflow_local_settings import (
BASE_LOG_FOLDER,
DEFAULT_LOGGING_CONFIG,
)
from airflow.providers.common.compat.sdk import conf
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO

ELASTICSEARCH_HOST = conf.get("elasticsearch", "host", fallback=None)

REMOTE_TASK_LOG = None
DEFAULT_REMOTE_CONN_ID = None

if ELASTICSEARCH_HOST:
DEFAULT_LOGGING_CONFIG["handlers"]["task"] = {
"class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
"formatter": "airflow",
"base_log_folder": str(BASE_LOG_FOLDER),
"end_of_log_mark": conf.get("elasticsearch", "end_of_log_mark", fallback="end_of_log"),
"host": ELASTICSEARCH_HOST,
"frontend": conf.get("elasticsearch", "frontend", fallback=""),
"write_stdout": conf.getboolean("elasticsearch", "write_stdout"),
"write_to_es": conf.getboolean("elasticsearch", "write_to_es", fallback=False),
"json_format": conf.getboolean("elasticsearch", "json_format"),
"json_fields": conf.get("elasticsearch", "json_fields"),
"host_field": conf.get("elasticsearch", "host_field", fallback="host"),
"offset_field": conf.get("elasticsearch", "offset_field", fallback="offset"),
}
REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(
host=ELASTICSEARCH_HOST,
target_index=conf.get("elasticsearch", "target_index", fallback="airflow-logs"),
write_stdout=conf.getboolean("elasticsearch", "write_stdout"),
write_to_es=conf.getboolean("elasticsearch", "write_to_es", fallback=False),
offset_field=conf.get("elasticsearch", "offset_field", fallback="offset"),
host_field=conf.get("elasticsearch", "host_field", fallback="host"),
base_log_folder=str(BASE_LOG_FOLDER),
delete_local_copy=conf.getboolean("logging", "delete_local_logs"),
json_format=conf.getboolean("elasticsearch", "json_format"),
log_id_template=conf.get(
"elasticsearch",
"log_id_template",
fallback="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
),
)

Then, in ``airflow.cfg``:

.. code-block:: ini

[logging]
remote_logging = True
logging_config_class = config.airflow_local_settings.DEFAULT_LOGGING_CONFIG

.. note::

Earlier versions of this guide relied on ``ElasticsearchTaskHandler`` self-registering
``REMOTE_TASK_LOG`` from inside ``__init__`` when ``dictConfig`` instantiated it.
That implicit registration is now deprecated (``AirflowProviderDeprecationWarning``)
and will be removed in a future provider release; define ``REMOTE_TASK_LOG`` at
module scope as shown above. See :ref:`write-logs-advanced` for the full
``logging_config_class`` contract.

On Airflow **3.1.8+** or **3.2.0+** this override is unnecessary — the stock
``airflow_local_settings.py`` already contains an ``elif ELASTICSEARCH_HOST:`` branch, so
configuring the ``[elasticsearch]`` section in ``airflow.cfg`` is sufficient.

.. _write-logs-elasticsearch-tls:

Writing logs to Elasticsearch over TLS
Expand Down
Loading
Loading