Skip to content

Commit

Permalink
MemoryStore now can be configured to evict entries
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Jan 20, 2021
1 parent 34b9312 commit 5830d0b
Show file tree
Hide file tree
Showing 14 changed files with 402 additions and 35 deletions.
4 changes: 2 additions & 2 deletions cas/grpc_service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ fn make_store_manager() -> Result<StoreManager, Error> {
let mut store_manager = StoreManager::new();
store_manager.make_store(
"main_cas",
&config::backends::StoreConfig::memory(config::backends::MemoryStore {}),
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
)?;
store_manager.make_store(
"main_ac",
&config::backends::StoreConfig::memory(config::backends::MemoryStore {}),
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
)?;
Ok(store_manager)
}
Expand Down
2 changes: 1 addition & 1 deletion cas/grpc_service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn make_store_manager() -> Result<StoreManager, Error> {
let mut store_manager = StoreManager::new();
store_manager.make_store(
"main_cas",
&config::backends::StoreConfig::memory(config::backends::MemoryStore {}),
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
)?;
Ok(store_manager)
}
Expand Down
2 changes: 1 addition & 1 deletion cas/grpc_service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn make_store_manager() -> Result<StoreManager, Error> {
let mut store_manager = StoreManager::new();
store_manager.make_store(
"main_cas",
&config::backends::StoreConfig::memory(config::backends::MemoryStore {}),
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
)?;
Ok(store_manager)
}
Expand Down
1 change: 1 addition & 0 deletions cas/store/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ rust_library(
"//third_party:tokio",
"//util:common",
"//util:error",
"//util:evicting_map",
":traits",
],
proc_macro_deps = ["//third_party:async_trait"],
Expand Down
9 changes: 2 additions & 7 deletions cas/store/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use config::{self, backends::StoreConfig};
use error::{Error, ResultExt};
use error::Error;
use memory_store::MemoryStore;
use verify_store::VerifyStore;

Expand All @@ -19,12 +19,7 @@ fn private_make_store(backend: &StoreConfig) -> Result<Arc<dyn Store>, Error> {
StoreConfig::memory(config) => Ok(Arc::new(MemoryStore::new(&config))),
StoreConfig::verify(config) => Ok(Arc::new(VerifyStore::new(
&config,
private_make_store(
config
.backend
.as_ref()
.err_tip(|| "Expected verify store to have 'backend'")?,
)?,
private_make_store(&config.backend)?,
))),
}
}
Expand Down
16 changes: 9 additions & 7 deletions cas/store/memory_store.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright 2020-2021 Nathan (Blaise) Bruer. All rights reserved.

use std::collections::HashMap;
use std::marker::Send;
use std::sync::Arc;
use std::time::Instant;

use async_mutex::Mutex;
use async_trait::async_trait;
Expand All @@ -11,17 +11,19 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use common::DigestInfo;
use config;
use error::{Code, ResultExt};
use evicting_map::EvictingMap;
use traits::{ResultFuture, StoreTrait};

#[derive(Debug)]
pub struct MemoryStore {
map: Mutex<HashMap<[u8; 32], Arc<Vec<u8>>>>,
map: Mutex<EvictingMap<Instant>>,
}

