
Commit

pythongh-109593: Fix reentrancy issue in multiprocessing resource_tracker (python#109629)

---------

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
2 people authored and csm10495 committed Sep 28, 2023
1 parent a9ed4b2 commit 80619be
Showing 7 changed files with 95 additions and 2 deletions.
34 changes: 32 additions & 2 deletions Lib/multiprocessing/resource_tracker.py
@@ -51,15 +51,31 @@
     })
 
 
+class ReentrantCallError(RuntimeError):
+    pass
+
+
 class ResourceTracker(object):
 
     def __init__(self):
-        self._lock = threading.Lock()
+        self._lock = threading.RLock()
         self._fd = None
         self._pid = None
 
+    def _reentrant_call_error(self):
+        # gh-109629: this happens if an explicit call to the ResourceTracker
+        # gets interrupted by a garbage collection, invoking a finalizer (*)
+        # that itself calls back into ResourceTracker.
+        #   (*) for example the SemLock finalizer
+        raise ReentrantCallError(
+            "Reentrant call into the multiprocessing resource tracker")
+
     def _stop(self):
         with self._lock:
+            # This should not happen (_stop() isn't called by a finalizer)
+            # but we check for it anyway.
+            if self._lock._recursion_count() > 1:
+                return self._reentrant_call_error()
             if self._fd is None:
                 # not running
                 return
@@ -81,6 +97,9 @@ def ensure_running(self):
         This can be run from any process. Usually a child process will use
         the resource created by its parent.'''
         with self._lock:
+            if self._lock._recursion_count() > 1:
+                # The code below is certainly not reentrant-safe, so bail out
+                return self._reentrant_call_error()
             if self._fd is not None:
                 # resource tracker was launched before, is it still running?
                 if self._check_alive():
@@ -159,7 +178,17 @@ def unregister(self, name, rtype):
         self._send('UNREGISTER', name, rtype)
 
     def _send(self, cmd, name, rtype):
-        self.ensure_running()
+        try:
+            self.ensure_running()
+        except ReentrantCallError:
+            # The code below might or might not work, depending on whether
+            # the resource tracker was already running and still alive.
+            # Better warn the user.
+            # (XXX is warnings.warn itself reentrant-safe? :-)
+            warnings.warn(
+                f"ResourceTracker called reentrantly for resource cleanup, "
+                f"which is unsupported. "
+                f"The {rtype} object {name!r} might leak.")
         msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
         if len(msg) > 512:
             # posix guarantees that writes to a pipe of less than PIPE_BUF
@@ -176,6 +205,7 @@ def _send(self, cmd, name, rtype):
 unregister = _resource_tracker.unregister
 getfd = _resource_tracker.getfd
 
+
 def main(fd):
     '''Run resource tracker.'''
     # protect the process from ^C and "killall python" etc
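
The net effect of the changes above is that a reentrant call which previously deadlocked on the non-reentrant lock is now detected via the RLock's recursion count and either refused (ensure_running, _stop) or downgraded to a warning (_send). Below is a minimal, self-contained sketch of that guard pattern. It is not part of the commit: _TrackerSketch and fake_finalizer are hypothetical names, and it assumes an interpreter that already has the private RLock._recursion_count() helper added by this change.

import threading
import warnings


class ReentrantCallError(RuntimeError):
    pass


class _TrackerSketch:
    """Hypothetical stand-in for ResourceTracker, for illustration only."""

    def __init__(self):
        self._lock = threading.RLock()

    def ensure_running(self):
        with self._lock:
            # A plain Lock would deadlock on a same-thread reentrant call;
            # an RLock lets us detect the situation and bail out instead.
            if self._lock._recursion_count() > 1:
                raise ReentrantCallError(
                    "Reentrant call into the resource tracker")
            # ... launch or verify the tracker process here ...

    def _send(self, cmd, name, rtype):
        try:
            self.ensure_running()
        except ReentrantCallError:
            # Like the real tracker, warn instead of letting a finalizer hang.
            warnings.warn(f"reentrant cleanup of {rtype} {name!r} is "
                          f"unsupported; the resource might leak")
        # ... the real code would now write the message to the tracker pipe ...


tracker = _TrackerSketch()


def fake_finalizer():
    # Stands in for e.g. a SemLock finalizer run by a GC pass while the
    # tracker lock is already held by this same thread.
    tracker._send('UNREGISTER', '/psem-demo', 'semaphore')


with tracker._lock:      # simulate being inside an explicit tracker call
    fake_finalizer()     # warns instead of hanging
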
36 changes: 36 additions & 0 deletions Lib/test/lock_tests.py
@@ -330,6 +330,42 @@ def test_release_save_unacquired(self):
        lock.release()
        self.assertRaises(RuntimeError, lock._release_save)

    def test_recursion_count(self):
        lock = self.locktype()
        self.assertEqual(0, lock._recursion_count())
        lock.acquire()
        self.assertEqual(1, lock._recursion_count())
        lock.acquire()
        lock.acquire()
        self.assertEqual(3, lock._recursion_count())
        lock.release()
        self.assertEqual(2, lock._recursion_count())
        lock.release()
        lock.release()
        self.assertEqual(0, lock._recursion_count())

        phase = []

        def f():
            lock.acquire()
            phase.append(None)
            while len(phase) == 1:
                _wait()
            lock.release()
            phase.append(None)

        with threading_helper.wait_threads_exit():
            start_new_thread(f, ())
            while len(phase) == 0:
                _wait()
            self.assertEqual(len(phase), 1)
            self.assertEqual(0, lock._recursion_count())
            phase.append(None)
            while len(phase) == 2:
                _wait()
            self.assertEqual(len(phase), 3)
            self.assertEqual(0, lock._recursion_count())

    def test_different_thread(self):
        # Cannot release from a different thread
        lock = self.locktype()
2 changes: 2 additions & 0 deletions Lib/test/test_importlib/test_locks.py
@@ -29,6 +29,8 @@ class ModuleLockAsRLockTests:
    test_timeout = None
    # _release_save() unsupported
    test_release_save_unacquired = None
    # _recursion_count() unsupported
    test_recursion_count = None
    # lock status in repr unsupported
    test_repr = None
    test_locked_repr = None
3 changes: 3 additions & 0 deletions Lib/test/test_threading.py
@@ -1783,6 +1783,9 @@ class ConditionAsRLockTests(lock_tests.RLockTests):
    # Condition uses an RLock by default and exports its API.
    locktype = staticmethod(threading.Condition)

    def test_recursion_count(self):
        self.skipTest("Condition does not expose _recursion_count()")

class ConditionTests(lock_tests.ConditionTests):
    condtype = staticmethod(threading.Condition)

7 changes: 7 additions & 0 deletions Lib/threading.py
@@ -245,6 +245,13 @@ def _release_save(self):
    def _is_owned(self):
        return self._owner == get_ident()

    # Internal method used for reentrancy checks

    def _recursion_count(self):
        if self._owner != get_ident():
            return 0
        return self._count

_PyRLock = _RLock


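
This pure-Python helper matches the C implementation added in Modules/_threadmodule.c below: the recursion depth is only visible to the thread that owns the lock, and every other thread reads 0. A small illustrative check, assuming an interpreter that includes this change:

import threading

lock = threading.RLock()
assert lock._recursion_count() == 0

with lock:
    with lock:
        assert lock._recursion_count() == 2   # the owner sees the nesting depth
    assert lock._recursion_count() == 1

# A thread that does not own the lock always reads 0.
seen = []
t = threading.Thread(target=lambda: seen.append(lock._recursion_count()))
with lock:
    t.start()
    t.join()
assert seen == [0]
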
@@ -0,0 +1 @@
Avoid deadlocking on a reentrant call to the multiprocessing resource tracker. Such a reentrant call, though unlikely, can happen if a GC pass invokes the finalizer for a multiprocessing object such as SemLock.
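
To see why the old code could deadlock, compare a plain Lock with an RLock when the same thread re-enters. This is an illustration only, not code from the commit; the timeout stands in for the hang, and the callback name is hypothetical.

import threading


def finalizer_style_callback(lock):
    # Stands in for a finalizer that needs the lock its caller already holds.
    got_it = lock.acquire(timeout=0.1)
    if got_it:
        lock.release()
    return got_it


plain = threading.Lock()
with plain:
    # A plain Lock can never be re-acquired by the thread that holds it;
    # without the timeout this call would simply hang (the old deadlock).
    print(finalizer_style_callback(plain))      # False

reentrant = threading.RLock()
with reentrant:
    # An RLock allows the nested acquire, and _recursion_count() lets the
    # callee notice that the call is reentrant and back out gracefully.
    print(finalizer_style_callback(reentrant))  # True
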
14 changes: 14 additions & 0 deletions Modules/_threadmodule.c
@@ -490,6 +490,18 @@ PyDoc_STRVAR(rlock_release_save_doc,
\n\
For internal use by `threading.Condition`.");

static PyObject *
rlock_recursion_count(rlockobject *self, PyObject *Py_UNUSED(ignored))
{
    unsigned long tid = PyThread_get_thread_ident();
    return PyLong_FromUnsignedLong(
        self->rlock_owner == tid ? self->rlock_count : 0UL);
}

PyDoc_STRVAR(rlock_recursion_count_doc,
"_recursion_count() -> int\n\
\n\
For internal use by reentrancy checks.");

static PyObject *
rlock_is_owned(rlockobject *self, PyObject *Py_UNUSED(ignored))
@@ -565,6 +577,8 @@ static PyMethodDef rlock_methods[] = {
     METH_VARARGS, rlock_acquire_restore_doc},
    {"_release_save", (PyCFunction)rlock_release_save,
     METH_NOARGS, rlock_release_save_doc},
    {"_recursion_count", (PyCFunction)rlock_recursion_count,
     METH_NOARGS, rlock_recursion_count_doc},
    {"__enter__", _PyCFunction_CAST(rlock_acquire),
     METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc},
    {"__exit__", (PyCFunction)rlock_release,
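
threading.RLock() normally returns the C _thread.RLock type shown here, while threading._PyRLock is the pure-Python fallback from the earlier hunk; with this change both expose the helper. A quick check, again assuming the change is present:

import _thread
import threading

for make_lock in (_thread.RLock, threading._PyRLock):
    lock = make_lock()
    with lock:
        assert lock._recursion_count() == 1
    assert lock._recursion_count() == 0
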
