Skip to content

Commit

Permalink
Add retry to GrpcScheduler (#324)
Browse files Browse the repository at this point in the history
Closes #322.
  • Loading branch information
chrisstaite-menlo committed Oct 27, 2023
1 parent ef794c2 commit 21519ce
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 35 deletions.
3 changes: 3 additions & 0 deletions cas/scheduler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,14 @@ rust_library(
":action_messages",
":platform_property_manager",
":scheduler",
"//util:retry",
"//config",
"//proto",
"//util:common",
"//util:error",
"@crate_index//:futures",
"@crate_index//:parking_lot",
"@crate_index//:rand",
"@crate_index//:tokio",
"@crate_index//:tonic",
],
Expand Down
137 changes: 103 additions & 34 deletions cas/scheduler/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use futures::{stream::unfold, TryFutureExt};
use parking_lot::Mutex;
use rand::{rngs::OsRng, Rng};
use tokio::select;
use tokio::sync::watch;
use tokio::time::sleep;
use tonic::{transport, Request, Streaming};

use action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
Expand All @@ -30,16 +35,38 @@ use proto::build::bazel::remote::execution::v2::{
ExecutionPolicy, GetCapabilitiesRequest, WaitExecutionRequest,
};
use proto::google::longrunning::Operation;
use retry::{ExponentialBackoff, Retrier, RetryResult};
use scheduler::ActionScheduler;

pub struct GrpcScheduler {
capabilities_client: CapabilitiesClient<transport::Channel>,
execution_client: ExecutionClient<transport::Channel>,
platform_property_managers: Mutex<HashMap<String, Arc<PlatformPropertyManager>>>,
jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
retry: config::stores::Retry,
retrier: Retrier,
}

impl GrpcScheduler {
pub fn new(config: &config::schedulers::GrpcScheduler) -> Result<Self, Error> {
let jitter_amt = config.retry.jitter;
Self::new_with_jitter(
config,
Box::new(move |delay: Duration| {
if jitter_amt == 0. {
return delay;
}
let min = 1. - (jitter_amt / 2.);
let max = 1. + (jitter_amt / 2.);
delay.mul_f32(OsRng.gen_range(min..max))
}),
)
}

pub fn new_with_jitter(
config: &config::schedulers::GrpcScheduler,
jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
) -> Result<Self, Error> {
let endpoint = transport::Channel::balance_list(std::iter::once(
transport::Endpoint::new(config.endpoint.clone())
.err_tip(|| format!("Could not parse {} in GrpcScheduler", config.endpoint))?,
Expand All @@ -49,9 +76,38 @@ impl GrpcScheduler {
capabilities_client: CapabilitiesClient::new(endpoint.clone()),
execution_client: ExecutionClient::new(endpoint),
platform_property_managers: Mutex::new(HashMap::new()),
jitter_fn,
retry: config.retry.clone(),
retrier: Retrier::new(Box::new(|duration| Box::pin(sleep(duration)))),
})
}

async fn perform_request<F, Fut, R, I>(&self, input: I, mut request: F) -> Result<R, Error>
where
F: FnMut(I) -> Fut + Send + Copy,
Fut: Future<Output = Result<R, Error>> + Send,
R: Send,
I: Send + Clone,
{
let retry_config = ExponentialBackoff::new(Duration::from_millis(self.retry.delay as u64))
.map(|d| (self.jitter_fn)(d))
.take(self.retry.max_retries); // Remember this is number of retries, so will run max_retries + 1.
self.retrier
.retry(
retry_config,
unfold(input, move |input| async move {
let input_clone = input.clone();
Some((
request(input_clone)
.await
.map_or_else(RetryResult::Retry, RetryResult::Ok),
input,
))
}),
)
.await
}

async fn stream_state(mut result_stream: Streaming<Operation>) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
if let Some(initial_response) = result_stream
.message()
Expand Down Expand Up @@ -98,29 +154,32 @@ impl ActionScheduler for GrpcScheduler {
return Ok(platform_property_manager.clone());
}

// Not in the cache, lookup the capabilities with the upstream.
let capabilities = self
.capabilities_client
.clone()
.get_capabilities(GetCapabilitiesRequest {
instance_name: instance_name.to_string(),
})
.await?
.into_inner();
let platform_property_manager = Arc::new(PlatformPropertyManager::new(
capabilities
.execution_capabilities
.err_tip(|| "Unable to get execution properties in GrpcScheduler")?
.supported_node_properties
.iter()
.map(|property| (property.clone(), config::schedulers::PropertyType::Exact))
.collect(),
));
self.perform_request(instance_name, |instance_name| async move {
// Not in the cache, lookup the capabilities with the upstream.
let capabilities = self
.capabilities_client
.clone()
.get_capabilities(GetCapabilitiesRequest {
instance_name: instance_name.to_string(),
})
.await?
.into_inner();
let platform_property_manager = Arc::new(PlatformPropertyManager::new(
capabilities
.execution_capabilities
.err_tip(|| "Unable to get execution properties in GrpcScheduler")?
.supported_node_properties
.iter()
.map(|property| (property.clone(), config::schedulers::PropertyType::Exact))
.collect(),
));

self.platform_property_managers
.lock()
.insert(instance_name.to_string(), platform_property_manager.clone());
Ok(platform_property_manager)
self.platform_property_managers
.lock()
.insert(instance_name.to_string(), platform_property_manager.clone());
Ok(platform_property_manager)
})
.await
}

async fn add_action(&self, action_info: ActionInfo) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
Expand All @@ -141,11 +200,14 @@ impl ActionScheduler for GrpcScheduler {
digest_function: digest_function::Value::Sha256.into(),
};
let result_stream = self
.execution_client
.clone()
.execute(Request::new(request))
.await
.err_tip(|| "Sending action to upstream scheduler")?
.perform_request(request, |request| async move {
self.execution_client
.clone()
.execute(Request::new(request))
.await
.err_tip(|| "Sending action to upstream scheduler")
})
.await?
.into_inner();
Self::stream_state(result_stream).await
}
Expand All @@ -158,15 +220,22 @@ impl ActionScheduler for GrpcScheduler {
name: unique_qualifier.action_name(),
};
let result_stream = self
.execution_client
.clone()
.wait_execution(Request::new(request))
.perform_request(request, |request| async move {
self.execution_client
.clone()
.wait_execution(Request::new(request))
.await
.err_tip(|| "While getting wait_execution stream")
})
.and_then(|result_stream| Self::stream_state(result_stream.into_inner()))
.await;
if let Err(err) = result_stream {
log::info!("Error response looking up action with upstream scheduler: {}", err);
return None;
match result_stream {
Ok(result_stream) => Some(result_stream),
Err(err) => {
log::warn!("Error response looking up action with upstream scheduler: {}", err);
None
}
}
Self::stream_state(result_stream.unwrap().into_inner()).await.ok()
}

async fn clean_recently_completed_actions(&self) {}
Expand Down
5 changes: 4 additions & 1 deletion config/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::HashMap;
use serde::Deserialize;
use serde_utils::{convert_numeric_with_shellexpand, convert_string_with_shellexpand};

use crate::stores::StoreRefName;
use crate::stores::{Retry, StoreRefName};

#[allow(non_camel_case_types)]
#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -128,6 +128,9 @@ pub struct GrpcScheduler {
/// The upstream scheduler to forward requests to.
#[serde(deserialize_with = "convert_string_with_shellexpand")]
pub endpoint: String,
/// Retry configuration to use when a network request fails.
#[serde(default)]
pub retry: Retry,
}

#[derive(Deserialize, Debug)]
Expand Down
3 changes: 3 additions & 0 deletions gencargo/grpc_scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ doctest = false

[dependencies]
async-trait = { workspace = true }
futures = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }

Expand All @@ -32,3 +34,4 @@ config = { workspace = true }
proto = { workspace = true }
common = { workspace = true }
error = { workspace = true }
retry = { workspace = true }

0 comments on commit 21519ce

Please sign in to comment.