impl MemoryStore {
pub fn new(_config: &config::backends::MemoryStore) -> Self {
pub fn new(config: &config::backends::MemoryStore) -> Self {
let empty_policy = config::backends::EvictionPolicy::default();
let eviction_policy = config.eviction_policy.as_ref().unwrap_or(&empty_policy);
MemoryStore {
map: Mutex::new(HashMap::new()),
map: Mutex::new(EvictingMap::new(eviction_policy, Instant::now())),
}
}
}
Expand All @@ -30,7 +32,7 @@ impl MemoryStore {
impl StoreTrait for MemoryStore {
fn has<'a>(self: std::pin::Pin<&'a Self>, digest: DigestInfo) -> ResultFuture<'a, bool> {
Box::pin(async move {
let map = self.map.lock().await;
let mut map = self.map.lock().await;
Ok(map.contains_key(&digest.packed_hash))
})
}
Expand All @@ -57,7 +59,7 @@ impl StoreTrait for MemoryStore {
length: Option<usize>,
) -> ResultFuture<'a, ()> {
Box::pin(async move {
let map = self.map.lock().await;
let mut map = self.map.lock().await;
let value = map
.get(&digest.packed_hash)
.err_tip_with_code(|_| (Code::NotFound, format!("Hash {} not found", digest.str())))?
Expand Down
8 changes: 4 additions & 4 deletions cas/store/tests/memory_store_test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 Nathan (Blaise) Bruer. All rights reserved.
// Copyright 2020-2021 Nathan (Blaise) Bruer. All rights reserved.

use std::pin::Pin;

Expand All @@ -19,7 +19,7 @@ mod memory_store_tests {

#[tokio::test]
async fn insert_one_item_then_update() -> Result<(), Error> {
let store_owned = MemoryStore::new(&config::backends::MemoryStore {});
let store_owned = MemoryStore::new(&config::backends::MemoryStore::default());
let store = Pin::new(&store_owned);

{
Expand Down Expand Up @@ -73,7 +73,7 @@ mod memory_store_tests {

#[tokio::test]
async fn read_partial() -> Result<(), Error> {
let store_owned = MemoryStore::new(&config::backends::MemoryStore {});
let store_owned = MemoryStore::new(&config::backends::MemoryStore::default());
let store = Pin::new(&store_owned);

const VALUE1: &str = "1234";
Expand All @@ -97,7 +97,7 @@ mod memory_store_tests {

#[tokio::test]
async fn errors_with_invalid_inputs() -> Result<(), Error> {
let store_owned = MemoryStore::new(&config::backends::MemoryStore {});
let store_owned = MemoryStore::new(&config::backends::MemoryStore::default());
let store = Pin::new(&store_owned);
const VALUE1: &str = "123";
{
Expand Down
12 changes: 6 additions & 6 deletions cas/store/tests/verify_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ mod verify_store_tests {

#[tokio::test]
async fn verify_size_false_passes_on_update() -> Result<(), Error> {
let inner_store = Arc::new(MemoryStore::new(&config::backends::MemoryStore {}));
let inner_store = Arc::new(MemoryStore::new(&config::backends::MemoryStore::default()));
let store_owned = VerifyStore::new(
&config::backends::VerifyStore {
backend: None,
backend: config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
verify_size: false,
},
inner_store.clone(),
Expand All @@ -50,10 +50,10 @@ mod verify_store_tests {

#[tokio::test]
async fn verify_size_true_fails_on_update() -> Result<(), Error> {
let inner_store = Arc::new(MemoryStore::new(&config::backends::MemoryStore {}));
let inner_store = Arc::new(MemoryStore::new(&config::backends::MemoryStore::default()));
let store_owned = VerifyStore::new(
&config::backends::VerifyStore {
backend: None,
backend: config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
verify_size: true,
},
inner_store.clone(),
Expand Down Expand Up @@ -82,10 +82,10 @@ mod verify_store_tests {

#[tokio::test]
async fn verify_size_true_suceeds_on_update() -> Result<(), Error> {
let inner_store = Arc::new(MemoryStore::new(&config::backends::MemoryStore {}));
let inner_store = Arc::new(MemoryStore::new(&config::backends::MemoryStore::default()));
let store_owned = VerifyStore::new(
&config::backends::VerifyStore {
backend: None,
backend: config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
verify_size: true,
},
inner_store.clone(),
Expand Down
32 changes: 28 additions & 4 deletions config/backends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@ pub enum StoreConfig {
verify(Box<VerifyStore>),
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Default)]
pub struct MemoryStore {
// TODO(allada) MemoryStore needs to implement deleting deleting old objects when they are deleted.
/// Policy used to evict items out of the store. Failure to set this
/// value will cause items to never be removed from the store causing
/// infinite memory usage.
pub eviction_policy: Option<EvictionPolicy>,
}

#[derive(Deserialize, Debug)]
pub struct VerifyStore {
#[serde(default)]
/// The underlying store wrap around. All content will first flow
/// through self before forwarding to backend. In the event there
/// is an error detected in self, the connection to the backend
/// will be terminated, and early termination should always cause
/// updates to fail on the backend.
pub backend: Option<StoreConfig>,
pub backend: StoreConfig,

/// If set the store will verify the size of the data before accepting
/// an upload of data.
Expand All @@ -41,3 +43,25 @@ pub struct VerifyStore {
#[serde(default)]
pub verify_size: bool,
}

/// Eviction policy always works on LRU (Least Recently Used). Any time an entry
/// is touched it updates the timestamp. Inserts and updates will execute the
/// eviction policy removing any expired entries and/or the oldest entries
/// until the store size becomes smaller than max_bytes.
#[derive(Deserialize, Debug, Default)]
pub struct EvictionPolicy {
/// Maximum number of bytes before eviction takes place.
/// Default: 0. Zero means never evict based on size.
#[serde(default)]
pub max_bytes: usize,

/// Maximum number of seconds for an entry to live before an eviction.
/// Default: 0. Zero means never evict based on time.
#[serde(default)]
pub max_seconds: u32,

/// Maximum size of the store before an eviction takes place.
/// Default: 0. Zero means never evict based on count.
#[serde(default)]
pub max_count: u64,
}
7 changes: 6 additions & 1 deletion config/examples/basic_cas.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
}
},
"AC_MAIN_STORE": {
"memory": {}
"memory": {
"eviction_policy": {
// 100mb.
"max_bytes": 100000000,
}
}
}
},
"servers": [{
Expand Down
27 changes: 25 additions & 2 deletions util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ rust_library(
visibility = ["//visibility:public"],
)

rust_library(
name = "evicting_map",
srcs = ["evicting_map.rs"],
deps = [
"//config",
"//third_party:lru",
],
visibility = ["//visibility:public"],
)

rust_library(
name = "async_fixed_buffer",
srcs = ["async_fixed_buffer.rs"],
Expand All @@ -39,8 +49,8 @@ rust_library(
)

rust_test(
name = "utils_tests",
srcs = ["tests/async_fixed_buffer_tests.rs"],
name = "async_fixed_buffer_test",
srcs = ["tests/async_fixed_buffer_test.rs"],
deps = [
"//third_party:futures",
"//third_party:pretty_assertions",
Expand All @@ -49,3 +59,16 @@ rust_test(
":error",
],
)

rust_test(
name = "evicting_map_test",
srcs = ["tests/evicting_map_test.rs"],
deps = [
"//config",
"//third_party:hex",
"//third_party:mock_instant",
"//third_party:tokio",
":error",
":evicting_map",
],
)

0 comments on commit 5830d0b

Please sign in to comment.