Expand Up @@ -46,13 +46,14 @@ that Python objects log to loggers that follow naming convention of ``<package>.
You can read more about standard python logging classes (Loggers, Handlers, Formatters) in the
`Python logging documentation <https://docs.python.org/library/logging.html>`_.

Create a custom logging class
-----------------------------
Create a custom logging config
------------------------------

Configuring your logging classes can be done via the ``logging_config_class`` option in ``airflow.cfg`` file.
This configuration should specify the import path to a configuration compatible with
:func:`logging.config.dictConfig`. If your file is a standard import location, then you should set a
:envvar:`PYTHONPATH` environment variable.
Despite the option name, the value is a dotted import path to a ``dict`` compatible with
:func:`logging.config.dictConfig`, not a Python class. The ``_class`` suffix is
historical and kept for backwards compatibility. If your file is not in a standard import
location, add its directory to the :envvar:`PYTHONPATH` environment variable.
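
To make the "dotted path to a dict" contract concrete, here is a minimal sketch of how such a path can be resolved and handed to `logging.config.dictConfig`. The `resolve_dotted_path` helper and the `my_log_config` module are hypothetical stand-ins for illustration; Airflow uses its own `import_string` helper, whose exact behavior is not reproduced here.

```python
import importlib
import logging
import logging.config
import sys
import types


def resolve_dotted_path(dotted_path: str):
    """Import ``pkg.module.ATTR`` and return ``ATTR`` (hypothetical helper)."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted path")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


# Stand-in for a user module that would normally live on PYTHONPATH.
# The module name and its contents are assumptions for this sketch.
fake_module = types.ModuleType("my_log_config")
fake_module.LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {"console": {"class": "logging.StreamHandler"}},
    "root": {"handlers": ["console"], "level": "INFO"},
}
sys.modules["my_log_config"] = fake_module

config = resolve_dotted_path("my_log_config.LOGGING_CONFIG")
logging.config.dictConfig(config)  # the target is a plain dict, not a class
```

The last path segment names the dict, and everything before it names the module that must be importable.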

Follow the steps below to enable a custom logging config:

Expand Down Expand Up @@ -102,6 +103,56 @@ See :doc:`../modules_management` for details on how Python and Airflow manage mo
You can override the way both standard logs of the components and "task" logs are handled.


Custom logging configs and remote logging
-----------------------------------------

When ``[logging] remote_logging = True`` and you point ``logging_config_class``
at your own module, define two module-level attributes alongside your
``LOGGING_CONFIG`` dict:

* ``REMOTE_TASK_LOG``: an instance of
  :class:`~airflow.logging.remote.RemoteLogIO` (or
  :class:`~airflow.logging.remote.RemoteLogStreamIO`) that uploads task logs
  and reads them back for the UI.
* ``DEFAULT_REMOTE_CONN_ID``: the default Airflow connection id used when
  ``[logging] remote_log_conn_id`` is unset.

If ``REMOTE_TASK_LOG`` is missing, Airflow emits one ``WARNING`` at startup
and the UI cannot read task logs back from the remote backend.

.. code-block:: python

    # ~/airflow/config/log_config.py
    from airflow.logging.remote import RemoteLogIO


    class MyRemoteLogIO:
        @property
        def processors(self):
            return ()

        def upload(self, path, ti): ...  # upload local log file at ``path`` to your backend

        def read(self, relative_path, ti): ...  # return (source_info, log_messages) for the UI


    REMOTE_TASK_LOG: RemoteLogIO | None = MyRemoteLogIO()
    DEFAULT_REMOTE_CONN_ID: str | None = "my_remote_conn"

    LOGGING_CONFIG = {
        "version": 1,
        "disable_existing_loggers": False,
        # ... your formatters / handlers / loggers / root ...
    }
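
As a rough illustration of what the `upload` / `read` pair might do, the sketch below treats a local directory as the "remote" backend. It does not import Airflow. The class name `DirectoryRemoteLogIO`, its constructor arguments, and the list-of-strings return shapes are assumptions made for this example; the documentation above only states that `read` returns `(source_info, log_messages)`.

```python
import shutil
import tempfile
from pathlib import Path


class DirectoryRemoteLogIO:
    """Treats a local directory as the 'remote' backend (illustration only)."""

    def __init__(self, local_base, remote_base) -> None:
        self.local_base = Path(local_base)
        self.remote_base = Path(remote_base)

    @property
    def processors(self):
        return ()  # no extra log processors in this sketch

    def upload(self, path, ti):
        """Copy the local log file at ``path`` into the backend directory."""
        local_path = Path(path)
        if not local_path.is_absolute():
            local_path = self.local_base / local_path
        target = self.remote_base / local_path.relative_to(self.local_base)
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(local_path, target)

    def read(self, relative_path, ti):
        """Return ``(source_info, log_messages)``; messages are None if missing."""
        target = self.remote_base / relative_path
        if not target.exists():
            return [f"{target} not found"], None
        return [str(target)], [target.read_text()]


# Usage with temporary directories. ``ti`` is unused here, so None is passed.
local_dir = Path(tempfile.mkdtemp())
remote_dir = Path(tempfile.mkdtemp())
log_file = local_dir / "dag_id=demo" / "attempt=1.log"
log_file.parent.mkdir(parents=True)
log_file.write_text("hello from task\n")

remote_io = DirectoryRemoteLogIO(local_dir, remote_dir)
remote_io.upload(log_file, ti=None)
source_info, messages = remote_io.read("dag_id=demo/attempt=1.log", ti=None)
```

A real backend would replace the file copy with calls to its storage client and would likely use `ti` to build the remote key.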

.. note::

    ``airflow.config_templates.airflow_local_settings`` (and its
    ``DEFAULT_LOGGING_CONFIG``) is planned for deprecation. Build
    ``LOGGING_CONFIG`` directly rather than deep-copying from it, and define
    ``REMOTE_TASK_LOG`` yourself rather than re-exporting from that module.
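
One way to follow the note is to write the whole dict by hand. The sketch below shows only the general shape using standard-library handlers; the formatter name, format string, and handler layout are assumptions, and a real Airflow deployment may need additional handlers (for example a task handler) that are not shown here.

```python
import logging
import logging.config

# Built directly, without deep-copying DEFAULT_LOGGING_CONFIG.
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "plain": {"format": "%(asctime)s %(levelname)s %(name)s - %(message)s"},
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "plain",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        # Logger name chosen for illustration; adjust to the loggers you need.
        "airflow.task": {"handlers": ["console"], "level": "INFO", "propagate": False},
    },
    "root": {"handlers": ["console"], "level": "WARNING"},
}

logging.config.dictConfig(LOGGING_CONFIG)
task_logger = logging.getLogger("airflow.task")
task_logger.info("configured without copying DEFAULT_LOGGING_CONFIG")
```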


Custom logger for Operators, Hooks and Tasks
--------------------------------------------

Expand Up @@ -49,7 +49,9 @@

BASE_LOG_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("logging", "BASE_LOG_FOLDER"))

# This isn't used anymore, but kept for compat of people who might have imported it
# Default value for the ``[logging] logging_config_class`` option. Plain
# ``logging.config.dictConfig`` dict; the ``_class`` suffix on the config option
# is historical.
DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
"version": 1,
"disable_existing_loggers": False,
Expand Down Expand Up @@ -121,6 +123,11 @@
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")

# Side-channel attributes read by ``discover_remote_log_handler`` from whichever
# module ``[logging] logging_config_class`` resolves through. Custom modules that
# override that option should define both at module scope to enable remote
# task-log read-back.
REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None
DEFAULT_REMOTE_CONN_ID: str | None = None

14 changes: 7 additions & 7 deletions airflow-core/src/airflow/config_templates/config.yml
Expand Up @@ -261,10 +261,7 @@ core:
allowed_deserialization_classes_regexp:
description: |
Space-separated list of classes that may be imported during deserialization. Items are processed
as regex expressions and matched against the full classname (``re.fullmatch`` semantics), so a
pattern such as ``airflow\.models\.Variable`` does not also admit ``airflow.models.VariableXYZ``.
Use ``.*`` (e.g. ``airflow\.models\..*``) to allow a prefix and any suffix. Python built-in
classes (like dict) are always allowed.
as regex expressions. Python built-in classes (like dict) are always allowed.
This is a secondary option to ``[core] allowed_deserialization_classes``.
version_added: 2.8.2
type: string
Expand Down Expand Up @@ -866,9 +863,12 @@ logging:
default: "INFO"
logging_config_class:
description: |
Logging class
Specify the class that will specify the logging configuration
This class has to be on the python classpath
Dotted import path to a ``logging.config.dictConfig`` dict. The
``_class`` suffix is historical — the target is a dict, not a class.

Airflow also reads two optional module-level attributes from the
enclosing module: ``REMOTE_TASK_LOG`` and ``DEFAULT_REMOTE_CONN_ID``,
used to drive remote task-log read-back. See :ref:`write-logs-advanced`.
version_added: 2.0.0
type: string
example: "my.path.default_local_settings.LOGGING_CONFIG"
57 changes: 50 additions & 7 deletions airflow-core/src/airflow/logging_config.py
Expand Up @@ -31,6 +31,9 @@

log = logging.getLogger(__name__)

# Default ``[logging] logging_config_class`` value when the user has not overridden it.
DEFAULT_LOGGING_CONFIG_PATH = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"


class _ActiveLoggingConfig:
"""Private class to hold active logging config variables."""
Expand Down Expand Up @@ -60,14 +63,20 @@ def get_default_remote_conn_id() -> str | None:


def load_logging_config() -> tuple[dict[str, Any], str]:
"""Configure & Validate Airflow Logging."""
fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback)
"""
Resolve ``[logging] logging_config_class`` and the remote-log side channel.

The option is a dotted path to a ``logging.config.dictConfig`` dict. After
importing the dict, the enclosing module is probed for optional
``REMOTE_TASK_LOG`` and ``DEFAULT_REMOTE_CONN_ID`` attributes via
:func:`airflow._shared.logging.remote.discover_remote_log_handler`.
"""
logging_class_path = conf.get("logging", "logging_config_class", fallback=DEFAULT_LOGGING_CONFIG_PATH)

# Sometimes we end up with `""` as the value!
logging_class_path = logging_class_path or fallback
logging_class_path = logging_class_path or DEFAULT_LOGGING_CONFIG_PATH

user_defined = logging_class_path != fallback
user_defined = logging_class_path != DEFAULT_LOGGING_CONFIG_PATH

try:
logging_config = import_string(logging_class_path)
Expand All @@ -86,15 +95,45 @@ def load_logging_config() -> tuple[dict[str, Any], str]:
f"to: {type(err).__name__}:{err}"
)
else:
# Load remote logging configuration using shared discovery logic
# Load remote logging configuration using shared discovery logic.
remote_task_log, default_remote_conn_id = discover_remote_log_handler(
logging_class_path, fallback, import_string
logging_class_path, DEFAULT_LOGGING_CONFIG_PATH, import_string
)
_ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id)

return logging_config, logging_class_path


def _warn_if_missing_remote_task_log(logging_class_path: str) -> None:
    """
    Warn if ``[logging] remote_logging`` is on but the user module exposes no remote IO.

    Runs *after* ``dictConfig`` has constructed handlers, so deprecated
    self-registration in provider task handlers (Elasticsearch, OpenSearch) has
    already had its chance to populate ``_ActiveLoggingConfig.remote_task_log``.
    Only fires for user-defined ``logging_config_class`` values; the stock
    fallback is exempt.
    """
    user_defined = bool(logging_class_path) and logging_class_path != DEFAULT_LOGGING_CONFIG_PATH
    remote_logging_enabled = conf.getboolean("logging", "remote_logging", fallback=False)
    if not (user_defined and remote_logging_enabled):
        return
    if _ActiveLoggingConfig.remote_task_log is not None:
        return
    # Strip the trailing ``.<config_attr>`` to leave the enclosing module path.
    # ``logging_class_path`` should always be dotted since ``import_string``
    # would have raised otherwise, but guard the access defensively.
    parts = logging_class_path.rsplit(".", 1)
    modpath = parts[0] if len(parts) == 2 else logging_class_path
    log.warning(
        "[logging] remote_logging is enabled but the user-defined logging module %r "
        "does not expose a REMOTE_TASK_LOG attribute, so remote task-log read-back is "
        "disabled. Define REMOTE_TASK_LOG (a RemoteLogIO instance) at module scope "
        "to enable it.",
        modpath,
    )
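
The decision logic in this helper can be isolated from Airflow's configuration for a quick check. The sketch below is a standalone restatement, not the PR's code: `should_warn` and `enclosing_module` are hypothetical names, and the configuration and active remote-log state are passed in as plain arguments instead of being read from `conf` and `_ActiveLoggingConfig`.

```python
DEFAULT_PATH = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"


def should_warn(logging_class_path: str, remote_logging: bool, remote_task_log) -> bool:
    """Warn only for user-defined paths with remote logging on and no remote IO."""
    user_defined = bool(logging_class_path) and logging_class_path != DEFAULT_PATH
    return user_defined and remote_logging and remote_task_log is None


def enclosing_module(logging_class_path: str) -> str:
    """Strip the trailing ``.<config_attr>``; return the input if it has no dot."""
    parts = logging_class_path.rsplit(".", 1)
    return parts[0] if len(parts) == 2 else logging_class_path


custom_path = "my_pkg.custom_settings.LOGGING_CONFIG"
print(should_warn(custom_path, True, None), enclosing_module(custom_path))
```

The four non-warning cases (stock fallback, empty path, remote logging off, remote IO already set) correspond to the unit tests added in this PR.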


def configure_logging():
from airflow._shared.logging import configure_logging, init_log_folder, translate_config_values

Expand Down Expand Up @@ -146,6 +185,10 @@ def configure_logging():
# otherwise Airflow would silently fall back on the default config
raise e

# Runs after dictConfig so deprecated handler self-registration (ES/OS) has
# had its chance to populate _ActiveLoggingConfig.remote_task_log.
_warn_if_missing_remote_task_log(logging_class_path)

validate_logging_config()

new_folder_permissions = int(
66 changes: 66 additions & 0 deletions airflow-core/tests/unit/logging/test_logging_config.py
@@ -0,0 +1,66 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from unittest import mock

import pytest

from airflow.logging_config import (
    DEFAULT_LOGGING_CONFIG_PATH,
    _ActiveLoggingConfig,
    _warn_if_missing_remote_task_log,
)


class TestWarnIfMissingRemoteTaskLog:
    @pytest.fixture(autouse=True)
    def _reset_active_logging_config(self, monkeypatch):
        monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", None, raising=False)
        monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False)

    def test_warns_when_user_module_missing_remote_task_log_and_remote_logging_enabled(self, monkeypatch):
        monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True")
        with mock.patch("airflow.logging_config.log") as mock_log:
            _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG")
        mock_log.warning.assert_called_once()
        assert "my_pkg.custom_settings" in mock_log.warning.call_args.args

    def test_no_warning_when_using_fallback_path(self, monkeypatch):
        monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True")
        with mock.patch("airflow.logging_config.log") as mock_log:
            _warn_if_missing_remote_task_log(DEFAULT_LOGGING_CONFIG_PATH)
        mock_log.warning.assert_not_called()

    def test_no_warning_when_remote_logging_disabled(self, monkeypatch):
        monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "False")
        with mock.patch("airflow.logging_config.log") as mock_log:
            _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG")
        mock_log.warning.assert_not_called()

    def test_no_warning_when_remote_task_log_is_set(self, monkeypatch):
        monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True")
        monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False)
        with mock.patch("airflow.logging_config.log") as mock_log:
            _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG")
        mock_log.warning.assert_not_called()

    def test_no_warning_when_empty_logging_class_path(self, monkeypatch):
        monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True")
        with mock.patch("airflow.logging_config.log") as mock_log:
            _warn_if_missing_remote_task_log("")
        mock_log.warning.assert_not_called()
20 changes: 19 additions & 1 deletion shared/logging/src/airflow_shared/logging/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,25 @@ def discover_remote_log_handler(
fallback_path: str,
import_string: Callable[[str], Any],
) -> tuple[RemoteLogIO | None, str | None]:
"""Discover and load the remote log handler from a logging config module."""
"""
Look up the optional remote-log handler alongside a logging dictConfig.

``[logging] logging_config_class`` is a dotted path to a
``logging.config.dictConfig`` dict. After importing the dict, this helper
re-imports the enclosing module and reads two optional module-level
attributes via ``getattr``:

* ``REMOTE_TASK_LOG`` — :class:`RemoteLogIO` / :class:`RemoteLogStreamIO`
instance that uploads and reads task logs.
* ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id for that
backend.

Either may be ``None`` immediately after this call; provider task handlers
can still populate ``REMOTE_TASK_LOG`` from inside ``__init__`` when
``dictConfig`` instantiates them (deprecated path). Callers that want to
warn on missing remote-log configuration should re-check
``_ActiveLoggingConfig.remote_task_log`` *after* ``dictConfig`` has run.
"""
# Sometimes we end up with `""` as the value!
logging_class_path = logging_class_path or fallback_path
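
The side-channel lookup described in the docstring can be approximated as follows. This is a simplified sketch, not the shared helper itself: `discover` is a hypothetical name, it uses `importlib` directly instead of the injected `import_string` callable, and the two fake modules exist only to exercise it.

```python
import importlib
import sys
import types


def discover(logging_class_path: str, fallback_path: str):
    """Read the two optional module-level attributes next to the config dict."""
    # An empty string falls back to the default path.
    logging_class_path = logging_class_path or fallback_path
    module_path = logging_class_path.rsplit(".", 1)[0]
    module = importlib.import_module(module_path)
    remote_task_log = getattr(module, "REMOTE_TASK_LOG", None)
    default_conn_id = getattr(module, "DEFAULT_REMOTE_CONN_ID", None)
    return remote_task_log, default_conn_id


# Fake user module that defines both attributes (names are assumptions).
sentinel_io = object()
with_remote = types.ModuleType("my_remote_settings")
with_remote.LOGGING_CONFIG = {"version": 1}
with_remote.REMOTE_TASK_LOG = sentinel_io
with_remote.DEFAULT_REMOTE_CONN_ID = "my_remote_conn"
sys.modules["my_remote_settings"] = with_remote

# Fake module that defines only the dict; both lookups yield None.
plain = types.ModuleType("my_plain_settings")
plain.LOGGING_CONFIG = {"version": 1}
sys.modules["my_plain_settings"] = plain

found = discover("my_remote_settings.LOGGING_CONFIG", "my_plain_settings.LOGGING_CONFIG")
missing = discover("", "my_plain_settings.LOGGING_CONFIG")
```

Because both attributes are read with a `None` default, a module that defines only the dict still loads, which is why the missing-attribute warning has to be issued separately by the caller.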

9 changes: 7 additions & 2 deletions task-sdk/src/airflow/sdk/log.py
Expand Up @@ -172,15 +172,20 @@ def init_log_file(local_relative_path: str) -> Path:


def _load_logging_config() -> None:
"""Load and cache the remote logging configuration from SDK config."""
"""
Load and cache the remote logging configuration from SDK config.

SDK mirror of :func:`airflow.logging_config.load_logging_config` — see that
function for the ``logging_config_class`` / ``REMOTE_TASK_LOG`` contract.
"""
from airflow.sdk._shared.logging.remote import discover_remote_log_handler
from airflow.sdk._shared.module_loading import import_string
from airflow.sdk.configuration import conf

fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback)

# Load remote logging configuration using shared discovery logic
# Load remote logging configuration using shared discovery logic.
remote_task_log, default_remote_conn_id = discover_remote_log_handler(
logging_class_path, fallback, import_string
)