Locks Not Released on MemoryError #7371

@jamesstidard

Description

Describe the issue:
Hi, I'm currently using dask as a backend for a web service and trying to improve the stability. I'm using dask.distributed.Lock in order to coordinate access to certain files by the workers.

I've noticed that when a worker handles a task that results in a MemoryError, and the worker is restarted, the dask.distributed.Locks that worker had acquired are never released. Other workers then gradually hit the same dask.distributed.Lock and are unable to acquire it, even though no worker is actually holding it.

Because of the long-running nature of some of these tasks, workers can end up frozen trying to acquire a lock, unable to progress or do other work. A timeout is another option, but given how long these tasks can take, an appropriately long timeout has essentially the same impact as hanging forever (especially since that timeout has to be waited out again each time a worker re-encounters the same lock).
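For reference, this is the timeout-based mitigation I mean. It's a minimal sketch, assuming an in-process cluster for illustration (a real deployment would connect to the scheduler address instead); Lock.acquire accepts a timeout and returns a boolean rather than hanging:

```python
from distributed import Client, Lock

# In-process cluster for illustration; a real deployment would use
# something like Client("0.0.0.0:8786") instead.
client = Client(processes=False)

holder = Lock(name="my-lock", client=client)
holder.acquire()  # stands in for a lock held by a worker that has since died

waiter = Lock(name="my-lock", client=client)
# Gives up after ~1 second instead of blocking forever; returns False
# when the lock could not be acquired.
acquired = waiter.acquire(timeout=1)

holder.release()
```

The problem described above still applies: every worker that touches the orphaned lock pays the full timeout, so this only bounds the hang rather than removing it.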

Having the ability, built in or not, to release these locks when a worker is killed would be really useful for improving the stability of the service.
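One possible workaround (not a fix for Lock itself) is distributed.Semaphore, which issues leases that the scheduler can reclaim when a holder stops heartbeating. This is a sketch assuming the distributed.scheduler.locks.lease-timeout config option and a single-lease semaphore standing in for the lock:

```python
import dask
from distributed import Client, Semaphore

# Leases older than the lease timeout whose holder is unresponsive
# are reclaimed by the scheduler, so a lease held by a worker killed
# on MemoryError is eventually released.
dask.config.set({"distributed.scheduler.locks.lease-timeout": "10s"})

client = Client(processes=False)  # in-process cluster for illustration

# max_leases=1 makes the semaphore behave like a mutex; "my-lock"
# mirrors the lock name from the report.
sem = Semaphore(max_leases=1, name="my-lock")

with sem:
    pass  # critical section
```

The trade-off is that the lease timeout must be longer than any legitimate critical section, so this trades correctness-by-default for eventual recovery.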

Additionally, other context managers never get a chance to clean up, as you would expect in native Python code. If a MemoryError occurs while not only the dask.distributed.Lock is held but an open() context manager is also active, the __exit__ for these is never called, as it would be if an unhandled exception occurred in non-dask-distributed code. This is why my example uses multiple opens.

Thanks.

Minimal Complete Verifiable Example:
This is the code I used to reproduce the issue, tweaking the GIGABYTES_TARGET variable to induce a MemoryError.

$ dask scheduler
$ dask worker 0.0.0.0:8786 --memory-limit '1GB'
$ python main.py
# main.py
import time

import numpy as np

from dask.delayed import delayed
from dask.distributed import Client, Lock, get_client


GIGABYTES_TARGET = 1


@delayed(pure=False)
def memory_hog():
    client = get_client()

    with open("log.txt", "w+") as fp:
        fp.write("entered function\n")

    with Lock(name="my-lock", client=client):
        with open("log.txt", "a") as fp:
            fp.write("entered lock\n")

        dtype = np.dtype("float16")
        n_required = int((GIGABYTES_TARGET * 1e9) / dtype.itemsize)
        _ = np.ones((n_required,), dtype=dtype)

        with open("log.txt", "a") as fp:
            fp.write("entered sleep\n")

        time.sleep(2)

    with open("log.txt", "a") as fp:
        fp.write("exit function\n")


def main():
    client = Client("0.0.0.0:8786")
    client.compute(memory_hog(), sync=True)
    print("done")


if __name__ == "__main__":
    main()

Environment:

  • Dask version: ==2022.12.0
  • Python version: 3.10.4
  • Operating System: macOS (also seen on production server, ubuntu running python Docker container)
  • Install method (conda, pip, source): pipenv

Metadata

Assignees

No one assigned

    Labels

    enhancement (Improve existing functionality or make things work better), good second issue (Clearly described, educational, but less trivial than "good first issue")
