Skip to content

Exception kills all workers/scheduler #1469

@jhamman

Description

@jhamman

I'm running dask-distributed with xarray, using dask-mpi to launch the scheduler and workers on a PBS system (Cheyenne). Under certain conditions, when an exception is raised, dask appears to kill the workers, and all of my PBS jobs are terminated. Ideally, distributed would handle these failures more gracefully and, at a minimum, keep the PBS jobs running.

The exit code given to PBS was:

265: SIGKILL (PBS reports 256 + the signal number, so 265 means signal 9, e.g. from a `kill -9` command); this may indicate an out-of-memory error

cc @mrocklin

Traceback:

Details
distributed.utils - ERROR - ("('open_dataset-ec23f9cbbe2cf1b92d4d771bb7537415t_mean-dc61bc3a3dff54591f7eee3c1e60aa57', 23, 0, 0)", 'tcp://10.148.13.182:54442')
Traceback (most recent call last):
  File "/glade/u/home/jhamman/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/utils.py", line 229, in f
    result[0] = yield make_coro()
  File "/glade/u/home/jhamman/anaconda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/glade/u/home/jhamman/anaconda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/glade/u/home/jhamman/anaconda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/glade/u/home/jhamman/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/client.py", line 1246, in _gather
    traceback)
  File "/glade/u/home/jhamman/anaconda/envs/pangeo/lib/python3.6/site-packages/six.py", line 686, in reraise
    raise value
distributed.scheduler.KilledWorker: ("('open_dataset-ec23f9cbbe2cf1b92d4d771bb7537415t_mean-dc61bc3a3dff54591f7eee3c1e60aa57', 23, 0, 0)", 'tcp://10.148.13.182:54442')
---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<timed exec> in <module>()

~/anaconda/envs/pangeo/lib/python3.6/site-packages/xarray/core/dataarray.py in compute(self)
    590         """
    591         new = self.copy(deep=False)
--> 592         return new.load()
    593 
    594     def persist(self):

~/anaconda/envs/pangeo/lib/python3.6/site-packages/xarray/core/dataarray.py in load(self)
    573         working with many file objects on disk.
    574         """
--> 575         ds = self._to_temp_dataset().load()
    576         new = self._from_temp_dataset(ds)
    577         self._variable = new._variable

~/anaconda/envs/pangeo/lib/python3.6/site-packages/xarray/core/dataset.py in load(self)
    461 
    462             # evaluate all the dask arrays simultaneously
--> 463             evaluated_data = da.compute(*lazy_data.values())
    464 
    465             for k, data in zip(lazy_data, evaluated_data):

~/anaconda/envs/pangeo/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    204     dsk = collections_to_dsk(variables, optimize_graph, **kwargs)
    205     keys = [var._keys() for var in variables]
--> 206     results = get(dsk, keys, **kwargs)
    207 
    208     results_iter = iter(results)

~/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, **kwargs)
   1921         if sync:
   1922             try:
-> 1923                 results = self.gather(packed, asynchronous=asynchronous)
   1924             finally:
   1925                 for f in futures.values():

~/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1366             return self.sync(self._gather, futures, errors=errors,
   1367                              direct=direct, local_worker=local_worker,
-> 1368                              asynchronous=asynchronous)
   1369 
   1370     @gen.coroutine

~/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    538             return future
    539         else:
--> 540             return sync(self.loop, func, *args, **kwargs)
    541 
    542     def __str__(self):

~/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    239         e.wait(1000000)
    240     if error[0]:
--> 241         six.reraise(*error[0])
    242     else:
    243         return result[0]

~/anaconda/envs/pangeo/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    684         if value.__traceback__ is not tb:
    685             raise value.with_traceback(tb)
--> 686         raise value
    687 
    688 else:

~/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/utils.py in f()
    227             yield gen.moment
    228             thread_state.asynchronous = True
--> 229             result[0] = yield make_coro()
    230         except Exception as exc:
    231             logger.exception(exc)

~/anaconda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

~/anaconda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

~/anaconda/envs/pangeo/lib/python3.6/site-packages/tornado/util.py in raise_exc_info(exc_info)

~/anaconda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1061                     if exc_info is not None:
   1062                         try:
-> 1063                             yielded = self.gen.throw(*exc_info)
   1064                         finally:
   1065                             # Break up a reference to itself

~/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1244                             six.reraise(type(exception),
   1245                                         exception,
-> 1246                                         traceback)
   1247                     if errors == 'skip':
   1248                         bad_keys.add(key)

~/anaconda/envs/pangeo/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    684         if value.__traceback__ is not tb:
    685             raise value.with_traceback(tb)
--> 686         raise value
    687 
    688 else:

KilledWorker: ("('open_dataset-ec23f9cbbe2cf1b92d4d771bb7537415t_mean-dc61bc3a3dff54591f7eee3c1e60aa57', 23, 0, 0)", 'tcp://10.148.13.182:54442')
</details>

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions