Skip to content

Commit

Permalink
Add CLI support for bulk pause and resume of DAGs
Browse files Browse the repository at this point in the history
  • Loading branch information
shahar1 committed Mar 18, 2024
1 parent 985b603 commit 0d6a43f
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 12 deletions.
4 changes: 2 additions & 2 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1100,13 +1100,13 @@ class GroupCommand(NamedTuple):
name="pause",
help="Pause a DAG",
func=lazy_load_command("airflow.cli.commands.dag_command.dag_pause"),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_TREAT_DAG_AS_REGEX, ARG_YES, ARG_VERBOSE),
),
ActionCommand(
name="unpause",
help="Resume a paused DAG",
func=lazy_load_command("airflow.cli.commands.dag_command.dag_unpause"),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_TREAT_DAG_AS_REGEX, ARG_YES, ARG_VERBOSE),
),
ActionCommand(
name="trigger",
Expand Down
39 changes: 31 additions & 8 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
from airflow.utils.helpers import ask_yesno
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import DagRunState
Expand Down Expand Up @@ -214,14 +215,36 @@ def dag_unpause(args) -> None:
@providers_configuration_loaded
def set_is_paused(is_paused: bool, args) -> None:
"""Set is_paused for DAG by a given dag_id."""
dag = DagModel.get_dagmodel(args.dag_id)

if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")

dag.set_is_paused(is_paused=is_paused)

print(f"Dag: {args.dag_id}, paused: {is_paused}")
should_apply = True
dangerous_inputs = [
".",
".?",
".*",
".*?",
"^.",
"^.*",
"^.*?",
"^.*$",
r"[^\n]*",
"(?s:.*)",
r"[^\n]?",
"(?s:.*?)",
r"[\s\S]*",
r"[\w\W]*",
]
if not args.yes and args.treat_dag_as_regex and args.dag_id in dangerous_inputs:
question = f"You are about to {'un' if not is_paused else ''}pause all DAGs.\n\nAre you sure? [y/n]"
should_apply = ask_yesno(question)

if should_apply:
dags = get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_as_regex)
dags_models = [DagModel.get_dagmodel(dag.dag_id) for dag in dags]
for dag_model in dags_models:
if dag_model is not None:
dag_model.set_is_paused(is_paused=is_paused)
print(f"Dag: {dag_model.dag_id}, paused: {is_paused}")
else:
print("Operation cancelled")


@providers_configuration_loaded
Expand Down
34 changes: 32 additions & 2 deletions tests/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,11 +650,41 @@ def test_cli_list_jobs_with_args(self):
def test_pause(self):
args = self.parser.parse_args(["dags", "pause", "example_bash_operator"])
dag_command.dag_pause(args)
assert self.dagbag.dags["example_bash_operator"].get_is_paused() in [True, 1]
assert self.dagbag.dags["example_bash_operator"].get_is_paused()

args = self.parser.parse_args(["dags", "unpause", "example_bash_operator"])
dag_command.dag_unpause(args)
assert self.dagbag.dags["example_bash_operator"].get_is_paused() in [False, 0]
assert not self.dagbag.dags["example_bash_operator"].get_is_paused()

def test_pause_regex(self):
args = self.parser.parse_args(["dags", "pause", "^example_.*$", "--treat-dag-as-regex"])
dag_command.dag_pause(args)
assert self.dagbag.dags["example_bash_decorator"].get_is_paused()
assert self.dagbag.dags["example_kubernetes_executor"].get_is_paused()
assert self.dagbag.dags["example_xcom_args"].get_is_paused()

args = self.parser.parse_args(["dags", "pause", "^example_.*$", "--treat-dag-as-regex"])
dag_command.dag_unpause(args)
assert not self.dagbag.dags["example_bash_decorator"].get_is_paused()
assert not self.dagbag.dags["example_kubernetes_executor"].get_is_paused()
assert not self.dagbag.dags["example_xcom_args"].get_is_paused()

@mock.patch("airflow.cli.commands.dag_command.ask_yesno")
def test_pause_regex_all_dags_confirmation(self, mock_yesno):
args = self.parser.parse_args(["dags", "pause", ".*", "--treat-dag-as-regex"])
dag_command.dag_pause(args)
mock_yesno.assert_called_once()

@mock.patch("airflow.cli.commands.dag_command.ask_yesno")
def test_pause_regex_all_dags_yes(self, mock_yesno):
args = self.parser.parse_args(["dags", "pause", ".*", "--treat-dag-as-regex", "--yes"])
dag_command.dag_pause(args)
mock_yesno.assert_not_called()

def test_pause_non_existing_dag_error(self):
args = self.parser.parse_args(["dags", "pause", "non_existing_dag"])
with pytest.raises(AirflowException):
dag_command.dag_pause(args)

def test_trigger_dag(self):
dag_command.dag_trigger(
Expand Down

0 comments on commit 0d6a43f

Please sign in to comment.