From f9393c22a484ed9e6e9a964bd07e8c8ebac69da1 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Thu, 7 May 2026 23:59:05 +0200 Subject: [PATCH 1/5] Adjust log message header for expandable sources --- airflow-core/src/airflow/utils/log/file_task_handler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 31d3db8df4dda..3d7302b72a139 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -670,7 +670,8 @@ def _read( # Log message source details are grouped: they are not relevant for most users and can # distract them from finding the root cause of their errors header = [ - StructuredLogMessage(event="::group::Log message source details", sources=source_list), # type: ignore[call-arg] + StructuredLogMessage(event="::group::Log message source details"), + *[StructuredLogMessage(event=source) for source in source_list], StructuredLogMessage(event="::endgroup::"), ] end_of_log = ti.try_number != try_number or ti.state not in ( From 4c1fdc5dfb1cf208690fc18f3e3403978eb8d5e2 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Fri, 8 May 2026 21:26:31 +0200 Subject: [PATCH 2/5] Fix tests --- .../core_api/routes/public/test_log.py | 6 +-- .../tests/unit/utils/log/test_log_reader.py | 32 ++++++------- .../tests/unit/utils/test_log_handlers.py | 46 ++++++++++--------- .../remote_log_tests/test_remote_logging.py | 2 +- .../test_utils/file_task_handler.py | 8 +++- 5 files changed, 49 insertions(+), 45 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py index 113401f85fb71..12fd8fe111ab0 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py @@ -172,9 +172,9 @@ def test_should_respond_200_json(self, try_number): expected_filename = f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log" log_content = "Log for testing." if try_number == 1 else "Log for testing 2." assert response.status_code == 200, response.json() - resp_contnt = response.json()["content"] - assert expected_filename in resp_contnt[0]["sources"] - assert log_content in resp_contnt[2]["event"] + resp_content = response.json()["content"] + assert expected_filename in resp_content[1]["event"] + assert log_content in resp_content[3]["event"] assert response.json()["continuation_token"] is None assert response.status_code == 200 diff --git a/airflow-core/tests/unit/utils/log/test_log_reader.py b/airflow-core/tests/unit/utils/log/test_log_reader.py index addcb5ebdee99..1c65cb1806b14 100644 --- a/airflow-core/tests/unit/utils/log/test_log_reader.py +++ b/airflow-core/tests/unit/utils/log/test_log_reader.py @@ -131,11 +131,11 @@ def test_test_read_log_chunks_should_read_one_try(self): logs = list(logs) assert logs[0].event == "::group::Log message source details" - assert logs[0].sources == [ - f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log" - ] - assert logs[1].event == "::endgroup::" - assert logs[2].event == "try_number=1." + assert ( + logs[1].event == f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log" + ) + assert logs[2].event == "::endgroup::" + assert logs[3].event == "try_number=1." assert metadata == {"end_of_log": True, "log_pos": 1} def test_test_read_log_chunks_should_read_latest_files(self): @@ -146,11 +146,11 @@ def test_test_read_log_chunks_should_read_latest_files(self): logs = list(logs) assert logs[0].event == "::group::Log message source details" - assert logs[0].sources == [ - f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log" - ] - assert logs[1].event == "::endgroup::" - assert logs[2].event == f"try_number={ti.try_number}." + assert ( + logs[1].event == f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log" + ) + assert logs[2].event == "::endgroup::" + assert logs[3].event == f"try_number={ti.try_number}." assert metadata == {"end_of_log": True, "log_pos": 1} def test_test_test_read_log_stream_should_read_one_try(self): @@ -160,10 +160,8 @@ def test_test_test_read_log_stream_should_read_one_try(self): stream = task_log_reader.read_log_stream(ti=ti, try_number=1, metadata={}) assert list(stream) == [ - '{"timestamp":null,' - '"event":"::group::Log message source details",' - f'"sources":["{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log"]' - "}\n", + '{"timestamp":null,"event":"::group::Log message source details"}\n', + f'{{"timestamp":null,"event":"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log"}}\n', '{"timestamp":null,"event":"::endgroup::"}\n', '{"timestamp":null,"event":"try_number=1."}\n', ] @@ -174,10 +172,8 @@ def test_test_test_read_log_stream_should_read_latest_logs(self): stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={}) assert list(stream) == [ - '{"timestamp":null,' - '"event":"::group::Log message source details",' - f'"sources":["{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log"]' - "}\n", + '{"timestamp":null,"event":"::group::Log message source details"}\n', + f'{{"timestamp":null,"event":"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log"}}\n', '{"timestamp":null,"event":"::endgroup::"}\n', '{"timestamp":null,"event":"try_number=3."}\n', ] diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py index 4c95327c9fb71..1c52f84a5f68b 100644 --- a/airflow-core/tests/unit/utils/test_log_handlers.py +++ b/airflow-core/tests/unit/utils/test_log_handlers.py @@ -545,20 +545,20 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs( logical_date=DEFAULT_DATE, ) ti.state = TaskInstanceState.SUCCESS # we're testing scenario when task is done - expected_logs = ["::group::Log message source details", "::endgroup::"] with conf_vars({("core", "executor"): executor_name}): reload(executor_loader) fth = FileTaskHandler("") if remote_logs: fth._read_remote_logs = mock.Mock() fth._read_remote_logs.return_value = ["found remote logs"], ["remote\nlog\ncontent"] - expected_logs.extend( - [ - "remote", - "log", - "content", - ] - ) + expected_logs = [ + "::group::Log message source details", + "found remote logs", + "::endgroup::", + "remote", + "log", + "content", + ] if local_logs: fth._read_from_local = mock.Mock() fth._read_from_local.return_value = ( @@ -567,13 +567,14 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs( ) # only when not read from remote and TI is unfinished will read from local if not remote_logs: - expected_logs.extend( - [ - "local", - "log", - "content", - ] - ) + expected_logs = [ + "::group::Log message source details", + "found local logs", + "::endgroup::", + "local", + "log", + "content", + ] fth._read_from_logs_server = mock.Mock() fth._read_from_logs_server.return_value = ( ["this message"], @@ -581,13 +582,14 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs( ) # only when not read from remote and not read from local will read from logs server if served_logs_checked: - expected_logs.extend( - [ - "this", - "log", - "content", - ] - ) + expected_logs = [ + "::group::Log message source details", + "this message", + "::endgroup::", + "this", + "log", + "content", + ] logs, metadata = fth._read(ti=ti, try_number=1) if served_logs_checked: diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py index f424e39d0a533..1ead5d66cf9df 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py @@ -77,7 +77,7 @@ def test_remote_logging_s3(self): ) task_log_sources = [ - source for content in task_logs.get("content", [{}]) for source in content.get("sources", []) + source for content in task_logs.get("content", [{}]) for source in content.get("event", []) ] response = s3_client.list_objects_v2(Bucket=bucket_name) diff --git a/devel-common/src/tests_common/test_utils/file_task_handler.py b/devel-common/src/tests_common/test_utils/file_task_handler.py index 5153fcfc511a0..abc5da7cd2bbb 100644 --- a/devel-common/src/tests_common/test_utils/file_task_handler.py +++ b/devel-common/src/tests_common/test_utils/file_task_handler.py @@ -34,9 +34,15 @@ def extract_events(logs: Iterable[StructuredLogMessage], skip_source_info=True) """Helper function to return just the event (a.k.a message) from a list of StructuredLogMessage""" logs = iter(logs) if skip_source_info: + in_source_group = False def is_source_group(log: StructuredLogMessage) -> bool: - return not hasattr(log, "timestamp") or log.event == "::endgroup::" or hasattr(log, "sources") + nonlocal in_source_group + if not in_source_group and log.event == "::group::Log message source details": + in_source_group = True + elif in_source_group and log.event == "::endgroup::": + in_source_group = False + return not hasattr(log, "timestamp") or log.event == "::endgroup::" or in_source_group logs = itertools.dropwhile(is_source_group, logs) From 9870c8e4619c5687490479d2164da86b0fdf3544 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 9 May 2026 17:55:50 +0200 Subject: [PATCH 3/5] Fix tests --- .../src/airflow/ui/src/mocks/handlers/log.ts | 15 ++++++--------- airflow-core/src/airflow/utils/log/log_reader.py | 5 +---- .../remote_log_tests/test_remote_logging.py | 4 +--- .../unit/amazon/aws/log/test_s3_task_handler.py | 12 ++++++------ .../unit/celery/log_handlers/test_log_handlers.py | 2 +- .../elasticsearch/log/es_task_handler.py | 6 ++---- .../elasticsearch/log/test_es_task_handler.py | 7 ++++--- .../google/cloud/log/test_gcs_task_handler.py | 14 ++++++-------- .../microsoft/azure/log/test_wasb_task_handler.py | 6 +++--- .../providers/opensearch/log/os_task_handler.py | 6 ++---- .../unit/opensearch/log/test_os_task_handler.py | 7 ++++--- 11 files changed, 36 insertions(+), 48 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/mocks/handlers/log.ts b/airflow-core/src/airflow/ui/src/mocks/handlers/log.ts index 36bd52960b5a1..2cf2ed177d42a 100644 --- a/airflow-core/src/airflow/ui/src/mocks/handlers/log.ts +++ b/airflow-core/src/airflow/ui/src/mocks/handlers/log.ts @@ -68,11 +68,10 @@ export const handlers: Array = [ http.get("/api/v2/dags/log_grouping/dagRuns/manual__2025-02-18T12:19/taskInstances/generate/logs/1", () => HttpResponse.json({ content: [ + { event: "::group::Log message source details" }, { - event: "::group::Log message source details", - sources: [ + event: "/home/airflow/logs/dag_id=tutorial_dag/run_id=manual__2025-02-28T05:18:54.249762+00:00/task_id=load/attempt=1.log", - ], }, { event: "::endgroup::" }, { @@ -211,11 +210,10 @@ export const handlers: Array = [ http.get("/api/v2/dags/log_grouping/dagRuns/manual__2025-02-18T12:19/taskInstances/ti_context/logs/1", () => HttpResponse.json({ content: [ + { event: "::group::Log message source details" }, { - event: "::group::Log message source details", - sources: [ + event: "/home/airflow/logs/dag_id=log_grouping/run_id=manual__2025-02-18T12:19/task_id=ti_context/attempt=1.log", - ], }, { event: "::endgroup::" }, { @@ -255,11 +253,10 @@ export const handlers: Array = [ http.get("/api/v2/dags/log_grouping/dagRuns/manual__2025-02-18T12:19/taskInstances/log_source/logs/1", () => HttpResponse.json({ content: [ + { event: "::group::Log message source details", timestamp: null }, { - event: "::group::Log message source details", - sources: [ + event: "/root/airflow/logs/dag_id=log_grouping/run_id=manual__2025-02-18T12:19/task_id=log_source/attempt=1.log", - ], timestamp: null, }, { event: "::endgroup::", timestamp: null }, diff --git a/airflow-core/src/airflow/utils/log/log_reader.py b/airflow-core/src/airflow/utils/log/log_reader.py index 99576559e96b1..a4d641b1a6f2d 100644 --- a/airflow-core/src/airflow/utils/log/log_reader.py +++ b/airflow-core/src/airflow/utils/log/log_reader.py @@ -60,10 +60,7 @@ def get_no_log_state_message(ti: TaskInstance | TaskInstanceHistory) -> Iterator else: msg = "No logs available for this task." - yield StructuredLogMessage( - timestamp=None, - event="::group::Log message source details", - ) + yield StructuredLogMessage(timestamp=None, event="::group::Log message source details") yield StructuredLogMessage(timestamp=None, event="::endgroup::") yield StructuredLogMessage( timestamp=ti.updated_at or datetime.now(timezone.utc), diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py index 1ead5d66cf9df..e958132a5dea2 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py @@ -76,9 +76,7 @@ def test_remote_logging_s3(self): run_id=resp["dag_run_id"], ) - task_log_sources = [ - source for content in task_logs.get("content", [{}]) for source in content.get("event", []) - ] + task_log_sources = [content.get("event", []) for content in task_logs.get("content", [{}])] response = s3_client.list_objects_v2(Bucket=bucket_name) if "Contents" not in response: diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py index 3fd900603e47c..071ab0c6a5042 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py @@ -298,12 +298,12 @@ def test_read(self): if AIRFLOW_V_3_0_PLUS: log = list(log) assert log[0].event == "::group::Log message source details" - assert expected_s3_uri in log[0].sources - assert log[1].event == "::endgroup::" - assert log[2].event == "Log line" - assert log[3].event == "Line 2" - assert log[4].event == "Log line 3" - assert log[5].event == "Line 4" + assert expected_s3_uri in log[1].event + assert log[3].event == "::endgroup::" + assert log[4].event == "Log line" + assert log[5].event == "Line 2" + assert log[6].event == "Log line 3" + assert log[7].event == "Line 4" assert metadata == {"end_of_log": True, "log_pos": 4} else: actual = log[0][0][-1] diff --git a/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py b/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py index 13d7765b38b35..8ec188a86650b 100644 --- a/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py +++ b/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py @@ -94,7 +94,7 @@ def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instanc if AIRFLOW_V_3_0_PLUS: logs = list(logs) - assert logs[0].sources == ["this message"] + assert logs[1].event == "this message" assert [x.event for x in logs[-3:]] == ["this", "log", "content"] assert metadata == {"end_of_log": False, "log_pos": 3} else: diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py index a8c676a8699c2..7e0255677a8b1 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py @@ -428,10 +428,8 @@ def _read( from airflow.utils.log.file_task_handler import StructuredLogMessage header = [ - StructuredLogMessage( - event="::group::Log message source details", - sources=[host for host in logs_by_host.keys()], - ), # type: ignore[call-arg] + StructuredLogMessage(event="::group::Log message source details"), + *[StructuredLogMessage(event=host) for host in logs_by_host.keys()], StructuredLogMessage(event="::endgroup::"), ] diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py index 90ed1773b3546..14a1e951e8869 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py @@ -115,9 +115,10 @@ def _assert_log_events(logs, metadatas, *, expected_events: list[str], expected_ if AIRFLOW_V_3_0_PLUS: logs = list(logs) assert logs[0].event == "::group::Log message source details" - assert logs[0].sources == expected_sources - assert logs[1].event == "::endgroup::" - assert [log.event for log in logs[2:]] == expected_events + for i, source in enumerate(expected_sources, start=1): + assert logs[i].event == source + assert logs[1 + len(expected_sources)].event == "::endgroup::" + assert [log.event for log in logs[(2 + len(expected_sources)) :]] == expected_events else: assert len(logs) == 1 assert len(logs[0]) == 1 diff --git a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py index ae492fe919f58..e507bbf99d838 100644 --- a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py @@ -372,9 +372,9 @@ def test_should_read_logs_from_remote( if AIRFLOW_V_3_0_PLUS: logs = list(logs) assert logs[0].event == "::group::Log message source details" - assert logs[0].sources == [expected_gs_uri] - assert logs[1].event == "::endgroup::" - assert logs[2].event == "CONTENT" + assert logs[1].event == expected_gs_uri + assert logs[2].event == "::endgroup::" + assert logs[3].event == "CONTENT" assert metadata == {"end_of_log": True, "log_pos": 1} else: assert f"*** Found remote logs:\n*** * {expected_gs_uri}\n" in logs @@ -403,11 +403,9 @@ def test_should_read_from_local_on_logs_read_error(self, mock_blob, mock_client, if AIRFLOW_V_3_0_PLUS: log = list(log) assert log[0].event == "::group::Log message source details" - assert log[0].sources == [ - expected_gs_uri, - f"{self.gcs_task_handler.local_base}/1.log", - ] - assert log[1].event == "::endgroup::" + assert log[1].event == expected_gs_uri + assert log[2].event == f"{self.gcs_task_handler.local_base}/1.log" + assert log[3].event == "::endgroup::" assert metadata == {"end_of_log": True, "log_pos": 0} else: assert ( diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py index 4a09b28983375..d63e27dc815c5 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py @@ -120,9 +120,9 @@ def test_wasb_read(self, mock_hook_cls, ti): if AIRFLOW_V_3_0_PLUS: logs = list(logs) assert logs[0].event == "::group::Log message source details" - assert logs[0].sources == ["https://wasb-container.blob.core.windows.net/abc/hello.log"] - assert logs[1].event == "::endgroup::" - assert logs[2].event == "Log line" + assert logs[1].event == "https://wasb-container.blob.core.windows.net/abc/hello.log" + assert logs[2].event == "::endgroup::" + assert logs[3].event == "Log line" assert metadata == {"end_of_log": True, "log_pos": 1} else: assert logs[0][0][0] == "localhost" diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py index bb63d341355c4..45e1a3ef17250 100644 --- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py +++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py @@ -593,10 +593,8 @@ def _read( from airflow.utils.log.file_task_handler import StructuredLogMessage header = [ - StructuredLogMessage( - event="::group::Log message source details", - sources=[host for host in logs_by_host.keys()], - ), # type: ignore[call-arg] + StructuredLogMessage(event="::group::Log message source details"), + *[StructuredLogMessage(event=host) for host in logs_by_host.keys()], StructuredLogMessage(event="::endgroup::"), ] diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py index 34683ff29c7e5..53fd285de6863 100644 --- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py +++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py @@ -111,9 +111,10 @@ def _assert_log_events(logs, metadatas, *, expected_events: list[str], expected_ if AIRFLOW_V_3_0_PLUS: logs = list(logs) assert logs[0].event == "::group::Log message source details" - assert logs[0].sources == expected_sources - assert logs[1].event == "::endgroup::" - assert [log.event for log in logs[2:]] == expected_events + for i, source in enumerate(expected_sources, start=1): + assert logs[i].event == source + assert logs[1 + len(expected_sources)].event == "::endgroup::" + assert [log.event for log in logs[(2 + len(expected_sources)) :]] == expected_events else: assert len(logs) == 1 assert len(logs[0]) == 1 From cfa83ebc2c22fd8fa8204f21e8d40410d70fefa8 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 9 May 2026 18:20:06 +0200 Subject: [PATCH 4/5] Fix react tests --- .../ui/src/pages/TaskInstance/Logs/Logs.test.tsx | 14 +++++--------- .../ui/src/pages/TaskInstance/Logs/utils.test.ts | 10 +++++++--- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.test.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.test.tsx index 6619dbfa37d4e..8a28bb77b0e77 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.test.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.test.tsx @@ -91,9 +91,7 @@ describe("Task log grouping", () => { await waitForLogs(); // Group headers use the summary-{name} testid pattern and are always visible - const summarySource = screen.getByTestId( - 'summary-Log message source details sources=["/home/airflow/logs/dag_id=tutorial_dag/run_id=manual__2025-02-28T05:18:54.249762+00:00/task_id=load/attempt=1.log"]', - ); + const summarySource = screen.getByTestId("summary-Log message source details"); expect(summarySource).toBeVisible(); @@ -190,9 +188,7 @@ describe("Task Identity preamble", () => { await waitForLogs(); - const sourceGroup = screen.getByTestId( - 'summary-Log message source details sources=["/home/airflow/logs/dag_id=log_grouping/run_id=manual__2025-02-18T12:19/task_id=ti_context/attempt=1.log"]', - ); + const sourceGroup = screen.getByTestId("summary-Log message source details"); expect(sourceGroup).toBeInTheDocument(); @@ -432,8 +428,8 @@ describe("Task log search", () => { fireEvent.click(summaryDependency); - await expectRenderedLineNumber(/dep_context=non-requeueable/iu, 0); - await expectRenderedLineNumber(/dep_context=requeueable/iu, 1); - await expectRenderedLineNumber(/starting attempt 1 of 3/iu, 2); + await expectRenderedLineNumber(/dep_context=non-requeueable/iu, 1); + await expectRenderedLineNumber(/dep_context=requeueable/iu, 2); + await expectRenderedLineNumber(/starting attempt 1 of 3/iu, 3); }, 10_000); }); diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/utils.test.ts b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/utils.test.ts index 0b0607b4c576e..9d2cf6231dff7 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/utils.test.ts +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/utils.test.ts @@ -47,7 +47,9 @@ describe("getDownloadText", () => { it("places Task Identity preamble after the source details endgroup, before the first log line", () => { const fetchedData = { content: [ - { event: "::group::Log message source details", sources: ["/logs/a.log", "/logs/b.log"] }, + { event: "::group::Log message source details" }, + { event: "/logs/a.log" }, + { event: "/logs/b.log" }, { event: "some source detail" }, { event: "::endgroup::" }, tiLine("First log line", "2026-01-01T00:00:00Z"), @@ -68,7 +70,8 @@ describe("getDownloadText", () => { it("does not include TI context fields on individual log lines", () => { const fetchedData = { content: [ - { event: "::group::Log message source details", sources: ["/logs/a.log"] }, + { event: "::group::Log message source details" }, + { event: "/logs/a.log" }, { event: "::endgroup::" }, tiLine("Task started", "2026-01-01T00:00:00Z"), ], @@ -87,7 +90,8 @@ describe("getDownloadText", () => { it("omits the preamble when no TI context fields are present", () => { const fetchedData = { content: [ - { event: "::group::Log message source details", sources: ["/logs/a.log"] }, + { event: "::group::Log message source details" }, + { event: "/logs/a.log" }, { event: "::endgroup::" }, { event: "plain log line", level: "info", timestamp: "2026-01-01T00:00:00Z" }, ], From cecb9d9a3b581132dec7a389776c6f517d0d89e4 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 9 May 2026 21:05:53 +0200 Subject: [PATCH 5/5] Fix backcompat tests with old Airflow versions --- .../tests_common/test_utils/version_compat.py | 1 + .../amazon/aws/log/test_s3_task_handler.py | 14 ++++++++++-- .../celery/log_handlers/test_log_handlers.py | 9 ++++++-- .../google/cloud/log/test_gcs_task_handler.py | 22 ++++++++++++++++--- .../azure/log/test_wasb_task_handler.py | 11 ++++++++-- 5 files changed, 48 insertions(+), 9 deletions(-) diff --git a/devel-common/src/tests_common/test_utils/version_compat.py b/devel-common/src/tests_common/test_utils/version_compat.py index 635b0e08e3350..7eb25dec2b3cb 100644 --- a/devel-common/src/tests_common/test_utils/version_compat.py +++ b/devel-common/src/tests_common/test_utils/version_compat.py @@ -40,6 +40,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: AIRFLOW_V_3_1_7_PLUS = get_base_airflow_version_tuple() >= (3, 1, 7) AIRFLOW_V_3_1_9_PLUS = get_base_airflow_version_tuple() >= (3, 1, 9) AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0) +AIRFLOW_V_3_2_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 2) AIRFLOW_V_3_3_PLUS = get_base_airflow_version_tuple() >= (3, 3, 0) if AIRFLOW_V_3_1_PLUS: diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py index 071ab0c6a5042..58adff9096c5b 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py @@ -38,7 +38,7 @@ from tests_common.test_utils.dag import sync_dag_to_db from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags, clear_db_runs from tests_common.test_utils.taskinstance import create_task_instance -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_2_PLUS try: from airflow.sdk.timezone import datetime @@ -295,7 +295,7 @@ def test_read(self): expected_s3_uri = f"s3://bucket/{self.remote_log_key}" - if AIRFLOW_V_3_0_PLUS: + if AIRFLOW_V_3_2_2_PLUS: log = list(log) assert log[0].event == "::group::Log message source details" assert expected_s3_uri in log[1].event @@ -305,6 +305,16 @@ def test_read(self): assert log[6].event == "Log line 3" assert log[7].event == "Line 4" assert metadata == {"end_of_log": True, "log_pos": 4} + elif AIRFLOW_V_3_0_PLUS: + log = list(log) + assert log[0].event == "::group::Log message source details" + assert expected_s3_uri in log[0].sources + assert log[1].event == "::endgroup::" + assert log[2].event == "Log line" + assert log[3].event == "Line 2" + assert log[4].event == "Log line 3" + assert log[5].event == "Line 4" + assert metadata == {"end_of_log": True, "log_pos": 4} else: actual = log[0][0][-1] assert f"*** Found logs in s3:\n*** * {expected_s3_uri}\n" in actual diff --git a/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py b/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py index 8ec188a86650b..a4a6f5e59f37e 100644 --- a/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py +++ b/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py @@ -39,7 +39,7 @@ from tests_common.test_utils.file_task_handler import ( convert_list_to_stream, ) -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_2_PLUS pytestmark = pytest.mark.db_test @@ -92,11 +92,16 @@ def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instanc logs, metadata = fth._read(ti=ti, try_number=1) fth._read_from_logs_server.assert_called_once() - if AIRFLOW_V_3_0_PLUS: + if AIRFLOW_V_3_2_2_PLUS: logs = list(logs) assert logs[1].event == "this message" assert [x.event for x in logs[-3:]] == ["this", "log", "content"] assert metadata == {"end_of_log": False, "log_pos": 3} + elif AIRFLOW_V_3_0_PLUS: + logs = list(logs) + assert logs[0].sources == ["this message"] + assert [x.event for x in logs[-3:]] == ["this", "log", "content"] + assert metadata == {"end_of_log": False, "log_pos": 3} else: assert "*** this message\n" in logs assert logs.endswith("this\nlog\ncontent") diff --git a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py index e507bbf99d838..f0fbc73ad3011 100644 --- a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py @@ -33,7 +33,7 @@ from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dags, clear_db_runs -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_2_PLUS if TYPE_CHECKING: from pathlib import Path @@ -369,13 +369,20 @@ def test_should_read_logs_from_remote( mock_blob.from_string.assert_called_once_with(expected_gs_uri, mock_client.return_value) - if AIRFLOW_V_3_0_PLUS: + if AIRFLOW_V_3_2_2_PLUS: logs = list(logs) assert logs[0].event == "::group::Log message source details" assert logs[1].event == expected_gs_uri assert logs[2].event == "::endgroup::" assert logs[3].event == "CONTENT" assert metadata == {"end_of_log": True, "log_pos": 1} + elif AIRFLOW_V_3_0_PLUS: + logs = list(logs) + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == [expected_gs_uri] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "CONTENT" + assert metadata == {"end_of_log": True, "log_pos": 1} else: assert f"*** Found remote logs:\n*** * {expected_gs_uri}\n" in logs assert logs.endswith("CONTENT") @@ -400,13 +407,22 @@ def test_should_read_from_local_on_logs_read_error(self, mock_blob, mock_client, log, metadata = self.gcs_task_handler._read(ti, self.ti.try_number) expected_gs_uri = f"gs://bucket/{blob_name}" - if AIRFLOW_V_3_0_PLUS: + if AIRFLOW_V_3_2_2_PLUS: log = list(log) assert log[0].event == "::group::Log message source details" assert log[1].event == expected_gs_uri assert log[2].event == f"{self.gcs_task_handler.local_base}/1.log" assert log[3].event == "::endgroup::" assert metadata == {"end_of_log": True, "log_pos": 0} + elif AIRFLOW_V_3_0_PLUS: + log = list(log) + assert log[0].event == "::group::Log message source details" + assert log[0].sources == [ + expected_gs_uri, + f"{self.gcs_task_handler.local_base}/1.log", + ] + assert log[1].event == "::endgroup::" + assert metadata == {"end_of_log": True, "log_pos": 0} else: assert ( "*** Found remote logs:\n" diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py index d63e27dc815c5..732c9ea49daa1 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py @@ -33,7 +33,7 @@ from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dags, clear_db_runs -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_2_PLUS pytestmark = pytest.mark.db_test @@ -117,13 +117,20 @@ def test_wasb_read(self, mock_hook_cls, ti): logs, metadata = self.wasb_task_handler.read(ti) - if AIRFLOW_V_3_0_PLUS: + if AIRFLOW_V_3_2_2_PLUS: logs = list(logs) assert logs[0].event == "::group::Log message source details" assert logs[1].event == "https://wasb-container.blob.core.windows.net/abc/hello.log" assert logs[2].event == "::endgroup::" assert logs[3].event == "Log line" assert metadata == {"end_of_log": True, "log_pos": 1} + elif AIRFLOW_V_3_0_PLUS: + logs = list(logs) + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["https://wasb-container.blob.core.windows.net/abc/hello.log"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "Log line" + assert metadata == {"end_of_log": True, "log_pos": 1} else: assert logs[0][0][0] == "localhost" assert (