Skip to content

Commit

Permalink
Add text exceptions to the Scheduler (#5148)
Browse files Browse the repository at this point in the history
Fixes #5126
  • Loading branch information
mrocklin committed Aug 10, 2021
1 parent b0eefbd commit a742de4
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 10 deletions.
10 changes: 8 additions & 2 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ async def send_recv(comm, reply=True, serializers=None, deserializers=None, **kw
typ, exc, tb = clean_exception(**response)
raise exc.with_traceback(tb)
else:
raise Exception(response["text"])
raise Exception(response["exception_text"])
return response


Expand Down Expand Up @@ -1179,7 +1179,13 @@ def error_message(e, status="error"):
else:
tb_result = protocol.to_serialize(tb)

return {"status": status, "exception": e4, "traceback": tb_result, "text": str(e2)}
return {
"status": status,
"exception": e4,
"traceback": tb_result,
"exception_text": repr(e2),
"traceback_text": "".join(traceback.format_tb(tb)),
}


def clean_exception(exception, traceback, **kwargs):
Expand Down
25 changes: 24 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,9 @@ class TaskState:
_nbytes: Py_ssize_t
_type: str
_exception: object
_exception_text: str
_traceback: object
_traceback_text: str
_exception_blame: object
_erred_on: set
_suspicious: Py_ssize_t
Expand Down Expand Up @@ -1461,7 +1463,9 @@ class TaskState:
# Which clients want us
"_who_wants",
"_exception",
"_exception_text",
"_traceback",
"_traceback_text",
"_erred_on",
"_exception_blame",
"_suspicious",
Expand All @@ -1480,6 +1484,7 @@ def __init__(self, key: str, run_spec: object):
self._run_spec = run_spec
self._state = None
self._exception = self._traceback = self._exception_blame = None
self._exception_text = self._traceback_text = ""
self._suspicious = self._retries = 0
self._nbytes = -1
self._priority = None
Expand Down Expand Up @@ -1597,10 +1602,18 @@ def type(self):
def exception(self):
return self._exception

@property
def exception_text(self):
    """Read-only access to the ``_exception_text`` slot.

    The slot starts out as an empty string and is overwritten with the
    ``exception_text`` argument when an erred transition records an
    exception for this task.
    """
    return self._exception_text

@property
def traceback(self):
    """Read-only access to the ``_traceback`` slot.

    The slot starts out as ``None`` and is replaced when an erred
    transition is given a non-``None`` traceback for this task.
    """
    return self._traceback

@property
def traceback_text(self):
    """Read-only access to the ``_traceback_text`` slot.

    The slot starts out as an empty string and is overwritten with the
    ``traceback_text`` argument when an erred transition records a
    traceback for this task.
    """
    return self._traceback_text

@property
def exception_blame(self):
    """Read-only access to the ``_exception_blame`` slot.

    The slot starts out as ``None``; an erred transition that is given a
    ``cause`` key stores the task looked up under that key here.
    """
    return self._exception_blame
Expand Down Expand Up @@ -3066,7 +3079,15 @@ def transition_processing_released(self, key):
raise

def transition_processing_erred(
self, key, cause=None, exception=None, traceback=None, worker=None, **kwargs
self,
key: str,
cause: str = None,
exception=None,
traceback=None,
exception_text: str = None,
traceback_text: str = None,
worker: str = None,
**kwargs,
):
ws: WorkerState
try:
Expand All @@ -3092,8 +3113,10 @@ def transition_processing_erred(
ts._erred_on.add(w or worker)
if exception is not None:
ts._exception = exception
ts._exception_text = exception_text
if traceback is not None:
ts._traceback = traceback
ts._traceback_text = traceback_text
if cause is not None:
failing_ts = self._tasks[cause]
ts._exception_blame = failing_ts
Expand Down
16 changes: 16 additions & 0 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6926,6 +6926,22 @@ def f():
assert files == set(os.listdir()) # no change


@gen_cluster(client=True)
async def test_exception_text(c, s, a, b):
    """A failed task's scheduler-side state exposes exception/traceback text."""

    # NOTE: the name ``bad`` and the exact ``raise Exception(x)`` line are
    # matched against the traceback text below, so keep both unchanged.
    def bad(x):
        raise Exception(x)

    fut = c.submit(bad, 123)
    await wait(fut)

    task_state = s.tasks[fut.key]
    exc_text = task_state.exception_text
    tb_text = task_state.traceback_text

    # The exception text is a plain string that mentions the raised value.
    assert isinstance(exc_text, str)
    assert "123" in exc_text

    # The traceback text shows the raising source line and the function name.
    assert "Exception(x)" in tb_text
    assert "bad" in tb_text


@gen_cluster(client=True)
async def test_async_task(c, s, a, b):
async def f(x):
Expand Down
10 changes: 3 additions & 7 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,18 +342,14 @@ def __str__(self):
max_error_len = 100
with dask.config.set({"distributed.admin.max-error-length": max_error_len}):
msg = error_message(RuntimeError("-" * max_error_len))
assert len(msg["text"]) <= max_error_len
assert len(msg["text"]) < max_error_len * 2
assert len(msg["exception_text"]) <= max_error_len + 30
assert len(msg["exception_text"]) < max_error_len * 2
msg = error_message(RuntimeError("-" * max_error_len * 20))
cut_text = msg["text"].replace("('Long error message', '", "")[:-2]
assert len(cut_text) == max_error_len

max_error_len = 1000000
with dask.config.set({"distributed.admin.max-error-length": max_error_len}):
msg = error_message(RuntimeError("-" * max_error_len * 2))
cut_text = msg["text"].replace("('Long error message', '", "")[:-2]
assert len(cut_text) == max_error_len
assert len(msg["text"]) > 10100 # default + 100
assert len(msg["exception_text"]) > 10100 # default + 100


@gen_cluster(client=True)
Expand Down
16 changes: 16 additions & 0 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ def __init__(self, key, runspec=None):
self.waiting_for_data = set()
self.resource_restrictions = None
self.exception = None
self.exception_text = ""
self.traceback = None
self.traceback_text = ""
self.type = None
self.suspicious_count = 0
self.startstops = list()
Expand Down Expand Up @@ -1607,7 +1609,9 @@ def add_task(
return
if ts.state == "error":
ts.exception = None
ts.exception_text = ""
ts.traceback = None
ts.traceback_text = ""
else:
# This is a scheduler re-assignment
# Either `fetch` -> `waiting` or `flight` -> `waiting`
Expand Down Expand Up @@ -1967,6 +1971,8 @@ def transition_ready_error(self, ts):
if self.validate:
assert ts.exception is not None
assert ts.traceback is not None
assert ts.exception_text
assert ts.traceback_text
self.send_task_state_to_scheduler(ts)

def transition_ready_memory(self, ts, value=no_value):
Expand Down Expand Up @@ -2007,7 +2013,9 @@ def transition_executing_done(self, ts, value=no_value, report=True):
logger.info("Failed to put key in memory", exc_info=True)
msg = error_message(e)
ts.exception = msg["exception"]
ts.exception_text = msg["exception_text"]
ts.traceback = msg["traceback"]
ts.traceback_text = msg["traceback_text"]
ts.state = "error"
out = "error"
for d in ts.dependents:
Expand Down Expand Up @@ -2233,6 +2241,8 @@ def send_task_state_to_scheduler(self, ts):
"thread": self.threads.get(ts.key),
"exception": ts.exception,
"traceback": ts.traceback,
"exception_text": ts.exception_text,
"traceback_text": ts.traceback_text,
}
else:
logger.error("Key not ready to send to worker, %s: %s", ts.key, ts.state)
Expand Down Expand Up @@ -2519,6 +2529,8 @@ def bad_dep(self, dep):
msg = error_message(exc)
ts.exception = msg["exception"]
ts.traceback = msg["traceback"]
ts.exception_text = msg["exception_text"]
ts.traceback_text = msg["traceback_text"]
self.transition(ts, "error")
self.release_key(dep.key, reason="bad dep")

Expand Down Expand Up @@ -2999,6 +3011,8 @@ async def execute(self, key):
else:
ts.exception = result["exception"]
ts.traceback = result["traceback"]
ts.exception_text = result["exception_text"]
ts.traceback_text = result["traceback_text"]
logger.warning(
"Compute Failed\n"
"Function: %s\n"
Expand All @@ -3025,6 +3039,8 @@ async def execute(self, key):
emsg = error_message(exc)
ts.exception = emsg["exception"]
ts.traceback = emsg["traceback"]
ts.exception_text = emsg["exception_text"]
ts.traceback_text = emsg["traceback_text"]
self.transition(ts, "error")
finally:
self.ensure_computing()
Expand Down

0 comments on commit a742de4

Please sign in to comment.