From f5b55ee2170a4ffaac461b83e83d9b81f60a3e00 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 16 Apr 2021 10:21:25 +0800 Subject: [PATCH] Implement timetable class for AIP-39 This creates a new subpackage airflow.timetables, and implements timetable constructs that provides DAG scheduling logic. The timetable classes are used to refactor schedule inference logic out of the DAG class, and existing functions related to scheduling are refactored to use timetables (and deprecated). Usages of the deprecated DAG functions in Airflow's code base are modified to either use the timetable, or infer the information by other means. For example, usages of previous_schedule() (what was a DAG last scheduled to run before this run?) are refactored to query the database when the previous scheduled run actually happened, instead of using the schedule interval (cron or timedelta) in infer the information. This is because an AIP-39 timetable does not necessarily run on a periodic-ish schedule, and we cannot reliably infer when the previous run happened. --- airflow/api/common/experimental/mark_tasks.py | 2 +- airflow/cli/commands/dag_command.py | 4 +- airflow/compat/functools.pyi | 27 ++ airflow/exceptions.py | 4 + airflow/jobs/backfill_job.py | 2 +- airflow/models/baseoperator.py | 2 +- airflow/models/dag.py | 356 ++++++++---------- airflow/models/dagbag.py | 9 +- airflow/models/dagrun.py | 6 +- airflow/models/taskinstance.py | 15 +- airflow/ti_deps/deps/prev_dagrun_dep.py | 44 +-- airflow/timetables/__init__.py | 16 + airflow/timetables/base.py | 133 +++++++ airflow/timetables/interval.py | 92 +++++ airflow/timetables/schedules.py | 207 ++++++++++ airflow/timetables/simple.py | 78 ++++ airflow/utils/dates.py | 7 + airflow/utils/timezone.py | 12 + airflow/www/views.py | 30 +- tests/cli/commands/test_dag_command.py | 11 +- tests/jobs/test_backfill_job.py | 13 +- tests/jobs/test_scheduler_job.py | 2 +- tests/models/test_dag.py | 68 ++-- tests/models/test_dagrun.py | 8 +- tests/models/test_taskinstance.py | 6 + tests/sensors/test_base.py | 3 +- tests/serialization/test_dag_serialization.py | 21 +- .../perf/scheduler_dag_execution_timing.py | 10 +- tests/test_utils/timetables.py | 27 ++ tests/ti_deps/deps/test_prev_dagrun_dep.py | 198 +++++----- .../timetables/test_time_table_iter_ranges.py | 38 ++ 31 files changed, 1056 insertions(+), 395 deletions(-) create mode 100644 airflow/compat/functools.pyi create mode 100644 airflow/timetables/__init__.py create mode 100644 airflow/timetables/base.py create mode 100644 airflow/timetables/interval.py create mode 100644 airflow/timetables/schedules.py create mode 100644 airflow/timetables/simple.py create mode 100644 tests/test_utils/timetables.py create mode 100644 tests/timetables/test_time_table_iter_ranges.py diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py index 6adfd11106f28..9a99d4d340435 100644 --- a/airflow/api/common/experimental/mark_tasks.py +++ b/airflow/api/common/experimental/mark_tasks.py @@ -257,7 +257,7 @@ def get_execution_dates(dag, execution_date, future, past): dag_runs = dag.get_dagruns_between(start_date=start_date, end_date=end_date) dates = sorted({d.execution_date for d in dag_runs}) else: - dates = dag.date_range(start_date=start_date, end_date=end_date) + dates = dag.get_run_dates(start_date, end_date, align=False) return dates diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index e739162540fb4..dfd8c3c685b27 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -265,11 +265,11 @@ def dag_next_execution(args): ) print(None) else: - print(next_execution_dttm) + print(next_execution_dttm.isoformat()) for _ in range(1, args.num_executions): next_execution_dttm = dag.following_schedule(next_execution_dttm) - print(next_execution_dttm) + print(next_execution_dttm.isoformat()) else: print("[WARN] Only applicable when there is execution record found for the DAG.", file=sys.stderr) print(None) diff --git a/airflow/compat/functools.pyi b/airflow/compat/functools.pyi new file mode 100644 index 0000000000000..8dabbd60047c0 --- /dev/null +++ b/airflow/compat/functools.pyi @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This stub exists to work around false linter errors due to python/mypy#10408. +# TODO: Remove this file after the upstream fix is available in our toolchain. + +from typing import Callable, TypeVar + +T = TypeVar("T") + +def cached_property(f: Callable[..., T]) -> T: ... +def cache(f: T) -> T: ... diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 0f1a28d867344..e15ee62f51965 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -115,6 +115,10 @@ class AirflowClusterPolicyViolation(AirflowException): """Raise when there is a violation of a Cluster Policy in Dag definition""" +class AirflowTimetableInvalid(AirflowException): + """Raise when a DAG has an invalid timetable.""" + + class DagNotFound(AirflowNotFoundException): """Raise when a DAG is not available in the system""" diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py index 167aa7a337f26..880bdaa51ad3e 100644 --- a/airflow/jobs/backfill_job.py +++ b/airflow/jobs/backfill_job.py @@ -756,7 +756,7 @@ def _execute(self, session=None): start_date = self.bf_start_date # Get intervals between the start/end dates, which will turn into dag runs - run_dates = self.dag.get_run_dates(start_date=start_date, end_date=self.bf_end_date) + run_dates = self.dag.get_run_dates(start_date=start_date, end_date=self.bf_end_date, align=True) if self.run_backwards: tasks_that_depend_on_past = [t.task_id for t in self.dag.task_dict.values() if t.depends_on_past] if tasks_that_depend_on_past: diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 7b8afe44bd5b5..17ad7a7057d7b 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -1244,7 +1244,7 @@ def run( start_date = start_date or self.start_date end_date = end_date or self.end_date or timezone.utcnow() - for execution_date in self.dag.date_range(start_date, end_date=end_date): + for execution_date in self.dag.get_run_dates(start_date, end_date, align=False): TaskInstance(self, execution_date).run( mark_success=mark_success, ignore_depends_on_past=(execution_date == start_date and ignore_first_depends_on_past), diff --git a/airflow/models/dag.py b/airflow/models/dag.py index e283b1475d394..c0dd158a5427f 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -47,7 +47,6 @@ import jinja2 import pendulum -from croniter import croniter from dateutil.relativedelta import relativedelta from jinja2.nativetypes import NativeEnvironment from sqlalchemy import Boolean, Column, ForeignKey, Index, Integer, String, Text, func, or_ @@ -57,6 +56,7 @@ import airflow.templates from airflow import settings, utils +from airflow.compat.functools import cached_property from airflow.configuration import conf from airflow.exceptions import AirflowException, DuplicateTaskIdFound, TaskNotFound from airflow.models.base import ID_LEN, Base @@ -69,6 +69,10 @@ from airflow.models.taskinstance import Context, TaskInstance, TaskInstanceKey, clear_task_instances from airflow.security import permissions from airflow.stats import Stats +from airflow.timetables.base import TimeRestriction, Timetable +from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable +from airflow.timetables.schedules import Schedule +from airflow.timetables.simple import NullTimetable, OnceTimetable from airflow.typing_compat import Literal, RePatternType from airflow.utils import timezone from airflow.utils.dates import cron_presets, date_range as utils_date_range @@ -455,29 +459,26 @@ def date_range( num: Optional[int] = None, end_date: Optional[datetime] = timezone.utcnow(), ) -> List[datetime]: + message = "`DAG.date_range()` is deprecated." if num is not None: - end_date = None - return utils_date_range( - start_date=start_date, end_date=end_date, num=num, delta=self.normalized_schedule_interval - ) + result = utils_date_range(start_date=start_date, num=num) + else: + message += " Please use `DAG.get_run_dates(..., align=False)` instead." + result = self.get_run_dates(start_date, end_date, align=False) + warnings.warn(message, category=DeprecationWarning, stacklevel=2) + return result def is_fixed_time_schedule(self): - """ - Figures out if the DAG schedule has a fixed time (e.g. 3 AM). - - :return: True if the schedule has a fixed time, False if not. - """ - now = datetime.now() - cron = croniter(self.normalized_schedule_interval, now) - - start = cron.get_next(datetime) - cron_next = cron.get_next(datetime) - - if cron_next.minute == start.minute and cron_next.hour == start.hour: + warnings.warn( + "`DAG.is_fixed_time_schedule()` is deprecated.", + category=DeprecationWarning, + stacklevel=2, + ) + try: + return not self.timetable._schedule._should_fix_dst + except AttributeError: return True - return False - def following_schedule(self, dttm): """ Calculates the following schedule for this dag in UTC. @@ -485,189 +486,139 @@ def following_schedule(self, dttm): :param dttm: utc datetime :return: utc datetime """ - if isinstance(self.normalized_schedule_interval, str): - # we don't want to rely on the transitions created by - # croniter as they are not always correct - dttm = pendulum.instance(dttm) - naive = timezone.make_naive(dttm, self.timezone) - cron = croniter(self.normalized_schedule_interval, naive) - - # We assume that DST transitions happen on the minute/hour - if not self.is_fixed_time_schedule(): - # relative offset (eg. every 5 minutes) - delta = cron.get_next(datetime) - naive - following = dttm.in_timezone(self.timezone) + delta - else: - # absolute (e.g. 3 AM) - naive = cron.get_next(datetime) - tz = self.timezone - following = timezone.make_aware(naive, tz) - return timezone.convert_to_utc(following) - elif self.normalized_schedule_interval is not None: - return timezone.convert_to_utc(dttm + self.normalized_schedule_interval) + current = pendulum.instance(dttm) + between = TimeRestriction(earliest=None, latest=None, catchup=True) + next_info = self.timetable.next_dagrun_info(current, between) + if next_info is None: + return None + return next_info.data_interval.start def previous_schedule(self, dttm): - """ - Calculates the previous schedule for this dag in UTC - - :param dttm: utc datetime - :return: utc datetime - """ - if isinstance(self.normalized_schedule_interval, str): - # we don't want to rely on the transitions created by - # croniter as they are not always correct - dttm = pendulum.instance(dttm) - naive = timezone.make_naive(dttm, self.timezone) - cron = croniter(self.normalized_schedule_interval, naive) - - # We assume that DST transitions happen on the minute/hour - if not self.is_fixed_time_schedule(): - # relative offset (eg. every 5 minutes) - delta = naive - cron.get_prev(datetime) - previous = dttm.in_timezone(self.timezone) - delta - else: - # absolute (e.g. 3 AM) - naive = cron.get_prev(datetime) - tz = self.timezone - previous = timezone.make_aware(naive, tz) - return timezone.convert_to_utc(previous) - elif self.normalized_schedule_interval is not None: - return timezone.convert_to_utc(dttm - self.normalized_schedule_interval) + warnings.warn( + "`DAG.previous_schedule()` is deprecated.", + category=DeprecationWarning, + stacklevel=2, + ) + try: + schedule: Schedule = self.timetable._schedule + except AttributeError: + return None + return schedule.get_prev(pendulum.instance(dttm)) def next_dagrun_info( self, date_last_automated_dagrun: Optional[pendulum.DateTime], ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]: - """ - Get information about the next DagRun of this dag after ``date_last_automated_dagrun`` -- the - execution date, and the earliest it could be scheduled - - :param date_last_automated_dagrun: The max(execution_date) of existing - "automated" DagRuns for this dag (scheduled or backfill, but not - manual) - """ - if ( - self.schedule_interval == "@once" and date_last_automated_dagrun - ) or self.schedule_interval is None: - # Manual trigger, or already created the run for @once, can short circuit + """Get information about the next DagRun of this dag after ``date_last_automated_dagrun``. + + This calculates what time interval the next DagRun should operate on + (its execution date), and when it can be scheduled, , according to the + dag's timetable, start_date, end_date, etc. This doesn't check max + active run or any other "max_active_tasks" type limits, but only + performs calculations based on the various date and interval fields of + this dag and its tasks. + + :param date_last_automated_dagrun: The ``max(execution_date)`` of + existing "automated" DagRuns for this dag (scheduled or backfill, + but not manual). + :return: A 2-tuple containing the DagRun's execution date, and the + earliest it could be scheduled. + """ + # XXX: The timezone.coerce_datetime calls in this function should not + # be necessary since the function annotation suggests it only accepts + # pendulum.DateTime, and someone is passing datetime.datetime into this + # function. We should fix whatever is doing that. + if self.is_subdag: return (None, None) - next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun) - - if next_execution_date is None: + next_info = self.timetable.next_dagrun_info( + timezone.coerce_datetime(date_last_automated_dagrun), + self._time_restriction, + ) + if next_info is None: return (None, None) - - if self.schedule_interval == "@once": - # For "@once" it can be created "now" - return (next_execution_date, next_execution_date) - - return (next_execution_date, self.following_schedule(next_execution_date)) + return (next_info.data_interval.start, next_info.run_after) def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]): - """ - Get the next execution date after the given ``date_last_automated_dagrun``, according to - schedule_interval, start_date, end_date etc. This doesn't check max active run or any other - "max_active_tasks" type limits, it only performs calculations based on the various date - and interval fields of this dag and it's tasks. - - :param date_last_automated_dagrun: The execution_date of the last scheduler or - backfill triggered run for this dag - :type date_last_automated_dagrun: pendulum.Pendulum - """ - if not self.schedule_interval or self.is_subdag: - return None - - # don't schedule @once again - if self.schedule_interval == '@once' and date_last_automated_dagrun: - return None - - # don't do scheduler catchup for dag's that don't have dag.catchup = True - if not (self.catchup or self.schedule_interval == '@once'): - # The logic is that we move start_date up until - # one period before, so that timezone.utcnow() is AFTER - # the period end, and the job can be created... - now = timezone.utcnow() - next_start = self.following_schedule(now) - last_start = self.previous_schedule(now) - if next_start <= now or isinstance(self.schedule_interval, timedelta): - new_start = last_start - else: - new_start = self.previous_schedule(last_start) - - if self.start_date: - if new_start >= self.start_date: - self.start_date = new_start - else: - self.start_date = new_start - - next_run_date = None - if not date_last_automated_dagrun: - # First run - task_start_dates = [t.start_date for t in self.tasks if t.start_date] - if task_start_dates: - next_run_date = self.normalize_schedule(min(task_start_dates)) - self.log.debug("Next run date based on tasks %s", next_run_date) + warnings.warn( + "`DAG.next_dagrun_after_date()` is deprecated. Please use `DAG.next_dagrun_info()` instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.next_dagrun_info(date_last_automated_dagrun)[0] + + @cached_property + def _time_restriction(self) -> TimeRestriction: + start_dates = [t.start_date for t in self.tasks if t.start_date] + if self.start_date is not None: + start_dates.append(self.start_date) + if start_dates: + earliest = timezone.coerce_datetime(min(start_dates)) else: - next_run_date = self.following_schedule(date_last_automated_dagrun) - - if date_last_automated_dagrun and next_run_date: - while next_run_date <= date_last_automated_dagrun: - next_run_date = self.following_schedule(next_run_date) - - # don't ever schedule prior to the dag's start_date - if self.start_date: - next_run_date = self.start_date if not next_run_date else max(next_run_date, self.start_date) - if next_run_date == self.start_date: - next_run_date = self.normalize_schedule(self.start_date) - - self.log.debug("Dag start date: %s. Next run date: %s", self.start_date, next_run_date) - - # Don't schedule a dag beyond its end_date (as specified by the dag param) - if next_run_date and self.end_date and next_run_date > self.end_date: - return None - - # Don't schedule a dag beyond its end_date (as specified by the task params) - # Get the min task end date, which may come from the dag.default_args - task_end_dates = [t.end_date for t in self.tasks if t.end_date] - if task_end_dates and next_run_date: - min_task_end_date = min(task_end_dates) - if next_run_date > min_task_end_date: - return None - - return next_run_date - - def get_run_dates(self, start_date, end_date=None): + earliest = None + end_dates = [t.end_date for t in self.tasks if t.end_date] + if self.end_date is not None: + end_dates.append(self.end_date) + if end_dates: + latest = timezone.coerce_datetime(max(end_dates)) + else: + latest = None + return TimeRestriction(earliest, latest, self.catchup) + + @cached_property + def timetable(self) -> Timetable: + interval = self.schedule_interval + if interval is None: + return NullTimetable() + if interval == "@once": + return OnceTimetable() + if isinstance(interval, (timedelta, relativedelta)): + return DeltaDataIntervalTimetable(interval) + if isinstance(interval, str): + return CronDataIntervalTimetable(interval, self.timezone) + type_name = type(interval).__name__ + raise TypeError(f"{type_name} is not a valid DAG.schedule_interval.") + + def get_run_dates(self, start_date, end_date=None, *, align: bool = True): """ Returns a list of dates between the interval received as parameter using this dag's schedule interval. Returned dates can be used for execution dates. - :param start_date: the start date of the interval + :param start_date: The start date of the interval. :type start_date: datetime - :param end_date: the end date of the interval, defaults to timezone.utcnow() + :param end_date: The end date of the interval. Defaults to ``timezone.utcnow()``. :type end_date: datetime - :return: a list of dates within the interval following the dag's schedule + :param align: Whether the first run should be delayed to "align" with + the schedule, or can happen immediately at start_date. The default is + ``True``, but subdags will ignore this value and always behave as if + this is set to ``False`` for backward compatibility. + :type align: bool + :return: A list of dates within the interval following the dag's schedule. :rtype: list """ - run_dates = [] - - using_start_date = start_date - using_end_date = end_date - - # dates for dag runs - using_start_date = using_start_date or min(t.start_date for t in self.tasks) - using_end_date = using_end_date or timezone.utcnow() - - # next run date for a subdag isn't relevant (schedule_interval for subdags - # is ignored) so we use the dag run's start date in the case of a subdag - next_run_date = self.normalize_schedule(using_start_date) if not self.is_subdag else using_start_date - - while next_run_date and next_run_date <= using_end_date: - run_dates.append(next_run_date) - next_run_date = self.following_schedule(next_run_date) - - return run_dates + if start_date is None: + start = self._time_restriction.earliest + else: + start = pendulum.instance(start_date) + if end_date is None: + end = pendulum.now(timezone.utc) + else: + end = pendulum.instance(end_date) + # HACK: Sub-DAGs are currently scheduled differently. For example, say + # the schedule is @daily and start is 2021-06-03 22:16:00, a top-level + # DAG should be first scheduled to run on midnight 2021-06-04, but a + # sub-DAG should be first scheduled to run RIGHT NOW. We can change + # this, but since the sub-DAG is going away in 3.0 anyway, let's keep + # compatibility for now and remove this entirely later. + if self.is_subdag: + align = False + return sorted(self.timetable.iter_between(start, end, align=align)) def normalize_schedule(self, dttm): - """Returns dttm + interval unless dttm is first interval then it returns dttm""" + warnings.warn( + "`DAG.normalize_schedule()` is deprecated.", + category=DeprecationWarning, + stacklevel=2, + ) following = self.following_schedule(dttm) # in case of @once @@ -854,14 +805,11 @@ def is_paused(self): @property def normalized_schedule_interval(self) -> Optional[ScheduleInterval]: - """ - Returns Normalized Schedule Interval. This is used internally by the Scheduler to - schedule DAGs. - - 1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``) - 2. If Schedule Interval is "@once" return "None" - 3. If not (1) or (2) returns schedule_interval - """ + warnings.warn( + "DAG.normalized_schedule_interval() is deprecated.", + category=DeprecationWarning, + stacklevel=2, + ) if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets: _schedule_interval = cron_presets.get(self.schedule_interval) # type: Optional[ScheduleInterval] elif self.schedule_interval == '@once': @@ -1060,10 +1008,40 @@ def set_dependency(self, upstream_task_id, downstream_task_id): """ self.get_task(upstream_task_id).set_downstream(self.get_task(downstream_task_id)) + @provide_session + def get_task_instances_before( + self, + base_date: datetime, + num: int, + *, + session: Session, + ) -> List[TaskInstance]: + """Get ``num`` task instances before (including) ``base_date``. + + The returned list may contain exactly ``num`` task instances. It can + have less if there are less than ``num`` scheduled DAG runs before + ``base_date``, or more if there are manual task runs between the + requested period, which does not count toward ``num``. + """ + min_date = ( + session.query(DagRun) + .filter( + DagRun.dag_id == self.dag_id, + DagRun.execution_date <= base_date, + DagRun.run_type != DagRunType.MANUAL, + ) + .order_by(DagRun.execution_date.desc()) + .offset(num) + .first() + ) + if min_date is None: + min_date = timezone.utc_epoch() + return self.get_task_instances(start_date=min_date, end_date=base_date, session=session) + @provide_session def get_task_instances( self, start_date=None, end_date=None, state=None, session=None - ) -> Iterable[TaskInstance]: + ) -> List[TaskInstance]: if not start_date: start_date = (timezone.utcnow() - timedelta(30)).date() start_date = timezone.make_aware(datetime.combine(start_date, datetime.min.time())) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index bd339eec348ad..3e40678a912ba 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -29,7 +29,6 @@ from datetime import datetime, timedelta from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional, Union -from croniter import CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError, croniter from sqlalchemy.exc import OperationalError from sqlalchemy.orm import Session from tabulate import tabulate @@ -40,6 +39,7 @@ AirflowClusterPolicyViolation, AirflowDagCycleException, AirflowDagDuplicatedIdException, + AirflowTimetableInvalid, SerializedDagNotFound, ) from airflow.stats import Stats @@ -393,14 +393,13 @@ def _process_modules(self, filepath, mods, file_last_changed_on_disk): dag.fileloc = filepath try: dag.is_subdag = False - if isinstance(dag.normalized_schedule_interval, str): - croniter(dag.normalized_schedule_interval) + dag.timetable.validate() self.bag_dag(dag=dag, root_dag=dag) found_dags.append(dag) found_dags += dag.subdags - except (CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError) as cron_e: + except AirflowTimetableInvalid as exception: self.log.exception("Failed to bag_dag: %s", dag.full_filepath) - self.import_errors[dag.full_filepath] = f"Invalid Cron expression: {cron_e}" + self.import_errors[dag.full_filepath] = f"Invalid timetable expression: {exception}" self.file_last_changed[dag.full_filepath] = file_last_changed_on_disk except ( AirflowDagCycleException, diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 754b5f02fff24..e2a4783dc5a1a 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -369,14 +369,14 @@ def get_previous_dagrun(self, state: Optional[str] = None, session: Session = No @provide_session def get_previous_scheduled_dagrun(self, session: Session = None) -> Optional['DagRun']: """The previous, SCHEDULED DagRun, if there is one""" - dag = self.get_dag() - return ( session.query(DagRun) .filter( DagRun.dag_id == self.dag_id, - DagRun.execution_date == dag.previous_schedule(self.execution_date), + DagRun.execution_date < self.execution_date, + DagRun.run_type != DagRunType.MANUAL, ) + .order_by(DagRun.execution_date.desc()) .first() ) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 26c134ef15f43..07b4b283f1f10 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -762,11 +762,18 @@ def get_previous_ti( # LEGACY: most likely running from unit tests if not dr: # Means that this TaskInstance is NOT being run from a DR, but from a catchup - previous_scheduled_date = dag.previous_schedule(self.execution_date) - if not previous_scheduled_date: + try: + # XXX: This uses DAG internals, but as the outer comment + # said, the block is only reached for legacy reasons for + # development code, so that's OK-ish. + schedule = dag.timetable._schedule + except AttributeError: return None - - return TaskInstance(task=self.task, execution_date=previous_scheduled_date) + dt = pendulum.instance(self.execution_date) + return TaskInstance( + task=self.task, + execution_date=schedule.get_prev(dt), + ) dr.dag = dag diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow/ti_deps/deps/prev_dagrun_dep.py index 7a2b8f7d0c5bc..70573bfbd6e88 100644 --- a/airflow/ti_deps/deps/prev_dagrun_dep.py +++ b/airflow/ti_deps/deps/prev_dagrun_dep.py @@ -34,37 +34,37 @@ class PrevDagrunDep(BaseTIDep): @provide_session def _get_dep_statuses(self, ti, session, dep_context): if dep_context.ignore_depends_on_past: - yield self._passing_status( - reason="The context specified that the state of past DAGs could be ignored." - ) + reason = "The context specified that the state of past DAGs could be ignored." + yield self._passing_status(reason=reason) return if not ti.task.depends_on_past: yield self._passing_status(reason="The task did not have depends_on_past set.") return - # Don't depend on the previous task instance if we are the first task - dag = ti.task.dag - if dag.catchup: - if dag.previous_schedule(ti.execution_date) is None: - yield self._passing_status(reason="This task does not have a schedule or is @once") - return - if dag.previous_schedule(ti.execution_date) < ti.task.start_date: - yield self._passing_status( - reason="This task instance was the first task instance for its task." - ) - return + dr = ti.get_dagrun(session=session) + if not dr: + yield self._passing_status(reason="This task instance does not belong to a DAG.") + return + + # Don't depend on the previous task instance if we are the first task. + catchup = ti.task.dag.catchup + if catchup: + last_dagrun = dr.get_previous_scheduled_dagrun(session) else: - dr = ti.get_dagrun(session=session) - last_dagrun = dr.get_previous_dagrun(session=session) if dr else None + last_dagrun = dr.get_previous_dagrun(session=session) - if not last_dagrun: - yield self._passing_status( - reason="This task instance was the first task instance for its task." - ) - return + # First ever run for this DAG. + if not last_dagrun: + yield self._passing_status(reason="This task instance was the first task instance for its task.") + return + + # There was a DAG run, but the task wasn't active back then. + if catchup and last_dagrun.execution_date < ti.task.start_date: + yield self._passing_status(reason="This task instance was the first task instance for its task.") + return - previous_ti = ti.get_previous_ti(session=session) + previous_ti = last_dagrun.get_task_instance(ti.task_id, session=session) if not previous_ti: yield self._failing_status( reason="depends_on_past is true for this task's DAG, but the previous " diff --git a/airflow/timetables/__init__.py b/airflow/timetables/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/timetables/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py new file mode 100644 index 0000000000000..5faf9dba33413 --- /dev/null +++ b/airflow/timetables/base.py @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Iterator, NamedTuple, Optional + +from pendulum import DateTime + +from airflow.typing_compat import Protocol + + +class DataInterval(NamedTuple): + """A data interval for a DagRun to operate over. + + The represented interval is ``[start, end)``. + """ + + start: DateTime + end: DateTime + + +class TimeRestriction(NamedTuple): + """Restriction on when a DAG can be scheduled for a run. + + Specifically, the run must not be earlier than ``earliest``, nor later than + ``latest``. If ``catchup`` is *False*, the run must also not be earlier than + the current time, i.e. "missed" schedules are not backfilled. + + These values are generally set on the DAG or task's ``start_date``, + ``end_date``, and ``catchup`` arguments. + + Both ``earliest`` and ``latest`` are inclusive; a DAG run can happen exactly + at either point of time. + """ + + earliest: Optional[DateTime] + latest: Optional[DateTime] + catchup: bool + + +class DagRunInfo(NamedTuple): + """Information to schedule a DagRun. + + Instances of this will be returned by timetables when they are asked to + schedule a DagRun creation. + """ + + run_after: DateTime + """The earliest time this DagRun is created and its tasks scheduled.""" + + data_interval: DataInterval + """The data interval this DagRun to operate over, if applicable.""" + + @classmethod + def exact(cls, at: DateTime) -> "DagRunInfo": + """Represent a run on an exact time.""" + return cls(run_after=at, data_interval=DataInterval(at, at)) + + @classmethod + def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo": + """Represent a run on a continuous schedule. + + In such a schedule, each data interval starts right after the previous + one ends, and each run is scheduled right after the interval ends. This + applies to all schedules prior to AIP-39 except ``@once`` and ``None``. + """ + return cls(run_after=end, data_interval=DataInterval(start, end)) + + +class Timetable(Protocol): + """Protocol that all Timetable classes are expected to implement.""" + + def validate(self) -> None: + """Validate the timetable is correctly specified. + + This should raise AirflowTimetableInvalid on validation failure. + """ + raise NotImplementedError() + + def next_dagrun_info( + self, + last_automated_dagrun: Optional[DateTime], + restriction: TimeRestriction, + ) -> Optional[DagRunInfo]: + """Provide information to schedule the next DagRun. + + :param last_automated_dagrun: The ``execution_date`` of the associated + DAG's last scheduled or backfilled run (manual runs not considered). + :param restriction: Restriction to apply when scheduling the DAG run. + See documentation of :class:`TimeRestriction` for details. + + :return: Information on when the next DagRun can be scheduled. None + means a DagRun will not happen. This does not mean no more runs + will be scheduled even again for this DAG; the timetable can return + a DagRunInfo object when asked at another time. + """ + raise NotImplementedError() + + def iter_between( + self, + start: DateTime, + end: DateTime, + *, + align: bool, + ) -> Iterator[DateTime]: + """Get schedules between the *start* and *end*.""" + if start > end: + raise ValueError(f"start ({start}) > end ({end})") + between = TimeRestriction(start, end, catchup=True) + + if align: + next_info = self.next_dagrun_info(None, between) + else: + yield start + next_info = self.next_dagrun_info(start, between) + + while next_info is not None: + dagrun_start = next_info.data_interval.start + yield dagrun_start + next_info = self.next_dagrun_info(dagrun_start, between) diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py new file mode 100644 index 0000000000000..6b3a46e465a8b --- /dev/null +++ b/airflow/timetables/interval.py @@ -0,0 +1,92 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import datetime +from typing import Any, Optional + +from pendulum import DateTime + +from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable +from airflow.timetables.schedules import CronSchedule, Delta, DeltaSchedule, Schedule + + +class _DataIntervalTimetable(Timetable): + """Basis for timetable implementations that schedule data intervals. + + This kind of timetable classes create periodic data intervals from an + underlying schedule representation (e.g. a cron expression, or a timedelta + instance), and schedule a DagRun at the end of each interval. + """ + + _schedule: Schedule + + def __eq__(self, other: Any) -> bool: + """Delegate to the schedule.""" + if not isinstance(other, _DataIntervalTimetable): + return NotImplemented + return self._schedule == other._schedule + + def validate(self) -> None: + self._schedule.validate() + + def next_dagrun_info( + self, + last_automated_dagrun: Optional[DateTime], + restriction: TimeRestriction, + ) -> Optional[DagRunInfo]: + earliest = restriction.earliest + if not restriction.catchup: + earliest = self._schedule.skip_to_latest(earliest) + if last_automated_dagrun is None: + # First run; schedule the run at the first available time matching + # the schedule, and retrospectively create a data interval for it. + if earliest is None: + return None + start = self._schedule.align(earliest) + else: + # There's a previous run. Create a data interval starting from when + # the end of the previous interval. + start = self._schedule.get_next(last_automated_dagrun) + if restriction.latest is not None and start > restriction.latest: + return None + end = self._schedule.get_next(start) + return DagRunInfo.interval(start=start, end=end) + + +class CronDataIntervalTimetable(_DataIntervalTimetable): + """Timetable that schedules data intervals with a cron expression. + + This corresponds to ``schedule_interval=``, where ```` is either + a five/six-segment representation, or one of ``cron_presets``. + + Don't pass ``@once`` in here; use ``OnceTimetable`` instead. + """ + + def __init__(self, cron: str, timezone: datetime.tzinfo) -> None: + self._schedule = CronSchedule(cron, timezone) + + +class DeltaDataIntervalTimetable(_DataIntervalTimetable): + """Timetable that schedules data intervals with a time delta. + + This corresponds to ``schedule_interval=``, where ```` is + either a ``datetime.timedelta`` or ``dateutil.relativedelta.relativedelta`` + instance. + """ + + def __init__(self, delta: Delta) -> None: + self._schedule = DeltaSchedule(delta) diff --git a/airflow/timetables/schedules.py b/airflow/timetables/schedules.py new file mode 100644 index 0000000000000..180d8bc57b501 --- /dev/null +++ b/airflow/timetables/schedules.py @@ -0,0 +1,207 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import datetime +import typing + +from cached_property import cached_property +from croniter import CroniterBadCronError, CroniterBadDateError, croniter +from dateutil.relativedelta import relativedelta +from pendulum import DateTime + +from airflow.exceptions import AirflowTimetableInvalid +from airflow.typing_compat import Protocol +from airflow.utils.dates import cron_presets +from airflow.utils.timezone import convert_to_utc, make_aware, make_naive + +Delta = typing.Union[datetime.timedelta, relativedelta] + + +class Schedule(Protocol): + """Base protocol for schedules.""" + + def skip_to_latest(self, earliest: typing.Optional[DateTime]) -> DateTime: + """Bound the earliest time a run can be scheduled. + + This is called when ``catchup=False``. See docstring of subclasses for + exact skipping behaviour of a schedule. + """ + raise NotImplementedError() + + def validate(self) -> None: + """Validate the timetable is correctly specified. + + This should raise AirflowTimetableInvalid on validation failure. + """ + raise NotImplementedError() + + def get_next(self, current: DateTime) -> DateTime: + """Get the first schedule after the current time.""" + raise NotImplementedError() + + def get_prev(self, current: DateTime) -> DateTime: + """Get the last schedule before the current time.""" + raise NotImplementedError() + + def align(self, current: DateTime) -> DateTime: + """Align given time to the scheduled. + + For fixed schedules (e.g. every midnight); this finds the next time that + aligns to the declared time, if the given time does not align. If the + schedule is not fixed (e.g. every hour), the given time is returned. + """ + raise NotImplementedError() + + +def _is_schedule_fixed(expression: str) -> bool: + """Figures out if the schedule has a fixed time (e.g. 3 AM every day). + + :return: True if the schedule has a fixed time, False if not. + + Detection is done by "peeking" the next two cron trigger time; if the + two times have the same minute and hour value, the schedule is fixed, + and we *don't* need to perform the DST fix. + + This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00). + """ + cron = croniter(expression) + next_a = cron.get_next(datetime.datetime) + next_b = cron.get_next(datetime.datetime) + return next_b.minute == next_a.minute and next_b.hour == next_a.hour + + +class CronSchedule(Schedule): + """Schedule things from a cron expression. + + The implementation extends on croniter to add timezone awareness. This is + because crontier works only with naive timestamps, and cannot consider DST + when determining the next/previous time. + """ + + def __init__(self, expression: str, timezone: datetime.tzinfo) -> None: + self._expression = expression = cron_presets.get(expression, expression) + self._timezone = timezone + + def __eq__(self, other: typing.Any) -> bool: + """Both expression and timezone should match.""" + if not isinstance(other, CronSchedule): + return NotImplemented + return self._expression == other._expression and self._timezone == other._timezone + + def validate(self) -> None: + try: + croniter(self._expression) + except (CroniterBadCronError, CroniterBadDateError) as e: + raise AirflowTimetableInvalid(str(e)) + + @cached_property + def _should_fix_dst(self) -> bool: + # This is lazy so instantiating a schedule does not immediately raise + # an exception. Validity is checked with validate() during DAG-bagging. + return not _is_schedule_fixed(self._expression) + + def get_next(self, current: DateTime) -> DateTime: + """Get the first schedule after specified time, with DST fixed.""" + naive = make_naive(current, self._timezone) + cron = croniter(self._expression, start_time=naive) + scheduled = cron.get_next(datetime.datetime) + if not self._should_fix_dst: + return convert_to_utc(make_aware(scheduled, self._timezone)) + delta = scheduled - naive + return convert_to_utc(current.in_timezone(self._timezone) + delta) + + def get_prev(self, current: DateTime) -> DateTime: + """Get the first schedule before specified time, with DST fixed.""" + naive = make_naive(current, self._timezone) + cron = croniter(self._expression, start_time=naive) + scheduled = cron.get_prev(datetime.datetime) + if not self._should_fix_dst: + return convert_to_utc(make_aware(scheduled, self._timezone)) + delta = naive - scheduled + return convert_to_utc(current.in_timezone(self._timezone) - delta) + + def align(self, current: DateTime) -> DateTime: + """Get the next scheduled time. + + This is ``current + interval``, unless ``current`` is first interval, + then ``current`` is returned. + """ + next_time = self.get_next(current) + if self.get_prev(next_time) != current: + return next_time + return current + + def skip_to_latest(self, earliest: typing.Optional[DateTime]) -> DateTime: + """Bound the earliest time a run can be scheduled. + + The logic is that we move start_date up until one period before, so the + current time is AFTER the period end, and the job can be created... + + This is slightly different from the delta version at terminal values. + If the next schedule should start *right now*, we want the data interval + that start right now now, not the one that ends now. + """ + current_time = DateTime.utcnow() + next_start = self.get_next(current_time) + last_start = self.get_prev(current_time) + if next_start == current_time: + new_start = last_start + elif next_start > current_time: + new_start = self.get_prev(last_start) + else: + raise AssertionError("next schedule shouldn't be earlier") + if earliest is None: + return new_start + return max(new_start, earliest) + + +class DeltaSchedule(Schedule): + """Schedule things on a fixed time delta.""" + + def __init__(self, delta: Delta) -> None: + self._delta = delta + + def __eq__(self, other: typing.Any) -> bool: + """The offset should match.""" + if not isinstance(other, DeltaSchedule): + return NotImplemented + return self._delta == other._delta + + def validate(self) -> None: + pass # TODO: Check the delta is positive? + + def get_next(self, current: DateTime) -> DateTime: + return convert_to_utc(current + self._delta) + + def get_prev(self, current: DateTime) -> DateTime: + return convert_to_utc(current - self._delta) + + def align(self, current: DateTime) -> DateTime: + return current + + def skip_to_latest(self, earliest: typing.Optional[DateTime]) -> DateTime: + """Bound the earliest time a run can be scheduled. + + The logic is that we move start_date up until one period before, so the + current time is AFTER the period end, and the job can be created... + + This is slightly different from the cron version at terminal values. + """ + new_start = self.get_prev(DateTime.utcnow()) + if earliest is None: + return new_start + return max(new_start, earliest) diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py new file mode 100644 index 0000000000000..b6208a646542c --- /dev/null +++ b/airflow/timetables/simple.py @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Any, Optional + +from pendulum import DateTime + +from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable + + +class NullTimetable(Timetable): + """Timetable that never schedules anything. + + This corresponds to ``schedule_interval=None``. + """ + + def __eq__(self, other: Any) -> bool: + """As long as *other* is of the same type.""" + if not isinstance(other, NullTimetable): + return NotImplemented + return True + + def validate(self) -> None: + pass + + def next_dagrun_info( + self, + last_automated_dagrun: Optional[DateTime], + restriction: TimeRestriction, + ) -> Optional[DagRunInfo]: + return None + + +class OnceTimetable(Timetable): + """Timetable that schedules the execution once as soon as possible. + + This corresponds to ``schedule_interval="@once"``. + """ + + def __eq__(self, other: Any) -> bool: + """As long as *other* is of the same type.""" + if not isinstance(other, OnceTimetable): + return NotImplemented + return True + + def validate(self) -> None: + pass + + def next_dagrun_info( + self, + last_automated_dagrun: Optional[DateTime], + restriction: TimeRestriction, + ) -> Optional[DagRunInfo]: + if last_automated_dagrun is not None: + return None # Already run, no more scheduling. + if restriction.earliest is None: # No start date, won't run. + return None + # "@once" always schedule to the start_date determined by the DAG and + # tasks, regardless of catchup or not. This has been the case since 1.10 + # and we're inheriting it. See AIRFLOW-1928. + run_after = restriction.earliest + if restriction.latest is not None and run_after > restriction.latest: + return None + return DagRunInfo.exact(run_after) diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py index 11690989edd04..30771b1b2c360 100644 --- a/airflow/utils/dates.py +++ b/airflow/utils/dates.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. +import warnings from datetime import datetime, timedelta from typing import Dict, List, Optional, Union @@ -72,6 +73,12 @@ def date_range( :param delta: step length. It can be datetime.timedelta or cron expression as string :type delta: datetime.timedelta or str or dateutil.relativedelta """ + warnings.warn( + "`airflow.utils.dates.date_range()` is deprecated. Please use `airflow.timetables`.", + category=DeprecationWarning, + stacklevel=2, + ) + if not delta: return [] if end_date: diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index 798c723da8723..67c3b26fd2e1f 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -17,6 +17,7 @@ # under the License. # import datetime as dt +from typing import Optional, Union import pendulum from pendulum.datetime import DateTime @@ -172,3 +173,14 @@ def parse(string: str, timezone=None) -> DateTime: :param timezone: the timezone """ return pendulum.parse(string, tz=timezone or TIMEZONE, strict=False) # type: ignore + + +def coerce_datetime(v: Union[None, dt.datetime, DateTime]) -> Optional[DateTime]: + """Convert whatever is passed in to ``pendulum.DateTime``.""" + if v is None: + return None + if isinstance(v, DateTime): + return v + if v.tzinfo is None: + v = make_aware(v) + return pendulum.instance(v) diff --git a/airflow/www/views.py b/airflow/www/views.py index 39e2e86794dc3..760d736040f9f 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2334,21 +2334,18 @@ def duration(self, session=None): except airflow.exceptions.SerializedDagNotFound: dag = None - base_date = request.args.get('base_date') - num_runs = request.args.get('num_runs', default=default_dag_run, type=int) - if dag is None: flash(f'DAG "{dag_id}" seems to be missing.', "error") return redirect(url_for('Airflow.index')) + base_date = request.args.get('base_date') + num_runs = request.args.get('num_runs', default=default_dag_run, type=int) + if base_date: base_date = timezone.parse(base_date) else: base_date = dag.get_latest_execution_date() or timezone.utcnow() - dates = dag.date_range(base_date, num=-abs(num_runs)) - min_date = dates[0] if dates else timezone.utc_epoch() - root = request.args.get('root') if root: dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False) @@ -2364,7 +2361,11 @@ def duration(self, session=None): x_points = defaultdict(list) cumulative_y = defaultdict(list) - task_instances = dag.get_task_instances(start_date=min_date, end_date=base_date) + task_instances = dag.get_task_instances_before(base_date, num_runs, session=session) + if task_instances: + min_date = task_instances[0].execution_date + else: + min_date = timezone.utc_epoch() ti_fails = ( session.query(TaskFail) .filter( @@ -2468,9 +2469,6 @@ def tries(self, session=None): else: base_date = dag.get_latest_execution_date() or timezone.utcnow() - dates = dag.date_range(base_date, num=-abs(num_runs)) - min_date = dates[0] if dates else timezone.utc_epoch() - root = request.args.get('root') if root: dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False) @@ -2484,10 +2482,11 @@ def tries(self, session=None): chart_attr=self.line_chart_attr, ) + tis = dag.get_task_instances_before(base_date, num_runs, session=session) for task in dag.tasks: y_points = [] x_points = [] - for ti in task.get_task_instances(start_date=min_date, end_date=base_date): + for ti in tis: dttm = wwwutils.epoch(ti.execution_date) x_points.append(dttm) # y value should reflect completed tries to have a 0 baseline. @@ -2495,7 +2494,6 @@ def tries(self, session=None): if x_points: chart.add_serie(name=task.task_id, x=x_points, y=y_points) - tis = dag.get_task_instances(start_date=min_date, end_date=base_date) tries = sorted({ti.try_number for ti in tis}) max_date = max(ti.execution_date for ti in tis) if tries else None chart.create_y_axis('yAxis', format='.02f', custom_format=False, label='Tries') @@ -2543,13 +2541,12 @@ def landing_times(self, session=None): else: base_date = dag.get_latest_execution_date() or timezone.utcnow() - dates = dag.date_range(base_date, num=-abs(num_runs)) - min_date = dates[0] if dates else timezone.utc_epoch() - root = request.args.get('root') if root: dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False) + tis = dag.get_task_instances_before(base_date, num_runs, session=session) + chart_height = wwwutils.get_chart_height(dag) chart = nvd3.lineChart( name="lineChart", x_is_date=True, height=chart_height, chart_attr=self.line_chart_attr @@ -2560,7 +2557,7 @@ def landing_times(self, session=None): task_id = task.task_id y_points[task_id] = [] x_points[task_id] = [] - for ti in task.get_task_instances(start_date=min_date, end_date=base_date): + for ti in tis: ts = ti.execution_date if dag.schedule_interval and dag.following_schedule(ts): ts = dag.following_schedule(ts) @@ -2584,7 +2581,6 @@ def landing_times(self, session=None): y=scale_time_units(y_points[task_id], y_unit), ) - tis = dag.get_task_instances(start_date=min_date, end_date=base_date) dates = sorted({ti.execution_date for ti in tis}) max_date = max(ti.execution_date for ti in tis) if dates else None diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index df2ea76f51cb4..4a510f88bb66b 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -298,10 +298,15 @@ def test_next_execution(self): # The details below is determined by the schedule_interval of example DAGs now = DEFAULT_DATE - expected_output = [str(now + timedelta(days=1)), str(now + timedelta(hours=4)), "None", "None"] + expected_output = [ + (now + timedelta(days=1)).isoformat(), + (now + timedelta(hours=4)).isoformat(), + "None", + "None", + ] expected_output_2 = [ - str(now + timedelta(days=1)) + os.linesep + str(now + timedelta(days=2)), - str(now + timedelta(hours=4)) + os.linesep + str(now + timedelta(hours=8)), + (now + timedelta(days=1)).isoformat() + os.linesep + (now + timedelta(days=2)).isoformat(), + (now + timedelta(hours=4)).isoformat() + os.linesep + (now + timedelta(hours=8)).isoformat(), "None", "None", ] diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index 1bcc3fc94ccaf..e02dc672ad6bf 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -1362,8 +1362,8 @@ def test_update_counters(self): session.close() def test_dag_get_run_dates(self): - def get_test_dag_for_backfill(schedule_interval=None): - dag = DAG(dag_id='test_get_dates', start_date=DEFAULT_DATE, schedule_interval=schedule_interval) + def get_test_dag_for_backfill(): + dag = DAG(dag_id='test_get_dates', start_date=DEFAULT_DATE, schedule_interval="@hourly") DummyOperator( task_id='dummy', dag=dag, @@ -1372,9 +1372,13 @@ def get_test_dag_for_backfill(schedule_interval=None): return dag test_dag = get_test_dag_for_backfill() - assert [DEFAULT_DATE] == test_dag.get_run_dates(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + assert [DEFAULT_DATE] == test_dag.get_run_dates( + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, + align=True, + ) - test_dag = get_test_dag_for_backfill(schedule_interval="@hourly") + test_dag = get_test_dag_for_backfill() assert [ DEFAULT_DATE - datetime.timedelta(hours=3), DEFAULT_DATE - datetime.timedelta(hours=2), @@ -1383,6 +1387,7 @@ def get_test_dag_for_backfill(schedule_interval=None): ] == test_dag.get_run_dates( start_date=DEFAULT_DATE - datetime.timedelta(hours=3), end_date=DEFAULT_DATE, + align=True, ) def test_backfill_run_backwards(self): diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 87be3e8d16638..35ca8eb7c53a0 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1780,7 +1780,7 @@ def evaluate_dagrun( dag = self.dagbag.get_dag(dag_id) dr = dag.create_dagrun( run_type=DagRunType.SCHEDULED, - execution_date=dag.next_dagrun_after_date(None), + execution_date=dag.next_dagrun_info(None)[0], state=State.RUNNING, ) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 902ae4819339e..33154c4a6586b 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -48,6 +48,7 @@ from airflow.operators.dummy import DummyOperator from airflow.operators.subdag import SubDagOperator from airflow.security import permissions +from airflow.timetables.simple import NullTimetable, OnceTimetable from airflow.utils import timezone from airflow.utils.file import list_py_file_paths from airflow.utils.session import create_session, provide_session @@ -58,6 +59,7 @@ from tests.models import DEFAULT_DATE from tests.test_utils.asserts import assert_queries_count from tests.test_utils.db import clear_db_dags, clear_db_runs +from tests.test_utils.timetables import cron_timetable, delta_timetable TEST_DATE = datetime_tz(2015, 1, 2, 0, 0) @@ -1135,7 +1137,7 @@ def test_schedule_dag_once(self): dag_id = "test_schedule_dag_once" dag = DAG(dag_id=dag_id) dag.schedule_interval = '@once' - assert dag.normalized_schedule_interval is None + assert isinstance(dag.timetable, OnceTimetable) dag.add_task(BaseOperator(task_id="faketastic", owner='Also fake', start_date=TEST_DATE)) # Sync once to create the DagModel @@ -1247,20 +1249,20 @@ def test_get_paused_dag_ids(self): @parameterized.expand( [ - (None, None), - ("@daily", "0 0 * * *"), - ("@weekly", "0 0 * * 0"), - ("@monthly", "0 0 1 * *"), - ("@quarterly", "0 0 1 */3 *"), - ("@yearly", "0 0 1 1 *"), - ("@once", None), - (datetime.timedelta(days=1), datetime.timedelta(days=1)), + (None, NullTimetable()), + ("@daily", cron_timetable("0 0 * * *")), + ("@weekly", cron_timetable("0 0 * * 0")), + ("@monthly", cron_timetable("0 0 1 * *")), + ("@quarterly", cron_timetable("0 0 1 */3 *")), + ("@yearly", cron_timetable("0 0 1 1 *")), + ("@once", OnceTimetable()), + (datetime.timedelta(days=1), delta_timetable(datetime.timedelta(days=1))), ] ) - def test_normalized_schedule_interval(self, schedule_interval, expected_n_schedule_interval): + def test_timetable(self, schedule_interval, expected_timetable): dag = DAG("test_schedule_interval", schedule_interval=schedule_interval) - assert dag.normalized_schedule_interval == expected_n_schedule_interval + assert dag.timetable == expected_timetable assert dag.schedule_interval == schedule_interval def test_create_dagrun_run_id_is_generated(self): @@ -1474,19 +1476,19 @@ def test_clear_dag(self, ti_state_begin, ti_state_end: Optional[str]): assert task_instance.state == ti_state_end self._clean_up(dag_id) - def test_next_dagrun_after_date_once(self): + def test_next_dagrun_info_once(self): dag = DAG( 'test_scheduler_dagrun_once', start_date=timezone.datetime(2015, 1, 1), schedule_interval="@once" ) - next_date = dag.next_dagrun_after_date(None) + next_date, _ = dag.next_dagrun_info(None) assert next_date == timezone.datetime(2015, 1, 1) - next_date = dag.next_dagrun_after_date(next_date) + next_date, _ = dag.next_dagrun_info(next_date) assert next_date is None - def test_next_dagrun_after_date_start_end_dates(self): + def test_next_dagrun_info_start_end_dates(self): """ Tests that an attempt to schedule a task after the Dag's end_date does not succeed. @@ -1503,7 +1505,7 @@ def test_next_dagrun_after_date_start_end_dates(self): dates = [] date = None for _ in range(runs): - date = dag.next_dagrun_after_date(date) + date, _ = dag.next_dagrun_info(date) dates.append(date) for date in dates: @@ -1511,9 +1513,9 @@ def test_next_dagrun_after_date_start_end_dates(self): assert dates[-1] == end_date - assert dag.next_dagrun_after_date(date) is None + assert dag.next_dagrun_info(date)[0] is None - def test_next_dagrun_after_date_catcup(self): + def test_next_dagrun_info_catchup(self): """ Test to check that a DAG with catchup = False only schedules beginning now, not back to the start date """ @@ -1551,7 +1553,7 @@ def make_dag(dag_id, schedule_interval, start_date, catchup): start_date=six_hours_ago_to_the_hour, catchup=False, ) - next_date = dag1.next_dagrun_after_date(None) + next_date, _ = dag1.next_dagrun_info(None) # The DR should be scheduled in the last half an hour, not 6 hours ago assert next_date > half_an_hour_ago assert next_date < timezone.utcnow() @@ -1563,7 +1565,7 @@ def make_dag(dag_id, schedule_interval, start_date, catchup): catchup=False, ) - next_date = dag2.next_dagrun_after_date(None) + next_date, _ = dag2.next_dagrun_info(None) # The DR should be scheduled in the last 2 hours, not 6 hours ago assert next_date > two_hours_ago # The DR should be scheduled BEFORE now @@ -1576,12 +1578,12 @@ def make_dag(dag_id, schedule_interval, start_date, catchup): catchup=False, ) - next_date = dag3.next_dagrun_after_date(None) + next_date, _ = dag3.next_dagrun_info(None) # The DR should be scheduled in the last 2 hours, not 6 hours ago assert next_date == six_hours_ago_to_the_hour @freeze_time(timezone.datetime(2020, 1, 5)) - def test_next_dagrun_after_date_timedelta_schedule_and_catchup_false(self): + def test_next_dagrun_info_timedelta_schedule_and_catchup_false(self): """ Test that the dag file processor does not create multiple dagruns if a dag is scheduled with 'timedelta' and catchup=False @@ -1593,15 +1595,15 @@ def test_next_dagrun_after_date_timedelta_schedule_and_catchup_false(self): catchup=False, ) - next_date = dag.next_dagrun_after_date(None) + next_date, _ = dag.next_dagrun_info(None) assert next_date == timezone.datetime(2020, 1, 4) # The date to create is in the future, this is handled by "DagModel.dags_needing_dagruns" - next_date = dag.next_dagrun_after_date(next_date) + next_date, _ = dag.next_dagrun_info(next_date) assert next_date == timezone.datetime(2020, 1, 5) @freeze_time(timezone.datetime(2020, 5, 4)) - def test_next_dagrun_after_date_timedelta_schedule_and_catchup_true(self): + def test_next_dagrun_info_timedelta_schedule_and_catchup_true(self): """ Test that the dag file processor creates multiple dagruns if a dag is scheduled with 'timedelta' and catchup=True @@ -1613,17 +1615,17 @@ def test_next_dagrun_after_date_timedelta_schedule_and_catchup_true(self): catchup=True, ) - next_date = dag.next_dagrun_after_date(None) + next_date, _ = dag.next_dagrun_info(None) assert next_date == timezone.datetime(2020, 5, 1) - next_date = dag.next_dagrun_after_date(next_date) + next_date, _ = dag.next_dagrun_info(next_date) assert next_date == timezone.datetime(2020, 5, 2) - next_date = dag.next_dagrun_after_date(next_date) + next_date, _ = dag.next_dagrun_info(next_date) assert next_date == timezone.datetime(2020, 5, 3) # The date to create is in the future, this is handled by "DagModel.dags_needing_dagruns" - next_date = dag.next_dagrun_after_date(next_date) + next_date, _ = dag.next_dagrun_info(next_date) assert next_date == timezone.datetime(2020, 5, 4) def test_next_dagrun_after_auto_align(self): @@ -1640,7 +1642,7 @@ def test_next_dagrun_after_auto_align(self): ) DummyOperator(task_id='dummy', dag=dag, owner='airflow') - next_date = dag.next_dagrun_after_date(None) + next_date, _ = dag.next_dagrun_info(None) assert next_date == timezone.datetime(2016, 1, 2, 5, 4) dag = DAG( @@ -1650,7 +1652,7 @@ def test_next_dagrun_after_auto_align(self): ) DummyOperator(task_id='dummy', dag=dag, owner='airflow') - next_date = dag.next_dagrun_after_date(None) + next_date, _ = dag.next_dagrun_info(None) assert next_date == timezone.datetime(2016, 1, 1, 10, 10) def test_next_dagrun_after_not_for_subdags(self): @@ -1690,10 +1692,10 @@ def subdag(parent_dag_name, child_dag_name, args): subdag.parent_dag = dag subdag.is_subdag = True - next_date = dag.next_dagrun_after_date(None) + next_date, _ = dag.next_dagrun_info(None) assert next_date == timezone.datetime(2019, 1, 1, 0, 0) - next_subdag_date = subdag.next_dagrun_after_date(None) + next_subdag_date, _ = subdag.next_dagrun_info(None) assert next_subdag_date is None, "SubDags should never have DagRuns created by the scheduler" def test_replace_outdated_access_control_actions(self): diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index 78991999ffde1..57e3e3a56e0bb 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -651,8 +651,8 @@ def test_depends_on_past(self, prev_ti_state, is_ti_success): dag = self.dagbag.get_dag(dag_id) task = dag.tasks[0] - self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 1, 0, 0, 0)) - self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 2, 0, 0, 0)) + self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 1, 0, 0, 0), is_backfill=True) + self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 2, 0, 0, 0), is_backfill=True) prev_ti = TI(task, timezone.datetime(2016, 1, 1, 0, 0, 0)) ti = TI(task, timezone.datetime(2016, 1, 2, 0, 0, 0)) @@ -678,8 +678,8 @@ def test_wait_for_downstream(self, prev_ti_state, is_ti_success): # For ti.set_state() to work, the DagRun has to exist, # Otherwise ti.previous_ti returns an unpersisted TI - self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 1, 0, 0, 0)) - self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 2, 0, 0, 0)) + self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 1, 0, 0, 0), is_backfill=True) + self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 2, 0, 0, 0), is_backfill=True) prev_ti_downstream = TI(task=downstream, execution_date=timezone.datetime(2016, 1, 1, 0, 0, 0)) ti = TI(task=upstream, execution_date=timezone.datetime(2016, 1, 2, 0, 0, 0)) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 2ac1381a7df11..25a42581465cd 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -868,6 +868,12 @@ def test_depends_on_past(self): ) dag.clear() + dag.create_dagrun( + execution_date=DEFAULT_DATE, + state=State.FAILED, + run_type=DagRunType.SCHEDULED, + ) + run_date = task.start_date + datetime.timedelta(days=5) dag.create_dagrun( diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py index e83087101611f..f9e0ebfbb196a 100644 --- a/tests/sensors/test_base.py +++ b/tests/sensors/test_base.py @@ -421,7 +421,8 @@ def test_reschedule_with_test_mode(self): # poke returns False and AirflowRescheduleException is raised date1 = timezone.utcnow() with freeze_time(date1): - for date in self.dag.date_range(DEFAULT_DATE, end_date=DEFAULT_DATE): + dates = self.dag.get_run_dates(DEFAULT_DATE, end_date=DEFAULT_DATE, align=True) + for date in dates: TaskInstance(sensor, date).run(ignore_ti_state=True, test_mode=True) tis = dr.get_task_instances() assert len(tis) == 2 diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 7f78ca61078e1..04ef07bdacdab 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -41,7 +41,9 @@ from airflow.security import permissions from airflow.serialization.json_schema import load_dag_schema_dict from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG +from airflow.timetables.simple import NullTimetable, OnceTimetable from tests.test_utils.mock_operators import CustomOperator, CustomOpLink, GoogleLink +from tests.test_utils.timetables import cron_timetable, delta_timetable executor_config_pod = k8s.V1Pod( metadata=k8s.V1ObjectMeta(name="my-name"), @@ -503,14 +505,21 @@ def test_deserialization_end_date(self, dag_end_date, task_end_date, expected_ta @parameterized.expand( [ - (None, None, None), - ("@weekly", "@weekly", "0 0 * * 0"), - ("@once", "@once", None), - ({"__type": "timedelta", "__var": 86400.0}, timedelta(days=1), timedelta(days=1)), + (None, None, NullTimetable()), + ("@weekly", "@weekly", cron_timetable("0 0 * * 0")), + ("@once", "@once", OnceTimetable()), + ( + {"__type": "timedelta", "__var": 86400.0}, + timedelta(days=1), + delta_timetable(timedelta(days=1)), + ), ] ) def test_deserialization_schedule_interval( - self, serialized_schedule_interval, expected_schedule_interval, expected_n_schedule_interval + self, + serialized_schedule_interval, + expected_schedule_interval, + expected_timetable, ): serialized = { "__version": 1, @@ -529,7 +538,7 @@ def test_deserialization_schedule_interval( dag = SerializedDAG.from_dict(serialized) assert dag.schedule_interval == expected_schedule_interval - assert dag.normalized_schedule_interval == expected_n_schedule_interval + assert dag.timetable == expected_timetable @parameterized.expand( [ diff --git a/tests/test_utils/perf/scheduler_dag_execution_timing.py b/tests/test_utils/perf/scheduler_dag_execution_timing.py index e593690076233..0998a979497fa 100755 --- a/tests/test_utils/perf/scheduler_dag_execution_timing.py +++ b/tests/test_utils/perf/scheduler_dag_execution_timing.py @@ -163,18 +163,18 @@ def create_dag_runs(dag, num_runs, session): id_prefix = DagRun.ID_PREFIX - next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks)) - + last_dagrun_at = None for _ in range(num_runs): + next_info = dag.next_dagrun_info(last_dagrun_at) + last_dagrun_at = next_info.data_interval.start dag.create_dagrun( - run_id=id_prefix + next_run_date.isoformat(), - execution_date=next_run_date, + run_id=f"{id_prefix}{last_dagrun_at.isoformat()}", + execution_date=last_dagrun_at, start_date=timezone.utcnow(), state=State.RUNNING, external_trigger=False, session=session, ) - next_run_date = dag.following_schedule(next_run_date) @click.command() diff --git a/tests/test_utils/timetables.py b/tests/test_utils/timetables.py new file mode 100644 index 0000000000000..c6db4c7394038 --- /dev/null +++ b/tests/test_utils/timetables.py @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow import settings +from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable + + +def cron_timetable(expr: str) -> CronDataIntervalTimetable: + return CronDataIntervalTimetable(expr, settings.TIMEZONE) + + +def delta_timetable(delta) -> DeltaDataIntervalTimetable: + return DeltaDataIntervalTimetable(delta) diff --git a/tests/ti_deps/deps/test_prev_dagrun_dep.py b/tests/ti_deps/deps/test_prev_dagrun_dep.py index d80d47533d715..5970b5c1048d9 100644 --- a/tests/ti_deps/deps/test_prev_dagrun_dep.py +++ b/tests/ti_deps/deps/test_prev_dagrun_dep.py @@ -17,105 +17,117 @@ # under the License. -import unittest -from datetime import datetime from unittest.mock import Mock +import pytest + from airflow.models import DAG from airflow.models.baseoperator import BaseOperator from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep from airflow.utils.state import State - - -class TestPrevDagrunDep(unittest.TestCase): - def _get_task(self, **kwargs): - return BaseOperator(task_id='test_task', dag=DAG('test_dag'), **kwargs) - - def test_not_depends_on_past(self): - """ - If depends on past isn't set in the task then the previous dagrun should be - ignored, even though there is no previous_ti which would normally fail the dep - """ - task = self._get_task( - depends_on_past=False, start_date=datetime(2016, 1, 1), wait_for_downstream=False - ) - prev_ti = Mock( - task=task, - state=State.SUCCESS, - are_dependents_done=Mock(return_value=True), +from airflow.utils.timezone import datetime + + +@pytest.mark.parametrize( + "depends_on_past, wait_for_downstream, prev_ti, context_ignore_depends_on_past, dep_met", + [ + # If the task does not set depends_on_past, the previous dagrun should + # be ignored, even though previous_ti would otherwise fail the dep. + pytest.param( + False, + False, # wait_for_downstream=True overrides depends_on_past=False. + Mock( + state=State.NONE, + **{"are_dependents_done.return_value": False}, + ), + False, + True, + id="not_depends_on_past", + ), + # If the context overrides depends_on_past, the dep should be met even + # though there is no previous_ti which would normally fail the dep. + pytest.param( + True, + False, + Mock( + state=State.SUCCESS, + **{"are_dependents_done.return_value": True}, + ), + True, + True, + id="context_ignore_depends_on_past", + ), + # The first task run should pass since it has no previous dagrun. + pytest.param(True, False, None, False, True, id="first_task_run"), + # Previous TI did not complete execution. This dep should fail. + pytest.param( + True, + False, + Mock( + state=State.NONE, + **{"are_dependents_done.return_value": True}, + ), + False, + False, + id="prev_ti_bad_state", + ), + # Previous TI specified to wait for the downstream tasks of the previous + # dagrun. It should fail this dep if the previous TI's downstream TIs + # are not done. + pytest.param( + True, + True, + Mock( + state=State.SUCCESS, + **{"are_dependents_done.return_value": False}, + ), + False, + False, + id="failed_wait_for_downstream", + ), + # All the conditions for the dep are met. + pytest.param( + True, + True, + Mock( + state=State.SUCCESS, + **{"are_dependents_done.return_value": True}, + ), + False, + True, + id="all_met", + ), + ], +) +def test_dagrun_dep( + depends_on_past, + wait_for_downstream, + prev_ti, + context_ignore_depends_on_past, + dep_met, +): + task = BaseOperator( + task_id="test_task", + dag=DAG("test_dag"), + depends_on_past=depends_on_past, + start_date=datetime(2016, 1, 1), + wait_for_downstream=wait_for_downstream, + ) + if prev_ti: + prev_dagrun = Mock( execution_date=datetime(2016, 1, 2), + **{"get_task_instance.return_value": prev_ti}, ) - ti = Mock(task=task, previous_ti=prev_ti, execution_date=datetime(2016, 1, 3)) - dep_context = DepContext(ignore_depends_on_past=False) - - assert PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) - - def test_context_ignore_depends_on_past(self): - """ - If the context overrides depends_on_past then the dep should be met, - even though there is no previous_ti which would normally fail the dep - """ - task = self._get_task( - depends_on_past=True, start_date=datetime(2016, 1, 1), wait_for_downstream=False - ) - prev_ti = Mock( - task=task, - state=State.SUCCESS, - are_dependents_done=Mock(return_value=True), - execution_date=datetime(2016, 1, 2), - ) - ti = Mock(task=task, previous_ti=prev_ti, execution_date=datetime(2016, 1, 3)) - dep_context = DepContext(ignore_depends_on_past=True) - - assert PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) - - def test_first_task_run(self): - """ - The first task run for a TI should pass since it has no previous dagrun. - """ - task = self._get_task( - depends_on_past=True, start_date=datetime(2016, 1, 1), wait_for_downstream=False - ) - prev_ti = None - ti = Mock(task=task, previous_ti=prev_ti, execution_date=datetime(2016, 1, 1)) - dep_context = DepContext(ignore_depends_on_past=False) - - assert PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) - - def test_prev_ti_bad_state(self): - """ - If the previous TI did not complete execution this dep should fail. - """ - task = self._get_task( - depends_on_past=True, start_date=datetime(2016, 1, 1), wait_for_downstream=False - ) - prev_ti = Mock(state=State.NONE, are_dependents_done=Mock(return_value=True)) - ti = Mock(task=task, previous_ti=prev_ti, execution_date=datetime(2016, 1, 2)) - dep_context = DepContext(ignore_depends_on_past=False) - - assert not PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) - - def test_failed_wait_for_downstream(self): - """ - If the previous TI specified to wait for the downstream tasks of the - previous dagrun then it should fail this dep if the downstream TIs of - the previous TI are not done. - """ - task = self._get_task(depends_on_past=True, start_date=datetime(2016, 1, 1), wait_for_downstream=True) - prev_ti = Mock(state=State.SUCCESS, are_dependents_done=Mock(return_value=False)) - ti = Mock(task=task, previous_ti=prev_ti, execution_date=datetime(2016, 1, 2)) - dep_context = DepContext(ignore_depends_on_past=False) - - assert not PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) - - def test_all_met(self): - """ - Test to make sure all the conditions for the dep are met - """ - task = self._get_task(depends_on_past=True, start_date=datetime(2016, 1, 1), wait_for_downstream=True) - prev_ti = Mock(state=State.SUCCESS, are_dependents_done=Mock(return_value=True)) - ti = Mock(task=task, execution_date=datetime(2016, 1, 2), **{'get_previous_ti.return_value': prev_ti}) - dep_context = DepContext(ignore_depends_on_past=False) - - assert PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) + else: + prev_dagrun = None + dagrun = Mock( + **{ + "get_previous_scheduled_dagrun.return_value": prev_dagrun, + "get_previous_dagrun.return_value": prev_dagrun, + }, + ) + ti = Mock(task=task, **{"get_dagrun.return_value": dagrun}) + dep_context = DepContext(ignore_depends_on_past=context_ignore_depends_on_past) + + assert PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) == dep_met diff --git a/tests/timetables/test_time_table_iter_ranges.py b/tests/timetables/test_time_table_iter_ranges.py new file mode 100644 index 0000000000000..c9ee747888ac1 --- /dev/null +++ b/tests/timetables/test_time_table_iter_ranges.py @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Tests for Timetable.iter_between().""" + +from datetime import datetime, timedelta + +import pytest + +from airflow.settings import TIMEZONE +from airflow.timetables.interval import DeltaDataIntervalTimetable + + +@pytest.fixture() +def timetable_1s(): + return DeltaDataIntervalTimetable(timedelta(seconds=1)) + + +def test_end_date_before_start_date(timetable_1s): + start = datetime(2016, 2, 1, tzinfo=TIMEZONE) + end = datetime(2016, 1, 1, tzinfo=TIMEZONE) + message = r"start \([- :+\d]{25}\) > end \([- :+\d]{25}\)" + with pytest.raises(ValueError, match=message): + list(timetable_1s.iter_between(start, end, align=True))