Add documentation
sunank200 committed Feb 14, 2024
1 parent ec5fe74 commit e95321c
Showing 2 changed files with 104 additions and 3 deletions.
81 changes: 81 additions & 0 deletions docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -237,6 +237,85 @@ Example:
Note that this example is using `(.values() | first | first) <https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ to fetch the first of one Dataset given to the DAG, and the first of one DatasetEvent for that Dataset. An implementation may be quite complex if you have multiple Datasets, potentially with multiple DatasetEvents.

Advanced Dataset Scheduling with Conditional Expressions
--------------------------------------------------------

Apache Airflow introduces advanced scheduling capabilities that leverage conditional expressions with datasets. This feature allows Airflow users to define complex dependencies for DAG executions based on dataset updates, using logical operators for more granular control over workflow triggers.

Logical Operators for Datasets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Airflow supports two logical operators for combining dataset conditions:

- **AND** (``&``): Specifies that the DAG should be triggered only after all of the specified datasets have been updated.
- **OR** (``|``): Specifies that the DAG should be triggered when any one of the specified datasets is updated.

These operators enable the expression of complex dataset update conditions, enhancing the dynamism and flexibility of Airflow workflows.
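
To make the composition concrete, here is a minimal sketch (using the same datasets as the examples below) showing that these expressions evaluate to condition objects, which can be nested and passed directly to a DAG's ``schedule`` argument:

.. code-block:: python

    from airflow.datasets import Dataset

    dag1_dataset = Dataset("s3://dag1/output_1.txt")
    dag2_dataset = Dataset("s3://dag2/output_1.txt")
    dag3_dataset = Dataset("s3://dag3/output_3.txt")

    # ``&`` and ``|`` return condition objects rather than booleans,
    # so conditions can be stored, nested, and reused across DAGs.
    both_updated = dag1_dataset & dag2_dataset
    either_updated = dag1_dataset | dag2_dataset
    nested = dag1_dataset | (dag2_dataset & dag3_dataset)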

Example Usage
~~~~~~~~~~~~~

**Scheduling Based on Multiple Dataset Updates**

To schedule a DAG to run only when two specific datasets have both been updated, use the AND operator (``&``):

.. code-block:: python

    from airflow.models import DAG
    from airflow.operators.bash import BashOperator
    from airflow.datasets import Dataset
    import pendulum

    dag1_dataset = Dataset("s3://dag1/output_1.txt")
    dag2_dataset = Dataset("s3://dag2/output_1.txt")

    with DAG(
        dag_id="consume_1_and_2_with_dataset_expressions",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule=(dag1_dataset & dag2_dataset),
    ) as dag:
        BashOperator(
            task_id="consume_1_and_2_with_dataset_expressions",
            bash_command="sleep 5",
            outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
        )
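
Note that dataset events are consumed when they trigger a run: after a dataset-triggered run, both datasets must be updated again before the condition is satisfied a second time.
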
**Scheduling Based on Any Dataset Update**

To trigger a DAG execution when either of two datasets is updated, apply the OR operator (``|``):

.. code-block:: python

    with DAG(
        dag_id="consume_1_or_2_with_dataset_expressions",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule=(dag1_dataset | dag2_dataset),
    ) as dag:
        BashOperator(
            task_id="consume_1_or_2_with_dataset_expressions",
            bash_command="sleep 5",
            outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
        )

**Complex Conditional Logic**

For scenarios requiring more intricate conditions, such as triggering a DAG when one dataset is updated or when both of two other datasets are updated, combine the OR and AND operators:

.. code-block:: python

    dag3_dataset = Dataset("s3://dag3/output_3.txt")

    with DAG(
        dag_id="consume_1_or_both_2_and_3_with_dataset_expressions",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
    ) as dag:
        BashOperator(
            task_id="consume_1_or_both_2_and_3_with_dataset_expressions",
            bash_command="sleep 5",
            outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
        )

Combining Dataset and Time-Based Schedules
------------------------------------------

@@ -245,3 +324,5 @@ DatasetTimetable Integration
With the introduction of ``DatasetTimetable``, it is now possible to schedule DAGs based on both dataset events and time-based schedules. This feature offers flexibility for scenarios where a DAG needs to be triggered by data updates as well as run periodically according to a fixed timetable.

For more detailed information on ``DatasetTimetable`` and its usage, refer to the corresponding section in :ref:`DatasetTimetable <dataset-timetable-section>`.

These examples illustrate how Airflow's conditional dataset expressions can be used to create complex, data-dependent scheduling scenarios, providing precise control over when DAGs are triggered in response to data updates.
26 changes: 23 additions & 3 deletions docs/apache-airflow/authoring-and-scheduling/timetable.rst
@@ -212,9 +212,29 @@ Here's an example of a DAG using ``DatasetTimetable``:
In this example, the DAG is scheduled to run every Wednesday at 01:00 UTC based on the ``CronTriggerTimetable``, and it is also triggered by updates to ``dag1_dataset``.
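
A minimal sketch of such a DAG, under the assumption that ``DatasetTimetable`` is constructed like the ``DatasetOrTimeSchedule`` shown later in this document (the dag and task ids here are illustrative):

.. code-block:: python

    import pendulum

    from airflow.datasets import Dataset
    from airflow.models import DAG
    from airflow.operators.bash import BashOperator
    from airflow.timetables.datasets import DatasetOrTimeSchedule
    from airflow.timetables.trigger import CronTriggerTimetable

    dag1_dataset = Dataset("s3://dag1/output_1.txt")

    with DAG(
        dag_id="dataset_and_time_based_timetable",  # illustrative id
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        # Runs every Wednesday at 01:00 UTC, and also on every update to dag1_dataset.
        schedule=DatasetOrTimeSchedule(
            timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"),
            datasets=[dag1_dataset],
        ),
    ) as dag:
        BashOperator(task_id="example_task", bash_command="sleep 5")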

Future Enhancements
~~~~~~~~~~~~~~~~~~~
Future iterations may introduce more complex scheduling combinations (e.g., dataset1 OR dataset2 OR timetable), further enhancing the flexibility of DAG scheduling in various scenarios.

Integrating Conditional Datasets with Time-Based Scheduling
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Combining conditional dataset expressions with time-based schedules enhances scheduling flexibility:

.. code-block:: python

    from airflow.timetables.datasets import DatasetOrTimeSchedule
    from airflow.timetables.trigger import CronTriggerTimetable

    # dag1_dataset and dag2_dataset are the Dataset objects defined in the earlier examples.
    with DAG(
        dag_id="conditional_dataset_and_time_based_timetable",
        catchup=False,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule=DatasetOrTimeSchedule(
            timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"),
            datasets=(dag1_dataset & dag2_dataset),
        ),
        tags=["dataset-time-based-timetable"],
    ) as dag:
        BashOperator(
            task_id="conditional_dataset_and_time_based_timetable",
            bash_command="sleep 5",
            outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
        )
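
With this schedule, the DAG runs every Wednesday at 01:00 UTC per the cron expression, and is additionally triggered whenever both ``dag1_dataset`` and ``dag2_dataset`` have been updated.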
