Introducing Logical Operators for dataset conditional logic (#37101)
* Implement | and & operators so that they can be used instead of DatasetAll and DatasetAny

* Refactor dataset class inheritance (#37590)

* Refactor DatasetAll and DatasetAny inheritance

They are moved from airflow.models.datasets to airflow.datasets since
the intention is to use them with Dataset, not DatasetModel. It is more
natural for users to import from the latter module instead.

A new (abstract) base class is added for the two classes, plus the OG
Dataset class, to inherit from. This allows us to replace a few
isinstance checks with simple polymorphism and make the logic a bit
simpler.
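
As a rough sketch of that idea (simplified, hypothetical classes, not the exact Airflow implementation): every node in a dataset condition implements evaluate(), so callers dispatch polymorphically instead of branching on type:

from typing import Protocol

class BaseDatasetEventInput(Protocol):
    def evaluate(self, statuses: dict[str, bool]) -> bool: ...

class Dataset:
    def __init__(self, uri: str) -> None:
        self.uri = uri

    def evaluate(self, statuses: dict[str, bool]) -> bool:
        # A lone dataset is satisfied when its own status flag is set.
        return statuses.get(self.uri, False)

class DatasetAny:
    def __init__(self, *objects: BaseDatasetEventInput) -> None:
        self.objects = objects

    def evaluate(self, statuses: dict[str, bool]) -> bool:
        # No isinstance checks: each child knows how to evaluate itself.
        return any(o.evaluate(statuses) for o in self.objects)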


Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
Co-authored-by: Wei Lee <weilee.rx@gmail.com>
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
4 people committed Feb 26, 2024
1 parent 9b17ff3 commit 6c7d2c9
Showing 7 changed files with 295 additions and 41 deletions.
24 changes: 22 additions & 2 deletions airflow/datasets/__init__.py
@@ -33,6 +33,12 @@ class BaseDatasetEventInput(Protocol):
    :meta private:
    """

+   def __or__(self, other: BaseDatasetEventInput) -> DatasetAny:
+       return DatasetAny(self, other)
+
+   def __and__(self, other: BaseDatasetEventInput) -> DatasetAll:
+       return DatasetAll(self, other)
+
    def evaluate(self, statuses: dict[str, bool]) -> bool:
        raise NotImplementedError

@@ -50,7 +56,7 @@ class Dataset(os.PathLike, BaseDatasetEventInput):
    __version__: ClassVar[int] = 1

    @uri.validator
-   def _check_uri(self, attr, uri: str):
+   def _check_uri(self, attr, uri: str) -> None:
        if uri.isspace():
            raise ValueError(f"{attr.name} cannot be just whitespace")
        try:
@@ -64,7 +70,7 @@ def _check_uri(self, attr, uri: str):
    def __fspath__(self) -> str:
        return self.uri

-   def __eq__(self, other):
+   def __eq__(self, other: Any) -> bool:
        if isinstance(other, self.__class__):
            return self.uri == other.uri
        else:
@@ -106,8 +112,22 @@ class DatasetAny(_DatasetBooleanCondition):

    agg_func = any

+   def __or__(self, other: BaseDatasetEventInput) -> DatasetAny:
+       # Optimization: X | (Y | Z) is equivalent to X | Y | Z.
+       return DatasetAny(*self.objects, other)
+
+   def __repr__(self) -> str:
+       return f"DatasetAny({', '.join(map(str, self.objects))})"


class DatasetAll(_DatasetBooleanCondition):
    """Use to combine datasets schedule references in an "and" relationship."""

    agg_func = all

+   def __and__(self, other: BaseDatasetEventInput) -> DatasetAll:
+       # Optimization: X & (Y & Z) is equivalent to X & Y & Z.
+       return DatasetAll(*self.objects, other)
+
+   def __repr__(self) -> str:
+       return f"DatasetAll({', '.join(map(str, self.objects))})"
76 changes: 60 additions & 16 deletions airflow/example_dags/example_datasets.py
@@ -15,26 +15,39 @@
# specific language governing permissions and limitations
# under the License.
"""
-Example DAG for demonstrating behavior of Datasets feature.
+Example DAG for demonstrating the behavior of the Datasets feature in Airflow, including conditional and
+dataset expression-based scheduling.

Notes on usage:

-Turn on all the dags.
+Turn on all the DAGs.

-DAG dataset_produces_1 should run because it's on a schedule.
+dataset_produces_1 is scheduled to run daily. Once it completes, it triggers several DAGs due to its dataset
+being updated. dataset_consumes_1 is triggered immediately, as it depends solely on the dataset produced by
+dataset_produces_1. consume_1_or_2_with_dataset_expressions will also be triggered, as its condition of
+either dataset_produces_1 or dataset_produces_2 being updated is satisfied with dataset_produces_1.

-After dataset_produces_1 runs, dataset_consumes_1 should be triggered immediately
-because its only dataset dependency is managed by dataset_produces_1.
+dataset_consumes_1_and_2 will not be triggered after dataset_produces_1 runs because it requires the dataset
+from dataset_produces_2, which has no schedule and must be manually triggered.

-No other dags should be triggered. Note that even though dataset_consumes_1_and_2 depends on
-the dataset in dataset_produces_1, it will not be triggered until dataset_produces_2 runs
-(and dataset_produces_2 is left with no schedule so that we can trigger it manually).
+After manually triggering dataset_produces_2, several DAGs will be affected. dataset_consumes_1_and_2 should
+run because both its dataset dependencies are now met. consume_1_and_2_with_dataset_expressions will be
+triggered, as it requires both dataset_produces_1 and dataset_produces_2 datasets to be updated.
+consume_1_or_2_with_dataset_expressions will be triggered again, since it's conditionally set to run when
+either dataset is updated.

-Next, trigger dataset_produces_2. After dataset_produces_2 finishes,
-dataset_consumes_1_and_2 should run.
+consume_1_or_both_2_and_3_with_dataset_expressions demonstrates complex dataset dependency logic.
+This DAG triggers if dataset_produces_1 is updated or if both dataset_produces_2 and dag3_dataset
+are updated. This example highlights the capability to combine updates from multiple datasets with logical
+expressions for advanced scheduling.

-Dags dataset_consumes_1_never_scheduled and dataset_consumes_unknown_never_scheduled should not run because
-they depend on datasets that never get updated.
+conditional_dataset_and_time_based_timetable illustrates the integration of time-based scheduling with
+dataset dependencies. This DAG is configured to execute either when both dataset_produces_1 and
+dataset_produces_2 datasets have been updated or according to a specific cron schedule, showcasing
+Airflow's versatility in handling mixed triggers for dataset and time-based scheduling.
+
+The DAGs dataset_consumes_1_never_scheduled and dataset_consumes_unknown_never_scheduled will not run
+automatically as they depend on datasets that do not get updated or are not produced by any scheduled tasks.
"""
from __future__ import annotations

@@ -50,6 +63,7 @@
dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"})
# [END dataset_def]
dag2_dataset = Dataset("s3://dag2/output_1.txt", extra={"hi": "bye"})
+dag3_dataset = Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"})

with DAG(
dag_id="dataset_produces_1",
@@ -132,16 +146,46 @@
)

with DAG(
-   dag_id="dataset_and_time_based_timetable",
+   dag_id="consume_1_and_2_with_dataset_expressions",
+   start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+   schedule=(dag1_dataset & dag2_dataset),
+) as dag5:
+   BashOperator(
+       outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+       task_id="consume_1_and_2_with_dataset_expressions",
+       bash_command="sleep 5",
+   )
+with DAG(
+   dag_id="consume_1_or_2_with_dataset_expressions",
+   start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+   schedule=(dag1_dataset | dag2_dataset),
+) as dag6:
+   BashOperator(
+       outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+       task_id="consume_1_or_2_with_dataset_expressions",
+       bash_command="sleep 5",
+   )
+with DAG(
+   dag_id="consume_1_or_both_2_and_3_with_dataset_expressions",
+   start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+   schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
+) as dag7:
+   BashOperator(
+       outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
+       task_id="consume_1_or_both_2_and_3_with_dataset_expressions",
+       bash_command="sleep 5",
+   )
+with DAG(
+   dag_id="conditional_dataset_and_time_based_timetable",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=DatasetOrTimeSchedule(
-       timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=[dag1_dataset]
+       timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset)
    ),
    tags=["dataset-time-based-timetable"],
-) as dag7:
+) as dag8:
    BashOperator(
        outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
-       task_id="consuming_dataset_time_based",
+       task_id="conditional_dataset_and_time_based_timetable",
        bash_command="sleep 5",
    )
2 changes: 1 addition & 1 deletion airflow/models/dag.py
@@ -589,7 +589,7 @@ def __init__(
            self.dataset_triggers = DatasetAll(*schedule)
        elif isinstance(schedule, Timetable):
            timetable = schedule
-       elif schedule is not NOTSET:
+       elif schedule is not NOTSET and not isinstance(schedule, BaseDatasetEventInput):
            schedule_interval = schedule

        if isinstance(schedule, DatasetOrTimeSchedule):
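In context (a hypothetical simplification of the surrounding constructor logic, not the verbatim source), the change keeps a dataset expression from falling through into the time-based schedule branch:

# Sketch of DAG.__init__ schedule dispatch after this change:
if isinstance(schedule, BaseDatasetEventInput):  # Dataset, DatasetAny, DatasetAll, d1 | d2, d1 & d2, ...
    self.dataset_triggers = schedule             # event-driven scheduling
elif isinstance(schedule, Timetable):
    timetable = schedule                         # time-driven scheduling
elif schedule is not NOTSET:
    schedule_interval = schedule                 # legacy cron string / timedelta values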
64 changes: 64 additions & 0 deletions docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -237,6 +237,68 @@
Note that this example is using `(.values() | first | first) <https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ to fetch the first of one Dataset given to the DAG, and the first of one DatasetEvent for that Dataset. An implementation may be quite complex if you have multiple Datasets, potentially with multiple DatasetEvents.

Advanced Dataset Scheduling with Conditional Expressions
--------------------------------------------------------

Apache Airflow introduces advanced scheduling capabilities that leverage conditional expressions with datasets. This feature allows Airflow users to define complex dependencies for DAG executions based on dataset updates, using logical operators for more granular control over workflow triggers.

Logical Operators for Datasets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Airflow supports two logical operators for combining dataset conditions:

- **AND (``&``)**: Specifies that the DAG should be triggered only after all of the specified datasets have been updated.
- **OR (``|``)**: Specifies that the DAG should be triggered when any one of the specified datasets is updated.

These operators enable the expression of complex dataset update conditions, enhancing the dynamism and flexibility of Airflow workflows.

Example Usage
-------------

**Scheduling Based on Multiple Dataset Updates**

To schedule a DAG to run only when two specific datasets have both been updated, use the AND operator (``&``):

.. code-block:: python

    dag1_dataset = Dataset("s3://dag1/output_1.txt")
    dag2_dataset = Dataset("s3://dag2/output_1.txt")

    with DAG(
        # Consume dataset 1 and 2 with dataset expressions
        schedule=(dag1_dataset & dag2_dataset),
        ...,
    ):
        ...

**Scheduling Based on Any Dataset Update**

To trigger a DAG execution when either of two datasets is updated, apply the OR operator (``|``):

.. code-block:: python

    with DAG(
        # Consume dataset 1 or 2 with dataset expressions
        schedule=(dag1_dataset | dag2_dataset),
        ...,
    ):
        ...

**Complex Conditional Logic**

For scenarios requiring more intricate conditions, such as triggering a DAG when one dataset is updated or when both of two other datasets are updated, combine the OR and AND operators:

.. code-block:: python

    dag3_dataset = Dataset("s3://dag3/output_3.txt")

    with DAG(
        # Consume dataset 1 or both 2 and 3 with dataset expressions
        schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
        ...,
    ):
        ...

Combining Dataset and Time-Based Schedules
------------------------------------------

@@ -245,3 +307,5 @@ DatasetTimetable Integration
With the introduction of ``DatasetTimetable``, it is now possible to schedule DAGs based on both dataset events and time-based schedules. This feature offers flexibility for scenarios where a DAG needs to be triggered by data updates as well as run periodically according to a fixed timetable.

For more detailed information on ``DatasetTimetable`` and its usage, refer to the corresponding section in :ref:`DatasetTimetable <dataset-timetable-section>`.
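
As a quick illustration (a sketch based on this commit's example DAG, assuming ``DatasetOrTimeSchedule`` is importable from ``airflow.timetables.datasets``):

.. code-block:: python

    from airflow.timetables.datasets import DatasetOrTimeSchedule
    from airflow.timetables.trigger import CronTriggerTimetable

    with DAG(
        # Runs on the cron schedule, or as soon as both datasets have been updated
        schedule=DatasetOrTimeSchedule(
            timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"),
            datasets=(dag1_dataset & dag2_dataset),
        ),
        ...,
    ):
        ...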

These examples illustrate how Airflow's conditional dataset expressions can be used to create complex data-dependent scheduling scenarios, providing precise control over when DAGs are triggered in response to data updates.
48 changes: 28 additions & 20 deletions docs/apache-airflow/authoring-and-scheduling/timetable.rst
@@ -192,29 +192,37 @@ Here's an example of a DAG using ``DatasetTimetable``:

    from airflow.timetables.dataset import DatasetTimetable
    from airflow.timetables.trigger import CronTriggerTimetable
    from airflow.datasets import Dataset
-   from airflow.models import DAG
-   from airflow.operators.bash import BashOperator
-   import pendulum

-   with DAG(
-       dag_id="dataset_and_time_based_timetable",
-       catchup=False,
-       start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-       schedule=DatasetTimetable(time=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), event=[dag1_dataset]),
-       tags=["dataset-time-based-timetable"],
-   ) as dag7:
-       BashOperator(
-           outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
-           task_id="consuming_dataset_time_based",
-           bash_command="sleep 5",
-       )

+   @dag(
+       schedule=DatasetTimetable(time=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), event=[dag1_dataset])
+       # Additional arguments here, replace this comment with actual arguments
+   )
+   def example_dag():
+       # DAG tasks go here
+       pass

In this example, the DAG is scheduled to run every Wednesday at 01:00 UTC based on the ``CronTriggerTimetable``, and it is also triggered by updates to ``dag1_dataset``.

-Future Enhancements
-~~~~~~~~~~~~~~~~~~~
-Future iterations may introduce more complex combinations for scheduling (e.g., dataset1 OR dataset2 OR timetable), further enhancing the flexibility for scheduling DAGs in various scenarios.

+Integrate conditional dataset with Time-Based Scheduling
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Combining conditional dataset expressions with time-based schedules enhances scheduling flexibility:
+
+.. code-block:: python
+
+    from airflow.timetables.datasets import DatasetOrTimeSchedule
+    from airflow.timetables.trigger import CronTriggerTimetable
+
+    @dag(
+        schedule=DatasetOrTimeSchedule(
+            timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset)
+        )
+        # Additional arguments here, replace this comment with actual arguments
+    )
+    def example_dag():
+        # DAG tasks go here
+        pass
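
Note that with the ``@dag``-decorator style used above, the decorated function must also be invoked at module level for the DAG to be registered (a detail the sketches omit):

.. code-block:: python

    example_dag()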
Timetables comparisons
