Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support emit error in try_fn_stream #2

Closed
negezor opened this issue Dec 8, 2023 · 3 comments · Fixed by #4
Closed

Adding support emit error in try_fn_stream #2

negezor opened this issue Dec 8, 2023 · 3 comments · Fixed by #4

Comments

@negezor
Copy link
Contributor

negezor commented Dec 8, 2023

Hi! Could we add emitter.emit_err(error) for try_fn_stream? I want to transfer control over error handling to the consumer, and they, in turn, decide whether to break the loop or not. Right now the only option is to return an error from the handler, which in turn prevents it from continuing to consume the thread in the event of an error. However, if I wanted to throw an error now, I could solve it this way

impl Stream<Item = Result<Result<ListResponse, Error>, Error>>

But this is cumbersome and duplicates the logic of the current result.

@dmitryvk
Copy link
Owner

dmitryvk commented Jan 3, 2024

Hi,

Sorry for the late response and thanks for submitting the issue.

Could you describe what are you trying to achieve?

As far as I can see, you can use fn_stream and produce a stream of impl Stream<Item = Result<ListResponse, Error>> like so:

use async_fn_stream::fn_stream;
use futures::{pin_mut, Stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream = get_stream();
    pin_mut!(stream);
    while let Some(res) = stream.next().await {
        println!("{res:?}");
    }
}

fn get_stream() -> impl Stream<Item = Result<i32, MyError>> {
    fn_stream(|emitter| async move {
        emitter.emit(Err(MyError {})).await;
        emitter.emit(Ok(0)).await;
    })
}

#[derive(Debug)]
struct MyError {}

@negezor
Copy link
Contributor Author

negezor commented Apr 18, 2024

That's a really long answer already from me 😄. Indeed I can use fn_stream, but I expected about the same behavior from try_fn_stream. The inconvenience is that I can't use ? operator when using fn_stream.

Let me show you with your example, I would like to use the following:

use async_fn_stream::try_fn_stream;
use futures_util::{pin_mut, Stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream = get_stream();
    pin_mut!(stream);
    while let Some(res) = stream.next().await {
        println!("{res:?}");
    }
}

fn get_payload() -> Result<i32, MyError> {
    Err(MyError {})
}

fn get_stream() -> impl Stream<Item = Result<i32, MyError>> {
    try_fn_stream(|emitter| async move {
        // End the stream immediately
        let payload = get_payload()?;

        // Handle
        emitter.emit(Err(MyError {})).await;
        emitter.emit(Ok(0)).await;

        Ok(())
    })
}

#[derive(Debug)]
struct MyError {}

But I get the following when building it:

error[E0271]: type mismatch resolving `<TryFnStream<Result<{integer}, MyError>, _, {async block@src/bin/test.rs:18:29: 27:6}> as Stream>::Item == Result<i32, MyError>`
  --> src/bin/test.rs:17:20
   |
17 | fn get_stream() -> impl Stream<Item = Result<i32, MyError>> {
   |                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected `Result<Result<..., ...>, ...>`, found `Result<i32, MyError>`
   |
   = note: expected enum `Result<Result<{integer}, MyError>, _>`
              found enum `Result<i32, MyError>`

For more information about this error, try `rustc --explain E0271`.
error: could not compile `async-fn-stream` (bin "test") due to 1 previous error

However, I think try_fn_stream would be easier to use if there was something like this:

fn get_stream() -> impl Stream<Item = Result<i32, MyError>> {
    try_fn_stream(|emitter| async move {
        // End the stream immediately
        let payload = get_payload()?;

        // Handle
        emitter.emit(0).await;
        emitter.emit_err(MyError {}).await;

        Ok(())
    })
}

We'd keep the same function signature and give more control when we don't necessarily want to get out of the stream on an error

@dmitryvk
Copy link
Owner

OK, now I see what you mean :)

I like the emit_err method - it seems to be backwards-compatible and concise.

I'll try to come up with the implementation, or feel free to make a pull request.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants