Skip to content

Commit

Permalink
Add filesystem store
Browse files Browse the repository at this point in the history
This store will use the filesystem to store the files in a persistent
manner.
  • Loading branch information
allada committed Nov 18, 2021
1 parent 025305c commit d183cad
Show file tree
Hide file tree
Showing 20 changed files with 962 additions and 150 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async-trait = "0.1.51"
fixed-buffer = "0.2.3"
futures = "0.3.17"
tokio = { version = "1.13.0", features = ["macros", "io-util", "fs", "rt-multi-thread"] }
tokio-stream = "0.1.8"
tokio-stream = { version = "0.1.8", features = ["fs"] }
tokio-util = { version = "0.6.9", features = ["io", "io-util"] }
tonic = "0.6.1"
lazy-init = "0.5.0"
Expand All @@ -37,6 +37,7 @@ bincode = "1.3.3"
bytes = "1.1.0"
byteorder = "1.4.3"
lazy_static = "1.4.0"
filetime = "0.2.15"

[dev-dependencies]
clap = "2.33.3"
Expand Down
1 change: 1 addition & 0 deletions cas/cas_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
for (name, store_cfg) in cfg.stores {
store_manager
.make_store(&name, &store_cfg)
.await
.err_tip(|| format!("Failed to create store '{}'", name))?;
}

Expand Down
30 changes: 17 additions & 13 deletions cas/grpc_service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@ async fn insert_into_store<T: Message>(
Ok(digest.size_bytes)
}

