From 54ec0c8b1c253631f63b9d6ab22f00a3835e3a74 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Sun, 7 Apr 2024 19:38:28 +0200 Subject: [PATCH] feat: add PyFuture, an async wrapper around Python future-like objects. --- src/types/future.rs | 133 ++++++++++++++++++++++++++++++++++++++++++++ src/types/mod.rs | 2 + 2 files changed, 135 insertions(+) create mode 100644 src/types/future.rs diff --git a/src/types/future.rs b/src/types/future.rs new file mode 100644 index 00000000000..d8e871464c1 --- /dev/null +++ b/src/types/future.rs @@ -0,0 +1,133 @@ +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll, Waker}, +}; + +use parking_lot::Mutex; + +use crate::types::PyCFunction; +use crate::{ + pyobject_native_type_named, + sync::GILOnceCell, + types::{any::PyAnyMethods, tuple::PyTupleMethods}, + Bound, Py, PyAny, PyObject, PyResult, PyTypeCheck, Python, +}; + +/// A Python future-like object. +/// +/// It can be either an asyncio future-like or a concurrent.futures.Future object. +#[repr(transparent)] +pub struct PyFuture(PyAny); +pyobject_native_type_named!(PyFuture); + +impl Py { + /// Convert a `PyFuture` into a Rust `Future`. + /// + /// Contrary to asyncio.Future, Rust future will panic if polled after completion, in order + /// to optimize the result retrieving. + pub fn as_rust_future( + &self, + py: Python<'_>, + ) -> PyResult> + Send + Sync + 'static> { + self.bind(py).as_rust_future() + } +} + +impl Bound<'_, PyFuture> { + /// Convert a `PyFuture` into a Rust `Future`. + /// + /// Contrary to asyncio.Future, Rust future will panic if polled after completion, in order + /// to optimize the result retrieving. + pub fn as_rust_future( + &self, + ) -> PyResult> + Send + Sync + 'static> { + #[derive(Default)] + struct PendingInner { + result: Option>, + waker: Option, + } + enum FutureImpl { + Done(Option>), + Pending(Arc>), + } + impl Future for FutureImpl { + type Output = PyResult; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + match this { + Self::Done(res) => { + Poll::Ready(res.take().expect("Future polled after completion")) + } + Self::Pending(cb) => { + let mut inner = cb.lock(); + if inner.result.is_some() { + let res = inner.result.take().unwrap(); + drop(inner); + *this = Self::Done(None); + return Poll::Ready(res); + } + if !matches!(&mut inner.waker, Some(waker) if waker.will_wake(cx.waker())) { + inner.waker = Some(cx.waker().clone()); + } + Poll::Pending + } + } + } + } + Ok(if self.call_method0("done")?.extract()? { + FutureImpl::Done(Some(self.call_method0("result").map(Bound::unbind))) + } else { + let pending = Arc::new(Mutex::new(PendingInner::default())); + let pending2 = pending.clone(); + // For asyncio futures, `add_done_callback` should be called the event loop thread, + // but because the GIL is held, and because we just checked that the future isn't done, + // the future result cannot be set in the meantime, so it's ok. + let callback = + PyCFunction::new_closure_bound(self.py(), None, None, move |args, _| { + let mut inner = pending2.lock(); + inner.result = + Some(args.get_item(0)?.call_method0("result").map(Bound::unbind)); + if let Some(waker) = &inner.waker { + waker.wake_by_ref(); + } + PyResult::Ok(()) + })?; + self.call_method1("add_done_callback", (callback,))?; + FutureImpl::Pending(pending) + }) + } +} + +fn is_asyncio_future(object: &Bound<'_, PyAny>) -> PyResult { + static IS_FUTURE: GILOnceCell = GILOnceCell::new(); + let import = || { + let module = object.py().import_bound("asyncio")?; + PyResult::Ok(module.getattr("isfuture")?.into()) + }; + IS_FUTURE + .get_or_try_init(object.py(), import)? + .call1(object.py(), (object,))? + .extract(object.py()) +} + +fn is_concurrent_future(object: &Bound<'_, PyAny>) -> PyResult { + static FUTURE: GILOnceCell = GILOnceCell::new(); + let import = || { + let module = object.py().import_bound("concurrent.futures")?; + PyResult::Ok(module.getattr("Future")?.into()) + }; + let future_type = FUTURE + .get_or_try_init(object.py(), import)? + .bind(object.py()); + object.is_instance(future_type) +} + +impl PyTypeCheck for PyFuture { + const NAME: &'static str = "Future"; + + fn type_check(object: &Bound<'_, PyAny>) -> bool { + is_asyncio_future(object).unwrap_or(false) || is_concurrent_future(object).unwrap_or(false) + } +} diff --git a/src/types/mod.rs b/src/types/mod.rs index a03d01b301a..649a101cadf 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -27,6 +27,7 @@ pub use self::frozenset::{PyFrozenSet, PyFrozenSetBuilder, PyFrozenSetMethods}; pub use self::function::PyCFunction; #[cfg(all(not(Py_LIMITED_API), not(PyPy), not(GraalPy)))] pub use self::function::PyFunction; +pub use self::future::PyFuture; pub use self::iterator::PyIterator; pub use self::list::{PyList, PyListMethods}; pub use self::mapping::{PyMapping, PyMappingMethods}; @@ -333,6 +334,7 @@ pub(crate) mod float; mod frame; pub(crate) mod frozenset; mod function; +mod future; pub(crate) mod iterator; pub(crate) mod list; pub(crate) mod mapping;