Skip to content

Feature/aip 94 airflowctl tasks operations#66318

Open
awadhesh14 wants to merge 6 commits into
apache:mainfrom
awadhesh14:feature/aip-94-airflowctl-tasks-operations
Open

Feature/aip 94 airflowctl tasks operations#66318
awadhesh14 wants to merge 6 commits into
apache:mainfrom
awadhesh14:feature/aip-94-airflowctl-tasks-operations

Conversation

@awadhesh14
Copy link
Copy Markdown

@awadhesh14 awadhesh14 commented May 3, 2026

Add airflowctl API operations for tasks and task instances.

This adds support for:

  • listing tasks for a DAG
  • listing task instances for a DAG run
  • fetching a single task instance
  • fetching task instance dependencies
  • clearing task instances for a DAG run

It also wires the new operations into the API client and adds unit tests using httpx.MockTransport with the existing make_api_client helper.


Closes #66173

Was generative AI tooling used to co-author this PR?
  • Yes — Codex (GPT-5)

Generated-by: Codex (GPT-5) following the guidelines

awadhesh14 added 2 commits May 3, 2026 15:25
- Add TasksOperations.list (GET /dags/{dag_id}/tasks)
- Add TaskInstancesOperations:
  - list (GET /dags/{dag_id}/dagRuns/{run_id}/taskInstances)
  - get (GET /dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id})
  - get_dependencies (GET .../dependencies)
  - clear (POST /dags/{dag_id}/clearTaskInstances)

Operations return API responses directly to support CLI auto-generation.
Validated via REST and airflowctl; clear triggers task re-execution.
Note: CLI output formatting error for `clear` is pre-existing and out of scope.
@boring-cyborg
Copy link
Copy Markdown

boring-cyborg Bot commented May 3, 2026

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example Dag that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

Copy link
Copy Markdown
Contributor

@justinpakzad justinpakzad left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Left a couple of comments.

Comment on lines +765 to +774
class TasksOperations(BaseOperations):
"""Tasks operations."""

def list(self, dag_id: str):
"""List tasks for a DAG."""
self.response = self.client.get(f"/dags/{dag_id}/tasks")
return self.response.json()


class TaskInstancesOperations(BaseOperations):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These appear to be duplicated. You already defined the TasksOperations() and TaskInstancesOperations() classes above (723-762).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, removed the duplicate code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any new commands should probably be added to the integration tests which live in the airflow-ctl-tests/ directory. Also, this will fail the pre-commit checks since help texts are required (can be found in the help_texts.yaml file) for any new commands added to the operations file.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added tests and help_text.

Comment on lines +726 to +729
def list(self, dag_id: str):
"""List tasks for a DAG."""
self.response = self.client.get(f"/dags/{dag_id}/tasks")
return self.response.json()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the list commands, I think we should follow a similar pattern to the rest of the operations where we do something like: return super().execute_list()

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

Comment on lines +744 to +747
self.response = self.client.get(
f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
)
return self.response.json()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should follow the same pattern as the other operations. Return validated Pydantic models instead of raw .json() and wrap each call in a try/except catching ServerResponseError.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Changed.

@potiuk
Copy link
Copy Markdown
Member

potiuk commented May 18, 2026

@awadhesh14 A few things need addressing before review — see our Pull Request quality criteria.

Issues found:

  • Pre-commit / static checks: CI image checks / Static checks is failing. Run prek run --from-ref main --stage pre-commit locally and fix anything that flags. See the static-checks docs.
  • ⚠️ Peer review feedback from @justinpakzad: there are several open threads with specific suggestions (duplicated operations, integration-test pattern, list-command pattern). Please respond in-thread.

What to do next:

  • Push a fix for the static-check failure.
  • Engage with @justinpakzad's review threads.

No rush — take your time. We appreciate your contribution and are happy to wait for updates. If you have questions, feel free to ask on the Airflow Slack.


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

def list(self, dag_id: str) -> TaskCollectionResponse | ServerResponseError:
"""List tasks for a DAG."""
try:
self.response = self.client.get(f"/dags/{dag_id}/tasks")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason we aren't using super().execute_list here as well?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[AIP-94] airflowctl tasks: add TasksOperations and TaskInstancesOperations to operations.py

3 participants