diff --git a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py index b8b49ad3ae778..8625223e9cc2d 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py @@ -126,6 +126,8 @@ def date_param(): # Plugins command "plugins list", "plugins list-import-errors", + # Tasks commands + "tasks clear example_bash_operator", ] NO_AUTH_TEST_COMMANDS = [ diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 8e590f3b820cd..e94c5e1f0cdf2 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -12,4 +12,5 @@ providers:34502fe09dc0b8b0a13e7e46efdffda6 variables:f8fc76d3d398b2780f4e97f7cd816646 version:31f4efdf8de0dbaaa4fac71ff7efecc3 plugins:4864fd8f356704bd2b3cd1aec3567e35 +tasks:8ca7306be97d1c8788dbfbe4b0f8bf61 auth login:9fe2bb1dd5c602beea2eefb33a2b20a8 diff --git a/airflow-ctl/docs/images/output_tasks.svg b/airflow-ctl/docs/images/output_tasks.svg new file mode 100644 index 0000000000000..d8e5cda021e26 --- /dev/null +++ b/airflow-ctl/docs/images/output_tasks.svg @@ -0,0 +1,98 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Usage: airflowctl tasks [-h] COMMAND ... + +Perform Tasks operations + +Positional Arguments: +  COMMAND +    clear     Clear task instances + +Options: +  -h, --help  show this help message and exit + + + + + diff --git a/airflow-ctl/src/airflowctl/api/client.py b/airflow-ctl/src/airflowctl/api/client.py index b01200fac1c7f..1cfecb3acf5f0 100644 --- a/airflow-ctl/src/airflowctl/api/client.py +++ b/airflow-ctl/src/airflowctl/api/client.py @@ -57,6 +57,7 @@ PoolsOperations, ProvidersOperations, ServerResponseError, + TasksOperations, VariablesOperations, VersionOperations, XComOperations, @@ -473,6 +474,12 @@ def plugins(self): """Operations related to plugins.""" return PluginsOperations(self) + @lru_cache() # type: ignore[prop-decorator] + @property + def tasks(self): + """Operations related to tasks.""" + return TasksOperations(self) + # API Client Decorator for CLI Actions @contextlib.contextmanager diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index e250b66e127dd..0c1546dcabcfb 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -39,6 +39,7 @@ BulkBodyPoolBody, BulkBodyVariableBody, BulkResponse, + ClearTaskInstancesBody, Config, ConnectionBody, ConnectionCollectionResponse, @@ -68,6 +69,7 @@ ProviderCollectionResponse, QueuedEventCollectionResponse, QueuedEventResponse, + TaskInstanceCollectionResponse, TriggerDAGRunPostBody, VariableBody, VariableCollectionResponse, @@ -646,6 +648,21 @@ def list( raise e +class TasksOperations(BaseOperations): + """Tasks operations.""" + + def clear( + self, dag_id: str, body: ClearTaskInstancesBody + ) -> TaskInstanceCollectionResponse | ServerResponseError: + """Clear task instances.""" + try: + payload = body.model_dump(exclude_unset=True, by_alias=True) + self.response = self.client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) + return TaskInstanceCollectionResponse.model_validate_json(self.response.content) + except ServerResponseError as e: + raise e + + class JobsOperations(BaseOperations): """Job operations.""" diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 1d3c30121b619..f55433c818f4a 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -394,7 +394,7 @@ def __init__(self, file_path: str | Path | None = None): # Exclude parameters that are not needed for CLI from datamodels self.excluded_parameters = ["schema_"] # This list is used to determine if the command/operation needs to output data - self.output_command_list = ["list", "get", "create", "delete", "update", "trigger", "add", "edit"] + self.output_command_list = ["list", "get", "create", "delete", "update", "trigger", "add", "edit", "clear"] self.exclude_operation_names = ["LoginOperations", "VersionOperations", "BaseOperations"] self.exclude_method_names = [ "error", diff --git a/airflow-ctl/src/airflowctl/ctl/help_texts.yaml b/airflow-ctl/src/airflowctl/ctl/help_texts.yaml index eb566a96b1fb8..26fa7d808f0a5 100644 --- a/airflow-ctl/src/airflowctl/ctl/help_texts.yaml +++ b/airflow-ctl/src/airflowctl/ctl/help_texts.yaml @@ -100,3 +100,7 @@ xcom: plugins: list: "List all installed Airflow plugins" list-import-errors: "List all plugin import errors" + +tasks: + clear: "Clear task instances" + diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 52faecee73ea0..58736512e3b97 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -1140,6 +1140,27 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert response == self.dag_run_response +class TestTasksOperations: + dag_id = "dag_id" + task_instance_response = Mock() + + def test_clear(self): + from airflowctl.api.datamodels.generated import ClearTaskInstancesBody, TaskInstanceCollectionResponse + + clear_body = ClearTaskInstancesBody(dry_run=True, task_ids=["task_1", "task_2"]) + expected_response = TaskInstanceCollectionResponse(task_instances=[], total_entries=0) + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == f"/api/v2/dags/{self.dag_id}/clearTaskInstances" + # Ensure body is correctly serialized + assert json.loads(request.content.decode()) == clear_body.model_dump(exclude_unset=True, by_alias=True) + return httpx.Response(200, json=json.loads(expected_response.model_dump_json())) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.tasks.clear(dag_id=self.dag_id, body=clear_body) + assert response == expected_response + + class TestDagRunOperations: dag_id = "dag_id" dag_run_id = "dag_run_id" diff --git a/scripts/in_container/run_capture_airflowctl_help.py b/scripts/in_container/run_capture_airflowctl_help.py index 9529dbe04390c..00001dc97048d 100644 --- a/scripts/in_container/run_capture_airflowctl_help.py +++ b/scripts/in_container/run_capture_airflowctl_help.py @@ -48,6 +48,7 @@ "variables", "version", "plugins", + "tasks", ] SUBCOMMANDS = [