Skip to content

Key error on future.retry() with dask/distributed 2021.7.2 #5198

@gerrymanoim

Description

@gerrymanoim

Apologies in advance, I haven't found a reliable way to trigger this error in order to provide a MCVE. Hopefully this description and traceback is helpful.

I have code of the following form, that submits a bunch of tasks, releases them when they're completed, with some very simple retry mechanism to deal with some flakiness of downstream services.

            futures = dask_client.compute(tasks)
            errors = []
            max_reties = 3
            retry_counter: Dict[Future, int] = defaultdict(lambda: 0)
            futures_monitor = as_completed(futures)
            for future in futures_monitor:
                if future.status == "error":
                    if retry_counter.get(future, 0) < max_reties:
                        retry_counter[future] += 1
                        future.retry()
                        futures_monitor.add(future)
                    else:
                        errors.append((future.exception(), future.traceback()))
                        future.release()
                else:
                    future.release()

            return errors

This code worked well in simple testing. Today when trying this on a larger set of tasks (~11k total), I was surprised to see I get a KeyError when trying to retry the errored future:

    238                     if retry_counter.get(future, 0) < max_reties:
    239                         retry_counter[future] += 1
--> 240                         future.retry()
    241                         futures_monitor.add(future)
    242                     else:

.../client.py in retry(self, **kwargs)
    320         Client.retry
    321         """
--> 322         return self.client.retry([self], **kwargs)
    323 
    324     def cancelled(self):

../distributed/client.py in retry(self, futures, asynchronous)
   2277         futures : list of Futures
   2278         """
-> 2279         return self.sync(self._retry, futures, asynchronous=asynchronous)
   2280 
   2281     async def _publish_dataset(self, *args, name=None, override=False, **kwargs):

.../distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    865         else:
    866             return sync(
--> 867                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    868             )
    869 

.../distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    324     if error[0]:
    325         typ, exc, tb = error[0]
--> 326         raise exc.with_traceback(tb)
    327     else:
    328         return result[0]

.../distributed/utils.py in f()
    307             if callback_timeout is not None:
    308                 future = asyncio.wait_for(future, callback_timeout)
--> 309             result[0] = yield future
    310         except Exception:
    311             error[0] = sys.exc_info()

.../tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

.../distributed/client.py in _retry(self, futures)
   2266         response = await self.scheduler.retry(keys=keys, client=self.id)
   2267         for key in response:
-> 2268             st = self.futures[key]
   2269             st.retry()
   2270 

KeyError: 'the-task-uuid'

I can add another try/except here on the retry, but it feels like I'm fighting the system at that point.

Is this a bad pattern?
I'm not entirely sure the mechanism by which retry would lead to a key error in this code path.

Environment:

  • Dask version: 2021.7.2
  • Python version: 3.7
  • Operating System: Linux

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions