Workaround failing deadlock when running backfill
The dask_executor backfill tests started to fail more often recently due to a
backfill exception. The likely cause is that execution is now better
parallelised, which makes it much easier to trigger deadlocks caused by
contention between dag_run state updates and task state updates.

While this PR does not fix the underlying issue, it catches the operational
error raised when the deadlock occurs during the backfill and rolls back the
operation.

This **should** be safe: backfill has a built-in mechanism to loop and retry
failed tasks, and with this fix applied the test passed multiple times,
completing the backfill. The problem was not easy to reproduce locally - it
failed roughly once every 20-30 runs - but when extra logging was added it was
always connected to an OperationalError raised (and caught) right after
_per_task_process. The same exception was still observed a few times after the
rollback was added, and despite that the backfill job retried and completed
the process successfully every time. We also keep the exception logs and add
reassuring messages making it clear that if the backfill completes, the
exceptions can be ignored, as the updates will be retried by the backfill job.
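
In short, the per-task state commit is wrapped in a small retry loop. A
minimal sketch of the pattern, with hypothetical session, do_work and log
arguments (the actual change, shown in the diff below, applies this inside the
backfill scheduling loop):

    # Simplified sketch of the workaround pattern - not the actual Airflow code.
    from sqlalchemy.exc import OperationalError

    MAX_ATTEMPTS = 5

    def commit_with_retry(session, do_work, log):
        """Run do_work(session) and commit, retrying a few times on operational errors."""
        for attempt in range(MAX_ATTEMPTS):
            do_work(session)  # stage the task/dag_run state updates
            try:
                session.commit()  # a DB deadlock surfaces here as OperationalError
                break  # committed cleanly, stop retrying
            except OperationalError:
                log.error("Failed to commit, retrying.", exc_info=True)
                session.rollback()  # reset the failed transaction before reusing the session
                if attempt == MAX_ATTEMPTS - 1:
                    raise  # give up; the enclosing backfill loop may still retry the task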

Fixes: #32778
potiuk committed Aug 8, 2023
1 parent 624cf7f commit 048c9b9
Showing 2 changed files with 73 additions and 45 deletions.
117 changes: 73 additions & 44 deletions airflow/jobs/backfill_job_runner.py
@@ -588,61 +588,83 @@ def _per_task_process(key, ti: TaskInstance, session):
        try:
            for task in self.dag.topological_sort(include_subdag_tasks=True):
                for key, ti in list(ti_status.to_run.items()):
-                   if task.task_id != ti.task_id:
-                       continue
-
-                   pool = session.scalar(
-                       select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
-                   )
-                   if not pool:
-                       raise PoolNotFound(f"Unknown pool: {task.pool}")
-
-                   open_slots = pool.open_slots(session=session)
-                   if open_slots <= 0:
-                       raise NoAvailablePoolSlot(
-                           f"Not scheduling since there are {open_slots} open slots in pool {task.pool}"
-                       )
-
-                   num_running_task_instances_in_dag = DAG.get_num_task_instances(
-                       self.dag_id,
-                       states=self.STATES_COUNT_AS_RUNNING,
-                       session=session,
-                   )
-
-                   if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
-                       raise DagConcurrencyLimitReached(
-                           "Not scheduling since DAG max_active_tasks limit is reached."
-                       )
-
-                   if task.max_active_tis_per_dag is not None:
-                       num_running_task_instances_in_task = DAG.get_num_task_instances(
-                           dag_id=self.dag_id,
-                           task_ids=[task.task_id],
-                           states=self.STATES_COUNT_AS_RUNNING,
-                           session=session,
-                       )
-
-                       if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
-                           raise TaskConcurrencyLimitReached(
-                               "Not scheduling since Task concurrency limit is reached."
-                           )
-
-                   if task.max_active_tis_per_dagrun is not None:
-                       num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
-                           dag_id=self.dag_id,
-                           run_id=ti.run_id,
-                           task_ids=[task.task_id],
-                           states=self.STATES_COUNT_AS_RUNNING,
-                           session=session,
-                       )
-
-                       if num_running_task_instances_in_task_dagrun >= task.max_active_tis_per_dagrun:
-                           raise TaskConcurrencyLimitReached(
-                               "Not scheduling since Task concurrency per DAG run limit is reached."
-                           )
-
-                   _per_task_process(key, ti, session)
-                   session.commit()
+                   # Attempt to workaround deadlock on backfill by attempting to commit the transaction
+                   # state update few times before giving up
+                   max_attempts = 5
+                   for i in range(max_attempts):
+                       if task.task_id != ti.task_id:
+                           continue
+
+                       pool = session.scalar(
+                           select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
+                       )
+                       if not pool:
+                           raise PoolNotFound(f"Unknown pool: {task.pool}")
+
+                       open_slots = pool.open_slots(session=session)
+                       if open_slots <= 0:
+                           raise NoAvailablePoolSlot(
+                               f"Not scheduling since there are {open_slots} "
+                               f"open slots in pool {task.pool}"
+                           )
+
+                       num_running_task_instances_in_dag = DAG.get_num_task_instances(
+                           self.dag_id,
+                           states=self.STATES_COUNT_AS_RUNNING,
+                           session=session,
+                       )
+
+                       if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
+                           raise DagConcurrencyLimitReached(
+                               "Not scheduling since DAG max_active_tasks limit is reached."
+                           )
+
+                       if task.max_active_tis_per_dag is not None:
+                           num_running_task_instances_in_task = DAG.get_num_task_instances(
+                               dag_id=self.dag_id,
+                               task_ids=[task.task_id],
+                               states=self.STATES_COUNT_AS_RUNNING,
+                               session=session,
+                           )
+
+                           if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
+                               raise TaskConcurrencyLimitReached(
+                                   "Not scheduling since Task concurrency limit is reached."
+                               )
+
+                       if task.max_active_tis_per_dagrun is not None:
+                           num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
+                               dag_id=self.dag_id,
+                               run_id=ti.run_id,
+                               task_ids=[task.task_id],
+                               states=self.STATES_COUNT_AS_RUNNING,
+                               session=session,
+                           )
+
+                           if (
+                               num_running_task_instances_in_task_dagrun
+                               >= task.max_active_tis_per_dagrun
+                           ):
+                               raise TaskConcurrencyLimitReached(
+                                   "Not scheduling since Task concurrency per DAG run limit is reached."
+                               )
+
+                       _per_task_process(key, ti, session)
+                       try:
+                           session.commit()
+                           # break the retry loop
+                           break
+                       except OperationalError:
+                           self.log.error(
+                               "Failed to commit task state due to operational error. "
+                               "The job will retry this operation so if your backfill succeeds, "
+                               "you can safely ignore this message.",
+                               exc_info=True,
+                           )
+                           session.rollback()
+                           if i == max_attempts - 1:
+                               raise
+                           # retry the loop
        except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached) as e:
            self.log.debug(e)
@@ -939,6 +961,13 @@ def _execute(self, session: Session = NEW_SESSION) -> None:
                # TODO: we will need to terminate running task instances and set the
                # state to failed.
                self._set_unfinished_dag_runs_to_failed(ti_status.active_runs)
+       except OperationalError:
+           self.log.error(
+               "Backfill job dead-locked. The job will retry the job so it is likely "
+               "to heal itself. If your backfill succeeds you can ignore this exception.",
+               exc_info=True,
+           )
+           raise
        finally:
            session.commit()
            executor.end()
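
Both handlers above retry or re-raise on any OperationalError, not only on
deadlocks. If one wanted to narrow the handling to actual deadlocks, a
driver-specific check along the following lines could be used. This helper is
purely illustrative and not part of the commit; it assumes psycopg2
(PostgreSQL, SQLSTATE 40P01) or a MySQL DBAPI such as mysqlclient/pymysql
(error code 1213):

    # Illustrative only - not part of the commit.
    from sqlalchemy.exc import OperationalError

    def is_deadlock(error: OperationalError) -> bool:
        """Best-effort check whether an OperationalError wraps a database deadlock."""
        orig = error.orig  # the underlying DBAPI exception wrapped by SQLAlchemy
        # psycopg2 exposes the SQLSTATE as .pgcode; "40P01" is deadlock_detected.
        if getattr(orig, "pgcode", None) == "40P01":
            return True
        # MySQL drivers put the numeric error code in args[0]; 1213 is
        # "Deadlock found when trying to get lock".
        args = getattr(orig, "args", ())
        return bool(args) and args[0] == 1213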
1 change: 0 additions & 1 deletion tests/providers/daskexecutor/test_dask_executor.py
@@ -104,7 +104,6 @@ def test_dask_executor_functions(self):
    # This test is quarantined because it became rather flaky on our CI in July 2023 and reason for this
    # is unknown. An issue for that was created: https://github.com/apache/airflow/issues/32778 and the
    # marker should be removed while (possibly) the reason for flaky behaviour is found and fixed.
-   @pytest.mark.quarantined
    @pytest.mark.execution_timeout(180)
    def test_backfill_integration(self):
        """
