From 44da5948b6859a216d2d385be003e13ee9eddc7a Mon Sep 17 00:00:00 2001
From: "Nathan (Blaise) Bruer"
Date: Mon, 4 Mar 2024 21:59:13 -0600
Subject: [PATCH] Optimize file uploads when source is a file

When using nativelink with a local worker/CAS setup, this change adds
optimizations that make it faster to upload files from the worker to the
CAS. This is specifically useful for Buck2 users who want to build
hermetically.

closes: #409
---
 nativelink-store/src/ac_utils.rs              | 74 ++-------------
 nativelink-store/src/fast_slow_store.rs       | 71 ++++++++++++--
 nativelink-store/src/filesystem_store.rs      | 84 ++++++++++++----
 nativelink-store/src/noop_store.rs            |  6 +-
 nativelink-store/tests/ac_utils_test.rs       |  7 +-
 .../tests/fast_slow_store_test.rs             | 30 ++++--
 .../tests/filesystem_store_test.rs            | 51 ++++++++++
 nativelink-util/src/store_trait.rs            | 96 ++++++++++++++++++-
 .../src/running_actions_manager.rs            | 20 ++--
 9 files changed, 321 insertions(+), 118 deletions(-)

diff --git a/nativelink-store/src/ac_utils.rs b/nativelink-store/src/ac_utils.rs
index 2bced70a5..eea660a12 100644
--- a/nativelink-store/src/ac_utils.rs
+++ b/nativelink-store/src/ac_utils.rs
@@ -19,14 +19,12 @@
 use std::pin::Pin;
 
-use bytes::{Bytes, BytesMut};
-use futures::future::join;
-use futures::{Future, FutureExt, TryFutureExt};
+use bytes::BytesMut;
+use futures::TryFutureExt;
 use nativelink_error::{Code, Error, ResultExt};
-use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserWriteHalf};
-use nativelink_util::common::{fs, DigestInfo};
+use nativelink_util::common::DigestInfo;
 use nativelink_util::digest_hasher::DigestHasher;
-use nativelink_util::store_trait::{Store, UploadSizeInfo};
+use nativelink_util::store_trait::Store;
 use prost::Message;
 
 // NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than
@@ -91,7 +89,8 @@ pub async fn serialize_and_upload_message<'a, T: Message>(
 ) -> Result<DigestInfo, Error> {
     let mut buffer = BytesMut::with_capacity(message.encoded_len());
     let digest = message_to_digest(message, &mut buffer, hasher).err_tip(|| "In serialize_and_upload_message")?;
-    upload_buf_to_store(cas_store, digest, buffer.freeze())
+    cas_store
+        .update_oneshot(digest, buffer.freeze())
         .await
         .err_tip(|| "In serialize_and_upload_message")?;
     Ok(digest)
@@ -102,64 +101,3 @@ pub fn compute_buf_digest(buf: &[u8], hasher: &mut impl DigestHasher) -> DigestI
     hasher.update(buf);
     hasher.finalize_digest()
 }
