Skip to content

df.compute() on Mac OS LocalCluster with Client(processes=True) generates OSError #4674

@chambersmatt

Description

@chambersmatt

What happened:

Attempting to process a Dask dataframe on a LocalCluster on macOS 11.2.3 with 8 cores and 16 GB of RAM.

df.compute() works when Client(processes=False) but fails if Client(processes=True).

Other operations on the dataframe complete successfully, but attempting to return the entirety of the dataframe fails, e.g.:

df[df.columns[:120]].compute() succeeds, while df.compute() fails.

See traceback below:

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-38-9bb416d45ef6> in <module>
----> 1 df.compute()

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/dask/base.py in compute(self, **kwargs)
    282         dask.base.compute
    283         """
--> 284         (result,) = compute(self, traverse=False, **kwargs)
    285         return result
    286 

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/dask/base.py in compute(*args, **kwargs)
    564         postcomputes.append(x.__dask_postcompute__())
    565 
--> 566     results = schedule(dsk, keys, **kwargs)
    567     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    568 

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2664                     should_rejoin = False
   2665             try:
-> 2666                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2667             finally:
   2668                 for f in futures.values():

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1973             else:
   1974                 local_worker = None
-> 1975             return self.sync(
   1976                 self._gather,
   1977                 futures,

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    841             return future
    842         else:
--> 843             return sync(
    844                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    845             )

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    351     if error[0]:
    352         typ, exc, tb = error[0]
--> 353         raise exc.with_traceback(tb)
    354     else:
    355         return result[0]

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/distributed/utils.py in f()
    334             if callback_timeout is not None:
    335                 future = asyncio.wait_for(future, callback_timeout)
--> 336             result[0] = yield future
    337         except Exception as exc:
    338             error[0] = sys.exc_info()

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1867                 else:
   1868                     self._gather_future = future
-> 1869                 response = await future
   1870 
   1871             if response["status"] == "error":

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/distributed/client.py in _gather_remote(self, direct, local_worker)
   1918 
   1919             else:  # ask scheduler to gather data for us
-> 1920                 response = await retry_operation(self.scheduler.gather, keys=keys)
   1921 
   1922         return response

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/distributed/utils_comm.py in retry_operation(coro, operation, *args, **kwargs)
    383         dask.config.get("distributed.comm.retry.delay.max"), default="s"
    384     )
--> 385     return await retry(
    386         partial(coro, *args, **kwargs),
    387         count=retry_count,

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/distributed/utils_comm.py in retry(coro, count, delay_min, delay_max, jitter_fraction, retry_on_exceptions, operation)
    368                 delay *= 1 + random.random() * jitter_fraction
    369             await asyncio.sleep(delay)
--> 370     return await coro()
    371 
    372 

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    860             name, comm.name = comm.name, "ConnectionPool." + key
    861             try:
--> 862                 result = await send_recv(comm=comm, op=key, **kwargs)
    863             finally:
    864                 self.pool.reuse(self.addr, comm)

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    643         await comm.write(msg, serializers=serializers, on_error="raise")
    644         if reply:
--> 645             response = await comm.read(deserializers=deserializers)
    646         else:
    647             response = None

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    198 
    199             frames = bytearray(frames_nbytes)
--> 200             n = await stream.read_into(frames)
    201             assert n == frames_nbytes, (n, frames_nbytes)
    202         except StreamClosedError as e:

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/tornado/iostream.py in read_into(self, buf, partial)
    473 
    474         try:
--> 475             self._try_inline_read()
    476         except:
    477             future.add_done_callback(lambda f: f.exception())

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/tornado/iostream.py in _try_inline_read(self)
    840             return
    841         self._check_closed()
--> 842         pos = self._read_to_buffer_loop()
    843         if pos is not None:
    844             self._read_from_buffer(pos)

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/tornado/iostream.py in _read_to_buffer_loop(self)
    753             # can't see it; the only way to find out if it's there is to
    754             # try to read it.
--> 755             if self._read_to_buffer() == 0:
    756                 break
    757 

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/tornado/iostream.py in _read_to_buffer(self)
    865                     else:
    866                         buf = bytearray(self.read_chunk_size)
--> 867                     bytes_read = self.read_from_fd(buf)
    868                 except (socket.error, IOError, OSError) as e:
    869                     # ssl.SSLError is a subclass of socket.error

~/.local/share/virtualenvs/stonks-ncnmWyPa/lib/python3.9/site-packages/tornado/iostream.py in read_from_fd(***failed resolving arguments***)
   1138     def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
   1139         try:
-> 1140             return self.socket.recv_into(buf, len(buf))
   1141         except BlockingIOError:
   1142             return None

OSError: [Errno 22] Invalid argument

What you expected to happen:

Successful return of the computed dataframe using multiple processes.

Minimal Complete Verifiable Example:

from dask.distributed import Client
from dask import dataframe as dd

# load 3.5gb CSV
basefile = '/path/to/basefile.csv'
dtypes = {'foo': 'int8', 'bar': 'float16', 'total': 'int64'}
df = dd.read_csv(basefile, sep='\t', dtype=dtypes, parse_dates=['Date'])

client = Client(processes=True)

# fails
df.compute()

# succeeds
df[df.columns[:120]].compute()

Environment:

  • Dask version: dask-2021.4.0
  • Python version: 3.9.2
  • Operating System: Mac OS 11.2.3
  • Install method (conda, pip, source): pip

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions