Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,6 @@ def make_client(self) -> Client:
return client

def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, req_id: int) -> None:

resp: BaseModel | None = None
dump_opts: dict[str, bool] = {}
self._last_runner_comms = time.monotonic()
Expand Down Expand Up @@ -936,8 +935,8 @@ def get_logger(trigger_id: int) -> WrappedLogger:
if exc := event.pop("exception", None):
# TODO: convert the dict back to a pretty stack trace
event["error_detail"] = exc
if lvl_name := NAME_TO_LEVEL.get(event.pop("level")):
log.log(lvl_name, event.pop("event", None), **event)
lvl_name = NAME_TO_LEVEL.get(event.pop("level", "warning"), logging.WARNING)
log.log(lvl_name, event.pop("event", None), **event)

@classmethod
def run_in_process(cls):
Expand Down
42 changes: 42 additions & 0 deletions airflow-core/tests/unit/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,48 @@ async def asend_side_effect(msg):
assert len(first_call.events) == 3
assert len(second_call.events) == 2

@pytest.mark.parametrize(
"payload",
[
pytest.param(
{"event": "hello"},
id="missing-level-key",
),
pytest.param(
{"event": "hello", "level": "bogus_unknown_level"},
id="unknown-level-value",
),
],
)
def test_process_log_messages_tolerates_bad_level(self, payload):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worth adding assertions for log level

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed added assert mock_log.log.call_args[0][0] == logging.WARNING

"""Log events with missing or unknown 'level' must not raise KeyError."""
import json

trigger_runner = TriggerRunner()
gen = trigger_runner._process_log_messages_from_subprocess()
next(gen) # advance to yield

# Must not raise KeyError (the bug this test guards against)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this case only checks if exception is not raised. Missing log lines is still silently ignored

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed test now mocks structlog.get_logger and asserts mock_log.log.assert_called_once()

gen.send(json.dumps(payload).encode())

def test_process_log_messages_valid_event_processed(self):
"""Log events with a valid 'level' key are passed to the logger."""
import json
from unittest.mock import patch

trigger_runner = TriggerRunner()
mock_log = MagicMock()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like the import is missing here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

with patch("structlog.get_logger", return_value=mock_log):
gen = trigger_runner._process_log_messages_from_subprocess()
next(gen)
gen.send(json.dumps({"event": "test message", "level": "info"}).encode())

mock_log.log.assert_called_once()
args = mock_log.log.call_args
# Positional args: (numeric_level, event_message)
assert args[0][0] == 20 # INFO level
assert args[0][1] == "test message"


@pytest.mark.asyncio
@pytest.mark.usefixtures("testing_dag_bundle")
Expand Down
Loading