

Create a body channel implementation that knows when buf is written #2858

Open
seanmonstar opened this issue May 20, 2022 · 4 comments · May be fixed by hyperium/hyper-util#6
Labels
A-body Area: body streaming. C-feature Category: feature. This is adding a new feature. E-medium Effort: medium. Some knowledge of how hyper internal works would be useful. K-hyper-util Crate: hyper-util

Comments

@seanmonstar
Member

It is very common to want a body type that allows for a push-based API, and knowledge of when those bytes have been written to the socket. We can provide a channel type that wraps the user's B: Buf in one that alerts the channel when Buf::advance has reached the end.
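A minimal std-only sketch of such a channel, under loud assumptions: `body_channel()`, `BodySender` and `Tracked` are hypothetical names, not an existing hyper or hyper-util API. `Tracked` only mirrors the three required methods of `bytes::Buf` (`remaining`, `chunk`, `advance`), and a `std::sync::mpsc` channel stands in for whatever async signal hyper would really use. Each pushed chunk gets its own "written" receiver, which fires when the consumer advances that chunk to its end.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// One pushed chunk. `remaining`/`chunk`/`advance` mirror the required methods
/// of `bytes::Buf`; `written` is signalled once `advance` reaches the end.
pub struct Tracked {
    data: Vec<u8>,
    pos: usize,
    written: Option<Sender<()>>,
}

impl Tracked {
    pub fn remaining(&self) -> usize {
        self.data.len() - self.pos
    }

    pub fn chunk(&self) -> &[u8] {
        &self.data[self.pos..]
    }

    pub fn advance(&mut self, cnt: usize) {
        assert!(cnt <= self.remaining(), "cannot advance past the end");
        self.pos += cnt;
        if self.remaining() == 0 {
            // `take()` ensures the notification is sent at most once.
            if let Some(tx) = self.written.take() {
                // Ignore the error: the producer may have stopped listening.
                let _ = tx.send(());
            }
        }
    }
}

/// Producer half: each `send` returns a receiver that resolves once the chunk is written.
pub struct BodySender {
    tx: Sender<Tracked>,
}

impl BodySender {
    pub fn send(&self, data: Vec<u8>) -> Receiver<()> {
        let (written_tx, written_rx) = mpsc::channel();
        // If the consumer is gone, the chunk is dropped and `written_rx` reports disconnection.
        let _ = self.tx.send(Tracked { data, pos: 0, written: Some(written_tx) });
        written_rx
    }
}

/// Hypothetical constructor; the receiving half is what a body type would poll.
pub fn body_channel() -> (BodySender, Receiver<Tracked>) {
    let (tx, rx) = mpsc::channel();
    (BodySender { tx }, rx)
}
```

Usage: after `let written = sender.send(b"hello".to_vec());`, `written.recv()` returns `Ok(())` once the consumer has advanced the received chunk to its end. If the chunk is dropped without being fully written, the sender half inside it is dropped too, so `recv()` returns an error instead of hanging.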

@seanmonstar seanmonstar added C-feature Category: feature. This is adding a new feature. E-medium Effort: medium. Some knowledge of how hyper internal works would be useful. A-body Area: body streaming. K-hyper-util Crate: hyper-util labels May 20, 2022
@seanmonstar seanmonstar added this to the 1.0 milestone May 20, 2022
@tomkarw
Contributor

tomkarw commented Jul 26, 2022

At work we ran into this issue: we wanted to get a callback for each body chunk and then another at the end of streaming.

We implemented custom Stream wrappers for both cases and wrapped them with Body::wrap_stream. I think this incurs additional heap allocations, since hyper boxes the stream you pass in.

Either way, I'd be more than happy to tackle this. One thing I'd like to sort out first is whether we go with alerting via a channel (if so, could you elaborate a bit on how you see it) or with a callback, the way we did.
Also, what should be sent through the channel or made available in the callback? We passed &Bytes so that we could call chunk.len().
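The callback approach described above can be sketched roughly like this, assuming a std `Iterator` of `Vec<u8>` as a stand-in for the `Stream` of `Bytes` handed to `Body::wrap_stream`. The `Observed` type and its two callbacks are hypothetical; `on_chunk` receives the chunk as a slice, so the caller can still read its length.

```rust
/// Wraps an iterator of chunks (standing in for the `Stream` handed to
/// `Body::wrap_stream`), calling `on_chunk` for every chunk passed through
/// and `on_end` once the source is exhausted.
pub struct Observed<I, C, E> {
    inner: I,
    on_chunk: C,
    on_end: Option<E>,
}

impl<I, C, E> Observed<I, C, E> {
    pub fn new(inner: I, on_chunk: C, on_end: E) -> Self {
        Observed { inner, on_chunk, on_end: Some(on_end) }
    }
}

impl<I, C, E> Iterator for Observed<I, C, E>
where
    I: Iterator<Item = Vec<u8>>,
    C: FnMut(&[u8]),
    E: FnOnce(),
{
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        match self.inner.next() {
            Some(chunk) => {
                (self.on_chunk)(chunk.as_slice());
                Some(chunk)
            }
            None => {
                // `take()` makes sure `on_end` runs only once.
                if let Some(on_end) = self.on_end.take() {
                    on_end();
                }
                None
            }
        }
    }
}
```

Note that these callbacks fire when a chunk is pulled from the source, which happens before hyper has written it to the socket. That gap is what a Buf wrapper signalling from `advance` is meant to close.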

@seanmonstar
Member Author

I think there are two parts to figure out; the easier one is the Buf wrapper:

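use bytes::Buf;

struct Alert<B: Buf> {
    inner: B,
    signal: SignalThingy, // placeholder: see the question below
}

impl<B: Buf> Buf for Alert<B> {
    fn remaining(&self) -> usize {
        self.inner.remaining()
    }

    fn chunk(&self) -> &[u8] {
        self.inner.chunk()
    }

    fn advance(&mut self, cnt: usize) {
        self.inner.advance(cnt);
        // Fire once the wrapped buffer has been fully consumed.
        if !self.inner.has_remaining() {
            self.signal.thingy();
        }
    }
}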

The harder part is: what is the signal thingy? Is it something like a tokio::sync::Notify, or a similar channel-like thing?
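One possible shape for the signal, sketched with std primitives rather than `tokio::sync::Notify`: a hypothetical `Signal` built from a `Mutex<bool>` plus a `Condvar`. `notify` would be called from `Alert::advance`, and the producer blocks in `wait` until then. It latches, so a notification that arrives before anyone waits is not lost.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// A latching, Notify-like signal built from std primitives.
#[derive(Clone, Default)]
pub struct Signal {
    state: Arc<(Mutex<bool>, Condvar)>,
}

impl Signal {
    /// Called from `advance` once the wrapped buffer is empty.
    pub fn notify(&self) {
        let (lock, cvar) = &*self.state;
        *lock.lock().unwrap() = true;
        cvar.notify_all();
    }

    /// Blocks until `notify` has been called (returns immediately if it already was).
    pub fn wait(&self) {
        let (lock, cvar) = &*self.state;
        let mut notified = lock.lock().unwrap();
        // Loop to tolerate spurious wakeups.
        while !*notified {
            notified = cvar.wait(notified).unwrap();
        }
    }
}
```

In hyper the producer side is async, so a blocking `wait` would not fit directly; an async primitive such as the `tokio::sync::Notify` mentioned above, or a oneshot channel, would take its place. This sketch only illustrates the intended semantics.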

@tomkarw
Contributor

tomkarw commented Aug 19, 2022

Alright, there is a fundamental flaw in the bytes::Buf design that makes it impossible to implement this feature.

There is no method that guarantees the buffer was actually consumed. One is allowed to call Buf::chunk just for peeking, so the notification logic can't go there. Nor are we guaranteed that somebody will call buf.advance(0) after sending the whole buffer.

Relying on a call to Buf::remaining() that returns 0 might be possible, but still doesn't guarantee anything, as one could do:

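use bytes::{Buf, Bytes};

// `send` is a placeholder for whatever writes bytes to the socket.
fn peek_only(send: impl Fn(&[u8])) {
    let buf = Bytes::from_static(b"abc");
    loop {
        let chunk_size = buf.chunk().len();
        send(buf.chunk()); // bytes go out, but `advance` is never called
        // Stop once the chunk just sent covered everything that was left.
        if chunk_size == buf.remaining() {
            break;
        }
    }
}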

Notice that buf.remaining() is not guaranteed to return 0 here (in fact, it only does so for an empty buffer), because nothing ever calls advance.

I could write such a snippet for each of the Buf methods and show that none of them gives us a definite end-of-stream (EOS) signal.
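To make the peeking problem concrete, here is a std-only sketch with a hypothetical `Watched` buffer (same `chunk`/`remaining`/`advance` shape as `Buf`) that records whether `advance` ever reached the end, which is where `Alert` would fire. A consumer holding only `&Watched` can read every byte through `chunk()` but cannot call `advance`, so nothing fires even though everything was sent.

```rust
/// Hypothetical buffer with `Buf`-style methods that records whether
/// `advance` ever reached the end (where `Alert` would fire its signal).
pub struct Watched {
    data: Vec<u8>,
    pos: usize,
    pub fired: bool,
}

impl Watched {
    pub fn new(data: &[u8]) -> Self {
        Watched { data: data.to_vec(), pos: 0, fired: false }
    }

    pub fn remaining(&self) -> usize {
        self.data.len() - self.pos
    }

    pub fn chunk(&self) -> &[u8] {
        &self.data[self.pos..]
    }

    pub fn advance(&mut self, cnt: usize) {
        self.pos += cnt;
        if self.remaining() == 0 {
            self.fired = true;
        }
    }
}

/// A peek-only consumer: for a contiguous buffer the first chunk is the
/// whole buffer, so this "sends" everything without ever calling `advance`.
pub fn peek_only_send(buf: &Watched) -> Vec<u8> {
    buf.chunk().to_vec()
}
```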

We would require something like:

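use bytes::{Buf, Bytes};

// `SignalThingy` is the same placeholder as in the `Alert` sketch.
struct ConsumingBuf<B: Buf> {
    inner: B,
    signal: SignalThingy,
}

impl<B: Buf> ConsumingBuf<B> {
    fn next_chunk(&mut self) -> Bytes {
        let len = self.inner.chunk().len();
        // Handing out the bytes and advancing past them happen in one step.
        let chunk = self.inner.copy_to_bytes(len);
        if !self.inner.has_remaining() {
            self.signal.thingy();
        }
        chunk
    }
}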

(where inner implements Buf) to guarantee that we notify if and only if the buffer was consumed.
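A std-only sketch of that consuming API for a buffer made of several segments (roughly what a chained `Buf` looks like). It assumes a hypothetical `ConsumingBuf` that takes a callback in place of the signal. Because bytes are only reachable through `next_chunk`, the callback runs exactly when the last segment is handed out.

```rust
use std::collections::VecDeque;

/// Hypothetical consuming buffer over several segments. Bytes are only
/// reachable through `next_chunk`, which hands a segment out and removes it
/// from the buffer in the same call.
pub struct ConsumingBuf<F: FnOnce()> {
    segments: VecDeque<Vec<u8>>,
    on_consumed: Option<F>,
}

impl<F: FnOnce()> ConsumingBuf<F> {
    pub fn new(segments: Vec<Vec<u8>>, on_consumed: F) -> Self {
        ConsumingBuf { segments: VecDeque::from(segments), on_consumed: Some(on_consumed) }
    }

    /// Returns the next segment; runs `on_consumed` when the last one is handed out.
    pub fn next_chunk(&mut self) -> Option<Vec<u8>> {
        let chunk = self.segments.pop_front()?;
        if self.segments.is_empty() {
            if let Some(on_consumed) = self.on_consumed.take() {
                on_consumed();
            }
        }
        Some(chunk)
    }
}
```

In this sketch an empty buffer never signals; a real version would have to decide what "consumed" means in that case.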

@seanmonstar
Member Author

It's true there's no guarantee, but hyper does make sure to always advance to the end. The thing with readiness-based IO is that we need to peek, because there might not be enough room in the socket's write buffer to send it all.
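The peek-then-advance pattern described here can be sketched as follows. It assumes a hypothetical `LimitedSocket` that accepts at most `capacity` bytes per write (modelling a partially writable non-blocking socket) and a hypothetical `Alerting` buffer with `Buf`-style methods. As long as the writer advances by exactly what the socket accepted and keeps going until nothing remains, the end-of-buffer flag is set right after the last byte is written.

```rust
/// Hypothetical non-blocking socket that accepts at most `capacity` bytes per write.
pub struct LimitedSocket {
    pub capacity: usize,
    pub written: Vec<u8>,
}

impl LimitedSocket {
    pub fn write(&mut self, data: &[u8]) -> usize {
        let n = data.len().min(self.capacity);
        self.written.extend_from_slice(&data[..n]);
        n
    }
}

/// Buffer with `Buf`-style methods; `done` is set when `advance` reaches the end.
pub struct Alerting {
    data: Vec<u8>,
    pos: usize,
    pub done: bool,
}

impl Alerting {
    pub fn new(data: &[u8]) -> Self {
        Alerting { data: data.to_vec(), pos: 0, done: false }
    }

    pub fn has_remaining(&self) -> bool {
        self.pos < self.data.len()
    }

    pub fn chunk(&self) -> &[u8] {
        &self.data[self.pos..]
    }

    pub fn advance(&mut self, cnt: usize) {
        self.pos += cnt;
        if !self.has_remaining() {
            self.done = true;
        }
    }
}

/// Peek with `chunk()`, write what fits, advance by exactly what was accepted,
/// and repeat until the buffer is drained. Returns the number of write calls.
pub fn flush(buf: &mut Alerting, sock: &mut LimitedSocket) -> usize {
    let mut calls = 0;
    while buf.has_remaining() {
        let n = sock.write(buf.chunk());
        calls += 1;
        if n == 0 {
            // Socket full: a real writer would wait for writability and retry later.
            break;
        }
        buf.advance(n);
    }
    calls
}
```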

Projects
No open projects
Status: In Progress

2 participants