Skip to content

Sequential Execution of Dynamic Mapped Task #37119

@manugarri

Description

@manugarri

Description

Dynamic tasks enable dag creation based on run time based number of inputs. This is great because it not only makes dags flexible, but also it reduces the workload on the scheduler since the tasks are defined at run time.

However, the current dynamic tasks run all inputs in parallel (via the expand function). However, there are some cases where a list of steps needs to be run sequentially, but the list is not known until runtime.

Use case/motivation

It is helpful to show an actual use case to see why the feature would be useful.

I am parsing a yaml with some EMR steps to run as a @task , and I cannot add a task inside another one with a for loop to create the actual operators that run the operations described on the yaml file.

The dag looks more or less like this :

with DAG(...):
  @task
  def parse_yaml(yaml_path):
     """ 
     read the templated yaml and expand the jinja variables inside
     This yaml has blocks of steps that have to run sequentially:
      something like this:
      groups:
        - group_name:1
          steps:
            - step_name: 1.1
              step_emr_command: spark-submit ...
            - step_name: 1.2
              step_emr_command: spark-submit ...
        - group_name:2
          steps:
             - step_name: 2.1
               step_emr_command: spark-submit ...
             - step_name: 2.2
               step_emr_command: spark-submit ...
      """
     return parsed_yaml
  
  @task_group
  def run_group(group_definition):
     setup_emr_cluster = EMRCreateJobFlowOperator()
     run_steps = EMRAddStepsOperator(group_definition['steps'])
     terminate_emr_cluster = EMRTerminateJobFlowOperator()
     setup_emr_cluster >> run_steps >> terminate_emr_cluster
  
  parsed_yaml = parse_yaml(yaml_path)
  
  # this runs all the step groups in parallel (bad)
  run_group.expand(parsed_yaml)

hopefully this pseudo example showcases what im trying to achieve. I need the run_group for group:1 to run before group2 , not in parallel.

Here is the diagram that should be run based on the yaml file above:

                          ┌───────────────────────────────────────────────────┐                ┌─────────────────────────────────────────────────┐
                          │                                                   │                │                                                 │
                          │                                                   │                │                                                 │
┌─────────────┐           │                                                   │                │                                                 │
│             │           │                                                   │                │ ┌─────────┐  ┌───────┐   ┌───────┐   ┌───────── │
│  parse_yaml ├──────────►│  ┌─────────  ┌───────┐   ┌────────┐  ┌──────────  │                │ │EMRCreate├─►│step   ├──►│step   ├──►│Terminate │
│             │           │  │EMRCreate  │ step  │   │step    │  │Terminate''e├───────────────►│ │Cluster2 │  │2.1    │   │2.2    │   │Cluster2│ │
│             │           │  │Cluster1┌─►│ 1.1   ├──►│ 1.2    ├─►│Cluster1 │  │                │ └─────────┘  └───────┘   └───────┘   └────────┘ │
└─────────────┘           │  └────────┘  └───────┘   └────────┘  └─────────┘  │                │                                                 │
                          │                                                   │                │                                                 │
                          │                                                   │                │                                                 │
                          │                                                   │                │                                                 │
                          │                                                   │                │                                                 │
                          │                                                   │                │                                                 │
                          │                             Group 1               │                │                                         Group 2 │
                          │                                                   │                │                                                 │
                          └───────────────────────────────────────────────────┘                └─────────────────────────────────────────────────┘

Related issues

There was a previous unanswered discussion on the repo (link).

There seem to be some related questions on StackOverflow ( 1, 2 3 , 4)

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions