Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support async fn in macros with coroutine implementation #3540

Merged
merged 1 commit into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ unindent = { version = "0.2.1", optional = true }
# support crate for multiple-pymethods feature
inventory = { version = "0.3.0", optional = true }

# coroutine implementation
futures-util = "0.3"

# crate integrations that can be added using the eponymous features
anyhow = { version = "1.0", optional = true }
chrono = { version = "0.4.25", default-features = false, optional = true }
Expand All @@ -54,6 +57,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.61"
rayon = "1.6.1"
widestring = "0.5.1"
futures = "0.3.28"

[build-dependencies]
pyo3-build-config = { path = "pyo3-build-config", version = "0.21.0-dev", features = ["resolve-config"] }
Expand Down
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- [Conversion traits](conversions/traits.md)]
- [Python exceptions](exception.md)
- [Calling Python from Rust](python_from_rust.md)
- [Using `async` and `await`](async-await.md)
- [GIL, mutability and object types](types.md)
- [Parallelism](parallelism.md)
- [Debugging](debugging.md)
Expand Down
78 changes: 78 additions & 0 deletions guide/src/async-await.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Using `async` and `await`

*This feature is still in active development. See [the related issue](https://github.com/PyO3/pyo3/issues/1632).*

`#[pyfunction]` and `#[pymethods]` attributes also support `async fn`.

```rust
# #![allow(dead_code)]
use std::{thread, time::Duration};
use futures::channel::oneshot;
use pyo3::prelude::*;

#[pyfunction]
async fn sleep(seconds: f64, result: Option<PyObject>) -> Option<PyObject> {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_secs_f64(seconds));
tx.send(()).unwrap();
});
rx.await.unwrap();
result
}
```

*Python awaitables instantiated with this method can only be awaited in *asyncio* context. Other Python async runtime may be supported in the future.*

## `Send + 'static` constraint

Resulting future of an `async fn` decorated by `#[pyfunction]` must be `Send + 'static` to be embedded in a Python object.

As a consequence, `async fn` parameters and return types must also be `Send + 'static`, so it is not possible to have a signature like `async fn does_not_compile(arg: &PyAny, py: Python<'_>) -> &PyAny`.

It also means that methods cannot use `&self`/`&mut self`, *but this restriction should be dropped in the future.*


## Implicit GIL holding

Even if it is not possible to pass a `py: Python<'_>` parameter to `async fn`, the GIL is still held during the execution of the future – it's also the case for regular `fn` without `Python<'_>`/`&PyAny` parameter, yet the GIL is held.

It is still possible to get a `Python` marker using [`Python::with_gil`]({{#PYO3_DOCS_URL}}/pyo3/struct.Python.html#method.with_gil); because `with_gil` is reentrant and optimized, the cost will be negligible.

## Release the GIL across `.await`

There is currently no simple way to release the GIL when awaiting a future, *but solutions are currently in development*.

Here is the advised workaround for now:

```rust,ignore
use std::{future::Future, pin::{Pin, pin}, task::{Context, Poll}};
use pyo3::prelude::*;

struct AllowThreads<F>(F);

impl<F> Future for AllowThreads<F>
where
F: Future + Unpin + Send,
F::Output: Send,
{
type Output = F::Output;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let waker = cx.waker();
Python::with_gil(|gil| {
gil.allow_threads(|| pin!(&mut self.0).poll(&mut Context::from_waker(waker)))
})
}
}
```

## Cancellation

*To be implemented*

## The `Coroutine` type

To make a Rust future awaitable in Python, PyO3 defines a [`Coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.Coroutine.html) type, which implements the Python [coroutine protocol](https://docs.python.org/3/library/collections.abc.html#collections.abc.Coroutine). Each `coroutine.send` call is translated to `Future::poll` call, while `coroutine.throw` call reraise the exception *(this behavior will be configurable with cancellation support)*.

*The type does not yet have a public constructor until the design is finalized.*
2 changes: 2 additions & 0 deletions guide/src/ecosystem/async-await.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Using `async` and `await`

*`async`/`await` support is currently being integrated in PyO3. See the [dedicated documentation](../async-await.md)*

If you are working with a Python library that makes use of async functions or wish to provide
Python bindings for an async Rust library, [`pyo3-asyncio`](https://github.com/awestlake87/pyo3-asyncio)
likely has the tools you need. It provides conversions between async functions in both Python and
Expand Down
1 change: 1 addition & 0 deletions newsfragments/3540.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support `async fn` in macros with coroutine implementation
8 changes: 7 additions & 1 deletion pyo3-macros-backend/src/method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ pub struct FnSpec<'a> {
pub output: syn::Type,
pub convention: CallingConvention,
pub text_signature: Option<TextSignatureAttribute>,
pub asyncness: Option<syn::Token![async]>,
pub unsafety: Option<syn::Token![unsafe]>,
pub deprecations: Deprecations,
}
Expand Down Expand Up @@ -317,6 +318,7 @@ impl<'a> FnSpec<'a> {
signature,
output: ty,
text_signature,
asyncness: sig.asyncness,
unsafety: sig.unsafety,
deprecations,
})
Expand Down Expand Up @@ -445,7 +447,11 @@ impl<'a> FnSpec<'a> {
let func_name = &self.name;

let rust_call = |args: Vec<TokenStream>| {
quotes::map_result_into_ptr(quotes::ok_wrap(quote! { function(#self_arg #(#args),*) }))
let mut call = quote! { function(#self_arg #(#args),*) };
if self.asyncness.is_some() {
call = quote! { _pyo3::impl_::coroutine::wrap_future(#call) };
}
quotes::map_result_into_ptr(quotes::ok_wrap(call))
};

let rust_name = if let Some(cls) = cls {
Expand Down
5 changes: 2 additions & 3 deletions pyo3-macros-backend/src/pyfunction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
deprecations::Deprecations,
method::{self, CallingConvention, FnArg},
pymethod::check_generic,
utils::{ensure_not_async_fn, get_pyo3_crate},
utils::get_pyo3_crate,
};
use proc_macro2::TokenStream;
use quote::{format_ident, quote};
Expand Down Expand Up @@ -179,8 +179,6 @@ pub fn impl_wrap_pyfunction(
options: PyFunctionOptions,
) -> syn::Result<TokenStream> {
check_generic(&func.sig)?;
ensure_not_async_fn(&func.sig)?;

let PyFunctionOptions {
pass_module,
name,
Expand Down Expand Up @@ -230,6 +228,7 @@ pub fn impl_wrap_pyfunction(
signature,
output: ty,
text_signature,
asyncness: func.sig.asyncness,
unsafety: func.sig.unsafety,
deprecations: Deprecations::new(),
};
Expand Down
3 changes: 1 addition & 2 deletions pyo3-macros-backend/src/pymethod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::borrow::Cow;

use crate::attributes::{NameAttribute, RenamingRule};
use crate::method::{CallingConvention, ExtractErrorMode};
use crate::utils::{ensure_not_async_fn, PythonDoc};
use crate::utils::PythonDoc;
use crate::{
method::{FnArg, FnSpec, FnType, SelfType},
pyfunction::PyFunctionOptions,
Expand Down Expand Up @@ -188,7 +188,6 @@ pub fn gen_py_method(
options: PyFunctionOptions,
) -> Result<GeneratedPyMethod> {
check_generic(sig)?;
ensure_not_async_fn(sig)?;
ensure_function_options_valid(&options)?;
let method = PyMethod::parse(sig, meth_attrs, options)?;
let spec = &method.spec;
Expand Down
13 changes: 1 addition & 12 deletions pyo3-macros-backend/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use proc_macro2::{Span, TokenStream};
use quote::ToTokens;
use syn::{punctuated::Punctuated, spanned::Spanned, Token};
use syn::{punctuated::Punctuated, Token};

use crate::attributes::{CrateAttribute, RenamingRule};

Expand Down Expand Up @@ -137,17 +137,6 @@ impl quote::ToTokens for PythonDoc {
}
}

pub fn ensure_not_async_fn(sig: &syn::Signature) -> syn::Result<()> {
adamreichold marked this conversation as resolved.
Show resolved Hide resolved
if let Some(asyncness) = &sig.asyncness {
bail_spanned!(
asyncness.span() => "`async fn` is not yet supported for Python functions.\n\n\
Additional crates such as `pyo3-asyncio` can be used to integrate async Rust and \
Python. For more information, see https://github.com/PyO3/pyo3/issues/1632"
);
};
Ok(())
}

pub fn unwrap_ty_group(mut ty: &syn::Type) -> &syn::Type {
while let syn::Type::Group(g) = ty {
ty = &*g.elem;
Expand Down
137 changes: 137 additions & 0 deletions src/coroutine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//! Python coroutine implementation, used notably when wrapping `async fn`
//! with `#[pyfunction]`/`#[pymethods]`.
use std::{
any::Any,
future::Future,
panic,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

use futures_util::FutureExt;
use pyo3_macros::{pyclass, pymethods};

use crate::{
coroutine::waker::AsyncioWaker,
exceptions::{PyRuntimeError, PyStopIteration},
panic::PanicException,
pyclass::IterNextOutput,
types::PyIterator,
IntoPy, Py, PyAny, PyErr, PyObject, PyResult, Python,
};

mod waker;

const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine";

type FutureOutput = Result<PyResult<PyObject>, Box<dyn Any + Send>>;

/// Python coroutine wrapping a [`Future`].
#[pyclass(crate = "crate")]
pub struct Coroutine {
future: Option<Pin<Box<dyn Future<Output = FutureOutput> + Send>>>,
waker: Option<Arc<AsyncioWaker>>,
}

impl Coroutine {
/// Wrap a future into a Python coroutine.
///
/// Coroutine `send` polls the wrapped future, ignoring the value passed
/// (should always be `None` anyway).
davidhewitt marked this conversation as resolved.
Show resolved Hide resolved
///
/// `Coroutine `throw` drop the wrapped future and reraise the exception passed
pub(crate) fn from_future<F, T, E>(future: F) -> Self
where
F: Future<Output = Result<T, E>> + Send + 'static,
T: IntoPy<PyObject>,
PyErr: From<E>,
{
let wrap = async move {
let obj = future.await?;
// SAFETY: GIL is acquired when future is polled (see `Coroutine::poll`)
Ok(obj.into_py(unsafe { Python::assume_gil_acquired() }))
};
Self {
future: Some(Box::pin(panic::AssertUnwindSafe(wrap).catch_unwind())),
waker: None,
}
}

fn poll(
&mut self,
py: Python<'_>,
throw: Option<PyObject>,
) -> PyResult<IterNextOutput<PyObject, PyObject>> {
// raise if the coroutine has already been run to completion
let future_rs = match self.future {
Some(ref mut fut) => fut,
None => return Err(PyRuntimeError::new_err(COROUTINE_REUSED_ERROR)),
};
// reraise thrown exception it
if let Some(exc) = throw {
self.close();
return Err(PyErr::from_value(exc.as_ref(py)));
}
// create a new waker, or try to reset it in place
if let Some(waker) = self.waker.as_mut().and_then(Arc::get_mut) {
waker.reset();
} else {
self.waker = Some(Arc::new(AsyncioWaker::new()));
}
let waker = futures_util::task::waker(self.waker.clone().unwrap());
// poll the Rust future and forward its results if ready
if let Poll::Ready(res) = future_rs.as_mut().poll(&mut Context::from_waker(&waker)) {
self.close();
return match res {
Ok(res) => Ok(IterNextOutput::Return(res?)),
Err(err) => Err(PanicException::from_panic_payload(err)),
};
}
// otherwise, initialize the waker `asyncio.Future`
if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? {
// `asyncio.Future` must be awaited; fortunately, it implements `__iter__ = __await__`
// and will yield itself if its result has not been set in polling above
Comment on lines +93 to +94
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't help but wonder if somehow deep down in the Rust future, if it awaits a Python awaitable, whether we want to yield whatever the Python awaitable that blocked it here, rather than make a new asyncio.Future. That seems like a possible avenue to support all Python runtimes automatically.

However, it needs some way to lift whatever that object was up to here. Maybe that's done through the waker, or maybe that's done through our own mini thread-local "runtime"?

Of course, there's also the possibility that this Rust future never awaits on anything from Python, and if that's the case, doing this asyncio dance seems correct.

Copy link
Contributor Author

@wyfo wyfo Oct 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I've an idea to make this possible, and in a quite elegant way, but it would require a nightly feature.

That seems like a possible avenue to support all Python runtimes automatically.

This would indeed be a way to support arbitrary awaitables. You would still need to target a particular runtime for arbitrary Rust futures (but you could also have a runtime-agnostic coroutine which would only support Python awaitables). However, it would not be possible to do a select! between two Python awaitables, (because you can't yield two objects), so you would have to panic! in that case, and I'm not sure about the ergonomy of this.

An additional side effect would be cancellation, because if you delegate to the Python awaitable, you should also delegate cancellation, and CoroutineCancel should not register the exception in that case.

I really like this idea though! I may implement a POC in the next days, maybe in pyo3-async to begin with.

EDIT: Actually, you can't select! with only one Python awaitable, because you can't use its future to wake up the other branch, as it would prevent it to get its result.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be interesting to explore, and if we find that this design makes the most sense but requires nightly, we can put it behind our nightly feature and use it as evidence for upstream to explore for stabilisation!

if let Some(future) = PyIterator::from_object(future).unwrap().next() {
// future has not been leaked into Python for now, and Rust code can only call
// `set_result(None)` in `ArcWake` implementation, so it's safe to unwrap
return Ok(IterNextOutput::Yield(future.unwrap().into()));
}
}
// if waker has been waken during future polling, this is roughly equivalent to
// `await asyncio.sleep(0)`, so just yield `None`.
Ok(IterNextOutput::Yield(py.None().into()))
}
}

pub(crate) fn iter_result(result: IterNextOutput<PyObject, PyObject>) -> PyResult<PyObject> {
Copy link
Member

@adamreichold adamreichold Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will make our handling of iterators even more of a mess, won't it? So this is basically what we want to do with all iterators but it would apply only to coroutines? Maybe we should align all of this if we go ahead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Sorry if this not clear, but this is mainly directed at @davidhewitt, not so much at @wyfo.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly think __next__ should allow to return PyResult<PyObject> in addition to PyResult<IterNextOutput<PyObject, PyObject>> or PyResult<Option<PyObject>>.
It would remove the need for this small utilitary function.
But I did want to touch too much parts of PyO3 in this first draft.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a long discussion in #3190 about reworking __next__ and __anext__. Really just needs someone to write up a bunch of test cases / examples so we can commit to a design. I keep meaning to do it, but am sort of trying to avoid breaking too much at the same time as #3382

match result {
IterNextOutput::Yield(ob) => Ok(ob),
IterNextOutput::Return(ob) => Err(PyStopIteration::new_err(ob)),
}
}

#[pymethods(crate = "crate")]
impl Coroutine {
fn send(&mut self, py: Python<'_>, _value: &PyAny) -> PyResult<PyObject> {
iter_result(self.poll(py, None)?)
wyfo marked this conversation as resolved.
Show resolved Hide resolved
}

fn throw(&mut self, py: Python<'_>, exc: PyObject) -> PyResult<PyObject> {
iter_result(self.poll(py, Some(exc))?)
}

fn close(&mut self) {
// the Rust future is dropped, and the field set to `None`
// to indicate the coroutine has been run to completion
drop(self.future.take());
}

fn __await__(self_: Py<Self>) -> Py<Self> {
self_
}

fn __next__(&mut self, py: Python<'_>) -> PyResult<IterNextOutput<PyObject, PyObject>> {
self.poll(py, None)
}
}
Loading
Loading