Skip to content

Commit

Permalink
adding a Terminate Run method for in-progress execution to the Python…
Browse files Browse the repository at this point in the history
… GraphQL client (#7443)

* adding the first PR draft

* returing an None instead of a string

* applying black to the project

* erasing not used modules and applying pylint

* solvigng mypy typos

* testing exceptions using a different approach

* adding needed graphql queries
  • Loading branch information
Javier162380 committed May 2, 2022
1 parent 4c65d82 commit d543f82
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 0 deletions.
25 changes: 25 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
GET_PIPELINE_RUN_STATUS_QUERY,
RELOAD_REPOSITORY_LOCATION_MUTATION,
SHUTDOWN_REPOSITORY_LOCATION_MUTATION,
TERMINATE_RUN_JOB_MUTATION,
)
from .utils import (
DagsterGraphQLClientError,
Expand Down Expand Up @@ -422,3 +423,27 @@ def shutdown_repository_location(
)
else:
raise Exception(f"Unexpected query result type {query_result_type}")

def terminate_run(self, run_id: str):
"""
Terminates a pipeline run. This method it is useful when you would like to stop a pipeline run
based on a external event.
Args:
run_id (str): The run id of the pipeline run to terminate
"""
check.str_param(run_id, "run_id")

res_data: Dict[str, Dict[str, Any]] = self._execute(
TERMINATE_RUN_JOB_MUTATION, {"runId": run_id}
)

query_result: Dict[str, Any] = res_data["terminateRun"]
query_result_type: str = query_result["__typename"]
if query_result_type == "TerminateRunSuccess":
return

elif query_result_type == "RunNotFoundError":
raise DagsterGraphQLClientError("RunNotFoundError", f"Run Id {run_id} not found")
else:
raise DagsterGraphQLClientError(query_result_type, query_result["message"])
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,26 @@
}
}
"""

TERMINATE_RUN_JOB_MUTATION = """
mutation TerminateRun($runId: String!) {
terminateRun(runId: $runId){
__typename
... on TerminateRunSuccess{
run {
runId
}
}
... on TerminateRunFailure {
message
}
... on RunNotFoundError {
runId
}
... on PythonError {
message
stack
}
}
}
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pytest
from dagster_graphql import DagsterGraphQLClientError

from .conftest import MockClient, python_client_test_suite

RUN_ID = "foo"


@python_client_test_suite
def test_terminate_run_status_success(mock_client: MockClient):
expected_result = None
response = {"terminateRun": {"__typename": "TerminateRunSuccess", "run": expected_result}}
mock_client.mock_gql_client.execute.return_value = response

actual_result = mock_client.python_client.terminate_run(RUN_ID)
assert actual_result == expected_result


@python_client_test_suite
def test_terminate_run_not_failure(mock_client: MockClient):
error_type, error_message = "TerminateRunFailure", "Unable to terminate run"
response = {"terminateRun": {"__typename": "TerminateRunFailure", "message": error_message}}
mock_client.mock_gql_client.execute.return_value = response

with pytest.raises(DagsterGraphQLClientError) as e:
mock_client.python_client.terminate_run(RUN_ID)
assert e.value.args == (error_type, error_message)


@python_client_test_suite
def test_terminate_run_not_found(mock_client: MockClient):
error_type, error_message = "RunNotFoundError", "Run Id foo not found"
response = {"terminateRun": {"__typename": "RunNotFoundError", "runId": error_message}}

with pytest.raises(DagsterGraphQLClientError) as e:
mock_client.mock_gql_client.execute.return_value = response
mock_client.python_client.terminate_run(RUN_ID)
assert e.value.args == (error_type, error_message)


@python_client_test_suite
def test_terminate_run_python_error(mock_client: MockClient):
error_type, error_message = "PythonError", "Unable to terminate run"
response = {"terminateRun": {"__typename": "PythonError", "message": error_message}}

with pytest.raises(DagsterGraphQLClientError) as e:
mock_client.mock_gql_client.execute.return_value = response
mock_client.python_client.terminate_run(RUN_ID)
assert e.value.args == (error_type, error_message)
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
mutation TerminateRun($runId: String!) {
terminateRun(runId: $runId){
__typename
... on TerminateRunSuccess{
run {
runId
}
}
... on TerminateRunFailure {
message
}
... on RunNotFoundError {
runId
}
... on PythonError {
message
stack
}
}
}

0 comments on commit d543f82

Please sign in to comment.