diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 6acd92fbe9285..224659c4c4d99 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -32,6 +32,7 @@ from functools import lru_cache, partial from itertools import groupby from typing import TYPE_CHECKING, Any, cast +from uuid import UUID from sqlalchemy import ( CTE, @@ -1270,7 +1271,7 @@ def process_executor_events( # Handle callback state events for callback_id in callback_keys_with_events: state, info = event_buffer.pop(callback_id) - callback = session.get(Callback, str(callback_id)) + callback = session.get(Callback, UUID(str(callback_id))) if not callback: # This should not normally happen - we just received an event for this callback. # Only possible if callback was deleted mid-execution (e.g., cascade delete from DagRun deletion). diff --git a/airflow-core/tests/unit/models/test_callback.py b/airflow-core/tests/unit/models/test_callback.py index 5903f54185455..c31d1c99b499d 100644 --- a/airflow-core/tests/unit/models/test_callback.py +++ b/airflow-core/tests/unit/models/test_callback.py @@ -237,6 +237,20 @@ def test_queue(self): callback.queue() assert callback.state == CallbackState.QUEUED + def test_session_get_requires_uuid_not_str(self, session): + """Filtering the UUID id column with a plain str breaks on SQLite, so + callers must wrap with ``UUID(...)`` before querying.""" + from uuid import UUID + + callback = ExecutorCallback(TEST_SYNC_CALLBACK, fetch_method=CallbackFetchMethod.IMPORT_PATH) + session.add(callback) + session.commit() + # ``id`` is filled by the ``uuid6.uuid7`` default at flush time, so it + # is only safe to stringify *after* the commit. + callback_id_str = str(callback.id) + + assert session.get(Callback, UUID(callback_id_str)) is not None + class TestDagProcessorCallback: def test_polymorphic_serde(self, session):