Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log worker info on error #7947

Closed
34 changes: 30 additions & 4 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
logger.error(
f"in task: {self._state.key}\non worker:{self._state.erred_on}"
)
j-bennet marked this conversation as resolved.
Show resolved Hide resolved
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
Expand All @@ -351,6 +354,9 @@
async def _exception(self):
await self._state.wait()
if self.status == "error":
logger.error(
f"in task: {self._state.key}\non worker:{self._state.erred_on}"
)
return self._state.exception
else:
return None
Expand Down Expand Up @@ -545,7 +551,15 @@
This is shared between all Futures with the same key and client.
"""

__slots__ = ("_event", "status", "type", "exception", "traceback")
__slots__ = (
"_event",
"status",
"type",
"exception",
"traceback",
"key",
"erred_on",
)

def __init__(self):
self._event = None
Expand Down Expand Up @@ -590,7 +604,7 @@
self.status = "pending"
self._get_event().clear()

def set_error(self, exception, traceback):
def set_error(self, exception, traceback, erred_on, key):
"""Sets the error data

Sets the status to 'error'. Sets the exception, the traceback,
Expand All @@ -602,9 +616,15 @@
The exception
traceback: Exception
The traceback
erred_on: set
Workers where it errored
key: Hashable
Task key
"""
_, exception, traceback = clean_exception(exception, traceback)

self.key = key
self.erred_on = erred_on
self.status = "error"
self.exception = exception
self.traceback = traceback
Expand Down Expand Up @@ -1618,10 +1638,12 @@
if state is not None:
state.retry()

def _handle_task_erred(self, key=None, exception=None, traceback=None):
def _handle_task_erred(
self, key=None, exception=None, traceback=None, erred_on=None
):
state = self.futures.get(key)
if state is not None:
state.set_error(exception, traceback)
state.set_error(exception, traceback, erred_on, key)

def _handle_restart(self):
logger.info("Receive restart signal from scheduler")
Expand Down Expand Up @@ -2228,6 +2250,7 @@
except (KeyError, AttributeError):
exc = CancelledError(key)
else:
logger.error(f"in task: {st.key}\non worker:{st.erred_on}")

Check warning on line 2253 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L2253

Added line #L2253 was not covered by tests
raise exception.with_traceback(traceback)
raise exc
if errors == "skip":
Expand Down Expand Up @@ -3241,6 +3264,9 @@
should_rejoin = False
try:
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
except Exception:
exc_type, exc_value, exc_traceback = sys.exc_info()
raise exc_value.with_traceback(exc_traceback)

Check warning on line 3269 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L3267-L3269

Added lines #L3267 - L3269 were not covered by tests
j-bennet marked this conversation as resolved.
Show resolved Hide resolved
finally:
for f in futures.values():
f.release()
Expand Down
2 changes: 1 addition & 1 deletion distributed/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def process(d):
if d["type"] == "Future":
value = Future(d["value"], self.client, inform=True, state=d["state"])
if d["state"] == "erred":
value._state.set_error(d["exception"], d["traceback"])
value._state.set_error(d["exception"], d["traceback"], None, None)
self.client._send_to_scheduler(
{"op": "queue-future-release", "name": self.name, "key": d["value"]}
)
Expand Down
2 changes: 2 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2483,6 +2483,7 @@ def transition_released_erred(self, key: str, stimulus_id: str) -> RecsMsgs:
"key": key,
"exception": failing_ts.exception,
"traceback": failing_ts.traceback,
"erred_on": failing_ts.erred_on,
}
for cs in ts.who_wants:
client_msgs[cs.client_key] = [report_msg]
Expand Down Expand Up @@ -2677,6 +2678,7 @@ def transition_processing_erred(
"key": key,
"exception": failing_ts.exception,
"traceback": failing_ts.traceback,
"erred_on": failing_ts.erred_on,
}
for cs in ts.who_wants:
client_msgs[cs.client_key] = [report_msg]
Expand Down
26 changes: 21 additions & 5 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,18 +400,34 @@ async def test_future_tuple_repr(c, s, a, b):

@gen_cluster(client=True)
async def test_Future_exception(c, s, a, b):
x = c.submit(div, 1, 0)
result = await x.exception()
assert isinstance(result, ZeroDivisionError)
with captured_logger("distributed.client") as logger:
x = c.submit(div, 1, 0)
exc = await x.exception()
assert isinstance(exc, ZeroDivisionError)
assert logger.getvalue().startswith("in task: ")

with captured_logger("distributed.client") as logger:
x = c.submit(div, 1, 0)
with pytest.raises(ZeroDivisionError):
_ = await x.result()
assert logger.getvalue().startswith("in task: ")

x = c.submit(div, 1, 1)
result = await x.exception()
assert result is None


def test_Future_exception_sync(c):
x = c.submit(div, 1, 0)
assert isinstance(x.exception(), ZeroDivisionError)
with captured_logger("distributed.client") as logger:
x = c.submit(div, 1, 0)
assert isinstance(x.exception(), ZeroDivisionError)
assert logger.getvalue().startswith("in task: ")

with captured_logger("distributed.client") as logger:
x = c.submit(div, 1, 0)
with pytest.raises(ZeroDivisionError):
_ = x.result()
assert logger.getvalue().startswith("in task: ")

x = c.submit(div, 1, 1)
assert x.exception() is None
Expand Down
2 changes: 1 addition & 1 deletion distributed/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ async def _get(self, timeout=None):
if d["type"] == "Future":
value = Future(d["value"], self.client, inform=True, state=d["state"])
if d["state"] == "erred":
value._state.set_error(d["exception"], d["traceback"])
value._state.set_error(d["exception"], d["traceback"], None, None)
self.client._send_to_scheduler(
{
"op": "variable-future-release",
Expand Down
Loading