Skip to content

Commit

Permalink
Don't give up if the terminate() call raises an exception during forc…
Browse files Browse the repository at this point in the history
…e-termination (#7110)

Summary:
The idea with force-terminate is that the run is moved into a terminal state no matter what, but previously, if an exception was raised during termination, we gave up. This fixes that by logging the exception to the event log while still moving ahead and force-terminating the run.

Test Plan: BK
  • Loading branch information
gibsondan committed Mar 18, 2022
1 parent 8063405 commit 62883d7
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 29 deletions.
2 changes: 1 addition & 1 deletion examples/deploy_docker/tests/test_deploy_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def test_deploy_docker():
assert (
terminate_res["data"]["terminatePipelineExecution"]["__typename"]
== "TerminateRunSuccess"
)
), str(terminate_res)

_wait_for_run_status(hanging_run_id, dagit_host, PipelineRunStatus.CANCELED)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import sys

from graphql.execution.base import ResolveInfo
from rx import Observable

from dagster import check
from dagster.core.events import EngineEventData
from dagster.core.instance import DagsterInstance
from dagster.core.storage.compute_log_manager import ComputeIOType
from dagster.core.storage.pipeline_run import PipelineRunStatus, RunsFilter
from dagster.serdes import serialize_dagster_namedtuple
Expand All @@ -19,12 +23,10 @@
from .launch_execution import launch_pipeline_execution, launch_pipeline_reexecution


def _force_mark_as_canceled(graphene_info, run_id):
def _force_mark_as_canceled(instance, run_id):
from ...schema.pipelines.pipeline import GrapheneRun
from ...schema.roots.mutation import GrapheneTerminateRunSuccess

instance = graphene_info.context.instance

reloaded_record = instance.get_run_records(RunsFilter(run_ids=[run_id]))[0]

if not reloaded_record.pipeline_run.is_finished:
Expand All @@ -39,7 +41,7 @@ def _force_mark_as_canceled(graphene_info, run_id):


@capture_error
def terminate_pipeline_execution(graphene_info, run_id, terminate_policy):
def terminate_pipeline_execution(instance, run_id, terminate_policy):
from ...schema.errors import GrapheneRunNotFoundError
from ...schema.pipelines.pipeline import GrapheneRun
from ...schema.roots.mutation import (
Expand All @@ -48,10 +50,9 @@ def terminate_pipeline_execution(graphene_info, run_id, terminate_policy):
GrapheneTerminateRunSuccess,
)

check.inst_param(graphene_info, "graphene_info", ResolveInfo)
check.inst_param(instance, "instance", DagsterInstance)
check.str_param(run_id, "run_id")

instance = graphene_info.context.instance
records = instance.get_run_records(RunsFilter(run_ids=[run_id]))

force_mark_as_canceled = (
Expand All @@ -78,24 +79,30 @@ def terminate_pipeline_execution(graphene_info, run_id, terminate_policy):
),
)

if force_mark_as_canceled:
try:
if instance.run_coordinator and instance.run_coordinator.can_cancel_run(run_id):
instance.run_coordinator.cancel_run(run_id)
except:
instance.report_engine_event(
"Exception while attempting to force-terminate run. Run will still be marked as canceled.",
pipeline_name=run.pipeline_name,
run_id=run.run_id,
engine_event_data=EngineEventData(
error=serializable_error_info_from_exc_info(sys.exc_info()),
),
)
return _force_mark_as_canceled(instance, run_id)

if (
graphene_info.context.instance.run_coordinator
and graphene_info.context.instance.run_coordinator.can_cancel_run(run_id)
and graphene_info.context.instance.run_coordinator.cancel_run(run_id)
instance.run_coordinator
and instance.run_coordinator.can_cancel_run(run_id)
and instance.run_coordinator.cancel_run(run_id)
):
return GrapheneTerminateRunSuccess(graphene_run)

return (
_force_mark_as_canceled(graphene_info, run_id)
if force_mark_as_canceled
else GrapheneTerminateRunSuccess(graphene_run)
)

