From 74fecdb54478fd8385f88f2d31eb83b80deafe2f Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Mon, 18 May 2026 15:29:41 +0200 Subject: [PATCH] Use recursive mutex to deal with GIL <-> internal lock deadlocks --- .../pyconnection/pyconnection.hpp | 28 +- src/duckdb_py/pyconnection.cpp | 60 +++- src/duckdb_py/pyresult.cpp | 7 +- .../test_concurrent_connection_no_crash.py | 288 ++++++++++++++++++ 4 files changed, 370 insertions(+), 13 deletions(-) create mode 100644 tests/slow/test_concurrent_connection_no_crash.py diff --git a/src/duckdb_py/include/duckdb_python/pyconnection/pyconnection.hpp b/src/duckdb_py/include/duckdb_python/pyconnection/pyconnection.hpp index dd7c9d2e..0369c9a9 100644 --- a/src/duckdb_py/include/duckdb_python/pyconnection/pyconnection.hpp +++ b/src/duckdb_py/include/duckdb_python/pyconnection/pyconnection.hpp @@ -163,9 +163,35 @@ struct DuckDBPyConnection : public enable_shared_from_this { }; public: + // RAII guard for the connection mutex (see py_connection_lock below). Constructing + // one releases the GIL while waiting for the mutex and reacquires it before + // returning, so callers always come out of the constructor with the GIL held + // and the mutex locked. The mutex is released when the guard goes out of scope. + // Holding the GIL while blocked on this mutex would deadlock against a thread + // that holds the mutex and is mid-way through a GIL-releasing native call — + // see duckdb-python#435. 
+ class ConnectionLockGuard { + public: + explicit ConnectionLockGuard(DuckDBPyConnection &conn) : lock_(conn.py_connection_lock, std::defer_lock) { + D_ASSERT(py::gil_check()); + py::gil_scoped_release release; + lock_.lock(); + } + + private: + std::unique_lock lock_; + }; + ConnectionGuard con; Cursors cursors; - std::mutex py_connection_lock; + // Recursive because one thread locks it twice: the ConnectionLockGuard + // taken at the top of the execute/fetch methods is still held when + // PrepareQuery / ExecuteInternal / PrepareAndExecuteInternal take their + // inner lock, which would self-deadlock on a plain std::mutex. Serialises + // every path that touches `con.result` so concurrent calls on a single + // DuckDBPyConnection cannot dereference an already-freed result — see + // duckdb-python#435. + std::recursive_mutex py_connection_lock; //! MemoryFileSystem used to temporarily store file-like objects for reading shared_ptr internal_object_filesystem; case_insensitive_map_t> registered_functions; diff --git a/src/duckdb_py/pyconnection.cpp b/src/duckdb_py/pyconnection.cpp index 6883ba45..6fcbe0ac 100644 --- a/src/duckdb_py/pyconnection.cpp +++ b/src/duckdb_py/pyconnection.cpp @@ -75,10 +75,18 @@ std::string DuckDBPyConnection::formatted_python_version = ""; DuckDBPyConnection::~DuckDBPyConnection() { try { - py::gil_scoped_release gil; - // Release any structures that do not need to hold the GIL here - con.SetDatabase(nullptr); - con.SetConnection(nullptr); + // The native Connection / DuckDB teardown is pure C++ work — release + // the GIL for it so other Python threads can run. The implicit member + // destructors that fire after this scope (notably + // `registered_functions`, a `case_insensitive_map_t>` + // whose entries transitively own pybind-managed Python references) + // run with the GIL reacquired because `gil` is destroyed at the end + // of the inner block. + { + py::gil_scoped_release gil; + con.SetDatabase(nullptr); + con.SetConnection(nullptr); + } } catch (...) 
{ // NOLINT } } @@ -492,6 +500,7 @@ void DuckDBPyConnection::Initialize(py::handle &m) { shared_ptr DuckDBPyConnection::ExecuteMany(const py::object &query, py::object params_p) { py::gil_scoped_acquire gil; + ConnectionLockGuard conn_lock(*this); con.SetResult(nullptr); if (params_p.is_none()) { params_p = py::list(); @@ -623,7 +632,7 @@ unique_ptr DuckDBPyConnection::PrepareQuery(unique_ptr lock(py_connection_lock); + unique_lock lock(py_connection_lock); prep = connection.Prepare(std::move(statement)); if (prep->HasError()) { @@ -644,7 +653,7 @@ unique_ptr DuckDBPyConnection::ExecuteInternal(PreparedStatement &p { D_ASSERT(py::gil_check()); py::gil_scoped_release release; - unique_lock lock(py_connection_lock); + unique_lock lock(py_connection_lock); auto pending_query = prep.PendingQuery(named_values); if (pending_query->HasError()) { @@ -671,7 +680,7 @@ unique_ptr DuckDBPyConnection::PrepareAndExecuteInternal(unique_ptr { D_ASSERT(py::gil_check()); py::gil_scoped_release release; - unique_lock lock(py_connection_lock); + unique_lock lock(py_connection_lock); auto pending_query = con.GetConnection().PendingQuery(std::move(statement), named_values, true); @@ -710,6 +719,7 @@ shared_ptr DuckDBPyConnection::ExecuteFromString(const strin shared_ptr DuckDBPyConnection::Execute(const py::object &query, py::object params) { py::gil_scoped_acquire gil; + ConnectionLockGuard conn_lock(*this); con.SetResult(nullptr); auto statements = GetStatements(query); @@ -1879,6 +1889,7 @@ shared_ptr DuckDBPyConnection::Checkpoint() { } Optional DuckDBPyConnection::GetDescription() { + ConnectionLockGuard conn_lock(*this); if (!con.HasResult()) { return py::none(); } @@ -1891,11 +1902,22 @@ int DuckDBPyConnection::GetRowcount() { } void DuckDBPyConnection::Close() { + ConnectionLockGuard conn_lock(*this); con.SetResult(nullptr); D_ASSERT(py::gil_check()); - py::gil_scoped_release release; - con.SetConnection(nullptr); - con.SetDatabase(nullptr); + // Release the GIL only for the 
native Connection / DuckDB teardown, which + // is pure C++ work and can take noticeable time. Reacquire the GIL before + // `registered_functions.clear()` because the + // `case_insensitive_map_t` of `ExternalDependency` entries it destroys + // transitively owns pybind-managed Python references (Python UDF + // callables, registered Python objects, …). Decrementing those + // references with the GIL released is undefined behaviour — see + // duckdb-python#456. + { + py::gil_scoped_release release; + con.SetConnection(nullptr); + con.SetDatabase(nullptr); + } // https://peps.python.org/pep-0249/#Connection.close cursors.ClearCursors(); registered_functions.clear(); @@ -2025,7 +2047,13 @@ shared_ptr DuckDBPyConnection::Cursor() { } // these should be functions on the result but well +// +// All of the connection-level fetch methods below take `py_connection_lock` +// before touching `con.GetResult()`, so that another thread cannot replace +// or destroy the connection's current result while we are mid-fetch — see +// duckdb-python#435. 
Optional DuckDBPyConnection::FetchOne() { + ConnectionLockGuard conn_lock(*this); if (!con.HasResult()) { throw InvalidInputException("No open result set"); } @@ -2034,6 +2062,7 @@ Optional DuckDBPyConnection::FetchOne() { } py::list DuckDBPyConnection::FetchMany(idx_t size) { + ConnectionLockGuard conn_lock(*this); if (!con.HasResult()) { throw InvalidInputException("No open result set"); } @@ -2042,6 +2071,7 @@ py::list DuckDBPyConnection::FetchMany(idx_t size) { } py::list DuckDBPyConnection::FetchAll() { + ConnectionLockGuard conn_lock(*this); if (!con.HasResult()) { throw InvalidInputException("No open result set"); } @@ -2050,6 +2080,7 @@ py::list DuckDBPyConnection::FetchAll() { } py::dict DuckDBPyConnection::FetchNumpy() { + ConnectionLockGuard conn_lock(*this); if (!con.HasResult()) { throw InvalidInputException("No open result set"); } @@ -2058,6 +2089,7 @@ py::dict DuckDBPyConnection::FetchNumpy() { } PandasDataFrame DuckDBPyConnection::FetchDF(bool date_as_object) { + ConnectionLockGuard conn_lock(*this); if (!con.HasResult()) { throw InvalidInputException("No open result set"); } @@ -2066,6 +2098,7 @@ PandasDataFrame DuckDBPyConnection::FetchDF(bool date_as_object) { } PandasDataFrame DuckDBPyConnection::FetchDFChunk(const idx_t vectors_per_chunk, bool date_as_object) { + ConnectionLockGuard conn_lock(*this); if (!con.HasResult()) { throw InvalidInputException("No open result set"); } @@ -2074,6 +2107,7 @@ PandasDataFrame DuckDBPyConnection::FetchDFChunk(const idx_t vectors_per_chunk, } duckdb::pyarrow::Table DuckDBPyConnection::FetchArrow(idx_t rows_per_batch) { + ConnectionLockGuard conn_lock(*this); if (!con.HasResult()) { throw InvalidInputException("No open result set"); } @@ -2082,6 +2116,7 @@ duckdb::pyarrow::Table DuckDBPyConnection::FetchArrow(idx_t rows_per_batch) { } py::dict DuckDBPyConnection::FetchPyTorch() { + ConnectionLockGuard conn_lock(*this); if (!con.HasResult()) { throw InvalidInputException("No open result set"); } @@ -2090,6 
+2125,7 @@ py::dict DuckDBPyConnection::FetchPyTorch() { } py::dict DuckDBPyConnection::FetchTF() { + ConnectionLockGuard conn_lock(*this); if (!con.HasResult()) { throw InvalidInputException("No open result set"); } @@ -2098,6 +2134,7 @@ py::dict DuckDBPyConnection::FetchTF() { } PolarsDataFrame DuckDBPyConnection::FetchPolars(idx_t rows_per_batch, bool lazy) { + ConnectionLockGuard conn_lock(*this); if (!con.HasResult()) { throw InvalidInputException("No open result set"); } @@ -2106,6 +2143,7 @@ PolarsDataFrame DuckDBPyConnection::FetchPolars(idx_t rows_per_batch, bool lazy) } duckdb::pyarrow::RecordBatchReader DuckDBPyConnection::FetchRecordBatchReader(const idx_t rows_per_batch) { + ConnectionLockGuard conn_lock(*this); if (!con.HasResult()) { throw InvalidInputException("No open result set"); } @@ -2185,7 +2223,7 @@ static shared_ptr FetchOrCreateInstance(const string &databa { D_ASSERT(py::gil_check()); py::gil_scoped_release release; - unique_lock lock(res->py_connection_lock); + unique_lock lock(res->py_connection_lock); auto database = instance_cache.GetOrCreateInstance(database_path, config, cache_instance, InstantiateNewInstance); res->con.SetDatabase(std::move(database)); diff --git a/src/duckdb_py/pyresult.cpp b/src/duckdb_py/pyresult.cpp index 91118add..d67ba420 100644 --- a/src/duckdb_py/pyresult.cpp +++ b/src/duckdb_py/pyresult.cpp @@ -34,9 +34,14 @@ DuckDBPyResult::DuckDBPyResult(unique_ptr result_p) : result(std::m } DuckDBPyResult::~DuckDBPyResult() { + // The destructor must run with the GIL held: `result` and `current_chunk` + // can transitively own pybind-managed Python references (registered + // objects, arrow release callbacks, PYTHON_OBJECT vector values, etc.), + // whose teardown calls into the Python C API. Releasing the GIL here + // (as the previous implementation did) causes Py_DECREF / PyObject_Free + // to run without a valid PyThreadState — see duckdb-python#456. 
try { D_ASSERT(py::gil_check()); - py::gil_scoped_release gil; result.reset(); current_chunk.reset(); } catch (...) { // NOLINT diff --git a/tests/slow/test_concurrent_connection_no_crash.py b/tests/slow/test_concurrent_connection_no_crash.py new file mode 100644 index 00000000..e45e9238 --- /dev/null +++ b/tests/slow/test_concurrent_connection_no_crash.py @@ -0,0 +1,288 @@ +"""Regression tests for duckdb-python#435 and #456. + +These reproducers crash the interpreter (SIGSEGV / heap corruption / abort) +when the underlying bug is triggered, so each test runs the workload in a +fresh Python subprocess and asserts on the exit code. A non-zero exit (in +particular -11 / 139 / 134 / -6) is treated as the bug reproducing. + +The workloads also report a `success_count` to stdout, and the assertions +verify it crossed a sensible floor — without that, a subprocess that +silently errored on every iteration (e.g. import failure, API change) +would still satisfy a "no crash" check and we would lose the signal we +care about. + +These tests spawn subprocesses, so they live under tests/slow rather than +tests/fast. 
+""" + +from __future__ import annotations + +import platform +import re +import subprocess +import sys +import textwrap + +import pytest + +_SUCCESS_RE = re.compile(rb"^success_count=(\d+)\r?$", re.MULTILINE) + + +def _run(script: str, timeout: float) -> subprocess.CompletedProcess[bytes]: + return subprocess.run( + [sys.executable, "-c", script], + capture_output=True, + timeout=timeout, + ) + + +def _assert_no_crash_and_min_successes( + result: subprocess.CompletedProcess[bytes], + min_successes: int, +) -> None: + if result.returncode != 0: + stdout = result.stdout.decode(errors="replace")[-2000:] + stderr = result.stderr.decode(errors="replace")[-2000:] + msg = ( + f"subprocess exited with returncode={result.returncode} " + f"(negative = killed by signal on POSIX; 139=SIGSEGV, 134=SIGABRT)\n" + f"--- stdout (tail) ---\n{stdout}\n" + f"--- stderr (tail) ---\n{stderr}" + ) + raise AssertionError(msg) + match = _SUCCESS_RE.search(result.stdout) + if match is None: + stdout = result.stdout.decode(errors="replace")[-2000:] + msg = ( + "subprocess did not report success_count — the workload may have " + "errored on every iteration without crashing, which would mask " + "the bug we are guarding against.\n" + f"--- stdout (tail) ---\n{stdout}" + ) + raise AssertionError(msg) + success_count = int(match.group(1)) + assert success_count >= min_successes, ( + f"subprocess only had success_count={success_count} (need >= " + f"{min_successes}). The workload is not exercising the code path the " + f"test is supposed to guard." + ) + + +# --------------------------------------------------------------------------- +# #435 — concurrent execute/fetchall on the same connection +# --------------------------------------------------------------------------- +# +# Both variants reliably segfault pre-fix (subprocess returncode -11 on
+# POSIX; a shell reports 139). After the fix, every iteration should succeed. +# + +_REPRO_435_SINGLE_CONN = textwrap.dedent( + """ + import concurrent.futures + import duckdb + + # One shared connection, many threads. ThreadPoolExecutor(8) means up to + # 8 worker threads concurrently hit the same DuckDBPyConnection's result + # slot via execute() + fetchall(). + conn = duckdb.connect() + successes = 0 + + def run(_): + conn.execute('SELECT 1').fetchall() + + with concurrent.futures.ThreadPoolExecutor(8) as ex: + for f in [ex.submit(run, i) for i in range(200)]: + try: + f.result() + successes += 1 + except Exception: + # A clean Python-level exception is acceptable — the bug we + # test for is an interpreter crash. Per-iteration failures + # still count against `success_count` so the test asserts + # that the workload actually ran. + pass + + print(f"success_count={successes}") + """ +).lstrip() + + +_REPRO_435_MULTI_CONN = textwrap.dedent( + """ + # Exact reproducer from issue #435. + import concurrent.futures + import duckdb + + conns = [duckdb.connect() for _ in range(8)] + successes = 0 + + def run(i): + conns[i % 8].execute('SELECT 1').fetchall() + + with concurrent.futures.ThreadPoolExecutor(8) as ex: + for f in [ex.submit(run, i) for i in range(200)]: + try: + f.result() + successes += 1 + except Exception: + pass + + print(f"success_count={successes}") + """ +).lstrip() + + +def test_435_concurrent_execute_fetchall_single_connection(): + """Eight threads sharing one connection must not crash the interpreter. + + Pre-fix: SIGSEGV. Post-fix: all 200 iterations should succeed because + the connection lock serialises them. + """ + result = _run(_REPRO_435_SINGLE_CONN, timeout=30.0) + _assert_no_crash_and_min_successes(result, min_successes=200) + + +def test_435_concurrent_execute_fetchall_multi_connection(): + """Reporter's 8-connection / 200-task pattern must not crash. + + Pre-fix: SIGSEGV. The race is per-connection on the result slot; + multiple workers landing on the same ``conns[i % 8]`` trigger it. 
+ """ + result = _run(_REPRO_435_MULTI_CONN, timeout=30.0) + _assert_no_crash_and_min_successes(result, min_successes=200) + + +# --------------------------------------------------------------------------- +# #456 — DuckDBPyResult destructor must hold the GIL +# --------------------------------------------------------------------------- +# +# Reporter sees hard crashes on Windows + Python 3.12 under heavy executemany +# workloads with Python primitives flowing into the result graph. The fix +# removes the unconditional `py::gil_scoped_release` from +# `DuckDBPyResult::~DuckDBPyResult` (and tightens the same scope in +# `DuckDBPyConnection::Close` so the `case_insensitive_map_t>` +# is destroyed with the GIL held). +# +# These tests are skipped off-Windows. We verified empirically by +# temp-reverting the fix and rebuilding that the workloads below do NOT +# crash on macOS/Linux even with the destructor's GIL release restored — +# `Py_DECREF` without the GIL is undefined behaviour but POSIX CPython +# rarely faults on it deterministically (different thread-state storage / +# allocator behaviour from Windows). Running these tests off-Windows +# would always pass regardless of whether the fix is in place, so they +# would be a false signal rather than a regression guard. +# +# On Windows + Python 3.12 (the reporter's environment) the same +# workloads are expected to crash pre-fix and pass post-fix, which is +# what a regression test should do. +# +# Correctness of the fix on POSIX is established by inspection rather +# than by these tests: the destructor's `D_ASSERT(py::gil_check())` already +# requires the caller to hold the GIL on entry, and the previous +# `py::gil_scoped_release` was the only thing dropping it during +# destruction of pybind-managed members. 
+# + +_skip_456_off_windows = pytest.mark.skipif( + platform.system() != "Windows", + reason=( + "Issue #456 crashes only on Windows CPython (Py_DECREF without the GIL " + "is UB but POSIX CPython does not consistently fault on it). Verified " + "empirically that this workload does not crash on macOS even with the " + "destructor's GIL release restored — running it off-Windows would " + "always pass and provide no regression signal. Correctness on POSIX " + "is established by code inspection." + ), +) + + +_REPRO_456_UDF_DESTRUCTOR = textwrap.dedent( + """ + # Stress the result destructor on multiple threads concurrently, with + # Python str references in the result graph (via a UDF). Each worker + # has its OWN connection, so the connection lock from #435 does not + # serialise them — we want them genuinely concurrent so that one + # thread's destructor (releasing the GIL pre-fix) overlaps with + # another thread's Python work / allocation. + import concurrent.futures + import duckdb + from duckdb.sqltypes import INTEGER, VARCHAR + + def make_label(i): + return f"label-{i}-padded-out-a-bit-to-defeat-small-string-optimisation" + + def worker(_): + conn = duckdb.connect() + conn.create_function('make_label', make_label, [INTEGER], VARCHAR) + rows = 4 * 1024 + local_successes = 0 + for _ in range(50): + n = len(conn.execute(f'SELECT make_label(i::INTEGER) FROM range({rows}) t(i)').fetchall()) + if n == rows: + local_successes += 1 + return local_successes + + successes = 0 + with concurrent.futures.ThreadPoolExecutor(8) as ex: + for f in [ex.submit(worker, i) for i in range(16)]: + try: + successes += f.result() + except Exception: + pass + + print(f"success_count={successes}") + """ +).lstrip() + + +_REPRO_456_CLOSE_REGISTERED = textwrap.dedent( + """ + # Stress the Close() path that destroys `registered_functions`. Each + # connection has a UDF registered, so closing it tears down the + # `case_insensitive_map_t>` which + # transitively owns the Python callable. 
This must happen with the GIL + # held — see duckdb-python#456. + import duckdb + from duckdb.sqltypes import INTEGER + + def identity(x): + return x + + successes = 0 + for _ in range(500): + conn = duckdb.connect() + conn.create_function('identity', identity, [INTEGER], INTEGER) + # Make sure the UDF actually got bound to the connection. + ok = conn.execute('SELECT identity(7)').fetchone()[0] == 7 + conn.close() # triggers registered_functions.clear() + if ok: + successes += 1 + + print(f"success_count={successes}") + """ +).lstrip() + + +@_skip_456_off_windows +def test_456_destructor_with_python_refs_in_result_graph(): + """DuckDBPyResult destructor must not crash with Python refs in chunks. + + Repeated destruction where the result chunks hold Python ``str`` + references (via a UDF), run on 8 threads concurrently with + independent connections. Pre-fix Windows + Python 3.12: crash. + Post-fix Windows: clean exit, all 16*50=800 iterations succeed. + """ + result = _run(_REPRO_456_UDF_DESTRUCTOR, timeout=120.0) + _assert_no_crash_and_min_successes(result, min_successes=800) + + +@_skip_456_off_windows +def test_456_close_clears_registered_functions_with_gil(): + """``close()`` must hold the GIL while clearing ``registered_functions``. + + Destroying those ``ExternalDependency`` entries transitively + decrements pybind-owned Python references; doing so without the GIL + is undefined behaviour. + """ + result = _run(_REPRO_456_CLOSE_REGISTERED, timeout=60.0) + _assert_no_crash_and_min_successes(result, min_successes=500)