Skip to content

Commit

Permalink
CAS service now supports instance name partitioning
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Jan 19, 2021
1 parent 0f7b2d0 commit faba6c9
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
43 changes: 29 additions & 14 deletions cas/grpc_service/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@ use error::{error_if, make_input_err, Error, ResultExt};
use store::{Store, StoreManager};

pub struct CasServer {
store: Arc<dyn Store>,
stores: HashMap<String, Arc<dyn Store>>,
}

impl CasServer {
pub fn new(config: &HashMap<InstanceName, CasStoreConfig>, store_manager: &StoreManager) -> Result<Self, Error> {
for (_instance_name, cas_cfg) in config {
let mut stores = HashMap::with_capacity(config.len());
for (instance_name, cas_cfg) in config {
let store = store_manager
.get_store(&cas_cfg.cas_store)
.ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", cas_cfg.cas_store))?;
// TODO(allada) We don't yet support instance_name.
return Ok(CasServer { store: store.clone() });
.ok_or_else(|| make_input_err!("'cas_store': '{}' does not exist", cas_cfg.cas_store))?
.clone();
stores.insert(instance_name.to_string(), store);
}
Err(make_input_err!("No configuration configured for 'cas' service"))
Ok(CasServer { stores: stores })
}

pub fn into_service(self) -> Server<CasServer> {
Expand All @@ -47,12 +48,17 @@ impl CasServer {

async fn inner_find_missing_blobs(
&self,
request: Request<FindMissingBlobsRequest>,
grpc_request: Request<FindMissingBlobsRequest>,
) -> Result<Response<FindMissingBlobsResponse>, Error> {
let mut futures = futures::stream::FuturesOrdered::new();
for digest in request.into_inner().blob_digests.into_iter() {
let store_owned = self.store.clone();
let request = grpc_request.into_inner();
for digest in request.blob_digests.into_iter() {
let digest: DigestInfo = digest.try_into()?;
let store_owned = self
.stores
.get(&request.instance_name)
.err_tip(|| format!("'instance_name' not configured for '{}'", digest.str()))?
.clone();
futures.push(tokio::spawn(async move {
let store = Pin::new(store_owned.as_ref());
store
Expand All @@ -78,10 +84,15 @@ impl CasServer {
grpc_request: Request<BatchUpdateBlobsRequest>,
) -> Result<Response<BatchUpdateBlobsResponse>, Error> {
let mut futures = futures::stream::FuturesOrdered::new();
for request in grpc_request.into_inner().requests {
let inner_request = grpc_request.into_inner();
for request in inner_request.requests {
let digest: DigestInfo = request.digest.err_tip(|| "Digest not found in request")?.try_into()?;
let digest_copy = digest.clone();
let store_owned = self.store.clone();
let store_owned = self
.stores
.get(&inner_request.instance_name)
.err_tip(|| format!("'instance_name' not configured for '{}'", digest.str()))?
.clone();
let request_data = request.data;
futures.push(tokio::spawn(
async move {
Expand Down Expand Up @@ -118,11 +129,15 @@ impl CasServer {
grpc_request: Request<BatchReadBlobsRequest>,
) -> Result<Response<BatchReadBlobsResponse>, Error> {
let mut futures = futures::stream::FuturesOrdered::new();

for digest in grpc_request.into_inner().digests {
let inner_request = grpc_request.into_inner();
for digest in inner_request.digests {
let digest: DigestInfo = digest.try_into()?;
let digest_copy = digest.clone();
let store_owned = self.store.clone();
let store_owned = self
.stores
.get(&inner_request.instance_name)
.err_tip(|| format!("'instance_name' not configured for '{}'", digest.str()))?
.clone();

futures.push(tokio::spawn(
async move {
Expand Down
4 changes: 2 additions & 2 deletions cas/grpc_service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use config;
use error::Error;
use store::StoreManager;

const INSTANCE_NAME: &str = "foo";
const INSTANCE_NAME: &str = "foo_instance_name";
const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef";
const HASH2: &str = "9993456789abcdef000000000000000000000000000000000123456789abc999";
const HASH3: &str = "7773456789abcdef000000000000000000000000000000000123456789abc777";
Expand All @@ -34,7 +34,7 @@ fn make_store_manager() -> Result<StoreManager, Error> {
fn make_cas_server(store_manager: &mut StoreManager) -> Result<CasServer, Error> {
CasServer::new(
&hashmap! {
"main".to_string() => config::cas_server::CasStoreConfig{
"foo_instance_name".to_string() => config::cas_server::CasStoreConfig{
cas_store: "main_cas".to_string(),
}
},
Expand Down

0 comments on commit faba6c9

Please sign in to comment.