Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize file uploads when source is file #723

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 6 additions & 68 deletions nativelink-store/src/ac_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@

use std::pin::Pin;

use bytes::{Bytes, BytesMut};
use futures::future::join;
use futures::{Future, FutureExt, TryFutureExt};
use bytes::BytesMut;
use futures::TryFutureExt;
use nativelink_error::{Code, Error, ResultExt};
use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserWriteHalf};
use nativelink_util::common::{fs, DigestInfo};
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasher;
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use nativelink_util::store_trait::Store;
use prost::Message;

// NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than
Expand Down Expand Up @@ -91,7 +89,8 @@ pub async fn serialize_and_upload_message<'a, T: Message>(
) -> Result<DigestInfo, Error> {
let mut buffer = BytesMut::with_capacity(message.encoded_len());
let digest = message_to_digest(message, &mut buffer, hasher).err_tip(|| "In serialize_and_upload_message")?;
upload_buf_to_store(cas_store, digest, buffer.freeze())
cas_store
.update_oneshot(digest, buffer.freeze())
.await
.err_tip(|| "In serialize_and_upload_message")?;
Ok(digest)
Expand All @@ -102,64 +101,3 @@ pub fn compute_buf_digest(buf: &[u8], hasher: &mut impl DigestHasher) -> DigestI
hasher.update(buf);
hasher.finalize_digest()
}

fn inner_upload_file_to_store<'a, Fut: Future<Output = Result<(), Error>> + 'a>(
cas_store: Pin<&'a dyn Store>,
digest: DigestInfo,
read_data_fn: impl FnOnce(DropCloserWriteHalf) -> Fut,
) -> impl Future<Output = Result<(), Error>> + 'a {
let (tx, rx) = make_buf_channel_pair();
join(
cas_store
.update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes as usize))
.map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store")),
read_data_fn(tx),
)
// Ensure we get errors reported from both sides
.map(|(upload_result, read_result)| upload_result.merge(read_result))
}

/// Uploads data to our store for given digest.
pub fn upload_buf_to_store(
cas_store: Pin<&dyn Store>,
digest: DigestInfo,
buf: Bytes,
) -> impl Future<Output = Result<(), Error>> + '_ {
inner_upload_file_to_store(cas_store, digest, move |mut tx| async move {
if !buf.is_empty() {
tx.send(buf)
.await
.err_tip(|| "Could not send buffer data to store in upload_buf_to_store")?;
}
tx.send_eof()
.await
.err_tip(|| "Could not send EOF to store in upload_buf_to_store")
})
}

/// Same as `upload_buf_to_store`, however it specializes in dealing with a `ResumeableFileSlot`.
/// This will close the reading file to close if writing the data takes a while.
pub fn upload_file_to_store<'a>(
cas_store: Pin<&'a dyn Store>,
digest: DigestInfo,
mut file: fs::ResumeableFileSlot<'a>,
) -> impl Future<Output = Result<(), Error>> + 'a {
inner_upload_file_to_store(cas_store, digest, move |tx| async move {
let (_, mut tx) = file
.read_buf_cb(
(BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx),
move |(chunk, mut tx)| async move {
tx.send(chunk.freeze())
.await
.err_tip(|| "Failed to send in upload_file_to_store")?;
Ok((BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx))
},
)
.await
.err_tip(|| "Error in upload_file_to_store::read_buf_cb section")?;
tx.send_eof()
.await
.err_tip(|| "Could not send EOF to store in upload_file_to_store")?;
Ok(())
})
}
71 changes: 63 additions & 8 deletions nativelink-store/src/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ use std::sync::Arc;

use async_trait::async_trait;
use futures::{join, FutureExt};
use nativelink_config::stores::StoreConfig;
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::DigestInfo;
use nativelink_util::fs;
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
use nativelink_util::metrics_utils::Registry;
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use nativelink_util::store_trait::{slow_update_store_with_file, Store, StoreOptimizations, UploadSizeInfo};

// TODO(blaise.bruer) This store needs to be evaluated for more efficient memory usage,
// there are many copies happening internally.
Expand All @@ -45,12 +45,6 @@ impl FastSlowStore {
fast_store: Arc<dyn Store>,
slow_store: Arc<dyn Store>,
) -> Self {
let slow_store = if matches!(_config.slow, StoreConfig::noop) {
fast_store.clone()
} else {
slow_store
};

Self { fast_store, slow_store }
}

Expand Down Expand Up @@ -123,6 +117,12 @@ impl Store for FastSlowStore {
digests: &[DigestInfo],
results: &mut [Option<usize>],
) -> Result<(), Error> {
// If our slow store is a noop store, it'll always return a 404,
// so only check the fast store in such case.
let slow_store = self.slow_store.inner_store(None);
if slow_store.optimized_for(StoreOptimizations::NoopDownloads) {
return self.pin_fast_store().has_with_results(digests, results).await;
}
// Only check the slow store because if it's not there, then something
// down stream might be unable to get it. This should not affect
// workers as they only use get() and a CAS can use an
Expand All @@ -136,6 +136,17 @@ impl Store for FastSlowStore {
mut reader: DropCloserReadHalf,
size_info: UploadSizeInfo,
) -> Result<(), Error> {
// If either one of our stores is a noop store, bypass the multiplexing
// and just use the store that is not a noop store.
let slow_store = self.slow_store.inner_store(Some(digest));
if slow_store.optimized_for(StoreOptimizations::NoopUpdates) {
return self.pin_fast_store().update(digest, reader, size_info).await;
}
let fast_store = self.fast_store.inner_store(Some(digest));
if fast_store.optimized_for(StoreOptimizations::NoopUpdates) {
return self.pin_slow_store().update(digest, reader, size_info).await;
}

let (mut fast_tx, fast_rx) = make_buf_channel_pair();
let (mut slow_tx, slow_rx) = make_buf_channel_pair();

Expand Down Expand Up @@ -185,6 +196,50 @@ impl Store for FastSlowStore {
Ok(())
}

/// FastSlowStore has optimiations for dealing with files.
fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
optimization == StoreOptimizations::FileUpdates
}

/// Optimized variation to consume the file if one of the stores is a
/// filesystem store. This makes the operation a move instead of a copy
/// dramatically increasing performance for large files.
async fn update_with_whole_file(
self: Pin<&Self>,
digest: DigestInfo,
mut file: fs::ResumeableFileSlot<'static>,
upload_size: UploadSizeInfo,
) -> Result<Option<fs::ResumeableFileSlot<'static>>, Error> {
let fast_store = self.fast_store.inner_store(Some(digest));
let slow_store = self.slow_store.inner_store(Some(digest));
if fast_store.optimized_for(StoreOptimizations::FileUpdates) {
if !slow_store.optimized_for(StoreOptimizations::NoopUpdates) {
slow_update_store_with_file(Pin::new(slow_store), digest, &mut file, upload_size)
.await
.err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?;
}
return Pin::new(fast_store)
.update_with_whole_file(digest, file, upload_size)
.await;
}

if slow_store.optimized_for(StoreOptimizations::FileUpdates) {
if !fast_store.optimized_for(StoreOptimizations::NoopUpdates) {
slow_update_store_with_file(Pin::new(fast_store), digest, &mut file, upload_size)
.await
.err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?;
}
return Pin::new(slow_store)
.update_with_whole_file(digest, file, upload_size)
.await;
}

slow_update_store_with_file(self, digest, &mut file, upload_size)
.await
.err_tip(|| "In FastSlowStore::update_with_whole_file")?;
Ok(Some(file))
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
Expand Down
Loading
Loading