-
-fn inner_upload_file_to_store<'a, Fut: Future<Output = Result<(), Error>> + 'a>(
-    cas_store: Pin<&'a dyn Store>,
-    digest: DigestInfo,
-    read_data_fn: impl FnOnce(DropCloserWriteHalf) -> Fut,
-) -> impl Future<Output = Result<(), Error>> + 'a {
-    let (tx, rx) = make_buf_channel_pair();
-    join(
-        cas_store
-            .update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes as usize))
-            .map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store")),
-        read_data_fn(tx),
-    )
-    // Ensure we get errors reported from both sides
-    .map(|(upload_result, read_result)| upload_result.merge(read_result))
-}
-
-/// Uploads data to our store for given digest.
-pub fn upload_buf_to_store(
-    cas_store: Pin<&dyn Store>,
-    digest: DigestInfo,
-    buf: Bytes,
-) -> impl Future<Output = Result<(), Error>> + '_ {
-    inner_upload_file_to_store(cas_store, digest, move |mut tx| async move {
-        if !buf.is_empty() {
-            tx.send(buf)
-                .await
-                .err_tip(|| "Could not send buffer data to store in upload_buf_to_store")?;
-        }
-        tx.send_eof()
-            .await
-            .err_tip(|| "Could not send EOF to store in upload_buf_to_store")
-    })
-}
-
-/// Same as `upload_buf_to_store`, however it specializes in dealing with a `ResumeableFileSlot`.
-/// This will close the reading file to close if writing the data takes a while.
-pub fn upload_file_to_store<'a>(
-    cas_store: Pin<&'a dyn Store>,
-    digest: DigestInfo,
-    mut file: fs::ResumeableFileSlot<'a>,
-) -> impl Future<Output = Result<(), Error>> + 'a {
-    inner_upload_file_to_store(cas_store, digest, move |tx| async move {
-        let (_, mut tx) = file
-            .read_buf_cb(
-                (BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx),
-                move |(chunk, mut tx)| async move {
-                    tx.send(chunk.freeze())
-                        .await
-                        .err_tip(|| "Failed to send in upload_file_to_store")?;
-                    Ok((BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx))
-                },
-            )
-            .await
-            .err_tip(|| "Error in upload_file_to_store::read_buf_cb section")?;
-        tx.send_eof()
-            .await
-            .err_tip(|| "Could not send EOF to store in upload_file_to_store")?;
-        Ok(())
-    })
-}
diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs
index 8696f368d..828341e0d 100644
--- a/nativelink-store/src/fast_slow_store.rs
+++ b/nativelink-store/src/fast_slow_store.rs
@@ -19,13 +19,13 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use futures::{join, FutureExt};
-use nativelink_config::stores::StoreConfig;
 use nativelink_error::{make_err, Code, Error, ResultExt};
 use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
 use nativelink_util::common::DigestInfo;
+use nativelink_util::fs;
 use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
 use nativelink_util::metrics_utils::Registry;
-use nativelink_util::store_trait::{Store, UploadSizeInfo};
+use nativelink_util::store_trait::{slow_update_store_with_file, Store, StoreOptimizations, UploadSizeInfo};
 
 // TODO(blaise.bruer) This store needs to be evaluated for more efficient memory usage,
 // there are many copies happening internally.
@@ -45,12 +45,6 @@ impl FastSlowStore {
         fast_store: Arc<dyn Store>,
         slow_store: Arc<dyn Store>,
     ) -> Self {
-        let slow_store = if matches!(_config.slow, StoreConfig::noop) {
-            fast_store.clone()
-        } else {
-            slow_store
-        };
-
         Self { fast_store, slow_store }
     }
