From 4c6dbbb4caea040a965da62803593d40780cbe4c Mon Sep 17 00:00:00 2001 From: Tao feng Date: Tue, 15 Jan 2019 00:23:16 -0800 Subject: [PATCH] [AIRFLOW-3702] Reverse Backfilling --- airflow/bin/cli.py | 15 ++++++++++++++- airflow/jobs.py | 10 +++++++++- tests/test_jobs.py | 27 +++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 9db5b6eebd514..3a42a24ad9be8 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -170,6 +170,11 @@ def backfill(args, dag=None): if not args.start_date and not args.end_date: raise AirflowException("Provide a start_date and/or end_date") + if args.reverse_backfill_order and not args.ignore_first_depends_on_past: + # if we backfill in reversed chronological order, + # we should specify ignore depends on past explicitly + raise AirflowException("Provide -I option when using --reverse_backfill_order option") + # If only one date is passed, using same as start and end args.end_date = args.end_date or args.start_date args.start_date = args.start_date or args.end_date @@ -214,6 +219,7 @@ def backfill(args, dag=None): verbose=args.verbose, conf=run_conf, rerun_failed_tasks=args.rerun_failed_tasks, + reverse_backfill_order=args.reverse_backfill_order, ) @@ -1565,6 +1571,13 @@ class CLIFactory(object): "all the failed tasks for the backfill date range " "instead of throwing exceptions"), "store_true"), + 'reverse_backfill_order': Arg( + ('--reverse_backfill_order',), + ( + "if set, the backfill dagrun will be " + "created in reverse order if dag doesnt depends on past." 
+ ), + 'store_true'), # list_tasks 'tree': Arg(("-t", "--tree"), "Tree view", "store_true"), @@ -1924,7 +1937,7 @@ class CLIFactory(object): 'mark_success', 'local', 'donot_pickle', 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past', 'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', 'conf', - 'reset_dag_run', 'rerun_failed_tasks', + 'reset_dag_run', 'rerun_failed_tasks', 'reverse_backfill_order', ) }, { 'func': list_dag_runs, diff --git a/airflow/jobs.py b/airflow/jobs.py index 7832adcf64b95..3cdc2927050b0 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1876,6 +1876,7 @@ def __init__( verbose=False, conf=None, rerun_failed_tasks=False, + reverse_backfill_order=False, *args, **kwargs): """ :param dag: DAG object. @@ -1902,7 +1903,9 @@ def __init__( :param rerun_failed_tasks: flag to whether to auto rerun the failed task in backfill :type rerun_failed_tasks: bool - :param args: + :param reverse_backfill_order: whether to create dag runs from the latest date to the oldest + :type reverse_backfill_order: bool + :param args: :param kwargs: """ self.dag = dag @@ -1918,6 +1921,7 @@ def __init__( self.verbose = verbose self.conf = conf self.rerun_failed_tasks = rerun_failed_tasks + self.reverse_backfill_order = reverse_backfill_order super(BackfillJob, self).__init__(*args, **kwargs) def _update_counters(self, ti_status): @@ -2391,6 +2395,10 @@ def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id, :param session: the current session object :type session: Session """ + if self.reverse_backfill_order and self.ignore_first_depends_on_past: + # create dag run in reversed chronological order, + # depends_on_past option should be ignored + run_dates = run_dates[::-1] for next_run_date in run_dates: dag_run = self._get_dag_run(next_run_date, session=session) tis_map = self._task_instances_for_dag_run(dag_run, diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 4ed5faa37b430..74c71e27814cb 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ 
-915,6 +915,33 @@ def test_backfill_fill_blanks(self): elif ti.task_id == op5.task_id: self.assertEqual(ti.state, State.UPSTREAM_FAILED) + def test_backfill_reversed_order(self): + dag = self.dagbag.get_dag('example_bash_operator') + dag.clear() + + job = BackfillJob( + dag=dag, + start_date=DEFAULT_DATE + datetime.timedelta(days=2), + end_date=DEFAULT_DATE + datetime.timedelta(days=3), + ignore_first_depends_on_past=True, + reverse_backfill_order=True + ) + job.run() + + session = settings.Session() + drs = session.query(DagRun).filter( + DagRun.dag_id == 'example_bash_operator' + ).order_by(DagRun.execution_date).all() + + self.assertTrue(drs[0].execution_date == DEFAULT_DATE + datetime.timedelta(days=3)) + self.assertTrue(drs[0].state == State.SUCCESS) + self.assertTrue(drs[1].execution_date == + DEFAULT_DATE + datetime.timedelta(days=2)) + self.assertTrue(drs[1].state == State.SUCCESS) + + dag.clear() + session.close() + def test_backfill_execute_subdag(self): dag = self.dagbag.get_dag('example_subdag_operator') subdag_op_task = dag.get_task('section-1')