Fix clippy messages in cas/store
allada committed Jul 12, 2023
1 parent 4062d1d commit 7fef931
Showing 23 changed files with 178 additions and 208 deletions.
30 changes: 14 additions & 16 deletions cas/store/ac_utils.rs
@@ -63,23 +63,21 @@ pub async fn get_and_decode_digest<T: Message + Default>(
 }
 
 /// Takes a proto message and will serialize it and upload it to the provided store.
-pub fn serialize_and_upload_message<'a, T: Message>(
+pub async fn serialize_and_upload_message<'a, T: Message>(
     message: &'a T,
     cas_store: Pin<&'a dyn Store>,
-) -> impl Future<Output = Result<DigestInfo, Error>> + 'a {
-    async move {
-        let mut buffer = BytesMut::new();
-        let digest = {
-            message
-                .encode(&mut buffer)
-                .err_tip(|| "Could not encode directory proto")?;
-            let mut hasher = Sha256::new();
-            hasher.update(&buffer);
-            DigestInfo::new(hasher.finalize().into(), buffer.len() as i64)
-        };
-        upload_to_store(cas_store, digest.clone(), &mut Cursor::new(buffer)).await?;
-        Ok(digest)
-    }
+) -> Result<DigestInfo, Error> {
+    let mut buffer = BytesMut::new();
+    let digest = {
+        message
+            .encode(&mut buffer)
+            .err_tip(|| "Could not encode directory proto")?;
+        let mut hasher = Sha256::new();
+        hasher.update(&buffer);
+        DigestInfo::new(hasher.finalize().into(), buffer.len() as i64)
+    };
+    upload_to_store(cas_store, digest.clone(), &mut Cursor::new(buffer)).await?;
+    Ok(digest)
 }
 
 /// Given a bytestream computes the digest for the data.

@@ -137,7 +135,7 @@ pub fn upload_to_store<'a, R: AsyncRead + Unpin>(
                 .read_buf(&mut chunk)
                 .await
                 .err_tip(|| "Could not read chunk during upload_to_store")?;
-            if chunk.len() == 0 {
+            if chunk.is_empty() {
                 break; // EOF.
             }
             tx.send(chunk.freeze())
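The ac_utils.rs hunks rewrite a plain `fn` that merely wrapped an `async move` block as an `async fn` (the pattern clippy's `manual_async_fn` lint suggests) and swap `chunk.len() == 0` for `chunk.is_empty()` (`len_zero`). Below is a minimal standalone sketch of the same two patterns; every name in it is hypothetical and not taken from this repository.

```rust
use std::future::Future;

// Before: a plain fn whose body is just an `async move` block. Clippy's
// `manual_async_fn` lint asks for this to be written as an `async fn`.
fn payload_len_manual(data: &[u8]) -> impl Future<Output = Result<usize, String>> + '_ {
    async move {
        // `is_empty()` is preferred over `len() == 0` (clippy::len_zero).
        if data.is_empty() {
            return Err("no data".to_string());
        }
        Ok(data.len())
    }
}

// After: the equivalent `async fn`; the return type becomes the plain `Result`
// and the wrapper block disappears.
async fn payload_len(data: &[u8]) -> Result<usize, String> {
    if data.is_empty() {
        return Err("no data".to_string());
    }
    Ok(data.len())
}

fn main() {
    // The sketch only contrasts the two spellings, so the futures are built but not awaited.
    let _fut_manual = payload_len_manual(b"hello");
    let _fut = payload_len(b"hello");
}
```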
38 changes: 15 additions & 23 deletions cas/store/compression_store.rs
@@ -174,7 +174,7 @@ impl UploadState {
             indexes: vec![SliceIndex { ..Default::default() }; max_index_count],
             index_count: max_index_count as u32,
             uncompressed_data_size: 0, // Updated later.
-            config: header.config.clone(),
+            config: header.config,
             version: CURRENT_STREAM_FORMAT_VERSION,
         };
 

@@ -224,7 +224,7 @@ impl CompressionStore {
             }
         };
         Ok(CompressionStore {
-            inner_store: inner_store,
+            inner_store,
             config: lz4_config,
             bincode_options: DefaultOptions::new().with_fixint_encoding(),
         })

@@ -250,11 +250,7 @@ impl StoreTrait for CompressionStore {
         let inner_store = self.inner_store.clone();
         let update_fut = JoinHandleDropGuard::new(tokio::spawn(async move {
             Pin::new(inner_store.as_ref())
-                .update(
-                    digest,
-                    rx,
-                    UploadSizeInfo::MaxSize(output_state.max_output_size as usize),
-                )
+                .update(digest, rx, UploadSizeInfo::MaxSize(output_state.max_output_size))
                 .await
                 .err_tip(|| "Inner store update in compression store failed")
         }))

@@ -278,17 +274,17 @@ impl StoreTrait for CompressionStore {
         }
 
         let mut received_amt = 0;
-        let mut index_count = 0;
+        let mut index_count: u32 = 0;
         for index in &mut output_state.footer.indexes {
             let chunk = reader
                 .take(self.config.block_size as usize)
                 .await
                 .err_tip(|| "Failed to read take in update in compression store")?;
-            if chunk.len() == 0 {
+            if chunk.is_empty() {
                 break; // EOF.
             }
 
-            received_amt = received_amt + chunk.len();
+            received_amt += chunk.len();
             error_if!(
                 received_amt > output_state.input_max_size,
                 "Got more data than stated in compression store upload request"

@@ -301,11 +297,11 @@ impl StoreTrait for CompressionStore {
 
             // For efficiency reasons we do some raw slice manipulation so we can write directly
             // into our buffer instead of having to do another allocation.
-            let mut raw_compressed_data = unsafe {
+            let raw_compressed_data = unsafe {
                 std::slice::from_raw_parts_mut(compressed_data_buf.chunk_mut().as_mut_ptr(), max_output_size)
             };
 
-            let compressed_data_sz = compress_into(&chunk, &mut raw_compressed_data)
+            let compressed_data_sz = compress_into(&chunk, raw_compressed_data)
                 .map_err(|e| make_err!(Code::Internal, "Compression error {:?}", e))?;
             unsafe {
                 compressed_data_buf.advance_mut(compressed_data_sz);

@@ -330,13 +326,11 @@ impl StoreTrait for CompressionStore {
         // one index too many.
         // Note: We need to be careful that if we don't have any data (zero bytes) it
         // doesn't go to -1.
-        if index_count > 0 {
-            index_count -= 1;
-        }
+        index_count = index_count.saturating_sub(1);
         output_state
             .footer
             .indexes
-            .resize(index_count, SliceIndex { ..Default::default() });
+            .resize(index_count as usize, SliceIndex { ..Default::default() });
         output_state.footer.index_count = output_state.footer.indexes.len() as u32;
         output_state.footer.uncompressed_data_size = received_amt as u64;
         {

@@ -396,7 +390,7 @@ impl StoreTrait for CompressionStore {
             config: Lz4Config { block_size: 0 },
             upload_size: UploadSizeInfo::ExactSize(0),
         };
-        let header_size = self.bincode_options.serialized_size(&EMPTY_HEADER).unwrap() as u64;
+        let header_size = self.bincode_options.serialized_size(&EMPTY_HEADER).unwrap();
         let chunk = rx
             .take(header_size as usize)
             .await

@@ -440,7 +434,7 @@ impl StoreTrait for CompressionStore {
 
         let mut uncompressed_data_sz: u64 = 0;
         let mut remaining_bytes_to_send: u64 = length.unwrap_or(usize::MAX) as u64;
-        let mut chunks_count = 0;
+        let mut chunks_count: u32 = 0;
         while frame_type != FOOTER_FRAME_TYPE {
             error_if!(
                 frame_type != CHUNK_FRAME_TYPE,

@@ -465,11 +459,11 @@ impl StoreTrait for CompressionStore {
 
             // For efficiency reasons we do some raw slice manipulation so we can write directly
             // into our buffer instead of having to do another allocation.
-            let mut raw_decompressed_data = unsafe {
+            let raw_decompressed_data = unsafe {
                 std::slice::from_raw_parts_mut(uncompressed_data.chunk_mut().as_mut_ptr(), max_output_size)
             };
 
-            let uncompressed_chunk_sz = decompress_into(&chunk, &mut raw_decompressed_data)
+            let uncompressed_chunk_sz = decompress_into(&chunk, raw_decompressed_data)
                 .map_err(|e| make_err!(Code::Internal, "Decompression error {:?}", e))?;
             unsafe { uncompressed_data.advance_mut(uncompressed_chunk_sz) };
             let new_uncompressed_data_sz = uncompressed_data_sz + uncompressed_chunk_sz as u64;

@@ -506,9 +500,7 @@ impl StoreTrait for CompressionStore {
             frame_sz = chunk.get_u32_le();
         }
         // Index count will always be +1 (unless it is zero bytes long).
-        if chunks_count > 0 {
-            chunks_count -= 1;
-        }
+        chunks_count = chunks_count.saturating_sub(1);
         {
             // Read and validate footer.
             let chunk = rx
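The compression_store.rs hunks are mostly mechanical idiom fixes: field-init shorthand instead of `inner_store: inner_store` (`redundant_field_names`), `+=` instead of `x = x + y` (`assign_op_pattern`), and `saturating_sub` instead of a guarded decrement (the pattern `implicit_saturating_sub` flags). A small runnable sketch of those three patterns, using made-up names rather than this codebase:

```rust
struct Counters {
    count: u32,
}

fn main() {
    let count: u32 = 0;

    // Field-init shorthand instead of `count: count` (clippy::redundant_field_names).
    let mut state = Counters { count };

    // `+=` instead of `state.count = state.count + 1` (clippy::assign_op_pattern).
    state.count += 1;

    // `saturating_sub` instead of `if state.count > 0 { state.count -= 1; }`;
    // it also documents that the value never wraps below zero.
    state.count = state.count.saturating_sub(2);

    assert_eq!(state.count, 0);
}
```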
5 changes: 2 additions & 3 deletions cas/store/dedup_store.rs
@@ -30,7 +30,6 @@ use tokio_util::codec::FramedRead;
 
 use buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, StreamReader};
 use common::{log, DigestInfo, JoinHandleDropGuard};
-use config;
 use error::{make_err, Code, Error, ResultExt};
 use fastcdc::FastCDC;
 use traits::{StoreTrait, UploadSizeInfo};

@@ -94,7 +93,7 @@ impl DedupStore {
         }
     }
 
-    fn pin_index_store<'a>(&'a self) -> Pin<&'a dyn StoreTrait> {
+    fn pin_index_store(&self) -> Pin<&dyn StoreTrait> {
         Pin::new(self.index_store.as_ref())
     }
 }

@@ -187,7 +186,7 @@ impl StoreTrait for DedupStore {
                 let index_entry = DigestInfo::new(hash.into(), frame_len as i64);
 
                 let content_store_pin = Pin::new(content_store.as_ref());
-                let digest = DigestInfo::new(hash.clone().into(), frame.len() as i64);
+                let digest = DigestInfo::new(hash.into(), frame.len() as i64);
                 if content_store_pin.has(digest.clone()).await?.is_some() {
                     // If our store has this digest, we don't need to upload it.
                     return Ok(index_entry);
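The dedup_store.rs hunks drop an unused `use config;`, elide lifetimes the compiler already infers (`needless_lifetimes`), and remove a `.clone()` on a copyable hash value (the `clone_on_copy` pattern). A tiny runnable sketch of the last two lints, with invented types that only stand in for the real ones:

```rust
#[derive(Clone, Copy)]
struct Hash([u8; 4]);

struct Entry {
    hash: Hash,
}

impl Entry {
    // Elided form; clippy's `needless_lifetimes` flags the explicit
    // `fn hash<'a>(&'a self) -> &'a Hash` spelling as redundant.
    fn hash(&self) -> &Hash {
        &self.hash
    }
}

fn main() {
    let entry = Entry { hash: Hash([1, 2, 3, 4]) };
    // For `Copy` types, calling `.clone()` triggers `clippy::clone_on_copy`;
    // a plain dereference copy is enough.
    let _copy: Hash = *entry.hash();
}
```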
25 changes: 12 additions & 13 deletions cas/store/default_store_factory.rs
@@ -30,40 +30,39 @@ use size_partitioning_store::SizePartitioningStore;
 use store::{Store, StoreManager};
 use verify_store::VerifyStore;
 
-pub fn store_factory<'a>(
-    backend: &'a StoreConfig,
-    store_manager: &'a Arc<StoreManager>,
-) -> Pin<Box<dyn Future<Output = Result<Arc<dyn Store>, Error>> + 'a>> {
+type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Arc<dyn Store>, Error>> + 'a>;
+
+pub fn store_factory<'a>(backend: &'a StoreConfig, store_manager: &'a Arc<StoreManager>) -> Pin<FutureMaybeStore<'a>> {
     Box::pin(async move {
         let store: Arc<dyn Store> = match backend {
-            StoreConfig::memory(config) => Arc::new(MemoryStore::new(&config)),
-            StoreConfig::s3_store(config) => Arc::new(S3Store::new(&config)?),
+            StoreConfig::memory(config) => Arc::new(MemoryStore::new(config)),
+            StoreConfig::s3_store(config) => Arc::new(S3Store::new(config)?),
             StoreConfig::verify(config) => Arc::new(VerifyStore::new(
-                &config,
+                config,
                 store_factory(&config.backend, store_manager).await?,
             )),
             StoreConfig::compression(config) => Arc::new(CompressionStore::new(
                 *config.clone(),
                 store_factory(&config.backend, store_manager).await?,
             )?),
             StoreConfig::dedup(config) => Arc::new(DedupStore::new(
-                &config,
+                config,
                 store_factory(&config.index_store, store_manager).await?,
                 store_factory(&config.content_store, store_manager).await?,
             )),
             StoreConfig::fast_slow(config) => Arc::new(FastSlowStore::new(
-                &config,
+                config,
                 store_factory(&config.fast, store_manager).await?,
                 store_factory(&config.slow, store_manager).await?,
             )),
-            StoreConfig::filesystem(config) => Arc::new(<FilesystemStore>::new(&config).await?),
-            StoreConfig::ref_store(config) => Arc::new(RefStore::new(&config, store_manager.clone())),
+            StoreConfig::filesystem(config) => Arc::new(<FilesystemStore>::new(config).await?),
+            StoreConfig::ref_store(config) => Arc::new(RefStore::new(config, store_manager.clone())),
             StoreConfig::size_partitioning(config) => Arc::new(SizePartitioningStore::new(
-                &config,
+                config,
                 store_factory(&config.lower_store, store_manager).await?,
                 store_factory(&config.upper_store, store_manager).await?,
             )),
-            StoreConfig::grpc(config) => Arc::new(GrpcStore::new(&config).await?),
+            StoreConfig::grpc(config) => Arc::new(GrpcStore::new(config).await?),
         };
         Ok(store)
     })
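The default_store_factory.rs hunk shortens the boxed-future return type behind a type alias and drops `&` on bindings that are already references (`needless_borrow`): with match ergonomics, `config` in each arm is already a reference, so `&config` creates a reference the compiler immediately dereferences again. A minimal illustration under assumed, simplified types (nothing here is the real store API):

```rust
enum StoreConfig {
    Memory(MemoryConfig),
}

struct MemoryConfig {
    max_bytes: usize,
}

struct MemoryStore {
    max_bytes: usize,
}

impl MemoryStore {
    fn new(config: &MemoryConfig) -> Self {
        Self { max_bytes: config.max_bytes }
    }
}

fn build(backend: &StoreConfig) -> MemoryStore {
    match backend {
        // `config` is already `&MemoryConfig` here, so passing `&config` would be a
        // needless extra borrow (clippy::needless_borrow); plain `config` suffices.
        StoreConfig::Memory(config) => MemoryStore::new(config),
    }
}

fn main() {
    let store = build(&StoreConfig::Memory(MemoryConfig { max_bytes: 1024 }));
    assert_eq!(store.max_bytes, 1024);
}
```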
13 changes: 6 additions & 7 deletions cas/store/fast_slow_store.rs
@@ -20,7 +20,6 @@ use futures::{join, FutureExt};
 
 use buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
 use common::DigestInfo;
-use config;
 use error::{make_err, Code, Error, ResultExt};
 use traits::{StoreTrait, UploadSizeInfo};
 

@@ -45,7 +44,7 @@ impl FastSlowStore {
         Self { fast_store, slow_store }
     }
 
-    pub fn fast_store<'a>(&'a self) -> &'a Arc<dyn StoreTrait> {
+    pub fn fast_store(&self) -> &Arc<dyn StoreTrait> {
         &self.fast_store
     }
 

@@ -59,7 +58,7 @@ impl FastSlowStore {
             .has(digest.clone())
             .await
             .err_tip(|| "While querying in populate_fast_store")?;
-        if let Some(_) = maybe_size_info {
+        if maybe_size_info.is_some() {
             return Ok(());
         }
         // TODO(blaise.bruer) This is extremely inefficient, since we are just trying

@@ -74,11 +73,11 @@ impl FastSlowStore {
         get_res.err_tip(|| "Failed to populate()").merge(drain_res)
     }
 
-    fn pin_fast_store<'a>(&'a self) -> Pin<&'a dyn StoreTrait> {
+    fn pin_fast_store(&self) -> Pin<&dyn StoreTrait> {
         Pin::new(self.fast_store.as_ref())
     }
 
-    fn pin_slow_store<'a>(&'a self) -> Pin<&'a dyn StoreTrait> {
+    fn pin_slow_store(&self) -> Pin<&dyn StoreTrait> {
         Pin::new(self.slow_store.as_ref())
     }
 }

@@ -109,7 +108,7 @@ impl StoreTrait for FastSlowStore {
                 .recv()
                 .await
                 .err_tip(|| "Failed to read buffer in fastslow store")?;
-            if buffer.len() == 0 {
+            if buffer.is_empty() {
                 // EOF received.
                 fast_tx
                     .send_eof()

@@ -192,7 +191,7 @@ impl StoreTrait for FastSlowStore {
                 .recv()
                 .await
                 .err_tip(|| "Failed to read data data buffer from slow store")?;
-            if output_buf.len() == 0 {
+            if output_buf.is_empty() {
                 // Write out our EOF.
                 // It is possible for the client to disconnect the stream because they got
                 // all the data they wanted, which could lead to an error when writing this
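The fast_slow_store.rs hunks replace `if let Some(_) = x` with `x.is_some()` (`redundant_pattern_matching`) and `len() == 0` with `is_empty()` (`len_zero`), alongside more elided lifetimes. A short runnable sketch of the two conditionals, with hypothetical names:

```rust
fn already_cached(maybe_size: Option<usize>) -> bool {
    // `is_some()` instead of `if let Some(_) = ...` (clippy::redundant_pattern_matching).
    maybe_size.is_some()
}

fn main() {
    let buffer: Vec<u8> = Vec::new();
    // `is_empty()` instead of `buffer.len() == 0` (clippy::len_zero) to detect EOF.
    if buffer.is_empty() {
        println!("EOF");
    }
    assert!(already_cached(Some(42)));
}
```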