Skip to content

Commit

Permalink
Be more willing to return serialized errors in schedule/sensor/grpc p…
Browse files Browse the repository at this point in the history
…artition methods (#7164)

Summary:
Right now if there's an error in these grpc methods that isn't caught, we get these unhelpful messages at the gRPC layer like:

Exception iterating responses: Error executing resource_fn on ResourceDefinition

Being more willing to package up errors and serialize them over the wire will give us better visualization of those errors on the other side.

Even if it's a system error or an invariant error that's our fault, i'd still rather that error be the thing that is displayed in Dagit.

Test Plan:
Existing BK tests that cover schedule/sensor error handling
  • Loading branch information
gibsondan committed Mar 22, 2022
1 parent dc994c9 commit 0f75e1a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 9 deletions.
15 changes: 7 additions & 8 deletions python_modules/dagster/dagster/grpc/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from dagster.core.definitions.sensor_definition import SensorEvaluationContext
from dagster.core.errors import (
DagsterExecutionInterruptedError,
DagsterInvalidSubsetError,
DagsterRunNotFoundError,
PartitionExecutionError,
ScheduleExecutionError,
Expand Down Expand Up @@ -209,7 +208,7 @@ def get_external_pipeline_subset_result(
try:
sub_pipeline = recon_pipeline.subset_for_execution(solid_selection)
definition = sub_pipeline.get_definition()
except DagsterInvalidSubsetError:
except Exception:
return ExternalPipelineSubsetResult(
success=False, error=serializable_error_info_from_exc_info(sys.exc_info())
)
Expand Down Expand Up @@ -251,7 +250,7 @@ def get_external_schedule_execution(
"{schedule_name}".format(schedule_name=schedule_def.name),
):
return schedule_def.evaluate_tick(schedule_context)
except ScheduleExecutionError:
except Exception:
return ExternalScheduleExecutionErrorData(
serializable_error_info_from_exc_info(sys.exc_info())
)
Expand Down Expand Up @@ -283,7 +282,7 @@ def get_external_sensor_execution(
"{sensor_name}".format(sensor_name=sensor_def.name),
):
return sensor_def.evaluate_tick(sensor_context)
except SensorExecutionError:
except Exception:
return ExternalSensorExecutionErrorData(
serializable_error_info_from_exc_info(sys.exc_info())
)
Expand All @@ -303,7 +302,7 @@ def get_partition_config(recon_repo, partition_set_name, partition_name):
):
run_config = partition_set_def.run_config_for_partition(partition)
return ExternalPartitionConfigData(name=partition.name, run_config=run_config)
except PartitionExecutionError:
except Exception:
return ExternalPartitionExecutionErrorData(
serializable_error_info_from_exc_info(sys.exc_info())
)
Expand All @@ -328,7 +327,7 @@ def get_partition_names(recon_repo, partition_set_name):
return ExternalPartitionNamesData(
partition_names=partition_set_def.get_partition_names()
)
except PartitionExecutionError:
except Exception:
return ExternalPartitionExecutionErrorData(
serializable_error_info_from_exc_info(sys.exc_info())
)
Expand All @@ -346,7 +345,7 @@ def get_partition_tags(recon_repo, partition_set_name, partition_name):
):
tags = partition_set_def.tags_for_partition(partition)
return ExternalPartitionTagsData(name=partition.name, tags=tags)
except PartitionExecutionError:
except Exception:
return ExternalPartitionExecutionErrorData(
serializable_error_info_from_exc_info(sys.exc_info())
)
Expand Down Expand Up @@ -419,7 +418,7 @@ def _error_message_fn(partition_name):

return ExternalPartitionSetExecutionParamData(partition_data=partition_data)

except PartitionExecutionError:
except Exception:
return ExternalPartitionExecutionErrorData(
serializable_error_info_from_exc_info(sys.exc_info())
)
Expand Down
12 changes: 11 additions & 1 deletion python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from dagster.core.definitions.decorators.sensor_decorator import sensor
from dagster.core.definitions.sensor_definition import RunRequest
from dagster.core.errors import DagsterError
from dagster.core.test_utils import default_mode_def_for_test


Expand Down Expand Up @@ -165,6 +166,11 @@ def sensor_error(_):
raise Exception("womp womp")


@sensor(pipeline_name="foo")
def sensor_raises_dagster_error(_):
raise DagsterError("Dagster error")


@repository
def bar_repo():
return {
Expand All @@ -176,7 +182,11 @@ def bar_repo():
},
"schedules": define_bar_schedules(),
"partition_sets": define_baz_partitions(),
"sensors": {"sensor_foo": sensor_foo, "sensor_error": lambda: sensor_error},
"sensors": {
"sensor_foo": sensor_foo,
"sensor_error": lambda: sensor_error,
"sensor_raises_dagster_error": lambda: sensor_raises_dagster_error,
},
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,12 @@ def test_external_sensor_error():
sync_get_external_sensor_execution_data_ephemeral_grpc(
instance, repository_handle, "sensor_error", None, None, None
)


def test_external_sensor_raises_dagster_error():
with get_bar_repo_handle() as repository_handle:
with instance_for_test() as instance:
with pytest.raises(DagsterUserCodeProcessError, match="Dagster error"):
sync_get_external_sensor_execution_data_ephemeral_grpc(
instance, repository_handle, "sensor_raises_dagster_error", None, None, None
)

0 comments on commit 0f75e1a

Please sign in to comment.