From 0fb86c7c5666b92ca139821ae30694e7d15360d5 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Jun 2023 15:02:19 -0600 Subject: [PATCH] gh-104812: Run Pending Calls in any Thread (gh-104813) For a while now, pending calls only run in the main thread (in the main interpreter). This PR changes things to allow any thread run a pending call, unless the pending call was explicitly added for the main thread to run. --- Include/cpython/ceval.h | 2 + Include/internal/pycore_ceval.h | 3 +- Include/internal/pycore_ceval_state.h | 38 +- Include/internal/pycore_pystate.h | 8 - Lib/test/support/threading_helper.py | 9 +- Lib/test/test_capi/test_misc.py | 401 +++++++++++++++++- ...-06-02-15-15-41.gh-issue-104812.dfZiG5.rst | 9 + Modules/_queuemodule.c | 3 +- Modules/_testinternalcapi.c | 121 +++++- Modules/_threadmodule.c | 3 +- Modules/signalmodule.c | 6 +- Python/ceval.c | 55 +++ Python/ceval_gil.c | 213 ++++++---- Python/pylifecycle.c | 3 + Python/pystate.c | 3 +- Tools/c-analyzer/cpython/ignored.tsv | 1 + 16 files changed, 760 insertions(+), 118 deletions(-) create mode 100644 Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst diff --git a/Include/cpython/ceval.h b/Include/cpython/ceval.h index 0fbbee10c2edce0..a9616bd6a4f5186 100644 --- a/Include/cpython/ceval.h +++ b/Include/cpython/ceval.h @@ -22,6 +22,8 @@ PyAPI_FUNC(PyObject *) _PyEval_EvalFrameDefault(PyThreadState *tstate, struct _P PyAPI_FUNC(void) _PyEval_SetSwitchInterval(unsigned long microseconds); PyAPI_FUNC(unsigned long) _PyEval_GetSwitchInterval(void); +PyAPI_FUNC(int) _PyEval_MakePendingCalls(PyThreadState *); + PyAPI_FUNC(Py_ssize_t) PyUnstable_Eval_RequestCodeExtraIndex(freefunc); // Old name -- remove when this API changes: _Py_DEPRECATED_EXTERNALLY(3.12) static inline Py_ssize_t diff --git a/Include/internal/pycore_ceval.h b/Include/internal/pycore_ceval.h index ca2703781de4b08..9e9b523e7c22228 100644 --- a/Include/internal/pycore_ceval.h +++ b/Include/internal/pycore_ceval.h @@ -27,7 +27,8 @@ PyAPI_FUNC(void) _PyEval_SignalReceived(PyInterpreterState *interp); PyAPI_FUNC(int) _PyEval_AddPendingCall( PyInterpreterState *interp, int (*func)(void *), - void *arg); + void *arg, + int mainthreadonly); PyAPI_FUNC(void) _PyEval_SignalAsyncExc(PyInterpreterState *interp); #ifdef HAVE_FORK extern PyStatus _PyEval_ReInitThreads(PyThreadState *tstate); diff --git a/Include/internal/pycore_ceval_state.h b/Include/internal/pycore_ceval_state.h index 95d1fa16ba40dc9..e56e43c6e0c6a7f 100644 --- a/Include/internal/pycore_ceval_state.h +++ b/Include/internal/pycore_ceval_state.h @@ -13,6 +13,24 @@ extern "C" { #include "pycore_gil.h" // struct _gil_runtime_state +struct _pending_calls { + int busy; + PyThread_type_lock lock; + /* Request for running pending calls. */ + _Py_atomic_int calls_to_do; + /* Request for looking at the `async_exc` field of the current + thread state. + Guarded by the GIL. */ + int async_exc; +#define NPENDINGCALLS 32 + struct _pending_call { + int (*func)(void *); + void *arg; + } calls[NPENDINGCALLS]; + int first; + int last; +}; + typedef enum { PERF_STATUS_FAILED = -1, // Perf trampoline is in an invalid state PERF_STATUS_NO_INIT = 0, // Perf trampoline is not initialized @@ -49,6 +67,8 @@ struct _ceval_runtime_state { the main thread of the main interpreter can handle signals: see _Py_ThreadCanHandleSignals(). */ _Py_atomic_int signals_pending; + /* Pending calls to be made only on the main thread. */ + struct _pending_calls pending_mainthread; }; #ifdef PY_HAVE_PERF_TRAMPOLINE @@ -62,24 +82,6 @@ struct _ceval_runtime_state { #endif -struct _pending_calls { - int busy; - PyThread_type_lock lock; - /* Request for running pending calls. */ - _Py_atomic_int calls_to_do; - /* Request for looking at the `async_exc` field of the current - thread state. - Guarded by the GIL. */ - int async_exc; -#define NPENDINGCALLS 32 - struct { - int (*func)(void *); - void *arg; - } calls[NPENDINGCALLS]; - int first; - int last; -}; - struct _ceval_state { /* This single variable consolidates all requests to break out of the fast path in the eval loop. */ diff --git a/Include/internal/pycore_pystate.h b/Include/internal/pycore_pystate.h index daa40cf4bcd8554..43652c4405ec1a4 100644 --- a/Include/internal/pycore_pystate.h +++ b/Include/internal/pycore_pystate.h @@ -60,14 +60,6 @@ _Py_ThreadCanHandleSignals(PyInterpreterState *interp) } -/* Only execute pending calls on the main thread. */ -static inline int -_Py_ThreadCanHandlePendingCalls(void) -{ - return _Py_IsMainThread(); -} - - /* Variable and static inline functions for in-line access to current thread and interpreter state */ diff --git a/Lib/test/support/threading_helper.py b/Lib/test/support/threading_helper.py index b9973c8bf5c914d..7f16050f32b9d19 100644 --- a/Lib/test/support/threading_helper.py +++ b/Lib/test/support/threading_helper.py @@ -115,7 +115,11 @@ def join_thread(thread, timeout=None): @contextlib.contextmanager def start_threads(threads, unlock=None): - import faulthandler + try: + import faulthandler + except ImportError: + # It isn't supported on subinterpreters yet. + faulthandler = None threads = list(threads) started = [] try: @@ -147,7 +151,8 @@ def start_threads(threads, unlock=None): finally: started = [t for t in started if t.is_alive()] if started: - faulthandler.dump_traceback(sys.stdout) + if faulthandler is not None: + faulthandler.dump_traceback(sys.stdout) raise AssertionError('Unable to join %d threads' % len(started)) diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py index 9cd1554614695e4..97087ed29772fe7 100644 --- a/Lib/test/test_capi/test_misc.py +++ b/Lib/test/test_capi/test_misc.py @@ -2,18 +2,21 @@ # these are all functions _testcapi exports whose name begins with 'test_'. import _thread -from collections import OrderedDict +from collections import OrderedDict, deque import contextlib import importlib.machinery import importlib.util +import json import os import pickle +import queue import random import subprocess import sys import textwrap import threading import time +import types import unittest import warnings import weakref @@ -37,6 +40,10 @@ import _testsinglephase except ImportError: _testsinglephase = None +try: + import _xxsubinterpreters as _interpreters +except ModuleNotFoundError: + _interpreters = None # Skip this test if the _testcapi module isn't available. _testcapi = import_helper.import_module('_testcapi') @@ -48,6 +55,12 @@ def decode_stderr(err): return err.decode('utf-8', 'replace').replace('\r', '') +def requires_subinterpreters(meth): + """Decorator to skip a test if subinterpreters are not supported.""" + return unittest.skipIf(_interpreters is None, + 'subinterpreters required')(meth) + + def testfunction(self): """some doc""" return self @@ -1260,6 +1273,10 @@ def test_pyobject_getitemdata_error(self): class TestPendingCalls(unittest.TestCase): + # See the comment in ceval.c (at the "handle_eval_breaker" label) + # about when pending calls get run. This is especially relevant + # here for creating deterministic tests. + def pendingcalls_submit(self, l, n): def callback(): #this function can be interrupted by thread switching so let's @@ -1342,6 +1359,388 @@ def genf(): yield gen = genf() self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code) + class PendingTask(types.SimpleNamespace): + + _add_pending = _testinternalcapi.pending_threadfunc + + def __init__(self, req, taskid=None, notify_done=None): + self.id = taskid + self.req = req + self.notify_done = notify_done + + self.creator_tid = threading.get_ident() + self.requester_tid = None + self.runner_tid = None + self.result = None + + def run(self): + assert self.result is None + self.runner_tid = threading.get_ident() + self._run() + if self.notify_done is not None: + self.notify_done() + + def _run(self): + self.result = self.req + + def run_in_pending_call(self, worker_tids): + assert self._add_pending is _testinternalcapi.pending_threadfunc + self.requester_tid = threading.get_ident() + def callback(): + assert self.result is None + # It can be tricky to control which thread handles + # the eval breaker, so we take a naive approach to + # make sure. + if threading.get_ident() not in worker_tids: + self._add_pending(callback, ensure_added=True) + return + self.run() + self._add_pending(callback, ensure_added=True) + + def create_thread(self, worker_tids): + return threading.Thread( + target=self.run_in_pending_call, + args=(worker_tids,), + ) + + def wait_for_result(self): + while self.result is None: + time.sleep(0.01) + + def test_subthreads_can_handle_pending_calls(self): + payload = 'Spam spam spam spam. Lovely spam! Wonderful spam!' + + task = self.PendingTask(payload) + def do_the_work(): + tid = threading.get_ident() + t = task.create_thread({tid}) + with threading_helper.start_threads([t]): + task.wait_for_result() + t = threading.Thread(target=do_the_work) + with threading_helper.start_threads([t]): + pass + + self.assertEqual(task.result, payload) + + def test_many_subthreads_can_handle_pending_calls(self): + main_tid = threading.get_ident() + self.assertEqual(threading.main_thread().ident, main_tid) + + # We can't use queue.Queue since it isn't reentrant relative + # to pending calls. + _queue = deque() + _active = deque() + _done_lock = threading.Lock() + def queue_put(task): + _queue.append(task) + _active.append(True) + def queue_get(): + try: + task = _queue.popleft() + except IndexError: + raise queue.Empty + return task + def queue_task_done(): + _active.pop() + if not _active: + try: + _done_lock.release() + except RuntimeError: + assert not _done_lock.locked() + def queue_empty(): + return not _queue + def queue_join(): + _done_lock.acquire() + _done_lock.release() + + tasks = [] + for i in range(20): + task = self.PendingTask( + req=f'request {i}', + taskid=i, + notify_done=queue_task_done, + ) + tasks.append(task) + queue_put(task) + # This will be released once all the tasks have finished. + _done_lock.acquire() + + def add_tasks(worker_tids): + while True: + if done: + return + try: + task = queue_get() + except queue.Empty: + break + task.run_in_pending_call(worker_tids) + + done = False + def run_tasks(): + while not queue_empty(): + if done: + return + time.sleep(0.01) + # Give the worker a chance to handle any remaining pending calls. + while not done: + time.sleep(0.01) + + # Start the workers and wait for them to finish. + worker_threads = [threading.Thread(target=run_tasks) + for _ in range(3)] + with threading_helper.start_threads(worker_threads): + try: + # Add a pending call for each task. + worker_tids = [t.ident for t in worker_threads] + threads = [threading.Thread(target=add_tasks, args=(worker_tids,)) + for _ in range(3)] + with threading_helper.start_threads(threads): + try: + pass + except BaseException: + done = True + raise # re-raise + # Wait for the pending calls to finish. + queue_join() + # Notify the workers that they can stop. + done = True + except BaseException: + done = True + raise # re-raise + runner_tids = [t.runner_tid for t in tasks] + + self.assertNotIn(main_tid, runner_tids) + for task in tasks: + with self.subTest(f'task {task.id}'): + self.assertNotEqual(task.requester_tid, main_tid) + self.assertNotEqual(task.requester_tid, task.runner_tid) + self.assertNotIn(task.requester_tid, runner_tids) + + @requires_subinterpreters + def test_isolated_subinterpreter(self): + # We exercise the most important permutations. + + # This test relies on pending calls getting called + # (eval breaker tripped) at each loop iteration + # and at each call. + + maxtext = 250 + main_interpid = 0 + interpid = _interpreters.create() + _interpreters.run_string(interpid, f"""if True: + import json + import os + import threading + import time + import _testinternalcapi + from test.support import threading_helper + """) + + def create_pipe(): + r, w = os.pipe() + self.addCleanup(lambda: os.close(r)) + self.addCleanup(lambda: os.close(w)) + return r, w + + with self.subTest('add in main, run in subinterpreter'): + r_ready, w_ready = create_pipe() + r_done, w_done= create_pipe() + timeout = time.time() + 30 # seconds + + def do_work(): + _interpreters.run_string(interpid, f"""if True: + # Wait until this interp has handled the pending call. + waiting = False + done = False + def wait(os_read=os.read): + global done, waiting + waiting = True + os_read({r_done}, 1) + done = True + t = threading.Thread(target=wait) + with threading_helper.start_threads([t]): + while not waiting: + pass + os.write({w_ready}, b'\\0') + # Loop to trigger the eval breaker. + while not done: + time.sleep(0.01) + if time.time() > {timeout}: + raise Exception('timed out!') + """) + t = threading.Thread(target=do_work) + with threading_helper.start_threads([t]): + os.read(r_ready, 1) + # Add the pending call and wait for it to finish. + actual = _testinternalcapi.pending_identify(interpid) + # Signal the subinterpreter to stop. + os.write(w_done, b'\0') + + self.assertEqual(actual, int(interpid)) + + with self.subTest('add in main, run in subinterpreter sub-thread'): + r_ready, w_ready = create_pipe() + r_done, w_done= create_pipe() + timeout = time.time() + 30 # seconds + + def do_work(): + _interpreters.run_string(interpid, f"""if True: + waiting = False + done = False + def subthread(): + while not waiting: + pass + os.write({w_ready}, b'\\0') + # Loop to trigger the eval breaker. + while not done: + time.sleep(0.01) + if time.time() > {timeout}: + raise Exception('timed out!') + t = threading.Thread(target=subthread) + with threading_helper.start_threads([t]): + # Wait until this interp has handled the pending call. + waiting = True + os.read({r_done}, 1) + done = True + """) + t = threading.Thread(target=do_work) + with threading_helper.start_threads([t]): + os.read(r_ready, 1) + # Add the pending call and wait for it to finish. + actual = _testinternalcapi.pending_identify(interpid) + # Signal the subinterpreter to stop. + os.write(w_done, b'\0') + + self.assertEqual(actual, int(interpid)) + + with self.subTest('add in subinterpreter, run in main'): + r_ready, w_ready = create_pipe() + r_done, w_done= create_pipe() + r_data, w_data= create_pipe() + timeout = time.time() + 30 # seconds + + def add_job(): + os.read(r_ready, 1) + _interpreters.run_string(interpid, f"""if True: + # Add the pending call and wait for it to finish. + actual = _testinternalcapi.pending_identify({main_interpid}) + # Signal the subinterpreter to stop. + os.write({w_done}, b'\\0') + os.write({w_data}, actual.to_bytes(1, 'little')) + """) + # Wait until this interp has handled the pending call. + waiting = False + done = False + def wait(os_read=os.read): + nonlocal done, waiting + waiting = True + os_read(r_done, 1) + done = True + t1 = threading.Thread(target=add_job) + t2 = threading.Thread(target=wait) + with threading_helper.start_threads([t1, t2]): + while not waiting: + pass + os.write(w_ready, b'\0') + # Loop to trigger the eval breaker. + while not done: + time.sleep(0.01) + if time.time() > timeout: + raise Exception('timed out!') + text = os.read(r_data, 1) + actual = int.from_bytes(text, 'little') + + self.assertEqual(actual, int(main_interpid)) + + with self.subTest('add in subinterpreter, run in sub-thread'): + r_ready, w_ready = create_pipe() + r_done, w_done= create_pipe() + r_data, w_data= create_pipe() + timeout = time.time() + 30 # seconds + + def add_job(): + os.read(r_ready, 1) + _interpreters.run_string(interpid, f"""if True: + # Add the pending call and wait for it to finish. + actual = _testinternalcapi.pending_identify({main_interpid}) + # Signal the subinterpreter to stop. + os.write({w_done}, b'\\0') + os.write({w_data}, actual.to_bytes(1, 'little')) + """) + # Wait until this interp has handled the pending call. + waiting = False + done = False + def wait(os_read=os.read): + nonlocal done, waiting + waiting = True + os_read(r_done, 1) + done = True + def subthread(): + while not waiting: + pass + os.write(w_ready, b'\0') + # Loop to trigger the eval breaker. + while not done: + time.sleep(0.01) + if time.time() > timeout: + raise Exception('timed out!') + t1 = threading.Thread(target=add_job) + t2 = threading.Thread(target=wait) + t3 = threading.Thread(target=subthread) + with threading_helper.start_threads([t1, t2, t3]): + pass + text = os.read(r_data, 1) + actual = int.from_bytes(text, 'little') + + self.assertEqual(actual, int(main_interpid)) + + # XXX We can't use the rest until gh-105716 is fixed. + return + + with self.subTest('add in subinterpreter, run in subinterpreter sub-thread'): + r_ready, w_ready = create_pipe() + r_done, w_done= create_pipe() + r_data, w_data= create_pipe() + timeout = time.time() + 30 # seconds + + def do_work(): + _interpreters.run_string(interpid, f"""if True: + waiting = False + done = False + def subthread(): + while not waiting: + pass + os.write({w_ready}, b'\\0') + # Loop to trigger the eval breaker. + while not done: + time.sleep(0.01) + if time.time() > {timeout}: + raise Exception('timed out!') + t = threading.Thread(target=subthread) + with threading_helper.start_threads([t]): + # Wait until this interp has handled the pending call. + waiting = True + os.read({r_done}, 1) + done = True + """) + t = threading.Thread(target=do_work) + #with threading_helper.start_threads([t]): + t.start() + if True: + os.read(r_ready, 1) + _interpreters.run_string(interpid, f"""if True: + # Add the pending call and wait for it to finish. + actual = _testinternalcapi.pending_identify({interpid}) + # Signal the subinterpreter to stop. + os.write({w_done}, b'\\0') + os.write({w_data}, actual.to_bytes(1, 'little')) + """) + t.join() + text = os.read(r_data, 1) + actual = int.from_bytes(text, 'little') + + self.assertEqual(actual, int(interpid)) + class SubinterpreterTest(unittest.TestCase): diff --git a/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst b/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst new file mode 100644 index 000000000000000..da29a8cae61839a --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst @@ -0,0 +1,9 @@ +The "pending call" machinery now works for all interpreters, not just the +main interpreter, and runs in all threads, not just the main thread. Some +calls are still only done in the main thread, ergo in the main interpreter. +This change does not affect signal handling nor the existing public C-API +(``Py_AddPendingCall()``), which both still only target the main thread. +The new functionality is meant strictly for internal use for now, since +consequences of its use are not well understood yet outside some very +restricted cases. This change brings the capability in line with the +intention when the state was made per-interpreter several years ago. diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c index d36a911a57c02ca..db5be842b8a35c3 100644 --- a/Modules/_queuemodule.c +++ b/Modules/_queuemodule.c @@ -210,6 +210,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, PyObject *item; PyLockStatus r; PY_TIMEOUT_T microseconds; + PyThreadState *tstate = PyThreadState_Get(); if (block == 0) { /* Non-blocking */ @@ -253,7 +254,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, Py_END_ALLOW_THREADS } - if (r == PY_LOCK_INTR && Py_MakePendingCalls() < 0) { + if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) { return NULL; } if (r == PY_LOCK_FAILURE) { diff --git a/Modules/_testinternalcapi.c b/Modules/_testinternalcapi.c index 8267dbf67790172..45778a8fe5410b7 100644 --- a/Modules/_testinternalcapi.c +++ b/Modules/_testinternalcapi.c @@ -13,16 +13,18 @@ #include "Python.h" #include "frameobject.h" +#include "interpreteridobject.h" // _PyInterpreterID_LookUp() #include "pycore_atomic_funcs.h" // _Py_atomic_int_get() #include "pycore_bitutils.h" // _Py_bswap32() #include "pycore_compile.h" // _PyCompile_CodeGen, _PyCompile_OptimizeCfg, _PyCompile_Assemble +#include "pycore_ceval.h" // _PyEval_AddPendingCall #include "pycore_fileutils.h" // _Py_normpath #include "pycore_frame.h" // _PyInterpreterFrame #include "pycore_gc.h" // PyGC_Head #include "pycore_hashtable.h" // _Py_hashtable_new() #include "pycore_initconfig.h" // _Py_GetConfigsAsDict() -#include "pycore_pathconfig.h" // _PyPathConfig_ClearGlobal() #include "pycore_interp.h" // _PyInterpreterState_GetConfigCopy() +#include "pycore_pathconfig.h" // _PyPathConfig_ClearGlobal() #include "pycore_pyerrors.h" // _Py_UTF8_Edit_Cost() #include "pycore_pystate.h" // _PyThreadState_GET() #include "osdefs.h" // MAXPATHLEN @@ -822,6 +824,120 @@ iframe_getlasti(PyObject *self, PyObject *frame) return PyLong_FromLong(PyUnstable_InterpreterFrame_GetLasti(f)); } + +static int _pending_callback(void *arg) +{ + /* we assume the argument is callable object to which we own a reference */ + PyObject *callable = (PyObject *)arg; + PyObject *r = PyObject_CallNoArgs(callable); + Py_DECREF(callable); + Py_XDECREF(r); + return r != NULL ? 0 : -1; +} + +/* The following requests n callbacks to _pending_callback. It can be + * run from any python thread. + */ +static PyObject * +pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs) +{ + PyObject *callable; + int ensure_added = 0; + static char *kwlist[] = {"", "ensure_added", NULL}; + if (!PyArg_ParseTupleAndKeywords(args, kwargs, + "O|$p:pending_threadfunc", kwlist, + &callable, &ensure_added)) + { + return NULL; + } + PyInterpreterState *interp = PyInterpreterState_Get(); + + /* create the reference for the callbackwhile we hold the lock */ + Py_INCREF(callable); + + int r; + Py_BEGIN_ALLOW_THREADS + r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0); + Py_END_ALLOW_THREADS + if (r < 0) { + /* unsuccessful add */ + if (!ensure_added) { + Py_DECREF(callable); + Py_RETURN_FALSE; + } + do { + Py_BEGIN_ALLOW_THREADS + r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0); + Py_END_ALLOW_THREADS + } while (r < 0); + } + + Py_RETURN_TRUE; +} + + +static struct { + int64_t interpid; +} pending_identify_result; + +static int +_pending_identify_callback(void *arg) +{ + PyThread_type_lock mutex = (PyThread_type_lock)arg; + assert(pending_identify_result.interpid == -1); + PyThreadState *tstate = PyThreadState_Get(); + pending_identify_result.interpid = PyInterpreterState_GetID(tstate->interp); + PyThread_release_lock(mutex); + return 0; +} + +static PyObject * +pending_identify(PyObject *self, PyObject *args) +{ + PyObject *interpid; + if (!PyArg_ParseTuple(args, "O:pending_identify", &interpid)) { + return NULL; + } + PyInterpreterState *interp = _PyInterpreterID_LookUp(interpid); + if (interp == NULL) { + if (!PyErr_Occurred()) { + PyErr_SetString(PyExc_ValueError, "interpreter not found"); + } + return NULL; + } + + pending_identify_result.interpid = -1; + + PyThread_type_lock mutex = PyThread_allocate_lock(); + if (mutex == NULL) { + return NULL; + } + PyThread_acquire_lock(mutex, WAIT_LOCK); + /* It gets released in _pending_identify_callback(). */ + + int r; + do { + Py_BEGIN_ALLOW_THREADS + r = _PyEval_AddPendingCall(interp, + &_pending_identify_callback, (void *)mutex, + 0); + Py_END_ALLOW_THREADS + } while (r < 0); + + /* Wait for the pending call to complete. */ + PyThread_acquire_lock(mutex, WAIT_LOCK); + PyThread_release_lock(mutex); + PyThread_free_lock(mutex); + + PyObject *res = PyLong_FromLongLong(pending_identify_result.interpid); + pending_identify_result.interpid = -1; + if (res == NULL) { + return NULL; + } + return res; +} + + static PyMethodDef module_functions[] = { {"get_configs", get_configs, METH_NOARGS}, {"get_recursion_depth", get_recursion_depth, METH_NOARGS}, @@ -850,6 +966,9 @@ static PyMethodDef module_functions[] = { {"iframe_getcode", iframe_getcode, METH_O, NULL}, {"iframe_getline", iframe_getline, METH_O, NULL}, {"iframe_getlasti", iframe_getlasti, METH_O, NULL}, + {"pending_threadfunc", _PyCFunction_CAST(pending_threadfunc), + METH_VARARGS | METH_KEYWORDS}, + {"pending_identify", pending_identify, METH_VARARGS, NULL}, {NULL, NULL} /* sentinel */ }; diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index b6f878e07526dbc..c553d039462af05 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -81,6 +81,7 @@ lock_dealloc(lockobject *self) static PyLockStatus acquire_timed(PyThread_type_lock lock, _PyTime_t timeout) { + PyThreadState *tstate = _PyThreadState_GET(); _PyTime_t endtime = 0; if (timeout > 0) { endtime = _PyDeadline_Init(timeout); @@ -103,7 +104,7 @@ acquire_timed(PyThread_type_lock lock, _PyTime_t timeout) /* Run signal handlers if we were interrupted. Propagate * exceptions from signal handlers, such as KeyboardInterrupt, by * passing up PY_LOCK_INTR. */ - if (Py_MakePendingCalls() < 0) { + if (_PyEval_MakePendingCalls(tstate) < 0) { return PY_LOCK_INTR; } diff --git a/Modules/signalmodule.c b/Modules/signalmodule.c index 2350236ad46b250..00ea4343735dabf 100644 --- a/Modules/signalmodule.c +++ b/Modules/signalmodule.c @@ -314,7 +314,8 @@ trip_signal(int sig_num) still use it for this exceptional case. */ _PyEval_AddPendingCall(interp, report_wakeup_send_error, - (void *)(intptr_t) last_error); + (void *)(intptr_t) last_error, + 1); } } } @@ -333,7 +334,8 @@ trip_signal(int sig_num) still use it for this exceptional case. */ _PyEval_AddPendingCall(interp, report_wakeup_write_error, - (void *)(intptr_t)errno); + (void *)(intptr_t)errno, + 1); } } } diff --git a/Python/ceval.c b/Python/ceval.c index df997e1ed283d72..4762dfac4812a6f 100644 --- a/Python/ceval.c +++ b/Python/ceval.c @@ -758,6 +758,61 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, _PyInterpreterFrame *frame, int * We need to do reasonably frequently, but not too frequently. * All loops should include a check of the eval breaker. * We also check on return from any builtin function. + * + * ## More Details ### + * + * The eval loop (this function) normally executes the instructions + * of a code object sequentially. However, the runtime supports a + * number of out-of-band execution scenarios that may pause that + * sequential execution long enough to do that out-of-band work + * in the current thread using the current PyThreadState. + * + * The scenarios include: + * + * - cyclic garbage collection + * - GIL drop requests + * - "async" exceptions + * - "pending calls" (some only in the main thread) + * - signal handling (only in the main thread) + * + * When the need for one of the above is detected, the eval loop + * pauses long enough to handle the detected case. Then, if doing + * so didn't trigger an exception, the eval loop resumes executing + * the sequential instructions. + * + * To make this work, the eval loop periodically checks if any + * of the above needs to happen. The individual checks can be + * expensive if computed each time, so a while back we switched + * to using pre-computed, per-interpreter variables for the checks, + * and later consolidated that to a single "eval breaker" variable + * (now a PyInterpreterState field). + * + * For the longest time, the eval breaker check would happen + * frequently, every 5 or so times through the loop, regardless + * of what instruction ran last or what would run next. Then, in + * early 2021 (gh-18334, commit 4958f5d), we switched to checking + * the eval breaker less frequently, by hard-coding the check to + * specific places in the eval loop (e.g. certain instructions). + * The intent then was to check after returning from calls + * and on the back edges of loops. + * + * In addition to being more efficient, that approach keeps + * the eval loop from running arbitrary code between instructions + * that don't handle that well. (See gh-74174.) + * + * Currently, the eval breaker check happens here at the + * "handle_eval_breaker" label. Some instructions come here + * explicitly (goto) and some indirectly. Notably, the check + * happens on back edges in the control flow graph, which + * pretty much applies to all loops and most calls. + * (See bytecodes.c for exact information.) + * + * One consequence of this approach is that it might not be obvious + * how to force any specific thread to pick up the eval breaker, + * or for any specific thread to not pick it up. Mostly this + * involves judicious uses of locks and careful ordering of code, + * while avoiding code that might trigger the eval breaker + * until so desired. */ if (_Py_HandlePending(tstate) != 0) { goto error; diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 764278ac130dfac..aacf2b5c2e2c4ff 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -68,8 +68,9 @@ COMPUTE_EVAL_BREAKER(PyInterpreterState *interp, _Py_atomic_load_relaxed_int32(&ceval2->gil_drop_request) | (_Py_atomic_load_relaxed_int32(&ceval->signals_pending) && _Py_ThreadCanHandleSignals(interp)) - | (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do) - && _Py_ThreadCanHandlePendingCalls()) + | (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do)) + | (_Py_IsMainThread() && _Py_IsMainInterpreter(interp) + &&_Py_atomic_load_relaxed_int32(&ceval->pending_mainthread.calls_to_do)) | ceval2->pending.async_exc | _Py_atomic_load_relaxed_int32(&ceval2->gc_scheduled)); } @@ -95,11 +96,11 @@ RESET_GIL_DROP_REQUEST(PyInterpreterState *interp) static inline void -SIGNAL_PENDING_CALLS(PyInterpreterState *interp) +SIGNAL_PENDING_CALLS(struct _pending_calls *pending, PyInterpreterState *interp) { struct _ceval_runtime_state *ceval = &interp->runtime->ceval; struct _ceval_state *ceval2 = &interp->ceval; - _Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 1); + _Py_atomic_store_relaxed(&pending->calls_to_do, 1); COMPUTE_EVAL_BREAKER(interp, ceval, ceval2); } @@ -109,6 +110,9 @@ UNSIGNAL_PENDING_CALLS(PyInterpreterState *interp) { struct _ceval_runtime_state *ceval = &interp->runtime->ceval; struct _ceval_state *ceval2 = &interp->ceval; + if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) { + _Py_atomic_store_relaxed(&ceval->pending_mainthread.calls_to_do, 0); + } _Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 0); COMPUTE_EVAL_BREAKER(interp, ceval, ceval2); } @@ -798,19 +802,31 @@ _push_pending_call(struct _pending_calls *pending, return 0; } -/* Pop one item off the queue while holding the lock. */ -static void -_pop_pending_call(struct _pending_calls *pending, - int (**func)(void *), void **arg) +static int +_next_pending_call(struct _pending_calls *pending, + int (**func)(void *), void **arg) { int i = pending->first; if (i == pending->last) { - return; /* Queue empty */ + /* Queue empty */ + assert(pending->calls[i].func == NULL); + return -1; } - *func = pending->calls[i].func; *arg = pending->calls[i].arg; - pending->first = (i + 1) % NPENDINGCALLS; + return i; +} + +/* Pop one item off the queue while holding the lock. */ +static void +_pop_pending_call(struct _pending_calls *pending, + int (**func)(void *), void **arg) +{ + int i = _next_pending_call(pending, func, arg); + if (i >= 0) { + pending->calls[i] = (struct _pending_call){0}; + pending->first = (i + 1) % NPENDINGCALLS; + } } /* This implementation is thread-safe. It allows @@ -820,9 +836,16 @@ _pop_pending_call(struct _pending_calls *pending, int _PyEval_AddPendingCall(PyInterpreterState *interp, - int (*func)(void *), void *arg) + int (*func)(void *), void *arg, + int mainthreadonly) { + assert(!mainthreadonly || _Py_IsMainInterpreter(interp)); struct _pending_calls *pending = &interp->ceval.pending; + if (mainthreadonly) { + /* The main thread only exists in the main interpreter. */ + assert(_Py_IsMainInterpreter(interp)); + pending = &_PyRuntime.ceval.pending_mainthread; + } /* Ensure that _PyEval_InitState() was called and that _PyEval_FiniState() is not called yet. */ assert(pending->lock != NULL); @@ -832,39 +855,17 @@ _PyEval_AddPendingCall(PyInterpreterState *interp, PyThread_release_lock(pending->lock); /* signal main loop */ - SIGNAL_PENDING_CALLS(interp); + SIGNAL_PENDING_CALLS(pending, interp); return result; } int Py_AddPendingCall(int (*func)(void *), void *arg) { - /* Best-effort to support subinterpreters and calls with the GIL released. - - First attempt _PyThreadState_GET() since it supports subinterpreters. - - If the GIL is released, _PyThreadState_GET() returns NULL . In this - case, use PyGILState_GetThisThreadState() which works even if the GIL - is released. - - Sadly, PyGILState_GetThisThreadState() doesn't support subinterpreters: - see bpo-10915 and bpo-15751. - - Py_AddPendingCall() doesn't require the caller to hold the GIL. */ - PyThreadState *tstate = _PyThreadState_GET(); - if (tstate == NULL) { - tstate = PyGILState_GetThisThreadState(); - } - - PyInterpreterState *interp; - if (tstate != NULL) { - interp = tstate->interp; - } - else { - /* Last resort: use the main interpreter */ - interp = _PyInterpreterState_Main(); - } - return _PyEval_AddPendingCall(interp, func, arg); + /* Legacy users of this API will continue to target the main thread + (of the main interpreter). */ + PyInterpreterState *interp = _PyInterpreterState_Main(); + return _PyEval_AddPendingCall(interp, func, arg, 1); } static int @@ -884,27 +885,24 @@ handle_signals(PyThreadState *tstate) return 0; } -static int -make_pending_calls(PyInterpreterState *interp) +static inline int +maybe_has_pending_calls(PyInterpreterState *interp) { - /* only execute pending calls on main thread */ - if (!_Py_ThreadCanHandlePendingCalls()) { - return 0; + struct _pending_calls *pending = &interp->ceval.pending; + if (_Py_atomic_load_relaxed_int32(&pending->calls_to_do)) { + return 1; } - - /* don't perform recursive pending calls */ - if (interp->ceval.pending.busy) { + if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(interp)) { return 0; } - interp->ceval.pending.busy = 1; - - /* unsignal before starting to call callbacks, so that any callback - added in-between re-signals */ - UNSIGNAL_PENDING_CALLS(interp); - int res = 0; + pending = &_PyRuntime.ceval.pending_mainthread; + return _Py_atomic_load_relaxed_int32(&pending->calls_to_do); +} +static int +_make_pending_calls(struct _pending_calls *pending) +{ /* perform a bounded number of calls, in case of recursion */ - struct _pending_calls *pending = &interp->ceval.pending; for (int i=0; iceval.pending; + struct _pending_calls *pending_main = &_PyRuntime.ceval.pending_mainthread; + + /* Only one thread (per interpreter) may run the pending calls + at once. In the same way, we don't do recursive pending calls. */ + PyThread_acquire_lock(pending->lock, WAIT_LOCK); + if (pending->busy) { + /* A pending call was added after another thread was already + handling the pending calls (and had already "unsignaled"). + Once that thread is done, it may have taken care of all the + pending calls, or there might be some still waiting. + Regardless, this interpreter's pending calls will stay + "signaled" until that first thread has finished. At that + point the next thread to trip the eval breaker will take + care of any remaining pending calls. Until then, though, + all the interpreter's threads will be tripping the eval + breaker every time it's checked. */ + PyThread_release_lock(pending->lock); + return 0; + } + pending->busy = 1; + PyThread_release_lock(pending->lock); + + /* unsignal before starting to call callbacks, so that any callback + added in-between re-signals */ + UNSIGNAL_PENDING_CALLS(interp); + + if (_make_pending_calls(pending) != 0) { + pending->busy = 0; + /* There might not be more calls to make, but we play it safe. */ + SIGNAL_PENDING_CALLS(pending, interp); + return -1; + } - interp->ceval.pending.busy = 0; - return res; + if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) { + if (_make_pending_calls(pending_main) != 0) { + pending->busy = 0; + /* There might not be more calls to make, but we play it safe. */ + SIGNAL_PENDING_CALLS(pending_main, interp); + return -1; + } + } -error: - interp->ceval.pending.busy = 0; - SIGNAL_PENDING_CALLS(interp); - return res; + pending->busy = 0; + return 0; } void @@ -939,12 +979,6 @@ _Py_FinishPendingCalls(PyThreadState *tstate) assert(PyGILState_Check()); assert(is_tstate_valid(tstate)); - struct _pending_calls *pending = &tstate->interp->ceval.pending; - - if (!_Py_atomic_load_relaxed_int32(&(pending->calls_to_do))) { - return; - } - if (make_pending_calls(tstate->interp) < 0) { PyObject *exc = _PyErr_GetRaisedException(tstate); PyErr_BadInternalCall(); @@ -953,6 +987,29 @@ _Py_FinishPendingCalls(PyThreadState *tstate) } } +int +_PyEval_MakePendingCalls(PyThreadState *tstate) +{ + int res; + + if (_Py_IsMainThread() && _Py_IsMainInterpreter(tstate->interp)) { + /* Python signal handler doesn't really queue a callback: + it only signals that a signal was received, + see _PyEval_SignalReceived(). */ + res = handle_signals(tstate); + if (res != 0) { + return res; + } + } + + res = make_pending_calls(tstate->interp); + if (res != 0) { + return res; + } + + return 0; +} + /* Py_MakePendingCalls() is a simple wrapper for the sake of backward-compatibility. */ int @@ -963,19 +1020,11 @@ Py_MakePendingCalls(void) PyThreadState *tstate = _PyThreadState_GET(); assert(is_tstate_valid(tstate)); - /* Python signal handler doesn't really queue a callback: it only signals - that a signal was received, see _PyEval_SignalReceived(). */ - int res = handle_signals(tstate); - if (res != 0) { - return res; - } - - res = make_pending_calls(tstate->interp); - if (res != 0) { - return res; + /* Only execute pending calls on the main thread. */ + if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(tstate->interp)) { + return 0; } - - return 0; + return _PyEval_MakePendingCalls(tstate); } void @@ -1015,7 +1064,7 @@ _Py_HandlePending(PyThreadState *tstate) } /* Pending calls */ - if (_Py_atomic_load_relaxed_int32(&interp_ceval_state->pending.calls_to_do)) { + if (maybe_has_pending_calls(tstate->interp)) { if (make_pending_calls(tstate->interp) != 0) { return -1; } diff --git a/Python/pylifecycle.c b/Python/pylifecycle.c index c4e6b69d8d56f3b..a67fa26b3722785 100644 --- a/Python/pylifecycle.c +++ b/Python/pylifecycle.c @@ -2152,6 +2152,9 @@ Py_EndInterpreter(PyThreadState *tstate) // Wrap up existing "threading"-module-created, non-daemon threads. wait_for_thread_shutdown(tstate); + // Make any remaining pending calls. + _Py_FinishPendingCalls(tstate); + _PyAtExit_Call(tstate->interp); if (tstate != interp->threads.head || tstate->next != NULL) { diff --git a/Python/pystate.c b/Python/pystate.c index d63a873a5c30257..da0e1c42d7275f4 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -380,7 +380,7 @@ _Py_COMP_DIAG_IGNORE_DEPR_DECLS static const _PyRuntimeState initial = _PyRuntimeState_INIT(_PyRuntime); _Py_COMP_DIAG_POP -#define NUMLOCKS 8 +#define NUMLOCKS 9 #define LOCKS_INIT(runtime) \ { \ &(runtime)->interpreters.mutex, \ @@ -388,6 +388,7 @@ _Py_COMP_DIAG_POP &(runtime)->getargs.mutex, \ &(runtime)->unicode_state.ids.lock, \ &(runtime)->imports.extensions.mutex, \ + &(runtime)->ceval.pending_mainthread.lock, \ &(runtime)->atexit.mutex, \ &(runtime)->audit_hooks.mutex, \ &(runtime)->allocators.mutex, \ diff --git a/Tools/c-analyzer/cpython/ignored.tsv b/Tools/c-analyzer/cpython/ignored.tsv index 4c40a04a6854bff..b6745b6aeb0b613 100644 --- a/Tools/c-analyzer/cpython/ignored.tsv +++ b/Tools/c-analyzer/cpython/ignored.tsv @@ -517,6 +517,7 @@ Modules/_testcapimodule.c - g_type_watchers_installed - Modules/_testimportmultiple.c - _barmodule - Modules/_testimportmultiple.c - _foomodule - Modules/_testimportmultiple.c - _testimportmultiple - +Modules/_testinternalcapi.c - pending_identify_result - Modules/_testmultiphase.c - Example_Type_slots - Modules/_testmultiphase.c - Example_Type_spec - Modules/_testmultiphase.c - Example_methods -