Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,14 @@ def run_command(command):
# associated task instance as failed and will re-schedule the task.
scheduler_zombie_task_threshold = 300

# Turn off scheduler catchup by setting this to False.
# The default behavior is unchanged, and command-line backfills
# still work, but the scheduler will not perform catchup
# when this is False. The setting can also be set on a
# per-DAG basis in the DAG definition (catchup).
catchup_by_default = True

# Statsd (https://github.com/etsy/statsd) integration settings
statsd_on = False
statsd_host = localhost
Expand Down Expand Up @@ -486,6 +494,8 @@ def run_command(command):
scheduler_heartbeat_sec = 5
authenticate = true
max_threads = 2
catchup_by_default = True
scheduler_zombie_task_threshold = 300
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a duplicate from above

"""


Expand Down
23 changes: 23 additions & 0 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,25 @@ def create_dag_run(self, dag, session=None):
if dag.schedule_interval == '@once' and last_scheduled_run:
return None

# don't do scheduler catchup for DAGs that don't have dag.catchup = True
if not dag.catchup:
# The logic is that we move start_date up until
# one period before, so that datetime.now() is AFTER
# the period end, and the job can be created...
now = datetime.now()
next_start = dag.following_schedule(now)
last_start = dag.previous_schedule(now)
if next_start <= now:
new_start = last_start
else:
new_start = dag.previous_schedule(last_start)

if dag.start_date:
if new_start >= dag.start_date:
dag.start_date = new_start
else:
dag.start_date = new_start

next_run_date = None
if not last_scheduled_run:
# First run
Expand Down Expand Up @@ -756,6 +775,10 @@ def create_dag_run(self, dag, session=None):
self.logger.debug("Dag start date: {}. Next run date: {}"
.format(dag.start_date, next_run_date))

# don't ever schedule in the future
if next_run_date > datetime.now():
return

# this structure is necessary to avoid a TypeError from concatenating
# NoneType
if dag.schedule_interval == '@once':
Expand Down
55 changes: 49 additions & 6 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1023,12 +1023,28 @@ def are_dependents_done(self, session=None):
@provide_session
def previous_ti(self, session=None):
""" The task instance for the task that ran before this task instance """
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much cleaner. Thanks!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to have to dig into this. Apparently, moving to a DR-based previous_ti causes issues when NOT using a DR (backfill). I finally figured that out this afternoon. Will attempt to fix tonight.

return session.query(TaskInstance).filter(
TaskInstance.dag_id == self.dag_id,
TaskInstance.task_id == self.task.task_id,
TaskInstance.execution_date ==
self.task.dag.previous_schedule(self.execution_date),
).first()

dag = self.task.dag
if dag:
dr = self.get_dagrun(session=session)
if not dr:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand why you are doing this, although the comment does not reflect the requirement I think. Please mark it as FIXME (it should be removed in the future / throw an exception)

# Means that this TI is NOT being run from a DR, but from a catchup
previous_scheduled_date = dag.previous_schedule(self.execution_date)
if not previous_scheduled_date:
return None
else:
return TaskInstance(task=self.task, execution_date=previous_scheduled_date)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that whether or not there is a "previous_scheduled_date" (i.e. there won't be one by convention before start_date) one always gets a TaskInstance. I don't think that should be the case.


if dag.catchup:
last_dagrun = dr.get_previous_scheduled_dagrun(session=session) if dr else None

else:
last_dagrun = dr.get_previous_dagrun(session=session) if dr else None

if last_dagrun:
return last_dagrun.get_task_instance(self.task_id, session=session)

return None

@provide_session
def are_dependencies_met(
Expand Down Expand Up @@ -2540,6 +2556,8 @@ class DAG(BaseDag, LoggingMixin):
:type sla_miss_callback: types.FunctionType
:param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT)
:type orientation: string
:param catchup: Perform scheduler catchup (or only run latest)? Defaults to True
:type catchup: bool
"""

