Creating awaitable methods #50

Open
1frag opened this issue Oct 1, 2021 · 13 comments

Comments

@1frag

1frag commented Oct 1, 2021

Hi, I haven't found any information on how to write asynchronous methods. For example, I'd like to use the module like this:

await RustSleeper(42).sleep()  # sleep for 42 secs

I wrote the following Rust code:

use std::time::Duration;
use pyo3::prelude::*;

#[pyclass]
struct RustSleeper(u64);

#[pymethods]
impl RustSleeper {
    fn sleep<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
        pyo3_asyncio::tokio::future_into_py(py, async move {
            tokio::time::sleep(Duration::from_secs(self.0)).await;
            Python::with_gil(|py| Ok(py.None()))
        })
    }
}

#[pymodule]
fn sleeper_rs(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_class::<RustSleeper>()?;
    Ok(())
}

It doesn't work. The compiler's output:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/lib.rs:10:60
   |
9  |       fn sleep<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
   |                    ----- this data with an anonymous lifetime `'_`...
10 |           pyo3_asyncio::tokio::future_into_py(py, async move {
   |  ____________________________________________________________^
11 | |             tokio::time::sleep(Duration::from_secs(self.0)).await;
12 | |             Python::with_gil(|py| Ok(py.None()))
13 | |         })
   | |_________^ ...is captured here...
   |
note: ...and is required to live as long as `'static` here
  --> src/lib.rs:10:9
   |
10 |         pyo3_asyncio::tokio::future_into_py(py, async move {
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

That's understandable, but I can't make self 'static because of how #[pymethods] handles the receiver:

Error when setting `'static` on `self`
error[E0495]: cannot infer an appropriate lifetime for lifetime parameter `'p` due to conflicting requirements
 --> src/lib.rs:7:1
  |
7 | #[pymethods]
  | ^^^^^^^^^^^^
  |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the body at 7:1...
 --> src/lib.rs:7:1
  |
7 | #[pymethods]
  | ^^^^^^^^^^^^
note: ...so that the types are compatible
 --> src/lib.rs:7:1
  |
7 | #[pymethods]
  | ^^^^^^^^^^^^
  = note: expected `pyo3::Python<'_>`
             found `pyo3::Python<'_>`
  = note: but, the lifetime must be valid for the static lifetime...
note: ...so that reference does not outlive borrowed content
 --> src/lib.rs:7:1
  |
7 | #[pymethods]
  | ^^^^^^^^^^^^
  = note: this error originates in the attribute macro `pymethods` (in Nightly builds, run with -Z macro-backtrace for more info)
or (with `-Z macro-backtrace`)
error[E0495]: cannot infer an appropriate lifetime for lifetime parameter `'p` due to conflicting requirements
   --> src/lib.rs:7:1
    |
7   | #[pymethods]
    | ^^^^^^^^^^^^ in this procedural macro expansion
    |
   ::: ~/.cargo/registry/src/github.com-1ecc6299db9ec823/pyo3-macros-0.14.5/src/lib.rs:190:1
    |
190 | pub fn pymethods(_: TokenStream, input: TokenStream) -> TokenStream {
    | ------------------------------------------------------------------- in this expansion of `#[pymethods]`
    |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the body at 7:1...
   --> src/lib.rs:7:1
    |
7   | #[pymethods]
    | ^^^^^^^^^^^^ in this procedural macro expansion
    |
   ::: ~/.cargo/registry/src/github.com-1ecc6299db9ec823/pyo3-macros-0.14.5/src/lib.rs:190:1
    |
190 | pub fn pymethods(_: TokenStream, input: TokenStream) -> TokenStream {
    | ------------------------------------------------------------------- in this expansion of `#[pymethods]`
note: ...so that the types are compatible
   --> src/lib.rs:7:1
    |
7   | #[pymethods]
    | ^^^^^^^^^^^^ in this procedural macro expansion
    |
   ::: ~/.cargo/registry/src/github.com-1ecc6299db9ec823/pyo3-macros-0.14.5/src/lib.rs:190:1
    |
190 | pub fn pymethods(_: TokenStream, input: TokenStream) -> TokenStream {
    | ------------------------------------------------------------------- in this expansion of `#[pymethods]`
    = note: expected `pyo3::Python<'_>`
               found `pyo3::Python<'_>`
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that reference does not outlive borrowed content
   --> src/lib.rs:7:1
    |
7   | #[pymethods]
    | ^^^^^^^^^^^^ in this procedural macro expansion
    |
   ::: ~/.cargo/registry/src/github.com-1ecc6299db9ec823/pyo3-macros-0.14.5/src/lib.rs:190:1
    |
190 | pub fn pymethods(_: TokenStream, input: TokenStream) -> TokenStream {
    | ------------------------------------------------------------------- in this expansion of `#[pymethods]`
Dependencies:
[dependencies]
tokio = "1.12.0"

[dependencies.pyo3-asyncio]
version = "0.14.0"
features = ["tokio-runtime"]

[dependencies.pyo3]
version = "0.14.5"
features = ["extension-module", "serde"]

So, is it possible to write an async method? And if so, could the documentation or examples of this library be extended to cover it?

@awestlake87
Owner

Yeah, I've gotten this question a few times, so it's probably worth adding to the docs. Essentially, the problem you're facing is that &self can only last as long as a GIL borrow, since the structure is owned by the Python interpreter when it's a #[pyclass]. You have a couple of options in this case:

  1. Make a clone of the internal data that you want to reference in your async method. This is easy in your example since self.0 implements Copy (in more complex situations, wrapping the struct's data in an Arc<T> would work as well). All you need to do is copy the value out before the move so you aren't referencing self in your async block:
#[pymethods]
impl RustSleeper {
    fn sleep<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
        let secs = self.0;
        pyo3_asyncio::tokio::future_into_py(py, async move {
            tokio::time::sleep(Duration::from_secs(secs)).await;
            Python::with_gil(|py| Ok(py.None()))
        })
    }
}
  2. Make self a Py<Self> so that it's not tied to the lifetime of the GIL. When you need a value out of it, you can borrow it while holding the GIL:
#[pymethods]
impl RustSleeper {
    fn sleep<'p>(this: Py<Self>, py: Python<'p>) -> PyResult<&'p PyAny> {
        pyo3_asyncio::tokio::future_into_py(py, async move {
            tokio::time::sleep(Duration::from_secs(Python::with_gil(|py| {
                this.borrow(py).0
            })))
            .await;
            Python::with_gil(|py| Ok(py.None()))
        })
    }
}

Full disclosure, I haven't actually tried option 2 before, but I've been told that it should work. I got a compiler error when I declared the receiver type as self: Py<Self> instead of this: Py<Self>, so I'm not certain my example will work the way I intended it to. Might be worth a try though.
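
To see the 'static requirement without PyO3 in the picture, here is a minimal std-only analogue. It assumes std::thread::spawn as a stand-in for pyo3_asyncio::tokio::future_into_py: the first compiler error in this issue shows that future_into_py needs a 'static future, and thread::spawn has the same 'static bound on its closure. Sleeper, spawn_with_copy and spawn_with_handle are hypothetical names, and Arc<Sleeper> only plays the role of Py<Self> (both are owned, reference-counted handles); this is a sketch of the two options above, not PyO3 code.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the RustSleeper pyclass (no PyO3 involved).
struct Sleeper(u64);

impl Sleeper {
    // Option 1 analogue: copy the field out before spawning, so the
    // 'static closure owns a u64 instead of borrowing `self`.
    // Writing `thread::spawn(move || self.0)` here would be rejected,
    // because the closure would capture the non-'static `&self`.
    fn spawn_with_copy(&self) -> thread::JoinHandle<u64> {
        let secs = self.0; // u64 is Copy, so this is a cheap copy
        thread::spawn(move || secs)
    }
}

// Option 2 analogue: move an owned, reference-counted handle (playing the
// role of Py<Self>) into the 'static closure and read the field there.
fn spawn_with_handle(this: Arc<Sleeper>) -> thread::JoinHandle<u64> {
    thread::spawn(move || this.0)
}
```

In both cases the spawned work owns everything it touches, which is exactly what the 'static bound demands.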

@1frag
Author

1frag commented Oct 2, 2021

Thanks, it's working now. This explanation is very helpful!

@relsunkaev

Would it be possible to mutate self inside of the async move block? I need to assign a value that has to be awaited to one of the fields.

@1frag
Copy link
Author

1frag commented Oct 3, 2021

In a simple case, it can be done with Py<Self>:

use pyo3::prelude::*;
use tokio::fs::File;
use tokio::io::AsyncReadExt;

#[pyclass]
struct RustSleeper {
    #[pyo3(get, set)]
    n: u64,
}

async fn get_random_u64() -> u64 {
    let mut file = File::open("/dev/urandom").await.unwrap();
    file.read_u64().await.unwrap() % 10
}

#[pymethods]
impl RustSleeper {
    #[new]
    fn new(n: u64) -> Self {
        RustSleeper {n}
    }

    fn change_n(this: Py<Self>, py: Python) -> PyResult<&PyAny> {
        pyo3_asyncio::tokio::future_into_py(py, async move {
            let n = get_random_u64().await;
            Python::with_gil(|py| {
                let cell: &PyCell<RustSleeper> = this.as_ref(py);
                let mut slf = cell.try_borrow_mut().unwrap();
                slf.n = n;
                Ok(py.None())
            })
        })
    }
}

#[pymodule]
fn sleeper_rs(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_class::<RustSleeper>()?;
    Ok(())
}

And use like:

import asyncio
import sleeper_rs

sl = sleeper_rs.RustSleeper(2)

async def main():
    await sl.change_n()

print(sl.n)  # 2
asyncio.run(main())
print(sl.n)  # from 0 to 9

But I couldn't find a way to call a method like this:

impl RustSleeper {
    async fn async_set_n(&mut self, n: u64) {
        self.n = n;
    }
}

inside async move, because py is only available in synchronous code (e.g. inside the Python::with_gil closure), so a borrow of self can't be held across an .await.

@awestlake87
Owner

awestlake87 commented Oct 3, 2021

Yeah, the example that @1frag provided should work.

Another alternative that I've used in my projects is to create an inner structure protected with an Arc<Mutex<Inner>>. The Arc can be cloned and passed into an async move { } block, and any async methods on the Inner struct can be safely awaited. You can also use an async-capable Mutex for the lock. This would be a workaround for the problem mentioned by @1frag.

I'm away from my computer right now, but I can provide some snippets to elaborate later if needed.
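
A minimal std-only sketch of that pattern, applied to mutating a field from a spawned task. It assumes OS threads instead of tokio tasks and std::sync::Mutex instead of an async-capable mutex such as tokio::sync::Mutex; with a tokio mutex you would write .lock().await and could await Inner's async methods while holding the guard. Inner, Wrapper, set_n and change_n are hypothetical names, with Wrapper standing in for the #[pyclass].

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical plain-Rust state that the pyclass wrapper would hold.
struct Inner {
    n: u64,
}

impl Inner {
    fn set_n(&mut self, n: u64) {
        self.n = n;
    }
}

// Hypothetical wrapper standing in for the #[pyclass].
struct Wrapper(Arc<Mutex<Inner>>);

impl Wrapper {
    fn new(n: u64) -> Self {
        Wrapper(Arc::new(Mutex::new(Inner { n })))
    }

    // Clone the Arc (not the data) so the 'static task owns its own handle.
    fn change_n(&self, n: u64) -> thread::JoinHandle<()> {
        let inner = Arc::clone(&self.0);
        thread::spawn(move || {
            // The guard is a temporary, so the lock is released at the end
            // of this statement.
            inner.lock().unwrap().set_n(n);
        })
    }

    // Read the current value through the same shared state.
    fn n(&self) -> u64 {
        self.0.lock().unwrap().n
    }
}
```

Because the task and the wrapper share one Inner, a change made inside the task is visible through the wrapper once the task has finished.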

@relsunkaev
Copy link

Thanks, that worked great!

@RoastVeg

Another alternative that I've used in my projects is to create an inner structure protected with an Arc<Mutex<Inner>>. The Arc can be cloned and passed into an async move { } and any async methods on the Inner struct can be safely awaited. You can also use an async-capable Mutex for the lock too. This would be a workaround for the problem mentioned by @1frag

Here's an example for those (like me) that came here to find one:

use pyo3::prelude::*;
use std::sync::Arc;
use tokio::sync::Mutex;

struct MyType {
    value: bool
}

impl MyType {
    pub async fn my_async_fn(&self, or: bool) -> bool {
        self.value | or
    }
}

#[pyclass(name = "MyType")]
struct PyMyType(Arc<Mutex<MyType>>);

#[pymethods]
impl PyMyType {
    #[new]
    pub fn new() -> Self {
        Self(Arc::new(Mutex::new(MyType { value: false })))
    }

    pub fn my_async_fn<'a>(&self, py: Python<'a>, or: bool) -> PyResult<&'a PyAny> {
        let inner = self.0.clone();
        pyo3_asyncio::tokio::future_into_py(py, async move {
            Ok(inner.lock().await.my_async_fn(or).await)
        })
    }
}

@igiloh-pinecone

igiloh-pinecone commented Feb 27, 2023

@RoastVeg thank you very much for posting this detailed solution!

I tried it in my case, which is similar (though slightly more complex). If I don't define an explicit 'a lifetime, the compiler says

associated function was supposed to return data with lifetime `'2` but it is returning data with lifetime `'1`

where '2 refers to self and '1 to py.

If I do define an explicit lifetime, the compiler now says

`py` escapes the associated function body here
argument requires that `'a` must outlive `'static`

Maybe it's because the return type of the inner async function is not a simple type that implements Copy (like the bool you've shown here), but a Vec<pyclass>.

Any ideas?

@awestlake87
Copy link
Owner

@igiloh-pinecone can you share your function? From what you're saying, I think you're capturing a value with the lifetime of the GIL in your async block (which requires 'static), but without context I can't really tell you where or how to get around the error.

@igiloh-pinecone

igiloh-pinecone commented Mar 12, 2023

@awestlake87 thank you, and apologies for the delayed response!
I'm afraid this code isn't public yet (it will be soon), so I can't refer you to it directly.

The code itself is slightly convoluted, involving a cascade of wrapped struct method calls.
I tried to create a minimal gist depicting this code; please see here.
As I mentioned, the error I get is:

|        pub fn upsert_async<'a>(&mut self, py: Python<'a>, vectors: Vec<pinecone_core::Vector>, namespace: &'a str) -> PyResult<&'a PyAny>{
|                            --             -- `py` is a reference that is only valid in the associated function body
|                            |
|                            lifetime `'a` defined here
|            let mut inner_index = self.inner.clone();
| /              pyo3_asyncio::tokio::future_into_py(py, async move {
| |                  let res = inner_index.lock().await.upsert(&namespace, &vectors, None).await?;
| |                  Ok(res)
| |                  // Ok(Python::with_gil(|py| res.into_py(py)))
| |
| |              })
| |               ^
| |               |
| |_______________`py` escapes the associated function body here
|                 argument requires that `'a` must outlive `'static`

I then suspected that py "escapes" because Vector is a pyclass, so it somehow requires the GIL.
I changed the code structure a bit, making Vector a plain Rust struct and wrapping it in a pyclass that is stripped before the async move. You can see this slightly different version here (this is the diff).
But sadly that still gave the exact same error.

Thank you again for taking the time and effort to look into this problem!
I've been wrapping my head around this problem for days now, but I still can't figure out what the lifetime problem is.

@awestlake87
Owner

awestlake87 commented Mar 12, 2023

I think your issue is namespace actually. It is a &'a str since you're potentially borrowing a string from Python. What you need to do is take ownership of it before it's captured by the async move { } block.

You can do this by converting it to a Rust String or using a Py<T> reference like PyObject or maybe Py<PyString> outside of the async move { } so it will have a 'static reference to the data inside the block. Converting it to a String is pretty easy, so I'd give that a try first.
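
A std-only sketch of the "take ownership before the move" fix, assuming thread::spawn again stands in for future_into_py (both need their work to be 'static). upsert_spawned is a hypothetical stand-in for upsert_async, and Vec<u32> is a placeholder for Vec<pinecone_core::Vector>; only the handling of namespace matters here.

```rust
use std::thread;

// Hypothetical stand-in for `upsert_async`: `namespace` is a borrowed &str,
// like the &'a str PyO3 hands out when borrowing a Python string.
fn upsert_spawned(namespace: &str, vectors: Vec<u32>) -> thread::JoinHandle<String> {
    // Capturing the borrowed `namespace` directly would tie the 'static
    // closure to the caller's borrow and fail to compile, so take an owned
    // copy first. `vectors` is already owned and can be moved as-is.
    let namespace: String = namespace.to_owned();
    thread::spawn(move || format!("{}: {} vectors", namespace, vectors.len()))
}
```

In the PyO3 signature itself, declaring the parameter as namespace: String instead of &'a str should have the same effect, since PyO3 can extract an owned String from a Python str.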

@igiloh-pinecone

@awestlake87 thank you very much, you're absolutely right!
I totally missed that 🤦.

However, I only now realized there's a deeper problem with this approach.
In my code, the whole point was to call the same my_async_fn() from python multiple times with different inputs, having these calls run concurrently. So adding a Mutex sort of misses the point.

I ended up simply cloning self.inner itself, which is "cheap" in my particular use case. But I'm not sure that will be a viable solution for other users who encounter the same problem.

@awestlake87
Owner

From what I can see, your mutex is locked within the async block, so the bigger question IMO is whether you can perform upsert concurrently.

Your function can be called multiple times from python without blocking because it essentially just schedules the task on tokio. The lock occurs inside the async block so these scheduled tasks will run at the same time, but they will wait on each other so that only one is doing the upsert at any given time.

If your struct can perform upserts concurrently, there's no need for the mutex and you might be able to get away with just an Arc, but if it can't, you may need to find a different solution with your library. For example, you may be able to queue the items and perform a batch of upserts. The solution to this problem can be pretty library-specific, though.
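
A std-only sketch of the Arc-only variant, assuming the library's upsert can be called through &self from several threads at once. The hypothetical Index fakes that with an AtomicUsize counter, and threads stand in for tokio tasks. If upsert needs &mut self, this does not apply and the Mutex (or a batching queue) is back on the table.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical client whose `upsert` takes &self and is safe to call
// concurrently (here it just counts items with an atomic).
struct Index {
    upserted: AtomicUsize,
}

impl Index {
    // Returns the running total of upserted items after this call.
    fn upsert(&self, vectors: &[u32]) -> usize {
        self.upserted.fetch_add(vectors.len(), Ordering::SeqCst) + vectors.len()
    }
}

// Each call clones the Arc and runs independently; with no Mutex,
// calls are never serialized against each other.
fn upsert_spawned(index: &Arc<Index>, vectors: Vec<u32>) -> thread::JoinHandle<usize> {
    let index = Arc::clone(index);
    thread::spawn(move || index.upsert(&vectors))
}
```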


5 participants