Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CLI support for bulk pause and resume of DAGs #38265

Merged
merged 4 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 23 additions & 4 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,19 @@ def string_lower_type(val):
),
action="store_true",
)

ARG_TREAT_DAG_AS_REGEX = Arg(
("--treat-dag-as-regex",),
help=("Deprecated -- use `--treat-dag-id-as-regex` instead"),
action="store_true",
)

ARG_TREAT_DAG_ID_AS_REGEX = Arg(
("--treat-dag-id-as-regex",),
help=("if set, dag_id will be treated as regex instead of an exact string"),
action="store_true",
)

# test_dag
ARG_SHOW_DAGRUN = Arg(
("--show-dagrun",),
Expand Down Expand Up @@ -1098,15 +1106,25 @@ class GroupCommand(NamedTuple):
),
ActionCommand(
name="pause",
help="Pause a DAG",
help="Pause DAG(s)",
description=(
"Pause one or more DAGs. This command allows to halt the execution of specified DAGs, "
"disabling further task scheduling. Use `--treat-dag-id-as-regex` to target multiple DAGs by "
"treating the `--dag-id` as a regex pattern."
),
func=lazy_load_command("airflow.cli.commands.dag_command.dag_pause"),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_TREAT_DAG_ID_AS_REGEX, ARG_YES, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name="unpause",
help="Resume a paused DAG",
help="Resume paused DAG(s)",
description=(
"Resume one or more DAGs. This command allows to restore the execution of specified "
"DAGs, enabling further task scheduling. Use `--treat-dag-id-as-regex` to target multiple DAGs "
"treating the `--dag-id` as a regex pattern."
),
func=lazy_load_command("airflow.cli.commands.dag_command.dag_unpause"),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_TREAT_DAG_ID_AS_REGEX, ARG_YES, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name="trigger",
Expand Down Expand Up @@ -1222,6 +1240,7 @@ class GroupCommand(NamedTuple):
ARG_RERUN_FAILED_TASKS,
ARG_RUN_BACKWARDS,
ARG_TREAT_DAG_AS_REGEX,
ARG_TREAT_DAG_ID_AS_REGEX,
),
),
ActionCommand(
Expand Down
49 changes: 42 additions & 7 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
from airflow.utils.helpers import ask_yesno
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import DagRunState
Expand Down Expand Up @@ -137,14 +138,21 @@ def dag_backfill(args, dag: list[DAG] | DAG | None = None) -> None:
category=RemovedInAirflow3Warning,
)

if not args.treat_dag_id_as_regex and args.treat_dag_as_regex:
warnings.warn(
"--treat-dag-as-regex is deprecated, use --treat-dag-id-as-regex instead",
category=RemovedInAirflow3Warning,
)
args.treat_dag_id_as_regex = args.treat_dag_as_regex

if args.ignore_first_depends_on_past is False:
args.ignore_first_depends_on_past = True

if not args.start_date and not args.end_date:
raise AirflowException("Provide a start_date and/or end_date")

if not dag:
dags = get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_as_regex)
dags = get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_id_as_regex)
elif isinstance(dag, list):
dags = dag
else:
Expand Down Expand Up @@ -214,14 +222,41 @@ def dag_unpause(args) -> None:
@providers_configuration_loaded
def set_is_paused(is_paused: bool, args) -> None:
"""Set is_paused for DAG by a given dag_id."""
dag = DagModel.get_dagmodel(args.dag_id)

if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
should_apply = True
dags = [
dag
for dag in get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_id_as_regex)
if is_paused != dag.get_is_paused()
]

if not dags:
raise AirflowException(f"No {'un' if is_paused else ''}paused DAGs were found")

if not args.yes and args.treat_dag_id_as_regex:
dags_ids = [dag.dag_id for dag in dags]
question = (
f"You are about to {'un' if not is_paused else ''}pause {len(dags_ids)} DAGs:\n"
f"{','.join(dags_ids)}"
f"\n\nAre you sure? [y/n]"
)
should_apply = ask_yesno(question)

dag.set_is_paused(is_paused=is_paused)
if should_apply:
dags_models = [DagModel.get_dagmodel(dag.dag_id) for dag in dags]
for dag_model in dags_models:
if dag_model is not None:
dag_model.set_is_paused(is_paused=is_paused)

print(f"Dag: {args.dag_id}, paused: {is_paused}")
AirflowConsole().print_as(
data=[
{"dag_id": dag.dag_id, "is_paused": dag.get_is_paused()}
for dag in dags_models
if dag is not None
],
output=args.output,
)
else:
print("Operation cancelled by user")


