Skip to content

Commit

Permalink
Use a lock-free stack to implement the global reference pool to penal…
Browse files Browse the repository at this point in the history
…ize producers versus consumers
  • Loading branch information
adamreichold committed May 20, 2024
1 parent 3e4b3c5 commit 714f4d4
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 72 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ rust-version = "1.63"
cfg-if = "1.0"
libc = "0.2.62"
memoffset = "0.9"
lockfree-collector = { git = "https://github.com/adamreichold/lockfree-collector.git" }

# ffi bindings to the python interpreter, split into a separate crate so they can be used independently
pyo3-ffi = { path = "pyo3-ffi", version = "=0.22.0-dev" }
Expand Down
124 changes: 52 additions & 72 deletions src/gil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@ use crate::impl_::not_send::{NotSend, NOT_SEND};
#[cfg(pyo3_disable_reference_pool)]
use crate::impl_::panic::PanicTrap;
use crate::{ffi, Python};
use lockfree_collector::Collector;
use std::cell::Cell;
#[cfg(all(feature = "gil-refs", debug_assertions))]
use std::cell::RefCell;
#[cfg(all(feature = "gil-refs", not(debug_assertions)))]
use std::cell::UnsafeCell;
use std::{mem, ptr::NonNull, sync};
#[cfg(feature = "gil-refs")]
use std::mem;
use std::{ptr::NonNull, sync};

static START: sync::Once = sync::Once::new();

#[cfg(feature = "gil-refs")]
type PyObjVec = Vec<NonNull<ffi::PyObject>>;

std::thread_local! {
/// This is an internal counter in pyo3 monitoring whether this thread has the GIL.
///
Expand Down Expand Up @@ -233,7 +239,7 @@ impl GILGuard {
increment_gil_count();
// Update counts of PyObjects / Py that have been cloned or dropped since last acquisition
#[cfg(not(pyo3_disable_reference_pool))]
POOL.update_counts(unsafe { Python::assume_gil_acquired() });
update_counts(unsafe { Python::assume_gil_acquired() });

GILGuard::Ensured { gstate }
}
Expand Down Expand Up @@ -272,47 +278,20 @@ impl Drop for GILGuard {
}
}

// Vector of PyObject
type PyObjVec = Vec<NonNull<ffi::PyObject>>;

#[cfg(not(pyo3_disable_reference_pool))]
/// Thread-safe storage for objects which were dec_ref while the GIL was not held.
struct ReferencePool {
pending_decrefs: sync::Mutex<PyObjVec>,
}
struct PendingDecRef(NonNull<ffi::PyObject>);

#[cfg(not(pyo3_disable_reference_pool))]
impl ReferencePool {
const fn new() -> Self {
Self {
pending_decrefs: sync::Mutex::new(Vec::new()),
}
}

fn register_decref(&self, obj: NonNull<ffi::PyObject>) {
self.pending_decrefs.lock().unwrap().push(obj);
}

fn update_counts(&self, _py: Python<'_>) {
let mut pending_decrefs = self.pending_decrefs.lock().unwrap();
if pending_decrefs.is_empty() {
return;
}

let decrefs = mem::take(&mut *pending_decrefs);
drop(pending_decrefs);

for ptr in decrefs {
unsafe { ffi::Py_DECREF(ptr.as_ptr()) };
}
}
}
unsafe impl Send for PendingDecRef {}

#[cfg(not(pyo3_disable_reference_pool))]
unsafe impl Sync for ReferencePool {}
static POOL: Collector<PendingDecRef, 30> = Collector::new();

#[cfg(not(pyo3_disable_reference_pool))]
static POOL: ReferencePool = ReferencePool::new();
fn update_counts(_py: Python<'_>) {
POOL.collect(|obj| {
unsafe { ffi::Py_DECREF(obj.0.as_ptr()) };
});
}

