Add blake3 support for verify store (#575)
steedmicro committed Jan 9, 2024
1 parent 4cfadb3 commit 3acefc7
Showing 11 changed files with 123 additions and 51 deletions.
2 changes: 1 addition & 1 deletion deployment-examples/docker-compose/local-storage-cas.json
@@ -25,7 +25,7 @@
}
},
"verify_size": true,
"verify_hash": true
"hash_verification_function": "sha256"
}
},
"AC_MAIN_STORE": {
@@ -108,7 +108,7 @@
}
},
"verify_size": true,
"verify_hash": true
"hash_verification_function": "sha256"
}
}
},
10 changes: 6 additions & 4 deletions nativelink-config/README.md
@@ -291,9 +291,10 @@ the rest will be stored in AWS's S3:

This store is special. Its only job is to verify the content as it is fetched
and uploaded to ensure it meets some criteria or errors. This store should only
be added to the CAS. If `verify_hash` is set to true, it will apply a sha256
algorithm on the data as it is sent/received and at the end if it does not match
the name of the digest it will cancel the upload/download and return an error.
be added to the CAS. If `hash_verification_function` is set, it will apply the
hashing algorithm on the data as it is sent/received, and at the end, if the result
does not match the name of the digest, it will cancel the upload/download and return
an error. If it is not set, hash verification is disabled.
If `verify_size` is set, a similar check will happen, but it counts the bytes sent
and checks that against the digest instead.

@@ -311,7 +312,8 @@ and check it against the digest instead.
}
},
"verify_size": true,
"verify_hash": true,
// sha256 or blake3
"hash_verification_function": "sha256",
}
},
"AC_MAIN_STORE": {
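For reference, a minimal store entry using the new option might look like the sketch below. This assumes the `CasConfig::stores` layout shown elsewhere in this commit; the store name `CAS_MAIN_STORE` and the in-memory backend are placeholders, not taken from this diff.

```json
{
  "stores": {
    "CAS_MAIN_STORE": {
      "verify": {
        "backend": {
          "memory": {}
        },
        "verify_size": true,
        "hash_verification_function": "blake3"
      }
    }
  }
}
```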
2 changes: 1 addition & 1 deletion nativelink-config/examples/filesystem_cas.json
@@ -76,7 +76,7 @@
}
},
"verify_size": true,
"verify_hash": true
"hash_verification_function": "sha256"
}
},
"AC_MAIN_STORE": {
@@ -68,7 +68,7 @@
}
},
"verify_size": true,
"verify_hash": true
"hash_verification_function": "sha256"
}
},
"AC_MAIN_STORE": {
14 changes: 1 addition & 13 deletions nativelink-config/src/cas_server.rs
@@ -20,7 +20,7 @@ use crate::schedulers::SchedulerConfig;
use crate::serde_utils::{
convert_numeric_with_shellexpand, convert_optinoal_numeric_with_shellexpand, convert_string_with_shellexpand,
};
use crate::stores::{StoreConfig, StoreRefName};
use crate::stores::{ConfigDigestHashFunction, StoreConfig, StoreRefName};

/// Name of the scheduler. This type will be used when referencing a
/// scheduler in the `CasConfig::schedulers`'s map key.
@@ -543,18 +543,6 @@ pub enum WorkerConfig {
local(LocalWorkerConfig),
}

#[allow(non_camel_case_types)]
#[derive(Deserialize, Debug, Clone, Copy)]
pub enum ConfigDigestHashFunction {
/// Use the sha256 hash function.
/// <https://en.wikipedia.org/wiki/SHA-2>
sha256,

/// Use the blake3 hash function.
/// <https://en.wikipedia.org/wiki/BLAKE_(hash_function)>
blake3,
}

#[derive(Deserialize, Debug, Clone, Copy)]
pub struct GlobalConfig {
/// Maximum number of open files that can be opened at one time.
23 changes: 18 additions & 5 deletions nativelink-config/src/stores.rs
@@ -20,6 +20,18 @@ use crate::serde_utils::{convert_numeric_with_shellexpand, convert_string_with_s
/// in the `CasConfig::stores`'s map key.
pub type StoreRefName = String;

#[allow(non_camel_case_types)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum ConfigDigestHashFunction {
/// Use the sha256 hash function.
/// <https://en.wikipedia.org/wiki/SHA-2>
sha256,

/// Use the blake3 hash function.
/// <https://en.wikipedia.org/wiki/BLAKE_(hash_function)>
blake3,
}

#[allow(non_camel_case_types)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum StoreConfig {
@@ -329,12 +341,13 @@ pub struct VerifyStore {
#[serde(default)]
pub verify_size: bool,

/// If set this store will hash the contents and verify it matches the
/// digest hash before writing the entry to underlying store.
/// The digest hash function used to hash the contents and to verify that the digest hash
/// matches before writing the entry to the underlying store.
///
/// This should be set to false for AC, but true for CAS stores.
#[serde(default)]
pub verify_hash: bool,
/// If None, the hash verification will be disabled.
///
/// This should be set to None for AC stores, but to a hashing function like `sha256` for CAS stores.
pub hash_verification_function: Option<ConfigDigestHashFunction>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
32 changes: 17 additions & 15 deletions nativelink-store/src/verify_store.rs
@@ -17,17 +17,18 @@ use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use nativelink_config::stores::ConfigDigestHashFunction;
use nativelink_error::{make_input_err, Error, ResultExt};
use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc};
use nativelink_util::metrics_utils::{Collector, CollectorState, CounterWithTime, MetricsComponent, Registry};
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use sha2::{Digest, Sha256};

pub struct VerifyStore {
inner_store: Arc<dyn Store>,
verify_size: bool,
verify_hash: bool,
hash_verification_function: Option<ConfigDigestHashFunction>,

// Metrics.
size_verification_failures: CounterWithTime,
@@ -39,7 +40,7 @@ impl VerifyStore {
VerifyStore {
inner_store,
verify_size: config.verify_size,
verify_hash: config.verify_hash,
hash_verification_function: config.hash_verification_function,
size_verification_failures: CounterWithTime::default(),
hash_verification_failures: CounterWithTime::default(),
}
@@ -54,7 +55,8 @@ impl VerifyStore {
mut tx: DropCloserWriteHalf,
mut rx: DropCloserReadHalf,
size_info: UploadSizeInfo,
mut maybe_hasher: Option<([u8; 32], Sha256)>,
original_hash: [u8; 32],
mut maybe_hasher: Option<DigestHasher>,
) -> Result<(), Error> {
let mut sum_size: u64 = 0;
loop {
@@ -76,8 +78,9 @@
));
}
}
if let Some((original_hash, hasher)) = maybe_hasher {
let hash_result: [u8; 32] = hasher.finalize().into();
if let Some(hasher) = maybe_hasher.as_mut() {
// We pass -1 here because we only need the hashing result, not the size.
let hash_result: [u8; 32] = hasher.finalize_digest(-1).packed_hash;
if original_hash != hash_result {
self.hash_verification_failures.inc();
return Err(make_input_err!(
@@ -94,7 +97,7 @@
// This allows us to hash while sending data to another thread.
let write_future = tx.send(chunk.clone());

if let Some((_, hasher)) = maybe_hasher.as_mut() {
if let Some(hasher) = maybe_hasher.as_mut() {
hasher.update(chunk.as_ref());
}

@@ -135,15 +138,14 @@ impl Store for VerifyStore {
}
}

let mut hasher = None;
if self.verify_hash {
hasher = Some((digest.packed_hash, Sha256::new()));
}
let hasher = self
.hash_verification_function
.map(|v| DigestHasher::from(DigestHasherFunc::from(v)));

let (tx, rx) = make_buf_channel_pair();

let update_fut = self.pin_inner().update(digest, rx, size_info);
let check_fut = self.inner_check_update(tx, reader, size_info, hasher);
let check_fut = self.inner_check_update(tx, reader, size_info, digest.packed_hash, hasher);

let (update_res, check_res) = tokio::join!(update_fut, check_fut);

@@ -181,9 +183,9 @@ impl MetricsComponent for VerifyStore {
"If the verification store is verifying the size of the data",
);
c.publish(
"verify_hash_enabled",
&self.verify_hash,
"If the verification store is verifying the hash of the data",
"hash_verification_function",
&format!("{:?}", self.hash_verification_function),
"Hash verification function to verify the contents of the data",
);
c.publish(
"size_verification_failures_total",
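The update path above streams each chunk through the configured hasher and only compares the result against the digest's packed hash once the upload completes. A minimal standalone sketch of that idea, written directly against the `sha2` and `blake3` crates rather than nativelink's `DigestHasher` (the enum and function names here are illustrative only):

```rust
use blake3::Hasher as Blake3Hasher;
use sha2::{Digest, Sha256};

// Illustrative stand-in for ConfigDigestHashFunction.
enum HashFn {
    Sha256,
    Blake3,
}

/// Hash `chunks` with the selected function and compare the result against
/// `expected`, the 32-byte packed hash taken from the digest.
fn verify_payload(hash_fn: HashFn, chunks: &[&[u8]], expected: &[u8; 32]) -> Result<(), String> {
    let computed: [u8; 32] = match hash_fn {
        HashFn::Sha256 => {
            let mut hasher = Sha256::new();
            for chunk in chunks {
                hasher.update(chunk);
            }
            hasher.finalize().into()
        }
        HashFn::Blake3 => {
            let mut hasher = Blake3Hasher::new();
            for chunk in chunks {
                hasher.update(chunk);
            }
            *hasher.finalize().as_bytes()
        }
    };
    if &computed == expected {
        Ok(())
    } else {
        Err("hashes do not match; rejecting the upload".to_string())
    }
}
```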
82 changes: 74 additions & 8 deletions nativelink-store/tests/verify_store_test.rs
@@ -40,7 +40,7 @@ mod verify_store_tests {
nativelink_config::stores::MemoryStore::default(),
),
verify_size: false,
verify_hash: false,
hash_verification_function: None,
},
inner_store.clone(),
);
@@ -72,7 +72,7 @@
nativelink_config::stores::MemoryStore::default(),
),
verify_size: true,
verify_hash: false,
hash_verification_function: None,
},
inner_store.clone(),
);
@@ -112,7 +112,7 @@
nativelink_config::stores::MemoryStore::default(),
),
verify_size: true,
verify_hash: false,
hash_verification_function: None,
},
inner_store.clone(),
);
@@ -139,7 +139,7 @@
nativelink_config::stores::MemoryStore::default(),
),
verify_size: true,
verify_hash: false,
hash_verification_function: None,
},
inner_store.clone(),
);
@@ -167,15 +167,15 @@
}

#[tokio::test]
async fn verify_hash_true_suceeds_on_update() -> Result<(), Error> {
async fn verify_sha256_hash_true_suceeds_on_update() -> Result<(), Error> {
let inner_store = Arc::new(MemoryStore::new(&nativelink_config::stores::MemoryStore::default()));
let store_owned = VerifyStore::new(
&nativelink_config::stores::VerifyStore {
backend: nativelink_config::stores::StoreConfig::memory(
nativelink_config::stores::MemoryStore::default(),
),
verify_size: false,
verify_hash: true,
hash_verification_function: Some(nativelink_config::stores::ConfigDigestHashFunction::sha256),
},
inner_store.clone(),
);
@@ -196,15 +196,15 @@
}

#[tokio::test]
async fn verify_hash_true_fails_on_update() -> Result<(), Error> {
async fn verify_sha256_hash_true_fails_on_update() -> Result<(), Error> {
let inner_store = Arc::new(MemoryStore::new(&nativelink_config::stores::MemoryStore::default()));
let store_owned = VerifyStore::new(
&nativelink_config::stores::VerifyStore {
backend: nativelink_config::stores::StoreConfig::memory(
nativelink_config::stores::MemoryStore::default(),
),
verify_size: false,
verify_hash: true,
hash_verification_function: Some(nativelink_config::stores::ConfigDigestHashFunction::sha256),
},
inner_store.clone(),
);
@@ -231,4 +231,70 @@
);
Ok(())
}

#[tokio::test]
async fn verify_blake3_hash_true_suceeds_on_update() -> Result<(), Error> {
let inner_store = Arc::new(MemoryStore::new(&nativelink_config::stores::MemoryStore::default()));
let store_owned = VerifyStore::new(
&nativelink_config::stores::VerifyStore {
backend: nativelink_config::stores::StoreConfig::memory(
nativelink_config::stores::MemoryStore::default(),
),
verify_size: false,
hash_verification_function: Some(nativelink_config::stores::ConfigDigestHashFunction::blake3),
},
inner_store.clone(),
);
let store = Pin::new(&store_owned);

/// This value is blake3("123").
const HASH: &str = "b3d4f8803f7e24b8f389b072e75477cdbcfbe074080fb5e500e53e26e054158e";
const VALUE: &str = "123";
let digest = DigestInfo::try_new(HASH, 3).unwrap();
let result = store.update_oneshot(digest, VALUE.into()).await;
assert_eq!(result, Ok(()), "Expected success, got: {:?}", result);
assert_eq!(
Pin::new(inner_store.as_ref()).has(digest).await,
Ok(Some(VALUE.len())),
"Expected data to exist in store after update"
);
Ok(())
}

#[tokio::test]
async fn verify_blake3_hash_true_fails_on_update() -> Result<(), Error> {
let inner_store = Arc::new(MemoryStore::new(&nativelink_config::stores::MemoryStore::default()));
let store_owned = VerifyStore::new(
&nativelink_config::stores::VerifyStore {
backend: nativelink_config::stores::StoreConfig::memory(
nativelink_config::stores::MemoryStore::default(),
),
verify_size: false,
hash_verification_function: Some(nativelink_config::stores::ConfigDigestHashFunction::blake3),
},
inner_store.clone(),
);
let store = Pin::new(&store_owned);

/// This value is blake3("12").
const HASH: &str = "b944a0a3b20cf5927e594ff306d256d16cd5b0ba3e27b3285f40d7ef5e19695b";
const VALUE: &str = "123";
let digest = DigestInfo::try_new(HASH, 3).unwrap();
let result = store.update_oneshot(digest, VALUE.into()).await;
let err = result.unwrap_err().to_string();
const ACTUAL_HASH: &str = "b3d4f8803f7e24b8f389b072e75477cdbcfbe074080fb5e500e53e26e054158e";
let expected_err = format!("Hashes do not match, got: {} but digest hash was {}", HASH, ACTUAL_HASH);
assert!(
err.contains(&expected_err),
"Error should contain '{}', got: {:?}",
expected_err,
err
);
assert_eq!(
Pin::new(inner_store.as_ref()).has(digest).await,
Ok(None),
"Expected data to not exist in store after update"
);
Ok(())
}
}
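The hard-coded digests in these tests can be reproduced with a few lines against the `blake3` and `sha2` crates; a quick sketch for generating new fixtures (assuming both crates are available as dev-dependencies):

```rust
use sha2::{Digest, Sha256};

fn main() {
    // Should print the blake3("123") constant used in
    // verify_blake3_hash_true_suceeds_on_update above.
    println!("blake3(\"123\") = {}", blake3::hash(b"123").to_hex());

    // The matching sha256 value, for building a sha256 fixture the same way.
    println!("sha256(\"123\") = {:x}", Sha256::digest(b"123"));
}
```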
2 changes: 1 addition & 1 deletion nativelink-util/src/digest_hasher.rs
@@ -15,7 +15,7 @@
use std::sync::OnceLock;

use blake3::Hasher as Blake3Hasher;
use nativelink_config::cas_server::ConfigDigestHashFunction;
use nativelink_config::stores::ConfigDigestHashFunction;
use nativelink_error::{make_err, make_input_err, Code, Error};
use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as ProtoDigestFunction;
use sha2::{Digest, Sha256};
3 changes: 2 additions & 1 deletion src/bin/nativelink.rs
@@ -25,8 +25,9 @@ use futures::FutureExt;
use hyper::server::conn::Http;
use hyper::{Response, StatusCode};
use nativelink_config::cas_server::{
CasConfig, CompressionAlgorithm, ConfigDigestHashFunction, GlobalConfig, ListenerConfig, ServerConfig, WorkerConfig,
CasConfig, CompressionAlgorithm, GlobalConfig, ListenerConfig, ServerConfig, WorkerConfig,
};
use nativelink_config::stores::ConfigDigestHashFunction;
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_scheduler::default_scheduler_factory::scheduler_factory;
use nativelink_scheduler::worker::WorkerId;
