diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 3783c1ffc6e4a9..8e41f461cc934e 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -51,15 +51,31 @@ }) +class ReentrantCallError(RuntimeError): + pass + + class ResourceTracker(object): def __init__(self): - self._lock = threading.Lock() + self._lock = threading.RLock() self._fd = None self._pid = None + def _reentrant_call_error(self): + # gh-109629: this happens if an explicit call to the ResourceTracker + # gets interrupted by a garbage collection, invoking a finalizer (*) + # that itself calls back into ResourceTracker. + # (*) for example the SemLock finalizer + raise ReentrantCallError( + "Reentrant call into the multiprocessing resource tracker") + def _stop(self): with self._lock: + # This should not happen (_stop() isn't called by a finalizer) + # but we check for it anyway. + if self._lock._recursion_count() > 1: + return self._reentrant_call_error() if self._fd is None: # not running return @@ -81,6 +97,9 @@ def ensure_running(self): This can be run from any process. Usually a child process will use the resource created by its parent.''' with self._lock: + if self._lock._recursion_count() > 1: + # The code below is certainly not reentrant-safe, so bail out + return self._reentrant_call_error() if self._fd is not None: # resource tracker was launched before, is it still running? if self._check_alive(): @@ -159,7 +178,17 @@ def unregister(self, name, rtype): self._send('UNREGISTER', name, rtype) def _send(self, cmd, name, rtype): - self.ensure_running() + try: + self.ensure_running() + except ReentrantCallError: + # The code below might or might not work, depending on whether + # the resource tracker was already running and still alive. + # Better warn the user. + # (XXX is warnings.warn itself reentrant-safe? :-) + warnings.warn( + f"ResourceTracker called reentrantly for resource cleanup, " + f"which is unsupported. " + f"The {rtype} object {name!r} might leak.") msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii') if len(msg) > 512: # posix guarantees that writes to a pipe of less than PIPE_BUF @@ -176,6 +205,7 @@ def _send(self, cmd, name, rtype): unregister = _resource_tracker.unregister getfd = _resource_tracker.getfd + def main(fd): '''Run resource tracker.''' # protect the process from ^C and "killall python" etc diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py index 0890ec87afd1c6..e53e24b18f2760 100644 --- a/Lib/test/lock_tests.py +++ b/Lib/test/lock_tests.py @@ -330,6 +330,42 @@ def test_release_save_unacquired(self): lock.release() self.assertRaises(RuntimeError, lock._release_save) + def test_recursion_count(self): + lock = self.locktype() + self.assertEqual(0, lock._recursion_count()) + lock.acquire() + self.assertEqual(1, lock._recursion_count()) + lock.acquire() + lock.acquire() + self.assertEqual(3, lock._recursion_count()) + lock.release() + self.assertEqual(2, lock._recursion_count()) + lock.release() + lock.release() + self.assertEqual(0, lock._recursion_count()) + + phase = [] + + def f(): + lock.acquire() + phase.append(None) + while len(phase) == 1: + _wait() + lock.release() + phase.append(None) + + with threading_helper.wait_threads_exit(): + start_new_thread(f, ()) + while len(phase) == 0: + _wait() + self.assertEqual(len(phase), 1) + self.assertEqual(0, lock._recursion_count()) + phase.append(None) + while len(phase) == 2: + _wait() + self.assertEqual(len(phase), 3) + self.assertEqual(0, lock._recursion_count()) + def test_different_thread(self): # Cannot release from a different thread lock = self.locktype() diff --git a/Lib/test/test_importlib/test_locks.py b/Lib/test/test_importlib/test_locks.py index ba9cf51c261d52..7091c36aaaf761 100644 --- a/Lib/test/test_importlib/test_locks.py +++ b/Lib/test/test_importlib/test_locks.py @@ -29,6 +29,8 @@ class ModuleLockAsRLockTests: test_timeout = None # _release_save() unsupported test_release_save_unacquired = None + # _recursion_count() unsupported + test_recursion_count = None # lock status in repr unsupported test_repr = None test_locked_repr = None diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 9c16c4044f66a8..71fcad268b8036 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -1783,6 +1783,9 @@ class ConditionAsRLockTests(lock_tests.RLockTests): # Condition uses an RLock by default and exports its API. locktype = staticmethod(threading.Condition) + def test_recursion_count(self): + self.skipTest("Condition does not expose _recursion_count()") + class ConditionTests(lock_tests.ConditionTests): condtype = staticmethod(threading.Condition) diff --git a/Lib/threading.py b/Lib/threading.py index f6bbdb0d0c80ee..31cefd2143a8c4 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -245,6 +245,13 @@ def _release_save(self): def _is_owned(self): return self._owner == get_ident() + # Internal method used for reentrancy checks + + def _recursion_count(self): + if self._owner != get_ident(): + return 0 + return self._count + _PyRLock = _RLock diff --git a/Misc/NEWS.d/next/Library/2023-09-22-20-16-44.gh-issue-109593.LboaNM.rst b/Misc/NEWS.d/next/Library/2023-09-22-20-16-44.gh-issue-109593.LboaNM.rst new file mode 100644 index 00000000000000..292aea0be24dfb --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-09-22-20-16-44.gh-issue-109593.LboaNM.rst @@ -0,0 +1 @@ +Avoid deadlocking on a reentrant call to the multiprocessing resource tracker. Such a reentrant call, though unlikely, can happen if a GC pass invokes the finalizer for a multiprocessing object such as SemLock. diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index fa98df516b8b10..e77e30dfe5e821 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -490,6 +490,18 @@ PyDoc_STRVAR(rlock_release_save_doc, \n\ For internal use by `threading.Condition`."); +static PyObject * +rlock_recursion_count(rlockobject *self, PyObject *Py_UNUSED(ignored)) +{ + unsigned long tid = PyThread_get_thread_ident(); + return PyLong_FromUnsignedLong( + self->rlock_owner == tid ? self->rlock_count : 0UL); +} + +PyDoc_STRVAR(rlock_recursion_count_doc, +"_recursion_count() -> int\n\ +\n\ +For internal use by reentrancy checks."); static PyObject * rlock_is_owned(rlockobject *self, PyObject *Py_UNUSED(ignored)) @@ -565,6 +577,8 @@ static PyMethodDef rlock_methods[] = { METH_VARARGS, rlock_acquire_restore_doc}, {"_release_save", (PyCFunction)rlock_release_save, METH_NOARGS, rlock_release_save_doc}, + {"_recursion_count", (PyCFunction)rlock_recursion_count, + METH_NOARGS, rlock_recursion_count_doc}, {"__enter__", _PyCFunction_CAST(rlock_acquire), METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc}, {"__exit__", (PyCFunction)rlock_release,