Send explicit task logs when marking tasks stuck in queued as failed
Using the feature built in apache#32646, when the scheduler marks tasks
stuck in queued as failed, send an explicit log message describing the
action to the task logs, so that users can see why exactly the task was
marked failed in that case.
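
Note: the point of the change is that this explanation now shows up in each affected
task's own log, not only in the scheduler log. As a rough illustration of that idea
only — this is a self-contained sketch, not Airflow's TaskContextLogger from
apache#32646, and the helper name, log path, and task-instance string below are made up:

import logging
from pathlib import Path


def log_to_task_log(task_log_path: Path, message: str, *args) -> None:
    """Append a scheduler-side message to one task's log file (hypothetical helper)."""
    logger = logging.getLogger(f"task_context.{task_log_path.stem}")
    handler = logging.FileHandler(task_log_path)
    handler.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s"))
    logger.addHandler(handler)
    try:
        # %-style args, matching how the scheduler passes the task instance.
        logger.error(message, *args)
    finally:
        logger.removeHandler(handler)
        handler.close()


log_to_task_log(
    Path("attempt=1_example.log"),
    "Marking task instance %s stuck in queued as failed. "
    "If the task instance has available retries, it will be retried.",
    "<TaskInstance: example_dag.wait manual__2023-11-26 [queued]>",
)
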
pankajkoti committed Nov 26, 2023
1 parent 2d58e70 commit 008ca3d
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions airflow/jobs/scheduler_job_runner.py
@@ -1577,15 +1577,18 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
             )
         ).all()
         try:
-            tis_for_warning_message = self.job.executor.cleanup_stuck_queued_tasks(tis=tasks_stuck_in_queued)
-            if tis_for_warning_message:
-                task_instance_str = "\n\t".join(tis_for_warning_message)
-                self.log.warning(
-                    "Marked the following %s task instances stuck in queued as failed. "
-                    "If the task instance has available retries, it will be retried.\n\t%s",
-                    len(tasks_stuck_in_queued),
-                    task_instance_str,
-                )
+            cleaned_up_task_instances = self.job.executor.cleanup_stuck_queued_tasks(
+                tis=tasks_stuck_in_queued
+            )
+            cleaned_up_task_instances = set(cleaned_up_task_instances)
+            for ti in tasks_stuck_in_queued:
+                if repr(ti) in cleaned_up_task_instances:
+                    self._task_context_logger.error(
+                        "Marking task instance %s stuck in queued as failed. "
+                        "If the task instance has available retries, it will be retried.",
+                        ti,
+                        ti=ti,
+                    )
         except NotImplementedError:
             self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
...
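
The loop above relies on a simple contract with the executor: cleanup_stuck_queued_tasks
receives the stuck task instances via the tis keyword and returns the repr() strings of
the ones it actually cleaned up (executors that don't support this raise
NotImplementedError, which the scheduler handles above). A minimal, hypothetical sketch
of an executor honoring that contract — the class name and the _purge_from_queue helper
are invented for illustration:

from __future__ import annotations


class QueuePurgingExecutor:
    """Hypothetical executor illustrating the contract implied by the diff above."""

    def cleanup_stuck_queued_tasks(self, tis: list) -> list[str]:
        # Return the readable repr() of every task instance that was actually
        # cleaned up, so the scheduler can match it and log into the task's log.
        cleaned_up: list[str] = []
        for ti in tis:
            if self._purge_from_queue(ti):
                cleaned_up.append(repr(ti))
        return cleaned_up

    def _purge_from_queue(self, ti) -> bool:
        # Placeholder for backend-specific cleanup (e.g. revoking a queued
        # Celery task); always "succeeds" in this sketch.
        return True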
