Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce DatasetOrTimeSchedule #36710

Merged
merged 9 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 17 additions & 0 deletions airflow/example_dags/example_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

# [START dataset_def]
dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"})
Expand Down Expand Up @@ -128,3 +130,18 @@
outlets=[Dataset("s3://unrelated_task/dataset_other_unknown.txt")],
bash_command="sleep 5",
)

with DAG(
dag_id="dataset_and_time_based_timetable",
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=DatasetOrTimeSchedule(
timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=[dag1_dataset]
),
tags=["dataset-time-based-timetable"],
) as dag7:
BashOperator(
outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
task_id="consuming_dataset_time_based",
bash_command="sleep 5",
)
3 changes: 3 additions & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
from airflow.security import permissions
from airflow.stats import Stats
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
from airflow.timetables.simple import (
ContinuousTimetable,
Expand Down Expand Up @@ -595,6 +596,8 @@ def __init__(
self.timetable = DatasetTriggeredTimetable()
self.schedule_interval = self.timetable.summary
elif timetable:
if isinstance(timetable, DatasetOrTimeSchedule):
self.dataset_triggers = timetable.datasets
self.timetable = timetable
self.schedule_interval = self.timetable.summary
else:
Expand Down
12 changes: 8 additions & 4 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,14 @@ def __str__(self) -> str:
)


def _encode_timetable(var: Timetable) -> dict[str, Any]:
def encode_timetable(var: Timetable) -> dict[str, Any]:
"""
Encode a timetable instance.

This delegates most of the serialization work to the type, so the behavior
can be completely controlled by a custom subclass.

:meta private:
"""
timetable_class = type(var)
importable_string = qualname(timetable_class)
Expand All @@ -211,12 +213,14 @@ def _encode_timetable(var: Timetable) -> dict[str, Any]:
return {Encoding.TYPE: importable_string, Encoding.VAR: var.serialize()}


def _decode_timetable(var: dict[str, Any]) -> Timetable:
def decode_timetable(var: dict[str, Any]) -> Timetable:
"""
Decode a previously serialized timetable.

Most of the deserialization logic is delegated to the actual type, which
we import from string.

:meta private:
"""
importable_string = var[Encoding.TYPE]
timetable_class = _get_registered_timetable(importable_string)
Expand Down Expand Up @@ -401,7 +405,7 @@ def serialize_to_json(
elif key in decorated_fields:
serialized_object[key] = cls.serialize(value)
elif key == "timetable" and value is not None:
serialized_object[key] = _encode_timetable(value)
serialized_object[key] = encode_timetable(value)
else:
value = cls.serialize(value)
if isinstance(value, dict) and Encoding.TYPE in value:
Expand Down Expand Up @@ -1368,7 +1372,7 @@ def deserialize_dag(cls, encoded_dag: dict[str, Any]) -> SerializedDAG:
# Value structure matches exactly
pass
elif k == "timetable":
v = _decode_timetable(v)
v = decode_timetable(v)
elif k in cls._decorated_fields:
v = cls.deserialize(v)
elif k == "params":
Expand Down
92 changes: 92 additions & 0 deletions airflow/timetables/datasets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import collections.abc
import typing

import attrs

from airflow.datasets import Dataset
from airflow.exceptions import AirflowTimetableInvalid
from airflow.timetables.simple import DatasetTriggeredTimetable as DatasetTriggeredSchedule
potiuk marked this conversation as resolved.
Show resolved Hide resolved
from airflow.utils.types import DagRunType

if typing.TYPE_CHECKING:
import pendulum

from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable


class DatasetOrTimeSchedule(DatasetTriggeredSchedule):
"""Combine time-based scheduling with event-based scheduling."""

def __init__(self, timetable: Timetable, datasets: collections.abc.Collection[Dataset]) -> None:
self.timetable = timetable
self.datasets = datasets

self.description = f"Triggered by datasets or {timetable.description}"
self.periodic = timetable.periodic
self._can_be_scheduled = timetable._can_be_scheduled

self.run_ordering = timetable.run_ordering
self.active_runs_limit = timetable.active_runs_limit

@classmethod
def deserialize(cls, data: dict[str, typing.Any]) -> Timetable:
from airflow.serialization.serialized_objects import decode_timetable

return cls(
timetable=decode_timetable(data["timetable"]), datasets=[Dataset(**d) for d in data["datasets"]]
)

def serialize(self) -> dict[str, typing.Any]:
from airflow.serialization.serialized_objects import encode_timetable

return {
"timetable": encode_timetable(self.timetable),
"datasets": [attrs.asdict(e) for e in self.datasets],
}

def validate(self) -> None:
if isinstance(self.timetable, DatasetTriggeredSchedule):
raise AirflowTimetableInvalid("cannot nest dataset timetables")
if not isinstance(self.datasets, collections.abc.Collection) or not all(
isinstance(d, Dataset) for d in self.datasets
):
raise AirflowTimetableInvalid("all elements in 'event' must be datasets")

@property
def summary(self) -> str:
return f"Dataset or {self.timetable.summary}"

def infer_manual_data_interval(self, *, run_after: pendulum.DateTime) -> DataInterval:
return self.timetable.infer_manual_data_interval(run_after=run_after)

def next_dagrun_info(
self, *, last_automated_data_interval: DataInterval | None, restriction: TimeRestriction
) -> DagRunInfo | None:
return self.timetable.next_dagrun_info(
last_automated_data_interval=last_automated_data_interval,
restriction=restriction,
)

def generate_run_id(self, *, run_type: DagRunType, **kwargs: typing.Any) -> str:
if run_type != DagRunType.DATASET_TRIGGERED:
return self.timetable.generate_run_id(run_type=run_type, **kwargs)
return super().generate_run_id(run_type=run_type, **kwargs)
4 changes: 2 additions & 2 deletions airflow/timetables/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from __future__ import annotations

import operator
from typing import TYPE_CHECKING, Any, Collection
from typing import TYPE_CHECKING, Any, Collection, Sequence

from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils import timezone
Expand All @@ -35,7 +35,7 @@ class _TrivialTimetable(Timetable):
"""Some code reuse for "trivial" timetables that has nothing complex."""

periodic = False
run_ordering = ("execution_date",)
run_ordering: Sequence[str] = ("execution_date",)

@classmethod
def deserialize(cls, data: dict[str, Any]) -> Timetable:
Expand Down
9 changes: 9 additions & 0 deletions docs/apache-airflow/authoring-and-scheduling/datasets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,12 @@ Example:
print_triggering_dataset_events()

Note that this example is using `(.values() | first | first) <https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ to fetch the first of one Dataset given to the DAG, and the first of one DatasetEvent for that Dataset. An implementation may be quite complex if you have multiple Datasets, potentially with multiple DatasetEvents.

Combining Dataset and Time-Based Schedules
------------------------------------------

DatasetTimetable Integration
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
With the introduction of ``DatasetTimetable``, it is now possible to schedule DAGs based on both dataset events and time-based schedules. This feature offers flexibility for scenarios where a DAG needs to be triggered by data updates as well as run periodically according to a fixed timetable.

For more detailed information on ``DatasetTimetable`` and its usage, refer to the corresponding section in :ref:`DatasetTimetable <dataset-timetable-section>`.
39 changes: 39 additions & 0 deletions docs/apache-airflow/authoring-and-scheduling/timetable.rst
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,45 @@ first) event for the data interval, otherwise manual runs will run with a ``data
def example_dag():
pass

.. _dataset-timetable-section:

DatasetTimetable
^^^^^^^^^^^^^^^^

The ``DatasetTimetable`` is a specialized timetable allowing for the scheduling of DAGs based on both time-based schedules and dataset events. It facilitates the creation of scheduled runs (as per traditional timetables) and dataset-triggered runs, which operate independently.

This feature is particularly useful in scenarios where a DAG needs to run on dataset updates and also at periodic intervals. It ensures that the workflow remains responsive to data changes and consistently runs regular checks or updates.

Here's an example of a DAG using ``DatasetTimetable``:

.. code-block:: python

from airflow.timetables.dataset import DatasetTimetable
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.datasets import Dataset
from airflow.models import DAG
from airflow.operators.bash import BashOperator
import pendulum

with DAG(
dag_id="dataset_and_time_based_timetable",
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=DatasetTimetable(time=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), event=[dag1_dataset]),
tags=["dataset-time-based-timetable"],
) as dag7:
BashOperator(
outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
task_id="consuming_dataset_time_based",
bash_command="sleep 5",
)

In this example, the DAG is scheduled to run every Wednesday at 01:00 UTC based on the ``CronTriggerTimetable``, and it is also triggered by updates to ``dag1_dataset``.

Future Enhancements
~~~~~~~~~~~~~~~~~~~
Future iterations may introduce more complex combinations for scheduling (e.g., dataset1 OR dataset2 OR timetable), further enhancing the flexibility for scheduling DAGs in various scenarios.


Timetables comparisons
----------------------
Expand Down
1 change: 1 addition & 0 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2096,6 +2096,7 @@ def test_outlet_datasets(self, create_task_instance):
assert session.query(DatasetDagRunQueue.target_dag_id).filter_by(
dataset_id=event.dataset.id
).order_by(DatasetDagRunQueue.target_dag_id).all() == [
("dataset_and_time_based_timetable",),
("dataset_consumes_1",),
("dataset_consumes_1_and_2",),
("dataset_consumes_1_never_scheduled",),
Expand Down