diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index cb8e45b343d5f..43fb60f11b012 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -509,7 +509,6 @@ def make_client(self) -> Client: return client def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, req_id: int) -> None: - resp: BaseModel | None = None dump_opts: dict[str, bool] = {} self._last_runner_comms = time.monotonic() @@ -936,8 +935,8 @@ def get_logger(trigger_id: int) -> WrappedLogger: if exc := event.pop("exception", None): # TODO: convert the dict back to a pretty stack trace event["error_detail"] = exc - if lvl_name := NAME_TO_LEVEL.get(event.pop("level")): - log.log(lvl_name, event.pop("event", None), **event) + lvl_name = NAME_TO_LEVEL.get(event.pop("level", "warning"), logging.WARNING) + log.log(lvl_name, event.pop("event", None), **event) @classmethod def run_in_process(cls): diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index f42c84a2bf5ec..da647cd665581 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -1051,6 +1051,48 @@ async def asend_side_effect(msg): assert len(first_call.events) == 3 assert len(second_call.events) == 2 + @pytest.mark.parametrize( + "payload", + [ + pytest.param( + {"event": "hello"}, + id="missing-level-key", + ), + pytest.param( + {"event": "hello", "level": "bogus_unknown_level"}, + id="unknown-level-value", + ), + ], + ) + def test_process_log_messages_tolerates_bad_level(self, payload): + """Log events with missing or unknown 'level' must not raise KeyError.""" + import json + + trigger_runner = TriggerRunner() + gen = trigger_runner._process_log_messages_from_subprocess() + next(gen) # advance to yield + + # Must not raise KeyError (the bug this test guards against) + gen.send(json.dumps(payload).encode()) + + def test_process_log_messages_valid_event_processed(self): + """Log events with a valid 'level' key are passed to the logger.""" + import json + from unittest.mock import patch + + trigger_runner = TriggerRunner() + mock_log = MagicMock() + with patch("structlog.get_logger", return_value=mock_log): + gen = trigger_runner._process_log_messages_from_subprocess() + next(gen) + gen.send(json.dumps({"event": "test message", "level": "info"}).encode()) + + mock_log.log.assert_called_once() + args = mock_log.log.call_args + # Positional args: (numeric_level, event_message) + assert args[0][0] == 20 # INFO level + assert args[0][1] == "test message" + @pytest.mark.asyncio @pytest.mark.usefixtures("testing_dag_bundle")