From 20e8d905744021a1234406ce87f32ca4a329e8ad Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Thu, 28 May 2026 16:26:03 +0100 Subject: [PATCH] Fix KubernetesPodOperator emitting orphan timestamps for empty container writes When a container running under KPO writes an empty line, kubelet streams it back (with ``timestamps=True``) as ``"<timestamp> \n"`` -- a timestamp followed by a separator space and an empty message. ``parse_log_line`` called ``line.strip().partition(" ")`` which removed the trailing separator space before partitioning, so the function returned ``timestamp=None`` and the caller treated the line as a continuation of the previous buffered log record. The bare RFC3339 string was then appended onto the previous message and emitted as a multi-line log where only the first line carried the Airflow ``[ts] {pod_manager.py:N} INFO -`` prefix, leaving unprefixed timestamp rows interleaved in task logs. Downstream that breaks ``airflow.utils.log.file_task_handler._parse_timestamp`` (which feeds the line to ``pendulum.parse`` after stripping ``[]``): malformed fragments from these orphan rows can raise ``ValueError: month must be in 1..12`` and fail the task entirely. The fix: * ``parse_log_line`` no longer pre-strips the line; it ``rstrip("\n")`` only and partitions on the original separator, so empty container writes are recognised as ``(timestamp, "")`` rather than as continuations. It also catches ``ValueError`` (not just ``ParserError``) so a malformed timestamp can never escape. * The sync and async log consumer loops skip emit for empty messages -- the resume marker still advances in the sync path, but no noisy ``[base] `` row is written. Regressed in #33675 (cncf-kubernetes 7.5.0, Aug 2023) which replaced the original ``line.find(" ")`` split with the strip+partition pattern under the banner of a refactor. The pre-refactor implementation correctly handled ``<timestamp> \n`` because ``find(" ")`` matched the separator space directly. 
Reported in #36571 against 7.12.0 / 7.13.0, still reproducible on the current main. --- .../cncf/kubernetes/utils/pod_manager.py | 27 ++++++-- .../cncf/kubernetes/utils/test_pod_manager.py | 69 +++++++++++++++++++ 2 files changed, 92 insertions(+), 4 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index e4bb7ae9115a2..835d239f209f2 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -566,6 +566,15 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None line = raw_line.decode("utf-8", errors="backslashreplace") line_timestamp, message = parse_log_line(line) if line_timestamp: # detect new log line + if not message: + # Empty container write: advance the resume + # marker but do not emit a noisy ``[base] `` + # row or break the previous buffered message + # with a stray continuation (#36571). + self.container_log_times[ + (pod.metadata.namespace, pod.metadata.name, container_name) + ] = line_timestamp + continue if message_to_log is None: # first line in the log message_to_log = message message_timestamp = line_timestamp @@ -1108,12 +1117,17 @@ def parse_log_line(line: str) -> tuple[DateTime | None, str]: :param line: k8s log line :return: timestamp and log message """ - timestamp, sep, message = line.strip().partition(" ") - if not sep: - return None, line + # Strip only the trailing newline so an empty container write (which + # kubelet streams back as "<timestamp> \n" under ``timestamps=True``) + # keeps the separator space and is recognised as a real log line, not a + # continuation of the previous one (#36571). 
When kubelet emits "<timestamp>\n" + # with no trailing space, ``partition`` returns the whole line as + # ``timestamp`` and ``message`` as ``""`` -- the parse below handles both. + stripped = line.rstrip("\n") + timestamp, _, message = stripped.partition(" ") try: last_log_time = cast("DateTime", pendulum.parse(timestamp)) - except ParserError: + except (ParserError, ValueError): return None, line return last_log_time, message @@ -1220,6 +1234,11 @@ async def fetch_container_logs_before_current_sec( if line_timestamp and line_timestamp.replace(microsecond=0) == now_seconds: break if line_timestamp: # detect new log line + if not message: + # Empty container write -- drop it instead of letting + # it overwrite the buffered message with "" or be + # emitted as a noisy ``[base] `` row (#36571). + continue if message_to_log is None: # first line in the log message_to_log = message else: # previous log line is complete diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index 27cfd87fc6d6a..e2390f90dc3a9 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -88,6 +88,40 @@ def test_parse_log_line(): assert line == log_message +@pytest.mark.parametrize( + ("raw_line", "expected_ts"), + [ + pytest.param( + "2026-05-28T13:07:57.030578889Z \n", + "2026-05-28T13:07:57.030578889Z", + id="trailing-space-and-newline", + ), + pytest.param( + "2026-05-28T13:07:57.030581518Z\n", + "2026-05-28T13:07:57.030581518Z", + id="newline-only", + ), + pytest.param( + "2026-05-28T13:07:57.030642740Z ", + "2026-05-28T13:07:57.030642740Z", + id="trailing-space-no-newline", + ), + ], +) +def test_parse_log_line_handles_empty_container_writes(raw_line, expected_ts): + """ + Regression for #36571: an empty container write (just ``\\n``) is streamed + back by 
kubelet as ``"<timestamp> \\n"`` when ``timestamps=True``. The + parser must recognise it as a real (empty) log line rather than as a + continuation of the previous one, otherwise the bare timestamp is appended + onto the previous buffered message and emitted unformatted into task logs. + """ + timestamp, message = parse_log_line(raw_line) + + assert timestamp == pendulum.parse(expected_ts) + assert message == "" + + def test_log_pod_event(): """Test logging a pod event.""" mock_pod_manager = mock.Mock() @@ -782,6 +816,41 @@ def test_parse_multi_line_logs(self, mock_read_pod_logs, mock_container_is_runni assert "message3 line1" in caplog.text assert "ERROR" not in caplog.text + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs") + def test_empty_container_lines_do_not_pollute_previous_message( + self, mock_read_pod_logs, mock_container_is_running, caplog + ): + """ + Regression for #36571: when a container writes empty lines, kubelet + returns them as ``"<timestamp> \\n"`` rows. Previously these slipped through + ``parse_log_line`` as "no timestamp" and were appended as continuations + onto the previous buffered message, which then emitted multi-line + records where only the first line carried the Airflow log prefix -- + leaving bare ``<timestamp>`` rows in task logs that downstream pendulum-based + parsers ``(file_task_handler._parse_timestamp)`` then choked on. 
+ """ + log = ( + "2026-05-28T13:07:50.160Z first test line\n" + "2026-05-28T13:07:57.030578889Z \n" + "2026-05-28T13:07:57.030581518Z\n" + "2026-05-28T13:07:57.030642740Z \n" + "2026-05-28T13:07:57.034Z last test line\n" + ) + mock_read_pod_logs.return_value = [bytes(line, "utf-8") for line in log.split("\n")] + mock_container_is_running.return_value = False + + with caplog.at_level(logging.INFO): + self.pod_manager.fetch_container_logs(mock.MagicMock(), "base", follow=True) + + assert "first test line" in caplog.text + assert "last test line" in caplog.text + # The empty-line timestamps must not leak into the previous message and + # must not be emitted as orphan rows. + assert "2026-05-28T13:07:57.030578889Z" not in caplog.text + assert "2026-05-28T13:07:57.030581518Z" not in caplog.text + assert "2026-05-28T13:07:57.030642740Z" not in caplog.text + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs") def test_container_log_times_tracks_last_timestamp(self, mock_read_pod_logs, mock_container_is_running):