Skip to content

Commit

Permalink
Add ability for VerifyStore to check the sha256 hash of the digest
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Jan 21, 2021
1 parent 0eb2dab commit 40ba2fb
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 5 deletions.
2 changes: 2 additions & 0 deletions cas/store/BUILD
Expand Up @@ -50,6 +50,8 @@ rust_library(
deps = [
"//config",
"//third_party:async_mutex",
"//third_party:hex",
"//third_party:sha2",
"//third_party:tokio",
"//util:async_fixed_buffer",
"//util:common",
Expand Down
66 changes: 66 additions & 0 deletions cas/store/tests/verify_store_test.rs
Expand Up @@ -28,6 +28,7 @@ mod verify_store_tests {
&config::backends::VerifyStore {
backend: config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
verify_size: false,
verify_hash: false,
},
inner_store.clone(),
);
Expand Down Expand Up @@ -57,6 +58,7 @@ mod verify_store_tests {
&config::backends::VerifyStore {
backend: config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
verify_size: true,
verify_hash: false,
},
inner_store.clone(),
);
Expand Down Expand Up @@ -89,6 +91,7 @@ mod verify_store_tests {
&config::backends::VerifyStore {
backend: config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
verify_size: true,
verify_hash: false,
},
inner_store.clone(),
);
Expand All @@ -113,6 +116,7 @@ mod verify_store_tests {
&config::backends::VerifyStore {
backend: config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
verify_size: true,
verify_hash: false,
},
inner_store.clone(),
);
Expand All @@ -136,4 +140,66 @@ mod verify_store_tests {
);
Ok(())
}

#[tokio::test]
async fn verify_hash_true_suceeds_on_update() -> Result<(), Error> {
let inner_store = Arc::new(MemoryStore::new(&config::backends::MemoryStore::default()));
let store_owned = VerifyStore::new(
&config::backends::VerifyStore {
backend: config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
verify_size: false,
verify_hash: true,
},
inner_store.clone(),
);
let store = Pin::new(&store_owned);

/// This value is sha256("123").
const HASH: &str = "a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3";
const VALUE: &str = "123";
let digest = DigestInfo::try_new(&HASH, 3).unwrap();
let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE))).await;
assert_eq!(result, Ok(()), "Expected success, got: {:?}", result);
assert_eq!(
Pin::new(inner_store.as_ref()).has(digest).await?,
true,
"Expected data to exist in store after update"
);
Ok(())
}

#[tokio::test]
async fn verify_hash_true_fails_on_update() -> Result<(), Error> {
let inner_store = Arc::new(MemoryStore::new(&config::backends::MemoryStore::default()));
let store_owned = VerifyStore::new(
&config::backends::VerifyStore {
backend: config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
verify_size: false,
verify_hash: true,
},
inner_store.clone(),
);
let store = Pin::new(&store_owned);

/// This value is sha256("12").
const HASH: &str = "6b51d431df5d7f141cbececcf79edf3dd861c3b4069f0b11661a3eefacbba918";
const VALUE: &str = "123";
let digest = DigestInfo::try_new(&HASH, 3).unwrap();
let result = store.update(digest.clone(), Box::new(Cursor::new(VALUE))).await;
let err = result.unwrap_err().to_string();
const ACTUAL_HASH: &str = "a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3";
let expected_err = format!("Hashes do not match, got: {} but digest hash was {}", HASH, ACTUAL_HASH);
assert!(
err.contains(&expected_err),
"Error should contain '{}', got: {:?}",
expected_err,
err
);
assert_eq!(
Pin::new(inner_store.as_ref()).has(digest).await?,
false,
"Expected data to not exist in store after update"
);
Ok(())
}
}
31 changes: 27 additions & 4 deletions cas/store/verify_store.rs
Expand Up @@ -5,6 +5,8 @@ use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use hex;
use sha2::{Digest, Sha256};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, WriteHalf};

