Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,15 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
line = raw_line.decode("utf-8", errors="backslashreplace")
line_timestamp, message = parse_log_line(line)
if line_timestamp: # detect new log line
if not message:
# Empty container write: advance the resume
# marker but do not emit a noisy ``[base] ``
# row or break the previous buffered message
# with a stray continuation (#36571).
self.container_log_times[
(pod.metadata.namespace, pod.metadata.name, container_name)
] = line_timestamp
continue
if message_to_log is None: # first line in the log
message_to_log = message
message_timestamp = line_timestamp
Expand Down Expand Up @@ -1108,12 +1117,17 @@ def parse_log_line(line: str) -> tuple[DateTime | None, str]:
:param line: k8s log line
:return: timestamp and log message
"""
timestamp, sep, message = line.strip().partition(" ")
if not sep:
return None, line
# Strip only the trailing newline so an empty container write (which
# kubelet streams back as "<rfc3339-ts> \n" under ``timestamps=True``)
# keeps the separator space and is recognised as a real log line, not a
# continuation of the previous one (#36571). When kubelet emits "<ts>\n"
# with no trailing space, ``partition`` returns the whole line as
# ``timestamp`` and ``message`` as ``""`` -- the parse below handles both.
stripped = line.rstrip("\n")
timestamp, _, message = stripped.partition(" ")
try:
last_log_time = cast("DateTime", pendulum.parse(timestamp))
except ParserError:
except (ParserError, ValueError):
return None, line
return last_log_time, message

Expand Down Expand Up @@ -1220,6 +1234,11 @@ async def fetch_container_logs_before_current_sec(
if line_timestamp and line_timestamp.replace(microsecond=0) == now_seconds:
break
if line_timestamp: # detect new log line
if not message:
# Empty container write -- drop it instead of letting
# it overwrite the buffered message with "" or be
# emitted as a noisy ``[base] `` row (#36571).
continue
if message_to_log is None: # first line in the log
message_to_log = message
else: # previous log line is complete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,40 @@ def test_parse_log_line():
assert line == log_message


@pytest.mark.parametrize(
    ("raw_line", "expected_ts"),
    [
        pytest.param(
            "2026-05-28T13:07:57.030578889Z \n",
            "2026-05-28T13:07:57.030578889Z",
            id="trailing-space-and-newline",
        ),
        pytest.param(
            "2026-05-28T13:07:57.030581518Z\n",
            "2026-05-28T13:07:57.030581518Z",
            id="newline-only",
        ),
        pytest.param(
            "2026-05-28T13:07:57.030642740Z ",
            "2026-05-28T13:07:57.030642740Z",
            id="trailing-space-no-newline",
        ),
    ],
)
def test_parse_log_line_handles_empty_container_writes(raw_line, expected_ts):
    """
    Check that a timestamp-only row parses as an empty log line (#36571).

    With ``timestamps=True`` an empty container write (a bare ``\\n``) comes
    back as ``"<rfc3339-ts> \\n"``. ``parse_log_line`` has to return the parsed
    timestamp together with an empty message for every whitespace variant of
    such a row; if it reported "no timestamp" instead, the caller would treat
    the row as a continuation and glue the raw timestamp onto the previously
    buffered message.
    """
    parsed_timestamp, parsed_message = parse_log_line(raw_line)

    # The timestamp prefix is recognised ...
    assert parsed_timestamp == pendulum.parse(expected_ts)
    # ... and nothing is left over as message text.
    assert parsed_message == ""


def test_log_pod_event():
"""Test logging a pod event."""
mock_pod_manager = mock.Mock()
Expand Down Expand Up @@ -782,6 +816,41 @@ def test_parse_multi_line_logs(self, mock_read_pod_logs, mock_container_is_runni
assert "message3 line1" in caplog.text
assert "ERROR" not in caplog.text

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
def test_empty_container_lines_do_not_pollute_previous_message(
    self, mock_read_pod_logs, mock_container_is_running, caplog
):
    """
    Check that timestamp-only rows never reach the task log (#36571).

    Empty lines written by a container are returned by kubelet as
    ``"<ts> \\n"`` rows. Before the fix, ``parse_log_line`` reported them as
    having no timestamp, so they were appended as continuations of the
    previously buffered message. The resulting multi-line record carried the
    Airflow log prefix on its first line only, leaving bare ``<ts>`` rows in
    the task log that downstream pendulum-based parsers
    (``file_task_handler._parse_timestamp``) then choked on.
    """
    empty_write_timestamps = (
        "2026-05-28T13:07:57.030578889Z",
        "2026-05-28T13:07:57.030581518Z",
        "2026-05-28T13:07:57.030642740Z",
    )
    raw_log = (
        "2026-05-28T13:07:50.160Z first test line\n"
        "2026-05-28T13:07:57.030578889Z \n"
        "2026-05-28T13:07:57.030581518Z\n"
        "2026-05-28T13:07:57.030642740Z \n"
        "2026-05-28T13:07:57.034Z last test line\n"
    )
    # Feed the log to the pod manager one encoded row at a time; the
    # container is reported as stopped so the fetch loop ends after one pass.
    mock_read_pod_logs.return_value = [row.encode("utf-8") for row in raw_log.split("\n")]
    mock_container_is_running.return_value = False

    with caplog.at_level(logging.INFO):
        self.pod_manager.fetch_container_logs(mock.MagicMock(), "base", follow=True)

    # Real messages on either side of the empty writes are still emitted.
    assert "first test line" in caplog.text
    assert "last test line" in caplog.text
    # The empty-line timestamps must not leak into the previous message and
    # must not be emitted as orphan rows.
    for empty_write_ts in empty_write_timestamps:
        assert empty_write_ts not in caplog.text

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
def test_container_log_times_tracks_last_timestamp(self, mock_read_pod_logs, mock_container_is_running):
Expand Down
Loading