Skip to content

Commit

Permalink
Introduce get_part_ref() and migrate primary use to .get_part()
Browse files Browse the repository at this point in the history
Due to DropCloserWriteHalf being so fragile in how it is dropped,
it was decided that we should prefer to pass an owned version to
the store api and introduce `.get_part_ref()` to handle cases
where the writer should not be dropped on the return.

This protects against cases where users join!() the reader and
writer sides and a deadlock happens because the writer errors
but is never dropped, so the reader side is never notified.
  • Loading branch information
allada committed Sep 19, 2023
1 parent d3d0b64 commit fb6e1fd
Show file tree
Hide file tree
Showing 13 changed files with 59 additions and 32 deletions.
4 changes: 2 additions & 2 deletions cas/grpc_service/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ impl ByteStreamServer {

let digest = DigestInfo::try_new(resource_info.hash, resource_info.expected_size)?;

let (mut tx, rx) = buf_channel::make_buf_channel_pair();
let (tx, rx) = buf_channel::make_buf_channel_pair();

struct ReaderState {
max_bytes_per_stream: usize,
Expand All @@ -262,7 +262,7 @@ impl ByteStreamServer {
maybe_get_part_result: None,
get_part_fut: Box::pin(async move {
store
.get_part_arc(digest, &mut tx, read_request.read_offset as usize, read_limit)
.get_part_arc(digest, tx, read_request.read_offset as usize, read_limit)
.await
}),
});
Expand Down
6 changes: 3 additions & 3 deletions cas/store/compression_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,20 +365,20 @@ impl StoreTrait for CompressionStore {
write_result.merge(update_result)
}

async fn get_part(
async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
let offset = offset as u64;
let (mut tx, mut rx) = make_buf_channel_pair();
let (tx, mut rx) = make_buf_channel_pair();

let inner_store = self.inner_store.clone();
let get_part_fut = JoinHandleDropGuard::new(tokio::spawn(async move {
Pin::new(inner_store.as_ref())
.get_part(digest, &mut tx, 0, None)
.get_part(digest, tx, 0, None)
.await
.err_tip(|| "Inner store get in compression store failed")
}))
Expand Down
2 changes: 1 addition & 1 deletion cas/store/dedup_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl StoreTrait for DedupStore {
Ok(())
}

async fn get_part(
async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
Expand Down
12 changes: 6 additions & 6 deletions cas/store/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ impl FastSlowStore {
// TODO(blaise.bruer) This is extremely inefficient, since we are just trying
// to send the stream to /dev/null. Maybe we could instead make a version of
// the stream that can send to the drain more efficiently?
let (mut tx, mut rx) = make_buf_channel_pair();
let (tx, mut rx) = make_buf_channel_pair();
let drain_fut = async move {
while !rx.recv().await?.is_empty() {}
Ok(())
};
let (drain_res, get_res) = join!(drain_fut, self.get(digest, &mut tx));
let (drain_res, get_res) = join!(drain_fut, self.get(digest, tx));
get_res.err_tip(|| "Failed to populate()").merge(drain_res)
}

Expand Down Expand Up @@ -189,7 +189,7 @@ impl StoreTrait for FastSlowStore {
Ok(())
}

async fn get_part(
async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
Expand All @@ -201,7 +201,7 @@ impl StoreTrait for FastSlowStore {
let fast_store = self.pin_fast_store();
let slow_store = self.pin_slow_store();
if fast_store.has(digest).await?.is_some() {
return fast_store.get_part(digest, writer, offset, length).await;
return fast_store.get_part_ref(digest, writer, offset, length).await;
}

let sz = slow_store
Expand All @@ -220,7 +220,7 @@ impl StoreTrait for FastSlowStore {
let mut bytes_received: usize = 0;

let (mut fast_tx, fast_rx) = make_buf_channel_pair();
let (mut slow_tx, mut slow_rx) = make_buf_channel_pair();
let (slow_tx, mut slow_rx) = make_buf_channel_pair();
let data_stream_fut = async move {
let mut writer_pin = Pin::new(writer);
loop {
Expand Down Expand Up @@ -253,7 +253,7 @@ impl StoreTrait for FastSlowStore {
}
};

let slow_store_fut = slow_store.get(digest, &mut slow_tx);
let slow_store_fut = slow_store.get(digest, slow_tx);
let fast_store_fut = fast_store.update(digest, fast_rx, UploadSizeInfo::ExactSize(sz));

let (data_stream_res, slow_res, fast_res) = join!(data_stream_fut, slow_store_fut, fast_store_fut);
Expand Down
2 changes: 1 addition & 1 deletion cas/store/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ impl<Fe: FileEntry> StoreTrait for FilesystemStore<Fe> {
.err_tip(|| format!("While processing with temp file {:?}", temp_full_path))
}

async fn get_part(
async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
Expand Down
2 changes: 1 addition & 1 deletion cas/store/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ impl StoreTrait for GrpcStore {
Ok(())
}

async fn get_part(
async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
Expand Down
2 changes: 1 addition & 1 deletion cas/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl StoreTrait for MemoryStore {
Ok(())
}

async fn get_part(
async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
Expand Down
6 changes: 4 additions & 2 deletions cas/store/ref_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,17 @@ impl StoreTrait for RefStore {
Pin::new(store.as_ref()).update(digest, reader, size_info).await
}

async fn get_part(
async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
let store = self.get_store()?;
Pin::new(store.as_ref()).get_part(digest, writer, offset, length).await
Pin::new(store.as_ref())
.get_part_ref(digest, writer, offset, length)
.await
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Expand Down
2 changes: 1 addition & 1 deletion cas/store/s3_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ impl StoreTrait for S3Store {
complete_result
}

async fn get_part(
async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
Expand Down
6 changes: 3 additions & 3 deletions cas/store/size_partitioning_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl StoreTrait for SizePartitioningStore {
.await
}

async fn get_part(
async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
Expand All @@ -107,11 +107,11 @@ impl StoreTrait for SizePartitioningStore {
) -> Result<(), Error> {
if digest.size_bytes < self.size {
return Pin::new(self.lower_store.as_ref())
.get_part(digest, writer, offset, length)
.get_part_ref(digest, writer, offset, length)
.await;
}
Pin::new(self.upper_store.as_ref())
.get_part(digest, writer, offset, length)
.get_part_ref(digest, writer, offset, length)
.await
}

Expand Down
35 changes: 30 additions & 5 deletions cas/store/store_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub trait StoreTrait: Sync + Send + Unpin {
/// Look up a digest in the store and return None if it does not exist in
/// the store, or Some(size) if it does.
/// Note: On an AC store the size will be incorrect and should not be used!
#[inline]
async fn has(self: Pin<&Self>, digest: DigestInfo) -> Result<Option<usize>, Error> {
let mut result = [None];
self.has_with_results(&[digest], &mut result).await?;
Expand All @@ -54,6 +55,7 @@ pub trait StoreTrait: Sync + Send + Unpin {
/// the same order as input. The result will either be None if it does not
/// exist in the store, or Some(size) if it does.
/// Note: On an AC store the size will be incorrect and should not be used!
#[inline]
async fn has_many(self: Pin<&Self>, digests: &[DigestInfo]) -> Result<Vec<Option<usize>>, Error> {
let mut results = vec![None; digests.len()];
self.has_with_results(digests, &mut results).await?;
Expand Down Expand Up @@ -99,22 +101,45 @@ pub trait StoreTrait: Sync + Send + Unpin {
Ok(())
}

async fn get_part(
/// Retrieves part of the data from the store and writes it to the given writer.
async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
) -> Result<(), Error>;

async fn get(self: Pin<&Self>, digest: DigestInfo, writer: &mut DropCloserWriteHalf) -> Result<(), Error> {
/// Same as `get_part_ref`, but takes ownership of the writer. This is preferred
/// when the writer is definitely not going to be needed after the function returns.
///
/// Owning the writer matters because the read half of the channel blocks until the
/// writer is dropped or EOF is sent. If the writer were passed by reference and the
/// reader were awaited together with `.get_part()` (for example via `join!()`), an
/// error that leaves the writer alive without sending EOF could deadlock the reader.
/// Use `.get_part_ref()` when the writer might be used after the function returns.
#[inline]
async fn get_part(
self: Pin<&Self>,
digest: DigestInfo,
mut writer: DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
// `writer` is owned by this call, so it is dropped once the call completes,
// regardless of whether `get_part_ref` returned `Ok` or `Err`.
self.get_part_ref(digest, &mut writer, offset, length).await
}

/// Utility that works the same as `.get_part()`, but writes all the data.
/// It forwards to `.get_part()` with an offset of `0` and a length of `None`,
/// and, like `.get_part()`, takes ownership of the writer.
#[inline]
async fn get(self: Pin<&Self>, digest: DigestInfo, writer: DropCloserWriteHalf) -> Result<(), Error> {
self.get_part(digest, writer, 0, None).await
}

/// Utility for when `self` is an `Arc` to make the code easier to write.
#[inline]
async fn get_part_arc(
self: Arc<Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
writer: DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
Expand All @@ -132,11 +157,11 @@ pub trait StoreTrait: Sync + Send + Unpin {
// TODO(blaise.bruer) This is extremely inefficient, since we have exactly
// what we need here. Maybe we could instead make a version of the stream
// that can take objects already fully in memory instead?
let (mut tx, rx) = make_buf_channel_pair();
let (tx, rx) = make_buf_channel_pair();

let (data_res, get_part_res) = join!(
rx.collect_all_with_size_hint(length.unwrap_or(size_hint.unwrap_or(0))),
async move { self.get_part(digest, &mut tx, offset, length).await },
self.get_part(digest, tx, offset, length),
);
get_part_res
.err_tip(|| "Failed to get_part in get_part_unchunked")
Expand Down
8 changes: 4 additions & 4 deletions cas/store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,10 @@ mod filesystem_store_tests {
// Insert data into store.
store_pin.as_ref().update_oneshot(digest1, VALUE1.into()).await?;

let (mut writer, mut reader) = make_buf_channel_pair();
let (writer, mut reader) = make_buf_channel_pair();
let store_clone = store.clone();
let digest1_clone = digest1;
tokio::spawn(async move { Pin::new(store_clone.as_ref()).get(digest1_clone, &mut writer).await });
tokio::spawn(async move { Pin::new(store_clone.as_ref()).get(digest1_clone, writer).await });

{
// Check to ensure our first byte has been received. The future should be stalled here.
Expand Down Expand Up @@ -455,9 +455,9 @@ mod filesystem_store_tests {
store_pin.as_ref().update_oneshot(digest1, VALUE1.into()).await?;

let mut reader = {
let (mut writer, reader) = make_buf_channel_pair();
let (writer, reader) = make_buf_channel_pair();
let store_clone = store.clone();
tokio::spawn(async move { Pin::new(store_clone.as_ref()).get(digest1, &mut writer).await });
tokio::spawn(async move { Pin::new(store_clone.as_ref()).get(digest1, writer).await });
reader
};
// Ensure we have received 1 byte in our buffer. This will ensure we have a reference to
Expand Down
4 changes: 2 additions & 2 deletions cas/store/verify_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ impl StoreTrait for VerifyStore {
update_res.merge(check_res)
}

async fn get_part(
async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
self.pin_inner().get_part(digest, writer, offset, length).await
self.pin_inner().get_part_ref(digest, writer, offset, length).await
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Expand Down

0 comments on commit fb6e1fd

Please sign in to comment.