use async_fixed_buffer::AsyncFixedBuf;
Expand All @@ -15,13 +17,15 @@ use traits::{ResultFuture, StoreTrait};
pub struct VerifyStore {
inner_store: Arc<dyn StoreTrait>,
verify_size: bool,
verify_hash: bool,
}

impl VerifyStore {
pub fn new(config: &config::backends::VerifyStore, inner_store: Arc<dyn StoreTrait>) -> Self {
VerifyStore {
inner_store: inner_store,
verify_size: config.verify_size,
verify_hash: config.verify_hash,
}
}

Expand All @@ -35,6 +39,7 @@ async fn inner_check_update<'a>(
mut reader: Box<dyn AsyncRead + Send + Sync + Unpin + 'a>,
expected_size: i64,
verify_size: bool,
mut maybe_hasher: Option<([u8; 32], Sha256)>,
) -> Result<(), Error> {
let mut buffer = vec![0u8; 1024 * 4];
let mut sum_size: i64 = 0;
Expand All @@ -44,9 +49,12 @@ async fn inner_check_update<'a>(
.await
.err_tip(|| "Stream read terminated early")?;
sum_size += sz as i64;
tx.write_all(&buffer[0..sz])
.await
.err_tip(|| "Failed to write to underlying store")?;
let write_future = tx.write_all(&buffer[0..sz]);
// This will allows us to hash while sending data to another thread.
if let Some((_, hasher)) = maybe_hasher.as_mut() {
hasher.update(&buffer[0..sz]);
}
write_future.await.err_tip(|| "Failed to write to underlying store")?;
if sz != 0 {
continue;
}
Expand All @@ -56,6 +64,16 @@ async fn inner_check_update<'a>(
expected_size,
sum_size
);
if let Some((original_hash, hasher)) = maybe_hasher {
let hash_result: [u8; 32] = hasher.finalize().into();
error_if!(
original_hash != hash_result,
"Hashes do not match, got: {} but digest hash was {}",
hex::encode(original_hash),
hex::encode(hash_result),
);
}

// Note: EOF is not sent from write_all() only sent in write().
tx.write(&vec![]).await.err_tip(|| "Failed to write EOF byte")?;
tx.shutdown()
Expand All @@ -82,10 +100,15 @@ impl StoreTrait for VerifyStore {
let mut stream_closer = raw_fixed_buffer.get_closer();
let (rx, tx) = tokio::io::split(raw_fixed_buffer);

let hash_copy = digest.packed_hash;
let inner_store_clone = self.inner_store.clone();
let spawn_future =
tokio::spawn(async move { Pin::new(inner_store_clone.as_ref()).update(digest, Box::new(rx)).await });
let result = inner_check_update(tx, reader, expected_size, self.verify_size).await;
let mut hasher = None;
if self.verify_hash {
hasher = Some((hash_copy, Sha256::new()));
}
let result = inner_check_update(tx, reader, expected_size, self.verify_size, hasher).await;
stream_closer();
result.merge(
spawn_future
Expand Down
7 changes: 7 additions & 0 deletions config/backends.rs
Expand Up @@ -42,6 +42,13 @@ pub struct VerifyStore {
/// This should be set to false for AC, but true for CAS stores.
#[serde(default)]
pub verify_size: bool,

/// If set this store will hash the contents and verify it matches the
/// digest hash before writing the entry to underlying store.
///
/// This should be set to false for AC, but true for CAS stores.
#[serde(default)]
pub verify_hash: bool,
}

/// Eviction policy always works on LRU (Least Recently Used). Any time an entry
Expand Down
3 changes: 2 additions & 1 deletion config/examples/basic_cas.json
Expand Up @@ -5,7 +5,8 @@
"backend": {
"memory": {}
},
"verify_size": true
"verify_size": true,
"verify_hash": true
}
},
"AC_MAIN_STORE": {
Expand Down

0 comments on commit 40ba2fb

Please sign in to comment.