
Task assigned to queue "heavy" landed on worker assigned to "default" queue #42894

@pedro-cf

Apache Airflow version

2.10.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I'm currently using an Airflow instance set up with the CeleryExecutor.

I have a task assigned to a "heavy" queue:

    @task.virtualenv(
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
        requirements=[
            "my_package=1.1.1"
        ],
        index_urls=["my_index_url"],
        venv_cache_path="/tmp",
        queue="heavy"
    )
    def my_task(arg1, arg2, params=None):
        ...

but for some reason the task is running on a worker that is configured to handle only the default queue.

Worker queues in Flower: (screenshot)

DAG run details: (screenshot)

-> IMPORTANT: The DAG run existed prior to the addition of queue="heavy" to the DAG's task, and the task state was cleared in order to restart the task.
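
For reference, a minimal sketch of clearing that task state from the CLI (the DAG id, task id, and dates below are hypothetical; the clear may equally have been done through the UI):

    # Clear the single task instance so the scheduler re-queues it
    airflow tasks clear my_dag -t my_task -s 2024-10-01 -e 2024-10-02 -y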

What you think should happen instead?

Tasks assigned to a specific queue should only run on workers assigned to that specific queue.

How to reproduce

Not sure how.

Operating System

Ubuntu 22

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

CeleryExecutor with 2 workers:

    celery worker --concurrency ${DEFAULT_WORKER_CONCURRENCY}
    celery worker -q heavy --concurrency ${HEAVY_WORKER_CONCURRENCY}
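
One way to double-check which queues each worker actually consumes (besides Flower) is Celery's remote-control inspection. A sketch, assuming a Redis broker at redis://redis:6379/0 and default Celery control settings (adjust the broker URL to the deployment):

    # Each worker replies with the list of queues it is subscribed to
    celery -b redis://redis:6379/0 inspect active_queues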

Anything else?

-> IMPORTANT: The DAG run existed prior to the addition of queue="heavy" to the DAG's task, and the task state was cleared in order to restart the task.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
