Skip to content

Commit

Permalink
Fix double logging with some task logging handler (#27591)
Browse files Browse the repository at this point in the history
A previous change to fix disappearing log messages turned on propagation
for the `airflow.task` logger, and disabled it again after
`set_context()` was called, but only if that function returned a special
sentinel value.

For the "core" task log handlers we returned them, but some providers
weren't "correctly" subclassing and weren't returning this sentinel
value.

The fix here is to change the logic from disable only on special value
to disable by default and maintain propagation on special value; this
means that if a handler doesn't return the value from `super()` (or if
they don't even subclass the default handler) propagation will still be
disabled by default.

(cherry picked from commit 933fefc)
  • Loading branch information
ashb authored and ephraimbuddy committed Nov 10, 2022
1 parent 4815734 commit 5ab2236
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 24 deletions.
7 changes: 3 additions & 4 deletions airflow/utils/log/file_task_handler.py
Expand Up @@ -29,12 +29,12 @@
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.session import create_session

if TYPE_CHECKING:
from airflow.models import TaskInstance
from airflow.utils.log.logging_mixin import SetContextPropagate


class FileTaskHandler(logging.Handler):
Expand All @@ -61,7 +61,7 @@ def __init__(self, base_log_folder: str, filename_template: str | None = None):
stacklevel=(2 if type(self) == FileTaskHandler else 3),
)

def set_context(self, ti: TaskInstance):
def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
"""
Provide task_instance context to airflow task handler.
Expand All @@ -72,8 +72,7 @@ def set_context(self, ti: TaskInstance):
if self.formatter:
self.handler.setFormatter(self.formatter)
self.handler.setLevel(self.level)

return DISABLE_PROPOGATE
return None

def emit(self, record):
if self.handler:
Expand Down
41 changes: 34 additions & 7 deletions airflow/utils/log/logging_mixin.py
Expand Up @@ -18,18 +18,35 @@
from __future__ import annotations

import abc
import enum
import logging
import re
import sys
from io import IOBase
from logging import Handler, Logger, StreamHandler
from typing import IO
from typing import IO, cast

# 7-bit C1 ANSI escape sequences
ANSI_ESCAPE = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')

# Private: A sentinel object
DISABLE_PROPOGATE = object()

# Private: A sentinel objects
class SetContextPropagate(enum.Enum):
""":meta private:"""

# If a `set_context` function wants to _keep_ propagation set on it's logger it needs to return this
# special value.
MAINTAIN_PROPAGATE = object()
# Don't use this one anymore!
DISABLE_PROPAGATE = object()


def __getattr__(name):
if name in ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE"):
# Compat for spelling on off chance someone is using this directly
# And old object that isn't needed anymore
return SetContextPropagate.DISABLE_PROPAGATE
raise AttributeError(f"module {__name__} has no attribute {name}")


def remove_escape_codes(text: str) -> str:
Expand Down Expand Up @@ -183,13 +200,23 @@ def set_context(logger, value):
:param value: value to set
"""
while logger:
orig_propagate = logger.propagate
for handler in logger.handlers:
# Not all handlers need to have context passed in so we ignore
# the error when handlers do not have set_context defined.
set_context = getattr(handler, 'set_context', None)
if set_context and set_context(value) is DISABLE_PROPOGATE:
logger.propagate = False
if logger.propagate is True:

# Don't use getatrr so we have type checking. And we don't care if handler is actually a
# FileTaskHandler, it just needs to have a set_context function!
if hasattr(handler, "set_context"):
from airflow.utils.log.file_task_handler import FileTaskHandler

flag = cast(FileTaskHandler, handler).set_context(value)
# By default we disable propagate once we have configured the logger, unless that handler
# explicitly asks us to keep it on.
if flag is not SetContextPropagate.MAINTAIN_PROPAGATE:
logger.propagate = False
if orig_propagate is True:
# If we were set to propagate before we turned if off, then keep passing set_context up
logger = logger.parent
else:
break
95 changes: 82 additions & 13 deletions tests/utils/test_logging_mixin.py
Expand Up @@ -17,29 +17,64 @@
# under the License.
from __future__ import annotations

import logging
import sys
import warnings
from unittest import mock

from airflow.utils.log.logging_mixin import StreamLogWriter, set_context
import pytest

from airflow.utils.log.logging_mixin import SetContextPropagate, StreamLogWriter, set_context


@pytest.fixture
def logger():
parent = logging.getLogger(__name__)
parent.propagate = False
yield parent

parent.propagate = True


@pytest.fixture
def child_logger(logger):
yield logger.getChild("child")


@pytest.fixture
def parent_child_handlers(child_logger):
parent_handler = logging.NullHandler()
parent_handler.handle = mock.MagicMock(name="parent_handler.handle")

child_handler = logging.NullHandler()
child_handler.handle = mock.MagicMock(name="handler.handle")

logger = child_logger.parent
logger.addHandler(parent_handler)

child_logger.addHandler(child_handler),
child_logger.propagate = True

yield parent_handler, child_handler

logger.removeHandler(parent_handler)
child_logger.removeHandler(child_handler)


class TestLoggingMixin:
def setup_method(self):
warnings.filterwarnings(action='always')

def test_set_context(self):
handler1 = mock.MagicMock()
handler2 = mock.MagicMock()
parent = mock.MagicMock()
def test_set_context(self, child_logger, parent_child_handlers):
handler1, handler2 = parent_child_handlers
handler1.set_context = mock.MagicMock()
handler2.set_context = mock.MagicMock()

parent = logging.getLogger(__name__)
parent.propagate = False
parent.handlers = [
handler1,
]
log = mock.MagicMock()
log.handlers = [
handler2,
]
log.parent = parent
parent.addHandler(handler1)
log = parent.getChild("child")
log.addHandler(handler2),
log.propagate = True

value = "test"
Expand Down Expand Up @@ -105,3 +140,37 @@ def test_iobase_compatibility(self):
assert not log.closed
# has no specific effect
log.close()


@pytest.mark.parametrize(["maintain_propagate"], [[SetContextPropagate.MAINTAIN_PROPAGATE], [None]])
def test_set_context_propagation(parent_child_handlers, child_logger, maintain_propagate):
# Test the behaviour of set_context and logger propagation and the MAINTAIN_PROPAGATE return

parent_handler, handler = parent_child_handlers
handler.set_context = mock.MagicMock(return_value=maintain_propagate)

# Before settting_context, ensure logs make it to the parent
line = sys._getframe().f_lineno + 1
record = child_logger.makeRecord(
child_logger.name, logging.INFO, __file__, line, "test message", [], None
)
child_logger.handle(record)

handler.handle.assert_called_once_with(record)
# Should call the parent handler too in the default/unconfigured case
parent_handler.handle.assert_called_once_with(record)

parent_handler.handle.reset_mock()
handler.handle.reset_mock()

# Ensure that once we've called set_context on the handler we disable propagation to parent loggers by
# default!
set_context(child_logger, {})

child_logger.handle(record)

handler.handle.assert_called_once_with(record)
if maintain_propagate is SetContextPropagate.MAINTAIN_PROPAGATE:
parent_handler.handle.assert_called_once_with(record)
else:
parent_handler.handle.assert_not_called()

0 comments on commit 5ab2236

Please sign in to comment.