[AIRFLOW-558] Add Support for dag.backfill=(True|False) Option #1830

Closed
btallman wants to merge 1 commit into apache:master from btallman:NoBackfill_clean_feature

Conversation

@btallman
Contributor

@btallman btallman commented Oct 8, 2016

Dear Airflow Maintainers,

Please accept this PR that addresses the following issues:

Add Support for dag.backfill=(True|False) Option

Added a dag.backfill option and modified the scheduler to look at
the value when scheduling DagRuns (by moving dag.start_date up to
dag.previous_schedule), and added a config option scheduler.backfill_by_default (defaults to True)
that allows users to set this to False for all dags modifying the existing DAGs

In addition, we added a test to jobs.py (test_dag_backfill_option)
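
The start-date adjustment described above can be sketched as follows. This is a minimal illustration, assuming a fixed `timedelta` interval instead of a cron expression; the helper name `effective_start_date` and its parameters are hypothetical and are not taken from this PR.

```python
from datetime import datetime, timedelta


def effective_start_date(start_date, interval, now, backfill=True):
    """Return the date from which a scheduler would begin creating runs.

    Hypothetical helper, not Airflow code: `interval` is a fixed timedelta
    and schedule boundaries are aligned to `start_date`.
    """
    if backfill:
        # Backfill enabled: every interval since start_date gets a run.
        return start_date
    # Number of whole intervals that have fully elapsed since start_date.
    completed = (now - start_date) // interval
    if completed < 1:
        # Nothing has completed yet, so there is nothing to skip.
        return start_date
    # Start of the most recent interval that has already completed.
    return start_date + (completed - 1) * interval


start = datetime(2016, 9, 1)
now = datetime(2016, 10, 10, 11, 46)
hourly = timedelta(hours=1)

with_backfill = effective_start_date(start, hourly, now, backfill=True)
without_backfill = effective_start_date(start, hourly, now, backfill=False)
print(with_backfill)
print(without_backfill)
```

With backfill disabled, only the most recently completed hourly interval is considered, instead of every hour since September 1.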

@bolkedebruin
Contributor

bolkedebruin commented Oct 8, 2016

Overall this looks good from a first review. However, I am not sure about including a global option. Changing this option would fundamentally change what Airflow does and will prompt questions on the mailing list such as "why doesn't my DAG backfill?".

Secondly, I would like to have this properly documented.

@btallman
Contributor Author

btallman commented Oct 8, 2016

Good points... I will look at the documentation to figure out where to put that. As to the global option, I think as long as it is well documented and doesn't change the default it should be ok...
Ben 

@btallman
Contributor Author

btallman commented Oct 9, 2016

@bolkedebruin and @r39132 and @plypaul and others, I added docs. VERY open to suggestions for improvements.

@codecov-io

codecov-io commented Oct 9, 2016

Current coverage is 67.01% (diff: 67.39%)

Merging #1830 into master will decrease coverage by <.01%

@@             master      #1830   diff @@
==========================================
  Files           135        135          
  Lines         10255      10297    +42   
  Methods           0          0          
  Messages          0          0          
  Branches          0          0          
==========================================
+ Hits           6873       6901    +28   
- Misses         3382       3396    +14   
  Partials          0          0          

Powered by Codecov. Last update 9df4789...f296939

@r39132
Contributor

r39132 commented Oct 9, 2016

I do have some questions about this functionality as well. Is the idea that some companies don't want backfill at all? If so, I'd like to see some votes for this feature (via the JIRA).

Alternate functionality could be to have a UI feature like "pause" that is exposed on the UI per DAG and have it disabled by default (e.g. dags_are_paused_at_creation = True)? That way, it becomes easier to override.

I feel that functionality is the right way to handle this.

@btallman
Contributor Author

btallman commented Oct 9, 2016

That actually doesn't stop backfill. As soon as you unpause the DAG, it backfills.
This PR doesn't stop backfill by default, only when it is disabled either in the config or on a per-DAG basis. Bolke and I came up with the design, and Paul said that Airbnb needs it as well. We also have several people asking for this on Gitter.
In addition, this allows your own request for cron-like behavior to work. This then allows us at Apigee to move to the scheduler from externally triggered DAGs (we use cron for all of our triggering now).
Ben 

Comment thread docs/scheduler.rst Outdated
Contributor

Please don't mention this here. Documentation will be released together with Airflow itself, so there is no need.

@r39132
Contributor

r39132 commented Oct 10, 2016

Hi!
I just tested this fix locally with a combination of DAGs. There seems to be an issue with using @hourly (and possibly the other shortcuts as well, e.g. @daily) and backfill=False. For that combination, the DAG is never scheduled!

[Image: screenshot 2016-10-10 11 46 51]

To reproduce a case where this does not work, try

import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id='simple_dummy_op_v1',
    schedule_interval='@hourly',
    backfill=False,
    start_date=dt.datetime(2016, 9, 1),
)


task1 = DummyOperator(task_id='task1', dag=dag)

However, a similar DAG with backfill=True will work :

import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id='simple_dummy_w_backfill_op_v1',
    schedule_interval='@hourly',
    backfill=True,
    start_date=dt.datetime(2016, 9, 1),
)

task1 = DummyOperator(task_id='task1', dag=dag)

This suggests a problem with the combination of @hourly and backfill=False.

@r39132
Contributor

r39132 commented Oct 10, 2016

It looks like the issue above has been fixed, but I am now seeing an issue with a DAG that has 'depends_on_past': True. It's stuck in the "RUNNING" state and does not appear to get scheduled:

[Image: screenshot 2016-10-10 16 28 28]

The code for the problematic DAG is below :

from builtins import range
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())
args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': seven_days_ago,
}

dag = DAG(
    dag_id='example_bash_operator_v5', default_args=args,
    schedule_interval='@hourly',
    backfill=False, 
    dagrun_timeout=timedelta(minutes=60))

cmd = 'ls -l'
run_this_last = DummyOperator(task_id='run_this_last', dag=dag)

run_this = BashOperator(
    task_id='run_after_loop', bash_command='echo 1', dag=dag)
run_this.set_downstream(run_this_last)

for i in range(3):
    i = str(i)
    task = BashOperator(
        task_id='runme_'+i,
        bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
        dag=dag)
    task.set_downstream(run_this)

task = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag)
task.set_downstream(run_this_last)

if __name__ == "__main__":
    dag.cli()

@btallman
Contributor Author

I'm testing this change, which fixes the depends_on_past issue:

@property
@provide_session
def previous_ti(self, session=None):
    """The task instance for the task that ran before this task instance."""
    dag = self.task.dag

    if dag.backfill:
        # Backfill enabled: the previous TI sits exactly one schedule
        # interval before this one.
        return session.query(TaskInstance).filter(
            TaskInstance.dag_id == self.dag_id,
            TaskInstance.task_id == self.task.task_id,
            TaskInstance.execution_date ==
            self.task.dag.previous_schedule(self.execution_date),
        ).first()

    else:
        # Backfill disabled: runs may have gaps, so use the most recent
        # DagRun that precedes this execution date.
        last_dagrun = session.query(DagRun).filter(
            DagRun.dag_id == dag.dag_id,
            DagRun.execution_date < self.execution_date
        ).order_by(
            DagRun.execution_date.desc()
        ).first()

        if last_dagrun:
            return session.query(TaskInstance).filter(
                TaskInstance.dag_id == self.dag_id,
                TaskInstance.task_id == self.task.task_id,
                TaskInstance.execution_date ==
                last_dagrun.execution_date,
            ).first()
        else:
            return None
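
The difference between the two lookups in the snippet above can be illustrated without a database. This is only a sketch: plain Python sets stand in for the SQLAlchemy queries, and the function names `previous_by_schedule` and `previous_by_history` are hypothetical.

```python
from datetime import datetime, timedelta


def previous_by_schedule(run_dates, current, interval):
    """Exact lookup: the run one interval before `current`, if it exists."""
    expected = current - interval
    return expected if expected in run_dates else None


def previous_by_history(run_dates, current):
    """History lookup: the latest run strictly before `current`, if any."""
    earlier = [d for d in run_dates if d < current]
    return max(earlier) if earlier else None


hourly = timedelta(hours=1)
# Runs with a gap: nothing was scheduled between 03:00 and 09:00.
runs = {
    datetime(2016, 10, 10, 2),
    datetime(2016, 10, 10, 3),
    datetime(2016, 10, 10, 9),
}
current = datetime(2016, 10, 10, 9)

schedule_prev = previous_by_schedule(runs, current, hourly)
history_prev = previous_by_history(runs, current)
print(schedule_prev)
print(history_prev)
```

When runs are skipped, the exact-schedule lookup finds nothing at 08:00, so a depends_on_past task would wait on a run that never existed, while the history lookup falls back to the 03:00 run.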

@r39132
Contributor

r39132 commented Oct 13, 2016

@btallman Did you test your fix against my attached code? Because, it still doesn't work against my provided example. I am resetting the DB before running by the way.

@btallman btallman force-pushed the NoBackfill_clean_feature branch 3 times, most recently from 2197ab1 to 9fe4f83 on October 13, 2016 04:35

@btallman
Contributor Author

OK, this is passing CI now. Ready for you to look at it @r39132.

@boristyukin
Contributor

voting for this PR! IMHO it will make Airflow behave more like enterprise workflow management tools

Comment thread airflow/models.py Outdated
Contributor

I would expect this to be part of class DagRun not of TaskInstance.

Contributor Author

I put it here, because ti.previous_dagrun() seems cleaner than dr.previous_dagrun(ti)

It is a helper function, as we could also leverage something like:

tip = ti.previous_ti()
if tip:
    drp = tip.get_dagrun()

Contributor

dr = ti.get_dagrun()
previous = dr.previous_dagrun()

DagRuns are the basis to work from. TIs should only handle things within their own unit. Using previous here disregards that.

Contributor Author

OK, cleaned it up... Good idea. Interestingly, there was no dr.get_previous_dagrun, so I added that.

Contributor Author

A decision on style for ti.previous_ti vs ti.get_previous_ti() is needed as well, but that isn't something that I introduced.

@btallman btallman force-pushed the NoBackfill_clean_feature branch 2 times, most recently from 4714683 to 4ac201d on October 19, 2016 21:01

@btallman
Contributor Author

BTW, while it says fail here, all 6 are succeeding on my fork, although I did have to kick one to run again...

@r39132
Contributor

r39132 commented Oct 27, 2016

@btallman Please rebase. I will be happy to test once ready.

@btallman btallman force-pushed the NoBackfill_clean_feature branch from 4ac201d to f758c01 on October 27, 2016 15:49

@btallman
Contributor Author

@r39132 I went ahead and rebased...

@bolkedebruin
Contributor

bolkedebruin commented Nov 13, 2016

@r39132 did you test this yet? @btallman can you fix the small landscape error? I will schedule some time this week to test this as well.

@btallman
Contributor Author

btallman commented Nov 13, 2016

What is the landscape error? Btw, we are running this on half of our production...

@bolkedebruin
Contributor

@btallman one of the comments is too long (click on details for landscape). Good to hear you are running this in production. Would be nice to know a little bit about the complexity of your dags (ie. @r39132 earlier caught some errors that might have slipped through in a not too complex setup).

@btallman btallman force-pushed the NoBackfill_clean_feature branch 2 times, most recently from 567076d to 6c87204 on November 15, 2016 06:31

Comment thread tests/jobs.py Outdated
Contributor

this should be named six_hours_ago_to_the_hour

Contributor Author

@zodiac - Missed that one when we switched to testing a longer period. Good catch.

Contributor

Agree with @zodiac

Comment thread tests/jobs.py Outdated
Contributor

These can be named three_minutes_ago and two_hours_and_three_minutes_ago for consistency

Contributor Author

@zodiac - That would be better, not sure if this will make it into a PR.

Contributor

Agree with @zodiac

@mistercrunch
Member

This overloads the term backfill with a new meaning. Let's clarify some of the semantics here.

backfill: "a manually triggered CLI process to run or re-run task instances in the past"
scheduler catchup (new term): when the scheduler has task instances to run in the past. This assumes a range of dates between tasks' start_date and the current date, and active DagRuns in between.

I'd suggest renaming DAG.backfill to allow_scheduler_catchup.

Comment thread airflow/models.py Outdated
Member

allow_scheduler_catchup=configuration.getboolean('scheduler', 'allow_scheduler_catchup_default')

Comment thread airflow/models.py Outdated
Member

clearly describes how it affects the scheduler behavior around DagRun creation

Comment thread airflow/configuration.py Outdated
Member

# Default setting for `DAG.allow_scheduler_catchup` [clearly describe how it affects DagRun creation]
allow_scheduler_catchup_default = True

@bolkedebruin
Contributor

@mistercrunch I would suggest using DAG.catch_up (or DAG.allow_catch_up) (we are reasoning from the perspective DAG not from the scheduler). In the config section I would suggest to use catch_up_by_default as it already resides under the scheduler section. Bit semantic, but it made it easier in my mind.

@btallman btallman force-pushed the NoBackfill_clean_feature branch from 99b67f9 to 754d96f on December 8, 2016 21:56

@r39132
Contributor

r39132 commented Dec 8, 2016

@btallman it looks like there is a lot of interest in this PR.. have a look at the comments above and I can help shepherd this through.

Comment thread airflow/models.py Outdated
Contributor

This was added in a recent PR, please don't remove it

Contributor Author

Yeah, that was a rebase error!!! Sorry.

Comment thread airflow/models.py Outdated
Contributor

Idem as below.

Comment thread airflow/models.py Outdated
Contributor

While it is outside the scope of this PR, this is potentially a quite expensive operation on the DB.

Comment thread airflow/models.py Outdated
Contributor

Please leave description in. See below.

Comment thread airflow/models.py Outdated
Contributor

Much cleaner. Thanks!

Contributor Author

I'm going to have to dig into this. Apparently, moving to a DR-based previous_ti causes issues when NOT using a DR (backfill). I finally figured that out this afternoon and will attempt to fix it tonight.

@bolkedebruin
Contributor

Looks much better! Just dotting the final i's and then we're there. Thanks for all the work and for bearing with us.

@btallman
Contributor Author

btallman commented Dec 10, 2016 via email

@bolkedebruin
Contributor

Hey @btallman would be nice to have this in before the new year. Can you rebase and see why the builds are failing? If needed I can provide some help.

@btallman
Contributor Author

btallman commented Dec 27, 2016 via email

@btallman
Contributor Author

btallman commented Dec 28, 2016 via email

@btallman
Contributor Author

btallman commented Dec 28, 2016 via email

@btallman btallman force-pushed the NoBackfill_clean_feature branch from 754d96f to f296939 on January 4, 2017 05:07

@btallman
Contributor Author

btallman commented Jan 4, 2017

@bolkedebruin and @r39132 - I was finally able to figure out what the issue was. It turns out that using DagRun to get the previous TI only works if there IS a dag run (so not on backfill, go figure), so that minor change that I agreed to for Bolke turned out to be an issue.

Tracked that down and the tests are now passing.

Please review ASAP.

@bolkedebruin
Contributor

@btallman I need to look in to your comment on backfills, as in master backfills actually create dag runs. Can you elaborate a bit further what you think went wrong?

@btallman
Contributor Author

btallman commented Jan 4, 2017 via email

@bolkedebruin
Contributor

Gotcha. That makes sense.

Contributor

@bolkedebruin bolkedebruin left a comment

Some minor things. I will start integration testing now.

Comment thread airflow/models.py
DagRun.execution_date.desc()
).first()

@provide_session
Contributor

I'm not sure if I understand the difference between get_previous_dagrun and get_previous_scheduled_dagrun . Should get_previous_dagrun also not return the same as get_previous_scheduled_dagrun? Or do you want to distinguish between backfilled and intervalled dag runs?

Please rename to "get_previous_scheduled" it is already part of DagRun.

Comment thread airflow/models.py
dag = self.task.dag
if dag:
    dr = self.get_dagrun(session=session)
    if not dr:
Contributor

I understand why you are doing this, although the comment does not reflect the requirement I think. Please mark it as FIXME (it should be removed in the future / throw an exception)

Comment thread airflow/models.py
if not previous_scheduled_date:
    return None
else:
    return TaskInstance(task=self.task, execution_date=previous_scheduled_date)
Contributor

This means that whether or not there is a "previous_scheduled_date" (i.e. there won't be one by convention before start_date) one always gets a TaskInstance. I don't think that should be the case.

Comment thread airflow/models.py

return self.dag

@provide_session
Contributor

please rename to "get_previous", it is already part of DagRun (or even make it a property)

Comment thread airflow/configuration.py
authenticate = true
max_threads = 2
catchup_by_default = True
scheduler_zombie_task_threshold = 300
Contributor

This is a duplicate from above

@bolkedebruin
Contributor

In initial testing it seems to behave quite ok (thats an understatement for currently flawless)

@bolkedebruin
Contributor

What's left (apart from the minor changes) is to make sure it is documented, maybe with a small example.

@btallman
Contributor Author

btallman commented Jan 6, 2017 via email

@bolkedebruin
Contributor

bolkedebruin commented Jan 11, 2017

Hey @btallman can you add the documentation, I would really like to merge this and get it into 1.8. (And the minor nits)

@btallman btallman force-pushed the NoBackfill_clean_feature branch from f296939 to de0c044 on January 11, 2017 22:41

@btallman
Contributor Author

btallman commented Jan 11, 2017 via email

@btallman
Contributor Author

btallman commented Jan 11, 2017

BTW - Although this failed tests on here, it succeeded on my fork... I'll re-push a NON update to force a re-test I guess. - https://travis-ci.org/btallman/incubator-airflow

Added a dag.catchup option and modified the
scheduler to look at the value when scheduling DagRuns
(by moving dag.start_date up to dag.previous_schedule),
and added a config option catchup_by_default
(defaults to True) that allows users to set this to False for all
dags modifying the existing DAGs

In addition, we added a test to jobs.py
(test_dag_catchup_option)
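
How a global default and a per-DAG setting might combine can be sketched with the standard library. This is an illustration only: it assumes that a per-DAG value takes precedence over the configured default, and the helper `resolve_catchup` is hypothetical. Only the option name `catchup_by_default` and its `[scheduler]` section come from this thread.

```python
from configparser import ConfigParser

# Config fragment using the option name from the commit message above.
CONFIG_TEXT = """
[scheduler]
catchup_by_default = False
"""


def resolve_catchup(config, dag_catchup=None):
    """Return the per-DAG value if given, else the configured default.

    Hypothetical helper: it only illustrates the assumed precedence.
    """
    if dag_catchup is not None:
        return dag_catchup
    # fallback=True mirrors the stated default of the option.
    return config.getboolean("scheduler", "catchup_by_default", fallback=True)


config = ConfigParser()
config.read_string(CONFIG_TEXT)

default_value = resolve_catchup(config)
overridden = resolve_catchup(config, dag_catchup=True)
empty_default = resolve_catchup(ConfigParser())
print(default_value, overridden, empty_default)
```
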

@btallman btallman force-pushed the NoBackfill_clean_feature branch from de0c044 to d2a5f46 on January 11, 2017 23:25

@bolkedebruin
Contributor

That seems to be caused by a faulty assumption in one of the tests. The tests were executed at hour 23, and the test computed now.hour + 1, while hours must stay within 0..23.
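
The failure mode can be reproduced in isolation. The test code itself is not shown in this thread, so the following is only a sketch of the general problem, with a fixed timestamp standing in for the time at which the tests ran.

```python
from datetime import datetime, timedelta

# Stand-in for "now" during a test run late in the evening.
now = datetime(2017, 1, 11, 23, 30)

# Fragile: hour arithmetic leaves the valid 0..23 range at 23:xx.
try:
    broken = now.replace(hour=now.hour + 1)
except ValueError:
    broken = None

# Robust: timedelta arithmetic rolls over to the next day.
next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
print(broken)
print(next_hour)
```

Using `timedelta` keeps the test independent of the hour at which it runs.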

Will merge. Thanks a lot @btallman !

@asfgit asfgit closed this in 1caaceb Jan 13, 2017
alekstorm pushed a commit to alekstorm/incubator-airflow that referenced this pull request Jun 1, 2017
Added a dag.catchup option and modified the
scheduler to look at the value when scheduling
DagRuns
(by moving dag.start_date up to
dag.previous_schedule),
and added a config option catchup_by_default
(defaults to True) that allows users to set this
to False for all
dags modifying the existing DAGs

In addition, we added a test to jobs.py
(test_dag_catchup_option)

Closes apache#1830 from
btallman/NoBackfill_clean_feature
9 participants