Serializer not passed to workers #7381

@AlecThomson

Describe the issue:
Possibly related to #5561. I'm attempting to find a fix for issues related to dask/dask-mpi#76 and astropy/astropy#11317. As part of this, I tried changing the serializers/deserializers options of my Client (e.g. to ["pickle"] for both). However, this produced the following error:

2022-12-09 13:30:31,195 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/protocol/core.py", line 158, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/protocol/core.py", line 138, in _decode_default
    return merge_and_deserialize(
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 497, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 421, in deserialize
    raise TypeError(
TypeError: Data serialized with dask but only able to deserialize data with ['pickle']

After reading #5561, it looks like a similar issue is taking place but instead with the predefined serializers.
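To illustrate what I think is going on, here is a toy model of the check that raises the error above. It uses only the standard library and is not the actual code in distributed/protocol/serialize.py: the names toy_serialize and toy_deserialize are hypothetical, and the idea that a sender with no restriction tags its data as "dask" is my assumption based on the traceback.

```python
import pickle


def toy_serialize(obj, serializer="pickle"):
    """Return (header, frames); the header records which serializer name was used.

    Hypothetical stand-in: the payload is always pickled here, only the
    name recorded in the header varies.
    """
    header = {"serializer": serializer}
    frames = [pickle.dumps(obj)]
    return header, frames


def toy_deserialize(header, frames, deserializers=None):
    """Refuse data whose serializer name is not in the allowed list."""
    name = header["serializer"]
    if deserializers is not None and name not in deserializers:
        raise TypeError(
            f"Data serialized with {name} but only able to deserialize "
            f"data with {list(deserializers)}"
        )
    return pickle.loads(frames[0])


# Assumed scenario: the sending side has no restriction (serializers=None)
# and tags its data as "dask", while the receiver only accepts "pickle".
header, frames = toy_serialize([1, 2, 3], serializer="dask")
try:
    toy_deserialize(header, frames, deserializers=["pickle"])
except TypeError as exc:
    message = str(exc)
    print(message)
```

If both sides agreed on the same list, the check would pass, which is why I would expect the Client options to reach the workers.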

Minimal Complete Verifiable Example:

from dask.distributed import Client, get_client
from dask import delayed


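@delayed()
def worker(i):
    # Runs on a worker: fetch the client available there and show its settings.
    client = get_client()
    print(f"Worker -- {client._serializers=}")
    print(f"Worker -- {client._deserializers=}")
    return i

def main():
    # Restrict both directions to pickle on the client created in the main process.
    with Client(serializers=["pickle"], deserializers=["pickle"]) as client:
        print(f"Main -- {client._serializers=}")
        print(f"Main -- {client._deserializers=}")

        # Build ten delayed tasks and submit them to the cluster.
        tests = []
        for i in range(10):
            tests.append(worker(i))
        _ = client.compute(tests)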


if __name__ == "__main__":
    main()

Output:

Main -- client._serializers=['pickle']
Main -- client._deserializers=['pickle']
Worker -- client._serializers=None
Worker -- client._deserializers=None
Worker -- client._serializers=None
Worker -- client._serializers=None
Worker -- client._deserializers=None
Worker -- client._serializers=None
Worker -- client._deserializers=None
Worker -- client._deserializers=None
Worker -- client._serializers=None
Worker -- client._serializers=None
Worker -- client._deserializers=None
Worker -- client._serializers=None
Worker -- client._deserializers=None
Worker -- client._deserializers=None
Worker -- client._serializers=None
Worker -- client._serializers=None
Worker -- client._deserializers=None
Worker -- client._deserializers=None
Worker -- client._serializers=None
Worker -- client._deserializers=None

Anything else we need to know?:
As a slight aside, I think this lies at the heart of the compatibility issues with astropy.coordinates and astropy.units, since serialization of objects from these modules appears to be breaking.

Environment:

  • Dask version: 2022.9.2
  • Python version: Python 3.8.13
  • Operating System: SUSE Linux Enterprise Server 12 SP3
  • Install method (conda, pip, source): conda