@@ -123,6 +117,12 @@ impl Store for FastSlowStore {
         digests: &[DigestInfo],
         results: &mut [Option<usize>],
     ) -> Result<(), Error> {
+        // If our slow store is a noop store, it'll always return a 404,
+        // so only check the fast store in such a case.
+        let slow_store = self.slow_store.inner_store(None);
+        if slow_store.optimized_for(StoreOptimizations::NoopDownloads) {
+            return self.pin_fast_store().has_with_results(digests, results).await;
+        }
         // Only check the slow store because if it's not there, then something
         // down stream might be unable to get it. This should not affect
         // workers as they only use get() and a CAS can use an
@@ -136,6 +136,17 @@ impl Store for FastSlowStore {
         mut reader: DropCloserReadHalf,
         size_info: UploadSizeInfo,
     ) -> Result<(), Error> {
+        // If either one of our stores is a noop store, bypass the multiplexing
+        // and just use the store that is not a noop store.
+        let slow_store = self.slow_store.inner_store(Some(digest));
+        if slow_store.optimized_for(StoreOptimizations::NoopUpdates) {
+            return self.pin_fast_store().update(digest, reader, size_info).await;
+        }
+        let fast_store = self.fast_store.inner_store(Some(digest));
+        if fast_store.optimized_for(StoreOptimizations::NoopUpdates) {
+            return self.pin_slow_store().update(digest, reader, size_info).await;
+        }
+
         let (mut fast_tx, fast_rx) = make_buf_channel_pair();
         let (mut slow_tx, slow_rx) = make_buf_channel_pair();
@@ -185,6 +196,50 @@ impl Store for FastSlowStore {
         Ok(())
     }
 
+    /// FastSlowStore has optimizations for dealing with files.
+    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
+        optimization == StoreOptimizations::FileUpdates
+    }
+
+    /// Optimized variation to consume the file if one of the stores is a
+    /// filesystem store. This makes the operation a move instead of a copy,
+    /// dramatically increasing performance for large files.
+    async fn update_with_whole_file(
+        self: Pin<&Self>,
+        digest: DigestInfo,
+        mut file: fs::ResumeableFileSlot<'static>,
+        upload_size: UploadSizeInfo,
+    ) -> Result<Option<fs::ResumeableFileSlot<'static>>, Error> {
+        let fast_store = self.fast_store.inner_store(Some(digest));
+        let slow_store = self.slow_store.inner_store(Some(digest));
+        if fast_store.optimized_for(StoreOptimizations::FileUpdates) {
+            if !slow_store.optimized_for(StoreOptimizations::NoopUpdates) {
+                slow_update_store_with_file(Pin::new(slow_store), digest, &mut file, upload_size)
+                    .await
+                    .err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?;
+            }
+            return Pin::new(fast_store)
+                .update_with_whole_file(digest, file, upload_size)
+                .await;
+        }
+
+        if slow_store.optimized_for(StoreOptimizations::FileUpdates) {
+            if !fast_store.optimized_for(StoreOptimizations::NoopUpdates) {
+                slow_update_store_with_file(Pin::new(fast_store), digest, &mut file, upload_size)
+                    .await
+                    .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?;
+            }
+            return Pin::new(slow_store)
+                .update_with_whole_file(digest, file, upload_size)
+                .await;
+        }
+
+        slow_update_store_with_file(self, digest, &mut file, upload_size)
+            .await
+            .err_tip(|| "In FastSlowStore::update_with_whole_file")?;
+        Ok(Some(file))
+    }
+
     async fn get_part_ref(
         self: Pin<&Self>,
         digest: DigestInfo,
diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs
index c8a504a22..aaf631663 100644
--- a/nativelink-store/src/filesystem_store.rs
+++ b/nativelink-store/src/filesystem_store.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 use std::borrow::Cow;
-use std::ffi::OsString;
+use std::ffi::{OsStr, OsString};
 use std::fmt::{Debug, Formatter};
 use std::pin::Pin;
 use std::sync::atomic::{AtomicU64, Ordering};
@@ -32,7 +32,7 @@ use nativelink_util::common::{fs, DigestInfo};
 use nativelink_util::evicting_map::{EvictingMap, LenEntry};
 use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
 use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
-use nativelink_util::store_trait::{Store, UploadSizeInfo};
+use nativelink_util::store_trait::{Store, StoreOptimizations, UploadSizeInfo};
 use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
 use tokio::task::spawn_blocking;
 use tokio::time::{sleep, timeout, Sleep};
@@ -61,6 +61,7 @@ pub struct SharedContext {
 enum PathType {
     Content,
     Temp,
+    Custom(OsString),
 }
 
 // Note: We don't store the full path of the file because it would cause
@@ -77,18 +78,23 @@ pub struct EncodedFilePath {
 impl EncodedFilePath {
     #[inline]
-    fn get_file_path(&self) -> OsString {
+    fn get_file_path(&self) -> Cow<'_, OsStr> {
         get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.digest)
     }
 }
 
 #[inline]
-fn get_file_path_raw(path_type: &PathType, shared_context: &SharedContext, digest: &DigestInfo) -> OsString {
+fn get_file_path_raw<'a>(
+    path_type: &'a PathType,
+    shared_context: &SharedContext,
+    digest: &DigestInfo,
+) -> Cow<'a, OsStr> {
     let folder = match path_type {
         PathType::Content => &shared_context.content_path,
         PathType::Temp => &shared_context.temp_path,
+        PathType::Custom(path) => return Cow::Borrowed(path),
     };
-    to_full_path_from_digest(folder, digest)
+    Cow::Owned(to_full_path_from_digest(folder, digest))
 }
 
 impl Drop for EncodedFilePath {
@@ -99,7 +105,7 @@ impl Drop for EncodedFilePath {
             return;
         }
 
-        let file_path = self.get_file_path();
+        let file_path = self.get_file_path().to_os_string();
         let shared_context = self.shared_context.clone();
         shared_context.active_drop_spawns.fetch_add(1, Ordering::Relaxed);
         tokio::spawn(async move {
@@ -186,7 +192,7 @@ impl FileEntry for FileEntryImpl {
         block_size: u64,
         encoded_file_path: EncodedFilePath,
     ) -> Result<(FileEntryImpl, fs::ResumeableFileSlot<'static>, OsString), Error> {
-        let temp_full_path = encoded_file_path.get_file_path();
+        let temp_full_path = encoded_file_path.get_file_path().to_os_string();
         let temp_file_result = fs::create_file(temp_full_path.clone())
             .or_else(|mut err| async {
                 let remove_result = fs::remove_file(&temp_full_path)
@@ -252,7 +258,7 @@ impl FileEntry for FileEntryImpl {
         handler: F,
     ) -> Result<T, Error> {
         let encoded_file_path = self.get_encoded_file_path().read().await;
-        handler(encoded_file_path.get_file_path()).await
+        handler(encoded_file_path.get_file_path().to_os_string()).await
     }
 }
 
@@ -285,6 +291,7 @@ impl LenEntry for FileEntryImpl {
     async fn touch(&self) -> bool {
         let result = self
             .get_file_path_locked(move |full_content_path| async move {
+                let full_content_path = full_content_path.to_os_string();
                 spawn_blocking(move || {
                     set_file_atime(&full_content_path, FileTime::now())
                         .err_tip(|| format!("Failed to touch file in filesystem store {:?}", full_content_path))
@@ -471,7 +478,7 @@ pub struct FilesystemStore<Fe: FileEntry> {
     block_size: u64,
     read_buffer_size: usize,
     sleep_fn: fn(Duration) -> Sleep,
-    rename_fn: fn(&OsString, &OsString) -> Result<(), std::io::Error>,
+    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
 }
 
 impl<Fe: FileEntry> FilesystemStore<Fe> {
@@ -482,7 +489,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
     pub async fn new_with_timeout_and_rename_fn(
         config: &nativelink_config::stores::FilesystemStore,
         sleep_fn: fn(Duration) -> Sleep,
-        rename_fn: fn(&OsString, &OsString) -> Result<(), std::io::Error>,
+        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
     ) -> Result<Self, Error> {
         let now = SystemTime::now();
@@ -583,8 +590,10 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
         drop(resumeable_temp_file);
 
         *entry.data_size_mut() = data_size;
-        let entry = Arc::new(entry);
+        self.emplace_file(final_digest, Arc::new(entry)).await
+    }
 
+    async fn emplace_file(&self, digest: DigestInfo, entry: Arc<Fe>) -> Result<(), Error> {
         // This sequence of events is quite ticky to understand due to the amount of triggers that
         // happen, async'ness of it and the locking. So here is a breakdown of what happens:
         // 1. Here will hold a write lock on any file operations of this FileEntry.
         // See: https://github.com/TraceMachina/nativelink/issues/495
         tokio::spawn(async move {
             let mut encoded_file_path = entry.get_encoded_file_path().write().await;
-            let final_path = get_file_path_raw(
-                &PathType::Content,
-                encoded_file_path.shared_context.as_ref(),
-                &final_digest,
-            );
+            let final_path = get_file_path_raw(&PathType::Content, encoded_file_path.shared_context.as_ref(), &digest);
 
-            evicting_map.insert(final_digest, entry.clone()).await;
+            evicting_map.insert(digest, entry.clone()).await;
 
             let from_path = encoded_file_path.get_file_path();
             // Internally tokio spawns fs commands onto a blocking thread anyways.
             // Since we are already on a blocking thread, we just need the `fs` wrapper to manage
             // an open-file permit (ensure we don't open too many files at once).
             let result = fs::call_with_permit(|| {
-                (rename_fn)(&from_path, &final_path)
+                (rename_fn)(from_path.as_ref(), final_path.as_ref())
                     .err_tip(|| format!("Failed to rename temp file to final path {final_path:?}"))
             })
             .await;
                 // It is possible that the item in our map is no longer the item we inserted,
                 // So, we need to conditionally remove it only if the pointers are the same.
                 evicting_map
-                    .remove_if(&final_digest, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
+                    .remove_if(&digest, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
                     .await;
                 return Err(err);
             }
             encoded_file_path.path_type = PathType::Content;
-            encoded_file_path.digest = final_digest;
+            encoded_file_path.digest = digest;
             Ok(())
         })
         .await
@@ -703,6 +708,45 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
             .err_tip(|| format!("While processing with temp file {:?}", temp_full_path))
     }
 
+    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
+        optimization == StoreOptimizations::FileUpdates
+    }
+
+    async fn update_with_whole_file(
+        self: Pin<&Self>,
+        digest: DigestInfo,
+        mut file: fs::ResumeableFileSlot<'static>,
+        upload_size: UploadSizeInfo,
+    ) -> Result<Option<fs::ResumeableFileSlot<'static>>, Error> {
+        let path = file.get_path().as_os_str().to_os_string();
+        let file_size = match upload_size {
+            UploadSizeInfo::ExactSize(size) => size as u64,
+            UploadSizeInfo::MaxSize(_) => file
+                .as_reader()
+                .await
+                .err_tip(|| format!("While getting metadata for {:?} in update_with_whole_file", path))?
+                .get_ref()
+                .as_ref()
+                .metadata()
+                .await
+                .err_tip(|| format!("While reading metadata for {:?}", path))?
+                .len(),
+        };
+        let entry = Fe::create(
+            file_size,
+            self.block_size,
+            RwLock::new(EncodedFilePath {
+                shared_context: self.shared_context.clone(),
+                path_type: PathType::Custom(path),
+                digest,
+            }),
+        );
+        self.emplace_file(digest, Arc::new(entry))
+            .await
+            .err_tip(|| "Could not move file into store in update_with_whole_file, maybe dest is on different volume?")?;
+        Ok(None)
+    }
+
     async fn get_part_ref(
         self: Pin<&Self>,
         digest: DigestInfo,
diff --git a/nativelink-store/src/noop_store.rs b/nativelink-store/src/noop_store.rs
index e7c4ec351..a4dde4aa2 100644
--- a/nativelink-store/src/noop_store.rs
+++ b/nativelink-store/src/noop_store.rs
@@ -20,7 +20,7 @@ use nativelink_error::{make_err, Code, Error, ResultExt};
 use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
 use nativelink_util::common::DigestInfo;
 use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
-use nativelink_util::store_trait::{Store, UploadSizeInfo};
+use nativelink_util::store_trait::{Store, StoreOptimizations, UploadSizeInfo};
 
 #[derive(Default)]
 pub struct NoopStore;
@@ -54,6 +54,10 @@ impl Store for NoopStore {
         Ok(())
     }
 
+    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
+        optimization == StoreOptimizations::NoopUpdates || optimization == StoreOptimizations::NoopDownloads
+    }
+
     async fn get_part_ref(
         self: Pin<&Self>,
         _digest: DigestInfo,
diff --git a/nativelink-store/tests/ac_utils_test.rs b/nativelink-store/tests/ac_utils_test.rs
index 39d1d58f2..4d74d5b6b 100644
--- a/nativelink-store/tests/ac_utils_test.rs
+++ b/nativelink-store/tests/ac_utils_test.rs
@@ -18,10 +18,9 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 use nativelink_error::{Error, ResultExt};
-use nativelink_store::ac_utils::upload_file_to_store;
 use nativelink_store::memory_store::MemoryStore;
 use nativelink_util::common::{fs, DigestInfo};
-use nativelink_util::store_trait::Store;
+use nativelink_util::store_trait::{Store, UploadSizeInfo};
 use rand::{thread_rng, Rng};
 use tokio::io::AsyncWriteExt;
 
@@ -70,7 +69,9 @@ mod ac_utils_tests {
     {
         // Upload our file.
         let resumeable_file = fs::open_file(filepath, u64::MAX).await?;
-        upload_file_to_store(store_pin, digest, resumeable_file).await?;
+        store_pin
+            .update_with_whole_file(digest, resumeable_file, UploadSizeInfo::ExactSize(expected_data.len()))
+            .await?;
     }
     {
         // Check to make sure the file was saved correctly to the store.
diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs
index f505581f4..03c2b6719 100644
--- a/nativelink-store/tests/fast_slow_store_test.rs
+++ b/nativelink-store/tests/fast_slow_store_test.rs
@@ -393,7 +393,7 @@ mod fast_slow_store_tests {
     // Regression test for https://github.com/TraceMachina/nativelink/issues/665
     #[tokio::test]
-    async fn slow_store_replaced_by_fast_store_when_noop() -> Result<(), Error> {
+    async fn has_checks_fast_store_when_noop() -> Result<(), Error> {
         let fast_store = Arc::new(MemoryStore::new(&nativelink_config::stores::MemoryStore::default()));
         let slow_store = Arc::new(NoopStore::new());
         let fast_slow_store_config = nativelink_config::stores::FastSlowStore {
@@ -406,17 +406,33 @@
             slow_store.clone(),
         ));
 
+        let data = make_random_data(100);
+        let digest = DigestInfo::try_new(VALID_HASH, data.len()).unwrap();
+
         assert_eq!(
-            Arc::as_ptr(fast_slow_store.fast_store()),
-            Arc::as_ptr(&fast_store),
-            "Fast store should be the same as the fast_slow_store's fast store"
+            Pin::new(fast_slow_store.as_ref()).has(digest).await,
+            Ok(None),
+            "Expected data to not exist in store"
         );
+
+        // Upload some dummy data.
+        Pin::new(fast_store.as_ref())
+            .update_oneshot(digest, data.clone().into())
+            .await?;
+
         assert_eq!(
-            Arc::as_ptr(fast_slow_store.slow_store()),
-            Arc::as_ptr(&fast_store),
-            "Slow store should be replaced by fast store when configured as noop"
+            Pin::new(fast_slow_store.as_ref()).has(digest).await,
+            Ok(Some(data.len())),
+            "Expected data to exist in store"
         );
+
+        assert_eq!(
+            Pin::new(fast_slow_store.as_ref())
+                .get_part_unchunked(digest, 0, None, None)
+                .await,
+            Ok(data.into()),
+            "Data read from store is not correct"
+        );
         Ok(())
     }
 }
diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs
index 1c727107e..d21b601e5 100644
--- a/nativelink-store/tests/filesystem_store_test.rs
+++ b/nativelink-store/tests/filesystem_store_test.rs
@@ -1185,4 +1185,55 @@ mod filesystem_store_tests {
         assert_eq!(long_entry.size_on_disk(), 8 * 1024);
         Ok(())
     }
+
+    // Ensure that update_with_whole_file() moves the file without making a copy.
+    #[cfg(target_family = "unix")]
+    #[tokio::test]
+    async fn update_with_whole_file_uses_same_inode() -> Result<(), Error> {
+        use std::os::unix::fs::MetadataExt;
+        let content_path = make_temp_path("content_path");
+        let temp_path = make_temp_path("temp_path");
+
+        let value: String = "x".repeat(1024);
+
+        let digest = DigestInfo::try_new(HASH1, value.len())?;
+
+        let store = Box::pin(
+            FilesystemStore::<FileEntryImpl>::new_with_timeout_and_rename_fn(
+                &nativelink_config::stores::FilesystemStore {
+                    content_path: content_path.clone(),
+                    temp_path: temp_path.clone(),
+                    read_buffer_size: 1,
+                    ..Default::default()
+                },
+                |_| sleep(Duration::ZERO),
+                |from, to| std::fs::rename(from, to),
+            )
+            .await?,
+        );
+
+        let mut file = fs::create_file(OsString::from(format!("{}/{}", temp_path, "dummy_file"))).await?;
+        let original_inode = file.as_reader().await?.get_ref().as_ref().metadata().await?.ino();
+
+        let result = store
+            .as_ref()
+            .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len()))
+            .await?;
+        assert!(result.is_none(), "Expected filesystem store to consume the file");
+
+        let expected_file_name =
+            OsString::from(format!("{}/{}-{}", content_path, digest.hash_str(), digest.size_bytes));
+        let new_inode = fs::create_file(expected_file_name)
+            .await?
+            .as_reader()
+            .await?
+            .get_ref()
+            .as_ref()
+            .metadata()
+            .await?
+            .ino();
+        assert_eq!(original_inode, new_inode, "Expected the same inode for the file");
+
+        Ok(())
+    }
 }
diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs
index 56f128040..96717074d 100644
--- a/nativelink-util/src/store_trait.rs
+++ b/nativelink-util/src/store_trait.rs
@@ -15,14 +15,14 @@
 use std::borrow::Cow;
 use std::collections::hash_map::DefaultHasher as StdHasher;
 use std::hash::{Hash, Hasher};
+use std::ops::Deref;
 use std::pin::Pin;
 use std::sync::{Arc, OnceLock};
 
 use async_trait::async_trait;
-use bytes::Bytes;
-use futures::{join, try_join};
-// use lru::DefaultHasher;
-use nativelink_error::{make_err, Code, Error, ResultExt};
+use bytes::{Bytes, BytesMut};
+use futures::{future, join, try_join, FutureExt};
+use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
 use rand::rngs::StdRng;
 use rand::{RngCore, SeedableRng};
 use serde::{Deserialize, Serialize};
@@ -30,6 +30,7 @@ use serde::{Deserialize, Serialize};
 use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
 use crate::common::DigestInfo;
 use crate::digest_hasher::{default_digest_hasher_func, DigestHasher};
+use crate::fs;
 use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
 use crate::metrics_utils::Registry;
 
@@ -64,6 +65,64 @@ pub enum UploadSizeInfo {
     MaxSize(usize),
 }
 
+/// Utility to send all the data to the store from a file.
+// Note: This is not inlined because some code may want to bypass any underlying
+// optimizations that may be present in the inner store.
+pub async fn slow_update_store_with_file<S: Store + ?Sized>(
+    store: Pin<&S>,
+    digest: DigestInfo,
+    file: &mut fs::ResumeableFileSlot<'static>,
+    upload_size: UploadSizeInfo,
+) -> Result<(), Error> {
+    let (tx, rx) = make_buf_channel_pair();
+    future::join(
+        store
+            .update(digest, rx, upload_size)
+            .map(|r| r.err_tip(|| "Could not upload data to store in slow_update_store_with_file")),
+        async move {
+            let (_, mut tx) = file
+                .read_buf_cb(
+                    (BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx),
+                    move |(chunk, mut tx)| async move {
+                        tx.send(chunk.freeze())
+                            .await
+                            .err_tip(|| "Failed to send in slow_update_store_with_file")?;
+                        Ok((BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx))
+                    },
+                )
+                .await
+                .err_tip(|| "Error in slow_update_store_with_file::read_buf_cb section")?;
+            tx.send_eof()
+                .await
+                .err_tip(|| "Could not send EOF to store in slow_update_store_with_file")?;
+            Ok(())
+        },
+    )
+    // Ensure we get errors reported from both sides.
+    .map(|(upload_result, read_result)| upload_result.merge(read_result))
+    .await
+}
+
+// TODO(allada) When 1.76.0 stabilizes more we can use `core::ptr::addr_eq` instead.
+fn addr_eq<T: ?Sized, U: ?Sized>(p: *const T, q: *const U) -> bool {
+    std::ptr::eq(p as *const (), q as *const ())
+}
+
+/// Optimizations that stores may want to expose to the callers.
+/// This is useful for specific cases where the store can optimize how the
+/// data is processed.
+#[derive(Debug, PartialEq, Eq, Copy, Clone)]
+pub enum StoreOptimizations {
+    /// The store can optimize the upload process when it knows the data is coming from a file.
+    FileUpdates,
+
+    /// If the store will ignore data uploads.
+    NoopUpdates,
+
+    /// If the store will never serve downloads.
+    NoopDownloads,
+}
+
 #[async_trait]
 pub trait Store: Sync + Send + Unpin + HealthStatusIndicator + 'static {
     /// Look up a digest in the store and return None if it does not exist in
@@ -102,6 +161,35 @@ pub trait Store: Sync + Send + Unpin + HealthStatusIndicator + 'static {
         upload_size: UploadSizeInfo,
     ) -> Result<(), Error>;
 
+    /// Any optimizations the store might want to expose to the callers.
+    /// By default, no optimizations are exposed.
+    fn optimized_for(&self, _optimization: StoreOptimizations) -> bool {
+        false
+    }
+
+    /// Specialized version of `.update()` which takes a `ResumeableFileSlot`.
+    /// This is useful if the underlying store can optimize the upload process
+    /// when it knows the data is coming from a file.
+    async fn update_with_whole_file(
+        self: Pin<&Self>,
+        digest: DigestInfo,
+        mut file: fs::ResumeableFileSlot<'static>,
+        upload_size: UploadSizeInfo,
+    ) -> Result<Option<fs::ResumeableFileSlot<'static>>, Error> {
+        let inner_store = self.inner_store(Some(digest));
+        if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
+            error_if!(
+                addr_eq(inner_store, self.deref()),
+                "Store::inner_store() returned self when optimization present"
+            );
+            return Pin::new(inner_store)
+                .update_with_whole_file(digest, file, upload_size)
+                .await;
+        }
+        slow_update_store_with_file(self, digest, &mut file, upload_size).await?;
+        Ok(Some(file))
+    }
+
     // Utility to send all the data to the store when you have all the bytes.
     async fn update_oneshot(self: Pin<&Self>, digest: DigestInfo, data: Bytes) -> Result<(), Error> {
         // TODO(blaise.bruer) This is extremely inefficient, since we have exactly
diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs
index d4f9e1974..6f3effac4 100644
--- a/nativelink-worker/src/running_actions_manager.rs
+++ b/nativelink-worker/src/running_actions_manager.rs
@@ -44,8 +44,7 @@ use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::
     HistoricalExecuteResponse, StartExecute,
 };
 use nativelink_store::ac_utils::{
-    compute_buf_digest, get_and_decode_digest, serialize_and_upload_message, upload_buf_to_store, upload_file_to_store,
-    ESTIMATED_DIGEST_SIZE,
+    compute_buf_digest, get_and_decode_digest, serialize_and_upload_message, ESTIMATED_DIGEST_SIZE,
 };
 use nativelink_store::fast_slow_store::FastSlowStore;
 use nativelink_store::filesystem_store::{FileEntry, FilesystemStore};
@@ -56,7 +55,7 @@ use nativelink_util::action_messages::{
 use nativelink_util::common::{fs, DigestInfo, JoinHandleDropGuard};
 use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc};
 use nativelink_util::metrics_utils::{AsyncCounterWrapper, CollectorState, CounterWithTime, MetricsComponent};
-use nativelink_util::store_trait::Store;
+use nativelink_util::store_trait::{Store, UploadSizeInfo};
 use parking_lot::Mutex;
 use prost::Message;
 use relative_path::RelativePath;
@@ -262,7 +261,12 @@ async fn upload_file(
             .err_tip(|| "Could not rewind file")?;
         (digest, resumeable_file)
     };
-    upload_file_to_store(cas_store, digest, resumeable_file)
+    cas_store
+        .update_with_whole_file(
+            digest,
+            resumeable_file,
+            UploadSizeInfo::ExactSize(digest.size_bytes as usize),
+        )
         .await
         .err_tip(|| format!("for {full_path:?}"))?;
 
@@ -1032,7 +1036,8 @@ impl RunningActionImpl {
         let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
             let data = execution_result.stdout;
             let digest = compute_buf_digest(&data, &mut hasher.hasher());
-            upload_buf_to_store(cas_store, digest, data)
+            cas_store
+                .update_oneshot(digest, data)
                 .await
.err_tip(|| "Uploading stdout")?; Result::::Ok(digest) @@ -1040,9 +1045,10 @@ impl RunningActionImpl { let stderr_digest_fut = self.metrics().upload_stderr.wrap(async { let data = execution_result.stderr; let digest = compute_buf_digest(&data, &mut hasher.hasher()); - upload_buf_to_store(cas_store, digest, data) + cas_store + .update_oneshot(digest, data) .await - .err_tip(|| "Uploading stderr")?; + .err_tip(|| "Uploading stdout")?; Result::::Ok(digest) });