Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ def backfill(args, dag=None):
if not args.start_date and not args.end_date:
raise AirflowException("Provide a start_date and/or end_date")

if args.reverse_backfill_order and not args.ignore_first_depends_on_past:
# if we backfill in reversed chronological order,
# we should specify ignore depends on past explicitly
raise AirflowException("Provide -I option when using --reverse_backfill_order option")

# If only one date is passed, using same as start and end
args.end_date = args.end_date or args.start_date
args.start_date = args.start_date or args.end_date
Expand Down Expand Up @@ -214,6 +219,7 @@ def backfill(args, dag=None):
verbose=args.verbose,
conf=run_conf,
rerun_failed_tasks=args.rerun_failed_tasks,
reverse_backfill_order=args.reverse_backfill_order,
)


Expand Down Expand Up @@ -1565,6 +1571,13 @@ class CLIFactory(object):
"all the failed tasks for the backfill date range "
"instead of throwing exceptions"),
"store_true"),
'reverse_backfill_order': Arg(
('--reverse_backfill_order',),
(
"if set, the backfill dagrun will be "
"created in reverse order if dag doesnt depends on past."
),
'store_true'),

# list_tasks
'tree': Arg(("-t", "--tree"), "Tree view", "store_true"),
Expand Down Expand Up @@ -1924,7 +1937,7 @@ class CLIFactory(object):
'mark_success', 'local', 'donot_pickle',
'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', 'conf',
'reset_dag_run', 'rerun_failed_tasks',
'reset_dag_run', 'rerun_failed_tasks', 'reverse_backfill_order',
)
}, {
'func': list_dag_runs,
Expand Down
10 changes: 9 additions & 1 deletion airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1876,6 +1876,7 @@ def __init__(
verbose=False,
conf=None,
rerun_failed_tasks=False,
reverse_backfill_order=False,
*args, **kwargs):
"""
:param dag: DAG object.
Expand All @@ -1902,7 +1903,9 @@ def __init__(
:param rerun_failed_tasks: flag to whether to
auto rerun the failed task in backfill
:type rerun_failed_tasks: bool
:param args:
:param reverse_backfill_order: flag to whether to create dagrun from most latest date
to oldest date
:param args: bool
:param kwargs:
"""
self.dag = dag
Expand All @@ -1918,6 +1921,7 @@ def __init__(
self.verbose = verbose
self.conf = conf
self.rerun_failed_tasks = rerun_failed_tasks
self.reverse_backfill_order = reverse_backfill_order
super(BackfillJob, self).__init__(*args, **kwargs)

def _update_counters(self, ti_status):
Expand Down Expand Up @@ -2391,6 +2395,10 @@ def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id,
:param session: the current session object
:type session: Session
"""
if self.reverse_backfill_order and self.ignore_first_depends_on_past:
# create dag run in reversed chronological order,
# depends_on_past option should be ignored
run_dates = run_dates[::-1]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a flag on the DAG that we can inspect to determine whether it depends on past? If it does not, it would be nice not to have to specify -I.

Seeing this behaviour/need I now think that this shouldn't be a config option.

Additionally if -I isn't passed then this config setting is silently ignored - that is going to be confusing, and this case should be an error.

for next_run_date in run_dates:
dag_run = self._get_dag_run(next_run_date, session=session)
tis_map = self._task_instances_for_dag_run(dag_run,
Expand Down
27 changes: 27 additions & 0 deletions tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,33 @@ def test_backfill_fill_blanks(self):
elif ti.task_id == op5.task_id:
self.assertEqual(ti.state, State.UPSTREAM_FAILED)

def test_backfill_reversed_order(self):
    """Run a two-day backfill with ``reverse_backfill_order`` and check the runs.

    The backfill covers DEFAULT_DATE + 2 days through DEFAULT_DATE + 3 days
    with ``ignore_first_depends_on_past=True``. The resulting dag runs are
    fetched newest-first and both are expected to have succeeded.

    NOTE(review): sorting by ``execution_date`` only confirms that both runs
    exist and succeeded; it does not by itself prove the order in which the
    backfill created them.
    """
    dag = self.dagbag.get_dag('example_bash_operator')
    dag.clear()

    start_date = DEFAULT_DATE + datetime.timedelta(days=2)
    end_date = DEFAULT_DATE + datetime.timedelta(days=3)

    job = BackfillJob(
        dag=dag,
        start_date=start_date,
        end_date=end_date,
        ignore_first_depends_on_past=True,
        reverse_backfill_order=True
    )
    job.run()

    session = settings.Session()
    try:
        # Sort newest-first so that index 0 is the latest date; the previous
        # ascending sort contradicted the assertions below. The date-range
        # filter keeps runs left behind by other tests out of the result.
        drs = session.query(DagRun).filter(
            DagRun.dag_id == 'example_bash_operator',
            DagRun.execution_date >= start_date,
            DagRun.execution_date <= end_date,
        ).order_by(DagRun.execution_date.desc()).all()

        # assertEqual reports both operands on failure, unlike assertTrue(a == b).
        self.assertEqual(drs[0].execution_date, end_date)
        self.assertEqual(drs[0].state, State.SUCCESS)
        self.assertEqual(drs[1].execution_date, start_date)
        self.assertEqual(drs[1].state, State.SUCCESS)
    finally:
        # Clean up even when an assertion fails so later tests start clean.
        dag.clear()
        session.close()

def test_backfill_execute_subdag(self):
dag = self.dagbag.get_dag('example_subdag_operator')
subdag_op_task = dag.get_task('section-1')
Expand Down