diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 1a017304f1580..6792a0234ef25 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -4,7 +4,7 @@ auth:f396d4bce90215599dde6ad0a8f30f29 backfill:bbce9859a2d1ce054ad22db92dea8c05 config:cb175bedf29e8a2c2c6a2ebd13d770a7 connections:a16225e1c7d28488d0da612752669b4b -dags:4c2ae65c76b32efcd86b5fd0c1831a2e +dags:287a128a71c97d2b537e09a5c7c73c09 dagrun:f47ed2a89ed0f8c71f79dba53a3a3882 jobs:7f8680afff230eb9940bc7fca727bd52 pools:03fc7d948cbecf16ff8d640eb8f0ce43 diff --git a/airflow-ctl/docs/images/output_dags.svg b/airflow-ctl/docs/images/output_dags.svg index 496fa8d995ce8..6863827b594ed 100644 --- a/airflow-ctl/docs/images/output_dags.svg +++ b/airflow-ctl/docs/images/output_dags.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + + + + - + - + - - Usage:airflowctl dags [-hCOMMAND... - -Perform Dags operations - -Positional Arguments: -COMMAND -deletePerform delete operation -getPerform get operation -get-detailsPerform get_details operation -get-import-errorPerform get_import_error operation -get-statsPerform get_stats operation -get-tagsPerform get_tags operation -get-versionPerform get_version operation -listPerform list operation -list-import-errors -Perform list_import_errors operation -list-versionPerform list_version operation -list-warningPerform list_warning operation -triggerPerform trigger operation -updatePerform update operation - -Options: --h--helpshow this help message and exit + + Usage:airflowctl dags [-hCOMMAND... + +Perform Dags operations + +Positional Arguments: +COMMAND +deletePerform delete operation +getPerform get operation +get-detailsPerform get_details operation +get-import-errorPerform get_import_error operation +get-statsPerform get_stats operation +get-tagsPerform get_tags operation +get-versionPerform get_version operation +listPerform list operation +list-import-errors +Perform list_import_errors operation +list-versionPerform list_version operation +list-warningPerform list_warning operation +pausePause a Dag +triggerPerform trigger operation +unpauseUnpause a Dag +updatePerform update operation + +Options: +-h--helpshow this help message and exit diff --git a/airflow-ctl/src/airflowctl/api/client.py b/airflow-ctl/src/airflowctl/api/client.py index 355304ae37a14..9e138ffc87b55 100644 --- a/airflow-ctl/src/airflowctl/api/client.py +++ b/airflow-ctl/src/airflowctl/api/client.py @@ -73,6 +73,7 @@ def wrapper(f): "provide_api_client", "NEW_API_CLIENT", "ClientKind", + "ServerResponseError", ] PS = ParamSpec("PS") diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index b4b283c658a3b..da548530eb0ce 100644 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -252,6 +252,14 @@ def __call__(self, parser, namespace, values, option_string=None): nargs="?", ) +# Dag Commands Args +ARG_DAG_ID = Arg( + flags=("--dag-id",), + type=str, + dest="dag_id", + help="The DAG ID of the DAG to pause or unpause", +) + # Variable Commands Args ARG_VARIABLE_ACTION_ON_EXISTING_KEY = Arg( flags=("-a", "--action-on-existing-key"), @@ -804,6 +812,27 @@ def merge_commands( ), ) +DAG_COMMANDS = ( + ActionCommand( + name="pause", + help="Pause a Dag", + func=lazy_load_command("airflowctl.ctl.commands.dag_command.pause"), + args=( + ARG_DAG_ID, + ARG_OUTPUT, + ), + ), + ActionCommand( + name="unpause", + help="Unpause a Dag", + func=lazy_load_command("airflowctl.ctl.commands.dag_command.unpause"), + args=( + ARG_DAG_ID, + ARG_OUTPUT, + ), + ), +) + POOL_COMMANDS = ( ActionCommand( name="import", @@ -854,6 +883,11 @@ def merge_commands( help="Manage Airflow connections", subcommands=CONNECTION_COMMANDS, ), + GroupCommand( + name="dags", + help="Manage Airflow Dags", + subcommands=DAG_COMMANDS, + ), GroupCommand( name="pools", help="Manage Airflow pools", diff --git a/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py b/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py new file mode 100644 index 0000000000000..9b43be47eb27b --- /dev/null +++ b/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import sys +from typing import Literal + +import rich + +from airflowctl.api.client import NEW_API_CLIENT, ClientKind, ServerResponseError, provide_api_client +from airflowctl.api.datamodels.generated import DAGPatchBody +from airflowctl.ctl.console_formatting import AirflowConsole + + +def update_dag_state( + dag_id: str, + operation: Literal["pause", "unpause"], + api_client, + output: str, +): + """Update DAG state (pause/unpause).""" + try: + response = api_client.dags.update( + dag_id=dag_id, dag_body=DAGPatchBody(is_paused=operation == "pause") + ) + except ServerResponseError as e: + rich.print(f"[red]Error while trying to {operation} Dag {dag_id}: {e}[/red]") + sys.exit(1) + + response_dict = response.model_dump() + rich.print(f"[green]Dag {operation} successful {dag_id}[/green]") + rich.print("[green]Further Dag details:[/green]") + AirflowConsole().print_as( + data=[response_dict], + output=output, + ) + return response_dict + + +@provide_api_client(kind=ClientKind.CLI) +def pause(args, api_client=NEW_API_CLIENT) -> None: + """Pause a DAG.""" + return update_dag_state( + dag_id=args.dag_id, + operation="pause", + api_client=api_client, + output=args.output, + ) + + +@provide_api_client(kind=ClientKind.CLI) +def unpause(args, api_client=NEW_API_CLIENT) -> None: + """Unpause a DAG.""" + return update_dag_state( + dag_id=args.dag_id, + operation="unpause", + api_client=api_client, + output=args.output, + ) diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_config_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_config_command.py index cd5af73605c0a..f87a9851a5277 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_config_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_config_command.py @@ -26,7 +26,7 @@ from airflowctl.ctl.commands.config_command import ConfigChange, ConfigParameter -class TestCliConfigLint: +class TestCliConfigCommands: parser = cli_parser.get_parser() @patch("rich.print") diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py new file mode 100644 index 0000000000000..98fb318024b34 --- /dev/null +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py @@ -0,0 +1,139 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import datetime + +import pytest + +from airflowctl.api.client import ClientKind +from airflowctl.api.datamodels.generated import DAGResponse +from airflowctl.ctl import cli_parser +from airflowctl.ctl.commands import dag_command + + +class TestDagCommands: + parser = cli_parser.get_parser() + dag_id = "test_dag" + dag_display_name = "dag_display_name" + dag_response_paused = DAGResponse( + dag_id=dag_id, + dag_display_name=dag_display_name, + is_paused=False, + last_parsed_time=datetime.datetime(2024, 12, 31, 23, 59, 59), + last_expired=datetime.datetime(2025, 1, 1, 0, 0, 0), + fileloc="fileloc", + relative_fileloc="relative_fileloc", + description="description", + timetable_summary="timetable_summary", + timetable_description="timetable_description", + tags=[], + max_active_tasks=1, + max_active_runs=1, + max_consecutive_failed_dag_runs=1, + has_task_concurrency_limits=True, + has_import_errors=True, + next_dagrun_logical_date=datetime.datetime(2025, 1, 1, 0, 0, 0), + next_dagrun_data_interval_start=datetime.datetime(2025, 1, 1, 0, 0, 0), + next_dagrun_data_interval_end=datetime.datetime(2025, 1, 1, 0, 0, 0), + next_dagrun_run_after=datetime.datetime(2025, 1, 1, 0, 0, 0), + owners=["apache-airflow"], + file_token="file_token", + bundle_name="bundle_name", + is_stale=False, + ) + + dag_response_unpaused = DAGResponse( + dag_id=dag_id, + dag_display_name=dag_display_name, + is_paused=True, + last_parsed_time=datetime.datetime(2024, 12, 31, 23, 59, 59), + last_expired=datetime.datetime(2025, 1, 1, 0, 0, 0), + fileloc="fileloc", + relative_fileloc="relative_fileloc", + description="description", + timetable_summary="timetable_summary", + timetable_description="timetable_description", + tags=[], + max_active_tasks=1, + max_active_runs=1, + max_consecutive_failed_dag_runs=1, + has_task_concurrency_limits=True, + has_import_errors=True, + next_dagrun_logical_date=datetime.datetime(2025, 1, 1, 0, 0, 0), + next_dagrun_data_interval_start=datetime.datetime(2025, 1, 1, 0, 0, 0), + next_dagrun_data_interval_end=datetime.datetime(2025, 1, 1, 0, 0, 0), + next_dagrun_run_after=datetime.datetime(2025, 1, 1, 0, 0, 0), + owners=["apache-airflow"], + file_token="file_token", + bundle_name="bundle_name", + is_stale=False, + ) + + def test_pause_dag(self, api_client_maker, monkeypatch): + api_client = api_client_maker( + path=f"/api/v2/dags/{self.dag_id}", + response_json=self.dag_response_paused.model_dump(mode="json"), + expected_http_status_code=200, + kind=ClientKind.CLI, + ) + assert self.dag_response_paused.is_paused is False + dag_response_dict = dag_command.pause( + self.parser.parse_args(["dags", "pause", "--dag-id", self.dag_id]), + api_client=api_client, + ) + assert dag_response_dict["is_paused"] is False + + def test_pause_fail(self, api_client_maker, monkeypatch): + api_client = api_client_maker( + path=f"/api/v2/dags/{self.dag_id}", + response_json={"detail": "DAG not found"}, + expected_http_status_code=404, + kind=ClientKind.CLI, + ) + with pytest.raises(SystemExit): + dag_command.pause( + self.parser.parse_args(["dags", "pause", "--dag-id", self.dag_id]), + api_client=api_client, + ) + + def test_unpause_dag(self, api_client_maker, monkeypatch): + api_client = api_client_maker( + path=f"/api/v2/dags/{self.dag_id}", + response_json=self.dag_response_unpaused.model_dump(mode="json"), + expected_http_status_code=200, + kind=ClientKind.CLI, + ) + assert self.dag_response_unpaused.is_paused is True + dag_response_dict = dag_command.unpause( + self.parser.parse_args(["dags", "unpause", "--dag-id", self.dag_id]), + api_client=api_client, + ) + assert dag_response_dict["is_paused"] is True + + def test_unpause_fail(self, api_client_maker, monkeypatch): + api_client = api_client_maker( + path=f"/api/v2/dags/{self.dag_id}", + response_json={"detail": "DAG not found"}, + expected_http_status_code=404, + kind=ClientKind.CLI, + ) + with pytest.raises(SystemExit): + dag_command.unpause( + self.parser.parse_args(["dags", "unpause", "--dag-id", self.dag_id]), + api_client=api_client, + )