From 7af7cc6500254f38e705ed00c28ea2927549419a Mon Sep 17 00:00:00 2001 From: Nathan Bruer Date: Mon, 18 Jan 2021 19:16:54 -0800 Subject: [PATCH] Action Cache service now properly honors instance_name --- cas/grpc_service/ac_server.rs | 36 +++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/cas/grpc_service/ac_server.rs b/cas/grpc_service/ac_server.rs index 8ff8cd116..0bb0e5e0a 100644 --- a/cas/grpc_service/ac_server.rs +++ b/cas/grpc_service/ac_server.rs @@ -21,20 +21,20 @@ use error::{make_input_err, Code, Error, ResultExt}; use store::{Store, StoreManager}; pub struct AcServer { - ac_store: Arc, + stores: HashMap>, } impl AcServer { pub fn new(config: &HashMap, store_manager: &StoreManager) -> Result { - for (_instance_name, ac_cfg) in config { - let ac_store = store_manager + let mut stores = HashMap::with_capacity(config.len()); + for (instance_name, ac_cfg) in config { + let store = store_manager .get_store(&ac_cfg.ac_store) - .ok_or_else(|| make_input_err!("'ac_store': '{}' does not exist", ac_cfg.ac_store))?; - return Ok(AcServer { - ac_store: ac_store.clone(), - }); + .ok_or_else(|| make_input_err!("'ac_store': '{}' does not exist", ac_cfg.ac_store))? + .clone(); + stores.insert(instance_name.to_string(), store); } - Err(make_input_err!("No configuration configured for 'ac' service")) + Ok(AcServer { stores: stores.clone() }) } pub fn into_service(self) -> Server { @@ -57,8 +57,14 @@ impl AcServer { // TODO(allada) There is a security risk here of someone taking all the memory on the instance. let mut store_data = Vec::with_capacity(digest.size_bytes as usize); let mut cursor = Cursor::new(&mut store_data); - let ac_store = Pin::new(self.ac_store.as_ref()); - ac_store.get(digest.clone(), &mut cursor).await?; + let instance_name = get_action_request.instance_name; + let store = Pin::new( + self.stores + .get(&instance_name) + .err_tip(|| format!("'instance_name' not configured for '{}'", &instance_name))? + .as_ref(), + ); + store.get(digest.clone(), &mut cursor).await?; let action_result = ActionResult::decode(Cursor::new(&store_data)) .err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))?; @@ -89,8 +95,14 @@ impl AcServer { .encode(&mut store_data) .err_tip(|| "Provided ActionResult could not be serialized")?; - let ac_store = Pin::new(self.ac_store.as_ref()); - ac_store.update(digest, Box::new(Cursor::new(store_data))).await?; + let instance_name = update_action_request.instance_name; + let store = Pin::new( + self.stores + .get(&instance_name) + .err_tip(|| format!("'instance_name' not configured for '{}'", &instance_name))? + .as_ref(), + ); + store.update(digest, Box::new(Cursor::new(store_data))).await?; Ok(Response::new(action_result)) } }