return (
_force_mark_as_canceled(graphene_info, run_id)
if force_mark_as_canceled
else GrapheneTerminateRunFailure(
run=graphene_run, message="Unable to terminate run {run_id}".format(run_id=run.run_id)
)
return GrapheneTerminateRunFailure(
run=graphene_run, message="Unable to terminate run {run_id}".format(run_id=run.run_id)
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ class Meta:
@check_permission(Permissions.TERMINATE_PIPELINE_EXECUTION)
def mutate(self, graphene_info, **kwargs):
return terminate_pipeline_execution(
graphene_info,
graphene_info.context.instance,
kwargs["runId"],
kwargs.get("terminatePolicy", GrapheneTerminateRunPolicy.SAFE_TERMINATE),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
from typing import Any

import pytest
from dagster_graphql.client.query import LAUNCH_PIPELINE_EXECUTION_MUTATION
from dagster_graphql.test.utils import execute_dagster_graphql, infer_pipeline_selector

Expand Down Expand Up @@ -97,7 +98,9 @@ def test_cancel_queued_run(self, graphql_context):
result = execute_dagster_graphql(
graphql_context, RUN_CANCELLATION_QUERY, variables={"runId": run_id}
)
assert result.data["terminatePipelineExecution"]["__typename"] == "TerminateRunSuccess"
assert (
result.data["terminatePipelineExecution"]["__typename"] == "TerminateRunSuccess"
), str(result.data)

def test_force_cancel_queued_run(self, graphql_context):
selector = infer_pipeline_selector(graphql_context, "infinite_loop_pipeline")
Expand Down Expand Up @@ -136,6 +139,14 @@ def test_force_cancel_queued_run(self, graphql_context):
)


def _exception_terminate(_run_id):
    """Stand-in terminate callable that always raises.

    Used as a replacement for a run launcher's ``terminate`` method to
    exercise the code path where termination itself throws.

    Args:
        _run_id: Identifier of the run being terminated; ignored.

    Raises:
        Exception: Always, with the message ``"FAILED TO TERMINATE"``.
    """
    raise Exception("FAILED TO TERMINATE")


def _return_fail_terminate(_run_id):
    """Stand-in terminate callable that always reports failure.

    Used as a replacement for a run launcher's ``terminate`` method to
    exercise the code path where termination returns a falsy result
    instead of raising.

    Args:
        _run_id: Identifier of the run being terminated; ignored.

    Returns:
        ``False``, unconditionally.
    """
    return False


class TestRunVariantTermination(RunTerminationTestSuite):
def test_basic_termination(self, graphql_context):
selector = infer_pipeline_selector(graphql_context, "infinite_loop_pipeline")
Expand Down Expand Up @@ -215,11 +226,21 @@ def test_run_not_found(self, graphql_context):
)
assert result.data["terminatePipelineExecution"]["__typename"] == "RunNotFoundError"

def test_terminate_failed(self, graphql_context):
@pytest.mark.parametrize(
argnames=["new_terminate_method", "terminate_result"],
argvalues=[
[
_return_fail_terminate,
"TerminateRunFailure",
],
[_exception_terminate, "PythonError"],
],
)
def test_terminate_failed(self, graphql_context, new_terminate_method, terminate_result):
selector = infer_pipeline_selector(graphql_context, "infinite_loop_pipeline")
with safe_tempfile_path() as path:
old_terminate = graphql_context.instance.run_launcher.terminate
graphql_context.instance.run_launcher.terminate = lambda _run_id: False
graphql_context.instance.run_launcher.terminate = new_terminate_method
result = execute_dagster_graphql(
graphql_context,
LAUNCH_PIPELINE_EXECUTION_MUTATION,
Expand All @@ -245,9 +266,8 @@ def test_terminate_failed(self, graphql_context):
result = execute_dagster_graphql(
graphql_context, RUN_CANCELLATION_QUERY, variables={"runId": run_id}
)
assert result.data["terminatePipelineExecution"]["__typename"] == "TerminateRunFailure"
assert result.data["terminatePipelineExecution"]["message"].startswith(
"Unable to terminate run"
assert result.data["terminatePipelineExecution"]["__typename"] == terminate_result, str(
result.data
)

result = execute_dagster_graphql(
Expand All @@ -266,6 +286,10 @@ def test_terminate_failed(self, graphql_context):
repository_location = graphql_context.repository_locations[0]
repository_location.client.cancel_execution(CancelExecutionRequest(run_id=run_id))

assert (
graphql_context.instance.get_run_by_id(run_id).status == PipelineRunStatus.CANCELED
)

def test_run_finished(self, graphql_context):
instance = graphql_context.instance

Expand Down

0 comments on commit 62883d7

Please sign in to comment.