fn make_store_manager() -> Result<StoreManager, Error> {
async fn make_store_manager() -> Result<StoreManager, Error> {
let mut store_manager = StoreManager::new();
store_manager.make_store(
"main_cas",
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
)?;
store_manager.make_store(
"main_ac",
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
)?;
store_manager
.make_store(
"main_cas",
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
)
.await?;
store_manager
.make_store(
"main_ac",
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
)
.await?;
Ok(store_manager)
}

Expand Down Expand Up @@ -79,7 +83,7 @@ mod get_action_result {

#[tokio::test]
async fn empty_store() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let ac_server = make_ac_server(&mut store_manager)?;

let raw_response = get_action_result(&ac_server, HASH1, 0).await;
Expand All @@ -95,7 +99,7 @@ mod get_action_result {

#[tokio::test]
async fn has_single_item() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let ac_server = make_ac_server(&mut store_manager)?;
let ac_store_owned = store_manager.get_store("main_ac").unwrap();

Expand All @@ -113,7 +117,7 @@ mod get_action_result {

#[tokio::test]
async fn single_item_wrong_digest_size() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let ac_server = make_ac_server(&mut store_manager)?;
let ac_store_owned = store_manager.get_store("main_ac").unwrap();

Expand Down Expand Up @@ -165,7 +169,7 @@ mod update_action_result {

#[tokio::test]
async fn one_item_update_test() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let ac_server = make_ac_server(&mut store_manager)?;
let ac_store_owned = store_manager.get_store("main_ac").unwrap();

Expand Down
22 changes: 12 additions & 10 deletions cas/grpc_service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ use store::StoreManager;
const INSTANCE_NAME: &str = "foo_instance_name";
const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef";

fn make_store_manager() -> Result<StoreManager, Error> {
async fn make_store_manager() -> Result<StoreManager, Error> {
let mut store_manager = StoreManager::new();
store_manager.make_store(
"main_cas",
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
)?;
store_manager
.make_store(
"main_cas",
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
)
.await?;
Ok(store_manager)
}

Expand Down Expand Up @@ -87,7 +89,7 @@ pub mod write_tests {

#[tokio::test]
pub async fn chunked_stream_receives_all_data() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let bs_server = make_bytestream_server(&mut store_manager)?;
let store_owned = store_manager.get_store("main_cas").unwrap();

Expand Down Expand Up @@ -181,7 +183,7 @@ pub mod read_tests {

#[tokio::test]
pub async fn chunked_stream_reads_small_set_of_data() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let bs_server = make_bytestream_server(&mut store_manager)?;
let store_owned = store_manager.get_store("main_cas").unwrap();

Expand Down Expand Up @@ -221,7 +223,7 @@ pub mod read_tests {

#[tokio::test]
pub async fn chunked_stream_reads_10mb_of_data() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let bs_server = make_bytestream_server(&mut store_manager)?;
let store_owned = store_manager.get_store("main_cas").unwrap();

Expand Down Expand Up @@ -267,7 +269,7 @@ pub mod read_tests {
/// stream was never shutdown.
#[tokio::test]
pub async fn read_with_not_found_does_not_deadlock() -> Result<(), Error> {
let mut store_manager = make_store_manager().err_tip(|| "Couldn't get store manager")?;
let mut store_manager = make_store_manager().await.err_tip(|| "Couldn't get store manager")?;
let mut read_stream = {
let bs_server = make_bytestream_server(&mut store_manager).err_tip(|| "Couldn't make store")?;
let read_request = ReadRequest {
Expand Down Expand Up @@ -327,7 +329,7 @@ pub mod query_tests {

#[tokio::test]
pub async fn test_query_write_status_smoke_test() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let bs_server = Arc::new(make_bytestream_server(&mut store_manager)?);

let raw_data = "12456789abcdefghijk".as_bytes();
Expand Down
24 changes: 13 additions & 11 deletions cas/grpc_service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ const HASH2: &str = "9993456789abcdef000000000000000000000000000000000123456789a
const HASH3: &str = "7773456789abcdef000000000000000000000000000000000123456789abc777";
const BAD_HASH: &str = "BAD_HASH";

fn make_store_manager() -> Result<StoreManager, Error> {
async fn make_store_manager() -> Result<StoreManager, Error> {
let mut store_manager = StoreManager::new();
store_manager.make_store(
"main_cas",
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
)?;
store_manager
.make_store(
"main_cas",
&config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
)
.await?;
Ok(store_manager)
}

Expand All @@ -51,7 +53,7 @@ mod find_missing_blobs {

#[tokio::test]
async fn empty_store() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let cas_server = make_cas_server(&mut store_manager)?;

let raw_response = cas_server
Expand All @@ -71,7 +73,7 @@ mod find_missing_blobs {

#[tokio::test]
async fn store_one_item_existence() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let cas_server = make_cas_server(&mut store_manager)?;
let store_owned = store_manager.get_store("main_cas").unwrap();

Expand All @@ -98,7 +100,7 @@ mod find_missing_blobs {

#[tokio::test]
async fn has_three_requests_one_bad_hash() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let cas_server = make_cas_server(&mut store_manager)?;
let store_owned = store_manager.get_store("main_cas").unwrap();

Expand Down Expand Up @@ -148,7 +150,7 @@ mod batch_update_blobs {

#[tokio::test]
async fn update_existing_item() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let cas_server = make_cas_server(&mut store_manager)?;
let store_owned = store_manager.get_store("main_cas").unwrap();

Expand Down Expand Up @@ -214,7 +216,7 @@ mod batch_read_blobs {

#[tokio::test]
async fn batch_read_blobs_read_two_blobs_success_one_fail() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let cas_server = make_cas_server(&mut store_manager)?;
let store_owned = store_manager.get_store("main_cas").unwrap();

Expand Down Expand Up @@ -308,7 +310,7 @@ mod end_to_end {

#[tokio::test]
async fn batch_update_blobs_two_items_existence_with_third_missing() -> Result<(), Box<dyn std::error::Error>> {
let mut store_manager = make_store_manager()?;
let mut store_manager = make_store_manager().await?;
let cas_server = make_cas_server(&mut store_manager)?;

const VALUE1: &str = "1";
Expand Down
40 changes: 40 additions & 0 deletions cas/store/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ rust_library(
srcs = ["lib.rs"],
deps = [
"//config",
"//third_party:futures",
"//util:error",
":compression_store",
":dedup_store",
":fast_slow_store",
":filesystem_store",
":memory_store",
":s3_store",
":traits",
Expand Down Expand Up @@ -149,6 +151,27 @@ rust_library(
visibility = ["//cas:__pkg__"]
)

rust_library(
name = "filesystem_store",
srcs = ["filesystem_store.rs"],
deps = [
"//config",
"//third_party:bytes",
"//third_party:filetime",
"//third_party:futures",
"//third_party:rand",
"//third_party:tokio",
"//third_party:tokio_stream",
"//util:buf_channel",
"//util:common",
"//util:error",
"//util:evicting_map",
":traits",
],
proc_macro_deps = ["//third_party:async_trait"],
visibility = ["//cas:__pkg__"]
)

rust_test(
name = "fast_slow_store_test",
srcs = ["tests/fast_slow_store_test.rs"],
Expand Down Expand Up @@ -251,3 +274,20 @@ rust_test(
":traits",
],
)


rust_test(
name = "filesystem_store_test",
srcs = ["tests/filesystem_store_test.rs"],
deps = [
"//config",
"//third_party:filetime",
"//third_party:pretty_assertions",
"//third_party:rand",
"//third_party:tokio",
"//util:common",
"//util:error",
":filesystem_store",
":traits",
],
)
Loading

0 comments on commit d183cad

Please sign in to comment.