Optimize file uploads when source is file

When using nativelink with a local worker/CAS setup, this adds
optimizations that make it faster to upload files from the worker
to the CAS.

This is specifically useful for Buck2 users who want to build
hermetically.

closes: #409
allada committed Mar 5, 2024
1 parent 0fa9a40 commit e005e3f
Showing 9 changed files with 316 additions and 118 deletions.
74 changes: 6 additions & 68 deletions nativelink-store/src/ac_utils.rs
@@ -19,14 +19,12 @@

use std::pin::Pin;

use bytes::{Bytes, BytesMut};
use futures::future::join;
use futures::{Future, FutureExt, TryFutureExt};
use bytes::BytesMut;
use futures::TryFutureExt;
use nativelink_error::{Code, Error, ResultExt};
use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserWriteHalf};
use nativelink_util::common::{fs, DigestInfo};
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasher;
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use nativelink_util::store_trait::Store;
use prost::Message;

// NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than
@@ -91,7 +89,8 @@ pub async fn serialize_and_upload_message<'a, T: Message>(
) -> Result<DigestInfo, Error> {
let mut buffer = BytesMut::with_capacity(message.encoded_len());
let digest = message_to_digest(message, &mut buffer, hasher).err_tip(|| "In serialize_and_upload_message")?;
upload_buf_to_store(cas_store, digest, buffer.freeze())
cas_store
.update_oneshot(digest, buffer.freeze())
.await
.err_tip(|| "In serialize_and_upload_message")?;
Ok(digest)
@@ -102,64 +101,3 @@ pub fn compute_buf_digest(buf: &[u8], hasher: &mut impl DigestHasher) -> DigestI
hasher.update(buf);
hasher.finalize_digest()
}

fn inner_upload_file_to_store<'a, Fut: Future<Output = Result<(), Error>> + 'a>(
cas_store: Pin<&'a dyn Store>,
digest: DigestInfo,
read_data_fn: impl FnOnce(DropCloserWriteHalf) -> Fut,
) -> impl Future<Output = Result<(), Error>> + 'a {
let (tx, rx) = make_buf_channel_pair();
join(
cas_store
.update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes as usize))
.map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store")),
read_data_fn(tx),
)
// Ensure we get errors reported from both sides
.map(|(upload_result, read_result)| upload_result.merge(read_result))
}

/// Uploads data to our store for given digest.
pub fn upload_buf_to_store(
cas_store: Pin<&dyn Store>,
digest: DigestInfo,
buf: Bytes,
) -> impl Future<Output = Result<(), Error>> + '_ {
inner_upload_file_to_store(cas_store, digest, move |mut tx| async move {
if !buf.is_empty() {
tx.send(buf)
.await
.err_tip(|| "Could not send buffer data to store in upload_buf_to_store")?;
}
tx.send_eof()
.await
.err_tip(|| "Could not send EOF to store in upload_buf_to_store")
})
}

/// Same as `upload_buf_to_store`, however it specializes in dealing with a `ResumeableFileSlot`.
/// This will close the reading file if writing the data takes a while.
pub fn upload_file_to_store<'a>(
cas_store: Pin<&'a dyn Store>,
digest: DigestInfo,
mut file: fs::ResumeableFileSlot<'a>,
) -> impl Future<Output = Result<(), Error>> + 'a {
inner_upload_file_to_store(cas_store, digest, move |tx| async move {
let (_, mut tx) = file
.read_buf_cb(
(BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx),
move |(chunk, mut tx)| async move {
tx.send(chunk.freeze())
.await
.err_tip(|| "Failed to send in upload_file_to_store")?;
Ok((BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx))
},
)
.await
.err_tip(|| "Error in upload_file_to_store::read_buf_cb section")?;
tx.send_eof()
.await
.err_tip(|| "Could not send EOF to store in upload_file_to_store")?;
Ok(())
})
}
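
The streaming logic removed above does not simply disappear: the fast_slow_store.rs changes below import a `slow_update_store_with_file` helper from `nativelink_util::store_trait`. As a rough sketch of what such a streaming fallback can look like, reconstructed from the removed `inner_upload_file_to_store`/`upload_file_to_store` pair (the actual implementation in `store_trait` may differ):

use std::pin::Pin;

use bytes::BytesMut;
use futures::future::join;
use futures::FutureExt;
use nativelink_error::{Error, ResultExt};
use nativelink_util::buf_channel::make_buf_channel_pair;
use nativelink_util::common::DigestInfo;
use nativelink_util::fs;
use nativelink_util::store_trait::{Store, UploadSizeInfo};

// Sketch only: mirrors the removed helpers above. Streams the file through a
// buf channel into Store::update() and reports errors from both sides.
async fn slow_update_store_with_file_sketch(
    store: Pin<&dyn Store>,
    digest: DigestInfo,
    file: &mut fs::ResumeableFileSlot<'_>,
    upload_size: UploadSizeInfo,
) -> Result<(), Error> {
    let (tx, rx) = make_buf_channel_pair();
    // Upload side: forward whatever arrives on the channel into the store.
    let update_fut = store
        .update(digest, rx, upload_size)
        .map(|r| r.err_tip(|| "Could not upload data to store"));
    // Read side: pump the file through the channel in DEFAULT_READ_BUFF_SIZE chunks.
    let read_fut = async move {
        let (_, mut tx) = file
            .read_buf_cb(
                (BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx),
                move |(chunk, mut tx)| async move {
                    tx.send(chunk.freeze())
                        .await
                        .err_tip(|| "Failed to send chunk")?;
                    Ok((BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx))
                },
            )
            .await
            .err_tip(|| "Error while reading file")?;
        tx.send_eof().await.err_tip(|| "Could not send EOF")
    };
    // Run both halves and surface errors from either side, like the removed
    // inner_upload_file_to_store did.
    let (upload_result, read_result) = join(update_fut, read_fut).await;
    upload_result.merge(read_result)
}
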
71 changes: 63 additions & 8 deletions nativelink-store/src/fast_slow_store.rs
@@ -19,13 +19,13 @@ use std::sync::Arc;

use async_trait::async_trait;
use futures::{join, FutureExt};
use nativelink_config::stores::StoreConfig;
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::DigestInfo;
use nativelink_util::fs;
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
use nativelink_util::metrics_utils::Registry;
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use nativelink_util::store_trait::{slow_update_store_with_file, Store, StoreOptimizations, UploadSizeInfo};

// TODO(blaise.bruer) This store needs to be evaluated for more efficient memory usage,
// there are many copies happening internally.
@@ -45,12 +45,6 @@ impl FastSlowStore {
fast_store: Arc<dyn Store>,
slow_store: Arc<dyn Store>,
) -> Self {
let slow_store = if matches!(_config.slow, StoreConfig::noop) {
fast_store.clone()
} else {
slow_store
};

Self { fast_store, slow_store }
}

@@ -123,6 +117,12 @@ impl Store for FastSlowStore {
digests: &[DigestInfo],
results: &mut [Option<usize>],
) -> Result<(), Error> {
// If our slow store is a noop store, it'll always return a 404,
// so only check the fast store in that case.
let slow_store = self.slow_store.inner_store(None);
if slow_store.optimized_for(StoreOptimizations::NoopDownloads) {
return self.pin_fast_store().has_with_results(digests, results).await;
}
// Only check the slow store because if it's not there, then something
// downstream might be unable to get it. This should not affect
// workers as they only use get() and a CAS can use an
@@ -136,6 +136,17 @@
mut reader: DropCloserReadHalf,
size_info: UploadSizeInfo,
) -> Result<(), Error> {
// If either one of our stores is a noop store, bypass the multiplexing
// and just use the store that is not a noop store.
let slow_store = self.slow_store.inner_store(Some(digest));
if slow_store.optimized_for(StoreOptimizations::NoopUpdates) {
return self.pin_fast_store().update(digest, reader, size_info).await;
}
let fast_store = self.fast_store.inner_store(Some(digest));
if fast_store.optimized_for(StoreOptimizations::NoopUpdates) {
return self.pin_slow_store().update(digest, reader, size_info).await;
}

let (mut fast_tx, fast_rx) = make_buf_channel_pair();
let (mut slow_tx, slow_rx) = make_buf_channel_pair();

@@ -185,6 +196,50 @@ impl Store for FastSlowStore {
Ok(())
}

/// FastSlowStore has optimizations for dealing with files.
fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
optimization == StoreOptimizations::FileUpdates
}

/// Optimized variation to consume the file if one of the stores is a
/// filesystem store. This makes the operation a move instead of a copy,
/// dramatically increasing performance for large files.
async fn update_with_whole_file(
self: Pin<&Self>,
digest: DigestInfo,
mut file: fs::ResumeableFileSlot<'static>,
upload_size: UploadSizeInfo,
) -> Result<Option<fs::ResumeableFileSlot<'static>>, Error> {
let fast_store = self.fast_store.inner_store(Some(digest));
let slow_store = self.slow_store.inner_store(Some(digest));
if fast_store.optimized_for(StoreOptimizations::FileUpdates) {
if !slow_store.optimized_for(StoreOptimizations::NoopUpdates) {
slow_update_store_with_file(Pin::new(slow_store), digest, &mut file, upload_size)
.await
.err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?;
}
return Pin::new(fast_store)
.update_with_whole_file(digest, file, upload_size)
.await;
}

if slow_store.optimized_for(StoreOptimizations::FileUpdates) {
if !fast_store.optimized_for(StoreOptimizations::NoopUpdates) {
slow_update_store_with_file(Pin::new(fast_store), digest, &mut file, upload_size)
.await
.err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?;
}
return Pin::new(slow_store)
.update_with_whole_file(digest, file, upload_size)
.await;
}

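// Neither store is optimized for whole-file updates, so stream the file
// through this store's own update() path and hand the unconsumed file slot
// back to the caller.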
slow_update_store_with_file(self, digest, &mut file, upload_size)
.await
.err_tip(|| "In FastSlowStore::update_with_whole_file")?;
Ok(Some(file))
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
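
For reference, here is a rough caller-side sketch of how a worker holding a local output file might drive the new path. The helper name `upload_local_file` is hypothetical; `optimized_for`, `update_with_whole_file`, `slow_update_store_with_file`, and `UploadSizeInfo` are the APIs used in this diff:

use std::pin::Pin;

use nativelink_error::{Error, ResultExt};
use nativelink_util::common::DigestInfo;
use nativelink_util::fs;
use nativelink_util::store_trait::{
    slow_update_store_with_file, Store, StoreOptimizations, UploadSizeInfo,
};

// Hypothetical caller-side helper, for illustration only.
async fn upload_local_file(
    store: Pin<&dyn Store>,
    digest: DigestInfo,
    mut file: fs::ResumeableFileSlot<'static>,
) -> Result<(), Error> {
    let size_info = UploadSizeInfo::ExactSize(digest.size_bytes as usize);
    if store.optimized_for(StoreOptimizations::FileUpdates) {
        // Fast path: hand the whole file to the store. A store backed by the
        // local filesystem can turn this into a move instead of a copy.
        // A `None` return means the store consumed the file slot.
        let _maybe_returned_file = store
            .update_with_whole_file(digest, file, size_info)
            .await
            .err_tip(|| "In upload_local_file whole-file path")?;
        return Ok(());
    }
    // Fallback: stream the file contents through the regular update() channel.
    slow_update_store_with_file(store, digest, &mut file, size_info)
        .await
        .err_tip(|| "In upload_local_file streaming fallback")
}

With the fast store backed by the filesystem and the slow store set to a noop store (the local worker/CAS setup described in the commit message), the whole-file path above means the upload becomes a file move rather than a byte-by-byte copy.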