Add task instance management commands to airflowctl CLI#66469
Add task instance management commands to airflowctl CLI#66469Suraj-kumar00 wants to merge 12 commits into
Conversation
…tanceOperations - Remove bare re-raise try/except from get, clear, and update methods - Fix ruff E302: add missing blank line before TaskInstanceOperations class - Fix ruff F841: remove unused variable in pool command export test - Run ruff format on operations.py, cli_config.py, test_operations.py
|
Hello @potiuk, I have raised the PR again. Could you please check now? |
…Suraj-kumar00/airflow into feat/task-instance-cli-support
|
Hello team, Can i get review on this PR and workflow to run so that we can ensure the pipeline is successfully passing all the tests? |
potiuk
left a comment
There was a problem hiding this comment.
Welcome — first contribution to airflow-ctl, nice to see! The shape of the change is right (matches how every other operations class in the file is wired) and the test coverage is well thought out — the matrix of get/list/clear/update × happy-path/edge cases is more than the existing classes have.
Six observations inline. Two of them (#1 and #2 — the over-broad return types and the tests for response shapes the API never produces) are the substantive ones; the rest are nits. None are blocking, but I'd like to see #1 + #2 before APPROVE. CI is still mid-flight (5 of 8 checks IN_PROGRESS) — I'll re-look once it lands.
This review was drafted by an AI-assisted tool and
confirmed by an Apache Airflow maintainer. The findings
below are observations, not blockers; an Apache Airflow
maintainer — a real person — will take the next look at the
PR. If you think a finding is mis-applied, please reply on
the PR and a maintainer will weigh in.More on how Apache Airflow handles maintainer review:
contributing-docs/05_pull_requests.rst.
| def _parse_task_instance_response( | ||
| self, data: dict | _list | ||
| ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | TaskInstanceCollectionResponse: | ||
| """Parse task instance response data into appropriate models.""" | ||
| if isinstance(data, list): | ||
| return [TaskInstanceResponse.model_validate(item) for item in data] | ||
| if "task_instances" in data: | ||
| return TaskInstanceCollectionResponse.model_validate(data) | ||
| return TaskInstanceResponse.model_validate(data) | ||
|
|
||
| def get( | ||
| self, dag_id: str, dag_run_id: str, task_id: str | ||
| ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | TaskInstanceCollectionResponse: | ||
| """Get a task instance.""" | ||
| self.response = self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}") | ||
| return self._parse_task_instance_response(self.response.json()) | ||
|
|
||
| def list(self, dag_id: str, dag_run_id: str) -> TaskInstanceCollectionResponse | ServerResponseError: | ||
| """List task instances.""" | ||
| return super().execute_list( | ||
| path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances", | ||
| data_model=TaskInstanceCollectionResponse, | ||
| ) | ||
|
|
||
| def clear( | ||
| self, dag_id: str, body: ClearTaskInstancesBody | ||
| ) -> TaskInstanceCollectionResponse | ServerResponseError: | ||
| """Clear task instances.""" | ||
| self.response = self.client.post( | ||
| f"dags/{dag_id}/clearTaskInstances", | ||
| json=body.model_dump(mode="json", exclude_unset=True), | ||
| ) | ||
| return TaskInstanceCollectionResponse.model_validate_json(self.response.content) | ||
|
|
||
| def update( | ||
| self, dag_id: str, dag_run_id: str, task_id: str, body: PatchTaskInstanceBody | ||
| ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | TaskInstanceCollectionResponse: | ||
| """Update a task instance.""" | ||
| self.response = self.client.patch( | ||
| f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}", | ||
| json=body.model_dump(mode="json", exclude_unset=True), | ||
| ) | ||
| return self._parse_task_instance_response(self.response.json()) |
There was a problem hiding this comment.
Major — return types are wider than the API contract. I checked the OpenAPI spec and the route implementations to be sure:
GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}always returns oneTaskInstanceResponse(task_instances.py:get_task_instance).PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}always returns aTaskInstanceCollectionResponse— the implementation wraps results in a collection regardless of whethermap_indexis supplied (task_instances.py:1226-1234).POST /dags/{dag_id}/clearTaskInstancesalways returnsTaskInstanceCollectionResponse.
None of these endpoints ever returns a bare list, so _parse_task_instance_response's isinstance(data, list) branch is dead, and the union return types in get() and update() force every caller to type-narrow against shapes that won't appear.
Matching the rest of this file (e.g. ConnectionsOperations.get → ConnectionResponse, PoolsOperations.list → PoolCollectionResponse), I'd commit to one concrete type per method:
def get(self, dag_id: str, dag_run_id: str, task_id: str) -> TaskInstanceResponse:
self.response = self.client.get(
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
)
return TaskInstanceResponse.model_validate_json(self.response.content)
def update(
self, dag_id: str, dag_run_id: str, task_id: str, body: PatchTaskInstanceBody
) -> TaskInstanceCollectionResponse:
self.response = self.client.patch(
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}",
json=body.model_dump(mode="json", exclude_unset=True),
)
return TaskInstanceCollectionResponse.model_validate_json(self.response.content)…and drop _parse_task_instance_response entirely. The code becomes shorter, the types match the API contract, and CLI consumers don't have to introspect the result. Pairs with finding #2 (the tests for the dead branches).
| ) | ||
| assert response == self.task_instance_response | ||
|
|
||
| def test_get_list(self): |
There was a problem hiding this comment.
Major — tests for response shapes the API never produces. test_get_list (line 1909), test_get_collection (line 1926), test_update_list (line 2051), and test_update_collection (line 2070) mock the server returning shapes the actual REST API never emits (per the spec/implementation analysis in finding #1).
These contribute no real coverage — they test that _parse_task_instance_response does what _parse_task_instance_response does. Worse, they'd hide a regression: if the dispatcher ever started returning the wrong type, these tests would pass against synthetic input while the real API path broke.
Once #1 lands (single concrete return type per method), drop these four tests. Keep the canonical happy-path tests:
test_get(returnsTaskInstanceResponse)test_list(returnsTaskInstanceCollectionResponse)test_clearandtest_clear_with_options(returnTaskInstanceCollectionResponse)test_updateandtest_update_with_note(returnTaskInstanceCollectionResponse)
That's still 6 unit tests covering all four endpoints with body-variant coverage on clear and update. Plenty for a CLI-facing operations class.
|
|
||
| # Type alias used inside classes that define a ``list()`` method, which | ||
| # shadows the builtin ``list`` and confuses mypy when used in annotations. | ||
| _list = list |
There was a problem hiding this comment.
Minor. This works, but the bare _list = list at module scope reads as a typo at first glance (no import, no immediately-obvious purpose).
The file already has from __future__ import annotations at the top, so annotations are strings at parse time and runtime shadowing of list by def list(self, ...) shouldn't reach mypy. If mypy is genuinely confused in practice, from builtins import list as _list communicates intent more clearly to a reader.
And if findings #1+#2 land — none of the kept methods would have list[...] in their signatures, so the alias may be unnecessary entirely. Worth re-checking once those are applied.
| def _is_list_annotation(annotation: Any) -> bool: | ||
| """ | ||
| Check whether a Pydantic field annotation is a list type. | ||
|
|
||
| Handles ``Annotated[list[...] | None, ...]`` and similar wrapped forms | ||
| that ``typing.get_origin`` alone cannot detect. | ||
| """ | ||
| origin = typing.get_origin(annotation) | ||
|
|
||
| # Direct list[...] | ||
| if origin is list: | ||
| return True | ||
|
|
||
| # Unwrap Annotated[X, ...] | ||
| if origin is typing.Annotated: | ||
| inner = typing.get_args(annotation)[0] | ||
| return _is_list_annotation(inner) | ||
|
|
||
| # Unwrap Union / X | None | ||
| if origin is typing.Union: | ||
| return any(_is_list_annotation(arg) for arg in typing.get_args(annotation) if arg is not type(None)) |
There was a problem hiding this comment.
Minor — possibly dead branch. Per Python typing docs, typing.get_origin(Annotated[X, ...]) returns get_origin(X) (i.e. the Annotated marker is transparent to introspection). Additionally, Pydantic v2's FieldInfo.annotation strips the Annotated wrapper before storing, so by the time this helper runs against model.model_fields[name].annotation, the value is already list[str] | None (no Annotated).
In that case get_origin returns Union directly, the function recurses into the union branch, and finds list. The explicit if origin is typing.Annotated: check on line 71 likely never fires.
Worth either:
- A unit test that asserts
_is_list_annotation(Annotated[list[str], 'x'])returnsTrueto lock in the behaviour and confirm the branch is needed. - Or trim the
Annotatedbranch and rely onget_origin's built-in transparency.
(If you've observed a case where it WAS needed in practice, a one-line comment naming that case would help future readers understand why the branch is there.)
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import re |
There was a problem hiding this comment.
Minor — scope creep. The changes in this file (import re move + ANSI-strip / line-wrap-tolerant assertion in test_export_json_to_file) are unrelated to the taskinstance commands this PR adds. They look correct in isolation, but they belong in a separate cleanup PR.
Project convention is to keep PRs focused on one logical change — see 05_pull_requests.rst. If these changes were needed to make CI green on top of this branch (e.g. a flaky test was blocking your PR), call it out in the PR body so a reviewer doesn't wonder why pool tests are touched in a taskinstance PR.
Not a hard blocker — happy to wave it through if you call it out — but next time, splitting these would be cleaner.
| main:27a22c00dcf32e7a1a4f06672dc8e3c8 | ||
| assets:70619a2d92bda80930cde2aefcd8e1cd | ||
| main:df0fbf2487ad50774d706a96d76f5c70 | ||
| assets:b3ae2b933e54528bf486ff28e887804d |
There was a problem hiding this comment.
Question / nit. Adding the taskinstance group to the help should only change the main: hash on line 1 (the top-level help gains a new entry). The assets: hash also changing on line 2 is suspicious — there's no taskinstance-related change visible to the assets help.
Likely either:
- A regeneration artifact (the regenerator processed the whole tree and picked up unrelated drift somewhere upstream of you), or
- An accidental hash update from a regeneration that wasn't deterministic.
Worth re-running the hash regeneration on a clean checkout of main, then on top of your branch, and confirming only main differs.
potiuk
left a comment
There was a problem hiding this comment.
Switching to REQUEST_CHANGES based on a second-read pass through Codex (adversarial reviewer). It found two [high] correctness issues I missed in the primary review — both involve destructive operations on mapped task instances silently affecting more rows than the operator asked for. These need to be addressed before this can land.
Both findings are about the CLI not faithfully expressing the API's mapped-TI semantics:
clear --task-idsflattens nested[task_id, map_index]pairs into a comma-list of strings, so a request meant to target one mapped index becomes a request that clears every map index of that task plus a separately-typed task id of0.updatehas nomap_indexparameter and routes to the unindexed endpoint, which the server interprets as "update every map index" (set_task_instance_statetreatsmap_indexes=Noneas bulk).
These are operator-facing destructive paths. Even with a careful operator, the CLI today can blow up more rows than they meant to touch. Cross-checked both findings against the airflow-core route implementations (the patch_task_instance_by_map_index endpoint, the _patch_ti_validate_request semantics, the ClearTaskInstancesBody.task_ids schema) — Codex's analysis holds.
My primary review's findings #1 (return-type narrowing) and #2 (dead-branch tests) still stand, but those are tightening; these two are correctness.
This review was drafted by an AI-assisted tool and
confirmed by an Apache Airflow maintainer. After you've
addressed the points above and pushed an update, an Apache
Airflow maintainer — a real person — will take the next look
at the PR. The findings cite the project's review criteria;
if you think one of them is mis-applied, please reply on the
PR and a maintainer will weigh in.More on how Apache Airflow handles maintainer review:
contributing-docs/05_pull_requests.rst.
| if _is_list_annotation( | ||
| datamodel.model_fields[expanded_parameter].annotation | ||
| ): | ||
| val = [v.strip() for v in val.split(",") if v.strip()] |
There was a problem hiding this comment.
Flagged by the adversarial reviewer (Codex). Cross-checked — the mapped-clear path is genuinely destructive.
[high] CLI cannot express mapped task identifiers for clear
ClearTaskInstancesBody.task_idssupports entries shaped as eithertask_idor[task_id, map_index], and the API tests cover mapped clears with nested lists. The CLI conversion added here treats any list-typed field as a comma-separated flat string list, so an input intended to target['times_2', 0]becomes['times_2', '0']. That is a different request: it targets every map index fortimes_2and also a separate task id0, or fails depending on the Dag contents. This is a destructive operation, so an operator trying to clear one mapped task instance can clear more task instances than intended.Recommendation: Add explicit parsing for
ClearTaskInstancesBody.task_idsthat preserves nested[task_id, map_index]pairs, with CLI tests covering single mapped index, multiple mapped indexes, and plain task ids before wiring the clear command as supported.
For reference, the task_ids field in ClearTaskInstancesBody is typed roughly as list[str | tuple[str, int]] | None — the _is_list_annotation helper added in this PR doesn't model nested structure at all, only "is this annotation list-shaped". A targeted fix likely needs a per-field hook (or per-Pydantic-field-validator hook) rather than a generic comma-splitter.
If supporting mapped-index task_ids is out of scope for this PR, an acceptable alternative is to reject any --task-ids input that looks like it might want a map index (e.g. dotted form, or any form other than plain identifiers) and document the limitation, then add the nested-pair support in a follow-up. Either way, today's behaviour — silently flattening — is not safe for a destructive command.
| def update( | ||
| self, dag_id: str, dag_run_id: str, task_id: str, body: PatchTaskInstanceBody | ||
| ) -> TaskInstanceResponse | _list[TaskInstanceResponse] | TaskInstanceCollectionResponse: | ||
| """Update a task instance.""" | ||
| self.response = self.client.patch( | ||
| f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}", | ||
| json=body.model_dump(mode="json", exclude_unset=True), |
There was a problem hiding this comment.
Flagged by the adversarial reviewer (Codex). Cross-checked — the unindexed-PATCH path is documented to update all map indexes.
[high] update has no map_index path and can mutate all mapped indexes
TaskInstanceOperations.update()only calls/taskInstances/{task_id}and exposes nomap_indexargument, while the server has a distinct/{task_id}/{map_index}route for indexed updates. On the unindexed route, the server passesmap_index=None;SerializedDAG.set_task_instance_state()documents thatmap_indexes=Nonesets all mapped task instances. So the CLI cannot intentionally update one mapped task instance, and a state update for a mapped task id goes through the all-index path. That can turn a targeted manual state correction into a bulk state mutation.Recommendation: Add a
map_index: int | Noneparameter and route to/taskInstances/{task_id}/{map_index}when provided; add CLI/API tests proving mapped updates affect only the selected map index and that unindexed mapped updates are either explicit bulk behavior or rejected.
Concretely:
def update(
self,
dag_id: str,
dag_run_id: str,
task_id: str,
body: PatchTaskInstanceBody,
map_index: int | None = None,
) -> TaskInstanceCollectionResponse:
if map_index is not None:
path = f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}"
else:
path = f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
self.response = self.client.patch(
path, json=body.model_dump(mode="json", exclude_unset=True)
)
return TaskInstanceCollectionResponse.model_validate_json(self.response.content)The CLI side then needs --map-index exposed as an argparse arg on the taskinstance update command, and the integration test in test_airflowctl_commands.py should add a parametrised case showing mapped-update scoped to a single index.
This pairs with my primary-review finding #1 (return-type narrowing): once update is the indexed/unindexed pair, the return-type narrowing decision can apply per branch (indexed → single TI? unindexed → collection).
|
Hi @potiuk, I'll be fixing this soon. |
|
@Suraj-kumar00 Converting to draft — this PR doesn't yet meet our Pull Request quality criteria.
See the linked criteria for how to fix each item, then mark the PR "Ready for review". This is not a rejection — just an invitation to bring the PR up to standard. No rush. Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. Drafted-by: Claude Code (Opus 4.7); reviewed by @potiuk before posting |
Adds Task Instance management support to the airflowctl CLI, enabling users to get, list, clear, and update task instances directly from the terminal.
Closes: #61547
Related: #66173
Changes
airflowctl/api/operations.py
TaskInstanceOperationsclass with get, list, clear, update methods_parse_task_instance_responsehelper to handle the different response shapes (single instance, list, or collection) from the APIPluginsOperationsclass that was lost during merge with main (Added plugins command to airflowctl #64935)airflowctl/api/client.py
task_instancesproperty to the Client for auto-generated CLI command wiringpluginsproperty that was lost during merge with mainairflowctl/ctl/cli_config.py
"clear"tooutput_command_listfor correct CLI output handling_is_list_annotation()helper that properly unwrapsAnnotated[list[...] | None, Field(...)]to convert comma-separated CLI inputs into lists when the Pydantic model expects a list field(e.g.
--task-ids t1,t2 → ["t1", "t2"])_parse_task_instance_responsefrom auto-generated CLI commandstests/airflow_ctl/api/test_operations.py
TestTaskInstanceOperationswith 10 unit tests covering all response shapes for get, list, clear, and updateTestPluginsOperationstests lost during mergetests/airflow_ctl/ctl/commands/test_pool_command.py
import reto module scopeairflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py
New CLI Commands
Was generative AI tooling used to co-author this PR?
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.