Skip to content

Commit

Permalink
Add branching based on mapped task group example to dynamic-task-mapp…
Browse files Browse the repository at this point in the history
…ing.rst (#36480)

* Add branching based on mapped task group example to dynamic-task-mapping.rst

Based on trying to solve [this stack overflow question](https://stackoverflow.com/questions/77730116/branching-not-working-in-airflow-as-expected/77730300#77730300), it seems impossible to reliably branch mapped tasks based on the result of an upstream task. However, it's possible to do this in a mapped task group, which this example demonstrates.

* trying to force blacken-docs
  • Loading branch information
RNHTTR committed Dec 29, 2023
1 parent eac3d95 commit 0d9a26c
Showing 1 changed file with 41 additions and 3 deletions.
Expand Up @@ -313,17 +313,17 @@ For example, this code will *not* work:
@task_group
def my_group(value):
def my_task_group(value):
if not value: # DOES NOT work as you'd expect!
task_a = EmptyOperator(...)
else:
task_a = PythonOperator(...)
task_a << my_task(value)
my_group.expand(value=[0, 1, 2])
my_task_group.expand(value=[0, 1, 2])
When code in ``my_group`` is executed, ``value`` would still only be a reference, not the real value, so the ``if not value`` branch will not work as you likely want. However, if you pass that reference into a task, it will become resolved when the task is executed, and the three ``my_task`` instances will therefore receive 1, 2, and 3, respectively.
When code in ``my_task_group`` is executed, ``value`` would still only be a reference, not the real value, so the ``if not value`` branch will not work as you likely want. However, if you pass that reference into a task, it will become resolved when the task is executed, and the three ``my_task`` instances will therefore receive 1, 2, and 3, respectively.

It is, therefore, important to remember that, if you intend to perform any logic to a value passed into a task group function, you must always use a task to run the logic, such as ``@task.branch`` (or ``BranchPythonOperator``) for conditions, and task mapping methods for loops.

Expand Down Expand Up @@ -375,6 +375,44 @@ Similar to a mapped task group, depending on a mapped task group's output would
It is also possible to perform any operations as results from a normal mapped task.

Branching on a mapped task group's output
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

While it's not possible to implement branching logic (for example using ``@task.branch``) on the results of a mapped task, it is possible to branch based on the *input* of a task group. The following example demonstrates executing one of three tasks based on the input to a mapped task group.

.. code-block:: python
inputs = ["a", "b", "c"]
@task_group(group_id="my_task_group")
def my_task_group(input):
@task.branch
def branch(element):
if "a" in element:
return "my_task_group.a"
elif "b" in element:
return "my_task_group.b"
else:
return "my_task_group.c"
@task
def a():
print("a")
@task
def b():
print("b")
@task
def c():
print("c")
branch(input) >> [a(), b(), c()]
my_task_group.expand(input=inputs)
Filtering items from a mapped task
==================================

Expand Down

0 comments on commit 0d9a26c

Please sign in to comment.