Skip to content

UnpicklingError when returning a tuple #6509

@Nanguage

Description

@Nanguage

What happened:

UnpicklingError occurs when trying to get the result from a future where the original function returns a tuple and the input data is large.

What you expected to happen:

It should have the same behavior whether the input data is small or large.

Minimal Complete Verifiable Example:

# Reproducer: fetching the result of a task that returns a tuple raises
# UnpicklingError when the input array is large, but works when it is small.
import numpy as np
from dask.distributed import Client


def inc(x):
    """Return ``x + 1``."""
    return x + 1


client = Client()
x = np.ones((1024, 1024))   # small input
f = client.submit(inc, x)
res = f.result()  # it's ok
print(res.max())


def inc_ret_tuple(x):
    """Return the tuple ``(x + 1, "ok")``."""
    return (x + 1, "ok")


# Small input, tuple result: submit the tuple-returning function so that
# ``res[1]`` is the "ok" marker rather than a row of the array.
f = client.submit(inc_ret_tuple, x)
res = f.result()  # it's ok
print(res[1])

# Large input, tuple result: this is the combination that fails.
y = np.ones((20, 1024, 1024))  # large input
f = client.submit(inc_ret_tuple, y)
res = f.result()  # error
Error message
2022-06-06 17:15:30,448 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "C:\Users\Nangu\miniconda3\envs\qt\lib\site-packages\distributed\protocol\core.py", line 159, in loads
  return msgpack.loads(
File "msgpack\_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
File "C:\Users\Nangu\miniconda3\envs\qt\lib\site-packages\distributed\protocol\core.py", line 139, in _decode_default
  return merge_and_deserialize(
File "C:\Users\Nangu\miniconda3\envs\qt\lib\site-packages\distributed\protocol\serialize.py", line 487, in merge_and_deserialize
  return deserialize(header, merged_frames, deserializers=deserializers)
File "C:\Users\Nangu\miniconda3\envs\qt\lib\site-packages\distributed\protocol\serialize.py", line 400, in deserialize
  deserialize(
File "C:\Users\Nangu\miniconda3\envs\qt\lib\site-packages\distributed\protocol\serialize.py", line 416, in deserialize
  return loads(header, frames)
File "C:\Users\Nangu\miniconda3\envs\qt\lib\site-packages\distributed\protocol\serialize.py", line 95, in pickle_loads
  return pickle.loads(x, buffers=buffers)
File "C:\Users\Nangu\miniconda3\envs\qt\lib\site-packages\distributed\protocol\pickle.py", line 66, in loads
  return pickle.loads(x)
_pickle.UnpicklingError: invalid load key, '\x00'.
---------------------------------------------------------------------------
UnpicklingError                           Traceback (most recent call last)
Input In [23], in <cell line: 1>()
----> 1 f.result()

File ~\miniconda3\envs\qt\lib\site-packages\distributed\client.py:279, in Future.result(self, timeout)
  276     return self.client.sync(self._result, callback_timeout=timeout)
  278 # shorten error traceback
--> 279 result = self.client.sync(self._result, callback_timeout=timeout, raiseit=False)
  280 if self.status == "error":
  281     typ, exc, tb = result

File ~\miniconda3\envs\qt\lib\site-packages\distributed\utils.py:320, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
  318     return future
  319 else:
--> 320     return sync(
  321         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  322     )

File ~\miniconda3\envs\qt\lib\site-packages\distributed\utils.py:387, in sync(loop, func, callback_timeout, *args, **kwargs)
  385 if error:
  386     typ, exc, tb = error
--> 387     raise exc.with_traceback(tb)
  388 else:
  389     return result

File ~\miniconda3\envs\qt\lib\site-packages\distributed\utils.py:360, in sync.<locals>.f()
  358         future = asyncio.wait_for(future, callback_timeout)
  359     future = asyncio.ensure_future(future)
--> 360     result = yield future
  361 except Exception:
  362     error = sys.exc_info()

File ~\miniconda3\envs\qt\lib\site-packages\tornado\gen.py:762, in Runner.run(self)
  759 exc_info = None
  761 try:
--> 762     value = future.result()
  763 except Exception:
  764     exc_info = sys.exc_info()

File ~\miniconda3\envs\qt\lib\site-packages\distributed\client.py:304, in Future._result(self, raiseit)
  302         return exception
  303 else:
--> 304     result = await self.client._gather([self])
  305     return result[0]

File ~\miniconda3\envs\qt\lib\site-packages\distributed\client.py:2080, in Client._gather(self, futures, errors, direct, local_worker)
 2078     else:
 2079         self._gather_future = future
-> 2080     response = await future
 2082 if response["status"] == "error":
 2083     log = logger.warning if errors == "raise" else logger.debug

File ~\miniconda3\envs\qt\lib\site-packages\distributed\client.py:2131, in Client._gather_remote(self, direct, local_worker)
 2128                 response["data"].update(data2)
 2130     else:  # ask scheduler to gather data for us
-> 2131         response = await retry_operation(self.scheduler.gather, keys=keys)
 2133 return response

File ~\miniconda3\envs\qt\lib\site-packages\distributed\utils_comm.py:381, in retry_operation(coro, operation, *args, **kwargs)
  375 retry_delay_min = parse_timedelta(
  376     dask.config.get("distributed.comm.retry.delay.min"), default="s"
  377 )
  378 retry_delay_max = parse_timedelta(
  379     dask.config.get("distributed.comm.retry.delay.max"), default="s"
  380 )
--> 381 return await retry(
  382     partial(coro, *args, **kwargs),
  383     count=retry_count,
  384     delay_min=retry_delay_min,
  385     delay_max=retry_delay_max,
  386     operation=operation,
  387 )

File ~\miniconda3\envs\qt\lib\site-packages\distributed\utils_comm.py:366, in retry(coro, count, delay_min, delay_max, jitter_fraction, retry_on_exceptions, operation)
  364             delay *= 1 + random.random() * jitter_fraction
  365         await asyncio.sleep(delay)
--> 366 return await coro()

File ~\miniconda3\envs\qt\lib\site-packages\distributed\core.py:975, in PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc(**kwargs)
  973 prev_name, comm.name = comm.name, "ConnectionPool." + key
  974 try:
--> 975     return await send_recv(comm=comm, op=key, **kwargs)
  976 finally:
  977     self.pool.reuse(self.addr, comm)

File ~\miniconda3\envs\qt\lib\site-packages\distributed\core.py:739, in send_recv(comm, reply, serializers, deserializers, **kwargs)
  737 await comm.write(msg, serializers=serializers, on_error="raise")
  738 if reply:
--> 739     response = await comm.read(deserializers=deserializers)
  740 else:
  741     response = None

File ~\miniconda3\envs\qt\lib\site-packages\distributed\comm\tcp.py:254, in TCP.read(self, deserializers)
  251 try:
  252     frames = unpack_frames(frames)
--> 254     msg = await from_frames(
  255         frames,
  256         deserialize=self.deserialize,
  257         deserializers=deserializers,
  258         allow_offload=self.allow_offload,
  259     )
  260 except EOFError:
  261     # Frames possibly garbled or truncated by communication error
  262     self.abort()

File ~\miniconda3\envs\qt\lib\site-packages\distributed\comm\utils.py:96, in from_frames(frames, deserialize, deserializers, allow_offload)
   94     size = sum(map(nbytes, frames))
   95 if allow_offload and deserialize and OFFLOAD_THRESHOLD and size > OFFLOAD_THRESHOLD:
---> 96     res = await offload(_from_frames)
   97 else:
   98     res = _from_frames()

File ~\miniconda3\envs\qt\lib\site-packages\distributed\utils.py:1429, in offload(fn, *args, **kwargs)
 1427 # Retain context vars while deserializing; see https://bugs.python.org/issue34014
 1428 context = contextvars.copy_context()
-> 1429 return await loop.run_in_executor(
 1430     _offload_executor, lambda: context.run(fn, *args, **kwargs)
 1431 )

File ~\miniconda3\envs\qt\lib\concurrent\futures\thread.py:57, in _WorkItem.run(self)
   54     return
   56 try:
---> 57     result = self.fn(*self.args, **self.kwargs)
   58 except BaseException as exc:
   59     self.future.set_exception(exc)

File ~\miniconda3\envs\qt\lib\site-packages\distributed\utils.py:1430, in offload.<locals>.<lambda>()
 1427 # Retain context vars while deserializing; see https://bugs.python.org/issue34014
 1428 context = contextvars.copy_context()
 1429 return await loop.run_in_executor(
-> 1430     _offload_executor, lambda: context.run(fn, *args, **kwargs)
 1431 )

File ~\miniconda3\envs\qt\lib\site-packages\distributed\comm\utils.py:81, in from_frames.<locals>._from_frames()
   79 def _from_frames():
   80     try:
---> 81         return protocol.loads(
   82             frames, deserialize=deserialize, deserializers=deserializers
   83         )
   84     except EOFError:
   85         if size > 1000:

File ~\miniconda3\envs\qt\lib\site-packages\distributed\protocol\core.py:159, in loads(frames, deserialize, deserializers)
  153                 raise ValueError(
  154                     "Unpickle on the Scheduler isn't allowed, set `distributed.scheduler.pickle=true`"
  155                 )
  157         return msgpack_decode_default(obj)
--> 159     return msgpack.loads(
  160         frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts
  161     )
  163 except Exception:
  164     logger.critical("Failed to deserialize", exc_info=True)

File msgpack\_unpacker.pyx:194, in msgpack._cmsgpack.unpackb()

File ~\miniconda3\envs\qt\lib\site-packages\distributed\protocol\core.py:139, in loads.<locals>._decode_default(obj)
  137     if "compression" in sub_header:
  138         sub_frames = decompress(sub_header, sub_frames)
--> 139     return merge_and_deserialize(
  140         sub_header, sub_frames, deserializers=deserializers
  141     )
  142 else:
  143     return Serialized(sub_header, sub_frames)

File ~\miniconda3\envs\qt\lib\site-packages\distributed\protocol\serialize.py:487, in merge_and_deserialize(header, frames, deserializers)
  483             merged = bytearray().join(subframes)
  485         merged_frames.append(merged)
--> 487 return deserialize(header, merged_frames, deserializers=deserializers)

File ~\miniconda3\envs\qt\lib\site-packages\distributed\protocol\serialize.py:400, in deserialize(header, frames, deserializers)
  397 lst = []
  398 for _header, _length in zip(headers, lengths):
  399     lst.append(
--> 400         deserialize(
  401             _header,
  402             frames[start : start + _length],
  403             deserializers=deserializers,
  404         )
  405     )
  406     start += _length
  407 return cls(lst)

File ~\miniconda3\envs\qt\lib\site-packages\distributed\protocol\serialize.py:416, in deserialize(header, frames, deserializers)
  411     raise TypeError(
  412         "Data serialized with %s but only able to deserialize "
  413         "data with %s" % (name, str(list(deserializers)))
  414     )
  415 dumps, loads, wants_context = families[name]
--> 416 return loads(header, frames)

File ~\miniconda3\envs\qt\lib\site-packages\distributed\protocol\serialize.py:95, in pickle_loads(header, frames)
   88     writeable = len(buffers) * (None,)
   90 buffers = [
   91     memoryview(bytearray(mv) if w else bytes(mv)) if w == mv.readonly else mv
   92     for w, mv in zip(writeable, map(ensure_memoryview, buffers))
   93 ]
---> 95 return pickle.loads(x, buffers=buffers)

File ~\miniconda3\envs\qt\lib\site-packages\distributed\protocol\pickle.py:66, in loads(x, buffers)
   64         return pickle.loads(x, buffers=buffers)
   65     else:
---> 66         return pickle.loads(x)
   67 except Exception:
   68     logger.info("Failed to deserialize %s", x[:10000], exc_info=True)

UnpicklingError: invalid load key, '\x00'.

Anything else we need to know?:

Environment:

  • Dask version: Dask(2022.05.1) Distributed(2022.5.1)
  • Python version: Python 3.8.12
  • Operating System: Windows10
  • Install method (conda, pip, source): pip
Cluster Dump State:

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions