
Retry if failed from queued should be separate from try_number #38304

Open
collinmcnulty opened this issue Mar 19, 2024 · 10 comments

@collinmcnulty
Contributor

Description

I think Airflow should have a configurable number of attempts for re-launching a task that was killed for being stuck in queued for too long. Currently, such re-attempts consume task retries, but failing to launch at all is conceptually distinct from a task running and failing.
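For concreteness, a rough sketch of how such a setting could be read; the option name num_stuck_in_queued_retries is an assumption for illustration, not an existing Airflow config option.

# Hypothetical sketch only: a separate, configurable budget for re-launch
# attempts after a task is killed while stuck in queued, independent of
# the task-level `retries`.
from airflow.configuration import conf

# Assumed new option; `fallback` keeps the sketch self-contained.
stuck_in_queued_retries = conf.getint("scheduler", "num_stuck_in_queued_retries", fallback=2)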

Use case/motivation

On a task that happens not to be idempotent, an Airflow user intentionally sets retries to zero, as a human needs to examine whether the task can be safely retried or whether manual intervention is necessary. However, if the same task is killed for being stuck in queued, the task never started, so the lack of idempotency does not matter and the task should definitely be re-attempted. Airflow currently does not let a user express this combination of preferences.
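As a minimal illustration of that preference (names made up; the only point is retries=0 on a non-idempotent task):

# Illustration only: a non-idempotent task with retries=0. A failed run
# needs human review, but an attempt killed while stuck in queued never
# started and could safely be re-launched.
import pendulum
from airflow.decorators import dag, task

@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def non_idempotent_pipeline():
    @task(retries=0)  # intentional: a human decides whether a failed run is safe to retry
    def charge_customer():
        ...  # side-effecting, non-idempotent work

    charge_customer()

non_idempotent_pipeline()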

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

collinmcnulty added the kind:feature and needs-triage labels Mar 19, 2024
Taragolis added the area:core label and removed the needs-triage label Mar 20, 2024
@AMK9978

AMK9978 commented Mar 20, 2024

@potiuk As a new contributor to Airflow, I would like to work on this issue!

AMK9978 removed their assignment Mar 31, 2024
@AMK9978

AMK9978 commented Mar 31, 2024

@potiuk
@potiuk
Unfortunately, I have had difficulties running the project and its tests, although the code for this feature may not be complex. I may open an issue about my problem because I didn't find anything relevant to it. I have unassigned myself.

@Bowrna
Contributor

Bowrna commented Apr 4, 2024

Could I take this issue, @potiuk?

@potiuk
Member

potiuk commented Apr 4, 2024

Assigned you

@Bowrna
Contributor

Bowrna commented Apr 10, 2024

@potiuk Currently, if a task is stuck in the queue for too long, we fail it. If we add a separate try_number for tasks that failed in the queue, we may not know, during the retry, why the task failed - whether it failed during the run or because it was stuck in the queue. How do you think we can handle this case?

def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
    """
    Mark tasks stuck in queued for longer than `task_queued_timeout` as failed.

    Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses
    track of a task, a cluster can't further scale up its workers, etc.), but tasks
    should not be stuck in queued for a long time. This will mark tasks stuck in
    queued for longer than `self._task_queued_timeout` as failed. If the task has
    available retries, it will be retried.
    """
    self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued method")
    tasks_stuck_in_queued = session.scalars(
        select(TI).where(
            TI.state == TaskInstanceState.QUEUED,
            TI.queued_dttm < (timezone.utcnow() - timedelta(seconds=self._task_queued_timeout)),
            TI.queued_by_job_id == self.job.id,
        )
    ).all()
    try:
        cleaned_up_task_instances = self.job.executor.cleanup_stuck_queued_tasks(
            tis=tasks_stuck_in_queued
        )
        cleaned_up_task_instances = set(cleaned_up_task_instances)
        for ti in tasks_stuck_in_queued:
            if repr(ti) in cleaned_up_task_instances:
                self._task_context_logger.warning(
                    "Marking task instance %s stuck in queued as failed. "
                    "If the task instance has available retries, it will be retried.",
                    ti,
                    ti=ti,
                )
    except NotImplementedError:
        self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
    ...
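One possible direction, sketched below purely as an illustration (the stuck_in_queued_tries counter and the max_stuck_in_queued_retries limit do not exist in Airflow): the scheduler could re-schedule a stuck task against a separate budget instead of failing it, so the question of why the task failed never reaches the retry logic at all.

# Hypothetical sketch, not existing Airflow code: decide in the scheduler
# whether a stuck-in-queued TI gets a "free" re-launch from a separate
# budget or falls through to the normal failure/retry path.
from airflow.utils.state import TaskInstanceState

def _handle_stuck_in_queued_ti(ti, max_stuck_in_queued_retries: int) -> None:
    attempts = getattr(ti, "stuck_in_queued_tries", 0)  # assumed new counter
    if attempts < max_stuck_in_queued_retries:
        ti.stuck_in_queued_tries = attempts + 1
        # The task never started, so re-schedule it without touching try_number.
        ti.state = TaskInstanceState.SCHEDULED
    else:
        # Budget exhausted: mark failed as the current code does; regular
        # retries (if any) then apply.
        ti.state = TaskInstanceState.FAILED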

@Bowrna
Contributor

Bowrna commented Apr 11, 2024

The retry logic is handled here in taskinstance.py: the task is failed and then checked for retry eligibility; if it is eligible, it is queued again.
But adding separate logic, something like FAILED_IN_QUEUE and TRY_NUMBER_FOR_QUEUE, to handle tasks that failed in the queue makes sense to me for now. If you see another way, please let me know.

if force_fail or not ti.is_eligible_to_retry():
    ti.state = TaskInstanceState.FAILED
    email_for_state = operator.attrgetter("email_on_failure")
    callbacks = task.on_failure_callback if task else None
    if task and task.dag and task.dag.fail_stop:
        _stop_remaining_tasks(task_instance=ti, session=session)
else:
    if ti.state == TaskInstanceState.QUEUED:
        # We increase the try_number to fail the task if it fails to start after sometime
        ti._try_number += 1
    ti.state = State.UP_FOR_RETRY
    email_for_state = operator.attrgetter("email_on_retry")
    callbacks = task.on_retry_callback if task else None
return {
    "ti": ti,
    "email_for_state": email_for_state,
    "task": task,
    "callbacks": callbacks,
    "context": context,
}
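A standalone sketch of that idea (illustration only; queued_failure_count stands in for TRY_NUMBER_FOR_QUEUE and is not an existing TaskInstance attribute): a failure detected while the TI is still QUEUED is charged to a separate counter instead of try_number, so the task-level retries are preserved.

# Hypothetical variant of the retry branch above, for illustration only.
import operator
from airflow.utils.state import State, TaskInstanceState

def _prepare_retry(ti, task):
    if ti.state == TaskInstanceState.QUEUED:
        # Never started: charge the separate counter instead of try_number.
        ti.queued_failure_count = getattr(ti, "queued_failure_count", 0) + 1
    ti.state = State.UP_FOR_RETRY
    email_for_state = operator.attrgetter("email_on_retry")
    callbacks = task.on_retry_callback if task else None
    return {"ti": ti, "email_for_state": email_for_state, "task": task, "callbacks": callbacks}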

@potiuk
Member

potiuk commented Apr 11, 2024

The thing is that the task_instance is not created yet because ... the task is in the queue. So what needs to happen is that the whole logic should happen in the scheduler - because it's the scheduler (and, to be precise, the executor) that realizes the task is in the queued state. And it should be different handling in the executor, not in the task instance - that's the whole complexity of this task.

@Bowrna
Contributor

Bowrna commented Apr 11, 2024

Got it ... let me check where this is handled in the executor part, where the failed task is moved back to the queue but a retry is deducted from try_number.

@Bowrna
Contributor

Bowrna commented Apr 12, 2024

The thing is that the task_instance is not created yet because ... the task is in the queue. So what needs to happen is that the whole logic should happen in the scheduler - because it's the scheduler (and, to be precise, the executor) that realizes the task is in the queued state. And it should be different handling in the executor, not in the task instance - that's the whole complexity of this task.

@potiuk the task_instance would be created with its state as queued, right? I understand that this should be handled in the executor part, the way _fail_tasks_stuck_in_queued is handled. But I can see that TI models are queried to find the queued tasks, which would mean the task_instance object has been created. If I am missing the point you made, please let me know.

def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
    """
    Mark tasks stuck in queued for longer than `task_queued_timeout` as failed.

    Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses
    track of a task, a cluster can't further scale up its workers, etc.), but tasks
    should not be stuck in queued for a long time. This will mark tasks stuck in
    queued for longer than `self._task_queued_timeout` as failed. If the task has
    available retries, it will be retried.
    """
    self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued method")
    tasks_stuck_in_queued = session.scalars(
        select(TI).where(
            TI.state == TaskInstanceState.QUEUED,
            TI.queued_dttm < (timezone.utcnow() - timedelta(seconds=self._task_queued_timeout)),
            TI.queued_by_job_id == self.job.id,
        )
    ).all()
    try:
        cleaned_up_task_instances = self.job.executor.cleanup_stuck_queued_tasks(
            tis=tasks_stuck_in_queued
        )
        cleaned_up_task_instances = set(cleaned_up_task_instances)
        for ti in tasks_stuck_in_queued:
            if repr(ti) in cleaned_up_task_instances:
                self._task_context_logger.warning(
                    "Marking task instance %s stuck in queued as failed. "
                    "If the task instance has available retries, it will be retried.",
                    ti,
                    ti=ti,
                )
    except NotImplementedError:
        self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
    ...
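For reference, the executor-side hook called above is cleanup_stuck_queued_tasks, and an executor that does not implement it raises NotImplementedError, as the except branch shows. A minimal sketch of an override (the class and its body are illustrative assumptions, not how any concrete executor implements it):

from airflow.executors.base_executor import BaseExecutor

class MyExecutor(BaseExecutor):
    def cleanup_stuck_queued_tasks(self, tis):
        readable_tis = []
        for ti in tis:
            # Drop executor-side bookkeeping for the stuck task and report it
            # back; the scheduler loop above then logs and handles it.
            self.fail(ti.key)
            readable_tis.append(repr(ti))
        return readable_tis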

@potiuk
Member

potiuk commented Apr 15, 2024

It's a good understanding - it should likely be done somewhere there.
