diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 8e590f3b820cd..594622a7bcdca 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -1,4 +1,4 @@ -main:27a22c00dcf32e7a1a4f06672dc8e3c8 +main:09b3e026597a90df1db2583eb7ea7e58 assets:70619a2d92bda80930cde2aefcd8e1cd auth:d79e9c7d00c432bdbcbc2a86e2e32053 backfill:74c8737b0a62a86ed3605fa9e6165874 @@ -9,6 +9,7 @@ dagrun:c32e0011aa9a845456c778786717208e jobs:a5b644c5da8889443bb40ee10b599270 pools:19efe105b9515ab1926ebcaf0e028d71 providers:34502fe09dc0b8b0a13e7e46efdffda6 +tasks:1289c6a1f20632e1285dc5a003e46134 variables:f8fc76d3d398b2780f4e97f7cd816646 version:31f4efdf8de0dbaaa4fac71ff7efecc3 plugins:4864fd8f356704bd2b3cd1aec3567e35 diff --git a/airflow-ctl/docs/images/output_main.svg b/airflow-ctl/docs/images/output_main.svg index f586877bce8eb..1d34c6a68002d 100644 --- a/airflow-ctl/docs/images/output_main.svg +++ b/airflow-ctl/docs/images/output_main.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + - + - + - - Usage:airflowctl [-hGROUP_OR_COMMAND... - -Positional Arguments: -GROUP_OR_COMMAND - -    Groups -assetsPerform Assets operations -authManage authentication for CLI. Either pass token from -environment variable/parameter or pass username and -password. -backfillPerform Backfill operations -configPerform Config operations -connectionsPerform Connections operations -dagrunPerform DagRun operations -dagsPerform Dags operations -jobsPerform Jobs operations -pluginsPerform Plugins operations -poolsPerform Pools operations -providersPerform Providers operations -variablesPerform Variables operations -xcomPerform XCom operations - -    Commands: -versionShow version information - -Options: --h--helpshow this help message and exit + + Usage:airflowctl [-hGROUP_OR_COMMAND... + +Positional Arguments: +GROUP_OR_COMMAND + +    Groups +assetsPerform Assets operations +authManage authentication for CLI. Either pass token from +environment variable/parameter or pass username and +password. +backfillPerform Backfill operations +configPerform Config operations +connectionsPerform Connections operations +dagrunPerform DagRun operations +dagsPerform Dags operations +jobsPerform Jobs operations +pluginsPerform Plugins operations +poolsPerform Pools operations +providersPerform Providers operations +tasksManage Airflow tasks +variablesPerform Variables operations +xcomPerform XCom operations + +    Commands: +versionShow version information + +Options: +-h--helpshow this help message and exit diff --git a/airflow-ctl/docs/images/output_tasks.svg b/airflow-ctl/docs/images/output_tasks.svg new file mode 100644 index 0000000000000..8fc18a902b320 --- /dev/null +++ b/airflow-ctl/docs/images/output_tasks.svg @@ -0,0 +1,97 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Usage:airflowctl tasks [-hCOMMAND... + +Manage Airflow tasks + +Positional Arguments: +COMMAND +clearClear task instances for a Dag + +Options: +-h--helpshow this help message and exit + + + + diff --git a/airflow-ctl/src/airflowctl/api/client.py b/airflow-ctl/src/airflowctl/api/client.py index b01200fac1c7f..b72e253e3b4a2 100644 --- a/airflow-ctl/src/airflowctl/api/client.py +++ b/airflow-ctl/src/airflowctl/api/client.py @@ -57,6 +57,8 @@ PoolsOperations, ProvidersOperations, ServerResponseError, + TaskInstancesOperations, + TasksOperations, VariablesOperations, VersionOperations, XComOperations, @@ -473,6 +475,18 @@ def plugins(self): """Operations related to plugins.""" return PluginsOperations(self) + @lru_cache() # type: ignore[prop-decorator] + @property + def tasks(self): + """Operations related to tasks.""" + return TasksOperations(self) + + @lru_cache() # type: ignore[prop-decorator] + @property + def task_instances(self): + """Operations related to task instances.""" + return TaskInstancesOperations(self) + # API Client Decorator for CLI Actions @contextlib.contextmanager diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index f52ba055c1c72..8d73f18ee86d3 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -39,6 +39,7 @@ BulkBodyPoolBody, BulkBodyVariableBody, BulkResponse, + ClearTaskInstancesBody, Config, ConnectionBody, ConnectionCollectionResponse, @@ -68,6 +69,10 @@ ProviderCollectionResponse, QueuedEventCollectionResponse, QueuedEventResponse, + TaskCollectionResponse, + TaskInstanceCollectionResponse, + TaskInstanceResponse, + TaskResponse, TriggerDAGRunPostBody, VariableBody, VariableCollectionResponse, @@ -920,3 +925,56 @@ def list_import_errors(self) -> PluginImportErrorCollectionResponse | ServerResp return PluginImportErrorCollectionResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e + + +class TasksOperations(BaseOperations): + """Tasks operations.""" + + def get(self, dag_id: str, task_id: str) -> TaskResponse | ServerResponseError: + """Get a task.""" + try: + self.response = self.client.get(f"dags/{dag_id}/tasks/{task_id}") + return TaskResponse.model_validate_json(self.response.content) + except ServerResponseError as e: + raise e + + def list(self, dag_id: str, order_by: str | None = None) -> TaskCollectionResponse | ServerResponseError: + """List tasks for a Dag.""" + params: dict[str, Any] = {} + if order_by is not None: + params["order_by"] = order_by + return super().execute_list( + path=f"dags/{dag_id}/tasks", data_model=TaskCollectionResponse, params=params + ) + + +class TaskInstancesOperations(BaseOperations): + """Task instances operations.""" + + def get(self, dag_id: str, dag_run_id: str, task_id: str) -> TaskInstanceResponse | ServerResponseError: + """Get a task instance.""" + try: + self.response = self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}") + return TaskInstanceResponse.model_validate_json(self.response.content) + except ServerResponseError as e: + raise e + + def list(self, dag_id: str, dag_run_id: str) -> TaskInstanceCollectionResponse | ServerResponseError: + """List task instances for a Dag run.""" + return super().execute_list( + path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances", + data_model=TaskInstanceCollectionResponse, + ) + + def clear( + self, dag_id: str, clear_task_instances_body: ClearTaskInstancesBody + ) -> TaskInstanceCollectionResponse | ServerResponseError: + """Clear task instances for a Dag.""" + try: + self.response = self.client.post( + f"dags/{dag_id}/clearTaskInstances", + json=clear_task_instances_body.model_dump(mode="json", exclude_unset=True), + ) + return TaskInstanceCollectionResponse.model_validate_json(self.response.content) + except ServerResponseError as e: + raise e diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 1d3c30121b619..bde8d05c393ec 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -268,6 +268,27 @@ def _load_help_texts_yaml() -> dict[str, dict[str, str]]: help="The Dag ID of the Dag to pause or unpause", ) +# Task Commands Args +ARG_TASK_ID = Arg( + flags=("task_id",), + type=str, + help="The task ID", +) +ARG_START_DATE = Arg( + flags=("--start-date",), + type=str, + default=None, + dest="start_date", + help="The start date (ISO format) for clearing task instances", +) +ARG_END_DATE = Arg( + flags=("--end-date",), + type=str, + default=None, + dest="end_date", + help="The end date (ISO format) for clearing task instances", +) + ARG_ACTION_ON_EXISTING_KEY = Arg( flags=("-a", "--action-on-existing-key"), type=str, @@ -395,7 +416,13 @@ def __init__(self, file_path: str | Path | None = None): self.excluded_parameters = ["schema_"] # This list is used to determine if the command/operation needs to output data self.output_command_list = ["list", "get", "create", "delete", "update", "trigger", "add", "edit"] - self.exclude_operation_names = ["LoginOperations", "VersionOperations", "BaseOperations"] + self.exclude_operation_names = [ + "LoginOperations", + "VersionOperations", + "BaseOperations", + "TasksOperations", + "TaskInstancesOperations", + ] self.exclude_method_names = [ "error", "__init__", @@ -1006,6 +1033,21 @@ def merge_commands( ), ) +TASK_COMMANDS = ( + ActionCommand( + name="clear", + help="Clear task instances for a Dag", + func=lazy_load_command("airflowctl.ctl.commands.task_command.clear"), + args=( + ARG_DAG_ID, + ARG_TASK_ID, + ARG_START_DATE, + ARG_END_DATE, + ARG_OUTPUT, + ), + ), +) + core_commands: list[CLICommand] = [ GroupCommand( name="auth", @@ -1048,6 +1090,11 @@ def merge_commands( help="Manage Airflow variables", subcommands=VARIABLE_COMMANDS, ), + GroupCommand( + name="tasks", + help="Manage Airflow tasks", + subcommands=TASK_COMMANDS, + ), ] # Add generated group commands core_commands = merge_commands( diff --git a/airflow-ctl/src/airflowctl/ctl/commands/task_command.py b/airflow-ctl/src/airflowctl/ctl/commands/task_command.py new file mode 100644 index 0000000000000..85d893fe55526 --- /dev/null +++ b/airflow-ctl/src/airflowctl/ctl/commands/task_command.py @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import datetime +import sys + +import rich + +from airflowctl.api.client import NEW_API_CLIENT, ClientKind, ServerResponseError, provide_api_client +from airflowctl.api.datamodels.generated import ClearTaskInstancesBody +from airflowctl.ctl.console_formatting import AirflowConsole + + +@provide_api_client(kind=ClientKind.CLI) +def clear(args, api_client=NEW_API_CLIENT) -> None: + """Clear task instances for a Dag.""" + start_date = _parse_date(args.start_date) if args.start_date else None + end_date = _parse_date(args.end_date) if args.end_date else None + + body = ClearTaskInstancesBody( + dry_run=False, + task_ids=[args.task_id], + start_date=start_date, + end_date=end_date, + ) + + try: + response = api_client.task_instances.clear(dag_id=args.dag_id, clear_task_instances_body=body) + except ServerResponseError as e: + rich.print(f"[red]Error clearing task instances: {e}[/red]") + sys.exit(1) + + response_list = [ti.model_dump() for ti in (response.task_instances or [])] + rich.print( + f"[green]Cleared {len(response_list)} task instance(s) " + f"for dag {args.dag_id}, task {args.task_id}[/green]" + ) + AirflowConsole().print_as(data=response_list, output=args.output) + + +def _parse_date(value: str) -> datetime.datetime | None: + """Parse a date string into a datetime object.""" + if not value: + return None + try: + return datetime.datetime.fromisoformat(value) + except ValueError: + rich.print(f"[red]Invalid date format: {value}. Use ISO format (e.g. 2026-01-01T00:00:00).[/red]") + sys.exit(1) diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py new file mode 100644 index 0000000000000..f99ca2ebf7318 --- /dev/null +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +import pytest + +from airflowctl.api.client import ClientKind +from airflowctl.ctl import cli_parser +from airflowctl.ctl.commands import task_command + + +class TestTaskCommands: + parser = cli_parser.get_parser() + dag_id = "test_dag" + task_id = "test_task" + + def _make_clear_response(self): + """Build a minimal response payload for the clear endpoint.""" + return { + "task_instances": [], + "total_entries": 0, + } + + def test_clear_success(self, api_client_maker): + response_json = self._make_clear_response() + api_client = api_client_maker( + path=f"/api/v2/dags/{self.dag_id}/clearTaskInstances", + response_json=response_json, + expected_http_status_code=200, + kind=ClientKind.CLI, + ) + with mock.patch("airflowctl.ctl.commands.task_command.AirflowConsole") as mock_console_cls: + task_command.clear( + self.parser.parse_args(["tasks", "clear", self.dag_id, self.task_id]), + api_client=api_client, + ) + + mock_console_cls.return_value.print_as.assert_called_once_with(data=[], output="json") + + def test_clear_with_dates(self, api_client_maker): + response_json = self._make_clear_response() + api_client = api_client_maker( + path=f"/api/v2/dags/{self.dag_id}/clearTaskInstances", + response_json=response_json, + expected_http_status_code=200, + kind=ClientKind.CLI, + ) + with mock.patch("airflowctl.ctl.commands.task_command.AirflowConsole") as mock_console_cls: + task_command.clear( + self.parser.parse_args( + [ + "tasks", + "clear", + self.dag_id, + self.task_id, + "--start-date", + "2026-01-01T00:00:00", + "--end-date", + "2026-01-02T00:00:00", + ] + ), + api_client=api_client, + ) + + mock_console_cls.return_value.print_as.assert_called_once_with(data=[], output="json") + + def test_clear_fail(self, api_client_maker): + api_client = api_client_maker( + path=f"/api/v2/dags/{self.dag_id}/clearTaskInstances", + response_json={"detail": "DAG not found"}, + expected_http_status_code=404, + kind=ClientKind.CLI, + ) + with pytest.raises(SystemExit): + task_command.clear( + self.parser.parse_args(["tasks", "clear", self.dag_id, self.task_id]), + api_client=api_client, + ) diff --git a/scripts/ci/prek/check_airflowctl_command_coverage.py b/scripts/ci/prek/check_airflowctl_command_coverage.py index 6ee54d369bb7d..f8d61e45be4c3 100755 --- a/scripts/ci/prek/check_airflowctl_command_coverage.py +++ b/scripts/ci/prek/check_airflowctl_command_coverage.py @@ -39,7 +39,13 @@ ) # Operations excluded from CLI (see cli_config.py) -EXCLUDED_OPERATION_CLASSES = {"BaseOperations", "LoginOperations", "VersionOperations"} +EXCLUDED_OPERATION_CLASSES = { + "BaseOperations", + "LoginOperations", + "TaskInstancesOperations", + "TasksOperations", + "VersionOperations", +} EXCLUDED_METHODS = { "__init__", "__init_subclass__", diff --git a/scripts/ci/prek/check_airflowctl_help_texts.py b/scripts/ci/prek/check_airflowctl_help_texts.py index 8f31e092d88a2..500bc83e5307a 100755 --- a/scripts/ci/prek/check_airflowctl_help_texts.py +++ b/scripts/ci/prek/check_airflowctl_help_texts.py @@ -37,7 +37,13 @@ OPERATIONS_FILE = AIRFLOW_ROOT_PATH / "airflow-ctl" / "src" / "airflowctl" / "api" / "operations.py" HELP_TEXTS_FILE = AIRFLOW_ROOT_PATH / "airflow-ctl" / "src" / "airflowctl" / "ctl" / "help_texts.yaml" # Operations excluded from CLI (see cli_config.py) -EXCLUDED_OPERATION_CLASSES = {"BaseOperations", "LoginOperations", "VersionOperations"} +EXCLUDED_OPERATION_CLASSES = { + "BaseOperations", + "LoginOperations", + "TaskInstancesOperations", + "TasksOperations", + "VersionOperations", +} EXCLUDED_METHODS = { "__init__", "__init_subclass__", diff --git a/scripts/in_container/run_capture_airflowctl_help.py b/scripts/in_container/run_capture_airflowctl_help.py index 9529dbe04390c..4944d85a7ea0f 100644 --- a/scripts/in_container/run_capture_airflowctl_help.py +++ b/scripts/in_container/run_capture_airflowctl_help.py @@ -45,6 +45,7 @@ "jobs", "pools", "providers", + "tasks", "variables", "version", "plugins",