multiple expand with parallelism maintained #26271

@brunorigal

Description

I would like to build a graph like this:

              ┌───►  task_2_1  ─────►  task_3_1
              │
              │
              │
task_1   ───────►  task_2_2  ─────►  task_3_2
              │
              │
              │
              │
              └──►   task_2_N  ─────►  task_3_N

Here the number of parallel branches (N) is determined at runtime by the output of task_1. A possible solution would be:

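from airflow import DAG
from airflow.decorators import task
from pendulum import datetime


@task
def task_1():
    # The length of this list determines how many mapped instances run downstream.
    return list(range(5))


@task
def task_2(task_num):
    return task_num


@task
def task_3(task_num):
    return task_num


# A fixed start_date is used here; a dynamic value such as now() is generally discouraged.
with DAG(dag_id="my_dag", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
    # Chained expand: task_3 is mapped over the collected outputs of all task_2 instances.
    task_3.expand(task_num=task_2.expand(task_num=task_1()))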

However, the task_3 instances start running only once all the task_2 instances have finished. I would rather pair each task_2 instance with the task_3 instance that shares the same task_num, so that each branch can proceed independently of the others.
An intuitive solution would have been to use a task group, but task groups do not have an expand method.
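
To make the difference concrete, here is a minimal pure-Python sketch of the two dependency shapes. It does not use Airflow at all. The helper names (`barrier_edges`, `per_branch_edges`, `upstream`) and the node labels are hypothetical and only model the behaviour described above, where every task_3 instance waits for all task_2 instances, against the graph drawn at the top of this issue.

```python
def barrier_edges(n):
    """Model of the behaviour described above: each task_3 waits for every task_2."""
    edges = {("task_1", f"task_2_{i}") for i in range(1, n + 1)}
    edges |= {
        (f"task_2_{i}", f"task_3_{j}")
        for i in range(1, n + 1)
        for j in range(1, n + 1)
    }
    return edges


def per_branch_edges(n):
    """Model of the desired graph: task_3_i depends only on task_2_i."""
    edges = {("task_1", f"task_2_{i}") for i in range(1, n + 1)}
    edges |= {(f"task_2_{i}", f"task_3_{i}") for i in range(1, n + 1)}
    return edges


def upstream(edges, node):
    """Return the direct upstream nodes of `node` in a set of (src, dst) edges."""
    return {src for src, dst in edges if dst == node}


if __name__ == "__main__":
    # With the barrier shape, task_3_1 is blocked by all three task_2 instances.
    print(sorted(upstream(barrier_edges(3), "task_3_1")))
    # With the per-branch shape, task_3_1 is blocked only by task_2_1.
    print(sorted(upstream(per_branch_edges(3), "task_3_1")))
```

In the per-branch model a slow task_2_2 would not delay task_3_1, which is the parallelism this request is asking to preserve.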

Use case/motivation

No response

Related issues

#25032

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Labels

duplicate (Issue that is duplicated)
