Skip to content

Commit

Permalink
RFC: Better error handling when an ExternalRepository grpc call fails (
Browse files Browse the repository at this point in the history
…#7929)

Summary:
I can't come up with a case where this *should* happen - to the point where I'm having trouble finding a good test to write for it (an error raised not when the repo is first loaded, but when we go to fetch the external repository) - but at least one user-submitted issue suggests that it *does* happen: #7927. Add better error handling for this call, matching the other gRPC methods, so that we surface a nice stack trace when it does.

This is technically a minor breaking change: errors on older versions that used to fail with one gross exception (DagsterUserCodeUnreachableError with an inscrutable stack trace) will now fail with a different gross exception (failed to find ExternalRepositoryErrorData to deserialize it) until you upgrade. If we wanted to be really sneaky, we could repurpose a class like ExternalSensorExecutionErrorData, which has the same arguments, to avoid that.
  • Loading branch information
gibsondan committed May 18, 2022
1 parent 3b5c135 commit d0524c2
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 13 deletions.
15 changes: 11 additions & 4 deletions python_modules/dagster/dagster/api/snapshot_repository.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from typing import TYPE_CHECKING, Mapping

import dagster._check as check
from dagster.core.host_representation.external_data import ExternalRepositoryData
from dagster.core.errors import DagsterUserCodeProcessError
from dagster.core.host_representation.external_data import (
ExternalRepositoryData,
ExternalRepositoryErrorData,
)
from dagster.serdes import deserialize_as

if TYPE_CHECKING:
Expand All @@ -27,15 +31,18 @@ def sync_get_streaming_external_repositories_data_grpc(
)
)

external_repository_data = deserialize_as(
result = deserialize_as(
"".join(
[
chunk["serialized_external_repository_chunk"]
for chunk in external_repository_chunks
]
),
ExternalRepositoryData,
(ExternalRepositoryData, ExternalRepositoryErrorData),
)

repo_datas[repository_name] = external_repository_data
if isinstance(result, ExternalRepositoryErrorData):
raise DagsterUserCodeProcessError.from_error_info(result.error)

repo_datas[repository_name] = result
return repo_datas
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,17 @@ def __new__(
)


@whitelist_for_serdes
class ExternalRepositoryErrorData(
    NamedTuple(
        "_ExternalRepositoryErrorData",
        [("error", Optional[SerializableErrorInfo])],
    )
):
    """Serializable record wrapping the error from a failed repository fetch.

    Attributes:
        error: The captured ``SerializableErrorInfo``, or ``None``.
    """

    def __new__(cls, error: Optional[SerializableErrorInfo]):
        # Validate before building the tuple so that a wrongly typed payload
        # fails at construction time; ``None`` is explicitly permitted.
        checked_error = check.opt_inst_param(error, "error", SerializableErrorInfo)
        return super(ExternalRepositoryErrorData, cls).__new__(cls, error=checked_error)


@whitelist_for_serdes
class ExternalSensorExecutionErrorData(
NamedTuple("_ExternalSensorExecutionErrorData", [("error", Optional[SerializableErrorInfo])])
Expand Down
30 changes: 21 additions & 9 deletions python_modules/dagster/dagster/grpc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
from dagster.core.code_pointer import CodePointer
from dagster.core.definitions.reconstruct import ReconstructableRepository
from dagster.core.errors import DagsterUserCodeUnreachableError
from dagster.core.host_representation.external_data import external_repository_data_from_def
from dagster.core.host_representation.external_data import (
ExternalRepositoryErrorData,
external_repository_data_from_def,
)
from dagster.core.host_representation.origin import ExternalPipelineOrigin, ExternalRepositoryOrigin
from dagster.core.instance import DagsterInstance
from dagster.core.origin import DEFAULT_DAGSTER_ENTRY_POINT, get_python_environment_entry_point
Expand Down Expand Up @@ -129,6 +132,10 @@ def code_pointers_by_repo_name(self):
return self._code_pointers_by_repo_name

def get_recon_repo(self, name: str) -> ReconstructableRepository:
    """Return the reconstructable repository registered under ``name``.

    Raises:
        Exception: if ``name`` is not a key of ``self._recon_repos_by_name``.
    """
    recon_repos = self._recon_repos_by_name
    if name in recon_repos:
        return recon_repos[name]

    # Raise a descriptive error rather than letting a bare KeyError escape.
    raise Exception(f'Could not find a repository called "{name}"')


Expand Down Expand Up @@ -464,15 +471,20 @@ def ExternalPipelineSubsetSnapshot(self, request, _context):
)

def _get_serialized_external_repository_data(self, request):
repository_origin = deserialize_json_to_dagster_namedtuple(
request.serialized_repository_python_origin
)
try:
repository_origin = deserialize_json_to_dagster_namedtuple(
request.serialized_repository_python_origin
)

check.inst_param(repository_origin, "repository_origin", ExternalRepositoryOrigin)
recon_repo = self._recon_repository_from_origin(repository_origin)
return serialize_dagster_namedtuple(
external_repository_data_from_def(recon_repo.get_definition())
)
check.inst_param(repository_origin, "repository_origin", ExternalRepositoryOrigin)
recon_repo = self._recon_repository_from_origin(repository_origin)
return serialize_dagster_namedtuple(
external_repository_data_from_def(recon_repo.get_definition())
)
except Exception:
return serialize_dagster_namedtuple(
ExternalRepositoryErrorData(serializable_error_info_from_exc_info(sys.exc_info()))
)

def ExternalRepository(self, request, _context):
serialized_external_repository_data = self._get_serialized_external_repository_data(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from dagster import lambda_solid, pipeline, repository
from dagster.api.snapshot_repository import sync_get_streaming_external_repositories_data_grpc
from dagster.core.errors import DagsterUserCodeProcessError
from dagster.core.host_representation import (
ExternalRepositoryData,
ManagedGrpcPythonEnvRepositoryLocationOrigin,
Expand All @@ -29,6 +30,19 @@ def test_streaming_external_repositories_api_grpc(instance):
assert external_repository_data.name == "bar_repo"


def test_streaming_external_repositories_error(instance):
    """Fetching data for an unknown repository name raises DagsterUserCodeProcessError."""
    unknown_repo_names = {"does_not_exist"}
    expected_message = 'Could not find a repository called "does_not_exist"'

    with get_bar_repo_repository_location(instance) as repository_location:
        # Point the location at a repository name that is not expected to
        # exist, and confirm the override took effect before calling the API.
        repository_location.repository_names = unknown_repo_names
        assert repository_location.repository_names == {"does_not_exist"}

        with pytest.raises(DagsterUserCodeProcessError, match=expected_message):
            sync_get_streaming_external_repositories_data_grpc(
                repository_location.client, repository_location
            )


@lambda_solid
def do_something():
return 1
Expand Down

0 comments on commit d0524c2

Please sign in to comment.