Skip to content

Commit

Permalink
Support empty digests in stores (#507)
Browse files Browse the repository at this point in the history
The remote_execution.proto specification allows uploading and fetching empty digests regardless of whether they were uploaded before the fetch. The expectation is that these requests always respond successfully. This can be an optimized path in some cases, but it violates expectations about the consistency of NativeLink's underlying storage.
  • Loading branch information
adam-singer committed Dec 19, 2023
1 parent 5a6b182 commit 41a85fb
Show file tree
Hide file tree
Showing 12 changed files with 466 additions and 7 deletions.
2 changes: 2 additions & 0 deletions nativelink-store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ rust_library(
name = "nativelink-store",
srcs = [
"src/ac_utils.rs",
"src/cas_utils.rs",
"src/completeness_checking_store.rs",
"src/compression_store.rs",
"src/dedup_store.rs",
Expand Down Expand Up @@ -106,6 +107,7 @@ rust_test_suite(
"@crate_index//:once_cell",
"@crate_index//:pretty_assertions",
"@crate_index//:rand",
"@crate_index//:sha2",
"@crate_index//:tokio",
"@crate_index//:tokio-stream",
],
Expand Down
39 changes: 39 additions & 0 deletions nativelink-store/src/cas_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023 The Native Link Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use nativelink_util::common::DigestInfo;

/// Digests describing zero-length content, one entry per hash function listed below.
///
/// Every entry pairs the hash of zero bytes with a size of `0`.
const ZERO_BYTE_DIGESTS: [DigestInfo; 2] = {
    // Sha256 hash of zero bytes.
    const SHA256_OF_NOTHING: [u8; 32] = [
        0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
        0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55,
    ];
    // Blake3 hash of zero bytes.
    const BLAKE3_OF_NOTHING: [u8; 32] = [
        0xaf, 0x13, 0x49, 0xb9, 0xf5, 0xf9, 0xa1, 0xa6, 0xa0, 0x40, 0x4d, 0xea, 0x36, 0xdc, 0xc9, 0x49,
        0x9b, 0xcb, 0x25, 0xc9, 0xad, 0xc1, 0x12, 0xb7, 0xcc, 0x9a, 0x93, 0xca, 0xe4, 0x1f, 0x32, 0x62,
    ];

    [
        DigestInfo::new(SHA256_OF_NOTHING, 0),
        DigestInfo::new(BLAKE3_OF_NOTHING, 0),
    ]
};

/// Returns `true` when `digest` has a size of zero and equals one of the
/// entries in `ZERO_BYTE_DIGESTS`; returns `false` otherwise.
#[inline]
pub fn is_zero_digest(digest: &DigestInfo) -> bool {
    // Cheap size test first: anything with a non-zero size is rejected
    // without comparing hashes.
    if digest.size_bytes != 0 {
        return false;
    }
    ZERO_BYTE_DIGESTS.iter().any(|known| known == digest)
}
10 changes: 10 additions & 0 deletions nativelink-store/src/compression_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use nativelink_util::common::{DigestInfo, JoinHandleDropGuard};
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use serde::{Deserialize, Serialize};

use crate::cas_utils::is_zero_digest;

// In the event the bytestream format changes this number should be incremented to prevent
// backwards compatibility issues.
pub const CURRENT_STREAM_FORMAT_VERSION: u8 = 1;
Expand Down Expand Up @@ -368,6 +370,14 @@ impl Store for CompressionStore {
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
if is_zero_digest(&digest) {
writer
.send_eof()
.await
.err_tip(|| "Failed to send zero EOF in filesystem store get_part_ref")?;
return Ok(());
}

let offset = offset as u64;
let (tx, mut rx) = make_buf_channel_pair();

Expand Down
33 changes: 31 additions & 2 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use async_trait::async_trait;
use bytes::BytesMut;
use filetime::{set_file_atime, FileTime};
use futures::stream::{StreamExt, TryStreamExt};
use futures::{Future, TryFutureExt};
use futures::{join, Future, TryFutureExt};
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::{fs, DigestInfo};
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
Expand All @@ -37,6 +37,8 @@ use tokio::time::{sleep, timeout, Sleep};
use tokio_stream::wrappers::ReadDirStream;
use tracing::{error, info, warn};

use crate::cas_utils::is_zero_digest;

// Default size to allocate memory of the buffer when reading files.
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;

Expand Down Expand Up @@ -604,6 +606,22 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
results: &mut [Option<usize>],
) -> Result<(), Error> {
self.evicting_map.sizes_for_keys(digests, results).await;
// We need to do a special pass to ensure our zero files exist.
// If our results failed and the result was a zero file, we need to
// create the file by spec.
for (digest, result) in digests.iter().zip(results.iter_mut()) {
if result.is_some() || !is_zero_digest(digest) {
continue;
}
let (mut tx, rx) = make_buf_channel_pair();
let update_fut = self.update(*digest, rx, UploadSizeInfo::ExactSize(0));
let (update_result, send_eof_result) = join!(update_fut, tx.send_eof());
update_result
.err_tip(|| format!("Failed to create zero file for digest {digest:?}"))
.merge(send_eof_result.err_tip(|| "Failed to send zero file EOF in filesystem store has"))?;

*result = Some(0);
}
Ok(())
}

Expand Down Expand Up @@ -635,6 +653,17 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
if is_zero_digest(&digest) {
self.has(digest)
.await
.err_tip(|| "Failed to check if zero digest exists in filesystem store")?;
writer
.send_eof()
.await
.err_tip(|| "Failed to send zero EOF in filesystem store get_part_ref")?;
return Ok(());
}

let entry = self
.evicting_map
.get(&digest)
Expand Down
1 change: 1 addition & 0 deletions nativelink-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub mod ac_utils;
pub mod cas_utils;
pub mod completeness_checking_store;
pub mod compression_store;
pub mod dedup_store;
Expand Down
16 changes: 16 additions & 0 deletions nativelink-store/src/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use nativelink_util::evicting_map::{EvictingMap, LenEntry};
use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
use nativelink_util::store_trait::{Store, UploadSizeInfo};

use crate::cas_utils::is_zero_digest;

#[derive(Clone)]
pub struct BytesWrapper(Bytes);

Expand Down Expand Up @@ -73,6 +75,12 @@ impl Store for MemoryStore {
results: &mut [Option<usize>],
) -> Result<(), Error> {
self.evicting_map.sizes_for_keys(digests, results).await;
// We need to do a special pass to ensure our zero digest exist.
digests.iter().zip(results.iter_mut()).for_each(|(digest, result)| {
if is_zero_digest(digest) {
*result = Some(0);
}
});
Ok(())
}

Expand Down Expand Up @@ -110,6 +118,14 @@ impl Store for MemoryStore {
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
if is_zero_digest(&digest) {
writer
.send_eof()
.await
.err_tip(|| "Failed to send zero EOF in filesystem store get_part_ref")?;
return Ok(());
}

let value = self
.evicting_map
.get(&digest)
Expand Down
15 changes: 15 additions & 0 deletions nativelink-store/src/s3_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ use tokio::time::sleep;
use tokio_stream::wrappers::ReceiverStream;
use tracing::info;

use crate::cas_utils::is_zero_digest;

// S3 parts cannot be smaller than this number. See:
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MIN_MULTIPART_SIZE: usize = 5 * 1024 * 1024; // 5mb.
Expand Down Expand Up @@ -244,6 +246,11 @@ impl Store for S3Store {
.iter()
.zip(results.iter_mut())
.map(|(digest, result)| async move {
// We need to do a special pass to ensure our zero digest exist.
if is_zero_digest(digest) {
*result = Some(0);
return Ok::<_, Error>(());
}
*result = self.has(digest).await?;
Ok::<_, Error>(())
})
Expand Down Expand Up @@ -432,6 +439,14 @@ impl Store for S3Store {
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
if is_zero_digest(&digest) {
writer
.send_eof()
.await
.err_tip(|| "Failed to send zero EOF in filesystem store get_part_ref")?;
return Ok(());
}

let s3_path = &self.make_s3_path(&digest);
let end_read_byte = length
.map_or(Some(None), |length| Some(offset.checked_add(length)))
Expand Down
61 changes: 61 additions & 0 deletions nativelink-store/tests/cas_utils_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 The Native Link Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(test)]
mod cas_utils_tests {
    use blake3::Hasher as Blake3;
    use nativelink_store::cas_utils::is_zero_digest;
    use nativelink_util::common::DigestInfo;
    use sha2::{Digest, Sha256};

    /// Sha256 of no input with a zero size must be recognized.
    #[test]
    fn sha256_is_zero_digest() {
        let digest = DigestInfo {
            packed_hash: Sha256::new().finalize().into(),
            size_bytes: 0,
        };
        assert!(is_zero_digest(&digest));
    }

    /// Sha256 of real content with a matching non-zero size must be rejected.
    #[test]
    fn sha256_is_non_zero_digest() {
        let mut hasher = Sha256::new();
        hasher.update(b"a");
        let digest = DigestInfo {
            packed_hash: hasher.finalize().into(),
            size_bytes: 1,
        };
        assert!(!is_zero_digest(&digest));
    }

    /// Blake3 of no input with a zero size must be recognized.
    #[test]
    fn blake_is_zero_digest() {
        let digest = DigestInfo {
            packed_hash: Blake3::new().finalize().into(),
            size_bytes: 0,
        };
        assert!(is_zero_digest(&digest));
    }

    /// Blake3 of real content with a matching non-zero size must be rejected.
    #[test]
    fn blake_is_non_zero_digest() {
        let mut hasher = Blake3::new();
        hasher.update(b"a");
        let digest = DigestInfo {
            packed_hash: hasher.finalize().into(),
            size_bytes: 1,
        };
        assert!(!is_zero_digest(&digest));
    }

    /// The hash of no input paired with a non-zero size must be rejected:
    /// both fields have to describe empty content.
    #[test]
    fn sha256_empty_hash_with_non_zero_size_is_non_zero_digest() {
        let digest = DigestInfo {
            packed_hash: Sha256::new().finalize().into(),
            size_bytes: 1,
        };
        assert!(!is_zero_digest(&digest));
    }

    /// A zero size paired with the hash of real content must be rejected:
    /// the size alone is not enough to qualify.
    #[test]
    fn blake_content_hash_with_zero_size_is_non_zero_digest() {
        let mut hasher = Blake3::new();
        hasher.update(b"a");
        let digest = DigestInfo {
            packed_hash: hasher.finalize().into(),
            size_bytes: 0,
        };
        assert!(!is_zero_digest(&digest));
    }
}
52 changes: 50 additions & 2 deletions nativelink-store/tests/compression_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@ use std::str::from_utf8;
use std::sync::Arc;

use bincode::{DefaultOptions, Options};
use bytes::Bytes;
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_store::compression_store::{
CompressionStore, Footer, Lz4Config, SliceIndex, CURRENT_STREAM_FORMAT_VERSION, DEFAULT_BLOCK_SIZE,
FOOTER_FRAME_TYPE,
};
use nativelink_store::memory_store::MemoryStore;
use nativelink_util::buf_channel::make_buf_channel_pair;
use nativelink_util::common::DigestInfo;
use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf};
use nativelink_util::common::{DigestInfo, JoinHandleDropGuard};
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use sha2::{Digest, Sha256};
use tokio::io::AsyncReadExt;

/// Utility function that will build a Footer object from the input.
Expand Down Expand Up @@ -439,4 +441,50 @@ mod compression_store_tests {

Ok(())
}

/// Reading the zero-byte Sha256 digest through a compression store that was
/// never written to is expected to produce an empty payload.
#[tokio::test]
async fn get_part_is_zero_digest() -> Result<(), Error> {
    const BLOCK_SIZE: u32 = 32 * 1024;

    // Sha256 of no input, paired with a size of zero.
    let zero_digest = DigestInfo {
        packed_hash: Sha256::new().finalize().into(),
        size_bytes: 0,
    };

    let backing_store = Arc::new(MemoryStore::new(&nativelink_config::stores::MemoryStore::default()));
    let compression_config = nativelink_config::stores::CompressionStore {
        backend: nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()),
        compression_algorithm: nativelink_config::stores::CompressionAlgorithm::lz4(
            nativelink_config::stores::Lz4Config {
                block_size: BLOCK_SIZE,
                ..Default::default()
            },
        ),
    };
    let compression_store = CompressionStore::new(compression_config, backing_store.clone())
        .err_tip(|| "Failed to create compression store")?;
    let store = Pin::new(Arc::new(compression_store));

    let (mut tx, mut rx) = make_buf_channel_pair();

    // The fetch runs in a spawned task while this test reads from the other
    // half of the channel. The task's own result is discarded here.
    let _drop_guard = JoinHandleDropGuard::new(tokio::spawn(async move {
        let _ = store
            .as_ref()
            .get_part_ref(zero_digest, &mut tx, 0, None)
            .await
            .err_tip(|| "Failed to get_part_ref");
    }));

    let received = DropCloserReadHalf::take(&mut rx, 1024)
        .await
        .err_tip(|| "Error reading bytes")?;

    assert_eq!(&received, &Bytes::new(), "Expected file content to match");

    Ok(())
}
}
Loading

0 comments on commit 41a85fb

Please sign in to comment.