Skip to content

Commit

Permalink
DAG regex flag in backfill command (#23870)
Browse files Browse the repository at this point in the history
  • Loading branch information
domagojrazum committed Aug 8, 2022
1 parent 63f2067 commit 72a6ac5
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 52 deletions.
6 changes: 6 additions & 0 deletions airflow/cli/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,11 @@ def string_lower_type(val):
),
action="store_true",
)
# Boolean switch: when present on the command line, the dag_id argument is
# interpreted as a regular expression rather than as an exact DAG id.
ARG_TREAT_DAG_AS_REGEX = Arg(
    ("--treat-dag-as-regex",),
    action="store_true",
    help="if set, dag_id will be treated as regex instead of an exact string",
)
# test_dag
ARG_SHOW_DAGRUN = Arg(
("--show-dagrun",),
Expand Down Expand Up @@ -1135,6 +1140,7 @@ class GroupCommand(NamedTuple):
ARG_RESET_DAG_RUN,
ARG_RERUN_FAILED_TASKS,
ARG_RUN_BACKWARDS,
ARG_TREAT_DAG_AS_REGEX,
),
),
ActionCommand(
Expand Down
111 changes: 61 additions & 50 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@
from airflow.models.dag import DAG
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import cli as cli_utils
from airflow.utils.cli import get_dag, process_subdir, sigint_handler, suppress_logs_and_warning
from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import DagRunState

log = logging.getLogger(__name__)


@cli_utils.action_cli
def dag_backfill(args, dag=None):
"""Creates backfill job or dry run for a DAG"""
"""Creates backfill job or dry run for a DAG or list of DAGs using regex"""
logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)

signal.signal(signal.SIGTERM, sigint_handler)
Expand All @@ -66,64 +68,73 @@ def dag_backfill(args, dag=None):
if not args.start_date and not args.end_date:
raise AirflowException("Provide a start_date and/or end_date")

dag = dag or get_dag(args.subdir, args.dag_id)
# Resolve the set of DAGs to backfill. When the caller did not hand one in,
# look them up by id (or by regex when --treat-dag-as-regex was given).
if not dag:
    dags = get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_as_regex)
else:
    # Accept either a single DAG or a list of DAGs from the caller.
    # isinstance (rather than an exact type comparison) keeps list subclasses
    # from being wrapped in a second list.
    dags = dag if isinstance(dag, list) else [dag]

dags.sort(key=lambda d: d.dag_id)

# If only one date is passed, using same as start and end
args.end_date = args.end_date or args.start_date
args.start_date = args.start_date or args.end_date

if args.task_regex:
dag = dag.partial_subset(
task_ids_or_regex=args.task_regex, include_upstream=not args.ignore_dependencies
)
if not dag.task_dict:
raise AirflowException(
f"There are no tasks that match '{args.task_regex}' regex. Nothing to run, exiting..."
)

run_conf = None
if args.conf:
run_conf = json.loads(args.conf)

if args.dry_run:
print(f"Dry run of DAG {args.dag_id} on {args.start_date}")
dr = DagRun(dag.dag_id, execution_date=args.start_date)
for task in dag.tasks:
print(f"Task {task.task_id}")
ti = TaskInstance(task, run_id=None)
ti.dag_run = dr
ti.dry_run()
else:
if args.reset_dagruns:
DAG.clear_dags(
[dag],
start_date=args.start_date,
end_date=args.end_date,
confirm_prompt=not args.yes,
include_subdags=True,
dag_run_state=DagRunState.QUEUED,
)

try:
dag.run(
start_date=args.start_date,
end_date=args.end_date,
mark_success=args.mark_success,
local=args.local,
donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
ignore_first_depends_on_past=args.ignore_first_depends_on_past,
ignore_task_deps=args.ignore_dependencies,
pool=args.pool,
delay_on_limit_secs=args.delay_on_limit,
verbose=args.verbose,
conf=run_conf,
rerun_failed_tasks=args.rerun_failed_tasks,
run_backwards=args.run_backwards,
continue_on_failures=args.continue_on_failures,
for dag in dags:
if args.task_regex:
dag = dag.partial_subset(
task_ids_or_regex=args.task_regex, include_upstream=not args.ignore_dependencies
)
except ValueError as vr:
print(str(vr))
sys.exit(1)
if not dag.task_dict:
raise AirflowException(
f"There are no tasks that match '{args.task_regex}' regex. Nothing to run, exiting..."
)

if args.dry_run:
print(f"Dry run of DAG {dag.dag_id} on {args.start_date}")
dr = DagRun(dag.dag_id, execution_date=args.start_date)
for task in dag.tasks:
print(f"Task {task.task_id} located in DAG {dag.dag_id}")
ti = TaskInstance(task, run_id=None)
ti.dag_run = dr
ti.dry_run()
else:
if args.reset_dagruns:
DAG.clear_dags(
[dag],
start_date=args.start_date,
end_date=args.end_date,
confirm_prompt=not args.yes,
include_subdags=True,
dag_run_state=DagRunState.QUEUED,
)

try:
dag.run(
start_date=args.start_date,
end_date=args.end_date,
mark_success=args.mark_success,
local=args.local,
donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
ignore_first_depends_on_past=args.ignore_first_depends_on_past,
ignore_task_deps=args.ignore_dependencies,
pool=args.pool,
delay_on_limit_secs=args.delay_on_limit,
verbose=args.verbose,
conf=run_conf,
rerun_failed_tasks=args.rerun_failed_tasks,
run_backwards=args.run_backwards,
continue_on_failures=args.continue_on_failures,
)
except ValueError as vr:
print(str(vr))
sys.exit(1)

if len(dags) > 1:
log.info("All of the backfills are done.")


@cli_utils.action_cli
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ def _execute(self, session=None):
session.commit()
executor.end()

self.log.info("Backfill done. Exiting.")
self.log.info("Backfill done for DAG %s. Exiting.", self.dag)

@provide_session
def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None):
Expand Down
29 changes: 28 additions & 1 deletion tests/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def test_backfill(self, mock_run):

output = stdout.getvalue()
assert f"Dry run of DAG example_bash_operator on {DEFAULT_DATE.isoformat()}\n" in output
assert "Task runme_0\n" in output
assert "Task runme_0 located in DAG example_bash_operator\n" in output

mock_run.assert_not_called() # Dry run shouldn't run the backfill

Expand Down Expand Up @@ -176,6 +176,33 @@ def test_backfill(self, mock_run):
)
mock_run.reset_mock()

with contextlib.redirect_stdout(io.StringIO()) as stdout:
dag_command.dag_backfill(
self.parser.parse_args(
[
'dags',
'backfill',
'example_branch_(python_){0,1}operator(_decorator){0,1}',
'--task-regex',
'run_this_first',
'--dry-run',
'--treat-dag-as-regex',
'--start-date',
DEFAULT_DATE.isoformat(),
]
),
)

output = stdout.getvalue()

assert (
f"Dry run of DAG example_branch_python_operator_decorator on "
f"{DEFAULT_DATE.isoformat()}\n" in output
)
assert "Task run_this_first located in DAG example_branch_python_operator_decorator\n" in output
assert f"Dry run of DAG example_branch_operator on {DEFAULT_DATE.isoformat()}\n" in output
assert "Task run_this_first located in DAG example_branch_operator\n" in output

@mock.patch("airflow.cli.commands.dag_command.get_dag")
def test_backfill_fails_without_loading_dags(self, mock_get_dag):

Expand Down

0 comments on commit 72a6ac5

Please sign in to comment.