Skip to content

Commit

Permalink
Determine fail_stop on client side when db isolated
Browse files Browse the repository at this point in the history
This is needed because we do not ser the dag on Operator objects.
  • Loading branch information
dstandish committed Apr 27, 2024
1 parent b977335 commit a166e8d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
15 changes: 13 additions & 2 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,7 @@ def _handle_failure(
test_mode: bool | None = None,
context: Context | None = None,
force_fail: bool = False,
fail_stop: bool = False,
) -> None:
"""
Handle Failure for a task instance.
Expand All @@ -901,6 +902,7 @@ def _handle_failure(
context=context,
force_fail=force_fail,
session=session,
fail_stop=fail_stop,
)

_log_state(task_instance=task_instance, lead_msg="Immediate failure requested. " if force_fail else "")
Expand Down Expand Up @@ -2964,8 +2966,13 @@ def fetch_handle_failure_context(
context: Context | None = None,
force_fail: bool = False,
session: Session = NEW_SESSION,
fail_stop: bool = False,
):
"""Handle Failure for the TaskInstance."""
"""
Handle Failure for the TaskInstance.
:param fail_stop: if true, stop remaining tasks in dag
"""
get_listener_manager().hook.on_task_instance_failed(
previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error, session=session
)
Expand Down Expand Up @@ -3028,7 +3035,7 @@ def fetch_handle_failure_context(
email_for_state = operator.attrgetter("email_on_failure")
callbacks = task.on_failure_callback if task else None

if task and task.dag and task.dag.fail_stop:
if task and fail_stop:
_stop_remaining_tasks(task_instance=ti, session=session)
else:
if ti.state == TaskInstanceState.QUEUED:
Expand Down Expand Up @@ -3077,13 +3084,17 @@ def handle_failure(
:param context: Jinja2 context
:param force_fail: if True, task does not retry
"""
if TYPE_CHECKING:
assert self.task
assert self.task.dag
_handle_failure(
task_instance=self,
error=error,
session=session,
test_mode=test_mode,
context=context,
force_fail=force_fail,
fail_stop=self.task.dag.fail_stop,
)

def is_eligible_to_retry(self):
Expand Down
4 changes: 4 additions & 0 deletions airflow/serialization/pydantic/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,17 @@ def handle_failure(
"""
from airflow.models.taskinstance import _handle_failure

if TYPE_CHECKING:
assert self.task
assert self.task.dag
_handle_failure(
task_instance=self,
error=error,
session=session,
test_mode=test_mode,
context=context,
force_fail=force_fail,
fail_stop=self.task.dag.fail_stop,
)

def refresh_from_task(self, task: Operator, pool_override: str | None = None) -> None:
Expand Down

0 comments on commit a166e8d

Please sign in to comment.