Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log worker info on error #7947

Closed
57 changes: 52 additions & 5 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pickle
import re
import sys
import textwrap
import threading
import traceback
import uuid
Expand Down Expand Up @@ -112,7 +113,7 @@
scatter_to_workers,
unpack_remotedata,
)
from distributed.worker import get_client, get_worker, secede
from distributed.worker import _deserialize, get_client, get_worker, secede

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -334,6 +335,7 @@
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
logger.error(self._state.format_error())
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
Expand All @@ -352,6 +354,7 @@
async def _exception(self):
await self._state.wait()
if self.status == "error":
logger.error(self._state.format_error())
return self._state.exception
else:
return None
Expand Down Expand Up @@ -546,7 +549,17 @@
This is shared between all Futures with the same key and client.
"""

__slots__ = ("_event", "status", "type", "exception", "traceback")
__slots__ = (
"_event",
"status",
"type",
"exception",
"traceback",
"erred_on",
"func_args",
"func_kwargs",
"failing_key",
)

def __init__(self):
self._event = None
Expand Down Expand Up @@ -591,7 +604,7 @@
self.status = "pending"
self._get_event().clear()

def set_error(self, exception, traceback):
def set_error(self, exception, traceback, **kwargs):
"""Sets the error data

Sets the status to 'error'. Sets the exception, the traceback,
Expand All @@ -603,14 +616,47 @@
The exception
traceback: Exception
The traceback
erred_on: set
Workers where it errored
failing_key: Hashable
Key of the task that originally caused the error
run_spec: dict[str, Any]
dict with args, kwargs, function
"""
_, exception, traceback = clean_exception(exception, traceback)

self.failing_key = kwargs.get("failing_key")
self.erred_on = kwargs.get("erred_on")
self.status = "error"
self.exception = exception
self.traceback = traceback
run_spec = kwargs.get("run_spec")
if run_spec:
_, self.func_args, self.func_kwargs = _deserialize(
args=run_spec.get("args"), kwargs=run_spec.get("kwargs")
)
else:
self.func_args = ()
self.func_kwargs = {}
self._get_event().set()

def format_error(self):
    """Pretty-format the stored error for logging.

    Builds a report from the error data held on this state:
    ``failing_key``, ``func_args``, ``func_kwargs``, ``exception``,
    ``erred_on`` and ``traceback``.

    Returns
    -------
    str
        A "Compute Failed" header followed by one ``Label: value`` line
        each for the key, function name, args, kwargs, exception and
        worker.
    """
    # Walk to the innermost traceback entry: its code object names the
    # function in which the exception was actually raised.
    tb = self.traceback
    while tb is not None and tb.tb_next is not None:
        tb = tb.tb_next
    func_name = tb.tb_frame.f_code.co_name if tb is not None else "<unknown>"

    # Join the lines explicitly instead of dedenting an f-string: dedent
    # runs after interpolation, so a value with a multi-line repr would
    # change the common leading whitespace and defeat the dedent.
    return "\n".join(
        (
            "Compute Failed",
            f"Key: {self.failing_key}",
            f"Function: {func_name}",
            f"args: {self.func_args}",
            f"kwargs: {self.func_kwargs}",
            f"Exception: {self.exception!r}",
            f"Worker: {self.erred_on}",
        )
    )
j-bennet marked this conversation as resolved.
Show resolved Hide resolved

def done(self):
    """Report whether this state's event exists and has been set.

    Returns ``False`` when no event has been created yet; otherwise
    returns whatever the event's ``is_set()`` reports.
    """
    event = self._event
    if event is None:
        return False
    return event.is_set()
Expand Down Expand Up @@ -1650,10 +1696,10 @@
if state is not None:
state.retry()

def _handle_task_erred(self, key=None, exception=None, traceback=None):
def _handle_task_erred(self, key=None, exception=None, traceback=None, **kwargs):
state = self.futures.get(key)
if state is not None:
state.set_error(exception, traceback)
state.set_error(exception, traceback, **kwargs)

def _handle_restart(self):
logger.info("Receive restart signal from scheduler")
Expand Down Expand Up @@ -2264,6 +2310,7 @@
except (KeyError, AttributeError):
exc = CancelledError(key)
else:
logger.error(st.format_error())

Check warning on line 2313 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L2313

Added line #L2313 was not covered by tests
raise exception.with_traceback(traceback)
raise exc
if errors == "skip":
Expand Down
6 changes: 6 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2487,6 +2487,9 @@ def transition_released_erred(self, key: str, stimulus_id: str) -> RecsMsgs:
"key": key,
"exception": failing_ts.exception,
"traceback": failing_ts.traceback,
"erred_on": failing_ts.erred_on,
"run_spec": failing_ts.run_spec,
"failing_key": failing_ts.key,
}
for cs in ts.who_wants:
client_msgs[cs.client_key] = [report_msg]
Expand Down Expand Up @@ -2681,6 +2684,9 @@ def transition_processing_erred(
"key": key,
"exception": failing_ts.exception,
"traceback": failing_ts.traceback,
"erred_on": failing_ts.erred_on,
"run_spec": failing_ts.run_spec,
"failing_key": failing_ts.key,
}
for cs in ts.who_wants:
client_msgs[cs.client_key] = [report_msg]
Expand Down
42 changes: 37 additions & 5 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,18 +401,50 @@ async def test_future_tuple_repr(c, s, a, b):

@gen_cluster(client=True)
async def test_Future_exception(c, s, a, b):
x = c.submit(div, 1, 0)
result = await x.exception()
assert isinstance(result, ZeroDivisionError)
with captured_logger("distributed.client") as logger:
x = c.submit(div, 1, 0)
exc = await x.exception()
assert isinstance(exc, ZeroDivisionError)
assert "Worker: " in logger.getvalue()

with captured_logger("distributed.client") as logger:
x = c.submit(div, 1, 0)
with pytest.raises(ZeroDivisionError):
_ = await x.result()
assert "Worker: " in logger.getvalue()

with captured_logger("distributed.client") as logger:
x = c.submit(div, 1, 0, key="x")
y = c.submit(inc, x, key="y")
with pytest.raises(ZeroDivisionError):
_ = await y.result()
assert "Worker: " in logger.getvalue()
assert re.search("Key:\\s+x", logger.getvalue()) is not None

x = c.submit(div, 1, 1)
result = await x.exception()
assert result is None


def test_Future_exception_sync(c):
x = c.submit(div, 1, 0)
assert isinstance(x.exception(), ZeroDivisionError)
with captured_logger("distributed.client") as logger:
x = c.submit(div, 1, 0)
assert isinstance(x.exception(), ZeroDivisionError)
assert "Worker: " in logger.getvalue()

with captured_logger("distributed.client") as logger:
x = c.submit(div, 1, 0)
with pytest.raises(ZeroDivisionError):
_ = x.result()
assert "Worker: " in logger.getvalue()
j-bennet marked this conversation as resolved.
Show resolved Hide resolved

with captured_logger("distributed.client") as logger:
x = c.submit(div, 1, 0, key="x")
y = c.submit(inc, x, key="y")
with pytest.raises(ZeroDivisionError):
_ = y.result()
assert "Worker: " in logger.getvalue()
assert re.search("Key:\\s+x", logger.getvalue()) is not None

x = c.submit(div, 1, 1)
assert x.exception() is None
Expand Down
Loading