def __init__(
Expand All @@ -2557,6 +2575,7 @@ def __init__(
dagrun_timeout=None,
sla_miss_callback=None,
orientation=configuration.get('webserver', 'dag_orientation'),
catchup=configuration.getboolean('scheduler', 'catchup_by_default'),
params=None):

self.user_defined_macros = user_defined_macros
Expand Down Expand Up @@ -2597,6 +2616,7 @@ def __init__(
self.dagrun_timeout = dagrun_timeout
self.sla_miss_callback = sla_miss_callback
self.orientation = orientation
self.catchup = catchup

self._comps = {
'dag_id',
Expand Down Expand Up @@ -3847,6 +3867,29 @@ def get_dag(self):

return self.dag

@provide_session
def get_previous_dagrun(self, session=None):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please rename to "get_previous", it is already part of DagRun (or even make it a property)

"""The previous DagRun, if there is one"""

return session.query(DagRun).filter(
DagRun.dag_id == self.dag_id,
DagRun.execution_date < self.execution_date
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While it is outside the scope of this PR, this is potentially a quite expensive operation on the DB.

).order_by(
DagRun.execution_date.desc()
).first()

@provide_session
def get_previous_scheduled_dagrun(self, session=None):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if I understand the difference between get_previous_dagrun and get_previous_scheduled_dagrun . Should get_previous_dagrun also not return the same as get_previous_scheduled_dagrun? Or do you want to distinguish between backfilled and intervalled dag runs?

Please rename to "get_previous_scheduled" it is already part of DagRun.

"""The previous, SCHEDULED DagRun, if there is one"""

if not self.dag:
return None

return session.query(DagRun).filter(
DagRun.dag_id == self.dag_id,
DagRun.execution_date == self.dag.previous_schedule(self.execution_date)
).first()

@provide_session
def update_state(self, session=None):
"""
Expand Down
20 changes: 16 additions & 4 deletions airflow/ti_deps/deps/prev_dagrun_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,22 @@ def _get_dep_statuses(self, ti, session, dep_context):
raise StopIteration

# Don't depend on the previous task instance if we are the first task
if ti.execution_date == ti.task.start_date:
yield self._passing_status(
reason="This task instance was the first task instance for it's task.")
raise StopIteration
dag = ti.task.dag
if dag.catchup:
if ti.execution_date == ti.task.start_date:
yield self._passing_status(
reason="This task instance was the first task instance for its task.")
raise StopIteration

else:

dr = ti.get_dagrun()
last_dagrun = dr.get_previous_dagrun() if dr else None

if not last_dagrun:
yield self._passing_status(
reason="This task instance was the first task instance for its task.")
raise StopIteration

previous_ti = ti.previous_ti
if not previous_ti:
Expand Down
52 changes: 52 additions & 0 deletions docs/scheduler.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ the run stamped ``2016-01-01`` will be trigger soon after ``2016-01-01T23:59``.
In other words, the job instance is started once the period it covers
has ended.

**Let's Repeat That** The scheduler runs your job one ``schedule_interval`` AFTER the
start date, at the END of the period.

The scheduler starts an instance of the executor specified in your
``airflow.cfg``. If it happens to be the ``LocalExecutor``, tasks will be
executed as subprocesses; in the case of ``CeleryExecutor`` and
Expand Down Expand Up @@ -72,6 +75,55 @@ should be triggered and come to a crawl. It might also create undesired
processing when changing the shape of your DAG, by say adding in new
tasks.

Backfill and Catchup
''''''''''''''''''''

An Airflow DAG with a ``start_date``, possibly an ``end_date``, and a ``schedule_interval`` defines a
series of intervals which the scheduler turns into individual DAG Runs and executes. A key capability of
Airflow is that these DAG Runs are atomic, idempotent items, and the scheduler, by default, will examine
the lifetime of the DAG (from start to end/now, one interval at a time) and kick off a DAG Run for any
interval that has not been run (or has been cleared). This concept is called Catchup.

If your DAG is written to handle its own catchup (i.e. not limited to the interval, but instead to "now",
for instance), then you will want to turn catchup off, either on the DAG itself with
``dag.catchup = False`` or by default at the configuration file level with
``catchup_by_default = False``. This instructs the scheduler to only create a DAG Run for the most
current instance of the DAG interval series.

.. code:: python
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 12, 1),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval': '@hourly',
}

dag = DAG('tutorial', catchup=False, default_args=default_args)

In the example above, if the DAG is picked up by the scheduler daemon on 2016-01-02 at 6 AM (or from the
command line), a single DAG Run will be created, with an ``execution_date`` of 2016-01-01, and the next
one will be created just after midnight on the morning of 2016-01-03 with an execution date of 2016-01-02.

If the ``dag.catchup`` value had been True instead, the scheduler would have created a DAG Run for each
completed interval between 2015-12-01 and 2016-01-02 (but not yet one for 2016-01-02, as that interval
hasn't completed) and would have executed them sequentially. This behavior is great for atomic
datasets that can easily be split into periods. Turning catchup off is great if your DAG Runs perform
backfill internally.

External Triggers
'''''''''''''''''

Expand Down
102 changes: 102 additions & 0 deletions tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,3 +1101,105 @@ def test_dag_get_active_runs(self):
running_date = 'Except'

self.assertEqual(execution_date, running_date, 'Running Date must match Execution Date')

def test_dag_catchup_option(self):
"""
Test to check that a DAG with catchup = False only schedules beginning now, not back to the start date
"""

now = datetime.datetime.now()
six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0)
three_minutes_ago = now - datetime.timedelta(minutes=3)
two_hours_and_three_minutes_ago = three_minutes_ago - datetime.timedelta(hours=2)

START_DATE = six_hours_ago_to_the_hour
DAG_NAME1 = 'no_catchup_test1'
DAG_NAME2 = 'no_catchup_test2'
DAG_NAME3 = 'no_catchup_test3'

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': START_DATE

}
dag1 = DAG(DAG_NAME1,
schedule_interval='* * * * *',
max_active_runs=1,
default_args=default_args
)

default_catchup = configuration.getboolean('scheduler', 'catchup_by_default')
# Test configs have catchup by default ON

self.assertEqual(default_catchup, True)

# Correct default?
self.assertEqual(dag1.catchup, True)

dag2 = DAG(DAG_NAME2,
schedule_interval='* * * * *',
max_active_runs=1,
catchup=False,
default_args=default_args
)

run_this_1 = DummyOperator(task_id='run_this_1', dag=dag2)
run_this_2 = DummyOperator(task_id='run_this_2', dag=dag2)
run_this_2.set_upstream(run_this_1)
run_this_3 = DummyOperator(task_id='run_this_3', dag=dag2)
run_this_3.set_upstream(run_this_2)

session = settings.Session()
orm_dag = DagModel(dag_id=dag2.dag_id)
session.merge(orm_dag)
session.commit()
session.close()

scheduler = SchedulerJob()
dag2.clear()

dr = scheduler.create_dag_run(dag2)

# We had better get a dag run
self.assertIsNotNone(dr)

# The DR should be scheduled in the last 3 minutes, not 6 hours ago
self.assertGreater(dr.execution_date, three_minutes_ago)

# The DR should be scheduled BEFORE now
self.assertLess(dr.execution_date, datetime.datetime.now())

dag3 = DAG(DAG_NAME3,
schedule_interval='@hourly',
max_active_runs=1,
catchup=False,
default_args=default_args
)

run_this_1 = DummyOperator(task_id='run_this_1', dag=dag3)
run_this_2 = DummyOperator(task_id='run_this_2', dag=dag3)
run_this_2.set_upstream(run_this_1)
run_this_3 = DummyOperator(task_id='run_this_3', dag=dag3)
run_this_3.set_upstream(run_this_2)

session = settings.Session()
orm_dag = DagModel(dag_id=dag3.dag_id)
session.merge(orm_dag)
session.commit()
session.close()

scheduler = SchedulerJob()
dag3.clear()

dr = None
dr = scheduler.create_dag_run(dag3)

# We had better get a dag run
self.assertIsNotNone(dr)

# The DR should be scheduled in the last two hours, not 6 hours ago
self.assertGreater(dr.execution_date, two_hours_and_three_minutes_ago)

# The DR should be scheduled BEFORE now
self.assertLess(dr.execution_date, datetime.datetime.now())