Skip to content

Commit

Permalink
Adds more eviction templates and functions in prep for filesystem store
Browse files Browse the repository at this point in the history
Adds ability to snapshot the eviction map and restore it. Also adds some
deref() functions that will eventually be used to delete files when needed.
  • Loading branch information
allada committed Nov 12, 2021
1 parent 791dc67 commit f2896a7
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 127 deletions.
1 change: 0 additions & 1 deletion cas/store/BUILD
Expand Up @@ -38,7 +38,6 @@ rust_library(
srcs = ["memory_store.rs"],
deps = [
"//config",
"//third_party:fast_async_mutex",
"//third_party:tokio",
"//util:common",
"//util:error",
Expand Down
17 changes: 5 additions & 12 deletions cas/store/dedup_store.rs
Expand Up @@ -20,7 +20,7 @@ use sha2::{Digest, Sha256};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::codec::FramedRead;

use common::{DigestInfo, JoinHandleDropGuard};
use common::{DigestInfo, JoinHandleDropGuard, SerializableDigestInfo};
use config;
use error::{make_err, Code, Error, ResultExt};
use fastcdc::FastCDC;
Expand All @@ -33,16 +33,9 @@ const DEFAULT_NORM_SIZE: usize = 256 * 1024;
const DEFAULT_MAX_SIZE: usize = 512 * 1024;
const DEFAULT_MAX_CONCURRENT_FETCH_PER_GET: usize = 10;

#[derive(Serialize, Deserialize, PartialEq, Debug, Default, Clone)]
#[repr(C)]
pub struct IndexEntry {
pub hash: [u8; 32],
pub size_bytes: u32,
}

#[derive(Serialize, Deserialize, PartialEq, Debug, Default, Clone)]
pub struct DedupIndex {
pub entries: Vec<IndexEntry>,
pub entries: Vec<SerializableDigestInfo>,
}

pub struct DedupStore {
Expand Down Expand Up @@ -136,9 +129,9 @@ impl StoreTrait for DedupStore {
let hash = Sha256::digest(&frame[..]);

let frame_len = frame.len();
let index_entry = IndexEntry {
let index_entry = SerializableDigestInfo {
hash: hash.into(),
size_bytes: frame_len as u32,
size_bytes: frame_len as u64,
};

let content_store_pin = Pin::new(content_store.as_ref());
Expand Down Expand Up @@ -207,7 +200,7 @@ impl StoreTrait for DedupStore {
let index_entries = {
// First we need to read from our index_store to get a list of all the files and locations.
let est_parts = (digest.size_bytes as usize / self.upload_normal_size) + 1;
let mut data = Vec::with_capacity(est_parts * size_of::<IndexEntry>());
let mut data = Vec::with_capacity(est_parts * size_of::<SerializableDigestInfo>());
self.pin_index_store()
.get_part(digest, &mut data, 0, None)
.await
Expand Down
26 changes: 10 additions & 16 deletions cas/store/memory_store.rs
Expand Up @@ -2,10 +2,9 @@

use std::marker::Send;
use std::sync::Arc;
use std::time::Instant;
use std::time::SystemTime;

use async_trait::async_trait;
use fast_async_mutex::mutex::Mutex;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

use common::DigestInfo;
Expand All @@ -15,31 +14,27 @@ use evicting_map::EvictingMap;
use traits::{ResultFuture, StoreTrait, UploadSizeInfo};

pub struct MemoryStore {
map: Mutex<EvictingMap<Vec<u8>, Instant>>,
map: EvictingMap<Vec<u8>, SystemTime>,
}

impl MemoryStore {
pub fn new(config: &config::backends::MemoryStore) -> Self {
let empty_policy = config::backends::EvictionPolicy::default();
let eviction_policy = config.eviction_policy.as_ref().unwrap_or(&empty_policy);
MemoryStore {
map: Mutex::new(EvictingMap::new(eviction_policy, Instant::now())),
map: EvictingMap::new(eviction_policy, SystemTime::now()),
}
}

pub async fn remove_entry(&self, digest: &DigestInfo) -> bool {
let mut map = self.map.lock().await;
map.remove(digest)
self.map.remove(digest).await
}
}

#[async_trait]
impl StoreTrait for MemoryStore {
fn has<'a>(self: std::pin::Pin<&'a Self>, digest: DigestInfo) -> ResultFuture<'a, Option<usize>> {
Box::pin(async move {
let mut map = self.map.lock().await;
Ok(map.size_for_key(&digest).map(|v| v as usize))
})
Box::pin(async move { Ok(self.map.size_for_key(&digest).await.map(|v| v as usize)) })
}

fn update<'a>(
Expand All @@ -56,8 +51,7 @@ impl StoreTrait for MemoryStore {
let mut buffer = Vec::with_capacity(max_size);
reader.read_to_end(&mut buffer).await?;
buffer.shrink_to_fit();
let mut map = self.map.lock().await;
map.insert(digest, Arc::new(buffer));
self.map.insert(digest, Arc::new(buffer)).await;
Ok(())
})
}
Expand All @@ -70,11 +64,11 @@ impl StoreTrait for MemoryStore {
length: Option<usize>,
) -> ResultFuture<'a, ()> {
Box::pin(async move {
let mut map = self.map.lock().await;
let value = map
let value = self
.map
.get(&digest)
.err_tip_with_code(|_| (Code::NotFound, format!("Hash {} not found", digest.str())))?
.as_ref();
.await
.err_tip_with_code(|_| (Code::NotFound, format!("Hash {} not found", digest.str())))?;
let default_len = value.len() - offset;
let length = length.unwrap_or(default_len).min(default_len);
writer
Expand Down
3 changes: 3 additions & 0 deletions util/BUILD
Expand Up @@ -23,6 +23,7 @@ rust_library(
"//third_party:hex",
"//third_party:lazy_init",
"//third_party:log",
"//third_party:serde",
"//third_party:tokio",
":error",
],
Expand All @@ -45,7 +46,9 @@ rust_library(
srcs = ["evicting_map.rs"],
deps = [
"//config",
"//third_party:fast_async_mutex",
"//third_party:lru",
"//third_party:serde",
"//util:common",
],
visibility = ["//visibility:public"],
Expand Down
14 changes: 14 additions & 0 deletions util/common.rs
Expand Up @@ -11,6 +11,7 @@ use hex::FromHex;
use lazy_init::LazyTransform;
pub use log;
use proto::build::bazel::remote::execution::v2::Digest;
use serde::{Deserialize, Serialize};
use tokio::task::{JoinError, JoinHandle};

use error::{make_input_err, Error, ResultExt};
Expand Down Expand Up @@ -119,6 +120,19 @@ impl Into<Digest> for DigestInfo {
}
}

impl Into<DigestInfo> for SerializableDigestInfo {
fn into(self) -> DigestInfo {
DigestInfo::new(self.hash, self.size_bytes as i64)
}
}

#[derive(Serialize, Deserialize, PartialEq, Debug, Default, Clone)]
#[repr(C)]
pub struct SerializableDigestInfo {
pub hash: [u8; 32],
pub size_bytes: u64,
}

/// Simple wrapper that will abort a future that is running in another spawn in the
/// event that this handle gets dropped.
pub struct JoinHandleDropGuard<T> {
Expand Down

0 comments on commit f2896a7

Please sign in to comment.