Skip to content

Scheduler fails in setting external executor if multiple executors in use #67385

@jscheffl

Description

@jscheffl

Under which category would you file this issue?

Airflow Core

Apache Airflow version

3.2.2rc1

What happened and how to reproduce it?

Have an error that makes scheduler crashing with 3.2.2rc1.

2026-05-22T19:57:55.471027Z [info     ] Adopting or resetting orphaned tasks for active dag runs [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:2809
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 952, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.DatatypeMismatch: CASE types text and uuid cannot be matched
LINE 1: ...executor.CeleryExecutor', 'CeleryExecutor')) THEN gen_random...
                                                             ^


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 6, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 66, in scheduler
    run_command_with_daemon_option(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 69, in <lambda>
    callback=lambda: _run_scheduler_job(args),
                     ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/memray_utils.py", line 60, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 48, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)  # type: ignore[arg-type]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 355, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 384, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1531, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1670, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1815, in _do_scheduling
    num_queued_tis = self._critical_section_enqueue_task_instances(session=session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1076, in _critical_section_enqueue_task_instances
    queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 964, in _executable_task_instances_to_queued
    result = session.execute(queued_update.returning(TI.id, TI.external_executor_id))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2351, in execute
    return self._execute_internal(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2249, in _execute_internal
    result: Result[Any] = compile_state_cls.orm_execute_statement(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/bulk_persistence.py", line 1660, in orm_execute_statement
    return super().orm_execute_statement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/context.py", line 306, in orm_execute_statement
    result = conn.execute(
             ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1419, in execute
    return meth(
           ^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 527, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1641, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    return self._exec_single_context(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
    self._handle_dbapi_exception(
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2363, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 952, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.DatatypeMismatch) CASE types text and uuid cannot be matched
LINE 1: ...executor.CeleryExecutor', 'CeleryExecutor')) THEN gen_random...
                                                             ^

[SQL: UPDATE task_instance SET state=%(state)s, queued_dttm=%(queued_dttm)s, queued_by_job_id=%(queued_by_job_id)s, updated_at=%(updated_at)s, external_executor_id=CASE WHEN (task_instance.executor IN (%(executor_1_1)s, %(executor_1_2)s
)) THEN gen_random_uuid() WHEN (task_instance.executor IS NULL) THEN gen_random_uuid() ELSE task_instance.external_executor_id END WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.map_i
ndex = %(map_index_1)s AND task_instance.task_id IN (%(task_id_1_1)s) RETURNING task_instance.id, task_instance.external_executor_id]
[parameters: {'state': <TaskInstanceState.QUEUED: 'queued'>, 'queued_dttm': datetime.datetime(2026, 5, 22, 19, 57, 55, 857339, tzinfo=Timezone('UTC')), 'queued_by_job_id': 223, 'updated_at': datetime.datetime(2026, 5, 22, 19, 57, 55, 85
9683, tzinfo=Timezone('UTC')), 'dag_id_1': 'dt_flow_maintain_workers', 'run_id_1': 'scheduled__2026-05-22T19:30:00+00:00', 'map_index_1': -1, 'executor_1_1': 'airflow.providers.celery.executors.celery_executor.CeleryExecutor', 'executor
_1_2': 'CeleryExecutor', 'task_id_1_1': 'maintain_worker_queues_pools'}]
(Background on this error at: https://sqlalche.me/e/20/f405)
stream closed: EOF for dev-sje2lr/dt-flow-scheduler-6d45f475bb-wb2bk (scheduler)

Backend: Postgres
Versions used:

  • apache-airflow                            3.2.2rc1
  • apache-airflow-core                       3.2.2rc1
  • apache-airflow-providers-celery           3.19.0
  • apache-airflow-providers-edge3            3.6.0

I assume this is caused by PR #65711 (back-ported from #65594) - taking a look at the code I assume this is a bug caused by Multiple executors hitting this. The UUID must be converted to str() in the CASE statement.

What you think should happen instead?

Scheduler should not crash

Operating System

Linux

Deployment

Official Apache Airflow Helm Chart

Apache Airflow Provider(s)

celery

Versions of Apache Airflow Providers

  • apache-airflow-providers-celery           3.19.0
  • apache-airflow-providers-edge3            3.6.0

Official Helm Chart version

1.19.0

Kubernetes Version

?

Helm Chart configuration

No response

Docker Image customizations

Some local packages and patches.

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    affected_version:3.2Use for reporting issues with 3.2area:Schedulerincluding HA (high availability) schedulerkind:bugThis is a clearly a bug

    Type

    No fields configured for Bug.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions