diff --git a/Cargo.lock b/Cargo.lock index a7cbc1019..664b9fec4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1074,6 +1074,7 @@ version = "0.0.0" dependencies = [ "async-trait", "buf_channel", + "bytes", "common", "config", "error", diff --git a/cas/store/BUILD b/cas/store/BUILD index 1d0e49527..aa85a29c8 100644 --- a/cas/store/BUILD +++ b/cas/store/BUILD @@ -157,6 +157,7 @@ rust_library( "//util:common", "//util:error", "//util:metrics_utils", + "@crate_index//:bytes", "@crate_index//:futures", ], ) diff --git a/cas/store/fast_slow_store.rs b/cas/store/fast_slow_store.rs index aaf668999..b40f001e5 100644 --- a/cas/store/fast_slow_store.rs +++ b/cas/store/fast_slow_store.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::{max, min}; +use std::ops::Range; use std::pin::Pin; use std::sync::Arc; @@ -81,6 +83,26 @@ impl FastSlowStore { fn pin_slow_store(&self) -> Pin<&dyn StoreTrait> { Pin::new(self.slow_store.as_ref()) } + + /// Returns the range of bytes that should be sent given a slice bounds + /// offset so the output range maps the received_range.start to 0. + // TODO(allada) This should be put into utils, as this logic is used + // elsewhere in the code. + pub fn calculate_range(received_range: &Range<usize>, send_range: &Range<usize>) -> Option<Range<usize>> { + // Protect against subtraction overflow. + if received_range.start >= received_range.end { + return None; + } + + let start = max(received_range.start, send_range.start); + let end = min(received_range.end, send_range.end); + if received_range.contains(&start) && received_range.contains(&(end - 1)) { + // Offset both to the start of the received_range. 
+ Some(start - received_range.start..end - received_range.start) + } else { + None + } + } } #[async_trait] @@ -181,21 +203,21 @@ impl StoreTrait for FastSlowStore { if fast_store.has(digest).await?.is_some() { return fast_store.get_part(digest, writer, offset, length).await; } - // We can only copy the data to our fast store if we are copying everything. - if offset != 0 || length.is_some() { - return slow_store.get_part(digest, writer, offset, length).await; - } - let sz_result = slow_store + let sz = slow_store .has(digest) .await - .err_tip(|| "Failed to run has() on slow store")?; - let Some(sz) = sz_result else { - return Err(make_err!( - Code::NotFound, - "Object not found in either fast or slow store" - )); - }; + .err_tip(|| "Failed to run has() on slow store")? + .ok_or_else(|| { + make_err!( + Code::NotFound, + "Object {} not found in either fast or slow store", + digest.hash_str() + ) + })?; + + let send_range = offset..length.map_or(usize::MAX, |length| length + offset); + let mut bytes_received: usize = 0; let (mut fast_tx, fast_rx) = make_buf_channel_pair(); let (slow_tx, mut slow_rx) = make_buf_channel_pair(); @@ -212,20 +234,22 @@ impl StoreTrait for FastSlowStore { // all the data they wanted, which could lead to an error when writing this // EOF. If that was to happen, we could end up terminating this early and // the resulting upload to the fast store might fail. 
- let _ = fast_tx.send_eof().await?; - let _ = writer_pin.send_eof().await?; - return Ok(()); - } - let (fast_tx_res, writer_res) = join!( - fast_tx.send(output_buf.clone()).boxed(), - writer_pin.send(output_buf).boxed(), - ); - if let Err(err) = fast_tx_res { - return Err(err).err_tip(|| "Failed to write to fast store in fast_slow store"); - } - if let Err(err) = writer_res { - return Err(err).err_tip(|| "Failed to write result to writer in fast_slow store"); + let (fast_res, slow_res) = join!(fast_tx.send_eof(), writer_pin.send_eof()); + return fast_res.merge(slow_res); } + + let writer_fut = if let Some(range) = + Self::calculate_range(&(bytes_received..bytes_received + output_buf.len()), &send_range) + { + writer_pin.send(output_buf.slice(range)).right_future() + } else { + futures::future::ready(Ok(())).left_future() + }; + bytes_received += output_buf.len(); + + let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut); + fast_tx_res.err_tip(|| "Failed to write to fast store in fast_slow store")?; + writer_res.err_tip(|| "Failed to write result to writer in fast_slow store")?; } }; diff --git a/cas/store/filesystem_store.rs b/cas/store/filesystem_store.rs index b41f0b87c..3ebc299f9 100644 --- a/cas/store/filesystem_store.rs +++ b/cas/store/filesystem_store.rs @@ -482,7 +482,7 @@ impl FilesystemStore { self.evicting_map .get(digest) .await - .ok_or_else(|| make_err!(Code::NotFound, "not found in filesystem store")) + .ok_or_else(|| make_err!(Code::NotFound, "{} not found in filesystem store", digest.hash_str())) } async fn update_file<'a>( @@ -637,7 +637,7 @@ impl StoreTrait for FilesystemStore { .evicting_map .get(&digest) .await - .ok_or_else(|| make_err!(Code::NotFound, "not found in filesystem store"))?; + .ok_or_else(|| make_err!(Code::NotFound, "{} not found in filesystem store", digest.hash_str()))?; let read_limit = length.unwrap_or(usize::MAX) as u64; let mut resumeable_temp_file = entry.read_file_part(offset as u64, 
read_limit).await?; diff --git a/cas/store/tests/fast_slow_store_test.rs b/cas/store/tests/fast_slow_store_test.rs index de91b20ac..e31bdeb2b 100644 --- a/cas/store/tests/fast_slow_store_test.rs +++ b/cas/store/tests/fast_slow_store_test.rs @@ -116,7 +116,7 @@ mod fast_slow_store_tests { } #[tokio::test] - async fn partial_reads_do_not_copy_to_slow_store_test() -> Result<(), Error> { + async fn partial_reads_copy_full_to_fast_store_test() -> Result<(), Error> { let (fast_slow_store, fast_store, slow_store) = make_stores(); let fast_slow_store = Pin::new(fast_slow_store.as_ref()); let fast_store = Pin::new(fast_store.as_ref()); @@ -127,13 +127,91 @@ mod fast_slow_store_tests { slow_store.update_oneshot(digest, original_data.clone().into()).await?; // This get() request should place the data in fast_store too. - fast_slow_store.get_part_unchunked(digest, 0, Some(50), None).await?; + assert_eq!( + original_data[10..60], + fast_slow_store.get_part_unchunked(digest, 10, Some(50), None).await? + ); - // Data should not exist in fast store, but should exist in slow store because - // it was a partial read. - assert_eq!(fast_store.has(digest).await, Ok(None)); + // Full data should exist in the fast store even though only partially + // read. check_data(slow_store, digest, &original_data, "slow_store").await?; + check_data(fast_store, digest, &original_data, "fast_store").await?; Ok(()) } + + #[test] + fn calculate_range_test() { + let test = |start_range, end_range| FastSlowStore::calculate_range(&start_range, &end_range); + { + // Exact match. + let received_range = 0..1; + let send_range = 0..1; + let expected_results = Some(0..1); + assert_eq!(test(received_range, send_range), expected_results); + } + { + // Minus one on received_range. + let received_range = 1..4; + let send_range = 1..5; + let expected_results = Some(0..3); + assert_eq!(test(received_range, send_range), expected_results); + } + { + // Minus one on send_range. 
+ let received_range = 1..5; + let send_range = 1..4; + let expected_results = Some(0..3); + assert_eq!(test(received_range, send_range), expected_results); + } + { + // Should have already sent all data (start fence post). + let received_range = 1..2; + let send_range = 0..1; + let expected_results = None; + assert_eq!(test(received_range, send_range), expected_results); + } + { + // Definitely already sent data. + let received_range = 2..3; + let send_range = 0..1; + let expected_results = None; + assert_eq!(test(received_range, send_range), expected_results); + } + { + // All data should be sent (inside range). + let received_range = 3..4; + let send_range = 0..100; + let expected_results = Some(0..1); // Note: This is relative to received_range.start. + assert_eq!(test(received_range, send_range), expected_results); + } + { + // Subset of received data should be sent. + let received_range = 1..100; + let send_range = 3..4; + let expected_results = Some(2..3); // Note: This is relative to received_range.start. + assert_eq!(test(received_range, send_range), expected_results); + } + { + // We are clearly not at the offset yet. + let received_range = 0..1; + let send_range = 3..4; + let expected_results = None; + assert_eq!(test(received_range, send_range), expected_results); + } + { + // Not at offset yet (fence post). + let received_range = 0..1; + let send_range = 1..2; + let expected_results = None; + assert_eq!(test(received_range, send_range), expected_results); + } + { + // Head part of the received data should be sent. 
+ let received_range = 1..3; + let send_range = 2..5; + let expected_results = Some(1..2); + assert_eq!(test(received_range, send_range), expected_results); + } + } } diff --git a/cas/worker/local_worker.rs b/cas/worker/local_worker.rs index d037e6741..1812193b5 100644 --- a/cas/worker/local_worker.rs +++ b/cas/worker/local_worker.rs @@ -280,6 +280,9 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, futures.push( tokio::spawn(start_action_fut).map(move |res| { let res = res.err_tip(|| "Failed to launch spawn")?; + if let Err(err) = &res { + log::info!("\x1b[0;31mError executing action\x1b[0m: {}", err); + } add_future_channel .send(make_publish_future(res).boxed()) .map_err(|_| make_err!(Code::Internal, "LocalWorker could not send future"))?; diff --git a/cas/worker/running_actions_manager.rs b/cas/worker/running_actions_manager.rs index 32b6db3b2..67bdda254 100644 --- a/cas/worker/running_actions_manager.rs +++ b/cas/worker/running_actions_manager.rs @@ -894,6 +894,15 @@ impl RunningActionImpl { let mut output_directory_symlinks = vec![]; let mut output_file_symlinks = vec![]; + if execution_result.exit_code != 0 { + log::info!( + "Command returned exit code {} : {} {}", + execution_result.exit_code, + std::str::from_utf8(&execution_result.stdout).unwrap_or(""), + std::str::from_utf8(&execution_result.stderr).unwrap_or("") + ); + } + let stdout_digest_fut = self.metrics().upload_stdout.wrap(async { let cursor = Cursor::new(execution_result.stdout); let (digest, mut cursor) = compute_digest(cursor).await?; diff --git a/gencargo/fast_slow_store/Cargo.toml b/gencargo/fast_slow_store/Cargo.toml index 86c21f60d..39344530d 100644 --- a/gencargo/fast_slow_store/Cargo.toml +++ b/gencargo/fast_slow_store/Cargo.toml @@ -20,6 +20,7 @@ doctest = false [dependencies] async-trait = { workspace = true } +bytes = { workspace = true } futures = { workspace = true } # Local libraries.