Skip to content

Commit

Permalink
[AIRFLOW-2511] Fix improper failed session commit handling causing de…
Browse files Browse the repository at this point in the history
…adlocks (apache#4769)
  • Loading branch information
fenglu-g authored and ashb committed Mar 19, 2019
1 parent 0196b5f commit f559d43
Showing 1 changed file with 128 additions and 126 deletions.
254 changes: 128 additions & 126 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2179,147 +2179,149 @@ def _process_backfill_task_instances(self,
# or leaf to root, as otherwise tasks might be
# determined deadlocked while they are actually
# waiting for their upstream to finish
@provide_session
def _per_task_process(task, key, ti, session=None):
if task.task_id != ti.task_id:
return

for task in self.dag.topological_sort():
for key, ti in list(ti_status.to_run.items()):

if task.task_id != ti.task_id:
continue

ti.refresh_from_db()
ti.refresh_from_db()

task = self.dag.get_task(ti.task_id)
ti.task = task
task = self.dag.get_task(ti.task_id)
ti.task = task

ignore_depends_on_past = (
self.ignore_first_depends_on_past and
ti.execution_date == (start_date or ti.start_date))
self.log.debug(
"Task instance to run %s state %s", ti, ti.state)
ignore_depends_on_past = (
self.ignore_first_depends_on_past and
ti.execution_date == (start_date or ti.start_date))
self.log.debug(
"Task instance to run %s state %s", ti, ti.state)

# The task was already marked successful or skipped by a
# different Job. Don't rerun it.
if ti.state == State.SUCCESS:
ti_status.succeeded.add(key)
self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
ti_status.to_run.pop(key)
if key in ti_status.running:
ti_status.running.pop(key)
return
elif ti.state == State.SKIPPED:
ti_status.skipped.add(key)
self.log.debug("Task instance %s skipped. Don't rerun.", ti)
ti_status.to_run.pop(key)
if key in ti_status.running:
ti_status.running.pop(key)
return

# The task was already marked successful or skipped by a
# different Job. Don't rerun it.
if ti.state == State.SUCCESS:
ti_status.succeeded.add(key)
self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
ti_status.to_run.pop(key)
# guard against externally modified tasks instances or
# in case max concurrency has been reached at task runtime
elif ti.state == State.NONE:
self.log.warning(
"FIXME: task instance {} state was set to None "
"externally. This should not happen"
)
ti.set_state(State.SCHEDULED, session=session)
if self.rerun_failed_tasks:
# Rerun failed tasks or upstreamed failed tasks
if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
self.log.error("Task instance {ti} "
"with state {state}".format(ti=ti,
state=ti.state))
if key in ti_status.running:
ti_status.running.pop(key)
continue
elif ti.state == State.SKIPPED:
ti_status.skipped.add(key)
self.log.debug("Task instance %s skipped. Don't rerun.", ti)
# Reset the failed task in backfill to scheduled state
ti.set_state(State.SCHEDULED, session=session)
else:
# Default behaviour which works for subdag.
if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
self.log.error("Task instance {ti} "
"with {state} state".format(ti=ti,
state=ti.state))
ti_status.failed.add(key)
ti_status.to_run.pop(key)
if key in ti_status.running:
ti_status.running.pop(key)
continue
return

# guard against externally modified tasks instances or
# in case max concurrency has been reached at task runtime
elif ti.state == State.NONE:
self.log.warning(
"FIXME: task instance {} state was set to None "
"externally. This should not happen"
)
ti.set_state(State.SCHEDULED, session=session)
if self.rerun_failed_tasks:
# Rerun failed tasks or upstreamed failed tasks
if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
self.log.error("Task instance {ti} "
"with state {state}".format(ti=ti,
state=ti.state))
if key in ti_status.running:
ti_status.running.pop(key)
# Reset the failed task in backfill to scheduled state
ti.set_state(State.SCHEDULED, session=session)
else:
# Default behaviour which works for subdag.
if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
self.log.error("Task instance {ti} "
"with {state} state".format(ti=ti,
state=ti.state))
ti_status.failed.add(key)
backfill_context = DepContext(
deps=RUN_DEPS,
ignore_depends_on_past=ignore_depends_on_past,
ignore_task_deps=self.ignore_task_deps,
flag_upstream_failed=True)

# Is the task runnable? -- then run it
# the dependency checker can change states of tis
if ti.are_dependencies_met(
dep_context=backfill_context,
session=session,
verbose=self.verbose):
ti.refresh_from_db(lock_for_update=True, session=session)
if ti.state in (State.SCHEDULED, State.UP_FOR_RETRY, State.UP_FOR_RESCHEDULE):
if executor.has_task(ti):
self.log.debug(
"Task Instance %s already in executor "
"waiting for queue to clear",
ti
)
else:
self.log.debug('Sending %s to executor', ti)
# Skip scheduled state, we are executing immediately
ti.state = State.QUEUED
ti.queued_dttm = timezone.utcnow() if not ti.queued_dttm else ti.queued_dttm
session.merge(ti)

cfg_path = None
if executor.__class__ in (executors.LocalExecutor,
executors.SequentialExecutor):
cfg_path = tmp_configuration_copy()

executor.queue_task_instance(
ti,
mark_success=self.mark_success,
pickle_id=pickle_id,
ignore_task_deps=self.ignore_task_deps,
ignore_depends_on_past=ignore_depends_on_past,
pool=self.pool,
cfg_path=cfg_path)
ti_status.running[key] = ti
ti_status.to_run.pop(key)
if key in ti_status.running:
ti_status.running.pop(key)
continue

backfill_context = DepContext(
deps=RUN_DEPS,
ignore_depends_on_past=ignore_depends_on_past,
ignore_task_deps=self.ignore_task_deps,
flag_upstream_failed=True)

# Is the task runnable? -- then run it
# the dependency checker can change states of tis
if ti.are_dependencies_met(
dep_context=backfill_context,
session=session,
verbose=self.verbose):
ti.refresh_from_db(lock_for_update=True, session=session)
if ti.state in (State.SCHEDULED, State.UP_FOR_RETRY, State.UP_FOR_RESCHEDULE):
if executor.has_task(ti):
self.log.debug(
"Task Instance %s already in executor "
"waiting for queue to clear",
ti
)
else:
self.log.debug('Sending %s to executor', ti)
# Skip scheduled state, we are executing immediately
ti.state = State.QUEUED
ti.queued_dttm = timezone.utcnow() if not ti.queued_dttm else ti.queued_dttm
session.merge(ti)

cfg_path = None
if executor.__class__ in (executors.LocalExecutor,
executors.SequentialExecutor):
cfg_path = tmp_configuration_copy()

executor.queue_task_instance(
ti,
mark_success=self.mark_success,
pickle_id=pickle_id,
ignore_task_deps=self.ignore_task_deps,
ignore_depends_on_past=ignore_depends_on_past,
pool=self.pool,
cfg_path=cfg_path)
ti_status.running[key] = ti
ti_status.to_run.pop(key)
session.commit()
continue
session.commit()
return

if ti.state == State.UPSTREAM_FAILED:
self.log.error("Task instance %s upstream failed", ti)
ti_status.failed.add(key)
ti_status.to_run.pop(key)
if key in ti_status.running:
ti_status.running.pop(key)
continue
if ti.state == State.UPSTREAM_FAILED:
self.log.error("Task instance %s upstream failed", ti)
ti_status.failed.add(key)
ti_status.to_run.pop(key)
if key in ti_status.running:
ti_status.running.pop(key)
return

# special case
if ti.state == State.UP_FOR_RETRY:
self.log.debug(
"Task instance %s retry period not "
"expired yet", ti)
if key in ti_status.running:
ti_status.running.pop(key)
ti_status.to_run[key] = ti
continue
# special case
if ti.state == State.UP_FOR_RETRY:
self.log.debug(
"Task instance %s retry period not "
"expired yet", ti)
if key in ti_status.running:
ti_status.running.pop(key)
ti_status.to_run[key] = ti
return

# special case
if ti.state == State.UP_FOR_RESCHEDULE:
self.log.debug(
"Task instance %s reschedule period not "
"expired yet", ti)
if key in ti_status.running:
ti_status.running.pop(key)
ti_status.to_run[key] = ti
continue
# special case
if ti.state == State.UP_FOR_RESCHEDULE:
self.log.debug(
"Task instance %s reschedule period not "
"expired yet", ti)
if key in ti_status.running:
ti_status.running.pop(key)
ti_status.to_run[key] = ti
return

# all remaining tasks
self.log.debug('Adding %s to not_ready', ti)
ti_status.not_ready.add(key)

# all remaining tasks
self.log.debug('Adding %s to not_ready', ti)
ti_status.not_ready.add(key)
for task in self.dag.topological_sort():
for key, ti in list(ti_status.to_run.items()):
_per_task_process(task, key, ti)

# execute the tasks in the queue
self.heartbeat()
Expand Down

0 comments on commit f559d43

Please sign in to comment.