Skip to content

observability: add structured diagnostics to CeleryExecutor sync gap …#67086

Open
prince8273 wants to merge 3 commits into
apache:mainfrom
prince8273:observability/celery-executor-sync-gap-logging
Open

observability: add structured diagnostics to CeleryExecutor sync gap …#67086
prince8273 wants to merge 3 commits into
apache:mainfrom
prince8273:observability/celery-executor-sync-gap-logging

Conversation

@prince8273
Copy link
Copy Markdown

@prince8273 prince8273 commented May 18, 2026

Summary

Two silent failure modes in CeleryExecutor with no structured logging
or metrics at the executor layer.

Gap 1 — task absent from broker state
When BulkStateFetcher.get_many() returns a falsy/None state, the
executor silently skips the task inside the if state: guard. Reachable
on AMQP and non-KV backends when a worker dies mid-task.

Gap 2 — unexpected state with no context
The unexpected-state branch emitted a single log.info with only
TaskInstanceKey and raw state — no worker hostname, no Celery info
payload, no metric.

Changes

Additive only — no behavior change, no new imports.

  • update_all_workload_states(): else-branch on falsy state —
    log.warning with celery_task_id + key,
    Stats.incr("celery.task_not_found_in_broker")
  • update_task_state(): replace log.info in unexpected-state branch —
    log.warning with key, celery_state, worker hostname, raw info,
    Stats.incr("celery.task_unexpected_state")

Stats already imported via airflow.providers.common.compat.sdk.

…events

CeleryExecutor.update_all_workload_states() and update_task_state() had two silent failure modes with no structured logging or metrics:

- When a tracked Celery task returns a falsy/None state from the broker (reachable on AMQP and non-KV backends where BulkStateFetcher does not guarantee PENDING conversion), the task was silently skipped with no log line and no metric.

- When update_task_state() hit the unexpected-state branch, the existing log.info emitted only the TaskInstanceKey and the raw state string, with no worker hostname, no Celery info payload, and no metric.

Changes (additive only, no behavior change):

- Add else-branch in update_all_workload_states() loop: warning log with celery_task_id + key, Stats.incr(celery.task_not_found_in_broker)

- Replace single-line log.info in update_task_state() unexpected-state branch: warning log with key + celery_state + worker hostname (extracted from info dict when available) + raw info payload, Stats.incr(celery.task_unexpected_state)

- Level bump log.info -> log.warning for unexpected-state: this branch represents a state the executor has no handler for

No new imports required -- Stats already present via airflow.providers.common.compat.sdk.

Signed-off-by: Prince Kumar <princesingh29757@gmail.com>
- celery.task_not_found_in_broker
- celery.task_unexpected_state

Signed-off-by: Prince Kumar <princesingh29757@gmail.com>
Signed-off-by: Prince Kumar <princesingh29757@gmail.com>
@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label May 19, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants