Skip to content

Commit

Permalink
Populate the fast store on partial read
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisstaite-menlo committed Sep 13, 2023
1 parent 16f2ca5 commit e0e0a88
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 32 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cas/store/BUILD
Expand Up @@ -157,6 +157,7 @@ rust_library(
"//util:common",
"//util:error",
"//util:metrics_utils",
"@crate_index//:bytes",
"@crate_index//:futures",
],
)
Expand Down
74 changes: 49 additions & 25 deletions cas/store/fast_slow_store.rs
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{max, min};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

Expand Down Expand Up @@ -81,6 +83,26 @@ impl FastSlowStore {
/// Borrows the slow store and pins it so `StoreTrait` methods can be invoked on it.
fn pin_slow_store(&self) -> Pin<&dyn StoreTrait> {
    let store = self.slow_store.as_ref();
    Pin::new(store)
}

/// Returns the subrange of a received chunk that should be forwarded on,
/// given the absolute byte range of the chunk (`received_range`) and the
/// absolute byte range the caller wants sent (`send_range`). The returned
/// range is remapped so `received_range.start` becomes 0, i.e. it indexes
/// directly into the chunk. Returns `None` when nothing from this chunk
/// should be sent.
// TODO(allada) This should be put into utils, as this logic is used
// elsewhere in the code.
pub fn calculate_range(received_range: &Range<usize>, send_range: &Range<usize>) -> Option<Range<usize>> {
    // An empty received_range can never contribute data; returning early
    // also protects the subtractions below from underflow.
    if received_range.start >= received_range.end {
        return None;
    }

    // Intersect the two absolute ranges.
    let start = max(received_range.start, send_range.start);
    let end = min(received_range.end, send_range.end);
    // If `end == 0` (possible when send_range is empty, e.g. `0..0` from a
    // zero-length read request), `end - 1` below would underflow usize and
    // panic in debug builds. There is nothing to send in that case.
    if end == 0 {
        return None;
    }
    if received_range.contains(&start) && received_range.contains(&(end - 1)) {
        // Offset both bounds so they index into the received chunk.
        Some(start - received_range.start..end - received_range.start)
    } else {
        None
    }
}
}

#[async_trait]
Expand Down Expand Up @@ -181,21 +203,21 @@ impl StoreTrait for FastSlowStore {
if fast_store.has(digest).await?.is_some() {
return fast_store.get_part(digest, writer, offset, length).await;
}
// We can only copy the data to our fast store if we are copying everything.
if offset != 0 || length.is_some() {
return slow_store.get_part(digest, writer, offset, length).await;
}

let sz_result = slow_store
let sz = slow_store
.has(digest)
.await
.err_tip(|| "Failed to run has() on slow store")?;
let Some(sz) = sz_result else {
return Err(make_err!(
Code::NotFound,
"Object not found in either fast or slow store"
));
};
.err_tip(|| "Failed to run has() on slow store")?
.ok_or_else(|| {
make_err!(
Code::NotFound,
"Object {} not found in either fast or slow store",
digest.hash_str()
)
})?;

let send_range = offset..length.map_or(usize::MAX, |length| length + offset);
let mut bytes_received: usize = 0;

let (mut fast_tx, fast_rx) = make_buf_channel_pair();
let (slow_tx, mut slow_rx) = make_buf_channel_pair();
Expand All @@ -212,20 +234,22 @@ impl StoreTrait for FastSlowStore {
// all the data they wanted, which could lead to an error when writing this
// EOF. If that was to happen, we could end up terminating this early and
// the resulting upload to the fast store might fail.
let _ = fast_tx.send_eof().await?;
let _ = writer_pin.send_eof().await?;
return Ok(());
}
let (fast_tx_res, writer_res) = join!(
fast_tx.send(output_buf.clone()).boxed(),
writer_pin.send(output_buf).boxed(),
);
if let Err(err) = fast_tx_res {
return Err(err).err_tip(|| "Failed to write to fast store in fast_slow store");
}
if let Err(err) = writer_res {
return Err(err).err_tip(|| "Failed to write result to writer in fast_slow store");
let (fast_res, slow_res) = join!(fast_tx.send_eof(), writer_pin.send_eof());
return fast_res.merge(slow_res);
}

let writer_fut = if let Some(range) =
Self::calculate_range(&(bytes_received..bytes_received + output_buf.len()), &send_range)
{
writer_pin.send(output_buf.slice(range)).right_future()
} else {
futures::future::ready(Ok(())).left_future()
};
bytes_received += output_buf.len();

let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut);
fast_tx_res.err_tip(|| "Failed to write to fast store in fast_slow store")?;
writer_res.err_tip(|| "Failed to write result to writer in fast_slow store")?;
}
};

Expand Down
4 changes: 2 additions & 2 deletions cas/store/filesystem_store.rs
Expand Up @@ -482,7 +482,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
self.evicting_map
.get(digest)
.await
.ok_or_else(|| make_err!(Code::NotFound, "not found in filesystem store"))
.ok_or_else(|| make_err!(Code::NotFound, "{} not found in filesystem store", digest.hash_str()))
}

async fn update_file<'a>(
Expand Down Expand Up @@ -637,7 +637,7 @@ impl<Fe: FileEntry> StoreTrait for FilesystemStore<Fe> {
.evicting_map
.get(&digest)
.await
.ok_or_else(|| make_err!(Code::NotFound, "not found in filesystem store"))?;
.ok_or_else(|| make_err!(Code::NotFound, "{} not found in filesystem store", digest.hash_str()))?;
let read_limit = length.unwrap_or(usize::MAX) as u64;
let mut resumeable_temp_file = entry.read_file_part(offset as u64, read_limit).await?;

Expand Down
88 changes: 83 additions & 5 deletions cas/store/tests/fast_slow_store_test.rs
Expand Up @@ -116,7 +116,7 @@ mod fast_slow_store_tests {
}

#[tokio::test]
async fn partial_reads_do_not_copy_to_slow_store_test() -> Result<(), Error> {
async fn partial_reads_copy_full_to_fast_store_test() -> Result<(), Error> {
let (fast_slow_store, fast_store, slow_store) = make_stores();
let fast_slow_store = Pin::new(fast_slow_store.as_ref());
let fast_store = Pin::new(fast_store.as_ref());
Expand All @@ -127,13 +127,91 @@ mod fast_slow_store_tests {
slow_store.update_oneshot(digest, original_data.clone().into()).await?;

// This get() request should place the data in fast_store too.
fast_slow_store.get_part_unchunked(digest, 0, Some(50), None).await?;
assert_eq!(
original_data[10..60],
fast_slow_store.get_part_unchunked(digest, 10, Some(50), None).await?
);

// Data should not exist in fast store, but should exist in slow store because
// it was a partial read.
assert_eq!(fast_store.has(digest).await, Ok(None));
// Full data should exist in the fast store even though only partially
// read.
check_data(slow_store, digest, &original_data, "slow_store").await?;
check_data(fast_store, digest, &original_data, "fast_store").await?;

Ok(())
}

// Exercises FastSlowStore::calculate_range across overlapping, disjoint,
// and fence-post combinations of received_range and send_range.
#[test]
fn calculate_range_test() {
// Shorthand: maps (received_range, send_range) to the chunk-relative
// Option<Range> that should be sent.
let test = |start_range, end_range| FastSlowStore::calculate_range(&start_range, &end_range);
{
// Exact match.
let received_range = 0..1;
let send_range = 0..1;
let expected_results = Some(0..1);
assert_eq!(test(received_range, send_range), expected_results);
}
{
// Minus one on received_range.
let received_range = 1..4;
let send_range = 1..5;
let expected_results = Some(0..3);
assert_eq!(test(received_range, send_range), expected_results);
}
{
// Minus one on send_range.
let received_range = 1..5;
let send_range = 1..4;
let expected_results = Some(0..3);
assert_eq!(test(received_range, send_range), expected_results);
}
{
// Should have already sent all data (start fence post).
let received_range = 1..2;
let send_range = 0..1;
let expected_results = None;
assert_eq!(test(received_range, send_range), expected_results);
}
{
// Definitely already sent data.
let received_range = 2..3;
let send_range = 0..1;
let expected_results = None;
assert_eq!(test(received_range, send_range), expected_results);
}
{
// All data should be sent (inside range).
let received_range = 3..4;
let send_range = 0..100;
let expected_results = Some(0..1); // Note: This is relative to received_range.start.
assert_eq!(test(received_range, send_range), expected_results);
}
{
// Subset of received data should be sent.
let received_range = 1..100;
let send_range = 3..4;
let expected_results = Some(2..3); // Note: This is relative to received_range.start.
assert_eq!(test(received_range, send_range), expected_results);
}
{
// We are clearly not at the offset yet.
let received_range = 0..1;
let send_range = 3..4;
let expected_results = None;
assert_eq!(test(received_range, send_range), expected_results);
}
{
// Not at offset yet (fence post).
let received_range = 0..1;
let send_range = 1..2;
let expected_results = None;
assert_eq!(test(received_range, send_range), expected_results);
}
{
// Head part of the received data should be sent.
let received_range = 1..3;
let send_range = 2..5;
let expected_results = Some(1..2);
assert_eq!(test(received_range, send_range), expected_results);
}
}
}
3 changes: 3 additions & 0 deletions cas/worker/local_worker.rs
Expand Up @@ -280,6 +280,9 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
futures.push(
tokio::spawn(start_action_fut).map(move |res| {
let res = res.err_tip(|| "Failed to launch spawn")?;
if let Err(err) = &res {
log::info!("\x1b[0;31mError executing action\x1b[0m: {}", err);
}
add_future_channel
.send(make_publish_future(res).boxed())
.map_err(|_| make_err!(Code::Internal, "LocalWorker could not send future"))?;
Expand Down
9 changes: 9 additions & 0 deletions cas/worker/running_actions_manager.rs
Expand Up @@ -894,6 +894,15 @@ impl RunningActionImpl {
let mut output_directory_symlinks = vec![];
let mut output_file_symlinks = vec![];

if execution_result.exit_code != 0 {
log::info!(
"Command returned exit code {} : {} {}",
execution_result.exit_code,
std::str::from_utf8(&execution_result.stdout).unwrap_or(""),
std::str::from_utf8(&execution_result.stderr).unwrap_or("")
);
}

let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
let cursor = Cursor::new(execution_result.stdout);
let (digest, mut cursor) = compute_digest(cursor).await?;
Expand Down
1 change: 1 addition & 0 deletions gencargo/fast_slow_store/Cargo.toml
Expand Up @@ -20,6 +20,7 @@ doctest = false

[dependencies]
async-trait = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }

# Local libraries.
Expand Down

0 comments on commit e0e0a88

Please sign in to comment.