Skip to content

cuDF UCX Deserialization Failure #2830

@quasiben

Description

@quasiben

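When deserializing a distributed cuDF DataFrame in the most straightforward way, cuDF raises an error, but I suspect the underlying issue lies in distributed's deserialization scheme. The same computation succeeds when no distributed Client is involved, and fails only after a Client connected to a dask-cuda DGX cluster has been created.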

# Reproducer: build a small dask DataFrame from random dask arrays, convert each
# partition to cuDF with map_partitions(cudf.from_pandas), and pull the head back
# as pandas. This is done twice: first with no distributed Client, then after
# creating a Client on a dask_cuda DGX cluster.
import os  # NOTE(review): unused in this snippet
import dask
import distributed
import cudf
import dask.array as da
import dask.dataframe as dd

# Record library versions, since the failure may be version-specific.
print(f'Dask: {dask.__version__}')
print(f'Distributed: {distributed.__version__}')
print(f'cuDF: {cudf.__version__}')

n_rows = 10  # total number of rows in the test frame
n_keys = 3   # 'id' values are drawn from [0, n_keys)

# Two single-column frames ('x': random floats, 'id': random ints) joined
# column-wise. No Client exists yet, so this presumably persists on dask's
# default local scheduler.
ddf = dd.concat([
    da.random.random(n_rows).to_dask_dataframe(columns='x'),
    da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),
], axis=1).persist()

print(ddf.head())

print(ddf.map_partitions(cudf.from_pandas).head().to_pandas()) # succeeds without a distributed Client

from dask.distributed import Client, wait
from dask_cuda import DGX

# From here on, computations go through a distributed Client backed by a
# dask_cuda DGX cluster on GPUs 0-3. The traceback below shows results being
# read back over distributed/comm/ucx.py.
cluster = DGX(CUDA_VISIBLE_DEVICES=[0,1,2,3], dashboard_address='0.0.0.0:8989')
client = Client(cluster)
client  # bare expression: only displays the client repr in a notebook cell

# Same construction as `ddf`, but created after the Client, so it is
# persisted on the cluster's workers.
ddf2 = dd.concat([
    da.random.random(n_rows).to_dask_dataframe(columns='x'),
    da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),
], axis=1).persist()


ddf2.map_partitions(cudf.from_pandas).head().to_pandas() # fails while gathering the result: cuDF column deserialize receives null_count=None (TypeError, see traceback below)

Gist to Notebook

Error

Details
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/nfs/bzaitlen/GitRepos/distributed/distributed/protocol/core.py", line 126, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/home/nfs/bzaitlen/GitRepos/distributed/distributed/protocol/serialize.py", line 190, in deserialize
    return loads(header, frames)
  File "/home/nfs/bzaitlen/GitRepos/distributed/distributed/protocol/cuda.py", line 30, in cuda_loads
    return loads(header, frames)
  File "/home/nfs/bzaitlen/GitRepos/cudf/python/cudf/cudf/dataframe/dataframe.py", line 181, in deserialize
    index = deserialize(index_header, index_frames)
  File "/home/nfs/bzaitlen/GitRepos/distributed/distributed/protocol/serialize.py", line 190, in deserialize
    return loads(header, frames)
  File "/home/nfs/bzaitlen/GitRepos/distributed/distributed/protocol/cuda.py", line 30, in cuda_loads
    return loads(header, frames)
  File "/home/nfs/bzaitlen/GitRepos/cudf/python/cudf/cudf/dataframe/index.py", line 511, in deserialize
    header["payload"], frames[: header["frame_count"]]
  File "/home/nfs/bzaitlen/GitRepos/distributed/distributed/protocol/serialize.py", line 190, in deserialize
    return loads(header, frames)
  File "/home/nfs/bzaitlen/GitRepos/distributed/distributed/protocol/cuda.py", line 30, in cuda_loads
    return loads(header, frames)
  File "/home/nfs/bzaitlen/GitRepos/cudf/python/cudf/cudf/dataframe/numerical.py", line 65, in deserialize
    dtype=deserialize(*header["dtype"]),
  File "/home/nfs/bzaitlen/GitRepos/cudf/python/cudf/cudf/dataframe/numerical.py", line 42, in __init__
    super(NumericalColumn, self).__init__(**kwargs)
  File "/home/nfs/bzaitlen/GitRepos/cudf/python/cudf/cudf/dataframe/columnops.py", line 36, in __init__
    super(TypedColumnBase, self).__init__(**kwargs)
  File "/home/nfs/bzaitlen/GitRepos/cudf/python/cudf/cudf/dataframe/column.py", line 160, in __init__
    self._update_null_count(null_count)
  File "/home/nfs/bzaitlen/GitRepos/cudf/python/cudf/cudf/dataframe/column.py", line 173, in _update_null_count
    assert 0 <= null_count <= len(self)
TypeError: 'NoneType' object cannot be interpreted as an integer
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-12-7015a364c3b2> in <module>
----> 1 ddf2.map_partitions(cudf.from_pandas).head().to_pandas()

~/GitRepos/dask/dask/dataframe/core.py in head(self, n, npartitions, compute)
    956             Whether to compute the result, default is True.
    957         """
--> 958         return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
    959 
    960     def _head(self, n, npartitions, compute, safe):

~/GitRepos/dask/dask/dataframe/core.py in _head(self, n, npartitions, compute, safe)
    989 
    990         if compute:
--> 991             result = result.compute()
    992         return result
    993 

~/GitRepos/dask/dask/base.py in compute(self, **kwargs)
    173         dask.base.compute
    174         """
--> 175         (result,) = compute(self, traverse=False, **kwargs)
    176         return result
    177 

~/GitRepos/dask/dask/base.py in compute(*args, **kwargs)
    444     keys = [x.__dask_keys__() for x in collections]
    445     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446     results = schedule(dsk, keys, **kwargs)
    447     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    448 

~/GitRepos/distributed/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2525                     should_rejoin = False
   2526             try:
-> 2527                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2528             finally:
   2529                 for f in futures.values():

~/GitRepos/distributed/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1821                 direct=direct,
   1822                 local_worker=local_worker,
-> 1823                 asynchronous=asynchronous,
   1824             )
   1825 

~/GitRepos/distributed/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    761         else:
    762             return sync(
--> 763                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    764             )
    765 

~/GitRepos/distributed/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    330             e.wait(10)
    331     if error[0]:
--> 332         six.reraise(*error[0])
    333     else:
    334         return result[0]

~/miniconda3/envs/cudf-dev/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/GitRepos/distributed/distributed/utils.py in f()
    315             if callback_timeout is not None:
    316                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 317             result[0] = yield future
    318         except Exception as exc:
    319             error[0] = sys.exc_info()

~/miniconda3/envs/cudf-dev/lib/python3.7/site-packages/tornado/gen.py in run(self)
    727 
    728                     try:
--> 729                         value = future.result()
    730                     except Exception:
    731                         exc_info = sys.exc_info()

~/miniconda3/envs/cudf-dev/lib/python3.7/site-packages/tornado/gen.py in run(self)
    734                     if exc_info is not None:
    735                         try:
--> 736                             yielded = self.gen.throw(*exc_info)  # type: ignore
    737                         finally:
    738                             # Break up a reference to itself

~/GitRepos/distributed/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1705                 else:
   1706                     self._gather_future = future
-> 1707                 response = yield future
   1708 
   1709             if response["status"] == "error":

~/miniconda3/envs/cudf-dev/lib/python3.7/site-packages/tornado/gen.py in run(self)
    727 
    728                     try:
--> 729                         value = future.result()
    730                     except Exception:
    731                         exc_info = sys.exc_info()

~/miniconda3/envs/cudf-dev/lib/python3.7/site-packages/tornado/gen.py in run(self)
    734                     if exc_info is not None:
    735                         try:
--> 736                             yielded = self.gen.throw(*exc_info)  # type: ignore
    737                         finally:
    738                             # Break up a reference to itself

~/GitRepos/distributed/distributed/client.py in _gather_remote(self, direct, local_worker)
   1758 
   1759             else:  # ask scheduler to gather data for us
-> 1760                 response = yield self.scheduler.gather(keys=keys)
   1761         finally:
   1762             self._gather_semaphore.release()

~/miniconda3/envs/cudf-dev/lib/python3.7/site-packages/tornado/gen.py in run(self)
    727 
    728                     try:
--> 729                         value = future.result()
    730                     except Exception:
    731                         exc_info = sys.exc_info()

~/miniconda3/envs/cudf-dev/lib/python3.7/site-packages/tornado/gen.py in run(self)
    734                     if exc_info is not None:
    735                         try:
--> 736                             yielded = self.gen.throw(*exc_info)  # type: ignore
    737                         finally:
    738                             # Break up a reference to itself

~/GitRepos/distributed/distributed/core.py in send_recv_from_rpc(**kwargs)
    739             name, comm.name = comm.name, "ConnectionPool." + key
    740             try:
--> 741                 result = yield send_recv(comm=comm, op=key, **kwargs)
    742             finally:
    743                 self.pool.reuse(self.addr, comm)

~/miniconda3/envs/cudf-dev/lib/python3.7/site-packages/tornado/gen.py in run(self)
    727 
    728                     try:
--> 729                         value = future.result()
    730                     except Exception:
    731                         exc_info = sys.exc_info()

~/miniconda3/envs/cudf-dev/lib/python3.7/site-packages/tornado/gen.py in run(self)
    734                     if exc_info is not None:
    735                         try:
--> 736                             yielded = self.gen.throw(*exc_info)  # type: ignore
    737                         finally:
    738                             # Break up a reference to itself

~/GitRepos/distributed/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    533         yield comm.write(msg, serializers=serializers, on_error="raise")
    534         if reply:
--> 535             response = yield comm.read(deserializers=deserializers)
    536         else:
    537             response = None

~/miniconda3/envs/cudf-dev/lib/python3.7/site-packages/tornado/gen.py in run(self)
    727 
    728                     try:
--> 729                         value = future.result()
    730                     except Exception:
    731                         exc_info = sys.exc_info()

~/GitRepos/distributed/distributed/comm/ucx.py in read(self, deserializers)
    143 
    144         msg = await from_frames(
--> 145             frames, deserialize=self.deserialize, deserializers=deserializers
    146         )
    147 

~/miniconda3/envs/cudf-dev/lib/python3.7/site-packages/tornado/gen.py in wrapper(*args, **kwargs)
    207                 # performance penalty for the synchronous case.
    208                 try:
--> 209                     yielded = next(result)
    210                 except (StopIteration, Return) as e:
    211                     future_set_result_unless_cancelled(

~/GitRepos/distributed/distributed/comm/utils.py in from_frames(frames, deserialize, deserializers)
     83         res = yield offload(_from_frames)
     84     else:
---> 85         res = _from_frames()
     86 
     87     raise gen.Return(res)

~/GitRepos/distributed/distributed/comm/utils.py in _from_frames()
     69         try:
     70             return protocol.loads(
---> 71                 frames, deserialize=deserialize, deserializers=deserializers
     72             )
     73         except EOFError:

~/GitRepos/distributed/distributed/protocol/core.py in loads(frames, deserialize, deserializers)
    124                     fs = decompress(head, fs)
    125                 fs = merge_frames(head, fs)
--> 126                 value = _deserialize(head, fs, deserializers=deserializers)
    127             else:
    128                 value = Serialized(head, fs)

~/GitRepos/distributed/distributed/protocol/serialize.py in deserialize(header, frames, deserializers)
    188         )
    189     dumps, loads, wants_context = families[name]
--> 190     return loads(header, frames)
    191 
    192 

~/GitRepos/distributed/distributed/protocol/cuda.py in cuda_loads(header, frames)
     28     typ = pickle.loads(header["type-serialized"])
     29     loads = cuda_deserialize.dispatch(typ)
---> 30     return loads(header, frames)
     31 
     32 

~/GitRepos/cudf/python/cudf/cudf/dataframe/dataframe.py in deserialize(cls, deserialize, header, frames)
    179         index_header = header["index"]
    180         index_frames = frames[: header["index_frame_count"]]
--> 181         index = deserialize(index_header, index_frames)
    182         # Reconstruct the columns
    183         column_frames = frames[header["index_frame_count"] :]

~/GitRepos/distributed/distributed/protocol/serialize.py in deserialize(header, frames, deserializers)
    188         )
    189     dumps, loads, wants_context = families[name]
--> 190     return loads(header, frames)
    191 
    192 

~/GitRepos/distributed/distributed/protocol/cuda.py in cuda_loads(header, frames)
     28     typ = pickle.loads(header["type-serialized"])
     29     loads = cuda_deserialize.dispatch(typ)
---> 30     return loads(header, frames)
     31 
     32 

~/GitRepos/cudf/python/cudf/cudf/dataframe/index.py in deserialize(cls, deserialize, header, frames)
    509     def deserialize(cls, deserialize, header, frames):
    510         payload = deserialize(
--> 511             header["payload"], frames[: header["frame_count"]]
    512         )
    513         return cls(payload)

~/GitRepos/distributed/distributed/protocol/serialize.py in deserialize(header, frames, deserializers)
    188         )
    189     dumps, loads, wants_context = families[name]
--> 190     return loads(header, frames)
    191 
    192 

~/GitRepos/distributed/distributed/protocol/cuda.py in cuda_loads(header, frames)
     28     typ = pickle.loads(header["type-serialized"])
     29     loads = cuda_deserialize.dispatch(typ)
---> 30     return loads(header, frames)
     31 
     32 

~/GitRepos/cudf/python/cudf/cudf/dataframe/numerical.py in deserialize(cls, deserialize, header, frames)
     63             mask=mask,
     64             null_count=header["null_count"],
---> 65             dtype=deserialize(*header["dtype"]),
     66         )
     67         return col

~/GitRepos/cudf/python/cudf/cudf/dataframe/numerical.py in __init__(self, **kwargs)
     40             Data type
     41         """
---> 42         super(NumericalColumn, self).__init__(**kwargs)
     43         assert self._dtype == self._data.dtype
     44 

~/GitRepos/cudf/python/cudf/cudf/dataframe/columnops.py in __init__(self, **kwargs)
     34     def __init__(self, **kwargs):
     35         dtype = kwargs.pop("dtype")
---> 36         super(TypedColumnBase, self).__init__(**kwargs)
     37         # Logical dtype
     38         self._dtype = pd.api.types.pandas_dtype(dtype)

~/GitRepos/cudf/python/cudf/cudf/dataframe/column.py in __init__(self, data, mask, null_count, name)
    158             assert mask.size * utils.mask_bitsize >= len(self)
    159 
--> 160         self._update_null_count(null_count)
    161 
    162     def _update_null_count(self, null_count=None):

~/GitRepos/cudf/python/cudf/cudf/dataframe/column.py in _update_null_count(self, null_count)
    171                 null_count = 0
    172 
--> 173         assert 0 <= null_count <= len(self)
    174         if null_count == 0:
    175             # Remove mask if null_count is zero

TypeError: 'NoneType' object cannot be interpreted as an integer

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions