Skip to content

Commit

Permalink
Inherit the run_ordering from DatasetTriggeredTimetable for DatasetOr…
Browse files Browse the repository at this point in the history
…TimeSchedule (#37775)
  • Loading branch information
sunank200 committed Feb 29, 2024
1 parent e358bb2 commit b4c1b3f
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
2 changes: 0 additions & 2 deletions airflow/timetables/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ def __init__(
self.description = f"Triggered by datasets or {timetable.description}"
self.periodic = timetable.periodic
self._can_be_scheduled = timetable._can_be_scheduled

self.run_ordering = timetable.run_ordering
self.active_runs_limit = timetable.active_runs_limit

@classmethod
Expand Down
64 changes: 64 additions & 0 deletions tests/timetables/test_datasets_timetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
from pendulum import DateTime

from airflow.datasets import Dataset
from airflow.models.dataset import DatasetEvent
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.simple import DatasetTriggeredTimetable
from airflow.utils.types import DagRunType


Expand Down Expand Up @@ -179,3 +181,65 @@ def test_generate_run_id(dataset_timetable: DatasetOrTimeSchedule) -> None:
run_type=DagRunType.MANUAL, extra_args="test", logical_date=DateTime.now(), data_interval=None
)
assert isinstance(run_id, str)


@pytest.fixture
def dataset_events(mocker) -> list[DatasetEvent]:
"""Pytest fixture for creating mock DatasetEvent objects."""
now = DateTime.now()
earlier = now.subtract(days=1)
later = now.add(days=1)

# Create mock source_dag_run objects
mock_dag_run_earlier = mocker.MagicMock()
mock_dag_run_earlier.data_interval_start = earlier
mock_dag_run_earlier.data_interval_end = now

mock_dag_run_later = mocker.MagicMock()
mock_dag_run_later.data_interval_start = now
mock_dag_run_later.data_interval_end = later

# Create DatasetEvent objects with mock source_dag_run
event_earlier = DatasetEvent(timestamp=earlier, dataset_id=1)
event_later = DatasetEvent(timestamp=later, dataset_id=1)

# Use mocker to set the source_dag_run attribute to avoid SQLAlchemy's instrumentation
mocker.patch.object(event_earlier, "source_dag_run", new=mock_dag_run_earlier)
mocker.patch.object(event_later, "source_dag_run", new=mock_dag_run_later)

return [event_earlier, event_later]


def test_data_interval_for_events(
dataset_timetable: DatasetOrTimeSchedule, dataset_events: list[DatasetEvent]
) -> None:
"""
Tests the data_interval_for_events method of DatasetTimetable.
:param dataset_timetable: The DatasetTimetable instance to test.
:param dataset_events: A list of mock DatasetEvent instances.
"""
data_interval = dataset_timetable.data_interval_for_events(
logical_date=DateTime.now(), events=dataset_events
)
assert data_interval.start == min(
event.timestamp for event in dataset_events
), "Data interval start does not match"
assert data_interval.end == max(
event.timestamp for event in dataset_events
), "Data interval end does not match"


def test_run_ordering_inheritance(dataset_timetable: DatasetOrTimeSchedule) -> None:
"""
Tests that DatasetOrTimeSchedule inherits run_ordering from its parent class correctly.
:param dataset_timetable: The DatasetTimetable instance to test.
"""
assert hasattr(
dataset_timetable, "run_ordering"
), "DatasetOrTimeSchedule should have 'run_ordering' attribute"
parent_run_ordering = getattr(DatasetTriggeredTimetable, "run_ordering", None)
assert (
dataset_timetable.run_ordering == parent_run_ordering
), "run_ordering does not match the parent class"

0 comments on commit b4c1b3f

Please sign in to comment.