From f656e3ae1befc05a6afe0e7611bc54f258fbe86b Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Tue, 26 May 2026 17:27:14 +0900 Subject: [PATCH 1/2] Fix callback state not updating from executor events due to UUID type mismatch --- .../src/airflow/jobs/scheduler_job_runner.py | 3 ++- airflow-core/tests/unit/models/test_callback.py | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 6acd92fbe9285..224659c4c4d99 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -32,6 +32,7 @@ from functools import lru_cache, partial from itertools import groupby from typing import TYPE_CHECKING, Any, cast +from uuid import UUID from sqlalchemy import ( CTE, @@ -1270,7 +1271,7 @@ def process_executor_events( # Handle callback state events for callback_id in callback_keys_with_events: state, info = event_buffer.pop(callback_id) - callback = session.get(Callback, str(callback_id)) + callback = session.get(Callback, UUID(str(callback_id))) if not callback: # This should not normally happen - we just received an event for this callback. # Only possible if callback was deleted mid-execution (e.g., cascade delete from DagRun deletion). diff --git a/airflow-core/tests/unit/models/test_callback.py b/airflow-core/tests/unit/models/test_callback.py index 5903f54185455..1a4c7d344296e 100644 --- a/airflow-core/tests/unit/models/test_callback.py +++ b/airflow-core/tests/unit/models/test_callback.py @@ -237,6 +237,18 @@ def test_queue(self): callback.queue() assert callback.state == CallbackState.QUEUED + def test_session_get_requires_uuid_not_str(self, session): + """Filtering the UUID id column with a plain str breaks on SQLite, so + callers must wrap with ``UUID(...)`` before querying.""" + from uuid import UUID + + callback = ExecutorCallback(TEST_SYNC_CALLBACK, fetch_method=CallbackFetchMethod.IMPORT_PATH) + callback_id_str = str(callback.id) + session.add(callback) + session.commit() + + assert session.get(Callback, UUID(callback_id_str)) is not None + class TestDagProcessorCallback: def test_polymorphic_serde(self, session): From 0116c28bbba8ac70db74faab25ecde8e31909adb Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Tue, 26 May 2026 18:01:37 +0900 Subject: [PATCH 2/2] fix logic --- airflow-core/tests/unit/models/test_callback.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow-core/tests/unit/models/test_callback.py b/airflow-core/tests/unit/models/test_callback.py index 1a4c7d344296e..c31d1c99b499d 100644 --- a/airflow-core/tests/unit/models/test_callback.py +++ b/airflow-core/tests/unit/models/test_callback.py @@ -243,9 +243,11 @@ def test_session_get_requires_uuid_not_str(self, session): from uuid import UUID callback = ExecutorCallback(TEST_SYNC_CALLBACK, fetch_method=CallbackFetchMethod.IMPORT_PATH) - callback_id_str = str(callback.id) session.add(callback) session.commit() + # ``id`` is filled by the ``uuid6.uuid7`` default at flush time, so it + # is only safe to stringify *after* the commit. + callback_id_str = str(callback.id) assert session.get(Callback, UUID(callback_id_str)) is not None