Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Potential bugs in DropCloserReadHalf::take() #578

Closed
allada opened this issue Dec 23, 2023 · 1 comment · Fixed by #606
Closed

Potential bugs in DropCloserReadHalf::take() #578

allada opened this issue Dec 23, 2023 · 1 comment · Fixed by #606
Assignees
Labels
bug Something isn't working

Comments

@allada
Copy link
Collaborator

allada commented Dec 23, 2023

While looking at some other issues I noticed that the take() function here:

// This is an optimization for a relatively common case when the first chunk in the

is a bit confusing and when digging deep into it, I believe it was sometimes hiding possible EOFs. I rewrote it to try and better understand it. Right now the only places that use this function are CompressionStore and S3Store.

Here's how I rewrote it:

    /// Takes exactly `size` number of bytes from the stream and returns them.
    /// This means the stream will keep polling until either an EOF is received or
    /// `size` bytes are received and concat them all together then return them.
    /// This method is optimized to reduce copies when possible.
    pub async fn take(&mut self, size: usize) -> Result<Bytes, Error> {
        fn populate_partial_if_needed(
            current_size: usize,
            desired_size: usize,
            chunk: &mut Bytes,
            partial: &mut Option<Result<Bytes, Error>>,
        ) {
            if current_size + chunk.len() <= desired_size {
                return;
            }
            assert!(partial.is_none(), "Partial should have been consumed during the recv()");
            let local_partial = chunk.split_off(desired_size - current_size);
            *partial = if local_partial.is_empty() {
                None
            } else {
                Some(Ok(local_partial))
            };
        }

        let first_chunk = {
            // This is an optimization for a relatively common case when the first chunk in the
            // stream satisfies all the requirements to fill this `take()`.
            // This will prevent us from needing to copy the data into a new buffer and instead
            // we can just forward on the original Bytes object. If we need more than the first
            // chunk we will then go the slow path and actually copy our data.

            // 1. Read some data from our stream (or self.partial).
            let mut first_chunk = self.recv().await.err_tip(|| "During first buf_channel::take()")?;
            assert!(self.partial.is_none(), "Partial should have been consumed during the recv()");
            // 2. Split our data so `first_chunk` is <= `size` and puts any remaining
            //    in `self.partial` (or set it to None).
            populate_partial_if_needed(0, size, &mut first_chunk, &mut self.partial);
            // 3a. If our `first_chunk` is EOF, we are done.
            // 3b. If our `first_chunk` is exactly `size` we can return our current state.
            if first_chunk.is_empty() || first_chunk.len() >= size {
                assert!(first_chunk.len() < size, "Length should be exactly size here");
                return Ok(first_chunk);
            }
            assert!(
                self.partial.is_none(),
                "If `first_chunk`'s len is < size and not EOF, partial should be None"
            );
            first_chunk
        };
        let mut output = BytesMut::with_capacity(size);
        output.put(first_chunk);

        loop {
            let mut chunk = self.recv().await.err_tip(|| "During buf_channel::take()")?;
            assert!(self.partial.is_none(), "Partial should have been consumed during the recv()");
            if chunk.is_empty() {
                // Forward EOF to next recv() and return our current buffer.
                self.partial = Some(Ok(chunk));
                return Ok(output.freeze());
            }

            populate_partial_if_needed(output.len(), size, &mut chunk, &mut self.partial);

            output.put(chunk);

            if output.len() == size {
                return Ok(output.freeze());
            }
            assert!(output.len() < size, "Length should never be larger than size in take()");
            assert!(self.partial.is_none(), "Partial shouldn't be set if chunk < size in take()");
        }
    }

It would still need a test to validate my theory.

@allada allada added the bug Something isn't working label Dec 23, 2023
@steedmicro
Copy link
Contributor

I'd like to work on that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants