
Add task-level circuit breaker to pause failing tasks #67724

Draft
bennyc0de wants to merge 4 commits into apache:main from bennyc0de:task-circuit-breaker

Contributor

@bennyc0de bennyc0de commented May 29, 2026

Operators declare a failure budget: max failures within a rolling time window. When the threshold is hit the circuit opens and future scheduled instances are skipped until reset via the REST API or after a configurable cooldown period.

New operator params (all optional, backward-compatible):
circuit_breaker_max_failures: int | None
circuit_breaker_window: timedelta | None
circuit_breaker_reset_delay: timedelta | None

New TaskCircuitBreaker model tracks one row per (dag_id, task_id). Alembic migration adds the table. Execution API records failures. Scheduler excludes open-circuit tasks before the QUEUED transition. Heartbeat auto-resets expired circuits every 60s. Three REST endpoints expose and reset circuit state per task and per Dag.
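
To make the failure-budget semantics described above concrete, here is a minimal in-memory sketch in Python. It is not the PR's implementation: the class `RollingCircuitBreaker` and its method names are hypothetical, and the PR persists this state in a database table rather than in memory. The sketch assumes the circuit opens once `max_failures` failures fall inside the rolling `window`, and closes again after `reset_delay` or an explicit reset.

```python
from __future__ import annotations

from collections import deque
from datetime import datetime, timedelta


class RollingCircuitBreaker:
    """Hypothetical in-memory model of a rolling-window failure budget."""

    def __init__(
        self,
        max_failures: int,
        window: timedelta,
        reset_delay: timedelta | None = None,
    ) -> None:
        self.max_failures = max_failures
        self.window = window
        self.reset_delay = reset_delay
        self._failures: deque[datetime] = deque()
        self._opened_at: datetime | None = None

    def record_failure(self, now: datetime) -> None:
        """Record one failure and open the circuit if the budget is spent."""
        self._failures.append(now)
        # Drop failures that have fallen out of the rolling window.
        cutoff = now - self.window
        while self._failures and self._failures[0] < cutoff:
            self._failures.popleft()
        if self._opened_at is None and len(self._failures) >= self.max_failures:
            self._opened_at = now

    def is_open(self, now: datetime) -> bool:
        """Return True while the circuit is open; auto-reset after the cooldown."""
        if self._opened_at is None:
            return False
        if self.reset_delay is not None and now - self._opened_at >= self.reset_delay:
            self.reset()
            return False
        return True

    def reset(self) -> None:
        """Manual reset, analogous to the reset exposed via the REST API."""
        self._failures.clear()
        self._opened_at = None
```

A caller would check `is_open()` before scheduling an instance and call `record_failure()` whenever one fails. Whether a cooldown reset should also clear the failure history is a design choice; this sketch clears it.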


Was generative AI tooling used to co-author this PR?
  • [X] Yes (please specify the tool below)

Generated-by: [Claude Sonnet 4.6] following the guidelines

Contributor

Vamsi-klu commented May 30, 2026

Found a few blockers in the current diff:

  1. The migration creates a second Alembic head. 0117_3_3_0_add_task_circuit_breaker.py revises acc215baed80, but 0117_3_3_0_change_deadline_interval_to_json.py already revises the same parent. CI confirms this with Multiple heads are present; a10edcba2695, 8812eb67b63c.

  2. The scheduler interval callbacks add unbounded writes/scans. reset_expired() performs one bulk update, and _skip_circuit_breaker_blocked_tis() selects all scheduled blocked task instances and mutates them in one interval. Scheduler-loop cleanup/write paths need batching with limits and commits between batches.

  3. record_failure() is not concurrency-safe. It reads the circuit row, increments in memory, and writes later without locking or an atomic upsert/update. Parallel failures for the same (dag_id, task_id) can lose increments or race on first insert, so the circuit may not open at the configured threshold.

  4. Static checks also catch new positional session parameters on @provide_session methods. These need keyword-only session parameters.
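
One way to address item 3 is to push the increment into the database so that no read-modify-write happens in application memory. The sketch below illustrates the idea with an atomic upsert, using SQLite from the Python standard library purely for demonstration. The table layout and the function `record_failure` here are assumptions for illustration, not the PR's actual model or method, and the rolling-window handling is omitted.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create a hypothetical table with one row per (dag_id, task_id)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE task_circuit_breaker (
            dag_id TEXT NOT NULL,
            task_id TEXT NOT NULL,
            failure_count INTEGER NOT NULL DEFAULT 0,
            PRIMARY KEY (dag_id, task_id)
        )
        """
    )
    return conn


def record_failure(conn: sqlite3.Connection, dag_id: str, task_id: str) -> int:
    """Insert the first failure or increment the counter in a single statement."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            """
            INSERT INTO task_circuit_breaker (dag_id, task_id, failure_count)
            VALUES (?, ?, 1)
            ON CONFLICT (dag_id, task_id)
            DO UPDATE SET failure_count = task_circuit_breaker.failure_count + 1
            """,
            (dag_id, task_id),
        )
        row = conn.execute(
            "SELECT failure_count FROM task_circuit_breaker"
            " WHERE dag_id = ? AND task_id = ?",
            (dag_id, task_id),
        ).fetchone()
    return row[0]
```

Because the insert and the increment are one statement, the first-insert race and the lost-increment race both collapse into the database's own conflict handling. Row-level locking with `SELECT ... FOR UPDATE` is an alternative where upsert syntax is not portable across the supported backends.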


Drafted-by: Codex (GPT-5); reviewed by @Vamsi-klu before posting

@bennyc0de bennyc0de force-pushed the task-circuit-breaker branch from 4e9fc08 to 0e6e67a Compare May 31, 2026 15:11
Contributor Author

Hi @Vamsi-klu, thank you for your comment. I've looked into your suggestions and implemented them:

- Rename migration to 0118 and chain it after 8812eb67b63c to resolve
  the duplicate Alembic head (both 0117 migrations previously revised acc215baed80)
- Add row-level locking (with_for_update) and IntegrityError handling
  to record_failure() to prevent lost increments under concurrent writes
- Batch reset_expired() and _skip_circuit_breaker_blocked_tis() at 100
  rows per flush to avoid unbounded single-transaction scans
- Move session to keyword-only position on all new @provide_session
  methods (reset_expired, _reset_expired_circuit_breakers,
  _skip_circuit_breaker_blocked_tis)
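
For readers following along, the batching described in the third item can be sketched roughly as below. This is an illustration only: the table `task_circuit_breaker`, its columns (`id`, `is_open`, `failure_count`, `reset_at`), and the `max_batches` cap are hypothetical, and SQLite stands in for the real metadata database. The sketch commits after each batch of at most 100 rows, so no single transaction touches an unbounded number of rows.

```python
from __future__ import annotations

import sqlite3

BATCH_SIZE = 100  # rows per batch, matching the figure quoted above


def reset_expired(
    conn: sqlite3.Connection,
    now: float,
    batch_size: int = BATCH_SIZE,
    max_batches: int | None = None,
) -> int:
    """Close expired circuits in bounded batches; return the number of rows reset."""
    total = 0
    batches = 0
    while max_batches is None or batches < max_batches:
        ids = [
            row[0]
            for row in conn.execute(
                "SELECT id FROM task_circuit_breaker"
                " WHERE is_open = 1 AND reset_at <= ?"
                " ORDER BY id LIMIT ?",
                (now, batch_size),
            )
        ]
        if not ids:
            break
        placeholders = ",".join("?" * len(ids))
        with conn:  # one short transaction per batch
            conn.execute(
                "UPDATE task_circuit_breaker"
                " SET is_open = 0, failure_count = 0"
                f" WHERE id IN ({placeholders})",
                ids,
            )
        total += len(ids)
        batches += 1
    return total
```

Passing `max_batches` bounds the work done in one scheduler interval; any remaining rows are picked up on the next call.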
@bennyc0de bennyc0de force-pushed the task-circuit-breaker branch from 0e6e67a to 6cc2174 Compare May 31, 2026 15:39
@potiuk potiuk added the "ready for maintainer review" label (Set after triaging when all criteria pass.) Jun 3, 2026
Member

@potiuk potiuk left a comment


Thanks for putting real work into this — automatically pausing chronically-failing tasks is a genuinely interesting idea. But the size and nature of the change mean it should start as a design discussion, not a code review, and I'd like to steer it there rather than iterate on the implementation.

This is a cross-cutting behavioral change to core: a new scheduler state transition (open circuits auto-SKIP SCHEDULED TIs, which propagates downstream via trigger rules), a new metadata table + migration, a new Execution-API path, and three new public REST endpoints. Changes that alter the scheduler state machine and add public API surface go through an AIP (Airflow Improvement Proposal) / dev-list discussion first, so the community can agree on the model — e.g. whether auto-SKIP is the right terminal transition, how it's opted into, and whether the failure budget belongs on the task — before anyone reviews ~1700 lines of implementation.

A few things also block the current code regardless of the design outcome:

  • The Alembic migration collides with one already merged to main (0118_ filename + a second head off 8812eb67b63c) — this is the root cause of the red CI.
  • The migration uses naive sa.DateTime() while the model uses UtcDateTime; use TIMESTAMP(timezone=True).
  • The Execution-API failure path deserializes the full Dag per task failure to read three scalars — that config should ride on the TI/payload instead.
  • The auto-SKIP has no opt-out/config and silently skips downstream subgraphs.
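
The first bullet is the same failure mode as the earlier duplicate-head report: two migrations name the same parent, so the revision graph ends in two heads. The Python sketch below shows how that condition can be detected from a plain mapping of revision to parent. The helper `find_heads` is hypothetical and does not use Alembic itself (the `alembic heads` command reports this on a real project). It is also an assumption, inferred from the CI message quoted earlier, that a10edcba2695 is the circuit-breaker migration's revision ID.

```python
from __future__ import annotations


def find_heads(down_revisions: dict[str, str | None]) -> set[str]:
    """Return revisions that no other revision names as its parent."""
    parents = {parent for parent in down_revisions.values() if parent is not None}
    return set(down_revisions) - parents


# Original state: both migrations revise acc215baed80, giving two heads.
# acc215baed80 is treated as the base here only to keep the example small.
forked = {
    "acc215baed80": None,
    "8812eb67b63c": "acc215baed80",
    "a10edcba2695": "acc215baed80",  # assumed ID of the circuit-breaker migration
}

# After re-chaining the new migration onto 8812eb67b63c there is a single head.
linear = {
    "acc215baed80": None,
    "8812eb67b63c": "acc215baed80",
    "a10edcba2695": "8812eb67b63c",
}
```

Here `find_heads(forked)` contains both 8812eb67b63c and a10edcba2695, while `find_heads(linear)` contains only a10edcba2695. The collision recurs whenever another migration that revises the same parent lands on main first, so the chain has to be rebased onto whatever the current head is.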

Suggested next step: open an AIP (or start a thread on dev@airflow.apache.org) describing the problem and proposed model, link it here, and we'll take the design discussion there. I'm converting this to draft in the meantime so it's clear it's pending that discussion rather than waiting on a code review — flip it back to ready once there's consensus.

AIP process: https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals


This review was drafted by an AI-assisted tool and confirmed by an Apache Airflow maintainer. After you've addressed the points above and pushed an update, an Apache Airflow maintainer — a real person — will take the next look. The findings cite the project's review criteria; if you think one is mis-applied, please reply on the PR and a maintainer will weigh in.

More on how Apache Airflow handles maintainer review: contributing-docs/05_pull_requests.rst.


Drafted-by: Claude Code (Opus 4.8); reviewed by @potiuk before posting

@potiuk potiuk marked this pull request as draft June 5, 2026 02:39
Contributor Author

Hi @potiuk, thank you very much for your comment and attention to this PR.

I've followed your advice and opened an AIP here: https://cwiki.apache.org/confluence/display/AIRFLOW/Add+task-level+circuit+breaker+to+pause+failing+tasks

I will also fix the blockers you mentioned shortly.


Labels

area:airflow-ctl
area:API (Airflow's REST/HTTP API)
area:DAG-processing
area:db-migrations (PRs with DB migration)
area:Scheduler (including HA (high availability) scheduler)
area:task-sdk
area:UI (Related to UI/UX. For Frontend Developers.)
backport-to-airflow-ctl/v0-1-test
kind:documentation
ready for maintainer review (Set after triaging when all criteria pass.)


3 participants