Skip to content

Commit

Permalink
Mark task as failed when it fails sending in Celery (#10881)
Browse files Browse the repository at this point in the history
If a task failed hard on celery, _before_ being able to execute the
airflow code the task would end up stuck in queued state. This change
makes it get retried.

This was discovered in load testing the HA work (but unrelated to HA
changes), where I swamped the kube-dns pod, meaning the worker was
sometimes unable to resolve the db name via DNS, so the state in the DB
was never updated
  • Loading branch information
ashb committed Sep 14, 2020
1 parent ad4f232 commit 9e42a97
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
6 changes: 3 additions & 3 deletions airflow/executors/celery_executor.py
Expand Up @@ -42,6 +42,7 @@
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKey
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.state import State
from airflow.utils.timeout import timeout

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -188,14 +189,13 @@ def _process_tasks(self, task_tuples_to_send: List[TaskInstanceInCelery]) -> Non
self.log.debug('Sent all tasks.')

for key, _, result in key_and_async_results:
self.queued_tasks.pop(key)
if isinstance(result, ExceptionWithTraceback):
self.log.error( # pylint: disable=logging-not-lazy
CELERY_SEND_ERR_MSG_HEADER + ":%s\n%s\n", result.exception, result.traceback
)
self.event_buffer[key] = (State.FAILED, None)
elif result is not None:
# Only pops when enqueued successfully, otherwise keep it
# and expect scheduler loop to deal with it.
self.queued_tasks.pop(key)
result.backend = cached_celery_backend
self.running.add(key)
self.tasks[key] = result
Expand Down
7 changes: 4 additions & 3 deletions tests/executors/test_celery_executor.py
Expand Up @@ -159,13 +159,14 @@ def fake_execute_command():
dag=DAG(dag_id='id'),
start_date=datetime.datetime.now()
)
when = datetime.datetime.now()
value_tuple = 'command', 1, None, \
SimpleTaskInstance(ti=TaskInstance(task=task, execution_date=datetime.datetime.now()))
key = ('fail', 'fake_simple_ti', datetime.datetime.now(), 0)
key = ('fail', 'fake_simple_ti', when, 0)
executor.queued_tasks[key] = value_tuple
executor.heartbeat()
self.assertEqual(1, len(executor.queued_tasks))
self.assertEqual(executor.queued_tasks[key], value_tuple)
self.assertEqual(0, len(executor.queued_tasks), "Task should no longer be queued")
self.assertEqual(executor.event_buffer[('fail', 'fake_simple_ti', when, 0)][0], State.FAILED)

@pytest.mark.backend("mysql", "postgres")
def test_exception_propagation(self):
Expand Down

0 comments on commit 9e42a97

Please sign in to comment.