From d2a5f46df12c0d34af909f7e5572e67ca6f5cbac Mon Sep 17 00:00:00 2001 From: Benjamin Tallman Date: Wed, 11 Jan 2017 15:25:12 -0800 Subject: [PATCH] Add Support for dag.catchup=(True|False) Option Added a dag.catchup option and modified the scheduler to look at the value when scheduling DagRuns (by moving dag.start_date up to dag.previous_schedule), and added a config option catchup_by_default (defaults to True) that allows users to set this to False for all dags modifying the existing DAGs In addition, we added a test to jobs.py (test_dag_catchup_option) --- airflow/configuration.py | 10 +++ airflow/jobs.py | 23 ++++++ airflow/models.py | 55 +++++++++++-- airflow/ti_deps/deps/prev_dagrun_dep.py | 20 ++++- docs/scheduler.rst | 52 ++++++++++++ tests/jobs.py | 102 ++++++++++++++++++++++++ 6 files changed, 252 insertions(+), 10 deletions(-) diff --git a/airflow/configuration.py b/airflow/configuration.py index 1f2eafaa01491..6778464129ab8 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -356,6 +356,14 @@ def run_command(command): # associated task instance as failed and will re-schedule the task. scheduler_zombie_task_threshold = 300 +# Turn off scheduler catchup by setting this to False. +# Default behavior is unchanged and +# Command Line Backfills still work, but the scheduler +# will not do scheduler catchup if this is False, +# however it can be set on a per DAG basis in the +# DAG definition (catchup) +catchup_by_default = True + # Statsd (https://github.com/etsy/statsd) integration settings statsd_on = False statsd_host = localhost @@ -486,6 +494,8 @@ def run_command(command): scheduler_heartbeat_sec = 5 authenticate = true max_threads = 2 +catchup_by_default = True +scheduler_zombie_task_threshold = 300 """ diff --git a/airflow/jobs.py b/airflow/jobs.py index 819d1071f1ad0..89d0502ebfbb7 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -729,6 +729,25 @@ def create_dag_run(self, dag, session=None): if dag.schedule_interval == '@once' and last_scheduled_run: return None + # don't do scheduler catchup for dag's that don't have dag.catchup = True + if not dag.catchup: + # The logic is that we move start_date up until + # one period before, so that datetime.now() is AFTER + # the period end, and the job can be created... + now = datetime.now() + next_start = dag.following_schedule(now) + last_start = dag.previous_schedule(now) + if next_start <= now: + new_start = last_start + else: + new_start = dag.previous_schedule(last_start) + + if dag.start_date: + if new_start >= dag.start_date: + dag.start_date = new_start + else: + dag.start_date = new_start + next_run_date = None if not last_scheduled_run: # First run @@ -756,6 +775,10 @@ def create_dag_run(self, dag, session=None): self.logger.debug("Dag start date: {}. Next run date: {}" .format(dag.start_date, next_run_date)) + # don't ever schedule in the future + if next_run_date > datetime.now(): + return + # this structure is necessary to avoid a TypeError from concatenating # NoneType if dag.schedule_interval == '@once': diff --git a/airflow/models.py b/airflow/models.py index 0bd744e52be31..c53368fb5c435 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1023,12 +1023,28 @@ def are_dependents_done(self, session=None): @provide_session def previous_ti(self, session=None): """ The task instance for the task that ran before this task instance """ - return session.query(TaskInstance).filter( - TaskInstance.dag_id == self.dag_id, - TaskInstance.task_id == self.task.task_id, - TaskInstance.execution_date == - self.task.dag.previous_schedule(self.execution_date), - ).first() + + dag = self.task.dag + if dag: + dr = self.get_dagrun(session=session) + if not dr: + # Means that this TI is NOT being run from a DR, but from a catchup + previous_scheduled_date = dag.previous_schedule(self.execution_date) + if not previous_scheduled_date: + return None + else: + return TaskInstance(task=self.task, execution_date=previous_scheduled_date) + + if dag.catchup: + last_dagrun = dr.get_previous_scheduled_dagrun(session=session) if dr else None + + else: + last_dagrun = dr.get_previous_dagrun(session=session) if dr else None + + if last_dagrun: + return last_dagrun.get_task_instance(self.task_id, session=session) + + return None @provide_session def are_dependencies_met( @@ -2540,6 +2556,8 @@ class DAG(BaseDag, LoggingMixin): :type sla_miss_callback: types.FunctionType :param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT) :type orientation: string + :param catchup: Perform scheduler catchup (or only run latest)? Defaults to True + "type catchup: bool" """ def __init__( @@ -2557,6 +2575,7 @@ def __init__( dagrun_timeout=None, sla_miss_callback=None, orientation=configuration.get('webserver', 'dag_orientation'), + catchup=configuration.getboolean('scheduler', 'catchup_by_default'), params=None): self.user_defined_macros = user_defined_macros @@ -2597,6 +2616,7 @@ def __init__( self.dagrun_timeout = dagrun_timeout self.sla_miss_callback = sla_miss_callback self.orientation = orientation + self.catchup = catchup self._comps = { 'dag_id', @@ -3847,6 +3867,29 @@ def get_dag(self): return self.dag + @provide_session + def get_previous_dagrun(self, session=None): + """The previous DagRun, if there is one""" + + return session.query(DagRun).filter( + DagRun.dag_id == self.dag_id, + DagRun.execution_date < self.execution_date + ).order_by( + DagRun.execution_date.desc() + ).first() + + @provide_session + def get_previous_scheduled_dagrun(self, session=None): + """The previous, SCHEDULED DagRun, if there is one""" + + if not self.dag: + return None + + return session.query(DagRun).filter( + DagRun.dag_id == self.dag_id, + DagRun.execution_date == self.dag.previous_schedule(self.execution_date) + ).first() + @provide_session def update_state(self, session=None): """ diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow/ti_deps/deps/prev_dagrun_dep.py index 82355ec2fa0d0..2fce7045a75f5 100644 --- a/airflow/ti_deps/deps/prev_dagrun_dep.py +++ b/airflow/ti_deps/deps/prev_dagrun_dep.py @@ -39,10 +39,22 @@ def _get_dep_statuses(self, ti, session, dep_context): raise StopIteration # Don't depend on the previous task instance if we are the first task - if ti.execution_date == ti.task.start_date: - yield self._passing_status( - reason="This task instance was the first task instance for it's task.") - raise StopIteration + dag = ti.task.dag + if dag.catchup: + if ti.execution_date == ti.task.start_date: + yield self._passing_status( + reason="This task instance was the first task instance for its task.") + raise StopIteration + + else: + + dr = ti.get_dagrun() + last_dagrun = dr.get_previous_dagrun() if dr else None + + if not last_dagrun: + yield self._passing_status( + reason="This task instance was the first task instance for its task.") + raise StopIteration previous_ti = ti.previous_ti if not previous_ti: diff --git a/docs/scheduler.rst b/docs/scheduler.rst index 9c8a618417421..749d58a635998 100644 --- a/docs/scheduler.rst +++ b/docs/scheduler.rst @@ -17,6 +17,9 @@ the run stamped ``2016-01-01`` will be trigger soon after ``2016-01-01T23:59``. In other words, the job instance is started once the period it covers has ended. +**Let's Repeat That** The scheduler runs your job one ``schedule_interval`` AFTER the +start date, at the END of the period. + The scheduler starts an instance of the executor specified in the your ``airflow.cfg``. If it happens to be the ``LocalExecutor``, tasks will be executed as subprocesses; in the case of ``CeleryExecutor`` and @@ -72,6 +75,55 @@ should be triggered and come to a crawl. It might also create undesired processing when changing the shape of your DAG, by say adding in new tasks. +Backfill and Catchup +'''''''''''''''''''' + +An Airflow DAG with a ``start_date``, possibly an ``end_date``, and a ``schedule_interval`` defines a +series of intervals which the scheduler turn into individual Dag Runs and execute. A key capability of +Airflow is that these DAG Runs are atomic, idempotent items, and the scheduler, by default, will examine +the lifetime of the DAG (from start to end/now, one interval at a time) and kick off a DAG Run for any +interval that has not been run (or has been cleared). This concept is called Catchup. + +If your DAG is written to handle it's own catchup (IE not limited to the interval, but instead to "Now" +for instance.), then you will want to turn catchup off (Either on the DAG itself with ``dag.catchup = +False``) or by default at the configuration file level with ``catchup_by_default = False``. What this +will do, is to instruct the scheduler to only create a DAG Run for the most current instance of the DAG +interval series. + +.. code:: python + """ + Code that goes along with the Airflow tutorial located at: + https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py + """ + from airflow import DAG + from airflow.operators.bash_operator import BashOperator + from datetime import datetime, timedelta + + + default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2015, 12, 1), + 'email': ['airflow@airflow.com'], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), + 'schedule_interval': '@hourly', + } + + dag = DAG('tutorial', catchup=False, default_args=default_args) + +In the example above, if the DAG is picked up by the scheduler daemon on 2016-01-02 at 6 AM, (or from the +command line), a single DAG Run will be created, with an ``execution_date`` of 2016-01-01, and the next +one will be created just after midnight on the morning of 2016-01-03 with an execution date of 2016-01-02. + +If the ``dag.catchup`` value had been True instead, the scheduler would have created a DAG Run for each +completed interval between 2015-12-01 and 2016-01-02 (but not yet one for 2016-01-02, as that interval +hasn't completed) and the scheduler will execute them sequentially. This behavior is great for atomic +datasets that can easily be split into periods. Turning catchup off is great if your DAG Runs perform +backfill internally. + External Triggers ''''''''''''''''' diff --git a/tests/jobs.py b/tests/jobs.py index 32c615df806ba..dab7a47b648a2 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -1101,3 +1101,105 @@ def test_dag_get_active_runs(self): running_date = 'Except' self.assertEqual(execution_date, running_date, 'Running Date must match Execution Date') + + def test_dag_catchup_option(self): + """ + Test to check that a DAG with catchup = False only schedules beginning now, not back to the start date + """ + + now = datetime.datetime.now() + six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0) + three_minutes_ago = now - datetime.timedelta(minutes=3) + two_hours_and_three_minutes_ago = three_minutes_ago - datetime.timedelta(hours=2) + + START_DATE = six_hours_ago_to_the_hour + DAG_NAME1 = 'no_catchup_test1' + DAG_NAME2 = 'no_catchup_test2' + DAG_NAME3 = 'no_catchup_test3' + + default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': START_DATE + + } + dag1 = DAG(DAG_NAME1, + schedule_interval='* * * * *', + max_active_runs=1, + default_args=default_args + ) + + default_catchup = configuration.getboolean('scheduler', 'catchup_by_default') + # Test configs have catchup by default ON + + self.assertEqual(default_catchup, True) + + # Correct default? + self.assertEqual(dag1.catchup, True) + + dag2 = DAG(DAG_NAME2, + schedule_interval='* * * * *', + max_active_runs=1, + catchup=False, + default_args=default_args + ) + + run_this_1 = DummyOperator(task_id='run_this_1', dag=dag2) + run_this_2 = DummyOperator(task_id='run_this_2', dag=dag2) + run_this_2.set_upstream(run_this_1) + run_this_3 = DummyOperator(task_id='run_this_3', dag=dag2) + run_this_3.set_upstream(run_this_2) + + session = settings.Session() + orm_dag = DagModel(dag_id=dag2.dag_id) + session.merge(orm_dag) + session.commit() + session.close() + + scheduler = SchedulerJob() + dag2.clear() + + dr = scheduler.create_dag_run(dag2) + + # We had better get a dag run + self.assertIsNotNone(dr) + + # The DR should be scheduled in the last 3 minutes, not 6 hours ago + self.assertGreater(dr.execution_date, three_minutes_ago) + + # The DR should be scheduled BEFORE now + self.assertLess(dr.execution_date, datetime.datetime.now()) + + dag3 = DAG(DAG_NAME3, + schedule_interval='@hourly', + max_active_runs=1, + catchup=False, + default_args=default_args + ) + + run_this_1 = DummyOperator(task_id='run_this_1', dag=dag3) + run_this_2 = DummyOperator(task_id='run_this_2', dag=dag3) + run_this_2.set_upstream(run_this_1) + run_this_3 = DummyOperator(task_id='run_this_3', dag=dag3) + run_this_3.set_upstream(run_this_2) + + session = settings.Session() + orm_dag = DagModel(dag_id=dag3.dag_id) + session.merge(orm_dag) + session.commit() + session.close() + + scheduler = SchedulerJob() + dag3.clear() + + dr = None + dr = scheduler.create_dag_run(dag3) + + # We had better get a dag run + self.assertIsNotNone(dr) + + # The DR should be scheduled in the last two hours, not 6 hours ago + self.assertGreater(dr.execution_date, two_hours_and_three_minutes_ago) + + # The DR should be scheduled BEFORE now + self.assertLess(dr.execution_date, datetime.datetime.now())