From 52445a1c234cef5f065d76c0af938b5744dc732d Mon Sep 17 00:00:00 2001 From: allada Date: Sat, 23 Apr 2022 15:48:11 -0500 Subject: [PATCH] Fix bug in memory store when receiving a zero byte object An edge case was discovered in another unit test where a zero byte object (empty file) would error out during receive. --- cas/store/memory_store.rs | 10 ++++++---- cas/store/tests/memory_store_test.rs | 22 ++++++++++++++++++++++ util/buf_channel.rs | 14 +++++++------- 3 files changed, 35 insertions(+), 11 deletions(-) diff --git a/cas/store/memory_store.rs b/cas/store/memory_store.rs index 45fad3b0a..81c8992be 100644 --- a/cas/store/memory_store.rs +++ b/cas/store/memory_store.rs @@ -98,10 +98,12 @@ impl StoreTrait for MemoryStore { let default_len = value.len() - offset; let length = length.unwrap_or(default_len).min(default_len); - writer - .send(value.0.slice(offset..(offset + length))) - .await - .err_tip(|| "Failed to write data in memory store")?; + if length > 0 { + writer + .send(value.0.slice(offset..(offset + length))) + .await + .err_tip(|| "Failed to write data in memory store")?; + } writer .send_eof() .await diff --git a/cas/store/tests/memory_store_test.rs b/cas/store/tests/memory_store_test.rs index b0f681742..9a3538624 100644 --- a/cas/store/tests/memory_store_test.rs +++ b/cas/store/tests/memory_store_test.rs @@ -82,6 +82,28 @@ mod memory_store_tests { Ok(()) } + // A bug was found where reading an empty value from memory store would result in an error + // due to internal EOF handling. This is an edge case test. + #[tokio::test] + async fn read_zero_size_item_test() -> Result<(), Error> { + let store_owned = MemoryStore::new(&config::backends::MemoryStore::default()); + let store = Pin::new(&store_owned); + + // Insert dummy value into store. + const VALUE: &str = ""; + store + .update_oneshot(DigestInfo::try_new(&VALID_HASH1, VALUE.len())?, VALUE.into()) + .await?; + assert_eq!( + store + .get_part_unchunked(DigestInfo::try_new(&VALID_HASH1, VALUE.len())?, 0, None, None) + .await, + Ok("".into()), + "Expected memory store to have empty value" + ); + Ok(()) + } + #[tokio::test] async fn errors_with_invalid_inputs() -> Result<(), Error> { let store_owned = MemoryStore::new(&config::backends::MemoryStore::default()); diff --git a/util/buf_channel.rs b/util/buf_channel.rs index 7d4a057b8..132903dd0 100644 --- a/util/buf_channel.rs +++ b/util/buf_channel.rs @@ -8,7 +8,7 @@ use futures::{task::Context, Future, Stream, StreamExt}; use tokio::sync::{mpsc, oneshot}; pub use tokio_util::io::StreamReader; -use error::{make_err, Code, Error, ResultExt}; +use error::{error_if, make_err, Code, Error, ResultExt}; /// Create a channel pair that can be used to transport buffer objects around to /// different components. This wrapper is used because the streams give some @@ -54,7 +54,7 @@ impl DropCloserWriteHalf { .as_ref() .ok_or_else(|| make_err!(Code::Internal, "Tried to send while stream is closed"))?; let buf_len = buf.len() as u64; - assert!(buf_len != 0, "Cannot send EOF in send(). Instead use send_eof()"); + error_if!(buf_len == 0, "Cannot send EOF in send(). Instead use send_eof()"); let result = tx .send(Ok(buf)) .await @@ -70,7 +70,7 @@ impl DropCloserWriteHalf { /// Sends an EOF (End of File) message to the receiver which will gracefully let the /// stream know it has no more data. This will close the stream. pub async fn send_eof(&mut self) -> Result<(), Error> { - assert!(self.tx.is_some(), "Tried to send an EOF when pipe is broken"); + error_if!(self.tx.is_none(), "Tried to send an EOF when pipe is broken"); self.tx = None; // The final result will be provided in this oneshot channel. @@ -168,14 +168,14 @@ impl DropCloserReadHalf { match maybe_chunk { Some(Ok(chunk)) => { let chunk_len = chunk.len() as u64; - assert!(chunk_len != 0, "Chunk should never be EOF, expected None in this case"); - assert!( - self.close_after_size >= chunk_len, + error_if!(chunk_len == 0, "Chunk should never be EOF, expected None in this case"); + error_if!( + self.close_after_size < chunk_len, "Received too much data. This only happens when `close_after_size` is set." ); self.close_after_size -= chunk_len; if self.close_after_size == 0 { - assert!(self.close_tx.is_some(), "Expected stream to not be closed"); + error_if!(self.close_tx.is_none(), "Expected stream to not be closed"); self.close_tx.take().unwrap().send(Ok(())).map_err(|_| { make_err!(Code::Internal, "Failed to send closing ok message to write with size") })?;