@providers_configuration_loaded
Expand Down
64 changes: 60 additions & 4 deletions tests/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def test_backfill(self, mock_run):
"--task-regex",
"run_this_first",
"--dry-run",
"--treat-dag-as-regex",
"--treat-dag-id-as-regex",
"--start-date",
DEFAULT_DATE.isoformat(),
]
Expand All @@ -244,6 +244,24 @@ def test_backfill(self, mock_run):
assert f"Dry run of DAG example_branch_operator on {DEFAULT_DATE_REPR}\n" in output
assert "Task run_this_first located in DAG example_branch_operator\n" in output

@mock.patch("airflow.cli.commands.dag_command._run_dag_backfill")
def test_backfill_treat_dag_as_regex_deprecation(self, _):
run_date = DEFAULT_DATE + timedelta(days=1)
cli_args = self.parser.parse_args(
[
"dags",
"backfill",
"example_bash_operator",
"--treat-dag-as-regex",
"--start-date",
run_date.isoformat(),
]
)

with pytest.warns(DeprecationWarning, match="--treat-dag-as-regex is deprecated"):
dag_command.dag_backfill(cli_args)
assert cli_args.treat_dag_id_as_regex == cli_args.treat_dag_as_regex

@mock.patch("airflow.cli.commands.dag_command.get_dag")
def test_backfill_fails_without_loading_dags(self, mock_get_dag):
cli_args = self.parser.parse_args(["dags", "backfill", "example_bash_operator"])
Expand Down Expand Up @@ -650,11 +668,49 @@ def test_cli_list_jobs_with_args(self):
def test_pause(self):
args = self.parser.parse_args(["dags", "pause", "example_bash_operator"])
dag_command.dag_pause(args)
assert self.dagbag.dags["example_bash_operator"].get_is_paused() in [True, 1]
assert self.dagbag.dags["example_bash_operator"].get_is_paused()
dag_command.dag_unpause(args)
assert not self.dagbag.dags["example_bash_operator"].get_is_paused()

@mock.patch("airflow.cli.commands.dag_command.ask_yesno")
def test_pause_regex(self, mock_yesno):
args = self.parser.parse_args(["dags", "pause", "^example_.*$", "--treat-dag-id-as-regex"])
dag_command.dag_pause(args)
mock_yesno.assert_called_once()
assert self.dagbag.dags["example_bash_decorator"].get_is_paused()
assert self.dagbag.dags["example_kubernetes_executor"].get_is_paused()
assert self.dagbag.dags["example_xcom_args"].get_is_paused()

args = self.parser.parse_args(["dags", "unpause", "example_bash_operator"])
args = self.parser.parse_args(["dags", "unpause", "^example_.*$", "--treat-dag-id-as-regex"])
dag_command.dag_unpause(args)
assert self.dagbag.dags["example_bash_operator"].get_is_paused() in [False, 0]
assert not self.dagbag.dags["example_bash_decorator"].get_is_paused()
assert not self.dagbag.dags["example_kubernetes_executor"].get_is_paused()
assert not self.dagbag.dags["example_xcom_args"].get_is_paused()

@mock.patch("airflow.cli.commands.dag_command.ask_yesno")
def test_pause_regex_operation_cancelled(self, ask_yesno, capsys):
args = self.parser.parse_args(["dags", "pause", "example_bash_operator", "--treat-dag-id-as-regex"])
ask_yesno.return_value = False
dag_command.dag_pause(args)
stdout = capsys.readouterr().out
assert "Operation cancelled by user" in stdout

@mock.patch("airflow.cli.commands.dag_command.ask_yesno")
def test_pause_regex_yes(self, mock_yesno):
args = self.parser.parse_args(["dags", "pause", ".*", "--treat-dag-id-as-regex", "--yes"])
dag_command.dag_pause(args)
mock_yesno.assert_not_called()
dag_command.dag_unpause(args)

def test_pause_non_existing_dag_error(self):
args = self.parser.parse_args(["dags", "pause", "non_existing_dag"])
with pytest.raises(AirflowException):
dag_command.dag_pause(args)

def test_unpause_already_unpaused_dag_error(self):
args = self.parser.parse_args(["dags", "unpause", "example_bash_operator", "--yes"])
with pytest.raises(AirflowException, match="No paused DAGs were found"):
dag_command.dag_unpause(args)

def test_trigger_dag(self):
dag_command.dag_trigger(
Expand Down