Extend documentation for states of DAGs & tasks and update trigger rules docs (#21382)

(cherry picked from commit 4e95935)
mnojek authored and ephraimbuddy committed Mar 26, 2022
1 parent d4f82a8 commit 289ae2f
Showing 6 changed files with 101 additions and 5 deletions.
67 changes: 67 additions & 0 deletions docs/apache-airflow/best-practices.rst
@@ -326,6 +326,73 @@ each parameter by following the links):
* :ref:`config:scheduler__parsing_processes`
* :ref:`config:scheduler__file_parsing_sort_mode`

Example of watcher pattern with trigger rules
---------------------------------------------

The watcher pattern is how we call a DAG with a task that is "watching" the states of the other tasks. Its primary purpose is to fail a DAG Run when any other task fails.
The need came from the Airflow system tests that are DAGs with different tasks (similar to a test containing steps).

Normally, when any task fails, all other tasks are not executed and the whole DAG Run gets a ``failed`` status too. But when we use trigger rules, we can disrupt the normal flow of running tasks and the whole DAG may represent a different status than we expect.
For example, we can have a teardown task (with trigger rule set to ``"all_done"``) that will be executed regardless of the state of the other tasks (e.g. to clean up the resources). In such a situation, the DAG would always run this task and the DAG Run will get the status of this particular task, so we can potentially lose the information about failing tasks.
If we want to ensure that a DAG with a teardown task would fail if any task fails, we need to use the watcher pattern.
The watcher task is a task that will always fail if triggered, but it needs to be triggered only if any other task fails. It needs to have a trigger rule set to ``"one_failed"``, and it also needs to be a downstream task of all the other tasks in the DAG.
Thanks to this, if every other task passes, the watcher will be skipped, but when something fails, the watcher task will be executed and fail, making the DAG Run fail too.

.. note::

    Be aware that trigger rules only rely on the direct upstream (parent) tasks, e.g. ``one_failed`` will ignore any failed (or ``upstream_failed``) tasks that are not a direct parent of the parameterized task.

It's easier to grasp the concept with an example. Let's say that we have the following DAG:

.. code-block:: python

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task
    from airflow.exceptions import AirflowException
    from airflow.operators.bash import BashOperator


    @task(trigger_rule="one_failed", retries=0)
    def watcher():
        raise AirflowException("Failing task because one or more upstream tasks failed.")


    with DAG(
        dag_id="watcher_example",
        schedule_interval="@once",
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", retries=0)
        passing_task = BashOperator(task_id="passing_task", bash_command="echo passing_task")
        teardown = BashOperator(
            task_id="teardown", bash_command="echo teardown", trigger_rule="all_done"
        )

        failing_task >> passing_task >> teardown
        list(dag.tasks) >> watcher()

The visual representation of this DAG after execution looks like this:

.. image:: /img/watcher.png

We have several tasks that serve different purposes:

- ``failing_task`` always fails,
- ``passing_task`` always succeeds (if executed),
- ``teardown`` is always triggered (regardless of the states of the other tasks) and it should always succeed,
- ``watcher`` is a downstream task for each other task, i.e. it will be triggered when any task fails and thus fail the whole DAG Run, since it's a leaf task.

It's important to note that without the ``watcher`` task, the whole DAG Run would get the ``success`` state, since the only failing task is not a leaf task, and the ``teardown`` task finishes with ``success``.
If we want the ``watcher`` to monitor the state of all tasks, we need to make it dependent on all of them separately. Thanks to this, we can fail the DAG Run if any of the tasks fail. Note that the watcher task has a trigger rule set to ``"one_failed"``.
On the other hand, without the ``teardown`` task, the ``watcher`` task would not be needed, because ``failing_task`` would propagate its ``failed`` state to the downstream task ``passing_task``, and the whole DAG Run would also get the ``failed`` status.

.. _best_practices/reducing_dag_complexity:

Reducing DAG complexity
5 changes: 3 additions & 2 deletions docs/apache-airflow/concepts/dags.rst
@@ -369,7 +369,7 @@ Note that if you are running the DAG at the very start of its life---specificall
Trigger Rules
~~~~~~~~~~~~~

By default, Airflow will wait for all upstream tasks for a task to be :ref:`successful <concepts:task-states>` before it runs that task.
By default, Airflow will wait for all upstream tasks (the direct parents) of a task to be :ref:`successful <concepts:task-states>` before it runs that task.

However, this is just the default behaviour, and you can control it using the ``trigger_rule`` argument to a Task. The options for ``trigger_rule`` are:

@@ -383,6 +383,7 @@ However, this is just the default behaviour, and you can control it using the ``
* ``none_skipped``: No upstream task is in a ``skipped`` state - that is, all upstream tasks are in a ``success``, ``failed``, or ``upstream_failed`` state
* ``always``: No dependencies at all, run this task at any time
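
For instance, here is a minimal sketch (the task ID and command are illustrative) of a cleanup step that runs once all of its direct upstream tasks have finished, whatever their outcome:

.. code-block:: python

    from airflow.operators.bash import BashOperator

    # Runs when every direct upstream task has reached a terminal state,
    # regardless of whether it succeeded, failed, or was skipped.
    cleanup = BashOperator(
        task_id="cleanup",
        bash_command="echo cleaning up",
        trigger_rule="all_done",
    )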


You can also combine this with the :ref:`concepts:depends-on-past` functionality if you wish.

.. note::
@@ -724,7 +725,7 @@ You can also prepare ``.airflowignore`` file for a subfolder in ``DAG_FOLDER`` and it
would only be applicable for that subfolder.

DAG Dependencies
================
----------------

*Added in Airflow 2.1*

6 changes: 6 additions & 0 deletions docs/apache-airflow/concepts/tasks.rst
@@ -36,6 +36,12 @@ Relationships

The key part of using Tasks is defining how they relate to each other - their *dependencies*, or as we say in Airflow, their *upstream* and *downstream* tasks. You declare your Tasks first, and then you declare their dependencies second.

.. note::

    We call the *upstream* task the one that directly precedes the other task. We used to call it a parent task before.
    Be aware that this concept does not describe the tasks that are higher in the task hierarchy (i.e. they are not direct parents of the task).
    The same definition applies to the *downstream* task, which needs to be a direct child of the other task.

There are two ways of declaring dependencies - using the ``>>`` and ``<<`` (bitshift) operators::

    first_task >> second_task >> [third_task, fourth_task]
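
For comparison, the same dependencies can be declared with the explicit ``set_upstream`` and ``set_downstream`` methods; here is a sketch equivalent to the bitshift line above:

.. code-block:: python

    # Equivalent to: first_task >> second_task >> [third_task, fourth_task]
    first_task.set_downstream(second_task)
    second_task.set_downstream([third_task, fourth_task])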
27 changes: 24 additions & 3 deletions docs/apache-airflow/dag-run.rst
@@ -18,6 +18,30 @@
DAG Runs
=========
A DAG Run is an object representing an instantiation of the DAG in time.
Any time the DAG is executed, a DAG Run is created and all tasks inside it are executed. The status of the DAG Run depends on the states of its tasks.
Each DAG Run is run separately from the others, meaning that the same DAG can be running many times at the same time.

.. _dag-run:dag-run-status:

DAG Run Status
''''''''''''''

A DAG Run status is determined when the execution of the DAG is finished.
The execution of the DAG depends on its containing tasks and their dependencies.
The status is assigned to the DAG Run when all of the tasks are in one of the terminal states (i.e. when there is no possible transition to another state), like ``success``, ``failed`` or ``skipped``.
The DAG Run status is assigned based on the so-called "leaf nodes", or simply "leaves". Leaf nodes are the tasks with no children.
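
As a quick sketch (the DAG id and task names are illustrative), in the DAG below only ``load`` has no children, so it is the single leaf node and its terminal state alone decides the DAG Run status:

.. code-block:: python

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="leaves_example",
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
    ) as dag:
        extract = BashOperator(task_id="extract", bash_command="echo extract")
        transform = BashOperator(task_id="transform", bash_command="echo transform")
        load = BashOperator(task_id="load", bash_command="echo load")

        # "extract" and "transform" both have downstream tasks; "load" does not,
        # so "load" is the only leaf node.
        extract >> transform >> load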

There are two possible terminal states for the DAG Run:

- ``success`` if all of the leaf nodes' states are either ``success`` or ``skipped``,
- ``failed`` if any of the leaf nodes' states is either ``failed`` or ``upstream_failed``.

.. note::
    Be careful if some of your tasks have defined some specific `trigger rule <dags.html#trigger-rules>`_.
    These can lead to some unexpected behaviour, e.g. if you have a leaf task with trigger rule ``"all_done"``, it will be executed regardless of the states of the rest of the tasks, and if it succeeds, then the whole DAG Run will also be marked as ``success``, even if something failed in the middle.
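
For example, here is a minimal sketch of that pitfall (the DAG id, task IDs and commands are illustrative):

.. code-block:: python

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="all_done_leaf",
        start_date=datetime(2021, 1, 1),
        schedule_interval="@once",
    ) as dag:
        flaky_task = BashOperator(task_id="flaky_task", bash_command="exit 1")

        # The leaf task runs once "flaky_task" finishes, regardless of outcome;
        # if it succeeds, the whole DAG Run is marked "success".
        cleanup = BashOperator(
            task_id="cleanup", bash_command="echo cleaning up", trigger_rule="all_done"
        )

        flaky_task >> cleanup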

Cron Presets
''''''''''''

Each DAG may or may not have a schedule, which informs how DAG Runs are
created. ``schedule_interval`` is defined as a DAG argument, which can be passed a
@@ -27,9 +51,6 @@ a ``str``, a ``datetime.timedelta`` object, or one of the following cron "preset
.. tip::
    You can use an online editor for CRON expressions such as `Crontab guru <https://crontab.guru/>`_
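
As a short sketch (the DAG id and dates are illustrative), all three forms are accepted:

.. code-block:: python

    from datetime import datetime, timedelta

    from airflow import DAG

    dag = DAG(
        dag_id="schedule_example",
        start_date=datetime(2021, 1, 1),
        schedule_interval="@daily",  # a cron preset
        # schedule_interval="0 0 * * *",        # or an equivalent cron string
        # schedule_interval=timedelta(days=1),  # or a datetime.timedelta object
    )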

Cron Presets
''''''''''''

+----------------+----------------------------------------------------------------+-----------------+
| preset | meaning | cron |
+================+================================================================+=================+
Binary file added docs/apache-airflow/img/watcher.png
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
@@ -1339,6 +1339,7 @@ taskflow
taskinstance
tblproperties
tcp
teardown
templatable
templateable
templated