/// A guard which can be used to temporarily release the GIL and restore on `Drop`.
pub(crate) struct SuspendGIL {
Expand All @@ -337,7 +316,7 @@ impl Drop for SuspendGIL {

// Update counts of PyObjects / Py that were cloned or dropped while the GIL was released.
#[cfg(not(pyo3_disable_reference_pool))]
POOL.update_counts(Python::assume_gil_acquired());
update_counts(Python::assume_gil_acquired());
}
}
}
Expand Down Expand Up @@ -410,7 +389,7 @@ impl GILPool {
pub unsafe fn new() -> GILPool {
// Update counts of PyObjects / Py that have been cloned or dropped since last acquisition
#[cfg(not(pyo3_disable_reference_pool))]
POOL.update_counts(Python::assume_gil_acquired());
update_counts(Python::assume_gil_acquired());
GILPool {
start: OWNED_OBJECTS
.try_with(|owned_objects| {
Expand Down Expand Up @@ -489,7 +468,7 @@ pub unsafe fn register_decref(obj: NonNull<ffi::PyObject>) {
ffi::Py_DECREF(obj.as_ptr())
} else {
#[cfg(not(pyo3_disable_reference_pool))]
POOL.register_decref(obj);
POOL.push(PendingDecRef(obj));
#[cfg(all(
pyo3_disable_reference_pool,
not(pyo3_leak_on_drop_without_reference_pool)
Expand Down Expand Up @@ -549,17 +528,19 @@ fn decrement_gil_count() {

#[cfg(test)]
mod tests {
#[cfg(not(pyo3_disable_reference_pool))]
use super::{gil_is_acquired, POOL};
#[cfg(feature = "gil-refs")]
#[allow(deprecated)]
use super::{GILPool, GIL_COUNT, OWNED_OBJECTS};
use crate::types::any::PyAnyMethods;
#[cfg(feature = "gil-refs")]
use crate::{ffi, gil};
use crate::{PyObject, Python};
use std::ptr::NonNull;
use super::*;

use crate::types::any::PyAnyMethods;
#[cfg(any(
feature = "gil-refs",
all(not(pyo3_disable_reference_pool), not(target_arch = "wasm32")) // We are building wasm Python with pthreads disabled
))]
use crate::PyObject;

#[cfg(any(
feature = "gil-refs",
all(not(pyo3_disable_reference_pool), not(target_arch = "wasm32")) // We are building wasm Python with pthreads disabled
))]
fn get_object(py: Python<'_>) -> PyObject {
py.eval_bound("object()", None, None).unwrap().unbind()
}
Expand All @@ -573,21 +554,19 @@ mod tests {
len
}

#[cfg(not(pyo3_disable_reference_pool))]
fn pool_dec_refs_does_not_contain(obj: &PyObject) -> bool {
!POOL
.pending_decrefs
.lock()
.unwrap()
.contains(&unsafe { NonNull::new_unchecked(obj.as_ptr()) })
}

#[cfg(all(not(pyo3_disable_reference_pool), not(target_arch = "wasm32")))]
fn pool_dec_refs_contains(obj: &PyObject) -> bool {
POOL.pending_decrefs
.lock()
.unwrap()
.contains(&unsafe { NonNull::new_unchecked(obj.as_ptr()) })
let mut found = false;

POOL.collect(|obj1| {
if obj1.0.as_ptr() == obj.as_ptr() {
found = true;
}

unsafe { ffi::Py_DECREF(obj1.0.as_ptr()) };
});

found
}

#[test]
Expand All @@ -603,7 +582,7 @@ mod tests {
unsafe {
{
let pool = py.new_pool();
gil::register_owned(pool.python(), NonNull::new_unchecked(obj.into_ptr()));
register_owned(pool.python(), NonNull::new_unchecked(obj.into_ptr()));

assert_eq!(owned_object_count(), 1);
assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
Expand Down Expand Up @@ -632,14 +611,14 @@ mod tests {
let _pool = py.new_pool();
assert_eq!(owned_object_count(), 0);

gil::register_owned(py, NonNull::new_unchecked(obj.into_ptr()));
register_owned(py, NonNull::new_unchecked(obj.into_ptr()));

assert_eq!(owned_object_count(), 1);
assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
{
let _pool = py.new_pool();
let obj = get_object(py);
gil::register_owned(py, NonNull::new_unchecked(obj.into_ptr()));
register_owned(py, NonNull::new_unchecked(obj.into_ptr()));
assert_eq!(owned_object_count(), 2);
}
assert_eq!(owned_object_count(), 1);
Expand All @@ -653,6 +632,7 @@ mod tests {
}

#[test]
#[cfg(all(not(pyo3_disable_reference_pool), not(target_arch = "wasm32")))] // We are building wasm Python with pthreads disabled
fn test_pyobject_drop_with_gil_decreases_refcnt() {
Python::with_gil(|py| {
let obj = get_object(py);
Expand All @@ -662,14 +642,14 @@ mod tests {

assert_eq!(obj.get_refcnt(py), 2);
#[cfg(not(pyo3_disable_reference_pool))]
assert!(pool_dec_refs_does_not_contain(&obj));
assert!(!pool_dec_refs_contains(&obj));

// With the GIL held, reference count will be decreased immediately.
drop(reference);

assert_eq!(obj.get_refcnt(py), 1);
#[cfg(not(pyo3_disable_reference_pool))]
assert!(pool_dec_refs_does_not_contain(&obj));
assert!(!pool_dec_refs_contains(&obj));
});
}

Expand All @@ -682,7 +662,7 @@ mod tests {
let reference = obj.clone_ref(py);

assert_eq!(obj.get_refcnt(py), 2);
assert!(pool_dec_refs_does_not_contain(&obj));
assert!(!pool_dec_refs_contains(&obj));

// Drop reference in a separate thread which doesn't have the GIL.
std::thread::spawn(move || drop(reference)).join().unwrap();
Expand All @@ -697,7 +677,7 @@ mod tests {
// Next time the GIL is acquired, the reference is released
Python::with_gil(|py| {
assert_eq!(obj.get_refcnt(py), 1);
assert!(pool_dec_refs_does_not_contain(&obj));
assert!(!pool_dec_refs_contains(&obj));
});
}

Expand Down Expand Up @@ -820,10 +800,10 @@ mod tests {
let capsule =
unsafe { ffi::PyCapsule_New(ptr as _, std::ptr::null(), Some(capsule_drop)) };

POOL.register_decref(NonNull::new(capsule).unwrap());
POOL.push(PendingDecRef(NonNull::new(capsule).unwrap()));

// Updating the counts will call decref on the capsule, which calls capsule_drop
POOL.update_counts(py);
update_counts(py);
})
}
}

0 comments on commit 714f4d4

Please sign in to comment.