Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions airflow-core/src/airflow/ui/src/mocks/handlers/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,10 @@ export const handlers: Array<HttpHandler> = [
http.get("/api/v2/dags/log_grouping/dagRuns/manual__2025-02-18T12:19/taskInstances/generate/logs/1", () =>
HttpResponse.json({
content: [
{ event: "::group::Log message source details" },
{
event: "::group::Log message source details",
sources: [
event:
"/home/airflow/logs/dag_id=tutorial_dag/run_id=manual__2025-02-28T05:18:54.249762+00:00/task_id=load/attempt=1.log",
],
},
{ event: "::endgroup::" },
{
Expand Down Expand Up @@ -211,11 +210,10 @@ export const handlers: Array<HttpHandler> = [
http.get("/api/v2/dags/log_grouping/dagRuns/manual__2025-02-18T12:19/taskInstances/ti_context/logs/1", () =>
HttpResponse.json({
content: [
{ event: "::group::Log message source details" },
{
event: "::group::Log message source details",
sources: [
event:
"/home/airflow/logs/dag_id=log_grouping/run_id=manual__2025-02-18T12:19/task_id=ti_context/attempt=1.log",
],
},
{ event: "::endgroup::" },
{
Expand Down Expand Up @@ -255,11 +253,10 @@ export const handlers: Array<HttpHandler> = [
http.get("/api/v2/dags/log_grouping/dagRuns/manual__2025-02-18T12:19/taskInstances/log_source/logs/1", () =>
HttpResponse.json({
content: [
{ event: "::group::Log message source details", timestamp: null },
{
event: "::group::Log message source details",
sources: [
event:
"/root/airflow/logs/dag_id=log_grouping/run_id=manual__2025-02-18T12:19/task_id=log_source/attempt=1.log",
],
timestamp: null,
},
{ event: "::endgroup::", timestamp: null },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ describe("Task log grouping", () => {
await waitForLogs();

// Group headers use the summary-{name} testid pattern and are always visible
const summarySource = screen.getByTestId(
'summary-Log message source details sources=["/home/airflow/logs/dag_id=tutorial_dag/run_id=manual__2025-02-28T05:18:54.249762+00:00/task_id=load/attempt=1.log"]',
);
const summarySource = screen.getByTestId("summary-Log message source details");

expect(summarySource).toBeVisible();

Expand Down Expand Up @@ -190,9 +188,7 @@ describe("Task Identity preamble", () => {

await waitForLogs();

const sourceGroup = screen.getByTestId(
'summary-Log message source details sources=["/home/airflow/logs/dag_id=log_grouping/run_id=manual__2025-02-18T12:19/task_id=ti_context/attempt=1.log"]',
);
const sourceGroup = screen.getByTestId("summary-Log message source details");

expect(sourceGroup).toBeInTheDocument();

Expand Down Expand Up @@ -432,8 +428,8 @@ describe("Task log search", () => {

fireEvent.click(summaryDependency);

await expectRenderedLineNumber(/dep_context=non-requeueable/iu, 0);
await expectRenderedLineNumber(/dep_context=requeueable/iu, 1);
await expectRenderedLineNumber(/starting attempt 1 of 3/iu, 2);
await expectRenderedLineNumber(/dep_context=non-requeueable/iu, 1);
await expectRenderedLineNumber(/dep_context=requeueable/iu, 2);
await expectRenderedLineNumber(/starting attempt 1 of 3/iu, 3);
}, 10_000);
});
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ describe("getDownloadText", () => {
it("places Task Identity preamble after the source details endgroup, before the first log line", () => {
const fetchedData = {
content: [
{ event: "::group::Log message source details", sources: ["/logs/a.log", "/logs/b.log"] },
{ event: "::group::Log message source details" },
{ event: "/logs/a.log" },
{ event: "/logs/b.log" },
{ event: "some source detail" },
{ event: "::endgroup::" },
tiLine("First log line", "2026-01-01T00:00:00Z"),
Expand All @@ -68,7 +70,8 @@ describe("getDownloadText", () => {
it("does not include TI context fields on individual log lines", () => {
const fetchedData = {
content: [
{ event: "::group::Log message source details", sources: ["/logs/a.log"] },
{ event: "::group::Log message source details" },
{ event: "/logs/a.log" },
{ event: "::endgroup::" },
tiLine("Task started", "2026-01-01T00:00:00Z"),
],
Expand All @@ -87,7 +90,8 @@ describe("getDownloadText", () => {
it("omits the preamble when no TI context fields are present", () => {
const fetchedData = {
content: [
{ event: "::group::Log message source details", sources: ["/logs/a.log"] },
{ event: "::group::Log message source details" },
{ event: "/logs/a.log" },
{ event: "::endgroup::" },
{ event: "plain log line", level: "info", timestamp: "2026-01-01T00:00:00Z" },
],
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/src/airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,8 @@ def _read(
# Log message source details are grouped: they are not relevant for most users and can
# distract them from finding the root cause of their errors
header = [
StructuredLogMessage(event="::group::Log message source details", sources=source_list), # type: ignore[call-arg]
StructuredLogMessage(event="::group::Log message source details"),
*[StructuredLogMessage(event=source) for source in source_list],
StructuredLogMessage(event="::endgroup::"),
]
end_of_log = ti.try_number != try_number or ti.state not in (
Expand Down
5 changes: 1 addition & 4 deletions airflow-core/src/airflow/utils/log/log_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ def get_no_log_state_message(ti: TaskInstance | TaskInstanceHistory) -> Iterator
else:
msg = "No logs available for this task."

yield StructuredLogMessage(
timestamp=None,
event="::group::Log message source details",
)
yield StructuredLogMessage(timestamp=None, event="::group::Log message source details")
yield StructuredLogMessage(timestamp=None, event="::endgroup::")
yield StructuredLogMessage(
timestamp=ti.updated_at or datetime.now(timezone.utc),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ def test_should_respond_200_json(self, try_number):
expected_filename = f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log"
log_content = "Log for testing." if try_number == 1 else "Log for testing 2."
assert response.status_code == 200, response.json()
resp_contnt = response.json()["content"]
assert expected_filename in resp_contnt[0]["sources"]
assert log_content in resp_contnt[2]["event"]
resp_content = response.json()["content"]
assert expected_filename in resp_content[1]["event"]
assert log_content in resp_content[3]["event"]

assert response.json()["continuation_token"] is None
assert response.status_code == 200
Expand Down
32 changes: 14 additions & 18 deletions airflow-core/tests/unit/utils/log/test_log_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ def test_test_read_log_chunks_should_read_one_try(self):

logs = list(logs)
assert logs[0].event == "::group::Log message source details"
assert logs[0].sources == [
f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log"
]
assert logs[1].event == "::endgroup::"
assert logs[2].event == "try_number=1."
assert (
logs[1].event == f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log"
)
assert logs[2].event == "::endgroup::"
assert logs[3].event == "try_number=1."
assert metadata == {"end_of_log": True, "log_pos": 1}

def test_test_read_log_chunks_should_read_latest_files(self):
Expand All @@ -146,11 +146,11 @@ def test_test_read_log_chunks_should_read_latest_files(self):

logs = list(logs)
assert logs[0].event == "::group::Log message source details"
assert logs[0].sources == [
f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log"
]
assert logs[1].event == "::endgroup::"
assert logs[2].event == f"try_number={ti.try_number}."
assert (
logs[1].event == f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log"
)
assert logs[2].event == "::endgroup::"
assert logs[3].event == f"try_number={ti.try_number}."
assert metadata == {"end_of_log": True, "log_pos": 1}

def test_test_test_read_log_stream_should_read_one_try(self):
Expand All @@ -160,10 +160,8 @@ def test_test_test_read_log_stream_should_read_one_try(self):
stream = task_log_reader.read_log_stream(ti=ti, try_number=1, metadata={})

assert list(stream) == [
'{"timestamp":null,'
'"event":"::group::Log message source details",'
f'"sources":["{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log"]'
"}\n",
'{"timestamp":null,"event":"::group::Log message source details"}\n',
f'{{"timestamp":null,"event":"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log"}}\n',
'{"timestamp":null,"event":"::endgroup::"}\n',
'{"timestamp":null,"event":"try_number=1."}\n',
]
Expand All @@ -174,10 +172,8 @@ def test_test_test_read_log_stream_should_read_latest_logs(self):
stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={})

assert list(stream) == [
'{"timestamp":null,'
'"event":"::group::Log message source details",'
f'"sources":["{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log"]'
"}\n",
'{"timestamp":null,"event":"::group::Log message source details"}\n',
f'{{"timestamp":null,"event":"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log"}}\n',
'{"timestamp":null,"event":"::endgroup::"}\n',
'{"timestamp":null,"event":"try_number=3."}\n',
]
Expand Down
46 changes: 24 additions & 22 deletions airflow-core/tests/unit/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,20 +545,20 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs(
logical_date=DEFAULT_DATE,
)
ti.state = TaskInstanceState.SUCCESS # we're testing scenario when task is done
expected_logs = ["::group::Log message source details", "::endgroup::"]
with conf_vars({("core", "executor"): executor_name}):
reload(executor_loader)
fth = FileTaskHandler("")
if remote_logs:
fth._read_remote_logs = mock.Mock()
fth._read_remote_logs.return_value = ["found remote logs"], ["remote\nlog\ncontent"]
expected_logs.extend(
[
"remote",
"log",
"content",
]
)
expected_logs = [
"::group::Log message source details",
"found remote logs",
"::endgroup::",
"remote",
"log",
"content",
]
if local_logs:
fth._read_from_local = mock.Mock()
fth._read_from_local.return_value = (
Expand All @@ -567,27 +567,29 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs(
)
# only when not read from remote and TI is unfinished will read from local
if not remote_logs:
expected_logs.extend(
[
"local",
"log",
"content",
]
)
expected_logs = [
"::group::Log message source details",
"found local logs",
"::endgroup::",
"local",
"log",
"content",
]
fth._read_from_logs_server = mock.Mock()
fth._read_from_logs_server.return_value = (
["this message"],
[convert_list_to_stream("this\nlog\ncontent".splitlines())],
)
# only when not read from remote and not read from local will read from logs server
if served_logs_checked:
expected_logs.extend(
[
"this",
"log",
"content",
]
)
expected_logs = [
"::group::Log message source details",
"this message",
"::endgroup::",
"this",
"log",
"content",
]

logs, metadata = fth._read(ti=ti, try_number=1)
if served_logs_checked:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ def test_remote_logging_s3(self):
run_id=resp["dag_run_id"],
)

task_log_sources = [
source for content in task_logs.get("content", [{}]) for source in content.get("sources", [])
]
task_log_sources = [content.get("event", []) for content in task_logs.get("content", [{}])]
response = s3_client.list_objects_v2(Bucket=bucket_name)

if "Contents" not in response:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,15 @@ def extract_events(logs: Iterable[StructuredLogMessage], skip_source_info=True)
"""Helper function to return just the event (a.k.a message) from a list of StructuredLogMessage"""
logs = iter(logs)
if skip_source_info:
in_source_group = False

def is_source_group(log: StructuredLogMessage) -> bool:
return not hasattr(log, "timestamp") or log.event == "::endgroup::" or hasattr(log, "sources")
nonlocal in_source_group
if not in_source_group and log.event == "::group::Log message source details":
in_source_group = True
elif in_source_group and log.event == "::endgroup::":
in_source_group = False
return not hasattr(log, "timestamp") or log.event == "::endgroup::" or in_source_group

logs = itertools.dropwhile(is_source_group, logs)

Expand Down
1 change: 1 addition & 0 deletions devel-common/src/tests_common/test_utils/version_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_1_7_PLUS = get_base_airflow_version_tuple() >= (3, 1, 7)
AIRFLOW_V_3_1_9_PLUS = get_base_airflow_version_tuple() >= (3, 1, 9)
AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0)
AIRFLOW_V_3_2_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 2)
AIRFLOW_V_3_3_PLUS = get_base_airflow_version_tuple() >= (3, 3, 0)

if AIRFLOW_V_3_1_PLUS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from tests_common.test_utils.dag import sync_dag_to_db
from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags, clear_db_runs
from tests_common.test_utils.taskinstance import create_task_instance
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_2_PLUS

try:
from airflow.sdk.timezone import datetime
Expand Down Expand Up @@ -295,7 +295,17 @@ def test_read(self):

expected_s3_uri = f"s3://bucket/{self.remote_log_key}"

if AIRFLOW_V_3_0_PLUS:
if AIRFLOW_V_3_2_2_PLUS:
log = list(log)
assert log[0].event == "::group::Log message source details"
assert expected_s3_uri in log[1].event
assert log[3].event == "::endgroup::"
assert log[4].event == "Log line"
assert log[5].event == "Line 2"
assert log[6].event == "Log line 3"
assert log[7].event == "Line 4"
assert metadata == {"end_of_log": True, "log_pos": 4}
elif AIRFLOW_V_3_0_PLUS:
log = list(log)
assert log[0].event == "::group::Log message source details"
assert expected_s3_uri in log[0].sources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from tests_common.test_utils.file_task_handler import (
convert_list_to_stream,
)
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_2_PLUS

pytestmark = pytest.mark.db_test

Expand Down Expand Up @@ -92,7 +92,12 @@ def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instanc
logs, metadata = fth._read(ti=ti, try_number=1)
fth._read_from_logs_server.assert_called_once()

if AIRFLOW_V_3_0_PLUS:
if AIRFLOW_V_3_2_2_PLUS:
logs = list(logs)
assert logs[1].event == "this message"
assert [x.event for x in logs[-3:]] == ["this", "log", "content"]
assert metadata == {"end_of_log": False, "log_pos": 3}
elif AIRFLOW_V_3_0_PLUS:
logs = list(logs)
assert logs[0].sources == ["this message"]
assert [x.event for x in logs[-3:]] == ["this", "log", "content"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,8 @@ def _read(
from airflow.utils.log.file_task_handler import StructuredLogMessage

header = [
StructuredLogMessage(
event="::group::Log message source details",
sources=[host for host in logs_by_host.keys()],
), # type: ignore[call-arg]
StructuredLogMessage(event="::group::Log message source details"),
*[StructuredLogMessage(event=host) for host in logs_by_host.keys()],
StructuredLogMessage(event="::endgroup::"),
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,10 @@ def _assert_log_events(logs, metadatas, *, expected_events: list[str], expected_
if AIRFLOW_V_3_0_PLUS:
logs = list(logs)
assert logs[0].event == "::group::Log message source details"
assert logs[0].sources == expected_sources
assert logs[1].event == "::endgroup::"
assert [log.event for log in logs[2:]] == expected_events
for i, source in enumerate(expected_sources, start=1):
assert logs[i].event == source
assert logs[1 + len(expected_sources)].event == "::endgroup::"
assert [log.event for log in logs[(2 + len(expected_sources)) :]] == expected_events
else:
assert len(logs) == 1
assert len(logs[0]) == 1
Expand Down
Loading
Loading